Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.java.util.common.guava;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.RE;
Expand Down Expand Up @@ -81,6 +82,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
private final int parallelism;
private final long targetTimeNanos;
private final Consumer<MergeCombineMetrics> metricsReporter;

private final CancellationGizmo cancellationGizmo;

public ParallelMergeCombiningSequence(
Expand Down Expand Up @@ -152,6 +154,12 @@ public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulat
return finalOutSequence.toYielder(initValue, accumulator);
}

@VisibleForTesting
public CancellationGizmo getCancellationGizmo()
{
return cancellationGizmo;
}

/**
* Create an output {@link Sequence} that wraps the output {@link BlockingQueue} of a
* {@link MergeCombinePartitioningAction}
Expand All @@ -166,6 +174,7 @@ static <T> Sequence<T> makeOutputSequenceForQueue(
return new BaseSequence<>(
new BaseSequence.IteratorMaker<T, Iterator<T>>()
{
private boolean shouldCancelOnCleanup = true;
@Override
public Iterator<T> make()
{
Expand Down Expand Up @@ -201,6 +210,7 @@ public boolean hasNext()
}

if (currentBatch.isTerminalResult()) {
shouldCancelOnCleanup = false;
return false;
}
return true;
Expand Down Expand Up @@ -228,7 +238,9 @@ public T next()
@Override
public void cleanup(Iterator<T> iterFromMake)
{
// nothing to cleanup
if (shouldCancelOnCleanup) {
cancellationGizmo.cancel(new RuntimeException("Already closed"));
}
}
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,32 @@ public void testTimeoutExceptionDueToStalledReader() throws Exception
assertException(input, 8, 64, 1000, 500);
}

@Test
public void testGracefulCloseOfYielderCancelsPool() throws Exception
{

List<Sequence<IntPair>> input = new ArrayList<>();
input.add(nonBlockingSequence(10_000));
input.add(nonBlockingSequence(9_001));
input.add(nonBlockingSequence(7_777));
input.add(nonBlockingSequence(8_500));
input.add(nonBlockingSequence(5_000));
input.add(nonBlockingSequence(8_888));

assertResultWithEarlyClose(input, 128, 1024, 256, reportMetrics -> {
Assert.assertEquals(2, reportMetrics.getParallelism());
Assert.assertEquals(6, reportMetrics.getInputSequences());
// 49166 is total set of results if yielder were fully processed, expect somewhere more than 0 but less than that
// this isn't super indicative of anything really, since closing the yielder would have triggered the baggage
// to run, which runs this metrics reporter function, while the actual processing could still be occuring on the
// pool in the background and the yielder still operates as intended if cancellation isn't in fact happening.
// other tests ensure that this is true though (yielder.next throwing an exception for example)
Assert.assertTrue(49166 > reportMetrics.getInputRows());
Assert.assertTrue(0 < reportMetrics.getInputRows());
});
}


private void assertResult(List<Sequence<IntPair>> sequences) throws InterruptedException, IOException
{
assertResult(
Expand Down Expand Up @@ -611,6 +637,87 @@ private void assertResult(
Assert.assertEquals(0, pool.getRunningThreadCount());
combiningYielder.close();
parallelMergeCombineYielder.close();
// cancellation trigger should not be set if sequence was fully yielded and close is called
// (though shouldn't actually matter even if it was...)
Assert.assertFalse(parallelMergeCombineSequence.getCancellationGizmo().isCancelled());
}

private void assertResultWithEarlyClose(
List<Sequence<IntPair>> sequences,
int batchSize,
int yieldAfter,
int closeYielderAfter,
Consumer<ParallelMergeCombiningSequence.MergeCombineMetrics> reporter
)
throws InterruptedException, IOException
{
final CombiningSequence<IntPair> combiningSequence = CombiningSequence.create(
new MergeSequence<>(INT_PAIR_ORDERING, Sequences.simple(sequences)),
INT_PAIR_ORDERING,
INT_PAIR_MERGE_FN
);

final ParallelMergeCombiningSequence<IntPair> parallelMergeCombineSequence = new ParallelMergeCombiningSequence<>(
pool,
sequences,
INT_PAIR_ORDERING,
INT_PAIR_MERGE_FN,
true,
5000,
0,
TEST_POOL_SIZE,
yieldAfter,
batchSize,
ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS,
reporter
);

Yielder<IntPair> combiningYielder = Yielders.each(combiningSequence);
Yielder<IntPair> parallelMergeCombineYielder = Yielders.each(parallelMergeCombineSequence);

IntPair prev = null;

int yields = 0;
while (!combiningYielder.isDone() && !parallelMergeCombineYielder.isDone()) {
if (yields >= closeYielderAfter) {
parallelMergeCombineYielder.close();
combiningYielder.close();
break;
} else {
yields++;
Assert.assertEquals(combiningYielder.get(), parallelMergeCombineYielder.get());
Assert.assertNotEquals(parallelMergeCombineYielder.get(), prev);
prev = parallelMergeCombineYielder.get();
combiningYielder = combiningYielder.next(combiningYielder.get());
parallelMergeCombineYielder = parallelMergeCombineYielder.next(parallelMergeCombineYielder.get());
}
}
// trying to next the yielder creates sadness for you
final String expectedExceptionMsg = "Already closed";
try {
Assert.assertEquals(combiningYielder.get(), parallelMergeCombineYielder.get());
parallelMergeCombineYielder.next(parallelMergeCombineYielder.get());
// this should explode so the contradictory next statement should not be reached
Assert.assertTrue(false);
}
catch (RuntimeException rex) {
Assert.assertEquals(expectedExceptionMsg, rex.getMessage());
}

// cancellation gizmo of sequence should be cancelled, and also should contain our expected message
Assert.assertTrue(parallelMergeCombineSequence.getCancellationGizmo().isCancelled());
Assert.assertEquals(
expectedExceptionMsg,
parallelMergeCombineSequence.getCancellationGizmo().getRuntimeException().getMessage()
);

while (pool.getRunningThreadCount() > 0) {
Thread.sleep(100);
}
Assert.assertEquals(0, pool.getRunningThreadCount());

Assert.assertFalse(combiningYielder.isDone());
Assert.assertFalse(parallelMergeCombineYielder.isDone());
}

private void assertException(List<Sequence<IntPair>> sequences) throws Exception
Expand Down