diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 693270fad4c31..dd77df1283d80 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -960,7 +960,7 @@ def run_load(self,
         if not set(allowed_schema_update_options).issuperset(
                 set(schema_update_options)):
             raise ValueError(
-                "{0} contains invalid schema update options. "
+                "{0} contains invalid schema update options."
                 "Please only use one or more of the following options: {1}"
                 .format(schema_update_options, allowed_schema_update_options))
 
@@ -1350,6 +1350,72 @@ def run_grant_dataset_view_access(self, view_project, view_dataset, view_table,
                           source_project, source_dataset)
         return source_dataset_resource
 
+    def create_empty_dataset(self, dataset_id="", project_id="",
+                             dataset_reference=None):
+        """
+        Create a new empty dataset:
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/insert
+
+        :param project_id: The name of the project where we want to create
+            an empty dataset. Not needed if projectId is set in dataset_reference.
+        :type project_id: str
+        :param dataset_id: The id of the dataset. Not needed
+            if datasetId is set in dataset_reference.
+        :type dataset_id: str
+        :param dataset_reference: Dataset reference that could be provided
+            with the request body. More info:
+            https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
+        :type dataset_reference: dict
+        """
+
+        if dataset_reference:
+            _validate_value('dataset_reference', dataset_reference, dict)
+        else:
+            dataset_reference = {}
+
+        if "datasetReference" not in dataset_reference:
+            dataset_reference["datasetReference"] = {}
+
+        if not dataset_reference["datasetReference"].get("datasetId") and not dataset_id:
+            raise ValueError("dataset_id not provided and datasetId not set "
+                             "in dataset_reference. Impossible to create dataset")
+
+        dataset_required_params = [(dataset_id, "datasetId", ""),
+                                   (project_id, "projectId", self.project_id)]
+        for param_tuple in dataset_required_params:
+            param, param_name, param_default = param_tuple
+            if param_name not in dataset_reference['datasetReference']:
+                if param_default and not param:
+                    self.log.info("%s was not specified. "
+                                  "Default value %s will be used.",
+                                  param_name, param_default)
+                    param = param_default
+                dataset_reference['datasetReference'].update(
+                    {param_name: param})
+            elif param:
+                _api_resource_configs_duplication_check(
+                    param_name, param,
+                    dataset_reference['datasetReference'], 'dataset_reference')
+
+        dataset_id = dataset_reference.get("datasetReference").get("datasetId")
+        dataset_project_id = dataset_reference.get("datasetReference").get(
+            "projectId")
+
+        self.log.info('Creating dataset: %s in project: %s', dataset_id,
+                      dataset_project_id)
+
+        try:
+            self.service.datasets().insert(
+                projectId=dataset_project_id,
+                body=dataset_reference).execute()
+            self.log.info('Dataset %s created successfully in project %s',
+                          dataset_id, dataset_project_id)
+
+        except HttpError as err:
+            raise AirflowException(
+                'BigQuery job failed. Error was: {}'.format(err.content)
+            )
+
     def delete_dataset(self, project_id, dataset_id):
         """
         Delete a dataset of Big query in your project.
@@ -1671,10 +1737,11 @@ def _validate_value(key, value, expected_type):
                          key, expected_type, type(value)))
 
 
-def _api_resource_configs_duplication_check(key, value, config_dict):
+def _api_resource_configs_duplication_check(key, value, config_dict,
+                                            config_dict_name='api_resource_configs'):
     if key in config_dict and value != config_dict[key]:
         raise ValueError("Values of {param_name} param are duplicated. "
-                         "`api_resource_configs` contained {param_name} param "
+                         "{dict_name} contained {param_name} param "
                          "in `query` config and {param_name} was also provided "
                          "with arg to run_query() method. Please remove duplicates."
-                         .format(param_name=key))
+                         .format(param_name=key, dict_name=config_dict_name))
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index 8eee6a395c304..9386e57c0780f 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -528,12 +528,11 @@ class BigQueryDeleteDatasetOperator(BaseOperator):
 
     **Example**: ::
 
-        delete_temp_data = BigQueryDeleteDatasetOperator(
-            dataset_id = 'temp-dataset',
-            project_id = 'temp-project',
-            bigquery_conn_id='_my_gcp_conn_',
-            task_id='Deletetemp',
-            dag=dag)
+        delete_temp_data = BigQueryDeleteDatasetOperator(dataset_id = 'temp-dataset',
+                                                         project_id = 'temp-project',
+                                                         bigquery_conn_id='_my_gcp_conn_',
+                                                         task_id='Deletetemp',
+                                                         dag=dag)
     """
 
     template_fields = ('dataset_id', 'project_id')
@@ -567,3 +566,66 @@ def execute(self, context):
             project_id=self.project_id,
             dataset_id=self.dataset_id
         )
+
+
+class BigQueryCreateEmptyDatasetOperator(BaseOperator):
+    """
+    This operator is used to create a new dataset for your project in BigQuery.
+    https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
+
+    :param project_id: The name of the project where we want to create the dataset.
+        Not needed if projectId is set in dataset_reference.
+    :type project_id: str
+    :param dataset_id: The id of the dataset. Not needed
+        if datasetId is set in dataset_reference.
+    :type dataset_id: str
+    :param dataset_reference: Dataset reference that could be provided with the request body.
+        More info:
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
+    :type dataset_reference: dict
+
+    **Example**: ::
+
+        create_new_dataset = BigQueryCreateEmptyDatasetOperator(
+                                dataset_id = 'new-dataset',
+                                project_id = 'my-project',
+                                dataset_reference = {"friendlyName": "New Dataset"},
+                                bigquery_conn_id='_my_gcp_conn_',
+                                task_id='newDatasetCreator',
+                                dag=dag)
+
+    """
+
+    template_fields = ('dataset_id', 'project_id')
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(self,
+                 dataset_id,
+                 project_id=None,
+                 dataset_reference=None,
+                 bigquery_conn_id='bigquery_default',
+                 delegate_to=None,
+                 *args, **kwargs):
+        self.dataset_id = dataset_id
+        self.project_id = project_id
+        self.bigquery_conn_id = bigquery_conn_id
+        self.dataset_reference = dataset_reference if dataset_reference else {}
+        self.delegate_to = delegate_to
+
+        self.log.info('Dataset id: %s', self.dataset_id)
+        self.log.info('Project id: %s', self.project_id)
+
+        super(BigQueryCreateEmptyDatasetOperator, self).__init__(*args, **kwargs)
+
+    def execute(self, context):
+        bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
+                               delegate_to=self.delegate_to)
+
+        conn = bq_hook.get_conn()
+        cursor = conn.cursor()
+
+        cursor.create_empty_dataset(
+            project_id=self.project_id,
+            dataset_id=self.dataset_id,
+            dataset_reference=self.dataset_reference)
diff --git a/docs/code.rst b/docs/code.rst
index fcabca000097a..e5a53193e62f7 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -120,6 +120,7 @@ Operators
 .. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyTableOperator
 .. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateExternalTableOperator
 .. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryDeleteDatasetOperator
+.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyDatasetOperator
 .. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryOperator
 .. autoclass:: airflow.contrib.operators.bigquery_table_delete_operator.BigQueryTableDeleteOperator
 .. autoclass:: airflow.contrib.operators.bigquery_to_bigquery.BigQueryToBigQueryOperator
diff --git a/docs/integration.rst b/docs/integration.rst
index 4c513bf26db1c..522d4bbd446ab 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -350,6 +350,7 @@ BigQuery Operators
 - :ref:`BigQueryCreateEmptyTableOperator` : Creates a new, empty table in the specified BigQuery dataset optionally with schema.
 - :ref:`BigQueryCreateExternalTableOperator` : Creates a new, external table in the dataset with the data in Google Cloud Storage.
 - :ref:`BigQueryDeleteDatasetOperator` : Deletes an existing BigQuery dataset.
+- :ref:`BigQueryCreateEmptyDatasetOperator` : Creates an empty BigQuery dataset.
 - :ref:`BigQueryOperator` : Executes BigQuery SQL queries in a specific BigQuery database.
 - :ref:`BigQueryToBigQueryOperator` : Copy a BigQuery table to another BigQuery table.
 - :ref:`BigQueryToCloudStorageOperator` : Transfers a BigQuery table to a Google Cloud Storage bucket
@@ -404,6 +405,13 @@ BigQueryDeleteDatasetOperator
 
 .. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryDeleteDatasetOperator
 
+.. _BigQueryCreateEmptyDatasetOperator:
+
+BigQueryCreateEmptyDatasetOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyDatasetOperator
+
 .. _BigQueryOperator:
 
 BigQueryOperator
diff --git a/tests/contrib/hooks/test_bigquery_hook.py b/tests/contrib/hooks/test_bigquery_hook.py
index 0006b0c616b87..84fe84043e582 100644
--- a/tests/contrib/hooks/test_bigquery_hook.py
+++ b/tests/contrib/hooks/test_bigquery_hook.py
@@ -337,8 +337,31 @@ def run_with_config(config):
             mocked_rwc.assert_called_once()
 
 
-class TestTimePartitioningInRunJob(unittest.TestCase):
+class TestDatasetsOperations(unittest.TestCase):
+
+    @mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
+    def test_create_empty_dataset_no_dataset_id_err(self,
+                                                    run_with_configuration):
+        with self.assertRaises(ValueError):
+            hook.BigQueryBaseCursor(
+                mock.Mock(), "test_create_empty_dataset").create_empty_dataset(
+                dataset_id="", project_id="")
+
+    @mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
+    def test_create_empty_dataset_duplicates_call_err(self,
+                                                      run_with_configuration):
+        with self.assertRaises(ValueError):
+            hook.BigQueryBaseCursor(
+                mock.Mock(), "test_create_empty_dataset").create_empty_dataset(
+                dataset_id="", project_id="project_test",
+                dataset_reference={
+                    "datasetReference":
+                        {"datasetId": "test_dataset",
+                         "projectId": "project_test2"}})
+
+
+class TestTimePartitioningInRunJob(unittest.TestCase):
 
     @mock.patch("airflow.contrib.hooks.bigquery_hook.LoggingMixin")
     @mock.patch("airflow.contrib.hooks.bigquery_hook.time")
     @mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
diff --git a/tests/contrib/operators/test_bigquery_operator.py b/tests/contrib/operators/test_bigquery_operator.py
index 7c76ab73fec81..9ce3b478d3338 100644
--- a/tests/contrib/operators/test_bigquery_operator.py
+++ b/tests/contrib/operators/test_bigquery_operator.py
@@ -22,8 +22,8 @@
 
 from airflow.contrib.operators.bigquery_operator import \
     BigQueryCreateExternalTableOperator, \
-    BigQueryOperator, \
-    BigQueryCreateEmptyTableOperator, BigQueryDeleteDatasetOperator
+    BigQueryOperator, BigQueryCreateEmptyTableOperator, \
+    BigQueryDeleteDatasetOperator, BigQueryCreateEmptyDatasetOperator
 
 try:
     from unittest import mock
@@ -136,3 +136,24 @@ def test_execute(self, mock_hook):
             dataset_id=TEST_DATASET,
             project_id=TEST_PROJECT_ID
         )
+
+
+class BigQueryCreateEmptyDatasetOperatorTest(unittest.TestCase):
+    @mock.patch('airflow.contrib.operators.bigquery_operator.BigQueryHook')
+    def test_execute(self, mock_hook):
+        operator = BigQueryCreateEmptyDatasetOperator(
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            project_id=TEST_PROJECT_ID
+        )
+
+        operator.execute(None)
+        mock_hook.return_value \
+            .get_conn() \
+            .cursor() \
+            .create_empty_dataset \
+            .assert_called_once_with(
+                dataset_id=TEST_DATASET,
+                project_id=TEST_PROJECT_ID,
+                dataset_reference={}
+            )
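
For reviewers who want to exercise the new hook method by hand, here is a minimal sketch of calling it directly. It is not part of the patch; the connection id, dataset id and ``friendlyName`` are illustrative placeholders, and ``project_id`` is omitted so the hook falls back to the connection's default project as described in the method's parameter handling::

    from airflow.contrib.hooks.bigquery_hook import BigQueryHook

    # Illustrative connection id and names; not part of the PR.
    bq_hook = BigQueryHook(bigquery_conn_id='bigquery_default')
    cursor = bq_hook.get_conn().cursor()

    # projectId falls back to the connection's default project because neither
    # the project_id argument nor dataset_reference provides it.
    cursor.create_empty_dataset(
        dataset_id='my_new_dataset',
        dataset_reference={'friendlyName': 'My new dataset'})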
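
Similarly, a sketch of wiring the new operator into a DAG, adapted from the operator's docstring example; the DAG id, start date, schedule and names are placeholders::

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.bigquery_operator import \
        BigQueryCreateEmptyDatasetOperator

    # Placeholder DAG; only the operator arguments mirror the docstring example.
    dag = DAG('example_bigquery_create_empty_dataset',
              start_date=datetime(2018, 1, 1),
              schedule_interval=None)

    create_new_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id='newDatasetCreator',
        dataset_id='new_dataset',
        project_id='my-project',
        dataset_reference={'friendlyName': 'New Dataset'},
        bigquery_conn_id='_my_gcp_conn_',
        dag=dag)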