diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
index 7ec62db9bfbd0..f9f7dd7241531 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
@@ -143,6 +143,10 @@ def _handle_job_error(job: BigQueryJob | UnknownJob) -> None:
         raise AirflowException(f"BigQuery job {job.job_id} failed: {job.error_result}")
 
     def _prepare_configuration(self):
+        """
+        Prepare the extract job configuration, resolving the source table
+        reference to the project where the BigQuery table resides.
+        """
         source_project, source_dataset, source_table = self.hook.split_tablename(
             table_input=self.source_project_dataset_table,
             default_project_id=self.project_id or self.hook.project_id,
@@ -183,7 +187,7 @@ def _submit_job(
 
         return hook.insert_job(
             configuration=configuration,
-            project_id=configuration["extract"]["sourceTable"]["projectId"],
+            project_id=self.project_id if self.project_id is not None else hook.project_id,
             location=self.location,
             job_id=job_id,
             timeout=self.result_timeout,
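
For context, a minimal sketch of the behavioral change in `_submit_job`: previously the extract job was always submitted to the source table's project (read back out of `configuration["extract"]["sourceTable"]["projectId"]`); with this change it runs in the operator's `project_id`, falling back to the hook's default project. The helper name and project IDs below are hypothetical, purely for illustration.

```python
from __future__ import annotations


def resolve_job_project(operator_project_id: str | None, hook_project_id: str) -> str:
    # Mirrors the new expression in the diff:
    # self.project_id if self.project_id is not None else hook.project_id
    return operator_project_id if operator_project_id is not None else hook_project_id


# The source table may live in "data-project", but the extract job is now
# submitted to the operator's project (or the hook's default), so where the
# job runs no longer has to match where the table resides.
assert resolve_job_project(None, "default-project") == "default-project"
assert resolve_job_project("billing-project", "default-project") == "billing-project"
```

The net effect is to decouple where the job runs (and is billed) from where the source table lives.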