From 882f1c6d097e811e75b83267d002daadeecb1430 Mon Sep 17 00:00:00 2001 From: Lihan Li Date: Tue, 14 Jun 2022 22:14:35 +1000 Subject: [PATCH] Fixing TypeError: 'ExtractJob' object is not subscriptable Fixing instance check Pass static analysis --- .../google/cloud/transfers/bigquery_to_gcs.py | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py index fbae5a5b77933..afcca1684b256 100644 --- a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py @@ -18,6 +18,8 @@ """This module contains Google BigQuery to Google Cloud Storage operator.""" from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union +from google.cloud.bigquery import ExtractJob + from airflow.models import BaseOperator from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink @@ -125,12 +127,13 @@ def execute(self, context: 'Context'): labels=self.labels, return_full_job=True, ) - conf = job["configuration"]["extract"]["sourceTable"] - dataset_id, project_id, table_id = conf["datasetId"], conf["projectId"], conf["tableId"] - BigQueryTableLink.persist( - context=context, - task_instance=self, - dataset_id=dataset_id, - project_id=project_id, - table_id=table_id, - ) + table_ref = job.source + if isinstance(table_ref, ExtractJob): + dataset_id, project_id, table_id = table_ref.dataset_id, table_ref.project, table_ref.table_id + BigQueryTableLink.persist( + context=context, + task_instance=self, + dataset_id=dataset_id, + project_id=project_id, + table_id=table_id, + )