Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.6.3
What happened?
I am working in Cloud Composer on GCP. We are using composer-2.4.6-airflow-2.6.3.
I am building a script that uses BigQueryUpdateTableSchemaOperator to change the schema of a BigQuery table. The company I work for has a very strict rule that all resources and jobs in GCP must be located in Europe, to comply with GDPR. According to the documentation, BigQueryUpdateTableSchemaOperator accepts location as one of its parameters, but when I load the code below, Airflow raises a DAG import error:
Broken DAG: [/home/airflow/gcs/dags/dags/ingestion_pipelines/ingestion_pipelines.py] Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 429, in apply_defaults
result = func(self, **kwargs, default_args=default_args)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 788, in init
raise AirflowException(
airflow.exceptions.AirflowException: Invalid arguments were passed to BigQueryUpdateTableSchemaOperator (task_id: update_schema). Invalid arguments were:
**kwargs: {'location': 'EU'}
Code:
# schema_fields_updates expects a list of field dicts
table_schema = [
    {
        "name": "column",
        "type": "TIMESTAMP",
        "description": "column",
    }
]

update_schema = BigQueryUpdateTableSchemaOperator(
    task_id="update_schema",
    dataset_id="dataset",
    table_id="table_name",
    project_id="PROJECT_ID",
    schema_fields_updates=table_schema,
    include_policy_tags=True,
    location="EU",
    impersonation_chain="SERVICE_ACCOUNT",
    retries=0,
)
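For what it's worth, the same update appears to be possible through BigQueryHook, whose constructor does accept a location argument. Below is a minimal, untested sketch of that workaround; the task id, service account, and table identifiers are placeholders carried over from the snippet above:

# Workaround sketch (assumption, not verified): run the schema update
# through BigQueryHook inside a @task, since the hook takes a location
# even though the operator rejects it.
from airflow.decorators import task
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook


@task(task_id="update_schema_via_hook", retries=0)
def update_schema_via_hook():
    hook = BigQueryHook(
        location="EU",
        impersonation_chain="SERVICE_ACCOUNT",  # placeholder
    )
    hook.update_table_schema(
        schema_fields_updates=[
            {"name": "column", "description": "column"},
        ],
        include_policy_tags=True,
        dataset_id="dataset",
        table_id="table_name",
        project_id="PROJECT_ID",
    )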
What do you think should happen instead?
The DAG should load without errors.
How to reproduce
Upload this code to the DAGs bucket connected to your Cloud Composer environment:
from datetime import datetime

from airflow.decorators import dag, task_group
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.bigquery import BigQueryUpdateTableSchemaOperator


@dag(
    dag_id="update_table",
    max_active_runs=1,
    start_date=datetime(2023, 1, 1),
    catchup=False,
)
def task_flow():
    @task_group(group_id="database")
    def task_groups():
        # This is the BigQuery table to update (a public dataset).
        dataset = "covid19_ecdc_eu"
        table_name = "covid_19_geographic_distribution_worldwide"
        project_id = "bigquery-public-data"

        # schema_fields_updates expects a list of field dicts
        table_schema = [
            {
                "name": "date",
                "description": "date column",
            }
        ]

        update_schema = BigQueryUpdateTableSchemaOperator(
            task_id="update_bronze_schema_for_incr",
            dataset_id=dataset,
            table_id=table_name,
            project_id=project_id,
            schema_fields_updates=table_schema,
            include_policy_tags=True,
            location="EU",
            retries=0,
        )

        chain(update_schema)

    task_groups()


task_flow()
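To confirm the missing parameter independently of any DAG, the operator's signature can be inspected inside the Composer environment; a small sketch:

# Diagnostic sketch: check whether the installed google provider's
# operator declares a "location" parameter at all.
import inspect

from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryUpdateTableSchemaOperator,
)

sig = inspect.signature(BigQueryUpdateTableSchemaOperator.__init__)
print("location" in sig.parameters)  # prints False on the affected version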
Operating System
macOS Sonoma 14.5 23F79
Versions of Apache Airflow Providers
Deployment
Google Cloud Composer
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct