From 2b51ff81852d88e3cf16a6d36cc02323251b6868 Mon Sep 17 00:00:00 2001 From: Syed Hussain Date: Fri, 23 Sep 2022 10:10:14 -0700 Subject: [PATCH] System test for Dynamo DB --- .../example_dags/example_dynamodb_to_s3.py | 73 -------- .../operators/transfer/dynamodb_to_s3.rst | 4 +- .../amazon/aws/example_dynamodb_to_s3.py | 162 ++++++++++++++++++ 3 files changed, 164 insertions(+), 75 deletions(-) delete mode 100644 airflow/providers/amazon/aws/example_dags/example_dynamodb_to_s3.py create mode 100644 tests/system/providers/amazon/aws/example_dynamodb_to_s3.py diff --git a/airflow/providers/amazon/aws/example_dags/example_dynamodb_to_s3.py b/airflow/providers/amazon/aws/example_dags/example_dynamodb_to_s3.py deleted file mode 100644 index 844e0dec88eec..0000000000000 --- a/airflow/providers/amazon/aws/example_dags/example_dynamodb_to_s3.py +++ /dev/null @@ -1,73 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -from datetime import datetime -from os import environ - -from airflow import DAG -from airflow.models.baseoperator import chain -from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator - -TABLE_NAME = environ.get('DYNAMO_TABLE_NAME', 'ExistingDynamoDbTableName') -BUCKET_NAME = environ.get('S3_BUCKET_NAME', 'ExistingS3BucketName') - - -with DAG( - dag_id='example_dynamodb_to_s3', - start_date=datetime(2021, 1, 1), - tags=['example'], - catchup=False, -) as dag: - # [START howto_transfer_dynamodb_to_s3] - backup_db = DynamoDBToS3Operator( - task_id='backup_db', - dynamodb_table_name=TABLE_NAME, - s3_bucket_name=BUCKET_NAME, - # Max output file size in bytes. If the Table is too large, multiple files will be created. - file_size=1000, - ) - # [END howto_transfer_dynamodb_to_s3] - - # [START howto_transfer_dynamodb_to_s3_segmented] - # Segmenting allows the transfer to be parallelized into {segment} number of parallel tasks. - backup_db_segment_1 = DynamoDBToS3Operator( - task_id='backup-1', - dynamodb_table_name=TABLE_NAME, - s3_bucket_name=BUCKET_NAME, - # Max output file size in bytes. If the Table is too large, multiple files will be created. - file_size=1000, - dynamodb_scan_kwargs={ - "TotalSegments": 2, - "Segment": 0, - }, - ) - - backup_db_segment_2 = DynamoDBToS3Operator( - task_id="backup-2", - dynamodb_table_name=TABLE_NAME, - s3_bucket_name=BUCKET_NAME, - # Max output file size in bytes. If the Table is too large, multiple files will be created. 
- file_size=1000, - dynamodb_scan_kwargs={ - "TotalSegments": 2, - "Segment": 1, - }, - ) - # [END howto_transfer_dynamodb_to_s3_segmented] - - chain(backup_db, [backup_db_segment_1, backup_db_segment_2]) diff --git a/docs/apache-airflow-providers-amazon/operators/transfer/dynamodb_to_s3.rst b/docs/apache-airflow-providers-amazon/operators/transfer/dynamodb_to_s3.rst index 61727140a048c..508f3d4fbca4a 100644 --- a/docs/apache-airflow-providers-amazon/operators/transfer/dynamodb_to_s3.rst +++ b/docs/apache-airflow-providers-amazon/operators/transfer/dynamodb_to_s3.rst @@ -48,7 +48,7 @@ To get more information visit: Example usage: -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_dynamodb_to_s3.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_dynamodb_to_s3.py :language: python :dedent: 4 :start-after: [START howto_transfer_dynamodb_to_s3] @@ -57,7 +57,7 @@ Example usage: To parallelize the replication, users can create multiple ``DynamoDBToS3Operator`` tasks using the ``TotalSegments`` parameter. For instance to replicate with parallelism of 2, create two tasks: -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_dynamodb_to_s3.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_dynamodb_to_s3.py :language: python :dedent: 4 :start-after: [START howto_transfer_dynamodb_to_s3_segmented] diff --git a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py new file mode 100644 index 0000000000000..92128bc7fd382 --- /dev/null +++ b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py @@ -0,0 +1,162 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from datetime import datetime + +import boto3 + +from airflow.decorators import task +from airflow.models.baseoperator import chain +from airflow.models.dag import DAG +from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator +from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator +from airflow.utils.trigger_rule import TriggerRule +from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder + +DAG_ID = 'example_dynamodb_to_s3' + +sys_test_context_task = SystemTestContextBuilder().build() + +TABLE_ATTRIBUTES = [ + {'AttributeName': 'ID', 'AttributeType': 'S'}, + {'AttributeName': 'Value', 'AttributeType': 'S'}, +] +TABLE_KEY_SCHEMA = [ + {'AttributeName': 'ID', 'KeyType': 'HASH'}, + {'AttributeName': 'Value', 'KeyType': 'RANGE'}, +] +TABLE_THROUGHPUT = {'ReadCapacityUnits': 1, 'WriteCapacityUnits': 1} +S3_KEY_PREFIX = 'dynamodb-segmented-file' + + +@task +def set_up_table(table_name: str): + dynamo_resource = boto3.resource('dynamodb') + table = dynamo_resource.create_table( + AttributeDefinitions=TABLE_ATTRIBUTES, + TableName=table_name, + KeySchema=TABLE_KEY_SCHEMA, + ProvisionedThroughput=TABLE_THROUGHPUT, + ) + boto3.client('dynamodb').get_waiter('table_exists').wait( + TableName=table_name, WaiterConfig={'Delay': 10, 'MaxAttempts': 10} + ) + table.put_item(Item={'ID': '123', 'Value': 'Testing'}) + + +@task +def wait_for_bucket(s3_bucket_name): + waiter = boto3.client('s3').get_waiter('bucket_exists') + waiter.wait(Bucket=s3_bucket_name) + + +@task(trigger_rule=TriggerRule.ALL_DONE) +def delete_dynamodb_table(table_name: str): + boto3.resource('dynamodb').Table(table_name).delete() + boto3.client('dynamodb').get_waiter('table_not_exists').wait( + TableName=table_name, WaiterConfig={'Delay': 10, 'MaxAttempts': 10} + ) + + +with DAG( + dag_id=DAG_ID, + schedule='@once', + start_date=datetime(2021, 1, 1), + catchup=False, + tags=['example'], +) as dag: + test_context = sys_test_context_task() + env_id = test_context[ENV_ID_KEY] + table_name = f'{env_id}-dynamodb-table' + bucket_name = f'{env_id}-dynamodb-bucket' + + create_table = set_up_table(table_name=table_name) + + create_bucket = S3CreateBucketOperator(task_id='create_bucket', bucket_name=bucket_name) + + # [START howto_transfer_dynamodb_to_s3] + backup_db = DynamoDBToS3Operator( + task_id='backup_db', + dynamodb_table_name=table_name, + s3_bucket_name=bucket_name, + # Max output file size in bytes. If the Table is too large, multiple files will be created. + file_size=20, + ) + # [END howto_transfer_dynamodb_to_s3] + + # [START howto_transfer_dynamodb_to_s3_segmented] + # Segmenting allows the transfer to be parallelized into {segment} number of parallel tasks. + backup_db_segment_1 = DynamoDBToS3Operator( + task_id='backup_db_segment_1', + dynamodb_table_name=table_name, + s3_bucket_name=bucket_name, + # Max output file size in bytes. If the Table is too large, multiple files will be created. + file_size=1000, + s3_key_prefix=f'{S3_KEY_PREFIX}-1-', + dynamodb_scan_kwargs={ + "TotalSegments": 2, + "Segment": 0, + }, + ) + + backup_db_segment_2 = DynamoDBToS3Operator( + task_id="backup_db_segment_2", + dynamodb_table_name=table_name, + s3_bucket_name=bucket_name, + # Max output file size in bytes. If the Table is too large, multiple files will be created. 
+ file_size=1000, + s3_key_prefix=f'{S3_KEY_PREFIX}-2-', + dynamodb_scan_kwargs={ + "TotalSegments": 2, + "Segment": 1, + }, + ) + # [END howto_transfer_dynamodb_to_s3_segmented] + delete_table = delete_dynamodb_table(table_name=table_name) + + delete_bucket = S3DeleteBucketOperator( + task_id='delete_bucket', + bucket_name=bucket_name, + trigger_rule=TriggerRule.ALL_DONE, + force_delete=True, + ) + + chain( + # TEST SETUP + test_context, + create_table, + create_bucket, + wait_for_bucket(s3_bucket_name=bucket_name), + # TEST BODY + backup_db, + backup_db_segment_1, + backup_db_segment_2, + # TEST TEARDOWN + delete_table, + delete_bucket, + ) + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)
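
Note on the segmented tasks above: ``dynamodb_scan_kwargs`` is passed through to the DynamoDB Scan call, so ``TotalSegments``/``Segment`` split the table into disjoint segments that the two operator tasks read in parallel. Below is a minimal standalone boto3 sketch of that underlying parallel Scan, for illustration only (not part of this patch; ``table_name`` is a placeholder):

    import boto3

    def scan_segment(table_name: str, segment: int, total_segments: int = 2) -> list:
        """Read one disjoint segment of a DynamoDB table, paginating until done."""
        table = boto3.resource('dynamodb').Table(table_name)
        scan_kwargs = {'TotalSegments': total_segments, 'Segment': segment}
        items = []
        while True:
            response = table.scan(**scan_kwargs)
            items.extend(response.get('Items', []))
            last_key = response.get('LastEvaluatedKey')
            if last_key is None:
                break  # no more pages in this segment
            scan_kwargs['ExclusiveStartKey'] = last_key
        return items

    # Calling scan_segment(name, 0) and scan_segment(name, 1) concurrently covers
    # the whole table, which is what backup_db_segment_1/_2 rely on above.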