From c0077a863151dab4180a17f442a45b7989279320 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Wed, 12 Oct 2022 14:47:48 +0000 Subject: [PATCH 1/2] set table destination coder for multi partition writes --- .../apache/beam/sdk/io/gcp/bigquery/BatchLoads.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 82424412ffc5..6dfab240a8b2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -146,8 +146,8 @@ class BatchLoads private final Coder elementCoder; private final RowWriterFactory rowWriterFactory; private final @Nullable String kmsKey; - private final boolean clusteringEnabled; private final String tempDataset; + private Coder tableDestinationCoder; // The maximum number of times to retry failed load or copy jobs. private int maxRetryJobs = DEFAULT_MAX_RETRY_JOBS; @@ -186,9 +186,9 @@ class BatchLoads this.elementCoder = elementCoder; this.kmsKey = kmsKey; this.rowWriterFactory = rowWriterFactory; - this.clusteringEnabled = clusteringEnabled; schemaUpdateOptions = Collections.emptySet(); this.tempDataset = tempDataset; + this.tableDestinationCoder = clusteringEnabled ? TableDestinationCoderV3.of(): TableDestinationCoderV2.of(); } void setSchemaUpdateOptions(Set schemaUpdateOptions) { @@ -493,7 +493,8 @@ public WriteResult expandUntriggered(PCollection> inp maxRetryJobs, kmsKey, loadJobProjectId)) - .withSideInputs(copyJobIdPrefixView)); + .withSideInputs(copyJobIdPrefixView)) + .setCoder(tableDestinationCoder); PCollectionList allSuccessfulWrites = PCollectionList.of(successfulSinglePartitionWrites).and(successfulMultiPartitionWrites); @@ -755,9 +756,6 @@ PCollection writeSinglePartition( List> sideInputs = Lists.newArrayList(loadJobIdPrefixView); sideInputs.addAll(dynamicDestinations.getSideInputs()); - Coder tableDestinationCoder = - clusteringEnabled ? TableDestinationCoderV3.of() : TableDestinationCoderV2.of(); - Coder, WritePartition.Result>> partitionsCoder = KvCoder.of( ShardedKeyCoder.of(NullableCoder.of(destinationCoder)), From 3a64fac5ab873f6a1a678556329184e2fb720023 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Wed, 12 Oct 2022 15:22:43 +0000 Subject: [PATCH 2/2] spotless --- .../java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 6dfab240a8b2..ada76e1ef484 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -188,7 +188,8 @@ class BatchLoads this.rowWriterFactory = rowWriterFactory; schemaUpdateOptions = Collections.emptySet(); this.tempDataset = tempDataset; - this.tableDestinationCoder = clusteringEnabled ? TableDestinationCoderV3.of(): TableDestinationCoderV2.of(); + this.tableDestinationCoder = + clusteringEnabled ? TableDestinationCoderV3.of() : TableDestinationCoderV2.of(); } void setSchemaUpdateOptions(Set schemaUpdateOptions) {