diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 7a1d5ff46fe3..4dd7c55947de 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -21,6 +21,7 @@
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
+import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import java.util.Collections;
 import java.util.List;
@@ -354,7 +355,7 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
                 rowWriterFactory))
             .withSideInputs(tempFilePrefixView)
             .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
-    PCollection<KV<TableDestination, WriteTables.Result>> tempTables =
+    PCollection<KV<DestinationT, WriteTables.Result>> tempTables =
         writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView);
 
     List<PCollectionView<?>> sideInputsForUpdateSchema =
@@ -366,7 +367,7 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
             // Now that the load job has happened, we want the rename to happen immediately.
             .apply(
                 "Window Into Global Windows",
-                Window.<KV<TableDestination, WriteTables.Result>>into(new GlobalWindows())
+                Window.<KV<DestinationT, WriteTables.Result>>into(new GlobalWindows())
                     .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
             .apply("Add Void Key", WithKeys.of((Void) null))
             .setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder()))
@@ -374,7 +375,7 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
             .apply("Extract Values", Values.create())
             .apply(
                 ParDo.of(
-                    new UpdateSchemaDestination(
+                    new UpdateSchemaDestination<DestinationT>(
                         bigQueryServices,
                         tempLoadJobIdPrefixView,
                         loadJobProjectId,
@@ -473,7 +474,7 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> inp
         .apply("ReifyRenameInput", new ReifyAsIterable<>())
         .apply(
             ParDo.of(
-                new UpdateSchemaDestination(
+                new UpdateSchemaDestination<DestinationT>(
                     bigQueryServices,
                     tempLoadJobIdPrefixView,
                     loadJobProjectId,
@@ -708,7 +709,7 @@ public KV<ShardedKey<DestinationT>, WritePartition.Result> apply(
   }
 
   // Take in a list of files and write them to temporary tables.
-  private PCollection<KV<TableDestination, WriteTables.Result>> writeTempTables(
+  private PCollection<KV<DestinationT, WriteTables.Result>> writeTempTables(
       PCollection<KV<ShardedKey<DestinationT>, WritePartition.Result>> input,
       PCollectionView<String> jobIdTokenView) {
     List<PCollectionView<?>> sideInputs = Lists.newArrayList(jobIdTokenView);
@@ -719,9 +720,6 @@ private PCollection<KV<TableDestination, WriteTables.Result>> writeTempTables(
             ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
             WritePartition.ResultCoder.INSTANCE);
 
-    Coder<TableDestination> tableDestinationCoder =
-        clusteringEnabled ? TableDestinationCoderV3.of() : TableDestinationCoderV2.of();
-
     // If WriteBundlesToFiles produced more than DEFAULT_MAX_FILES_PER_PARTITION files or
     // DEFAULT_MAX_BYTES_PER_PARTITION bytes, then
     // the import needs to be split into multiple partitions, and those partitions will be
@@ -752,7 +750,7 @@ private PCollection<KV<TableDestination, WriteTables.Result>> writeTempTables(
             // https://github.com/apache/beam/issues/21105 for additional details.
                     schemaUpdateOptions,
                     tempDataset))
-        .setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE));
+        .setCoder(KvCoder.of(destinationCoder, WriteTables.ResultCoder.INSTANCE));
   }
 
   // In the case where the files fit into a single load job, there's no need to write temporary
@@ -771,7 +769,7 @@ PCollection<TableDestination> writeSinglePartition(
             ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
             WritePartition.ResultCoder.INSTANCE);
     // Write single partition to final table
-    PCollection<KV<TableDestination, WriteTables.Result>> successfulWrites =
+    PCollection<KV<DestinationT, WriteTables.Result>> successfulWrites =
         input
             .setCoder(partitionsCoder)
             // Reshuffle will distribute this among multiple workers, and also guard against
@@ -795,9 +793,35 @@ PCollection<TableDestination> writeSinglePartition(
                     useAvroLogicalTypes,
                     schemaUpdateOptions,
                     null))
-            .setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE));
+            .setCoder(KvCoder.of(destinationCoder, WriteTables.ResultCoder.INSTANCE));
 
-    return successfulWrites.apply(Keys.create());
+    BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
+    String defaultProjectId =
+        options.getBigQueryProject() == null ? options.getProject() : options.getBigQueryProject();
+
+    return successfulWrites
+        .apply(Keys.create())
+        .apply(
+            "Convert to TableDestinations",
+            ParDo.of(
+                    new DoFn<DestinationT, TableDestination>() {
+                      @ProcessElement
+                      public void processElement(ProcessContext c) {
+                        dynamicDestinations.setSideInputAccessorFromProcessContext(c);
+                        TableDestination tableDestination =
+                            dynamicDestinations.getTable(c.element());
+                        TableReference tableReference = tableDestination.getTableReference();
+
+                        // get project ID from options if it's not included in the table reference
+                        if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
+                          tableReference.setProjectId(defaultProjectId);
+                          tableDestination = tableDestination.withTableReference(tableReference);
+                        }
+                        c.output(tableDestination);
+                      }
+                    })
+                .withSideInputs(sideInputs))
+        .setCoder(tableDestinationCoder);
   }
 
   private WriteResult writeResult(Pipeline p, PCollection<TableDestination> successfulWrites) {
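The BatchLoads change above keys WriteTables output by DestinationT and only resolves a missing project ID when converting back to TableDestination, preferring --bigQueryProject over the pipeline's --project. A minimal standalone sketch of that fallback, assuming only the BigQuery model classes; the helper class, method name, and main() driver are illustrative and not part of this change:

import com.google.api.services.bigquery.model.TableReference;

public class DefaultProjectFallbackSketch {
  // Mirrors the precedence in the diff: an explicit project on the table
  // reference wins, then the BigQuery-specific option, then the pipeline project.
  static TableReference withDefaultProject(
      TableReference ref, String bigQueryProject, String pipelineProject) {
    if (ref.getProjectId() == null || ref.getProjectId().isEmpty()) {
      ref.setProjectId(bigQueryProject == null ? pipelineProject : bigQueryProject);
    }
    return ref;
  }

  public static void main(String[] args) {
    TableReference ref = new TableReference().setDatasetId("dataset").setTableId("table");
    // Prints a reference whose project was filled in from the pipeline project.
    System.out.println(withDefaultProject(ref, null, "my-default-project"));
  }
}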
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java
index 4130b09556ed..574cb3e6d420 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java
@@ -27,6 +27,7 @@
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -38,14 +39,15 @@
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"nullness", "rawtypes"})
-public class UpdateSchemaDestination
+public class UpdateSchemaDestination<DestinationT>
     extends DoFn<
-        Iterable<KV<TableDestination, WriteTables.Result>>,
+        Iterable<KV<DestinationT, WriteTables.Result>>,
         Iterable<KV<TableDestination, WriteTables.Result>>> {
 
   private static final Logger LOG = LoggerFactory.getLogger(UpdateSchemaDestination.class);
@@ -114,21 +116,37 @@ public void startBundle(StartBundleContext c) {
     pendingJobs.clear();
   }
 
+  TableDestination getTableWithDefaultProject(DestinationT destination, BigQueryOptions options) {
+    TableDestination tableDestination = dynamicDestinations.getTable(destination);
+    TableReference tableReference = tableDestination.getTableReference();
+
+    if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
+      tableReference.setProjectId(
+          options.getBigQueryProject() == null
+              ? options.getProject()
+              : options.getBigQueryProject());
+      tableDestination = tableDestination.withTableReference(tableReference);
+    }
+
+    return tableDestination;
+  }
+
   @ProcessElement
   public void processElement(
-      @Element Iterable<KV<TableDestination, WriteTables.Result>> element,
+      @Element Iterable<KV<DestinationT, WriteTables.Result>> element,
       ProcessContext context,
       BoundedWindow window)
       throws IOException {
-    Object destination = null;
-    for (KV<TableDestination, WriteTables.Result> entry : element) {
+    DestinationT destination = null;
+    BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
+    for (KV<DestinationT, WriteTables.Result> entry : element) {
       destination = entry.getKey();
       if (destination != null) {
         break;
       }
     }
     if (destination != null) {
-      TableDestination tableDestination = dynamicDestinations.getTable(destination);
+      TableDestination tableDestination = getTableWithDefaultProject(destination, options);
       TableSchema schema = dynamicDestinations.getSchema(destination);
       TableReference tableReference = tableDestination.getTableReference();
       String jobIdPrefix =
@@ -153,8 +171,13 @@ public void processElement(
       if (updateSchemaDestinationJob != null) {
         pendingJobs.add(new PendingJobData(updateSchemaDestinationJob, tableDestination, window));
       }
-      context.output(element);
     }
+    List<KV<TableDestination, WriteTables.Result>> tableDestinations = new ArrayList<>();
+    for (KV<DestinationT, WriteTables.Result> entry : element) {
+      tableDestinations.add(
+          KV.of(getTableWithDefaultProject(destination, options), entry.getValue()));
+    }
+    context.output(tableDestinations);
   }
 
   @Teardown
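UpdateSchemaDestination now reads BigQueryOptions out of the ProcessContext at execution time instead of assuming the destination already carries a project. A self-contained sketch of that options-lookup pattern; the DoFn class and its String element type are hypothetical, for illustration only:

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.transforms.DoFn;

// Illustrative only: shows how a DoFn can resolve the effective BigQuery
// project at runtime, the same lookup processElement performs above.
class ResolveDefaultProjectFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    BigQueryOptions options = c.getPipelineOptions().as(BigQueryOptions.class);
    String project =
        options.getBigQueryProject() == null
            ? options.getProject()
            : options.getBigQueryProject();
    c.output(c.element() + " resolved against project " + project);
  }
}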
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index ce725c989b77..fadbca0280c3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -97,7 +97,7 @@ class WriteTables<DestinationT extends @NonNull Object>
     extends PTransform<
         PCollection<KV<ShardedKey<DestinationT>, WritePartition.Result>>,
-        PCollection<KV<TableDestination, WriteTables.Result>>> {
+        PCollection<KV<DestinationT, WriteTables.Result>>> {
 
   @AutoValue
   abstract static class Result {
     abstract String getTableName();
@@ -135,7 +135,7 @@ public Result decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream)
   private final Set<SchemaUpdateOption> schemaUpdateOptions;
   private final DynamicDestinations<?, DestinationT> dynamicDestinations;
   private final List<PCollectionView<?>> sideInputs;
-  private final TupleTag<KV<TableDestination, WriteTables.Result>> mainOutputTag;
+  private final TupleTag<KV<DestinationT, WriteTables.Result>> mainOutputTag;
   private final TupleTag<String> temporaryFilesTag;
   private final @Nullable ValueProvider<String> loadJobProjectId;
   private final int maxRetryJobs;
@@ -148,8 +148,7 @@ public Result decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream)
   private final @Nullable String tempDataset;
 
   private class WriteTablesDoFn
-      extends DoFn<
-          KV<ShardedKey<DestinationT>, WritePartition.Result>, KV<TableDestination, Result>> {
+      extends DoFn<KV<ShardedKey<DestinationT>, WritePartition.Result>, KV<DestinationT, Result>> {
 
     private Map<DestinationT, String> jsonSchemas = Maps.newHashMap();
 
@@ -160,6 +159,7 @@ private class PendingJobData {
       final List<String> partitionFiles;
       final TableDestination tableDestination;
      final TableReference tableReference;
+      final DestinationT destinationT;
       final boolean isFirstPane;
 
       public PendingJobData(
@@ -168,12 +168,14 @@ public PendingJobData(
           List<String> partitionFiles,
           TableDestination tableDestination,
           TableReference tableReference,
+          DestinationT destinationT,
           boolean isFirstPane) {
         this.window = window;
         this.retryJob = retryJob;
         this.partitionFiles = partitionFiles;
         this.tableDestination = tableDestination;
         this.tableReference = tableReference;
+        this.destinationT = destinationT;
         this.isFirstPane = isFirstPane;
       }
     }
@@ -292,6 +294,7 @@ public void processElement(
                 partitionFiles,
                 tableDestination,
                 tableReference,
+                destination,
                 element.getValue().isFirstPane()));
   }
 
@@ -359,7 +362,7 @@ public void finishBundle(FinishBundleContext c) throws Exception {
                   pendingJob.isFirstPane);
           c.output(
               mainOutputTag,
-              KV.of(pendingJob.tableDestination, result),
+              KV.of(pendingJob.destinationT, result),
              pendingJob.window.maxTimestamp(),
               pendingJob.window);
           for (String file : pendingJob.partitionFiles) {
@@ -423,7 +426,7 @@ public WriteTables(
   }
 
   @Override
-  public PCollection<KV<TableDestination, WriteTables.Result>> expand(
+  public PCollection<KV<DestinationT, WriteTables.Result>> expand(
       PCollection<KV<ShardedKey<DestinationT>, WritePartition.Result>> input) {
     PCollectionTuple writeTablesOutputs =
         input.apply(
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 290500286ad0..e37cad65591d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -104,8 +104,10 @@
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Distinct;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -2209,6 +2211,8 @@ public void testWriteTables() throws Exception {
         p.apply("CreateJobId", Create.of("jobId")).apply(View.asSingleton());
     List<PCollectionView<?>> sideInputs = ImmutableList.of(jobIdTokenView);
 
+    DynamicDestinations<String, String> dynamicDestinations = new IdentityDynamicTables();
+
     fakeJobService.setNumFailuresExpected(3);
     WriteTables<String> writeTables =
         new WriteTables<>(
@@ -2218,7 +2222,7 @@ public void testWriteTables() throws Exception {
             BigQueryIO.Write.WriteDisposition.WRITE_EMPTY,
             BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED,
             sideInputs,
-            new IdentityDynamicTables(),
+            dynamicDestinations,
             null,
             4,
             false,
@@ -2231,12 +2235,24 @@ public void testWriteTables() throws Exception {
     PCollection<KV<TableDestination, WriteTables.Result>> writeTablesOutput =
         writeTablesInput
             .apply(writeTables)
-            .setCoder(KvCoder.of(TableDestinationCoderV3.of(), WriteTables.ResultCoder.INSTANCE));
+            .setCoder(KvCoder.of(StringUtf8Coder.of(), WriteTables.ResultCoder.INSTANCE))
+            .apply(
+                ParDo.of(
+                    new DoFn<
+                        KV<String, WriteTables.Result>,
+                        KV<TableDestination, WriteTables.Result>>() {
+                      @ProcessElement
+                      public void processElement(
+                          @Element KV<String, WriteTables.Result> e,
+                          OutputReceiver<KV<TableDestination, WriteTables.Result>> o) {
+                        o.output(KV.of(dynamicDestinations.getTable(e.getKey()), e.getValue()));
+                      }
+                    }));
 
     PAssert.thatMultimap(writeTablesOutput)
         .satisfies(
             input -> {
-              assertEquals(input.keySet(), expectedTempTables.keySet());
+              assertEquals(expectedTempTables.keySet(), input.keySet());
               for (Map.Entry<TableDestination, Iterable<String>> entry : input.entrySet()) {
                 Iterable<String> tableNames =
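Taken together, these changes let a FILE_LOADS batch write resolve a project-less table spec against the pipeline's options, the behavior the test above exercises by mapping DestinationT keys back through DynamicDestinations. A hedged end-to-end sketch under the assumption that dataset.table already exists and a --project (or --bigQueryProject) flag is supplied; the class name and field are placeholders:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class ProjectlessBatchLoad {
  public static void main(String[] args) {
    BigQueryOptions options = PipelineOptionsFactory.fromArgs(args).as(BigQueryOptions.class);
    Pipeline p = Pipeline.create(options);
    p.apply(Create.of(new TableRow().set("name", "a")).withCoder(TableRowJsonCoder.of()))
        .apply(
            BigQueryIO.writeTableRows()
                // No project in the spec: with this change, batch loads fill it
                // in from --bigQueryProject, falling back to --project.
                .to("dataset.table")
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
    p.run().waitUntilFinish();
  }
}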