diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java index 26f1e60b6773..2be9aefec429 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java @@ -22,6 +22,10 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.ValidatesRunner; @@ -44,30 +48,49 @@ public class LargeCommitTest { @Test @Category({ValidatesRunner.class}) public void testLargeCommit() { - // 5 50MB values shuffling to a single key - String value = bigString('a', 50 << 20); - KV kv = KV.of("a", value); + // Prepare 5 values, each ~50MB, to test large commit sizes (~250MB total) + final int valueSizeMB = 50; + final int numValues = 5; + String largeValue = createLargeString('a', valueSizeMB << 20); + List> inputValues = generateKeyValuePairs("key", largeValue, numValues); + PCollection>> result = - p.apply(Create.of(kv, kv, kv, kv, kv)).apply(GroupByKey.create()); + p.apply(Create.of(inputValues)) + .apply(GroupByKey.create()); // Group all large values by a single key PAssert.that(result) .satisfies( kvs -> { - assertTrue(kvs.iterator().hasNext()); - KV> outputKV = kvs.iterator().next(); - assertFalse(kvs.iterator().hasNext()); - assertEquals("a", outputKV.getKey()); - assertThat(outputKV.getValue(), Matchers.contains(value, value, value, value, value)); + // Validate that only one key exists + Iterator>> iterator = kvs.iterator(); + assertTrue(iterator.hasNext()); + KV> outputKV = iterator.next(); + 
assertFalse(iterator.hasNext()); // Ensure no additional keys exist + + // Validate key and values (the five expected elements listed in the matcher must stay in sync with numValues) + assertEquals("key", outputKV.getKey()); + assertThat( + outputKV.getValue(), + Matchers.contains(largeValue, largeValue, largeValue, largeValue, largeValue)); return null; }); p.run(); } - private static String bigString(char c, int size) { - char[] buf = new char[size]; - for (int i = 0; i < size; i++) { - buf[i] = c; + /** Returns a string made of {@code size} repetitions of the character {@code c}. */ + private static String createLargeString(char c, int size) { + char[] buffer = new char[size]; + Arrays.fill(buffer, c); + return new String(buffer); + } + + /** Returns a list of {@code count} KV pairs, each built from the same {@code key} and {@code value}. */ + private static List> generateKeyValuePairs( + String key, String value, int count) { + List> keyValuePairs = new ArrayList<>(count); + for (int i = 0; i < count; i++) { + keyValuePairs.add(KV.of(key, value)); } - return new String(buf); + return keyValuePairs; } }