Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ Sequence<ParallelMergeCombiningSequenceTest.IntPair> createParallelSequence(
parallelism,
yieldAfter,
batchSize,
targetTaskTimeMillis
targetTaskTimeMillis,
null
);
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public Yielder<OutType> get()
catch (Exception e) {
t.addSuppressed(e);
}
Throwables.propagateIfPossible(t);
throw new RuntimeException(t);
}
}
Expand Down Expand Up @@ -89,6 +90,7 @@ public void close() throws IOException
t.addSuppressed(e);
}
Throwables.propagateIfInstanceOf(t, IOException.class);
Throwables.propagateIfPossible(t);
throw new RuntimeException(t);
}
// "Normal" close
Expand All @@ -97,6 +99,7 @@ public void close() throws IOException
}
catch (Exception e) {
Throwables.propagateIfInstanceOf(e, IOException.class);
Throwables.propagateIfPossible(e);
throw new RuntimeException(e);
}
}
Expand Down
18 changes: 9 additions & 9 deletions core/src/main/java/org/apache/druid/math/expr/ApplyFunction.java
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public boolean hasArrayOutput(LambdaExpr lambdaExpr)

/**
* Accumulate a value for a single array input with a 2 argument {@link LambdaExpr}. The 'array' input expression is
* the first argument, the initial value for the accumlator expression is the 2nd argument.
* the first argument, the initial value for the accumulator expression is the 2nd argument.
*/
class FoldFunction extends BaseFoldFunction
{
Expand All @@ -314,10 +314,10 @@ public ExprEval apply(LambdaExpr lambdaExpr, List<Expr> argsExpr, Expr.ObjectBin
if (array == null) {
return ExprEval.of(null);
}
Object accumlator = accEval.value();
Object accumulator = accEval.value();

FoldLambdaBinding lambdaBinding = new FoldLambdaBinding(array, accumlator, lambdaExpr, bindings);
return applyFold(lambdaExpr, accumlator, lambdaBinding);
FoldLambdaBinding lambdaBinding = new FoldLambdaBinding(array, accumulator, lambdaExpr, bindings);
return applyFold(lambdaExpr, accumulator, lambdaBinding);
}

@Override
Expand All @@ -340,8 +340,8 @@ public void validateArguments(LambdaExpr lambdaExpr, List<Expr> args)

/**
* Accumulate a value for the cartesian product of 'n' array inputs arguments with an 'n + 1' argument
* {@link LambdaExpr}. The 'array' input expressions are the first 'n' arguments, the initial value for the accumlator
* expression is the final argument.
* {@link LambdaExpr}. The 'array' input expressions are the first 'n' arguments, the initial value for the
* accumulator expression is the final argument.
*/
class CartesianFoldFunction extends BaseFoldFunction
{
Expand Down Expand Up @@ -385,11 +385,11 @@ public ExprEval apply(LambdaExpr lambdaExpr, List<Expr> argsExpr, Expr.ObjectBin

ExprEval accEval = accExpr.eval(bindings);

Object accumlator = accEval.value();
Object accumulator = accEval.value();

CartesianFoldLambdaBinding lambdaBindings =
new CartesianFoldLambdaBinding(product, accumlator, lambdaExpr, bindings);
return applyFold(lambdaExpr, accumlator, lambdaBindings);
new CartesianFoldLambdaBinding(product, accumulator, lambdaExpr, bindings);
return applyFold(lambdaExpr, accumulator, lambdaBindings);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;

public class ParallelMergeCombiningSequenceTest
{
private static final int TEST_POOL_SIZE = 4;
private static final Logger LOG = new Logger(ParallelMergeCombiningSequenceTest.class);

public static final Ordering<IntPair> INT_PAIR_ORDERING = Ordering.natural().onResultOf(p -> p.lhs);
Expand All @@ -65,7 +67,7 @@ public class ParallelMergeCombiningSequenceTest
public void setup()
{
pool = new ForkJoinPool(
(int) Math.ceil(Runtime.getRuntime().availableProcessors() * 0.75),
TEST_POOL_SIZE,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
(t, e) -> LOG.error(e, "Unhandled exception in thread [%s]", t),
true
Expand Down Expand Up @@ -255,7 +257,7 @@ public void testEmpties() throws Exception
input.add(Sequences.empty());
assertResult(input);

// above min sequence count threshold, so will merge in parallel (if enough cores)
// above min sequence count threshold, so will merge in parallel
input.add(Sequences.empty());
input.add(Sequences.empty());
input.add(Sequences.empty());
Expand All @@ -273,7 +275,7 @@ public void testEmptiesAndNonEmpty() throws Exception

input.clear();

// above min sequence count threshold, so will merge in parallel (if enough cores)
// above min sequence count threshold, so will merge in parallel
input.add(Sequences.empty());
input.add(Sequences.empty());
input.add(Sequences.empty());
Expand All @@ -290,18 +292,32 @@ public void testAllInSingleBatch() throws Exception
List<Sequence<IntPair>> input = new ArrayList<>();
input.add(nonBlockingSequence(5));
input.add(nonBlockingSequence(6));
assertResult(input, 10, 20);
assertResult(input, 10, 20, reportMetrics -> {
Assert.assertEquals(1, reportMetrics.getParallelism());
Assert.assertEquals(2, reportMetrics.getInputSequences());
Assert.assertEquals(11, reportMetrics.getInputRows());
// deltas because it depends how much result combining is happening, which is random
Assert.assertEquals(6, reportMetrics.getOutputRows(), 5);
Assert.assertEquals(4, reportMetrics.getTaskCount());
});

input.clear();

// above min sequence count threshold, so will merge in parallel (if enough cores)
// above min sequence count threshold, so will merge in parallel
input.add(nonBlockingSequence(5));
input.add(nonBlockingSequence(6));
input.add(nonBlockingSequence(5));
input.add(nonBlockingSequence(8));
input.add(nonBlockingSequence(4));
input.add(nonBlockingSequence(6));
assertResult(input, 10, 20);
assertResult(input, 10, 20, reportMetrics -> {
Assert.assertEquals(2, reportMetrics.getParallelism());
Assert.assertEquals(6, reportMetrics.getInputSequences());
Assert.assertEquals(34, reportMetrics.getInputRows());
// deltas because it depends how much result combining is happening, which is random
Assert.assertEquals(16, reportMetrics.getOutputRows(), 15);
Assert.assertEquals(10, reportMetrics.getTaskCount(), 2);
});
}

@Test
Expand All @@ -311,18 +327,32 @@ public void testAllInSingleYield() throws Exception
List<Sequence<IntPair>> input = new ArrayList<>();
input.add(nonBlockingSequence(5));
input.add(nonBlockingSequence(6));
assertResult(input, 4, 20);
assertResult(input, 4, 20, reportMetrics -> {
Assert.assertEquals(1, reportMetrics.getParallelism());
Assert.assertEquals(2, reportMetrics.getInputSequences());
Assert.assertEquals(11, reportMetrics.getInputRows());
// deltas because it depends how much result combining is happening, which is random
Assert.assertEquals(6, reportMetrics.getOutputRows(), 5);
Assert.assertEquals(4, reportMetrics.getTaskCount());
});

input.clear();

// above min sequence count threshold, so will merge in parallel (if enough cores)
// above min sequence count threshold, so will merge in parallel
input.add(nonBlockingSequence(5));
input.add(nonBlockingSequence(6));
input.add(nonBlockingSequence(5));
input.add(nonBlockingSequence(8));
input.add(nonBlockingSequence(4));
input.add(nonBlockingSequence(6));
assertResult(input, 4, 20);
assertResult(input, 4, 20, reportMetrics -> {
Assert.assertEquals(2, reportMetrics.getParallelism());
Assert.assertEquals(6, reportMetrics.getInputSequences());
Assert.assertEquals(34, reportMetrics.getInputRows());
// deltas because it depends how much result combining is happening, which is random
Assert.assertEquals(16, reportMetrics.getOutputRows(), 15);
Assert.assertEquals(10, reportMetrics.getTaskCount(), 2);
});
}


Expand All @@ -334,15 +364,29 @@ public void testMultiBatchMultiYield() throws Exception
input.add(nonBlockingSequence(15));
input.add(nonBlockingSequence(26));

assertResult(input, 5, 10);
assertResult(input, 5, 10, reportMetrics -> {
Assert.assertEquals(1, reportMetrics.getParallelism());
Assert.assertEquals(2, reportMetrics.getInputSequences());
Assert.assertEquals(41, reportMetrics.getInputRows());
// deltas because it depends how much result combining is happening, which is random
Assert.assertEquals(21, reportMetrics.getOutputRows(), 20);
Assert.assertEquals(4, reportMetrics.getTaskCount(), 2);
});

// above min sequence count threshold, so will merge in parallel (if enough cores)
input.add(nonBlockingSequence(15));
input.add(nonBlockingSequence(33));
input.add(nonBlockingSequence(17));
input.add(nonBlockingSequence(14));

assertResult(input, 5, 10);
assertResult(input, 5, 10, reportMetrics -> {
Assert.assertEquals(2, reportMetrics.getParallelism());
Assert.assertEquals(6, reportMetrics.getInputSequences());
Assert.assertEquals(120, reportMetrics.getInputRows());
// deltas because it depends how much result combining is happening, which is random
Assert.assertEquals(60, reportMetrics.getOutputRows(), 59);
Assert.assertEquals(10, reportMetrics.getTaskCount(), 5);
});
}

@Test
Expand Down Expand Up @@ -372,14 +416,22 @@ public void testLongerSequencesJustForFun() throws Exception
input.add(nonBlockingSequence(10_000));
input.add(nonBlockingSequence(9_001));

assertResult(input, 128, 1024);
assertResult(input, 128, 1024, reportMetrics -> {
Assert.assertEquals(1, reportMetrics.getParallelism());
Assert.assertEquals(2, reportMetrics.getInputSequences());
Assert.assertEquals(19_001, reportMetrics.getInputRows());
});

input.add(nonBlockingSequence(7_777));
input.add(nonBlockingSequence(8_500));
input.add(nonBlockingSequence(5_000));
input.add(nonBlockingSequence(8_888));

assertResult(input, 128, 1024);
assertResult(input, 128, 1024, reportMetrics -> {
Assert.assertEquals(2, reportMetrics.getParallelism());
Assert.assertEquals(6, reportMetrics.getInputSequences());
Assert.assertEquals(49166, reportMetrics.getInputRows());
});
}

@Test
Expand Down Expand Up @@ -482,12 +534,29 @@ private void assertResult(List<Sequence<IntPair>> sequences) throws InterruptedE
assertResult(
sequences,
ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS,
ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS
ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS,
null
);
}

private void assertResult(List<Sequence<IntPair>> sequences, int batchSize, int yieldAfter)
throws InterruptedException, IOException
{
assertResult(
sequences,
batchSize,
yieldAfter,
null
);
}

private void assertResult(
List<Sequence<IntPair>> sequences,
int batchSize,
int yieldAfter,
Consumer<ParallelMergeCombiningSequence.MergeCombineMetrics> reporter
)
throws InterruptedException, IOException
{
final CombiningSequence<IntPair> combiningSequence = CombiningSequence.create(
new MergeSequence<>(INT_PAIR_ORDERING, Sequences.simple(sequences)),
Expand All @@ -503,10 +572,11 @@ private void assertResult(List<Sequence<IntPair>> sequences, int batchSize, int
true,
5000,
0,
(int) Math.ceil(Runtime.getRuntime().availableProcessors() * 0.5),
TEST_POOL_SIZE,
yieldAfter,
batchSize,
ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS
ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS,
reporter
);

Yielder<IntPair> combiningYielder = Yielders.each(combiningSequence);
Expand Down Expand Up @@ -561,10 +631,11 @@ private void assertException(
true,
timeout,
0,
(int) Math.ceil(Runtime.getRuntime().availableProcessors() * 0.5),
TEST_POOL_SIZE,
yieldAfter,
batchSize,
ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS
ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS,
null
);

Yielder<IntPair> parallelMergeCombineYielder = Yielders.each(parallelMergeCombineSequence);
Expand Down
Loading