From 65858dae34d537e6416a3fd94430771136c300fb Mon Sep 17 00:00:00 2001
From: Shahar Epstein
Date: Fri, 19 May 2023 23:29:25 +0300
Subject: [PATCH 1/3] Fix `project_id` in `generate_query()` method of
 `BigQueryGetDataOperator` and clarify docstring

---
 .../google/cloud/operators/bigquery.py      |  9 +++++---
 .../google/cloud/operators/test_bigquery.py | 23 +++++++++++++++++++
 2 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 8cf3489ccf2ae..ba78d64206ffd 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -802,7 +802,7 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator):
     :param dataset_id: The dataset ID of the requested table. (templated)
     :param table_id: The table ID of the requested table. (templated)
     :param project_id: (Optional) The name of the project where the data
-        will be returned from. (templated)
+        will be returned from. If None, it will be derived from the hook's project ID. (templated)
     :param max_results: The maximum number of records (rows) to be fetched
         from the table. (templated)
     :param selected_fields: List of fields to return (comma-separated). If
@@ -893,7 +893,10 @@ def generate_query(self) -> str:
             query += self.selected_fields
         else:
             query += "*"
-        query += f" from `{self.project_id}.{self.dataset_id}.{self.table_id}` limit {self.max_results}"
+        query += (
+            f" from `{self.project_id + '.' if self.project_id else ''}{self.dataset_id}"
+            f".{self.table_id}` limit {self.max_results}"
+        )
         return query

     def execute(self, context: Context):
@@ -906,7 +909,7 @@ def execute(self, context: Context):
         if not self.deferrable:
             self.log.info(
                 "Fetching Data from %s.%s.%s max results: %s",
-                self.project_id,
+                self.project_id or hook.project_id,
                 self.dataset_id,
                 self.table_id,
                 self.max_results,
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 1e871678a9c40..2a358a39dd975 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -814,6 +814,29 @@ def test_execute(self, mock_hook, as_dict):
             location=TEST_DATASET_LOCATION,
         )

+    @pytest.mark.parametrize(
+        "project_id,result",
+        [
+            [
+                TEST_GCP_PROJECT_ID,
+                f"select * from `{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}` limit 100",
+            ],
+            [None, f"select * from `{TEST_DATASET}.{TEST_TABLE_ID}` limit 100"],
+        ],
+    )
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_generate_query(self, mock_hook, project_id: str, result: str):
+        operator = BigQueryGetDataOperator(
+            gcp_conn_id=GCP_CONN_ID,
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            table_id=TEST_TABLE_ID,
+            project_id=project_id,
+            max_results=100,
+            use_legacy_sql=False,
+        )
+        assert operator.generate_query() == result
+
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_bigquery_get_data_operator_async_with_selected_fields(
         self, mock_hook, create_task_instance_of_operator

From 9b5ce56952d2b956f9d5abc384ada2128297fb53 Mon Sep 17 00:00:00 2001
From: Shahar Epstein
Date: Fri, 19 May 2023 23:29:47 +0300
Subject: [PATCH 2/3] Add `as_dict` param to `BigQueryGetDataTrigger`'s
 `serialize()` method

---
 airflow/providers/google/cloud/triggers/bigquery.py    | 1 +
 tests/providers/google/cloud/triggers/test_bigquery.py | 1 +
 2 files changed, 2 insertions(+)

diff --git a/airflow/providers/google/cloud/triggers/bigquery.py b/airflow/providers/google/cloud/triggers/bigquery.py
index 1da7f87f90259..c7b17af2ed5f5 100644
--- a/airflow/providers/google/cloud/triggers/bigquery.py
+++ b/airflow/providers/google/cloud/triggers/bigquery.py
@@ -187,6 +187,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "project_id": self.project_id,
                 "table_id": self.table_id,
                 "poll_interval": self.poll_interval,
+                "as_dict": self.as_dict,
             },
         )

diff --git a/tests/providers/google/cloud/triggers/test_bigquery.py b/tests/providers/google/cloud/triggers/test_bigquery.py
index 410aa14b1a7ff..cb997259bd680 100644
--- a/tests/providers/google/cloud/triggers/test_bigquery.py
+++ b/tests/providers/google/cloud/triggers/test_bigquery.py
@@ -224,6 +224,7 @@ def test_bigquery_get_data_trigger_serialization(self, get_data_trigger):
         classpath, kwargs = get_data_trigger.serialize()
         assert classpath == "airflow.providers.google.cloud.triggers.bigquery.BigQueryGetDataTrigger"
         assert kwargs == {
+            "as_dict": False,
             "conn_id": TEST_CONN_ID,
             "job_id": TEST_JOB_ID,
             "dataset_id": TEST_DATASET_ID,

From fbfc29ce3edef8c6bf64bccfc797bafab07b1442 Mon Sep 17 00:00:00 2001
From: Shahar Epstein
Date: Mon, 22 May 2023 07:33:49 +0300
Subject: [PATCH 3/3] Simplify `generate_query` method

---
 .../google/cloud/operators/bigquery.py      |  7 ++--
 .../google/cloud/operators/test_bigquery.py | 35 ++++++++++++-------
 2 files changed, 26 insertions(+), 16 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index ba78d64206ffd..10aafd5d662bb 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -872,7 +872,7 @@ def _submit_job(
         hook: BigQueryHook,
         job_id: str,
     ) -> BigQueryJob:
-        get_query = self.generate_query()
+        get_query = self.generate_query(hook=hook)
         configuration = {"query": {"query": get_query, "useLegacySql": self.use_legacy_sql}}
         """Submit a new job and get the job id for polling the status using Triggerer."""
         return hook.insert_job(
@@ -883,10 +883,11 @@ def _submit_job(
             nowait=True,
         )

-    def generate_query(self) -> str:
+    def generate_query(self, hook: BigQueryHook) -> str:
         """
         Generate a select query if selected fields are given or with *
         for the given dataset and table id
+        :param hook: BigQuery Hook
         """
         query = "select "
         if self.selected_fields:
@@ -894,7 +895,7 @@ def generate_query(self) -> str:
             query += self.selected_fields
         else:
             query += "*"
         query += (
-            f" from `{self.project_id + '.' if self.project_id else ''}{self.dataset_id}"
+            f" from `{self.project_id or hook.project_id}.{self.dataset_id}"
             f".{self.table_id}` limit {self.max_results}"
         )
         return query
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 2a358a39dd975..b0547cec3f18c 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -814,28 +814,37 @@ def test_execute(self, mock_hook, as_dict):
             location=TEST_DATASET_LOCATION,
         )

-    @pytest.mark.parametrize(
-        "project_id,result",
-        [
-            [
-                TEST_GCP_PROJECT_ID,
-                f"select * from `{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}` limit 100",
-            ],
-            [None, f"select * from `{TEST_DATASET}.{TEST_TABLE_ID}` limit 100"],
-        ],
-    )
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-    def test_generate_query(self, mock_hook, project_id: str, result: str):
+    def test_generate_query__with_project_id(self, mock_hook):
         operator = BigQueryGetDataOperator(
             gcp_conn_id=GCP_CONN_ID,
             task_id=TASK_ID,
             dataset_id=TEST_DATASET,
             table_id=TEST_TABLE_ID,
-            project_id=project_id,
+            project_id=TEST_GCP_PROJECT_ID,
             max_results=100,
             use_legacy_sql=False,
         )
-        assert operator.generate_query() == result
+        assert (
+            operator.generate_query(hook=mock_hook) == f"select * from `{TEST_GCP_PROJECT_ID}."
+            f"{TEST_DATASET}.{TEST_TABLE_ID}` limit 100"
+        )
+
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_generate_query__without_project_id(self, mock_hook):
+        hook_project_id = mock_hook.project_id
+        operator = BigQueryGetDataOperator(
+            gcp_conn_id=GCP_CONN_ID,
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            table_id=TEST_TABLE_ID,
+            max_results=100,
+            use_legacy_sql=False,
+        )
+        assert (
+            operator.generate_query(hook=mock_hook) == f"select * from `{hook_project_id}."
+            f"{TEST_DATASET}.{TEST_TABLE_ID}` limit 100"
+        )

     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_bigquery_get_data_operator_async_with_selected_fields(
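Reviewer note (not part of the patches): the sketch below mirrors the project-ID fallback that `generate_query()` implements after patches 1 and 3, so the behaviour can be checked in isolation. `FakeHook`, the standalone function, and the literal project/dataset/table names are hypothetical stand-ins; the real code reads the same attributes from `BigQueryGetDataOperator` and a `BigQueryHook`.

from __future__ import annotations

from dataclasses import dataclass


@dataclass
class FakeHook:
    # Hypothetical stand-in for BigQueryHook; only project_id is needed here.
    project_id: str | None


def generate_query(
    hook: FakeHook,
    dataset_id: str,
    table_id: str,
    max_results: int,
    project_id: str | None = None,
    selected_fields: str | None = None,
) -> str:
    # Same shape as the patched BigQueryGetDataOperator.generate_query().
    query = "select "
    query += selected_fields if selected_fields else "*"
    # Patch 3: fall back to the hook's project ID when the operator got none.
    query += (
        f" from `{project_id or hook.project_id}.{dataset_id}"
        f".{table_id}` limit {max_results}"
    )
    return query


if __name__ == "__main__":
    hook = FakeHook(project_id="hook-project")
    # Explicit project_id on the operator wins.
    print(generate_query(hook, "my_dataset", "my_table", 100, project_id="my-project"))
    # No project_id passed: the hook's project ID is used instead.
    print(generate_query(hook, "my_dataset", "my_table", 100))

Run as-is, this prints one query against `my-project` and one against `hook-project`, which is the same pair of cases covered by test_generate_query__with_project_id and test_generate_query__without_project_id.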
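Reviewer note (not part of the patches): patch 2 matters because a deferred trigger is re-created on the triggerer from the classpath and kwargs returned by `serialize()`; any constructor argument missing from that dict silently falls back to its default after deferral. A toy round-trip, with a hypothetical `ToyTrigger` class standing in for `BigQueryGetDataTrigger`:

from __future__ import annotations

from typing import Any


class ToyTrigger:
    # Hypothetical minimal trigger; Airflow triggers are rebuilt the same way.
    def __init__(self, job_id: str, as_dict: bool = False):
        self.job_id = job_id
        self.as_dict = as_dict

    def serialize(self) -> tuple[str, dict[str, Any]]:
        # Before patch 2 the BigQuery trigger omitted "as_dict" here, so the
        # rebuilt instance always reverted to the default (False).
        return "toy.module.ToyTrigger", {"job_id": self.job_id, "as_dict": self.as_dict}


if __name__ == "__main__":
    original = ToyTrigger(job_id="job-123", as_dict=True)
    _classpath, kwargs = original.serialize()
    rebuilt = ToyTrigger(**kwargs)  # roughly what the triggerer does with the classpath and kwargs
    assert rebuilt.as_dict is True  # holds only because "as_dict" is included in serialize()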