From 3cda11a26d0d568696866991e8245d587b21a149 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Wed, 30 Mar 2016 17:07:53 -0700 Subject: [PATCH 1/2] Add GatherAllPanes PTransform This PTransform Reifies and gathers all panes produced for each window, outputting a single pane per window at the time the window is Garbage Collected. For use in PAssert, to match over the final contents of each window. --- .../apache/beam/sdk/util/GatherAllPanes.java | 90 ++++++++++++ .../beam/sdk/util/GatherAllPanesTest.java | 132 ++++++++++++++++++ 2 files changed, 222 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java new file mode 100644 index 000000000000..2de40ddbf5ae --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.windowing.Never; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** + * Accumulates all panes that are output to the input {@link PCollection}, reifying the contents of + * those panes and emitting exactly one output pane. + */ +public class GatherAllPanes + extends PTransform, PCollection>>> { + public static GatherAllPanes create() { + return new GatherAllPanes<>(); + } + + private GatherAllPanes() {} + + private static class ReifiyWindowAndPaneDoFn extends DoFn> + implements RequiresWindowAccess { + @Override + public void processElement(DoFn>.ProcessContext c) + throws Exception { + c.output(WindowedValue.of(c.element(), c.timestamp(), c.window(), c.pane())); + } + } + + @Override + public PCollection>> apply(PCollection input) { + WindowFn originalWindowFn = input.getWindowingStrategy().getWindowFn(); + + PCollection> reifiedWindows = input.apply(new ReifiyWindowedValue()); + return reifiedWindows + .apply( + WithKeys.>of((Void) null) + .withKeyType(new TypeDescriptor() {})) + .apply( + Window.into( + new IdentityWindowFn>>( + originalWindowFn.windowCoder(), + input.getWindowingStrategy().getWindowFn().assignsToSingleWindow())) + .triggering(Never.ever())) + .apply(GroupByKey.>create()) + .apply(Values.>>create()) + .setWindowingStrategyInternal(input.getWindowingStrategy()); + } + + private static class ReifiyWindowedValue + extends PTransform, PCollection>> { + @Override + public PCollection> apply(PCollection input) { + Coder> windowedValueCoder = + FullWindowedValueCoder.of( + input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()); + + return input + .apply("ReifiyWindowedValues", ParDo.of(new ReifiyWindowAndPaneDoFn())) + .setCoder(windowedValueCoder); + } + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java new file mode 100644 index 000000000000..d2285747f472 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.fail; + +import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.WithTimestamps; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; + +import com.google.common.collect.Iterables; + +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.Serializable; + +/** + * Tests for {@link GatherAllPanes}. + */ +@RunWith(JUnit4.class) +public class GatherAllPanesTest implements Serializable { + @Test + public void singlePaneSingleReifiedPane() { + TestPipeline p = TestPipeline.create(); + PCollection>>> accumulatedPanes = + p.apply(CountingInput.upTo(20000)) + .apply( + WithTimestamps.of( + new SerializableFunction() { + @Override + public Instant apply(Long input) { + return new Instant(input * 10); + } + })) + .apply( + Window.into(FixedWindows.of(Duration.standardMinutes(1))) + .triggering(AfterWatermark.pastEndOfWindow()) + .withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()) + .apply(WithKeys.of((Void) null).withKeyType(new TypeDescriptor() {})) + .apply(GroupByKey.create()) + .apply(Values.>create()) + .apply(GatherAllPanes.>create()); + + PAssert.that(accumulatedPanes) + .satisfies( + new SerializableFunction>>>, Void>() { + @Override + public Void apply(Iterable>>> input) { + for (Iterable>> windowedInput : input) { + if (Iterables.size(windowedInput) > 1) { + fail("Expected all windows to have exactly one pane, got " + windowedInput); + return null; + } + } + return null; + } + }); + } + + @Test + public void multiplePanesMultipleReifiedPane() { + TestPipeline p = TestPipeline.create(); + + PCollection>>> accumulatedPanes = + p.apply(CountingInput.upTo(20000)) + .apply( + WithTimestamps.of( + new SerializableFunction() { + @Override + public Instant apply(Long input) { + return new Instant(input * 10); + } + })) + .apply( + Window.into(FixedWindows.of(Duration.standardMinutes(1))) + .triggering( + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(AfterPane.elementCountAtLeast(1))) + .withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()) + .apply(WithKeys.of((Void) null).withKeyType(new TypeDescriptor() {})) + .apply(GroupByKey.create()) + .apply(Values.>create()) + .apply(GatherAllPanes.>create()); + + PAssert.that(accumulatedPanes) + .satisfies( + new SerializableFunction>>>, Void>() { + @Override + public Void apply(Iterable>>> input) { + for (Iterable>> windowedInput : input) { + if (Iterables.size(windowedInput) > 1) { + return null; + } + } + fail("Expected at least one window to have multiple panes"); + return null; + } + }); + } +} From 34f1cb2fa0b671f1d50cf6cd7a55fd0d97f58975 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Thu, 14 Apr 2016 18:01:16 -0700 Subject: [PATCH 2/2] fixup! Add GatherAllPanes PTransform --- .../apache/beam/sdk/util/GatherAllPanes.java | 56 +++++++------------ .../sdk/util/GroupByKeyViaGroupByKeyOnly.java | 7 ++- .../beam/sdk/util/GatherAllPanesTest.java | 4 +- 3 files changed, 26 insertions(+), 41 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java index 2de40ddbf5ae..958d7102bb0c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java @@ -17,74 +17,58 @@ */ package org.apache.beam.sdk.util; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.windowing.Never; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; +import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; /** - * Accumulates all panes that are output to the input {@link PCollection}, reifying the contents of - * those panes and emitting exactly one output pane. + * Gathers all panes of each window into exactly one output. + * + *

+ * Note that this will delay the output of a window until the garbage collection time (when the + * watermark passes the end of the window plus allowed lateness) even if the upstream triggers + * closed the window earlier. */ public class GatherAllPanes extends PTransform, PCollection>>> { - public static GatherAllPanes create() { + /** + * Gathers all panes of each window into a single output element. + * + *

+ * This will gather all output panes into a single element, which causes them to be colocated on a + * single worker. As a result, this is only suitable for {@link PCollection PCollections} where + * all of the output elements for each pane fit in memory, such as in tests. + */ + public static GatherAllPanes globally() { return new GatherAllPanes<>(); } private GatherAllPanes() {} - private static class ReifiyWindowAndPaneDoFn extends DoFn> - implements RequiresWindowAccess { - @Override - public void processElement(DoFn>.ProcessContext c) - throws Exception { - c.output(WindowedValue.of(c.element(), c.timestamp(), c.window(), c.pane())); - } - } - @Override public PCollection>> apply(PCollection input) { WindowFn originalWindowFn = input.getWindowingStrategy().getWindowFn(); - PCollection> reifiedWindows = input.apply(new ReifiyWindowedValue()); - return reifiedWindows - .apply( - WithKeys.>of((Void) null) - .withKeyType(new TypeDescriptor() {})) + return input + .apply(WithKeys.of((Void) null).withKeyType(new TypeDescriptor() {})) + .apply(new ReifyTimestampsAndWindows()) .apply( Window.into( new IdentityWindowFn>>( originalWindowFn.windowCoder(), input.getWindowingStrategy().getWindowFn().assignsToSingleWindow())) .triggering(Never.ever())) + // all values have the same key so they all appear as a single output element .apply(GroupByKey.>create()) .apply(Values.>>create()) .setWindowingStrategyInternal(input.getWindowingStrategy()); } - - private static class ReifiyWindowedValue - extends PTransform, PCollection>> { - @Override - public PCollection> apply(PCollection input) { - Coder> windowedValueCoder = - FullWindowedValueCoder.of( - input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()); - - return input - .apply("ReifiyWindowedValues", ParDo.of(new ReifiyWindowAndPaneDoFn())) - .setCoder(windowedValueCoder); - } - } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java index d77dd2d651b1..65fc52d0a548 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java @@ -126,8 +126,8 @@ public Coder>> getDefaultOutputCoder(PCollection> inp } /** - * Helper transform that makes timestamps and window assignments - * explicit in the value part of each key/value pair. + * Helper transform that makes timestamps and window assignments explicit in the value part of + * each key/value pair. */ public static class ReifyTimestampsAndWindows extends PTransform>, PCollection>>> { @@ -137,7 +137,8 @@ public PCollection>> apply(PCollection> input) { // The requirement to use a KvCoder *is* actually a model-level requirement, not specific // to this implementation of GBK. All runners need a way to get the key. - checkArgument(input.getCoder() instanceof KvCoder, + checkArgument( + input.getCoder() instanceof KvCoder, "%s requires its input to use a %s", GroupByKey.class.getSimpleName(), KvCoder.class.getSimpleName()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java index d2285747f472..553d589175a1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java @@ -70,7 +70,7 @@ public Instant apply(Long input) { .apply(WithKeys.of((Void) null).withKeyType(new TypeDescriptor() {})) .apply(GroupByKey.create()) .apply(Values.>create()) - .apply(GatherAllPanes.>create()); + .apply(GatherAllPanes.>globally()); PAssert.that(accumulatedPanes) .satisfies( @@ -112,7 +112,7 @@ public Instant apply(Long input) { .apply(WithKeys.of((Void) null).withKeyType(new TypeDescriptor() {})) .apply(GroupByKey.create()) .apply(Values.>create()) - .apply(GatherAllPanes.>create()); + .apply(GatherAllPanes.>globally()); PAssert.that(accumulatedPanes) .satisfies(