From d9ce6df3c41dcbc7d884d5b19841475fac330757 Mon Sep 17 00:00:00 2001
From: Sela
Date: Thu, 14 Apr 2016 22:01:15 +0300
Subject: [PATCH 1/5] Replace valueInEmptyWindows with valueInGlobalWindow

---
 .../org/apache/beam/runners/spark/translation/DoFnFunction.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index 674da7302cda..fbc9e98d0f69 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@ -75,7 +75,7 @@ private class ProcCtxt extends SparkProcessContext<I, O, O> {
     @Override
     public synchronized void output(O o) {
       outputs.add(windowedValue != null ? windowedValue.withValue(o) :
-          WindowedValue.valueInEmptyWindows(o));
+          WindowedValue.valueInGlobalWindow(o));
     }
 
     @Override

From 2309ad6dbd74333808d6b993b43fe81791a19611 Mon Sep 17 00:00:00 2001
From: Sela
Date: Thu, 14 Apr 2016 22:02:20 +0300
Subject: [PATCH 2/5] Replace valueInEmptyWindows with valueInGlobalWindow in
 Spark Function, and add per-value (non-RDD) windowing functions

---
 .../spark/translation/WindowingHelpers.java | 38 ++++++++++++++++++++--
 1 file changed, 34 insertions(+), 4 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
index e92b6d166677..ec94f3e991a2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
@@ -29,8 +29,8 @@ private WindowingHelpers() {
   }
 
   /**
-   * A function for converting a value to a {@link WindowedValue}. The resulting
-   * {@link WindowedValue} will be in no windows, and will have the default timestamp
+   * A Spark function for converting a value to a {@link WindowedValue}. The resulting
+   * {@link WindowedValue} will be in the global window, and will have the default timestamp == MIN
    * and pane.
    *
    * @param <T> The type of the object.
@@ -40,13 +40,13 @@ public static <T> Function<T, WindowedValue<T>> windowFunction() {
     return new Function<T, WindowedValue<T>>() {
       @Override
       public WindowedValue<T> call(T t) {
-        return WindowedValue.valueInEmptyWindows(t);
+        return WindowedValue.valueInGlobalWindow(t);
       }
     };
   }
 
   /**
-   * A function for extracting the value from a {@link WindowedValue}.
+   * A Spark function for extracting the value from a {@link WindowedValue}.
    *
    * @param <T> The type of the object.
    * @return A function that accepts a {@link WindowedValue} and returns its value.
@@ -59,4 +59,34 @@ public T call(WindowedValue<T> t) {
       }
     };
   }
+
+  /**
+   * Same as windowFunction but for single (non-RDD) values - not an RDD transformation!
+   *
+   * @param <T> The type of the object.
+   * @return A function that accepts an object and returns its {@link WindowedValue}.
+   */
+  public static <T> com.google.common.base.Function<T, WindowedValue<T>> windowValueFunction() {
+    return new com.google.common.base.Function<T, WindowedValue<T>>() {
+      @Override
+      public WindowedValue<T> apply(T t) {
+        return WindowedValue.valueInGlobalWindow(t);
+      }
+    };
+  }
+
+  /**
+   * Same as unwindowFunction but for single (non-RDD) values - not an RDD transformation!
+   *
+   * @param <T> The type of the object.
+   * @return A function that accepts a {@link WindowedValue} and returns its value.
+   */
+  public static <T> com.google.common.base.Function<WindowedValue<T>, T> unwindowValueFunction() {
+    return new com.google.common.base.Function<WindowedValue<T>, T>() {
+      @Override
+      public T apply(WindowedValue<T> t) {
+        return t.getValue();
+      }
+    };
+  }
 }

From 47fdf133da440a9ee8a6a1bc5ea34e59138f7c53 Mon Sep 17 00:00:00 2001
From: Sela
Date: Thu, 14 Apr 2016 22:05:14 +0300
Subject: [PATCH 3/5] Materialize PCollection/RDD as windowed values with the
 appropriate windows.

---
 .../spark/translation/EvaluationContext.java | 62 +++++++++++++++----------------
 1 file changed, 30 insertions(+), 32 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 78a62aa7aced..531a6ce27582 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -37,6 +37,9 @@
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -76,12 +79,13 @@ public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline) {
    */
   private class RDDHolder<T> {
 
-    private Iterable<T> values;
+    private Iterable<WindowedValue<T>> windowedValues;
     private Coder<T> coder;
     private JavaRDDLike<WindowedValue<T>, ?> rdd;
 
     RDDHolder(Iterable<T> values, Coder<T> coder) {
-      this.values = values;
+      this.windowedValues =
+          Iterables.transform(values, WindowingHelpers.<T>windowValueFunction());
       this.coder = coder;
     }
 
@@ -91,14 +95,6 @@ private class RDDHolder<T> {
 
     JavaRDDLike<WindowedValue<T>, ?> getRDD() {
       if (rdd == null) {
-        Iterable<WindowedValue<T>> windowedValues = Iterables.transform(values,
-            new Function<T, WindowedValue<T>>() {
-              @Override
-              public WindowedValue<T> apply(T t) {
-                // TODO: this is wrong if T is a TimestampedValue
-                return WindowedValue.valueInEmptyWindows(t);
-              }
-            });
         WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
             WindowedValue.getValueOnlyCoder(coder);
         rdd = jsc.parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
@@ -107,29 +103,31 @@ public WindowedValue<T> apply(T t) {
       return rdd;
     }
 
-    Iterable<T> getValues(PCollection<T> pcollection) {
-      if (values == null) {
-        coder = pcollection.getCoder();
-        JavaRDDLike<byte[], ?> bytesRDD = rdd.map(WindowingHelpers.<T>unwindowFunction())
-            .map(CoderHelpers.toByteFunction(coder));
+    Iterable<WindowedValue<T>> getValues(PCollection<T> pcollection) {
+      if (windowedValues == null) {
+        WindowFn<?, ?> windowFn =
+            pcollection.getWindowingStrategy().getWindowFn();
+        Coder<? extends BoundedWindow> windowCoder = windowFn.windowCoder();
+        final WindowedValue.WindowedValueCoder<T> windowedValueCoder;
+        if (windowFn instanceof GlobalWindows) {
+          windowedValueCoder =
+              WindowedValue.ValueOnlyWindowedValueCoder.of(pcollection.getCoder());
+        } else {
+          windowedValueCoder =
+              WindowedValue.FullWindowedValueCoder.of(pcollection.getCoder(), windowCoder);
+        }
+        JavaRDDLike<byte[], ?> bytesRDD =
+            rdd.map(CoderHelpers.toByteFunction(windowedValueCoder));
         List<byte[]> clientBytes = bytesRDD.collect();
-        values = Iterables.transform(clientBytes, new Function<byte[], T>() {
+        windowedValues = Iterables.transform(clientBytes,
+            new Function<byte[], WindowedValue<T>>() {
           @Override
-          public T apply(byte[] bytes) {
-            return CoderHelpers.fromByteArray(bytes, coder);
+          public WindowedValue<T> apply(byte[] bytes) {
+            return CoderHelpers.fromByteArray(bytes, windowedValueCoder);
           }
         });
       }
-      return values;
-    }
-
-    Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
-      return Iterables.transform(get(pcollection), new Function<T, WindowedValue<T>>() {
-        @Override
-        public WindowedValue<T> apply(T t) {
-          return WindowedValue.valueInEmptyWindows(t); // TODO: not the right place?
-        }
-      });
+      return windowedValues;
     }
   }
 
@@ -264,15 +262,15 @@ public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
 
   @Override
   public <T> Iterable<T> get(PCollection<T> pcollection) {
-    @SuppressWarnings("unchecked")
-    RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
-    return rddHolder.getValues(pcollection);
+    @SuppressWarnings("unchecked") RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
+    Iterable<WindowedValue<T>> windowedValues = rddHolder.getValues(pcollection);
+    return Iterables.transform(windowedValues, WindowingHelpers.<T>unwindowValueFunction());
   }
 
   <T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
     @SuppressWarnings("unchecked")
     RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
-    return rddHolder.getWindowedValues(pcollection);
+    return rddHolder.getValues(pcollection);
   }
 
   @Override

From 06afe790c3e933f7a1a5ed26c3586c22dbaf750f Mon Sep 17 00:00:00 2001
From: Sela
Date: Thu, 14 Apr 2016 23:18:24 +0300
Subject: [PATCH 4/5] Add unit test for TextIO output to support the mvn
 exec:exec example we provide in README

---
 .../runners/spark/SimpleWordCountTest.java | 34 ++++++++++++++++++++++++++++-
 1 file changed, 33 insertions(+), 1 deletion(-)

diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index c413b3fe060f..20b95c1e888d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -18,15 +18,18 @@
 package org.apache.beam.runners.spark;
 
+import java.io.File;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import java.util.regex.Pattern;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 
 import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
@@ -37,8 +40,14 @@
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.io.FileUtils;
+import org.junit.rules.TemporaryFolder;
+import org.junit.Rule;
 import org.junit.Test;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
 public class SimpleWordCountTest {
   private static final String[] WORDS_ARRAY = {
     "hi there", "hi", "hi sue bob",
@@ -48,7 +57,7 @@ public class SimpleWordCountTest {
       ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
 
   @Test
-  public void testRun() throws Exception {
+  public void testInMem() throws Exception {
     SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
     options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
@@ -62,6 +71,29 @@ public void testRun() throws Exception {
     res.close();
   }
 
+  @Rule
+  public TemporaryFolder testFolder = new TemporaryFolder();
+
+  @Test
+  public void testOutputFile() throws Exception {
+    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    options.setRunner(SparkPipelineRunner.class);
+    Pipeline p = Pipeline.create(options);
+    PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder
+        .of());
+    PCollection<String> output = inputWords.apply(new CountWords());
+
+    File outputFile = testFolder.newFile();
+    output.apply(
+        TextIO.Write.named("WriteCounts").to(outputFile.getAbsolutePath()).withoutSharding());
+
+    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    res.close();
+
+    assertThat(Sets.newHashSet(FileUtils.readLines(outputFile)),
+        containsInAnyOrder(EXPECTED_COUNT_SET.toArray()));
+  }
+
   /**
    * A DoFn that tokenizes lines of text into individual words.
    */

From 51089e52474127e4dba21014ca79e1e8879b98cd Mon Sep 17 00:00:00 2001
From: Sela
Date: Thu, 14 Apr 2016 23:58:14 +0300
Subject: [PATCH 5/5] Satisfy checkstyle

---
 .../beam/runners/spark/translation/EvaluationContext.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 531a6ce27582..6d49bd35ab6e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -262,7 +262,8 @@ public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
 
   @Override
   public <T> Iterable<T> get(PCollection<T> pcollection) {
-    @SuppressWarnings("unchecked") RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
+    @SuppressWarnings("unchecked")
+    RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
     Iterable<WindowedValue<T>> windowedValues = rddHolder.getValues(pcollection);
     return Iterables.transform(windowedValues, WindowingHelpers.<T>unwindowValueFunction());
   }
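
Note on PATCH 1/5 and PATCH 2/5: WindowedValue.valueInGlobalWindow(value) places a value in the single GlobalWindow, with the minimum timestamp and the default pane, whereas the removed valueInEmptyWindows(value) produced a value that belonged to no window at all, which is not a meaningful input for window-aware transforms. The new windowValueFunction()/unwindowValueFunction() are Guava Functions rather than Spark Functions, so they can wrap and unwrap plain Iterables lazily (for example via Iterables.transform) without an RDD transformation; that is exactly how PATCH 3/5 uses them in RDDHolder and EvaluationContext.get(). The following is a minimal usage sketch, not part of the patch series; the class name and main harness are made up for illustration, and only the WindowingHelpers methods and Beam SDK classes already referenced by these patches are assumed.

package org.apache.beam.runners.spark.translation;

import com.google.common.collect.Iterables;

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;

import java.util.Arrays;
import java.util.List;

// Illustrative only: the per-value (non-RDD) windowing helpers in isolation.
public class WindowValueFunctionSketch {
  public static void main(String[] args) {
    List<String> values = Arrays.asList("hi", "there", "sue", "bob");

    // Lazily wrap plain values into WindowedValues without touching an RDD.
    Iterable<WindowedValue<String>> windowed =
        Iterables.transform(values, WindowingHelpers.<String>windowValueFunction());

    for (WindowedValue<String> wv : windowed) {
      // valueInGlobalWindow assigns the single GlobalWindow, the minimum timestamp
      // and the default pane.
      System.out.println(wv.getValue()
          + " isGlobal=" + wv.getWindows().contains(GlobalWindow.INSTANCE)
          + " minTimestamp=" + wv.getTimestamp().equals(BoundedWindow.TIMESTAMP_MIN_VALUE));
    }

    // Unwrapping is the symmetric operation used by EvaluationContext.get().
    Iterable<String> unwrapped =
        Iterables.transform(windowed, WindowingHelpers.<String>unwindowValueFunction());
    System.out.println(Iterables.toString(unwrapped)); // [hi, there, sue, bob]
  }
}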
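
Note on PATCH 3/5: once an RDD is collected back to the driver, its elements now round-trip through a WindowedValue coder instead of the raw element coder. For a PCollection using the GlobalWindows strategy, ValueOnlyWindowedValueCoder suffices (and is cheaper) because the window, timestamp and pane are implied; for any other WindowFn, FullWindowedValueCoder is required so that the assigned windows and timestamps survive the collect(). The sketch below shows that round trip in isolation; it is not part of the patch, the class name and the sample fixed window are made up, and it uses CoderUtils from the Beam SDK for brevity (the Spark runner's CoderHelpers used in the patch serves the same encode/decode purpose).

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.util.WindowedValue.ValueOnlyWindowedValueCoder;

import org.joda.time.Duration;
import org.joda.time.Instant;

// Illustrative only: the two WindowedValue coders PATCH 3/5 chooses between.
public class WindowedValueCoderSketch {
  public static void main(String[] args) throws Exception {
    // Global window: only the value itself needs to be encoded.
    ValueOnlyWindowedValueCoder<String> valueOnly =
        ValueOnlyWindowedValueCoder.of(StringUtf8Coder.of());
    byte[] globalBytes =
        CoderUtils.encodeToByteArray(valueOnly, WindowedValue.valueInGlobalWindow("hi"));
    System.out.println(CoderUtils.decodeFromByteArray(valueOnly, globalBytes).getValue());

    // Non-global windowing: windows and timestamps must survive the byte round trip,
    // so the full coder (value coder plus window coder) is required.
    FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(1));
    FullWindowedValueCoder<String> full =
        FullWindowedValueCoder.of(StringUtf8Coder.of(), windowFn.windowCoder());
    Instant start = new Instant(0);
    IntervalWindow window = new IntervalWindow(start, start.plus(Duration.standardMinutes(1)));
    WindowedValue<String> windowedHi = WindowedValue.of("hi", start, window, PaneInfo.NO_FIRING);
    byte[] fullBytes = CoderUtils.encodeToByteArray(full, windowedHi);
    WindowedValue<String> roundTripped = CoderUtils.decodeFromByteArray(full, fullBytes);
    System.out.println(roundTripped.getTimestamp() + " " + roundTripped.getWindows());
  }
}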
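
Note on PATCH 4/5: testOutputFile covers the TextIO write path that the README's mvn exec:exec word-count example depends on. Writing into a JUnit TemporaryFolder with withoutSharding() yields a single output file, so the test can read it back with Commons IO FileUtils and compare the lines against EXPECTED_COUNT_SET. A stripped-down, hypothetical driver for the same path is sketched below; it uses only APIs that already appear in these patches, the class name and output path are illustrative, and a real job would count and format the words (as the test's CountWords transform does) before writing.

package org.apache.beam.runners.spark;

import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

// Illustrative only: Create -> TextIO.Write on the Spark runner, mirroring testOutputFile.
public class WriteToTextSketch {
  public static void main(String[] args) {
    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
    options.setRunner(SparkPipelineRunner.class);
    Pipeline p = Pipeline.create(options);

    PCollection<String> lines =
        p.apply(Create.of("hi there", "hi", "hi sue bob")).setCoder(StringUtf8Coder.of());

    // withoutSharding() keeps the output in one file, which is what lets the unit
    // test read it back with FileUtils.readLines and assert on its contents.
    lines.apply(TextIO.Write.named("WriteLines").to("/tmp/words.txt").withoutSharding());

    EvaluationResult res = SparkPipelineRunner.create().run(p);
    res.close();
  }
}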