Allow using composed storage for SuperSorter intermediate data #13368

rohangarg merged 19 commits into apache:master
Conversation
Force-pushed 2eb7df2 to ad9f07e
LakshSingla left a comment:

Thanks for the PR 🚀 Left some initial comments around the helper classes. Will review the implementation later.
public static final String CTX_ENABLE_DURABLE_SHUFFLE_STORAGE = "durableShuffleStorage";
private static final boolean DEFAULT_ENABLE_DURABLE_SHUFFLE_STORAGE = false;

public static final String CTX_ENABLE_DURABLE_TASK_INTERMEDIATE_STORAGE = "durableTaskIntermediateStorage";
I think we should try to merge this config property with durableShuffleStorage to make configuration easier for users. Adding another property means a user can now have up to four combinations of durableShuffleStorage and durableTaskIntermediateStorage, each with different semantics. WDYT?

I had initially added the separate feature flag because durable storage reads were very slow, which would lead to slow UTs and slow unintended user queries. Now that #13373 is merged, I have removed the extra flag. I also made the SuperSorter test run with both file-based and durable-storage-based output channels.
 * @return InputStream starting from the given offset, limited by the given size
 * @throws IOException if the path is not present or the data at the path cannot be read
 */
InputStream readRange(String path, long from, long size) throws IOException;
Should the behaviour of the implementations be defined here under the following conditions:

- from > size of underlying storage (undefined, error, or we return empty output)
- from + size > size of underlying storage (undefined, error, or we chomp the stream)
- from < 0 (undefined, error, or from indicates an index from the end)
- size < 0 (undefined, error, or we read in reverse)

I have added some validation checks for inputs in the local and S3 connectors. I haven't done the size-based check in the S3 connector yet since it would require a status API call for the object first - I'm not sure that check is necessary as of now.
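For reference, a minimal sketch of the validation being discussed, assuming a hypothetical local-file connector (the class name `LocalRangeReader` is illustrative, not from the PR): negative inputs fail fast, a `from` past end-of-file errors, and a range running past end-of-file is chomped rather than erroring.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;

public class LocalRangeReader
{
  // Returns a stream over [from, from + size) of the file, validating inputs
  // and chomping the range at end-of-file rather than erroring.
  public static InputStream readRange(Path path, long from, long size) throws IOException
  {
    if (from < 0 || size < 0) {
      throw new IllegalArgumentException("from and size must be non-negative");
    }
    final long fileSize = Files.size(path);
    if (from > fileSize) {
      throw new IllegalArgumentException("from exceeds file size");
    }
    final long toRead = Math.min(size, fileSize - from);
    try (RandomAccessFile raf = new RandomAccessFile(path.toFile(), "r")) {
      raf.seek(from);
      // Sketch only: reads the whole range into memory for simplicity.
      final byte[] buf = new byte[(int) toRead];
      raf.readFully(buf);
      return new ByteArrayInputStream(buf);
    }
  }
}
```

A real connector would return a lazily bounded stream rather than buffering the range, but the boundary checks are the point here.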
This pull request introduces 4 alerts when merging 7c437c0 into 9e938b5 - view on LGTM.com.

Heads-up: LGTM.com's PR analysis will be disabled on the 5th of December, and LGTM.com will be shut down completely on the 16th of December 2022. Please enable GitHub code scanning, which uses the same CodeQL engine that powers LGTM.com. For more information, please check out our post on the GitHub blog.
gianm left a comment:

Skimmed through and had some comments on the interfaces.
import java.util.function.Function;
import java.util.function.Supplier;

public class PartitionedOutputChannel
I wonder if this could be rolled into OutputChannel? i.e. by adding a method like ReadableFrameChannel getReadableChannel(int partitionNumber), which would throw an error for non-partitioned outputs.

I tried rolling both of them into the same class, but it gets a bit messy since OutputChannel also takes partitionNumber as an input and provides a method to fetch it. So the operations related to partitionNumber would also have to be checked before returning an error or output, along with getReadableChannel(partition) and getReadableChannel().
/**
 * Provides an interface to read a partitioned frame channel. The channel might have frames with multiple partitions
 * in it.
 */
public interface PartitionedReadableFrameChannel extends Closeable
This is more of a supplier than an actual channel, so PartitionedReadableFrameChannelSupplier, or something like that, would be a clearer name. The javadocs should also describe what happens when this is closed.

I tried changing it to PartitionedReadableFrameChannelSupplier, but it becomes confusing later on since the readable channels are sent around wrapped in another Supplier object; it ends up looking like Supplier<PartitionedReadableFrameChannelSupplier>. I have changed the method name in the interface to make things clearer. Please let me know if this looks OK or whether we should switch to the supplier suffix for more clarity.
Force-pushed 7c437c0 to 3553f9e
This pull request introduces 4 alerts when merging 3553f9e into 7cf761c - view on LGTM.com.
Force-pushed 7064142 to 159c5d7
This pull request introduces 4 alerts when merging 159c5d7 into db7c29c - view on LGTM.com.
public void close()
{
  try {
    storageConnector.deleteFile(frameFileFullPath);
If the footerFile and frameFileFullPath are not created in the channel, should they be deleted on close, or should the responsibility lie with the object's creator?

These files are created by the writable channel and are passed to the readable channel, so I think it should be OK to delete them when the readable channel closes. The resources are passed between the writable and readable channels since the output channel doesn't provide an explicit close method.
this.frameFileFooterSupplier = frameFileFooterSupplier;
this.frameFileFullPath = frameFileFullPath;
this.remoteInputStreamPool = remoteInputStreamPool;
this.footerFile = footerFile;
nit: Do we need this? I didn't find any use of this param in the file apart from in the close method.

It is needed since we're deleting the file when the readable channel closes.
Merged branch …uper_sorter. Conflicts: extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
This pull request introduces 4 alerts when merging 55bddeb into 50963ed - view on LGTM.com.
cryptoe left a comment:

Need to go through the frameFileFooter code. Will finish the review by EOD.
this.durableStageStorageEnabled = MultiStageQueryContext.isDurableShuffleStorageEnabled(
    QueryContext.of(task.getContext())
);
this.durableTaskIntermediateStorageEnabled = MultiStageQueryContext.isDurableTaskIntermediateStorageEnabled(
I think we do not need this parameter at all. Eventually we would use tiered spilling, so I think we should be OK if we are spilling everything to remote storage.
- sh -c "dmesg | egrep -i '(oom|out of memory|kill process|killed).*' -C 1 || exit 0"
- free -m
- ${MVN} -pl ${MAVEN_PROJECTS} jacoco:report
- travis_wait 15 ${MVN} -pl ${MAVEN_PROJECTS} jacoco:report
I think this change is orthogonal to the PR, no?

No, this is needed since SuperSorterTest sometimes runs for more than 10 minutes and doesn't produce any log output in the successful case. So the wait for the test to finish is increased to 15 minutes.
@Override
public InputStream readRange(String path, long from, long size) throws IOException
{
  if (from < 0 || size < 0) {
Can public InputStream read(String path) throws IOException and this method use the same common base method? Maybe pass the method the GetObjectRequest; that way there is less duplication.

Thanks for the pointer, combined their logic.
);
}

Nit: Let's javadoc this method.
/**
 * Create a channel pair tagged with a particular name and a flag to delete the channel data after it's read.
 */
PartitionedOutputChannel openPartitionedChannel(String name, boolean deleteAfterRead) throws IOException;
I have a similar comment to @gianm's; the two interface calls look very similar. A couple of questions:

- Does the name have to be a string and not an int? Put another way, why not work with partition numbers only?
- From the javadoc, the pair seems to consist of an OutputChannel, the partition name, and whether the channel data needs to be deleted on exit. The caller would be interested in the output channel for that "partition" and its cleanup, so why do we need another API for this? We could have a cleaning OutputChannelFactory impl which does the cleaning for us, no?

Responses:

- The name is not a partition name, but rather a generic name for the channel, which can contain multiple partitions.
- FrameFiles can currently be shared amongst multiple independent readers, and they are deleted only when all references to the file are removed. That's why the deletion flag had to be passed down to the channel creator. A cleaning factory could be created, but it would also have to do ref counting per frame file before cleaning.
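The per-file ref counting described in that reply could be sketched roughly like this (a hypothetical `ReferenceCountingCloser`, not the actual Druid implementation): each reader acquires a reference, and the underlying file is deleted only when the last reference is released.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: deletes a shared resource only after all readers release it.
public class ReferenceCountingCloser
{
  private final AtomicInteger references = new AtomicInteger(0);
  private final Runnable deleter; // e.g. () -> frameFile.delete()

  public ReferenceCountingCloser(Runnable deleter)
  {
    this.deleter = deleter;
  }

  // Called when a new reader starts using the frame file.
  public void acquire()
  {
    references.incrementAndGet();
  }

  // Called when a reader closes; the last release triggers deletion.
  public void release()
  {
    if (references.decrementAndGet() == 0) {
      deleter.run();
    }
  }
}
```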
private final Supplier<PartitionedReadableFrameChannel> readableChannelSupplier;

private PartitionedOutputChannel(
    @Nullable final WritableFrameChannel writableChannel,
Can the @Nullable be removed, since you are doing a precheck in the pair method?
import java.util.function.Function;
import java.util.function.Supplier;

public class PartitionedOutputChannel
Class-level docs would be very helpful here. An outline could be:

- What does this class do?
- Where is it used?

Thanks for the pointer, added the javadocs.
new CountingWritableFrameChannel(
    baseChannel.getWritableChannel(),
    channelCounters,
    null
Why is an empty partition number passed here?

It is passed because the channel can contain multiple partitions, so the partition number is looked up for every frame being written.
/**
 * Encapsulation for frame file footer related operations. The footer must be wrapped in a memory object (the memory
 * can be physical or mmapped). Some verifications are also done on the footer to see if it is not corrupted.
nit: Let's link the javadoc of FrameFile to this class so that we can find out the format of the frame footer.

Thanks for the pointer, done!
final Supplier<Long> channelSizeSupplier = countingOutputStream::getCount;

final File footerFile = new File(tmpDir, fileName + "_footer");
// build supplier for reading the footer of the underlying frame file

Check failure - Code scanning / CodeQL: Uncontrolled data used in path expression
// read the footer into a file and map it to memory
FileUtils.mkdirp(footerFile.getParentFile());
Preconditions.checkState(footerFile.createNewFile(), "Unable to create local footer file");
try (FileOutputStream footerFileStream = new FileOutputStream(footerFile);

Check failure - Code scanning / CodeQL: Uncontrolled data used in path expression
cryptoe left a comment:

Took a look at the composing channel. Left some comments.
}

@Override
public InputStream readRange(String path, long from, long size) throws IOException
nit: from -> start seems better. Feel free to ignore this.
MultiStageQueryContext.isComposedIntermediateSuperSorterStorageEnabled(
    task.getQuerySpec().getQuery().context()
)
).put(
If CTX_COMPOSED_INTERMEDIATE_SUPER_SORTER_STORAGE is set, shouldn't CTX_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES also be set?

I think they can work independently - the local bytes limit just enforces a resource limit on local storage, while the composed storage setting enables joining local disk and durable storage together. Currently, CTX_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES defaults to Long.MAX_VALUE and is used in the normal flow as well, for consistency. WDYT?

I think we should throw an error if one sets CTX_COMPOSED_INTERMEDIATE_SUPER_SORTER_STORAGE and CTX_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES is not set. What will happen is that users will set this flag thinking stuff is going to work, but they would be proved wrong. As @paul-rogers says, throwing errors is a way to force people to read the documentation ^-^

Yes, I agree with the documentation part, but setting CTX_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES alone is also not enough for composition. The user would also have to set the durable storage setting and all the configurations that go with it. And in the future, if any memory storage is allowed in composition, that would be needed too. So I thought the documentation for the composition parameter would clearly state everything needed to configure it properly.
);
}

public static long getIntermediateSuperSorterStorageMaxLocalBytes(final QueryContext queryContext)
If isComposedIntermediateSuperSorterStorageEnabled is enabled and getIntermediateSuperSorterStorageMaxLocalBytes is not set, i.e. it has the default value, what will happen?

The default value for getIntermediateSuperSorterStorageMaxLocalBytes is Long.MAX_VALUE, so effectively it will only use local disk as before.
{
  Preconditions.checkState(byteCount >= 0, "Can't reserve negative bytes");
  if (currentBytes + byteCount > maxBytes) {
    throw new ResourceLimitExceededException("");
I think there is a TODO here.

Yes, thanks for catching that! Updated the message.
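Based on the snippet above, the tracker presumably looks something like the following sketch (method names and the exception message here are assumptions, not copied from the PR):

```java
// Sketch of a byte tracker that enforces an upper bound on reserved bytes.
public class ByteTracker
{
  private final long maxBytes;
  private long currentBytes;

  public ByteTracker(long maxBytes)
  {
    this.maxBytes = maxBytes;
  }

  public synchronized void reserve(long byteCount)
  {
    if (byteCount < 0) {
      throw new IllegalArgumentException("Can't reserve negative bytes");
    }
    if (currentBytes + byteCount > maxBytes) {
      // The actual code throws ResourceLimitExceededException; a plain
      // IllegalStateException keeps this sketch self-contained.
      throw new IllegalStateException(
          "Can't reserve " + byteCount + " bytes: only "
          + (maxBytes - currentBytes) + " of " + maxBytes + " available"
      );
    }
    currentBytes += byteCount;
  }

  public synchronized void release(long byteCount)
  {
    currentBytes -= byteCount;
  }
}
```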
import org.apache.druid.query.ResourceLimitExceededException;

/**
 * Tracks the byte usage with an upper bound bytes limit. Reservaction of bytes beyond limit throws

Nit spelling: Reservaction
/**
 * A composed readable channel to read frames. The channel can encapsulate multiple readable channels in it and
 * automatically switches to the next channel once the currently read channel is finished.
 */
public class ComposingReadableFrameChannel implements ReadableFrameChannel

Please mention that this is not thread-safe as well.

Marked it with the @NotThreadSafe annotation.
@Override
public boolean isFinished()
{
  initCurrentChannel();

Should we make this class thread-safe?

I don't think there can be multiple readers for the same composing readable channel - am I missing something?
@Override
public void close()
{
  if (currentChannel != null) {

Shouldn't we close all the channels here? "Releases any resources associated with this readable channel. After calling this, you should not call any other methods on the channel."

Same case as the writable channels - the previous ones are closed before moving to the next, and the future ones are created lazily.
}

try {
  channels.get(currentIndex).get().write(frameWithPartition);

We are relying on an exception as a signal to try the next writer. So should we enforce a "sortedBySize" ordering in the channel supplier? IMHO, can we respond with a boolean if the write fails instead of relying on an exception?

The ordering is currently fixed in the composing factory - it is always disk followed by durable storage. I had tried using a custom object/error instead of an exception, but that seemed like overkill since there's only one such case right now. I have added a comment explaining the need, noting that we can create custom objects in the future if the logic becomes complex.
public void close() throws IOException
{
  if (currentIndex < channels.size()) {
    channels.get(currentIndex).get().close();

Should we close all channels?

The write channels are created lazily, so closing the upcoming ones is not needed, and the previous ones are closed before moving to the next one.
Merged branch …uper_sorter. Conflicts: extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java, extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java, extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java, extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java, extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java, extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
}
if (byteTracker != null) {
  byteTracker.release(fileLength);
  // only release the bytes taken by frames, we don't track the header and footer as of now

Just curious - why was this change needed, from the file length to fileLen - header - footer?

This was needed since we're not tracking header and footer sizes as of now (due to difficulties with empty frame files). I have added an explanation for this in the code - it could be picked up in the future if needed. Also, the assumption is that the frame data would be much larger than the header + footer sizes.
PR apache#13368 replaced a FrameFile cache with a PartitionedOutputChannel cache. Unfortunately, these objects are heavyweight: each has a writable channel and a frame memory allocator with an 8MB arena. This patch replaces it with a Supplier<PartitionedReadableFrameChannel>, which is lighter weight, and is all that was needed anyway.
Currently, the SuperSorter intermediate data can only be stored on locally mounted disks. This often leads to MSQ job failures due to limited local space. To remedy this problem, we want to support composable storage for SuperSorter intermediate data, which allows the data to be stored on multiple storage systems through fallbacks.
To implement composable storage for SuperSorter intermediate data, we first need to allow durable storage to hold SuperSorter intermediate data. For that, now:
Further, a ComposingOutputChannelFactory is created which takes an ordered list of OutputChannelFactory instances. The composing factory writes to (and reads from) an output channel only for as long as the channel allows it, and transparently moves on to the next output channel factory in the list once the current channel reaches its limits. As of now, the composition can only be made of local storage and durable storage; in the future we'd also want to allow memory storage to be part of the composition.
The composing factory can be enabled with the boolean context parameter composedIntermediateSuperSorterStorageEnabled. Also, to set a limit on the maximum amount of local space SuperSorter's intermediate output can take, we can set the numeric context parameter intermediateSuperSorterStorageMaxLocalBytes.

Key changed/added classes in this PR:

- PartitionedOutputChannel
- PartitionedReadableFrameChannel
- FrameFileFooter
- DurablePartitionedReadableFrameChannel
- ByteTracker
- ComposingReadableFrameChannel
- ComposingWritableFrameChannel
- ComposingOutputChannelFactory
- FrameFileWriter
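Putting the context parameters together, a query context enabling the composed storage path might look like the fragment below. The byte limit value is purely illustrative, and durable storage itself must additionally be configured on the cluster for the fallback to work.

```json
{
  "durableShuffleStorage": true,
  "composedIntermediateSuperSorterStorageEnabled": true,
  "intermediateSuperSorterStorageMaxLocalBytes": 70000000000
}
```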
Release Note

Enable composed storage for SuperSorter intermediate data via the composedIntermediateSuperSorterStorageEnabled context parameter. Along with this parameter, the user should also configure intermediateSuperSorterStorageMaxLocalBytes to limit the amount of intermediate data written locally; the recommendation is to set this to roughly 70% of the disk available to an MSQ task. Finally, to allow falling back to durable storage, the durable storage feature flag and its configurations should also be set.