Skip to content

Conversation

@moiseenkov
Copy link
Contributor

This PR provides a small fix that adds missing scopes into a token.

Relates to #34727 and continues #36849, #36903.

In our use case we have a DAG (see below) with BigQueryInsertJobOperator with the following characteristics:

  • deferrable mode;
  • impersonation chain contains service account;
  • the BigQuery job operates over a dataset that belongs to a different GCP project.

In this case instantiation of the newly implemented class _CredentialsToken fails with exception raise in the base class:

{bigquery.py:111} ERROR - Exception occurred while checking for query completion
Traceback (most recent call last):
  File "/opt/airflow/airflow/providers/google/cloud/triggers/bigquery.py", line 90, in run
    job_status = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
  File "/opt/airflow/airflow/providers/google/cloud/hooks/bigquery.py", line 3300, in get_job_status
    job_client = await self.get_job_instance(project_id, job_id, s)
  File "/opt/airflow/airflow/providers/google/cloud/hooks/bigquery.py", line 3290, in get_job_instance
    token = await self.get_token(session=session)
  File "/opt/airflow/airflow/providers/google/common/hooks/base_google.py", line 695, in get_token
    return await _CredentialsToken.from_hook(sync_hook, session=session)
  File "/opt/airflow/airflow/providers/google/common/hooks/base_google.py", line 658, in from_hook
    return cls(
  File "/opt/airflow/airflow/providers/google/common/hooks/base_google.py", line 646, in __init__
    super().__init__(session=cast(Session, session))
  File "/usr/local/lib/python3.8/site-packages/gcloud/aio/auth/token.py", line 157, in __init__
    raise Exception(
Exception: scopes must be provided when token type is service account

Current PR simply passes hook scopes into the _CredentialsToken, so the base class Token would work properly.


DAG

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator,
    BigQueryCreateEmptyTableOperator,
    BigQueryInsertJobOperator,
)

ENV_ID = "TEST_ENV"
PROJECT_ID = "TEST_PROJECT"

DAG_ID = "example_bigquery_queries_async"

DATASET_NAME = f"bq_d_{DAG_ID}_{ENV_ID}".replace("-", "_")
LOCATION = "us"

TABLE_NAME_1 = f"bq_{DAG_ID}_{ENV_ID}_1".replace("-", "_")
TABLE_NAME_2 = f"table_{DAG_ID}_{ENV_ID}_2".replace("-", "_")

SCHEMA = [
    {"name": "value", "type": "INTEGER", "mode": "REQUIRED"},
    {"name": "name", "type": "STRING", "mode": "NULLABLE"},
    {"name": "ds", "type": "STRING", "mode": "NULLABLE"},
]

INSERT_DATE = datetime.now().strftime("%Y-%m-%d")
INSERT_ROWS_QUERY = (
    f"INSERT {DATASET_NAME}.{TABLE_NAME_1} VALUES "
    f"(42, 'monthy python', '{INSERT_DATE}'), "
    f"(42, 'fishy fish', '{INSERT_DATE}');"
)
SA = "impersonate-sa-example@developer.gserviceaccount.com"


with DAG(
    dag_id=DAG_ID,
    schedule="@once",
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=["example", "bigquery", "deferrable"],
    user_defined_macros={"DATASET": DATASET_NAME, "TABLE": TABLE_NAME_1},
) as dag:

    create_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id="create_dataset",
        dataset_id=DATASET_NAME,
        location=LOCATION,
        project_id=PROJECT_ID,
        impersonation_chain=SA,
    )

    create_table_1 = BigQueryCreateEmptyTableOperator(
        task_id="create_table_1",
        dataset_id=DATASET_NAME,
        table_id=TABLE_NAME_1,
        schema_fields=SCHEMA,
        location=LOCATION,
        project_id=PROJECT_ID,
        impersonation_chain=SA,
    )

    insert_query_job_imp = BigQueryInsertJobOperator(
        task_id="insert_query_job_imp",
        project_id=PROJECT_ID,
        configuration={
            "query": {
                "query": INSERT_ROWS_QUERY,
                "useLegacySql": False,
                "priority": "BATCH",
            }
        },
        location=LOCATION,
        impersonation_chain=SA,
        deferrable=True,
    )

    create_dataset >> create_table_1 >> insert_query_job_imp

@boring-cyborg boring-cyborg bot added area:providers provider:google Google (including GCP) related issues labels Jan 23, 2024
@VladaZakharova
Copy link
Contributor

Hi @m1racoli @Lee-W @pankajastro !
Can you please check changes from this PR? Thanks!

@moiseenkov moiseenkov force-pushed the add_scope_to_gcp_token branch from 98c1983 to 2d6bda5 Compare January 23, 2024 16:21
@moiseenkov moiseenkov force-pushed the add_scope_to_gcp_token branch from 2d6bda5 to a37134a Compare January 23, 2024 16:23
Copy link
Contributor

@dirrao dirrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Existing test cases failing. Proposed a change to fix it.

Co-authored-by: Gopal Dirisala <39794726+dirrao@users.noreply.github.com>
@potiuk potiuk merged commit 241b50a into apache:main Jan 24, 2024
Copy link
Contributor

@m1racoli m1racoli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:providers provider:google Google (including GCP) related issues

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants