From 2300301caa068eca303533403e54a002c85d0de5 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Fri, 12 Aug 2016 13:25:50 -0700 Subject: [PATCH 1/2] Add GatherAllPanes PTransform This PTransform Reifies and gathers all panes produced for each window, outputting a single pane per window at the time the window is Garbage Collected. For use in DataflowAssert, to match over the final contents of each window. --- .../dataflow/sdk/util/GatherAllPanes.java | 74 ++++++++++ .../dataflow/sdk/util/GatherAllPanesTest.java | 129 ++++++++++++++++++ 2 files changed, 203 insertions(+) create mode 100644 sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GatherAllPanes.java create mode 100644 sdk/src/test/java/com/google/cloud/dataflow/sdk/util/GatherAllPanesTest.java diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GatherAllPanes.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GatherAllPanes.java new file mode 100644 index 0000000000..ea178f82e5 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GatherAllPanes.java @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.util; + +import com.google.cloud.dataflow.sdk.transforms.GroupByKey; +import com.google.cloud.dataflow.sdk.transforms.GroupByKey.ReifyTimestampsAndWindows; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.Values; +import com.google.cloud.dataflow.sdk.transforms.WithKeys; +import com.google.cloud.dataflow.sdk.transforms.windowing.Never; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TypeDescriptor; + +/** + * Gathers all panes of each window into exactly one output. + * + *

+ * Note that this will delay the output of a window until the garbage collection time (when the + * watermark passes the end of the window plus allowed lateness) even if the upstream triggers + * closed the window earlier. + */ +public class GatherAllPanes + extends PTransform, PCollection>>> { + /** + * Gathers all panes of each window into a single output element. + * + *

+ * This will gather all output panes into a single element, which causes them to be colocated on a + * single worker. As a result, this is only suitable for {@link PCollection PCollections} where + * all of the output elements for each pane fit in memory, such as in tests. + */ + public static GatherAllPanes globally() { + return new GatherAllPanes<>(); + } + + private GatherAllPanes() {} + + @Override + public PCollection>> apply(PCollection input) { + WindowFn originalWindowFn = input.getWindowingStrategy().getWindowFn(); + + return input + .apply(WithKeys.of(0).withKeyType(new TypeDescriptor() {})) + .apply(new ReifyTimestampsAndWindows()) + .apply( + Window.into( + new IdentityWindowFn>>( + originalWindowFn.windowCoder(), originalWindowFn.assignsToSingleWindow())) + .triggering(Never.ever()) + .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness()) + .discardingFiredPanes()) + // all values have the same key so they all appear as a single output element + .apply(GroupByKey.>create()) + .apply(Values.>>create()) + .setWindowingStrategyInternal(input.getWindowingStrategy()); + } +} + diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/GatherAllPanesTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/GatherAllPanesTest.java new file mode 100644 index 0000000000..37bf04d469 --- /dev/null +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/GatherAllPanesTest.java @@ -0,0 +1,129 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.util; + +import static org.junit.Assert.fail; + +import com.google.cloud.dataflow.sdk.io.CountingInput; +import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.GroupByKey; +import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; +import com.google.cloud.dataflow.sdk.transforms.Values; +import com.google.cloud.dataflow.sdk.transforms.WithKeys; +import com.google.cloud.dataflow.sdk.transforms.WithTimestamps; +import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane; +import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TypeDescriptor; +import com.google.common.collect.Iterables; + +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.Serializable; + +/** + * Tests for {@link GatherAllPanes}. + */ +@RunWith(JUnit4.class) +public class GatherAllPanesTest implements Serializable { + @Test + public void singlePaneSingleReifiedPane() { + TestPipeline p = TestPipeline.create(); + PCollection>>> accumulatedPanes = + p.apply(CountingInput.upTo(20000)) + .apply( + WithTimestamps.of( + new SerializableFunction() { + @Override + public Instant apply(Long input) { + return new Instant(input * 10); + } + })) + .apply( + Window.into(FixedWindows.of(Duration.standardMinutes(1))) + .triggering(AfterWatermark.pastEndOfWindow()) + .withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()) + .apply(WithKeys.of((Void) null).withKeyType(new TypeDescriptor() {})) + .apply(GroupByKey.create()) + .apply(Values.>create()) + .apply(GatherAllPanes.>globally()); + + DataflowAssert.that(accumulatedPanes) + .satisfies( + new SerializableFunction>>>, Void>() { + @Override + public Void apply(Iterable>>> input) { + for (Iterable>> windowedInput : input) { + if (Iterables.size(windowedInput) > 1) { + fail("Expected all windows to have exactly one pane, got " + windowedInput); + return null; + } + } + return null; + } + }); + } + + @Test + public void multiplePanesMultipleReifiedPane() { + TestPipeline p = TestPipeline.create(); + + PCollection>>> accumulatedPanes = + p.apply(CountingInput.upTo(20000)) + .apply( + WithTimestamps.of( + new SerializableFunction() { + @Override + public Instant apply(Long input) { + return new Instant(input * 10); + } + })) + .apply( + Window.into(FixedWindows.of(Duration.standardMinutes(1))) + .triggering( + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(AfterPane.elementCountAtLeast(1))) + .withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()) + .apply(WithKeys.of((Void) null).withKeyType(new TypeDescriptor() {})) + .apply(GroupByKey.create()) + .apply(Values.>create()) + .apply(GatherAllPanes.>globally()); + + DataflowAssert.that(accumulatedPanes) + .satisfies( + new SerializableFunction>>>, Void>() { + @Override + public Void apply(Iterable>>> input) { + for (Iterable>> windowedInput : input) { + if (Iterables.size(windowedInput) > 1) { + return null; + } + } + fail("Expected at least one window to have multiple panes"); + return null; + } + }); + } +} From e6c5d3bb0790c3618cabbc9bf673cc7469e04fc8 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Wed, 17 Aug 2016 15:04:30 -0700 Subject: [PATCH 2/2] fixup! Add GatherAllPanes PTransform --- .../google/cloud/dataflow/sdk/util/GatherAllPanes.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GatherAllPanes.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GatherAllPanes.java index ea178f82e5..1d675e3487 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GatherAllPanes.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GatherAllPanes.java @@ -30,8 +30,7 @@ /** * Gathers all panes of each window into exactly one output. * - *

- * Note that this will delay the output of a window until the garbage collection time (when the + *

Note that this will delay the output of a window until the garbage collection time (when the * watermark passes the end of the window plus allowed lateness) even if the upstream triggers * closed the window earlier. */ @@ -40,10 +39,9 @@ public class GatherAllPanes /** * Gathers all panes of each window into a single output element. * - *

- * This will gather all output panes into a single element, which causes them to be colocated on a - * single worker. As a result, this is only suitable for {@link PCollection PCollections} where - * all of the output elements for each pane fit in memory, such as in tests. + *

This will gather all output panes into a single element, which causes them to be colocated + * on a single worker. As a result, this is only suitable for {@link PCollection PCollections} + * where all of the output elements for each pane fit in memory, such as in tests. */ public static GatherAllPanes globally() { return new GatherAllPanes<>();