From 38a215357a4c47f6a73c409bd39911ef049df9b7 Mon Sep 17 00:00:00 2001 From: Pablo E Date: Fri, 6 May 2022 19:56:56 -0700 Subject: [PATCH 1/7] [BEAM-14415] Exception handling tests and logging for partial failures in BQ IO --- sdks/python/apache_beam/io/gcp/bigquery.py | 49 ++++- .../apache_beam/io/gcp/bigquery_test.py | 192 +++++++++++++++++- 2 files changed, 228 insertions(+), 13 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 1375287b595c..73e13056fdfb 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -372,6 +372,15 @@ def compute_table_name(row): NOTE: This job name template does not have backwards compatibility guarantees. """ BQ_JOB_NAME_TEMPLATE = "beam_bq_job_{job_type}_{job_id}_{step_id}{random}" +""" +The maximum number of times that a bundle of rows that errors out should be +sent for insertion into BigQuery. + +The default is 10,000 with exponential backoffs, so a bundle of rows may be +tried for a very long time. You may reduce this property to reduce the number +of retries. +""" +MAX_INSERT_RETRIES = 10000 @deprecated(since='2.11.0', current="bigquery_tools.parse_table_reference") @@ -1501,7 +1510,8 @@ def __init__( additional_bq_parameters=None, ignore_insert_ids=False, with_batched_input=False, - ignore_unknown_columns=False): + ignore_unknown_columns=False, + max_retries=MAX_INSERT_RETRIES): """Initialize a WriteToBigQuery transform. Args: @@ -1549,6 +1559,9 @@ def __init__( the schema. The unknown values are ignored. Default is False, which treats unknown values as errors. See reference: https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll + max_retries: The number of times that we will retry inserting a group of + rows into BigQuery. By default, we retry 10000 times with exponential + backoffs (effectively retry forever). """ self.schema = schema @@ -1586,6 +1599,7 @@ def __init__( self.streaming_api_logging_frequency_sec = ( BigQueryWriteFn.STREAMING_API_LOGGING_FREQUENCY_SEC) self.ignore_unknown_columns = ignore_unknown_columns + self._max_retries = max_retries def display_data(self): return { @@ -1637,7 +1651,9 @@ def start_bundle(self): self._backoff_calculator = iter( retry.FuzzedExponentialIntervals( - initial_delay_secs=0.2, num_retries=10000, max_delay_secs=1500)) + initial_delay_secs=0.2, + num_retries=self._max_retries, + max_delay_secs=1500)) def _create_table_if_needed(self, table_reference, schema=None): str_table_reference = '%s:%s.%s' % ( @@ -1750,29 +1766,37 @@ def _flush_batch(self, destination): failed_rows = [(rows[entry['index']], entry["errors"]) for entry in errors] + retry_backoff = next(self._backoff_calculator, None) + + # If retry_backoff is None, then we will not retry and must log. should_retry = any( RetryStrategy.should_retry( self._retry_strategy, entry['errors'][0]['reason']) - for entry in errors) + for entry in errors) and retry_backoff is not None + if not passed: self.failed_rows_metric.update(len(failed_rows)) message = ( 'There were errors inserting to BigQuery. Will{} retry. ' 'Errors were {}'.format(("" if should_retry else " not"), errors)) - if should_retry: - _LOGGER.warning(message) - else: - _LOGGER.error(message) - rows = failed_rows + # The log level is: + # - WARNING when we are continuing to retry, and have a deadline. + # - ERROR when we will no longer retry, or MAY retry forever. 
+ log_level = ( + logging.WARN if should_retry or + self._retry_strategy != RetryStrategy.RETRY_ALWAYS else + logging.ERROR) + + _LOGGER.log(log_level, message) if not should_retry: break else: - retry_backoff = next(self._backoff_calculator) _LOGGER.info( 'Sleeping %s seconds before retrying insertion.', retry_backoff) time.sleep(retry_backoff) + rows = [fr[0] for fr in failed_rows] self._throttled_secs.inc(retry_backoff) self._total_buffered_rows -= len(self._rows_buffer[destination]) @@ -1811,7 +1835,8 @@ def __init__( ignore_insert_ids, ignore_unknown_columns, with_auto_sharding, - test_client=None): + test_client=None, + max_retries=None): self.table_reference = table_reference self.table_side_inputs = table_side_inputs self.schema_side_inputs = schema_side_inputs @@ -1827,6 +1852,7 @@ def __init__( self.ignore_insert_ids = ignore_insert_ids self.ignore_unknown_columns = ignore_unknown_columns self.with_auto_sharding = with_auto_sharding + self.max_retries = max_retries or MAX_INSERT_RETRIES class InsertIdPrefixFn(DoFn): def start_bundle(self): @@ -1852,7 +1878,8 @@ def expand(self, input): additional_bq_parameters=self.additional_bq_parameters, ignore_insert_ids=self.ignore_insert_ids, ignore_unknown_columns=self.ignore_unknown_columns, - with_batched_input=self.with_auto_sharding) + with_batched_input=self.with_auto_sharding, + max_retries=self.max_retries) def _add_random_shard(element): key = element[0] diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index a4fd5053df00..56827d6a78d9 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -41,6 +41,7 @@ from apache_beam.internal import pickler from apache_beam.internal.gcp.json_value import to_json_value from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp +from apache_beam.io.gcp import bigquery as beam_bq from apache_beam.io.gcp import bigquery_tools from apache_beam.io.gcp.bigquery import TableRowJsonCoder from apache_beam.io.gcp.bigquery import WriteToBigQuery @@ -86,6 +87,14 @@ _LOGGER = logging.getLogger(__name__) +def _load_or_default(filename): + try: + with open(filename) as f: + return json.load(f) + except: # pylint: disable=bare-except + return {} + + @unittest.skipIf( HttpError is None or gcp_bigquery is None, 'GCP dependencies are not installed') @@ -1026,6 +1035,185 @@ def store_callback(table, **kwargs): with open(file_name_1) as f1, open(file_name_2) as f2: self.assertEqual(json.load(f1), json.load(f2)) + @parameterized.expand([ + param(retry_strategy=RetryStrategy.RETRY_ALWAYS), + param(retry_strategy=RetryStrategy.RETRY_NEVER), + param(retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR), + ]) + def test_failure_in_some_rows_does_not_duplicate( + self, unused_sleep_mock=None, retry_strategy=None): + with mock.patch('time.sleep'): + # In this test we simulate a failure to write out two out of three rows. + # Row 0 and row 2 fail to be written on the first attempt, and then + # succeed on the next attempt (if there is one). + tempdir = '%s%s' % (self._new_tempdir(), os.sep) + file_name_1 = os.path.join(tempdir, 'file1_partial') + file_name_2 = os.path.join(tempdir, 'file2_partial') + + def store_callback(table, **kwargs): + insert_ids = [r for r in kwargs['row_ids']] + colA_values = [r['columnA'] for r in kwargs['json_rows']] + + # The first time this function is called, all rows are included + # so we need to filter out 'failed' rows. 
+ json_output_1 = { + 'insertIds': [insert_ids[1]], 'colA_values': [colA_values[1]] + } + # The second time this function is called, only rows 0 and 2 are incl + # so we don't need to filter any of them. We just write them all out. + json_output_2 = {'insertIds': insert_ids, 'colA_values': colA_values} + + # The first time we try to insert, we save those insertions in + # file insert_calls1. + if not os.path.exists(file_name_1): + with open(file_name_1, 'w') as f: + json.dump(json_output_1, f) + return [ + { + 'index': 0, + 'errors': [{ + 'reason': 'i dont like this row' + }, { + 'reason': 'its bad' + }] + }, + { + 'index': 2, + 'errors': [{ + 'reason': 'i het this row' + }, { + 'reason': 'its no gud' + }] + }, + ] + else: + with open(file_name_2, 'w') as f: + json.dump(json_output_2, f) + return [] + + client = mock.Mock() + client.insert_rows_json = mock.Mock(side_effect=store_callback) + + # The expected rows to be inserted according to the insert strategy + if retry_strategy == RetryStrategy.RETRY_NEVER: + result = ['value3'] + else: # RETRY_ALWAYS and RETRY_ON_TRANSIENT_ERRORS should insert all rows + result = ['value1', 'value3', 'value5'] + + # Using the bundle based direct runner to avoid pickling problems + # with mocks. + with beam.Pipeline(runner='BundleBasedDirectRunner') as p: + bq_write_out = ( + p + | beam.Create([{ + 'columnA': 'value1', 'columnB': 'value2' + }, { + 'columnA': 'value3', 'columnB': 'value4' + }, { + 'columnA': 'value5', 'columnB': 'value6' + }]) + | _StreamToBigQuery( + table_reference='project:dataset.table', + table_side_inputs=[], + schema_side_inputs=[], + schema='anyschema', + batch_size=None, + triggering_frequency=None, + create_disposition='CREATE_NEVER', + write_disposition=None, + kms_key=None, + retry_strategy=retry_strategy, + additional_bq_parameters=[], + ignore_insert_ids=False, + ignore_unknown_columns=False, + with_auto_sharding=False, + test_client=client)) + + failed_values = ( + bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS] + | beam.Map(lambda x: x[1]['columnA'])) + + assert_that( + failed_values, + equal_to(list({'value1', 'value3', 'value5'}.difference(result)))) + + data1 = _load_or_default(file_name_1) + data2 = _load_or_default(file_name_2) + + self.assertListEqual( + sorted(data1.get('colA_values', []) + data2.get('colA_values', [])), + result) + self.assertEqual(len(data1['colA_values']), 1) + + @parameterized.expand([ + param(retry_strategy=RetryStrategy.RETRY_ALWAYS), + param(retry_strategy=RetryStrategy.RETRY_NEVER), + param(retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR), + ]) + def test_permanent_failure_in_some_rows_does_not_duplicate( + self, unused_sleep_mock=None, retry_strategy=None): + with mock.patch('time.sleep'): + + def store_callback(table, **kwargs): + return [ + { + 'index': 0, + 'errors': [{ + 'reason': 'invalid' + }, { + 'reason': 'its bad' + }] + }, + ] + + client = mock.Mock() + client.insert_rows_json = mock.Mock(side_effect=store_callback) + + # The expected rows to be inserted according to the insert strategy + if retry_strategy == RetryStrategy.RETRY_NEVER: + inserted_rows = ['value3', 'value5'] + else: # RETRY_ALWAYS and RETRY_ON_TRANSIENT_ERRORS should insert all rows + inserted_rows = ['value3', 'value5'] + + # Using the bundle based direct runner to avoid pickling problems + # with mocks. 
+ with beam.Pipeline(runner='BundleBasedDirectRunner') as p: + bq_write_out = ( + p + | beam.Create([{ + 'columnA': 'value1', 'columnB': 'value2' + }, { + 'columnA': 'value3', 'columnB': 'value4' + }, { + 'columnA': 'value5', 'columnB': 'value6' + }]) + | _StreamToBigQuery( + table_reference='project:dataset.table', + table_side_inputs=[], + schema_side_inputs=[], + schema='anyschema', + batch_size=None, + triggering_frequency=None, + create_disposition='CREATE_NEVER', + write_disposition=None, + kms_key=None, + retry_strategy=retry_strategy, + additional_bq_parameters=[], + ignore_insert_ids=False, + ignore_unknown_columns=False, + with_auto_sharding=False, + test_client=client, + max_retries=10)) + + failed_values = ( + bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS] + | beam.Map(lambda x: x[1]['columnA'])) + + assert_that( + failed_values, + equal_to( + list({'value1', 'value3', 'value5'}.difference(inserted_rows)))) + @parameterized.expand([ param(with_auto_sharding=False), param(with_auto_sharding=True), @@ -1413,12 +1601,12 @@ def test_avro_file_load(self): bigquery_file_loads._DEFAULT_MAX_FILE_SIZE = 100 elements = [ { - 'name': u'Negative infinity', + 'name': 'Negative infinity', 'value': -float('inf'), 'timestamp': datetime.datetime(1970, 1, 1, tzinfo=pytz.utc), }, { - 'name': u'Not a number', + 'name': 'Not a number', 'value': float('nan'), 'timestamp': datetime.datetime(2930, 12, 9, tzinfo=pytz.utc), }, From 5c23c311ba867bf9dbc251480f93b4411cb7e64b Mon Sep 17 00:00:00 2001 From: Pablo E Date: Mon, 9 May 2022 11:00:43 -0700 Subject: [PATCH 2/7] fix DLQ integration test --- .../io/gcp/bigquery_write_it_test.py | 74 +++++++++---------- 1 file changed, 33 insertions(+), 41 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py index aa188e79ae96..2c9009be2465 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py @@ -37,6 +37,7 @@ from parameterized import parameterized import apache_beam as beam +from apache_beam.io.gcp.bigquery import BigQueryWriteFn from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper from apache_beam.io.gcp.bigquery_tools import FileFormat from apache_beam.io.gcp.internal.clients import bigquery @@ -45,6 +46,8 @@ # Protect against environments where bigquery library is not available. # pylint: disable=wrong-import-order, wrong-import-position +from apache_beam.testing.util import assert_that, equal_to + try: from apitools.base.py.exceptions import HttpError except ImportError: @@ -383,9 +386,6 @@ def test_big_query_write_insert_errors_reporting(self): table_name = 'python_write_table' table_id = '{}.{}'.format(self.dataset_id, table_name) - errors_table_name = table_name + '_error_records' - errors_table_id = '{}.{}'.format(self.dataset_id, errors_table_name) - input_data = [{ 'number': 1, 'str': 'some_string', @@ -406,36 +406,34 @@ def test_big_query_write_insert_errors_reporting(self): }] } - errors_table_schema = { - "fields": [{ - 'name': 'table', 'type': 'STRING', 'mode': 'REQUIRED' - }, { - 'name': 'reason', 'type': 'STRING', 'mode': 'NULLABLE' - }, { - 'name': 'row_json', 'type': 'STRING', 'mode': 'REQUIRED' - }] - } + bq_result_errors = [( + { + "number": 2 + }, + [{ + "reason": "invalid", + "location": "", + "debugInfo": "", + "message": "Missing required field: Msg_0_CLOUD_QUERY_TABLE.str." 
+ }], + ), + ({ + "number": 3, + "str": "some_string", + "additional_field_str": "some_string" + }, + [{ + "reason": "invalid", + "location": "additional_field_str", + "debugInfo": "", + "message": "no such field: additional_field_str." + }])] pipeline_verifiers = [ BigqueryFullResultMatcher( project=self.project, query="SELECT number, str FROM %s" % table_id, data=[(1, 'some_string')]), - BigqueryFullResultMatcher( - project=self.project, - query="SELECT table, reason, row_json FROM %s" % errors_table_id, - data= - [( - table_id, - '[{"reason": "invalid", "location": "", "debugInfo": "", \ -"message": "Missing required field: Msg_0_CLOUD_QUERY_TABLE.str."}]', - '{"number": 2}'), - ( - table_id, - '[{"reason": "invalid", "location": "additional_field_str", \ -"debugInfo": "", "message": "no such field: additional_field_str."}]', - '{"number": 3, "str": "some_string", "additional_field_str": \ -"some_string"}')]) ] args = self.test_pipeline.get_full_options_as_args( @@ -448,21 +446,15 @@ def test_big_query_write_insert_errors_reporting(self): | 'write' >> beam.io.WriteToBigQuery( table_id, schema=table_schema, + method='STREAMING_INSERTS', + insert_retry_strategy='RETRY_NEVER', create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, - write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY)) - ( - errors["FailedRows"] - | 'ParseErrors' >> beam.Map( - lambda err: { - "table": err[0], - "reason": json.dumps(err[2]), - "row_json": json.dumps(err[1]) - }) - | 'WriteErrors' >> beam.io.WriteToBigQuery( - errors_table_id, - schema=errors_table_schema, - create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, - write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY)) + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)) + + assert_that( + errors[BigQueryWriteFn.FAILED_ROWS] + | 'ParseErrors' >> beam.Map(lambda err: (err[1], err[2])), + equal_to(bq_result_errors)) @pytest.mark.it_postcommit @parameterized.expand([ From 6f88a80ea708983f0c205cef184e85210f800bea Mon Sep 17 00:00:00 2001 From: Pablo E Date: Mon, 9 May 2022 11:09:57 -0700 Subject: [PATCH 3/7] fix lint --- sdks/python/apache_beam/io/gcp/bigquery_tools_test.py | 2 +- sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py index e4ff6082cabb..3ce8d0ff7de4 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py @@ -223,7 +223,7 @@ def test_delete_dataset_retries_for_timeouts(self, patched_time_sleep): self.assertTrue(client.datasets.Delete.called) @unittest.skipIf( - google and not hasattr(google.cloud, '_http'), + google and not hasattr(google.cloud, '_http'), # pylint: disable=c-extension-no-member 'Dependencies not installed') @mock.patch('time.sleep', return_value=None) @mock.patch('google.cloud._http.JSONConnection.http') diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py index 2c9009be2465..74d51f18fbbe 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py @@ -22,7 +22,6 @@ import base64 import datetime -import json import logging import random import time @@ -43,10 +42,11 @@ from apache_beam.io.gcp.internal.clients import bigquery from apache_beam.io.gcp.tests.bigquery_matcher import 
BigqueryFullResultMatcher from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to # Protect against environments where bigquery library is not available. # pylint: disable=wrong-import-order, wrong-import-position -from apache_beam.testing.util import assert_that, equal_to try: from apitools.base.py.exceptions import HttpError From 646b22785651f5df0a8faaa346b37e4eacf9b327 Mon Sep 17 00:00:00 2001 From: Pablo E Date: Mon, 9 May 2022 12:48:46 -0700 Subject: [PATCH 4/7] fix postcommit --- sdks/python/apache_beam/io/gcp/bigquery_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 56827d6a78d9..6db8e9e9fa2b 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -1463,7 +1463,8 @@ def test_multiple_destinations_transform(self): method='STREAMING_INSERTS')) assert_that( - r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS], + r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS] + | beam.Map(lambda elm: (elm[0], elm[1])), equal_to([(full_output_table_1, bad_record)])) def tearDown(self): From a1757d773a0e98fe0f3a76f2def44d8e3bf0a013 Mon Sep 17 00:00:00 2001 From: Pablo E Date: Tue, 10 May 2022 14:43:42 -0700 Subject: [PATCH 5/7] fix formatter --- sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py index 27bad78a4485..74d51f18fbbe 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py @@ -20,7 +20,6 @@ """Unit tests for BigQuery sources and sinks.""" # pytype: skip-file - import base64 import datetime import logging From 49db8ffd5331cdf152012b539e46552db78dad43 Mon Sep 17 00:00:00 2001 From: Pablo E Date: Wed, 11 May 2022 11:34:38 -0700 Subject: [PATCH 6/7] Fixing tests and adding test info --- sdks/python/apache_beam/io/gcp/bigquery.py | 28 ++++++++++++++----- .../apache_beam/io/gcp/bigquery_test.py | 15 ++++++---- .../io/gcp/bigquery_write_it_test.py | 2 +- 3 files changed, 32 insertions(+), 13 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 0e24dcf2db37..4d7df85f905e 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -1501,6 +1501,7 @@ class BigQueryWriteFn(DoFn): DEFAULT_MAX_BATCH_SIZE = 500 FAILED_ROWS = 'FailedRows' + FAILED_ROWS_WITH_ERRORS = 'FailedRowsWithErrors' STREAMING_API_LOGGING_FREQUENCY_SEC = 300 def __init__( @@ -1808,12 +1809,19 @@ def _flush_batch(self, destination): self._total_buffered_rows -= len(self._rows_buffer[destination]) del self._rows_buffer[destination] - return [ + return itertools.chain([ pvalue.TaggedOutput( - BigQueryWriteFn.FAILED_ROWS, - GlobalWindows.windowed_value((destination, row))) - for row in failed_rows - ] + BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS, + GlobalWindows.windowed_value((destination, row, err))) for row, + err in failed_rows + ], + [ + pvalue.TaggedOutput( + BigQueryWriteFn.FAILED_ROWS, + GlobalWindows.windowed_value( + (destination, row))) for row, + unused_err in failed_rows + ]) # The number of shards per destination when writing via streaming inserts. 
@@ -1933,7 +1941,9 @@ def _restore_table_ref(sharded_table_ref_elems_kv): | 'FromHashableTableRef' >> beam.Map(_restore_table_ref) | 'StreamInsertRows' >> ParDo( bigquery_write_fn, *self.schema_side_inputs).with_outputs( - BigQueryWriteFn.FAILED_ROWS, main='main')) + BigQueryWriteFn.FAILED_ROWS, + BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS, + main='main')) # Flag to be passed to WriteToBigQuery to force schema autodetection @@ -2222,7 +2232,11 @@ def expand(self, pcoll): with_auto_sharding=self.with_auto_sharding, test_client=self.test_client) - return {BigQueryWriteFn.FAILED_ROWS: outputs[BigQueryWriteFn.FAILED_ROWS]} + return { + BigQueryWriteFn.FAILED_ROWS: outputs[BigQueryWriteFn.FAILED_ROWS], + BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS: outputs[ + BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS], + } else: if self._temp_file_format == bigquery_tools.FileFormat.AVRO: if self.schema == SCHEMA_AUTODETECT: diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index ebee4d5bec0b..02eeb6e5febc 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -1242,7 +1242,8 @@ def test_with_batched_input(self): @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp): - def test_failure_has_same_insert_ids(self): + @mock.patch('time.sleep') + def test_failure_has_same_insert_ids(self, unused_mock_sleep): tempdir = '%s%s' % (self._new_tempdir(), os.sep) file_name_1 = os.path.join(tempdir, 'file1') file_name_2 = os.path.join(tempdir, 'file2') @@ -1303,8 +1304,7 @@ def store_callback(table, **kwargs): param(retry_strategy=RetryStrategy.RETRY_NEVER), param(retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR), ]) - def test_failure_in_some_rows_does_not_duplicate( - self, unused_sleep_mock=None, retry_strategy=None): + def test_failure_in_some_rows_does_not_duplicate(self, retry_strategy=None): with mock.patch('time.sleep'): # In this test we simulate a failure to write out two out of three rows. 
# Row 0 and row 2 fail to be written on the first attempt, and then @@ -1393,7 +1393,7 @@ def store_callback(table, **kwargs): test_client=client)) failed_values = ( - bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS] + bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS] | beam.Map(lambda x: x[1]['columnA'])) assert_that( @@ -1726,10 +1726,15 @@ def test_multiple_destinations_transform(self): method='STREAMING_INSERTS')) assert_that( - r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS] + r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS] | beam.Map(lambda elm: (elm[0], elm[1])), equal_to([(full_output_table_1, bad_record)])) + assert_that( + r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS], + equal_to([(full_output_table_1, bad_record)]), + label='FailedRowsMatch') + def tearDown(self): request = bigquery.BigqueryDatasetsDeleteRequest( projectId=self.project, datasetId=self.dataset_id, deleteContents=True) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py index 74d51f18fbbe..e75b698c6516 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py @@ -452,7 +452,7 @@ def test_big_query_write_insert_errors_reporting(self): write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)) assert_that( - errors[BigQueryWriteFn.FAILED_ROWS] + errors[BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS] | 'ParseErrors' >> beam.Map(lambda err: (err[1], err[2])), equal_to(bq_result_errors)) From 5b8f0401b1562efa73beff9b7680762ddd9effa4 Mon Sep 17 00:00:00 2001 From: Pablo E Date: Thu, 12 May 2022 11:41:00 -0700 Subject: [PATCH 7/7] fix skipping tests --- sdks/python/apache_beam/io/gcp/bigquery_test.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 02eeb6e5febc..ff2a95c7f8eb 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -847,6 +847,7 @@ def noop(table, **kwargs): test_client=client)) +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class BigQueryStreamingInsertsErrorHandling(unittest.TestCase): # Using https://cloud.google.com/bigquery/docs/error-messages and @@ -1541,6 +1542,7 @@ def store_callback(table, **kwargs): self.assertEqual(out2['colA_values'], ['value5']) +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase): BIG_QUERY_DATASET_ID = 'python_bq_streaming_inserts_' @@ -1840,6 +1842,7 @@ def test_file_loads(self): WriteToBigQuery.Method.FILE_LOADS, triggering_frequency=20) +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class BigQueryFileLoadsIntegrationTests(unittest.TestCase): BIG_QUERY_DATASET_ID = 'python_bq_file_loads_'
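
A minimal usage sketch (not part of the patch series) of how a pipeline might consume the two failed-row outputs these patches introduce: FAILED_ROWS, which keeps its existing (destination, row) element shape, and the new FAILED_ROWS_WITH_ERRORS, whose elements are (destination, row, errors) tuples as emitted by BigQueryWriteFn._flush_batch above. The project, dataset, table and schema below are hypothetical placeholders, and the retry strategy is pinned to 'RETRY_NEVER' as in the integration test in patch 2.

import logging

import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryWriteFn


def log_failed_row(element):
  # Each element mirrors the TaggedOutput emitted by
  # BigQueryWriteFn._flush_batch in this patch series.
  destination, row, errors = element
  logging.error('Insert into %s failed for row %s: %s', destination, row, errors)
  return element


with beam.Pipeline() as p:
  result = (
      p
      | beam.Create([{'number': 1, 'str': 'some_string'}, {'number': 2}])
      | beam.io.WriteToBigQuery(
          'my-project:my_dataset.my_table',  # hypothetical destination table
          schema='number:INTEGER,str:STRING',  # hypothetical schema
          method='STREAMING_INSERTS',
          insert_retry_strategy='RETRY_NEVER',
          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
          write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

  # Rows only, unchanged output shape: (destination, row).
  _ = (
      result[BigQueryWriteFn.FAILED_ROWS]
      | 'ExtractFailedRow' >> beam.Map(lambda kv: kv[1]))

  # Rows paired with the insert errors returned by BigQuery:
  # (destination, row, errors).
  _ = (
      result[BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
      | 'LogFailedRowWithErrors' >> beam.Map(log_failed_row))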