From 41a3847e51192b8a0be21f5f1eaaeaeedf96ceea Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Tue, 20 Jun 2023 20:14:43 +0530 Subject: [PATCH 1/2] Call job error handler when job not deferred --- airflow/providers/google/cloud/operators/bigquery.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index be7a8673ff186..173b910ebae8e 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -2756,6 +2756,7 @@ def execute(self, context: Any): method_name="execute_complete", ) self.log.info("Current state of job %s is %s", job.job_id, job.state) + self._handle_job_error(job) def execute_complete(self, context: Context, event: dict[str, Any]): """ From db3d26fb90ccb9c5fb89ed49c7172b207eef0bbb Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Tue, 20 Jun 2023 20:56:19 +0530 Subject: [PATCH 2/2] Add test --- .../google/cloud/operators/test_bigquery.py | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py index 34c1a803cf8eb..4d2a19db73f47 100644 --- a/tests/providers/google/cloud/operators/test_bigquery.py +++ b/tests/providers/google/cloud/operators/test_bigquery.py @@ -1347,6 +1347,36 @@ def test_bigquery_insert_job_operator_async_finish_before_deferred(self, mock_ho assert not mock_defer.called assert "Current state of job" in caplog.text + @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator.defer") + @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook") + def test_bigquery_insert_job_operator_async_error_before_deferred(self, mock_hook, mock_defer, caplog): + job_id = "123456" + hash_ = "hash" + real_job_id = f"{job_id}_{hash_}" + + configuration = { + "query": { + "query": "SELECT * FROM any", + "useLegacySql": False, + } + } + mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=True) + mock_hook.return_value.insert_job.return_value.running.return_value = False + + op = BigQueryInsertJobOperator( + task_id="insert_query_job", + configuration=configuration, + location=TEST_DATASET_LOCATION, + job_id=job_id, + project_id=TEST_GCP_PROJECT_ID, + deferrable=True, + ) + + with pytest.raises(AirflowException) as exc: + op.execute(MagicMock()) + + assert str(exc.value) == f"BigQuery job {real_job_id} failed: True" + @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook") def test_bigquery_insert_job_operator_async(self, mock_hook, create_task_instance_of_operator): """