From 7c4a5bf1a82c3c828556bfc45ca2bc1553767dd0 Mon Sep 17 00:00:00 2001 From: Boris Morel <2323800+borismo@users.noreply.github.com> Date: Fri, 2 Aug 2024 11:31:36 +0800 Subject: [PATCH 1/5] Ensure operator goes into deferrable mode --- airflow/providers/amazon/aws/operators/redshift_data.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/redshift_data.py b/airflow/providers/amazon/aws/operators/redshift_data.py index 54e3c2c7ae1ae..45fee2a919483 100644 --- a/airflow/providers/amazon/aws/operators/redshift_data.py +++ b/airflow/providers/amazon/aws/operators/redshift_data.py @@ -127,8 +127,8 @@ def execute(self, context: Context) -> GetStatementResultResponseTypeDef | str: # Set wait_for_completion to False so that it waits for the status in the deferred task. wait_for_completion = self.wait_for_completion - if self.deferrable and self.wait_for_completion: - self.wait_for_completion = False + if self.deferrable: + wait_for_completion = False self.statement_id = self.hook.execute_query( database=self.database, @@ -144,7 +144,7 @@ def execute(self, context: Context) -> GetStatementResultResponseTypeDef | str: poll_interval=self.poll_interval, ) - if self.deferrable: + if self.deferrable and self.wait_for_completion: is_finished = self.hook.check_query_is_finished(self.statement_id) if not is_finished: self.defer( From 360ee8f504e83582360591f975ab1d048f48151d Mon Sep 17 00:00:00 2001 From: Boris Morel <2323800+borismo@users.noreply.github.com> Date: Sat, 3 Aug 2024 16:44:24 +0800 Subject: [PATCH 2/5] Remove commented out code --- tests/providers/amazon/aws/operators/test_redshift_data.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/providers/amazon/aws/operators/test_redshift_data.py b/tests/providers/amazon/aws/operators/test_redshift_data.py index a02515441b0fb..5bebee4d9d9c1 100644 --- a/tests/providers/amazon/aws/operators/test_redshift_data.py +++ b/tests/providers/amazon/aws/operators/test_redshift_data.py @@ -51,7 +51,7 @@ def deferrable_operator(): secret_arn=secret_arn, statement_name=statement_name, parameters=parameters, - wait_for_completion=False, + wait_for_completion=True, poll_interval=poll_interval, deferrable=True, ) @@ -276,7 +276,6 @@ def test_execute_finished_before_defer(self, mock_exec_query, check_query_is_fin poll_interval=poll_interval, ) - # @mock.patch("airflow.providers.amazon.aws.operators.redshift_data.RedshiftDataOperator.defer") @mock.patch( "airflow.providers.amazon.aws.hooks.redshift_data.RedshiftDataHook.check_query_is_finished", return_value=False, From 73ff6b9fe4a21eff6f023d1110696176e312f4cc Mon Sep 17 00:00:00 2001 From: Boris Morel <2323800+borismo@users.noreply.github.com> Date: Sat, 3 Aug 2024 16:45:37 +0800 Subject: [PATCH 3/5] Test when not waiting for completion --- .../aws/operators/test_redshift_data.py | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/tests/providers/amazon/aws/operators/test_redshift_data.py b/tests/providers/amazon/aws/operators/test_redshift_data.py index 5bebee4d9d9c1..fa021395a419d 100644 --- a/tests/providers/amazon/aws/operators/test_redshift_data.py +++ b/tests/providers/amazon/aws/operators/test_redshift_data.py @@ -314,3 +314,38 @@ def test_execute_complete(self, deferrable_operator): == "uuid" ) mock_log_info.assert_called_with("%s completed successfully.", TASK_ID) + + @mock.patch("airflow.providers.amazon.aws.operators.redshift_data.RedshiftDataOperator.defer") + @mock.patch("airflow.providers.amazon.aws.hooks.redshift_data.RedshiftDataHook.check_query_is_finished") + @mock.patch("airflow.providers.amazon.aws.hooks.redshift_data.RedshiftDataHook.execute_query") + def test_no_wait_for_completion(self, mock_exec_query, mock_check_query_is_finished, mock_defer): + """Tests that the operator does not check for completion nor defers when wait_for_completion is False, + no matter the value of deferrable""" + cluster_identifier = "cluster_identifier" + db_user = "db_user" + secret_arn = "secret_arn" + statement_name = "statement_name" + parameters = [{"name": "id", "value": "1"}] + poll_interval = 5 + + wait_for_completion = False + + for deferrable in [True, False]: + operator = RedshiftDataOperator( + aws_conn_id=CONN_ID, + task_id=TASK_ID, + sql=SQL, + database=DATABASE, + cluster_identifier=cluster_identifier, + db_user=db_user, + secret_arn=secret_arn, + statement_name=statement_name, + parameters=parameters, + wait_for_completion=wait_for_completion, + poll_interval=poll_interval, + deferrable=deferrable, + ) + operator.execute(None) + + assert not mock_check_query_is_finished.called + assert not mock_defer.called From f78d6903a1f3a1ab4fdecd504014d5120b3c5092 Mon Sep 17 00:00:00 2001 From: Boris Morel <2323800+borismo@users.noreply.github.com> Date: Wed, 7 Aug 2024 10:18:38 +0800 Subject: [PATCH 4/5] Add entry to changelog --- airflow/providers/amazon/CHANGELOG.rst | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/airflow/providers/amazon/CHANGELOG.rst b/airflow/providers/amazon/CHANGELOG.rst index 09c6870948ad5..8d4d594158a3e 100644 --- a/airflow/providers/amazon/CHANGELOG.rst +++ b/airflow/providers/amazon/CHANGELOG.rst @@ -26,6 +26,19 @@ Changelog --------- +Main +...... + +.. warning:: A bug in ``RedshiftDataOperator`` prevented tasks configured with ``deferrable=True`` and + ``wait_for_completion=True`` from entering the deferred state. Instead, the task would occupy an executor slot + until the statement was completed. A workaround may have been to set ``wait_for_completion=False``. + In this version, tasks set up with ``deferrable=True`` and ``wait_for_completion=False`` will not wait anymore. + +Bug Fixes +~~~~~~~~~ + +* ``Fix deferred mode for 'RedshiftDataOperator' (#41206)`` + 8.27.0 ...... From 862163f9840258afc4ef759abd1502db434706f2 Mon Sep 17 00:00:00 2001 From: Boris Morel <2323800+borismo@users.noreply.github.com> Date: Wed, 7 Aug 2024 14:39:30 +0800 Subject: [PATCH 5/5] Rephrase warning --- airflow/providers/amazon/CHANGELOG.rst | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/airflow/providers/amazon/CHANGELOG.rst b/airflow/providers/amazon/CHANGELOG.rst index 8d4d594158a3e..5de873f220f7a 100644 --- a/airflow/providers/amazon/CHANGELOG.rst +++ b/airflow/providers/amazon/CHANGELOG.rst @@ -29,10 +29,11 @@ Changelog Main ...... -.. warning:: A bug in ``RedshiftDataOperator`` prevented tasks configured with ``deferrable=True`` and - ``wait_for_completion=True`` from entering the deferred state. Instead, the task would occupy an executor slot - until the statement was completed. A workaround may have been to set ``wait_for_completion=False``. - In this version, tasks set up with ``deferrable=True`` and ``wait_for_completion=False`` will not wait anymore. +.. warning:: When deferrable mode was introduced for ``RedshiftDataOperator``, in version 8.17.0, tasks configured with + ``deferrable=True`` and ``wait_for_completion=True`` wouldn't enter the deferred state. Instead, the task would occupy + an executor slot until the statement was completed. A workaround may have been to set ``wait_for_completion=False``. + In this version, tasks set up with ``wait_for_completion=False`` will not wait anymore, regardless of the value of + ``deferrable``. Bug Fixes ~~~~~~~~~