Do not kill segments with referenced load specs from deep storage#16667

Merged
AmatyaAvadhanula merged 20 commits into apache:master from AmatyaAvadhanula:segment_kill_with_upgrades
Jul 15, 2024

Conversation

@AmatyaAvadhanula
Contributor

@AmatyaAvadhanula AmatyaAvadhanula commented Jun 27, 2024

Description

Druid now allows upgrades of segments to higher versions with the same load specs. This was introduced as part of concurrent append and replace and may have other uses.

When a load spec is shared by several segments it is important that the deep storage location is killed only after all the metadata references to it are nuked. This is currently being handled only for used segments.

Changes

This PR ensures that every used and unused segment reference is considered before killing files on deep storage.

  1. Track lineage of segment upgrades by adding a new column: upgraded_from_segment_id:
  • Add a new column to druid_segments table in the metadata store: upgraded_from_segment_id, which indicates the root ancestor to which the load spec originally belonged.

  • Modify SegmentTransactionalAppendAction and SegmentTransactionalReplaceAction to populate the upgraded_from_segment_id and add tests.

  2. Modify kill tasks to handle references to load specs:
  • Fire a task action to determine segment upgrade metadata for the batch prior to nuking
  • Nuke segments in batch
  • Fire task action to fetch all references to self ids, and upgraded ids fetched previously
  • Determine segments with unreferenced load specs
  • Delete determined subset from deep storage
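The steps above can be sketched as follows. This is a minimal, illustrative sketch of the decision logic only; the class and method names here are hypothetical and do not appear in the PR:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch of the kill-task decision described above:
// a load spec is deleted from deep storage only when no segment
// referencing its root ancestor remains in the metadata store.
public class KillFlowSketch
{
  /**
   * @param nukedIds                IDs of segments nuked from the metadata store
   * @param upgradedFromSegmentIds  map from nuked ID to its upgraded_from_segment_id;
   *                                no entry for non-upgraded segments
   * @param upgradedToSegmentIds    map from root ID to the segment IDs that still
   *                                reference it in the metadata store
   * @return root IDs whose load specs are safe to delete from deep storage
   */
  public static Set<String> unreferencedRootIds(
      Set<String> nukedIds,
      Map<String, String> upgradedFromSegmentIds,
      Map<String, Set<String>> upgradedToSegmentIds
  )
  {
    Set<String> rootIds = new HashSet<>();
    for (String id : nukedIds) {
      // Fall back to the segment's own ID when it was never upgraded
      rootIds.add(upgradedFromSegmentIds.getOrDefault(id, id));
    }
    Set<String> unreferenced = new HashSet<>();
    for (String rootId : rootIds) {
      Set<String> remaining = upgradedToSegmentIds.get(rootId);
      if (remaining == null || remaining.isEmpty()) {
        unreferenced.add(rootId);
      }
    }
    return unreferenced;
  }
}
```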

Release note

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@kfaraz kfaraz marked this pull request as ready for review June 28, 2024 06:59
Contributor

@kfaraz kfaraz left a comment


@AmatyaAvadhanula , thanks for addressing this issue!

While updating the SegmentNukeAction does make the change very simple, it is semantically incorrect. Any task action should just be expected to return the result of the performed action. It is very difficult to reason about a segment delete operation returning "root segment IDs that are unreferenced".

Ideally, we would want the kill task to do something like this:

  1. In every iteration of the for loop inside the kill task, identify the segments to be nuked.
  2. (new task action) Determine the root segment IDs of the segments to be nuked.
  3. Nuke the segments.
  4. (new task action) Determine which root segment IDs are still referenced.
  5. Filter out the results and kill from deep storage only the load specs for the unreferenced IDs.

IIUC, the current set of changes is trying to perform 2, 3 and 4 in the same action, which would lead to confusion and sub-optimal performance.

No. 2 need not be a separate action. We can update the RetrieveUnusedSegmentsAction to return a new DataSegmentWithRootId class, which has all the fields of DataSegment class along with a nullable field rootSegmentId. (We could even just add the new field to DataSegment class. I am okay with that too, as it would be null all the time except here.)

No. 4 has to be a separate action though.

Rolling upgrades
When the Overlord is on an older version, step 2 will return null (if we reuse RetrieveUnusedSegmentsAction) and step 4 will throw an exception. In this case, we just fall back to doing what we do today.

Nomenclature
Let's go with upgraded_from_segment_id instead as it is already being used with pending segments. Its meaning is also clear since "upgrade" is now a core concept.

Let me know what you think.

@AmatyaAvadhanula AmatyaAvadhanula requested a review from kfaraz June 28, 2024 17:03
Comment thread server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java Outdated
Contributor

@abhishekrb19 abhishekrb19 left a comment


Regarding the addition of a new column root_segment_id in the segments table, are there other use cases outside of kill tasks where this could be useful? I'm also wondering about the storage footprint this additional bookkeeping might cause at scale.

In general, do we have a sense of how often segments get upgraded? "It depends" is a fair answer :) but I'm curious if it's worth tracking this information in a separate table vs adding a new column to the existing segments table.

@AmatyaAvadhanula
Contributor Author

@kfaraz @abhishekrb19
Thank you for your feedback!

The reason I had made the change within an existing TaskAction, besides its simplicity, was to ensure that segments killed from the metadata store actually get killed from deep storage.

(new task action) Determine the root segment IDs of the segments to be nuked.
Nuke the segments.
(new task action) Determine which root segment IDs are still referenced.

Having multiple actions would mean that a lock revocation between the actions could cause the task to fail after nuking the segments but before determining which segments to kill from deep storage. In such a case, segments may not be cleaned up from deep storage.
This would not have happened with a single task action happening within a critical section.
Please do let me know if this understanding is incorrect.


However, I've changed the behaviour so that one can determine the segments to be killed before they are killed, with the assumption that every segment in the batch would be killed.
This can now be broken into a separate task action, and I'll make the change.

@kfaraz
Contributor

kfaraz commented Jun 29, 2024

Having multiple actions would mean that a lock revokation between the actions can cause the task to fail after nuking the segments during determination of segments to kill from deep storage. In such a case, segments may not be cleaned up from deep storage.

@AmatyaAvadhanula , just to clarify, the new task actions are simple read actions and do not need to check if the relevant segments are covered by locks or not. They do not need to run the SELECT statements in critical sections either. So locks being revoked would have no effect on the action unless the task itself has been killed by the Overlord.

But I am not sure whether the Overlord explicitly kills a task whose locks have been revoked.
And if it does, it could happen in the current implementation too.

@kfaraz
Contributor

kfaraz commented Jun 29, 2024

@abhishekrb19 , I have tried to address your queries below.

Regarding the addition of a new column root_segment_id in the segments table, are there other use cases outside of kill tasks where this could be useful?

Right now, kill task correctness and debugging are the only two usages of this column.

I'm also wondering about the storage footprint this additional bookkeeping might cause at scale.

True, it is additional bookkeeping but it will become crucial when "concurrent append/replace" is widely adopted and made the default cluster behaviour.

In general, do we have a sense of how often segments get upgraded? "It depends" is a fair answer :) but I'm curious if it's worth tracking this information in a separate table vs adding a new column to the existing segments table.

This column will be populated only when a segment has actually been upgraded from another. That happens when a replace task (say compaction) runs concurrently with an append task (say Kafka ingestion), which will be a fairly common occurrence once "concurrent append and replace" is enabled and skipOffsetFromLatest is set to a low value.

For all non-upgraded segments, this column value would be null.

We would prefer to have this in the druid_segments table as:

  • This column represents a core property of the segment rather than an additional piece of info. It can be thought of as a proxy for the complete load spec, which is part of the segment payload itself.
  • Keeping it in the same table means we don't need to fire another statement in the transaction when we are trying to read or write this value. This value is always written at the time an upgrade segment is INSERTed and typically read with the rest of the segment columns.
  • I did consider adding a separate table but AFAICT, that doesn't affect the metadata store footprint in any way. (Except maybe the index but that should be a fairly small footprint.)

Please let me know if this makes sense and if you have any other concern with adding this column to the druid_segments table.

Contributor

@kfaraz kfaraz left a comment


@AmatyaAvadhanula , the code seems somewhat more complicated than it needs to be, since you are trying to do two things in the new task action.

You need two separate task actions:

  1. FindUpgradedFromSegmentIds

    Input: Set of segment IDs you want to nuke
    (you can skip sending the whole DataSegment objects to reduce payload size)

    Output: Map from input ID to its upgraded_from_segment_id.

    Implementation:
# In batches of 100
SELECT id, upgraded_from_segment_id FROM druid_segments WHERE id IN (%s)

The kill task must check if the task action returned no mapping for a certain segment ID. In that case, just use the segment ID itself as its upgraded_from_segment_id.
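A minimal sketch of that fallback, with hypothetical names (the actual PR code may differ):

```java
import java.util.Map;

public class UpgradedFromFallback
{
  // If the action returned no mapping for a segment ID,
  // treat the segment as its own root (it was never upgraded).
  public static String rootIdOf(String segmentId, Map<String, String> upgradedFromSegmentIds)
  {
    return upgradedFromSegmentIds.getOrDefault(segmentId, segmentId);
  }
}
```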

You may even completely skip out on adding the first task action by just adding this map to RetrieveUnusedSegmentsAction itself, the query wiring is already there, I think.

Perform the SegmentNukeAction, and then perform the following action:

  2. FindUpgradedToSegmentIds

    Input: Set of upgraded_from_segment_id (plus self where upgraded_from_segment_id was null) determined in the previous step

    Output: Map from input ID to the set of its children.

    Implementation:
# In batches of 100, add a condition on datasource too if the index uses it
SELECT id, upgraded_from_segment_id FROM druid_segments WHERE upgraded_from_segment_id IN (%s)

Now filter out the segment IDs which still have any child and delete the deep storage files for the rest.

@AmatyaAvadhanula AmatyaAvadhanula requested a review from kfaraz July 1, 2024 08:09
Comment thread server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java Outdated
Contributor

@kfaraz kfaraz left a comment


Thanks for the updated changes, @AmatyaAvadhanula . Left a few more suggestions.

Comment thread server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java Outdated
Comment thread server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java Outdated
Contributor

@abhishekrb19 abhishekrb19 left a comment


This column represents core nature of the segment rather than an additional piece of info. It can be thought of as a proxy for the complete load spec, which is part of the segment payload itself.
Keeping it in the same table means we don't need to fire another statement in the transaction when we are trying to read or write this value. This value is always written at the time an upgrade segment is INSERTed and typically read with the rest of the segment columns.

Okay, makes sense. The access patterns and ease of usage seem to justify having the column in the segments table itself. I see the column is also part of the pending segments table.

Thanks for the clarifications! @kfaraz @AmatyaAvadhanula

import java.util.Objects;

/**
* Information about the id and upgraded from segment id created as a result of segment upgrades
Contributor


"ID and upgraded from segment ID" -- for the former, can we mention child segment ID or something to disambiguate the multiple IDs?

// Kill segments from the deep storage only if their load specs are not being used by any used segments
final List<DataSegment> segmentsToBeKilled = unusedSegments
// Kill segments from the deep storage only if their load specs are not being used by any other segments
// We still need to check with used load specs as segment upgrades were introduced before this change
Contributor

@abhishekrb19 abhishekrb19 Jul 2, 2024


Given that the concurrent compaction feature is still experimental, do we have the liberty to clean up firing the used segment task action and this additional check? Removing it could potentially lead to a kill correctness issue and leave dangling segments for datasources where concurrent compaction is enabled. It's essentially one less task action, but in terms of the evolution of experimental features, can we afford to break compatibility in some sense?

// upgraded_from_segment_id is the first segment to which the same load spec originally belonged
// Load specs can be shared as a result of segment version upgrade
// This column is null for segments that haven't been upgraded.
columnNameTypes.put("upgraded_from_segment_id", "VARCHAR(255)");
Contributor


For completeness, should we also add a check in validateSegmentsTable() to look for the presence of the new column upgraded_from_segment_id? When auto-table creation isn't enabled, we should let the operator know that the column and index should be added manually if it's not already there.
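For illustration only, the manual migration on such a cluster might resemble the following; the index name and exact dialect details are assumptions, not taken from the PR:

```sql
-- Assumed DDL; adjust syntax for your metadata store dialect
ALTER TABLE druid_segments ADD COLUMN upgraded_from_segment_id VARCHAR(255);
CREATE INDEX idx_druid_segments_upgraded_from_segment_id
    ON druid_segments (upgraded_from_segment_id);
```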

Contributor


I think this is an existing shortcoming that applies to other tables too.
Maybe we should deal with it in a separate PR.

Contributor


Keeping the conversation as unresolved so that we know that is to be addressed later.

@kfaraz
Contributor

kfaraz commented Jul 3, 2024

@AmatyaAvadhanula , let's finalize the following structure for the responses of the task actions.

RetrieveUpgradedFromSegmentIdsAction should return

class UpgradedFromSegmentsResponse
{
      // Map from a segment ID to the segment ID from which it was upgraded
      // There should be no entry in the map for an original non-upgraded segment
      private final Map<String, String> upgradedFromSegmentIds;
}

RetrieveUpgradedToSegmentIdsAction should return

class UpgradedToSegmentsResponse
{
      // Map from a segment ID to a set containing
      // 1. all segment IDs that were upgraded from it AND are still present in the metadata store
      // 2. the segment ID itself if and only if it is a non-upgraded segment AND is still present in the metadata store
      private final Map<String, Set<String>> upgradedToSegmentIds;
}

Contributor

@kfaraz kfaraz left a comment


Thanks for the update and for adding the tests, @AmatyaAvadhanula !
Left some comments.

Comment thread server/src/main/java/org/apache/druid/metadata/UpgradedFromSegmentsResponse.java Outdated
Comment thread server/src/main/java/org/apache/druid/metadata/UpgradedFromSegmentsResponse.java Outdated
Comment thread server/src/main/java/org/apache/druid/metadata/UpgradedToSegmentsResponse.java Outdated
Contributor

@kfaraz kfaraz left a comment


Sorry for the back and forth, @AmatyaAvadhanula .
But the PR needs to be refined a little.

@AmatyaAvadhanula AmatyaAvadhanula requested a review from kfaraz July 15, 2024 03:55
Contributor

@kfaraz kfaraz left a comment


Thanks for your patience through this review, @AmatyaAvadhanula !
Changes look good, minor tweaks can be done later.

final Map<String, String> upgradedFromSegmentIds = new HashMap<>();
connector.retryWithHandle(
handle -> {
Query<Map<String, Object>> query = handle.createQuery(sql)
Contributor


Follow up work:
This query should probably be executed on batches of 100 segment IDs at a time.

Contributor Author


Isn't this controlled by the batch size of kill tasks to some extent?

Contributor


Yes, it would be controlled by the batch size of kill. But it is still possible for someone to either increase those limits or just fire this action with a large set of segment IDs. The overlord side should have its own safeguards.
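A hedged sketch of such an Overlord-side safeguard, splitting the IDs into batches before building each IN clause (the helper name is hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchedInClause
{
  // Split IDs into batches of at most batchSize, so that each metadata
  // query binds a bounded IN (...) clause regardless of caller input size.
  public static List<List<String>> batches(List<String> ids, int batchSize)
  {
    List<List<String>> result = new ArrayList<>();
    for (int i = 0; i < ids.size(); i += batchSize) {
      result.add(ids.subList(i, Math.min(i + batchSize, ids.size())));
    }
    return result;
  }
}
```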

.forEach(id -> upgradedToSegmentIds.computeIfAbsent(id, k -> new HashSet<>()).add(id));
connector.retryWithHandle(
handle -> {
Query<Map<String, Object>> query = handle.createQuery(sql)
Contributor


Follow up work:
This one should probably be broken up in batches too.

@AmatyaAvadhanula
Contributor Author

Thank you for the reviews, @kfaraz and @abhishekrb19!

@AmatyaAvadhanula AmatyaAvadhanula merged commit d6c760f into apache:master Jul 15, 2024
@kfaraz kfaraz deleted the segment_kill_with_upgrades branch July 15, 2024 08:53
sreemanamala pushed a commit to sreemanamala/druid that referenced this pull request Aug 6, 2024
…ache#16667)

Do not kill segments with referenced load specs from deep storage
@writer-jill
Contributor

@kfaraz this came up in the review of the release notes for Druid 31. Can you please write a user-facing release note for this change? How does the change affect users? Thanks.

writer-jill added a commit to writer-jill/druid that referenced this pull request Sep 26, 2024
@kfaraz kfaraz added this to the 31.0.0 milestone Oct 4, 2024