
Associate pending segments with the tasks that requested them#16144

Merged
kfaraz merged 28 commits intoapache:masterfrom
AmatyaAvadhanula:pending_segments_with_tasks
Apr 17, 2024
Merged

Associate pending segments with the tasks that requested them#16144
kfaraz merged 28 commits intoapache:masterfrom
AmatyaAvadhanula:pending_segments_with_tasks

Conversation


@AmatyaAvadhanula AmatyaAvadhanula commented Mar 18, 2024

This PR aims to associate pending segments with the task groups that created them.

Motivation:

  1. The association facilitates cleanup of unneeded pending segments as soon as all tasks in a group exit.

Pending segment cleanup helps delete entries immediately after tasks exit and can reduce the load on the metadata store during segment allocation. In some cases, it can also prevent segment allocation failures caused by conflicting pending segments that are no longer needed.


  2. It also enables a change in how the transactional append and replace actions commit segments during concurrent append and replace.
    A) When a concurrent replace occurs, it can upgrade any pending segments of concurrent append tasks to the replacing task's lock version.
    B) When an appending task commits segments, it can commit not only the pending segments that it directly created but also their upgraded versions.

Previously, the upgraded pending segments created by replace tasks could have different ids than the segments eventually committed by the appending task. This does not affect batch appends.
For concurrent streaming ingestion, however, there could be a race where an upgraded pending segment on the indexer corresponds to the same root segment as the parent of a different upgraded segment committed by the job.
In that case, there could be a brief period when the upgraded realtime segment is being served while the committed segment with a different id is also being served by a historical. Since their ids are distinct, the broker would merge results from both, leading to data duplication in queries.

The change in protocol ensures that an append action upgrades a segment set which corresponds exactly to the pending segment upgrades made by the concurrent replace action, eliminating any duplication in query results that may occur due to the above race.
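To illustrate why distinct ids cause the duplication above, here is a minimal sketch (hypothetical names, not Druid's actual broker code) of id-based de-duplication: the broker queries each distinct segment id once, so two copies of the same id collapse, while an upgraded realtime segment and a committed segment with different ids are both queried.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: served segments are keyed by id, so duplicate ids
// collapse to one entry but distinct ids for the same data do not.
class BrokerDedupSketch {
    static Set<String> segmentsToQuery(List<String> announcedIds) {
        // De-duplication is purely id-based: same id -> queried once.
        return new LinkedHashSet<>(announcedIds);
    }

    public static void main(String[] args) {
        // Same id announced by a realtime task and a historical: queried once.
        Set<String> ok = segmentsToQuery(List.of("ds_2024_v2_0", "ds_2024_v2_0"));
        // The race described above: the upgraded realtime id differs from the
        // committed id, so both are queried and rows are double-counted.
        Set<String> dup = segmentsToQuery(List.of("ds_2024_v2_0", "ds_2024_v2_1"));
        System.out.println(ok.size() + " " + dup.size()); // 1 2
    }
}
```

With matching upgraded ids on both sides, the second case collapses to the first.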


Implementation details:

Changes to `druid_pendingSegments` metadata table:

group_id -> task replica group id for streaming ingestion,
            index_parallel task id for native batch ingestion,
            controller task id for MSQ inserts

parent   -> the pending segment from which the current entry was upgraded
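The two new columns can be sketched as a plain Java class (field names follow this PR description; note that review comments below suggest renaming them to task_allocator_id and upgraded_from_segment_id, so the committed code may differ):

```java
// Sketch of the two new druid_pendingSegments columns, per the PR description.
class PendingSegmentSketch {
    final String segmentId;
    final String groupId; // replica group / index_parallel id / MSQ controller id
    final String parent;  // pending segment this entry was upgraded from

    PendingSegmentSketch(String segmentId, String groupId, String parent) {
        this.segmentId = segmentId;
        this.groupId = groupId;
        this.parent = parent;
    }

    // Per the description, a freshly allocated (root) pending segment
    // records parent = self rather than null.
    boolean isRoot() {
        return segmentId.equals(parent);
    }
}
```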



* Pending segment cleanup (invoked on the Overlord directly instead of via a new task action)
- When a ParallelIndexSupervisorTask or MSQ controller task exits, delete all pending segments associated with its id. This is useful in the case of task failures.
- When a streaming ingestion task exits and there are no other active tasks corresponding to its base sequence name, delete all pending segments associated with it.
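The streaming cleanup rule above can be sketched as follows (hypothetical helper, not the PR's actual Overlord/TaskLockbox code): pending segments of a group are deleted only once no active task shares that group.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch: track active tasks per group; signal cleanup only when the last
// task of a group (e.g. the last replica of a streaming task group) exits.
class PendingSegmentCleanupSketch {
    private final Map<String, Set<String>> activeTasksByGroup = new HashMap<>();

    void taskStarted(String groupId, String taskId) {
        activeTasksByGroup.computeIfAbsent(groupId, g -> new HashSet<>()).add(taskId);
    }

    /** Returns true if pending segments of this group should be deleted now. */
    boolean taskExited(String groupId, String taskId) {
        Set<String> tasks = activeTasksByGroup.get(groupId);
        if (tasks != null) {
            tasks.remove(taskId);
            if (!tasks.isEmpty()) {
                return false; // a replica is still running -> keep pending segments
            }
            activeTasksByGroup.remove(groupId);
        }
        return true; // no active task left in the group -> clean up now
    }
}
```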


Changes to task actions:

* SegmentAllocateAction
- Associate the pending segment with its group_id when the pending segment is written to the metadata store; parent = self


* SegmentTransactionalReplace
- For a replace lock held over an interval:
    transaction {
      commit input segments contained within interval
      upgrade ids in the upgradeSegments table corresponding to this task to the replace lock's version and commit them
      fetch payload, group_id for pending segments 
      upgrade each such pending segment to the replace lock's version with the corresponding parent
    }
    For every pending segment with version == replace lock version:
        Fetch the payload and group_id of the pending segment and relay them to the supervisor
        The supervisor relays the payloads to all tasks with the corresponding group_id so they can serve realtime queries
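The replace-side upgrade step can be sketched as follows (hypothetical types; the PR's actual logic lives in the metadata storage coordinator): each pending segment not yet at the replace version gets an upgraded copy under the replace lock's version, preserving group_id and recording the original as parent.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: upgrade pending segments to a replace lock's version, keeping
// group_id so the upgrades can be relayed back to the appending tasks.
class ReplaceUpgradeSketch {
    record Pending(String id, String version, String groupId, String parent) {}

    static List<Pending> upgradeAll(List<Pending> pendings, String replaceVersion) {
        List<Pending> upgraded = new ArrayList<>();
        for (Pending p : pendings) {
            if (!p.version().equals(replaceVersion)) {
                upgraded.add(new Pending(
                    p.id() + "_" + replaceVersion, // hypothetical id scheme
                    replaceVersion,                // replace lock's version
                    p.groupId(),                   // same group -> relayed to its tasks
                    p.id()                         // parent = the segment it upgrades
                ));
            }
        }
        return upgraded;
    }
}
```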


* SegmentTransactionalAppend
- For an append lock held over an interval:
    transaction {
      commit input segments contained within interval
      if there is an active replace lock over the interval:
        add an entry for the inputSegment corresponding to the replace lock's task in the upgradeSegments table
      fetch pending segments with parent contained within the input segments, and commit them
    }
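The last step of the append transaction can be sketched as follows (hypothetical types, not the PR's actual SegmentTransactionalAppendAction): alongside its input segments, the append also commits every upgraded pending segment whose parent is one of those inputs, so the committed set matches the concurrent replace's upgrades exactly.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Sketch: commit input segments plus the upgraded pending segments whose
// parent is among the inputs (parent == id marks a root, non-upgraded entry).
class AppendCommitSketch {
    record Pending(String id, String parent) {}

    static Set<String> segmentsToCommit(Set<String> inputSegments, List<Pending> allPending) {
        Set<String> toCommit = new LinkedHashSet<>(inputSegments);
        for (Pending p : allPending) {
            if (inputSegments.contains(p.parent()) && !p.id().equals(p.parent())) {
                toCommit.add(p.id()); // upgraded version of an input segment
            }
        }
        return toCommit;
    }
}
```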

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@github-actions github-actions Bot added Area - Batch Ingestion Area - Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels Mar 18, 2024
Comment thread server/src/main/java/org/apache/druid/metadata/PendingSegment.java Fixed
)
)
);
alterPendingSegmentsTableAddParentIdAndTaskGroup(tableName);

It'd be better to move this function out to createPendingSegmentsTable() after the call to this method.

log.info("Table[%s] already has column[task_group].", tableName);
} else {
log.info("Adding column[task_group] to table[%s].", tableName);
statements.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN task_group VARCHAR(255)", tableName));

Similar to validateSegmentsTable(), do we also need validation that the pending segments table is upgraded to the desired schema?

Comment thread server/src/main/java/org/apache/druid/metadata/PendingSegment.java Outdated

@kfaraz kfaraz left a comment


Thanks a lot for the changes, @AmatyaAvadhanula !

I have tried to spend some more time with the approach and I agree with you that this seems to be the best path forward.

  • It simplifies the upgrade logic to some extent.
  • It offers the opportunity to actively clean up pending segments once they are not needed anymore.
  • It probably also safeguards against OL crashes. In the current impl (before this PR), the OL going down would probably cause pending segment to upgraded pending segment mappings to get lost as they are not clearly persisted anywhere. The prev_segment_id might have that info but it is vague since that column has a very broad definition at this point.

Suggestions
These are my main suggestions:

  • Rename parent_id to upgraded_from_segment_id. That clarifies the exact meaning and purpose of this column.
  • This column should be null for an original (non-upgraded) pending segments.
  • Rename task_group to something more distinct. We can either stick to base_sequence_name because afaict, this value is always going to be the same as the baseSequenceName. Otherwise, how about we call it task_allocator_id?
  • Leave out the cleanup logic in the TaskLockbox, we can do that later.

@@ -135,14 +135,16 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
if (startMetadata == null) {
publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments(
segments,

At some point, the plan was to have one action for just committing segments and another action for committing segments and metadata both. So we decided to keep a method that would just commit segments.

But we eventually decided against having the two actions as it didn't really serve a lot of purpose. So now we could simplify the IndexerMetadataStorageCoordinator interface too.

task.getId()
);
}
final String pendingSegmentGroup = task.getPendingSegmentGroup();

I would advise keeping the cleanup logic in a separate PR. We will be able to focus and test on it better.

Comment thread server/src/main/java/org/apache/druid/metadata/PendingSegment.java Outdated
Comment thread indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java Outdated
);
}

protected void setSupervisorManager(SupervisorManager supervisorManager)

Rather than this, have a separate createTaskActionToolbox method that accepts a SupervisorManager.

Comment thread indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java Outdated
@kfaraz kfaraz marked this pull request as ready for review April 5, 2024 04:21
);
}

public Map<SegmentId, SegmentId> getAnnouncedSegmentsToParentSegments()

Check notice — Code scanning / CodeQL: Exposing internal representation

getAnnouncedSegmentsToParentSegments exposes the internal representation stored in field announcedSegmentsToParentSegments. The value may be modified after this call to getAnnouncedSegmentsToParentSegments.
@SuppressWarnings("UnstableApiUsage")
public String computeSequenceNamePrevIdSha1(boolean skipSegmentLineageCheck)
{
final Hasher hasher = Hashing.sha1().newHasher()

Check notice — Code scanning / CodeQL: Deprecated method or constructor invocation

Invoking Hashing.sha1 should be avoided because it has been deprecated.
@kfaraz kfaraz left a comment

A few more comments. Yet to review the cleanup logic in TaskLockbox, will try to get it done soon.

pendingSegment.getId().asSegmentId().toString(),
pendingSegment.getId()
));
Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments = new HashMap<>();

+1, rename to segmentToParent (we can omit the prefix pending as this method only deals with pending segments, and it can be taken for granted.)

// this set should be accessed under the giant lock.
private final Set<String> activeTasks = new HashSet<>();

// Stores map of pending task group of tasks to the set of their ids.

+1

Comment on lines +50 to +57
* Pseudo code (for a single interval):
* For an append lock held over an interval:
* transaction {
* commit input segments contained within interval
* if there is an active replace lock over the interval:
* add an entry for the inputSegment corresponding to the replace lock's task in the upgradeSegments table
* fetch pending segments with parent contained within the input segments, and commit them
* }

It doesn't seem appropriate to have the implementation described as pseudo-code. Someone might as well read the code. It is better to briefly describe the key points of the implementation in a list fashion. (This is not a blocker for the PR).

* your task for the segment intervals.
*
* <pre>
* Pseudo code (for a single interval)

Same comment regarding pseudo-code.

Comment on lines +23 to +24
* An interface to be implemented by every appending task that allocates pending segments.
*/

Suggested change
* An interface to be implemented by every appending task that allocates pending segments.
*/
* An append task that can allocate pending segments. All concrete {@link Task} implementations that need to allocate pending segments must implement this interface.
*/

Comment thread server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java Outdated
@kfaraz kfaraz left a comment

Reviewed cleanup logic, flow looks okay. Left some comments. Will take another final pass after all the existing comments are addressed.

// this set should be accessed under the giant lock.
private final Set<String> activeTasks = new HashSet<>();

// Stores map of pending task group of tasks to the set of their ids.

Please address this and also rename this map to activeAllocatorIdToTaskIds. You need not declare it as a HashMap, just a Map would suffice.

Comment thread server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java Outdated
Comment on lines +38 to +42
* <li> id -> id (Unique identifier for pending segment) <li/>
* <li> sequence_name -> sequenceName (sequence name used for segment allocation) <li/>
* <li> sequence_prev_id -> sequencePrevId (previous segment id used for segment allocation) <li/>
* <li> upgraded_from_segment_id -> upgradedFromSegmentId (Id of the root segment from which this was upgraded) <li/>
* <li> task_allocator_id -> taskAllocatorId (Associates a task / task group / replica group with the pending segment) <li/>
@kfaraz kfaraz Apr 16, 2024

This list is not needed here, the description of the fields should be in the javadocs of the respective getters. (not a blocker for this PR).

@kfaraz kfaraz left a comment

There are leftover comments that can be addressed in a follow up.


@JsonTypeName(MSQWorkerTask.TYPE)
public class MSQWorkerTask extends AbstractTask
public class MSQWorkerTask extends AbstractTask implements PendingSegmentAllocatingTask

This class need not implement PendingSegmentAllocatingTask as it never actually does any allocation. The allocation is always done by the controller task.

this can be addressed in a follow up PR.

Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> segmentToParent = new HashMap<>();
pendingSegments.forEach(pendingSegment -> {
if (pendingSegment.getUpgradedFromSegmentId() != null
&& !pendingSegment.getUpgradedFromSegmentId().equals(pendingSegment.getId().asSegmentId().toString())) {

Can the upgradedFromSegmentId ever be equal to the id itself?
A normal/root pending segment (i.e. one created by allocation and not upgrade) would have upgraded_from_segment_id as null, right?

"Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
);
final Set<ReplaceTaskLock> replaceLocksForTask = toolbox

Some comments here would be helpful.

));
Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> segmentToParent = new HashMap<>();
pendingSegments.forEach(pendingSegment -> {
if (pendingSegment.getUpgradedFromSegmentId() != null

Should we only look at pending segments that were upgraded by this task rather than all upgraded pending segments?

/**
*/
public class NoopTask extends AbstractTask
public class NoopTask extends AbstractTask implements PendingSegmentAllocatingTask

Does NoopTask need to implement the new interface for the purpose of tests?

* @param taskAllocatorId task id / task group / replica group for an appending task
* @return number of pending segments deleted from the metadata store
*/
int deletePendingSegmentsForTaskGroup(String taskAllocatorId);

Method needs to be renamed.

}
catch (Exception e) {
log.error(e, "PendingSegment[%s] mapping update request to version[%s] on Supervisor[%s] failed",
log.error(e, "PendingSegmentRecord[%s] mapping update request to version[%s] on Supervisor[%s] failed",

This change is not needed.

// always insert empty previous sequence id
insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1);
insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1,
taskAllocatorId

Formatting is off.
