diff --git a/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java b/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java index 7f2a61e437e6..c576c236961e 100644 --- a/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java +++ b/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java @@ -22,6 +22,9 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.frame.processor.OutputChannel; +import org.apache.druid.frame.processor.PartitionedOutputChannel; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.ResourceLimitExceededException; @@ -38,16 +41,29 @@ */ public class ComposingWritableFrameChannel implements WritableFrameChannel { - private final List> channels; + @Nullable + private final List> outputChannelSuppliers; + + @Nullable + private final List> partitionedOutputChannelSuppliers; + + private final List> writableChannelSuppliers; private final Map> partitionToChannelMap; private int currentIndex; public ComposingWritableFrameChannel( - List> channels, + @Nullable List> outputChannelSuppliers, + @Nullable List> partitionedOutputChannelSuppliers, + List> writableChannelSuppliers, Map> partitionToChannelMap ) { - this.channels = Preconditions.checkNotNull(channels, "channels is null"); + if (outputChannelSuppliers != null && partitionedOutputChannelSuppliers != null) { + throw new IAE("Atmost one of outputChannelSuppliers and partitionedOutputChannelSuppliers can be provided"); + } + this.outputChannelSuppliers = outputChannelSuppliers; + this.partitionedOutputChannelSuppliers = partitionedOutputChannelSuppliers; + this.writableChannelSuppliers = Preconditions.checkNotNull(writableChannelSuppliers, "writableChannelSuppliers is null"); this.partitionToChannelMap = Preconditions.checkNotNull(partitionToChannelMap, "partitionToChannelMap is null"); this.currentIndex = 0; @@ -56,12 +72,12 @@ public ComposingWritableFrameChannel( @Override public void write(FrameWithPartition frameWithPartition) throws IOException { - if (currentIndex >= channels.size()) { - throw new ISE("No more channels available to write. Total available channels : " + channels.size()); + if (currentIndex >= writableChannelSuppliers.size()) { + throw new ISE("No more channels available to write. Total available channels : " + writableChannelSuppliers.size()); } try { - channels.get(currentIndex).get().write(frameWithPartition); + writableChannelSuppliers.get(currentIndex).get().write(frameWithPartition); partitionToChannelMap.computeIfAbsent(frameWithPartition.partition(), k -> Sets.newHashSetWithExpectedSize(1)) .add(currentIndex); } @@ -70,9 +86,19 @@ public void write(FrameWithPartition frameWithPartition) throws IOException // exception is automatically passed up to the user incase all the channels are exhausted. If in future, more // cases come up to dictate control flow, then we can switch to returning a custom object from the channel's write // operation. - channels.get(currentIndex).get().close(); + writableChannelSuppliers.get(currentIndex).get().close(); + + // We are converting the corresponding channel to read only after exhausting it because that channel won't be used + // for writes anymore + if (outputChannelSuppliers != null) { + outputChannelSuppliers.get(currentIndex).get().convertToReadOnly(); + } + if (partitionedOutputChannelSuppliers != null) { + partitionedOutputChannelSuppliers.get(currentIndex).get().convertToReadOnly(); + } + currentIndex++; - if (currentIndex >= channels.size()) { + if (currentIndex >= writableChannelSuppliers.size()) { throw rlee; } write(frameWithPartition); @@ -82,7 +108,7 @@ public void write(FrameWithPartition frameWithPartition) throws IOException @Override public void fail(@Nullable Throwable cause) throws IOException { - for (Supplier channel : channels) { + for (Supplier channel : writableChannelSuppliers) { channel.get().fail(cause); } } @@ -90,21 +116,21 @@ public void fail(@Nullable Throwable cause) throws IOException @Override public void close() throws IOException { - if (currentIndex < channels.size()) { - channels.get(currentIndex).get().close(); - currentIndex = channels.size(); + if (currentIndex < writableChannelSuppliers.size()) { + writableChannelSuppliers.get(currentIndex).get().close(); + currentIndex = writableChannelSuppliers.size(); } } @Override public boolean isClosed() { - return currentIndex == channels.size(); + return currentIndex == writableChannelSuppliers.size(); } @Override public ListenableFuture writabilityFuture() { - return channels.get(currentIndex).get().writabilityFuture(); + return writableChannelSuppliers.get(currentIndex).get().writabilityFuture(); } } diff --git a/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java b/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java index 8ca9aa4f6fa9..cf94262ac353 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java @@ -60,6 +60,7 @@ public OutputChannel openChannel(int partitionNumber) { ImmutableList.Builder> writableFrameChannelSuppliersBuilder = ImmutableList.builder(); ImmutableList.Builder> readableFrameChannelSuppliersBuilder = ImmutableList.builder(); + ImmutableList.Builder> outputChannelSupplierBuilder = ImmutableList.builder(); for (OutputChannelFactory channelFactory : channelFactories) { // open channel lazily Supplier channel = @@ -71,14 +72,19 @@ public OutputChannel openChannel(int partitionNumber) throw new UncheckedIOException(e); } })::get; + outputChannelSupplierBuilder.add(channel); writableFrameChannelSuppliersBuilder.add(() -> channel.get().getWritableChannel()); - readableFrameChannelSuppliersBuilder.add(() -> channel.get().getReadableChannelSupplier().get()); + // We read the output channel once they have been written to, and therefore it is space efficient and safe to + // save their read only copies + readableFrameChannelSuppliersBuilder.add(() -> channel.get().readOnly().getReadableChannelSupplier().get()); } // the map maintains a mapping of channels which have the data for a given partition. // it is useful to identify the readable channels to open in the composition while reading the partition data. Map> partitionToChannelMap = new HashMap<>(); ComposingWritableFrameChannel writableFrameChannel = new ComposingWritableFrameChannel( + outputChannelSupplierBuilder.build(), + null, writableFrameChannelSuppliersBuilder.build(), partitionToChannelMap ); @@ -103,6 +109,7 @@ public PartitionedOutputChannel openPartitionedChannel(String name, boolean dele ImmutableList.Builder> writableFrameChannelsBuilder = ImmutableList.builder(); ImmutableList.Builder> readableFrameChannelSuppliersBuilder = ImmutableList.builder(); + ImmutableList.Builder> partitionedOutputChannelSupplierBuilder = ImmutableList.builder(); for (OutputChannelFactory channelFactory : channelFactories) { Supplier channel = Suppliers.memoize(() -> { @@ -113,14 +120,19 @@ public PartitionedOutputChannel openPartitionedChannel(String name, boolean dele throw new UncheckedIOException(e); } })::get; + partitionedOutputChannelSupplierBuilder.add(channel); writableFrameChannelsBuilder.add(() -> channel.get().getWritableChannel()); - readableFrameChannelSuppliersBuilder.add(() -> channel.get().getReadableChannelSupplier().get()); + // We read the output channel once they have been written to, and therefore it is space efficient and safe to + // save their read only copies + readableFrameChannelSuppliersBuilder.add(() -> channel.get().readOnly().getReadableChannelSupplier().get()); } // the map maintains a mapping of channels which have the data for a given partition. // it is useful to identify the readable channels to open in the composition while reading the partition data. Map> partitionToChannelMap = new HashMap<>(); ComposingWritableFrameChannel writableFrameChannel = new ComposingWritableFrameChannel( + null, + partitionedOutputChannelSupplierBuilder.build(), writableFrameChannelsBuilder.build(), partitionToChannelMap ); diff --git a/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java b/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java index ac0e0a5fac5d..e1377eddca35 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Suppliers; +import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.druid.frame.allocation.MemoryAllocator; import org.apache.druid.frame.channel.FrameWithPartition; import org.apache.druid.frame.channel.ReadableFrameChannel; @@ -42,11 +43,16 @@ */ public class OutputChannel { + @GuardedBy("this") @Nullable - private final WritableFrameChannel writableChannel; + private WritableFrameChannel writableChannel; + + @GuardedBy("this") @Nullable - private final MemoryAllocator frameMemoryAllocator; + private MemoryAllocator frameMemoryAllocator; + private final Supplier readableChannelSupplier; + private final boolean readableChannelUsableWhileWriting; private final int partitionNumber; @@ -157,12 +163,14 @@ public static OutputChannel nil(final int partitionNumber) } /** - * Returns the writable channel of this pair. The producer writes to this channel. + * Returns the writable channel of this pair. The producer writes to this channel. Throws ISE if the output channel is + * read only. */ - public WritableFrameChannel getWritableChannel() + public synchronized WritableFrameChannel getWritableChannel() { if (writableChannel == null) { - throw new ISE("Writable channel is not available"); + throw new ISE("Writable channel is not available. The output channel might be marked as read-only," + + " hence no writes are allowed."); } else { return writableChannel; } @@ -170,11 +178,13 @@ public WritableFrameChannel getWritableChannel() /** * Returns the memory allocator for the writable channel. The producer uses this to generate frames for the channel. + * Throws ISE if the output channel is read only. */ - public MemoryAllocator getFrameMemoryAllocator() + public synchronized MemoryAllocator getFrameMemoryAllocator() { if (frameMemoryAllocator == null) { - throw new ISE("Writable channel is not available"); + throw new ISE("Frame allocator is not available. The output channel might be marked as read-only," + + " hence memory allocator is not required."); } else { return frameMemoryAllocator; } @@ -197,7 +207,7 @@ public ReadableFrameChannel getReadableChannel() /** * Whether {@link #getReadableChannel()} is ready to use. */ - public boolean isReadableChannelReady() + public synchronized boolean isReadableChannelReady() { return readableChannelUsableWhileWriting || writableChannel == null || writableChannel.isClosed(); } @@ -212,7 +222,7 @@ public int getPartitionNumber() return partitionNumber; } - public OutputChannel mapWritableChannel(final Function mapFn) + public synchronized OutputChannel mapWritableChannel(final Function mapFn) { if (writableChannel == null) { return this; @@ -235,4 +245,14 @@ public OutputChannel readOnly() { return OutputChannel.readOnly(readableChannelSupplier, partitionNumber); } + + /** + * Removes the reference to the {@link #writableChannel} and {@link #frameMemoryAllocator} from the object, making + * it more efficient + */ + public synchronized void convertToReadOnly() + { + this.writableChannel = null; + this.frameMemoryAllocator = null; + } } diff --git a/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java b/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java index 3e455545b049..34ad2c232342 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Suppliers; +import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.druid.frame.allocation.MemoryAllocator; import org.apache.druid.frame.channel.PartitionedReadableFrameChannel; import org.apache.druid.frame.channel.WritableFrameChannel; @@ -37,10 +38,15 @@ */ public class PartitionedOutputChannel { + + @GuardedBy("this") @Nullable - private final WritableFrameChannel writableChannel; + private WritableFrameChannel writableChannel; + + @GuardedBy("this") @Nullable - private final MemoryAllocator frameMemoryAllocator; + private MemoryAllocator frameMemoryAllocator; + private final Supplier readableChannelSupplier; private PartitionedOutputChannel( @@ -76,12 +82,14 @@ public static PartitionedOutputChannel pair( } /** - * Returns the writable channel of this pair. The producer writes to this channel. + * Returns the writable channel of this pair. The producer writes to this channel. Throws ISE if the output channel is + * read only. */ - public WritableFrameChannel getWritableChannel() + public synchronized WritableFrameChannel getWritableChannel() { if (writableChannel == null) { - throw new ISE("Writable channel is not available"); + throw new ISE("Writable channel is not available. The output channel might be marked as read-only," + + " hence no writes are allowed."); } else { return writableChannel; } @@ -89,11 +97,13 @@ public WritableFrameChannel getWritableChannel() /** * Returns the memory allocator for the writable channel. The producer uses this to generate frames for the channel. + * Throws ISE if the output channel is read only. */ - public MemoryAllocator getFrameMemoryAllocator() + public synchronized MemoryAllocator getFrameMemoryAllocator() { if (frameMemoryAllocator == null) { - throw new ISE("Writable channel is not available"); + throw new ISE("Frame allocator is not available. The output channel might be marked as read-only," + + " hence memory allocator is not required."); } else { return frameMemoryAllocator; } @@ -102,12 +112,12 @@ public MemoryAllocator getFrameMemoryAllocator() /** * Returns the partitioned readable channel supplier of this pair. The consumer reads from this channel. */ - public Supplier getReadableChannelSupplier() + public synchronized Supplier getReadableChannelSupplier() { return readableChannelSupplier; } - public PartitionedOutputChannel mapWritableChannel(final Function mapFn) + public synchronized PartitionedOutputChannel mapWritableChannel(final Function mapFn) { if (writableChannel == null) { return this; @@ -119,4 +129,24 @@ public PartitionedOutputChannel mapWritableChannel(final Function readableFrameChannelSupplier1 = () -> null; + Supplier readableFrameChannelSupplier2 = () -> null; + + OutputChannel outputChannel1 = OutputChannel.pair( + writableFrameChannel1, + ArenaMemoryAllocator.createOnHeap(1), + readableFrameChannelSupplier1, + 1 + ); + OutputChannel outputChannel2 = OutputChannel.pair( + writableFrameChannel2, + ArenaMemoryAllocator.createOnHeap(1), + readableFrameChannelSupplier2, + 2 + ); + + Map> partitionToChannelMap = new HashMap<>(); + + ComposingWritableFrameChannel composingWritableFrameChannel = new ComposingWritableFrameChannel( + ImmutableList.of( + () -> outputChannel1, + () -> outputChannel2 + ), + null, + ImmutableList.of( + () -> writableFrameChannel1, + () -> writableFrameChannel2 + ), + partitionToChannelMap + ); + + composingWritableFrameChannel.write(new FrameWithPartition(Mockito.mock(Frame.class), 1)); + composingWritableFrameChannel.write(new FrameWithPartition(Mockito.mock(Frame.class), 2)); + composingWritableFrameChannel.write(new FrameWithPartition(Mockito.mock(Frame.class), 3)); + + // Assert the location of the channels where the frames have been written to + Assert.assertEquals(ImmutableSet.of(0), partitionToChannelMap.get(1)); + Assert.assertEquals(ImmutableSet.of(0), partitionToChannelMap.get(2)); + Assert.assertEquals(ImmutableSet.of(1), partitionToChannelMap.get(3)); + + // Test if the older channel has been converted to read only + Assert.assertThrows(ISE.class, outputChannel1::getWritableChannel); + } + + static class LimitedWritableFrameChannel implements WritableFrameChannel + { + private final int maxFrames; + private int curFrame = 0; + + public LimitedWritableFrameChannel(int maxFrames) + { + this.maxFrames = maxFrames; + } + + @Override + public void write(FrameWithPartition frameWithPartition) + { + if (curFrame >= maxFrames) { + throw new ResourceLimitExceededException("Cannot write more frames to the channel"); + } + ++curFrame; + } + + @Override + public void write(Frame frame) + { + } + + @Override + public void fail(@Nullable Throwable cause) + { + + } + + @Override + public void close() + { + + } + + @Override + public boolean isClosed() + { + return false; + } + + @Override + public ListenableFuture writabilityFuture() + { + return null; + } + } +} diff --git a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java index ac0b03b286df..6e5b904ab3f9 100644 --- a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java +++ b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java @@ -44,14 +44,16 @@ public void test_nil() final IllegalStateException e1 = Assert.assertThrows(IllegalStateException.class, channel::getWritableChannel); MatcherAssert.assertThat( e1, - ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable channel is not available")) + ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo( + "Writable channel is not available. The output channel might be marked as read-only, hence no writes are allowed.")) ); // No writable channel: cannot call getFrameMemoryAllocator. final IllegalStateException e2 = Assert.assertThrows(IllegalStateException.class, channel::getFrameMemoryAllocator); MatcherAssert.assertThat( e2, - ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable channel is not available")) + ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo( + "Frame allocator is not available. The output channel might be marked as read-only, hence memory allocator is not required.")) ); // Mapping the writable channel of a nil channel has no effect, because there is no writable channel. diff --git a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java index dbef92b047af..a22c7e92bfce 100644 --- a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java +++ b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java @@ -86,7 +86,8 @@ public void test_readOnly() MatcherAssert.assertThat( e, - ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable channel is not available")) + ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo( + "Writable channel is not available. The output channel might be marked as read-only, hence no writes are allowed.")) ); final IllegalStateException e2 = Assert.assertThrows( @@ -96,7 +97,8 @@ public void test_readOnly() MatcherAssert.assertThat( e2, - ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable channel is not available")) + ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo( + "Frame allocator is not available. The output channel might be marked as read-only, hence memory allocator is not required.")) ); } }