diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java index 26f1e60b6773..950db30d6a42 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java @@ -19,14 +19,18 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.hamcrest.Matchers; @@ -37,25 +41,38 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class LargeCommitTest { +public class LargeCommitTest implements Serializable { @Rule public transient TestPipeline p = TestPipeline.create(); @Test @Category({ValidatesRunner.class}) public void testLargeCommit() { - // 5 50MB values shuffling to a single key - String value = bigString('a', 50 << 20); - KV kv = KV.of("a", value); PCollection>> result = - p.apply(Create.of(kv, kv, kv, kv, kv)).apply(GroupByKey.create()); + p.apply(Create.of("starterElement")) + .apply( + ParDo.of( + new DoFn>() { + @ProcessElement + public void processElement(ProcessContext c) { + String value = bigString('a', 50 << 20); + KV kv = KV.of("a", value); + // 5 50MB values shuffling to a single key + for (int i = 0; i < 5; i++) { + c.output(kv); + } + } + })) + .apply(GroupByKey.create()); PAssert.that(result) .satisfies( kvs -> { - assertTrue(kvs.iterator().hasNext()); - KV> outputKV = kvs.iterator().next(); - assertFalse(kvs.iterator().hasNext()); + String value = bigString('a', 50 << 20); + List>> outputKVs = new ArrayList<>(); + Iterables.addAll(outputKVs, kvs); + assertEquals(1, outputKVs.size()); + KV> outputKV = outputKVs.get(0); assertEquals("a", outputKV.getKey()); assertThat(outputKV.getValue(), Matchers.contains(value, value, value, value, value)); return null;