Check for handoff of upgraded segments #16162
Conversation
committerSupplier.setMetadata(i + 1);
Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
}
committerSupplier.setMetadata(1);
This change is intentional to avoid any uncertainty that may arise due to the order of segment ids.
By adding a single row, exactly one segment is created and we can verify its id in the exception message.
Can't we control the order in the test?
abhishekagarwal87 left a comment:
Minor comments. LGTM otherwise.
final Object callerMetadata = metadata == null
    ? null
    : ((AppenderatorDriverMetadata) metadata).getCallerMetadata();
final Set<DataSegment> upgradedSegments = new HashSet<>();
this should be moved inside the retry block, right?
I think that we return the result after the retry block as well and would need the upgraded segments there.
Am I missing something?
private final Object commitMetadata;
private final ImmutableList<DataSegment> segments;
private final ImmutableSet<DataSegment> upgradedSegments;
please add a comment that these are extra versions created in case a replace happened in between.
Better to have this as a javadoc on getUpgradedSegments().
kfaraz left a comment:
Thanks for the fix, @AmatyaAvadhanula ! Left some comments.
private final Object commitMetadata;
private final ImmutableList<DataSegment> segments;
private final ImmutableSet<DataSegment> upgradedSegments;
Why does this class need to distinguish between upgraded segments and root ones? From the POV of this class, all of these segments were committed and thus should all be handed off.
From the POV of segment handoff, the original and upgraded segments need not be distinguished from each other.
However, there are sanity checks where we must ensure that the committed segments are the same as the ones that were requested. Maintaining the upgraded segments separately helps with this check.
When the check fails and segments need to be killed from deep storage, we would also have to ensure that we do not run into errors trying to kill the same deep storage location multiple times. While this could be done by creating a set of load specs before every kill, I think it may be neater to maintain the original and upgraded sets separately for such purposes.
there are sanity checks where we must ensure that the committed segments are the same as the ones that were requested.
I should think the checks just need to verify that at least everything that was requested has been committed. There could be other stuff that got committed too.
Also, could you please link the places where these sanity checks are being performed?
Looking at the code, the cleanup happens in BaseAppenderatorDriver.publishInBackground itself. For cleanup, we would use the original requested segmentsAndCommitMetadata and not the one we freshly created and returned.
That's what we seem to be doing in the current impl in this PR as well.
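The de-duplication idea mentioned above can be sketched as follows. This is a minimal illustration, not Druid's actual cleanup code: the load-spec maps and the `LoadSpecDedupSketch` class are stand-ins, and the kill step itself is omitted.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch: collect load specs into a set before killing, so the same
// deep-storage location is never deleted twice even when an original segment
// and its upgraded versions share a load spec.
class LoadSpecDedupSketch
{
  static List<Map<String, Object>> loadSpecsToKill(List<Map<String, Object>> segmentLoadSpecs)
  {
    Set<Map<String, Object>> seen = new HashSet<>();
    List<Map<String, Object>> toKill = new ArrayList<>();
    for (Map<String, Object> loadSpec : segmentLoadSpecs) {
      if (seen.add(loadSpec)) { // add() returns true only the first time this spec is seen
        toKill.add(loadSpec);
      }
    }
    return toKill;
  }
}
```

Maintaining the original and upgraded sets separately avoids even this pass, since upgraded segments reuse the load spec of their base segment.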
throw new RuntimeException(e);
}
return segmentsAndCommitMetadata;
return segmentsAndCommitMetadata.withUpgradedSegments(upgradedSegments);
We shouldn't have to distinguish between upgraded and other segments here. Instead of creating a new SegmentsAndCommitMetadata with upgraded segments, we should create a new SegmentsAndCommitMetadata which has a single set containing all the segments (upgraded and otherwise) which were committed and thus must be handed off.
return segmentsAndCommitMetadata.withUpgradedSegments(upgradedSegments);
return new SegmentsAndCommitMetadata(publishResult.getSegments(), metadata);
So, essentially, the original segmentsAndCommitMetadata object passed in to this method represents segments we wanted to commit and the new object would represent segments that were actually committed.
This can be addressed later.
Marking as draft as there is a potential problem that still needs to be addressed: one must ensure that the sink is dropped only after all of its associated realtime segment versions have been abandoned.
}

public Map<SegmentId, SegmentId> getAnnouncedSegmentsToParentSegments()
public Map<SegmentId, String> getAnnouncedSegmentsToParentSegments()
Check notice (Code scanning / CodeQL): Exposing internal representation
kfaraz left a comment:
Left some comments, a couple of files have not been reviewed yet.
final RequestBuilder requestBuilder
    = new RequestBuilder(HttpMethod.POST, "/pendingSegmentVersion")
    .jsonContent(jsonMapper, new PendingSegmentVersions(basePendingSegment, newVersionOfSegment));
    .jsonContent(jsonMapper, pendingSegmentRecord);
Why do we need to change this API? The task side doesn't seem to need any info other than the base segment id and the upgraded segment id.
Postponing this refactor until later might simplify this PR a bit.
We no longer have the original pending segment's SegmentIdWithShardSpec to continue using this API.
expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
expectedException.expectMessage(
    "Fail test while dropping segment[foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123]"
Why don't we have the segment ID in the message anymore?
We do, but I'm observing that there is some flakiness in the test.
My local setup throws an exception with the same segment id.
The GitHub actions fail with a different segment id.
baseSegmentToUpgradedSegments.get(basePendingSegment).add(newSegmentVersion);
upgradedSegmentToBaseSegment.put(newSegmentVersion, basePendingSegment);
These two operations can be put inside a method. Also, I guess it is better to use computeIfAbsent for baseSegmentToUpgradedSegments.
We ensure that the base segment has been added for every sink that is present,
i.e. the set in the value of the map needs to contain the key as well.
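The reviewer's suggestion could look roughly like the sketch below. This is not the actual Druid class: `UpgradedSegmentTracker` is a made-up name, and String stands in for the real SegmentIdWithShardSpec type.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch: keep the two maps in sync inside a single method, using
// computeIfAbsent so the value set is created lazily on first use.
class UpgradedSegmentTracker
{
  private final Map<String, Set<String>> baseSegmentToUpgradedSegments = new HashMap<>();
  private final Map<String, String> upgradedSegmentToBaseSegment = new HashMap<>();

  // Registers an upgraded version of a base segment in both maps.
  void registerUpgrade(String baseSegment, String upgradedSegment)
  {
    baseSegmentToUpgradedSegments
        .computeIfAbsent(baseSegment, k -> new HashSet<>())
        .add(upgradedSegment);
    upgradedSegmentToBaseSegment.put(upgradedSegment, baseSegment);
  }

  Set<String> getUpgradedVersions(String baseSegment)
  {
    return baseSegmentToUpgradedSegments.getOrDefault(baseSegment, Set.of());
  }
}
```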
* Unannounces the given base segment and all its upgraded versions.
*/
private void unannounceAllVersionsOfSegment(DataSegment baseSegment) throws IOException
private void unannounceAllVersionsOfSegment(DataSegment baseSegment)
Maybe add a comment in the javadoc that this method should be synchronized on the corresponding sink. Alternatively, you could even take the sink as an argument and synchronize on it inside the method rather than relying on the callers to do so.
Synchronized inside the method. Thanks!
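The alternative the reviewer suggested can be sketched as below. This is a simplified illustration, not Druid's StreamAppenderator: the sink is reduced to a plain lock object and the unannounce step to a set removal.

```java
import java.util.Set;

// Sketch: pass the sink in and synchronize on it inside the method, instead
// of relying on every caller to hold the sink's lock.
class UnannounceSketch
{
  static void unannounceAllVersionsOfSegment(Object sink, Set<String> versionsOfSegment, Set<String> announced)
  {
    synchronized (sink) {
      // All versions of the segment are removed under the sink's lock, so a
      // concurrent upgrade registration cannot interleave with the removal.
      announced.removeAll(versionsOfSegment);
    }
  }
}
```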
if (baseId == null) {
  return;
}
baseSegmentToUpgradedSegments.get(baseId).remove(id);
Even though the set here cannot be null, it is best to handle the null case too.
This no longer needs to be handled because of #16162 (comment)
if (baseSegmentToUpgradedSegments.get(baseId).isEmpty()) {
  baseSegmentToUpgradedSegments.remove(baseId);
Why should we do this here? Why can't we do this at the end of unannounceAllVersionsOfSegment?
Done. Thanks for suggesting the simplification.
public void registerNewVersionOfPendingSegment(PendingSegmentRecord pendingSegmentRecord) throws IOException
{
SegmentIdWithShardSpec basePendingSegment = idToPendingSegment.get(pendingSegmentRecord.getUpgradedFromSegmentId());
SegmentIdWithShardSpec newSegmentVersion = pendingSegmentRecord.getId();
Rename for homogeneity with rest of the Druid code:
SegmentIdWithShardSpec newSegmentVersion = pendingSegmentRecord.getId();
SegmentIdWithShardSpec upgradedPendingSegment = pendingSegmentRecord.getId();
Done. Renamed the method as well.
// The base segment is associated with itself in the maps to maintain all the upgraded ids of a sink.
baseSegmentToUpgradedSegments.put(identifier, new HashSet<>());
baseSegmentToUpgradedSegments.get(identifier).add(identifier);
upgradedSegmentToBaseSegment.put(identifier, identifier);
It's not anymore.
kfaraz left a comment:
Looks good, have some minor queries, none of which are blockers to this PR.
);
}

private ListenableFuture<?> abandonSegment(
Could you add a javadoc to this method? When exactly is a segment abandoned?
Every segment is abandoned when the StreamAppenderator is closed or cleared.
A segment is also marked to be abandoned by the StreamAppenderatorDriver when it has been handed off.
Thanks a lot for the changes, @AmatyaAvadhanula !
@kfaraz, @abhishekagarwal87 Thank you for the reviews.
Changes:
1) Check for handoff of upgraded realtime segments.
2) Drop sink only when all associated realtime segments have been abandoned.
3) Delete pending segments upon commit to prevent unnecessary upgrades and partition space exhaustion when a concurrent replace happens. This also prevents potential data duplication.
4) Register pending segment upgrade only on those tasks to which the segment is associated.
Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
CHANGES
Currently streaming ingestion tasks check for the handoff of segments in SegmentsAndCommitMetadata.getSegments() which contains the set of segments allocated by the task.
This can lead to data temporarily missing from queries while the upgraded segments are being handed off, as those segments will be dropped from peons when the older versions of the segments are committed.
This patch adds the set of upgraded segments to the class, to enable checking for handoff of the upgraded segments as well.
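The handoff check after this patch can be sketched as a union of the two sets. This is only an illustration of the idea; `HandoffWaitSketch` is a made-up name and Strings stand in for segment ids.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch: the task waits for handoff of the union of the segments it
// allocated and the upgraded versions added by this change, rather than the
// allocated segments alone.
class HandoffWaitSketch
{
  static Set<String> segmentsToWaitFor(Set<String> allocatedSegments, Set<String> upgradedSegments)
  {
    Set<String> all = new HashSet<>(allocatedSegments);
    all.addAll(upgradedSegments);
    return all;
  }
}
```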
If a sink S is associated with (realtime) segment versions S0, S1 and S2, we must wait until all of them have been unannounced before dropping the sink.
A realtime task such as index_kafka or index_kinesis may append segments multiple times during its lifetime.
Deleting pending segments in the same transaction in which they are committed prevents unneeded upgrades and partition space exhaustion when a concurrent replace happens.
More importantly this helps with change (2) to prevent a race where an upgraded pending segment is announced for a committed segment causing temporary data duplication.
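The sink-drop condition from change (2) amounts to a subset check, sketched below with hypothetical names; Druid's actual implementation tracks this through its announcement maps.

```java
import java.util.Set;

// Sketch: with sink S announced under versions S0, S1 and S2, the sink may be
// dropped only once every associated version has been abandoned.
class SinkDropSketch
{
  static boolean canDropSink(Set<String> associatedVersions, Set<String> abandonedVersions)
  {
    return abandonedVersions.containsAll(associatedVersions);
  }
}
```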
If there are N pending realtime segments and T tasks, we make O(N * T) calls today by trying to upgrade each of the N pending segments on each of the T tasks.
By checking whether
task.baseSequenceName == pendingSegment.taskAllocatorId, we can reduce this to O(N).
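The O(N) routing can be sketched by grouping pending segments by their allocator id once, so each task receives only its own segments. `PendingSegment` here is a simplified stand-in, not Druid's PendingSegmentRecord.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of change (4): group the N pending segments by taskAllocatorId in a
// single pass, instead of offering every pending segment to all T tasks.
record PendingSegment(String id, String taskAllocatorId)
{
  static Map<String, List<PendingSegment>> groupByAllocator(List<PendingSegment> pendings)
  {
    return pendings.stream().collect(Collectors.groupingBy(PendingSegment::taskAllocatorId));
  }
}
```

Each task then looks up only the list keyed by its own base sequence name.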