75 changes: 71 additions & 4 deletions airflow/contrib/hooks/bigquery_hook.py
@@ -960,7 +960,7 @@ def run_load(self,
if not set(allowed_schema_update_options).issuperset(
set(schema_update_options)):
raise ValueError(
"{0} contains invalid schema update options. "
"{0} contains invalid schema update options."
"Please only use one or more of the following options: {1}"
.format(schema_update_options, allowed_schema_update_options))

@@ -1350,6 +1350,72 @@ def run_grant_dataset_view_access(self,
view_project, view_dataset, view_table, source_project, source_dataset)
return source_dataset_resource

def create_empty_dataset(self, dataset_id="", project_id="",
dataset_reference=None):
"""
Create a new empty dataset:
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/insert

:param project_id: The name of the project where we want to create
an empty dataset. Not needed if projectId is provided in dataset_reference.
:type project_id: str
:param dataset_id: The id of the dataset. Not needed if datasetId
is provided in dataset_reference.
:type dataset_id: str
:param dataset_reference: Dataset reference that could be provided
with request body. More info:
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
:type dataset_reference: dict
"""

if dataset_reference:
_validate_value('dataset_reference', dataset_reference, dict)
else:
dataset_reference = {}

if "datasetReference" not in dataset_reference:
dataset_reference["datasetReference"] = {}

if not dataset_reference["datasetReference"].get("datasetId") and not dataset_id:
raise ValueError(
"dataset_id not provided and datasetId is missing from "
"dataset_reference. Impossible to create dataset")

dataset_required_params = [(dataset_id, "datasetId", ""),
(project_id, "projectId", self.project_id)]
for param_tuple in dataset_required_params:
param, param_name, param_default = param_tuple
if param_name not in dataset_reference['datasetReference']:
if param_default and not param:
self.log.info("{} was not specified. Will be used default "
"value {}.".format(param_name,
param_default))
param = param_default
dataset_reference['datasetReference'].update(
{param_name: param})
elif param:
_api_resource_configs_duplication_check(
param_name, param,
dataset_reference['datasetReference'], 'dataset_reference')

dataset_id = dataset_reference.get("datasetReference").get("datasetId")
dataset_project_id = dataset_reference.get("datasetReference").get(
"projectId")

self.log.info('Creating Dataset: %s in project: %s ', dataset_id,
dataset_project_id)

try:
self.service.datasets().insert(
projectId=dataset_project_id,
body=dataset_reference).execute()
self.log.info('Dataset created successfully: In project %s '
'Dataset %s', dataset_project_id, dataset_id)

except HttpError as err:
raise AirflowException(
'BigQuery job failed. Error was: {}'.format(err.content)
)

def delete_dataset(self, project_id, dataset_id):
"""
Delete a dataset of Big query in your project.
@@ -1671,10 +1737,11 @@ def _validate_value(key, value, expected_type):
key, expected_type, type(value)))


def _api_resource_configs_duplication_check(key, value, config_dict):
def _api_resource_configs_duplication_check(key, value, config_dict,
config_dict_name='api_resource_configs'):
if key in config_dict and value != config_dict[key]:
raise ValueError("Values of {param_name} param are duplicated. "
"`api_resource_configs` contained {param_name} param "
"{dict_name} contained {param_name} param "
"in `query` config and {param_name} was also provided "
"with arg to run_query() method. Please remove duplicates."
.format(param_name=key))
.format(param_name=key, dict_name=config_dict_name))
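
For context, a minimal usage sketch of the new create_empty_dataset hook method; the connection id and the project/dataset names below are assumptions for illustration, not part of this change: ::

    from airflow.contrib.hooks.bigquery_hook import BigQueryHook

    # Illustrative connection id and ids only.
    bq_hook = BigQueryHook(bigquery_conn_id='bigquery_default')
    cursor = bq_hook.get_conn().cursor()

    # Either pass the ids directly ...
    cursor.create_empty_dataset(dataset_id='my_dataset', project_id='my-project')

    # ... or provide a full dataset_reference body. Explicit arguments must not
    # conflict with values already present in datasetReference, otherwise the
    # duplication check raises ValueError.
    cursor.create_empty_dataset(
        dataset_reference={'datasetReference': {'datasetId': 'my_dataset',
                                                'projectId': 'my-project'}})
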
74 changes: 68 additions & 6 deletions airflow/contrib/operators/bigquery_operator.py
@@ -528,12 +528,11 @@ class BigQueryDeleteDatasetOperator(BaseOperator):

**Example**: ::

delete_temp_data = BigQueryDeleteDatasetOperator(
dataset_id = 'temp-dataset',
project_id = 'temp-project',
bigquery_conn_id='_my_gcp_conn_',
task_id='Deletetemp',
dag=dag)
delete_temp_data = BigQueryDeleteDatasetOperator(dataset_id = 'temp-dataset',
project_id = 'temp-project',
bigquery_conn_id='_my_gcp_conn_',
task_id='Deletetemp',
dag=dag)
"""

template_fields = ('dataset_id', 'project_id')
@@ -567,3 +566,66 @@ def execute(self, context):
project_id=self.project_id,
dataset_id=self.dataset_id
)


class BigQueryCreateEmptyDatasetOperator(BaseOperator):
"""
This operator is used to create a new dataset for your project in BigQuery.
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource

:param project_id: The name of the project where we want to create the dataset.
Not needed if projectId is provided in dataset_reference.
:type project_id: str
:param dataset_id: The id of the dataset. Not needed if datasetId
is provided in dataset_reference.
:type dataset_id: str
:param dataset_reference: Dataset reference that could be provided with request body.
More info:
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
:type dataset_reference: dict

**Example**: ::

create_new_dataset = BigQueryCreateEmptyDatasetOperator(
dataset_id = 'new-dataset',
project_id = 'my-project',
dataset_reference = {"friendlyName": "New Dataset"},
bigquery_conn_id='_my_gcp_conn_',
task_id='newDatasetCreator',
dag=dag)

"""

template_fields = ('dataset_id', 'project_id')
ui_color = '#f0eee4'

@apply_defaults
def __init__(self,
dataset_id,
project_id=None,
dataset_reference=None,
bigquery_conn_id='bigquery_default',
delegate_to=None,
*args, **kwargs):
self.dataset_id = dataset_id
self.project_id = project_id
self.bigquery_conn_id = bigquery_conn_id
self.dataset_reference = dataset_reference if dataset_reference else {}
self.delegate_to = delegate_to

self.log.info('Dataset id: %s', self.dataset_id)
self.log.info('Project id: %s', self.project_id)

super(BigQueryCreateEmptyDatasetOperator, self).__init__(*args, **kwargs)

def execute(self, context):
bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)

conn = bq_hook.get_conn()
cursor = conn.cursor()

cursor.create_empty_dataset(
project_id=self.project_id,
dataset_id=self.dataset_id,
dataset_reference=self.dataset_reference)
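
As a usage sketch, the new operator can be wired into a DAG much like the example in its docstring; the dag id, dataset/project names and connection id below are illustrative assumptions: ::

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.bigquery_operator import \
        BigQueryCreateEmptyDatasetOperator

    # Placeholder DAG and ids for illustration only.
    dag = DAG('example_create_dataset', start_date=datetime(2018, 1, 1),
              schedule_interval=None)

    create_new_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id='newDatasetCreator',
        dataset_id='new_dataset',
        project_id='my-project',
        dataset_reference={'friendlyName': 'New Dataset'},
        bigquery_conn_id='bigquery_default',
        dag=dag)
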
1 change: 1 addition & 0 deletions docs/code.rst
@@ -120,6 +120,7 @@ Operators
.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyTableOperator
.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateExternalTableOperator
.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryDeleteDatasetOperator
.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyDatasetOperator
.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryOperator
.. autoclass:: airflow.contrib.operators.bigquery_table_delete_operator.BigQueryTableDeleteOperator
.. autoclass:: airflow.contrib.operators.bigquery_to_bigquery.BigQueryToBigQueryOperator
8 changes: 8 additions & 0 deletions docs/integration.rst
@@ -350,6 +350,7 @@ BigQuery Operators
- :ref:`BigQueryCreateEmptyTableOperator` : Creates a new, empty table in the specified BigQuery dataset optionally with schema.
- :ref:`BigQueryCreateExternalTableOperator` : Creates a new, external table in the dataset with the data in Google Cloud Storage.
- :ref:`BigQueryDeleteDatasetOperator` : Deletes an existing BigQuery dataset.
- :ref:`BigQueryCreateEmptyDatasetOperator` : Creates an empty BigQuery dataset.
- :ref:`BigQueryOperator` : Executes BigQuery SQL queries in a specific BigQuery database.
- :ref:`BigQueryToBigQueryOperator` : Copy a BigQuery table to another BigQuery table.
- :ref:`BigQueryToCloudStorageOperator` : Transfers a BigQuery table to a Google Cloud Storage bucket
@@ -404,6 +405,13 @@ BigQueryDeleteDatasetOperator

.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryDeleteDatasetOperator

.. _BigQueryCreateEmptyDatasetOperator:

BigQueryCreateEmptyDatasetOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyDatasetOperator

.. _BigQueryOperator:

BigQueryOperator
25 changes: 24 additions & 1 deletion tests/contrib/hooks/test_bigquery_hook.py
@@ -337,8 +337,31 @@ def run_with_config(config):
mocked_rwc.assert_called_once()


class TestTimePartitioningInRunJob(unittest.TestCase):
class TestDatasetsOperations(unittest.TestCase):

@mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
def test_create_empty_dataset_no_dataset_id_err(self,
run_with_configuration):

with self.assertRaises(ValueError):
hook.BigQueryBaseCursor(
mock.Mock(), "test_create_empty_dataset").create_empty_dataset(
dataset_id="", project_id="")

@mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
def test_create_empty_dataset_duplicates_call_err(self,
run_with_configuration):
with self.assertRaises(ValueError):
hook.BigQueryBaseCursor(
mock.Mock(), "test_create_empty_dataset").create_empty_dataset(
dataset_id="", project_id="project_test",
dataset_reference={
"datasetReference":
{"datasetId": "test_dataset",
"projectId": "project_test2"}})


class TestTimePartitioningInRunJob(unittest.TestCase):
@mock.patch("airflow.contrib.hooks.bigquery_hook.LoggingMixin")
@mock.patch("airflow.contrib.hooks.bigquery_hook.time")
@mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
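
The new test_create_empty_dataset_duplicates_call_err test asserts the duplication check; outside the test suite the same conflict would surface as in this sketch (connection id and ids assumed for illustration): ::

    from airflow.contrib.hooks.bigquery_hook import BigQueryHook

    cursor = BigQueryHook(bigquery_conn_id='bigquery_default').get_conn().cursor()

    # project_id conflicts with the projectId already present in
    # dataset_reference, so the hook is expected to raise ValueError via
    # _api_resource_configs_duplication_check.
    cursor.create_empty_dataset(
        project_id='project_test',
        dataset_reference={'datasetReference': {'datasetId': 'test_dataset',
                                                'projectId': 'project_test2'}})
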
25 changes: 23 additions & 2 deletions tests/contrib/operators/test_bigquery_operator.py
@@ -22,8 +22,8 @@

from airflow.contrib.operators.bigquery_operator import \
BigQueryCreateExternalTableOperator, \
BigQueryOperator, \
BigQueryCreateEmptyTableOperator, BigQueryDeleteDatasetOperator
BigQueryOperator, BigQueryCreateEmptyTableOperator, \
BigQueryDeleteDatasetOperator, BigQueryCreateEmptyDatasetOperator

try:
from unittest import mock
@@ -136,3 +136,24 @@ def test_execute(self, mock_hook):
dataset_id=TEST_DATASET,
project_id=TEST_PROJECT_ID
)


class BigQueryCreateEmptyDatasetOperatorTest(unittest.TestCase):
@mock.patch('airflow.contrib.operators.bigquery_operator.BigQueryHook')
def test_execute(self, mock_hook):
operator = BigQueryCreateEmptyDatasetOperator(
task_id=TASK_ID,
dataset_id=TEST_DATASET,
project_id=TEST_PROJECT_ID
)

operator.execute(None)
mock_hook.return_value \
.get_conn() \
.cursor() \
.create_empty_dataset \
.assert_called_once_with(
dataset_id=TEST_DATASET,
project_id=TEST_PROJECT_ID,
dataset_reference={}
)