From 76696ad300a4c532b9cd4360cf551226bc062815 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 30 Jan 2023 12:48:54 +0530 Subject: [PATCH 01/14] readOnly for PartitionedOutputChannel --- .../frame/processor/PartitionedOutputChannel.java | 10 ++++++++++ .../org/apache/druid/frame/processor/SuperSorter.java | 5 ++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java b/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java index 3e455545b049..d6ed118ad5e5 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java @@ -119,4 +119,14 @@ public PartitionedOutputChannel mapWritableChannel(final Function Date: Fri, 24 Feb 2023 15:50:02 +0530 Subject: [PATCH 02/14] add readonly when memoizing the composing channel factory --- .../druid/frame/processor/ComposingOutputChannelFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java b/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java index 8ca9aa4f6fa9..9378756cdebc 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java @@ -114,7 +114,7 @@ public PartitionedOutputChannel openPartitionedChannel(String name, boolean dele } })::get; writableFrameChannelsBuilder.add(() -> channel.get().getWritableChannel()); - readableFrameChannelSuppliersBuilder.add(() -> channel.get().getReadableChannelSupplier().get()); + readableFrameChannelSuppliersBuilder.add(() -> channel.get().readOnly().getReadableChannelSupplier().get()); } // the map maintains a mapping of channels which have the data for a given partition. // it is useful to identify the readable channels to open in the composition while reading the partition data. From 7bb3dae16dafb21f81fb67d6dbb8f217c9d53790 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 1 Mar 2023 10:21:29 +0530 Subject: [PATCH 03/14] reduce memory footprint of composing writable channel --- .../ComposingWritableFrameChannel.java | 50 ++++++++++++++----- .../ComposingOutputChannelFactory.java | 8 +++ .../druid/frame/processor/OutputChannel.java | 26 ++++++++-- .../processor/PartitionedOutputChannel.java | 28 ++++++++--- 4 files changed, 89 insertions(+), 23 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java b/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java index b9ca905ed2bf..19bc72d03cb8 100644 --- a/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java +++ b/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java @@ -22,6 +22,9 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.frame.processor.OutputChannel; +import org.apache.druid.frame.processor.PartitionedOutputChannel; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.ResourceLimitExceededException; @@ -38,16 +41,29 @@ */ public class ComposingWritableFrameChannel implements WritableFrameChannel { - private final List> channels; + @Nullable + private final List> outputChannelSuppliers; + + @Nullable + private final List> partitionedOutputChannelSuppliers; + + private final List> writableChannelSuppliers; private final Map> partitionToChannelMap; private int currentIndex; public ComposingWritableFrameChannel( - List> channels, + @Nullable List> outputChannelSuppliers, + @Nullable List> partitionedOutputChannelSuppliers, + List> writableChannelSuppliers, Map> partitionToChannelMap ) { - this.channels = Preconditions.checkNotNull(channels, "channels is null"); + if (outputChannelSuppliers != null && partitionedOutputChannelSuppliers != null) { + throw new IAE("Atmost one of outputChannelSuppliers and partitionedOutputChannelSuppliers can be provided"); + } + this.outputChannelSuppliers = outputChannelSuppliers; + this.partitionedOutputChannelSuppliers = partitionedOutputChannelSuppliers; + this.writableChannelSuppliers = Preconditions.checkNotNull(writableChannelSuppliers, "writableChannelSuppliers is null"); this.partitionToChannelMap = Preconditions.checkNotNull(partitionToChannelMap, "partitionToChannelMap is null"); this.currentIndex = 0; @@ -56,12 +72,12 @@ public ComposingWritableFrameChannel( @Override public void write(FrameWithPartition frameWithPartition) throws IOException { - if (currentIndex >= channels.size()) { - throw new ISE("No more channels available to write. Total available channels : " + channels.size()); + if (currentIndex >= writableChannelSuppliers.size()) { + throw new ISE("No more channels available to write. Total available channels : " + writableChannelSuppliers.size()); } try { - channels.get(currentIndex).get().write(frameWithPartition); + writableChannelSuppliers.get(currentIndex).get().write(frameWithPartition); partitionToChannelMap.computeIfAbsent(frameWithPartition.partition(), k -> Sets.newHashSetWithExpectedSize(1)) .add(currentIndex); } @@ -70,9 +86,19 @@ public void write(FrameWithPartition frameWithPartition) throws IOException // exception is automatically passed up to the user incase all the channels are exhausted. If in future, more // cases come up to dictate control flow, then we can switch to returning a custom object from the channel's write // operation. - channels.get(currentIndex).get().close(); + writableChannelSuppliers.get(currentIndex).get().close(); + + // We are converting the corresponding channel to read only after exhausting it because that channel won't be used + // for writes anymore + if (outputChannelSuppliers != null) { + outputChannelSuppliers.get(currentIndex).get().convertToReadOnly(); + } + if (partitionedOutputChannelSuppliers != null) { + partitionedOutputChannelSuppliers.get(currentIndex).get().convertToReadOnly(); + } + currentIndex++; - if (currentIndex >= channels.size()) { + if (currentIndex >= writableChannelSuppliers.size()) { throw rlee; } write(frameWithPartition); @@ -82,7 +108,7 @@ public void write(FrameWithPartition frameWithPartition) throws IOException @Override public void fail(@Nullable Throwable cause) throws IOException { - for (Supplier channel : channels) { + for (Supplier channel : writableChannelSuppliers) { channel.get().fail(cause); } } @@ -90,14 +116,14 @@ public void fail(@Nullable Throwable cause) throws IOException @Override public void close() throws IOException { - if (currentIndex < channels.size()) { - channels.get(currentIndex).get().close(); + if (currentIndex < writableChannelSuppliers.size()) { + writableChannelSuppliers.get(currentIndex).get().close(); } } @Override public ListenableFuture writabilityFuture() { - return channels.get(currentIndex).get().writabilityFuture(); + return writableChannelSuppliers.get(currentIndex).get().writabilityFuture(); } } diff --git a/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java b/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java index 9378756cdebc..4b77a740ece6 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java @@ -60,6 +60,7 @@ public OutputChannel openChannel(int partitionNumber) { ImmutableList.Builder> writableFrameChannelSuppliersBuilder = ImmutableList.builder(); ImmutableList.Builder> readableFrameChannelSuppliersBuilder = ImmutableList.builder(); + ImmutableList.Builder> outputChannelSupplierBuilder = ImmutableList.builder(); for (OutputChannelFactory channelFactory : channelFactories) { // open channel lazily Supplier channel = @@ -71,6 +72,7 @@ public OutputChannel openChannel(int partitionNumber) throw new UncheckedIOException(e); } })::get; + outputChannelSupplierBuilder.add(channel); writableFrameChannelSuppliersBuilder.add(() -> channel.get().getWritableChannel()); readableFrameChannelSuppliersBuilder.add(() -> channel.get().getReadableChannelSupplier().get()); } @@ -79,6 +81,8 @@ public OutputChannel openChannel(int partitionNumber) // it is useful to identify the readable channels to open in the composition while reading the partition data. Map> partitionToChannelMap = new HashMap<>(); ComposingWritableFrameChannel writableFrameChannel = new ComposingWritableFrameChannel( + outputChannelSupplierBuilder.build(), + null, writableFrameChannelSuppliersBuilder.build(), partitionToChannelMap ); @@ -103,6 +107,7 @@ public PartitionedOutputChannel openPartitionedChannel(String name, boolean dele ImmutableList.Builder> writableFrameChannelsBuilder = ImmutableList.builder(); ImmutableList.Builder> readableFrameChannelSuppliersBuilder = ImmutableList.builder(); + ImmutableList.Builder> partitionedOutputChannelSupplierBuilder = ImmutableList.builder(); for (OutputChannelFactory channelFactory : channelFactories) { Supplier channel = Suppliers.memoize(() -> { @@ -113,6 +118,7 @@ public PartitionedOutputChannel openPartitionedChannel(String name, boolean dele throw new UncheckedIOException(e); } })::get; + partitionedOutputChannelSupplierBuilder.add(channel); writableFrameChannelsBuilder.add(() -> channel.get().getWritableChannel()); readableFrameChannelSuppliersBuilder.add(() -> channel.get().readOnly().getReadableChannelSupplier().get()); } @@ -121,6 +127,8 @@ public PartitionedOutputChannel openPartitionedChannel(String name, boolean dele Map> partitionToChannelMap = new HashMap<>(); ComposingWritableFrameChannel writableFrameChannel = new ComposingWritableFrameChannel( + null, + partitionedOutputChannelSupplierBuilder.build(), writableFrameChannelsBuilder.build(), partitionToChannelMap ); diff --git a/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java b/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java index 779fa19c73a5..875b2314353c 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Suppliers; +import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.druid.frame.allocation.MemoryAllocator; import org.apache.druid.frame.channel.FrameWithPartition; import org.apache.druid.frame.channel.ReadableFrameChannel; @@ -42,11 +43,16 @@ */ public class OutputChannel { + @GuardedBy("this") @Nullable - private final WritableFrameChannel writableChannel; + private WritableFrameChannel writableChannel; + + @GuardedBy("this") @Nullable - private final MemoryAllocator frameMemoryAllocator; + private MemoryAllocator frameMemoryAllocator; + private final Supplier readableChannelSupplier; + private final int partitionNumber; private OutputChannel( @@ -102,7 +108,7 @@ public static OutputChannel nil(final int partitionNumber) /** * Returns the writable channel of this pair. The producer writes to this channel. */ - public WritableFrameChannel getWritableChannel() + public synchronized WritableFrameChannel getWritableChannel() { if (writableChannel == null) { throw new ISE("Writable channel is not available"); @@ -114,7 +120,7 @@ public WritableFrameChannel getWritableChannel() /** * Returns the memory allocator for the writable channel. The producer uses this to generate frames for the channel. */ - public MemoryAllocator getFrameMemoryAllocator() + public synchronized MemoryAllocator getFrameMemoryAllocator() { if (frameMemoryAllocator == null) { throw new ISE("Writable channel is not available"); @@ -142,7 +148,7 @@ public int getPartitionNumber() return partitionNumber; } - public OutputChannel mapWritableChannel(final Function mapFn) + public synchronized OutputChannel mapWritableChannel(final Function mapFn) { if (writableChannel == null) { return this; @@ -164,4 +170,14 @@ public OutputChannel readOnly() { return new OutputChannel(null, null, readableChannelSupplier, partitionNumber); } + + /** + * Removes the reference to the {@link #writableChannel} and {@link #frameMemoryAllocator} from the object, making + * it more efficient + */ + public synchronized void convertToReadOnly() + { + this.writableChannel = null; + this.frameMemoryAllocator = null; + } } diff --git a/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java b/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java index d6ed118ad5e5..4fda503a33ce 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Suppliers; +import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.druid.frame.allocation.MemoryAllocator; import org.apache.druid.frame.channel.PartitionedReadableFrameChannel; import org.apache.druid.frame.channel.WritableFrameChannel; @@ -37,10 +38,15 @@ */ public class PartitionedOutputChannel { + + @GuardedBy("this") @Nullable - private final WritableFrameChannel writableChannel; + private WritableFrameChannel writableChannel; + + @GuardedBy("this") @Nullable - private final MemoryAllocator frameMemoryAllocator; + private MemoryAllocator frameMemoryAllocator; + private final Supplier readableChannelSupplier; private PartitionedOutputChannel( @@ -78,7 +84,7 @@ public static PartitionedOutputChannel pair( /** * Returns the writable channel of this pair. The producer writes to this channel. */ - public WritableFrameChannel getWritableChannel() + public synchronized WritableFrameChannel getWritableChannel() { if (writableChannel == null) { throw new ISE("Writable channel is not available"); @@ -90,7 +96,7 @@ public WritableFrameChannel getWritableChannel() /** * Returns the memory allocator for the writable channel. The producer uses this to generate frames for the channel. */ - public MemoryAllocator getFrameMemoryAllocator() + public synchronized MemoryAllocator getFrameMemoryAllocator() { if (frameMemoryAllocator == null) { throw new ISE("Writable channel is not available"); @@ -102,12 +108,12 @@ public MemoryAllocator getFrameMemoryAllocator() /** * Returns the partitioned readable channel supplier of this pair. The consumer reads from this channel. */ - public Supplier getReadableChannelSupplier() + public synchronized Supplier getReadableChannelSupplier() { return readableChannelSupplier; } - public PartitionedOutputChannel mapWritableChannel(final Function mapFn) + public synchronized PartitionedOutputChannel mapWritableChannel(final Function mapFn) { if (writableChannel == null) { return this; @@ -129,4 +135,14 @@ public PartitionedOutputChannel readOnly() { return new PartitionedOutputChannel(null, null, readableChannelSupplier); } + + /** + * Removes the reference to the {@link #writableChannel} and {@link #frameMemoryAllocator} from the object, making + * it more efficient + */ + public synchronized void convertToReadOnly() + { + this.writableChannel = null; + this.frameMemoryAllocator = null; + } } From c94da3c658e19e120917b90dce9de9c5d43ee961 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 6 Mar 2023 14:56:48 +0530 Subject: [PATCH 04/14] add uts for writable channel switching --- .../druid/frame/processor/OutputChannel.java | 10 +- .../processor/PartitionedOutputChannel.java | 10 +- .../ComposingWritableFrameChannelTest.java | 135 ++++++++++++++++++ 3 files changed, 149 insertions(+), 6 deletions(-) create mode 100644 processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java diff --git a/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java b/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java index 875b2314353c..a33e494a7166 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java @@ -106,12 +106,14 @@ public static OutputChannel nil(final int partitionNumber) } /** - * Returns the writable channel of this pair. The producer writes to this channel. + * Returns the writable channel of this pair. The producer writes to this channel. Throws ISE if the output channel is + * read only. */ public synchronized WritableFrameChannel getWritableChannel() { if (writableChannel == null) { - throw new ISE("Writable channel is not available"); + throw new ISE("Writable channel is not available. The output channel might be marked as read-only," + + " hence no writes are allowed."); } else { return writableChannel; } @@ -119,11 +121,13 @@ public synchronized WritableFrameChannel getWritableChannel() /** * Returns the memory allocator for the writable channel. The producer uses this to generate frames for the channel. + * Throws ISE if the output channel is read only. */ public synchronized MemoryAllocator getFrameMemoryAllocator() { if (frameMemoryAllocator == null) { - throw new ISE("Writable channel is not available"); + throw new ISE("Frame allocator is not available. The output channel might be marked as read-only," + + " hence memory allocator is not required."); } else { return frameMemoryAllocator; } diff --git a/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java b/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java index 4fda503a33ce..34ad2c232342 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java @@ -82,12 +82,14 @@ public static PartitionedOutputChannel pair( } /** - * Returns the writable channel of this pair. The producer writes to this channel. + * Returns the writable channel of this pair. The producer writes to this channel. Throws ISE if the output channel is + * read only. */ public synchronized WritableFrameChannel getWritableChannel() { if (writableChannel == null) { - throw new ISE("Writable channel is not available"); + throw new ISE("Writable channel is not available. The output channel might be marked as read-only," + + " hence no writes are allowed."); } else { return writableChannel; } @@ -95,11 +97,13 @@ public synchronized WritableFrameChannel getWritableChannel() /** * Returns the memory allocator for the writable channel. The producer uses this to generate frames for the channel. + * Throws ISE if the output channel is read only. */ public synchronized MemoryAllocator getFrameMemoryAllocator() { if (frameMemoryAllocator == null) { - throw new ISE("Writable channel is not available"); + throw new ISE("Frame allocator is not available. The output channel might be marked as read-only," + + " hence memory allocator is not required."); } else { return frameMemoryAllocator; } diff --git a/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java new file mode 100644 index 000000000000..537b8152c883 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.channel; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.allocation.ArenaMemoryAllocator; +import org.apache.druid.frame.processor.OutputChannel; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.query.ResourceLimitExceededException; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.function.Supplier; + + +public class ComposingWritableFrameChannelTest +{ + @Test + public void testComposingWritableChannelSwitchesProperly() throws IOException + { + + // This frame channel writes a single frame + WritableFrameChannel writableFrameChannel1 = new LimitedWritableFrameChannel(2); + WritableFrameChannel writableFrameChannel2 = new LimitedWritableFrameChannel(100); + + Supplier readableFrameChannelSupplier1 = () -> null; + Supplier readableFrameChannelSupplier2 = () -> null; + + OutputChannel outputChannel1 = OutputChannel.pair( + writableFrameChannel1, + ArenaMemoryAllocator.createOnHeap(1), + readableFrameChannelSupplier1, + 1 + ); + OutputChannel outputChannel2 = OutputChannel.pair( + writableFrameChannel2, + ArenaMemoryAllocator.createOnHeap(1), + readableFrameChannelSupplier2, + 2 + ); + + Map> partitionToChannelMap = new HashMap(); + + ComposingWritableFrameChannel composingWritableFrameChannel = new ComposingWritableFrameChannel( + ImmutableList.of( + () -> outputChannel1, + () -> outputChannel2 + ), + null, + ImmutableList.of( + () -> writableFrameChannel1, + () -> writableFrameChannel2 + ), + partitionToChannelMap + ); + + composingWritableFrameChannel.write(new FrameWithPartition(Mockito.mock(Frame.class), 1)); + composingWritableFrameChannel.write(new FrameWithPartition(Mockito.mock(Frame.class), 2)); + composingWritableFrameChannel.write(new FrameWithPartition(Mockito.mock(Frame.class), 3)); + + partitionToChannelMap.get(0); + + // Assert the location of the channels where the frames have been written to + Assert.assertEquals(ImmutableSet.of(0), partitionToChannelMap.get(1)); + Assert.assertEquals(ImmutableSet.of(0), partitionToChannelMap.get(2)); + Assert.assertEquals(ImmutableSet.of(1), partitionToChannelMap.get(3)); + + // Test if the older channel has been converted to read only + Assert.assertThrows(ISE.class, () -> outputChannel1.getWritableChannel()); + } + + static class LimitedWritableFrameChannel implements WritableFrameChannel + { + private final int maxFrames; + private int curFrame = 0; + + public LimitedWritableFrameChannel(int maxFrames) + { + this.maxFrames = maxFrames; + } + + @Override + public void write(FrameWithPartition frameWithPartition) throws IOException + { + if (curFrame >= maxFrames) { + throw new ResourceLimitExceededException("Cannot write more frames to the channel"); + } + ++curFrame; + } + + @Override + public void fail(@Nullable Throwable cause) throws IOException + { + + } + + @Override + public void close() throws IOException + { + + } + + @Override + public ListenableFuture writabilityFuture() + { + return null; + } + } +} From d3fa5840f3bf3601ad9f96dc8f4321fa2a225669 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 6 Mar 2023 15:04:41 +0530 Subject: [PATCH 05/14] cleanup unnecessary line --- .../druid/frame/channel/ComposingWritableFrameChannelTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java index 537b8152c883..795c8a86b2e1 100644 --- a/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java +++ b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java @@ -84,8 +84,6 @@ public void testComposingWritableChannelSwitchesProperly() throws IOException composingWritableFrameChannel.write(new FrameWithPartition(Mockito.mock(Frame.class), 2)); composingWritableFrameChannel.write(new FrameWithPartition(Mockito.mock(Frame.class), 3)); - partitionToChannelMap.get(0); - // Assert the location of the channels where the frames have been written to Assert.assertEquals(ImmutableSet.of(0), partitionToChannelMap.get(1)); Assert.assertEquals(ImmutableSet.of(0), partitionToChannelMap.get(2)); From c953439375e205f17ed85082975531e367787b98 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 6 Mar 2023 15:25:05 +0530 Subject: [PATCH 06/14] add comments --- .../frame/processor/ComposingOutputChannelFactory.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java b/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java index 4b77a740ece6..cf94262ac353 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java @@ -74,7 +74,9 @@ public OutputChannel openChannel(int partitionNumber) })::get; outputChannelSupplierBuilder.add(channel); writableFrameChannelSuppliersBuilder.add(() -> channel.get().getWritableChannel()); - readableFrameChannelSuppliersBuilder.add(() -> channel.get().getReadableChannelSupplier().get()); + // We read the output channel once they have been written to, and therefore it is space efficient and safe to + // save their read only copies + readableFrameChannelSuppliersBuilder.add(() -> channel.get().readOnly().getReadableChannelSupplier().get()); } // the map maintains a mapping of channels which have the data for a given partition. @@ -120,6 +122,8 @@ public PartitionedOutputChannel openPartitionedChannel(String name, boolean dele })::get; partitionedOutputChannelSupplierBuilder.add(channel); writableFrameChannelsBuilder.add(() -> channel.get().getWritableChannel()); + // We read the output channel once they have been written to, and therefore it is space efficient and safe to + // save their read only copies readableFrameChannelSuppliersBuilder.add(() -> channel.get().readOnly().getReadableChannelSupplier().get()); } // the map maintains a mapping of channels which have the data for a given partition. From de0ef08525aa49a11c8ce46770e60a670aa9d732 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 7 Mar 2023 07:12:42 +0530 Subject: [PATCH 07/14] intellij inspections --- .../channel/ComposingWritableFrameChannelTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java index 795c8a86b2e1..a3292b37dd17 100644 --- a/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java +++ b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java @@ -65,7 +65,7 @@ public void testComposingWritableChannelSwitchesProperly() throws IOException 2 ); - Map> partitionToChannelMap = new HashMap(); + Map> partitionToChannelMap = new HashMap<>(); ComposingWritableFrameChannel composingWritableFrameChannel = new ComposingWritableFrameChannel( ImmutableList.of( @@ -90,7 +90,7 @@ public void testComposingWritableChannelSwitchesProperly() throws IOException Assert.assertEquals(ImmutableSet.of(1), partitionToChannelMap.get(3)); // Test if the older channel has been converted to read only - Assert.assertThrows(ISE.class, () -> outputChannel1.getWritableChannel()); + Assert.assertThrows(ISE.class, outputChannel1::getWritableChannel); } static class LimitedWritableFrameChannel implements WritableFrameChannel @@ -104,7 +104,7 @@ public LimitedWritableFrameChannel(int maxFrames) } @Override - public void write(FrameWithPartition frameWithPartition) throws IOException + public void write(FrameWithPartition frameWithPartition) { if (curFrame >= maxFrames) { throw new ResourceLimitExceededException("Cannot write more frames to the channel"); @@ -113,13 +113,13 @@ public void write(FrameWithPartition frameWithPartition) throws IOException } @Override - public void fail(@Nullable Throwable cause) throws IOException + public void fail(@Nullable Throwable cause) { } @Override - public void close() throws IOException + public void close() { } From 7f58af0e622bade8420b5e73afdbaa8c5c0247ab Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 7 Mar 2023 11:07:40 +0530 Subject: [PATCH 08/14] test failure --- .../apache/druid/frame/processor/OutputChannelsTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java index e51b8d31c55d..16426657aa92 100644 --- a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java +++ b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java @@ -83,7 +83,8 @@ public void test_readOnly() MatcherAssert.assertThat( e, - ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable channel is not available")) + ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo( + "Writable channel is not available. The output channel might be marked as read-only, hence no writes are allowed.")) ); final IllegalStateException e2 = Assert.assertThrows( @@ -93,7 +94,8 @@ public void test_readOnly() MatcherAssert.assertThat( e2, - ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable channel is not available")) + ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo( + "Frame allocator is not available. The output channel might be marked as read-only, hence memory allocator is not required.")) ); } } From 652d770fea4ab4899a0ee3397799bb3783b20c5e Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 7 Mar 2023 16:59:37 +0530 Subject: [PATCH 09/14] test failure 2 --- .../org/apache/druid/frame/processor/OutputChannelTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java index 88a51f67e7ea..25e6490d1b75 100644 --- a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java +++ b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java @@ -42,14 +42,16 @@ public void test_nil() final IllegalStateException e1 = Assert.assertThrows(IllegalStateException.class, channel::getWritableChannel); MatcherAssert.assertThat( e1, - ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable channel is not available")) + ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo( + "Writable channel is not available. The output channel might be marked as read-only, hence no writes are allowed.")) ); // No writable channel: cannot call getFrameMemoryAllocator. final IllegalStateException e2 = Assert.assertThrows(IllegalStateException.class, channel::getFrameMemoryAllocator); MatcherAssert.assertThat( e2, - ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable channel is not available")) + ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo( + "Frame allocator is not available. The output channel might be marked as read-only, hence memory allocator is not required.")) ); // Mapping the writable channel of a nil channel has no effect, because there is no writable channel. From a4f26ddbd9c198702f71de9d6f80380186ea70e1 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Thu, 9 Mar 2023 09:24:13 +0530 Subject: [PATCH 10/14] Trigger Build From e55b8decc8e94ced2d6cb44fd88d8b6df7278794 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Thu, 9 Mar 2023 15:53:56 +0530 Subject: [PATCH 11/14] fix build issue due to merge --- .../channel/ComposingWritableFrameChannelTest.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java index a3292b37dd17..715af01d9ff9 100644 --- a/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java +++ b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java @@ -112,6 +112,11 @@ public void write(FrameWithPartition frameWithPartition) ++curFrame; } + @Override + public void write(Frame frame) throws IOException + { + } + @Override public void fail(@Nullable Throwable cause) { @@ -124,6 +129,12 @@ public void close() } + @Override + public boolean isClosed() + { + return false; + } + @Override public ListenableFuture writabilityFuture() { From f542de946962f9a3915c2acb0c14805cb5f4b1b8 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Thu, 9 Mar 2023 16:27:22 +0530 Subject: [PATCH 12/14] build failures in outputchannels --- .../java/org/apache/druid/frame/processor/OutputChannel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java b/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java index a847a142135d..e1377eddca35 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java @@ -207,7 +207,7 @@ public ReadableFrameChannel getReadableChannel() /** * Whether {@link #getReadableChannel()} is ready to use. */ - public boolean isReadableChannelReady() + public synchronized boolean isReadableChannelReady() { return readableChannelUsableWhileWriting || writableChannel == null || writableChannel.isClosed(); } From ea6c834573fc24b0d363668d1b7129611f63c977 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Thu, 9 Mar 2023 17:02:15 +0530 Subject: [PATCH 13/14] Trigger Build From 6746c19059f7c310a1e708457ad76e401272aef2 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Thu, 9 Mar 2023 23:44:09 +0530 Subject: [PATCH 14/14] build fix --- .../druid/frame/channel/ComposingWritableFrameChannelTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java index 715af01d9ff9..c2968b357953 100644 --- a/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java +++ b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java @@ -113,7 +113,7 @@ public void write(FrameWithPartition frameWithPartition) } @Override - public void write(Frame frame) throws IOException + public void write(Frame frame) { }