From 86889136765510005c14fd3d04958adde5bb9363 Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Fri, 12 May 2023 16:55:05 +0530 Subject: [PATCH 1/4] Optimize deferred mode for BigQueryInsertJobOperator --- .../google/cloud/operators/bigquery.py | 21 +++++++------- .../google/cloud/operators/test_bigquery.py | 28 +++++++++++++++++++ 2 files changed, 39 insertions(+), 10 deletions(-) diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index 10aafd5d662bb..e21ec601408a3 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -2709,16 +2709,17 @@ def execute(self, context: Any): self._handle_job_error(job) return self.job_id - self.defer( - timeout=self.execution_timeout, - trigger=BigQueryInsertJobTrigger( - conn_id=self.gcp_conn_id, - job_id=self.job_id, - project_id=self.project_id, - poll_interval=self.poll_interval, - ), - method_name="execute_complete", - ) + elif self.deferrable and job.running(): + self.defer( + timeout=self.execution_timeout, + trigger=BigQueryInsertJobTrigger( + conn_id=self.gcp_conn_id, + job_id=self.job_id, + project_id=self.project_id, + poll_interval=self.poll_interval, + ), + method_name="execute_complete", + ) def execute_complete(self, context: Context, event: dict[str, Any]): """ diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py index b0547cec3f18c..6c416781bfceb 100644 --- a/tests/providers/google/cloud/operators/test_bigquery.py +++ b/tests/providers/google/cloud/operators/test_bigquery.py @@ -1310,6 +1310,34 @@ def test_execute_no_force_rerun(self, mock_hook): with pytest.raises(AirflowException): op.execute(context=MagicMock()) + @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator.defer") + @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook") + def test_bigquery_insert_job_operator_async_finish_before_deferred(self, mock_hook, mock_defer): + job_id = "123456" + hash_ = "hash" + real_job_id = f"{job_id}_{hash_}" + + configuration = { + "query": { + "query": "SELECT * FROM any", + "useLegacySql": False, + } + } + mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False) + mock_hook.return_value.insert_job.return_value.running.return_value = False + + op = BigQueryInsertJobOperator( + task_id="insert_query_job", + configuration=configuration, + location=TEST_DATASET_LOCATION, + job_id=job_id, + project_id=TEST_GCP_PROJECT_ID, + deferrable=True, + ) + + op.execute(MagicMock()) + assert not mock_defer.called + @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook") def test_bigquery_insert_job_operator_async(self, mock_hook, create_task_instance_of_operator): """ From 691d9964dade6c99a903bf7617f0f74e49c9001f Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Sun, 14 May 2023 17:36:21 +0530 Subject: [PATCH 2/4] Apply review suggestion --- .../google/cloud/operators/bigquery.py | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index e21ec601408a3..d75d61cd7f579 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -2709,17 +2709,18 @@ def execute(self, context: Any): self._handle_job_error(job) return self.job_id - elif self.deferrable and job.running(): - self.defer( - timeout=self.execution_timeout, - trigger=BigQueryInsertJobTrigger( - conn_id=self.gcp_conn_id, - job_id=self.job_id, - project_id=self.project_id, - poll_interval=self.poll_interval, - ), - method_name="execute_complete", - ) + else: + if job.running(): + self.defer( + timeout=self.execution_timeout, + trigger=BigQueryInsertJobTrigger( + conn_id=self.gcp_conn_id, + job_id=self.job_id, + project_id=self.project_id, + poll_interval=self.poll_interval, + ), + method_name="execute_complete", + ) def execute_complete(self, context: Context, event: dict[str, Any]): """ From 894738f59d5a0b9ca8d714c0ced640d528a348fe Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Mon, 15 May 2023 19:56:09 +0530 Subject: [PATCH 3/4] Add job state to the log --- airflow/providers/google/cloud/operators/bigquery.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index d75d61cd7f579..467e376447335 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -2721,6 +2721,7 @@ def execute(self, context: Any): ), method_name="execute_complete", ) + self.log.info("Current state of job %s is %s", job.job_id, job.state) def execute_complete(self, context: Context, event: dict[str, Any]): """ From 06d6dd9c2215ed0947e2061740e96396502a7518 Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Tue, 23 May 2023 22:27:43 +0530 Subject: [PATCH 4/4] capture and assert job state log in test --- tests/providers/google/cloud/operators/test_bigquery.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py index 6c416781bfceb..4b6d31839f144 100644 --- a/tests/providers/google/cloud/operators/test_bigquery.py +++ b/tests/providers/google/cloud/operators/test_bigquery.py @@ -1312,7 +1312,7 @@ def test_execute_no_force_rerun(self, mock_hook): @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator.defer") @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook") - def test_bigquery_insert_job_operator_async_finish_before_deferred(self, mock_hook, mock_defer): + def test_bigquery_insert_job_operator_async_finish_before_deferred(self, mock_hook, mock_defer, caplog): job_id = "123456" hash_ = "hash" real_job_id = f"{job_id}_{hash_}" @@ -1337,6 +1337,7 @@ def test_bigquery_insert_job_operator_async_finish_before_deferred(self, mock_ho op.execute(MagicMock()) assert not mock_defer.called + assert "Current state of job" in caplog.text @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook") def test_bigquery_insert_job_operator_async(self, mock_hook, create_task_instance_of_operator):