diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index 10aafd5d662bb..467e376447335 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -2709,16 +2709,19 @@ def execute(self, context: Any): self._handle_job_error(job) return self.job_id - self.defer( - timeout=self.execution_timeout, - trigger=BigQueryInsertJobTrigger( - conn_id=self.gcp_conn_id, - job_id=self.job_id, - project_id=self.project_id, - poll_interval=self.poll_interval, - ), - method_name="execute_complete", - ) + else: + if job.running(): + self.defer( + timeout=self.execution_timeout, + trigger=BigQueryInsertJobTrigger( + conn_id=self.gcp_conn_id, + job_id=self.job_id, + project_id=self.project_id, + poll_interval=self.poll_interval, + ), + method_name="execute_complete", + ) + self.log.info("Current state of job %s is %s", job.job_id, job.state) def execute_complete(self, context: Context, event: dict[str, Any]): """ diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py index b0547cec3f18c..4b6d31839f144 100644 --- a/tests/providers/google/cloud/operators/test_bigquery.py +++ b/tests/providers/google/cloud/operators/test_bigquery.py @@ -1310,6 +1310,35 @@ def test_execute_no_force_rerun(self, mock_hook): with pytest.raises(AirflowException): op.execute(context=MagicMock()) + @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator.defer") + @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook") + def test_bigquery_insert_job_operator_async_finish_before_deferred(self, mock_hook, mock_defer, caplog): + job_id = "123456" + hash_ = "hash" + real_job_id = f"{job_id}_{hash_}" + + configuration = { + "query": { + "query": "SELECT * FROM any", + "useLegacySql": False, + } + } + mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False) + mock_hook.return_value.insert_job.return_value.running.return_value = False + + op = BigQueryInsertJobOperator( + task_id="insert_query_job", + configuration=configuration, + location=TEST_DATASET_LOCATION, + job_id=job_id, + project_id=TEST_GCP_PROJECT_ID, + deferrable=True, + ) + + op.execute(MagicMock()) + assert not mock_defer.called + assert "Current state of job" in caplog.text + @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook") def test_bigquery_insert_job_operator_async(self, mock_hook, create_task_instance_of_operator): """