diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
new file mode 100644
index 000000000000..5f7851bba519
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery.providers;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
+import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
+import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Duration;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery Storage Write API jobs
+ * configured via {@link BigQueryStorageWriteApiSchemaTransformConfiguration}.
+ *
+ *
+ * <p>Internal only: This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
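+ *
+ * <p>A minimal configuration sketch (the table spec below is illustrative):
+ *
+ * <pre>{@code
+ * BigQueryStorageWriteApiSchemaTransformConfiguration config =
+ *     BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
+ *         .setTable("my-project:my_dataset.my_table")
+ *         .setCreateDisposition("CREATE_IF_NEEDED")
+ *         .setTriggeringFrequencySeconds(10L)
+ *         .build();
+ * }</pre>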
+ */
+@SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@Experimental(Kind.SCHEMAS)
+@AutoService(SchemaTransformProvider.class)
+public class BigQueryStorageWriteApiSchemaTransformProvider
+    extends TypedSchemaTransformProvider<BigQueryStorageWriteApiSchemaTransformConfiguration> {
+ private static final Duration DEFAULT_TRIGGERING_FREQUENCY = Duration.standardSeconds(5);
+ private static final String INPUT_ROWS_TAG = "input";
+ private static final String OUTPUT_ERRORS_TAG = "errors";
+
+ @Override
+  protected Class<BigQueryStorageWriteApiSchemaTransformConfiguration> configurationClass() {
+ return BigQueryStorageWriteApiSchemaTransformConfiguration.class;
+ }
+
+ @Override
+ protected SchemaTransform from(
+ BigQueryStorageWriteApiSchemaTransformConfiguration configuration) {
+ return new BigQueryStorageWriteApiSchemaTransform(configuration);
+ }
+
+ @Override
+ public String identifier() {
+    return "beam:schematransform:org.apache.beam:bigquery_storage_write:v1";
+ }
+
+ @Override
+  public List<String> inputCollectionNames() {
+ return Collections.singletonList(INPUT_ROWS_TAG);
+ }
+
+ @Override
+  public List<String> outputCollectionNames() {
+ return Collections.singletonList(OUTPUT_ERRORS_TAG);
+ }
+
+ /** Configuration for writing to BigQuery with Storage Write API. */
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class BigQueryStorageWriteApiSchemaTransformConfiguration {
+    static final Map<String, CreateDisposition> CREATE_DISPOSITIONS =
+        ImmutableMap.<String, CreateDisposition>builder()
+ .put(CreateDisposition.CREATE_IF_NEEDED.name(), CreateDisposition.CREATE_IF_NEEDED)
+ .put(CreateDisposition.CREATE_NEVER.name(), CreateDisposition.CREATE_NEVER)
+ .build();
+
+    static final Map<String, WriteDisposition> WRITE_DISPOSITIONS =
+        ImmutableMap.<String, WriteDisposition>builder()
+ .put(WriteDisposition.WRITE_TRUNCATE.name(), WriteDisposition.WRITE_TRUNCATE)
+ .put(WriteDisposition.WRITE_EMPTY.name(), WriteDisposition.WRITE_EMPTY)
+ .put(WriteDisposition.WRITE_APPEND.name(), WriteDisposition.WRITE_APPEND)
+ .build();
+
+ public void validate() {
+ String invalidConfigMessage = "Invalid BigQuery Storage Write configuration: ";
+
+ // validate output table spec
+ checkArgument(
+ !Strings.isNullOrEmpty(this.getTable()),
+ invalidConfigMessage + "Table spec for a BigQuery Write must be specified.");
+ checkNotNull(BigQueryHelpers.parseTableSpec(this.getTable()));
+
+ // validate create and write dispositions
+ if (!Strings.isNullOrEmpty(this.getCreateDisposition())) {
+        checkArgument(
+            CREATE_DISPOSITIONS.get(this.getCreateDisposition().toUpperCase()) != null,
+            invalidConfigMessage
+                + "Invalid create disposition was specified. Available dispositions are: %s",
+            CREATE_DISPOSITIONS.keySet());
+      }
+      if (!Strings.isNullOrEmpty(this.getWriteDisposition())) {
+        checkArgument(
+            WRITE_DISPOSITIONS.get(this.getWriteDisposition().toUpperCase()) != null,
+            invalidConfigMessage
+                + "Invalid write disposition was specified. Available dispositions are: %s",
+            WRITE_DISPOSITIONS.keySet());
+      }
+ }
+
+ /**
+ * Instantiates a {@link BigQueryStorageWriteApiSchemaTransformConfiguration.Builder} instance.
+ */
+ public static Builder builder() {
+ return new AutoValue_BigQueryStorageWriteApiSchemaTransformProvider_BigQueryStorageWriteApiSchemaTransformConfiguration
+ .Builder();
+ }
+
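+    /** Table spec to write to, in a format accepted by {@link BigQueryHelpers#parseTableSpec}. */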
+ public abstract String getTable();
+
+ @Nullable
+ public abstract String getCreateDisposition();
+
+ @Nullable
+ public abstract String getWriteDisposition();
+
+ @Nullable
+ public abstract Long getTriggeringFrequencySeconds();
+
+ @Nullable
+ public abstract Boolean getUseAtLeastOnceSemantics();
+
+ /** Builder for {@link BigQueryStorageWriteApiSchemaTransformConfiguration}. */
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setTable(String tableSpec);
+
+ public abstract Builder setCreateDisposition(String createDisposition);
+
+ public abstract Builder setWriteDisposition(String writeDisposition);
+
+ public abstract Builder setTriggeringFrequencySeconds(Long seconds);
+
+ public abstract Builder setUseAtLeastOnceSemantics(Boolean use);
+
+ /** Builds a {@link BigQueryStorageWriteApiSchemaTransformConfiguration} instance. */
+ public abstract BigQueryStorageWriteApiSchemaTransformProvider
+ .BigQueryStorageWriteApiSchemaTransformConfiguration
+ build();
+ }
+ }
+
+ /**
+ * A {@link SchemaTransform} for BigQuery Storage Write API, configured with {@link
+ * BigQueryStorageWriteApiSchemaTransformConfiguration} and instantiated by {@link
+ * BigQueryStorageWriteApiSchemaTransformProvider}.
+ */
+ private static class BigQueryStorageWriteApiSchemaTransform implements SchemaTransform {
+ private final BigQueryStorageWriteApiSchemaTransformConfiguration configuration;
+
+ BigQueryStorageWriteApiSchemaTransform(
+ BigQueryStorageWriteApiSchemaTransformConfiguration configuration) {
+ configuration.validate();
+ this.configuration = configuration;
+ }
+
+ @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+ return new BigQueryStorageWriteApiPCollectionRowTupleTransform(configuration);
+ }
+ }
+
+ static class BigQueryStorageWriteApiPCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+ private final BigQueryStorageWriteApiSchemaTransformConfiguration configuration;
+ private BigQueryServices testBigQueryServices = null;
+
+ BigQueryStorageWriteApiPCollectionRowTupleTransform(
+ BigQueryStorageWriteApiSchemaTransformConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ @VisibleForTesting
+ public void setBigQueryServices(BigQueryServices testBigQueryServices) {
+ this.testBigQueryServices = testBigQueryServices;
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ // Check that the input exists
+ checkArgument(input.has(INPUT_ROWS_TAG), "Missing expected input tag: %s", INPUT_ROWS_TAG);
+      PCollection<Row> inputRows = input.get(INPUT_ROWS_TAG);
+
+      BigQueryIO.Write<Row> write = createStorageWriteApiTransform();
+
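+      // Unbounded writes need a triggering frequency; fall back to the 5-second default when
+      // the configuration omits one (or supplies a non-positive value), and let the runner
+      // determine the sharding of Storage API streams.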
+ if (inputRows.isBounded() == IsBounded.UNBOUNDED) {
+ Long triggeringFrequency = configuration.getTriggeringFrequencySeconds();
+ write =
+ write
+ .withAutoSharding()
+ .withTriggeringFrequency(
+ (triggeringFrequency == null || triggeringFrequency <= 0)
+ ? DEFAULT_TRIGGERING_FREQUENCY
+ : Duration.standardSeconds(triggeringFrequency));
+ }
+
+ WriteResult result = inputRows.apply(write);
+
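+      // Failed inserts are surfaced in a fixed two-field schema: the failed row (stringified,
+      // since it arrives as a TableRow rather than a Beam Row) and its error message.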
+ Schema errorSchema =
+ Schema.of(
+ Field.of("failed_row", FieldType.STRING),
+ Field.of("error_message", FieldType.STRING));
+
+ // Errors consisting of failed rows along with their error message
+      PCollection<Row> errorRows =
+ result
+ .getFailedStorageApiInserts()
+ .apply(
+ "Extract Errors",
+ MapElements.into(TypeDescriptor.of(Row.class))
+ .via(
+ (storageError) ->
+ Row.withSchema(errorSchema)
+ .withFieldValue("error_message", storageError.getErrorMessage())
+ .withFieldValue("failed_row", storageError.getRow().toString())
+ .build()))
+ .setRowSchema(errorSchema);
+
+ return PCollectionRowTuple.of(OUTPUT_ERRORS_TAG, errorRows);
+ }
+
+    BigQueryIO.Write<Row> createStorageWriteApiTransform() {
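+      // STORAGE_API_AT_LEAST_ONCE trades the exactly-once guarantee for lower latency and
+      // cost; the exactly-once STORAGE_WRITE_API method is the default.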
+ Method writeMethod =
+ configuration.getUseAtLeastOnceSemantics() != null
+ && configuration.getUseAtLeastOnceSemantics()
+ ? Method.STORAGE_API_AT_LEAST_ONCE
+ : Method.STORAGE_WRITE_API;
+
+      BigQueryIO.Write<Row> write =
+          BigQueryIO.<Row>write()
+ .to(configuration.getTable())
+ .withMethod(writeMethod)
+ .useBeamSchema()
+ .withFormatFunction(BigQueryUtils.toTableRow());
+
+ if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
+ CreateDisposition createDisposition =
+ BigQueryStorageWriteApiSchemaTransformConfiguration.CREATE_DISPOSITIONS.get(
+                configuration.getCreateDisposition().toUpperCase());
+ write = write.withCreateDisposition(createDisposition);
+ }
+ if (!Strings.isNullOrEmpty(configuration.getWriteDisposition())) {
+ WriteDisposition writeDisposition =
+ BigQueryStorageWriteApiSchemaTransformConfiguration.WRITE_DISPOSITIONS.get(
+                configuration.getWriteDisposition().toUpperCase());
+ write = write.withWriteDisposition(writeDisposition);
+ }
+
+ if (this.testBigQueryServices != null) {
+ write = write.withTestServices(testBigQueryServices);
+ }
+
+ return write;
+ }
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
new file mode 100644
index 000000000000..c8e733c8458f
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery.providers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
+import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiPCollectionRowTupleTransform;
+import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
+import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
+import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
+import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class BigQueryStorageWriteApiSchemaTransformProviderTest {
+ private FakeDatasetService fakeDatasetService = new FakeDatasetService();
+ private FakeJobService fakeJobService = new FakeJobService();
+ private FakeBigQueryServices fakeBigQueryServices =
+ new FakeBigQueryServices()
+ .withDatasetService(fakeDatasetService)
+ .withJobService(fakeJobService);
+
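+  // The schema includes a logical DATETIME field to exercise row-to-TableRow conversion.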
+ private static final Schema SCHEMA =
+ Schema.of(
+ Field.of("name", FieldType.STRING),
+ Field.of("number", FieldType.INT64),
+ Field.of("dt", FieldType.logicalType(SqlTypes.DATETIME)));
+
+  private static final List<Row> ROWS =
+ Arrays.asList(
+ Row.withSchema(SCHEMA)
+ .withFieldValue("name", "a")
+ .withFieldValue("number", 1L)
+ .withFieldValue("dt", LocalDateTime.parse("2000-01-01T00:00:00"))
+ .build(),
+ Row.withSchema(SCHEMA)
+ .withFieldValue("name", "b")
+ .withFieldValue("number", 2L)
+ .withFieldValue("dt", LocalDateTime.parse("2000-01-02T00:00:00"))
+ .build(),
+ Row.withSchema(SCHEMA)
+ .withFieldValue("name", "c")
+ .withFieldValue("number", 3L)
+ .withFieldValue("dt", LocalDateTime.parse("2000-01-03T00:00:00"))
+ .build());
+
+ @Rule public final transient TestPipeline p = TestPipeline.create();
+
+ @Before
+ public void setUp() throws Exception {
+ FakeDatasetService.setUp();
+
+ fakeDatasetService.createDataset("project", "dataset", "", "", null);
+ }
+
+ @Test
+ public void testInvalidConfig() {
+    List<BigQueryStorageWriteApiSchemaTransformConfiguration.Builder> invalidConfigs =
+ Arrays.asList(
+ BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
+ .setTable("not_a_valid_table_spec"),
+ BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
+ .setTable("project:dataset.table")
+ .setCreateDisposition("INVALID_DISPOSITION"));
+
+ for (BigQueryStorageWriteApiSchemaTransformConfiguration.Builder config : invalidConfigs) {
+ assertThrows(
+ Exception.class,
+ () -> {
+ config.build().validate();
+ });
+ }
+ }
+
+ public PCollectionRowTuple runWithConfig(
+ BigQueryStorageWriteApiSchemaTransformConfiguration config) {
+ BigQueryStorageWriteApiSchemaTransformProvider provider =
+ new BigQueryStorageWriteApiSchemaTransformProvider();
+
+ BigQueryStorageWriteApiPCollectionRowTupleTransform writeRowTupleTransform =
+ (BigQueryStorageWriteApiPCollectionRowTupleTransform)
+ provider.from(config).buildTransform();
+
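+    // Route all BigQuery calls through the fakes so the test never touches a real service.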
+ writeRowTupleTransform.setBigQueryServices(fakeBigQueryServices);
+ String tag = provider.inputCollectionNames().get(0);
+
+    PCollection<Row> rows = p.apply(Create.of(ROWS).withRowSchema(SCHEMA));
+
+ PCollectionRowTuple input = PCollectionRowTuple.of(tag, rows);
+ PCollectionRowTuple result = input.apply(writeRowTupleTransform);
+
+ return result;
+ }
+
+ @Test
+ public void testSimpleWrite() throws Exception {
+ String tableSpec = "project:dataset.simple_write";
+ BigQueryStorageWriteApiSchemaTransformConfiguration config =
+ BigQueryStorageWriteApiSchemaTransformConfiguration.builder().setTable(tableSpec).build();
+
+ runWithConfig(config);
+ p.run().waitUntilFinish();
+
+ assertNotNull(fakeDatasetService.getTable(BigQueryHelpers.parseTableSpec(tableSpec)));
+ assertEquals(
+ ROWS.size(), fakeDatasetService.getAllRows("project", "dataset", "simple_write").size());
+ }
+}