Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.frame.processor.OutputChannel;
import org.apache.druid.frame.processor.PartitionedOutputChannel;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.ResourceLimitExceededException;

Expand All @@ -38,16 +41,29 @@
*/
public class ComposingWritableFrameChannel implements WritableFrameChannel
{
private final List<Supplier<WritableFrameChannel>> channels;
@Nullable
private final List<Supplier<OutputChannel>> outputChannelSuppliers;

@Nullable
private final List<Supplier<PartitionedOutputChannel>> partitionedOutputChannelSuppliers;

private final List<Supplier<WritableFrameChannel>> writableChannelSuppliers;
private final Map<Integer, HashSet<Integer>> partitionToChannelMap;
private int currentIndex;

public ComposingWritableFrameChannel(
List<Supplier<WritableFrameChannel>> channels,
@Nullable List<Supplier<OutputChannel>> outputChannelSuppliers,
@Nullable List<Supplier<PartitionedOutputChannel>> partitionedOutputChannelSuppliers,
List<Supplier<WritableFrameChannel>> writableChannelSuppliers,
Map<Integer, HashSet<Integer>> partitionToChannelMap
)
{
this.channels = Preconditions.checkNotNull(channels, "channels is null");
if (outputChannelSuppliers != null && partitionedOutputChannelSuppliers != null) {
throw new IAE("Atmost one of outputChannelSuppliers and partitionedOutputChannelSuppliers can be provided");
}
this.outputChannelSuppliers = outputChannelSuppliers;
this.partitionedOutputChannelSuppliers = partitionedOutputChannelSuppliers;
this.writableChannelSuppliers = Preconditions.checkNotNull(writableChannelSuppliers, "writableChannelSuppliers is null");
this.partitionToChannelMap =
Preconditions.checkNotNull(partitionToChannelMap, "partitionToChannelMap is null");
this.currentIndex = 0;
Expand All @@ -56,12 +72,12 @@ public ComposingWritableFrameChannel(
@Override
public void write(FrameWithPartition frameWithPartition) throws IOException
{
if (currentIndex >= channels.size()) {
throw new ISE("No more channels available to write. Total available channels : " + channels.size());
if (currentIndex >= writableChannelSuppliers.size()) {
throw new ISE("No more channels available to write. Total available channels : " + writableChannelSuppliers.size());
}

try {
channels.get(currentIndex).get().write(frameWithPartition);
writableChannelSuppliers.get(currentIndex).get().write(frameWithPartition);
partitionToChannelMap.computeIfAbsent(frameWithPartition.partition(), k -> Sets.newHashSetWithExpectedSize(1))
.add(currentIndex);
}
Expand All @@ -70,9 +86,19 @@ public void write(FrameWithPartition frameWithPartition) throws IOException
// exception is automatically passed up to the user incase all the channels are exhausted. If in future, more
// cases come up to dictate control flow, then we can switch to returning a custom object from the channel's write
// operation.
channels.get(currentIndex).get().close();
writableChannelSuppliers.get(currentIndex).get().close();

// We are converting the corresponding channel to read only after exhausting it because that channel won't be used
// for writes anymore
if (outputChannelSuppliers != null) {
outputChannelSuppliers.get(currentIndex).get().convertToReadOnly();
}
if (partitionedOutputChannelSuppliers != null) {
partitionedOutputChannelSuppliers.get(currentIndex).get().convertToReadOnly();
}

currentIndex++;
if (currentIndex >= channels.size()) {
if (currentIndex >= writableChannelSuppliers.size()) {
throw rlee;
}
write(frameWithPartition);
Expand All @@ -82,29 +108,29 @@ public void write(FrameWithPartition frameWithPartition) throws IOException
@Override
public void fail(@Nullable Throwable cause) throws IOException
{
for (Supplier<WritableFrameChannel> channel : channels) {
for (Supplier<WritableFrameChannel> channel : writableChannelSuppliers) {
channel.get().fail(cause);
}
}

@Override
public void close() throws IOException
{
if (currentIndex < channels.size()) {
channels.get(currentIndex).get().close();
currentIndex = channels.size();
if (currentIndex < writableChannelSuppliers.size()) {
writableChannelSuppliers.get(currentIndex).get().close();
currentIndex = writableChannelSuppliers.size();
}
}

@Override
public boolean isClosed()
{
return currentIndex == channels.size();
return currentIndex == writableChannelSuppliers.size();
}

@Override
public ListenableFuture<?> writabilityFuture()
{
return channels.get(currentIndex).get().writabilityFuture();
return writableChannelSuppliers.get(currentIndex).get().writabilityFuture();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public OutputChannel openChannel(int partitionNumber)
{
ImmutableList.Builder<Supplier<WritableFrameChannel>> writableFrameChannelSuppliersBuilder = ImmutableList.builder();
ImmutableList.Builder<Supplier<ReadableFrameChannel>> readableFrameChannelSuppliersBuilder = ImmutableList.builder();
ImmutableList.Builder<Supplier<OutputChannel>> outputChannelSupplierBuilder = ImmutableList.builder();
for (OutputChannelFactory channelFactory : channelFactories) {
// open channel lazily
Supplier<OutputChannel> channel =
Expand All @@ -71,14 +72,19 @@ public OutputChannel openChannel(int partitionNumber)
throw new UncheckedIOException(e);
}
})::get;
outputChannelSupplierBuilder.add(channel);
writableFrameChannelSuppliersBuilder.add(() -> channel.get().getWritableChannel());
readableFrameChannelSuppliersBuilder.add(() -> channel.get().getReadableChannelSupplier().get());
// We read the output channel once they have been written to, and therefore it is space efficient and safe to
// save their read only copies
readableFrameChannelSuppliersBuilder.add(() -> channel.get().readOnly().getReadableChannelSupplier().get());
}

// the map maintains a mapping of channels which have the data for a given partition.
// it is useful to identify the readable channels to open in the composition while reading the partition data.
Map<Integer, HashSet<Integer>> partitionToChannelMap = new HashMap<>();
ComposingWritableFrameChannel writableFrameChannel = new ComposingWritableFrameChannel(
outputChannelSupplierBuilder.build(),
null,
writableFrameChannelSuppliersBuilder.build(),
partitionToChannelMap
);
Expand All @@ -103,6 +109,7 @@ public PartitionedOutputChannel openPartitionedChannel(String name, boolean dele
ImmutableList.Builder<Supplier<WritableFrameChannel>> writableFrameChannelsBuilder = ImmutableList.builder();
ImmutableList.Builder<Supplier<PartitionedReadableFrameChannel>> readableFrameChannelSuppliersBuilder =
ImmutableList.builder();
ImmutableList.Builder<Supplier<PartitionedOutputChannel>> partitionedOutputChannelSupplierBuilder = ImmutableList.builder();
for (OutputChannelFactory channelFactory : channelFactories) {
Supplier<PartitionedOutputChannel> channel =
Suppliers.memoize(() -> {
Expand All @@ -113,14 +120,19 @@ public PartitionedOutputChannel openPartitionedChannel(String name, boolean dele
throw new UncheckedIOException(e);
}
})::get;
partitionedOutputChannelSupplierBuilder.add(channel);
writableFrameChannelsBuilder.add(() -> channel.get().getWritableChannel());
readableFrameChannelSuppliersBuilder.add(() -> channel.get().getReadableChannelSupplier().get());
// We read the output channel once they have been written to, and therefore it is space efficient and safe to
// save their read only copies
readableFrameChannelSuppliersBuilder.add(() -> channel.get().readOnly().getReadableChannelSupplier().get());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will open a readOnly channel for all ComposingOutputChannel's. That shouldn't be the case no ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please add a comment saying that you should only request a readable channel if the outputchannel is completely written too.

}
// the map maintains a mapping of channels which have the data for a given partition.
// it is useful to identify the readable channels to open in the composition while reading the partition data.

Map<Integer, HashSet<Integer>> partitionToChannelMap = new HashMap<>();
ComposingWritableFrameChannel writableFrameChannel = new ComposingWritableFrameChannel(
null,
partitionedOutputChannelSupplierBuilder.build(),
writableFrameChannelsBuilder.build(),
partitionToChannelMap
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Suppliers;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.frame.allocation.MemoryAllocator;
import org.apache.druid.frame.channel.FrameWithPartition;
import org.apache.druid.frame.channel.ReadableFrameChannel;
Expand All @@ -42,11 +43,16 @@
*/
public class OutputChannel
{
@GuardedBy("this")
@Nullable
private final WritableFrameChannel writableChannel;
private WritableFrameChannel writableChannel;

@GuardedBy("this")
@Nullable
private final MemoryAllocator frameMemoryAllocator;
private MemoryAllocator frameMemoryAllocator;

private final Supplier<ReadableFrameChannel> readableChannelSupplier;

private final boolean readableChannelUsableWhileWriting;
private final int partitionNumber;

Expand Down Expand Up @@ -157,24 +163,28 @@ public static OutputChannel nil(final int partitionNumber)
}

/**
* Returns the writable channel of this pair. The producer writes to this channel.
* Returns the writable channel of this pair. The producer writes to this channel. Throws ISE if the output channel is
* read only.
*/
public WritableFrameChannel getWritableChannel()
public synchronized WritableFrameChannel getWritableChannel()
{
if (writableChannel == null) {
throw new ISE("Writable channel is not available");
throw new ISE("Writable channel is not available. The output channel might be marked as read-only,"
+ " hence no writes are allowed.");
} else {
return writableChannel;
}
}

/**
* Returns the memory allocator for the writable channel. The producer uses this to generate frames for the channel.
* Throws ISE if the output channel is read only.
*/
public MemoryAllocator getFrameMemoryAllocator()
public synchronized MemoryAllocator getFrameMemoryAllocator()
{
if (frameMemoryAllocator == null) {
throw new ISE("Writable channel is not available");
throw new ISE("Frame allocator is not available. The output channel might be marked as read-only,"
+ " hence memory allocator is not required.");
} else {
return frameMemoryAllocator;
}
Expand All @@ -197,7 +207,7 @@ public ReadableFrameChannel getReadableChannel()
/**
* Whether {@link #getReadableChannel()} is ready to use.
*/
public boolean isReadableChannelReady()
public synchronized boolean isReadableChannelReady()
{
return readableChannelUsableWhileWriting || writableChannel == null || writableChannel.isClosed();
}
Expand All @@ -212,7 +222,7 @@ public int getPartitionNumber()
return partitionNumber;
}

public OutputChannel mapWritableChannel(final Function<WritableFrameChannel, WritableFrameChannel> mapFn)
public synchronized OutputChannel mapWritableChannel(final Function<WritableFrameChannel, WritableFrameChannel> mapFn)
{
if (writableChannel == null) {
return this;
Expand All @@ -235,4 +245,14 @@ public OutputChannel readOnly()
{
return OutputChannel.readOnly(readableChannelSupplier, partitionNumber);
}

/**
* Removes the reference to the {@link #writableChannel} and {@link #frameMemoryAllocator} from the object, making
* it more efficient
*/
public synchronized void convertToReadOnly()
{
this.writableChannel = null;
this.frameMemoryAllocator = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Suppliers;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.frame.allocation.MemoryAllocator;
import org.apache.druid.frame.channel.PartitionedReadableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
Expand All @@ -37,10 +38,15 @@
*/
public class PartitionedOutputChannel
{

@GuardedBy("this")
@Nullable
private final WritableFrameChannel writableChannel;
private WritableFrameChannel writableChannel;

@GuardedBy("this")
@Nullable
private final MemoryAllocator frameMemoryAllocator;
private MemoryAllocator frameMemoryAllocator;

private final Supplier<PartitionedReadableFrameChannel> readableChannelSupplier;

private PartitionedOutputChannel(
Expand Down Expand Up @@ -76,24 +82,28 @@ public static PartitionedOutputChannel pair(
}

/**
* Returns the writable channel of this pair. The producer writes to this channel.
* Returns the writable channel of this pair. The producer writes to this channel. Throws ISE if the output channel is
* read only.
*/
public WritableFrameChannel getWritableChannel()
public synchronized WritableFrameChannel getWritableChannel()
{
if (writableChannel == null) {
throw new ISE("Writable channel is not available");
throw new ISE("Writable channel is not available. The output channel might be marked as read-only,"
+ " hence no writes are allowed.");
} else {
return writableChannel;
}
}

/**
* Returns the memory allocator for the writable channel. The producer uses this to generate frames for the channel.
* Throws ISE if the output channel is read only.
*/
public MemoryAllocator getFrameMemoryAllocator()
public synchronized MemoryAllocator getFrameMemoryAllocator()
{
if (frameMemoryAllocator == null) {
throw new ISE("Writable channel is not available");
throw new ISE("Frame allocator is not available. The output channel might be marked as read-only,"
+ " hence memory allocator is not required.");
} else {
return frameMemoryAllocator;
}
Expand All @@ -102,12 +112,12 @@ public MemoryAllocator getFrameMemoryAllocator()
/**
* Returns the partitioned readable channel supplier of this pair. The consumer reads from this channel.
*/
public Supplier<PartitionedReadableFrameChannel> getReadableChannelSupplier()
public synchronized Supplier<PartitionedReadableFrameChannel> getReadableChannelSupplier()
{
return readableChannelSupplier;
}

public PartitionedOutputChannel mapWritableChannel(final Function<WritableFrameChannel, WritableFrameChannel> mapFn)
public synchronized PartitionedOutputChannel mapWritableChannel(final Function<WritableFrameChannel, WritableFrameChannel> mapFn)
{
if (writableChannel == null) {
return this;
Expand All @@ -119,4 +129,24 @@ public PartitionedOutputChannel mapWritableChannel(final Function<WritableFrameC
);
}
}


/**
* Returns a read-only version of this instance. Read-only versions have neither {@link #getWritableChannel()} nor
* {@link #getFrameMemoryAllocator()}, and therefore require substantially less memory.
*/
public PartitionedOutputChannel readOnly()
{
return new PartitionedOutputChannel(null, null, readableChannelSupplier);
}

/**
* Removes the reference to the {@link #writableChannel} and {@link #frameMemoryAllocator} from the object, making
* it more efficient
*/
public synchronized void convertToReadOnly()
{
this.writableChannel = null;
this.frameMemoryAllocator = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,11 @@ private void runMerger(
);
writableChannel = partitionedOutputChannel.getWritableChannel();
frameAllocatorFactory = new SingleMemoryAllocatorFactory(partitionedOutputChannel.getFrameMemoryAllocator());
levelAndRankToReadableChannelMap.put(levelAndRankKey, partitionedOutputChannel);

// We add the readOnly() channel even though we require the writableChannel and the frame allocator because
// the original partitionedOutputChannel would contain the reference to those, which would get cleaned up
// appropriately and not be held up in the class level map
levelAndRankToReadableChannelMap.put(levelAndRankKey, partitionedOutputChannel.readOnly());
}

final FrameChannelMerger worker =
Expand Down
Loading