Make intermediate store for shuffle tasks an extension point#11492
maytasm merged 6 commits into apache:master
This pull request introduces 1 alert when merging caabe48 into c98e7c3 - view on LGTM.com new alerts:
This pull request introduces 1 alert when merging 3a7a2d6 into c98e7c3 - view on LGTM.com new alerts:
This pull request introduces 1 alert when merging 6086f65 into fcb908d - view on LGTM.com new alerts:
    }
).build();
try {
  long size = partitionFile.get().size();
This will potentially read the entire file to get the size. I believe partitionFile.length() is just a metadata lookup on the file to get the length.
partitionFile is of type Optional<ByteSource>. The get is to get the ByteSource from the wrapping Optional class.
I was looking at the default implementation of ByteSource#size() which opens the stream, reads it and counts the bytes. It looks like FileByteSource has overridden that implementation to return the length of the File without reading the bytes.
At this point, I'm mostly concerned about whether someone can trip over this by changing the implementation and not realizing that this operation should be fast. Anyways, that's a problem for another time.
The LocalIntermediaryDataManager uses Files.asByteSource(), which returns a FileByteSource whose size() calls file.length(), so this is the same as the current behavior. An extension that implements IntermediaryDataManager can extend ByteSource and provide an efficient way of returning the size. I added javadoc to indicate this.
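To illustrate the concern in this thread, here is a minimal sketch using only the JDK. `SketchByteSource`, `InMemorySource`, and `KnownLengthSource` are hypothetical stand-ins written for this example; they are not Guava's `ByteSource` or any Druid class. The sketch only shows why a source that already knows its length should override `size()` rather than inherit a default that reads the whole stream.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a byte source abstraction (NOT Guava's ByteSource).
abstract class SketchByteSource {
  abstract InputStream openStream();

  // Default: open the stream and count every byte, so the cost grows with the data size.
  long size() {
    try (InputStream in = openStream()) {
      byte[] buffer = new byte[8192];
      long total = 0;
      int read;
      while ((read = in.read(buffer)) != -1) {
        total += read;
      }
      return total;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}

// Inherits the slow default; counts how many streams were opened.
class InMemorySource extends SketchByteSource {
  protected final byte[] data;
  int streamsOpened = 0;

  InMemorySource(byte[] data) {
    this.data = data;
  }

  @Override
  InputStream openStream() {
    streamsOpened++;
    return new ByteArrayInputStream(data);
  }
}

// Overrides size() with a metadata-style lookup; no stream is opened.
final class KnownLengthSource extends InMemorySource {
  KnownLengthSource(byte[] data) {
    super(data);
  }

  @Override
  long size() {
    return data.length;
  }
}
```

Both sources report the same size, but only the first one opens a stream to compute it. An extension backed by remote storage would presumably want the second shape, returning a length taken from object metadata.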
JsonConfigProvider.bind(binder, "druid", DruidNode.class, Parent.class);
JsonConfigProvider.bind(binder, "druid.worker", WorkerConfig.class);

CliPeon.configureIntermediaryData(binder);
Are there any integration tests that verify shuffle with indexers continues to work after this change? I haven't looked at the existing integration tests closely.
There are many ITs that run indexers with the new config unset, which falls back to "local" storage for intermediary segments via LocalIntermediaryDataManager. Specifically, the input source integration test with Indexer runs ingestion with Hashed partitioning and maxNumConcurrentSubTasks=10, which runs the ingestion in two phases: the first phase persists to local storage using LocalIntermediaryDataManager, and the second phase reads the segments written by the first phase. There are also other ITs in compaction/auto compaction that use Hashed partitioning.
The LGTM is a false positive. We would only return the
This pull request introduces 1 alert when merging aa2a2c2 into fcb908d - view on LGTM.com new alerts:
Missed this. @maytasm, are you already working on support for deep storage as the intermediate data store? I am also in the middle of implementing it.
Deep store support feature - #11507
Make intermediate store for shuffle tasks an extension point
Description
This PR makes IntermediaryDataManager and ShuffleClient interfaces whose methods can be implemented by extensions to customize the intermediate storage location for shuffle tasks. For example, implementations can be added via extensions to store intermediate shuffle data in different cloud storage services.
More details: #11297
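As a rough illustration of the extension-point idea (not the actual Druid interfaces), the sketch below uses hypothetical names throughout: `SketchIntermediateStore`, `SketchLocalStore`, `SketchShuffleReader`, and their method signatures are assumptions made for this example. It shows shuffle-side code depending only on an interface, with a default local implementation that another implementation could replace.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical extension point; these are NOT the real IntermediaryDataManager methods.
interface SketchIntermediateStore {
  void addPartition(String supervisorTaskId, int bucketId, byte[] partitionBytes);

  Optional<byte[]> findPartition(String supervisorTaskId, int bucketId);
}

// Default implementation standing in for "local" storage (kept in memory here for brevity).
final class SketchLocalStore implements SketchIntermediateStore {
  private final Map<String, byte[]> partitions = new HashMap<>();

  @Override
  public void addPartition(String supervisorTaskId, int bucketId, byte[] partitionBytes) {
    partitions.put(key(supervisorTaskId, bucketId), partitionBytes);
  }

  @Override
  public Optional<byte[]> findPartition(String supervisorTaskId, int bucketId) {
    return Optional.ofNullable(partitions.get(key(supervisorTaskId, bucketId)));
  }

  private static String key(String supervisorTaskId, int bucketId) {
    return supervisorTaskId + "/" + bucketId;
  }
}

// Second-phase code sees only the interface, so the backing store can be swapped.
final class SketchShuffleReader {
  private final SketchIntermediateStore store;

  SketchShuffleReader(SketchIntermediateStore store) {
    this.store = store;
  }

  // Returns the partition size in bytes, or -1 when the partition is missing.
  long partitionSize(String supervisorTaskId, int bucketId) {
    return store.findPartition(supervisorTaskId, bucketId)
                .map(bytes -> (long) bytes.length)
                .orElse(-1L);
  }
}
```

A cloud-backed store would implement the same interface and be selected through configuration; the reader code would not need to change.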
Note that IntermediaryDataManager has been renamed to LocalIntermediaryDataManager with a few additional changes:
@Override for methods that are now declared by the (new) IntermediaryDataManager interface.
This PR has: