From 6c6ddb2b6902002e77e3abcb5ca396e6bf5549a3 Mon Sep 17 00:00:00 2001
From: Yi Hu
Date: Tue, 5 Sep 2023 11:54:27 -0400
Subject: [PATCH 1/5] Add randomness to job id

* Introduce PeriodicImpulse.stopAfter()
* Use it in streaming FILE_LOAD integration test
* Fix unit test assert to consider randomness in jobId

---
 .../beam/runners/dataflow/DataflowRunner.java |   2 +-
 .../beam/sdk/transforms/PeriodicImpulse.java  |  95 ++++++++++++---
 .../io/google-cloud-platform/build.gradle     |   2 +-
 .../gcp/bigquery/BigQueryResourceNaming.java  |  18 +--
 .../gcp/bigquery/UpdateSchemaDestination.java |   5 +-
 .../beam/sdk/io/gcp/bigquery/WriteRename.java |  10 +-
 .../beam/sdk/io/gcp/bigquery/WriteTables.java |   2 +-
 ...ageWriteIT.java => BigQueryIOWriteIT.java} | 110 +++++++++++++-----
 .../io/gcp/bigquery/BigQueryIOWriteTest.java  |  22 +++-
 9 files changed, 206 insertions(+), 60 deletions(-)
 rename sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/{BigQueryIOStorageWriteIT.java => BigQueryIOWriteIT.java} (56%)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 02f6f9acd7a6..17aea34045ff 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1755,7 +1755,7 @@ void maybeRecordPCollectionWithAutoSharding(PCollection<?> pcol) {
         options.isEnableStreamingEngine(),
         "Runner determined sharding not available in Dataflow for GroupIntoBatches for"
             + " non-Streaming-Engine jobs. In order to use runner determined sharding, please use"
-            + " --streaming --enable_streaming_engine");
+            + " --streaming --experiments=enable_streaming_engine");
     pCollectionsPreservedKeys.add(pcol);
     pcollectionsRequiringAutoSharding.add(pcol);
   }
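Context for the message fix above: `enable_streaming_engine` is not a standalone flag, it has to be passed through `--experiments`. A minimal launch sketch with the corrected flags; the launcher scaffolding is illustrative, only the two flags come from this patch:

```java
// Hypothetical launcher: only the two Dataflow flags are taken from the diff above.
String[] args = {
  "--runner=DataflowRunner",
  "--streaming",
  "--experiments=enable_streaming_engine"
};
DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
```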
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java
index 3679c3eb10f5..dc8e79bd895c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java
@@ -17,11 +17,14 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -34,28 +37,57 @@
  */
 public class PeriodicImpulse extends PTransform<PBegin, PCollection<Instant>> {
 
-  Instant startTimestamp = Instant.now();
-  Instant stopTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
-  Duration fireInterval = Duration.standardMinutes(1);
+  Instant startTimestamp;
+  Instant stopTimestamp;
+  @Nullable Duration stopDuration;
+  Duration fireInterval;
   boolean applyWindowing = false;
   boolean catchUpToNow = true;
 
-  private PeriodicImpulse() {}
+  private PeriodicImpulse() {
+    this.startTimestamp = Instant.now();
+    this.stopTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    this.fireInterval = Duration.standardMinutes(1);
+  }
 
   public static PeriodicImpulse create() {
     return new PeriodicImpulse();
   }
 
+  /**
+   * Assign a timestamp when the pipeline starts to produce data.
+   *
+   * <p>Cannot be used along with {@link #stopAfter}.
+   */
   public PeriodicImpulse startAt(Instant startTime) {
+    checkArgument(stopDuration == null, "startAt and stopAfter cannot be set at the same time");
     this.startTimestamp = startTime;
     return this;
   }
 
+  /**
+   * Assign a timestamp when the pipeline stops producing data.
+   *
+   * <p>Cannot be used along with {@link #stopAfter}.
+   */
   public PeriodicImpulse stopAt(Instant stopTime) {
+    checkArgument(stopDuration == null, "stopAt and stopAfter cannot be set at the same time");
     this.stopTimestamp = stopTime;
     return this;
   }
 
+  /**
+   * For internal use only; no backwards-compatibility guarantees.
+   *
+   * <p>Assign a time interval at which the pipeline produces data. This is different from setting
+   * {@link #startAt} and {@link #stopAt}, as the first timestamp is determined at run time
+   * (pipeline starts processing).
+   */
+  public PeriodicImpulse stopAfter(Duration duration) {
+    this.stopDuration = duration;
+    return this;
+  }
+
   public PeriodicImpulse withInterval(Duration interval) {
     this.fireInterval = interval;
     return this;
@@ -67,7 +99,9 @@ public PeriodicImpulse applyWindowing() {
   }
 
   /**
-   * The default behavior is that PeriodicImpulse emits all instants until Instant.now(), then
+   * For internal use only; no backwards-compatibility guarantees.
+   *
+   * <p>The default behavior is that PeriodicImpulse emits all instants until Instant.now(), then
    * starts firing at the specified interval. If this is set to false, the PeriodicImpulse will
    * perform the interval wait before firing each instant.
    */
@@ -78,20 +112,51 @@ public PeriodicImpulse catchUpToNow(boolean catchUpToNow) {
 
   @Override
   public PCollection<Instant> expand(PBegin input) {
-    PCollection<Instant> result =
-        input
-            .apply(
-                Create.of(
-                    new PeriodicSequence.SequenceDefinition(
-                        startTimestamp, stopTimestamp, fireInterval, catchUpToNow)))
-            .apply(PeriodicSequence.create());
+    PCollection<PeriodicSequence.SequenceDefinition> seqDef;
+    if (stopDuration != null) {
+      // nonnull guaranteed
+      Duration d = stopDuration;
+      seqDef =
+          input
+              .apply(Impulse.create())
+              .apply(ParDo.of(new RuntimeSequenceFn(d, fireInterval, catchUpToNow)));
+    } else {
+      seqDef =
+          input.apply(
+              Create.of(
+                  new PeriodicSequence.SequenceDefinition(
+                      startTimestamp, stopTimestamp, fireInterval, catchUpToNow)));
+    }
+    PCollection<Instant> result = seqDef.apply(PeriodicSequence.create());
     if (this.applyWindowing) {
       result =
-          result.apply(
-              Window.<Instant>into(FixedWindows.of(Duration.millis(fireInterval.getMillis()))));
+          result.apply(Window.<Instant>into(FixedWindows.of(Duration.millis(fireInterval.getMillis()))));
     }
-
     return result;
   }
+
+  /**
+   * A DoFn that generates a SequenceDefinition at run time. This enables setting the first
+   * element's timestamp when the pipeline starts processing data.
+   */
+  private static class RuntimeSequenceFn extends DoFn<byte[], PeriodicSequence.SequenceDefinition> {
+    Duration stopDuration;
+    Duration fireInterval;
+    boolean catchUpToNow;
+
+    RuntimeSequenceFn(Duration stopDuration, Duration fireInterval, boolean catchUpToNow) {
+      this.stopDuration = stopDuration;
+      this.fireInterval = fireInterval;
+      this.catchUpToNow = catchUpToNow;
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c) {
+      Instant now = Instant.now();
+      c.output(
+          new PeriodicSequence.SequenceDefinition(
+              now, now.plus(stopDuration), fireInterval, catchUpToNow));
+    }
+  }
 }
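How the new API is meant to compose, as a minimal sketch; the `Pipeline` scaffolding is assumed, while `stopAfter` and `withInterval` come from the diff above. Unlike `stopAt`, the stop time here is resolved when the pipeline starts processing, not when it is submitted:

```java
// Fires every 100 ms and stops about ten seconds after processing begins.
Pipeline p = Pipeline.create(options);
PCollection<Instant> ticks =
    p.apply(
        "Ticks",
        PeriodicImpulse.create()
            .stopAfter(Duration.standardSeconds(10))
            .withInterval(Duration.millis(100)));
```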
diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle
index 560b27aae162..5a26d9be5a4a 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -203,7 +203,7 @@ task integrationTest(type: Test, dependsOn: processTestResources) {
   exclude '**/BigQueryIOStorageQueryIT.class'
   exclude '**/BigQueryIOStorageReadIT.class'
   exclude '**/BigQueryIOStorageReadTableRowIT.class'
-  exclude '**/BigQueryIOStorageWriteIT.class'
+  exclude '**/BigQueryIOWriteIT.class'
   exclude '**/BigQueryToTableIT.class'
   exclude '**/BigQueryIOJsonTest.class'
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java
index df5e641847b6..257fd72f269b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.randomUUIDString;
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BIGQUERY_JOB_TEMPLATE;
 
 import com.google.api.services.bigquery.model.TableReference;
@@ -47,21 +48,22 @@ class BigQueryResourceNaming {
    * @param prefix A prefix generated in {@link BigQueryResourceNaming::createJobIdPrefix}.
    * @param tableDestination A descriptor of the destination table.
    * @param partition A partition number in the destination table.
-   * @param index
-   * @return
+   * @return a generated jobId.
    */
   static String createJobIdWithDestination(
-      String prefix, TableDestination tableDestination, int partition, long index) {
+      String prefix, TableDestination tableDestination, int partition) {
     // Job ID must be different for each partition of each table.
     String destinationHash =
-        Hashing.murmur3_128().hashUnencodedChars(tableDestination.toString()).toString();
-    String jobId = String.format("%s_%s", prefix, destinationHash);
+        Hashing.murmur3_128()
+            .hashUnencodedChars(tableDestination.toString())
+            .toString()
+            .substring(0, 16);
+    // add randomness to jobId to avoid conflict
+    String jobId =
+        String.format("%s_%s_%s", prefix, destinationHash, randomUUIDString().substring(0, 16));
     if (partition >= 0) {
       jobId += String.format("_%05d", partition);
     }
-    if (index >= 0) {
-      jobId += String.format("_%05d", index);
-    }
     return jobId;
   }
 
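Under the new scheme a load job ID is the caller's prefix, 16 hex characters of the destination hash, 16 characters of a random UUID, and an optional zero-padded partition suffix. A purely hypothetical example of the shape, mirroring the logic above (all values invented for illustration):

```java
// Hypothetical values, mirroring the composition in the diff above.
String prefix = "beam_load_mypipeline";       // from createJobIdPrefix
String destinationHash = "8e2f01c4a77d93b0";  // murmur3_128(destination), first 16 chars
String random = "c1d2e3f4a5b6c7d8";           // randomUUIDString(), first 16 chars
String jobId = String.format("%s_%s_%s_%05d", prefix, destinationHash, random, 2);
// => beam_load_mypipeline_8e2f01c4a77d93b0_c1d2e3f4a5b6c7d8_00002
// The random fragment keeps retried or replayed panes from colliding.
```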
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java
index 51e61fe41953..f6536ec7f91f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java
@@ -161,10 +161,7 @@ public void processElement(
         TableReference tableReference = tableDestination.getTableReference();
         String jobIdPrefix =
             BigQueryResourceNaming.createJobIdWithDestination(
-                context.sideInput(zeroLoadJobIdPrefixView),
-                tableDestination,
-                1,
-                context.pane().getIndex());
+                context.sideInput(zeroLoadJobIdPrefixView), tableDestination, 1);
         BigQueryHelpers.PendingJob updateSchemaDestinationJob =
             startZeroLoadJob(
                 getJobService(context.getPipelineOptions().as(BigQueryOptions.class)),
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index 9d798b397070..01b70bd0bba7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -206,7 +206,15 @@ private PendingJobData startWriteRename(
 
     // Make sure each destination table gets a unique job id.
     String jobIdPrefix =
         BigQueryResourceNaming.createJobIdWithDestination(
-            c.sideInput(jobIdToken), finalTableDestination, -1, c.pane().getIndex());
+            c.sideInput(jobIdToken), finalTableDestination, -1);
+
+    if (isFirstPane) {
+      LOG.info("Setup write disposition {}, create disposition {} for first pane BigQuery job {}",
+          writeDisposition, createDisposition, jobIdPrefix);
+    } else {
+      LOG.debug("Setup write disposition {}, create disposition {} for BigQuery job {}",
+          writeDisposition, createDisposition, jobIdPrefix);
+    }
 
     BigQueryHelpers.PendingJob retryJob =
         startCopy(
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index c6a7d32e2486..1d6c1b8cff08 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -251,7 +251,7 @@ public void processElement(
         List<String> partitionFiles = Lists.newArrayList(element.getValue().getFilenames());
         String jobIdPrefix =
             BigQueryResourceNaming.createJobIdWithDestination(
-                c.sideInput(loadJobIdPrefixView), tableDestination, partition, c.pane().getIndex());
+                c.sideInput(loadJobIdPrefixView), tableDestination, partition);
 
         if (tempTable) {
           if (tempDataset != null) {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteIT.java
similarity index 56%
rename from sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java
rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteIT.java
index 81de67f38502..d9d45c10ee1a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteIT.java
@@ -30,12 +30,22 @@
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.PeriodicImpulse;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -46,11 +56,12 @@
  * written row count. 
 */
 @RunWith(JUnit4.class)
-public class BigQueryIOStorageWriteIT {
+public class BigQueryIOWriteIT {
 
   private enum WriteMode {
     EXACT_ONCE,
-    AT_LEAST_ONCE
+    AT_LEAST_ONCE,
+    FILE_LOAD
   }
 
   private String project;
@@ -62,14 +73,25 @@ private enum WriteMode {
 
   private void setUpTestEnvironment(WriteMode writeMode) {
     PipelineOptionsFactory.register(BigQueryOptions.class);
+
     bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
     bqOptions.setProject(TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject());
-    bqOptions.setUseStorageWriteApi(true);
-    if (writeMode == WriteMode.AT_LEAST_ONCE) {
-      bqOptions.setUseStorageWriteApiAtLeastOnce(true);
+    if (writeMode == WriteMode.EXACT_ONCE || writeMode == WriteMode.AT_LEAST_ONCE) {
+      bqOptions.setUseStorageWriteApi(true);
+      if (writeMode == WriteMode.AT_LEAST_ONCE) {
+        bqOptions.setUseStorageWriteApiAtLeastOnce(true);
+      }
+      bqOptions.setNumStorageWriteApiStreams(2);
+      bqOptions.setStorageWriteApiTriggeringFrequencySec(1);
+    } else {
+      // if sharding is not explicitly set, autosharding requires streaming engine
+      ExperimentalOptions.addExperiment(
+          bqOptions.as(ExperimentalOptions.class), "enable_streaming_engine");
+      // FILE_LOAD needs tempLocation set
+      if (Strings.isNullOrEmpty(bqOptions.getTempLocation())) {
+        bqOptions.setTempLocation(bqOptions.as(TestPipelineOptions.class).getTempRoot());
+      }
     }
-    bqOptions.setNumStorageWriteApiStreams(2);
-    bqOptions.setStorageWriteApiTriggeringFrequencySec(1);
     project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
   }
 
@@ -81,15 +103,35 @@ public void processElement(ProcessContext c) {
     }
   }
 
-  private GenerateSequence stream(int rowCount) {
-    int timestampIntervalInMilliseconds = 10;
-    return GenerateSequence.from(0)
-        .to(rowCount)
-        .withRate(1, Duration.millis(timestampIntervalInMilliseconds));
+  static class UnboundedStream extends PTransform<PBegin, PCollection<Long>> {
+
+    private final int rowCount;
+
+    public UnboundedStream(int rowCount) {
+      this.rowCount = rowCount;
+    }
+
+    @Override
+    public PCollection<Long> expand(PBegin input) {
+      int timestampIntervalInMillis = 10;
+      PeriodicImpulse impulse =
+          PeriodicImpulse.create()
+              .stopAfter(Duration.millis((long) timestampIntervalInMillis * rowCount - 1))
+              .withInterval(Duration.millis(timestampIntervalInMillis));
+      return input
+          .apply(impulse)
+          .apply(
+              MapElements.via(
+                  new SimpleFunction<Instant, Long>() {
+                    @Override
+                    public Long apply(Instant input) {
+                      return input.getMillis();
+                    }
+                  }));
+    }
   }
 
-  private void runBigQueryIOStorageWritePipeline(
-      int rowCount, WriteMode writeMode, Boolean isStreaming) {
+  private void runBigQueryIOWritePipeline(int rowCount, WriteMode writeMode, Boolean isStreaming) {
     String tableName =
         isStreaming
             ? 
TABLE_PREFIX + "streaming_" + System.currentTimeMillis() @@ -101,16 +143,26 @@ private void runBigQueryIOStorageWritePipeline( new TableFieldSchema().setName("number").setType("INTEGER"), new TableFieldSchema().setName("str").setType("STRING"))); + BigQueryIO.Write writeTransform = + BigQueryIO.writeTableRows() + .to(String.format("%s:%s.%s", project, DATASET_ID, tableName)) + .withSchema(schema) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND); + + if (writeMode == WriteMode.FILE_LOAD && isStreaming) { + writeTransform = + writeTransform + .withMethod(BigQueryIO.Write.Method.FILE_LOADS) + .withTriggeringFrequency(Duration.standardSeconds(10)); + } + Pipeline p = Pipeline.create(bqOptions); - p.apply("Input", isStreaming ? stream(rowCount) : GenerateSequence.from(0).to(rowCount)) + p.apply( + "Input", + isStreaming ? new UnboundedStream(rowCount) : GenerateSequence.from(0).to(rowCount)) .apply("GenerateMessage", ParDo.of(new FillRowFn())) - .apply( - "WriteToBQ", - BigQueryIO.writeTableRows() - .to(String.format("%s:%s.%s", project, DATASET_ID, tableName)) - .withSchema(schema) - .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) - .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); + .apply("WriteToBQ", writeTransform); p.run().waitUntilFinish(); String testQuery = String.format("SELECT count(*) FROM [%s.%s];", DATASET_ID, tableName); try { @@ -130,24 +182,30 @@ private void runBigQueryIOStorageWritePipeline( @Test public void testBigQueryStorageWrite3MProto() { setUpTestEnvironment(WriteMode.EXACT_ONCE); - runBigQueryIOStorageWritePipeline(3_000_000, WriteMode.EXACT_ONCE, false); + runBigQueryIOWritePipeline(3_000_000, WriteMode.EXACT_ONCE, false); } @Test public void testBigQueryStorageWrite3MProtoALO() { setUpTestEnvironment(WriteMode.AT_LEAST_ONCE); - runBigQueryIOStorageWritePipeline(3_000_000, WriteMode.AT_LEAST_ONCE, false); + runBigQueryIOWritePipeline(3_000_000, WriteMode.AT_LEAST_ONCE, false); } @Test public void testBigQueryStorageWrite3KProtoStreaming() { setUpTestEnvironment(WriteMode.EXACT_ONCE); - runBigQueryIOStorageWritePipeline(3000, WriteMode.EXACT_ONCE, true); + runBigQueryIOWritePipeline(3000, WriteMode.EXACT_ONCE, true); } @Test public void testBigQueryStorageWrite3KProtoALOStreaming() { setUpTestEnvironment(WriteMode.AT_LEAST_ONCE); - runBigQueryIOStorageWritePipeline(3000, WriteMode.AT_LEAST_ONCE, true); + runBigQueryIOWritePipeline(3000, WriteMode.AT_LEAST_ONCE, true); + } + + @Test + public void testBigQueryFileloadWrite3KJson() { + setUpTestEnvironment(WriteMode.FILE_LOAD); + runBigQueryIOWritePipeline(3000, WriteMode.FILE_LOAD, true); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 720419f2227a..077e203c9478 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -2444,6 +2444,8 @@ public TableSchema getSchema(String destination) { @Test public void testWriteTables() throws Exception { + assumeTrue(!useStorageApi); // test in FILE_LOAD mode + long numTables = 3; long numPartitions = 3; long numFilesPerPartition = 10; @@ -2456,7 +2458,7 @@ 
public void testWriteTables() throws Exception {
     TableDestination tableDestination = new TableDestination(tableName, tableName);
     for (int j = 0; j < numPartitions; ++j) {
       String tempTableId =
-          BigQueryResourceNaming.createJobIdWithDestination(jobIdToken, tableDestination, j, 0);
+          BigQueryResourceNaming.createJobIdWithDestination(jobIdToken, tableDestination, j);
       List<String> filesPerPartition = Lists.newArrayList();
       for (int k = 0; k < numFilesPerPartition; ++k) {
         String filename =
@@ -2541,10 +2543,12 @@ public void processElement(
             Iterable<String> tableNames =
                 StreamSupport.stream(entry.getValue().spliterator(), false)
                     .map(Result::getTableName)
+                    .map(BigQueryIOWriteTest::removeUuidInJobIdNaming)
                     .collect(Collectors.toList());
-            @SuppressWarnings("unchecked")
             String[] expectedValues =
-                Iterables.toArray(expectedTempTables.get(entry.getKey()), String.class);
+                expectedTempTables.get(entry.getKey()).stream()
+                    .map(BigQueryIOWriteTest::removeUuidInJobIdNaming)
+                    .toArray(String[]::new);
             assertThat(tableNames, containsInAnyOrder(expectedValues));
           }
           return null;
@@ -2552,6 +2556,18 @@ public void processElement(
     p.run();
   }
 
+  private static String removeUuidInJobIdNaming(String jobId) {
+    final int uuidLength = 16;
+
+    int splitIdx = jobId.lastIndexOf("_");
+    if (splitIdx > jobId.length() - (uuidLength + 1)) { // has partition index
+      return jobId.substring(0, jobId.lastIndexOf("_", splitIdx - 1))
+          + jobId.substring(splitIdx + 1);
+    } else {
+      return jobId.substring(0, jobId.length() - (uuidLength + 1));
+    }
+  }
+
   @Test
   public void testRemoveTemporaryFiles() throws Exception {
     int numFiles = 10;

From 6aadd9fda7a7063fe654d7b3882e38f03178b894 Mon Sep 17 00:00:00 2001
From: Yi Hu
Date: Tue, 5 Sep 2023 17:20:56 -0400
Subject: [PATCH 2/5] Alternate approach - store pane index upstream

---
 .../beam/sdk/transforms/PeriodicImpulse.java  |  3 +
 .../beam/sdk/transforms/PeriodicSequence.java |  3 +
 .../gcp/bigquery/BigQueryResourceNaming.java  | 18 +++---
 .../gcp/bigquery/UpdateSchemaDestination.java |  5 +-
 .../sdk/io/gcp/bigquery/WritePartition.java   | 13 +++-
 .../beam/sdk/io/gcp/bigquery/WriteRename.java | 16 +++--
 .../beam/sdk/io/gcp/bigquery/WriteTables.java | 62 ++++++++++---------
 .../io/gcp/bigquery/BigQueryIOWriteTest.java  | 30 +++------
 8 files changed, 82 insertions(+), 68 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java
index dc8e79bd895c..db4f141ee624 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java
@@ -19,6 +19,7 @@
 
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -83,6 +84,7 @@ public PeriodicImpulse stopAt(Instant stopTime) {
    * {@link #startAt} and {@link #stopAt}, as the first timestamp is determined at run time
    * (pipeline starts processing).
    */
+  @Internal
   public PeriodicImpulse stopAfter(Duration duration) {
     this.stopDuration = duration;
     return this;
@@ -105,6 +107,7 @@ public PeriodicImpulse applyWindowing() {
 
   /**
    * For internal use only; no backwards-compatibility guarantees.
    *
    * <p>The default behavior is that PeriodicImpulse emits all instants until Instant.now(), then
   * starts firing at the specified interval. 
If this is set to false, the PeriodicImpulse will * perform the interval wait before firing each instant. */ + @Internal public PeriodicImpulse catchUpToNow(boolean catchUpToNow) { this.catchUpToNow = catchUpToNow; return this; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java index b3cd2afde697..12cbecd04b02 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java @@ -22,6 +22,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import java.util.Objects; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.schemas.JavaFieldSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; @@ -67,6 +68,8 @@ public SequenceDefinition(Instant first, Instant last, Duration duration) { this.catchUpToNow = true; } + /** catchUpToNow is experimental; no backwards-compatibility guarantees. */ + @Internal public SequenceDefinition( Instant first, Instant last, Duration duration, boolean catchUpToNow) { this.first = first; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java index 257fd72f269b..df5e641847b6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; -import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.randomUUIDString; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BIGQUERY_JOB_TEMPLATE; import com.google.api.services.bigquery.model.TableReference; @@ -48,22 +47,21 @@ class BigQueryResourceNaming { * @param prefix A prefix generated in {@link BigQueryResourceNaming::createJobIdPrefix}. * @param tableDestination A descriptor of the destination table. * @param partition A partition number in the destination table. - * @return a generated jobId. + * @param index + * @return */ static String createJobIdWithDestination( - String prefix, TableDestination tableDestination, int partition) { + String prefix, TableDestination tableDestination, int partition, long index) { // Job ID must be different for each partition of each table. 
String destinationHash =
-        Hashing.murmur3_128()
-            .hashUnencodedChars(tableDestination.toString())
-            .toString()
-            .substring(0, 16);
-    // add randomness to jobId to avoid conflict
-    String jobId =
-        String.format("%s_%s_%s", prefix, destinationHash, randomUUIDString().substring(0, 16));
+        Hashing.murmur3_128().hashUnencodedChars(tableDestination.toString()).toString();
+    String jobId = String.format("%s_%s", prefix, destinationHash);
     if (partition >= 0) {
       jobId += String.format("_%05d", partition);
     }
+    if (index >= 0) {
+      jobId += String.format("_%05d", index);
+    }
     return jobId;
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java
index f6536ec7f91f..51e61fe41953 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java
@@ -161,7 +161,10 @@ public void processElement(
         TableReference tableReference = tableDestination.getTableReference();
         String jobIdPrefix =
             BigQueryResourceNaming.createJobIdWithDestination(
-                context.sideInput(zeroLoadJobIdPrefixView), tableDestination, 1);
+                context.sideInput(zeroLoadJobIdPrefixView),
+                tableDestination,
+                1,
+                context.pane().getIndex());
         BigQueryHelpers.PendingJob updateSchemaDestinationJob =
             startZeroLoadJob(
                 getJobService(context.getPipelineOptions().as(BigQueryOptions.class)),
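The reverts above restore deterministic, index-based job IDs, which only stay unique if the pane index is still known when the ID is built. Since pane info does not survive a shuffle, this commit captures it upstream and carries it inside the element. Roughly the pattern applied to WritePartition below (field names taken from the diff, the surrounding DoFn is assumed):

```java
// Record pane metadata while c.pane() is still meaningful, i.e. before the
// data is reshuffled, and ship it with the element itself.
@ProcessElement
public void process(ProcessContext c) {
  c.output(
      new AutoValue_WritePartition_Result(
          partitionData.getFilenames(), c.pane().isFirst(), c.pane().getIndex()));
}
```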
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
index 31036c58f9dc..be732550d875 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
@@ -28,6 +28,7 @@
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.KV;
@@ -48,25 +49,31 @@ class WritePartition
   @AutoValue
   abstract static class Result {
     public abstract List<String> getFilenames();
-
+    // Downstream operations may rely on pane info which will get lost after a ReShuffle
     abstract Boolean isFirstPane();
+
+    abstract Long getPaneIndex();
   }
 
   static class ResultCoder extends AtomicCoder<Result> {
     private static final Coder<List<String>> FILENAMES_CODER = ListCoder.of(StringUtf8Coder.of());
     private static final Coder<Boolean> FIRST_PANE_CODER = BooleanCoder.of();
+    private static final Coder<Long> PANE_INDEX_CODER = VarLongCoder.of();
     static final ResultCoder INSTANCE = new ResultCoder();
 
     @Override
     public void encode(Result value, OutputStream outStream) throws IOException {
       FILENAMES_CODER.encode(value.getFilenames(), outStream);
       FIRST_PANE_CODER.encode(value.isFirstPane(), outStream);
+      PANE_INDEX_CODER.encode(value.getPaneIndex(), outStream);
     }
 
     @Override
     public Result decode(InputStream inStream) throws IOException {
       return new AutoValue_WritePartition_Result(
-          FILENAMES_CODER.decode(inStream), FIRST_PANE_CODER.decode(inStream));
+          FILENAMES_CODER.decode(inStream),
+          FIRST_PANE_CODER.decode(inStream),
+          PANE_INDEX_CODER.decode(inStream));
     }
   }
 
@@ -234,7 +241,7 @@ public void processElement(ProcessContext c) throws Exception {
             KV.of(
                 ShardedKey.of(destination, i + 1),
                 new AutoValue_WritePartition_Result(
-                    partitionData.getFilenames(), c.pane().isFirst())));
+                    partitionData.getFilenames(), c.pane().isFirst(), c.pane().getIndex())));
       }
     }
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index 01b70bd0bba7..4add4228bdc5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -206,14 +206,20 @@ private PendingJobData startWriteRename(
     // Make sure each destination table gets a unique job id.
     String jobIdPrefix =
         BigQueryResourceNaming.createJobIdWithDestination(
-            c.sideInput(jobIdToken), finalTableDestination, -1);
+            c.sideInput(jobIdToken), finalTableDestination, -1, c.pane().getIndex());
 
     if (isFirstPane) {
-      LOG.info("Setup write disposition {}, create disposition {} for first pane BigQuery job {}",
-          writeDisposition, createDisposition, jobIdPrefix);
+      LOG.info(
+          "Setup write disposition {}, create disposition {} for first pane BigQuery job {}",
+          writeDisposition,
+          createDisposition,
+          jobIdPrefix);
     } else {
-      LOG.debug("Setup write disposition {}, create disposition {} for BigQuery job {}",
-          writeDisposition, createDisposition, jobIdPrefix);
+      LOG.debug(
+          "Setup write disposition {}, create disposition {} for BigQuery job {}",
+          writeDisposition,
+          createDisposition,
+          jobIdPrefix);
     }
 
     BigQueryHelpers.PendingJob retryJob =
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 1d6c1b8cff08..94d89cc26821 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -36,9 +36,9 @@
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BooleanCoder;
-import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -75,10 +75,8 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
-import org.checkerframework.checker.initialization.qual.Initialized;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
-import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -101,27 +99,28 @@ class WriteTables
   @AutoValue
   abstract static class Result {
     abstract String getTableName();
-
+    // Downstream operations may rely on pane info which will get lost after a ReShuffle
     abstract Boolean isFirstPane();
+
+    abstract Long getPaneIndex();
   }
 
   static class ResultCoder extends AtomicCoder<Result> {
     static final ResultCoder INSTANCE = new ResultCoder();
 
     @Override
-    public void encode(Result value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream)
-        throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull
-            @Initialized IOException {
+    public void encode(Result value, OutputStream outStream) throws IOException {
       StringUtf8Coder.of().encode(value.getTableName(), outStream);
       BooleanCoder.of().encode(value.isFirstPane(), outStream);
+      VarLongCoder.of().encode(value.getPaneIndex(), outStream);
     }
 
     @Override
-    public Result decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream)
-        throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull
-            @Initialized IOException {
+    public Result decode(InputStream inStream) throws IOException {
       return new AutoValue_WriteTables_Result(
-          StringUtf8Coder.of().decode(inStream), BooleanCoder.of().decode(inStream));
+          StringUtf8Coder.of().decode(inStream),
+          BooleanCoder.of().decode(inStream),
+          VarLongCoder.of().decode(inStream));
     }
   }
 
@@ -156,27 +155,36 @@ private class WriteTablesDoFn
     private class PendingJobData {
       final BoundedWindow window;
       final BigQueryHelpers.PendingJob retryJob;
-      final List<String> partitionFiles;
+      final WritePartition.Result partitionResult;
       final TableDestination tableDestination;
       final TableReference tableReference;
       final DestinationT destinationT;
-      final boolean isFirstPane;
 
       public PendingJobData(
           BoundedWindow window,
          BigQueryHelpers.PendingJob retryJob,
-          List<String> partitionFiles,
+          WritePartition.Result partitionResult,
          TableDestination tableDestination,
          TableReference tableReference,
-          DestinationT destinationT,
-          boolean isFirstPane) {
+          DestinationT destinationT) {
        this.window = window;
        this.retryJob = retryJob;
-        this.partitionFiles = partitionFiles;
+        this.partitionResult = partitionResult;
        this.tableDestination = tableDestination;
        this.tableReference = tableReference;
        this.destinationT = destinationT;
-        this.isFirstPane = isFirstPane;
+      }
+
+      public List<String> partitionFiles() {
+        return partitionResult.getFilenames();
+      }
+
+      public boolean isFirstPane() {
+        return partitionResult.isFirstPane();
+      }
+
+      public long paneIndex() {
+        return partitionResult.getPaneIndex();
      }
    }
    // All pending load jobs. 
@@ -251,7 +259,10 @@ public void processElement(
         List<String> partitionFiles = Lists.newArrayList(element.getValue().getFilenames());
         String jobIdPrefix =
             BigQueryResourceNaming.createJobIdWithDestination(
-                c.sideInput(loadJobIdPrefixView), tableDestination, partition);
+                c.sideInput(loadJobIdPrefixView),
+                tableDestination,
+                partition,
+                element.getValue().getPaneIndex());
 
         if (tempTable) {
           if (tempDataset != null) {
@@ -291,13 +302,7 @@ public void processElement(
 
         pendingJobs.add(
             new PendingJobData(
-                window,
-                retryJob,
-                partitionFiles,
-                tableDestination,
-                tableReference,
-                destination,
-                element.getValue().isFirstPane()));
+                window, retryJob, element.getValue(), tableDestination, tableReference, destination));
     }
 
   @Teardown
@@ -361,13 +366,14 @@ public void finishBundle(FinishBundleContext c) throws Exception {
           Result result =
              new AutoValue_WriteTables_Result(
                  BigQueryHelpers.toJsonString(pendingJob.tableReference),
-                  pendingJob.isFirstPane);
+                  pendingJob.isFirstPane(),
+                  pendingJob.paneIndex());
          c.output(
              mainOutputTag,
              KV.of(pendingJob.destinationT, result),
              pendingJob.window.maxTimestamp(),
              pendingJob.window);
-          for (String file : pendingJob.partitionFiles) {
+          for (String file : pendingJob.partitionFiles()) {
            c.output(
                temporaryFilesTag,
                file,
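Both widened Result types keep the coder change mechanical: the pane index is simply appended after the existing fields. A hypothetical round-trip check, runnable only from a test in the same package since these classes are package-private (test scaffolding assumed):

```java
// Encode and decode the widened WritePartition.Result and confirm the new
// paneIndex field survives alongside the existing ones.
WritePartition.Result original =
    new AutoValue_WritePartition_Result(ImmutableList.of("gs://tmp/file1"), true, 3L);
ByteArrayOutputStream out = new ByteArrayOutputStream();
WritePartition.ResultCoder.INSTANCE.encode(original, out);
WritePartition.Result copy =
    WritePartition.ResultCoder.INSTANCE.decode(new ByteArrayInputStream(out.toByteArray()));
// copy carries the same filenames, isFirstPane flag, and paneIndex as original.
```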
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 077e203c9478..f584323a14f2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -2444,8 +2444,7 @@ public TableSchema getSchema(String destination) {
 
   @Test
   public void testWriteTables() throws Exception {
-    assumeTrue(!useStorageApi); // test in FILE_LOAD mode
-
+    assumeTrue(!useStorageApi);
     long numTables = 3;
     long numPartitions = 3;
     long numFilesPerPartition = 10;
@@ -2458,7 +2457,7 @@ public void testWriteTables() throws Exception {
     TableDestination tableDestination = new TableDestination(tableName, tableName);
     for (int j = 0; j < numPartitions; ++j) {
       String tempTableId =
-          BigQueryResourceNaming.createJobIdWithDestination(jobIdToken, tableDestination, j);
+          BigQueryResourceNaming.createJobIdWithDestination(jobIdToken, tableDestination, j, 0);
       List<String> filesPerPartition = Lists.newArrayList();
       for (int k = 0; k < numFilesPerPartition; ++k) {
         String filename =
@@ -2477,7 +2476,7 @@ public void testWriteTables() throws Exception {
       partitions.add(
           KV.of(
               ShardedKey.of(tableDestination.getTableSpec(), j),
-              new AutoValue_WritePartition_Result(filesPerPartition, true)));
+              new AutoValue_WritePartition_Result(filesPerPartition, true, 0L)));
 
       String json =
           String.format(
@@ -2543,12 +2542,10 @@ public void processElement(
             Iterable<String> tableNames =
                 StreamSupport.stream(entry.getValue().spliterator(), false)
                     .map(Result::getTableName)
-                    .map(BigQueryIOWriteTest::removeUuidInJobIdNaming)
                     .collect(Collectors.toList());
+            @SuppressWarnings("unchecked")
             String[] expectedValues =
-                expectedTempTables.get(entry.getKey()).stream()
-                    .map(BigQueryIOWriteTest::removeUuidInJobIdNaming)
-                    .toArray(String[]::new);
+                Iterables.toArray(expectedTempTables.get(entry.getKey()), String.class);
             assertThat(tableNames, containsInAnyOrder(expectedValues));
           }
           return null;
@@ -2556,18 +2553,6 @@ public void processElement(
     p.run();
   }
 
-  private static String removeUuidInJobIdNaming(String jobId) {
-    final int uuidLength = 16;
-
-    int splitIdx = jobId.lastIndexOf("_");
-    if (splitIdx > jobId.length() - (uuidLength + 1)) { // has partition index
-      return jobId.substring(0, jobId.lastIndexOf("_", splitIdx - 1))
-          + jobId.substring(splitIdx + 1);
-    } else {
-      return jobId.substring(0, jobId.length() - (uuidLength + 1));
-    }
-  }
-
   @Test
   public void testRemoveTemporaryFiles() throws Exception {
+    assumeTrue(!useStorageApi);
     int numFiles = 10;
     List<String> fileNames = Lists.newArrayList();
     String tempFilePrefix = options.getTempLocation() + "/";
@@ -2589,6 +2575,7 @@
 
   @Test
   public void testWriteRename() throws Exception {
+    assumeTrue(!useStorageApi);
     p.enableAbandonedNodeEnforcement(false);
 
     final int numFinalTables = 3;
@@ -2619,7 +2606,7 @@ public void testWriteRename() throws Exception {
       String tableJson = toJsonString(tempTable);
       tempTables.put(tableDestination, tableJson);
       tempTablesElement.add(
-          KV.of(tableDestination, new AutoValue_WriteTables_Result(tableJson, true)));
+          KV.of(tableDestination, new AutoValue_WriteTables_Result(tableJson, true, 0L)));
     }
   }
 
@@ -2668,6 +2655,7 @@
   @Test
   public void testRemoveTemporaryTables() throws Exception {
+    assumeTrue(!useStorageApi);
     FakeDatasetService datasetService = new FakeDatasetService();
     String projectId = "project";
     String datasetId = "dataset";

From 2a131a8e6a9e4ac09fb0e962ada026112ed874d2 Mon Sep 17 00:00:00 2001
From: Yi Hu
Date: Thu, 7 Sep 2023 15:17:07 -0400
Subject: [PATCH 3/5] Revert change to WriteTables.Result

---
 .../apache/beam/sdk/io/gcp/bigquery/WriteTables.java  | 11 ++---------
 .../beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java |  2 +-
 2 files changed, 3 insertions(+), 10 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 94d89cc26821..a44244d05588 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -38,7 +38,6 @@
 import org.apache.beam.sdk.coders.BooleanCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -101,8 +100,6 @@ abstract static class Result {
     abstract String getTableName();
     // Downstream operations may rely on pane info which will get lost after a ReShuffle
     abstract Boolean isFirstPane();
-
-    abstract Long getPaneIndex();
   }
 
   static class ResultCoder extends AtomicCoder<Result> {
     static final ResultCoder INSTANCE = new ResultCoder();
 
     @Override
     public void encode(Result value, OutputStream outStream) throws IOException {
       StringUtf8Coder.of().encode(value.getTableName(), outStream);
       BooleanCoder.of().encode(value.isFirstPane(), outStream);
-      VarLongCoder.of().encode(value.getPaneIndex(), outStream);
     }
 
     @Override
     public Result decode(InputStream inStream) throws IOException {
       return new AutoValue_WriteTables_Result(
-          StringUtf8Coder.of().decode(inStream),
-          BooleanCoder.of().decode(inStream),
-          VarLongCoder.of().decode(inStream));
+          StringUtf8Coder.of().decode(inStream), BooleanCoder.of().decode(inStream));
     }
   }
 
@@ -366,8 +360,7 @@ public void finishBundle(FinishBundleContext c) throws Exception {
           Result result =
               new AutoValue_WriteTables_Result(
                   BigQueryHelpers.toJsonString(pendingJob.tableReference),
-                  pendingJob.isFirstPane(),
-                  pendingJob.paneIndex());
+                  pendingJob.isFirstPane());
           c.output(
               mainOutputTag,
               KV.of(pendingJob.destinationT, result),
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index f584323a14f2..adc3867ca7e3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -2606,7 +2606,7 @@ public void testWriteRename() throws Exception {
       String tableJson = toJsonString(tempTable);
       tempTables.put(tableDestination, tableJson);
       tempTablesElement.add(
-          KV.of(tableDestination, new AutoValue_WriteTables_Result(tableJson, true, 0L)));
+          KV.of(tableDestination, new AutoValue_WriteTables_Result(tableJson, true)));
     }
   }
 
From d1dd0d35019a83b5616af354bfcbc34cc41d4904 Mon Sep 17 00:00:00 2001
From: Yi Hu
Date: Thu, 14 Sep 2023 13:44:33 -0400
Subject: [PATCH 4/5] Revert change to add test - added elsewhere

---
 .../io/google-cloud-platform/build.gradle     |  2 +-
 ...eIT.java => BigQueryIOStorageWriteIT.java} | 68 ++++++-------------
 2 files changed, 21 insertions(+), 49 deletions(-)
 rename sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/{BigQueryIOWriteIT.java => BigQueryIOStorageWriteIT.java} (72%)

diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle
index 5a26d9be5a4a..560b27aae162 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -203,7 +203,7 @@ task integrationTest(type: Test, dependsOn: processTestResources) {
   exclude '**/BigQueryIOStorageQueryIT.class'
   exclude '**/BigQueryIOStorageReadIT.class'
   exclude '**/BigQueryIOStorageReadTableRowIT.class'
-  exclude '**/BigQueryIOWriteIT.class'
+  exclude '**/BigQueryIOStorageWriteIT.class'
   exclude '**/BigQueryToTableIT.class'
   exclude '**/BigQueryIOJsonTest.class'
 
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java
similarity index 72%
rename from sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteIT.java
rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java
index d9d45c10ee1a..fc3ce0be4b69 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java
@@ -30,10 +30,8 @@
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
-import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.PeriodicImpulse;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -56,12 +53,11 @@
  * written row count.
  */
 @RunWith(JUnit4.class)
-public class BigQueryIOWriteIT {
+public class BigQueryIOStorageWriteIT {
 
   private enum WriteMode {
     EXACT_ONCE,
-    AT_LEAST_ONCE,
-    FILE_LOAD
+    AT_LEAST_ONCE
   }
 
   private String project;
@@ -73,25 +69,14 @@ private enum WriteMode {
 
   private void setUpTestEnvironment(WriteMode writeMode) {
     PipelineOptionsFactory.register(BigQueryOptions.class);
-
     bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
     bqOptions.setProject(TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject());
-    if (writeMode == WriteMode.EXACT_ONCE || writeMode == WriteMode.AT_LEAST_ONCE) {
-      bqOptions.setUseStorageWriteApi(true);
-      if (writeMode == WriteMode.AT_LEAST_ONCE) {
-        bqOptions.setUseStorageWriteApiAtLeastOnce(true);
-      }
-      bqOptions.setNumStorageWriteApiStreams(2);
-      bqOptions.setStorageWriteApiTriggeringFrequencySec(1);
-    } else {
-      // if sharding is not explicitly set, autosharding requires streaming engine
-      ExperimentalOptions.addExperiment(
-          bqOptions.as(ExperimentalOptions.class), "enable_streaming_engine");
-      // FILE_LOAD needs tempLocation set
-      if (Strings.isNullOrEmpty(bqOptions.getTempLocation())) {
-        bqOptions.setTempLocation(bqOptions.as(TestPipelineOptions.class).getTempRoot());
-      }
+    bqOptions.setUseStorageWriteApi(true);
+    if (writeMode == WriteMode.AT_LEAST_ONCE) {
+      bqOptions.setUseStorageWriteApiAtLeastOnce(true);
     }
+    bqOptions.setNumStorageWriteApiStreams(2);
+    bqOptions.setStorageWriteApiTriggeringFrequencySec(1);
     project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
   }
 
@@ -131,7 +116,8 @@ public Long apply(Instant input) {
     }
   }
 
-  private void runBigQueryIOWritePipeline(int rowCount, WriteMode writeMode, Boolean isStreaming) {
+  private void runBigQueryIOStorageWritePipeline(
+      int rowCount, WriteMode writeMode, Boolean isStreaming) {
     String tableName =
         isStreaming
             ? TABLE_PREFIX + "streaming_" + System.currentTimeMillis()
@@ -143,26 +129,18 @@ private void runBigQueryIOWritePipeline(int rowCount, WriteMode writeMode, Boole
       new TableFieldSchema().setName("number").setType("INTEGER"),
       new TableFieldSchema().setName("str").setType("STRING")));
 
-    BigQueryIO.Write writeTransform =
-        BigQueryIO.writeTableRows()
-            .to(String.format("%s:%s.%s", project, DATASET_ID, tableName))
-            .withSchema(schema)
-            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
-            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
-
-    if (writeMode == WriteMode.FILE_LOAD && isStreaming) {
-      writeTransform =
-          writeTransform
-              .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
-              .withTriggeringFrequency(Duration.standardSeconds(10));
-    }
-
     Pipeline p = Pipeline.create(bqOptions);
     p.apply(
         "Input",
         isStreaming ? 
new UnboundedStream(rowCount) : GenerateSequence.from(0).to(rowCount)) .apply("GenerateMessage", ParDo.of(new FillRowFn())) - .apply("WriteToBQ", writeTransform); + .apply( + "WriteToBQ", + BigQueryIO.writeTableRows() + .to(String.format("%s:%s.%s", project, DATASET_ID, tableName)) + .withSchema(schema) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); p.run().waitUntilFinish(); String testQuery = String.format("SELECT count(*) FROM [%s.%s];", DATASET_ID, tableName); try { @@ -182,30 +160,24 @@ private void runBigQueryIOWritePipeline(int rowCount, WriteMode writeMode, Boole @Test public void testBigQueryStorageWrite3MProto() { setUpTestEnvironment(WriteMode.EXACT_ONCE); - runBigQueryIOWritePipeline(3_000_000, WriteMode.EXACT_ONCE, false); + runBigQueryIOStorageWritePipeline(3_000_000, WriteMode.EXACT_ONCE, false); } @Test public void testBigQueryStorageWrite3MProtoALO() { setUpTestEnvironment(WriteMode.AT_LEAST_ONCE); - runBigQueryIOWritePipeline(3_000_000, WriteMode.AT_LEAST_ONCE, false); + runBigQueryIOStorageWritePipeline(3_000_000, WriteMode.AT_LEAST_ONCE, false); } @Test public void testBigQueryStorageWrite3KProtoStreaming() { setUpTestEnvironment(WriteMode.EXACT_ONCE); - runBigQueryIOWritePipeline(3000, WriteMode.EXACT_ONCE, true); + runBigQueryIOStorageWritePipeline(3000, WriteMode.EXACT_ONCE, true); } @Test public void testBigQueryStorageWrite3KProtoALOStreaming() { setUpTestEnvironment(WriteMode.AT_LEAST_ONCE); - runBigQueryIOWritePipeline(3000, WriteMode.AT_LEAST_ONCE, true); - } - - @Test - public void testBigQueryFileloadWrite3KJson() { - setUpTestEnvironment(WriteMode.FILE_LOAD); - runBigQueryIOWritePipeline(3000, WriteMode.FILE_LOAD, true); + runBigQueryIOStorageWritePipeline(3000, WriteMode.AT_LEAST_ONCE, true); } } From 43f014cb2a2f97d9d3f2491e6499ac38409dc16a Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Thu, 14 Sep 2023 13:47:20 -0400 Subject: [PATCH 5/5] Enable testing runner v2 --- runners/google-cloud-dataflow-java/build.gradle | 3 --- 1 file changed, 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 2acc30455e22..f6e2b9b147c5 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -612,9 +612,6 @@ task googleCloudPlatformRunnerV2IntegrationTest(type: Test) { exclude '**/FhirIOLROIT.class' exclude '**/FhirIOSearchIT.class' exclude '**/FhirIOPatientEverythingIT.class' - // failing due to pane index not incrementing after Reshuffle: - // https://github.com/apache/beam/issues/28219 - exclude '**/FileLoadsStreamingIT.class' maxParallelForks 4 classpath = configurations.googleCloudPlatformIntegrationTest
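For reference, the streaming file-loads configuration that the removed integration test exercised, and that FileLoadsStreamingIT (re-enabled by the last patch above) now covers, looks like the sketch below. Here `rows` is an assumed streaming PCollection of TableRow elements, and the table spec and schema are placeholders:

```java
rows.apply(
    "WriteToBQ",
    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table") // placeholder table spec
        .withSchema(schema) // TableSchema as constructed in the test above
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        .withTriggeringFrequency(Duration.standardSeconds(10))
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
```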