Description
Apache Airflow Provider(s)
Versions of Apache Airflow Providers
No response
Apache Airflow version
2.1.4
Operating System
Container-Optimized OS with Containerd
Deployment
Composer
Deployment details
No response
What happened
I am trying to load some files from GCS into BigQuery. To get the list of files that match a prefix, I am using GoogleCloudStorageListOperator, which pushes its output to XCom.
XCom push from GoogleCloudStorageListOperator:
['file1.csv', 'file2.csv', 'file3.csv', 'file4.csv', 'file5.csv']
When I pull the list from XCom to use in the BigQuery load, it is rendered as:
[['file1.csv', 'file2.csv', 'file3.csv', 'file4.csv', 'file5.csv']]
Because of this, GCSToBigQueryOperator fails with the following error:
Source URI must not contain the ',' character: gs://inbound-bucket/[['file1.csv', 'file2.csv', 'file3.csv', 'file4.csv', 'file5.csv']]
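The error is consistent with the operator formatting each element of source_objects into a gs://<bucket>/<object> URI and then rejecting any URI that contains a comma. A minimal sketch of that idea, assuming plain string formatting (build_source_uris and invalid_uris are hypothetical helpers, not the provider's code): a flat list yields clean URIs, while a nested list puts the whole stringified inner list, commas included, into a single URI.

```python
def build_source_uris(bucket, source_objects):
    """Hypothetical helper: build one gs:// URI per element of source_objects."""
    return [f"gs://{bucket}/{obj}" for obj in source_objects]


def invalid_uris(uris):
    """Return the URIs that would fail a 'no comma allowed' check."""
    return [uri for uri in uris if "," in uri]


flat = build_source_uris("inbound-bucket", ["file1.csv", "file2.csv"])
nested = build_source_uris("inbound-bucket", [["file1.csv", "file2.csv"]])

print(invalid_uris(flat))    # []
print(invalid_uris(nested))  # ["gs://inbound-bucket/['file1.csv', 'file2.csv']"]
```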
The code I am using can be found below.
```python
from airflow import DAG
from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# AIRFLOW_DAG_ID, default_values, source_object_location, dataset, target_table are defined elsewhere
dag = DAG(
    AIRFLOW_DAG_ID,
    default_args=default_values,
    description="google_operator_test",
    catchup=False,
    max_active_runs=1,
    render_template_as_native_obj=True,
    schedule_interval=None,
)

get_gcs_file_list = GoogleCloudStorageListOperator(
    task_id="get_gcs_file_list_1",
    google_cloud_storage_conn_id="temp_google_access",
    bucket="inbound-bucket",
    prefix=source_object_location,
    delimiter=".csv",
    dag=dag,
)

# Load the data from GCS to BQ
load_to_bq_stage = GCSToBigQueryOperator(
    task_id="bqload",
    bigquery_conn_id="temp_google_access",
    google_cloud_storage_conn_id="temp_google_access",
    bucket="inbound-bucket",
    source_objects='{{task_instance.xcom_pull(task_ids="get_gcs_file_list_1")}}',
    destination_project_dataset_table=dataset + "." + target_table,
    write_disposition="WRITE_TRUNCATE",
    field_delimiter="|",
    dag=dag,
)
```
What you expected to happen
GCSToBigQueryOperator should receive the list as-is, so that source_objects renders to ['file1.csv', 'file2.csv', 'file3.csv', 'file4.csv', 'file5.csv'].
How to reproduce
- Push a list of strings into XCom.
- Pull it from XCom in the source_objects parameter of GCSToBigQueryOperator, like below:
  source_objects='{{task_instance.xcom_pull(task_ids="task_name")}}'
- Notice that the rendered source_objects value is wrapped in an extra pair of square brackets.
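The steps above can be simulated without Airflow or GCS. This is a pure-Python stand-in, assuming the constructor check quoted under "Anything else" and assuming native rendering behaves like Jinja's native mode (render the template to text, then parse it with ast.literal_eval). SimulatedLoadOperator and render_native are hypothetical names, not Airflow APIs.

```python
import ast

TEMPLATE = '{{task_instance.xcom_pull(task_ids="task_name")}}'
XCOM_VALUE = ["file1.csv", "file2.csv", "file3.csv"]  # what the upstream task pushed


class SimulatedLoadOperator:
    """Hypothetical stand-in that copies the constructor check from the report."""

    def __init__(self, source_objects):
        # Construction time: the value is still the raw template string,
        # so it is not a list and gets wrapped in one.
        self.source_objects = (
            source_objects if isinstance(source_objects, list) else [source_objects]
        )


def render_native(value):
    """Hypothetical simulation of native template rendering."""
    if isinstance(value, list):
        # List-valued template fields are rendered element by element.
        return [render_native(item) for item in value]
    if value == TEMPLATE:
        # The template renders to the text of the XCom value, and native
        # rendering parses that text back into a Python object.
        return ast.literal_eval(str(XCOM_VALUE))
    return value


op = SimulatedLoadOperator(TEMPLATE)
print(op.source_objects)  # ['{{task_instance.xcom_pull(task_ids="task_name")}}']

rendered = render_native(op.source_objects)
print(rendered)  # [['file1.csv', 'file2.csv', 'file3.csv']]
```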
Anything else
The operator's constructor checks whether source_objects is a list; if it is not, the value is presumed to be a string and is wrapped in a list. Because source_objects is a template field, its value is not rendered until the task runs, just before the operator's execute() method is called. At construction time the constructor therefore sees the raw template string and wraps it, and when render_template_as_native_obj=True renders that string into a Python list, the list ends up nested inside the wrapper:
```python
self.source_objects = source_objects if isinstance(source_objects, list) else [source_objects]
```
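One possible direction for a fix, sketched in plain Python under assumptions: keep source_objects untouched in the constructor and only wrap a bare string after templates have been rendered, for example at the start of execute(). DeferredNormalizationOperator and render_native are hypothetical names; render_native stands in for Airflow rendering template fields before execute() runs. This is an illustration, not the provider's actual code.

```python
import ast

TEMPLATE = '{{task_instance.xcom_pull(task_ids="get_gcs_file_list_1")}}'
XCOM_VALUE = ["file1.csv", "file2.csv"]  # hypothetical upstream XCom value


def render_native(value):
    """Hypothetical native rendering: list fields render element-wise."""
    if isinstance(value, list):
        return [render_native(item) for item in value]
    if value == TEMPLATE:
        return ast.literal_eval(str(XCOM_VALUE))
    return value


class DeferredNormalizationOperator:
    """Hypothetical sketch: store source_objects as given, normalize after rendering."""

    def __init__(self, bucket, source_objects):
        self.bucket = bucket
        self.source_objects = source_objects  # no wrapping at construction time

    def execute(self):
        # Templates are rendered by now, so the isinstance check sees the real type.
        objects = self.source_objects
        if isinstance(objects, str):
            objects = [objects]
        return [f"gs://{self.bucket}/{obj}" for obj in objects]


op = DeferredNormalizationOperator("inbound-bucket", TEMPLATE)
op.source_objects = render_native(op.source_objects)  # stand-in for template rendering
print(op.execute())  # ['gs://inbound-bucket/file1.csv', 'gs://inbound-bucket/file2.csv']

plain = DeferredNormalizationOperator("inbound-bucket", "file1.csv")
print(plain.execute())  # ['gs://inbound-bucket/file1.csv']
```

Because the check now runs on the rendered value, a native list pulled from XCom passes through unchanged, while a single object name given as a string is still wrapped.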
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct