From 1f4ebe57ac858b9d656d6953608901683eea2cb0 Mon Sep 17 00:00:00 2001
From: Syed Hussain
Date: Tue, 13 Sep 2022 12:15:12 -0600
Subject: [PATCH 1/3] S3 system test (AIP-47) using SystemTestContextBuilder

---
 .../providers/amazon/aws}/example_s3.py       | 187 +++++++++++-------
 1 file changed, 113 insertions(+), 74 deletions(-)
 rename {airflow/providers/amazon/aws/example_dags => tests/system/providers/amazon/aws}/example_s3.py (58%)

diff --git a/airflow/providers/amazon/aws/example_dags/example_s3.py b/tests/system/providers/amazon/aws/example_s3.py
similarity index 58%
rename from airflow/providers/amazon/aws/example_dags/example_s3.py
rename to tests/system/providers/amazon/aws/example_s3.py
index 5f5348cff1e18..a0e005ccaa38e 100644
--- a/airflow/providers/amazon/aws/example_dags/example_s3.py
+++ b/tests/system/providers/amazon/aws/example_s3.py
@@ -16,7 +16,6 @@
 # under the License.
 from __future__ import annotations
 
-import os
 from datetime import datetime
 
 from airflow.models.baseoperator import chain
@@ -35,186 +34,226 @@
     S3PutBucketTaggingOperator,
 )
 from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor, S3KeysUnchangedSensor
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
 
-BUCKET_NAME = os.environ.get('BUCKET_NAME', 'test-airflow-12345')
-BUCKET_NAME_2 = os.environ.get('BUCKET_NAME_2', 'test-airflow-123456')
-KEY = os.environ.get('KEY', 'key')
-KEY_2 = os.environ.get('KEY_2', 'key2')
-# Empty string prefix refers to the bucket root
-# See what prefix is here https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-prefixes.html
-PREFIX = os.environ.get('PREFIX', '')
-DELIMITER = os.environ.get('DELIMITER', '/')
-TAG_KEY = os.environ.get('TAG_KEY', 'test-s3-bucket-tagging-key')
-TAG_VALUE = os.environ.get('TAG_VALUE', 'test-s3-bucket-tagging-value')
-DATA = os.environ.get(
-    'DATA',
-    '''
-apple,0.5
-milk,2.5
-bread,4.0
-''',
-)
+DAG_ID = 'example_s3'
+
+sys_test_context_task = SystemTestContextBuilder().build()
+
+DATA = '''
+    apple,0.5
+    milk,2.5
+    bread,4.0
+'''
 
 with DAG(
-    dag_id='example_s3',
+    dag_id=DAG_ID,
+    schedule='@once',
     start_date=datetime(2021, 1, 1),
     catchup=False,
     tags=['example'],
 ) as dag:
+    test_context = sys_test_context_task()
+    env_id = test_context[ENV_ID_KEY]
+
+    bucket_name = f's3-bucket-{env_id}'
+    bucket_name_2 = f's3-bucket-2-{env_id}'
+
+    key = f'{env_id}-key'
+    key_2 = f'{env_id}-key2'
+
+    tag_key = 'test-s3-bucket-tagging-key'
+    tag_value = 'test-s3-bucket-tagging-value'
+
+    # Empty string prefix refers to the bucket root
+    # See what prefix is here https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-prefixes.html
+    prefix = ''
+    delimiter = '/'
+
     # [START howto_sensor_s3_key_function_definition]
     def check_fn(files: list) -> bool:
         """
-        Example of custom check: check if all files are bigger than ``1kB``
+        Example of custom check: check if all files are bigger than ``20 bytes``
 
         :param files: List of S3 object attributes.
:return: true if the criteria is met :rtype: bool """ - return all(f.get('Size', 0) > 1024 for f in files) + return all(f.get('Size', 0) > 20 for f in files) # [END howto_sensor_s3_key_function_definition] # [START howto_operator_s3_create_bucket] create_bucket = S3CreateBucketOperator( - task_id='s3_create_bucket', - bucket_name=BUCKET_NAME, + task_id='create_bucket', + bucket_name=bucket_name, ) # [END howto_operator_s3_create_bucket] + create_bucket_2 = S3CreateBucketOperator( + task_id='create_bucket_2', + bucket_name=bucket_name_2, + ) + # [START howto_operator_s3_put_bucket_tagging] put_tagging = S3PutBucketTaggingOperator( - task_id='s3_put_bucket_tagging', - bucket_name=BUCKET_NAME, - key=TAG_KEY, - value=TAG_VALUE, + task_id='put_tagging', + bucket_name=bucket_name, + key=tag_key, + value=tag_value, ) # [END howto_operator_s3_put_bucket_tagging] # [START howto_operator_s3_get_bucket_tagging] get_tagging = S3GetBucketTaggingOperator( - task_id='s3_get_bucket_tagging', - bucket_name=BUCKET_NAME, + task_id='get_tagging', + bucket_name=bucket_name, ) # [END howto_operator_s3_get_bucket_tagging] # [START howto_operator_s3_delete_bucket_tagging] delete_tagging = S3DeleteBucketTaggingOperator( - task_id='s3_delete_bucket_tagging', - bucket_name=BUCKET_NAME, + task_id='delete_tagging', + bucket_name=bucket_name, ) # [END howto_operator_s3_delete_bucket_tagging] # [START howto_operator_s3_create_object] create_object = S3CreateObjectOperator( - task_id="s3_create_object", - s3_bucket=BUCKET_NAME, - s3_key=KEY, + task_id="create_object", + s3_bucket=bucket_name, + s3_key=key, data=DATA, replace=True, ) # [END howto_operator_s3_create_object] + create_object_2 = S3CreateObjectOperator( + task_id="create_object_2", + s3_bucket=bucket_name, + s3_key=key_2, + data=DATA, + replace=True, + ) + # [START howto_operator_s3_list_prefixes] list_prefixes = S3ListPrefixesOperator( - task_id="s3_list_prefix_operator", - bucket=BUCKET_NAME, - prefix=PREFIX, - delimiter=DELIMITER, + task_id="list_prefixes", + bucket=bucket_name, + prefix=prefix, + delimiter=delimiter, ) # [END howto_operator_s3_list_prefixes] # [START howto_operator_s3_list] list_keys = S3ListOperator( - task_id="s3_list_operator", - bucket=BUCKET_NAME, - prefix=PREFIX, + task_id="list_keys", + bucket=bucket_name, + prefix=prefix, ) # [END howto_operator_s3_list] # [START howto_sensor_s3_key_single_key] # Check if a file exists sensor_one_key = S3KeySensor( - task_id="s3_sensor_one_key", - bucket_name=BUCKET_NAME, - bucket_key=KEY, + task_id="sensor_one_key", + bucket_name=bucket_name, + bucket_key=key, ) # [END howto_sensor_s3_key_single_key] # [START howto_sensor_s3_key_multiple_keys] # Check if both files exist sensor_two_keys = S3KeySensor( - task_id="s3_sensor_two_keys", - bucket_name=BUCKET_NAME, - bucket_key=[KEY, KEY_2], + task_id="sensor_two_keys", + bucket_name=bucket_name, + bucket_key=[key, key_2], ) # [END howto_sensor_s3_key_multiple_keys] # [START howto_sensor_s3_key_function] # Check if a file exists and match a certain pattern defined in check_fn sensor_key_with_function = S3KeySensor( - task_id="s3_sensor_key_function", - bucket_name=BUCKET_NAME, - bucket_key=KEY, + task_id="sensor_key_with_function", + bucket_name=bucket_name, + bucket_key=key, check_fn=check_fn, ) # [END howto_sensor_s3_key_function] - # [START howto_sensor_s3_keys_unchanged] - sensor_keys_unchanged = S3KeysUnchangedSensor( - task_id="s3_sensor_one_key_size", - bucket_name=BUCKET_NAME_2, - prefix=PREFIX, - inactivity_period=10, - ) - # [END 
howto_sensor_s3_keys_unchanged] - # [START howto_operator_s3_copy_object] copy_object = S3CopyObjectOperator( - task_id="s3_copy_object", - source_bucket_name=BUCKET_NAME, - dest_bucket_name=BUCKET_NAME_2, - source_bucket_key=KEY, - dest_bucket_key=KEY_2, + task_id="copy_object", + source_bucket_name=bucket_name, + dest_bucket_name=bucket_name_2, + source_bucket_key=key, + dest_bucket_key=key_2, ) # [END howto_operator_s3_copy_object] # [START howto_operator_s3_file_transform] - transforms_file = S3FileTransformOperator( - task_id="s3_file_transform", - source_s3_key=f's3://{BUCKET_NAME}/{KEY}', - dest_s3_key=f's3://{BUCKET_NAME_2}/{KEY_2}', + file_transform = S3FileTransformOperator( + task_id="file_transform", + source_s3_key=f's3://{bucket_name}/{key}', + dest_s3_key=f's3://{bucket_name_2}/{key_2}', # Use `cp` command as transform script as an example transform_script='cp', replace=True, ) # [END howto_operator_s3_file_transform] + # [START howto_sensor_s3_keys_unchanged] + sensor_keys_unchanged = S3KeysUnchangedSensor( + task_id="sensor_keys_unchanged", + bucket_name=bucket_name_2, + prefix=prefix, + inactivity_period=10, + ) + # [END howto_sensor_s3_keys_unchanged] + # [START howto_operator_s3_delete_objects] delete_objects = S3DeleteObjectsOperator( - task_id="s3_delete_objects", - bucket=BUCKET_NAME_2, - keys=KEY_2, + task_id="delete_objects", + bucket=bucket_name_2, + keys=key_2, ) # [END howto_operator_s3_delete_objects] # [START howto_operator_s3_delete_bucket] delete_bucket = S3DeleteBucketOperator( - task_id='s3_delete_bucket', bucket_name=BUCKET_NAME, force_delete=True + task_id='delete_bucket', bucket_name=bucket_name, force_delete=True, trigger_rule=TriggerRule.ALL_DONE ) # [END howto_operator_s3_delete_bucket] chain( + # TEST SETUP + test_context, + # TEST BODY create_bucket, + create_bucket_2, put_tagging, get_tagging, delete_tagging, create_object, + create_object_2, list_prefixes, list_keys, [sensor_one_key, sensor_two_keys, sensor_key_with_function], copy_object, - transforms_file, + file_transform, sensor_keys_unchanged, + # TEST TEARDOWN delete_objects, delete_bucket, ) + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) From 23f0d1f89312b7b71ea62dc64478e9a506362573 Mon Sep 17 00:00:00 2001 From: Syed Hussain Date: Mon, 19 Sep 2022 15:08:37 -0700 Subject: [PATCH 2/3] Move constants to module level Add task to delete second bucket --- .../system/providers/amazon/aws/example_s3.py | 46 +++++++++++-------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/tests/system/providers/amazon/aws/example_s3.py b/tests/system/providers/amazon/aws/example_s3.py index a0e005ccaa38e..d24ad72029141 100644 --- a/tests/system/providers/amazon/aws/example_s3.py +++ b/tests/system/providers/amazon/aws/example_s3.py @@ -47,6 +47,13 @@ bread,4.0 ''' +# Empty string prefix refers to the bucket root +# See what prefix is here https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-prefixes.html +PREFIX = '' +DELIMITER = '/' +TAG_KEY = 'test-s3-bucket-tagging-key' +TAG_VALUE = 'test-s3-bucket-tagging-value' + with DAG( dag_id=DAG_ID, schedule='@once', @@ -57,20 +64,12 @@ test_context = sys_test_context_task() env_id = 
test_context[ENV_ID_KEY] - bucket_name = f's3-bucket-{env_id}' - bucket_name_2 = f's3-bucket-2-{env_id}' + bucket_name = f'{env_id}-s3-bucket' + bucket_name_2 = f'{env_id}-s3-bucket-2' key = f'{env_id}-key' key_2 = f'{env_id}-key2' - tag_key = 'test-s3-bucket-tagging-key' - tag_value = 'test-s3-bucket-tagging-value' - - # Empty string prefix refers to the bucket root - # See what prefix is here https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-prefixes.html - prefix = '' - delimiter = '/' - # [START howto_sensor_s3_key_function_definition] def check_fn(files: list) -> bool: """ @@ -100,8 +99,8 @@ def check_fn(files: list) -> bool: put_tagging = S3PutBucketTaggingOperator( task_id='put_tagging', bucket_name=bucket_name, - key=tag_key, - value=tag_value, + key=TAG_KEY, + value=TAG_VALUE, ) # [END howto_operator_s3_put_bucket_tagging] @@ -141,8 +140,8 @@ def check_fn(files: list) -> bool: list_prefixes = S3ListPrefixesOperator( task_id="list_prefixes", bucket=bucket_name, - prefix=prefix, - delimiter=delimiter, + prefix=PREFIX, + delimiter=DELIMITER, ) # [END howto_operator_s3_list_prefixes] @@ -150,7 +149,7 @@ def check_fn(files: list) -> bool: list_keys = S3ListOperator( task_id="list_keys", bucket=bucket_name, - prefix=prefix, + prefix=PREFIX, ) # [END howto_operator_s3_list] @@ -207,8 +206,8 @@ def check_fn(files: list) -> bool: sensor_keys_unchanged = S3KeysUnchangedSensor( task_id="sensor_keys_unchanged", bucket_name=bucket_name_2, - prefix=prefix, - inactivity_period=10, + prefix=PREFIX, + inactivity_period=10, # inactivity_period in seconds ) # [END howto_sensor_s3_keys_unchanged] @@ -222,9 +221,19 @@ def check_fn(files: list) -> bool: # [START howto_operator_s3_delete_bucket] delete_bucket = S3DeleteBucketOperator( - task_id='delete_bucket', bucket_name=bucket_name, force_delete=True, trigger_rule=TriggerRule.ALL_DONE + task_id='delete_bucket', + bucket_name=bucket_name, + force_delete=True, ) # [END howto_operator_s3_delete_bucket] + delete_bucket.trigger_rule = TriggerRule.ALL_DONE + + delete_bucket_2 = S3DeleteBucketOperator( + task_id='delete_bucket_2', + bucket_name=bucket_name_2, + force_delete=True, + ) + delete_bucket_2.trigger_rule = TriggerRule.ALL_DONE chain( # TEST SETUP @@ -246,6 +255,7 @@ def check_fn(files: list) -> bool: # TEST TEARDOWN delete_objects, delete_bucket, + delete_bucket_2, ) from tests.system.utils.watcher import watcher From 3439e5a37e193da645b31c6f47e6fd8b523d8e6f Mon Sep 17 00:00:00 2001 From: Syed Hussain Date: Tue, 20 Sep 2022 13:30:06 -0700 Subject: [PATCH 3/3] Update s3.rst to new DAG --- .../operators/s3.rst | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/docs/apache-airflow-providers-amazon/operators/s3.rst b/docs/apache-airflow-providers-amazon/operators/s3.rst index bbb13d976366c..3d594fcdcb2bb 100644 --- a/docs/apache-airflow-providers-amazon/operators/s3.rst +++ b/docs/apache-airflow-providers-amazon/operators/s3.rst @@ -38,7 +38,7 @@ Create an Amazon S3 bucket To create an Amazon S3 bucket you can use :class:`~airflow.providers.amazon.aws.operators.s3.S3CreateBucketOperator`. -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py :language: python :dedent: 4 :start-after: [START howto_operator_s3_create_bucket] @@ -52,7 +52,7 @@ Delete an Amazon S3 bucket To delete an Amazon S3 bucket you can use :class:`~airflow.providers.amazon.aws.operators.s3.S3DeleteBucketOperator`. -.. 
exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_s3_delete_bucket]
@@ -66,7 +66,7 @@ Set the tags for an Amazon S3 bucket
 To set the tags for an Amazon S3 bucket you can use
 :class:`~airflow.providers.amazon.aws.operators.s3.S3PutBucketTaggingOperator`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_s3_put_bucket_tagging]
@@ -80,7 +80,7 @@ Get the tag of an Amazon S3 bucket
 To get the tag set associated with an Amazon S3 bucket you can use
 :class:`~airflow.providers.amazon.aws.operators.s3.S3GetBucketTaggingOperator`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_s3_get_bucket_tagging]
@@ -94,7 +94,7 @@ Delete the tags of an Amazon S3 bucket
 To delete the tags of an Amazon S3 bucket you can use
 :class:`~airflow.providers.amazon.aws.operators.s3.S3DeleteBucketTaggingOperator`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_s3_delete_bucket_tagging]
@@ -108,7 +108,7 @@ Create an Amazon S3 object
 To create a new (or replace) Amazon S3 object you can use
 :class:`~airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_s3_create_object]
@@ -123,7 +123,7 @@ To copy an Amazon S3 object from one bucket to another you can use
 :class:`~airflow.providers.amazon.aws.operators.s3.S3CopyObjectOperator`.
 The Amazon S3 connection used here needs to have access to both source and destination bucket/key.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_s3_copy_object]
@@ -137,7 +137,7 @@ Delete Amazon S3 objects
 To delete one or multiple Amazon S3 objects you can use
 :class:`~airflow.providers.amazon.aws.operators.s3.S3DeleteObjectsOperator`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_s3_delete_objects]
@@ -153,7 +153,7 @@ To transform the data from one Amazon S3 object and save it to another object yo
 You can also apply an optional `Amazon S3 Select expression <https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-glacier-select-sql-reference-select.html>`__
 to select the data you want to retrieve from ``source_s3_key`` using ``select_expression``.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. 
exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_s3_file_transform]
@@ -169,7 +169,7 @@ To list all Amazon S3 prefixes within an Amazon S3 bucket you can use
 See `here <https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-prefixes.html>`__
 for more information about Amazon S3 prefixes.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_s3_list_prefixes]
@@ -184,7 +184,7 @@ To list all Amazon S3 objects within an Amazon S3 bucket you can use
 :class:`~airflow.providers.amazon.aws.operators.s3.S3ListOperator`.
 You can specify a ``prefix`` to filter the objects whose name begins with such prefix.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_s3_list]
@@ -208,7 +208,7 @@ Please keep in mind, especially when used to check a large volume of keys, that
 
 To check one file:
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
     :language: python
     :dedent: 4
     :start-after: [START howto_sensor_s3_key_single_key]
@@ -216,7 +216,7 @@
 
 To check multiple files:
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
     :language: python
     :dedent: 4
     :start-after: [START howto_sensor_s3_key_multiple_keys]
@@ -236,13 +236,13 @@ multiple files can match one key. The list of matched S3 object attributes conta
 
     [{"Size": int}]
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
     :language: python
     :dedent: 4
     :start-after: [START howto_sensor_s3_key_function_definition]
     :end-before: [END howto_sensor_s3_key_function_definition]
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
     :language: python
     :dedent: 4
     :start-after: [START howto_sensor_s3_key_function]
@@ -259,7 +259,7 @@ the inactivity period has passed with no increase in the number of objects you c
 Note, this sensor will not behave correctly in reschedule mode, as the state of the listed
 objects in the Amazon S3 bucket will be lost between rescheduled invocations.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
     :language: python
     :dedent: 4
     :start-after: [START howto_sensor_s3_keys_unchanged]
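
Note on the file transform step: the S3FileTransformOperator hunk in PATCH 1
passes ``transform_script='cp'``, which copies the object unchanged. For
illustration only, a hypothetical custom script (not part of these patches):
the operator downloads ``source_s3_key`` to a temporary file, invokes the
script with the source and destination paths as its first two arguments, and
then uploads the destination file to ``dest_s3_key``.

    #!/usr/bin/env python3
    # Hypothetical transform script (illustrative, not part of this patch).
    # Invoked by S3FileTransformOperator roughly as:
    #   transform_script <source_path> <dest_path> [script_args...]
    import sys

    source_path, dest_path = sys.argv[1], sys.argv[2]

    with open(source_path) as src, open(dest_path, 'w') as dst:
        for line in src:
            line = line.strip()
            if not line:
                continue  # DATA above contains blank and indented lines
            name, price = line.split(',')
            dst.write(f'{name},{float(price) * 2}\n')  # e.g. double each price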
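
Note on running the converted test: the new module's closing lines register it
for pytest via ``get_test_run`` (see tests/system/README.md#run_via_pytest in
PATCH 1). A minimal sketch of an invocation, assuming ``SYSTEM_TESTS_ENV_ID``
is the environment variable that SystemTestContextBuilder reads to produce the
env_id used in resource names such as ``f'{env_id}-s3-bucket'``:

    # Sketch only; SYSTEM_TESTS_ENV_ID is an assumption here. Pick any unique,
    # bucket-name-safe value so the created buckets do not collide.
    import os
    import sys

    import pytest

    os.environ.setdefault('SYSTEM_TESTS_ENV_ID', 'abc123')
    sys.exit(pytest.main(['tests/system/providers/amazon/aws/example_s3.py']))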