1,433 changes: 363 additions & 1,070 deletions airflow/contrib/hooks/bigquery_hook.py

Large diffs are not rendered by default.
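The hook diff itself is not rendered, but the operator changes below all follow the same pattern: BigQueryHook methods are called directly instead of going through get_conn() and a DB-API cursor. A minimal before/after sketch of that call pattern (the method names come from the operator diffs; the exact signatures of the reworked hook are an assumption):

    from airflow.contrib.hooks.bigquery_hook import BigQueryHook

    hook = BigQueryHook(bigquery_conn_id='bigquery_default', use_legacy_sql=False)

    # Old pattern (pre-PR): every operator built a PEP 249-style cursor first.
    # cursor = hook.get_conn().cursor()
    # cursor.run_query(sql='SELECT 1')

    # New pattern (this PR): the job and table methods live on the hook itself.
    hook.run_query(sql='SELECT 1')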

6 changes: 3 additions & 3 deletions airflow/contrib/operators/bigquery_check_operator.py
@@ -64,7 +64,7 @@ class BigQueryCheckOperator(CheckOperator):
def __init__(self,
sql,
bigquery_conn_id='bigquery_default',
- use_legacy_sql=True,
+ use_legacy_sql=False,
Review comment (Member): We shouldn't change this behaviour.

*args, **kwargs):
super(BigQueryCheckOperator, self).__init__(sql=sql, *args, **kwargs)
self.bigquery_conn_id = bigquery_conn_id
@@ -92,7 +92,7 @@ def __init__(self, sql,
pass_value,
tolerance=None,
bigquery_conn_id='bigquery_default',
- use_legacy_sql=True,
+ use_legacy_sql=False,
*args, **kwargs):
super(BigQueryValueCheckOperator, self).__init__(
sql=sql, pass_value=pass_value, tolerance=tolerance,
@@ -132,7 +132,7 @@ class BigQueryIntervalCheckOperator(IntervalCheckOperator):
@apply_defaults
def __init__(self, table, metrics_thresholds, date_filter_column='ds',
days_back=-7, bigquery_conn_id='bigquery_default',
- use_legacy_sql=True, *args, **kwargs):
+ use_legacy_sql=False, *args, **kwargs):
super(BigQueryIntervalCheckOperator, self).__init__(
table=table, metrics_thresholds=metrics_thresholds,
date_filter_column=date_filter_column, days_back=days_back,
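All three check operators flip the use_legacy_sql default from True to False, which is the behaviour change the reviewer objects to above. Existing DAGs written in legacy SQL would have to opt back in explicitly, roughly as in this illustrative sketch (the task_id and query are placeholders):

    from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator

    # Illustrative: with the new default of use_legacy_sql=False, a legacy-SQL
    # query has to request the legacy dialect explicitly to keep working.
    check_rows = BigQueryCheckOperator(
        task_id='check_rows',
        sql='SELECT COUNT(*) FROM [my_project:my_dataset.my_table]',  # legacy SQL table syntax
        bigquery_conn_id='bigquery_default',
        use_legacy_sql=True,
    )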
25 changes: 7 additions & 18 deletions airflow/contrib/operators/bigquery_get_data.py
@@ -96,21 +96,10 @@ def execute(self, context):
hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)

- conn = hook.get_conn()
- cursor = conn.cursor()
- response = cursor.get_tabledata(dataset_id=self.dataset_id,
-                                 table_id=self.table_id,
-                                 max_results=self.max_results,
-                                 selected_fields=self.selected_fields)
-
- self.log.info('Total Extracted rows: %s', response['totalRows'])
- rows = response['rows']
-
- table_data = []
- for dict_row in rows:
-     single_row = []
-     for fields in dict_row['f']:
-         single_row.append(fields['v'])
-     table_data.append(single_row)
-
- return table_data
+ rows = hook.get_tabledata(dataset_id=self.dataset_id,
+                           table_id=self.table_id,
+                           max_results=self.max_results,
+                           selected_fields=self.selected_fields)
+ rows = list(rows)
+ self.log.info('Total Extracted rows: %s', len(rows))
+ return rows
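The row-unpacking loop over the raw API response (dict_row['f'] / fields['v']) is gone; the operator now assumes hook.get_tabledata already yields plain row values. A hedged usage sketch of the new behaviour (the exact row type returned by the reworked hook is not visible in this diff):

    from airflow.contrib.hooks.bigquery_hook import BigQueryHook

    hook = BigQueryHook(bigquery_conn_id='bigquery_default')

    # Assumption: get_tabledata now yields row values directly rather than the
    # REST-style payload ({'rows': [{'f': [{'v': ...}]}]}) that the old code unpacked.
    rows = list(hook.get_tabledata(dataset_id='my_dataset',
                                   table_id='my_table',
                                   max_results=10,
                                   selected_fields='name,value'))
    print('Total extracted rows: %d' % len(rows))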
59 changes: 19 additions & 40 deletions airflow/contrib/operators/bigquery_operator.py
@@ -117,7 +117,7 @@ def __init__(self,
bigquery_conn_id='bigquery_default',
delegate_to=None,
udf_config=None,
- use_legacy_sql=True,
+ use_legacy_sql=False,
maximum_billing_tier=None,
maximum_bytes_billed=None,
create_disposition='CREATE_IF_NEEDED',
@@ -147,25 +147,21 @@ def __init__(self,
self.schema_update_options = schema_update_options
self.query_params = query_params
self.labels = labels
- self.bq_cursor = None
self.priority = priority
self.time_partitioning = time_partitioning
self.api_resource_configs = api_resource_configs
self.cluster_fields = cluster_fields
self.location = location

def execute(self, context):
- if self.bq_cursor is None:
-     self.log.info('Executing: %s', self.sql)
-     hook = BigQueryHook(
-         bigquery_conn_id=self.bigquery_conn_id,
-         use_legacy_sql=self.use_legacy_sql,
-         delegate_to=self.delegate_to,
-         location=self.location,
-     )
-     conn = hook.get_conn()
-     self.bq_cursor = conn.cursor()
- self.bq_cursor.run_query(
+ self.log.info('Executing: %s', self.sql)
+ hook = BigQueryHook(
+     bigquery_conn_id=self.bigquery_conn_id,
+     use_legacy_sql=self.use_legacy_sql,
+     delegate_to=self.delegate_to,
+     location=self.location,
+ )
+ hook.run_query(
sql=self.sql,
destination_dataset_table=self.destination_dataset_table,
write_disposition=self.write_disposition,
@@ -184,12 +180,6 @@ def execute(self, context):
cluster_fields=self.cluster_fields,
)

- def on_kill(self):
-     super(BigQueryOperator, self).on_kill()
-     if self.bq_cursor is not None:
-         self.log.info('Cancelling running query')
-         self.bq_cursor.cancel_query()


class BigQueryCreateEmptyTableOperator(BaseOperator):
"""
@@ -328,10 +318,7 @@ def execute(self, context):
else:
schema_fields = self.schema_fields

- conn = bq_hook.get_conn()
- cursor = conn.cursor()
-
- cursor.create_empty_table(
+ bq_hook.create_empty_table(
project_id=self.project_id,
dataset_id=self.dataset_id,
table_id=self.table_id,
@@ -408,8 +395,8 @@ class BigQueryCreateExternalTableOperator(BaseOperator):
work, the service account making the request must have domain-wide
delegation enabled.
:type delegate_to: str
- :param src_fmt_configs: configure optional fields specific to the source format
- :type src_fmt_configs: dict
+ :param external_config_options: configure optional fields specific to the source format
+ :type external_config_options: dict
:param labels: a dictionary containing labels for the table, passed to BigQuery
:type labels: dict
"""
@@ -435,7 +422,7 @@ def __init__(self,
bigquery_conn_id='bigquery_default',
google_cloud_storage_conn_id='google_cloud_default',
delegate_to=None,
- src_fmt_configs={},
+ external_config_options=None,
labels=None,
*args, **kwargs):

@@ -462,7 +449,7 @@ def __init__(self,
self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
self.delegate_to = delegate_to

- self.src_fmt_configs = src_fmt_configs
+ self.external_config_options = external_config_options
self.labels = labels

def execute(self, context):
@@ -482,10 +469,8 @@ def execute(self, context):

source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
for source_object in self.source_objects]
- conn = bq_hook.get_conn()
- cursor = conn.cursor()

- cursor.create_external_table(
+ bq_hook.create_external_table(
external_project_dataset_table=self.destination_project_dataset_table,
schema_fields=schema_fields,
source_uris=source_uris,
@@ -497,8 +482,8 @@ def execute(self, context):
quote_character=self.quote_character,
allow_quoted_newlines=self.allow_quoted_newlines,
allow_jagged_rows=self.allow_jagged_rows,
- src_fmt_configs=self.src_fmt_configs,
- labels=self.labels
+ external_config_options=self.external_config_options,
Review comment (Member): This will have to be for 2.0 as it will break things. Also, it needs to be backward-compatible to make the update from 1.X to 2.0 smooth.

Review comment (Member): Haven't yet reviewed the entire PR, but skimmed through it. Will try to find some time to look at it more thoroughly.

+ labels=self.labels,
)
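The src_fmt_configs argument is renamed to external_config_options here (and in gcs_to_bq.py below), which is the backward-compatibility concern raised in the comments above. An illustrative operator call under the new name; the option keys shown mirror the CSV settings the old src_fmt_configs dict carried, and whether the reworked hook accepts exactly these keys is an assumption:

    from airflow.contrib.operators.bigquery_operator import BigQueryCreateExternalTableOperator

    create_external = BigQueryCreateExternalTableOperator(
        task_id='create_external_table',
        bucket='my-bucket',
        source_objects=['data/part-*.csv'],
        destination_project_dataset_table='my_project.my_dataset.my_external_table',
        source_format='CSV',
        # Renamed from src_fmt_configs; the keys here are illustrative only.
        external_config_options={'skipLeadingRows': 1, 'allowQuotedNewlines': True},
    )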


@@ -545,10 +530,7 @@ def execute(self, context):
bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)

- conn = bq_hook.get_conn()
- cursor = conn.cursor()
-
- cursor.delete_dataset(
+ bq_hook.delete_dataset(
project_id=self.project_id,
dataset_id=self.dataset_id
)
@@ -608,10 +590,7 @@ def execute(self, context):
bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)

- conn = bq_hook.get_conn()
- cursor = conn.cursor()
-
- cursor.create_empty_dataset(
+ bq_hook.create_empty_dataset(
project_id=self.project_id,
dataset_id=self.dataset_id,
dataset_reference=self.dataset_reference)
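Dataset management follows the same hook-level pattern. A minimal sketch of the direct calls, with argument names matching the two operator diffs above (any error handling inside the reworked hook is not shown):

    from airflow.contrib.hooks.bigquery_hook import BigQueryHook

    hook = BigQueryHook(bigquery_conn_id='bigquery_default')

    # Previously conn.cursor().create_empty_dataset(...) / .delete_dataset(...).
    hook.create_empty_dataset(project_id='my_project', dataset_id='my_dataset')
    hook.delete_dataset(project_id='my_project', dataset_id='my_dataset')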
4 changes: 1 addition & 3 deletions airflow/contrib/operators/bigquery_table_delete_operator.py
@@ -61,6 +61,4 @@ def execute(self, context):
self.log.info('Deleting: %s', self.deletion_dataset_table)
hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)
- conn = hook.get_conn()
- cursor = conn.cursor()
- cursor.run_table_delete(self.deletion_dataset_table, self.ignore_if_missing)
+ hook.run_table_delete(self.deletion_dataset_table, self.ignore_if_missing)
4 changes: 1 addition & 3 deletions airflow/contrib/operators/bigquery_to_bigquery.py
@@ -85,9 +85,7 @@ def execute(self, context):
)
hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)
- conn = hook.get_conn()
- cursor = conn.cursor()
- cursor.run_copy(
+ hook.run_copy(
self.source_project_dataset_tables,
self.destination_project_dataset_table,
self.write_disposition,
4 changes: 1 addition & 3 deletions airflow/contrib/operators/bigquery_to_gcs.py
@@ -93,9 +93,7 @@ def execute(self, context):
self.destination_cloud_storage_uris)
hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)
- conn = hook.get_conn()
- cursor = conn.cursor()
- cursor.run_extract(
+ hook.run_extract(
self.source_project_dataset_table,
self.destination_cloud_storage_uris,
self.compression,
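The copy and extract operators above change in the same mechanical way: run_copy and run_extract move from the cursor onto the hook, with the positional arguments unchanged. A hedged hook-level sketch (table and bucket names are placeholders; arguments beyond those visible in the diffs are assumptions):

    from airflow.contrib.hooks.bigquery_hook import BigQueryHook

    hook = BigQueryHook(bigquery_conn_id='bigquery_default')

    # Table-to-table copy (previously cursor.run_copy(...)).
    hook.run_copy(
        ['my_project.my_dataset.source_table'],      # source_project_dataset_tables
        'my_project.my_dataset.destination_table',   # destination_project_dataset_table
        'WRITE_TRUNCATE',                             # write_disposition
    )

    # Table-to-GCS export (previously cursor.run_extract(...)).
    hook.run_extract(
        'my_project.my_dataset.source_table',
        ['gs://my-bucket/exports/part-*.csv'],
        'NONE',                                       # compression
    )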
14 changes: 5 additions & 9 deletions airflow/contrib/operators/gcs_to_bq.py
@@ -148,7 +148,7 @@ def __init__(self,
google_cloud_storage_conn_id='google_cloud_default',
delegate_to=None,
schema_update_options=(),
- src_fmt_configs=None,
+ external_config_options=None,
external_table=False,
time_partitioning=None,
cluster_fields=None,
@@ -158,10 +158,6 @@ def __init__(self,
super(GoogleCloudStorageToBigQueryOperator, self).__init__(*args, **kwargs)

# GCS config
- if src_fmt_configs is None:
-     src_fmt_configs = {}
- if time_partitioning is None:
-     time_partitioning = {}
self.bucket = bucket
self.source_objects = source_objects
self.schema_object = schema_object
@@ -188,7 +184,7 @@ def __init__(self,
self.delegate_to = delegate_to

self.schema_update_options = schema_update_options
- self.src_fmt_configs = src_fmt_configs
+ self.external_config_options = external_config_options
self.time_partitioning = time_partitioning
self.cluster_fields = cluster_fields
self.autodetect = autodetect
@@ -220,7 +216,7 @@ def execute(self, context):
cursor = conn.cursor()

if self.external_table:
- cursor.create_external_table(
+ bq_hook.create_external_table(
external_project_dataset_table=self.destination_project_dataset_table,
schema_fields=schema_fields,
source_uris=source_uris,
@@ -233,10 +229,10 @@ def execute(self, context):
ignore_unknown_values=self.ignore_unknown_values,
allow_quoted_newlines=self.allow_quoted_newlines,
allow_jagged_rows=self.allow_jagged_rows,
- src_fmt_configs=self.src_fmt_configs
+ external_config_options=self.external_config_options,
)
else:
- cursor.run_load(
+ bq_hook.run_load(
destination_project_dataset_table=self.destination_project_dataset_table,
schema_fields=schema_fields,
source_uris=source_uris,
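gcs_to_bq.py picks up the same rename: when external_table is set, the operator forwards external_config_options to create_external_table on the hook, and otherwise calls run_load. An illustrative operator call under the new name (the bucket, objects, and option keys are placeholders; acceptance of these exact keys by the reworked hook is assumed):

    from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

    load_csv = GoogleCloudStorageToBigQueryOperator(
        task_id='gcs_to_bq_external',
        bucket='my-bucket',
        source_objects=['data/part-*.csv'],
        destination_project_dataset_table='my_project.my_dataset.my_table',
        source_format='CSV',
        external_table=True,  # create an external table instead of running a load job
        external_config_options={'skipLeadingRows': 1},  # renamed from src_fmt_configs
    )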
1 change: 1 addition & 0 deletions setup.py
@@ -176,6 +176,7 @@ def write_version(filename=os.path.join(*['airflow',
'google-auth-httplib2>=0.0.1',
'google-cloud-container>=0.1.1',
'google-cloud-bigtable==0.31.0',
+ 'google-cloud-bigquery>=1.8.1',
'google-cloud-spanner>=1.7.1',
'grpcio-gcp>=0.2.2',
'PyOpenSSL',
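setup.py adds google-cloud-bigquery>=1.8.1, which suggests the rewritten hook wraps the official client library rather than the discovery-based googleapiclient. For reference, a minimal sketch of that client as of the 1.8.x line (how the hook actually constructs and authenticates it is not visible in this diff):

    from google.cloud import bigquery

    # Credentials normally come from the Airflow GCP connection; a default client
    # is shown here purely for illustration.
    client = bigquery.Client(project='my_project')

    job_config = bigquery.QueryJobConfig()
    job_config.use_legacy_sql = False  # matches the new operator default
    query_job = client.query('SELECT 1 AS x', job_config=job_config)
    for row in query_job.result():
        print(row.x)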