From 9eb57c1b44d8558736b6b0af2f9ee1c36dda923a Mon Sep 17 00:00:00 2001 From: Miguel Anzo Date: Mon, 20 Dec 2021 23:09:29 -0600 Subject: [PATCH 1/8] Added a zero row job to bigquery writeTable --- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 141 ++++++++++++++++++ 1 file changed, 141 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index 1637f1388bab..d337cb997669 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -23,6 +23,7 @@ import com.google.api.services.bigquery.model.EncryptionConfiguration; import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobReference; +import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; @@ -30,6 +31,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -269,6 +271,19 @@ public void processElement( createDisposition = CreateDisposition.CREATE_IF_NEEDED; } + BigQueryHelpers.PendingJob schemaJob = + startZeroLoadJob( + getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), + getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), + jobIdPrefix, + tableReference, + tableDestination.getTimePartitioning(), + tableDestination.getClustering(), + tableSchema, + writeDisposition, + createDisposition, + schemaUpdateOptions); + BigQueryHelpers.PendingJob retryJob = startLoad( getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), @@ -282,6 +297,16 @@ public void processElement( writeDisposition, createDisposition, schemaUpdateOptions); + if (schemaJob != null) { + pendingJobs.add( + new PendingJobData( + window, + schemaJob, + partitionFiles, + tableDestination, + tableReference, + element.getValue().isFirstPane())); + } pendingJobs.add( new PendingJobData( window, @@ -549,6 +574,122 @@ private PendingJob startLoad( return retryJob; } + private PendingJob startZeroLoadJob( + JobService jobService, + DatasetService datasetService, + String jobIdPrefix, + TableReference ref, + TimePartitioning timePartitioning, + Clustering clustering, + @Nullable TableSchema schema, + WriteDisposition writeDisposition, + CreateDisposition createDisposition, + Set schemaUpdateOptions) { + JobConfigurationLoad loadConfig = + new JobConfigurationLoad() + .setDestinationTable(ref) + .setSchema(schema) + .setSourceUris(Collections.EMPTY_LIST) + .setWriteDisposition(writeDisposition.name()) + .setCreateDisposition(createDisposition.name()) + .setSourceFormat(sourceFormat) + .setIgnoreUnknownValues(ignoreUnknownValues) + .setUseAvroLogicalTypes(useAvroLogicalTypes); + if (schemaUpdateOptions != null) { + List options = + schemaUpdateOptions.stream() + .map(Enum::name) + .collect(Collectors.toList()); + loadConfig.setSchemaUpdateOptions(options); + } + if (!loadConfig.getWriteDisposition().equals(WriteDisposition.WRITE_TRUNCATE.toString()) + || !loadConfig.getWriteDisposition().equals(WriteDisposition.WRITE_APPEND.toString())) { + 
return null; + } + Table destinationTable = null; + try { + destinationTable = datasetService.getTable(ref); + if (destinationTable == null) { + return null; // no need to update schema ahead if table does not exists + } + } catch (IOException | InterruptedException e) { + LOG.warn("Failed to get table {} with {}", ref, e.toString()); + throw new RuntimeException(e); + } + if (destinationTable.getSchema().equals(schema)) { + return null; // no need to update schema ahead if schema is already the same + } + if (timePartitioning != null) { + loadConfig.setTimePartitioning(timePartitioning); + // only set clustering if timePartitioning is set + if (clustering != null) { + loadConfig.setClustering(clustering); + } + } + if (kmsKey != null) { + loadConfig.setDestinationEncryptionConfiguration( + new EncryptionConfiguration().setKmsKeyName(kmsKey)); + } + String projectId = + loadJobProjectId == null || loadJobProjectId.get() == null + ? ref.getProjectId() + : loadJobProjectId.get(); + String bqLocation = + BigQueryHelpers.getDatasetLocation(datasetService, ref.getProjectId(), ref.getDatasetId()); + + PendingJob retryJob = + new PendingJob( + // Function to load the data. + jobId -> { + JobReference jobRef = + new JobReference() + .setProjectId(projectId) + .setJobId(jobId.getJobId()) + .setLocation(bqLocation); + LOG.info( + "Loading zero rows using job {}, job id {} iteration {}", + ref, + jobRef, + jobId.getRetryIndex()); + try { + jobService.startLoadJob(jobRef, loadConfig); + } catch (IOException | InterruptedException e) { + LOG.warn("Load job {} failed with {}", jobRef, e.toString()); + throw new RuntimeException(e); + } + return null; + }, + // Function to poll the result of a load job. + jobId -> { + JobReference jobRef = + new JobReference() + .setProjectId(projectId) + .setJobId(jobId.getJobId()) + .setLocation(bqLocation); + try { + return jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, + // Function to lookup a job. 
+ jobId -> { + JobReference jobRef = + new JobReference() + .setProjectId(projectId) + .setJobId(jobId.getJobId()) + .setLocation(bqLocation); + try { + return jobService.getJob(jobRef); + } catch (InterruptedException | IOException e) { + throw new RuntimeException(e); + } + }, + maxRetryJobs, + jobIdPrefix); + return retryJob; + } + static void removeTemporaryFiles(Iterable files) throws IOException { ImmutableList.Builder fileResources = ImmutableList.builder(); for (String file : files) { From ebc47fd5be8138cc7065453e5640e1a98989946c Mon Sep 17 00:00:00 2001 From: Miguel Anzo Date: Wed, 22 Dec 2021 12:05:54 -0600 Subject: [PATCH 2/8] added unit test --- .../BigQuerySchemaUpdateOptionsIT.java | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java index ed75a6688075..72831e4c5bef 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java @@ -219,4 +219,64 @@ public void testAllowFieldRelaxation() throws Exception { List> expectedResult = Arrays.asList(Arrays.asList(value)); runWriteTest(schemaUpdateOptions, tableName, newSchema, rowToInsert, testQuery, expectedResult); } + + @Test + public void runWriteTestTempTables() throws Exception { + String tableName = makeTestTable(); + + Set schemaUpdateOptions = + EnumSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION); + + TableSchema schema = + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("new_field").setType("STRING"), + new TableFieldSchema().setName("optional_field").setType("STRING"), + new TableFieldSchema() + .setName("required_field") + .setType("STRING") + .setMode("REQUIRED"))); + + String[] values = {"meow", "bark"}; + TableRow rowToInsert = + new TableRow().set("new_field", values[0]).set("required_field", values[1]); + + String testQuery = + String.format( + "SELECT new_field, required_field FROM [%s.%s];", BIG_QUERY_DATASET_ID, tableName); + + List> expectedResult = Arrays.asList(Arrays.asList(values)); + Options options = TestPipeline.testingPipelineOptions().as(Options.class); + options.setTempLocation(options.getTempRoot() + "/bq_it_temp"); + + Pipeline p = Pipeline.create(options); + Create.Values input = Create.of(rowToInsert); + + Write writer = + BigQueryIO.writeTableRows() + .to(String.format("%s:%s.%s", options.getProject(), BIG_QUERY_DATASET_ID, tableName)) + .withSchema(schema) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) + .withSchemaUpdateOptions(schemaUpdateOptions) + .withMaxBytesPerPartition(1) + .withMaxFilesPerPartition(1); + + p.apply(input).apply(writer); + p.run().waitUntilFinish(); + + QueryResponse response = BQ_CLIENT.queryWithRetries(testQuery, project); + + List> result = + response.getRows().stream() + .map( + row -> + row.getF().stream() + .map(cell -> cell.getV().toString()) + .collect(Collectors.toList())) + .collect(Collectors.toList()); + + assertEquals(expectedResult, result); + } } From 6ade141c0988ffffd7209d236abcf5112970cfd3 Mon Sep 17 00:00:00 2001 From: Marco Robles Date: Fri, 25 Mar 2022 
11:40:05 -0600 Subject: [PATCH 3/8] add sdf for update schema destination --- .../gcp/bigquery/UpdateSchemaDestination.java | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java new file mode 100644 index 000000000000..2fa479aba45e --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java @@ -0,0 +1,76 @@ +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.TableReference; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.util.List; + +public class UpdateSchemaDestination extends DoFn>, TableDestination> { + + private static final Logger LOG = LoggerFactory.getLogger(UpdateSchemaDestination.class); + private final BigQueryServices bqServices; + private final PCollectionView jobIdToken; + private final ValueProvider loadJobProjectId; + private transient @Nullable DatasetService datasetService; + + private static class PendingJobData { + final BigQueryHelpers.PendingJob retryJob; + final TableDestination tableDestination; + final List tempTables; + final BoundedWindow window; + + public PendingJobData( + BigQueryHelpers.PendingJob retryJob, + TableDestination tableDestination, + List tempTables, + BoundedWindow window) { + this.retryJob = retryJob; + this.tableDestination = tableDestination; + this.tempTables = tempTables; + this.window = window; + } + } + + private List pendingJobs = Lists.newArrayList(); + + public UpdateSchemaDestination(BigQueryServices bqServices, + PCollectionView jobIdToken, + ValueProvider loadJobProjectId) { + this.loadJobProjectId = loadJobProjectId; + this.jobIdToken = jobIdToken; + this.bqServices = bqServices; + } + + @StartBundle + public void startBundle(StartBundleContext c) { + pendingJobs.clear(); + } + + @Teardown + public void onTeardown() { + + } + + @ProcessElement + public void processElement( + @Element Iterable> element, + ProcessContext context + ) { + Multimap tempTables = ArrayListMultimap.create(); + for (KV entry : element) { + tempTables.put(entry.getKey(), entry.getValue()); + } + } + +} From cedb93bc00f44620bd6a7b328a9015242be3ebed Mon Sep 17 00:00:00 2001 From: Marco Robles Date: Wed, 13 Apr 2022 14:25:17 -0500 Subject: [PATCH 4/8] add loadjob with inputstreamcontent parameter --- .../sdk/io/gcp/bigquery/BigQueryServices.java | 9 +++ .../io/gcp/bigquery/BigQueryServicesImpl.java | 64 +++++++++++++++++++ .../sdk/io/gcp/testing/FakeJobService.java | 10 +++ 3 files changed, 83 insertions(+) diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index 927b7b3a58d3..49250f5a1aab 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import com.google.api.client.http.AbstractInputStreamContent; import com.google.api.core.ApiFuture; import com.google.api.services.bigquery.model.Dataset; import com.google.api.services.bigquery.model.Job; @@ -66,6 +67,14 @@ public interface JobService extends AutoCloseable { /** Start a BigQuery load job. */ void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig) throws InterruptedException, IOException; + + /** Start a BigQuery load job with stream content. */ + void startLoadJob( + JobReference jobRef, + JobConfigurationLoad loadConfig, + AbstractInputStreamContent streamContent) + throws InterruptedException, IOException; + /** Start a BigQuery extract job. */ void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig) throws InterruptedException, IOException; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index dec5ae8088c2..f57fc60f5045 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -22,6 +22,7 @@ import com.google.api.client.googleapis.json.GoogleJsonError; import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.googleapis.services.AbstractGoogleClientRequest; +import com.google.api.client.http.AbstractInputStreamContent; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.util.BackOff; import com.google.api.client.util.BackOffUtils; @@ -233,6 +234,28 @@ public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig) startJob(job, errorExtractor, client); } + /** + * {@inheritDoc} + * + *
<p>
Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds. + * + * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts. + */ + @Override + public void startLoadJob( + JobReference jobRef, JobConfigurationLoad loadConfig, AbstractInputStreamContent stream) + throws InterruptedException, IOException { + Map labelMap = new HashMap<>(); + Job job = + new Job() + .setJobReference(jobRef) + .setConfiguration( + new JobConfiguration() + .setLoad(loadConfig) + .setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap))); + startJobStream(job, stream, errorExtractor, client, Sleeper.DEFAULT, createDefaultBackoff()); + } + /** * {@inheritDoc} * @@ -335,6 +358,47 @@ static void startJob( lastException); } + static void startJobStream( + Job job, + AbstractInputStreamContent streamContent, + ApiErrorExtractor errorExtractor, + Bigquery client, + Sleeper sleeper, + BackOff backOff) + throws IOException, InterruptedException { + JobReference jobReference = job.getJobReference(); + Exception exception; + do { + try { + client + .jobs() + .insert(jobReference.getProjectId(), job, streamContent) + .setPrettyPrint(false) + .execute(); + LOG.info( + "Started BigQuery job: {}.\n{}", + jobReference, + formatBqStatusCommand(jobReference.getProjectId(), jobReference.getJobId())); + return; + } catch (IOException e) { + if (errorExtractor.itemAlreadyExists(e)) { + LOG.info( + "BigQuery job " + jobReference + " already exists, will not retry inserting it:", + e); + return; // SUCCEEDED + } + // ignore and retry + LOG.info("Failed to insert job " + jobReference + ", will retry:", e); + exception = e; + } + } while (nextBackOff(sleeper, backOff)); + throw new IOException( + String.format( + "Unable to insert job: %s, aborting after %d .", + jobReference.getJobId(), MAX_RPC_RETRIES), + exception); + } + @Override public Job pollJob(JobReference jobRef, int maxAttempts) throws InterruptedException { BackOff backoff = diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java index 768ecb1ae874..8e32df10b511 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java @@ -20,6 +20,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; +import com.google.api.client.http.AbstractInputStreamContent; import com.google.api.client.json.JsonFactory; import com.google.api.client.util.BackOff; import com.google.api.client.util.BackOffUtils; @@ -177,6 +178,15 @@ public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig) } } + @Override + public void startLoadJob( + JobReference jobRef, + JobConfigurationLoad loadConfig, + AbstractInputStreamContent streamContent) + throws InterruptedException, IOException { + // TODO + } + @Override public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig) throws IOException { From a67b49184be19042630a6110a660c1b961cae1a7 Mon Sep 17 00:00:00 2001 From: Marco Robles Date: Wed, 13 Apr 2022 14:26:24 -0500 Subject: [PATCH 5/8] add updateschemadest with zeroloadjob before copying to dest --- 
.../beam/sdk/io/gcp/bigquery/BatchLoads.java | 57 ++- .../gcp/bigquery/UpdateSchemaDestination.java | 363 +++++++++++++++--- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 146 +------ 3 files changed, 361 insertions(+), 205 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 98310ebb8e14..28eea71bdbf8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -357,6 +357,10 @@ private WriteResult expandTriggered(PCollection> inpu PCollection> tempTables = writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView); + List> sideInputsForUpdateSchema = + Lists.newArrayList(tempLoadJobIdPrefixView); + sideInputsForUpdateSchema.addAll(dynamicDestinations.getSideInputs()); + PCollection successfulMultiPartitionWrites = tempTables // Now that the load job has happened, we want the rename to happen immediately. @@ -368,6 +372,23 @@ private WriteResult expandTriggered(PCollection> inpu .setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder())) .apply("GroupByKey", GroupByKey.create()) .apply("Extract Values", Values.create()) + .apply( + ParDo.of( + new UpdateSchemaDestination( + bigQueryServices, + tempLoadJobIdPrefixView, + loadJobProjectId, + WriteDisposition.WRITE_APPEND, + CreateDisposition.CREATE_NEVER, + maxRetryJobs, + ignoreUnknownValues, + kmsKey, + rowWriterFactory.getSourceFormat(), + useAvroLogicalTypes, + schemaUpdateOptions, + tempDataset, + dynamicDestinations)) + .withSideInputs(sideInputsForUpdateSchema)) .apply( "WriteRenameTriggered", ParDo.of( @@ -444,9 +465,30 @@ public WriteResult expandUntriggered(PCollection> inp PCollection successfulSinglePartitionWrites = writeSinglePartition(partitions.get(singlePartitionTag), loadJobIdPrefixView); + List> sideInputsForUpdateSchema = + Lists.newArrayList(tempLoadJobIdPrefixView); + sideInputsForUpdateSchema.addAll(dynamicDestinations.getSideInputs()); + PCollection successfulMultiPartitionWrites = writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView) .apply("ReifyRenameInput", new ReifyAsIterable<>()) + .apply( + ParDo.of( + new UpdateSchemaDestination( + bigQueryServices, + tempLoadJobIdPrefixView, + loadJobProjectId, + WriteDisposition.WRITE_APPEND, + CreateDisposition.CREATE_NEVER, + maxRetryJobs, + ignoreUnknownValues, + kmsKey, + rowWriterFactory.getSourceFormat(), + useAvroLogicalTypes, + schemaUpdateOptions, + tempDataset, + dynamicDestinations)) + .withSideInputs(sideInputsForUpdateSchema)) .apply( "WriteRenameUntriggered", ParDo.of( @@ -679,17 +721,6 @@ private PCollection> writeTempTables( ShardedKeyCoder.of(NullableCoder.of(destinationCoder)), WritePartition.ResultCoder.INSTANCE); - // If the final destination table exists already (and we're appending to it), then the temp - // tables must exactly match schema, partitioning, etc. Wrap the DynamicDestinations object - // with one that makes this happen. 
- @SuppressWarnings("unchecked") - DynamicDestinations destinations = dynamicDestinations; - if (createDisposition.equals(CreateDisposition.CREATE_IF_NEEDED) - || createDisposition.equals(CreateDisposition.CREATE_NEVER)) { - destinations = - DynamicDestinationsHelpers.matchTableDynamicDestinations(destinations, bigQueryServices); - } - Coder tableDestinationCoder = clusteringEnabled ? TableDestinationCoderV3.of() : TableDestinationCoderV2.of(); @@ -711,7 +742,7 @@ private PCollection> writeTempTables( WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_IF_NEEDED, sideInputs, - destinations, + dynamicDestinations, loadJobProjectId, maxRetryJobs, ignoreUnknownValues, @@ -720,7 +751,7 @@ private PCollection> writeTempTables( useAvroLogicalTypes, // Note that we can't pass through the schema update options when creating temporary // tables. They also shouldn't be needed. See BEAM-12482 for additional details. - Collections.emptySet(), + schemaUpdateOptions, tempDataset)) .setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE)); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java index 2fa479aba45e..7b0c1a3ba50d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java @@ -1,76 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.beam.sdk.io.gcp.bigquery; +import com.google.api.client.http.ByteArrayContent; +import com.google.api.services.bigquery.model.Clustering; +import com.google.api.services.bigquery.model.EncryptionConfiguration; +import com.google.api.services.bigquery.model.JobConfigurationLoad; +import com.google.api.services.bigquery.model.JobReference; +import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.api.services.bigquery.model.TimePartitioning; +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.Nullable; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; -import java.util.List; +@SuppressWarnings({"UnusedVariable", "rawtypes"}) +public class UpdateSchemaDestination + extends DoFn< + Iterable>, + Iterable>> { + + private static final Logger LOG = LoggerFactory.getLogger(UpdateSchemaDestination.class); + private final BigQueryServices bqServices; + private final PCollectionView loadJobIdPrefixView; + private final ValueProvider loadJobProjectId; + private transient @Nullable DatasetService datasetService; -public class UpdateSchemaDestination extends DoFn>, TableDestination> { - - private static final Logger LOG = LoggerFactory.getLogger(UpdateSchemaDestination.class); - private final BigQueryServices bqServices; - private final PCollectionView jobIdToken; - private final ValueProvider loadJobProjectId; - private transient @Nullable DatasetService datasetService; - - private static class PendingJobData { - final BigQueryHelpers.PendingJob retryJob; - final TableDestination tableDestination; - final List tempTables; - final BoundedWindow window; - - public PendingJobData( - BigQueryHelpers.PendingJob retryJob, - TableDestination tableDestination, - List tempTables, - BoundedWindow window) { - this.retryJob = retryJob; - this.tableDestination = tableDestination; - this.tempTables = tempTables; - this.window = window; - } + private final int maxRetryJobs; + private final @Nullable String kmsKey; + private final String sourceFormat; + private final boolean useAvroLogicalTypes; + private @Nullable BigQueryServices.JobService jobService; + private final @Nullable String tempDataset; + private final boolean ignoreUnknownValues; + private final Set schemaUpdateOptions; + private BigQueryIO.Write.WriteDisposition writeDisposition; + private BigQueryIO.Write.CreateDisposition createDisposition; + private DynamicDestinations dynamicDestinations; + + private static class PendingJobData { + final BigQueryHelpers.PendingJob retryJob; + final TableDestination tableDestination; + final BoundedWindow window; + + public PendingJobData( + BigQueryHelpers.PendingJob retryJob, + TableDestination tableDestination, + 
BoundedWindow window) { + this.retryJob = retryJob; + this.tableDestination = tableDestination; + this.window = window; } + } + + private List pendingJobs = Lists.newArrayList(); + + public UpdateSchemaDestination( + BigQueryServices bqServices, + PCollectionView loadJobIdPrefixView, + @Nullable ValueProvider loadJobProjectId, + BigQueryIO.Write.WriteDisposition writeDisposition, + BigQueryIO.Write.CreateDisposition createDisposition, + int maxRetryJobs, + boolean ignoreUnknownValues, + String kmsKey, + String sourceFormat, + boolean useAvroLogicalTypes, + Set schemaUpdateOptions, + @Nullable String tempDataset, + DynamicDestinations dynamicDestinations) { + this.loadJobProjectId = loadJobProjectId; + this.loadJobIdPrefixView = loadJobIdPrefixView; + this.bqServices = bqServices; + this.maxRetryJobs = maxRetryJobs; + this.ignoreUnknownValues = ignoreUnknownValues; + this.kmsKey = kmsKey; + this.sourceFormat = sourceFormat; + this.useAvroLogicalTypes = useAvroLogicalTypes; + this.schemaUpdateOptions = schemaUpdateOptions; + this.tempDataset = tempDataset; + this.createDisposition = createDisposition; + this.writeDisposition = writeDisposition; + this.dynamicDestinations = dynamicDestinations; + } - private List pendingJobs = Lists.newArrayList(); + @StartBundle + public void startBundle(StartBundleContext c) { + pendingJobs.clear(); + } - public UpdateSchemaDestination(BigQueryServices bqServices, - PCollectionView jobIdToken, - ValueProvider loadJobProjectId) { - this.loadJobProjectId = loadJobProjectId; - this.jobIdToken = jobIdToken; - this.bqServices = bqServices; + @ProcessElement + public void processElement( + @Element Iterable> element, + ProcessContext context, + BoundedWindow window) + throws IOException { + Object destination = null; + for (KV entry : element) { + destination = entry.getKey(); + if (destination != null) { + break; + } } + if (destination != null) { + TableDestination tableDestination = dynamicDestinations.getTable(destination); + TableSchema schema = dynamicDestinations.getSchema(destination); + TableReference tableReference = tableDestination.getTableReference(); + String jobIdPrefix = + BigQueryResourceNaming.createJobIdWithDestination( + context.sideInput(loadJobIdPrefixView), + tableDestination, + 1, + context.pane().getIndex()); + jobIdPrefix += "_schemaUpdateDestination"; + BigQueryHelpers.PendingJob updateSchemaDestinationJob = + startZeroLoadJob( + getJobService(context.getPipelineOptions().as(BigQueryOptions.class)), + getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class)), + jobIdPrefix, + tableReference, + tableDestination.getTimePartitioning(), + tableDestination.getClustering(), + schema, + BigQueryIO.Write.WriteDisposition.WRITE_APPEND, + BigQueryIO.Write.CreateDisposition.CREATE_NEVER, + schemaUpdateOptions); + if (updateSchemaDestinationJob != null) { + pendingJobs.add(new PendingJobData(updateSchemaDestinationJob, tableDestination, window)); + } + context.output(element); + } + } - @StartBundle - public void startBundle(StartBundleContext c) { - pendingJobs.clear(); + @Teardown + public void onTeardown() { + try { + if (datasetService != null) { + datasetService.close(); + datasetService = null; + } + if (jobService != null) { + jobService.close(); + jobService = null; + } + } catch (Exception e) { + throw new RuntimeException(e); } + } - @Teardown - public void onTeardown() { + @FinishBundle + public void finishBundle(FinishBundleContext context) throws Exception { + DatasetService datasetService = + 
getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class)); + BigQueryHelpers.PendingJobManager jobManager = new BigQueryHelpers.PendingJobManager(); + for (final PendingJobData pendingJobData : pendingJobs) { + jobManager = + jobManager.addPendingJob( + pendingJobData.retryJob, + j -> { + try { + if (pendingJobData.tableDestination.getTableDescription() != null) { + TableReference ref = pendingJobData.tableDestination.getTableReference(); + datasetService.patchTableDescription( + ref.clone() + .setTableId(BigQueryHelpers.stripPartitionDecorator(ref.getTableId())), + pendingJobData.tableDestination.getTableDescription()); + } + return null; + } catch (IOException | InterruptedException e) { + return e; + } + }); + } + jobManager.waitForDone(); + } + private BigQueryHelpers.PendingJob startZeroLoadJob( + BigQueryServices.JobService jobService, + DatasetService datasetService, + String jobIdPrefix, + TableReference tableReference, + TimePartitioning timePartitioning, + Clustering clustering, + @Nullable TableSchema schema, + BigQueryIO.Write.WriteDisposition writeDisposition, + BigQueryIO.Write.CreateDisposition createDisposition, + Set schemaUpdateOptions) { + JobConfigurationLoad loadConfig = + new JobConfigurationLoad() + .setDestinationTable(tableReference) + .setSchema(schema) + .setWriteDisposition(writeDisposition.name()) + .setCreateDisposition(createDisposition.name()) + .setSourceFormat(sourceFormat) + .setIgnoreUnknownValues(ignoreUnknownValues) + .setUseAvroLogicalTypes(useAvroLogicalTypes); + if (schemaUpdateOptions != null) { + List options = + schemaUpdateOptions.stream() + .map(Enum::name) + .collect(Collectors.toList()); + loadConfig.setSchemaUpdateOptions(options); + } + if (!loadConfig + .getWriteDisposition() + .equals(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE.toString()) + && !loadConfig + .getWriteDisposition() + .equals(BigQueryIO.Write.WriteDisposition.WRITE_APPEND.toString())) { + return null; + } + Table destinationTable = null; + try { + destinationTable = datasetService.getTable(tableReference); + if (destinationTable == null) { + return null; // no need to update schema ahead if table does not exists + } + } catch (IOException | InterruptedException e) { + LOG.warn("Failed to get table {} with {}", tableReference, e.toString()); + throw new RuntimeException(e); + } + if (destinationTable.getSchema().equals(schema)) { + return null; // no need to update schema ahead if schema is already the same } + if (timePartitioning != null) { + loadConfig.setTimePartitioning(timePartitioning); + // only set clustering if timePartitioning is set + if (clustering != null) { + loadConfig.setClustering(clustering); + } + } + if (kmsKey != null) { + loadConfig.setDestinationEncryptionConfiguration( + new EncryptionConfiguration().setKmsKeyName(kmsKey)); + } + String projectId = + loadJobProjectId == null || loadJobProjectId.get() == null + ? tableReference.getProjectId() + : loadJobProjectId.get(); + String bqLocation = + BigQueryHelpers.getDatasetLocation( + datasetService, tableReference.getProjectId(), tableReference.getDatasetId()); - @ProcessElement - public void processElement( - @Element Iterable> element, - ProcessContext context - ) { - Multimap tempTables = ArrayListMultimap.create(); - for (KV entry : element) { - tempTables.put(entry.getKey(), entry.getValue()); - } + BigQueryHelpers.PendingJob retryJob = + new BigQueryHelpers.PendingJob( + // Function to load the data. 
+ jobId -> { + JobReference jobRef = + new JobReference() + .setProjectId(projectId) + .setJobId(jobId.getJobId()) + .setLocation(bqLocation); + LOG.info( + "Loading zero rows using job {}, job id {} iteration {}", + tableReference, + jobRef, + jobId.getRetryIndex()); + try { + jobService.startLoadJob( + jobRef, loadConfig, new ByteArrayContent("text/plain", new byte[0])); + } catch (IOException | InterruptedException e) { + LOG.warn("Load job {} failed with {}", jobRef, e.toString()); + throw new RuntimeException(e); + } + return null; + }, + // Function to poll the result of a load job. + jobId -> { + JobReference jobRef = + new JobReference() + .setProjectId(projectId) + .setJobId(jobId.getJobId()) + .setLocation(bqLocation); + try { + return jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, + // Function to lookup a job. + jobId -> { + JobReference jobRef = + new JobReference() + .setProjectId(projectId) + .setJobId(jobId.getJobId()) + .setLocation(bqLocation); + try { + return jobService.getJob(jobRef); + } catch (InterruptedException | IOException e) { + throw new RuntimeException(e); + } + }, + maxRetryJobs, + jobIdPrefix); + return retryJob; + } + + private BigQueryServices.JobService getJobService(PipelineOptions pipelineOptions) + throws IOException { + if (jobService == null) { + jobService = bqServices.getJobService(pipelineOptions.as(BigQueryOptions.class)); } + return jobService; + } + private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException { + if (datasetService == null) { + datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class)); + } + return datasetService; + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index c59f00ed60fa..f30388b523cc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -23,7 +23,6 @@ import com.google.api.services.bigquery.model.EncryptionConfiguration; import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobReference; -import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; @@ -31,7 +30,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -271,23 +269,10 @@ public void processElement( } else if (tempTable) { // In this case, we are writing to a temp table and always need to create it. // WRITE_TRUNCATE is set so that we properly handle retries of this pane. 
- writeDisposition = WriteDisposition.WRITE_TRUNCATE; + writeDisposition = WriteDisposition.WRITE_APPEND; createDisposition = CreateDisposition.CREATE_IF_NEEDED; } - BigQueryHelpers.PendingJob schemaJob = - startZeroLoadJob( - getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), - getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), - jobIdPrefix, - tableReference, - tableDestination.getTimePartitioning(), - tableDestination.getClustering(), - tableSchema, - writeDisposition, - createDisposition, - schemaUpdateOptions); - BigQueryHelpers.PendingJob retryJob = startLoad( getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), @@ -301,16 +286,7 @@ public void processElement( writeDisposition, createDisposition, schemaUpdateOptions); - if (schemaJob != null) { - pendingJobs.add( - new PendingJobData( - window, - schemaJob, - partitionFiles, - tableDestination, - tableReference, - element.getValue().isFirstPane())); - } + pendingJobs.add( new PendingJobData( window, @@ -379,7 +355,6 @@ public void finishBundle(FinishBundleContext c) throws Exception { BigQueryHelpers.stripPartitionDecorator(ref.getTableId())), pendingJob.tableDestination.getTableDescription()); } - Result result = new AutoValue_WriteTables_Result( BigQueryHelpers.toJsonString(pendingJob.tableReference), @@ -476,6 +451,7 @@ public PCollection> expand( .apply(GroupByKey.create()) .apply(Values.create()) .apply(ParDo.of(new GarbageCollectTemporaryFiles())); + return writeTablesOutputs.get(mainOutputTag); } @@ -579,122 +555,6 @@ private PendingJob startLoad( return retryJob; } - private PendingJob startZeroLoadJob( - JobService jobService, - DatasetService datasetService, - String jobIdPrefix, - TableReference ref, - TimePartitioning timePartitioning, - Clustering clustering, - @Nullable TableSchema schema, - WriteDisposition writeDisposition, - CreateDisposition createDisposition, - Set schemaUpdateOptions) { - JobConfigurationLoad loadConfig = - new JobConfigurationLoad() - .setDestinationTable(ref) - .setSchema(schema) - .setSourceUris(Collections.EMPTY_LIST) - .setWriteDisposition(writeDisposition.name()) - .setCreateDisposition(createDisposition.name()) - .setSourceFormat(sourceFormat) - .setIgnoreUnknownValues(ignoreUnknownValues) - .setUseAvroLogicalTypes(useAvroLogicalTypes); - if (schemaUpdateOptions != null) { - List options = - schemaUpdateOptions.stream() - .map(Enum::name) - .collect(Collectors.toList()); - loadConfig.setSchemaUpdateOptions(options); - } - if (!loadConfig.getWriteDisposition().equals(WriteDisposition.WRITE_TRUNCATE.toString()) - || !loadConfig.getWriteDisposition().equals(WriteDisposition.WRITE_APPEND.toString())) { - return null; - } - Table destinationTable = null; - try { - destinationTable = datasetService.getTable(ref); - if (destinationTable == null) { - return null; // no need to update schema ahead if table does not exists - } - } catch (IOException | InterruptedException e) { - LOG.warn("Failed to get table {} with {}", ref, e.toString()); - throw new RuntimeException(e); - } - if (destinationTable.getSchema().equals(schema)) { - return null; // no need to update schema ahead if schema is already the same - } - if (timePartitioning != null) { - loadConfig.setTimePartitioning(timePartitioning); - // only set clustering if timePartitioning is set - if (clustering != null) { - loadConfig.setClustering(clustering); - } - } - if (kmsKey != null) { - loadConfig.setDestinationEncryptionConfiguration( - new EncryptionConfiguration().setKmsKeyName(kmsKey)); - } - 
String projectId = - loadJobProjectId == null || loadJobProjectId.get() == null - ? ref.getProjectId() - : loadJobProjectId.get(); - String bqLocation = - BigQueryHelpers.getDatasetLocation(datasetService, ref.getProjectId(), ref.getDatasetId()); - - PendingJob retryJob = - new PendingJob( - // Function to load the data. - jobId -> { - JobReference jobRef = - new JobReference() - .setProjectId(projectId) - .setJobId(jobId.getJobId()) - .setLocation(bqLocation); - LOG.info( - "Loading zero rows using job {}, job id {} iteration {}", - ref, - jobRef, - jobId.getRetryIndex()); - try { - jobService.startLoadJob(jobRef, loadConfig); - } catch (IOException | InterruptedException e) { - LOG.warn("Load job {} failed with {}", jobRef, e.toString()); - throw new RuntimeException(e); - } - return null; - }, - // Function to poll the result of a load job. - jobId -> { - JobReference jobRef = - new JobReference() - .setProjectId(projectId) - .setJobId(jobId.getJobId()) - .setLocation(bqLocation); - try { - return jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }, - // Function to lookup a job. - jobId -> { - JobReference jobRef = - new JobReference() - .setProjectId(projectId) - .setJobId(jobId.getJobId()) - .setLocation(bqLocation); - try { - return jobService.getJob(jobRef); - } catch (InterruptedException | IOException e) { - throw new RuntimeException(e); - } - }, - maxRetryJobs, - jobIdPrefix); - return retryJob; - } - static void removeTemporaryFiles(Iterable files) throws IOException { ImmutableList.Builder fileResources = ImmutableList.builder(); for (String file : files) { From 13184636a43397d92a9c00f5b632acec30f103f9 Mon Sep 17 00:00:00 2001 From: Marco Robles Date: Wed, 13 Apr 2022 14:26:45 -0500 Subject: [PATCH 6/8] add test for write temp tables --- .../bigquery/BigQuerySchemaUpdateOptionsIT.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java index 72831e4c5bef..dd9ab1508f3f 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java @@ -28,6 +28,7 @@ import java.security.SecureRandom; import java.util.Arrays; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -239,19 +240,23 @@ public void runWriteTestTempTables() throws Exception { .setMode("REQUIRED"))); String[] values = {"meow", "bark"}; - TableRow rowToInsert = - new TableRow().set("new_field", values[0]).set("required_field", values[1]); String testQuery = String.format( "SELECT new_field, required_field FROM [%s.%s];", BIG_QUERY_DATASET_ID, tableName); - List> expectedResult = Arrays.asList(Arrays.asList(values)); + List> expectedResult = + Arrays.asList(Arrays.asList(values[0], values[1]), Arrays.asList(values[1], values[0])); + Options options = TestPipeline.testingPipelineOptions().as(Options.class); options.setTempLocation(options.getTempRoot() + "/bq_it_temp"); Pipeline p = Pipeline.create(options); - Create.Values input = Create.of(rowToInsert); + Create.Values input = + Create.of( 
+ Arrays.asList( + new TableRow().set("new_field", values[0]).set("required_field", values[1]), + new TableRow().set("new_field", values[1]).set("required_field", values[0]))); Write writer = BigQueryIO.writeTableRows() @@ -277,6 +282,6 @@ public void runWriteTestTempTables() throws Exception { .collect(Collectors.toList())) .collect(Collectors.toList()); - assertEquals(expectedResult, result); + assertEquals(new HashSet<>(expectedResult), new HashSet<>(result)); } } From f7f9a1fd3d667c06e1887b33e1b191ec5ffd23c0 Mon Sep 17 00:00:00 2001 From: Marco Robles Date: Wed, 13 Apr 2022 16:40:46 -0500 Subject: [PATCH 7/8] add nullness supress warning --- .../beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java index 7b0c1a3ba50d..90b806f66414 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java @@ -42,7 +42,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@SuppressWarnings({"UnusedVariable", "rawtypes"}) +@SuppressWarnings({"nullness", "rawtypes"}) public class UpdateSchemaDestination extends DoFn< Iterable>, From 450e0a6e71a3cd40a6743885f46379c11d69717d Mon Sep 17 00:00:00 2001 From: Marco Robles Date: Wed, 13 Apr 2022 17:01:41 -0500 Subject: [PATCH 8/8] remove unnecesary variable and use global variable for updateschemadestination-dofn --- .../org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java | 2 -- .../beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java | 7 ++----- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 28eea71bdbf8..fc2b727ec79c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -386,7 +386,6 @@ private WriteResult expandTriggered(PCollection> inpu rowWriterFactory.getSourceFormat(), useAvroLogicalTypes, schemaUpdateOptions, - tempDataset, dynamicDestinations)) .withSideInputs(sideInputsForUpdateSchema)) .apply( @@ -486,7 +485,6 @@ public WriteResult expandUntriggered(PCollection> inp rowWriterFactory.getSourceFormat(), useAvroLogicalTypes, schemaUpdateOptions, - tempDataset, dynamicDestinations)) .withSideInputs(sideInputsForUpdateSchema)) .apply( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java index 90b806f66414..4ae1064bc431 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java @@ -59,7 +59,6 @@ public class UpdateSchemaDestination private final String sourceFormat; private final boolean useAvroLogicalTypes; 
private @Nullable BigQueryServices.JobService jobService; - private final @Nullable String tempDataset; private final boolean ignoreUnknownValues; private final Set schemaUpdateOptions; private BigQueryIO.Write.WriteDisposition writeDisposition; @@ -95,7 +94,6 @@ public UpdateSchemaDestination( String sourceFormat, boolean useAvroLogicalTypes, Set schemaUpdateOptions, - @Nullable String tempDataset, DynamicDestinations dynamicDestinations) { this.loadJobProjectId = loadJobProjectId; this.loadJobIdPrefixView = loadJobIdPrefixView; @@ -106,7 +104,6 @@ public UpdateSchemaDestination( this.sourceFormat = sourceFormat; this.useAvroLogicalTypes = useAvroLogicalTypes; this.schemaUpdateOptions = schemaUpdateOptions; - this.tempDataset = tempDataset; this.createDisposition = createDisposition; this.writeDisposition = writeDisposition; this.dynamicDestinations = dynamicDestinations; @@ -150,8 +147,8 @@ public void processElement( tableDestination.getTimePartitioning(), tableDestination.getClustering(), schema, - BigQueryIO.Write.WriteDisposition.WRITE_APPEND, - BigQueryIO.Write.CreateDisposition.CREATE_NEVER, + writeDisposition, + createDisposition, schemaUpdateOptions); if (updateSchemaDestinationJob != null) { pendingJobs.add(new PendingJobData(updateSchemaDestinationJob, tableDestination, window));
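Reviewer note (illustrative sketch, not part of this patch series): the schema-update path added above is driven entirely by BigQueryIO.Write configuration. Below is a minimal user pipeline that exercises it, mirroring the integration test added in this series; the project, dataset, and table names are placeholders, and the tiny partition limits are test-only knobs used here just to force the multi-partition (temp table) route that UpdateSchemaDestination runs ahead of WriteRename.

import java.util.EnumSet;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;

public class SchemaUpdateOptionsExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

    // Destination schema that adds "new_field" next to the pre-existing REQUIRED column.
    TableSchema schema =
        new TableSchema()
            .setFields(
                ImmutableList.of(
                    new TableFieldSchema().setName("new_field").setType("STRING"),
                    new TableFieldSchema()
                        .setName("required_field")
                        .setType("STRING")
                        .setMode("REQUIRED")));

    p.apply(Create.of(new TableRow().set("new_field", "meow").set("required_field", "bark")))
        .apply(
            BigQueryIO.writeTableRows()
                // Placeholder destination; substitute a real project:dataset.table.
                .to("my-project:my_dataset.my_table")
                .withSchema(schema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withSchemaUpdateOptions(
                    EnumSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION))
                // Test-only knobs: force more than one partition so the load goes through
                // temp tables and hits the UpdateSchemaDestination step before WriteRename.
                .withMaxBytesPerPartition(1)
                .withMaxFilesPerPartition(1));

    p.run().waitUntilFinish();
  }
}

With this configuration, appending to an existing table whose schema lacks new_field triggers the zero-row load job introduced by these patches, which applies ALLOW_FIELD_ADDITION to the final destination before the temp tables are copied into it.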