From 4c4c6d3b249fbcfaddf18c104da16056979f8e4d Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Tue, 5 Sep 2023 14:06:14 -0400 Subject: [PATCH 01/10] file loads streaming integration tests --- .../sdk/io/gcp/bigquery/BigQueryUtils.java | 4 + .../io/gcp/bigquery/BigQueryUtilsTest.java | 27 +- .../io/gcp/bigquery/FileLoadsStreamingIT.java | 490 ++++++++++++++++++ 3 files changed, 516 insertions(+), 5 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index 0063952d8b13..00ee815c3c93 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -689,6 +689,10 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso } } + if (jsonBQValue instanceof byte[] && fieldType.getTypeName() == TypeName.BYTES) { + return jsonBQValue; + } + if (jsonBQValue instanceof List) { if (fieldType.getCollectionElementType() == null) { throw new IllegalArgumentException( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java index 9bff77a16588..f4074cc1a556 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java @@ -87,6 +87,7 @@ public class BigQueryUtilsTest { .addNullableField("time0s_0ns", Schema.FieldType.logicalType(SqlTypes.TIME)) .addNullableField("valid", Schema.FieldType.BOOLEAN) .addNullableField("binary", Schema.FieldType.BYTES) + .addNullableField("raw_bytes", Schema.FieldType.BYTES) .addNullableField("numeric", Schema.FieldType.DECIMAL) .addNullableField("boolean", Schema.FieldType.BOOLEAN) .addNullableField("long", Schema.FieldType.INT64) @@ -188,6 +189,9 @@ public class BigQueryUtilsTest { private static final TableFieldSchema BINARY = new TableFieldSchema().setName("binary").setType(StandardSQLTypeName.BYTES.toString()); + private static final TableFieldSchema RAW_BYTES = + new TableFieldSchema().setName("raw_bytes").setType(StandardSQLTypeName.BYTES.toString()); + private static final TableFieldSchema NUMERIC = new TableFieldSchema().setName("numeric").setType(StandardSQLTypeName.NUMERIC.toString()); @@ -246,6 +250,7 @@ public class BigQueryUtilsTest { TIME_0S_0NS, VALID, BINARY, + RAW_BYTES, NUMERIC, BOOLEAN, LONG, @@ -276,6 +281,7 @@ public class BigQueryUtilsTest { TIME_0S_0NS, VALID, BINARY, + RAW_BYTES, NUMERIC, BOOLEAN, LONG, @@ -316,6 +322,7 @@ public class BigQueryUtilsTest { LocalTime.parse("12:34"), false, Base64.getDecoder().decode("ABCD1234"), + Base64.getDecoder().decode("ABCD1234"), new BigDecimal("123.456").setScale(3, RoundingMode.HALF_UP), true, 123L, @@ -346,6 +353,7 @@ public class BigQueryUtilsTest { .set("time0s_0ns", "12:34:00") .set("valid", "false") .set("binary", "ABCD1234") + .set("raw_bytes", Base64.getDecoder().decode("ABCD1234")) .set("numeric", "123.456") .set("boolean", true) .set("long", 123L) @@ -355,7 +363,7 @@ 
public class BigQueryUtilsTest { Row.withSchema(FLAT_TYPE) .addValues( null, null, null, null, null, null, null, null, null, null, null, null, null, null, - null, null, null, null, null, null, null, null) + null, null, null, null, null, null, null, null, null) .build(); private static final TableRow BQ_NULL_FLAT_ROW = @@ -378,6 +386,7 @@ public class BigQueryUtilsTest { .set("time0s_0ns", null) .set("valid", null) .set("binary", null) + .set("raw_bytes", null) .set("numeric", null) .set("boolean", null) .set("long", null) @@ -457,6 +466,7 @@ public class BigQueryUtilsTest { TIME_0S_0NS, VALID, BINARY, + RAW_BYTES, NUMERIC, BOOLEAN, LONG, @@ -512,6 +522,7 @@ public void testToTableSchema_flat() { TIME_0S_0NS, VALID, BINARY, + RAW_BYTES, NUMERIC, BOOLEAN, LONG, @@ -562,6 +573,7 @@ public void testToTableSchema_row() { TIME_0S_0NS, VALID, BINARY, + RAW_BYTES, NUMERIC, BOOLEAN, LONG, @@ -598,6 +610,7 @@ public void testToTableSchema_array_row() { TIME_0S_0NS, VALID, BINARY, + RAW_BYTES, NUMERIC, BOOLEAN, LONG, @@ -620,7 +633,7 @@ public void testToTableSchema_map() { public void testToTableRow_flat() { TableRow row = toTableRow().apply(FLAT_ROW); - assertThat(row.size(), equalTo(22)); + assertThat(row.size(), equalTo(23)); assertThat(row, hasEntry("id", "123")); assertThat(row, hasEntry("value", "123.456")); assertThat(row, hasEntry("datetime", "2020-11-02T12:34:56.789876")); @@ -635,6 +648,7 @@ public void testToTableRow_flat() { assertThat(row, hasEntry("name", "test")); assertThat(row, hasEntry("valid", "false")); assertThat(row, hasEntry("binary", "ABCD1234")); + assertThat(row, hasEntry("raw_bytes", "ABCD1234")); assertThat(row, hasEntry("numeric", "123.456")); assertThat(row, hasEntry("boolean", "true")); assertThat(row, hasEntry("long", "123")); @@ -674,7 +688,7 @@ public void testToTableRow_row() { assertThat(row.size(), equalTo(1)); row = (TableRow) row.get("row"); - assertThat(row.size(), equalTo(22)); + assertThat(row.size(), equalTo(23)); assertThat(row, hasEntry("id", "123")); assertThat(row, hasEntry("value", "123.456")); assertThat(row, hasEntry("datetime", "2020-11-02T12:34:56.789876")); @@ -689,6 +703,7 @@ public void testToTableRow_row() { assertThat(row, hasEntry("name", "test")); assertThat(row, hasEntry("valid", "false")); assertThat(row, hasEntry("binary", "ABCD1234")); + assertThat(row, hasEntry("raw_bytes", "ABCD1234")); assertThat(row, hasEntry("numeric", "123.456")); assertThat(row, hasEntry("boolean", "true")); assertThat(row, hasEntry("long", "123")); @@ -701,7 +716,7 @@ public void testToTableRow_array_row() { assertThat(row.size(), equalTo(1)); row = ((List<TableRow>) row.get("rows")).get(0); - assertThat(row.size(), equalTo(22)); + assertThat(row.size(), equalTo(23)); assertThat(row, hasEntry("id", "123")); assertThat(row, hasEntry("value", "123.456")); assertThat(row, hasEntry("datetime", "2020-11-02T12:34:56.789876")); @@ -716,6 +731,7 @@ public void testToTableRow_array_row() { assertThat(row, hasEntry("name", "test")); assertThat(row, hasEntry("valid", "false")); assertThat(row, hasEntry("binary", "ABCD1234")); + assertThat(row, hasEntry("raw_bytes", "ABCD1234")); assertThat(row, hasEntry("numeric", "123.456")); assertThat(row, hasEntry("boolean", "true")); assertThat(row, hasEntry("long", "123")); @@ -726,7 +742,7 @@ public void testToTableRow_null_row() { TableRow row = toTableRow().apply(NULL_FLAT_ROW); - assertThat(row.size(), equalTo(22)); + assertThat(row.size(), equalTo(23)); assertThat(row, hasEntry("id", null));
assertThat(row, hasEntry("value", null)); assertThat(row, hasEntry("name", null)); @@ -745,6 +761,7 @@ public void testToTableRow_null_row() { assertThat(row, hasEntry("time0s_0ns", null)); assertThat(row, hasEntry("valid", null)); assertThat(row, hasEntry("binary", null)); + assertThat(row, hasEntry("raw_bytes", null)); assertThat(row, hasEntry("numeric", null)); assertThat(row, hasEntry("boolean", null)); assertThat(row, hasEntry("long", null)); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java new file mode 100644 index 000000000000..31290c202127 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assume.assumeTrue; + +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.stream.Collectors; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; +import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.*; +import org.apache.beam.sdk.values.*; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.*; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(Parameterized.class) +public class 
FileLoadsStreamingIT { + private static final Logger LOG = LoggerFactory.getLogger(FileLoadsStreamingIT.class); + + @Parameterized.Parameters + public static Iterable<Object[]> data() { + return ImmutableList.of(new Object[] {true}, new Object[] {false}); + } + + @Parameterized.Parameter(0) + public boolean useInputSchema; + + @Rule public TestName testName = new TestName(); + + private static final BigqueryClient BQ_CLIENT = new BigqueryClient("FileLoadsStreamingIT"); + private static final String PROJECT = "google.com:clouddfe"; + // TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); + private static final String BIG_QUERY_DATASET_ID = "file_loads_streaming_it_" + System.nanoTime(); + + private static final String[] FIELDS = { + "BOOL", + "BOOLEAN", + "BYTES", + "INT64", + "INTEGER", + "FLOAT", + "FLOAT64", + "NUMERIC", + "STRING", + "DATE", + "TIMESTAMP" + }; + + private static final int TOTAL_N = 50; + + private final Random randomGenerator = new Random(); + + @BeforeClass + public static void setUpTestEnvironment() throws IOException, InterruptedException { + // Create one BQ dataset for all test cases. + BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID); + } + + @AfterClass + public static void cleanUp() { + BQ_CLIENT.deleteDataset(PROJECT, BIG_QUERY_DATASET_ID); + } + + static class GenerateRowFunc implements SerializableFunction<Long, TableRow> { + private final List<String> fieldNames; + + public GenerateRowFunc(List<String> fieldNames) { + this.fieldNames = fieldNames; + } + + @Override + public TableRow apply(Long rowId) { + TableRow row = new TableRow(); + row.set("id", rowId); + + for (String name : fieldNames) { + String type = Iterables.get(Splitter.on('_').split(name), 0); + switch (type) { + case "BOOL": + case "BOOLEAN": + if (rowId % 2 == 0) { + row.set(name, false); + } else { + row.set(name, true); + } + break; + case "BYTES": + row.set(name, String.format("test_blob_%s", rowId).getBytes(StandardCharsets.UTF_8)); + break; + case "INT64": + case "INTEGER": + row.set(name, String.valueOf(rowId + 10)); + break; + case "FLOAT": + case "FLOAT64": + row.set(name, String.valueOf(0.5 + rowId)); + break; + case "NUMERIC": + row.set(name, String.valueOf(rowId + 0.12345)); + break; + case "DATE": + row.set(name, "2022-01-01"); + break; + case "TIMESTAMP": + row.set(name, "2022-01-01 10:10:10.012 UTC"); + break; + case "STRING": + row.set(name, "test_string" + rowId); + break; + default: + row.set(name, "unknown" + rowId); + break; + } + } + return row; + } + } + + private static TableSchema makeTableSchemaFromTypes(List<String> fieldNames) { + ImmutableList.Builder<TableFieldSchema> builder = ImmutableList.builder(); + + // Add an id field for verification of correctness + builder.add(new TableFieldSchema().setType("INTEGER").setName("id").setMode("REQUIRED")); + + // The field name is prefixed with its type.
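+ // (every name in FIELDS is itself a BigQuery type name, so it doubles as the column type below)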
+ for (String name : fieldNames) { + String mode = "REQUIRED"; + builder.add(new TableFieldSchema().setType(name).setName(name).setMode(mode)); + } + + return new TableSchema().setFields(builder.build()); + } + + private String maybeCreateTable(TableSchema tableSchema, String suffix) + throws IOException, InterruptedException { + String tableId = Iterables.get(Splitter.on('[').split(testName.getMethodName()), 0); + + BQ_CLIENT.deleteTable(PROJECT, BIG_QUERY_DATASET_ID, tableId + suffix); + if (!useInputSchema) { + BQ_CLIENT.createNewTable( + PROJECT, + BIG_QUERY_DATASET_ID, + new Table() + .setSchema(tableSchema) + .setTableReference( + new TableReference() + .setTableId(tableId + suffix) + .setDatasetId(BIG_QUERY_DATASET_ID) + .setProjectId(PROJECT))); + } + return String.format("%s.%s.%s", PROJECT, BIG_QUERY_DATASET_ID, tableId + suffix); + } + + private void runStreaming(int numFileShards, boolean useCopyJobs) + throws IOException, InterruptedException { + Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions()); + // Only run the most relevant test case on Dataflow. Testing this dimension on DirectRunner is + // sufficient + if (p.getOptions().getRunner().getName().contains("DataflowRunner")) { + assumeTrue("Skipping in favor of more relevant test case", useInputSchema); + } + + List<String> fieldNamesOrigin = Arrays.asList(FIELDS); + + // Shuffle the fields in the write schema to do fuzz testing on field order + List<String> fieldNamesShuffled = new ArrayList<>(fieldNamesOrigin); + Collections.shuffle(fieldNamesShuffled, randomGenerator); + + TableSchema bqTableSchema = makeTableSchemaFromTypes(fieldNamesOrigin); + TableSchema inputSchema = makeTableSchemaFromTypes(fieldNamesShuffled); + String tableSpec = maybeCreateTable(bqTableSchema, ""); + + // set up and build pipeline + Instant start = new Instant(0); + GenerateRowFunc generateRowFunc = new GenerateRowFunc(fieldNamesShuffled); + PCollection<Instant> instants = + p.apply( + "Generate Instants", + PeriodicImpulse.create() + .startAt(start) + .stopAt(start.plus(Duration.standardSeconds(TOTAL_N - 1))) + .withInterval(Duration.standardSeconds(1)) + .catchUpToNow(false)); + PCollection<TableRow> rows = + instants.apply( + "Create TableRows", + MapElements.into(TypeDescriptor.of(TableRow.class)) + .via(instant -> generateRowFunc.apply(instant.getMillis() / 1000))); + // build write transform + Write<TableRow> write = + BigQueryIO.writeTableRows() + .withCustomGcsTempLocation( + ValueProvider.StaticValueProvider.of("gs://ahmedabualsaud-wordcount/tmp")) + .to(tableSpec) + .withMethod(Write.Method.FILE_LOADS) + .withWriteDisposition(WriteDisposition.WRITE_APPEND) + .withTriggeringFrequency(Duration.standardSeconds(10)); + if (useInputSchema) { + write = + write.withSchema(inputSchema).withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED); + } else { + write = write.withCreateDisposition(CreateDisposition.CREATE_NEVER); + } + if (useCopyJobs) { + write = write.withMaxBytesPerPartition(100); + } + write = numFileShards == 0 ? write.withAutoSharding() : write.withNumFileShards(numFileShards); + + rows.apply("Stream loads to dynamic BigQuery destinations", write); + p.run().waitUntilFinish(); + + List<TableRow> expectedRows = new ArrayList<>(); + for (long i = 0; i < TOTAL_N; i++) { + expectedRows.add(generateRowFunc.apply(i)); + } + + // Perform checks + checkRowCompleteness(tableSpec, inputSchema, expectedRows); + } + + // Check the expected number of rows reached the table.
+ private static void checkRowCompleteness( + String tableSpec, TableSchema schema, List<TableRow> expectedRows) + throws IOException, InterruptedException { + List<TableRow> actualTableRows = + BQ_CLIENT.queryUnflattened( + String.format("SELECT * FROM [%s]", tableSpec), PROJECT, true, false); + + Schema rowSchema = BigQueryUtils.fromTableSchema(schema); + List<Row> actualBeamRows = + actualTableRows.stream() + .map(tableRow -> BigQueryUtils.toBeamRow(rowSchema, tableRow)) + .collect(Collectors.toList()); + List<Row> expectedBeamRows = + expectedRows.stream() + .map(tableRow -> BigQueryUtils.toBeamRow(rowSchema, tableRow)) + .collect(Collectors.toList()); + LOG.info( + "Actual rows number: {}, expected: {}", actualBeamRows.size(), expectedBeamRows.size()); + + assertThat( + "Comparing expected rows with actual rows", + actualBeamRows, + containsInAnyOrder(expectedBeamRows.toArray())); + assertEquals( + "Checking there is no duplication", expectedBeamRows.size(), actualBeamRows.size()); + } + + @Test + public void testLoadWithAutoSharding() throws IOException, InterruptedException { + runStreaming(0, false); + } + + @Test + public void testLoadWithFixedShards() throws IOException, InterruptedException { + runStreaming(5, false); + } + + @Test + public void testWithAutoShardingAndCopyJobs() throws IOException, InterruptedException { + // Tests the copy jobs (aka multiple partitions) route + runStreaming(0, true); + } + + @Test + public void testWithFixedShardsAndCopyJobs() throws IOException, InterruptedException { + // Tests the copy jobs (aka multiple partitions) route + runStreaming(5, true); + } + + @Test + public void testDynamicDestinationsWithAutoSharding() throws IOException, InterruptedException { + runStreamingToDynamicDestinations(0, false); + } + + @Test + public void testDynamicDestinationsWithFixedShards() throws IOException, InterruptedException { + runStreamingToDynamicDestinations(6, false); + } + + @Test + public void testDynamicDestinationsWithAutoShardingAndCopyJobs() + throws IOException, InterruptedException { + // This currently fails when creating tables is needed + // TODO(https://github.com/apache/beam/issues/28309) + assumeTrue(!useInputSchema); + runStreamingToDynamicDestinations(0, true); + } + + @Test + public void testDynamicDestinationsWithFixedShardsAndCopyJobs() + throws IOException, InterruptedException { + // This currently fails when creating tables is needed + // TODO(https://github.com/apache/beam/issues/28309) + assumeTrue(!useInputSchema); + runStreamingToDynamicDestinations(6, true); + } + + private void runStreamingToDynamicDestinations(int numFileShards, boolean useCopyJobs) + throws IOException, InterruptedException { + Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions()); + + // Only run the most relevant test cases on Dataflow.
Testing this dimension on DirectRunner is + // sufficient + if (p.getOptions().getRunner().getName().contains("DataflowRunner")) { + assumeTrue("Skipping in favor of more relevant test case", useInputSchema); + } + List<String> allFields = Arrays.asList(FIELDS); + List<String> subFields0 = new ArrayList<>(allFields.subList(0, 4)); + List<String> subFields1 = new ArrayList<>(allFields.subList(4, 8)); + List<String> subFields2 = new ArrayList<>(allFields.subList(8, 11)); + TableSchema table0Schema = makeTableSchemaFromTypes(subFields0); + TableSchema table1Schema = makeTableSchemaFromTypes(subFields1); + TableSchema table2Schema = makeTableSchemaFromTypes(subFields2); + String table0Id = maybeCreateTable(table0Schema, "-0"); + String table1Id = maybeCreateTable(table1Schema, "-1"); + String table2Id = maybeCreateTable(table2Schema, "-2"); + GenerateRowFunc generateRowFunc0 = new GenerateRowFunc(subFields0); + GenerateRowFunc generateRowFunc1 = new GenerateRowFunc(subFields1); + GenerateRowFunc generateRowFunc2 = new GenerateRowFunc(subFields2); + + String tablePrefix = + String.format( + "%s.%s.%s", + PROJECT, + BIG_QUERY_DATASET_ID, + Iterables.get(Splitter.on('[').split(testName.getMethodName()), 0)); + + // set up and build pipeline + Instant start = new Instant(0); + PCollection<Instant> instants = + p.apply( + "Generate Instants", + PeriodicImpulse.create() + .startAt(start) + .stopAt(start.plus(Duration.standardSeconds(TOTAL_N - 1))) + .withInterval(Duration.standardSeconds(1)) + .catchUpToNow(false)); + PCollection<Long> longs = + instants.apply( + "Create TableRows", + MapElements.into(TypeDescriptors.longs()).via(instant -> instant.getMillis() / 1000)); + // build write transform + Write<Long> write = + BigQueryIO.<Long>write() + .to( + new TestDynamicDest( + tablePrefix, subFields0, subFields1, subFields2, useInputSchema)) + .withFormatFunction( + id -> { + long dest = id % 3; + TableRow row; + if (dest == 0) { + row = generateRowFunc0.apply(id); + } else if (dest == 1) { + row = generateRowFunc1.apply(id); + } else { + row = generateRowFunc2.apply(id); + } + return row; + }) + .withMethod(Write.Method.FILE_LOADS) + .withCustomGcsTempLocation( + ValueProvider.StaticValueProvider.of("gs://ahmedabualsaud-wordcount/tmp")) + .withTriggeringFrequency(Duration.standardSeconds(10)); + if (useCopyJobs) { + write = write.withMaxBytesPerPartition(100); + } + write = + write.withCreateDisposition( + useInputSchema ? CreateDisposition.CREATE_IF_NEEDED : CreateDisposition.CREATE_NEVER); + write = numFileShards == 0 ?
write.withAutoSharding() : write.withNumFileShards(numFileShards); + + longs.apply("Stream loads to dynamic destinations", write); + p.run().waitUntilFinish(); + + List<TableRow> expectedRows0 = new ArrayList<>(); + List<TableRow> expectedRows1 = new ArrayList<>(); + List<TableRow> expectedRows2 = new ArrayList<>(); + for (long i = 0; i < TOTAL_N; i++) { + long dest = i % 3; + if (dest == 0) { + expectedRows0.add(generateRowFunc0.apply(i)); + } else if (dest == 1) { + expectedRows1.add(generateRowFunc1.apply(i)); + } else { + expectedRows2.add(generateRowFunc2.apply(i)); + } + } + // Perform checks + checkRowCompleteness(table0Id, makeTableSchemaFromTypes(subFields0), expectedRows0); + checkRowCompleteness(table1Id, makeTableSchemaFromTypes(subFields1), expectedRows1); + checkRowCompleteness(table2Id, makeTableSchemaFromTypes(subFields2), expectedRows2); + } + + static class TestDynamicDest extends DynamicDestinations<Long, Long> { + String tablePrefix; + List<String> table0Fields; + List<String> table1Fields; + List<String> table2Fields; + boolean useInputSchema; + + public TestDynamicDest( + String tablePrefix, + List<String> table0Fields, + List<String> table1Fields, + List<String> table2Fields, + boolean useInputSchema) { + this.tablePrefix = tablePrefix; + this.table0Fields = table0Fields; + this.table1Fields = table1Fields; + this.table2Fields = table2Fields; + this.useInputSchema = useInputSchema; + } + + @Override + public Long getDestination(@Nullable ValueInSingleWindow<Long> element) { + return element.getValue() % 3; + } + + @Override + public TableDestination getTable(Long destination) { + return new TableDestination(tablePrefix + "-" + destination, ""); + } + + @Override + public @Nullable TableSchema getSchema(Long destination) { + if (!useInputSchema) { + return null; + } + List<String> fields; + if (destination == 0) { + fields = table0Fields; + } else if (destination == 1) { + fields = table1Fields; + } else { + fields = table2Fields; + } + List<TableFieldSchema> tableFields = + fields.stream() + .map(name -> new TableFieldSchema().setName(name).setType(name).setMode("REQUIRED")) + .collect(Collectors.toList()); + // we attach an ID to each row in addition to the existing schema fields + tableFields.add( + 0, new TableFieldSchema().setName("id").setType("INTEGER").setMode("REQUIRED")); + return new TableSchema().setFields(tableFields); + } + } +} From 40c57943d8231edd5fe40291ee2841a05c67d978 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Tue, 5 Sep 2023 16:04:15 -0400 Subject: [PATCH 02/10] spotless --- .../io/gcp/bigquery/FileLoadsStreamingIT.java | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java index 31290c202127..712bf5da7feb 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java @@ -29,7 +29,11 @@ import com.google.api.services.bigquery.model.TableSchema; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; import java.util.stream.Collectors; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; @@ -39,15 +43,24 @@ import
org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.*; -import org.apache.beam.sdk.values.*; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PeriodicImpulse; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; -import org.junit.*; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; import org.junit.rules.TestName; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -194,6 +207,7 @@ private String maybeCreateTable(TableSchema tableSchema, String suffix) private void runStreaming(int numFileShards, boolean useCopyJobs) throws IOException, InterruptedException { + Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions()); // Only run the most relevant test case on Dataflow. Testing this dimension on DirectRunner is // sufficient From bff7e46f3c2da21e5caf0af32abb7980fc4015d4 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Tue, 5 Sep 2023 16:06:46 -0400 Subject: [PATCH 03/10] spotless --- .../beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java index 712bf5da7feb..5ee56e3c3331 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java @@ -36,6 +36,7 @@ import java.util.Random; import java.util.stream.Collectors; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; @@ -82,8 +83,8 @@ public static Iterable<Object[]> data() { @Rule public TestName testName = new TestName(); private static final BigqueryClient BQ_CLIENT = new BigqueryClient("FileLoadsStreamingIT"); - private static final String PROJECT = "google.com:clouddfe"; - // TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); + private static final String PROJECT = + TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); private static final String BIG_QUERY_DATASET_ID = "file_loads_streaming_it_" + System.nanoTime(); private static final String[] FIELDS = { From fac0b7e40a9e752a3ef48be8f56cc98cce6b79c9 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Wed, 6 Sep 2023 15:42:04 -0400 Subject: [PATCH 04/10] fix dynamic
destinations copy jobs --- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 11 +-- .../io/gcp/bigquery/FileLoadsStreamingIT.java | 73 ++++++++++--------- 2 files changed, 46 insertions(+), 38 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index f5f193aecb74..32ee29738bf8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -34,7 +34,6 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.ShardedKeyCoder; -import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; @@ -399,10 +398,12 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu "Window Into Global Windows", Window.<KV<DestinationT, WriteTables.Result>>into(new GlobalWindows()) .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))) - .apply("Add Void Key", WithKeys.of((Void) null)) - .setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder())) - .apply("GroupByKey", GroupByKey.create()) - .apply("Extract Values", Values.create()) + // We use this and the following GBK to aggregate by final destination. + // This way, each destination has its own pane sequence + .apply("AddDestinationKeys", WithKeys.of(result -> result.getKey())) + .setCoder(KvCoder.of(destinationCoder, tempTables.getCoder())) + .apply("GroupTempTablesByFinalDestination", GroupByKey.create()) + .apply("ExtractTempTables", Values.create()) .apply( ParDo.of( new UpdateSchemaDestination( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java index 5ee56e3c3331..1bc308ef032b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java @@ -41,9 +41,9 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; -import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PeriodicImpulse; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -74,7 +74,7 @@ public class FileLoadsStreamingIT { @Parameterized.Parameters public static Iterable<Object[]> data() { - return ImmutableList.of(new Object[] {true}, new Object[] {false}); + return ImmutableList.of(new Object[] {false}, new Object[] {true}); } @Parameterized.Parameter(0) @@ -108,6 +108,7 @@ public static Iterable<Object[]> data() { @BeforeClass public static void setUpTestEnvironment() throws IOException, InterruptedException { // Create one BQ dataset for all test cases.
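+ // cleanUp() drops any dataset left over from a previous run before it is recreated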
+ cleanUp(); BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID); } @@ -202,22 +203,25 @@ private String maybeCreateTable(TableSchema tableSchema, String suffix) .setTableId(tableId + suffix) .setDatasetId(BIG_QUERY_DATASET_ID) .setProjectId(PROJECT))); + } else { + tableId += "WithInputSchema"; } return String.format("%s.%s.%s", PROJECT, BIG_QUERY_DATASET_ID, tableId + suffix); } private void runStreaming(int numFileShards, boolean useCopyJobs) throws IOException, InterruptedException { + TestPipelineOptions opts = TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class); + opts.setTempLocation(opts.getTempRoot()); + Pipeline p = Pipeline.create(opts); - Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions()); - // Only run the most relevant test case on Dataflow. Testing this dimension on DirectRunner is - // sufficient + // Only run the most relevant test case on Dataflow. + // Testing this dimension on DirectRunner is sufficient if (p.getOptions().getRunner().getName().contains("DataflowRunner")) { assumeTrue("Skipping in favor of more relevant test case", useInputSchema); } List<String> fieldNamesOrigin = Arrays.asList(FIELDS); - // Shuffle the fields in the write schema to do fuzz testing on field order List<String> fieldNamesShuffled = new ArrayList<>(fieldNamesOrigin); Collections.shuffle(fieldNamesShuffled, randomGenerator); @@ -245,20 +249,24 @@ private void runStreaming(int numFileShards, boolean useCopyJobs) // build write transform Write<TableRow> write = BigQueryIO.writeTableRows() - .withCustomGcsTempLocation( - ValueProvider.StaticValueProvider.of("gs://ahmedabualsaud-wordcount/tmp")) .to(tableSpec) .withMethod(Write.Method.FILE_LOADS) - .withWriteDisposition(WriteDisposition.WRITE_APPEND) .withTriggeringFrequency(Duration.standardSeconds(10)); + if (useCopyJobs) { + write = write.withMaxBytesPerPartition(100); + } if (useInputSchema) { + // we're creating the table with the input schema write = - write.withSchema(inputSchema).withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED); + write + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE); } else { - write = write.withCreateDisposition(CreateDisposition.CREATE_NEVER); - } - if (useCopyJobs) { - write = write.withMaxBytesPerPartition(100); + // table already exists with a schema, no need to create it + write = + write + .withCreateDisposition(CreateDisposition.CREATE_NEVER) + .withWriteDisposition(WriteDisposition.WRITE_APPEND); } write = numFileShards == 0 ?
write.withAutoSharding() : write.withNumFileShards(numFileShards); @@ -337,30 +345,26 @@ public void testDynamicDestinationsWithFixedShards() throws IOException, Interru @Test public void testDynamicDestinationsWithAutoShardingAndCopyJobs() throws IOException, InterruptedException { - // This currently fails when creating tables is needed - // TODO(https://github.com/apache/beam/issues/28309) - assumeTrue(!useInputSchema); runStreamingToDynamicDestinations(0, true); } @Test public void testDynamicDestinationsWithFixedShardsAndCopyJobs() throws IOException, InterruptedException { - // This currently fails when creating tables is needed - // TODO(https://github.com/apache/beam/issues/28309) - assumeTrue(!useInputSchema); runStreamingToDynamicDestinations(6, true); } private void runStreamingToDynamicDestinations(int numFileShards, boolean useCopyJobs) throws IOException, InterruptedException { - Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions()); - + TestPipelineOptions opts = TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class); + opts.setTempLocation(opts.getTempRoot()); + Pipeline p = Pipeline.create(opts); // Only run the most relevant test cases on Dataflow. Testing this dimension on DirectRunner is // sufficient if (p.getOptions().getRunner().getName().contains("DataflowRunner")) { assumeTrue("Skipping in favor of more relevant test case", useInputSchema); } + List<String> allFields = Arrays.asList(FIELDS); List<String> subFields0 = new ArrayList<>(allFields.subList(0, 4)); List<String> subFields1 = new ArrayList<>(allFields.subList(4, 8)); @@ -375,12 +379,7 @@ private void runStreamingToDynamicDestinations(int numFileShards, boolean useCop GenerateRowFunc generateRowFunc1 = new GenerateRowFunc(subFields1); GenerateRowFunc generateRowFunc2 = new GenerateRowFunc(subFields2); - String tablePrefix = - String.format( - "%s.%s.%s", - PROJECT, - BIG_QUERY_DATASET_ID, - Iterables.get(Splitter.on('[').split(testName.getMethodName()), 0)); + String tablePrefix = table0Id.substring(0, table0Id.length() - 2); // set up and build pipeline Instant start = new Instant(0); @@ -416,15 +415,23 @@ private void runStreamingToDynamicDestinations(int numFileShards, boolean useCop return row; }) .withMethod(Write.Method.FILE_LOADS) - .withCustomGcsTempLocation( - ValueProvider.StaticValueProvider.of("gs://ahmedabualsaud-wordcount/tmp")) .withTriggeringFrequency(Duration.standardSeconds(10)); if (useCopyJobs) { write = write.withMaxBytesPerPartition(100); } - write = - write.withCreateDisposition( - useInputSchema ? CreateDisposition.CREATE_IF_NEEDED : CreateDisposition.CREATE_NEVER); + if (useInputSchema) { + // we're creating the table with the input schema + write = + write + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE); + } else { + // table already exists with a schema, no need to create it + write = + write + .withCreateDisposition(CreateDisposition.CREATE_NEVER) + .withWriteDisposition(WriteDisposition.WRITE_APPEND); + } write = numFileShards == 0 ?
write.withAutoSharding() : write.withNumFileShards(numFileShards); longs.apply("Stream loads to dynamic destinations", write); p.run().waitUntilFinish(); From 9be89f7a69ba2f47ada7383a5abc23e241a32c18 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Wed, 6 Sep 2023 16:18:59 -0400 Subject: [PATCH 05/10] with schema --- .../apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java index 1bc308ef032b..37d6444901db 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java @@ -259,6 +259,7 @@ private void runStreaming(int numFileShards, boolean useCopyJobs) // we're creating the table with the input schema write = write + .withSchema(inputSchema) .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE); } else { From e9f0a4ff5d364c57252f96c3aaeb82e907076c31 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Thu, 7 Sep 2023 13:38:59 -0400 Subject: [PATCH 06/10] less temp load jobs --- .../beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java index 37d6444901db..6ce3cc195c69 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java @@ -253,7 +253,7 @@ private void runStreaming(int numFileShards, boolean useCopyJobs) .withMethod(Write.Method.FILE_LOADS) .withTriggeringFrequency(Duration.standardSeconds(10)); if (useCopyJobs) { - write = write.withMaxBytesPerPartition(100); + write = write.withMaxBytesPerPartition(250); } if (useInputSchema) { // we're creating the table with the input schema @@ -418,7 +418,7 @@ private void runStreamingToDynamicDestinations(int numFileShards, boolean useCop .withMethod(Write.Method.FILE_LOADS) .withTriggeringFrequency(Duration.standardSeconds(10)); if (useCopyJobs) { - write = write.withMaxBytesPerPartition(100); + write = write.withMaxBytesPerPartition(150); } @@ -484,7 +484,7 @@ public Long getDestination(@Nullable ValueInSingleWindow<Long> element) { @Override public TableDestination getTable(Long destination) { - return new TableDestination(tablePrefix + "-" + destination, ""); + return new TableDestination(tablePrefix + "-" + destination, null); } @Override From 1508cd20971fbbd9eb698eee7601279caf079379 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Mon, 11 Sep 2023 17:07:05 -0400 Subject: [PATCH 07/10] nits --- .../beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java index 6ce3cc195c69..8b07dcfb2b8f 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java @@ -271,7 +271,7 @@ private void runStreaming(int numFileShards, boolean useCopyJobs) } write = numFileShards == 0 ? write.withAutoSharding() : write.withNumFileShards(numFileShards); - rows.apply("Stream loads to dynamic BigQuery destinations", write); + rows.apply("Stream loads to BigQuery", write); p.run().waitUntilFinish(); List<TableRow> expectedRows = new ArrayList<>(); @@ -283,7 +283,7 @@ private void runStreaming(int numFileShards, boolean useCopyJobs) checkRowCompleteness(tableSpec, inputSchema, expectedRows); } - // Check the expected number of rows reached the table. + // Check that the expected rows reached the table. private static void checkRowCompleteness( String tableSpec, TableSchema schema, List<TableRow> expectedRows) throws IOException, InterruptedException { @@ -323,13 +323,11 @@ public void testLoadWithFixedShards() throws IOException, InterruptedException { @Test public void testWithAutoShardingAndCopyJobs() throws IOException, InterruptedException { - // Tests the copy jobs (aka multiple partitions) route runStreaming(0, true); } @Test public void testWithFixedShardsAndCopyJobs() throws IOException, InterruptedException { - // Tests the copy jobs (aka multiple partitions) route runStreaming(5, true); } From bf25ea1454f5a91c5bd9e460887b2cf79561589b Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Mon, 11 Sep 2023 17:10:03 -0400 Subject: [PATCH 08/10] disable for runnerV2 until pane index is fixed --- runners/google-cloud-dataflow-java/build.gradle | 3 +++ 1 file changed, 3 insertions(+) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index f6e2b9b147c5..2acc30455e22 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -612,6 +612,9 @@ task googleCloudPlatformRunnerV2IntegrationTest(type: Test) { exclude '**/FhirIOLROIT.class' exclude '**/FhirIOSearchIT.class' exclude '**/FhirIOPatientEverythingIT.class' + // failing due to pane index not incrementing after Reshuffle: + // https://github.com/apache/beam/issues/28219 + exclude '**/FileLoadsStreamingIT.class' maxParallelForks 4 classpath = configurations.googleCloudPlatformIntegrationTest From 315327b0366116f70d9be1f6eaf10876c471d984 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Tue, 12 Sep 2023 13:58:41 -0400 Subject: [PATCH 09/10] reduce number of tests --- .../io/gcp/bigquery/FileLoadsStreamingIT.java | 23 +------------------ 1 file changed, 1 insertion(+), 22 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java index 8b07dcfb2b8f..b5f70723ad5f 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java @@ -311,31 +311,16 @@ private static void checkRowCompleteness( "Checking there is no duplication", expectedBeamRows.size(), actualBeamRows.size()); } - @Test
- public void testLoadWithAutoSharding() throws IOException, InterruptedException { - runStreaming(0, false); - } @Test public void testLoadWithFixedShards() throws IOException, InterruptedException { runStreaming(5, false); } @Test - public void testWithAutoShardingAndCopyJobs() throws IOException, InterruptedException { + public void testLoadWithAutoShardingAndCopyJobs() throws IOException, InterruptedException { runStreaming(0, true); } - @Test - public void testWithFixedShardsAndCopyJobs() throws IOException, InterruptedException { - runStreaming(5, true); - } - - @Test - public void testDynamicDestinationsWithAutoSharding() throws IOException, InterruptedException { - runStreamingToDynamicDestinations(0, false); - } - @Test public void testDynamicDestinationsWithFixedShards() throws IOException, InterruptedException { runStreamingToDynamicDestinations(6, false); } @@ -347,12 +332,6 @@ public void testDynamicDestinationsWithAutoShardingAndCopyJobs() runStreamingToDynamicDestinations(0, true); } - @Test - public void testDynamicDestinationsWithFixedShardsAndCopyJobs() - throws IOException, InterruptedException { - runStreamingToDynamicDestinations(6, true); - } - private void runStreamingToDynamicDestinations(int numFileShards, boolean useCopyJobs) throws IOException, InterruptedException { TestPipelineOptions opts = TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class); From 610ccb1d97e7f3e652d6bff1d93b24375848297c Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Tue, 12 Sep 2023 14:10:15 -0400 Subject: [PATCH 10/10] add streaming engine experiment for legacy dataflow runner --- .../beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java index b5f70723ad5f..012afed6fb43 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; +import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; @@ -219,6 +220,9 @@ private void runStreaming(int numFileShards, boolean useCopyJobs) // Testing this dimension on DirectRunner is sufficient if (p.getOptions().getRunner().getName().contains("DataflowRunner")) { assumeTrue("Skipping in favor of more relevant test case", useInputSchema); + // Need to manually enable streaming engine for legacy dataflow runner + ExperimentalOptions.addExperiment( + p.getOptions().as(ExperimentalOptions.class), GcpOptions.STREAMING_ENGINE_EXPERIMENT); } List<String> fieldNamesOrigin = Arrays.asList(FIELDS); @@ -341,6 +345,9 @@ private void runStreamingToDynamicDestinations(int numFileShards, boolean useCop // sufficient if (p.getOptions().getRunner().getName().contains("DataflowRunner")) { assumeTrue("Skipping in favor of more relevant test case", useInputSchema); + // Need to manually enable streaming engine for legacy dataflow runner +
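// (GcpOptions.STREAMING_ENGINE_EXPERIMENT is the "enable_streaming_engine" experiment flag) +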
ExperimentalOptions.addExperiment( + p.getOptions().as(ExperimentalOptions.class), GcpOptions.STREAMING_ENGINE_EXPERIMENT); } List<String> allFields = Arrays.asList(FIELDS);
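For orientation, the write configuration this series converges on can be reduced to the following sketch. It is illustrative only and not part of any patch above; it assumes the p, rows, tableSpec, and inputSchema values set up in runStreaming:

    // Streaming FILE_LOADS requires a triggering frequency; each firing flushes the
    // buffered files through BigQuery load jobs. withMaxBytesPerPartition() can force
    // multiple partitions per destination, which routes through temp tables and copy jobs.
    Write<TableRow> write =
        BigQueryIO.writeTableRows()
            .to(tableSpec)
            .withMethod(Write.Method.FILE_LOADS)
            .withTriggeringFrequency(Duration.standardSeconds(10))
            .withSchema(inputSchema)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
            .withAutoSharding(); // or .withNumFileShards(5) for fixed sharding
    rows.apply("Stream loads to BigQuery", write);
    p.run().waitUntilFinish();

The BatchLoads change in patch 04 keys the temp tables by their final destination before the GroupByKey, so each destination gets its own pane sequence; that is what fixes copy jobs for dynamic destinations.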