From 5d0a6f2d69cf6aca3024050d876bf149045be412 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Thu, 1 Dec 2022 19:27:43 +0000 Subject: [PATCH 1/3] always attempt to match tables with final destination --- .../org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index ada76e1ef484..241fe71a9c6d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -716,6 +716,13 @@ private PCollection> writeTempTables( ShardedKeyCoder.of(NullableCoder.of(destinationCoder)), WritePartition.ResultCoder.INSTANCE); + // If the final destination table exists already (and we're appending to it), then the temp + // tables must exactly match schema, partitioning, etc. Wrap the DynamicDestinations object + // with one that makes this happen. + DynamicDestinations matchedDestinations = + DynamicDestinationsHelpers.matchTableDynamicDestinations( + dynamicDestinations, bigQueryServices); + // If WriteBundlesToFiles produced more than DEFAULT_MAX_FILES_PER_PARTITION files or // DEFAULT_MAX_BYTES_PER_PARTITION bytes, then // the import needs to be split into multiple partitions, and those partitions will be @@ -734,7 +741,7 @@ private PCollection> writeTempTables( WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_IF_NEEDED, sideInputs, - dynamicDestinations, + matchedDestinations, loadJobProjectId, maxRetryJobs, ignoreUnknownValues, From e7fecfa17c3654d30a236d4893c711d20da5683c Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Thu, 1 Dec 2022 22:02:29 +0000 Subject: [PATCH 2/3] do not match when schema update options exist --- .../apache/beam/sdk/io/gcp/bigquery/BatchLoads.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 241fe71a9c6d..b9ae34d85314 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -719,9 +719,12 @@ private PCollection> writeTempTables( // If the final destination table exists already (and we're appending to it), then the temp // tables must exactly match schema, partitioning, etc. Wrap the DynamicDestinations object // with one that makes this happen. - DynamicDestinations matchedDestinations = - DynamicDestinationsHelpers.matchTableDynamicDestinations( - dynamicDestinations, bigQueryServices); + // In the case schemaUpdateOptions are specified by the user, matching does not occur in order + // to respect those options. + DynamicDestinations destinations = dynamicDestinations; + if (schemaUpdateOptions.isEmpty()) { + destinations = DynamicDestinationsHelpers.matchTableDynamicDestinations(dynamicDestinations, bigQueryServices); + } // If WriteBundlesToFiles produced more than DEFAULT_MAX_FILES_PER_PARTITION files or // DEFAULT_MAX_BYTES_PER_PARTITION bytes, then @@ -741,7 +744,7 @@ private PCollection> writeTempTables( WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_IF_NEEDED, sideInputs, - matchedDestinations, + destinations, loadJobProjectId, maxRetryJobs, ignoreUnknownValues, From 6f7a9d06b9c56e745732525b58620ecec171a3fa Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Thu, 1 Dec 2022 22:05:09 +0000 Subject: [PATCH 3/3] spotless --- .../java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index b9ae34d85314..ec3ba90e02ba 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -723,7 +723,9 @@ private PCollection> writeTempTables( // to respect those options. DynamicDestinations destinations = dynamicDestinations; if (schemaUpdateOptions.isEmpty()) { - destinations = DynamicDestinationsHelpers.matchTableDynamicDestinations(dynamicDestinations, bigQueryServices); + destinations = + DynamicDestinationsHelpers.matchTableDynamicDestinations( + dynamicDestinations, bigQueryServices); } // If WriteBundlesToFiles produced more than DEFAULT_MAX_FILES_PER_PARTITION files or