diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
new file mode 100644
index 000000000000..958d7102bb0c
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * Gathers all panes of each window into exactly one output.
+ *
+ * <p>Implemented by giving every element the same {@code null} key, reifying timestamps and
+ * windows, and grouping by key under a {@link Never} trigger. Note that this will delay the
+ * output of a window until the garbage collection time (when the watermark passes the end of the
+ * window plus allowed lateness) even if the upstream triggers closed the window earlier.
+ */
+public class GatherAllPanes
+ extends PTransform, PCollection>>> {
+ /**
+ * Gathers all panes of each window into a single output element.
+ *
+ * <p>This will gather all output panes into a single element, which causes them to be colocated
+ * on a single worker. As a result, this is only suitable for {@link PCollection PCollections}
+ * where all of the output elements for each window fit in memory, such as in tests. Every call
+ * returns a new transform instance.
+ */
+ public static GatherAllPanes globally() {
+ return new GatherAllPanes<>();
+ }
+
+ private GatherAllPanes() {} // instances are obtained through globally()
+
+ @Override
+ public PCollection>> apply(PCollection input) {
+ WindowFn, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn();
+
+ return input
+ .apply(WithKeys.of((Void) null).withKeyType(new TypeDescriptor() {})) // one shared null key
+ .apply(new ReifyTimestampsAndWindows()) // timestamp and window become part of each value
+ .apply(
+ Window.into(
+ new IdentityWindowFn>>(
+ originalWindowFn.windowCoder(),
+ input.getWindowingStrategy().getWindowFn().assignsToSingleWindow())) // NOTE(review): same fn as originalWindowFn
+ .triggering(Never.ever())) // output is held back as described in the class comment
+ // all values have the same key so they all appear as a single output element
+ .apply(GroupByKey.>create())
+ .apply(Values.>create()) // keep only the grouped values, dropping the null key
+ .setWindowingStrategyInternal(input.getWindowingStrategy()); // output reuses the input strategy
+ }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
index d77dd2d651b1..65fc52d0a548 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
@@ -126,8 +126,8 @@ public Coder>> getDefaultOutputCoder(PCollection> inp
}
/**
- * Helper transform that makes timestamps and window assignments
- * explicit in the value part of each key/value pair.
+ * Helper transform that makes timestamps and window assignments explicit in the value part of
+ * each key/value pair.
*/
public static class ReifyTimestampsAndWindows
extends PTransform>, PCollection>>> {
@@ -137,7 +137,8 @@ public PCollection>> apply(PCollection> input) {
// The requirement to use a KvCoder *is* actually a model-level requirement, not specific
// to this implementation of GBK. All runners need a way to get the key.
- checkArgument(input.getCoder() instanceof KvCoder,
+ checkArgument(
+ input.getCoder() instanceof KvCoder,
"%s requires its input to use a %s",
GroupByKey.class.getSimpleName(),
KvCoder.class.getSimpleName());
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java
new file mode 100644
index 000000000000..553d589175a1
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.fail;
+
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.WithTimestamps;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+import com.google.common.collect.Iterables;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+
+/**
+ * Tests for {@link GatherAllPanes}.
+ */
+@RunWith(JUnit4.class)
+public class GatherAllPanesTest implements Serializable {
+ /** With a single on-time firing per window, no gathered element may contain multiple panes. */
+ @Test
+ public void singlePaneSingleReifiedPane() {
+ TestPipeline p = TestPipeline.create();
+ PCollection>>> accumulatedPanes =
+ p.apply(CountingInput.upTo(20000)) // at 10 ms spacing this spans several 1-minute windows
+ .apply(
+ WithTimestamps.of(
+ new SerializableFunction() {
+ @Override
+ public Instant apply(Long input) {
+ return new Instant(input * 10); // element i gets timestamp 10*i ms
+ }
+ }))
+ .apply(
+ Window.into(FixedWindows.of(Duration.standardMinutes(1)))
+ .triggering(AfterWatermark.pastEndOfWindow())
+ .withAllowedLateness(Duration.ZERO)
+ .discardingFiredPanes())
+ .apply(WithKeys.of((Void) null).withKeyType(new TypeDescriptor() {}))
+ .apply(GroupByKey.create())
+ .apply(Values.>create())
+ .apply(GatherAllPanes.>globally());
+
+ PAssert.that(accumulatedPanes).satisfies(
+ new SerializableFunction>>>, Void>() {
+ @Override
+ public Void apply(Iterable>>> input) {
+ for (Iterable>> windowedInput : input) {
+ if (Iterables.size(windowedInput) > 1) {
+ fail("Expected all windows to have exactly one pane, got " + windowedInput);
+ }
+ }
+ return null;
+ }
+ });
+ p.run(); // actually execute the pipeline; without this the PAssert above is never evaluated
+ }
+
+ /** With early firings enabled, at least one gathered window must contain multiple panes. */
+ @Test
+ public void multiplePanesMultipleReifiedPane() {
+ TestPipeline p = TestPipeline.create();
+ PCollection>>> accumulatedPanes =
+ p.apply(CountingInput.upTo(20000)) // at 10 ms spacing this spans several 1-minute windows
+ .apply(
+ WithTimestamps.of(
+ new SerializableFunction() {
+ @Override
+ public Instant apply(Long input) {
+ return new Instant(input * 10); // element i gets timestamp 10*i ms
+ }
+ }))
+ .apply(
+ Window.into(FixedWindows.of(Duration.standardMinutes(1)))
+ .triggering(
+ AfterWatermark.pastEndOfWindow()
+ .withEarlyFirings(AfterPane.elementCountAtLeast(1)))
+ .withAllowedLateness(Duration.ZERO)
+ .discardingFiredPanes())
+ .apply(WithKeys.of((Void) null).withKeyType(new TypeDescriptor() {}))
+ .apply(GroupByKey.create())
+ .apply(Values.>create())
+ .apply(GatherAllPanes.>globally());
+
+ PAssert.that(accumulatedPanes).satisfies(
+ new SerializableFunction>>>, Void>() {
+ @Override
+ public Void apply(Iterable>>> input) {
+ for (Iterable>> windowedInput : input) {
+ if (Iterables.size(windowedInput) > 1) {
+ return null; // found a window with more than one pane: success
+ }
+ }
+ fail("Expected at least one window to have multiple panes");
+ return null; // not reached at runtime (fail throws) but required by the compiler
+ }
+ });
+ p.run(); // actually execute the pipeline; without this the PAssert above is never evaluated
+ }
+}