80 changes: 61 additions & 19 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -378,6 +378,15 @@ def compute_table_name(row):
NOTE: This job name template does not have backwards compatibility guarantees.
"""
BQ_JOB_NAME_TEMPLATE = "beam_bq_job_{job_type}_{job_id}_{step_id}{random}"
"""
The maximum number of times that a bundle of rows that fails to insert will be
retried for insertion into BigQuery.

The default is 10,000 with exponential backoffs, so a bundle of rows may be
tried for a very long time. You may reduce this property to reduce the number
of retries.
"""
MAX_INSERT_RETRIES = 10000
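To make "a very long time" concrete, here is a rough back-of-the-envelope sketch (an illustration only, not part of the module; it assumes the delay doubles each retry and ignores the fuzzing that FuzzedExponentialIntervals applies):

# Rough worst-case sleep budget for the defaults used further down:
# delays start at 0.2s, roughly double each retry, and are capped at 1500s.
initial_delay_secs, max_delay_secs, retries = 0.2, 1500, 10000
total, delay = 0.0, initial_delay_secs
for _ in range(retries):
    total += delay
    delay = min(delay * 2, max_delay_secs)
print('worst-case total backoff: ~%d days' % (total // 86400))  # roughly 173 days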


@deprecated(since='2.11.0', current="bigquery_tools.parse_table_reference")
@@ -1492,6 +1501,7 @@ class BigQueryWriteFn(DoFn):
DEFAULT_MAX_BATCH_SIZE = 500

FAILED_ROWS = 'FailedRows'
FAILED_ROWS_WITH_ERRORS = 'FailedRowsWithErrors'
STREAMING_API_LOGGING_FREQUENCY_SEC = 300

def __init__(
@@ -1507,7 +1517,8 @@ def __init__(
additional_bq_parameters=None,
ignore_insert_ids=False,
with_batched_input=False,
ignore_unknown_columns=False):
ignore_unknown_columns=False,
max_retries=MAX_INSERT_RETRIES):
"""Initialize a WriteToBigQuery transform.

Args:
@@ -1555,6 +1566,9 @@ def __init__(
the schema. The unknown values are ignored. Default is False,
which treats unknown values as errors. See reference:
https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
max_retries: The number of times that we will retry inserting a group of
rows into BigQuery. By default, we retry 10000 times with exponential
backoffs (effectively retry forever).

"""
self.schema = schema
@@ -1592,6 +1606,7 @@ def __init__(
self.streaming_api_logging_frequency_sec = (
BigQueryWriteFn.STREAMING_API_LOGGING_FREQUENCY_SEC)
self.ignore_unknown_columns = ignore_unknown_columns
self._max_retries = max_retries

def display_data(self):
return {
@@ -1643,7 +1658,9 @@ def start_bundle(self):

self._backoff_calculator = iter(
retry.FuzzedExponentialIntervals(
initial_delay_secs=0.2, num_retries=10000, max_delay_secs=1500))
initial_delay_secs=0.2,
num_retries=self._max_retries,
max_delay_secs=1500))
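A note on the pattern used in _flush_batch below: the backoff calculator yields a bounded number of sleep intervals (num_retries of them), and next(iterator, None) returns None once it is exhausted, which is the signal to stop retrying. A minimal standalone sketch of that pattern (the small num_retries is only for illustration):

from apache_beam.utils import retry

backoff_calculator = iter(
    retry.FuzzedExponentialIntervals(
        initial_delay_secs=0.2, num_retries=3, max_delay_secs=1500))
while True:
    retry_backoff = next(backoff_calculator, None)
    if retry_backoff is None:
        break  # retries exhausted: emit failed rows instead of retrying
    print('would sleep %.2f seconds before retrying' % retry_backoff)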

def _create_table_if_needed(self, table_reference, schema=None):
str_table_reference = '%s:%s.%s' % (
@@ -1754,41 +1771,57 @@ def _flush_batch(self, destination):
ignore_unknown_values=self.ignore_unknown_columns)
self.batch_latency_metric.update((time.time() - start) * 1000)

failed_rows = [rows[entry['index']] for entry in errors]
failed_rows = [(rows[entry['index']], entry["errors"])
for entry in errors]
retry_backoff = next(self._backoff_calculator, None)

# If retry_backoff is None, then we will not retry and must log.
should_retry = any(
RetryStrategy.should_retry(
self._retry_strategy, entry['errors'][0]['reason'])
for entry in errors)
for entry in errors) and retry_backoff is not None

if not passed:
self.failed_rows_metric.update(len(failed_rows))
message = (
'There were errors inserting to BigQuery. Will{} retry. '
'Errors were {}'.format(("" if should_retry else " not"), errors))
if should_retry:
_LOGGER.warning(message)
else:
_LOGGER.error(message)

rows = failed_rows
# The log level is:
# - WARNING when we are continuing to retry, and have a deadline.
# - ERROR when we will no longer retry, or MAY retry forever.
log_level = (
logging.WARN if should_retry or
self._retry_strategy != RetryStrategy.RETRY_ALWAYS else
logging.ERROR)

_LOGGER.log(log_level, message)

if not should_retry:
break
else:
retry_backoff = next(self._backoff_calculator)
_LOGGER.info(
'Sleeping %s seconds before retrying insertion.', retry_backoff)
time.sleep(retry_backoff)
rows = [fr[0] for fr in failed_rows]
self._throttled_secs.inc(retry_backoff)

self._total_buffered_rows -= len(self._rows_buffer[destination])
del self._rows_buffer[destination]

return [
return itertools.chain([
pvalue.TaggedOutput(
BigQueryWriteFn.FAILED_ROWS,
GlobalWindows.windowed_value((destination, row)))
for row in failed_rows
]
BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
GlobalWindows.windowed_value((destination, row, err))) for row,
err in failed_rows
],
[
pvalue.TaggedOutput(
BigQueryWriteFn.FAILED_ROWS,
GlobalWindows.windowed_value(
(destination, row))) for row,
unused_err in failed_rows
])
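For context, a hedged sketch of how a pipeline might consume both tagged outputs that _flush_batch now produces; pcoll and write_fn are placeholders, not names from this change:

import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryWriteFn

results = (
    pcoll  # placeholder PCollection of rows keyed by destination
    | 'StreamInsertRows' >> beam.ParDo(write_fn).with_outputs(
        BigQueryWriteFn.FAILED_ROWS,
        BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
        main='main'))
failed_rows = results[BigQueryWriteFn.FAILED_ROWS]  # (destination, row)
failed_rows_with_errors = results[
    BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]  # (destination, row, errors)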


# The number of shards per destination when writing via streaming inserts.
@@ -1815,7 +1848,8 @@ def __init__(
ignore_insert_ids,
ignore_unknown_columns,
with_auto_sharding,
test_client=None):
test_client=None,
max_retries=None):
self.table_reference = table_reference
self.table_side_inputs = table_side_inputs
self.schema_side_inputs = schema_side_inputs
@@ -1831,6 +1865,7 @@ def __init__(
self.ignore_insert_ids = ignore_insert_ids
self.ignore_unknown_columns = ignore_unknown_columns
self.with_auto_sharding = with_auto_sharding
self.max_retries = max_retries or MAX_INSERT_RETRIES

class InsertIdPrefixFn(DoFn):
def start_bundle(self):
@@ -1856,7 +1891,8 @@ def expand(self, input):
additional_bq_parameters=self.additional_bq_parameters,
ignore_insert_ids=self.ignore_insert_ids,
ignore_unknown_columns=self.ignore_unknown_columns,
with_batched_input=self.with_auto_sharding)
with_batched_input=self.with_auto_sharding,
max_retries=self.max_retries)

def _add_random_shard(element):
key = element[0]
Expand Down Expand Up @@ -1905,7 +1941,9 @@ def _restore_table_ref(sharded_table_ref_elems_kv):
| 'FromHashableTableRef' >> beam.Map(_restore_table_ref)
| 'StreamInsertRows' >> ParDo(
bigquery_write_fn, *self.schema_side_inputs).with_outputs(
BigQueryWriteFn.FAILED_ROWS, main='main'))
BigQueryWriteFn.FAILED_ROWS,
BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
main='main'))


# Flag to be passed to WriteToBigQuery to force schema autodetection
@@ -2194,7 +2232,11 @@ def expand(self, pcoll):
with_auto_sharding=self.with_auto_sharding,
test_client=self.test_client)

return {BigQueryWriteFn.FAILED_ROWS: outputs[BigQueryWriteFn.FAILED_ROWS]}
return {
BigQueryWriteFn.FAILED_ROWS: outputs[BigQueryWriteFn.FAILED_ROWS],
BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS: outputs[
BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS],
}
else:
if self._temp_file_format == bigquery_tools.FileFormat.AVRO:
if self.schema == SCHEMA_AUTODETECT: