From 26575d7a099cbb48d2484365bf3032ed60688ef2 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 29 Nov 2023 19:09:27 +0100 Subject: [PATCH 1/2] Move `duckdb` & `pandas` import in tutorial DAG into task This improves the code as per best practices and avoids import error if duckdb is not installed --- airflow/example_dags/tutorial_objectstorage.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/airflow/example_dags/tutorial_objectstorage.py b/airflow/example_dags/tutorial_objectstorage.py index 47db595c24475..b44774ace57c5 100644 --- a/airflow/example_dags/tutorial_objectstorage.py +++ b/airflow/example_dags/tutorial_objectstorage.py @@ -62,9 +62,6 @@ def tutorial_objectstorage(): located [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/objectstorage.html) """ - # [END instantiate_dag] - import duckdb - import pandas as pd # [START get_air_quality_data] @task @@ -74,6 +71,8 @@ def get_air_quality_data(**kwargs) -> ObjectStoragePath: This task gets air quality data from the Finnish Meteorological Institute's open data API. The data is saved as parquet. """ + import pandas as pd + execution_date = kwargs["logical_date"] start_time = kwargs["data_interval_start"] @@ -113,6 +112,8 @@ def analyze(path: ObjectStoragePath, **kwargs): #### Analyze This task analyzes the air quality data, prints the results """ + import duckdb + conn = duckdb.connect(database=":memory:") conn.register_filesystem(path.fs) conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')") From fbd79e3a9fb3175c0842587fbe053149dec0e08d Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 29 Nov 2023 20:12:25 +0100 Subject: [PATCH 2/2] fixup! Move `duckdb` & `pandas` import in tutorial DAG into task --- airflow/example_dags/tutorial_objectstorage.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow/example_dags/tutorial_objectstorage.py b/airflow/example_dags/tutorial_objectstorage.py index b44774ace57c5..11d817400df23 100644 --- a/airflow/example_dags/tutorial_objectstorage.py +++ b/airflow/example_dags/tutorial_objectstorage.py @@ -47,7 +47,6 @@ # [END create_object_storage_path] -# [START instantiate_dag] @dag( schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),