Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,44 +136,55 @@ public CloseableIterator<Entry<KeyType>> combine(
)
{
// CombineBuffer is initialized when this method is called and closed after the result iterator is done
final Closer closer = Closer.create();
final ResourceHolder<ByteBuffer> combineBufferHolder = combineBufferSupplier.get();
final ByteBuffer combineBuffer = combineBufferHolder.get();
final int minimumRequiredBufferCapacity = StreamingMergeSortedGrouper.requiredBufferCapacity(
combineKeySerdeFactory.factorizeWithDictionary(mergedDictionary),
combiningFactories
);
// We want to maximize the parallelism while the size of buffer slice is greater than the minimum buffer size
// required by StreamingMergeSortedGrouper. Here, we find the leafCombineDegree of the cominbing tree and the
// required number of buffers maximizing the parallelism.
final Pair<Integer, Integer> degreeAndNumBuffers = findLeafCombineDegreeAndNumBuffers(
combineBuffer,
minimumRequiredBufferCapacity,
concurrencyHint,
sortedIterators.size()
);
closer.register(combineBufferHolder);

final int leafCombineDegree = degreeAndNumBuffers.lhs;
final int numBuffers = degreeAndNumBuffers.rhs;
final int sliceSize = combineBuffer.capacity() / numBuffers;
try {
final ByteBuffer combineBuffer = combineBufferHolder.get();
final int minimumRequiredBufferCapacity = StreamingMergeSortedGrouper.requiredBufferCapacity(
combineKeySerdeFactory.factorizeWithDictionary(mergedDictionary),
combiningFactories
);
// We want to maximize the parallelism while the size of buffer slice is greater than the minimum buffer size
// required by StreamingMergeSortedGrouper. Here, we find the leafCombineDegree of the cominbing tree and the
// required number of buffers maximizing the parallelism.
final Pair<Integer, Integer> degreeAndNumBuffers = findLeafCombineDegreeAndNumBuffers(
combineBuffer,
minimumRequiredBufferCapacity,
concurrencyHint,
sortedIterators.size()
);

final Supplier<ByteBuffer> bufferSupplier = createCombineBufferSupplier(combineBuffer, numBuffers, sliceSize);
final int leafCombineDegree = degreeAndNumBuffers.lhs;
final int numBuffers = degreeAndNumBuffers.rhs;
final int sliceSize = combineBuffer.capacity() / numBuffers;

final Pair<List<CloseableIterator<Entry<KeyType>>>, List<Future>> combineIteratorAndFutures = buildCombineTree(
sortedIterators,
bufferSupplier,
combiningFactories,
leafCombineDegree,
mergedDictionary
);
final Supplier<ByteBuffer> bufferSupplier = createCombineBufferSupplier(combineBuffer, numBuffers, sliceSize);

final CloseableIterator<Entry<KeyType>> combineIterator = Iterables.getOnlyElement(combineIteratorAndFutures.lhs);
final List<Future> combineFutures = combineIteratorAndFutures.rhs;
final Pair<List<CloseableIterator<Entry<KeyType>>>, List<Future>> combineIteratorAndFutures = buildCombineTree(
sortedIterators,
bufferSupplier,
combiningFactories,
leafCombineDegree,
mergedDictionary
);

final Closer closer = Closer.create();
closer.register(combineBufferHolder);
closer.register(() -> checkCombineFutures(combineFutures));
final CloseableIterator<Entry<KeyType>> combineIterator = Iterables.getOnlyElement(combineIteratorAndFutures.lhs);
final List<Future> combineFutures = combineIteratorAndFutures.rhs;
closer.register(() -> checkCombineFutures(combineFutures));

return CloseableIterators.wrap(combineIterator, closer);
return CloseableIterators.wrap(combineIterator, closer);
}
catch (Throwable t) {
try {
closer.close();
}
catch (Throwable t2) {
t.addSuppressed(t2);
}
throw t;
}
}

private static void checkCombineFutures(List<Future> combineFutures)
Expand Down Expand Up @@ -289,11 +300,11 @@ private int computeRequiredBufferNum(int numChildNodes, int combineDegree)
* Recursively build a combining tree in a bottom-up manner. Each node of the tree is a task that combines input
* iterators asynchronously.
*
* @param childIterators all iterators of the child level
* @param bufferSupplier combining buffer supplier
* @param combiningFactories array of combining aggregator factories
* @param combineDegree combining degree for the current level
* @param dictionary merged dictionary
* @param childIterators all iterators of the child level
* @param bufferSupplier combining buffer supplier
* @param combiningFactories array of combining aggregator factories
* @param combineDegree combining degree for the current level
* @param dictionary merged dictionary
*
* @return a pair of a list of iterators of the current level in the combining tree and a list of futures of all
* executed combining tasks
Expand Down