From 057d2fcc6fc4b0ac503eeadbc86a427fcb0ac49a Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 17 Sep 2024 02:47:12 -0700 Subject: [PATCH] FrameChannelMerger: Fix incorrect behavior of finished(). Previously, the processor used "remainingChannels" to track the number of non-null entries of currentFrame. Now, "remainingChannels" tracks the number of channels that are unfinished. The difference is subtle. In the previous code, when an input channel was blocked upon exiting nextFrame(), the "currentFrames" entry would be null, and therefore the "remainingChannels" variable would be decremented. After the next await and call to populateCurrentFramesAndTournamentTree(), "remainingChannels" would be incremented if the channel had become unblocked after awaiting. This means that finished(), which returned true if remainingChannels was zero, would not be reliable if called between nextFrame() and the next await + populateCurrentFramesAndTournamentTree(). This patch changes things such that finished() is always reliable. This fixes a regression introduced in PR #16911, which added a call to finished() that was, at that time, unsafe. --- .../frame/processor/FrameChannelMerger.java | 27 ++++-- .../frame/processor/SuperSorterTest.java | 47 +++++++++- ...sAsyncPartitionedReadableFrameChannel.java | 51 +++++++++++ .../test/AlwaysAsyncReadableFrameChannel.java | 85 +++++++++++++++++++ 4 files changed, 198 insertions(+), 12 deletions(-) create mode 100644 processing/src/test/java/org/apache/druid/frame/processor/test/AlwaysAsyncPartitionedReadableFrameChannel.java create mode 100644 processing/src/test/java/org/apache/druid/frame/processor/test/AlwaysAsyncReadableFrameChannel.java diff --git a/processing/src/main/java/org/apache/druid/frame/processor/FrameChannelMerger.java b/processing/src/main/java/org/apache/druid/frame/processor/FrameChannelMerger.java index e806f98b9264..662fd001b02d 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/FrameChannelMerger.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/FrameChannelMerger.java @@ -19,8 +19,10 @@ package org.apache.druid.frame.processor; +import it.unimi.dsi.fastutil.ints.IntAVLTreeSet; import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import it.unimi.dsi.fastutil.ints.IntSet; +import it.unimi.dsi.fastutil.ints.IntSets; import org.apache.druid.frame.Frame; import org.apache.druid.frame.channel.FrameWithPartition; import org.apache.druid.frame.channel.ReadableFrameChannel; @@ -68,7 +70,11 @@ public class FrameChannelMerger implements FrameProcessor private final long rowLimit; private long rowsOutput = 0; private int currentPartition = 0; - private int remainingChannels; + + /** + * Channels that still have input to read. + */ + private final IntSet remainingChannels; // ColumnSelectorFactory that always reads from the current row in the merged sequence. final MultiColumnSelectorFactory mergedColumnSelectorFactory; @@ -119,7 +125,7 @@ public FrameChannelMerger( this.partitions = partitionsToUse; this.rowLimit = rowLimit; this.currentFrames = new FramePlus[inputChannels.size()]; - this.remainingChannels = 0; + this.remainingChannels = new IntAVLTreeSet(IntSets.fromTo(0, inputChannels.size())); this.tournamentTree = new TournamentTree( inputChannels.size(), (k1, k2) -> { @@ -241,7 +247,7 @@ private FrameWithPartition nextFrame() if (rowLimit != UNLIMITED && rowsOutput >= rowLimit) { // Limit reached; we're done. Arrays.fill(currentFrames, null); - remainingChannels = 0; + remainingChannels.clear(); } else { // Continue reading the currentChannel. final FramePlus channelFramePlus = currentFrames[currentChannel]; @@ -251,7 +257,6 @@ private FrameWithPartition nextFrame() // Done reading current frame from "channel". // Clear it and see if there is another one available for immediate loading. currentFrames[currentChannel] = null; - remainingChannels--; final ReadableFrameChannel channel = inputChannels.get(currentChannel); @@ -265,10 +270,10 @@ private FrameWithPartition nextFrame() break; } else { currentFrames[currentChannel] = framePlus; - remainingChannels++; } } else if (channel.isFinished()) { // Done reading this channel. Fall through and continue with other channels. + remainingChannels.remove(currentChannel); } else { // Nothing available, not finished; we can't continue. Finish up the current frame and return it. break; @@ -282,9 +287,12 @@ private FrameWithPartition nextFrame() } } + /** + * Returns whether all input is done being read. + */ private boolean finished() { - return remainingChannels == 0; + return remainingChannels.isEmpty(); } @Override @@ -302,7 +310,7 @@ private IntSet populateCurrentFramesAndTournamentTree() final IntSet await = new IntOpenHashSet(); for (int i = 0; i < inputChannels.size(); i++) { - if (currentFrames[i] == null) { + if (currentFrames[i] == null && remainingChannels.contains(i)) { final ReadableFrameChannel channel = inputChannels.get(i); if (channel.canRead()) { @@ -312,9 +320,10 @@ private IntSet populateCurrentFramesAndTournamentTree() await.add(i); } else { currentFrames[i] = framePlus; - remainingChannels++; } - } else if (!channel.isFinished()) { + } else if (channel.isFinished()) { + remainingChannels.remove(i); + } else { await.add(i); } } diff --git a/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java b/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java index 7a885af49c54..80e7f6352d00 100644 --- a/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java +++ b/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java @@ -38,6 +38,8 @@ import org.apache.druid.frame.key.KeyTestUtils; import org.apache.druid.frame.key.RowKey; import org.apache.druid.frame.key.RowKeyReader; +import org.apache.druid.frame.processor.test.AlwaysAsyncPartitionedReadableFrameChannel; +import org.apache.druid.frame.processor.test.AlwaysAsyncReadableFrameChannel; import org.apache.druid.frame.read.FrameReader; import org.apache.druid.frame.testutil.FrameSequenceBuilder; import org.apache.druid.frame.testutil.FrameTestUtil; @@ -434,8 +436,8 @@ private void verifySuperSorter( clusterByPartitionsFuture, exec, FrameProcessorDecorator.NONE, - new FileOutputChannelFactory(tempFolder, maxBytesPerFrame, null), - outputChannelFactory, + makeOutputChannelFactory(new FileOutputChannelFactory(tempFolder, maxBytesPerFrame, null)), + makeOutputChannelFactory(outputChannelFactory), maxActiveProcessors, maxChannelsPerProcessor, limitHint, @@ -839,9 +841,48 @@ public void accept(final Frame frame) for (final BlockingQueueFrameChannel channel : channels) { channel.writable().close(); - retVal.add(channel.readable()); + retVal.add(new AlwaysAsyncReadableFrameChannel(channel.readable())); } return retVal; } + + /** + * Wraps an underlying {@link OutputChannelFactory} in one that uses {@link AlwaysAsyncReadableFrameChannel} + * for all of its readable channels. This helps catch bugs due to improper usage of {@link ReadableFrameChannel} + * methods that enable async reads. + */ + private static OutputChannelFactory makeOutputChannelFactory(final OutputChannelFactory baseFactory) + { + return new OutputChannelFactory() { + @Override + public OutputChannel openChannel(int partitionNumber) throws IOException + { + final OutputChannel channel = baseFactory.openChannel(partitionNumber); + return OutputChannel.pair( + channel.getWritableChannel(), + channel.getFrameMemoryAllocator(), + () -> new AlwaysAsyncReadableFrameChannel(channel.getReadableChannelSupplier().get()), + channel.getPartitionNumber() + ); + } + + @Override + public PartitionedOutputChannel openPartitionedChannel(String name, boolean deleteAfterRead) throws IOException + { + final PartitionedOutputChannel channel = baseFactory.openPartitionedChannel(name, deleteAfterRead); + return PartitionedOutputChannel.pair( + channel.getWritableChannel(), + channel.getFrameMemoryAllocator(), + () -> new AlwaysAsyncPartitionedReadableFrameChannel(channel.getReadableChannelSupplier().get()) + ); + } + + @Override + public OutputChannel openNilChannel(int partitionNumber) + { + return baseFactory.openNilChannel(partitionNumber); + } + }; + } } diff --git a/processing/src/test/java/org/apache/druid/frame/processor/test/AlwaysAsyncPartitionedReadableFrameChannel.java b/processing/src/test/java/org/apache/druid/frame/processor/test/AlwaysAsyncPartitionedReadableFrameChannel.java new file mode 100644 index 000000000000..4013889df6e5 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/frame/processor/test/AlwaysAsyncPartitionedReadableFrameChannel.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor.test; + +import org.apache.druid.frame.channel.PartitionedReadableFrameChannel; +import org.apache.druid.frame.channel.ReadableFrameChannel; + +import java.io.IOException; + +/** + * Implementation of {@link PartitionedReadableFrameChannel} that wraps all underlying channels in + * {@link AlwaysAsyncReadableFrameChannel}. + */ +public class AlwaysAsyncPartitionedReadableFrameChannel implements PartitionedReadableFrameChannel +{ + private final PartitionedReadableFrameChannel delegate; + + public AlwaysAsyncPartitionedReadableFrameChannel(PartitionedReadableFrameChannel delegate) + { + this.delegate = delegate; + } + + @Override + public ReadableFrameChannel getReadableFrameChannel(int partitionNumber) + { + return new AlwaysAsyncReadableFrameChannel(delegate.getReadableFrameChannel(partitionNumber)); + } + + @Override + public void close() throws IOException + { + delegate.close(); + } +} diff --git a/processing/src/test/java/org/apache/druid/frame/processor/test/AlwaysAsyncReadableFrameChannel.java b/processing/src/test/java/org/apache/druid/frame/processor/test/AlwaysAsyncReadableFrameChannel.java new file mode 100644 index 000000000000..8ff10aeb7b07 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/frame/processor/test/AlwaysAsyncReadableFrameChannel.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor.test; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.java.util.common.ISE; + +/** + * Wraps an underlying channel and forces an async style of reading. After each call to {@link #read()}, the + * {@link #canRead()} and {@link #isFinished()} methods return false until {@link #readabilityFuture()} is called. + */ +public class AlwaysAsyncReadableFrameChannel implements ReadableFrameChannel +{ + private final ReadableFrameChannel delegate; + private boolean defer; + + public AlwaysAsyncReadableFrameChannel(ReadableFrameChannel delegate) + { + this.delegate = delegate; + } + + @Override + public boolean isFinished() + { + if (defer) { + return false; + } + + return delegate.isFinished(); + } + + @Override + public boolean canRead() + { + if (defer) { + return false; + } + + return delegate.canRead(); + } + + @Override + public Frame read() + { + if (defer) { + throw new ISE("Cannot call read() while deferred"); + } + + defer = true; + return delegate.read(); + } + + @Override + public ListenableFuture readabilityFuture() + { + defer = false; + return delegate.readabilityFuture(); + } + + @Override + public void close() + { + defer = false; + delegate.close(); + } +}