@@ -33,6 +33,7 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
@@ -94,7 +95,7 @@ protected SchemaTransform from(

@Override
public String identifier() {
return String.format("beam:schematransform:org.apache.beam:bigquery_storage_write:v1");
return String.format("beam:schematransform:org.apache.beam:bigquery_storage_write:v2");
}

@Override
@@ -125,6 +126,24 @@ public abstract static class BigQueryStorageWriteApiSchemaTransformConfiguration
.put(WriteDisposition.WRITE_APPEND.name(), WriteDisposition.WRITE_APPEND)
.build();

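// Optional nested configuration naming the output PCollection that receives rows which failed to write.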
@AutoValue
public abstract static class ErrorHandling {
@SchemaFieldDescription("The name of the output PCollection containing failed writes.")
public abstract String getOutput();

public static Builder builder() {
return new AutoValue_BigQueryStorageWriteApiSchemaTransformProvider_BigQueryStorageWriteApiSchemaTransformConfiguration_ErrorHandling
.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setOutput(String output);

public abstract ErrorHandling build();
}
}

public void validate() {
String invalidConfigMessage = "Invalid BigQuery Storage Write configuration: ";

@@ -151,6 +170,12 @@ public void validate() {
this.getWriteDisposition(),
WRITE_DISPOSITIONS.keySet());
}

if (this.getErrorHandling() != null) {
checkArgument(
!Strings.isNullOrEmpty(this.getErrorHandling().getOutput()),
invalidConfigMessage + "Output must not be empty if error handling is specified.");
}
}

/**
@@ -198,6 +223,10 @@ public static Builder builder() {
@Nullable
public abstract Boolean getAutoSharding();

@SchemaFieldDescription("This option specifies whether and where to output unwritable rows.")
@Nullable
public abstract ErrorHandling getErrorHandling();

/** Builder for {@link BigQueryStorageWriteApiSchemaTransformConfiguration}. */
@AutoValue.Builder
public abstract static class Builder {
@@ -214,6 +243,8 @@ public abstract static class Builder {

public abstract Builder setAutoSharding(Boolean autoSharding);

public abstract Builder setErrorHandling(ErrorHandling errorHandling);

/** Builds a {@link BigQueryStorageWriteApiSchemaTransformConfiguration} instance. */
public abstract BigQueryStorageWriteApiSchemaTransformProvider
.BigQueryStorageWriteApiSchemaTransformConfiguration
@@ -244,7 +275,7 @@ public void setBigQueryServices(BigQueryServices testBigQueryServices) {

// A generic element counter for a PCollection, initialized with the given
// name argument. Counts each element of the input PCollection.
private static class ElementCounterFn extends DoFn<Row, Row> {
private static class ElementCounterFn<T> extends DoFn<T, T> {

private Counter bqGenericElementCounter;
private Long elementsInBundle = 0L;
@@ -267,6 +298,18 @@ public void finish(FinishBundleContext c) {
}
}

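// Used when no error handling is configured: any failed insert fails the pipeline.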
private static class FailOnError extends DoFn<BigQueryStorageApiInsertError, Void> {
@ProcessElement
public void process(ProcessContext c) {
throw new RuntimeException(c.element().getErrorMessage());
}
}

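// Consumes its input and emits nothing; used to build an empty signal PCollection.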
private static class NoOutputDoFn<T> extends DoFn<T, Row> {
@ProcessElement
public void process(ProcessContext c) {}
}

@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
// Check that the input exists
@@ -294,53 +337,55 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
WriteResult result =
inputRows
.apply(
"element-count", ParDo.of(new ElementCounterFn("BigQuery-write-element-counter")))
"element-count",
ParDo.of(new ElementCounterFn<Row>("BigQuery-write-element-counter")))
.setRowSchema(inputSchema)
.apply(write);

Schema rowSchema = inputRows.getSchema();
Schema errorSchema =
Schema.of(
Field.of("failed_row", FieldType.row(rowSchema)),
Field.of("error_message", FieldType.STRING));

// Failed rows
PCollection<Row> failedRows =
result
.getFailedStorageApiInserts()
.apply(
"Construct failed rows",
MapElements.into(TypeDescriptors.rows())
.via(
(storageError) ->
BigQueryUtils.toBeamRow(rowSchema, storageError.getRow())))
.setRowSchema(rowSchema);

// Failed rows with error message
PCollection<Row> failedRowsWithErrors =
// Produce an empty signal PCollection so that downstream steps have something to follow after the write.
PCollection<Row> postWrite =
result
.getFailedStorageApiInserts()
.apply(
"Construct failed rows and errors",
MapElements.into(TypeDescriptors.rows())
.via(
(storageError) ->
Row.withSchema(errorSchema)
.withFieldValue("error_message", storageError.getErrorMessage())
.withFieldValue(
"failed_row",
BigQueryUtils.toBeamRow(rowSchema, storageError.getRow()))
.build()))
.setRowSchema(errorSchema);

PCollection<Row> failedRowsOutput =
failedRows
.apply("error-count", ParDo.of(new ElementCounterFn("BigQuery-write-error-counter")))
.setRowSchema(rowSchema);

return PCollectionRowTuple.of(FAILED_ROWS_TAG, failedRowsOutput)
.and(FAILED_ROWS_WITH_ERRORS_TAG, failedRowsWithErrors)
.and("errors", failedRowsWithErrors);
.apply("post-write", ParDo.of(new NoOutputDoFn<BigQueryStorageApiInsertError>()))
.setRowSchema(Schema.of());

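// With no error handling configured, surface failures by failing the pipeline;
// otherwise count the failures and return them on the configured error output.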
if (configuration.getErrorHandling() == null) {
result
.getFailedStorageApiInserts()
.apply("Error on failed inserts", ParDo.of(new FailOnError()));
return PCollectionRowTuple.of("post_write", postWrite);
} else {
result
.getFailedStorageApiInserts()
.apply(
"error-count",
ParDo.of(
new ElementCounterFn<BigQueryStorageApiInsertError>(
"BigQuery-write-error-counter")));

// Failed rows with error message
Schema errorSchema =
Schema.of(
Field.of("failed_row", FieldType.row(inputSchema)),
Field.of("error_message", FieldType.STRING));
PCollection<Row> failedRowsWithErrors =
result
.getFailedStorageApiInserts()
.apply(
"Construct failed rows and errors",
MapElements.into(TypeDescriptors.rows())
.via(
(storageError) ->
Row.withSchema(errorSchema)
.withFieldValue("error_message", storageError.getErrorMessage())
.withFieldValue(
"failed_row",
BigQueryUtils.toBeamRow(inputSchema, storageError.getRow()))
.build()))
.setRowSchema(errorSchema);
return PCollectionRowTuple.of("post_write", postWrite)
.and(configuration.getErrorHandling().getOutput(), failedRowsWithErrors);
}
}

BigQueryIO.Write<Row> createStorageWriteApiTransform() {
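
A minimal Python-side sketch of how the failed-insert outputs exposed above surface to a pipeline author via WriteToBigQuery (the project, dataset, table, and schema below are assumed placeholders, not part of this change):

import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery

with beam.Pipeline() as p:
    result = (
        p
        | beam.Create([{'id': 1, 'name': 'a'}])
        | WriteToBigQuery(
            table='my-project:my_dataset.my_table',  # assumed table spec
            schema='id:INTEGER,name:STRING',  # assumed schema
            method=WriteToBigQuery.Method.STORAGE_WRITE_API))
    # Rows rejected by BigQuery come back as dicts carrying the error message.
    _ = (
        result.failed_rows_with_errors
        | beam.Map(lambda err: print(err['error_message'], err['failed_row'])))
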
96 changes: 62 additions & 34 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -417,6 +417,7 @@ def chain_after(result):
from apache_beam.transforms.util import ReshufflePerKey
from apache_beam.transforms.window import GlobalWindows
from apache_beam.typehints.row_type import RowTypeConstraint
from apache_beam.typehints.schemas import schema_from_element_type
from apache_beam.utils import retry
from apache_beam.utils.annotations import deprecated

@@ -2148,6 +2149,7 @@ def expand(self, pcoll):
failed_rows=outputs[BigQueryWriteFn.FAILED_ROWS],
failed_rows_with_errors=outputs[
BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS])

elif method_to_use == WriteToBigQuery.Method.FILE_LOADS:
if self._temp_file_format == bigquery_tools.FileFormat.AVRO:
if self.schema == SCHEMA_AUTODETECT:
@@ -2212,33 +2214,45 @@ def find_in_nested_dict(schema):
BigQueryBatchFileLoads.DESTINATION_FILE_PAIRS],
destination_copy_jobid_pairs=output[
BigQueryBatchFileLoads.DESTINATION_COPY_JOBID_PAIRS])
else:
# Storage Write API

elif method_to_use == WriteToBigQuery.Method.STORAGE_WRITE_API:
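# With no explicit schema, try to infer one from the input PCollection's element type.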
if self.schema is None:
raise AttributeError(
"A schema is required in order to prepare rows"
"for writing with STORAGE_WRITE_API.")
if callable(self.schema):
try:
schema = schema_from_element_type(pcoll.element_type)
is_rows = True
except TypeError as exn:
raise ValueError(
"A schema is required in order to prepare rows "
"for writing with STORAGE_WRITE_API.") from exn
elif callable(self.schema):
raise NotImplementedError(
"Writing to dynamic destinations is not"
"supported for this write method.")
elif isinstance(self.schema, vp.ValueProvider):
schema = self.schema.get()
is_rows = False
else:
schema = self.schema
is_rows = False

table = bigquery_tools.get_hashable_destination(self.table_reference)
# None type is not supported
triggering_frequency = self.triggering_frequency or 0
# SchemaTransform expects Beam Rows, so map to Rows first
if is_rows:
input_beam_rows = pcoll
else:
input_beam_rows = (
pcoll
| "Convert dict to Beam Row" >> beam.Map(
lambda row: bigquery_tools.beam_row_from_dict(row, schema)
).with_output_types(
RowTypeConstraint.from_fields(
bigquery_tools.get_beam_typehints_from_tableschema(schema)))
)
output_beam_rows = (
pcoll
| "Convert dict to Beam Row" >>
beam.Map(lambda row: bigquery_tools.beam_row_from_dict(row, schema)).
with_output_types(
RowTypeConstraint.from_fields(
bigquery_tools.get_beam_typehints_from_tableschema(schema)))
| "StorageWriteToBigQuery" >> StorageWriteToBigQuery(
input_beam_rows
| StorageWriteToBigQuery(
table=table,
create_disposition=self.create_disposition,
write_disposition=self.write_disposition,
@@ -2247,23 +2261,31 @@ def find_in_nested_dict(schema):
with_auto_sharding=self.with_auto_sharding,
expansion_service=self.expansion_service))

# return back from Beam Rows to Python dict elements
failed_rows = (
output_beam_rows[StorageWriteToBigQuery.FAILED_ROWS]
| beam.Map(lambda row: row.as_dict()))
failed_rows_with_errors = (
output_beam_rows[StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS]
| beam.Map(
lambda row: {
"error_message": row.error_message,
"failed_row": row.failed_row.as_dict()
}))
if is_rows:
failed_rows = output_beam_rows[StorageWriteToBigQuery.FAILED_ROWS]
failed_rows_with_errors = output_beam_rows[
StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS]
else:
# convert the Beam Rows back to Python dict elements
failed_rows = (
output_beam_rows[StorageWriteToBigQuery.FAILED_ROWS]
| beam.Map(lambda row: row.as_dict()))
failed_rows_with_errors = (
output_beam_rows[StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS]
| beam.Map(
lambda row: {
"error_message": row.error_message,
"failed_row": row.failed_row.as_dict()
}))

return WriteResult(
method=WriteToBigQuery.Method.STORAGE_WRITE_API,
failed_rows=failed_rows,
failed_rows_with_errors=failed_rows_with_errors)

else:
raise ValueError(f"Unsupported method {method_to_use}")

def display_data(self):
res = {}
if self.table_reference is not None and isinstance(self.table_reference,
@@ -2487,7 +2509,7 @@ class StorageWriteToBigQuery(PTransform):

Experimental; no backwards compatibility guarantees.
"""
URN = "beam:schematransform:org.apache.beam:bigquery_storage_write:v1"
URN = "beam:schematransform:org.apache.beam:bigquery_storage_write:v2"
FAILED_ROWS = "FailedRows"
FAILED_ROWS_WITH_ERRORS = "FailedRowsWithErrors"

@@ -2552,11 +2574,17 @@ def expand(self, input):
triggeringFrequencySeconds=self._triggering_frequency,
useAtLeastOnceSemantics=self._use_at_least_once,
writeDisposition=self._write_disposition,
)
errorHandling={
'output': StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS
})

input_tag = self.schematransform_config.inputs[0]

return {input_tag: input} | external_storage_write
result = {input_tag: input} | external_storage_write
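# The external transform only returns FailedRowsWithErrors; derive the plain
# FailedRows output by keeping the failed row and dropping the error message.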
result[StorageWriteToBigQuery.FAILED_ROWS] = result[
StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS] | beam.Map(
lambda row_and_error: row_and_error[0])
return result


class ReadFromBigQuery(PTransform):
@@ -2791,14 +2819,14 @@ def _expand_direct_read(self, pcoll):
else:
project_id = pcoll.pipeline.options.view_as(GoogleCloudOptions).project

pipeline_details = {}
if temp_table_ref is not None:
pipeline_details['temp_table_ref'] = temp_table_ref
elif project_id is not None:
pipeline_details['project_id'] = project_id
pipeline_details['bigquery_dataset_labels'] = self.bigquery_dataset_labels

def _get_pipeline_details(unused_elm):
pipeline_details = {}
if temp_table_ref is not None:
pipeline_details['temp_table_ref'] = temp_table_ref
elif project_id is not None:
pipeline_details['project_id'] = project_id
pipeline_details[
'bigquery_dataset_labels'] = self.bigquery_dataset_labels
return pipeline_details

project_to_cleanup_pcoll = beam.pvalue.AsList(
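
The bigquery.py changes above also allow a schema-aware PCollection (for example, NamedTuple elements) to be written without an explicit schema argument, since the schema is now inferred from the element type. A sketch under assumed names:

import typing

import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery


class Purchase(typing.NamedTuple):
    item: str
    amount: float


with beam.Pipeline() as p:
    result = (
        p
        | beam.Create([Purchase(item='widget', amount=3.5)]).with_output_types(Purchase)
        | WriteToBigQuery(
            table='my-project:my_dataset.purchases',  # assumed table spec
            method=WriteToBigQuery.Method.STORAGE_WRITE_API))
    # With schema-aware input, failed rows are returned as Beam Rows rather than dicts.
    _ = result.failed_rows | beam.Map(print)
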
5 changes: 4 additions & 1 deletion sdks/python/apache_beam/pipeline.py
@@ -1330,7 +1330,10 @@ def transform_to_runner_api(

# Iterate over inputs and outputs by sorted key order, so that ids are
# consistently generated for multiple runs of the same pipeline.
transform_spec = transform_to_runner_api(self.transform, context)
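# Wrap translation so failures carry the transform's full label, which makes them easier to locate.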
try:
transform_spec = transform_to_runner_api(self.transform, context)
except Exception as exn:
raise RuntimeError(f'Unable to translate {self.full_label}') from exn
environment_id = self.environment_id
transform_urn = transform_spec.urn if transform_spec else None
if (not environment_id and