From 996f2ac12035e3e9d99c6d7f91a555c1e27dda46 Mon Sep 17 00:00:00 2001 From: Firlej <25161992+Firlej@users.noreply.github.com> Date: Sat, 30 Apr 2022 18:25:13 +0200 Subject: [PATCH 1/5] [BEAM-14383] - add row_errors to returned rows when inserting to BigQuery --- sdks/python/apache_beam/io/gcp/bigquery.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index fa47eb9db017..2a226c4998f1 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -1748,7 +1748,8 @@ def _flush_batch(self, destination): ignore_unknown_values=self.ignore_unknown_columns) self.batch_latency_metric.update((time.time() - start) * 1000) - failed_rows = [rows[entry['index']] for entry in errors] + failed_rows = [(rows[entry['index']], entry["errors"]) + for entry in errors] should_retry = any( RetryStrategy.should_retry( self._retry_strategy, entry['errors'][0]['reason']) @@ -1780,8 +1781,9 @@ def _flush_batch(self, destination): return [ pvalue.TaggedOutput( BigQueryWriteFn.FAILED_ROWS, - GlobalWindows.windowed_value((destination, row))) - for row in failed_rows + GlobalWindows.windowed_value((destination, row, row_errors))) + for row, + row_errors in failed_rows ] From 01a6de36d39a887d0dbbabbc4f7e682ca5a0e0d0 Mon Sep 17 00:00:00 2001 From: Firlej <25161992+Firlej@users.noreply.github.com> Date: Wed, 4 May 2022 15:40:33 +0200 Subject: [PATCH 2/5] [BEAM-14383] add test checking proper insert of _error_records --- .../io/gcp/bigquery_write_it_test.py | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py index dd2283eb71d6..686e6d05e73c 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py @@ -26,6 +26,7 @@ import random import 
time import unittest +import json from decimal import Decimal import hamcrest as hc @@ -373,6 +374,92 @@ def test_big_query_write_without_schema(self): write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, temp_file_format=FileFormat.JSON)) + @pytest.mark.it_postcommit + def test_big_query_write_insert_errors_reporting(self): + table_name = 'python_write_table' + table_id = '{}.{}'.format(self.dataset_id, table_name) + + errors_table_name = table_name + '_error_records' + errors_table_id = '{}.{}'.format(self.dataset_id, errors_table_name) + + input_data = [{ + 'number': 1, + 'str': 'some_string', + }, { + 'number': 2 + }, { + 'number': 3, + 'str': 'some_string', + 'additional_field_str': 'some_string', + }] + + table_schema = { + "fields": [{ + "name": "number", "type": "INTEGER", 'mode': 'REQUIRED' + }, { + "name": "str", "type": "STRING", 'mode': 'REQUIRED' + }] + } + + errors_table_schema = { + "fields": [{ + 'name': 'table', 'type': 'STRING', 'mode': 'REQUIRED' + }, { + 'name': 'reason', 'type': 'STRING', 'mode': 'NULLABLE' + }, { + 'name': 'row_json', 'type': 'STRING', 'mode': 'REQUIRED' + }] + } + + pipeline_verifiers = [ + BigqueryFullResultMatcher( + project=self.project, + query="SELECT number, str FROM %s" % table_id, + data=[( + 1, + 'some_string' + )]), + BigqueryFullResultMatcher( + project=self.project, + query="SELECT table, reason, row_json FROM %s" % errors_table_id, + data=[( + table_id, + '[{"reason": "invalid", "location": "", "debugInfo": "", \ +"message": "Missing required field: Msg_0_CLOUD_QUERY_TABLE.str."}]', + '{"number": 2}' + ), ( + table_id, + '[{"reason": "invalid", "location": "additional_field_str", \ +"debugInfo": "", "message": "no such field: additional_field_str."}]', + '{"number": 3, "str": "some_string", "additional_field_str": \ +"some_string"}' + )]) + ] + + args = self.test_pipeline.get_full_options_as_args( + on_success_matcher=hc.all_of(*pipeline_verifiers)) + + with beam.Pipeline(argv=args) as p: + # pylint: 
disable=expression-not-assigned + errors = ( + p | 'create' >> beam.Create(input_data) + | 'write' >> beam.io.WriteToBigQuery( + table_id, + schema=table_schema, + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY)) + ( + errors["FailedRows"] + | 'ParseErrors' >> beam.Map(lambda err: { + "table": err[0], + "reason": json.dumps(err[2]), + "row_json": json.dumps(err[1])}) + | 'WriteErrors' >> beam.io.WriteToBigQuery( + errors_table_id, + schema=errors_table_schema, + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY)) + @pytest.mark.it_postcommit @parameterized.expand([ param(file_format=FileFormat.AVRO), From ab8dc748b443c49ee71fdcc1b75122ecf422bcc9 Mon Sep 17 00:00:00 2001 From: Firlej <25161992+Firlej@users.noreply.github.com> Date: Wed, 4 May 2022 15:51:54 +0200 Subject: [PATCH 3/5] [BEAM-14383] run yapf on python test and add function doc --- .../io/gcp/bigquery_write_it_test.py | 46 ++++++++++--------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py index 686e6d05e73c..359b7d75088f 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py @@ -376,6 +376,10 @@ def test_big_query_write_without_schema(self): @pytest.mark.it_postcommit def test_big_query_write_insert_errors_reporting(self): + """ + Test that errors returned by beam.io.WriteToBigQuery + contain both the failed rows and the reason for it failing. 
+ """ table_name = 'python_write_table' table_id = '{}.{}'.format(self.dataset_id, table_name) @@ -387,11 +391,12 @@ def test_big_query_write_insert_errors_reporting(self): 'str': 'some_string', }, { 'number': 2 - }, { - 'number': 3, - 'str': 'some_string', - 'additional_field_str': 'some_string', - }] + }, + { + 'number': 3, + 'str': 'some_string', + 'additional_field_str': 'some_string', + }] table_schema = { "fields": [{ @@ -415,25 +420,22 @@ def test_big_query_write_insert_errors_reporting(self): BigqueryFullResultMatcher( project=self.project, query="SELECT number, str FROM %s" % table_id, - data=[( - 1, - 'some_string' - )]), + data=[(1, 'some_string')]), BigqueryFullResultMatcher( project=self.project, query="SELECT table, reason, row_json FROM %s" % errors_table_id, - data=[( + data= + [( table_id, '[{"reason": "invalid", "location": "", "debugInfo": "", \ "message": "Missing required field: Msg_0_CLOUD_QUERY_TABLE.str."}]', - '{"number": 2}' - ), ( - table_id, - '[{"reason": "invalid", "location": "additional_field_str", \ + '{"number": 2}'), + ( + table_id, + '[{"reason": "invalid", "location": "additional_field_str", \ "debugInfo": "", "message": "no such field: additional_field_str."}]', - '{"number": 3, "str": "some_string", "additional_field_str": \ -"some_string"}' - )]) + '{"number": 3, "str": "some_string", "additional_field_str": \ +"some_string"}')]) ] args = self.test_pipeline.get_full_options_as_args( @@ -450,10 +452,12 @@ def test_big_query_write_insert_errors_reporting(self): write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY)) ( errors["FailedRows"] - | 'ParseErrors' >> beam.Map(lambda err: { - "table": err[0], - "reason": json.dumps(err[2]), - "row_json": json.dumps(err[1])}) + | 'ParseErrors' >> beam.Map( + lambda err: { + "table": err[0], + "reason": json.dumps(err[2]), + "row_json": json.dumps(err[1]) + }) | 'WriteErrors' >> beam.io.WriteToBigQuery( errors_table_id, schema=errors_table_schema, From 
9f798441ddd147624d7819c56c3a354b99255ef8 Mon Sep 17 00:00:00 2001 From: Firlej <25161992+Firlej@users.noreply.github.com> Date: Wed, 4 May 2022 20:04:28 +0200 Subject: [PATCH 4/5] [BEAM-14383] reorder imports --- sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py index 359b7d75088f..aa188e79ae96 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py @@ -22,11 +22,11 @@ import base64 import datetime +import json import logging import random import time import unittest -import json from decimal import Decimal import hamcrest as hc From c1443d739f49c869557ff706d7a9e30ce3ec4123 Mon Sep 17 00:00:00 2001 From: Firlej <25161992+Firlej@users.noreply.github.com> Date: Thu, 5 May 2022 02:45:27 +0200 Subject: [PATCH 5/5] [BEAM-14383] add a breaking change note to 2.39 CHANGES.md --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index 9c2c465e67e7..7e5b7a46ef12 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -78,6 +78,7 @@ .withValueMapper(new TextMessageMapper()); ``` * Coders in Python are expected to inherit from Coder. ([BEAM-14351](https://issues.apache.org/jira/browse/BEAM-14351)). +* `FailedRows` key of the errors dictionary returned by `beam.io.WriteToBigQuery` transform now returns an array of 3-element tuples `(destination_table, row, reason)` instead of `(destination_table, row)`. ([BEAM-14383](https://issues.apache.org/jira/browse/BEAM-14383)). ## Deprecations