From 2c519835fc9d48969f5f628c5bd0cc78ed79f88a Mon Sep 17 00:00:00 2001
From: Yi Hu
Date: Tue, 7 Feb 2023 14:13:38 -0500
Subject: [PATCH 1/2] Fix UpdateSchemaDestination breaking DynamicDestination
 in BigQuery BatchLoads

* Handle dynamic table destination in UpdateSchemaDestination impl

* Add ZERO_LOAD job type for schema update load

* Fix BigQuerySchemaUpdateOptionsIT to actually test the temp tables scenario

  Rewrite BigQuerySchemaUpdateOptionsIT.runWriteTestTempTable to test the
  dynamicDestination scenario
---
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java  |  48 +++--
 .../gcp/bigquery/BigQueryResourceNaming.java  |   1 +
 .../gcp/bigquery/UpdateSchemaDestination.java |  76 +++++---
 .../BigQuerySchemaUpdateOptionsIT.java        | 176 +++++++++++++-----
 4 files changed, 210 insertions(+), 91 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 29ba4fe759d7..e9adc3a303f0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -141,6 +141,14 @@ class BatchLoads<DestinationT, ElementT>
   // the table, even if there is no data in it.
   private final boolean singletonTable;
   private final DynamicDestinations<?, DestinationT> dynamicDestinations;
+
+  /**
+   * destinationsWithMatching wraps dynamicDestinations and redirects the schema, partitioning,
+   * etc. to the final destination tables when the final destination table already exists (and
+   * we're appending to it). It is used when writing to temp tables and when updating the final
+   * table schema.
+   */
+  private DynamicDestinations<?, DestinationT> destinationsWithMatching;
+
   private final Coder<DestinationT> destinationCoder;
   private int maxNumWritersPerBundle;
   private long maxFileSize;
@@ -179,6 +187,9 @@ class BatchLoads<DestinationT, ElementT>
     this.createDisposition = createDisposition;
     this.singletonTable = singletonTable;
     this.dynamicDestinations = dynamicDestinations;
+    this.destinationsWithMatching =
+        DynamicDestinationsHelpers.matchTableDynamicDestinations(
+            dynamicDestinations, bigQueryServices);
     this.destinationCoder = destinationCoder;
     this.maxNumWritersPerBundle = DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE;
     this.maxFileSize = DEFAULT_MAX_FILE_SIZE;
@@ -201,6 +212,15 @@ class BatchLoads<DestinationT, ElementT>
   void setSchemaUpdateOptions(Set<SchemaUpdateOption> schemaUpdateOptions) {
     this.schemaUpdateOptions = schemaUpdateOptions;
+    // If the user specified schemaUpdateOptions, do not wrap dynamicDestinations, so that
+    // those options are respected.
+    if (schemaUpdateOptions != null && !schemaUpdateOptions.isEmpty()) {
+      this.destinationsWithMatching = dynamicDestinations;
+    } else {
+      this.destinationsWithMatching =
+          DynamicDestinationsHelpers.matchTableDynamicDestinations(
+              dynamicDestinations, bigQueryServices);
+    }
   }
 
   void setTestServices(BigQueryServices bigQueryServices) {
@@ -287,6 +307,8 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
     final PCollectionView<String> loadJobIdPrefixView = createJobIdPrefixView(p, JobType.LOAD);
     final PCollectionView<String> tempLoadJobIdPrefixView =
         createJobIdPrefixView(p, JobType.TEMP_TABLE_LOAD);
+    final PCollectionView<String> zeroLoadJobIdPrefixView =
+        createJobIdPrefixView(p, JobType.ZERO_LOAD);
     final PCollectionView<String> copyJobIdPrefixView = createJobIdPrefixView(p, JobType.COPY);
     final PCollectionView<String> tempFilePrefixView =
         createTempFilePrefixView(p, loadJobIdPrefixView);
@@ -367,7 +389,7 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
         writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView);
 
     List<PCollectionView<?>> sideInputsForUpdateSchema =
-        Lists.newArrayList(tempLoadJobIdPrefixView);
+        Lists.newArrayList(zeroLoadJobIdPrefixView);
     sideInputsForUpdateSchema.addAll(dynamicDestinations.getSideInputs());
 
     PCollection<TableDestination> successfulMultiPartitionWrites =
@@ -385,14 +407,14 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
                 ParDo.of(
                         new UpdateSchemaDestination(
                             bigQueryServices,
-                            tempLoadJobIdPrefixView,
+                            zeroLoadJobIdPrefixView,
                             loadJobProjectId,
                             WriteDisposition.WRITE_APPEND,
                             CreateDisposition.CREATE_NEVER,
                             maxRetryJobs,
                             kmsKey,
                             schemaUpdateOptions,
-                            dynamicDestinations))
+                            destinationsWithMatching))
                     .withSideInputs(sideInputsForUpdateSchema))
             .apply(
                 "WriteRenameTriggered",
@@ -426,6 +448,8 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> inp
     final PCollectionView<String> loadJobIdPrefixView = createJobIdPrefixView(p, JobType.LOAD);
     final PCollectionView<String> tempLoadJobIdPrefixView =
         createJobIdPrefixView(p, JobType.TEMP_TABLE_LOAD);
+    final PCollectionView<String> zeroLoadJobIdPrefixView =
+        createJobIdPrefixView(p, JobType.ZERO_LOAD);
     final PCollectionView<String> copyJobIdPrefixView = createJobIdPrefixView(p, JobType.COPY);
     final PCollectionView<String> tempFilePrefixView =
         createTempFilePrefixView(p, loadJobIdPrefixView);
@@ -471,7 +495,7 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> inp
         writeSinglePartition(partitions.get(singlePartitionTag), loadJobIdPrefixView);
 
     List<PCollectionView<?>> sideInputsForUpdateSchema =
-        Lists.newArrayList(tempLoadJobIdPrefixView);
+        Lists.newArrayList(zeroLoadJobIdPrefixView);
     sideInputsForUpdateSchema.addAll(dynamicDestinations.getSideInputs());
 
     PCollection<TableDestination> successfulMultiPartitionWrites =
@@ -481,14 +505,14 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> inp
                 ParDo.of(
                         new UpdateSchemaDestination(
                             bigQueryServices,
-                            tempLoadJobIdPrefixView,
+                            zeroLoadJobIdPrefixView,
                             loadJobProjectId,
                             WriteDisposition.WRITE_APPEND,
                             CreateDisposition.CREATE_NEVER,
                             maxRetryJobs,
                             kmsKey,
                             schemaUpdateOptions,
-                            dynamicDestinations))
+                            destinationsWithMatching))
                     .withSideInputs(sideInputsForUpdateSchema))
             .apply(
                 "WriteRenameUntriggered",
@@ -728,18 +752,6 @@ private PCollection<KV<TableDestination, WriteTables.Result>> writeTempTables(
             ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
             WritePartition.ResultCoder.INSTANCE);
 
-    // If the final destination table exists already (and we're appending to it), then the temp
-    // tables must exactly match schema, partitioning, etc. Wrap the DynamicDestinations object
-    // with one that makes this happen.
-    // In the case schemaUpdateOptions are specified by the user, matching does not occur in order
-    // to respect those options.
-    DynamicDestinations<?, DestinationT> destinationsWithMatching = dynamicDestinations;
-    if (schemaUpdateOptions.isEmpty()) {
-      destinationsWithMatching =
-          DynamicDestinationsHelpers.matchTableDynamicDestinations(
-              dynamicDestinations, bigQueryServices);
-    }
-
     // If WriteBundlesToFiles produced more than DEFAULT_MAX_FILES_PER_PARTITION files or
     // DEFAULT_MAX_BYTES_PER_PARTITION bytes, then
     // the import needs to be split into multiple partitions, and those partitions will be
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java
index c2c639fec0ad..431f6c71bf31 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java
@@ -68,6 +68,7 @@ static String createJobIdWithDestination(
   public enum JobType {
     LOAD,
     TEMP_TABLE_LOAD,
+    ZERO_LOAD,
     COPY,
     EXPORT,
     QUERY,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java
index 90bd99c7d617..4d5717388313 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java
@@ -27,8 +27,8 @@ import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
@@ -41,9 +41,23 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Update destination schema based on data that is about to be copied into it.
+ *
+ * <p>Unlike load and query jobs, BigQuery copy jobs do not support schema field addition or
+ * relaxation on the destination table. This DoFn fills that gap by updating the destination table
+ * schemas to be compatible with the data coming from the source table so that schemaUpdateOptions
+ * are respected regardless of whether data is loaded directly to the destination table or loaded
+ * into temporary tables before being copied into the destination.
+ *
+ * <p>This transform takes as input a list of KV(destination, WriteTables.Result) and emits a list
+ * of KV(TableDestination, WriteTables.Result), where each destination label is parsed and
+ * resolved to a TableDestination object.
+ */
 @SuppressWarnings({"nullness"})
 public class UpdateSchemaDestination<DestinationT>
     extends DoFn<
         Iterable<KV<DestinationT, WriteTables.Result>>,
         Iterable<KV<TableDestination, WriteTables.Result>>> {
 
   private static final Logger LOG = LoggerFactory.getLogger(UpdateSchemaDestination.class);
   private final BigQueryServices bqServices;
-  private final PCollectionView<String> loadJobIdPrefixView;
-  private final ValueProvider<String> loadJobProjectId;
+  private final PCollectionView<String> zeroLoadJobIdPrefixView;
+  private final @Nullable ValueProvider<String> loadJobProjectId;
   private transient @Nullable DatasetService datasetService;
   private final int maxRetryJobs;
   private final @Nullable String kmsKey;
@@ -78,11 +92,11 @@ public PendingJobData(
     }
   }
 
-  private final List<PendingJobData> pendingJobs = Lists.newArrayList();
+  private final Map<DestinationT, PendingJobData> pendingJobs = Maps.newHashMap();
 
   public UpdateSchemaDestination(
       BigQueryServices bqServices,
-      PCollectionView<String> loadJobIdPrefixView,
+      PCollectionView<String> zeroLoadJobIdPrefixView,
       @Nullable ValueProvider<String> loadJobProjectId,
       BigQueryIO.Write.WriteDisposition writeDisposition,
       BigQueryIO.Write.CreateDisposition createDisposition,
@@ -91,7 +105,7 @@ public UpdateSchemaDestination(
       Set<SchemaUpdateOption> schemaUpdateOptions,
       DynamicDestinations<?, DestinationT> dynamicDestinations) {
     this.loadJobProjectId = loadJobProjectId;
-    this.loadJobIdPrefixView = loadJobIdPrefixView;
+    this.zeroLoadJobIdPrefixView = zeroLoadJobIdPrefixView;
     this.bqServices = bqServices;
     this.maxRetryJobs = maxRetryJobs;
     this.kmsKey = kmsKey;
@@ -106,7 +120,13 @@ public void startBundle(StartBundleContext c) {
     pendingJobs.clear();
   }
 
-  TableDestination getTableWithDefaultProject(DestinationT destination, BigQueryOptions options) {
+  TableDestination getTableWithDefaultProject(DestinationT destination) {
+    if (dynamicDestinations.getPipelineOptions() == null) {
+      throw new IllegalStateException(
+          "Unexpected null pipeline options for DynamicDestinations object. "
" + + "Need to call setSideInputAccessorFromProcessContext(context) before use it."); + } + BigQueryOptions options = dynamicDestinations.getPipelineOptions().as(BigQueryOptions.class); TableDestination tableDestination = dynamicDestinations.getTable(destination); TableReference tableReference = tableDestination.getTableReference(); @@ -127,25 +147,24 @@ public void processElement( ProcessContext context, BoundedWindow window) throws IOException { - DestinationT destination = null; - BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class); + dynamicDestinations.setSideInputAccessorFromProcessContext(context); + List> outputs = Lists.newArrayList(); for (KV entry : element) { - destination = entry.getKey(); - if (destination != null) { - break; + DestinationT destination = entry.getKey(); + TableDestination tableDestination = getTableWithDefaultProject(destination); + outputs.add(KV.of(tableDestination, entry.getValue())); + if (pendingJobs.containsKey(destination)) { + // zero load job for this destination is already set + continue; } - } - if (destination != null) { - TableDestination tableDestination = getTableWithDefaultProject(destination, options); TableSchema schema = dynamicDestinations.getSchema(destination); TableReference tableReference = tableDestination.getTableReference(); String jobIdPrefix = BigQueryResourceNaming.createJobIdWithDestination( - context.sideInput(loadJobIdPrefixView), + context.sideInput(zeroLoadJobIdPrefixView), tableDestination, 1, context.pane().getIndex()); - jobIdPrefix += "_schemaUpdateDestination"; BigQueryHelpers.PendingJob updateSchemaDestinationJob = startZeroLoadJob( getJobService(context.getPipelineOptions().as(BigQueryOptions.class)), @@ -159,15 +178,17 @@ public void processElement( createDisposition, schemaUpdateOptions); if (updateSchemaDestinationJob != null) { - pendingJobs.add(new PendingJobData(updateSchemaDestinationJob, tableDestination, window)); + pendingJobs.put( + destination, new PendingJobData(updateSchemaDestinationJob, tableDestination, window)); } } - List> tableDestinations = new ArrayList<>(); - for (KV entry : element) { - tableDestinations.add( - KV.of(getTableWithDefaultProject(destination, options), entry.getValue())); + if (!pendingJobs.isEmpty()) { + LOG.info( + "Added {} pending jobs to update the schema for each destination before copying {} temp tables.", + pendingJobs.size(), + outputs.size()); } - context.output(tableDestinations); + context.output(outputs); } @Teardown @@ -191,7 +212,7 @@ public void finishBundle(FinishBundleContext context) throws Exception { DatasetService datasetService = getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class)); BigQueryHelpers.PendingJobManager jobManager = new BigQueryHelpers.PendingJobManager(); - for (final PendingJobData pendingJobData : pendingJobs) { + for (final PendingJobData pendingJobData : pendingJobs.values()) { jobManager = jobManager.addPendingJob( pendingJobData.retryJob, @@ -204,10 +225,10 @@ public void finishBundle(FinishBundleContext context) throws Exception { .setTableId(BigQueryHelpers.stripPartitionDecorator(ref.getTableId())), pendingJobData.tableDestination.getTableDescription()); } - return null; } catch (IOException | InterruptedException e) { return e; } + return null; }); } jobManager.waitForDone(); @@ -337,15 +358,14 @@ private BigQueryHelpers.PendingJob startZeroLoadJob( return retryJob; } - private BigQueryServices.JobService getJobService(PipelineOptions pipelineOptions) - throws IOException { + private 
     if (jobService == null) {
       jobService = bqServices.getJobService(pipelineOptions.as(BigQueryOptions.class));
     }
     return jobService;
   }
 
-  private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
+  private DatasetService getDatasetService(PipelineOptions pipelineOptions) {
     if (datasetService == null) {
       datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
     }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java
index dd9ab1508f3f..bd5a2762012f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java
@@ -25,22 +25,30 @@ import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
 import java.security.SecureRandom;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.SchemaUpdateOption;
 import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -152,7 +160,8 @@ private void runWriteTest(
             .withSchemaUpdateOptions(schemaUpdateOptions);
 
     p.apply(input).apply(writer);
-    p.run().waitUntilFinish();
+    PipelineResult.State state = p.run().waitUntilFinish();
+    assertEquals(PipelineResult.State.DONE, state);
 
     QueryResponse response = BQ_CLIENT.queryWithRetries(testQuery, project);
 
@@ -222,66 +231,143 @@ public void testAllowFieldRelaxation() throws Exception {
   }
 
   @Test
-  public void runWriteTestTempTables() throws Exception {
-    String tableName = makeTestTable();
+  public void runWriteTestTempTableAndDynamicDestination() throws Exception {
+
+    final int numPerAnimal = 10;
+
+    String tableNameCat = makeTestTable();
+    String tableNameDog = makeTestTable();
+
+    WriteToBqDynamic dynamicDestination =
+        new WriteToBqDynamic(project, BIG_QUERY_DATASET_ID, tableNameCat, tableNameDog);
 
     Set<SchemaUpdateOption> schemaUpdateOptions =
         EnumSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION);
 
-    TableSchema schema =
-        new TableSchema()
-            .setFields(
-                ImmutableList.of(
-                    new TableFieldSchema().setName("new_field").setType("STRING"),
new TableFieldSchema().setName("optional_field").setType("STRING"), - new TableFieldSchema() - .setName("required_field") - .setType("STRING") - .setMode("REQUIRED"))); - String[] values = {"meow", "bark"}; - String testQuery = - String.format( - "SELECT new_field, required_field FROM [%s.%s];", BIG_QUERY_DATASET_ID, tableName); + List> expectedCat = new ArrayList<>(); + List> expectedDog = new ArrayList<>(); + List inputRows = new ArrayList<>(); - List> expectedResult = - Arrays.asList(Arrays.asList(values[0], values[1]), Arrays.asList(values[1], values[0])); + for (int i = 0; i < numPerAnimal; ++i) { + expectedCat.add(ImmutableSet.of(values[0], String.valueOf(i))); + expectedDog.add(ImmutableSet.of(values[1], String.valueOf(i))); + // cat and dog tables have different schema + inputRows.add( + new TableRow().set("required_field", values[0]).set("cat_new_field", String.valueOf(i))); + inputRows.add(new TableRow().set("required_field", values[1]).set("dog_new_field", (long) i)); + } Options options = TestPipeline.testingPipelineOptions().as(Options.class); options.setTempLocation(options.getTempRoot() + "/bq_it_temp"); Pipeline p = Pipeline.create(options); - Create.Values input = - Create.of( - Arrays.asList( - new TableRow().set("new_field", values[0]).set("required_field", values[1]), - new TableRow().set("new_field", values[1]).set("required_field", values[0]))); - Write writer = - BigQueryIO.writeTableRows() - .to(String.format("%s:%s.%s", options.getProject(), BIG_QUERY_DATASET_ID, tableName)) - .withSchema(schema) - .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) - .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) - .withSchemaUpdateOptions(schemaUpdateOptions) - .withMaxBytesPerPartition(1) - .withMaxFilesPerPartition(1); + p.apply(Create.of(inputRows)) + .apply( + BigQueryIO.writeTableRows() + .to(dynamicDestination) + .withCreateDisposition(CreateDisposition.CREATE_NEVER) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) + .withSchemaUpdateOptions(schemaUpdateOptions) + .withMaxFileSize(10) + .withMaxFilesPerPartition(2)); + PipelineResult.State state = p.run().waitUntilFinish(); + assertEquals(PipelineResult.State.DONE, state); + + String testCatQuery = + String.format( + "SELECT cat_new_field, required_field FROM [%s.%s];", + BIG_QUERY_DATASET_ID, tableNameCat); + String testDogQuery = + String.format( + "SELECT dog_new_field, required_field FROM [%s.%s];", + BIG_QUERY_DATASET_ID, tableNameDog); - p.apply(input).apply(writer); - p.run().waitUntilFinish(); + List> catResult = runQuery(testCatQuery); + assertEquals(new HashSet<>(expectedCat), new HashSet<>(catResult)); + List> dogResult = runQuery(testDogQuery); + assertEquals(new HashSet<>(expectedDog), new HashSet<>(dogResult)); + } - QueryResponse response = BQ_CLIENT.queryWithRetries(testQuery, project); + /** Run a query and return result as a list of records with each having a list of values. 
+  List<Set<String>> runQuery(String query) {
+    QueryResponse response;
+    try {
+      response = BQ_CLIENT.queryWithRetries(query, project);
+    } catch (IOException | InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+    return response.getRows().stream()
+        .map(
+            row ->
+                row.getF().stream().map(cell -> cell.getV().toString()).collect(Collectors.toSet()))
+        .collect(Collectors.toList());
+  }
 
-    List<List<String>> result =
-        response.getRows().stream()
-            .map(
-                row ->
-                    row.getF().stream()
-                        .map(cell -> cell.getV().toString())
-                        .collect(Collectors.toList()))
-            .collect(Collectors.toList());
+  static class WriteToBqDynamic extends DynamicDestinations<TableRow, String> {
+    private final String projectId;
+    private final String dataSetId;
+    private final String catTable;
+    private final String dogTable;
+
+    public WriteToBqDynamic(String projectId, String dataSetId, String catTable, String dogTable) {
+      this.projectId = projectId;
+      this.dataSetId = dataSetId;
+      this.catTable = catTable;
+      this.dogTable = dogTable;
+    }
+
+    private static final TableSchema CAT_SCHEMA =
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("cat_new_field").setType("STRING"),
+                    new TableFieldSchema().setName("optional_field").setType("STRING"),
+                    new TableFieldSchema()
+                        .setName("required_field")
+                        .setType("STRING")
+                        .setMode("REQUIRED")));
+
+    public static final TableSchema DOG_SCHEMA =
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("dog_new_field").setType("INT64"),
+                    new TableFieldSchema().setName("optional_field").setType("STRING"),
+                    new TableFieldSchema()
+                        .setName("required_field")
+                        .setType("STRING")
+                        .setMode("REQUIRED")));
 
-    assertEquals(new HashSet<>(expectedResult), new HashSet<>(result));
+    @Override
+    public String getDestination(@Nullable ValueInSingleWindow<TableRow> element) {
+      assert element != null;
+      String sound = (String) Objects.requireNonNull(element.getValue()).get("required_field");
+      if (Objects.equals(sound, "meow")) {
+        return "cat";
+      } else if (Objects.equals(sound, "bark")) {
+        return "dog";
+      } else {
+        throw new IllegalArgumentException("Unknown sound: " + sound);
+      }
+    }
+
+    @Override
+    public TableDestination getTable(String destination) {
+      String tableId = Objects.equals(destination, "cat") ? catTable : dogTable;
+      String reference = String.format("%s:%s.%s", projectId, dataSetId, tableId);
+      return new TableDestination(reference, destination);
+    }
+
+    @Override
+    public @Nullable TableSchema getSchema(String destination) {
+      if (Objects.equals(destination, "cat")) {
+        return CAT_SCHEMA;
+      } else {
+        return DOG_SCHEMA;
+      }
+    }
+  }
 }

From af43ab5134c9c144c907a6b6373659e065c32d41 Mon Sep 17 00:00:00 2001
From: Yi Hu
Date: Fri, 10 Feb 2023 12:35:17 -0500
Subject: [PATCH 2/2] Rename ZERO_LOAD -> SCHEMA_UPDATE

---
 .../java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java | 4 ++--
 .../beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java     | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index e9adc3a303f0..9ba2a83d7b6d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -308,7 +308,7 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
     final PCollectionView<String> tempLoadJobIdPrefixView =
         createJobIdPrefixView(p, JobType.TEMP_TABLE_LOAD);
     final PCollectionView<String> zeroLoadJobIdPrefixView =
-        createJobIdPrefixView(p, JobType.ZERO_LOAD);
+        createJobIdPrefixView(p, JobType.SCHEMA_UPDATE);
     final PCollectionView<String> copyJobIdPrefixView = createJobIdPrefixView(p, JobType.COPY);
     final PCollectionView<String> tempFilePrefixView =
         createTempFilePrefixView(p, loadJobIdPrefixView);
@@ -449,7 +449,7 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> inp
     final PCollectionView<String> tempLoadJobIdPrefixView =
         createJobIdPrefixView(p, JobType.TEMP_TABLE_LOAD);
     final PCollectionView<String> zeroLoadJobIdPrefixView =
-        createJobIdPrefixView(p, JobType.ZERO_LOAD);
+        createJobIdPrefixView(p, JobType.SCHEMA_UPDATE);
     final PCollectionView<String> copyJobIdPrefixView = createJobIdPrefixView(p, JobType.COPY);
     final PCollectionView<String> tempFilePrefixView =
         createTempFilePrefixView(p, loadJobIdPrefixView);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java
index 431f6c71bf31..e97fc8c8aa09 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java
@@ -68,10 +68,10 @@ static String createJobIdWithDestination(
   public enum JobType {
     LOAD,
     TEMP_TABLE_LOAD,
-    ZERO_LOAD,
     COPY,
     EXPORT,
     QUERY,
+    SCHEMA_UPDATE,
   }
 
 /**
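
For context, the user-facing scenario this patch fixes can be reduced to the following sketch. This is a minimal illustration, not part of the patch: it assumes the Beam GCP IO module is on the classpath, reuses the WriteToBqDynamic destination defined in BigQuerySchemaUpdateOptionsIT above, and uses placeholder project, dataset, and table names. Before the fix, combining withSchemaUpdateOptions with a DynamicDestinations-based batch load that spills into temp tables broke, because UpdateSchemaDestination assumed a single destination per input element.

    import com.google.api.services.bigquery.model.TableRow;
    import java.util.EnumSet;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.SchemaUpdateOption;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class SchemaUpdateWithDynamicDestinations {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(
                Create.of(
                    // Rows are routed to different tables (with different schemas) by
                    // WriteToBqDynamic, keyed on the value of required_field.
                    new TableRow().set("required_field", "meow").set("cat_new_field", "0"),
                    new TableRow().set("required_field", "bark").set("dog_new_field", 0L)))
            .apply(
                BigQueryIO.writeTableRows()
                    // "my-project", "my_dataset", and the table names are placeholders.
                    .to(new WriteToBqDynamic("my-project", "my_dataset", "cat_table", "dog_table"))
                    .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    // The combination under test: schema update options together with a
                    // dynamic destination and a load that goes through temp tables.
                    .withSchemaUpdateOptions(EnumSet.of(SchemaUpdateOption.ALLOW_FIELD_ADDITION)));
        p.run().waitUntilFinish();
      }
    }

The body of startZeroLoadJob is not shown in this diff; the idea behind the new ZERO_LOAD/SCHEMA_UPDATE job type is a load job that carries no data and exists only so that BigQuery applies the schemaUpdateOptions to the final destination table's schema before the temp tables are copied into it. A hypothetical shape for such a job configuration (the field values here are illustrative assumptions, not taken from the patch):

    // A "zero load": a load job with no source URIs whose only effect is to let
    // BigQuery apply schemaUpdateOptions (field addition/relaxation) to the
    // destination table's schema; no rows are appended.
    JobConfigurationLoad zeroLoad =
        new JobConfigurationLoad()
            .setDestinationTable(tableReference) // the final destination table
            .setSchema(schema)                   // schema from DynamicDestinations.getSchema
            .setSourceUris(ImmutableList.of())   // no data files
            .setWriteDisposition("WRITE_APPEND")
            .setCreateDisposition("CREATE_NEVER")
            .setSchemaUpdateOptions(ImmutableList.of("ALLOW_FIELD_ADDITION"));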