From 9eb57c1b44d8558736b6b0af2f9ee1c36dda923a Mon Sep 17 00:00:00 2001
From: Miguel Anzo
Date: Mon, 20 Dec 2021 23:09:29 -0600
Subject: [PATCH 1/2] Add a zero-row load job to BigQuery WriteTables

---
 .../beam/sdk/io/gcp/bigquery/WriteTables.java | 141 ++++++++++++++++++
 1 file changed, 141 insertions(+)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 1637f1388bab..d337cb997669 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -23,6 +23,7 @@
 import com.google.api.services.bigquery.model.EncryptionConfiguration;
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
 import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
@@ -30,6 +31,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -269,6 +271,19 @@ public void processElement(
         createDisposition = CreateDisposition.CREATE_IF_NEEDED;
       }
 
+      BigQueryHelpers.PendingJob schemaJob =
+          startZeroLoadJob(
+              getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
+              getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
+              jobIdPrefix,
+              tableReference,
+              tableDestination.getTimePartitioning(),
+              tableDestination.getClustering(),
+              tableSchema,
+              writeDisposition,
+              createDisposition,
+              schemaUpdateOptions);
+
       BigQueryHelpers.PendingJob retryJob =
           startLoad(
               getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
@@ -282,6 +297,16 @@ public void processElement(
               writeDisposition,
               createDisposition,
               schemaUpdateOptions);
+      if (schemaJob != null) {
+        pendingJobs.add(
+            new PendingJobData(
+                window,
+                schemaJob,
+                partitionFiles,
+                tableDestination,
+                tableReference,
+                element.getValue().isFirstPane()));
+      }
       pendingJobs.add(
           new PendingJobData(
               window,
@@ -549,6 +574,122 @@ private PendingJob startLoad(
     return retryJob;
   }
 
+  private PendingJob startZeroLoadJob(
+      JobService jobService,
+      DatasetService datasetService,
+      String jobIdPrefix,
+      TableReference ref,
+      TimePartitioning timePartitioning,
+      Clustering clustering,
+      @Nullable TableSchema schema,
+      WriteDisposition writeDisposition,
+      CreateDisposition createDisposition,
+      Set<SchemaUpdateOption> schemaUpdateOptions) {
+    JobConfigurationLoad loadConfig =
+        new JobConfigurationLoad()
+            .setDestinationTable(ref)
+            .setSchema(schema)
+            .setSourceUris(Collections.emptyList())
+            .setWriteDisposition(writeDisposition.name())
+            .setCreateDisposition(createDisposition.name())
+            .setSourceFormat(sourceFormat)
+            .setIgnoreUnknownValues(ignoreUnknownValues)
+            .setUseAvroLogicalTypes(useAvroLogicalTypes);
+    if (schemaUpdateOptions != null) {
+      List<String> options =
+          schemaUpdateOptions.stream().map(Enum::name).collect(Collectors.toList());
+      loadConfig.setSchemaUpdateOptions(options);
+    }
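+    // A zero-row load job can only carry a schema update when the real load
+    // will truncate or append to the destination table; for any other write
+    // disposition there is nothing to update ahead of time.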
+    if (!loadConfig.getWriteDisposition().equals(WriteDisposition.WRITE_TRUNCATE.toString())
+        && !loadConfig.getWriteDisposition().equals(WriteDisposition.WRITE_APPEND.toString())) {
+      return null;
+    }
+    Table destinationTable = null;
+    try {
+      destinationTable = datasetService.getTable(ref);
+      if (destinationTable == null) {
+        return null; // no need to update the schema ahead if the table does not exist
+      }
+    } catch (IOException | InterruptedException e) {
+      LOG.warn("Failed to get table {} with {}", ref, e.toString());
+      throw new RuntimeException(e);
+    }
+    if (schema == null || schema.equals(destinationTable.getSchema())) {
+      return null; // no need to update the schema ahead if it is unknown or unchanged
+    }
+    if (timePartitioning != null) {
+      loadConfig.setTimePartitioning(timePartitioning);
+      // only set clustering if timePartitioning is set
+      if (clustering != null) {
+        loadConfig.setClustering(clustering);
+      }
+    }
+    if (kmsKey != null) {
+      loadConfig.setDestinationEncryptionConfiguration(
+          new EncryptionConfiguration().setKmsKeyName(kmsKey));
+    }
+    String projectId =
+        loadJobProjectId == null || loadJobProjectId.get() == null
+            ? ref.getProjectId()
+            : loadJobProjectId.get();
+    String bqLocation =
+        BigQueryHelpers.getDatasetLocation(datasetService, ref.getProjectId(), ref.getDatasetId());
+
+    PendingJob retryJob =
+        new PendingJob(
+            // Function to load the data.
+            jobId -> {
+              JobReference jobRef =
+                  new JobReference()
+                      .setProjectId(projectId)
+                      .setJobId(jobId.getJobId())
+                      .setLocation(bqLocation);
+              LOG.info(
+                  "Loading zero rows into table {} using job {}, iteration {}",
+                  ref,
+                  jobRef,
+                  jobId.getRetryIndex());
+              try {
+                jobService.startLoadJob(jobRef, loadConfig);
+              } catch (IOException | InterruptedException e) {
+                LOG.warn("Load job {} failed with {}", jobRef, e.toString());
+                throw new RuntimeException(e);
+              }
+              return null;
+            },
+            // Function to poll the result of a load job.
+            jobId -> {
+              JobReference jobRef =
+                  new JobReference()
+                      .setProjectId(projectId)
+                      .setJobId(jobId.getJobId())
+                      .setLocation(bqLocation);
+              try {
+                return jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+            },
+            // Function to lookup a job.
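+            // (PendingJob retries use this to check whether a previous attempt
+            // already created the job.)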
+            jobId -> {
+              JobReference jobRef =
+                  new JobReference()
+                      .setProjectId(projectId)
+                      .setJobId(jobId.getJobId())
+                      .setLocation(bqLocation);
+              try {
+                return jobService.getJob(jobRef);
+              } catch (InterruptedException | IOException e) {
+                throw new RuntimeException(e);
+              }
+            },
+            maxRetryJobs,
+            jobIdPrefix);
+    return retryJob;
+  }
+
   static void removeTemporaryFiles(Iterable<String> files) throws IOException {
     ImmutableList.Builder<ResourceId> fileResources = ImmutableList.builder();
     for (String file : files) {

From ebc47fd5be8138cc7065453e5640e1a98989946c Mon Sep 17 00:00:00 2001
From: Miguel Anzo
Date: Wed, 22 Dec 2021 12:05:54 -0600
Subject: [PATCH 2/2] Add integration test for schema update through temp
 tables

---
 .../BigQuerySchemaUpdateOptionsIT.java | 60 +++++++++++++++++++
 1 file changed, 60 insertions(+)

diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java
index ed75a6688075..72831e4c5bef 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java
@@ -219,4 +219,64 @@ public void testAllowFieldRelaxation() throws Exception {
     List<List<String>> expectedResult = Arrays.asList(Arrays.asList(value));
     runWriteTest(schemaUpdateOptions, tableName, newSchema, rowToInsert, testQuery, expectedResult);
   }
+
+  @Test
+  public void testAllowFieldAdditionTempTables() throws Exception {
+    String tableName = makeTestTable();
+
+    Set<BigQueryIO.Write.SchemaUpdateOption> schemaUpdateOptions =
+        EnumSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION);
+
+    TableSchema schema =
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("new_field").setType("STRING"),
+                    new TableFieldSchema().setName("optional_field").setType("STRING"),
+                    new TableFieldSchema()
+                        .setName("required_field")
+                        .setType("STRING")
+                        .setMode("REQUIRED")));
+
+    String[] values = {"meow", "bark"};
+    TableRow rowToInsert =
+        new TableRow().set("new_field", values[0]).set("required_field", values[1]);
+
+    String testQuery =
+        String.format(
+            "SELECT new_field, required_field FROM [%s.%s];", BIG_QUERY_DATASET_ID, tableName);
+
+    List<List<String>> expectedResult = Arrays.asList(Arrays.asList(values));
+
+    Options options = TestPipeline.testingPipelineOptions().as(Options.class);
+    options.setTempLocation(options.getTempRoot() + "/bq_it_temp");
+
+    Pipeline p = Pipeline.create(options);
+    Create.Values<TableRow> input = Create.of(rowToInsert);
+
+    Write<TableRow> writer =
+        BigQueryIO.writeTableRows()
+            .to(String.format("%s:%s.%s", options.getProject(), BIG_QUERY_DATASET_ID, tableName))
+            .withSchema(schema)
+            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
+            .withSchemaUpdateOptions(schemaUpdateOptions)
+            // Tiny partition limits force the write through temp tables, the
+            // path the zero-row schema update job is meant to cover.
+            .withMaxBytesPerPartition(1)
+            .withMaxFilesPerPartition(1);
+
+    p.apply(input).apply(writer);
+    p.run().waitUntilFinish();
+
+    QueryResponse response = BQ_CLIENT.queryWithRetries(testQuery, project);
+
+    List<List<String>> result =
+        response.getRows().stream()
+            .map(
+                row ->
+                    row.getF().stream()
+                        .map(cell -> cell.getV().toString())
+                        .collect(Collectors.toList()))
+            .collect(Collectors.toList());
+
+    assertEquals(expectedResult, result);
+  }
 }
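The schema-ahead update added in PATCH 1/2 is an ordinary BigQuery load job whose source-URI list is empty: it moves no rows, but BigQuery still applies the requested schema update options to the destination table. A minimal standalone sketch of that configuration, reusing the google-api-services-bigquery model classes the patch already imports (ZeroRowLoadSketch, ref, and newSchema are illustrative names, not part of the patch):

import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Collections;

class ZeroRowLoadSketch {
  // Builds a load job configuration that carries no data: with an empty
  // source-URI list, BigQuery applies the requested schema update to the
  // destination table without loading any rows.
  static JobConfigurationLoad zeroRowLoad(TableReference ref, TableSchema newSchema) {
    return new JobConfigurationLoad()
        .setDestinationTable(ref)
        .setSchema(newSchema)
        .setSourceUris(Collections.emptyList()) // zero rows loaded
        .setWriteDisposition("WRITE_APPEND")
        .setCreateDisposition("CREATE_NEVER") // the table must already exist
        .setSchemaUpdateOptions(Collections.singletonList("ALLOW_FIELD_ADDITION"));
  }
}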