From 303e14d508a45c848bafb227944e8b8a4078efb0 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Thu, 14 Apr 2016 17:23:42 -0700 Subject: [PATCH 1/2] [BEAM-188] Write: apply GlobalWindows first And do not supply a timestamp when outputting. Note that this is safe because the functions in the Writer cannot access the window or timestamp. When we add per-Window or similar functions to the sinks, we will likely do so at a higher level. --- .../java/org/apache/beam/sdk/io/Write.java | 7 +- .../org/apache/beam/sdk/io/WriteTest.java | 95 +++++++++++++++---- 2 files changed, 80 insertions(+), 22 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java index a8a251771fe0..c28b88b3721c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java @@ -154,6 +154,7 @@ public void processElement(ProcessContext c) throws Exception { // There is a dependency between this ParDo and the first (the WriteOperation PCollection // as a side input), so this will happen after the initial ParDo. PCollection results = input + .apply(Window.into(new GlobalWindows())) .apply("WriteBundles", ParDo.of(new DoFn() { // Writer that will write the records in this bundle. Lazily // initialized in processElement. @@ -184,13 +185,11 @@ public void processElement(ProcessContext c) throws Exception { public void finishBundle(Context c) throws Exception { if (writer != null) { WriteT result = writer.close(); - // Output the result of the write. - c.outputWithTimestamp(result, Instant.now()); + c.output(result); } } }).withSideInputs(writeOperationView)) - .setCoder(writeOperation.getWriterResultCoder()) - .apply(Window.into(new GlobalWindows())); + .setCoder(writeOperation.getWriterResultCoder()); final PCollectionView> resultsView = results.apply(View.asIterable()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java index 865dbe17e3e2..af0676ad6f3d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java @@ -35,8 +35,16 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOptions; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import com.google.common.base.MoreObjects; @@ -54,6 +62,7 @@ import java.util.Objects; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; /** * Tests for the Write PTransform. @@ -61,7 +70,47 @@ @RunWith(JUnit4.class) public class WriteTest { // Static store that can be accessed within the writer - static List sinkContents = new ArrayList<>(); + private static List sinkContents = new ArrayList<>(); + + private static final MapElements IDENTITY_MAP = + MapElements.via(new SimpleFunction() { + @Override + public String apply(String input) { + return input; + } + }); + + private static class WindowAndReshuffle extends PTransform, PCollection> { + private final Window.Bound window; + public WindowAndReshuffle(Window.Bound window) { + this.window = window; + } + + private static class AddArbitraryKey extends DoFn> { + @Override + public void processElement(ProcessContext c) throws Exception { + c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element())); + } + } + + private static class RemoveArbitraryKey extends DoFn>, T> { + @Override + public void processElement(ProcessContext c) throws Exception { + for (T s : c.element().getValue()) { + c.output(s); + } + } + } + + @Override + public PCollection apply(PCollection input) { + return input + .apply(window) + .apply(ParDo.of(new AddArbitraryKey())) + .apply(GroupByKey.create()) + .apply(ParDo.of(new RemoveArbitraryKey())); + } + } /** * Test a Write transform with a PCollection of elements. @@ -70,7 +119,7 @@ public class WriteTest { public void testWrite() { List inputs = Arrays.asList("Critical canary", "Apprehensive eagle", "Intimidating pigeon", "Pedantic gull", "Frisky finch"); - runWrite(inputs, /* not windowed */ false); + runWrite(inputs, IDENTITY_MAP); } /** @@ -79,7 +128,7 @@ public void testWrite() { @Test public void testWriteWithEmptyPCollection() { List inputs = new ArrayList<>(); - runWrite(inputs, /* not windowed */ false); + runWrite(inputs, IDENTITY_MAP); } /** @@ -89,7 +138,21 @@ public void testWriteWithEmptyPCollection() { public void testWriteWindowed() { List inputs = Arrays.asList("Critical canary", "Apprehensive eagle", "Intimidating pigeon", "Pedantic gull", "Frisky finch"); - runWrite(inputs, /* windowed */ true); + runWrite( + inputs, new WindowAndReshuffle(Window.into(FixedWindows.of(Duration.millis(2))))); + } + + /** + * Test a Write with sessions. + */ + @Test + public void testWriteWithSessions() { + List inputs = Arrays.asList("Critical canary", "Apprehensive eagle", + "Intimidating pigeon", "Pedantic gull", "Frisky finch"); + + runWrite( + inputs, + new WindowAndReshuffle(Window.into(Sessions.withGapDuration(Duration.millis(1))))); } /** @@ -97,7 +160,8 @@ public void testWriteWindowed() { * a test sink in the correct order, as well as verifies that the elements of a PCollection are * written to the sink. */ - public void runWrite(List inputs, boolean windowed) { + private static void runWrite( + List inputs, PTransform, PCollection> transform) { // Flag to validate that the pipeline options are passed to the Sink String[] args = {"--testFlag=test_value"}; PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(WriteOptions.class); @@ -106,21 +170,16 @@ public void runWrite(List inputs, boolean windowed) { // Clear the sink's contents. sinkContents.clear(); - // Construct the input PCollection and test Sink. - PCollection input; - if (windowed) { - List timestamps = new ArrayList<>(); - for (long i = 0; i < inputs.size(); i++) { - timestamps.add(i + 1); - } - input = p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of())) - .apply(Window.into(FixedWindows.of(new Duration(2)))); - } else { - input = p.apply(Create.of(inputs).withCoder(StringUtf8Coder.of())); + // Prepare timestamps for the elements. + List timestamps = new ArrayList<>(); + for (long i = 0; i < inputs.size(); i++) { + timestamps.add(i + 1); } - TestSink sink = new TestSink(); - input.apply(Write.to(sink)); + TestSink sink = new TestSink(); + p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of())) + .apply(transform) + .apply(Write.to(sink)); p.run(); assertThat(sinkContents, containsInAnyOrder(inputs.toArray())); From 462edbcff9df35b4c1ca3fe4bef9da33a9958c02 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Thu, 14 Apr 2016 17:40:27 -0700 Subject: [PATCH 2/2] fixup! checkstyle --- sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java index c28b88b3721c..67761cd24ea6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java @@ -35,8 +35,6 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; -import org.joda.time.Instant; - import java.util.UUID; /**