diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GatherAllPanes.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GatherAllPanes.java new file mode 100644 index 0000000000..1d675e3487 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GatherAllPanes.java @@ -0,0 +1,72 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.util; + +import com.google.cloud.dataflow.sdk.transforms.GroupByKey; +import com.google.cloud.dataflow.sdk.transforms.GroupByKey.ReifyTimestampsAndWindows; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.Values; +import com.google.cloud.dataflow.sdk.transforms.WithKeys; +import com.google.cloud.dataflow.sdk.transforms.windowing.Never; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TypeDescriptor; + +/** + * Gathers all panes of each window into exactly one output. + * + *

Note that this will delay the output of a window until the garbage collection time (when the + * watermark passes the end of the window plus allowed lateness) even if the upstream triggers + * closed the window earlier. + */ +public class GatherAllPanes + extends PTransform, PCollection>>> { + /** + * Gathers all panes of each window into a single output element. + * + *

This will gather all output panes into a single element, which causes them to be colocated + * on a single worker. As a result, this is only suitable for {@link PCollection PCollections} + * where all of the output elements for each pane fit in memory, such as in tests. + */ + public static GatherAllPanes globally() { + return new GatherAllPanes<>(); + } + + private GatherAllPanes() {} + + @Override + public PCollection>> apply(PCollection input) { + WindowFn originalWindowFn = input.getWindowingStrategy().getWindowFn(); + + return input + .apply(WithKeys.of(0).withKeyType(new TypeDescriptor() {})) + .apply(new ReifyTimestampsAndWindows()) + .apply( + Window.into( + new IdentityWindowFn>>( + originalWindowFn.windowCoder(), originalWindowFn.assignsToSingleWindow())) + .triggering(Never.ever()) + .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness()) + .discardingFiredPanes()) + // all values have the same key so they all appear as a single output element + .apply(GroupByKey.>create()) + .apply(Values.>>create()) + .setWindowingStrategyInternal(input.getWindowingStrategy()); + } +} + diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/GatherAllPanesTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/GatherAllPanesTest.java new file mode 100644 index 0000000000..37bf04d469 --- /dev/null +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/GatherAllPanesTest.java @@ -0,0 +1,129 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.util; + +import static org.junit.Assert.fail; + +import com.google.cloud.dataflow.sdk.io.CountingInput; +import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.GroupByKey; +import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; +import com.google.cloud.dataflow.sdk.transforms.Values; +import com.google.cloud.dataflow.sdk.transforms.WithKeys; +import com.google.cloud.dataflow.sdk.transforms.WithTimestamps; +import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane; +import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TypeDescriptor; +import com.google.common.collect.Iterables; + +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.Serializable; + +/** + * Tests for {@link GatherAllPanes}. + */ +@RunWith(JUnit4.class) +public class GatherAllPanesTest implements Serializable { + @Test + public void singlePaneSingleReifiedPane() { + TestPipeline p = TestPipeline.create(); + PCollection>>> accumulatedPanes = + p.apply(CountingInput.upTo(20000)) + .apply( + WithTimestamps.of( + new SerializableFunction() { + @Override + public Instant apply(Long input) { + return new Instant(input * 10); + } + })) + .apply( + Window.into(FixedWindows.of(Duration.standardMinutes(1))) + .triggering(AfterWatermark.pastEndOfWindow()) + .withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()) + .apply(WithKeys.of((Void) null).withKeyType(new TypeDescriptor() {})) + .apply(GroupByKey.create()) + .apply(Values.>create()) + .apply(GatherAllPanes.>globally()); + + DataflowAssert.that(accumulatedPanes) + .satisfies( + new SerializableFunction>>>, Void>() { + @Override + public Void apply(Iterable>>> input) { + for (Iterable>> windowedInput : input) { + if (Iterables.size(windowedInput) > 1) { + fail("Expected all windows to have exactly one pane, got " + windowedInput); + return null; + } + } + return null; + } + }); + } + + @Test + public void multiplePanesMultipleReifiedPane() { + TestPipeline p = TestPipeline.create(); + + PCollection>>> accumulatedPanes = + p.apply(CountingInput.upTo(20000)) + .apply( + WithTimestamps.of( + new SerializableFunction() { + @Override + public Instant apply(Long input) { + return new Instant(input * 10); + } + })) + .apply( + Window.into(FixedWindows.of(Duration.standardMinutes(1))) + .triggering( + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(AfterPane.elementCountAtLeast(1))) + .withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()) + .apply(WithKeys.of((Void) null).withKeyType(new TypeDescriptor() {})) + .apply(GroupByKey.create()) + .apply(Values.>create()) + .apply(GatherAllPanes.>globally()); + + DataflowAssert.that(accumulatedPanes) + .satisfies( + new SerializableFunction>>>, Void>() { + @Override + public Void apply(Iterable>>> input) { + for (Iterable>> windowedInput : input) { + if (Iterables.size(windowedInput) > 1) { + return null; + } + } + fail("Expected at least one window to have multiple panes"); + return null; + } + }); + } +}