180 changes: 168 additions & 12 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -31,6 +31,7 @@
from __future__ import absolute_import

import hashlib
import io
import logging
import random
import time
@@ -50,6 +51,13 @@
from apache_beam.transforms.util import GroupIntoBatches
from apache_beam.transforms.window import GlobalWindows

# Protect against environments where bigquery library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
try:
from apitools.base.py.exceptions import HttpError
except ImportError:
pass

_LOGGER = logging.getLogger(__name__)

ONE_TERABYTE = (1 << 40)
@@ -316,6 +324,119 @@ def process(self, element, file_prefix, *schema_side_inputs):
yield (destination, (file_path, file_size))


class UpdateDestinationSchema(beam.DoFn):
"""Update destination schema based on data that is about to be copied into it.

Unlike load and query jobs, BigQuery copy jobs do not support schema field
addition or relaxation on the destination table. This DoFn fills that gap by
updating the destination table schemas to be compatible with the data coming
from the source table so that schema field modification options are respected
regardless of whether data is loaded directly to the destination table or
loaded into temporary tables before being copied into the destination.

This transform takes as input a (destination, job_reference) pair where the
job_reference refers to a completed load job into a temporary table.

This transform emits (destination, job_reference) pairs where the
job_reference refers to a submitted load job for performing the schema
modification. Note that the input and output job references are not the same.

Experimental; no backwards compatibility guarantees.
"""
def __init__(
self,
write_disposition=None,
test_client=None,
additional_bq_parameters=None,
step_name=None):
self._test_client = test_client
self._write_disposition = write_disposition
self._additional_bq_parameters = additional_bq_parameters or {}
self._step_name = step_name

def setup(self):
self._bq_wrapper = bigquery_tools.BigQueryWrapper(client=self._test_client)
self._bq_io_metadata = create_bigquery_io_metadata(self._step_name)

def process(self, element, schema_mod_job_name_prefix):
destination = element[0]
temp_table_load_job_reference = element[1]

if callable(self._additional_bq_parameters):
additional_parameters = self._additional_bq_parameters(destination)
elif isinstance(self._additional_bq_parameters, vp.ValueProvider):
additional_parameters = self._additional_bq_parameters.get()
else:
additional_parameters = self._additional_bq_parameters

# When writing to normal tables WRITE_TRUNCATE will overwrite the schema but
# when writing to a partition, care needs to be taken to update the schema
# even on WRITE_TRUNCATE.
if (self._write_disposition not in ('WRITE_TRUNCATE', 'WRITE_APPEND') or
not additional_parameters or
not additional_parameters.get("schemaUpdateOptions")):
# No need to modify schema of destination table
return

table_reference = bigquery_tools.parse_table_reference(destination)
if table_reference.projectId is None:
table_reference.projectId = vp.RuntimeValueProvider.get_value(
'project', str, '')

try:
# Check if destination table exists
destination_table = self._bq_wrapper.get_table(
project_id=table_reference.projectId,
dataset_id=table_reference.datasetId,
table_id=table_reference.tableId)
except HttpError as exn:
if exn.status_code == 404:
# Destination table does not exist, so no need to modify its schema
# ahead of the copy jobs.
return
else:
raise

temp_table_load_job = self._bq_wrapper.get_job(
project=temp_table_load_job_reference.projectId,
job_id=temp_table_load_job_reference.jobId,
location=temp_table_load_job_reference.location)
temp_table_schema = temp_table_load_job.configuration.load.schema

Comment on lines +403 to +405

Member:

Does it make sense to compare the schema of the destination table with the schema of the temp table job? We'd save one load, right?

Contributor Author:

Added a simple comparison of destination_table.schema == temp_table_schema. It will work for trivial cases but doesn't catch the cases where the order of fields in a record differs. E.g., the following schemas are different according to == even though the temp table can be directly appended to the destination table without error.

<TableSchema
  fields: [
    <TableFieldSchema fields: [], name: 'bytes', type: 'BYTES'>,
    <TableFieldSchema fields: [], name: 'date', type: 'DATE'>,
    <TableFieldSchema fields: [], name: 'time', type: 'TIME'>
  ]>
<TableSchema
  fields: [
    <TableFieldSchema fields: [], name: 'date', type: 'DATE'>,
    <TableFieldSchema fields: [], name: 'time', type: 'TIME'>,
    <TableFieldSchema fields: [], name: 'bytes', type: 'BYTES'>
  ]>

I can probably write a function to check the schema recursively but do you know if one already exists?
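
A minimal sketch of such an order-insensitive comparison (not part of this change; it assumes apitools TableSchema/TableFieldSchema objects exposing fields, name, type and mode, and it treats a missing mode and an explicit 'NULLABLE' as different values):

def _schemas_equivalent(schema_a, schema_b):
  # Compare TableSchema / TableFieldSchema objects while ignoring the order
  # in which fields (and nested RECORD fields) are listed.
  fields_a = sorted(schema_a.fields, key=lambda f: f.name)
  fields_b = sorted(schema_b.fields, key=lambda f: f.name)
  if len(fields_a) != len(fields_b):
    return False
  for field_a, field_b in zip(fields_a, fields_b):
    if (field_a.name, field_a.type, field_a.mode) != (
        field_b.name, field_b.type, field_b.mode):
      return False
    # Leaf fields have empty 'fields' lists, so the recursion terminates.
    if not _schemas_equivalent(field_a, field_b):
      return False
  return True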

# FIXME: This short-circuit lacks specificity. Schemas differing only in
# the order of fields are not equivalent according to == but do not
# need a schema modification job to precede the copy job.
if temp_table_schema == destination_table.schema:
# Destination table schema is already the same as the temp table schema,
# so no need to run a job to update the destination table schema.
return

destination_hash = _bq_uuid(
'%s:%s.%s' % (
table_reference.projectId,
table_reference.datasetId,
table_reference.tableId))
uid = _bq_uuid()
job_name = '%s_%s_%s' % (schema_mod_job_name_prefix, destination_hash, uid)

_LOGGER.debug(
'Triggering schema modification job %s on %s',
job_name,
table_reference)
# Trigger potential schema modification by loading zero rows into the
# destination table with the temporary table schema.
schema_update_job_reference = self._bq_wrapper.perform_load_job(
destination=table_reference,
source_stream=io.BytesIO(), # file with zero rows
job_id=job_name,
schema=temp_table_schema,
write_disposition='WRITE_APPEND',
create_disposition='CREATE_NEVER',
additional_load_parameters=additional_parameters,
job_labels=self._bq_io_metadata.add_additional_bq_job_labels())
yield (destination, schema_update_job_reference)


class TriggerCopyJobs(beam.DoFn):
"""Launches jobs to copy from temporary tables into the main target table.

@@ -355,7 +476,7 @@ def start_bundle(self):
if not self.bq_io_metadata:
self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)

def process(self, element, job_name_prefix=None):
def process(self, element, job_name_prefix=None, unused_schema_mod_jobs=None):
destination = element[0]
job_reference = element[1]

@@ -526,9 +647,9 @@ def process(self, element, load_job_name_prefix, *schema_side_inputs):
if not self.bq_io_metadata:
self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
job_reference = self.bq_wrapper.perform_load_job(
table_reference,
files,
job_name,
destination=table_reference,
source_uris=files,
job_id=job_name,
schema=schema,
write_disposition=self.write_disposition,
create_disposition=create_disposition,
@@ -845,6 +966,7 @@ def _load_data(
partitions_using_temp_tables,
partitions_direct_to_destination,
load_job_name_pcv,
schema_mod_job_name_pcv,
copy_job_name_pcv,
p,
step_name):
@@ -883,32 +1005,56 @@ def _load_data(
temp_tables_load_job_ids_pc = trigger_loads_outputs['main']
temp_tables_pc = trigger_loads_outputs[TriggerLoadJobs.TEMP_TABLES]

destination_copy_job_ids_pc = (
finished_temp_tables_load_jobs_pc = (
p
| "ImpulseMonitorLoadJobs" >> beam.Create([None])
| "WaitForTempTableLoadJobs" >> beam.ParDo(
WaitForBQJobs(self.test_client),
beam.pvalue.AsList(temp_tables_load_job_ids_pc))
pvalue.AsList(temp_tables_load_job_ids_pc)))

schema_mod_job_ids_pc = (
finished_temp_tables_load_jobs_pc
| beam.ParDo(
UpdateDestinationSchema(
write_disposition=self.write_disposition,
test_client=self.test_client,
additional_bq_parameters=self.additional_bq_parameters,
step_name=step_name),
schema_mod_job_name_pcv))

finished_schema_mod_jobs_pc = (
p
| "ImpulseMonitorSchemaModJobs" >> beam.Create([None])
| "WaitForSchemaModJobs" >> beam.ParDo(
WaitForBQJobs(self.test_client),
pvalue.AsList(schema_mod_job_ids_pc)))

destination_copy_job_ids_pc = (
finished_temp_tables_load_jobs_pc
| beam.ParDo(
TriggerCopyJobs(
create_disposition=self.create_disposition,
write_disposition=self.write_disposition,
test_client=self.test_client,
step_name=step_name),
copy_job_name_pcv))
copy_job_name_pcv,
pvalue.AsIter(finished_schema_mod_jobs_pc)))

finished_copy_jobs_pc = (
p
| "ImpulseMonitorCopyJobs" >> beam.Create([None])
| "WaitForCopyJobs" >> beam.ParDo(
WaitForBQJobs(self.test_client),
beam.pvalue.AsList(destination_copy_job_ids_pc)))
pvalue.AsList(destination_copy_job_ids_pc)))

_ = (
finished_copy_jobs_pc
p
| "RemoveTempTables/Impulse" >> beam.Create([None])
| "RemoveTempTables/PassTables" >> beam.FlatMap(
lambda x,
lambda _,
unused_copy_jobs,
deleting_tables: deleting_tables,
pvalue.AsIter(finished_copy_jobs_pc),
pvalue.AsIter(temp_tables_pc))
| "RemoveTempTables/AddUselessValue" >> beam.Map(lambda x: (x, None))
| "RemoveTempTables/DeduplicateTables" >> beam.GroupByKey()
@@ -934,10 +1080,10 @@ def _load_data(

_ = (
p
| "ImpulseMonitorDestLoadJobs" >> beam.Create([None])
| "ImpulseMonitorDestinationLoadJobs" >> beam.Create([None])
| "WaitForDestinationLoadJobs" >> beam.ParDo(
WaitForBQJobs(self.test_client),
beam.pvalue.AsList(destination_load_job_ids_pc)))
pvalue.AsList(destination_load_job_ids_pc)))

destination_load_job_ids_pc = (
(temp_tables_load_job_ids_pc, destination_load_job_ids_pc)
@@ -966,6 +1112,14 @@ def expand(self, pcoll):
lambda _: _generate_job_name(
job_name, bigquery_tools.BigQueryJobTypes.LOAD, 'LOAD_STEP')))

schema_mod_job_name_pcv = pvalue.AsSingleton(
singleton_pc
| "SchemaModJobNamePrefix" >> beam.Map(
lambda _: _generate_job_name(
job_name,
bigquery_tools.BigQueryJobTypes.LOAD,
'SCHEMA_MOD_STEP')))

copy_job_name_pcv = pvalue.AsSingleton(
singleton_pc
| "CopyJobNamePrefix" >> beam.Map(
Expand Down Expand Up @@ -1022,6 +1176,7 @@ def expand(self, pcoll):
self._load_data(all_partitions,
empty_pc,
load_job_name_pcv,
schema_mod_job_name_pcv,
copy_job_name_pcv,
p,
step_name))
Expand All @@ -1030,6 +1185,7 @@ def expand(self, pcoll):
self._load_data(multiple_partitions_per_destination_pc,
single_partition_per_destination_pc,
load_job_name_pcv,
schema_mod_job_name_pcv,
copy_job_name_pcv,
p,
step_name))
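
For context, a minimal pipeline sketch of the kind of write these changes target (the table name and schema are hypothetical, and pipeline options such as temp_location are omitted); schemaUpdateOptions only take effect for file loads with WRITE_APPEND or WRITE_TRUNCATE:

import apache_beam as beam

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([{'name': 'a', 'value': 1}])
      | beam.io.WriteToBigQuery(
          table='my-project:my_dataset.my_table',  # hypothetical destination
          schema='name:STRING,value:INTEGER',
          method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
          write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
          # These are the options UpdateDestinationSchema propagates to the
          # destination table before temporary tables are copied into it.
          additional_bq_parameters={
              'schemaUpdateOptions': [
                  'ALLOW_FIELD_ADDITION', 'ALLOW_FIELD_RELAXATION']}))
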
54 changes: 46 additions & 8 deletions sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -69,6 +69,7 @@
# Protect against environments where bigquery library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
try:
from apitools.base.py.transfer import Upload
from apitools.base.py.exceptions import HttpError, HttpForbiddenError
except ImportError:
pass
@@ -83,9 +84,9 @@

_LOGGER = logging.getLogger(__name__)

MAX_RETRIES = 3

JSON_COMPLIANCE_ERROR = 'NAN, INF and -INF values are not JSON compliant.'
MAX_RETRIES = 3
UNKNOWN_MIME_TYPE = 'application/octet-stream'


class FileFormat(object):
@@ -407,13 +408,29 @@ def _insert_load_job(
project_id,
job_id,
table_reference,
source_uris,
source_uris=None,
source_stream=None,
schema=None,
write_disposition=None,
create_disposition=None,
additional_load_parameters=None,
source_format=None,
job_labels=None):

if not source_uris and not source_stream:
raise ValueError(
'Either a non-empty list of fully-qualified source URIs must be '
'provided via the source_uris parameter or an open file object must '
'be provided via the source_stream parameter. Got neither.')

if source_uris and source_stream:
raise ValueError(
'Only one of source_uris and source_stream may be specified. '
'Got both.')

if source_uris is None:
source_uris = []

additional_load_parameters = additional_load_parameters or {}
job_schema = None if schema == 'SCHEMA_AUTODETECT' else schema
reference = bigquery.JobReference(jobId=job_id, projectId=project_id)
@@ -435,18 +452,26 @@
),
jobReference=reference,
))
return self._start_job(request).jobReference
return self._start_job(request, stream=source_stream).jobReference

def _start_job(
self,
request # type: bigquery.BigqueryJobsInsertRequest
request, # type: bigquery.BigqueryJobsInsertRequest
stream=None,
):
"""Inserts a BigQuery job.

If the job exists already, it returns it.

Args:
request (bigquery.BigqueryJobsInsertRequest): An insert job request.
stream (IO[bytes]): A bytes IO object open for reading.
"""
try:
response = self.client.jobs.Insert(request)
upload = None
if stream:
upload = Upload.FromStream(stream, mime_type=UNKNOWN_MIME_TYPE)
response = self.client.jobs.Insert(request, upload=upload)
_LOGGER.info(
"Stated BigQuery job: %s\n "
"bq show -j --format=prettyjson --project_id=%s %s",
@@ -809,8 +834,9 @@ def get_job(self, project, job_id, location=None):
def perform_load_job(
self,
destination,
files,
job_id,
source_uris=None,
source_stream=None,
schema=None,
write_disposition=None,
create_disposition=None,
@@ -822,11 +848,23 @@
Returns:
bigquery.JobReference with the information about the job that was started.
"""
if not source_uris and not source_stream:
raise ValueError(
'Either a non-empty list of fully-qualified source URIs must be '
'provided via the source_uris parameter or an open file object must '
'be provided via the source_stream parameter. Got neither.')

if source_uris and source_stream:
raise ValueError(
'Only one of source_uris and source_stream may be specified. '
'Got both.')

return self._insert_load_job(
destination.projectId,
job_id,
destination,
files,
source_uris=source_uris,
source_stream=source_stream,
schema=schema,
create_disposition=create_disposition,
write_disposition=write_disposition,
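
As an illustration of the new source_stream parameter (a sketch rather than code from this change; the project, dataset, table and job id are hypothetical, and an authenticated GCP environment is assumed), a zero-row load from an empty byte stream can push a schema update onto an existing table:

import io

from apache_beam.io.gcp import bigquery_tools
from apache_beam.io.gcp.internal.clients import bigquery

wrapper = bigquery_tools.BigQueryWrapper()
destination = bigquery.TableReference(
    projectId='my-project', datasetId='my_dataset', tableId='my_table')
new_schema = bigquery.TableSchema(fields=[
    bigquery.TableFieldSchema(name='name', type='STRING', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='added', type='INTEGER', mode='NULLABLE'),
])

# Loading zero rows with an explicit schema lets BigQuery apply
# schemaUpdateOptions to the destination table without appending any data,
# which is what UpdateDestinationSchema does ahead of the copy jobs.
job_reference = wrapper.perform_load_job(
    destination=destination,
    source_stream=io.BytesIO(),  # empty file: zero rows
    job_id='example_schema_mod_job',
    schema=new_schema,
    write_disposition='WRITE_APPEND',
    create_disposition='CREATE_NEVER',
    additional_load_parameters={
        'schemaUpdateOptions': ['ALLOW_FIELD_ADDITION']})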