From f2bb6e86bfd71f5cda4f574c6bedfaeff172291a Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Tue, 29 Mar 2016 12:28:13 -0700 Subject: [PATCH 1/4] Optimize the Count CombineFn Previously the accumulator was stored as a Long. This uses a singleton long[] to avoid the boxing and unboxing on every increment. This required changing the Coder (the format actually remains the same, but we have no way of declaring that), so it is not backwards compatible with reload. --- .../cloud/dataflow/sdk/transforms/Count.java | 72 ++++++++++++++++--- 1 file changed, 61 insertions(+), 11 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java index ffa11d13a3c9..a4ff677d1b21 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java @@ -16,10 +16,22 @@ package com.google.cloud.dataflow.sdk.transforms; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.CoderException; +import com.google.cloud.dataflow.sdk.coders.CoderRegistry; +import com.google.cloud.dataflow.sdk.coders.CustomCoder; import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn; +import com.google.cloud.dataflow.sdk.util.VarInt; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UTFDataFormatException; + + /** * {@code PTransorm}s to count the elements in a {@link PCollection}. * @@ -106,30 +118,68 @@ public void processElement(ProcessContext c) { /** * A {@link CombineFn} that counts elements. 
*/ - private static class CountFn<T> extends CombineFn<T, Long, Long> { + private static class CountFn<T> extends CombineFn<T, long[], Long> { @Override - public Long createAccumulator() { - return 0L; + public long[] createAccumulator() { + return new long[] {0}; } @Override - public Long addInput(Long accumulator, T input) { - return accumulator + 1; + public long[] addInput(long[] accumulator, T input) { + accumulator[0] += 1; + return accumulator; } @Override - public Long mergeAccumulators(Iterable<Long> accumulators) { + public long[] mergeAccumulators(Iterable<long[]> accumulators) { long result = 0L; - for (Long accum : accumulators) { - result += accum; + for (long[] accum : accumulators) { + result += accum[0]; } - return result; + return new long[] {result}; } @Override - public Long extractOutput(Long accumulator) { - return accumulator; + public Long extractOutput(long[] accumulator) { + return accumulator[0]; + } + + @Override + public Coder<long[]> getAccumulatorCoder(CoderRegistry registry, + Coder<T> inputCoder) { + return new CustomCoder<long[]>() { + @Override + public void encode(long[] value, OutputStream outStream, Context context) + throws IOException { + VarInt.encode(value[0], outStream); + } + + @Override + public long[] decode(InputStream inStream, Context context) + throws IOException, CoderException { + try { + return new long[] {VarInt.decodeLong(inStream)}; + } catch (EOFException | UTFDataFormatException exn) { + throw new CoderException(exn); + } + } + + @Override + public boolean isRegisterByteSizeObserverCheap(long[] value, Context context) { + return true; + } + + @Override + protected long getEncodedElementByteSize(long[] value, Context context) { + return VarInt.getLength(value[0]); + } + + @Override + public String getEncodingId() { + return "VarLongSingletonArray"; + } + }; } } } From 41abfb5e41f7cfcd6fa57dba475637846a93eb51 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Tue, 29 Mar 2016 15:40:31 -0700 Subject: [PATCH 2/4] Add comment about CountFn's accumulator --- 
.../java/com/google/cloud/dataflow/sdk/transforms/Count.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java index a4ff677d1b21..7afeabcaa2f2 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java @@ -119,6 +119,8 @@ public void processElement(ProcessContext c) { * A {@link CombineFn} that counts elements. */ private static class CountFn<T> extends CombineFn<T, long[], Long> { + // Note that the long[] accumulator always has size 1, used as + // a box for a mutable long. @Override public long[] createAccumulator() { From 2bbc44ee765c7818b832a5f076e7803299f25ec9 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Wed, 30 Mar 2016 16:42:52 -0700 Subject: [PATCH 3/4] Update Count.java Optimize for the case of few accumulators. 
--- .../google/cloud/dataflow/sdk/transforms/Count.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java index 7afeabcaa2f2..1328bf883d1e 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java @@ -135,11 +135,15 @@ public long[] addInput(long[] accumulator, T input) { @Override public long[] mergeAccumulators(Iterable<long[]> accumulators) { - long result = 0L; - for (long[] accum : accumulators) { - result += accum[0]; + Iterator<long[]> iter = accumulators.iterator(); + if (!iter.hasNext()) { + return createAccumulator(); } - return new long[] {result}; + long[] running = iter.next(); + while (iter.hasNext()) { + running[0] += iter.next()[0]; + } + return running; } @Override From dd6792bf97b67de6ba275e9970e6b8d195e21783 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 31 Mar 2016 08:00:29 -0700 Subject: [PATCH 4/4] Add Iterator import --- .../java/com/google/cloud/dataflow/sdk/transforms/Count.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java index 1328bf883d1e..5ce4d2e671b0 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java @@ -30,6 +30,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.UTFDataFormatException; +import java.util.Iterator; /**