From 4068d3a5b4a091bf9f0f7237783f1f1a6a9c939d Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Tue, 21 Jan 2025 10:29:56 -0800 Subject: [PATCH 1/2] fix large commit test, move large value out of the graph --- .../runners/dataflow/LargeCommitTest.java | 37 ++++++++++++++----- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java index 26f1e60b6773..cca0d36e1872 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java @@ -19,14 +19,18 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import com.google.common.collect.Iterables; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.hamcrest.Matchers; @@ -37,25 +41,38 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class LargeCommitTest { +public class LargeCommitTest implements Serializable { @Rule public transient TestPipeline p = TestPipeline.create(); @Test @Category({ValidatesRunner.class}) public void testLargeCommit() { - // 5 50MB values shuffling to a single key - String value = bigString('a', 50 << 20); - KV<String, String> kv = KV.of("a", 
value); PCollection<KV<String, Iterable<String>>> result = - p.apply(Create.of(kv, kv, kv, kv, kv)).apply(GroupByKey.create()); + p.apply(Create.of("starterElement")) + .apply( + ParDo.of( + new DoFn<String, KV<String, String>>() { + @ProcessElement + public void processElement(ProcessContext c) { + String value = bigString('a', 50 << 20); + KV<String, String> kv = KV.of("a", value); + // 5 50MB values shuffling to a single key + for (int i = 0; i < 5; i++) { + c.output(kv); + } + } + })) + .apply(GroupByKey.create()); PAssert.that(result) .satisfies( kvs -> { - assertTrue(kvs.iterator().hasNext()); - KV<String, Iterable<String>> outputKV = kvs.iterator().next(); - assertFalse(kvs.iterator().hasNext()); + String value = bigString('a', 50 << 20); + List<KV<String, Iterable<String>>> outputKVs = new ArrayList<>(); + Iterables.addAll(outputKVs, kvs); + assertEquals(1, outputKVs.size()); + KV<String, Iterable<String>> outputKV = outputKVs.get(0); assertEquals("a", outputKV.getKey()); assertThat(outputKV.getValue(), Matchers.contains(value, value, value, value, value)); return null; From b869f72e44c0c9d469e4c3dec9122d6dcb3e34b6 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Wed, 22 Jan 2025 18:31:24 -0800 Subject: [PATCH 2/2] Update runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java Co-authored-by: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> --- .../java/org/apache/beam/runners/dataflow/LargeCommitTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java index cca0d36e1872..950db30d6a42 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java @@ -20,7 +20,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; 
-import com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import java.io.Serializable; import java.util.ArrayList; import java.util.List;