From ede29eb0b220de1c203e2a3997df31a414d877a0 Mon Sep 17 00:00:00 2001 From: Bas Harenslak Date: Mon, 28 Oct 2019 22:16:05 +0100 Subject: [PATCH 1/3] [AIRFLOW-5777] Migrate AWS DynamoDB components to /providers/aws [AIP-21] --- UPDATING.md | 13 +- airflow/contrib/operators/dynamodb_to_s3.py | 133 +---------- airflow/contrib/operators/hive_to_dynamodb.py | 109 --------- .../aws/hooks/dynamodb.py} | 0 airflow/providers/aws/operators/dynamodb.py | 226 ++++++++++++++++++ docs/operators-and-hooks-ref.rst | 4 +- scripts/ci/pylint_todo.txt | 2 +- .../contrib/operators/test_dynamodb_to_s3.py | 77 ------ .../aws/hooks/test_dynamodb.py} | 2 +- .../aws/operators/test_dynamodb.py} | 76 ++++-- tests/test_core_to_contrib.py | 12 + 11 files changed, 319 insertions(+), 335 deletions(-) delete mode 100644 airflow/contrib/operators/hive_to_dynamodb.py rename airflow/{contrib/hooks/aws_dynamodb_hook.py => providers/aws/hooks/dynamodb.py} (100%) create mode 100644 airflow/providers/aws/operators/dynamodb.py delete mode 100644 tests/contrib/operators/test_dynamodb_to_s3.py rename tests/{contrib/hooks/test_aws_dynamodb_hook.py => providers/aws/hooks/test_dynamodb.py} (97%) rename tests/{contrib/operators/test_hive_to_dynamodb_operator.py => providers/aws/operators/test_dynamodb.py} (65%) diff --git a/UPDATING.md b/UPDATING.md index a09271f909f97..ad05a27b9c9c9 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -48,11 +48,14 @@ All AWS components (hooks, operators, sensors, example DAGs) will be grouped tog components remain backwards compatible but raise a `DeprecationWarning` when imported from the old module. Migrated are: -| Old path | New path | -|-----------------------------------------------------------------|----------------------------------------------------------| -| airflow.contrib.hooks.aws_athena_hook.AWSAthenaHook | airflow.providers.aws.hooks.athena.AWSAthenaHook | -| airflow.contrib.operators.aws_athena_operator.AWSAthenaOperator | airflow.providers.aws.operators.athena.AWSAthenaOperator | -| airflow.contrib.sensors.aws_athena_sensor.AthenaSensor | airflow.providers.aws.sensors.athena.AthenaSensor | +| Old path | New path | +|---------------------------------------------------------------------------|-------------------------------------------------------------------------| +| airflow.contrib.hooks.aws_athena_hook.AWSAthenaHook | airflow.providers.aws.hooks.athena.AWSAthenaHook | +| airflow.contrib.hooks.aws_dynamodb_hook.AwsDynamoDBHook | airflow.providers.aws.hooks.dynamodb.AwsDynamoDBHook | +| airflow.contrib.operators.aws_athena_operator.AWSAthenaOperator | airflow.providers.aws.operators.athena.AWSAthenaOperator | +| airflow.contrib.operators.dynamodb_to_s3.DynamoDBToS3Operator | airflow.providers.aws.operators.dynamodb.DynamoDBToS3Operator | +| airflow.contrib.operators.hive_to_dynamodb.HiveToDynamoDBTransferOperator | airflow.providers.aws.operators.dynamodb.HiveToDynamoDBTransferOperator | +| airflow.contrib.sensors.aws_athena_sensor.AthenaSensor | airflow.providers.aws.sensors.athena.AthenaSensor | ### Additional arguments passed to BaseOperator cause an exception diff --git a/airflow/contrib/operators/dynamodb_to_s3.py b/airflow/contrib/operators/dynamodb_to_s3.py index 22b86a3c39279..2bff0f2eeef19 100644 --- a/airflow/contrib/operators/dynamodb_to_s3.py +++ b/airflow/contrib/operators/dynamodb_to_s3.py @@ -18,130 +18,15 @@ # under the License. # -""" -This module contains operators to replicate records from -DynamoDB table to S3. -""" +"""This module is deprecated. 
Please use `airflow.providers.aws.operators.dynamodb`.""" -from copy import copy -from os.path import getsize -from tempfile import NamedTemporaryFile -from typing import Any, Callable, Dict, Optional -from uuid import uuid4 +import warnings -from boto.compat import json # type: ignore +# pylint: disable=unused-import +from airflow.providers.aws.operators.dynamodb import DynamoDBToS3Operator # noqa -from airflow.contrib.hooks.aws_dynamodb_hook import AwsDynamoDBHook -from airflow.hooks.S3_hook import S3Hook -from airflow.models.baseoperator import BaseOperator - - -def _convert_item_to_json_bytes(item): - return (json.dumps(item) + '\n').encode('utf-8') - - -def _upload_file_to_s3(file_obj, bucket_name, s3_key_prefix): - s3_client = S3Hook().get_conn() - file_obj.seek(0) - s3_client.upload_file( - Filename=file_obj.name, - Bucket=bucket_name, - Key=s3_key_prefix + str(uuid4()), - ) - - -class DynamoDBToS3Operator(BaseOperator): - """ - Replicates records from a DynamoDB table to S3. - It scans a DynamoDB table and write the received records to a file - on the local filesystem. It flushes the file to S3 once the file size - exceeds the file size limit specified by the user. - - Users can also specify a filtering criteria using dynamodb_scan_kwargs - to only replicate records that satisfy the criteria. - - To parallelize the replication, users can create multiple tasks of DynamoDBToS3Operator. - For instance to replicate with parallelism of 2, create two tasks like: - - .. code-block:: - - op1 = DynamoDBToS3Operator( - task_id='replicator-1', - dynamodb_table_name='hello', - dynamodb_scan_kwargs={ - 'TotalSegments': 2, - 'Segment': 0, - }, - ... - ) - - op2 = DynamoDBToS3Operator( - task_id='replicator-2', - dynamodb_table_name='hello', - dynamodb_scan_kwargs={ - 'TotalSegments': 2, - 'Segment': 1, - }, - ... - ) - - :param dynamodb_table_name: Dynamodb table to replicate data from - :param s3_bucket_name: S3 bucket to replicate data to - :param file_size: Flush file to s3 if file size >= file_size - :param dynamodb_scan_kwargs: kwargs pass to # noqa: E501 pylint: disable=line-too-long - :param s3_key_prefix: Prefix of s3 object key - :param process_func: How we transforms a dynamodb item to bytes. 
By default we dump the json - """ - - def __init__(self, - dynamodb_table_name: str, - s3_bucket_name: str, - file_size: int, - dynamodb_scan_kwargs: Optional[Dict[str, Any]] = None, - s3_key_prefix: str = '', - process_func: Callable[[Dict[str, Any]], bytes] = _convert_item_to_json_bytes, - *args, **kwargs): - super(DynamoDBToS3Operator, self).__init__(*args, **kwargs) - self.file_size = file_size - self.process_func = process_func - self.dynamodb_table_name = dynamodb_table_name - self.dynamodb_scan_kwargs = dynamodb_scan_kwargs - self.s3_bucket_name = s3_bucket_name - self.s3_key_prefix = s3_key_prefix - - def execute(self, context): - table = AwsDynamoDBHook().get_conn().Table(self.dynamodb_table_name) - scan_kwargs = copy(self.dynamodb_scan_kwargs) if self.dynamodb_scan_kwargs else {} - err = None - f = NamedTemporaryFile() - try: - f = self._scan_dynamodb_and_upload_to_s3(f, scan_kwargs, table) - except Exception as e: - err = e - raise e - finally: - if err is None: - _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix) - f.close() - - def _scan_dynamodb_and_upload_to_s3(self, temp_file, scan_kwargs, table): - while True: - response = table.scan(**scan_kwargs) - items = response['Items'] - for item in items: - temp_file.write(self.process_func(item)) - - if 'LastEvaluatedKey' not in response: - # no more items to scan - break - - last_evaluated_key = response['LastEvaluatedKey'] - scan_kwargs['ExclusiveStartKey'] = last_evaluated_key - - # Upload the file to S3 if reach file size limit - if getsize(temp_file.name) >= self.file_size: - _upload_file_to_s3(temp_file, self.s3_bucket_name, - self.s3_key_prefix) - temp_file.close() - temp_file = NamedTemporaryFile() - return temp_file +warnings.warn( + "This module is deprecated. Please use `airflow.providers.aws.operators.dynamodb`.", + DeprecationWarning, + stacklevel=2, +) diff --git a/airflow/contrib/operators/hive_to_dynamodb.py b/airflow/contrib/operators/hive_to_dynamodb.py deleted file mode 100644 index 2d7a9f60fd077..0000000000000 --- a/airflow/contrib/operators/hive_to_dynamodb.py +++ /dev/null @@ -1,109 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import json - -from airflow.contrib.hooks.aws_dynamodb_hook import AwsDynamoDBHook -from airflow.hooks.hive_hooks import HiveServer2Hook -from airflow.models import BaseOperator -from airflow.utils.decorators import apply_defaults - - -class HiveToDynamoDBTransferOperator(BaseOperator): - """ - Moves data from Hive to DynamoDB, note that for now the data is loaded - into memory before being pushed to DynamoDB, so this operator should - be used for smallish amount of data. - - :param sql: SQL query to execute against the hive database. 
(templated) - :type sql: str - :param table_name: target DynamoDB table - :type table_name: str - :param table_keys: partition key and sort key - :type table_keys: list - :param pre_process: implement pre-processing of source data - :type pre_process: function - :param pre_process_args: list of pre_process function arguments - :type pre_process_args: list - :param pre_process_kwargs: dict of pre_process function arguments - :type pre_process_kwargs: dict - :param region_name: aws region name (example: us-east-1) - :type region_name: str - :param schema: hive database schema - :type schema: str - :param hiveserver2_conn_id: source hive connection - :type hiveserver2_conn_id: str - :param aws_conn_id: aws connection - :type aws_conn_id: str - """ - - template_fields = ('sql',) - template_ext = ('.sql',) - ui_color = '#a0e08c' - - @apply_defaults - def __init__( - self, - sql, - table_name, - table_keys, - pre_process=None, - pre_process_args=None, - pre_process_kwargs=None, - region_name=None, - schema='default', - hiveserver2_conn_id='hiveserver2_default', - aws_conn_id='aws_default', - *args, **kwargs): - super().__init__(*args, **kwargs) - self.sql = sql - self.table_name = table_name - self.table_keys = table_keys - self.pre_process = pre_process - self.pre_process_args = pre_process_args - self.pre_process_kwargs = pre_process_kwargs - self.region_name = region_name - self.schema = schema - self.hiveserver2_conn_id = hiveserver2_conn_id - self.aws_conn_id = aws_conn_id - - def execute(self, context): - hive = HiveServer2Hook(hiveserver2_conn_id=self.hiveserver2_conn_id) - - self.log.info('Extracting data from Hive') - self.log.info(self.sql) - - data = hive.get_pandas_df(self.sql, schema=self.schema) - dynamodb = AwsDynamoDBHook(aws_conn_id=self.aws_conn_id, - table_name=self.table_name, - table_keys=self.table_keys, - region_name=self.region_name) - - self.log.info('Inserting rows into dynamodb') - - if self.pre_process is None: - dynamodb.write_batch_data( - json.loads(data.to_json(orient='records'))) - else: - dynamodb.write_batch_data( - self.pre_process(data=data, - args=self.pre_process_args, - kwargs=self.pre_process_kwargs)) - - self.log.info('Done.') diff --git a/airflow/contrib/hooks/aws_dynamodb_hook.py b/airflow/providers/aws/hooks/dynamodb.py similarity index 100% rename from airflow/contrib/hooks/aws_dynamodb_hook.py rename to airflow/providers/aws/hooks/dynamodb.py diff --git a/airflow/providers/aws/operators/dynamodb.py b/airflow/providers/aws/operators/dynamodb.py new file mode 100644 index 0000000000000..7a682cc3df16c --- /dev/null +++ b/airflow/providers/aws/operators/dynamodb.py @@ -0,0 +1,226 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import json +from copy import copy +from os.path import getsize +from tempfile import NamedTemporaryFile +from typing import Optional, Dict, Any, Callable +from uuid import uuid4 + +from airflow.hooks.S3_hook import S3Hook +from airflow.hooks.hive_hooks import HiveServer2Hook +from airflow.models import BaseOperator +from airflow.providers.aws.hooks.dynamodb import AwsDynamoDBHook +from airflow.utils.decorators import apply_defaults + + +class HiveToDynamoDBTransferOperator(BaseOperator): + """ + Moves data from Hive to DynamoDB, note that for now the data is loaded + into memory before being pushed to DynamoDB, so this operator should + be used for smallish amount of data. + + :param sql: SQL query to execute against the hive database. (templated) + :type sql: str + :param table_name: target DynamoDB table + :type table_name: str + :param table_keys: partition key and sort key + :type table_keys: list + :param pre_process: implement pre-processing of source data + :type pre_process: function + :param pre_process_args: list of pre_process function arguments + :type pre_process_args: list + :param pre_process_kwargs: dict of pre_process function arguments + :type pre_process_kwargs: dict + :param region_name: aws region name (example: us-east-1) + :type region_name: str + :param schema: hive database schema + :type schema: str + :param hiveserver2_conn_id: source hive connection + :type hiveserver2_conn_id: str + :param aws_conn_id: aws connection + :type aws_conn_id: str + """ + + template_fields = ('sql',) + template_ext = ('.sql',) + ui_color = '#a0e08c' + + @apply_defaults + def __init__( + self, + sql, + table_name, + table_keys, + pre_process=None, + pre_process_args=None, + pre_process_kwargs=None, + region_name=None, + schema='default', + hiveserver2_conn_id='hiveserver2_default', + aws_conn_id='aws_default', + *args, **kwargs): + super().__init__(*args, **kwargs) + self.sql = sql + self.table_name = table_name + self.table_keys = table_keys + self.pre_process = pre_process + self.pre_process_args = pre_process_args + self.pre_process_kwargs = pre_process_kwargs + self.region_name = region_name + self.schema = schema + self.hiveserver2_conn_id = hiveserver2_conn_id + self.aws_conn_id = aws_conn_id + + def execute(self, context): + hive = HiveServer2Hook(hiveserver2_conn_id=self.hiveserver2_conn_id) + + self.log.info('Extracting data from Hive') + self.log.info(self.sql) + + data = hive.get_pandas_df(self.sql, schema=self.schema) + dynamodb = AwsDynamoDBHook(aws_conn_id=self.aws_conn_id, + table_name=self.table_name, + table_keys=self.table_keys, + region_name=self.region_name) + + self.log.info('Inserting rows into dynamodb') + + if self.pre_process is None: + dynamodb.write_batch_data( + json.loads(data.to_json(orient='records'))) + else: + dynamodb.write_batch_data( + self.pre_process(data=data, + args=self.pre_process_args, + kwargs=self.pre_process_kwargs)) + + self.log.info('Done.') + + +def _convert_item_to_json_bytes(item): + return (json.dumps(item) + '\n').encode('utf-8') + + +def _upload_file_to_s3(file_obj, bucket_name, s3_key_prefix): + s3_client = S3Hook().get_conn() + file_obj.seek(0) + s3_client.upload_file( + Filename=file_obj.name, + Bucket=bucket_name, + Key=s3_key_prefix + str(uuid4()), + ) + + +class DynamoDBToS3Operator(BaseOperator): + """ + Replicates records from a DynamoDB table to S3. + It scans a DynamoDB table and write the received records to a file + on the local filesystem. 
It flushes the file to S3 once the file size + exceeds the file size limit specified by the user. + + Users can also specify a filtering criteria using dynamodb_scan_kwargs + to only replicate records that satisfy the criteria. + + To parallelize the replication, users can create multiple tasks of DynamoDBToS3Operator. + For instance to replicate with parallelism of 2, create two tasks like: + + .. code-block:: + + op1 = DynamoDBToS3Operator( + task_id='replicator-1', + dynamodb_table_name='hello', + dynamodb_scan_kwargs={ + 'TotalSegments': 2, + 'Segment': 0, + }, + ... + ) + + op2 = DynamoDBToS3Operator( + task_id='replicator-2', + dynamodb_table_name='hello', + dynamodb_scan_kwargs={ + 'TotalSegments': 2, + 'Segment': 1, + }, + ... + ) + + :param dynamodb_table_name: Dynamodb table to replicate data from + :param s3_bucket_name: S3 bucket to replicate data to + :param file_size: Flush file to s3 if file size >= file_size + :param dynamodb_scan_kwargs: kwargs pass to # noqa: E501 pylint: disable=line-too-long + :param s3_key_prefix: Prefix of s3 object key + :param process_func: How we transforms a dynamodb item to bytes. By default we dump the json + """ + + def __init__(self, + dynamodb_table_name: str, + s3_bucket_name: str, + file_size: int, + dynamodb_scan_kwargs: Optional[Dict[str, Any]] = None, + s3_key_prefix: str = '', + process_func: Callable[[Dict[str, Any]], bytes] = _convert_item_to_json_bytes, + *args, **kwargs): + super(DynamoDBToS3Operator, self).__init__(*args, **kwargs) + self.file_size = file_size + self.process_func = process_func + self.dynamodb_table_name = dynamodb_table_name + self.dynamodb_scan_kwargs = dynamodb_scan_kwargs + self.s3_bucket_name = s3_bucket_name + self.s3_key_prefix = s3_key_prefix + + def execute(self, context): + table = AwsDynamoDBHook().get_conn().Table(self.dynamodb_table_name) + scan_kwargs = copy(self.dynamodb_scan_kwargs) if self.dynamodb_scan_kwargs else {} + err = None + f = NamedTemporaryFile() + try: + f = self._scan_dynamodb_and_upload_to_s3(f, scan_kwargs, table) + except Exception as e: + err = e + raise e + finally: + if err is None: + _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix) + f.close() + + def _scan_dynamodb_and_upload_to_s3(self, temp_file, scan_kwargs, table): + while True: + response = table.scan(**scan_kwargs) + items = response['Items'] + for item in items: + temp_file.write(self.process_func(item)) + + if 'LastEvaluatedKey' not in response: + # no more items to scan + break + + last_evaluated_key = response['LastEvaluatedKey'] + scan_kwargs['ExclusiveStartKey'] = last_evaluated_key + + # Upload the file to S3 if reach file size limit + if getsize(temp_file.name) >= self.file_size: + _upload_file_to_s3(temp_file, self.s3_bucket_name, + self.s3_key_prefix) + temp_file.close() + temp_file = NamedTemporaryFile() + return temp_file diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst index fc516f4115895..0afb92147caca 100644 --- a/docs/operators-and-hooks-ref.rst +++ b/docs/operators-and-hooks-ref.rst @@ -320,8 +320,8 @@ These integrations allow you to perform various operations within the Amazon Web - * - `Amazon DynamoDB `__ - - :mod:`airflow.contrib.hooks.aws_dynamodb_hook` - - + - :mod:`airflow.providers.aws.hooks.dynamodb` + - :mod:`airflow.providers.aws.operators.dynamodb` - * - `Amazon EC2 `__ diff --git a/scripts/ci/pylint_todo.txt b/scripts/ci/pylint_todo.txt index cdbe04939ef47..3a5155408dc7c 100644 --- a/scripts/ci/pylint_todo.txt +++ b/scripts/ci/pylint_todo.txt @@ 
-49,7 +49,6 @@ ./airflow/contrib/operators/file_to_wasb.py ./airflow/contrib/operators/grpc_operator.py ./airflow/contrib/operators/hipchat_operator.py -./airflow/contrib/operators/hive_to_dynamodb.py ./airflow/contrib/operators/jenkins_job_trigger_operator.py ./airflow/contrib/operators/jira_operator.py ./airflow/contrib/operators/mongo_to_s3.py @@ -201,6 +200,7 @@ ./airflow/operators/subdag_operator.py ./airflow/plugins_manager.py ./airflow/providers/aws/operators/athena.py +./airflow/providers/aws/operators/dynamodb.py ./airflow/providers/aws/sensors/athena.py ./airflow/sensors/__init__.py ./airflow/sensors/base_sensor_operator.py diff --git a/tests/contrib/operators/test_dynamodb_to_s3.py b/tests/contrib/operators/test_dynamodb_to_s3.py deleted file mode 100644 index ea09ebaba1ed0..0000000000000 --- a/tests/contrib/operators/test_dynamodb_to_s3.py +++ /dev/null @@ -1,77 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import unittest -from multiprocessing import SimpleQueue -from unittest.mock import MagicMock, patch - -from boto.compat import json # type: ignore - -from airflow.contrib.operators.dynamodb_to_s3 import DynamoDBToS3Operator - - -class DynamodbToS3Test(unittest.TestCase): - - def setUp(self): - self.output_queue = SimpleQueue() - - def mock_upload_file(Filename, Bucket, Key): # pylint: disable=unused-argument,invalid-name - with open(Filename) as f: - lines = f.readlines() - for line in lines: - self.output_queue.put(json.loads(line)) - self.mock_upload_file_func = mock_upload_file - - def output_queue_to_list(self): - items = [] - while not self.output_queue.empty(): - items.append(self.output_queue.get()) - return items - - @patch('airflow.contrib.operators.dynamodb_to_s3.S3Hook') - @patch('airflow.contrib.operators.dynamodb_to_s3.AwsDynamoDBHook') - def test_dynamodb_to_s3_success(self, mock_aws_dynamodb_hook, mock_s3_hook): - responses = [ - { - 'Items': [{'a': 1}, {'b': 2}], - 'LastEvaluatedKey': '123', - }, - { - 'Items': [{'c': 3}], - }, - ] - table = MagicMock() - table.return_value.scan.side_effect = responses - mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table - - s3_client = MagicMock() - s3_client.return_value.upload_file = self.mock_upload_file_func - mock_s3_hook.return_value.get_conn = s3_client - - dynamodb_to_s3_operator = DynamoDBToS3Operator( - task_id='dynamodb_to_s3', - dynamodb_table_name='airflow_rocks', - s3_bucket_name='airflow-bucket', - file_size=4000, - ) - - dynamodb_to_s3_operator.execute(context={}) - - self.assertEqual([{'a': 1}, {'b': 2}, {'c': 3}], self.output_queue_to_list()) diff --git a/tests/contrib/hooks/test_aws_dynamodb_hook.py b/tests/providers/aws/hooks/test_dynamodb.py similarity index 97% rename from 
tests/contrib/hooks/test_aws_dynamodb_hook.py rename to tests/providers/aws/hooks/test_dynamodb.py index 835c042b2dc62..53c3269627e41 100644 --- a/tests/contrib/hooks/test_aws_dynamodb_hook.py +++ b/tests/providers/aws/hooks/test_dynamodb.py @@ -21,7 +21,7 @@ import unittest import uuid -from airflow.contrib.hooks.aws_dynamodb_hook import AwsDynamoDBHook +from airflow.providers.aws.hooks.dynamodb import AwsDynamoDBHook try: from moto import mock_dynamodb2 diff --git a/tests/contrib/operators/test_hive_to_dynamodb_operator.py b/tests/providers/aws/operators/test_dynamodb.py similarity index 65% rename from tests/contrib/operators/test_hive_to_dynamodb_operator.py rename to tests/providers/aws/operators/test_dynamodb.py index 5948e83946cda..38b579a967a2f 100644 --- a/tests/contrib/operators/test_hive_to_dynamodb_operator.py +++ b/tests/providers/aws/operators/test_dynamodb.py @@ -16,22 +16,20 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -# import datetime -import json import unittest +from multiprocessing import SimpleQueue from unittest import mock +from unittest.mock import MagicMock, patch import pandas as pd +from boto.compat import json # type: ignore -import airflow.contrib.operators.hive_to_dynamodb -from airflow import DAG -from airflow.contrib.hooks.aws_dynamodb_hook import AwsDynamoDBHook - -DEFAULT_DATE = datetime.datetime(2015, 1, 1) -DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat() -DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10] +import airflow.providers.aws.operators.dynamodb +from airflow.models import DAG +from airflow.providers.aws.hooks.dynamodb import AwsDynamoDBHook +from airflow.providers.aws.operators.dynamodb import DynamoDBToS3Operator try: from moto import mock_dynamodb2 @@ -39,10 +37,60 @@ mock_dynamodb2 = None +class DynamodbToS3Test(unittest.TestCase): + + def setUp(self): + self.output_queue = SimpleQueue() + + def mock_upload_file(Filename, Bucket, Key): # pylint: disable=unused-argument,invalid-name + with open(Filename) as f: + lines = f.readlines() + for line in lines: + self.output_queue.put(json.loads(line)) + self.mock_upload_file_func = mock_upload_file + + def output_queue_to_list(self): + items = [] + while not self.output_queue.empty(): + items.append(self.output_queue.get()) + return items + + @patch('airflow.providers.aws.operators.dynamodb.S3Hook') + @patch('airflow.providers.aws.operators.dynamodb.AwsDynamoDBHook') + def test_dynamodb_to_s3_success(self, mock_aws_dynamodb_hook, mock_s3_hook): + responses = [ + { + 'Items': [{'a': 1}, {'b': 2}], + 'LastEvaluatedKey': '123', + }, + { + 'Items': [{'c': 3}], + }, + ] + table = MagicMock() + table.return_value.scan.side_effect = responses + mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table + + s3_client = MagicMock() + s3_client.return_value.upload_file = self.mock_upload_file_func + mock_s3_hook.return_value.get_conn = s3_client + + dynamodb_to_s3_operator = DynamoDBToS3Operator( + task_id='dynamodb_to_s3', + dynamodb_table_name='airflow_rocks', + s3_bucket_name='airflow-bucket', + file_size=4000, + ) + + dynamodb_to_s3_operator.execute(context={}) + + self.assertEqual([{'a': 1}, {'b': 2}, {'c': 3}], self.output_queue_to_list()) + + class TestHiveToDynamoDBTransferOperator(unittest.TestCase): def setUp(self): - args = {'owner': 'airflow', 'start_date': DEFAULT_DATE} + args = {'owner': 'airflow', 'start_date': datetime.datetime(2015, 1, 1)} dag = DAG('test_dag_id', default_args=args) self.dag = dag 
self.sql = 'SELECT 1' @@ -85,7 +133,7 @@ def test_get_records_with_schema(self, mock_get_pandas_df): } ) - operator = airflow.contrib.operators.hive_to_dynamodb.HiveToDynamoDBTransferOperator( + operator = airflow.providers.aws.operators.dynamodb.HiveToDynamoDBTransferOperator( sql=self.sql, table_name="test_airflow", task_id='hive_to_dynamodb_check', @@ -125,7 +173,7 @@ def test_pre_process_records_with_schema(self, mock_get_pandas_df): } ) - operator = airflow.contrib.operators.hive_to_dynamodb.HiveToDynamoDBTransferOperator( + operator = airflow.providers.aws.operators.dynamodb.HiveToDynamoDBTransferOperator( sql=self.sql, table_name='test_airflow', task_id='hive_to_dynamodb_check', @@ -138,7 +186,3 @@ def test_pre_process_records_with_schema(self, mock_get_pandas_df): table = self.hook.get_conn().Table('test_airflow') table.meta.client.get_waiter('table_exists').wait(TableName='test_airflow') self.assertEqual(table.item_count, 1) - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/test_core_to_contrib.py b/tests/test_core_to_contrib.py index 1ddf0dc48569b..79965a0d1f6ec 100644 --- a/tests/test_core_to_contrib.py +++ b/tests/test_core_to_contrib.py @@ -132,6 +132,10 @@ "airflow.providers.aws.hooks.athena.AWSAthenaHook", "airflow.contrib.hooks.aws_athena_hook.AWSAthenaHook", ), + ( + "airflow.providers.aws.hooks.dynamodb.AwsDynamoDBHook", + "airflow.contrib.hooks.aws_dynamodb_hook.AwsDynamoDBHook", + ), ] OPERATOR = [ ( @@ -741,6 +745,14 @@ "airflow.providers.aws.operators.athena.AWSAthenaOperator", "airflow.contrib.operators.aws_athena_operator.AWSAthenaOperator", ), + ( + "airflow.providers.aws.operators.dynamodb.HiveToDynamoDBTransferOperator", + "airflow.contrib.operators.hive_to_dynamodb.HiveToDynamoDBTransferOperator", + ), + ( + "airflow.providers.aws.operators.dynamodb.DynamoDBToS3Operator", + "airflow.contrib.operators.dynamodb_to_s3.DynamoDBToS3Operator", + ), ] SENSOR = [ ( From 0faf55c0bc58f38ec4096d8637a8bbddb54c6703 Mon Sep 17 00:00:00 2001 From: Bas Harenslak Date: Mon, 28 Oct 2019 22:23:35 +0100 Subject: [PATCH 2/3] isort on airflow/providers/aws/operators/dynamodb.py --- airflow/providers/aws/operators/dynamodb.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/aws/operators/dynamodb.py b/airflow/providers/aws/operators/dynamodb.py index 7a682cc3df16c..2ff77e85adc98 100644 --- a/airflow/providers/aws/operators/dynamodb.py +++ b/airflow/providers/aws/operators/dynamodb.py @@ -21,11 +21,11 @@ from copy import copy from os.path import getsize from tempfile import NamedTemporaryFile -from typing import Optional, Dict, Any, Callable +from typing import Any, Callable, Dict, Optional from uuid import uuid4 -from airflow.hooks.S3_hook import S3Hook from airflow.hooks.hive_hooks import HiveServer2Hook +from airflow.hooks.S3_hook import S3Hook from airflow.models import BaseOperator from airflow.providers.aws.hooks.dynamodb import AwsDynamoDBHook from airflow.utils.decorators import apply_defaults From c3893ee067cfddfbb43ce52555233705b804b4a9 Mon Sep 17 00:00:00 2001 From: Bas Harenslak Date: Mon, 28 Oct 2019 22:36:04 +0100 Subject: [PATCH 3/3] Add empty files with deprecation warnings on old paths --- airflow/contrib/hooks/aws_dynamodb_hook.py | 31 +++++++++++++++++++ airflow/contrib/operators/hive_to_dynamodb.py | 31 +++++++++++++++++++ 2 files changed, 62 insertions(+) create mode 100644 airflow/contrib/hooks/aws_dynamodb_hook.py create mode 100644 airflow/contrib/operators/hive_to_dynamodb.py diff --git 
a/airflow/contrib/hooks/aws_dynamodb_hook.py b/airflow/contrib/hooks/aws_dynamodb_hook.py new file mode 100644 index 0000000000000..41b7fa952d6bd --- /dev/null +++ b/airflow/contrib/hooks/aws_dynamodb_hook.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""This module is deprecated. Please use `airflow.providers.aws.hooks.dynamodb`.""" + +import warnings + +# pylint: disable=unused-import +from airflow.providers.aws.hooks.dynamodb import AwsDynamoDBHook # noqa + +warnings.warn( + "This module is deprecated. Please use `airflow.providers.aws.hooks.dynamodb`.", + DeprecationWarning, + stacklevel=2, +) diff --git a/airflow/contrib/operators/hive_to_dynamodb.py b/airflow/contrib/operators/hive_to_dynamodb.py new file mode 100644 index 0000000000000..d1478348163e5 --- /dev/null +++ b/airflow/contrib/operators/hive_to_dynamodb.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""This module is deprecated. Please use `airflow.providers.aws.operators.dynamodb`.""" + +import warnings + +# pylint: disable=unused-import +from airflow.providers.aws.operators.dynamodb import HiveToDynamoDBTransferOperator # noqa + +warnings.warn( + "This module is deprecated. Please use `airflow.providers.aws.operators.dynamodb`.", + DeprecationWarning, + stacklevel=2, +)
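
As described in the UPDATING.md entry at the top of this series, the old contrib import paths keep working after these patches but emit a DeprecationWarning through the shim modules added in PATCH 3/3. The following is a minimal sketch of what that looks like from user code, assuming a fresh interpreter with this Airflow tree installed and configured; the warnings handling below is illustrative and not part of the patch itself.

    import warnings

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        # Old contrib path: resolved through the deprecation shim added in PATCH 3/3.
        from airflow.contrib.hooks.aws_dynamodb_hook import AwsDynamoDBHook as OldHook

    # The shim module emits a DeprecationWarning when it is first imported.
    assert any(issubclass(w.category, DeprecationWarning) for w in caught)

    # New canonical path under airflow.providers.
    from airflow.providers.aws.hooks.dynamodb import AwsDynamoDBHook

    # Both names resolve to the same class, so existing DAGs keep working unchanged.
    assert OldHook is AwsDynamoDBHook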