From 09a766720c3c6f642ec33fc8361f0f9da32b08d5 Mon Sep 17 00:00:00 2001
From: Syed Hussain
Date: Tue, 5 Jul 2022 21:38:31 -0600
Subject: [PATCH 1/4] System test for local to s3 - without using fetch_variable

---
 .../amazon/aws/example_local_to_s3.py | 95 +++++++++++++++++++
 1 file changed, 95 insertions(+)
 create mode 100644 tests/system/providers/amazon/aws/example_local_to_s3.py

diff --git a/tests/system/providers/amazon/aws/example_local_to_s3.py b/tests/system/providers/amazon/aws/example_local_to_s3.py
new file mode 100644
index 0000000000000..597549945fa57
--- /dev/null
+++ b/tests/system/providers/amazon/aws/example_local_to_s3.py
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
+from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import set_env_id
+
+ENV_ID = set_env_id()
+DAG_ID = 'example_local_to_s3'
+
+S3_BUCKET_NAME = f'{ENV_ID}-bucket'
+S3_KEY = f'{ENV_ID}/files/my-temp-file.txt'
+TEMP_FILE_PATH = '/tmp/sample-txt.txt'
+SAMPLE_TEXT = 'This is some sample text.'
+
+
+@task
+def create_temp_file():
+    file = open(TEMP_FILE_PATH, 'w')
+    file.write(SAMPLE_TEXT)
+
+
+@task(trigger_rule=TriggerRule.ALL_DONE)
+def delete_temp_file():
+    if os.path.exists(TEMP_FILE_PATH):
+        os.remove(TEMP_FILE_PATH)
+
+
+with DAG(
+    "example_local_to_s3",
+    schedule_interval='@once',
+    start_date=datetime(2021, 1, 1),  # Override to match your needs
+    catchup=False,
+) as dag:
+    # [START howto_operator_s3_create_bucket]
+    create_s3_bucket = S3CreateBucketOperator(
+        task_id='create-s3-bucket', bucket_name=S3_BUCKET_NAME, region_name='us-east-1'
+    )
+    # [END howto_operator_s3_create_bucket]
+
+    # [START howto_transfer_local_to_s3]
+    create_local_to_s3_job = LocalFilesystemToS3Operator(
+        task_id="create_local_to_s3_job",
+        filename="/tmp/sample-txt.txt",
+        dest_key=S3_KEY,
+        dest_bucket=S3_BUCKET_NAME,
+        replace=True,
+    )
+    # [END howto_transfer_local_to_s3]
+
+    # [START howto_operator_s3_delete_bucket]
+    delete_s3_bucket = S3DeleteBucketOperator(
+        task_id='delete_s3_bucket', bucket_name=S3_BUCKET_NAME, force_delete=True, trigger_rule="all_done"
+    )
+    # [END howto_operator_s3_delete_bucket]
+
+    chain(
+        create_temp_file(),
+        create_s3_bucket,
+        create_local_to_s3_job,
+        delete_s3_bucket,
+        delete_temp_file(),
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)

From 9db1a11529b124ad8abb2685c274d8320a45266c Mon Sep 17 00:00:00 2001
From: Syed Hussain
Date: Wed, 6 Jul 2022 12:23:11 -0600
Subject: [PATCH 2/4] Update system test to use SystemTestContextBuilder

---
 .../amazon/aws/example_local_to_s3.py | 23 +++++++++++--------
 1 file changed, 14 insertions(+), 9 deletions(-)

diff --git a/tests/system/providers/amazon/aws/example_local_to_s3.py b/tests/system/providers/amazon/aws/example_local_to_s3.py
index 597549945fa57..c729bd6246444 100644
--- a/tests/system/providers/amazon/aws/example_local_to_s3.py
+++ b/tests/system/providers/amazon/aws/example_local_to_s3.py
@@ -23,16 +23,11 @@
 from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
 from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
 from airflow.utils.trigger_rule import TriggerRule
-from tests.system.providers.amazon.aws.utils import set_env_id
+from tests.system.providers.amazon.aws.utils import SystemTestContextBuilder
 
-ENV_ID = set_env_id()
-DAG_ID = 'example_local_to_s3'
-
-S3_BUCKET_NAME = f'{ENV_ID}-bucket'
-S3_KEY = f'{ENV_ID}/files/my-temp-file.txt'
-TEMP_FILE_PATH = '/tmp/sample-txt.txt'
-SAMPLE_TEXT = 'This is some sample text.'
+sys_test_context_task = SystemTestContextBuilder().build()
 
+DAG_ID = 'example_local_to_s3'
 
 @task
 def create_temp_file():
@@ -47,11 +42,20 @@ def delete_temp_file():
 with DAG(
-    "example_local_to_s3",
+    dag_id=DAG_ID,
     schedule_interval='@once',
     start_date=datetime(2021, 1, 1),  # Override to match your needs
+    tags=['example'],
     catchup=False,
 ) as dag:
+    test_context = sys_test_context_task()
+    ENV_ID = test_context['ENV_ID']
+
+    S3_BUCKET_NAME = f'{ENV_ID}-bucket'
+    S3_KEY = f'{ENV_ID}/files/my-temp-file.txt'
+    TEMP_FILE_PATH = '/tmp/sample-txt.txt'
+    SAMPLE_TEXT = 'This is some sample text.'
+
     # [START howto_operator_s3_create_bucket]
     create_s3_bucket = S3CreateBucketOperator(
         task_id='create-s3-bucket', bucket_name=S3_BUCKET_NAME, region_name='us-east-1'
     )
     # [END howto_operator_s3_create_bucket]
@@ -75,6 +79,7 @@ def delete_temp_file():
     # [END howto_operator_s3_delete_bucket]
 
     chain(
+        test_context,
         create_temp_file(),
         create_s3_bucket,
         create_local_to_s3_job,

From fd80d0207031da38bd139f3cdc2cb844aa7e5496 Mon Sep 17 00:00:00 2001
From: Syed Hussain
Date: Wed, 13 Jul 2022 10:04:23 -0700
Subject: [PATCH 3/4] Make requested changes

---
 .../aws/example_dags/example_local_to_s3.py | 42 -------------------
 .../amazon/aws/example_local_to_s3.py | 35 ++++++++--------
 2 files changed, 18 insertions(+), 59 deletions(-)
 delete mode 100644 airflow/providers/amazon/aws/example_dags/example_local_to_s3.py

diff --git a/airflow/providers/amazon/aws/example_dags/example_local_to_s3.py b/airflow/providers/amazon/aws/example_dags/example_local_to_s3.py
deleted file mode 100644
index 05f9c74d7685b..0000000000000
--- a/airflow/providers/amazon/aws/example_dags/example_local_to_s3.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-import os
-
-from airflow import models
-from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
-from airflow.utils.dates import datetime
-
-S3_BUCKET = os.environ.get("S3_BUCKET", "test-bucket")
-S3_KEY = os.environ.get("S3_KEY", "key")
-
-with models.DAG(
-    "example_local_to_s3",
-    schedule_interval=None,
-    start_date=datetime(2021, 1, 1),  # Override to match your needs
-    catchup=False,
-) as dag:
-    # [START howto_transfer_local_to_s3]
-    create_local_to_s3_job = LocalFilesystemToS3Operator(
-        task_id="create_local_to_s3_job",
-        filename="relative/path/to/file.csv",
-        dest_key=S3_KEY,
-        dest_bucket=S3_BUCKET,
-        replace=True,
-    )
-    # [END howto_transfer_local_to_s3]
diff --git a/tests/system/providers/amazon/aws/example_local_to_s3.py b/tests/system/providers/amazon/aws/example_local_to_s3.py
index c729bd6246444..9e599fbc572d0 100644
--- a/tests/system/providers/amazon/aws/example_local_to_s3.py
+++ b/tests/system/providers/amazon/aws/example_local_to_s3.py
@@ -27,7 +27,11 @@
 
 sys_test_context_task = SystemTestContextBuilder().build()
 
+
 DAG_ID = 'example_local_to_s3'
+TEMP_FILE_PATH = '/tmp/sample-txt.txt'
+SAMPLE_TEXT = 'This is some sample text.'
+
 
 @task
 def create_temp_file():
@@ -49,40 +53,37 @@ def delete_temp_file():
     test_context = sys_test_context_task()
-    ENV_ID = test_context['ENV_ID']
+    env_id = test_context['ENV_ID']
 
-    S3_BUCKET_NAME = f'{ENV_ID}-bucket'
-    S3_KEY = f'{ENV_ID}/files/my-temp-file.txt'
-    TEMP_FILE_PATH = '/tmp/sample-txt.txt'
-    SAMPLE_TEXT = 'This is some sample text.'
-
-    # [START howto_operator_s3_create_bucket]
-    create_s3_bucket = S3CreateBucketOperator(
-        task_id='create-s3-bucket', bucket_name=S3_BUCKET_NAME, region_name='us-east-1'
-    )
-    # [END howto_operator_s3_create_bucket]
+    s3_bucket_name = f'{env_id}-bucket'
+    s3_key = f'{env_id}/files/my-temp-file.txt'
 
+    create_s3_bucket = S3CreateBucketOperator(task_id='create-s3-bucket', bucket_name=s3_bucket_name)
     # [START howto_transfer_local_to_s3]
     create_local_to_s3_job = LocalFilesystemToS3Operator(
         task_id="create_local_to_s3_job",
-        filename="/tmp/sample-txt.txt",
+        filename=TEMP_FILE_PATH,
-        dest_key=S3_KEY,
+        dest_key=s3_key,
-        dest_bucket=S3_BUCKET_NAME,
+        dest_bucket=s3_bucket_name,
         replace=True,
     )
     # [END howto_transfer_local_to_s3]
 
-    # [START howto_operator_s3_delete_bucket]
     delete_s3_bucket = S3DeleteBucketOperator(
-        task_id='delete_s3_bucket', bucket_name=S3_BUCKET_NAME, force_delete=True, trigger_rule="all_done"
+        task_id='delete_s3_bucket',
+        bucket_name=s3_bucket_name,
+        force_delete=True,
+        trigger_rule=TriggerRule.ALL_DONE,
     )
-    # [END howto_operator_s3_delete_bucket]
 
     chain(
+        # TEST SETUP
         test_context,
         create_temp_file(),
         create_s3_bucket,
+        # TEST BODY
        create_local_to_s3_job,
+        # TEST TEARDOWN
         delete_s3_bucket,
         delete_temp_file(),
     )

From 820e97ee3ce9505ab2e24c3205a817d0330c4a22 Mon Sep 17 00:00:00 2001
From: Syed Hussain
Date: Tue, 2 Aug 2022 10:57:45 -0600
Subject: [PATCH 4/4] Update local_to_s3.rst file to use the correct code reference.

---
 .../operators/transfer/local_to_s3.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/apache-airflow-providers-amazon/operators/transfer/local_to_s3.rst b/docs/apache-airflow-providers-amazon/operators/transfer/local_to_s3.rst
index 549ad8e2b1c93..403db3ea9a826 100644
--- a/docs/apache-airflow-providers-amazon/operators/transfer/local_to_s3.rst
+++ b/docs/apache-airflow-providers-amazon/operators/transfer/local_to_s3.rst
@@ -42,7 +42,7 @@ To get more information about this operator visit:
 
 Example usage:
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_local_to_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_local_to_s3.py
     :language: python
     :dedent: 4
     :start-after: [START howto_transfer_local_to_s3]
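
A note on the watcher wiring that this series carries into the system test: the teardown
tasks (delete_s3_bucket, delete_temp_file) run with trigger_rule ALL_DONE, so they execute
and normally succeed even when the transfer task fails, and because a DAG run's state is
derived from its leaf tasks, a green teardown could mask a red test body. Chaining every
task into a watcher task that fails whenever an upstream task fails restores correct
reporting. Below is a minimal, self-contained sketch of that pattern; it is an
illustration only, not the code in tests/system/utils/watcher.py, and the names
demo_watcher, test_body, and teardown are invented for the example.

    from datetime import datetime

    from airflow import DAG
    from airflow.decorators import task
    from airflow.exceptions import AirflowException
    from airflow.utils.trigger_rule import TriggerRule


    @task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
    def demo_watcher():
        # Runs only when at least one monitored task has failed, and fails
        # itself, which marks the whole DAG run as failed.
        raise AirflowException('Failing DAG run because an upstream task failed.')


    with DAG(
        dag_id='watcher_pattern_sketch',
        schedule_interval='@once',
        start_date=datetime(2021, 1, 1),
        catchup=False,
    ) as dag:

        @task
        def test_body():
            raise RuntimeError('simulated test failure')

        @task(trigger_rule=TriggerRule.ALL_DONE)
        def teardown():
            pass  # cleanup that runs, and succeeds, even after a failure

        test_body() >> teardown()

        # dag.tasks is snapshotted before the watcher task is instantiated, so
        # the watcher depends on every task above without depending on itself.
        list(dag.tasks) >> demo_watcher()

Without the last line, this run would end green because its only leaf task (teardown)
succeeds; with it, the simulated failure propagates to the DAG run state, which is what
get_test_run relies on when the module is executed through pytest as described in
tests/system/README.md#run_via_pytest.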