From 86d1177240b532e34c50019fe08ffef3478b80da Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Mon, 8 Aug 2022 19:55:07 +0000 Subject: [PATCH 1/5] keeping hold of user specified dynamic destination type to be able to use it in UpdateSchemaDestinations --- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 32 ++++++++++++------- .../gcp/bigquery/UpdateSchemaDestination.java | 15 ++++++--- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 15 +++++---- .../io/gcp/bigquery/BigQueryIOWriteTest.java | 15 +++++---- 4 files changed, 48 insertions(+), 29 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 7a1d5ff46fe3..8284be5ef0ba 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -354,7 +354,7 @@ private WriteResult expandTriggered(PCollection> inpu rowWriterFactory)) .withSideInputs(tempFilePrefixView) .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag))); - PCollection> tempTables = + PCollection> tempTables = writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView); List> sideInputsForUpdateSchema = @@ -366,7 +366,7 @@ private WriteResult expandTriggered(PCollection> inpu // Now that the load job has happened, we want the rename to happen immediately. .apply( "Window Into Global Windows", - Window.>into(new GlobalWindows()) + Window.>into(new GlobalWindows()) .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))) .apply("Add Void Key", WithKeys.of((Void) null)) .setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder())) @@ -374,7 +374,7 @@ private WriteResult expandTriggered(PCollection> inpu .apply("Extract Values", Values.create()) .apply( ParDo.of( - new UpdateSchemaDestination( + new UpdateSchemaDestination( bigQueryServices, tempLoadJobIdPrefixView, loadJobProjectId, @@ -473,7 +473,7 @@ public WriteResult expandUntriggered(PCollection> inp .apply("ReifyRenameInput", new ReifyAsIterable<>()) .apply( ParDo.of( - new UpdateSchemaDestination( + new UpdateSchemaDestination( bigQueryServices, tempLoadJobIdPrefixView, loadJobProjectId, @@ -708,7 +708,7 @@ public KV> apply( } // Take in a list of files and write them to temporary tables. - private PCollection> writeTempTables( + private PCollection> writeTempTables( PCollection, WritePartition.Result>> input, PCollectionView jobIdTokenView) { List> sideInputs = Lists.newArrayList(jobIdTokenView); @@ -719,9 +719,6 @@ private PCollection> writeTempTables( ShardedKeyCoder.of(NullableCoder.of(destinationCoder)), WritePartition.ResultCoder.INSTANCE); - Coder tableDestinationCoder = - clusteringEnabled ? TableDestinationCoderV3.of() : TableDestinationCoderV2.of(); - // If WriteBundlesToFiles produced more than DEFAULT_MAX_FILES_PER_PARTITION files or // DEFAULT_MAX_BYTES_PER_PARTITION bytes, then // the import needs to be split into multiple partitions, and those partitions will be @@ -752,7 +749,7 @@ private PCollection> writeTempTables( // https://github.com/apache/beam/issues/21105 for additional details. schemaUpdateOptions, tempDataset)) - .setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE)); + .setCoder(KvCoder.of(destinationCoder, WriteTables.ResultCoder.INSTANCE)); } // In the case where the files fit into a single load job, there's no need to write temporary @@ -771,7 +768,7 @@ PCollection writeSinglePartition( ShardedKeyCoder.of(NullableCoder.of(destinationCoder)), WritePartition.ResultCoder.INSTANCE); // Write single partition to final table - PCollection> successfulWrites = + PCollection> successfulWrites = input .setCoder(partitionsCoder) // Reshuffle will distribute this among multiple workers, and also guard against @@ -795,9 +792,20 @@ PCollection writeSinglePartition( useAvroLogicalTypes, schemaUpdateOptions, null)) - .setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE)); + .setCoder(KvCoder.of(destinationCoder, WriteTables.ResultCoder.INSTANCE)); - return successfulWrites.apply(Keys.create()); + return successfulWrites + .apply(Keys.create()) + .apply( + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element DestinationT dest, OutputReceiver o) { + o.output(dynamicDestinations.getTable(dest)); + } + })) + .setCoder(tableDestinationCoder); } private WriteResult writeResult(Pipeline p, PCollection successfulWrites) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java index 4130b09556ed..b09789b9bd72 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java @@ -27,6 +27,7 @@ import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -43,9 +44,9 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"nullness", "rawtypes"}) -public class UpdateSchemaDestination +public class UpdateSchemaDestination extends DoFn< - Iterable>, + Iterable>, Iterable>> { private static final Logger LOG = LoggerFactory.getLogger(UpdateSchemaDestination.class); @@ -116,12 +117,12 @@ public void startBundle(StartBundleContext c) { @ProcessElement public void processElement( - @Element Iterable> element, + @Element Iterable> element, ProcessContext context, BoundedWindow window) throws IOException { Object destination = null; - for (KV entry : element) { + for (KV entry : element) { destination = entry.getKey(); if (destination != null) { break; @@ -153,8 +154,12 @@ public void processElement( if (updateSchemaDestinationJob != null) { pendingJobs.add(new PendingJobData(updateSchemaDestinationJob, tableDestination, window)); } - context.output(element); } + List> tableDestinations = new ArrayList<>(); + for (KV entry : element) { + tableDestinations.add(KV.of(dynamicDestinations.getTable(entry.getKey()), entry.getValue())); + } + context.output(tableDestinations); } @Teardown diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index ce725c989b77..fadbca0280c3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -97,7 +97,7 @@ class WriteTables extends PTransform< PCollection, WritePartition.Result>>, - PCollection>> { + PCollection>> { @AutoValue abstract static class Result { abstract String getTableName(); @@ -135,7 +135,7 @@ public Result decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream) private final Set schemaUpdateOptions; private final DynamicDestinations dynamicDestinations; private final List> sideInputs; - private final TupleTag> mainOutputTag; + private final TupleTag> mainOutputTag; private final TupleTag temporaryFilesTag; private final @Nullable ValueProvider loadJobProjectId; private final int maxRetryJobs; @@ -148,8 +148,7 @@ public Result decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream) private final @Nullable String tempDataset; private class WriteTablesDoFn - extends DoFn< - KV, WritePartition.Result>, KV> { + extends DoFn, WritePartition.Result>, KV> { private Map jsonSchemas = Maps.newHashMap(); @@ -160,6 +159,7 @@ private class PendingJobData { final List partitionFiles; final TableDestination tableDestination; final TableReference tableReference; + final DestinationT destinationT; final boolean isFirstPane; public PendingJobData( @@ -168,12 +168,14 @@ public PendingJobData( List partitionFiles, TableDestination tableDestination, TableReference tableReference, + DestinationT destinationT, boolean isFirstPane) { this.window = window; this.retryJob = retryJob; this.partitionFiles = partitionFiles; this.tableDestination = tableDestination; this.tableReference = tableReference; + this.destinationT = destinationT; this.isFirstPane = isFirstPane; } } @@ -292,6 +294,7 @@ public void processElement( partitionFiles, tableDestination, tableReference, + destination, element.getValue().isFirstPane())); } @@ -359,7 +362,7 @@ public void finishBundle(FinishBundleContext c) throws Exception { pendingJob.isFirstPane); c.output( mainOutputTag, - KV.of(pendingJob.tableDestination, result), + KV.of(pendingJob.destinationT, result), pendingJob.window.maxTimestamp(), pendingJob.window); for (String file : pendingJob.partitionFiles) { @@ -423,7 +426,7 @@ public WriteTables( } @Override - public PCollection> expand( + public PCollection> expand( PCollection, WritePartition.Result>> input) { PCollectionTuple writeTablesOutputs = input.apply( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 290500286ad0..36db54e4d4a5 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -2209,6 +2209,8 @@ public void testWriteTables() throws Exception { p.apply("CreateJobId", Create.of("jobId")).apply(View.asSingleton()); List> sideInputs = ImmutableList.of(jobIdTokenView); + DynamicDestinations dynamicDestinations = new IdentityDynamicTables(); + fakeJobService.setNumFailuresExpected(3); WriteTables writeTables = new WriteTables<>( @@ -2218,7 +2220,7 @@ public void testWriteTables() throws Exception { BigQueryIO.Write.WriteDisposition.WRITE_EMPTY, BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED, sideInputs, - new IdentityDynamicTables(), + dynamicDestinations, null, 4, false, @@ -2228,24 +2230,25 @@ public void testWriteTables() throws Exception { Collections.emptySet(), null); - PCollection> writeTablesOutput = + PCollection> writeTablesOutput = writeTablesInput .apply(writeTables) - .setCoder(KvCoder.of(TableDestinationCoderV3.of(), WriteTables.ResultCoder.INSTANCE)); + .setCoder(KvCoder.of(StringUtf8Coder.of(), WriteTables.ResultCoder.INSTANCE)); PAssert.thatMultimap(writeTablesOutput) .satisfies( input -> { assertEquals(input.keySet(), expectedTempTables.keySet()); - for (Map.Entry> entry : - input.entrySet()) { + for (Map.Entry> entry : input.entrySet()) { Iterable tableNames = StreamSupport.stream(entry.getValue().spliterator(), false) .map(Result::getTableName) .collect(Collectors.toList()); @SuppressWarnings("unchecked") String[] expectedValues = - Iterables.toArray(expectedTempTables.get(entry.getKey()), String.class); + Iterables.toArray( + expectedTempTables.get(dynamicDestinations.getTable(entry.getKey())), + String.class); assertThat(tableNames, containsInAnyOrder(expectedValues)); } return null; From 4163ac5b59cd2fc30c91846d4635a11480b0cd26 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Tue, 9 Aug 2022 19:21:56 +0000 Subject: [PATCH 2/5] fix for testWriteTables --- .../io/gcp/bigquery/BigQueryIOWriteTest.java | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 36db54e4d4a5..b210c3433974 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -104,8 +104,10 @@ import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Distinct; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnTester; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.SimpleFunction; @@ -2230,25 +2232,31 @@ public void testWriteTables() throws Exception { Collections.emptySet(), null); - PCollection> writeTablesOutput = + PCollection> writeTablesOutput = writeTablesInput .apply(writeTables) - .setCoder(KvCoder.of(StringUtf8Coder.of(), WriteTables.ResultCoder.INSTANCE)); + .setCoder(KvCoder.of(StringUtf8Coder.of(), WriteTables.ResultCoder.INSTANCE)) + .apply(ParDo.of(new DoFn, KV>() { + @ProcessElement + public void processElement(@Element KV e, OutputReceiver> o) { + o.output(KV.of(dynamicDestinations.getTable(e.getKey()), e.getValue())); + } + })); + + PAssert.thatMultimap(writeTablesOutput) .satisfies( input -> { - assertEquals(input.keySet(), expectedTempTables.keySet()); - for (Map.Entry> entry : input.entrySet()) { + assertEquals(expectedTempTables.keySet(), input.keySet()); + for (Map.Entry> entry : input.entrySet()) { Iterable tableNames = StreamSupport.stream(entry.getValue().spliterator(), false) .map(Result::getTableName) .collect(Collectors.toList()); @SuppressWarnings("unchecked") String[] expectedValues = - Iterables.toArray( - expectedTempTables.get(dynamicDestinations.getTable(entry.getKey())), - String.class); + Iterables.toArray(expectedTempTables.get(entry.getKey()), String.class); assertThat(tableNames, containsInAnyOrder(expectedValues)); } return null; From fc503b9c99a4a5310e682cef200ed683c0c77487 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Wed, 10 Aug 2022 17:37:45 +0000 Subject: [PATCH 3/5] cleanup and support default project when not included in table ref --- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 26 ++++++++++++++----- .../gcp/bigquery/UpdateSchemaDestination.java | 23 +++++++++++++--- 2 files changed, 39 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 8284be5ef0ba..7e1ab1f93e6b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -21,6 +21,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; +import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import java.util.Collections; import java.util.List; @@ -794,15 +795,26 @@ PCollection writeSinglePartition( null)) .setCoder(KvCoder.of(destinationCoder, WriteTables.ResultCoder.INSTANCE)); + BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class); + String defaultProjectId = options.getBigQueryProject() == null + ? options.getProject() + : options.getBigQueryProject(); + return successfulWrites .apply(Keys.create()) - .apply( - ParDo.of( - new DoFn() { - @ProcessElement - public void processElement( - @Element DestinationT dest, OutputReceiver o) { - o.output(dynamicDestinations.getTable(dest)); + .apply("Convert to TableDestinations", MapElements.via( + new SimpleFunction() { + @Override + public TableDestination apply(DestinationT dest) { + TableDestination tableDestination = dynamicDestinations.getTable(dest); + TableReference tableReference = tableDestination.getTableReference(); + + // get project ID from options if it's not included in the table reference + if (Strings.isNullOrEmpty(tableReference.getProjectId())) { + tableReference.setProjectId(defaultProjectId); + tableDestination = tableDestination.withTableReference(tableReference); + } + return tableDestination; } })) .setCoder(tableDestinationCoder); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java index b09789b9bd72..3e6205241699 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import com.google.api.client.http.ByteArrayContent; +import com.google.api.client.util.Strings; import com.google.api.services.bigquery.model.Clustering; import com.google.api.services.bigquery.model.EncryptionConfiguration; import com.google.api.services.bigquery.model.JobConfigurationLoad; @@ -115,13 +116,29 @@ public void startBundle(StartBundleContext c) { pendingJobs.clear(); } + TableDestination getTableWithDefaultProject(DestinationT destination, BigQueryOptions options) { + TableDestination tableDestination = dynamicDestinations.getTable(destination); + TableReference tableReference = tableDestination.getTableReference(); + + if (Strings.isNullOrEmpty(tableReference.getProjectId())) { + tableReference.setProjectId( + options.getBigQueryProject() == null + ? options.getProject() + : options.getBigQueryProject()); + tableDestination = tableDestination.withTableReference(tableReference); + } + + return tableDestination; + } + @ProcessElement public void processElement( @Element Iterable> element, ProcessContext context, BoundedWindow window) throws IOException { - Object destination = null; + DestinationT destination = null; + BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class); for (KV entry : element) { destination = entry.getKey(); if (destination != null) { @@ -129,7 +146,7 @@ public void processElement( } } if (destination != null) { - TableDestination tableDestination = dynamicDestinations.getTable(destination); + TableDestination tableDestination = getTableWithDefaultProject(destination, options); TableSchema schema = dynamicDestinations.getSchema(destination); TableReference tableReference = tableDestination.getTableReference(); String jobIdPrefix = @@ -157,7 +174,7 @@ public void processElement( } List> tableDestinations = new ArrayList<>(); for (KV entry : element) { - tableDestinations.add(KV.of(dynamicDestinations.getTable(entry.getKey()), entry.getValue())); + tableDestinations.add(KV.of(getTableWithDefaultProject(destination, options), entry.getValue())); } context.output(tableDestinations); } From c15f6247194ec9b15fb0ce1b07893fb68513200f Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Thu, 11 Aug 2022 22:48:55 +0000 Subject: [PATCH 4/5] allow side inputs called from getTable() --- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 7e1ab1f93e6b..ccbb98e73f85 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -23,6 +23,7 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; +import com.sun.xml.internal.bind.v2.TODO; import java.util.Collections; import java.util.List; import java.util.Set; @@ -802,21 +803,21 @@ PCollection writeSinglePartition( return successfulWrites .apply(Keys.create()) - .apply("Convert to TableDestinations", MapElements.via( - new SimpleFunction() { - @Override - public TableDestination apply(DestinationT dest) { - TableDestination tableDestination = dynamicDestinations.getTable(dest); - TableReference tableReference = tableDestination.getTableReference(); - - // get project ID from options if it's not included in the table reference - if (Strings.isNullOrEmpty(tableReference.getProjectId())) { - tableReference.setProjectId(defaultProjectId); - tableDestination = tableDestination.withTableReference(tableReference); - } - return tableDestination; - } - })) + .apply("Convert to TableDestinations", ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + dynamicDestinations.setSideInputAccessorFromProcessContext(c); + TableDestination tableDestination = dynamicDestinations.getTable(c.element()); + TableReference tableReference = tableDestination.getTableReference(); + + // get project ID from options if it's not included in the table reference + if (Strings.isNullOrEmpty(tableReference.getProjectId())) { + tableReference.setProjectId(defaultProjectId); + tableDestination = tableDestination.withTableReference(tableReference); + } + c.output(tableDestination); + } + }).withSideInputs(sideInputs)) .setCoder(tableDestinationCoder); } From 67d6e8aa1a7d8107e6fd84824364fc215269cc02 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Fri, 12 Aug 2022 03:43:23 +0000 Subject: [PATCH 5/5] style fixes --- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 41 ++++++++++--------- .../gcp/bigquery/UpdateSchemaDestination.java | 5 ++- .../io/gcp/bigquery/BigQueryIOWriteTest.java | 23 +++++++---- 3 files changed, 39 insertions(+), 30 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index ccbb98e73f85..4dd7c55947de 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -23,7 +23,6 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; -import com.sun.xml.internal.bind.v2.TODO; import java.util.Collections; import java.util.List; import java.util.Set; @@ -797,27 +796,31 @@ PCollection writeSinglePartition( .setCoder(KvCoder.of(destinationCoder, WriteTables.ResultCoder.INSTANCE)); BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class); - String defaultProjectId = options.getBigQueryProject() == null - ? options.getProject() - : options.getBigQueryProject(); + String defaultProjectId = + options.getBigQueryProject() == null ? options.getProject() : options.getBigQueryProject(); return successfulWrites .apply(Keys.create()) - .apply("Convert to TableDestinations", ParDo.of(new DoFn() { - @ProcessElement - public void processElement(ProcessContext c) { - dynamicDestinations.setSideInputAccessorFromProcessContext(c); - TableDestination tableDestination = dynamicDestinations.getTable(c.element()); - TableReference tableReference = tableDestination.getTableReference(); - - // get project ID from options if it's not included in the table reference - if (Strings.isNullOrEmpty(tableReference.getProjectId())) { - tableReference.setProjectId(defaultProjectId); - tableDestination = tableDestination.withTableReference(tableReference); - } - c.output(tableDestination); - } - }).withSideInputs(sideInputs)) + .apply( + "Convert to TableDestinations", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + dynamicDestinations.setSideInputAccessorFromProcessContext(c); + TableDestination tableDestination = + dynamicDestinations.getTable(c.element()); + TableReference tableReference = tableDestination.getTableReference(); + + // get project ID from options if it's not included in the table reference + if (Strings.isNullOrEmpty(tableReference.getProjectId())) { + tableReference.setProjectId(defaultProjectId); + tableDestination = tableDestination.withTableReference(tableReference); + } + c.output(tableDestination); + } + }) + .withSideInputs(sideInputs)) .setCoder(tableDestinationCoder); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java index 3e6205241699..574cb3e6d420 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.gcp.bigquery; import com.google.api.client.http.ByteArrayContent; -import com.google.api.client.util.Strings; import com.google.api.services.bigquery.model.Clustering; import com.google.api.services.bigquery.model.EncryptionConfiguration; import com.google.api.services.bigquery.model.JobConfigurationLoad; @@ -40,6 +39,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -174,7 +174,8 @@ public void processElement( } List> tableDestinations = new ArrayList<>(); for (KV entry : element) { - tableDestinations.add(KV.of(getTableWithDefaultProject(destination, options), entry.getValue())); + tableDestinations.add( + KV.of(getTableWithDefaultProject(destination, options), entry.getValue())); } context.output(tableDestinations); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index b210c3433974..e37cad65591d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -2236,20 +2236,25 @@ public void testWriteTables() throws Exception { writeTablesInput .apply(writeTables) .setCoder(KvCoder.of(StringUtf8Coder.of(), WriteTables.ResultCoder.INSTANCE)) - .apply(ParDo.of(new DoFn, KV>() { - @ProcessElement - public void processElement(@Element KV e, OutputReceiver> o) { - o.output(KV.of(dynamicDestinations.getTable(e.getKey()), e.getValue())); - } - })); - - + .apply( + ParDo.of( + new DoFn< + KV, + KV>() { + @ProcessElement + public void processElement( + @Element KV e, + OutputReceiver> o) { + o.output(KV.of(dynamicDestinations.getTable(e.getKey()), e.getValue())); + } + })); PAssert.thatMultimap(writeTablesOutput) .satisfies( input -> { assertEquals(expectedTempTables.keySet(), input.keySet()); - for (Map.Entry> entry : input.entrySet()) { + for (Map.Entry> entry : + input.entrySet()) { Iterable tableNames = StreamSupport.stream(entry.getValue().spliterator(), false) .map(Result::getTableName)