
Conversation

@utkarsharma2 (Contributor) commented May 9, 2023

DynamoDBToS3Operator - Add a feature to export the table to a point in time.
Fixed: the TableArn is now fetched via client.describe_table(TableName=self.dynamodb_table_name), and file_size has been added to the example.

closes: #28830

state at this point in time.
"""
client = self.hook.conn.meta.client
table_description = client.describe_table(TableName=self.dynamodb_table_name)
@utkarsharma2 (Contributor, Author)
Fixed TableARN issue
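For reference, the flow behind this fix can be sketched with plain boto3. This is a minimal sketch rather than the operator's actual code; "my-table" and "my-bucket" are placeholder names.

import boto3

# Resolve the table's ARN from its name, then start a point-in-time export to S3.
# Note: the table must have point-in-time recovery (continuous backups) enabled.
client = boto3.client("dynamodb")
table_arn = client.describe_table(TableName="my-table")["Table"]["TableArn"]
response = client.export_table_to_point_in_time(
    TableArn=table_arn,
    S3Bucket="my-bucket",
    ExportFormat="DYNAMODB_JSON",
)
print(response["ExportDescription"]["ExportStatus"])  # e.g. "IN_PROGRESS"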

backup_db_to_point_in_time = DynamoDBToS3Operator(
    task_id="backup_db_to_point_in_time",
    dynamodb_table_name=table_name,
    file_size=1000,
@utkarsharma2 (Contributor, Author)
Added required param.

@utkarsharma2 changed the title from "Dynamo db to s3 operator" to "DynamoDBToS3Operator - Add a feature to export the table to a point in time." on May 9, 2023
@utkarsharma2 (Contributor, Author)

Have tested this locally with the DAG below and it's working for me:

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator

with DAG(
    dag_id='example_export_dynamodb',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    tags=['example'],
    catchup=False,
) as dag:
    dynamodb_to_s3_operator = DynamoDBToS3Operator(
        task_id="dynamodb_to_s3",
        dynamodb_table_name="test",
        s3_bucket_name="tmp9",
        file_size=4000,
        export_time=datetime.now(),
        aws_conn_id="aws_default",
        s3_key_prefix="test1"
    )

[Screenshot attached: successful local run, 2023-05-09 11:50 AM]

@ferruzzi (Contributor) left a comment

Thanks for not only taking on the fixes, but getting them done so quickly. LGTM (again 🤦)

client = self.hook.conn.meta.client
table_description = client.describe_table(TableName=self.dynamodb_table_name)
response = client.export_table_to_point_in_time(
    TableArn=table_description.get("Table", {}).get("TableArn"),
Contributor
I saw your question in the other PR before I saw you had already submitted this one. Glad you came to this solution anyway :) As I mentioned over there, another possible solution would have been to use cached_property to get and store the ARN, but I don't think there is much difference in this case in the end. 👍
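For illustration, the cached_property alternative mentioned above could look roughly like the sketch below. This is not what the PR does, and TableArnCache is a made-up name; it only shows the caching pattern against a boto3 DynamoDB client.

from functools import cached_property


class TableArnCache:
    """Illustrative only: look the table ARN up once and reuse it."""

    def __init__(self, client, table_name: str):
        # `client` is a boto3 DynamoDB client; `table_name` is the table to describe.
        self._client = client
        self._table_name = table_name

    @cached_property
    def table_arn(self) -> str:
        # describe_table is called on first access; later accesses return the cached value.
        return self._client.describe_table(TableName=self._table_name)["Table"]["TableArn"]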

Contributor
+1 on this one :) Thanks for your quick response!

@utkarsharma2 (Contributor, Author)
Thanks for the quick review. I'll be more mindful about system tests going forward. 👍

@o-nikolas (Contributor) left a comment
Thanks for the quick turnaround!

@o-nikolas o-nikolas merged commit cd3fa33 into apache:main May 9, 2023
@ferruzzi (Contributor)

@utkarsharma2 System tests are still failing with this message:



Traceback (most recent call last):
  File "/opt/airflow/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py", line 142, in execute
    self._export_table_to_point_in_time()
  File "/opt/airflow/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py", line 158, in _export_table_to_point_in_time
    ExportFormat=self.export_format,
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 530, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 960, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (ValidationException) when calling the ExportTableToPointInTime operation: Invalid Request: 1 validation error detected: Value '' at 's3Prefix' failed to satisfy constraint: Member must have length greater than or equal to 3

@ferruzzi (Contributor)

I'm at Open Source Summit so I can't look too closely right now, but I wonder if the prefix has to start with s3: or s3/ or something like that?

@ferruzzi (Contributor)

Or, if a blank prefix is viable, maybe try using airflow.helpers.prune_dict() to drop the None value.
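A rough sketch of that idea, assuming the helper lives at airflow.utils.helpers.prune_dict (which drops None-valued entries); build_export_kwargs is a hypothetical name, not the operator's actual code.

from airflow.utils.helpers import prune_dict


def build_export_kwargs(table_arn, s3_bucket_name, s3_key_prefix=None,
                        export_time=None, export_format="DYNAMODB_JSON"):
    # Drop None entries so optional parameters such as S3Prefix are omitted
    # from the API call instead of being sent as empty strings (an empty
    # string fails the "length greater than or equal to 3" validation above).
    return prune_dict(
        {
            "TableArn": table_arn,
            "S3Bucket": s3_bucket_name,
            "S3Prefix": s3_key_prefix or None,
            "ExportTime": export_time,
            "ExportFormat": export_format,
        }
    )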

@utkarsharma2 (Contributor, Author)

@ferruzzi I was able to reproduce the issue locally. Two things were wrong: the s3_key_prefix parameter was missing (so an empty 's3Prefix' failed the length-of-at-least-3 validation), and the format of export_time was not correct. Both issues have been fixed in PR #31197, and I have tested and verified the code below on my local setup.

backup_db_to_point_in_time = DynamoDBToS3Operator(
    task_id="backup_db_to_point_in_time",
    dynamodb_table_name=table_name,
    file_size=1000,
    s3_bucket_name=bucket_name,
    export_time=datetime.now(),
    s3_key_prefix=f"{S3_KEY_PREFIX}-3-",
)

Merging this pull request closes: Export DynamoDB table to S3 with PITR (#28830)
