Use unique segment paths for Kafka indexing #5692
```diff
@@ -30,6 +30,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;

 @ExtensionPoint
 public interface DataSegmentPusher
```
```diff
@@ -44,36 +45,48 @@ public interface DataSegmentPusher
   /**
    * Pushes index files and segment descriptor to deep storage.
    * @param file directory containing index files
    * @param segment segment descriptor
-   * @param replaceExisting overwrites existing objects if true, else leaves existing objects unchanged on conflict.
-   *                        The behavior of the indexer determines whether this should be true or false. For example,
-   *                        since Tranquility does not guarantee that replica tasks will generate indexes with the same
-   *                        data, the first segment pushed should be favored since otherwise multiple historicals may
-   *                        load segments with the same identifier but different contents, which is a bad situation. On
-   *                        the other hand, indexers that maintain exactly-once semantics by storing checkpoint data
-   *                        can lose or repeat data if they fail to write a segment because it already exists and
-   *                        overwriting is not permitted. This situation can occur if a task fails after pushing to
-   *                        deep storage but before writing to the metadata storage, see:
-   *                        https://github.com/druid-io/druid/issues/5161.
+   * @param useUniquePath if true, pushes to a unique file path. This prevents situations where task failures or
+   *                      replica tasks can either overwrite or fail to overwrite existing segments, leading to the
+   *                      possibility of different versions of the same segment ID containing different data. As an
+   *                      example, a Kafka indexing task starting at offset A and ending at offset B may push a segment
+   *                      to deep storage and then fail before writing the loadSpec to the metadata table, resulting in
+   *                      a replacement task being spawned. This replacement will also start at offset A but will read
+   *                      to offset C and will then push a segment to deep storage and write the loadSpec metadata.
+   *                      Without unique file paths, this can only work correctly if new segments overwrite existing
+   *                      segments. Suppose that at this point the task then fails so that the supervisor retries again
+   *                      from offset A. This 3rd attempt will overwrite the segments in deep storage before failing to
+   *                      write the loadSpec metadata, resulting in inconsistencies between the segment data now in
+   *                      deep storage and the copies of the segment already loaded by historicals.
    *
-   *                        If replaceExisting is true, existing objects MUST be overwritten, since failure to do so
-   *                        will break exactly-once semantics. If replaceExisting is false, existing objects SHOULD be
-   *                        prioritized but it is acceptable if they are overwritten (deep storages may be eventually
-   *                        consistent or otherwise unable to support transactional writes).
+   *                      If unique paths are used, the caller is responsible for cleaning up segments that were pushed
+   *                      but were not written to the metadata table (for example when using replica tasks).
    * @return segment descriptor
    * @throws IOException
    */
-  DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException;
+  DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException;

   //use map instead of LoadSpec class to avoid dependency pollution.
   Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath);

+  /**
+   * @deprecated backward-compatibility shim that should be removed on next major release;
+   * use {@link #getStorageDir(DataSegment, boolean)} instead.
+   */
+  @Deprecated
   default String getStorageDir(DataSegment dataSegment)
   {
-    return getDefaultStorageDir(dataSegment);
+    return getStorageDir(dataSegment, false);
   }

+  default String getStorageDir(DataSegment dataSegment, boolean useUniquePath)
+  {
+    return getDefaultStorageDir(dataSegment, useUniquePath);
+  }

   default String makeIndexPathName(DataSegment dataSegment, String indexName)
   {
-    return StringUtils.format("./%s/%s", getStorageDir(dataSegment), indexName);
+    // This is only called from Hadoop batch which doesn't require unique segment paths so set useUniquePath=false
+    return StringUtils.format("./%s/%s", getStorageDir(dataSegment, false), indexName);
   }

   /**
```
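A note on the shim above: because both `getStorageDir` overloads are `default` methods, existing extensions keep compiling, and legacy call sites funnel into the new two-arg path with `useUniquePath=false`. Below is a minimal, self-contained model of that pattern; all names are invented for illustration and this is not code from the patch.

```java
import java.util.UUID;

// Stand-in for the shim pattern in DataSegmentPusher: the deprecated one-arg
// default funnels into the new two-arg method, so both overloads share a
// single code path and old call sites transparently get useUniquePath=false.
public class ShimDemo
{
  interface Pusher
  {
    @Deprecated
    default String getStorageDir(String segmentId)
    {
      return getStorageDir(segmentId, false);
    }

    default String getStorageDir(String segmentId, boolean useUniquePath)
    {
      // Append a random suffix only when a unique path is requested.
      return useUniquePath ? segmentId + "/" + UUID.randomUUID() : segmentId;
    }
  }

  public static void main(String[] args)
  {
    Pusher p = new Pusher() {};
    System.out.println(p.getStorageDir("wikipedia/v1/0"));       // stable path
    System.out.println(p.getStorageDir("wikipedia/v1/0", true)); // unique path
  }
}
```

One caveat visible in the real diff: the two-arg default delegates to `getDefaultStorageDir` rather than back through the deprecated one-arg overload, so an extension that overrides only the old method will not see calls made through the new one.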
```diff
@@ -89,13 +102,19 @@ default List<String> getAllowedPropertyPrefixesForHadoop()
   // If above format is ever changed, make sure to change it appropriately in other places
   // e.g. HDFSDataSegmentKiller uses this information to clean the version, interval and dataSource directories
   // on segment deletion if segment being deleted was the only segment
-  static String getDefaultStorageDir(DataSegment segment)
+  static String getDefaultStorageDir(DataSegment segment, boolean useUniquePath)
   {
     return JOINER.join(
         segment.getDataSource(),
         StringUtils.format("%s_%s", segment.getInterval().getStart(), segment.getInterval().getEnd()),
         segment.getVersion(),
-        segment.getShardSpec().getPartitionNum()
+        segment.getShardSpec().getPartitionNum(),
+        useUniquePath ? generateUniquePath() : null
```
**Contributor:** What do you think about receiving a suffix as a parameter and making the caller responsible for passing a unique suffix (like the number of task attempts)? It would be safer than a short random UUID.

**Contributor (Author):** Hm, I can go either way on that. I actually think it would be less safe to have the suffix passed as a parameter, since a caller may not in general understand all the possible failure modes and may provide a non-random suffix that doesn't actually help the issue, and also may not understand which characters are valid filesystem characters for the suffix (though we could validate for this). The advantage would be that all segments generated by a particular task could be tagged with the same prefix for tracking, if that's something interesting to us. A UUID chopped to 5 characters still has over 1 million possibilities, so I'm not too worried about collisions.

**Contributor:** Ok. It sounds good.
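The "over 1 million" estimate in the thread checks out: a UUID renders as hexadecimal digits, so its first five characters span at most 16^5 values. A quick sanity check follows (illustrative only; as the diff continues below, the merged code uses the full UUID rather than a truncated one).

```java
import java.util.UUID;

// Sanity check of the reviewer's estimate: truncating a hex-rendered UUID to
// its first 5 characters leaves at most 16^5 = 1,048,576 distinct values.
public class TruncatedUuidSpace
{
  public static void main(String[] args)
  {
    long possibilities = 1L << (4 * 5); // 16^5, four bits per hex digit
    System.out.println(possibilities);  // 1048576 -- "over 1 million"

    // The first 5 characters of a UUID string are always hex digits
    // (the first dash appears at index 8), so this is a valid sample.
    String sample = UUID.randomUUID().toString().substring(0, 5);
    System.out.println(sample);         // e.g. "3f2a9"
  }
}
```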
```diff
     );
   }
+
+  static String generateUniquePath()
+  {
+    return UUID.randomUUID().toString();
+  }
 }
```
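For concreteness, here is a standalone sketch of the two path shapes `getDefaultStorageDir` can now produce. The segment values are made up, and it assumes Guava is on the classpath and that `JOINER` in the interface is a skip-nulls `/` joiner (which the `null`-returning ternary branch implies).

```java
import com.google.common.base.Joiner;
import java.util.UUID;

// Mimics getDefaultStorageDir: joins the path components with "/" and skips
// nulls, so the unique suffix appears only when useUniquePath is true.
public class StorageDirDemo
{
  private static final Joiner JOINER = Joiner.on("/").skipNulls();

  static String storageDir(
      String dataSource, String start, String end, String version, int partition, boolean useUniquePath)
  {
    return JOINER.join(
        dataSource,
        String.format("%s_%s", start, end),
        version,
        partition,
        useUniquePath ? UUID.randomUUID().toString() : null
    );
  }

  public static void main(String[] args)
  {
    // wikipedia/2018-01-01T00:00:00.000Z_2018-01-02T00:00:00.000Z/v1/0
    System.out.println(storageDir(
        "wikipedia", "2018-01-01T00:00:00.000Z", "2018-01-02T00:00:00.000Z", "v1", 0, false));
    // wikipedia/2018-01-01T00:00:00.000Z_2018-01-02T00:00:00.000Z/v1/0/<random UUID>
    System.out.println(storageDir(
        "wikipedia", "2018-01-01T00:00:00.000Z", "2018-01-02T00:00:00.000Z", "v1", 0, true));
  }
}
```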
**Reviewer:** What happens if a deep storage pusher impl doesn't respect `useUniquePath`? This will likely happen for extensions whose authors don't notice the signature change here. It's probably acceptable if what happens is you just get the old behavior.

**Author:** Well, this is a little janky, but note that the signature didn't really change; only the meaning of that last boolean parameter did. At first I wanted to force a signature change so that implementors would have to acknowledge the change but, as discussed, decided to make it 'backward-compatible' for the point release. So implementors who don't notice the change will get `replaceExisting=true` behavior for Kafka indexing tasks and `replaceExisting=false` behavior for all other task types, which seemed reasonable to me (since `replaceExisting=true` was added primarily for the Kafka indexing task type as well).

**Reviewer:** That sounds fair.