From ef510d26b62e67941dd98706f2de4cf268ddec07 Mon Sep 17 00:00:00 2001 From: Dennis Ferruzzi Date: Tue, 14 Dec 2021 12:08:19 -0800 Subject: [PATCH 1/7] Rename Athena Operator --- .../amazon/aws/example_dags/example_athena.py | 8 ++++---- .../providers/amazon/aws/operators/athena.py | 16 +++++++++++++-- .../prepare_provider_packages.py | 5 +++++ .../operators/athena.rst | 4 ++-- tests/deprecated_classes.py | 4 ++++ .../amazon/aws/operators/test_athena.py | 20 +++++++++---------- 6 files changed, 39 insertions(+), 18 deletions(-) diff --git a/airflow/providers/amazon/aws/example_dags/example_athena.py b/airflow/providers/amazon/aws/example_dags/example_athena.py index 3ae6e91e44534..80d30c2edd1bd 100644 --- a/airflow/providers/amazon/aws/example_dags/example_athena.py +++ b/airflow/providers/amazon/aws/example_dags/example_athena.py @@ -21,7 +21,7 @@ from airflow import DAG from airflow.decorators import task from airflow.providers.amazon.aws.hooks.s3 import S3Hook -from airflow.providers.amazon.aws.operators.athena import AWSAthenaOperator +from airflow.providers.amazon.aws.operators.athena import AthenaOperator from airflow.providers.amazon.aws.sensors.athena import AthenaSensor # [START howto_operator_athena_env_variables] @@ -91,7 +91,7 @@ def read_results_from_s3(query_execution_id): # Using a task-decorated function to create a CSV file in S3 add_sample_data_to_s3 = add_sample_data_to_s3() - create_table = AWSAthenaOperator( + create_table = AthenaOperator( task_id='setup__create_table', query=QUERY_CREATE_TABLE, database=ATHENA_DATABASE, @@ -100,7 +100,7 @@ def read_results_from_s3(query_execution_id): max_tries=None, ) - read_table = AWSAthenaOperator( + read_table = AthenaOperator( task_id='query__read_table', query=QUERY_READ_TABLE, database=ATHENA_DATABASE, @@ -119,7 +119,7 @@ def read_results_from_s3(query_execution_id): # Using a task-decorated function to read the results from S3 read_results_from_s3 = read_results_from_s3(read_table.output) - drop_table = AWSAthenaOperator( + drop_table = AthenaOperator( task_id='teardown__drop_table', query=QUERY_DROP_TABLE, database=ATHENA_DATABASE, diff --git a/airflow/providers/amazon/aws/operators/athena.py b/airflow/providers/amazon/aws/operators/athena.py index 19b96398889c8..a95587da99329 100644 --- a/airflow/providers/amazon/aws/operators/athena.py +++ b/airflow/providers/amazon/aws/operators/athena.py @@ -16,6 +16,7 @@ # specific language governing permissions and limitations # under the License. # +import warnings import sys from typing import Any, Dict, Optional from uuid import uuid4 @@ -29,13 +30,24 @@ from airflow.providers.amazon.aws.hooks.athena import AWSAthenaHook -class AWSAthenaOperator(BaseOperator): +class AWSAthenaOperator: + """Deprecated Operator""" + + warnings.warn( + "This Operator is deprecated. Please use " + "`airflow.providers.amazon.aws.operators.athena.AthenaOperator`.", + DeprecationWarning, + stacklevel=2, + ) + + +class AthenaOperator(BaseOperator): """ An operator that submits a presto query to athena. .. seealso:: For more information on how to use this operator, take a look at the guide: - :ref:`howto/operator:AWSAthenaOperator` + :ref:`howto/operator:AthenaOperator` :param query: Presto to be run on athena. (templated) :type query: str diff --git a/dev/provider_packages/prepare_provider_packages.py b/dev/provider_packages/prepare_provider_packages.py index 6c57392a8399c..b0321d22c4ea6 100755 --- a/dev/provider_packages/prepare_provider_packages.py +++ b/dev/provider_packages/prepare_provider_packages.py @@ -2115,6 +2115,11 @@ def summarise_total_vs_bad_and_warnings(total: int, bad: int, warns: List[warnin ), ("SelectableGroups dict interface is deprecated. Use select.", "kombu"), ("The module cloudant is now deprecated. The replacement is ibmcloudant.", "cloudant"), + ( + "This Operator is deprecated. Please use " + "`airflow.providers.amazon.aws.operators.athena.AthenaOperator`.", + "athena", + ), } # The set of warning messages generated by direct importing of some deprecated modules. We should only diff --git a/docs/apache-airflow-providers-amazon/operators/athena.rst b/docs/apache-airflow-providers-amazon/operators/athena.rst index 8f150a9065aaf..47e62d160e380 100644 --- a/docs/apache-airflow-providers-amazon/operators/athena.rst +++ b/docs/apache-airflow-providers-amazon/operators/athena.rst @@ -16,7 +16,7 @@ under the License. -.. _howto/operator:AWSAthenaOperator: +.. _howto/operator:AthenaOperator: Amazon Athena Operator ====================== @@ -33,7 +33,7 @@ Prerequisite Tasks Using Operator -------------- Use the -:class:`~airflow.providers.amazon.aws.operators.athena.AWSAthenaOperator` +:class:`~airflow.providers.amazon.aws.operators.athena.AthenaOperator` to run a query in Amazon Athena. To get started with Amazon Athena please visit `aws.amazon.com/athena `_ diff --git a/tests/deprecated_classes.py b/tests/deprecated_classes.py index e258c4d5bbcb7..62b8288851d42 100644 --- a/tests/deprecated_classes.py +++ b/tests/deprecated_classes.py @@ -971,6 +971,10 @@ "airflow.providers.amazon.aws.operators.athena.AWSAthenaOperator", "airflow.contrib.operators.aws_athena_operator.AWSAthenaOperator", ), + ( + "airflow.providers.amazon.aws.operators.athena.AthenaOperator", + "airflow.providers.amazon.aws.operators.athena.AWSAthenaOperator", + ), ( "airflow.providers.amazon.aws.operators.batch.AwsBatchOperator", "airflow.contrib.operators.awsbatch_operator.AWSBatchOperator", diff --git a/tests/providers/amazon/aws/operators/test_athena.py b/tests/providers/amazon/aws/operators/test_athena.py index 97ec4d205065d..a76cd702813d0 100644 --- a/tests/providers/amazon/aws/operators/test_athena.py +++ b/tests/providers/amazon/aws/operators/test_athena.py @@ -22,7 +22,7 @@ from airflow.models import DAG, DagRun, TaskInstance from airflow.providers.amazon.aws.hooks.athena import AWSAthenaHook -from airflow.providers.amazon.aws.operators.athena import AWSAthenaOperator +from airflow.providers.amazon.aws.operators.athena import AthenaOperator from airflow.utils import timezone from airflow.utils.timezone import datetime @@ -43,7 +43,7 @@ result_configuration = {'OutputLocation': MOCK_DATA['outputLocation']} -class TestAWSAthenaOperator(unittest.TestCase): +class TestAthenaOperator(unittest.TestCase): def setUp(self): args = { 'owner': 'airflow', @@ -51,8 +51,8 @@ def setUp(self): } self.dag = DAG(TEST_DAG_ID + 'test_schedule_dag_once', default_args=args, schedule_interval='@once') - self.athena = AWSAthenaOperator( - task_id='test_aws_athena_operator', + self.athena = AthenaOperator( + task_id='test_athena_operator', query='SELECT * FROM TEST_TABLE', database='TEST_DATABASE', output_location='s3://test_s3_bucket/', @@ -76,7 +76,7 @@ def test_init(self): @mock.patch.object(AWSAthenaHook, 'run_query', return_value=ATHENA_QUERY_ID) @mock.patch.object(AWSAthenaHook, 'get_conn') def test_hook_run_small_success_query(self, mock_conn, mock_run_query, mock_check_query_status): - self.athena.execute(None) + self.athena.execute({}) mock_run_query.assert_called_once_with( MOCK_DATA['query'], query_context, @@ -98,7 +98,7 @@ def test_hook_run_small_success_query(self, mock_conn, mock_run_query, mock_chec @mock.patch.object(AWSAthenaHook, 'run_query', return_value=ATHENA_QUERY_ID) @mock.patch.object(AWSAthenaHook, 'get_conn') def test_hook_run_big_success_query(self, mock_conn, mock_run_query, mock_check_query_status): - self.athena.execute(None) + self.athena.execute({}) mock_run_query.assert_called_once_with( MOCK_DATA['query'], query_context, @@ -120,7 +120,7 @@ def test_hook_run_big_success_query(self, mock_conn, mock_run_query, mock_check_ @mock.patch.object(AWSAthenaHook, 'get_conn') def test_hook_run_failed_query_with_none(self, mock_conn, mock_run_query, mock_check_query_status): with pytest.raises(Exception): - self.athena.execute(None) + self.athena.execute({}) mock_run_query.assert_called_once_with( MOCK_DATA['query'], query_context, @@ -145,7 +145,7 @@ def test_hook_run_failure_query( self, mock_conn, mock_run_query, mock_check_query_status, mock_get_state_change_reason ): with pytest.raises(Exception): - self.athena.execute(None) + self.athena.execute({}) mock_run_query.assert_called_once_with( MOCK_DATA['query'], query_context, @@ -169,7 +169,7 @@ def test_hook_run_failure_query( @mock.patch.object(AWSAthenaHook, 'get_conn') def test_hook_run_cancelled_query(self, mock_conn, mock_run_query, mock_check_query_status): with pytest.raises(Exception): - self.athena.execute(None) + self.athena.execute({}) mock_run_query.assert_called_once_with( MOCK_DATA['query'], query_context, @@ -192,7 +192,7 @@ def test_hook_run_cancelled_query(self, mock_conn, mock_run_query, mock_check_qu @mock.patch.object(AWSAthenaHook, 'get_conn') def test_hook_run_failed_query_with_max_tries(self, mock_conn, mock_run_query, mock_check_query_status): with pytest.raises(Exception): - self.athena.execute(None) + self.athena.execute({}) mock_run_query.assert_called_once_with( MOCK_DATA['query'], query_context, From 357fe144c2843bb43ab2744c9aa8fc0311606a7f Mon Sep 17 00:00:00 2001 From: Dennis Ferruzzi Date: Tue, 14 Dec 2021 13:41:52 -0800 Subject: [PATCH 2/7] Rename Athena Hook --- airflow/providers/amazon/aws/hooks/athena.py | 13 +++++- .../providers/amazon/aws/operators/athena.py | 14 +++--- .../providers/amazon/aws/sensors/athena.py | 8 ++-- .../prepare_provider_packages.py | 6 ++- setup.cfg | 1 + tests/deprecated_classes.py | 4 ++ .../providers/amazon/aws/hooks/test_athena.py | 28 +++++------ .../amazon/aws/operators/test_athena.py | 46 +++++++++---------- 8 files changed, 70 insertions(+), 50 deletions(-) diff --git a/airflow/providers/amazon/aws/hooks/athena.py b/airflow/providers/amazon/aws/hooks/athena.py index 9bb58fd7345c5..b086fd85f0cf7 100644 --- a/airflow/providers/amazon/aws/hooks/athena.py +++ b/airflow/providers/amazon/aws/hooks/athena.py @@ -17,6 +17,7 @@ # under the License. """This module contains AWS Athena hook""" +import warnings from time import sleep from typing import Any, Dict, Optional @@ -25,7 +26,17 @@ from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook -class AWSAthenaHook(AwsBaseHook): +class AWSAthenaHook: + """Deprecated Hook""" + + warnings.warn( + "This hook is deprecated. Please use `airflow.providers.amazon.aws.hooks.athena.AthenaHook`.", + DeprecationWarning, + stacklevel=2, + ) + + +class AthenaHook(AwsBaseHook): """ Interact with AWS Athena to run, poll queries and return query results diff --git a/airflow/providers/amazon/aws/operators/athena.py b/airflow/providers/amazon/aws/operators/athena.py index a95587da99329..ebfca5d07918d 100644 --- a/airflow/providers/amazon/aws/operators/athena.py +++ b/airflow/providers/amazon/aws/operators/athena.py @@ -27,14 +27,14 @@ from cached_property import cached_property from airflow.models import BaseOperator -from airflow.providers.amazon.aws.hooks.athena import AWSAthenaHook +from airflow.providers.amazon.aws.hooks.athena import AthenaHook class AWSAthenaOperator: """Deprecated Operator""" warnings.warn( - "This Operator is deprecated. Please use " + "This operator is deprecated. Please use " "`airflow.providers.amazon.aws.operators.athena.AthenaOperator`.", DeprecationWarning, stacklevel=2, @@ -105,9 +105,9 @@ def __init__( self.query_execution_id = None # type: Optional[str] @cached_property - def hook(self) -> AWSAthenaHook: - """Create and return an AWSAthenaHook.""" - return AWSAthenaHook(self.aws_conn_id, sleep_time=self.sleep_time) + def hook(self) -> AthenaHook: + """Create and return an AthenaHook.""" + return AthenaHook(self.aws_conn_id, sleep_time=self.sleep_time) def execute(self, context: dict) -> Optional[str]: """Run Presto Query on Athena""" @@ -122,13 +122,13 @@ def execute(self, context: dict) -> Optional[str]: ) query_status = self.hook.poll_query_status(self.query_execution_id, self.max_tries) - if query_status in AWSAthenaHook.FAILURE_STATES: + if query_status in AthenaHook.FAILURE_STATES: error_message = self.hook.get_state_change_reason(self.query_execution_id) raise Exception( f'Final state of Athena job is {query_status}, query_execution_id is ' f'{self.query_execution_id}. Error: {error_message}' ) - elif not query_status or query_status in AWSAthenaHook.INTERMEDIATE_STATES: + elif not query_status or query_status in AthenaHook.INTERMEDIATE_STATES: raise Exception( f'Final state of Athena job is {query_status}. Max tries of poll status exceeded, ' f'query_execution_id is {self.query_execution_id}.' diff --git a/airflow/providers/amazon/aws/sensors/athena.py b/airflow/providers/amazon/aws/sensors/athena.py index 232bfdb5859f0..449688a996285 100644 --- a/airflow/providers/amazon/aws/sensors/athena.py +++ b/airflow/providers/amazon/aws/sensors/athena.py @@ -24,7 +24,7 @@ from cached_property import cached_property from airflow.exceptions import AirflowException -from airflow.providers.amazon.aws.hooks.athena import AWSAthenaHook +from airflow.providers.amazon.aws.hooks.athena import AthenaHook from airflow.sensors.base import BaseSensorOperator @@ -85,6 +85,6 @@ def poke(self, context: dict) -> bool: return True @cached_property - def hook(self) -> AWSAthenaHook: - """Create and return an AWSAthenaHook""" - return AWSAthenaHook(self.aws_conn_id, sleep_time=self.sleep_time) + def hook(self) -> AthenaHook: + """Create and return an AthenaHook""" + return AthenaHook(self.aws_conn_id, sleep_time=self.sleep_time) diff --git a/dev/provider_packages/prepare_provider_packages.py b/dev/provider_packages/prepare_provider_packages.py index b0321d22c4ea6..8f1ff56776dc3 100755 --- a/dev/provider_packages/prepare_provider_packages.py +++ b/dev/provider_packages/prepare_provider_packages.py @@ -2116,10 +2116,14 @@ def summarise_total_vs_bad_and_warnings(total: int, bad: int, warns: List[warnin ("SelectableGroups dict interface is deprecated. Use select.", "kombu"), ("The module cloudant is now deprecated. The replacement is ibmcloudant.", "cloudant"), ( - "This Operator is deprecated. Please use " + "This operator is deprecated. Please use " "`airflow.providers.amazon.aws.operators.athena.AthenaOperator`.", "athena", ), + ( + "This hook is deprecated. Please use `airflow.providers.amazon.aws.hooks.athena.AthenaHook`.", + "athena", + ), } # The set of warning messages generated by direct importing of some deprecated modules. We should only diff --git a/setup.cfg b/setup.cfg index adba25a9d95f8..4cb2e497b59c8 100644 --- a/setup.cfg +++ b/setup.cfg @@ -45,6 +45,7 @@ license_files = licenses/LICENSE-moment-strftime.txt licenses/LICENSE-moment.txt licenses/LICENSE-normalize.txt + licenses/LICENSES-ui.txt # End of licences generated automatically licenses/LICENSES-ui.txt classifiers = diff --git a/tests/deprecated_classes.py b/tests/deprecated_classes.py index 62b8288851d42..4f7a87ce108bb 100644 --- a/tests/deprecated_classes.py +++ b/tests/deprecated_classes.py @@ -143,6 +143,10 @@ "airflow.providers.amazon.aws.hooks.s3.S3Hook", "airflow.hooks.S3_hook.S3Hook", ), + ( + "airflow.providers.amazon.aws.hooks.athena.AthenaHook", + "airflow.providers.amazon.aws.hooks.athena.AWSAthenaHook", + ), ( "airflow.providers.amazon.aws.hooks.sqs.SQSHook", "airflow.contrib.hooks.aws_sqs_hook.SQSHook", diff --git a/tests/providers/amazon/aws/hooks/test_athena.py b/tests/providers/amazon/aws/hooks/test_athena.py index a3cf521383f2c..5b73b0f244936 100644 --- a/tests/providers/amazon/aws/hooks/test_athena.py +++ b/tests/providers/amazon/aws/hooks/test_athena.py @@ -18,7 +18,7 @@ import unittest from unittest import mock -from airflow.providers.amazon.aws.hooks.athena import AWSAthenaHook +from airflow.providers.amazon.aws.hooks.athena import AthenaHook MOCK_DATA = { 'query': 'SELECT * FROM TEST_TABLE', @@ -47,15 +47,15 @@ } -class TestAWSAthenaHook(unittest.TestCase): +class TestAthenaHook(unittest.TestCase): def setUp(self): - self.athena = AWSAthenaHook(sleep_time=0) + self.athena = AthenaHook(sleep_time=0) def test_init(self): assert self.athena.aws_conn_id == 'aws_default' assert self.athena.sleep_time == 0 - @mock.patch.object(AWSAthenaHook, 'get_conn') + @mock.patch.object(AthenaHook, 'get_conn') def test_hook_run_query_without_token(self, mock_conn): mock_conn.return_value.start_query_execution.return_value = MOCK_QUERY_EXECUTION result = self.athena.run_query( @@ -72,7 +72,7 @@ def test_hook_run_query_without_token(self, mock_conn): mock_conn.return_value.start_query_execution.assert_called_with(**expected_call_params) assert result == MOCK_DATA['query_execution_id'] - @mock.patch.object(AWSAthenaHook, 'get_conn') + @mock.patch.object(AthenaHook, 'get_conn') def test_hook_run_query_with_token(self, mock_conn): mock_conn.return_value.start_query_execution.return_value = MOCK_QUERY_EXECUTION result = self.athena.run_query( @@ -91,20 +91,20 @@ def test_hook_run_query_with_token(self, mock_conn): mock_conn.return_value.start_query_execution.assert_called_with(**expected_call_params) assert result == MOCK_DATA['query_execution_id'] - @mock.patch.object(AWSAthenaHook, 'get_conn') + @mock.patch.object(AthenaHook, 'get_conn') def test_hook_get_query_results_with_non_succeeded_query(self, mock_conn): mock_conn.return_value.get_query_execution.return_value = MOCK_RUNNING_QUERY_EXECUTION result = self.athena.get_query_results(query_execution_id=MOCK_DATA['query_execution_id']) assert result is None - @mock.patch.object(AWSAthenaHook, 'get_conn') + @mock.patch.object(AthenaHook, 'get_conn') def test_hook_get_query_results_with_default_params(self, mock_conn): mock_conn.return_value.get_query_execution.return_value = MOCK_SUCCEEDED_QUERY_EXECUTION self.athena.get_query_results(query_execution_id=MOCK_DATA['query_execution_id']) expected_call_params = {'QueryExecutionId': MOCK_DATA['query_execution_id'], 'MaxResults': 1000} mock_conn.return_value.get_query_results.assert_called_with(**expected_call_params) - @mock.patch.object(AWSAthenaHook, 'get_conn') + @mock.patch.object(AthenaHook, 'get_conn') def test_hook_get_query_results_with_next_token(self, mock_conn): mock_conn.return_value.get_query_execution.return_value = MOCK_SUCCEEDED_QUERY_EXECUTION self.athena.get_query_results( @@ -117,13 +117,13 @@ def test_hook_get_query_results_with_next_token(self, mock_conn): } mock_conn.return_value.get_query_results.assert_called_with(**expected_call_params) - @mock.patch.object(AWSAthenaHook, 'get_conn') + @mock.patch.object(AthenaHook, 'get_conn') def test_hook_get_paginator_with_non_succeeded_query(self, mock_conn): mock_conn.return_value.get_query_execution.return_value = MOCK_RUNNING_QUERY_EXECUTION result = self.athena.get_query_results_paginator(query_execution_id=MOCK_DATA['query_execution_id']) assert result is None - @mock.patch.object(AWSAthenaHook, 'get_conn') + @mock.patch.object(AthenaHook, 'get_conn') def test_hook_get_paginator_with_default_params(self, mock_conn): mock_conn.return_value.get_query_execution.return_value = MOCK_SUCCEEDED_QUERY_EXECUTION self.athena.get_query_results_paginator(query_execution_id=MOCK_DATA['query_execution_id']) @@ -133,7 +133,7 @@ def test_hook_get_paginator_with_default_params(self, mock_conn): } mock_conn.return_value.get_paginator.return_value.paginate.assert_called_with(**expected_call_params) - @mock.patch.object(AWSAthenaHook, 'get_conn') + @mock.patch.object(AthenaHook, 'get_conn') def test_hook_get_paginator_with_pagination_config(self, mock_conn): mock_conn.return_value.get_query_execution.return_value = MOCK_SUCCEEDED_QUERY_EXECUTION self.athena.get_query_results_paginator( @@ -152,14 +152,14 @@ def test_hook_get_paginator_with_pagination_config(self, mock_conn): } mock_conn.return_value.get_paginator.return_value.paginate.assert_called_with(**expected_call_params) - @mock.patch.object(AWSAthenaHook, 'get_conn') + @mock.patch.object(AthenaHook, 'get_conn') def test_hook_poll_query_when_final(self, mock_conn): mock_conn.return_value.get_query_execution.return_value = MOCK_SUCCEEDED_QUERY_EXECUTION result = self.athena.poll_query_status(query_execution_id=MOCK_DATA['query_execution_id']) mock_conn.return_value.get_query_execution.assert_called_once() assert result == 'SUCCEEDED' - @mock.patch.object(AWSAthenaHook, 'get_conn') + @mock.patch.object(AthenaHook, 'get_conn') def test_hook_poll_query_with_timeout(self, mock_conn): mock_conn.return_value.get_query_execution.return_value = MOCK_RUNNING_QUERY_EXECUTION result = self.athena.poll_query_status( @@ -168,7 +168,7 @@ def test_hook_poll_query_with_timeout(self, mock_conn): mock_conn.return_value.get_query_execution.assert_called_once() assert result == 'RUNNING' - @mock.patch.object(AWSAthenaHook, 'get_conn') + @mock.patch.object(AthenaHook, 'get_conn') def test_hook_get_output_location(self, mock_conn): mock_conn.return_value.get_query_execution.return_value = MOCK_QUERY_EXECUTION_OUTPUT result = self.athena.get_output_location(query_execution_id=MOCK_DATA['query_execution_id']) diff --git a/tests/providers/amazon/aws/operators/test_athena.py b/tests/providers/amazon/aws/operators/test_athena.py index a76cd702813d0..d07dfc161910a 100644 --- a/tests/providers/amazon/aws/operators/test_athena.py +++ b/tests/providers/amazon/aws/operators/test_athena.py @@ -21,7 +21,7 @@ import pytest from airflow.models import DAG, DagRun, TaskInstance -from airflow.providers.amazon.aws.hooks.athena import AWSAthenaHook +from airflow.providers.amazon.aws.hooks.athena import AthenaHook from airflow.providers.amazon.aws.operators.athena import AthenaOperator from airflow.utils import timezone from airflow.utils.timezone import datetime @@ -72,9 +72,9 @@ def test_init(self): assert self.athena.hook.sleep_time == 0 - @mock.patch.object(AWSAthenaHook, 'check_query_status', side_effect=("SUCCESS",)) - @mock.patch.object(AWSAthenaHook, 'run_query', return_value=ATHENA_QUERY_ID) - @mock.patch.object(AWSAthenaHook, 'get_conn') + @mock.patch.object(AthenaHook, 'check_query_status', side_effect=("SUCCESS",)) + @mock.patch.object(AthenaHook, 'run_query', return_value=ATHENA_QUERY_ID) + @mock.patch.object(AthenaHook, 'get_conn') def test_hook_run_small_success_query(self, mock_conn, mock_run_query, mock_check_query_status): self.athena.execute({}) mock_run_query.assert_called_once_with( @@ -87,7 +87,7 @@ def test_hook_run_small_success_query(self, mock_conn, mock_run_query, mock_chec assert mock_check_query_status.call_count == 1 @mock.patch.object( - AWSAthenaHook, + AthenaHook, 'check_query_status', side_effect=( "RUNNING", @@ -95,8 +95,8 @@ def test_hook_run_small_success_query(self, mock_conn, mock_run_query, mock_chec "SUCCESS", ), ) - @mock.patch.object(AWSAthenaHook, 'run_query', return_value=ATHENA_QUERY_ID) - @mock.patch.object(AWSAthenaHook, 'get_conn') + @mock.patch.object(AthenaHook, 'run_query', return_value=ATHENA_QUERY_ID) + @mock.patch.object(AthenaHook, 'get_conn') def test_hook_run_big_success_query(self, mock_conn, mock_run_query, mock_check_query_status): self.athena.execute({}) mock_run_query.assert_called_once_with( @@ -109,15 +109,15 @@ def test_hook_run_big_success_query(self, mock_conn, mock_run_query, mock_check_ assert mock_check_query_status.call_count == 3 @mock.patch.object( - AWSAthenaHook, + AthenaHook, 'check_query_status', side_effect=( None, None, ), ) - @mock.patch.object(AWSAthenaHook, 'run_query', return_value=ATHENA_QUERY_ID) - @mock.patch.object(AWSAthenaHook, 'get_conn') + @mock.patch.object(AthenaHook, 'run_query', return_value=ATHENA_QUERY_ID) + @mock.patch.object(AthenaHook, 'get_conn') def test_hook_run_failed_query_with_none(self, mock_conn, mock_run_query, mock_check_query_status): with pytest.raises(Exception): self.athena.execute({}) @@ -130,17 +130,17 @@ def test_hook_run_failed_query_with_none(self, mock_conn, mock_run_query, mock_c ) assert mock_check_query_status.call_count == 3 - @mock.patch.object(AWSAthenaHook, 'get_state_change_reason') + @mock.patch.object(AthenaHook, 'get_state_change_reason') @mock.patch.object( - AWSAthenaHook, + AthenaHook, 'check_query_status', side_effect=( "RUNNING", "FAILED", ), ) - @mock.patch.object(AWSAthenaHook, 'run_query', return_value=ATHENA_QUERY_ID) - @mock.patch.object(AWSAthenaHook, 'get_conn') + @mock.patch.object(AthenaHook, 'run_query', return_value=ATHENA_QUERY_ID) + @mock.patch.object(AthenaHook, 'get_conn') def test_hook_run_failure_query( self, mock_conn, mock_run_query, mock_check_query_status, mock_get_state_change_reason ): @@ -157,7 +157,7 @@ def test_hook_run_failure_query( assert mock_get_state_change_reason.call_count == 1 @mock.patch.object( - AWSAthenaHook, + AthenaHook, 'check_query_status', side_effect=( "RUNNING", @@ -165,8 +165,8 @@ def test_hook_run_failure_query( "CANCELLED", ), ) - @mock.patch.object(AWSAthenaHook, 'run_query', return_value=ATHENA_QUERY_ID) - @mock.patch.object(AWSAthenaHook, 'get_conn') + @mock.patch.object(AthenaHook, 'run_query', return_value=ATHENA_QUERY_ID) + @mock.patch.object(AthenaHook, 'get_conn') def test_hook_run_cancelled_query(self, mock_conn, mock_run_query, mock_check_query_status): with pytest.raises(Exception): self.athena.execute({}) @@ -180,7 +180,7 @@ def test_hook_run_cancelled_query(self, mock_conn, mock_run_query, mock_check_qu assert mock_check_query_status.call_count == 3 @mock.patch.object( - AWSAthenaHook, + AthenaHook, 'check_query_status', side_effect=( "RUNNING", @@ -188,8 +188,8 @@ def test_hook_run_cancelled_query(self, mock_conn, mock_run_query, mock_check_qu "RUNNING", ), ) - @mock.patch.object(AWSAthenaHook, 'run_query', return_value=ATHENA_QUERY_ID) - @mock.patch.object(AWSAthenaHook, 'get_conn') + @mock.patch.object(AthenaHook, 'run_query', return_value=ATHENA_QUERY_ID) + @mock.patch.object(AthenaHook, 'get_conn') def test_hook_run_failed_query_with_max_tries(self, mock_conn, mock_run_query, mock_check_query_status): with pytest.raises(Exception): self.athena.execute({}) @@ -202,9 +202,9 @@ def test_hook_run_failed_query_with_max_tries(self, mock_conn, mock_run_query, m ) assert mock_check_query_status.call_count == 3 - @mock.patch.object(AWSAthenaHook, 'check_query_status', side_effect=("SUCCESS",)) - @mock.patch.object(AWSAthenaHook, 'run_query', return_value=ATHENA_QUERY_ID) - @mock.patch.object(AWSAthenaHook, 'get_conn') + @mock.patch.object(AthenaHook, 'check_query_status', side_effect=("SUCCESS",)) + @mock.patch.object(AthenaHook, 'run_query', return_value=ATHENA_QUERY_ID) + @mock.patch.object(AthenaHook, 'get_conn') def test_return_value(self, mock_conn, mock_run_query, mock_check_query_status): """Test we return the right value -- that will get put in to XCom by the execution engine""" dag_run = DagRun(dag_id=self.dag.dag_id, execution_date=timezone.utcnow(), run_id="test") From ef2b41f44aa79c0b7affc716d73387d9c3c04340 Mon Sep 17 00:00:00 2001 From: Dennis Ferruzzi Date: Tue, 14 Dec 2021 21:38:54 -0800 Subject: [PATCH 3/7] Missed a couple of renames --- .../amazon/aws/operators/test_athena.py | 2 +- .../amazon/aws/sensors/test_athena.py | 22 +++++++++---------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/tests/providers/amazon/aws/operators/test_athena.py b/tests/providers/amazon/aws/operators/test_athena.py index d07dfc161910a..060bfb40c182a 100644 --- a/tests/providers/amazon/aws/operators/test_athena.py +++ b/tests/providers/amazon/aws/operators/test_athena.py @@ -31,7 +31,7 @@ ATHENA_QUERY_ID = 'eac29bf8-daa1-4ffc-b19a-0db31dc3b784' MOCK_DATA = { - 'task_id': 'test_aws_athena_operator', + 'task_id': 'test_athena_operator', 'query': 'SELECT * FROM TEST_TABLE', 'database': 'TEST_DATABASE', 'outputLocation': 's3://test_s3_bucket/', diff --git a/tests/providers/amazon/aws/sensors/test_athena.py b/tests/providers/amazon/aws/sensors/test_athena.py index 781f94cdb190a..1ef99753700d0 100644 --- a/tests/providers/amazon/aws/sensors/test_athena.py +++ b/tests/providers/amazon/aws/sensors/test_athena.py @@ -22,7 +22,7 @@ import pytest from airflow.exceptions import AirflowException -from airflow.providers.amazon.aws.hooks.athena import AWSAthenaHook +from airflow.providers.amazon.aws.hooks.athena import AthenaHook from airflow.providers.amazon.aws.sensors.athena import AthenaSensor @@ -36,26 +36,26 @@ def setUp(self): aws_conn_id='aws_default', ) - @mock.patch.object(AWSAthenaHook, 'poll_query_status', side_effect=("SUCCEEDED",)) + @mock.patch.object(AthenaHook, 'poll_query_status', side_effect=("SUCCEEDED",)) def test_poke_success(self, mock_poll_query_status): - assert self.sensor.poke(None) + assert self.sensor.poke({}) - @mock.patch.object(AWSAthenaHook, 'poll_query_status', side_effect=("RUNNING",)) + @mock.patch.object(AthenaHook, 'poll_query_status', side_effect=("RUNNING",)) def test_poke_running(self, mock_poll_query_status): - assert not self.sensor.poke(None) + assert not self.sensor.poke({}) - @mock.patch.object(AWSAthenaHook, 'poll_query_status', side_effect=("QUEUED",)) + @mock.patch.object(AthenaHook, 'poll_query_status', side_effect=("QUEUED",)) def test_poke_queued(self, mock_poll_query_status): - assert not self.sensor.poke(None) + assert not self.sensor.poke({}) - @mock.patch.object(AWSAthenaHook, 'poll_query_status', side_effect=("FAILED",)) + @mock.patch.object(AthenaHook, 'poll_query_status', side_effect=("FAILED",)) def test_poke_failed(self, mock_poll_query_status): with pytest.raises(AirflowException) as ctx: - self.sensor.poke(None) + self.sensor.poke({}) assert 'Athena sensor failed' in str(ctx.value) - @mock.patch.object(AWSAthenaHook, 'poll_query_status', side_effect=("CANCELLED",)) + @mock.patch.object(AthenaHook, 'poll_query_status', side_effect=("CANCELLED",)) def test_poke_cancelled(self, mock_poll_query_status): with pytest.raises(AirflowException) as ctx: - self.sensor.poke(None) + self.sensor.poke({}) assert 'Athena sensor failed' in str(ctx.value) From 7e3f7b9bdc2b03faa1de0431789de0773ae04af2 Mon Sep 17 00:00:00 2001 From: Dennis Ferruzzi Date: Tue, 14 Dec 2021 21:39:53 -0800 Subject: [PATCH 4/7] Fixed Deprecation --- airflow/providers/amazon/aws/hooks/athena.py | 23 +++++++++-------- .../providers/amazon/aws/operators/athena.py | 25 +++++++++++-------- .../prepare_provider_packages.py | 9 ------- tests/deprecated_classes.py | 8 ------ 4 files changed, 27 insertions(+), 38 deletions(-) diff --git a/airflow/providers/amazon/aws/hooks/athena.py b/airflow/providers/amazon/aws/hooks/athena.py index b086fd85f0cf7..b29c632899618 100644 --- a/airflow/providers/amazon/aws/hooks/athena.py +++ b/airflow/providers/amazon/aws/hooks/athena.py @@ -26,16 +26,6 @@ from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook -class AWSAthenaHook: - """Deprecated Hook""" - - warnings.warn( - "This hook is deprecated. Please use `airflow.providers.amazon.aws.hooks.athena.AthenaHook`.", - DeprecationWarning, - stacklevel=2, - ) - - class AthenaHook(AwsBaseHook): """ Interact with AWS Athena to run, poll queries and return query results @@ -271,3 +261,16 @@ def stop_query(self, query_execution_id: str) -> Dict: :return: dict """ return self.get_conn().stop_query_execution(QueryExecutionId=query_execution_id) + + +class AWSAthenaHook(AthenaHook): + """ + This hook is deprecated. + Please use :class:`airflow.providers.amazon.aws.hooks.athena.AthenaHook`. + """ + + warnings.warn( + "This hook is deprecated. Please use `airflow.providers.amazon.aws.hooks.athena.AthenaHook`.", + DeprecationWarning, + stacklevel=2, + ) diff --git a/airflow/providers/amazon/aws/operators/athena.py b/airflow/providers/amazon/aws/operators/athena.py index ebfca5d07918d..18207d2682db8 100644 --- a/airflow/providers/amazon/aws/operators/athena.py +++ b/airflow/providers/amazon/aws/operators/athena.py @@ -30,17 +30,6 @@ from airflow.providers.amazon.aws.hooks.athena import AthenaHook -class AWSAthenaOperator: - """Deprecated Operator""" - - warnings.warn( - "This operator is deprecated. Please use " - "`airflow.providers.amazon.aws.operators.athena.AthenaOperator`.", - DeprecationWarning, - stacklevel=2, - ) - - class AthenaOperator(BaseOperator): """ An operator that submits a presto query to athena. @@ -155,3 +144,17 @@ def on_kill(self) -> None: 'Polling Athena for query with id %s to reach final state', self.query_execution_id ) self.hook.poll_query_status(self.query_execution_id) + + +class AWSAthenaOperator(AthenaOperator): + """ + This operator is deprecated. + Please use :class:`airflow.providers.amazon.aws.operators.athena.AthenaOperator`. + """ + + warnings.warn( + "This operator is deprecated. Please use " + "`airflow.providers.amazon.aws.operators.athena.AthenaOperator`.", + DeprecationWarning, + stacklevel=2, + ) diff --git a/dev/provider_packages/prepare_provider_packages.py b/dev/provider_packages/prepare_provider_packages.py index 8f1ff56776dc3..6c57392a8399c 100755 --- a/dev/provider_packages/prepare_provider_packages.py +++ b/dev/provider_packages/prepare_provider_packages.py @@ -2115,15 +2115,6 @@ def summarise_total_vs_bad_and_warnings(total: int, bad: int, warns: List[warnin ), ("SelectableGroups dict interface is deprecated. Use select.", "kombu"), ("The module cloudant is now deprecated. The replacement is ibmcloudant.", "cloudant"), - ( - "This operator is deprecated. Please use " - "`airflow.providers.amazon.aws.operators.athena.AthenaOperator`.", - "athena", - ), - ( - "This hook is deprecated. Please use `airflow.providers.amazon.aws.hooks.athena.AthenaHook`.", - "athena", - ), } # The set of warning messages generated by direct importing of some deprecated modules. We should only diff --git a/tests/deprecated_classes.py b/tests/deprecated_classes.py index 4f7a87ce108bb..e258c4d5bbcb7 100644 --- a/tests/deprecated_classes.py +++ b/tests/deprecated_classes.py @@ -143,10 +143,6 @@ "airflow.providers.amazon.aws.hooks.s3.S3Hook", "airflow.hooks.S3_hook.S3Hook", ), - ( - "airflow.providers.amazon.aws.hooks.athena.AthenaHook", - "airflow.providers.amazon.aws.hooks.athena.AWSAthenaHook", - ), ( "airflow.providers.amazon.aws.hooks.sqs.SQSHook", "airflow.contrib.hooks.aws_sqs_hook.SQSHook", @@ -975,10 +971,6 @@ "airflow.providers.amazon.aws.operators.athena.AWSAthenaOperator", "airflow.contrib.operators.aws_athena_operator.AWSAthenaOperator", ), - ( - "airflow.providers.amazon.aws.operators.athena.AthenaOperator", - "airflow.providers.amazon.aws.operators.athena.AWSAthenaOperator", - ), ( "airflow.providers.amazon.aws.operators.batch.AwsBatchOperator", "airflow.contrib.operators.awsbatch_operator.AWSBatchOperator", From d49580965ea64a5d0dfe749fe5257211421e3122 Mon Sep 17 00:00:00 2001 From: Dennis Ferruzzi Date: Tue, 14 Dec 2021 23:07:52 -0800 Subject: [PATCH 5/7] Moved deprecation messages into init methods --- airflow/providers/amazon/aws/hooks/athena.py | 12 +++++++----- airflow/providers/amazon/aws/operators/athena.py | 14 ++++++++------ 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/airflow/providers/amazon/aws/hooks/athena.py b/airflow/providers/amazon/aws/hooks/athena.py index b29c632899618..c7a91a5870c4c 100644 --- a/airflow/providers/amazon/aws/hooks/athena.py +++ b/airflow/providers/amazon/aws/hooks/athena.py @@ -269,8 +269,10 @@ class AWSAthenaHook(AthenaHook): Please use :class:`airflow.providers.amazon.aws.hooks.athena.AthenaHook`. """ - warnings.warn( - "This hook is deprecated. Please use `airflow.providers.amazon.aws.hooks.athena.AthenaHook`.", - DeprecationWarning, - stacklevel=2, - ) + def __init__(self, *args, **kwargs): + warnings.warn( + "This hook is deprecated. Please use `airflow.providers.amazon.aws.hooks.athena.AthenaHook`.", + DeprecationWarning, + stacklevel=2, + ) + super().__init__(*args, **kwargs) diff --git a/airflow/providers/amazon/aws/operators/athena.py b/airflow/providers/amazon/aws/operators/athena.py index 18207d2682db8..b3411b5bc103f 100644 --- a/airflow/providers/amazon/aws/operators/athena.py +++ b/airflow/providers/amazon/aws/operators/athena.py @@ -152,9 +152,11 @@ class AWSAthenaOperator(AthenaOperator): Please use :class:`airflow.providers.amazon.aws.operators.athena.AthenaOperator`. """ - warnings.warn( - "This operator is deprecated. Please use " - "`airflow.providers.amazon.aws.operators.athena.AthenaOperator`.", - DeprecationWarning, - stacklevel=2, - ) + def __init__(self, *args, **kwargs): + warnings.warn( + "This operator is deprecated. Please use " + "`airflow.providers.amazon.aws.operators.athena.AthenaOperator`.", + DeprecationWarning, + stacklevel=2, + ) + super().__init__(*args, **kwargs) From a4c96d5076d86e5cfd5e359d663dd1f31b2d070b Mon Sep 17 00:00:00 2001 From: Dennis Ferruzzi Date: Tue, 14 Dec 2021 23:30:14 -0800 Subject: [PATCH 6/7] Revert changes to setup.cfg --- setup.cfg | 1 - 1 file changed, 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index 4cb2e497b59c8..adba25a9d95f8 100644 --- a/setup.cfg +++ b/setup.cfg @@ -45,7 +45,6 @@ license_files = licenses/LICENSE-moment-strftime.txt licenses/LICENSE-moment.txt licenses/LICENSE-normalize.txt - licenses/LICENSES-ui.txt # End of licences generated automatically licenses/LICENSES-ui.txt classifiers = From 40f84c42cea510e5c014e754ecd97e6886be7135 Mon Sep 17 00:00:00 2001 From: Dennis Ferruzzi Date: Thu, 16 Dec 2021 11:35:34 -0800 Subject: [PATCH 7/7] Fixed import sort --- airflow/providers/amazon/aws/operators/athena.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/operators/athena.py b/airflow/providers/amazon/aws/operators/athena.py index b3411b5bc103f..dabf5e6c0dfb5 100644 --- a/airflow/providers/amazon/aws/operators/athena.py +++ b/airflow/providers/amazon/aws/operators/athena.py @@ -16,8 +16,8 @@ # specific language governing permissions and limitations # under the License. # -import warnings import sys +import warnings from typing import Any, Dict, Optional from uuid import uuid4