From 0e1d8f73e631b0a418131e3013a191ed35b2ff25 Mon Sep 17 00:00:00 2001
From: Irvi Aini
Date: Mon, 26 Jun 2023 17:47:50 +0800
Subject: [PATCH] fix: Fix hardcoded BigQuery to GCS project id

In the operator, we define self.project_id to carry the project in
which the extract job runs. A separate configuration, built in
_prepare_configuration, defines the project id of the BigQuery source
table. Previously, _submit_job reused the source table's project id
when submitting the job, so the job was effectively hardcoded to the
table's project. We now submit the job with self.project_id, falling
back to hook.project_id when it is not set.
---
 airflow/providers/google/cloud/transfers/bigquery_to_gcs.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
index 7ec62db9bfbd0..f9f7dd7241531 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
@@ -143,6 +143,10 @@ def _handle_job_error(job: BigQueryJob | UnknownJob) -> None:
             raise AirflowException(f"BigQuery job {job.job_id} failed: {job.error_result}")
 
     def _prepare_configuration(self):
+        """
+        Prepare the extract job configuration, including the project
+        where the BigQuery source table resides.
+        """
         source_project, source_dataset, source_table = self.hook.split_tablename(
             table_input=self.source_project_dataset_table,
             default_project_id=self.project_id or self.hook.project_id,
@@ -183,7 +187,7 @@ def _submit_job(
 
         return hook.insert_job(
             configuration=configuration,
-            project_id=configuration["extract"]["sourceTable"]["projectId"],
+            project_id=self.project_id if self.project_id is not None else hook.project_id,
             location=self.location,
             job_id=job_id,
             timeout=self.result_timeout,
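
Note: a minimal usage sketch of the behavior this patch fixes. The
operator and its parameters are real Airflow API; the project, dataset,
table, and bucket names below are hypothetical placeholders.

    from airflow.providers.google.cloud.transfers.bigquery_to_gcs import (
        BigQueryToGCSOperator,
    )

    # Hypothetical setup: the source table lives in "data-project", but
    # the extract job should run in (and be billed to) "billing-project".
    # Before this patch, _submit_job always used the source table's
    # project id, so the job ran in "data-project" regardless of the
    # project_id argument below; with the patch, it is honored.
    export = BigQueryToGCSOperator(
        task_id="export_table",
        source_project_dataset_table="data-project.my_dataset.my_table",
        destination_cloud_storage_uris=["gs://my-bucket/export/part-*.csv"],
        project_id="billing-project",
    )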