From 4c854ce5c572d17c0ba46ecae00747355aca96b8 Mon Sep 17 00:00:00 2001 From: Vincent Beck Date: Wed, 17 May 2023 11:39:25 -0400 Subject: [PATCH 1/2] Fix AWS system test example_dynamodb_to_s3 --- .../amazon/aws/transfers/dynamodb_to_s3.py | 15 ++++++++++----- .../amazon/aws/example_dynamodb_to_s3.py | 19 ++++++++++++++++++- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py index c8eee0b9f5e28..48067a666fad2 100644 --- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py +++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py @@ -97,9 +97,14 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator): template_fields: Sequence[str] = ( *AwsToAwsBaseOperator.template_fields, + "dynamodb_table_name", "s3_bucket_name", + "file_size", + "dynamodb_scan_kwargs", "s3_key_prefix", - "dynamodb_table_name", + "process_func", + "export_time", + "export_format", ) template_fields_renderers = { @@ -129,9 +134,6 @@ def __init__( self.export_time = export_time self.export_format = export_format - if self.export_time and self.export_time > datetime.now(): - raise ValueError("The export_time parameter cannot be a future time.") - @cached_property def hook(self): """Create DynamoDBHook""" @@ -148,6 +150,9 @@ def _export_table_to_point_in_time(self): Export data from start of epoc till `export_time`. Table export will be a snapshot of the table's state at this point in time. """ + if self.export_time and self.export_time > datetime.now(self.export_time.tzinfo): + raise ValueError("The export_time parameter cannot be a future time.") + client = self.hook.conn.meta.client table_description = client.describe_table(TableName=self.dynamodb_table_name) response = client.export_table_to_point_in_time( @@ -163,7 +168,7 @@ def _export_table_to_point_in_time(self): def _export_entire_data(self): """Export all data from the table.""" - table = self.hook.get_conn().Table(self.dynamodb_table_name) + table = self.hook.conn.Table(self.dynamodb_table_name) scan_kwargs = copy(self.dynamodb_scan_kwargs) if self.dynamodb_scan_kwargs else {} err = None f: IO[Any] diff --git a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py index efe16dd86a104..e482bc023e7cb 100644 --- a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py +++ b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py @@ -56,9 +56,24 @@ def set_up_table(table_name: str): boto3.client("dynamodb").get_waiter("table_exists").wait( TableName=table_name, WaiterConfig={"Delay": 10, "MaxAttempts": 10} ) + boto3.client("dynamodb").update_continuous_backups( + TableName=table_name, + PointInTimeRecoverySpecification={ + "PointInTimeRecoveryEnabled": True, + }, + ) table.put_item(Item={"ID": "123", "Value": "Testing"}) +@task +def get_export_time(table_name: str): + r = boto3.client("dynamodb").describe_continuous_backups( + TableName=table_name, + ) + + return r["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]["EarliestRestorableDateTime"] + + @task def wait_for_bucket(s3_bucket_name): waiter = boto3.client("s3").get_waiter("bucket_exists") @@ -128,13 +143,14 @@ def delete_dynamodb_table(table_name: str): ) # [END howto_transfer_dynamodb_to_s3_segmented] + export_time = get_export_time(table_name) # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time] backup_db_to_point_in_time = DynamoDBToS3Operator( task_id="backup_db_to_point_in_time", dynamodb_table_name=table_name, file_size=1000, s3_bucket_name=bucket_name, - export_time=datetime.utcnow() - datetime.timedelta(days=7), + export_time=export_time, s3_key_prefix=f"{S3_KEY_PREFIX}-3-", ) # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time] @@ -158,6 +174,7 @@ def delete_dynamodb_table(table_name: str): backup_db, backup_db_segment_1, backup_db_segment_2, + export_time, backup_db_to_point_in_time, # TEST TEARDOWN delete_table, From 56946cb40dfd792c044d4fc69eeb2c6e2c6e941b Mon Sep 17 00:00:00 2001 From: Vincent Beck Date: Wed, 17 May 2023 12:55:11 -0400 Subject: [PATCH 2/2] Fix unit test --- .../amazon/aws/transfers/test_dynamodb_to_s3.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py b/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py index 5bb02448f2537..bc1b9751b74ab 100644 --- a/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py +++ b/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py @@ -70,7 +70,7 @@ def test_dynamodb_to_s3_success(self, mock_aws_dynamodb_hook, mock_s3_hook): ] table = MagicMock() table.return_value.scan.side_effect = responses - mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table + mock_aws_dynamodb_hook.return_value.conn.Table = table s3_client = MagicMock() s3_client.return_value.upload_file = self.mock_upload_file @@ -99,7 +99,7 @@ def test_dynamodb_to_s3_success_with_decimal(self, mock_aws_dynamodb_hook, mock_ ] table = MagicMock() table.return_value.scan.side_effect = responses - mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table + mock_aws_dynamodb_hook.return_value.conn.Table = table s3_client = MagicMock() s3_client.return_value.upload_file = self.mock_upload_file @@ -198,7 +198,7 @@ def test_dynamodb_to_s3_with_different_aws_conn_id(self, mock_aws_dynamodb_hook, ] table = MagicMock() table.return_value.scan.side_effect = responses - mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table + mock_aws_dynamodb_hook.return_value.conn.Table = table s3_client = MagicMock() s3_client.return_value.upload_file = self.mock_upload_file @@ -234,7 +234,7 @@ def test_dynamodb_to_s3_with_two_different_connections(self, mock_aws_dynamodb_h ] table = MagicMock() table.return_value.scan.side_effect = responses - mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table + mock_aws_dynamodb_hook.return_value.conn.Table = table s3_client = MagicMock() s3_client.return_value.upload_file = self.mock_upload_file @@ -272,7 +272,7 @@ def test_dynamodb_to_s3_with_just_dest_aws_conn_id(self, mock_aws_dynamodb_hook, ] table = MagicMock() table.return_value.scan.side_effect = responses - mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table + mock_aws_dynamodb_hook.return_value.conn.Table = table s3_client = MagicMock() s3_client.return_value.upload_file = self.mock_upload_file @@ -356,4 +356,4 @@ def test_dynamodb_with_future_date(self): s3_bucket_name="airflow-bucket", file_size=4000, export_time=datetime(year=3000, month=1, day=1), - ) + ).execute(context={})