1 change: 0 additions & 1 deletion CHANGES.md
@@ -111,7 +111,6 @@
     .withValueMapper(new TextMessageMapper());
 ```
 * Coders in Python are expected to inherit from Coder. ([BEAM-14351](https://issues.apache.org/jira/browse/BEAM-14351)).
-* `FailedRows` key of the errors dictionary returned by `beam.io.WriteToBigQuery` transform now returns an array of 3-element tuples `(destination_table, row, reason)` instead of `(destination_table, row)`. ([BEAM-14383](https://issues.apache.org/jira/browse/BEAM-14383)).
 
 ## Deprecations
 
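For context, the revert restores the 2-tuple shape on the `FailedRows` output described in the removed changelog entry. A minimal sketch of what downstream code sees, assuming a hypothetical destination table and streaming inserts (the path that produces `FailedRows`):

```python
import apache_beam as beam

with beam.Pipeline() as p:
  result = (
      p
      | beam.Create([{'number': 'not_an_int'}])  # row BigQuery will reject
      | beam.io.WriteToBigQuery(
          'my_project:my_dataset.my_table',  # hypothetical destination
          schema='number:INTEGER',
          method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS))

  # With this revert, each FailedRows element is a 2-tuple again:
  #   (destination_table, row)          -- post-revert
  #   (destination_table, row, reason)  -- what BEAM-14383 had introduced
  _ = (
      result['FailedRows']
      | beam.Map(lambda err: print('failed row %r in %s' % (err[1], err[0]))))
```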
8 changes: 3 additions & 5 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1754,8 +1754,7 @@ def _flush_batch(self, destination):
         ignore_unknown_values=self.ignore_unknown_columns)
     self.batch_latency_metric.update((time.time() - start) * 1000)
 
-    failed_rows = [(rows[entry['index']], entry["errors"])
-                   for entry in errors]
+    failed_rows = [rows[entry['index']] for entry in errors]
     should_retry = any(
         RetryStrategy.should_retry(
             self._retry_strategy, entry['errors'][0]['reason'])
@@ -1787,9 +1786,8 @@ def _flush_batch(self, destination):
     return [
         pvalue.TaggedOutput(
             BigQueryWriteFn.FAILED_ROWS,
-            GlobalWindows.windowed_value((destination, row, row_errors)))
-        for row,
-        row_errors in failed_rows
+            GlobalWindows.windowed_value((destination, row)))
+        for row in failed_rows
     ]
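The retry gate shown above is untouched by the revert: `RetryStrategy.should_retry` still inspects each insert error's `reason` to decide whether the batch is retried or the rows are emitted on `FAILED_ROWS`. A minimal sketch of driving that choice from a pipeline, assuming a hypothetical table (the `insert_retry_strategy` parameter and `RetryStrategy` constants are real SDK API):

```python
import apache_beam as beam
from apache_beam.io.gcp.bigquery_tools import RetryStrategy

schema = {
    'fields': [{'name': 'number', 'type': 'INTEGER', 'mode': 'REQUIRED'},
               {'name': 'str', 'type': 'STRING', 'mode': 'REQUIRED'}]}

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([{'number': 2}])  # missing the REQUIRED 'str' field
      | beam.io.WriteToBigQuery(
          'my_project:my_dataset.my_table',  # hypothetical destination
          schema=schema,
          method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS,
          # 'invalid' is not a transient reason, so with RETRY_NEVER (or
          # RETRY_ON_TRANSIENT_ERROR) the row lands on FAILED_ROWS instead
          # of being retried indefinitely.
          insert_retry_strategy=RetryStrategy.RETRY_NEVER))
```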
91 changes: 0 additions & 91 deletions sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -22,7 +22,6 @@

 import base64
 import datetime
-import json
 import logging
 import random
 import time
@@ -374,96 +373,6 @@ def test_big_query_write_without_schema(self):
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
             temp_file_format=FileFormat.JSON))
 
-  @pytest.mark.it_postcommit
-  def test_big_query_write_insert_errors_reporting(self):
-    """
-    Test that errors returned by beam.io.WriteToBigQuery
-    contain both the failed rows and the reason they failed.
-    """
-    table_name = 'python_write_table'
-    table_id = '{}.{}'.format(self.dataset_id, table_name)
-
-    errors_table_name = table_name + '_error_records'
-    errors_table_id = '{}.{}'.format(self.dataset_id, errors_table_name)
-
-    input_data = [{
-        'number': 1,
-        'str': 'some_string',
-    }, {
-        'number': 2
-    }, {
-        'number': 3,
-        'str': 'some_string',
-        'additional_field_str': 'some_string',
-    }]
-
-    table_schema = {
-        "fields": [{
-            "name": "number", "type": "INTEGER", 'mode': 'REQUIRED'
-        }, {
-            "name": "str", "type": "STRING", 'mode': 'REQUIRED'
-        }]
-    }
-
-    errors_table_schema = {
-        "fields": [{
-            'name': 'table', 'type': 'STRING', 'mode': 'REQUIRED'
-        }, {
-            'name': 'reason', 'type': 'STRING', 'mode': 'NULLABLE'
-        }, {
-            'name': 'row_json', 'type': 'STRING', 'mode': 'REQUIRED'
-        }]
-    }
-
-    pipeline_verifiers = [
-        BigqueryFullResultMatcher(
-            project=self.project,
-            query="SELECT number, str FROM %s" % table_id,
-            data=[(1, 'some_string')]),
-        BigqueryFullResultMatcher(
-            project=self.project,
-            query="SELECT table, reason, row_json FROM %s" % errors_table_id,
-            data=[(
-                table_id,
-                '[{"reason": "invalid", "location": "", "debugInfo": "", \
-"message": "Missing required field: Msg_0_CLOUD_QUERY_TABLE.str."}]',
-                '{"number": 2}'),
-            (
-                table_id,
-                '[{"reason": "invalid", "location": "additional_field_str", \
-"debugInfo": "", "message": "no such field: additional_field_str."}]',
-                '{"number": 3, "str": "some_string", "additional_field_str": \
-"some_string"}')])
-    ]
-
-    args = self.test_pipeline.get_full_options_as_args(
-        on_success_matcher=hc.all_of(*pipeline_verifiers))
-
-    with beam.Pipeline(argv=args) as p:
-      # pylint: disable=expression-not-assigned
-      errors = (
-          p | 'create' >> beam.Create(input_data)
-          | 'write' >> beam.io.WriteToBigQuery(
-              table_id,
-              schema=table_schema,
-              create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-              write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
-      (
-          errors["FailedRows"]
-          | 'ParseErrors' >> beam.Map(
-              lambda err: {
-                  "table": err[0],
-                  "reason": json.dumps(err[2]),
-                  "row_json": json.dumps(err[1])
-              })
-          | 'WriteErrors' >> beam.io.WriteToBigQuery(
-              errors_table_id,
-              schema=errors_table_schema,
-              create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-              write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
-
   @pytest.mark.it_postcommit
   @parameterized.expand([
       param(file_format=FileFormat.AVRO),
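Pipelines that used the deleted test's dead-letter pattern can keep it under the restored 2-tuple format; only the `reason` element disappears. A minimal sketch, assuming hypothetical table names modeled on the deleted test:

```python
import json

import apache_beam as beam

table_schema = {
    'fields': [{'name': 'number', 'type': 'INTEGER', 'mode': 'REQUIRED'},
               {'name': 'str', 'type': 'STRING', 'mode': 'REQUIRED'}]}

with beam.Pipeline() as p:
  errors = (
      p
      | 'create' >> beam.Create([{'number': 2}])  # fails the REQUIRED 'str'
      | 'write' >> beam.io.WriteToBigQuery(
          'my_project:my_dataset.python_write_table',  # hypothetical
          schema=table_schema,
          method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS))
  _ = (
      errors['FailedRows']
      | 'ParseErrors' >> beam.Map(
          lambda err: {
              'table': err[0],                 # destination table
              'row_json': json.dumps(err[1]),  # the rejected row
              # err[2] (the error reason) no longer exists after this revert
          })
      | 'WriteErrors' >> beam.io.WriteToBigQuery(
          'my_project:my_dataset.python_write_table_error_records',  # hypothetical
          schema='table:STRING,row_json:STRING',
          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
```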