Allow reordered segment allocation in kafka indexing service#5805
gianm merged 3 commits into apache:master
Conversation
```java
void setAppendingSegment(SegmentWithState appendingSegment)
{
  // There should be only one appending segment at any time
  Preconditions.checkState(this.appendingSegment == null);
```
Please include an error message here. (Probably a "WTF?!" message if it should never happen.)
```java
  this.appendingSegment = appendingSegment;
}
```
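A minimal sketch of the suggested fix. Plain Java is used here in place of Guava's `Preconditions.checkState(condition, template, args)` so the sketch is self-contained, and the class and field names are assumed from the diff, not Druid's actual code:

```java
// Hypothetical sketch: the invariant check with a descriptive "WTF?!" message, as suggested.
// In the PR itself, the equivalent would be:
//   Preconditions.checkState(this.appendingSegment == null,
//       "WTF?! Expected no appendingSegment, but found[%s]", this.appendingSegment);
class SegmentsOfIntervalSketch
{
  private String appendingSegment;

  void setAppendingSegment(String appendingSegment)
  {
    // There should be only one appending segment at any time; fail with context if not.
    if (this.appendingSegment != null) {
      throw new IllegalStateException(
          "WTF?! Expected no appendingSegment, but found[" + this.appendingSegment + "]"
      );
    }
    this.appendingSegment = appendingSegment;
  }
}
```

The message costs nothing on the happy path but makes an "impossible" failure immediately diagnosable from logs.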
```java
void addAppendFinishedSegment(SegmentWithState appendFinishedSegment)
```
Is this only supposed to be used during bootstrapping (startJob)? It doesn't seem like it would make sense otherwise. It could be clearer if this was made into a constructor instead: something that takes a list of initial segments. (Up to you though - this is just a suggestion)
Sounds good. Fixed.
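A hedged sketch of the constructor-based alternative described above (class and method names are illustrative, not the actual Druid classes): bootstrapping supplies the already-finished segments at construction time, so there is no mutator to misuse after startup.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative sketch: initial (append-finished) segments are passed to the constructor,
// so there is no addAppendFinishedSegment() mutator callable outside of bootstrapping.
class SegmentsForSequenceSketch
{
  private final List<String> appendFinishedSegments;

  SegmentsForSequenceSketch(List<String> initialAppendFinishedSegments)
  {
    // Defensive copy so the caller's list cannot mutate internal state later.
    this.appendFinishedSegments = new ArrayList<>(initialAppendFinishedSegments);
  }

  List<String> getAppendFinishedSegments()
  {
    return Collections.unmodifiableList(appendFinishedSegments);
  }
}
```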
```java
// UNIQUE key for the row, ensuring sequences do not fork in two directions.
// Using a single column instead of (sequence_name, sequence_prev_id) as some MySQL storage engines
// have difficulty with large unique keys (see https://github.com/druid-io/druid/issues/2319)
final String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode(
```
There's a bit too much code duplication here. Please share some more code between this method and the other similar one. I know it is slightly different, but it seems close enough that it could be shared. Perhaps take a string for the secondary key and have that either be the previousId (in one path) or the interval (in another path).
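A sketch of the suggested refactoring, under the assumption that the only difference between the two paths is the second hashed field. The helper name is hypothetical, and `java.security.MessageDigest` stands in here for whatever hashing utility Druid actually uses, so the sketch runs standalone:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical shared helper: callers pass previousSegmentId as the secondary key on the
// lineage-checked path, or the interval string on the skip-lineage-check path.
class SequenceKeySketch
{
  static String sequenceNameSecondaryKeySha1(String sequenceName, String secondaryKey)
  {
    try {
      final MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
      sha1.update(sequenceName.getBytes(StandardCharsets.UTF_8));
      sha1.update((byte) 0xff); // field separator so ("ab","c") and ("a","bc") hash differently
      sha1.update(secondaryKey.getBytes(StandardCharsets.UTF_8));

      // Hex-encode the 20-byte digest (analogous to BaseEncoding.base16().encode(...)).
      final StringBuilder hex = new StringBuilder();
      for (byte b : sha1.digest()) {
        hex.append(String.format("%02X", b));
      }
      return hex.toString();
    }
    catch (NoSuchAlgorithmException e) {
      throw new RuntimeException(e); // SHA-1 is available on every conforming JVM
    }
  }
}
```

With a helper like this, both call sites share the key construction and differ only in which string they pass as the secondary key.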
```java
// Avoiding ON DUPLICATE KEY since it's not portable.
// Avoiding try/catch since it may cause inadvertent transaction-splitting.
```

```java
// UNIQUE key for the row, ensuring sequences do not fork in two directions.
```
This comment is no longer accurate: its purpose is no longer "ensuring sequences do not fork in two directions"; it now ensures we don't create more than one segment per sequence per interval.
```java
    .asBytes()
);
```

```java
handle.createStatement(
```
This code seems shareable too.
```diff
 ) throws IOException
 {
-  return append(row, sequenceName, null, false, true);
+  return append(row, sequenceName, null, true, true);
```
Why is the BatchAppenderatorDriver skipping the lineage check now? I thought it could still make more than one segment per interval if it's running in non-incremental-publishing mode.
```java
    .filter(segmentWithState -> segmentWithState.getState() == SegmentState.APPENDING)
    .map(SegmentWithState::getSegmentIdentifier)
    .collect(Collectors.toList());
```

```java
final Map<SegmentIdentifier, SegmentWithState> requestedSegmentIdsForSequences = getAppendingSegments(sequenceNames)
```
What is the reason for moving the creation of requestedSegmentIdsForSequences from after the push, to before the push? Is it fixing something?
I don't think it fixes anything, but it makes the code more reliable and understandable.
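The point under discussion can be illustrated with a small sketch (hypothetical names, not the driver's actual API): snapshotting the appending segment ids before the push means later steps work from a stable view, even if the push mutates driver state.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative sketch of the ordering: collect the requested segment ids BEFORE pushing,
// since the push (simulated here by clearing the map) may mutate the per-sequence state.
class PushOrderSketch
{
  static List<String> snapshotThenPush(
      Map<String, List<String>> appendingSegmentsPerSequence,
      Collection<String> sequenceNames
  )
  {
    // Snapshot first: the ids are captured while the state is still untouched.
    final List<String> requestedSegmentIds = sequenceNames.stream()
        .flatMap(name -> appendingSegmentsPerSequence.getOrDefault(name, List.of()).stream())
        .collect(Collectors.toList());

    // ... push happens here; simulate its state mutation ...
    appendingSegmentsPerSequence.clear();

    return requestedSegmentIds;
  }
}
```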
Allow reordered segment allocation in kafka indexing service (#5805)

* Allow reordered segment allocation in kafka indexing service
* address comments
* fix a bug
Fix #5761.
Major changes are:

* … (`SegmentsOfInterval`)
* Changed `IndexerSQLMetadataStorageCoordinator.allocatePendingSegment()` to respect `skipSegmentLineageCheck`, to avoid the unique constraint violation for `sequence_name_prev_id_sha1`. If `skipSegmentLineageCheck` is true, `sequence_prev_id` is always an empty string and `sequence_name_prev_id_sha1` is created as below. `skipSegmentLineageCheck` can still be false for backward compatibility.

This change is …
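The exact key construction is elided above; the following is only a hedged sketch of the two modes just described, with illustrative names and plain `MessageDigest` hashing standing in for whatever Druid actually uses. When the lineage check is skipped, the previous segment id contributes nothing and the interval distinguishes rows instead, giving at most one allocation per sequence per interval.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch of the two key-derivation modes (not Druid's actual code).
class PendingSegmentKeySketch
{
  static String sequenceNamePrevIdSha1(
      String sequenceName,
      String previousSegmentId,
      String intervalStart,
      String intervalEnd,
      boolean skipSegmentLineageCheck
  )
  {
    try {
      final MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
      sha1.update(sequenceName.getBytes(StandardCharsets.UTF_8));
      sha1.update((byte) 0xff); // field separator
      if (skipSegmentLineageCheck) {
        // sequence_prev_id is stored as "" in this mode; the interval differentiates rows.
        sha1.update(intervalStart.getBytes(StandardCharsets.UTF_8));
        sha1.update((byte) 0xff);
        sha1.update(intervalEnd.getBytes(StandardCharsets.UTF_8));
      } else {
        sha1.update(previousSegmentId.getBytes(StandardCharsets.UTF_8));
      }
      final StringBuilder hex = new StringBuilder();
      for (byte b : sha1.digest()) {
        hex.append(String.format("%02X", b));
      }
      return hex.toString();
    }
    catch (NoSuchAlgorithmException e) {
      throw new RuntimeException(e);
    }
  }
}
```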