From 58ccdc0453f12ff48040db01584c7f54bbe1c634 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 9 Mar 2023 17:22:02 -0800 Subject: [PATCH] SuperSorter: Avoid storing extra writable channels. PR #13368 replaced a FrameFile cache with a PartitionedOutputChannel cache. Unfortunately, these objects are heavyweight: each has a writable channel and a frame memory allocator with an 8MB arena. This patch replaces it with a Supplier, which is lighter weight, and is all that was needed anyway. --- .../druid/frame/processor/SuperSorter.java | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java b/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java index 1be686be6ffa..73b8f1819cb5 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java @@ -71,6 +71,7 @@ import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.stream.IntStream; /** @@ -145,7 +146,8 @@ public class SuperSorter private List outputChannels = null; @GuardedBy("runWorkersLock") - private final Map levelAndRankToReadableChannelMap = new HashMap<>(); + private final Map> levelAndRankToReadableChannelMap = + new HashMap<>(); @GuardedBy("runWorkersLock") private int activeProcessors = 0; @@ -511,9 +513,7 @@ private boolean runNextMiddleMerger() if (inputsReady.remove(i)) { String levelAndRankKey = mergerOutputFileName(inLevel, i); PartitionedReadableFrameChannel partitionedReadableFrameChannel = - levelAndRankToReadableChannelMap.remove(levelAndRankKey) - .getReadableChannelSupplier() - .get(); + levelAndRankToReadableChannelMap.remove(levelAndRankKey).get(); in.add(partitionedReadableFrameChannel.getReadableFrameChannel(0)); partitionedReadableFrameChannels.add(partitionedReadableFrameChannel); } @@ -569,7 +569,6 @@ private boolean runNextUltimateMerger() for (long i = 0; i < numInputs; i++) { in.add( levelAndRankToReadableChannelMap.get(mergerOutputFileName(inLevel, i)) - .getReadableChannelSupplier() .get() .getReadableFrameChannel(ultimateMergersRunSoFar) ); @@ -599,19 +598,19 @@ private void runMerger( String levelAndRankKey = mergerOutputFileName(level, rank); if (totalMergingLevels != UNKNOWN_LEVEL && level == totalMergingLevels - 1) { + // Ultimate output. final int intRank = Ints.checkedCast(rank); final OutputChannel outputChannel = outputChannelFactory.openChannel(intRank); outputChannels.set(intRank, outputChannel.readOnly()); frameAllocatorFactory = new SingleMemoryAllocatorFactory(outputChannel.getFrameMemoryAllocator()); writableChannel = outputChannel.getWritableChannel(); } else { - PartitionedOutputChannel partitionedOutputChannel = intermediateOutputChannelFactory.openPartitionedChannel( - levelAndRankKey, - true - ); + // Inner channels, between mergers in the merge hierarchy. + PartitionedOutputChannel partitionedOutputChannel = + intermediateOutputChannelFactory.openPartitionedChannel(levelAndRankKey, true); writableChannel = partitionedOutputChannel.getWritableChannel(); frameAllocatorFactory = new SingleMemoryAllocatorFactory(partitionedOutputChannel.getFrameMemoryAllocator()); - levelAndRankToReadableChannelMap.put(levelAndRankKey, partitionedOutputChannel); + levelAndRankToReadableChannelMap.put(levelAndRankKey, partitionedOutputChannel.getReadableChannelSupplier()); } final FrameChannelMerger worker = @@ -802,10 +801,10 @@ private void cleanUp() outputsReadyByLevel.clear(); inputBuffer.clear(); - for (Map.Entry cleanupEntry : + for (Map.Entry> cleanupEntry : levelAndRankToReadableChannelMap.entrySet()) { try { - cleanupEntry.getValue().getReadableChannelSupplier().get().close(); + cleanupEntry.getValue().get().close(); } catch (IOException e) { throw new UncheckedIOException("Unable to close channel for name : " + cleanupEntry.getKey(), e);