diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 82424412ffc5..ada76e1ef484 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -146,8 +146,8 @@ class BatchLoads private final Coder elementCoder; private final RowWriterFactory rowWriterFactory; private final @Nullable String kmsKey; - private final boolean clusteringEnabled; private final String tempDataset; + private Coder tableDestinationCoder; // The maximum number of times to retry failed load or copy jobs. private int maxRetryJobs = DEFAULT_MAX_RETRY_JOBS; @@ -186,9 +186,10 @@ class BatchLoads this.elementCoder = elementCoder; this.kmsKey = kmsKey; this.rowWriterFactory = rowWriterFactory; - this.clusteringEnabled = clusteringEnabled; schemaUpdateOptions = Collections.emptySet(); this.tempDataset = tempDataset; + this.tableDestinationCoder = + clusteringEnabled ? TableDestinationCoderV3.of() : TableDestinationCoderV2.of(); } void setSchemaUpdateOptions(Set schemaUpdateOptions) { @@ -493,7 +494,8 @@ public WriteResult expandUntriggered(PCollection> inp maxRetryJobs, kmsKey, loadJobProjectId)) - .withSideInputs(copyJobIdPrefixView)); + .withSideInputs(copyJobIdPrefixView)) + .setCoder(tableDestinationCoder); PCollectionList allSuccessfulWrites = PCollectionList.of(successfulSinglePartitionWrites).and(successfulMultiPartitionWrites); @@ -755,9 +757,6 @@ PCollection writeSinglePartition( List> sideInputs = Lists.newArrayList(loadJobIdPrefixView); sideInputs.addAll(dynamicDestinations.getSideInputs()); - Coder tableDestinationCoder = - clusteringEnabled ? TableDestinationCoderV3.of() : TableDestinationCoderV2.of(); - Coder, WritePartition.Result>> partitionsCoder = KvCoder.of( ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),