From 3eeed5f56a61a2af45e9ff077c52452b191b7afb Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Fri, 4 Nov 2022 19:38:05 +0000 Subject: [PATCH 01/15] storage write api provider class skeleton --- ...torageWriteApiSchemaTransformProvider.java | 102 ++++++++++++++++++ .../gcp/bigquery/providers/package-info.java | 20 ++++ 2 files changed, 122 insertions(+) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/package-info.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java new file mode 100644 index 000000000000..73249b642b04 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery.providers; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollectionRowTuple; + +/** + * An implementation of {@link TypedSchemaTransformProvider} for BigQuery Storage Write API jobs + * configured via {@link BigQueryStorageWriteApiSchemaTransformConfiguration}. + * + *

Internal only: This class is actively being worked on, and it will likely change. We + * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam + * repository. + */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +@Experimental(Kind.SCHEMAS) +@AutoService(SchemaTransformProvider.class) +public class BigQueryStorageWriteApiSchemaTransformProvider + extends TypedSchemaTransformProvider { + private static final String INPUT_TAG = "INPUT_ROWS"; + private static final String OUTPUT_FAILED_ROWS_TAG = "FAILED_ROWS"; + + @Override + protected Class configurationClass() { + return BigQueryStorageWriteApiSchemaTransformConfiguration.class; + } + + @Override + protected SchemaTransform from( + BigQueryStorageWriteApiSchemaTransformConfiguration configuration) { + return new BigQueryStorageWriteApiSchemaTransform(configuration); + } + + @Override + public String identifier() { + return String.format("beam:transform:org.apache.beam:bigquery_storage_write:v1"); + } + + @Override + public List inputCollectionNames() { + return Collections.singletonList(INPUT_TAG); + } + + @Override + public List outputCollectionNames() { + return Collections.singletonList(OUTPUT_FAILED_ROWS_TAG); + } + + /** Configuration for writing to BigQuery with Storage Write API. */ + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class BigQueryStorageWriteApiSchemaTransformConfiguration {} + + private static class BigQueryStorageWriteApiSchemaTransform implements SchemaTransform { + private final BigQueryStorageWriteApiSchemaTransformConfiguration configuration; + + BigQueryStorageWriteApiSchemaTransform( + BigQueryStorageWriteApiSchemaTransformConfiguration configuration) { + this.configuration = configuration; + } + + @Override + public PTransform buildTransform() { + return new PTransform() { + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + return null; + } + }; + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/package-info.java new file mode 100644 index 000000000000..8fc849bf5144 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Defines SchemaTransformProviders for reading and writing from Google BigQuery. 
*/ +package org.apache.beam.sdk.io.gcp.bigquery.providers; From 76d080556eeefa8535e547b65587c96c916ed92b Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Fri, 4 Nov 2022 19:51:54 +0000 Subject: [PATCH 02/15] storage write schematransform configuration with validation --- ...torageWriteApiSchemaTransformProvider.java | 130 +++++++++++++++++- 1 file changed, 129 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java index 73249b642b04..80971dffd173 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -17,12 +17,22 @@ */ package org.apache.beam.sdk.io.gcp.bigquery.providers; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; + +import com.google.api.services.bigquery.model.TableSchema; import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; import java.util.Collections; import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices; import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; @@ -31,6 +41,8 @@ import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; /** * An implementation of {@link TypedSchemaTransformProvider} for BigQuery Storage Write API jobs @@ -79,7 +91,123 @@ public List outputCollectionNames() { /** Configuration for writing to BigQuery with Storage Write API. 
*/ @DefaultSchema(AutoValueSchema.class) @AutoValue - public abstract static class BigQueryStorageWriteApiSchemaTransformConfiguration {} + public abstract static class BigQueryStorageWriteApiSchemaTransformConfiguration { + static final Map CREATE_DISPOSITIONS = + ImmutableMap.builder() + .put(CreateDisposition.CREATE_IF_NEEDED.name(), CreateDisposition.CREATE_IF_NEEDED) + .put(CreateDisposition.CREATE_NEVER.name(), CreateDisposition.CREATE_NEVER) + .build(); + + static final Map WRITE_DISPOSITIONS = + ImmutableMap.builder() + .put(WriteDisposition.WRITE_TRUNCATE.name(), WriteDisposition.WRITE_TRUNCATE) + .put(WriteDisposition.WRITE_EMPTY.name(), WriteDisposition.WRITE_EMPTY) + .put(WriteDisposition.WRITE_APPEND.name(), WriteDisposition.WRITE_APPEND) + .build(); + + public void validate() { + String invalidConfigMessage = "Invalid BigQuery Storage Write configuration: "; + + // validate output table spec + checkArgument( + !Strings.isNullOrEmpty(this.getOuputTable()), + invalidConfigMessage + "Table spec for a BigQuery Write must be specified."); + checkNotNull(BigQueryHelpers.parseTableSpec(this.getOuputTable())); + + // validate create and write dispositions + CreateDisposition createDisposition = null; + WriteDisposition writeDisposition = null; + if (!Strings.isNullOrEmpty(this.getCreateDisposition())) { + checkNotNull( + CREATE_DISPOSITIONS.get(this.getCreateDisposition().toUpperCase()), + invalidConfigMessage + + "Invalid create disposition was specified. Available dispositions are: ", + CREATE_DISPOSITIONS.keySet()); + createDisposition = CREATE_DISPOSITIONS.get(this.getCreateDisposition().toUpperCase()); + } + if (!Strings.isNullOrEmpty(this.getWriteDisposition())) { + checkNotNull( + WRITE_DISPOSITIONS.get(this.getWriteDisposition().toUpperCase()), + invalidConfigMessage + + "Invalid write disposition was specified. Available dispositions are: ", + WRITE_DISPOSITIONS.keySet()); + writeDisposition = WRITE_DISPOSITIONS.get(this.getWriteDisposition().toUpperCase()); + } + + // validate schema + if (!Strings.isNullOrEmpty(this.getJsonSchema())) { + // check if a TableSchema can be deserialized from the input schema string + checkNotNull(BigQueryHelpers.fromJsonString(this.getJsonSchema(), TableSchema.class)); + } else if (!this.getUseBeamSchema()) { + checkArgument( + createDisposition == CreateDisposition.CREATE_NEVER, + invalidConfigMessage + + "Create disposition is CREATE_IF_NEEDED, but no schema was provided."); + } + } + + /** + * Instantiates a {@link BigQueryStorageWriteApiSchemaTransformConfiguration.Builder} instance. + */ + public static Builder builder() { + return new AutoValue_BigQueryStorageWriteApiSchemaTransformProvider_BigQueryStorageWriteApiSchemaTransformConfiguration + .Builder(); + } + + @Nullable + public abstract String getOuputTable(); + + @Nullable + public abstract String getJsonSchema(); + + @Nullable + public abstract boolean getUseBeamSchema(); + + @Nullable + public abstract String getCreateDisposition(); + + @Nullable + public abstract String getWriteDisposition(); + + @Nullable + public abstract int getNumStorageWriteApiStreams(); + + @Nullable + public abstract int getNumFileShards(); + + @Nullable + public abstract int getTriggeringFrequencySeconds(); + + @Nullable + public abstract BigQueryServices getBigQueryServices(); + + /** Builder for {@link BigQueryStorageWriteApiSchemaTransformConfiguration}. 
*/ + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setOuputTable(String tableSpec); + + public abstract Builder setJsonSchema(String jsonSchema); + + public abstract Builder setUseBeamSchema(boolean useBeamSchema); + + public abstract Builder setCreateDisposition(String createDisposition); + + public abstract Builder setWriteDisposition(String writeDisposition); + + public abstract Builder setNumStorageWriteApiStreams(int numStorageWriteApiStreams); + + public abstract Builder setNumFileShards(int numFileShards); + + public abstract Builder setTriggeringFrequencySeconds(int seconds); + + public abstract Builder setBigQueryServices(BigQueryServices bigQueryServices); + + /** Builds a {@link BigQueryStorageWriteApiSchemaTransformConfiguration} instance. */ + public abstract BigQueryStorageWriteApiSchemaTransformProvider + .BigQueryStorageWriteApiSchemaTransformConfiguration + build(); + } + } private static class BigQueryStorageWriteApiSchemaTransform implements SchemaTransform { private final BigQueryStorageWriteApiSchemaTransformConfiguration configuration; From f68a896c31ce8827e75d7efb1551d4580b565de2 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Mon, 7 Nov 2022 20:14:35 +0000 Subject: [PATCH 03/15] small fixes, more validation checks, and setting up the storage write transform --- ...torageWriteApiSchemaTransformProvider.java | 113 ++++++++++++++---- 1 file changed, 92 insertions(+), 21 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java index 80971dffd173..0115d316f2a5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -20,6 +20,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; +import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; @@ -30,19 +31,29 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; +import org.apache.beam.sdk.io.gcp.bigquery.WriteResult; import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration; import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import 
org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.joda.time.Duration; /** * An implementation of {@link TypedSchemaTransformProvider} for BigQuery Storage Write API jobs @@ -59,7 +70,7 @@ @AutoService(SchemaTransformProvider.class) public class BigQueryStorageWriteApiSchemaTransformProvider extends TypedSchemaTransformProvider { - private static final String INPUT_TAG = "INPUT_ROWS"; + private static final String INPUT_ROWS_TAG = "INPUT_ROWS"; private static final String OUTPUT_FAILED_ROWS_TAG = "FAILED_ROWS"; @Override @@ -80,7 +91,7 @@ public String identifier() { @Override public List inputCollectionNames() { - return Collections.singletonList(INPUT_TAG); + return Collections.singletonList(INPUT_ROWS_TAG); } @Override @@ -110,9 +121,9 @@ public void validate() { // validate output table spec checkArgument( - !Strings.isNullOrEmpty(this.getOuputTable()), + !Strings.isNullOrEmpty(this.getOutputTable()), invalidConfigMessage + "Table spec for a BigQuery Write must be specified."); - checkNotNull(BigQueryHelpers.parseTableSpec(this.getOuputTable())); + checkNotNull(BigQueryHelpers.parseTableSpec(this.getOutputTable())); // validate create and write dispositions CreateDisposition createDisposition = null; @@ -144,6 +155,13 @@ public void validate() { invalidConfigMessage + "Create disposition is CREATE_IF_NEEDED, but no schema was provided."); } + + checkArgument(this.getNumStorageWriteApiStreams() == null || this.getNumStorageWriteApiStreams() > 0, + invalidConfigMessage + "When set, numStorageWriteApiStreams must be > 0, but was: %s", this.getNumStorageWriteApiStreams()); + checkArgument(this.getNumFileShards() == null || this.getNumFileShards() > 0, + invalidConfigMessage + "When set, numFileShards must be > 0, but was: %s", this.getNumFileShards()); + checkArgument(this.getTriggeringFrequencySeconds() == null || this.getTriggeringFrequencySeconds() > 0, + invalidConfigMessage + "When set, the trigger frequency must be > 0, but was: %s", this.getTriggeringFrequencySeconds()); } /** @@ -154,14 +172,13 @@ public static Builder builder() { .Builder(); } - @Nullable - public abstract String getOuputTable(); + public abstract String getOutputTable(); @Nullable public abstract String getJsonSchema(); @Nullable - public abstract boolean getUseBeamSchema(); + public abstract Boolean getUseBeamSchema(); @Nullable public abstract String getCreateDisposition(); @@ -170,13 +187,13 @@ public static Builder builder() { public abstract String getWriteDisposition(); @Nullable - public abstract int getNumStorageWriteApiStreams(); + public abstract Integer getNumStorageWriteApiStreams(); @Nullable - public abstract int getNumFileShards(); + public abstract Integer getNumFileShards(); @Nullable - public abstract int getTriggeringFrequencySeconds(); + public abstract Integer getTriggeringFrequencySeconds(); @Nullable public abstract BigQueryServices getBigQueryServices(); @@ -184,21 +201,21 @@ 
public static Builder builder() { /** Builder for {@link BigQueryStorageWriteApiSchemaTransformConfiguration}. */ @AutoValue.Builder public abstract static class Builder { - public abstract Builder setOuputTable(String tableSpec); + public abstract Builder setOutputTable(String tableSpec); public abstract Builder setJsonSchema(String jsonSchema); - public abstract Builder setUseBeamSchema(boolean useBeamSchema); + public abstract Builder setUseBeamSchema(Boolean useBeamSchema); public abstract Builder setCreateDisposition(String createDisposition); public abstract Builder setWriteDisposition(String writeDisposition); - public abstract Builder setNumStorageWriteApiStreams(int numStorageWriteApiStreams); + public abstract Builder setNumStorageWriteApiStreams(Integer numStorageWriteApiStreams); - public abstract Builder setNumFileShards(int numFileShards); + public abstract Builder setNumFileShards(Integer numFileShards); - public abstract Builder setTriggeringFrequencySeconds(int seconds); + public abstract Builder setTriggeringFrequencySeconds(Integer seconds); public abstract Builder setBigQueryServices(BigQueryServices bigQueryServices); @@ -209,22 +226,76 @@ public abstract static class Builder { } } + /** + * A {@link SchemaTransform} for BigQuery Storage Write API, configured with {@link + * BigQueryStorageWriteApiSchemaTransformConfiguration} and instantiated by {@link + * BigQueryStorageWriteApiSchemaTransformProvider}. + */ private static class BigQueryStorageWriteApiSchemaTransform implements SchemaTransform { private final BigQueryStorageWriteApiSchemaTransformConfiguration configuration; BigQueryStorageWriteApiSchemaTransform( BigQueryStorageWriteApiSchemaTransformConfiguration configuration) { + configuration.validate(); this.configuration = configuration; } @Override public PTransform buildTransform() { - return new PTransform() { - @Override - public PCollectionRowTuple expand(PCollectionRowTuple input) { - return null; - } - }; + return new BigQueryStorageWriteApiPCollectionRowTupleTransform(configuration); + } + } + + static class BigQueryStorageWriteApiPCollectionRowTupleTransform extends PTransform { + private final BigQueryStorageWriteApiSchemaTransformConfiguration configuration; + + BigQueryStorageWriteApiPCollectionRowTupleTransform(BigQueryStorageWriteApiSchemaTransformConfiguration configuration) { + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + PCollection inputRows = input.get(INPUT_ROWS_TAG); + + BigQueryIO.Write write = createStorageWriteApiTransform(); + + WriteResult result = inputRows.apply(write); + + return null; + } + + BigQueryIO.Write createStorageWriteApiTransform() { + BigQueryIO.Write write = BigQueryIO.write().to(configuration.getOutputTable()) + .withFormatFunction(BigQueryUtils.toTableRow()); + if(!Strings.isNullOrEmpty(configuration.getJsonSchema())) { + write = write.withSchema(BigQueryHelpers.fromJsonString(configuration.getJsonSchema(), TableSchema.class)); + } + if (configuration.getUseBeamSchema() != null && configuration.getUseBeamSchema()) { + write = write.useBeamSchema(); + } + if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) { + CreateDisposition createDisposition = BigQueryStorageWriteApiSchemaTransformConfiguration.CREATE_DISPOSITIONS.get(configuration.getCreateDisposition()); + write = write.withCreateDisposition(createDisposition); + } + if (!Strings.isNullOrEmpty(configuration.getWriteDisposition())) { + WriteDisposition writeDisposition = 
BigQueryStorageWriteApiSchemaTransformConfiguration.WRITE_DISPOSITIONS.get(configuration.getWriteDisposition());
+        write = write.withWriteDisposition(writeDisposition);
+      }
+      if (configuration.getNumStorageWriteApiStreams() != null) {
+        write = write.withNumStorageWriteApiStreams(configuration.getNumStorageWriteApiStreams());
+      }
+      if (configuration.getNumFileShards() != null) {
+        write = write.withNumFileShards(configuration.getNumFileShards());
+      }
+      if (configuration.getTriggeringFrequencySeconds() != null) {
+        write = write.withTriggeringFrequency(Duration.standardSeconds(configuration.getTriggeringFrequencySeconds()));
+      }
+
+      if (configuration.getBigQueryServices() != null) {
+        write = write.withTestServices(configuration.getBigQueryServices());
+      }
+
+      return write;
+    }
+  }
+}

From 8aef1d9559f1daef5d5eaecacc343ade585d8dfc Mon Sep 17 00:00:00 2001
From: ahmedabu98
Date: Mon, 7 Nov 2022 21:31:55 +0000
Subject: [PATCH 04/15] PCollectionRowTuple with two outputs: failed rows and
 (errors+failed rows)

---
 ...torageWriteApiSchemaTransformProvider.java | 27 +++++++++++++++++++++++++++--
 1 file changed, 25 insertions(+), 2 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index 0115d316f2a5..6d43ee0dfbee 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -41,6 +41,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
@@ -72,6 +74,7 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
     extends TypedSchemaTransformProvider {
   private static final String INPUT_ROWS_TAG = "INPUT_ROWS";
   private static final String OUTPUT_FAILED_ROWS_TAG = "FAILED_ROWS";
+  private static final String OUTPUT_ERRORS_TAG = "FAILED_ERRORS";
 
   @Override
   protected Class configurationClass() {
@@ -248,7 +251,6 @@ public PTransform buildTransform() {
 
   static class BigQueryStorageWriteApiPCollectionRowTupleTransform extends PTransform {
     private final BigQueryStorageWriteApiSchemaTransformConfiguration configuration;
-
     BigQueryStorageWriteApiPCollectionRowTupleTransform(BigQueryStorageWriteApiSchemaTransformConfiguration configuration) {
       this.configuration = configuration;
     }
@@ -261,7 +263,28 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
 
       WriteResult result = inputRows.apply(write);
 
-      return null;
+      Schema rowSchema = !Strings.isNullOrEmpty(configuration.getJsonSchema())
+          ?
BigQueryUtils.fromTableSchema(BigQueryHelpers.fromJsonString(configuration.getJsonSchema(), TableSchema.class)) // get from input schema + : inputRows.getSchema(); // or get from PCollection + + Schema errorSchema = Schema.of(Field.of("failed_row", FieldType.row(rowSchema)), Field.of("error_message", FieldType.STRING)); + + PCollection storageInsertErrors = result.getFailedStorageApiInserts(); + + // Errors consisting of failed rows along with their error message + PCollection errorRows = storageInsertErrors + .apply(MapElements.into(TypeDescriptor.of(Row.class)).via( + (storageError) -> Row.withSchema(errorSchema) + .withFieldValue("failed_row", BigQueryUtils.toBeamRow(rowSchema, storageError.getRow())) + .withFieldValue("error_message", storageError.getErrorMessage()).build() + )); + + // Failed rows + PCollection failedRows = storageInsertErrors + .apply(MapElements.into(TypeDescriptor.of(Row.class)).via( + (storageError) -> BigQueryUtils.toBeamRow(rowSchema, storageError.getRow()))); + + return PCollectionRowTuple.of(OUTPUT_FAILED_ROWS_TAG, failedRows).and(OUTPUT_ERRORS_TAG, errorRows); } BigQueryIO.Write createStorageWriteApiTransform() { From 7f416394518f57aef9e17a7f0b254a0f517ac1b2 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Tue, 8 Nov 2022 15:36:06 +0000 Subject: [PATCH 05/15] spotless --- ...torageWriteApiSchemaTransformProvider.java | 101 ++++++++++++------ 1 file changed, 67 insertions(+), 34 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java index 6d43ee0dfbee..7ba37b7ab10b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -20,7 +20,6 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; -import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; @@ -130,7 +129,6 @@ public void validate() { // validate create and write dispositions CreateDisposition createDisposition = null; - WriteDisposition writeDisposition = null; if (!Strings.isNullOrEmpty(this.getCreateDisposition())) { checkNotNull( CREATE_DISPOSITIONS.get(this.getCreateDisposition().toUpperCase()), @@ -145,7 +143,6 @@ public void validate() { invalidConfigMessage + "Invalid write disposition was specified. Available dispositions are: ", WRITE_DISPOSITIONS.keySet()); - writeDisposition = WRITE_DISPOSITIONS.get(this.getWriteDisposition().toUpperCase()); } // validate schema @@ -153,18 +150,26 @@ public void validate() { // check if a TableSchema can be deserialized from the input schema string checkNotNull(BigQueryHelpers.fromJsonString(this.getJsonSchema(), TableSchema.class)); } else if (!this.getUseBeamSchema()) { + // if no schema is provided, create disposition CREATE_NEVER has to be specified. 
checkArgument( createDisposition == CreateDisposition.CREATE_NEVER, invalidConfigMessage + "Create disposition is CREATE_IF_NEEDED, but no schema was provided."); } - checkArgument(this.getNumStorageWriteApiStreams() == null || this.getNumStorageWriteApiStreams() > 0, - invalidConfigMessage + "When set, numStorageWriteApiStreams must be > 0, but was: %s", this.getNumStorageWriteApiStreams()); - checkArgument(this.getNumFileShards() == null || this.getNumFileShards() > 0, - invalidConfigMessage + "When set, numFileShards must be > 0, but was: %s", this.getNumFileShards()); - checkArgument(this.getTriggeringFrequencySeconds() == null || this.getTriggeringFrequencySeconds() > 0, - invalidConfigMessage + "When set, the trigger frequency must be > 0, but was: %s", this.getTriggeringFrequencySeconds()); + // validation checks for streaming writes + checkArgument( + this.getNumStorageWriteApiStreams() == null || this.getNumStorageWriteApiStreams() > 0, + invalidConfigMessage + "When set, numStorageWriteApiStreams must be > 0, but was: %s", + this.getNumStorageWriteApiStreams()); + checkArgument( + this.getNumFileShards() == null || this.getNumFileShards() > 0, + invalidConfigMessage + "When set, numFileShards must be > 0, but was: %s", + this.getNumFileShards()); + checkArgument( + this.getTriggeringFrequencySeconds() == null || this.getTriggeringFrequencySeconds() > 0, + invalidConfigMessage + "When set, the trigger frequency must be > 0, but was: %s", + this.getTriggeringFrequencySeconds()); } /** @@ -249,9 +254,12 @@ public PTransform buildTransform() { } } - static class BigQueryStorageWriteApiPCollectionRowTupleTransform extends PTransform { + static class BigQueryStorageWriteApiPCollectionRowTupleTransform + extends PTransform { private final BigQueryStorageWriteApiSchemaTransformConfiguration configuration; - BigQueryStorageWriteApiPCollectionRowTupleTransform(BigQueryStorageWriteApiSchemaTransformConfiguration configuration) { + + BigQueryStorageWriteApiPCollectionRowTupleTransform( + BigQueryStorageWriteApiSchemaTransformConfiguration configuration) { this.configuration = configuration; } @@ -263,45 +271,68 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { WriteResult result = inputRows.apply(write); - Schema rowSchema = !Strings.isNullOrEmpty(configuration.getJsonSchema()) - ? BigQueryUtils.fromTableSchema(BigQueryHelpers.fromJsonString(configuration.getJsonSchema(), TableSchema.class)) // get from input schema - : inputRows.getSchema(); // or get from PCollection + Schema rowSchema = + !Strings.isNullOrEmpty(configuration.getJsonSchema()) + ? 
BigQueryUtils.fromTableSchema( + BigQueryHelpers.fromJsonString( + configuration.getJsonSchema(), TableSchema.class)) // get from input schema + : inputRows.getSchema(); // or get from PCollection - Schema errorSchema = Schema.of(Field.of("failed_row", FieldType.row(rowSchema)), Field.of("error_message", FieldType.STRING)); + Schema errorSchema = + Schema.of( + Field.of("failed_row", FieldType.row(rowSchema)), + Field.of("error_message", FieldType.STRING)); - PCollection storageInsertErrors = result.getFailedStorageApiInserts(); + PCollection storageInsertErrors = + result.getFailedStorageApiInserts(); // Errors consisting of failed rows along with their error message - PCollection errorRows = storageInsertErrors - .apply(MapElements.into(TypeDescriptor.of(Row.class)).via( - (storageError) -> Row.withSchema(errorSchema) - .withFieldValue("failed_row", BigQueryUtils.toBeamRow(rowSchema, storageError.getRow())) - .withFieldValue("error_message", storageError.getErrorMessage()).build() - )); + PCollection errorRows = + storageInsertErrors.apply( + MapElements.into(TypeDescriptor.of(Row.class)) + .via( + (storageError) -> + Row.withSchema(errorSchema) + .withFieldValue( + "failed_row", + BigQueryUtils.toBeamRow(rowSchema, storageError.getRow())) + .withFieldValue("error_message", storageError.getErrorMessage()) + .build())); // Failed rows - PCollection failedRows = storageInsertErrors - .apply(MapElements.into(TypeDescriptor.of(Row.class)).via( - (storageError) -> BigQueryUtils.toBeamRow(rowSchema, storageError.getRow()))); - - return PCollectionRowTuple.of(OUTPUT_FAILED_ROWS_TAG, failedRows).and(OUTPUT_ERRORS_TAG, errorRows); + PCollection failedRows = + storageInsertErrors.apply( + MapElements.into(TypeDescriptor.of(Row.class)) + .via( + (storageError) -> BigQueryUtils.toBeamRow(rowSchema, storageError.getRow()))); + + return PCollectionRowTuple.of(OUTPUT_FAILED_ROWS_TAG, failedRows) + .and(OUTPUT_ERRORS_TAG, errorRows); } BigQueryIO.Write createStorageWriteApiTransform() { - BigQueryIO.Write write = BigQueryIO.write().to(configuration.getOutputTable()) - .withFormatFunction(BigQueryUtils.toTableRow()); - if(!Strings.isNullOrEmpty(configuration.getJsonSchema())) { - write = write.withSchema(BigQueryHelpers.fromJsonString(configuration.getJsonSchema(), TableSchema.class)); + BigQueryIO.Write write = + BigQueryIO.write() + .to(configuration.getOutputTable()) + .withFormatFunction(BigQueryUtils.toTableRow()); + if (!Strings.isNullOrEmpty(configuration.getJsonSchema())) { + write = + write.withSchema( + BigQueryHelpers.fromJsonString(configuration.getJsonSchema(), TableSchema.class)); } if (configuration.getUseBeamSchema() != null && configuration.getUseBeamSchema()) { write = write.useBeamSchema(); } if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) { - CreateDisposition createDisposition = BigQueryStorageWriteApiSchemaTransformConfiguration.CREATE_DISPOSITIONS.get(configuration.getCreateDisposition()); + CreateDisposition createDisposition = + BigQueryStorageWriteApiSchemaTransformConfiguration.CREATE_DISPOSITIONS.get( + configuration.getCreateDisposition()); write = write.withCreateDisposition(createDisposition); } if (!Strings.isNullOrEmpty(configuration.getWriteDisposition())) { - WriteDisposition writeDisposition = BigQueryStorageWriteApiSchemaTransformConfiguration.WRITE_DISPOSITIONS.get(configuration.getWriteDisposition()); + WriteDisposition writeDisposition = + BigQueryStorageWriteApiSchemaTransformConfiguration.WRITE_DISPOSITIONS.get( + 
configuration.getWriteDisposition()); write = write.withWriteDisposition(writeDisposition); } if (configuration.getNumStorageWriteApiStreams() != null) { @@ -311,7 +342,9 @@ BigQueryIO.Write createStorageWriteApiTransform() { write = write.withNumFileShards(configuration.getNumFileShards()); } if (configuration.getTriggeringFrequencySeconds() != null) { - write = write.withTriggeringFrequency(Duration.standardSeconds(configuration.getTriggeringFrequencySeconds())); + write = + write.withTriggeringFrequency( + Duration.standardSeconds(configuration.getTriggeringFrequencySeconds())); } if (configuration.getBigQueryServices() != null) { From 367cbb27aaa3be766f1a92d1872a0f9a37dc751b Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Mon, 28 Nov 2022 13:40:40 +0000 Subject: [PATCH 06/15] bq services does not belong as a configuration field --- .../BigQueryStorageWriteApiSchemaTransformProvider.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java index 7ba37b7ab10b..19771756870f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -203,9 +203,6 @@ public static Builder builder() { @Nullable public abstract Integer getTriggeringFrequencySeconds(); - @Nullable - public abstract BigQueryServices getBigQueryServices(); - /** Builder for {@link BigQueryStorageWriteApiSchemaTransformConfiguration}. */ @AutoValue.Builder public abstract static class Builder { @@ -225,8 +222,6 @@ public abstract static class Builder { public abstract Builder setTriggeringFrequencySeconds(Integer seconds); - public abstract Builder setBigQueryServices(BigQueryServices bigQueryServices); - /** Builds a {@link BigQueryStorageWriteApiSchemaTransformConfiguration} instance. */ public abstract BigQueryStorageWriteApiSchemaTransformProvider .BigQueryStorageWriteApiSchemaTransformConfiguration @@ -347,10 +342,6 @@ BigQueryIO.Write createStorageWriteApiTransform() { Duration.standardSeconds(configuration.getTriggeringFrequencySeconds())); } - if (configuration.getBigQueryServices() != null) { - write = write.withTestServices(configuration.getBigQueryServices()); - } - return write; } } From ede6945142d1b1eb4eb7c970bd965ebc2c478c4b Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Mon, 28 Nov 2022 14:35:03 +0000 Subject: [PATCH 07/15] add AT_LEAST_ONCE semantics. 
also set test bq services --- ...torageWriteApiSchemaTransformProvider.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java index 19771756870f..e3c23457b76e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -31,6 +31,8 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices; @@ -52,6 +54,7 @@ import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.joda.time.Duration; @@ -203,6 +206,9 @@ public static Builder builder() { @Nullable public abstract Integer getTriggeringFrequencySeconds(); + @Nullable + public abstract Boolean getUseAtLeastOnceSemantics(); + /** Builder for {@link BigQueryStorageWriteApiSchemaTransformConfiguration}. */ @AutoValue.Builder public abstract static class Builder { @@ -222,6 +228,8 @@ public abstract static class Builder { public abstract Builder setTriggeringFrequencySeconds(Integer seconds); + public abstract Builder setUseAtLeastOnceSemantics(Boolean use); + /** Builds a {@link BigQueryStorageWriteApiSchemaTransformConfiguration} instance. 
*/ public abstract BigQueryStorageWriteApiSchemaTransformProvider .BigQueryStorageWriteApiSchemaTransformConfiguration @@ -252,14 +260,23 @@ public PTransform buildTransform() { static class BigQueryStorageWriteApiPCollectionRowTupleTransform extends PTransform { private final BigQueryStorageWriteApiSchemaTransformConfiguration configuration; + private BigQueryServices testBigQueryServices = null; BigQueryStorageWriteApiPCollectionRowTupleTransform( BigQueryStorageWriteApiSchemaTransformConfiguration configuration) { this.configuration = configuration; } + @VisibleForTesting + public void setBigQueryServices(BigQueryServices testBigQueryServices) { + this.testBigQueryServices = testBigQueryServices; + } + @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { + // Check that the input exists + checkArgument(input.has(INPUT_ROWS_TAG), "Missing expected input tag: %s", INPUT_ROWS_TAG); + PCollection inputRows = input.get(INPUT_ROWS_TAG); BigQueryIO.Write write = createStorageWriteApiTransform(); @@ -306,10 +323,16 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } BigQueryIO.Write createStorageWriteApiTransform() { + Method writeMethod = + configuration.getUseAtLeastOnceSemantics() != null && configuration.getUseAtLeastOnceSemantics() + ? Method.STORAGE_API_AT_LEAST_ONCE + : Method.STORAGE_WRITE_API; BigQueryIO.Write write = BigQueryIO.write() .to(configuration.getOutputTable()) + .withMethod(writeMethod) .withFormatFunction(BigQueryUtils.toTableRow()); + if (!Strings.isNullOrEmpty(configuration.getJsonSchema())) { write = write.withSchema( @@ -342,6 +365,10 @@ BigQueryIO.Write createStorageWriteApiTransform() { Duration.standardSeconds(configuration.getTriggeringFrequencySeconds())); } + if (this.testBigQueryServices != null) { + write = write.withTestServices(testBigQueryServices); + } + return write; } } From cfc0c2a3e7dec424a77cf8e382663f5d15252088 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Tue, 29 Nov 2022 17:08:35 +0000 Subject: [PATCH 08/15] test config validation and successful writes --- ...geWriteApiSchemaTransformProviderTest.java | 175 ++++++++++++++++++ 1 file changed, 175 insertions(+) create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java new file mode 100644 index 000000000000..4fd9e7121b11 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java @@ -0,0 +1,175 @@ +package org.apache.beam.sdk.io.gcp.bigquery.providers; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers; +import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiPCollectionRowTupleTransform; +import 
org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration; +import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices; +import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService; +import org.apache.beam.sdk.io.gcp.testing.FakeJobService; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; +import org.apache.beam.sdk.schemas.transforms.Select; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class BigQueryStorageWriteApiSchemaTransformProviderTest { + private FakeDatasetService fakeDatasetService = new FakeDatasetService(); + private FakeJobService fakeJobService = new FakeJobService(); + private FakeBigQueryServices fakeBigQueryServices = + new FakeBigQueryServices() + .withDatasetService(fakeDatasetService) + .withJobService(fakeJobService); + + private static final Schema SCHEMA = Schema.of( + Field.of("name", FieldType.STRING), + Field.of("number", FieldType.INT64), + Field.of("dt", FieldType.logicalType(SqlTypes.DATETIME))); + + private static final List ROWS = Arrays.asList( + Row.withSchema(SCHEMA) + .withFieldValue("name", "a") + .withFieldValue("number", 1L) + .withFieldValue("dt", LocalDateTime.parse("2000-01-01T00:00:00")) + .build(), + Row.withSchema(SCHEMA) + .withFieldValue("name", "b") + .withFieldValue("number", 2L) + .withFieldValue("dt", LocalDateTime.parse("2000-01-02T00:00:00")) + .build(), + Row.withSchema(SCHEMA) + .withFieldValue("name", "c") + .withFieldValue("number", 3L) + .withFieldValue("dt", LocalDateTime.parse("2000-01-03T00:00:00")) + .build()); + + @Rule + public final transient TestPipeline p = TestPipeline.create(); + + @Before + public void setUp() throws Exception{ + FakeDatasetService.setUp(); + + fakeDatasetService.createDataset("project", "dataset", "", "", null); + } + + @Test + public void testInvalidConfig() { + List invalidConfigs = + Arrays.asList( + BigQueryStorageWriteApiSchemaTransformConfiguration.builder() + .setOutputTable("not_a_valid_table_spec"), + BigQueryStorageWriteApiSchemaTransformConfiguration.builder() + .setOutputTable("project:dataset.table") + .setCreateDisposition("INVALID_DISPOSITION"), + BigQueryStorageWriteApiSchemaTransformConfiguration.builder() + .setOutputTable("project:dataset.table") + .setJsonSchema("not a valid schema"), + BigQueryStorageWriteApiSchemaTransformConfiguration.builder() + .setOutputTable("create_table:without.schema") + .setUseBeamSchema(false)); + + for (BigQueryStorageWriteApiSchemaTransformConfiguration.Builder config: invalidConfigs) { + assertThrows(Exception.class, () -> {config.build().validate();}); + } + } + + public PCollectionRowTuple 
runWithConfig(BigQueryStorageWriteApiSchemaTransformConfiguration config) { + BigQueryStorageWriteApiSchemaTransformProvider provider = new BigQueryStorageWriteApiSchemaTransformProvider(); + + BigQueryStorageWriteApiPCollectionRowTupleTransform writeRowTupleTransform = + (BigQueryStorageWriteApiPCollectionRowTupleTransform) provider.from(config).buildTransform(); + + writeRowTupleTransform.setBigQueryServices(fakeBigQueryServices); + String tag = provider.inputCollectionNames().get(0); + + PCollection rows = p.apply(Create.of(ROWS).withRowSchema(SCHEMA)); + + PCollectionRowTuple input = PCollectionRowTuple.of(tag, rows.setRowSchema(SCHEMA)); + PCollectionRowTuple result = input.apply(writeRowTupleTransform); + + return result; + } + + @Test + @Ignore + public void testSimpleWrite() throws Exception{ + String tableSpec = "project:dataset.simple_write"; + BigQueryStorageWriteApiSchemaTransformConfiguration config = + BigQueryStorageWriteApiSchemaTransformConfiguration.builder() + .setOutputTable(tableSpec) + .setUseBeamSchema(true) + .build(); + + runWithConfig(config); + p.run().waitUntilFinish(); + + assertNotNull(fakeDatasetService.getTable(BigQueryHelpers.parseTableSpec(tableSpec))); + assertEquals(ROWS.size(), fakeDatasetService.getAllRows("project", "dataset", "simple_write").size()); + } + + @Test + @Ignore + public void testWithJsonSchema() throws Exception { + String tableSpec = "project:dataset.with_json_schema"; + + String jsonSchema = + "{" + + " \"fields\": [" + + " {" + + " \"name\": \"name\"," + + " \"type\": \"STRING\"," + + " \"mode\": \"REQUIRED\"" + + " }," + + " {" + + " \"name\": \"number\"," + + " \"type\": \"INTEGER\"," + + " \"mode\": \"REQUIRED\"" + + " }," + + " {" + + " \"name\": \"dt\"," + + " \"type\": \"DATETIME\"," + + " \"mode\": \"REQUIRED\"" + + " }" + + " ]" + + "}"; + + BigQueryStorageWriteApiSchemaTransformConfiguration config = + BigQueryStorageWriteApiSchemaTransformConfiguration.builder() + .setOutputTable(tableSpec) + .setJsonSchema(jsonSchema) + .setUseBeamSchema(false) + .build(); + + runWithConfig(config); + p.run().waitUntilFinish(); + + assertNotNull(fakeDatasetService.getTable(BigQueryHelpers.parseTableSpec(tableSpec))); + assertEquals(ROWS.size(), fakeDatasetService.getAllRows("project", "dataset", "with_json_schema").size()); + } +} From dbc6e091b20b8ded416af78f1976e4102e6ca037 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Thu, 1 Dec 2022 21:41:06 +0000 Subject: [PATCH 09/15] test for failed rows --- ...torageWriteApiSchemaTransformProvider.java | 68 +++---- ...geWriteApiSchemaTransformProviderTest.java | 189 ++++++++++++------ 2 files changed, 154 insertions(+), 103 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java index e3c23457b76e..3e8a551050ee 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -31,12 +31,10 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; -import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
 import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
@@ -75,7 +73,7 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
     extends TypedSchemaTransformProvider<BigQueryStorageWriteApiSchemaTransformConfiguration> {
   private static final String INPUT_ROWS_TAG = "INPUT_ROWS";
-  private static final String OUTPUT_FAILED_ROWS_TAG = "FAILED_ROWS";
+  // private static final String OUTPUT_FAILED_ROWS_TAG = "FAILED_ROWS";
   private static final String OUTPUT_ERRORS_TAG = "FAILED_ERRORS";
 
   @Override
@@ -101,7 +99,7 @@ public List<String> inputCollectionNames() {
 
   @Override
   public List<String> outputCollectionNames() {
-    return Collections.singletonList(OUTPUT_FAILED_ROWS_TAG);
+    return Collections.singletonList(OUTPUT_ERRORS_TAG);
   }
 
   /** Configuration for writing to BigQuery with Storage Write API. */
@@ -133,8 +131,8 @@ public void validate() {
       // validate create and write dispositions
       CreateDisposition createDisposition = null;
       if (!Strings.isNullOrEmpty(this.getCreateDisposition())) {
-        checkNotNull(
-            CREATE_DISPOSITIONS.get(this.getCreateDisposition().toUpperCase()),
+        checkArgument(
+            CREATE_DISPOSITIONS.get(this.getCreateDisposition().toUpperCase()) != null,
             invalidConfigMessage
                 + "Invalid create disposition was specified. Available dispositions are: ",
             CREATE_DISPOSITIONS.keySet());
@@ -152,7 +150,7 @@ public void validate() {
       if (!Strings.isNullOrEmpty(this.getJsonSchema())) {
         // check if a TableSchema can be deserialized from the input schema string
        checkNotNull(BigQueryHelpers.fromJsonString(this.getJsonSchema(), TableSchema.class));
-      } else if (!this.getUseBeamSchema()) {
+      } else if (this.getUseBeamSchema() == null || !this.getUseBeamSchema()) {
         // if no schema is provided, create disposition CREATE_NEVER has to be specified.
         checkArgument(
             createDisposition == CreateDisposition.CREATE_NEVER,
@@ -276,57 +274,41 @@ public void setBigQueryServices(BigQueryServices testBigQueryServices) {
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
       // Check that the input exists
       checkArgument(input.has(INPUT_ROWS_TAG), "Missing expected input tag: %s", INPUT_ROWS_TAG);
-
       PCollection<Row> inputRows = input.get(INPUT_ROWS_TAG);
 
       BigQueryIO.Write<Row> write = createStorageWriteApiTransform();
       WriteResult result = inputRows.apply(write);
 
-      Schema rowSchema =
-          !Strings.isNullOrEmpty(configuration.getJsonSchema())
-              ? BigQueryUtils.fromTableSchema(
-                  BigQueryHelpers.fromJsonString(
-                      configuration.getJsonSchema(), TableSchema.class)) // get from input schema
-              : inputRows.getSchema(); // or get from PCollection
-
       Schema errorSchema =
           Schema.of(
-              Field.of("failed_row", FieldType.row(rowSchema)),
+              Field.of("failed_row", FieldType.STRING),
               Field.of("error_message", FieldType.STRING));
 
-      PCollection<BigQueryStorageApiInsertError> storageInsertErrors =
-          result.getFailedStorageApiInserts();
-
       // Errors consisting of failed rows along with their error message
       PCollection<Row> errorRows =
-          storageInsertErrors.apply(
-              MapElements.into(TypeDescriptor.of(Row.class))
-                  .via(
-                      (storageError) ->
-                          Row.withSchema(errorSchema)
-                              .withFieldValue(
-                                  "failed_row",
-                                  BigQueryUtils.toBeamRow(rowSchema, storageError.getRow()))
-                              .withFieldValue("error_message", storageError.getErrorMessage())
-                              .build()));
-
-      // Failed rows
-      PCollection<Row> failedRows =
-          storageInsertErrors.apply(
-              MapElements.into(TypeDescriptor.of(Row.class))
-                  .via(
-                      (storageError) -> BigQueryUtils.toBeamRow(rowSchema, storageError.getRow())));
-
-      return PCollectionRowTuple.of(OUTPUT_FAILED_ROWS_TAG, failedRows)
-          .and(OUTPUT_ERRORS_TAG, errorRows);
+          result
+              .getFailedStorageApiInserts()
+              .apply(
+                  "Extract Errors",
+                  MapElements.into(TypeDescriptor.of(Row.class))
+                      .via(
+                          (storageError) ->
+                              Row.withSchema(errorSchema)
+                                  .withFieldValue("error_message", storageError.getErrorMessage())
+                                  .withFieldValue("failed_row", storageError.getRow().toString())
+                                  .build()))
+              .setRowSchema(errorSchema);
+
+      return PCollectionRowTuple.of(OUTPUT_ERRORS_TAG, errorRows);
     }
 
     BigQueryIO.Write<Row> createStorageWriteApiTransform() {
       Method writeMethod =
-          configuration.getUseAtLeastOnceSemantics() != null && configuration.getUseAtLeastOnceSemantics()
-              ? Method.STORAGE_API_AT_LEAST_ONCE
-              : Method.STORAGE_WRITE_API;
+          configuration.getUseAtLeastOnceSemantics() != null
+                  && configuration.getUseAtLeastOnceSemantics()
+              ? Method.STORAGE_API_AT_LEAST_ONCE
+              : Method.STORAGE_WRITE_API;
 
       BigQueryIO.Write<Row> write =
           BigQueryIO.write()
              .to(configuration.getOutputTable())
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
index 4fd9e7121b11..aad71a86bfe1 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
@@ -1,14 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.sdk.io.gcp.bigquery.providers;
 
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
 
 import java.time.LocalDateTime;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
 import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiPCollectionRowTupleTransform;
 import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
 import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
@@ -18,18 +36,12 @@
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
-import org.apache.beam.sdk.schemas.transforms.Select;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.sdk.values.TypeDescriptors;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -46,33 +58,34 @@ public class BigQueryStorageWriteApiSchemaTransformProviderTest {
           .withDatasetService(fakeDatasetService)
           .withJobService(fakeJobService);
 
-  private static final Schema SCHEMA = Schema.of(
-      Field.of("name", FieldType.STRING),
-      Field.of("number", FieldType.INT64),
-      Field.of("dt", FieldType.logicalType(SqlTypes.DATETIME)));
-
-  private static final List<Row> ROWS = Arrays.asList(
-      Row.withSchema(SCHEMA)
-          .withFieldValue("name", "a")
-          .withFieldValue("number", 1L)
-          .withFieldValue("dt", LocalDateTime.parse("2000-01-01T00:00:00"))
-          .build(),
-      Row.withSchema(SCHEMA)
-          .withFieldValue("name", "b")
-          .withFieldValue("number", 2L)
-          .withFieldValue("dt", LocalDateTime.parse("2000-01-02T00:00:00"))
-          .build(),
-      Row.withSchema(SCHEMA)
-          .withFieldValue("name", "c")
-          .withFieldValue("number", 3L)
-          .withFieldValue("dt", LocalDateTime.parse("2000-01-03T00:00:00"))
-          .build());
-
-  @Rule
-  public final transient TestPipeline p = TestPipeline.create();
+  private static final Schema SCHEMA =
+      Schema.of(
+          Field.of("name", FieldType.STRING),
+          Field.of("number", FieldType.INT64),
+          Field.of("dt", FieldType.logicalType(SqlTypes.DATETIME)));
+
+  private static final List<Row> ROWS =
+      Arrays.asList(
+          Row.withSchema(SCHEMA)
+              .withFieldValue("name", "a")
+              .withFieldValue("number", 1L)
+              .withFieldValue("dt", LocalDateTime.parse("2000-01-01T00:00:00"))
+              .build(),
+          Row.withSchema(SCHEMA)
+              .withFieldValue("name", "b")
+              .withFieldValue("number", 2L)
+              .withFieldValue("dt", LocalDateTime.parse("2000-01-02T00:00:00"))
+              .build(),
+          Row.withSchema(SCHEMA)
+              .withFieldValue("name", "c")
+              .withFieldValue("number", 3L)
+              .withFieldValue("dt", LocalDateTime.parse("2000-01-03T00:00:00"))
+              .build());
+
+  @Rule public final transient TestPipeline p = TestPipeline.create();
 
   @Before
-  public void setUp() throws Exception{
+  public void setUp() throws Exception {
     FakeDatasetService.setUp();
 
     fakeDatasetService.createDataset("project", "dataset", "", "", null);
@@ -94,16 +107,23 @@ public void testInvalidConfig() {
             .setOutputTable("create_table:without.schema")
             .setUseBeamSchema(false));
 
-    for (BigQueryStorageWriteApiSchemaTransformConfiguration.Builder config: invalidConfigs) {
-      assertThrows(Exception.class, () -> {config.build().validate();});
+    for (BigQueryStorageWriteApiSchemaTransformConfiguration.Builder config : invalidConfigs) {
+      assertThrows(
+          Exception.class,
+          () -> {
+            config.build().validate();
+          });
     }
   }
 
-  public PCollectionRowTuple runWithConfig(BigQueryStorageWriteApiSchemaTransformConfiguration config) {
-    BigQueryStorageWriteApiSchemaTransformProvider provider = new BigQueryStorageWriteApiSchemaTransformProvider();
+  public PCollectionRowTuple runWithConfig(
+      BigQueryStorageWriteApiSchemaTransformConfiguration config) {
+    BigQueryStorageWriteApiSchemaTransformProvider provider =
+        new BigQueryStorageWriteApiSchemaTransformProvider();
 
     BigQueryStorageWriteApiPCollectionRowTupleTransform writeRowTupleTransform =
-        (BigQueryStorageWriteApiPCollectionRowTupleTransform) provider.from(config).buildTransform();
+        (BigQueryStorageWriteApiPCollectionRowTupleTransform)
+            provider.from(config).buildTransform();
 
     writeRowTupleTransform.setBigQueryServices(fakeBigQueryServices);
     String tag = provider.inputCollectionNames().get(0);
@@ -118,7 +138,7 @@ public PCollectionRowTuple runWithConfig(BigQueryStorageWriteApiSchemaTransformC
 
   @Test
   @Ignore
-  public void testSimpleWrite() throws Exception{
+  public void testSimpleWrite() throws Exception {
     String tableSpec = "project:dataset.simple_write";
     BigQueryStorageWriteApiSchemaTransformConfiguration config =
         BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
@@ -130,7 +150,8 @@ public void testSimpleWrite() throws Exception{
     p.run().waitUntilFinish();
 
     assertNotNull(fakeDatasetService.getTable(BigQueryHelpers.parseTableSpec(tableSpec)));
-    assertEquals(ROWS.size(), fakeDatasetService.getAllRows("project", "dataset", "simple_write").size());
+    assertEquals(
+        ROWS.size(), fakeDatasetService.getAllRows("project", "dataset", "simple_write").size());
   }
 
   @Test
@@ -139,25 +160,25 @@ public void testWithJsonSchema() throws Exception {
     String tableSpec = "project:dataset.with_json_schema";
 
     String jsonSchema =
-        "{"
-        + "  \"fields\": ["
-        + "    {"
-        + "      \"name\": \"name\","
-        + "      \"type\": \"STRING\","
-        + "      \"mode\": \"REQUIRED\""
-        + "    },"
-        + "    {"
-        + "      \"name\": \"number\","
-        + "      \"type\": \"INTEGER\","
-        + "      \"mode\": \"REQUIRED\""
-        + "    },"
-        + "    {"
-        + "      \"name\": \"dt\","
-        + "      \"type\": \"DATETIME\","
-        + "      \"mode\": \"REQUIRED\""
-        + "    }"
-        + "  ]"
-        + "}";
+        "{"
+            + "  \"fields\": ["
+            + "    {"
+            + "      \"name\": \"name\","
+            + "      \"type\": \"STRING\","
+            + "      \"mode\": \"REQUIRED\""
+            + "    },"
+            + "    {"
+            + "      \"name\": \"number\","
+            + "      \"type\": \"INTEGER\","
+            + "      \"mode\": \"REQUIRED\""
+            + "    },"
+            + "    {"
+            + "      \"name\": \"dt\","
+            + "      \"type\": \"DATETIME\","
+            + "      \"mode\": \"REQUIRED\""
+            + "    }"
+            + "  ]"
+            + "}";
 
     BigQueryStorageWriteApiSchemaTransformConfiguration config =
         BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
@@ -170,6 +191,54 @@ public void testWithJsonSchema() throws Exception {
     runWithConfig(config);
     p.run().waitUntilFinish();
 
     assertNotNull(fakeDatasetService.getTable(BigQueryHelpers.parseTableSpec(tableSpec)));
-    assertEquals(ROWS.size(), fakeDatasetService.getAllRows("project", "dataset", "with_json_schema").size());
+    assertEquals(
+        ROWS.size(),
+        fakeDatasetService.getAllRows("project", "dataset", "with_json_schema").size());
+  }
+
+  @Test
+  public void testFailedRows() {
+    String tableSpec = "project:dataset.with_json_schema";
+
+    String jsonSchema =
+        "{"
+            + "  \"fields\": ["
+            + "    {"
+            + "      \"name\": \"wrong_column\","
+            + "      \"type\": \"BOOLEAN\","
+            + "      \"mode\": \"REQUIRED\""
+            + "    }"
+            + "  ]"
+            + "}";
+
+    BigQueryStorageWriteApiSchemaTransformConfiguration config =
+        BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
+            .setOutputTable(tableSpec)
+            .setJsonSchema(jsonSchema)
+            .setUseBeamSchema(false)
+            .build();
+
+    PCollectionRowTuple result = runWithConfig(config);
+    PCollection<Row> failedRows = result.get("FAILED_ERRORS");
+
+    Schema errorSchema =
+        Schema.of(
+            Field.of("failed_row", FieldType.STRING), Field.of("error_message", FieldType.STRING));
+
+    List<Row> expectedFailedRows = new ArrayList<>();
+    for (Row row : ROWS) {
+      String failedTableRow = BigQueryUtils.toTableRow(row).toString();
+      String errorMessage =
+          "org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto$SchemaTooNarrowException: "
+              + "TableRow contained unexpected field with name name not found in schema for __root__";
+      expectedFailedRows.add(
+          Row.withSchema(errorSchema)
+              .withFieldValue("failed_row", failedTableRow)
+              .withFieldValue("error_message", errorMessage)
+              .build());
+    }
+
+    PAssert.that(failedRows).containsInAnyOrder(expectedFailedRows);
+    p.run().waitUntilFinish();
   }
 }

From 8177bca61683f520e625cc95fef3eb4a15467c68 Mon Sep 17 00:00:00 2001
From: ahmedabu98
Date: Mon, 5 Dec 2022 17:48:38 +0000
Subject: [PATCH 10/15] test for failed rows

---
 .../BigQueryStorageWriteApiSchemaTransformProviderTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
index aad71a86bfe1..30c45a82e70a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
@@ -230,7 +230,7 @@ public void testFailedRows() {
       String failedTableRow = BigQueryUtils.toTableRow(row).toString();
       String errorMessage =
           "org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto$SchemaTooNarrowException: "
-              + "TableRow contained unexpected field with name name not found in schema for __root__";
+              + "TableRow contained unexpected field with name name not found in schema for root";
       expectedFailedRows.add(
           Row.withSchema(errorSchema)
               .withFieldValue("failed_row", failedTableRow)

From 8aa51d4c52798e59ab628c3c1ed328567b25cb17 Mon Sep 17 00:00:00 2001
From: ahmedabu98
Date: Tue, 6 Dec 2022 12:30:32 +0000
Subject: [PATCH 11/15] some fixes

---
 .../BigQueryStorageWriteApiSchemaTransformProvider.java     | 3 +--
 ...igQueryStorageWriteApiSchemaTransformProviderTest.java   | 8 ++------
 2 files changed, 3 insertions(+), 8 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index 3e8a551050ee..eb0f432039b1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -73,8 +73,7 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
     extends TypedSchemaTransformProvider<BigQueryStorageWriteApiSchemaTransformConfiguration> {
   private static final String INPUT_ROWS_TAG = "INPUT_ROWS";
-  // private static final String OUTPUT_FAILED_ROWS_TAG = "FAILED_ROWS";
-  private static final String OUTPUT_ERRORS_TAG = "FAILED_ERRORS";
+  private static final String OUTPUT_ERRORS_TAG = "ERROR_ROWS";
 
   @Override
   protected Class<BigQueryStorageWriteApiSchemaTransformConfiguration> configurationClass() {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
index 30c45a82e70a..dd188f15b177 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
@@ -43,7 +43,6 @@
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -130,14 +129,13 @@ public PCollectionRowTuple runWithConfig(
 
     PCollection<Row> rows = p.apply(Create.of(ROWS).withRowSchema(SCHEMA));
 
-    PCollectionRowTuple input = PCollectionRowTuple.of(tag, rows.setRowSchema(SCHEMA));
+    PCollectionRowTuple input = PCollectionRowTuple.of(tag, rows);
     PCollectionRowTuple result = input.apply(writeRowTupleTransform);
 
     return result;
   }
 
   @Test
-  @Ignore
   public void testSimpleWrite() throws Exception {
     String tableSpec = "project:dataset.simple_write";
     BigQueryStorageWriteApiSchemaTransformConfiguration config =
@@ -155,7 +153,6 @@ public void testSimpleWrite() throws Exception {
   }
 
   @Test
-  @Ignore
   public void testWithJsonSchema() throws Exception {
     String tableSpec = "project:dataset.with_json_schema";
 
@@ -184,7 +181,6 @@ public void testWithJsonSchema() throws Exception {
         BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
             .setOutputTable(tableSpec)
             .setJsonSchema(jsonSchema)
-            .setUseBeamSchema(false)
             .build();
 
     runWithConfig(config);
@@ -219,7 +215,7 @@ public void testFailedRows() {
         .build();
 
     PCollectionRowTuple result = runWithConfig(config);
-    PCollection<Row> failedRows = result.get("FAILED_ERRORS");
+    PCollection<Row> failedRows = result.get("ERROR_ROWS");
 
     Schema errorSchema =
         Schema.of(

From 1b10d3a67e02527e73893d802ea13d968bdfe9aa Mon Sep 17 00:00:00 2001
From: ahmedabu98
Date: Wed, 7 Dec 2022 18:20:30 +0000
Subject: [PATCH 12/15] experiment

---
 .../BigQueryStorageWriteApiSchemaTransformProvider.java | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index eb0f432039b1..9366d14556ff 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -56,6 +56,8 @@
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An implementation of {@link TypedSchemaTransformProvider} for BigQuery Storage Write API jobs
@@ -72,6 +74,8 @@
 @AutoService(SchemaTransformProvider.class)
 public class BigQueryStorageWriteApiSchemaTransformProvider
     extends TypedSchemaTransformProvider<BigQueryStorageWriteApiSchemaTransformConfiguration> {
+  private static final Logger LOG = LoggerFactory.getLogger(BigQueryStorageWriteApiSchemaTransformProvider.class);
+
   private static final String INPUT_ROWS_TAG = "INPUT_ROWS";
   private static final String OUTPUT_ERRORS_TAG = "ERROR_ROWS";
 
@@ -277,6 +281,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
 
       BigQueryIO.Write<Row> write = createStorageWriteApiTransform();
 
+      LOG.info("JSON SCHEMA: " + configuration.getJsonSchema());
       WriteResult result = inputRows.apply(write);
 
       Schema errorSchema =

From c25044defee740f521ff07afad368f1a831b65ac Mon Sep 17 00:00:00 2001
From: ahmedabu98
Date: Wed, 7 Dec 2022 21:36:45 +0000
Subject: [PATCH 13/15] use autoSharding, set default triggering frequency, use beam schema

---
 ...torageWriteApiSchemaTransformProvider.java |  95 +++-------------
 ...geWriteApiSchemaTransformProviderTest.java | 106 +-----------------
 2 files changed, 21 insertions(+), 180 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index 9366d14556ff..570e8487f0bf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -20,7 +20,6 @@
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.api.services.bigquery.model.TableSchema;
 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import java.util.Collections;
@@ -49,6 +48,7 @@
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -56,8 +56,6 @@
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * An implementation of {@link TypedSchemaTransformProvider} for BigQuery Storage Write API jobs
@@ -72,8 +72,7 @@
 @AutoService(SchemaTransformProvider.class)
 public class BigQueryStorageWriteApiSchemaTransformProvider
     extends TypedSchemaTransformProvider<BigQueryStorageWriteApiSchemaTransformConfiguration> {
-  private static final Logger LOG = LoggerFactory.getLogger(BigQueryStorageWriteApiSchemaTransformProvider.class);
-
+  private static final Duration DEFAULT_TRIGGERING_FREQUENCY = Duration.standardSeconds(60);
   private static final String INPUT_ROWS_TAG = "INPUT_ROWS";
   private static final String OUTPUT_ERRORS_TAG = "ERROR_ROWS";
 
   @Override
   protected Class<BigQueryStorageWriteApiSchemaTransformConfiguration> configurationClass() {
@@ -127,19 +124,17 @@ public void validate() {
 
       // validate output table spec
       checkArgument(
-          !Strings.isNullOrEmpty(this.getOutputTable()),
+          !Strings.isNullOrEmpty(this.getTable()),
           invalidConfigMessage + "Table spec for a BigQuery Write must be specified.");
-      checkNotNull(BigQueryHelpers.parseTableSpec(this.getOutputTable()));
+      checkNotNull(BigQueryHelpers.parseTableSpec(this.getTable()));
 
       // validate create and write dispositions
-      CreateDisposition createDisposition = null;
       if (!Strings.isNullOrEmpty(this.getCreateDisposition())) {
         checkArgument(
             CREATE_DISPOSITIONS.get(this.getCreateDisposition().toUpperCase()) != null,
             invalidConfigMessage
                 + "Invalid create disposition was specified. Available dispositions are: ",
             CREATE_DISPOSITIONS.keySet());
-        createDisposition = CREATE_DISPOSITIONS.get(this.getCreateDisposition().toUpperCase());
       }
       if (!Strings.isNullOrEmpty(this.getWriteDisposition())) {
         checkNotNull(
@@ -148,32 +143,6 @@ public void validate() {
                 + "Invalid write disposition was specified. Available dispositions are: ",
             WRITE_DISPOSITIONS.keySet());
       }
-
-      // validate schema
-      if (!Strings.isNullOrEmpty(this.getJsonSchema())) {
-        // check if a TableSchema can be deserialized from the input schema string
-        checkNotNull(BigQueryHelpers.fromJsonString(this.getJsonSchema(), TableSchema.class));
-      } else if (this.getUseBeamSchema() == null || !this.getUseBeamSchema()) {
-        // if no schema is provided, create disposition CREATE_NEVER has to be specified.
-        checkArgument(
-            createDisposition == CreateDisposition.CREATE_NEVER,
-            invalidConfigMessage
-                + "Create disposition is CREATE_IF_NEEDED, but no schema was provided.");
-      }
-
-      // validation checks for streaming writes
-      checkArgument(
-          this.getNumStorageWriteApiStreams() == null || this.getNumStorageWriteApiStreams() > 0,
-          invalidConfigMessage + "When set, numStorageWriteApiStreams must be > 0, but was: %s",
-          this.getNumStorageWriteApiStreams());
-      checkArgument(
-          this.getNumFileShards() == null || this.getNumFileShards() > 0,
-          invalidConfigMessage + "When set, numFileShards must be > 0, but was: %s",
-          this.getNumFileShards());
-      checkArgument(
-          this.getTriggeringFrequencySeconds() == null || this.getTriggeringFrequencySeconds() > 0,
-          invalidConfigMessage + "When set, the trigger frequency must be > 0, but was: %s",
-          this.getTriggeringFrequencySeconds());
     }
 
     /**
@@ -184,13 +153,7 @@ public static Builder builder() {
           .Builder();
     }
 
-    public abstract String getOutputTable();
-
-    @Nullable
-    public abstract String getJsonSchema();
-
-    @Nullable
-    public abstract Boolean getUseBeamSchema();
+    public abstract String getTable();
 
     @Nullable
     public abstract String getCreateDisposition();
@@ -198,12 +161,6 @@ public static Builder builder() {
     @Nullable
     public abstract String getWriteDisposition();
 
-    @Nullable
-    public abstract Integer getNumStorageWriteApiStreams();
-
-    @Nullable
-    public abstract Integer getNumFileShards();
-
     @Nullable
     public abstract Integer getTriggeringFrequencySeconds();
 
@@ -213,20 +170,12 @@ public static Builder builder() {
     /** Builder for {@link BigQueryStorageWriteApiSchemaTransformConfiguration}. */
     @AutoValue.Builder
     public abstract static class Builder {
-      public abstract Builder setOutputTable(String tableSpec);
-
-      public abstract Builder setJsonSchema(String jsonSchema);
-
-      public abstract Builder setUseBeamSchema(Boolean useBeamSchema);
+      public abstract Builder setTable(String tableSpec);
 
       public abstract Builder setCreateDisposition(String createDisposition);
 
      public abstract Builder setWriteDisposition(String writeDisposition);
 
-      public abstract Builder setNumStorageWriteApiStreams(Integer numStorageWriteApiStreams);
-
-      public abstract Builder setNumFileShards(Integer numFileShards);
-
       public abstract Builder setTriggeringFrequencySeconds(Integer seconds);
 
       public abstract Builder setUseAtLeastOnceSemantics(Boolean use);
@@ -281,7 +230,14 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
 
       BigQueryIO.Write<Row> write = createStorageWriteApiTransform();
 
-      LOG.info("JSON SCHEMA: " + configuration.getJsonSchema());
+      if (inputRows.isBounded() == IsBounded.UNBOUNDED) {
+        write =
+            write.withTriggeringFrequency(
+                configuration.getTriggeringFrequencySeconds() == null
+                    ? DEFAULT_TRIGGERING_FREQUENCY
+                    : Duration.standardSeconds(configuration.getTriggeringFrequencySeconds()));
+      }
+
       WriteResult result = inputRows.apply(write);
 
       Schema errorSchema =
@@ -313,20 +269,14 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform() {
               && configuration.getUseAtLeastOnceSemantics()
               ? Method.STORAGE_API_AT_LEAST_ONCE
               : Method.STORAGE_WRITE_API;
+
       BigQueryIO.Write<Row> write =
           BigQueryIO.write()
-              .to(configuration.getOutputTable())
+              .to(configuration.getTable())
               .withMethod(writeMethod)
+              .useBeamSchema()
               .withFormatFunction(BigQueryUtils.toTableRow());
 
-      if (!Strings.isNullOrEmpty(configuration.getJsonSchema())) {
-        write =
-            write.withSchema(
-                BigQueryHelpers.fromJsonString(configuration.getJsonSchema(), TableSchema.class));
-      }
-      if (configuration.getUseBeamSchema() != null && configuration.getUseBeamSchema()) {
-        write = write.useBeamSchema();
-      }
       if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
         CreateDisposition createDisposition =
             BigQueryStorageWriteApiSchemaTransformConfiguration.CREATE_DISPOSITIONS.get(
@@ -339,17 +289,6 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform() {
               configuration.getWriteDisposition());
         write = write.withWriteDisposition(writeDisposition);
       }
-      if (configuration.getNumStorageWriteApiStreams() != null) {
-        write = write.withNumStorageWriteApiStreams(configuration.getNumStorageWriteApiStreams());
-      }
-      if (configuration.getNumFileShards() != null) {
-        write = write.withNumFileShards(configuration.getNumFileShards());
-      }
-      if (configuration.getTriggeringFrequencySeconds() != null) {
-        write =
-            write.withTriggeringFrequency(
-                Duration.standardSeconds(configuration.getTriggeringFrequencySeconds()));
-      }
 
       if (this.testBigQueryServices != null) {
         write = write.withTestServices(testBigQueryServices);
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
index dd188f15b177..c8e733c8458f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
@@ -22,11 +22,9 @@
 import static org.junit.Assert.assertThrows;
 
 import java.time.LocalDateTime;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
 import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiPCollectionRowTupleTransform;
 import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
 import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
@@ -36,7 +34,6 @@
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
-import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
@@ -95,16 +92,10 @@ public void testInvalidConfig() {
     List<BigQueryStorageWriteApiSchemaTransformConfiguration.Builder> invalidConfigs =
         Arrays.asList(
             BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
-                .setOutputTable("not_a_valid_table_spec"),
+                .setTable("not_a_valid_table_spec"),
             BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
-                .setOutputTable("project:dataset.table")
-                .setCreateDisposition("INVALID_DISPOSITION"),
-            BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
-                .setOutputTable("project:dataset.table")
-                .setJsonSchema("not a valid schema"),
-            BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
-                .setOutputTable("create_table:without.schema")
-                .setUseBeamSchema(false));
+                .setTable("project:dataset.table")
+                .setCreateDisposition("INVALID_DISPOSITION"));
 
     for (BigQueryStorageWriteApiSchemaTransformConfiguration.Builder config : invalidConfigs) {
       assertThrows(
@@ -139,10 +130,7 @@ public PCollectionRowTuple runWithConfig(
   public void testSimpleWrite() throws Exception {
     String tableSpec = "project:dataset.simple_write";
     BigQueryStorageWriteApiSchemaTransformConfiguration config =
-        BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
-            .setOutputTable(tableSpec)
-            .setUseBeamSchema(true)
-            .build();
+        BigQueryStorageWriteApiSchemaTransformConfiguration.builder().setTable(tableSpec).build();
 
     runWithConfig(config);
     p.run().waitUntilFinish();
@@ -151,90 +139,4 @@ public void testSimpleWrite() throws Exception {
     assertEquals(
         ROWS.size(), fakeDatasetService.getAllRows("project", "dataset", "simple_write").size());
   }
-
-  @Test
-  public void testWithJsonSchema() throws Exception {
-    String tableSpec = "project:dataset.with_json_schema";
-
-    String jsonSchema =
-        "{"
-            + "  \"fields\": ["
-            + "    {"
-            + "      \"name\": \"name\","
-            + "      \"type\": \"STRING\","
-            + "      \"mode\": \"REQUIRED\""
-            + "    },"
-            + "    {"
-            + "      \"name\": \"number\","
-            + "      \"type\": \"INTEGER\","
-            + "      \"mode\": \"REQUIRED\""
-            + "    },"
-            + "    {"
-            + "      \"name\": \"dt\","
-            + "      \"type\": \"DATETIME\","
-            + "      \"mode\": \"REQUIRED\""
-            + "    }"
-            + "  ]"
-            + "}";
-
-    BigQueryStorageWriteApiSchemaTransformConfiguration config =
-        BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
-            .setOutputTable(tableSpec)
-            .setJsonSchema(jsonSchema)
-            .build();
-
-    runWithConfig(config);
-    p.run().waitUntilFinish();
-
-    assertNotNull(fakeDatasetService.getTable(BigQueryHelpers.parseTableSpec(tableSpec)));
-    assertEquals(
-        ROWS.size(),
-        fakeDatasetService.getAllRows("project", "dataset", "with_json_schema").size());
-  }
-
-  @Test
-  public void testFailedRows() {
-    String tableSpec = "project:dataset.with_json_schema";
-
-    String jsonSchema =
-        "{"
-            + "  \"fields\": ["
-            + "    {"
-            + "      \"name\": \"wrong_column\","
-            + "      \"type\": \"BOOLEAN\","
-            + "      \"mode\": \"REQUIRED\""
-            + "    }"
-            + "  ]"
-            + "}";
-
-    BigQueryStorageWriteApiSchemaTransformConfiguration config =
-        BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
-            .setOutputTable(tableSpec)
-            .setJsonSchema(jsonSchema)
-            .setUseBeamSchema(false)
-            .build();
-
-    PCollectionRowTuple result = runWithConfig(config);
-    PCollection<Row> failedRows = result.get("ERROR_ROWS");
-
-    Schema errorSchema =
-        Schema.of(
-            Field.of("failed_row", FieldType.STRING), Field.of("error_message", FieldType.STRING));
-
-    List<Row> expectedFailedRows = new ArrayList<>();
-    for (Row row : ROWS) {
-      String failedTableRow = BigQueryUtils.toTableRow(row).toString();
-      String errorMessage =
-          "org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto$SchemaTooNarrowException: "
-              + "TableRow contained unexpected field with name name not found in schema for root";
-      expectedFailedRows.add(
-          Row.withSchema(errorSchema)
-              .withFieldValue("failed_row", failedTableRow)
-              .withFieldValue("error_message", errorMessage)
-              .build());
-    }
-
-    PAssert.that(failedRows).containsInAnyOrder(expectedFailedRows);
-    p.run().waitUntilFinish();
-  }
 }

From 7d243510a510b946d9e4be20d57e1c8206fbcee2 Mon Sep 17 00:00:00 2001
From: ahmedabu98
Date: Mon, 12 Dec 2022 19:43:14 +0000
Subject: [PATCH 14/15] use Long instead of Integer

---
 ...igQueryStorageWriteApiSchemaTransformProvider.java | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index 570e8487f0bf..50328228f0e6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -89,7 +89,7 @@ protected SchemaTransform from(
 
   @Override
   public String identifier() {
-    return String.format("beam:transform:org.apache.beam:bigquery_storage_write:v1");
+    return String.format("beam:schematransform:org.apache.beam:bigquery_storage_write:v1");
   }
 
   @Override
@@ -162,7 +162,7 @@ public static Builder builder() {
     public abstract String getWriteDisposition();
 
     @Nullable
-    public abstract Integer getTriggeringFrequencySeconds();
+    public abstract Long getTriggeringFrequencySeconds();
 
     @Nullable
     public abstract Boolean getUseAtLeastOnceSemantics();
@@ -176,7 +176,7 @@ public abstract static class Builder {
 
       public abstract Builder setWriteDisposition(String writeDisposition);
 
-      public abstract Builder setTriggeringFrequencySeconds(Integer seconds);
+      public abstract Builder setTriggeringFrequencySeconds(Long seconds);
 
       public abstract Builder setUseAtLeastOnceSemantics(Boolean use);
 
@@ -231,11 +231,12 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
       BigQueryIO.Write<Row> write = createStorageWriteApiTransform();
 
       if (inputRows.isBounded() == IsBounded.UNBOUNDED) {
+        Long triggeringFrequency = configuration.getTriggeringFrequencySeconds();
         write =
             write.withTriggeringFrequency(
-                configuration.getTriggeringFrequencySeconds() == null
+                (triggeringFrequency == null || triggeringFrequency <= 0)
                     ? DEFAULT_TRIGGERING_FREQUENCY
-                    : Duration.standardSeconds(configuration.getTriggeringFrequencySeconds()));
+                    : Duration.standardSeconds(triggeringFrequency));
       }
 
       WriteResult result = inputRows.apply(write);

From c8e9b16a0e190be96bb7d73dcde6863deedbd454 Mon Sep 17 00:00:00 2001
From: ahmedabu98
Date: Tue, 20 Dec 2022 18:06:40 +0000
Subject: [PATCH 15/15] address comments: use autosharding, 5s commit interval, change input and output tags

---
 ...ryStorageWriteApiSchemaTransformProvider.java | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index 50328228f0e6..5f7851bba519 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -72,9 +72,9 @@
 @AutoService(SchemaTransformProvider.class)
 public class BigQueryStorageWriteApiSchemaTransformProvider
     extends TypedSchemaTransformProvider<BigQueryStorageWriteApiSchemaTransformConfiguration> {
-  private static final Duration DEFAULT_TRIGGERING_FREQUENCY = Duration.standardSeconds(60);
-  private static final String INPUT_ROWS_TAG = "INPUT_ROWS";
-  private static final String OUTPUT_ERRORS_TAG = "ERROR_ROWS";
+  private static final Duration DEFAULT_TRIGGERING_FREQUENCY = Duration.standardSeconds(5);
+  private static final String INPUT_ROWS_TAG = "input";
+  private static final String OUTPUT_ERRORS_TAG = "errors";
 
   @Override
   protected Class<BigQueryStorageWriteApiSchemaTransformConfiguration> configurationClass() {
@@ -233,10 +233,12 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
 
       if (inputRows.isBounded() == IsBounded.UNBOUNDED) {
         Long triggeringFrequency = configuration.getTriggeringFrequencySeconds();
         write =
-            write.withTriggeringFrequency(
-                (triggeringFrequency == null || triggeringFrequency <= 0)
-                    ? DEFAULT_TRIGGERING_FREQUENCY
-                    : Duration.standardSeconds(triggeringFrequency));
+            write
+                .withAutoSharding()
+                .withTriggeringFrequency(
+                    (triggeringFrequency == null || triggeringFrequency <= 0)
+                        ? DEFAULT_TRIGGERING_FREQUENCY
+                        : Duration.standardSeconds(triggeringFrequency));
       }
 
       WriteResult result = inputRows.apply(write);
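
For reference, below is a minimal usage sketch of the transform as the series leaves it after patch 15/15, mirroring the runWithConfig pattern in the tests. The class name, pipeline, table spec, input schema, and disposition strings are illustrative assumptions rather than part of the patches, and the sketch sits in the providers package only because from(ConfigT) is protected there.

// Illustrative sketch only; not part of the patch series.
package org.apache.beam.sdk.io.gcp.bigquery.providers;

import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;

public class StorageWriteApiUsageSketch {
  public static void main(String[] args) {
    // The transform now writes with useBeamSchema(), so input rows must carry a Beam schema.
    Schema schema =
        Schema.of(Field.of("name", FieldType.STRING), Field.of("number", FieldType.INT64));
    Row row =
        Row.withSchema(schema).withFieldValue("name", "a").withFieldValue("number", 1L).build();

    Pipeline pipeline = Pipeline.create();
    PCollection<Row> rows = pipeline.apply(Create.of(Arrays.asList(row)).withRowSchema(schema));

    BigQueryStorageWriteApiSchemaTransformConfiguration config =
        BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
            .setTable("my-project:my_dataset.my_table") // placeholder table spec
            .setCreateDisposition("CREATE_IF_NEEDED") // assumed to match the enum names
            .setWriteDisposition("WRITE_APPEND")
            .setTriggeringFrequencySeconds(5L) // only consulted for unbounded inputs
            .build();

    // Tags follow patch 15/15: rows go in under "input", failed inserts come out under "errors".
    PCollectionRowTuple result =
        PCollectionRowTuple.of("input", rows)
            .apply(
                new BigQueryStorageWriteApiSchemaTransformProvider().from(config).buildTransform());

    // Each error row carries two string fields: "failed_row" and "error_message".
    PCollection<Row> errors = result.get("errors");

    pipeline.run().waitUntilFinish();
  }
}

On unbounded input the write also picks up withAutoSharding() and falls back to the five-second default commit interval whenever no positive triggering frequency is configured; bounded input skips both settings.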