Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
Expand All @@ -44,30 +48,49 @@ public class LargeCommitTest {
@Test
@Category({ValidatesRunner.class})
public void testLargeCommit() {
// 5 50MB values shuffling to a single key
String value = bigString('a', 50 << 20);
KV<String, String> kv = KV.of("a", value);
// Prepare 5 values, each ~50MB, to test large commit sizes (~250MB total)
final int valueSizeMB = 50;
final int numValues = 5;
String largeValue = createLargeString('a', valueSizeMB << 20);
List<KV<String, String>> inputValues = generateKeyValuePairs("key", largeValue, numValues);

PCollection<KV<String, Iterable<String>>> result =
p.apply(Create.of(kv, kv, kv, kv, kv)).apply(GroupByKey.create());
p.apply(Create.of(inputValues))
.apply(GroupByKey.create()); // Group all large values by a single key

PAssert.that(result)
.satisfies(
kvs -> {
assertTrue(kvs.iterator().hasNext());
KV<String, Iterable<String>> outputKV = kvs.iterator().next();
assertFalse(kvs.iterator().hasNext());
assertEquals("a", outputKV.getKey());
assertThat(outputKV.getValue(), Matchers.contains(value, value, value, value, value));
// Validate that only one key exists
Iterator<KV<String, Iterable<String>>> iterator = kvs.iterator();
assertTrue(iterator.hasNext());
KV<String, Iterable<String>> outputKV = iterator.next();
assertFalse(iterator.hasNext()); // Ensure no additional keys exist

// Validate key and values
assertEquals("key", outputKV.getKey());
assertThat(
outputKV.getValue(),
Matchers.contains(largeValue, largeValue, largeValue, largeValue, largeValue));
return null;
});
p.run();
}

private static String bigString(char c, int size) {
char[] buf = new char[size];
for (int i = 0; i < size; i++) {
buf[i] = c;
/**
 * Builds a string consisting of the character {@code c} repeated {@code size} times.
 *
 * @param c the character placed at every position of the result
 * @param size the length of the string to build, in characters
 * @return a string of length {@code size} containing only {@code c}
 */
private static String createLargeString(char c, int size) {
  final char[] chars = new char[size];
  // Fill the whole array [0, size) with the requested character.
  Arrays.fill(chars, 0, size, c);
  return String.valueOf(chars);
}

/** Generates {@code count} KV pairs, each with the same {@code key} and {@code value}. */
private static List<KV<String, String>> generateKeyValuePairs(
String key, String value, int count) {
List<KV<String, String>> keyValuePairs = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
keyValuePairs.add(KV.of(key, value));
}
return new String(buf);
return keyValuePairs;
}
}
Loading