Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package org.apache.druid.frame.processor;

import it.unimi.dsi.fastutil.ints.IntAVLTreeSet;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import it.unimi.dsi.fastutil.ints.IntSets;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.channel.FrameWithPartition;
import org.apache.druid.frame.channel.ReadableFrameChannel;
Expand Down Expand Up @@ -68,7 +70,11 @@ public class FrameChannelMerger implements FrameProcessor<Long>
private final long rowLimit;
private long rowsOutput = 0;
private int currentPartition = 0;
private int remainingChannels;

/**
* Channels that still have input to read.
*/
private final IntSet remainingChannels;

// ColumnSelectorFactory that always reads from the current row in the merged sequence.
final MultiColumnSelectorFactory mergedColumnSelectorFactory;
Expand Down Expand Up @@ -119,7 +125,7 @@ public FrameChannelMerger(
this.partitions = partitionsToUse;
this.rowLimit = rowLimit;
this.currentFrames = new FramePlus[inputChannels.size()];
this.remainingChannels = 0;
this.remainingChannels = new IntAVLTreeSet(IntSets.fromTo(0, inputChannels.size()));
this.tournamentTree = new TournamentTree(
inputChannels.size(),
(k1, k2) -> {
Expand Down Expand Up @@ -241,7 +247,7 @@ private FrameWithPartition nextFrame()
if (rowLimit != UNLIMITED && rowsOutput >= rowLimit) {
// Limit reached; we're done.
Arrays.fill(currentFrames, null);
remainingChannels = 0;
remainingChannels.clear();
} else {
// Continue reading the currentChannel.
final FramePlus channelFramePlus = currentFrames[currentChannel];
Expand All @@ -251,7 +257,6 @@ private FrameWithPartition nextFrame()
// Done reading current frame from "channel".
// Clear it and see if there is another one available for immediate loading.
currentFrames[currentChannel] = null;
remainingChannels--;

final ReadableFrameChannel channel = inputChannels.get(currentChannel);

Expand All @@ -265,10 +270,10 @@ private FrameWithPartition nextFrame()
break;
} else {
currentFrames[currentChannel] = framePlus;
remainingChannels++;
}
} else if (channel.isFinished()) {
// Done reading this channel. Fall through and continue with other channels.
remainingChannels.remove(currentChannel);
} else {
// Nothing available, not finished; we can't continue. Finish up the current frame and return it.
break;
Expand All @@ -282,9 +287,12 @@ private FrameWithPartition nextFrame()
}
}

/**
* Returns whether all input is done being read.
*/
private boolean finished()
{
return remainingChannels == 0;
return remainingChannels.isEmpty();
}

@Override
Expand All @@ -302,7 +310,7 @@ private IntSet populateCurrentFramesAndTournamentTree()
final IntSet await = new IntOpenHashSet();

for (int i = 0; i < inputChannels.size(); i++) {
if (currentFrames[i] == null) {
if (currentFrames[i] == null && remainingChannels.contains(i)) {
final ReadableFrameChannel channel = inputChannels.get(i);

if (channel.canRead()) {
Expand All @@ -312,9 +320,10 @@ private IntSet populateCurrentFramesAndTournamentTree()
await.add(i);
} else {
currentFrames[i] = framePlus;
remainingChannels++;
}
} else if (!channel.isFinished()) {
} else if (channel.isFinished()) {
remainingChannels.remove(i);
} else {
await.add(i);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.druid.frame.key.KeyTestUtils;
import org.apache.druid.frame.key.RowKey;
import org.apache.druid.frame.key.RowKeyReader;
import org.apache.druid.frame.processor.test.AlwaysAsyncPartitionedReadableFrameChannel;
import org.apache.druid.frame.processor.test.AlwaysAsyncReadableFrameChannel;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.frame.testutil.FrameTestUtil;
Expand Down Expand Up @@ -434,8 +436,8 @@ private void verifySuperSorter(
clusterByPartitionsFuture,
exec,
FrameProcessorDecorator.NONE,
new FileOutputChannelFactory(tempFolder, maxBytesPerFrame, null),
outputChannelFactory,
makeOutputChannelFactory(new FileOutputChannelFactory(tempFolder, maxBytesPerFrame, null)),
makeOutputChannelFactory(outputChannelFactory),
maxActiveProcessors,
maxChannelsPerProcessor,
limitHint,
Expand Down Expand Up @@ -839,9 +841,48 @@ public void accept(final Frame frame)

for (final BlockingQueueFrameChannel channel : channels) {
channel.writable().close();
retVal.add(channel.readable());
retVal.add(new AlwaysAsyncReadableFrameChannel(channel.readable()));
}

return retVal;
}

/**
* Wraps an underlying {@link OutputChannelFactory} in one that uses {@link AlwaysAsyncReadableFrameChannel}
* for all of its readable channels. This helps catch bugs due to improper usage of {@link ReadableFrameChannel}
* methods that enable async reads.
*/
private static OutputChannelFactory makeOutputChannelFactory(final OutputChannelFactory baseFactory)
{
return new OutputChannelFactory() {
@Override
public OutputChannel openChannel(int partitionNumber) throws IOException
{
final OutputChannel channel = baseFactory.openChannel(partitionNumber);
return OutputChannel.pair(
channel.getWritableChannel(),
channel.getFrameMemoryAllocator(),
() -> new AlwaysAsyncReadableFrameChannel(channel.getReadableChannelSupplier().get()),
channel.getPartitionNumber()
);
}

@Override
public PartitionedOutputChannel openPartitionedChannel(String name, boolean deleteAfterRead) throws IOException
{
final PartitionedOutputChannel channel = baseFactory.openPartitionedChannel(name, deleteAfterRead);
return PartitionedOutputChannel.pair(
channel.getWritableChannel(),
channel.getFrameMemoryAllocator(),
() -> new AlwaysAsyncPartitionedReadableFrameChannel(channel.getReadableChannelSupplier().get())
);
}

@Override
public OutputChannel openNilChannel(int partitionNumber)
{
return baseFactory.openNilChannel(partitionNumber);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.frame.processor.test;

import org.apache.druid.frame.channel.PartitionedReadableFrameChannel;
import org.apache.druid.frame.channel.ReadableFrameChannel;

import java.io.IOException;

/**
* Implementation of {@link PartitionedReadableFrameChannel} that wraps all underlying channels in
* {@link AlwaysAsyncReadableFrameChannel}.
*/
public class AlwaysAsyncPartitionedReadableFrameChannel implements PartitionedReadableFrameChannel
{
private final PartitionedReadableFrameChannel delegate;

public AlwaysAsyncPartitionedReadableFrameChannel(PartitionedReadableFrameChannel delegate)
{
this.delegate = delegate;
}

@Override
public ReadableFrameChannel getReadableFrameChannel(int partitionNumber)
{
return new AlwaysAsyncReadableFrameChannel(delegate.getReadableFrameChannel(partitionNumber));
}

@Override
public void close() throws IOException
{
delegate.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.frame.processor.test;

import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.java.util.common.ISE;

/**
* Wraps an underlying channel and forces an async style of reading. After each call to {@link #read()}, the
* {@link #canRead()} and {@link #isFinished()} methods return false until {@link #readabilityFuture()} is called.
*/
public class AlwaysAsyncReadableFrameChannel implements ReadableFrameChannel
{
private final ReadableFrameChannel delegate;
private boolean defer;

public AlwaysAsyncReadableFrameChannel(ReadableFrameChannel delegate)
{
this.delegate = delegate;
}

@Override
public boolean isFinished()
{
if (defer) {
return false;
}

return delegate.isFinished();
}

@Override
public boolean canRead()
{
if (defer) {
return false;
}

return delegate.canRead();
}

@Override
public Frame read()
{
if (defer) {
throw new ISE("Cannot call read() while deferred");
}

defer = true;
return delegate.read();
}

@Override
public ListenableFuture<?> readabilityFuture()
{
defer = false;
return delegate.readabilityFuture();
}

@Override
public void close()
{
defer = false;
delegate.close();
}
}