From 3f7463de3662f180f8c8a3c5de658ca5da3d4fce Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Tue, 1 Apr 2025 18:32:29 -0700 Subject: [PATCH 1/4] Clarify the Redshift delete cluster operator messaging. --- .../amazon/aws/operators/redshift_cluster.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py index 7f2f686cb3030..37a2a7d3d7bc9 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py @@ -755,11 +755,18 @@ def execute(self, context: Context): final_cluster_snapshot_identifier=self.final_cluster_snapshot_identifier, ) break - except self.redshift_hook.get_conn().exceptions.InvalidClusterStateFault: + except self.redshift_hook.conn.exceptions.InvalidClusterStateFault: self._attempts -= 1 if self._attempts: - self.log.error("Unable to delete cluster. %d attempts remaining.", self._attempts) + current_state = self.redshift_hook.conn.describe_clusters( + ClusterIdentifier=self.cluster_identifier + )["Clusters"][0]["ClusterStatus"] + self.log.error( + "Cluster in %s state, unable to delete. %d attempts remaining.", + current_state, + self._attempts, + ) time.sleep(self._attempt_interval) else: raise @@ -785,7 +792,7 @@ def execute(self, context: Context): ) elif self.wait_for_completion: - waiter = self.redshift_hook.get_conn().get_waiter("cluster_deleted") + waiter = self.redshift_hook.conn.get_waiter("cluster_deleted") waiter.wait( ClusterIdentifier=self.cluster_identifier, WaiterConfig={"Delay": self.poll_interval, "MaxAttempts": self.max_attempts}, From dda2fb02fa47f8876e79debc1c73a10253bd25fc Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Wed, 2 Apr 2025 10:43:29 -0700 Subject: [PATCH 2/4] Fix tests for using conn instead of get_conn --- .../amazon/aws/hooks/redshift_cluster.py | 20 ++--- .../amazon/aws/hooks/test_redshift_cluster.py | 2 +- .../aws/operators/test_redshift_cluster.py | 84 +++++++++---------- 3 files changed, 53 insertions(+), 53 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_cluster.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_cluster.py index a2dc837ea2fa8..d9a7feefa2d94 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_cluster.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_cluster.py @@ -67,7 +67,7 @@ def create_cluster( for the cluster that is being created. :param params: Remaining AWS Create cluster API params. """ - response = self.get_conn().create_cluster( + response = self.conn.create_cluster( ClusterIdentifier=cluster_identifier, NodeType=node_type, MasterUsername=master_username, @@ -87,13 +87,13 @@ def cluster_status(self, cluster_identifier: str) -> str: :param cluster_identifier: unique identifier of a cluster """ try: - response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)["Clusters"] + response = self.conn.describe_clusters(ClusterIdentifier=cluster_identifier)["Clusters"] return response[0]["ClusterStatus"] if response else None - except self.get_conn().exceptions.ClusterNotFoundFault: + except self.conn.exceptions.ClusterNotFoundFault: return "cluster_not_found" async def cluster_status_async(self, cluster_identifier: str) -> str: - async with await self.get_async_conn() as client: + async with await self.get_async_conn as client: response = await client.describe_clusters(ClusterIdentifier=cluster_identifier) return response["Clusters"][0]["ClusterStatus"] if response else None @@ -115,7 +115,7 @@ def delete_cluster( """ final_cluster_snapshot_identifier = final_cluster_snapshot_identifier or "" - response = self.get_conn().delete_cluster( + response = self.conn.delete_cluster( ClusterIdentifier=cluster_identifier, SkipFinalClusterSnapshot=skip_final_cluster_snapshot, FinalClusterSnapshotIdentifier=final_cluster_snapshot_identifier, @@ -131,7 +131,7 @@ def describe_cluster_snapshots(self, cluster_identifier: str) -> list[str] | Non :param cluster_identifier: unique identifier of a cluster """ - response = self.get_conn().describe_cluster_snapshots(ClusterIdentifier=cluster_identifier) + response = self.conn.describe_cluster_snapshots(ClusterIdentifier=cluster_identifier) if "Snapshots" not in response: return None snapshots = response["Snapshots"] @@ -149,7 +149,7 @@ def restore_from_cluster_snapshot(self, cluster_identifier: str, snapshot_identi :param cluster_identifier: unique identifier of a cluster :param snapshot_identifier: unique identifier for a snapshot of a cluster """ - response = self.get_conn().restore_from_cluster_snapshot( + response = self.conn.restore_from_cluster_snapshot( ClusterIdentifier=cluster_identifier, SnapshotIdentifier=snapshot_identifier ) return response["Cluster"] if response["Cluster"] else None @@ -175,7 +175,7 @@ def create_cluster_snapshot( """ if tags is None: tags = [] - response = self.get_conn().create_cluster_snapshot( + response = self.conn.create_cluster_snapshot( SnapshotIdentifier=snapshot_identifier, ClusterIdentifier=cluster_identifier, ManualSnapshotRetentionPeriod=retention_period, @@ -192,11 +192,11 @@ def get_cluster_snapshot_status(self, snapshot_identifier: str): :param snapshot_identifier: A unique identifier for the snapshot that you are requesting """ try: - response = self.get_conn().describe_cluster_snapshots( + response = self.conn.describe_cluster_snapshots( SnapshotIdentifier=snapshot_identifier, ) snapshot = response.get("Snapshots")[0] snapshot_status: str = snapshot.get("Status") return snapshot_status - except self.get_conn().exceptions.ClusterSnapshotNotFoundFault: + except self.conn.exceptions.ClusterSnapshotNotFoundFault: return None diff --git a/providers/amazon/tests/unit/amazon/aws/hooks/test_redshift_cluster.py b/providers/amazon/tests/unit/amazon/aws/hooks/test_redshift_cluster.py index 6ebaab56c9600..847ff91959484 100644 --- a/providers/amazon/tests/unit/amazon/aws/hooks/test_redshift_cluster.py +++ b/providers/amazon/tests/unit/amazon/aws/hooks/test_redshift_cluster.py @@ -46,7 +46,7 @@ def _create_clusters(): def test_get_client_type_returns_a_boto3_client_of_the_requested_type(self): self._create_clusters() hook = AwsBaseHook(aws_conn_id="aws_default", client_type="redshift") - client_from_hook = hook.get_conn() + client_from_hook = hook.conn clusters = client_from_hook.describe_clusters()["Clusters"] assert len(clusters) == 2 diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py b/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py index 36ccf31217a9e..9142f66f89eb2 100644 --- a/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py +++ b/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py @@ -57,8 +57,8 @@ def test_init(self): assert redshift_operator.master_username == "adminuser" assert redshift_operator.master_user_password == "Test123$" - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn") - def test_create_single_node_cluster(self, mock_get_conn): + @mock.patch.object(RedshiftHook, "conn") + def test_create_single_node_cluster(self, mock_conn): redshift_operator = RedshiftCreateClusterOperator( task_id="task_test", cluster_identifier="test-cluster", @@ -78,7 +78,7 @@ def test_create_single_node_cluster(self, mock_get_conn): "PubliclyAccessible": True, "Port": 5439, } - mock_get_conn.return_value.create_cluster.assert_called_once_with( + mock_conn.create_cluster.assert_called_once_with( ClusterIdentifier="test-cluster", NodeType="dc2.large", MasterUsername="adminuser", @@ -87,12 +87,12 @@ def test_create_single_node_cluster(self, mock_get_conn): ) # wait_for_completion is True so check waiter is called - mock_get_conn.return_value.get_waiter.return_value.wait.assert_called_once_with( + mock_conn.get_waiter.return_value.wait.assert_called_once_with( ClusterIdentifier="test-cluster", WaiterConfig={"Delay": 60, "MaxAttempts": 5} ) - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn") - def test_create_multi_node_cluster(self, mock_get_conn): + @mock.patch.object(RedshiftHook, "conn") + def test_create_multi_node_cluster(self, mock_conn): redshift_operator = RedshiftCreateClusterOperator( task_id="task_test", cluster_identifier="test-cluster", @@ -113,7 +113,7 @@ def test_create_multi_node_cluster(self, mock_get_conn): "PubliclyAccessible": True, "Port": 5439, } - mock_get_conn.return_value.create_cluster.assert_called_once_with( + mock_conn.create_cluster.assert_called_once_with( ClusterIdentifier="test-cluster", NodeType="dc2.large", MasterUsername="adminuser", @@ -122,10 +122,10 @@ def test_create_multi_node_cluster(self, mock_get_conn): ) # wait_for_completion is False so check waiter is not called - mock_get_conn.return_value.get_waiter.assert_not_called() + mock_conn.get_waiter.assert_not_called() - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn") - def test_create_cluster_deferrable(self, mock_get_conn): + @mock.patch.object(RedshiftHook, "conn") + def test_create_cluster_deferrable(self, mock_conn): redshift_operator = RedshiftCreateClusterOperator( task_id="task_test", cluster_identifier="test-cluster", @@ -242,8 +242,8 @@ class TestRedshiftDeleteClusterSnapshotOperator: @mock.patch( "airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_cluster_snapshot_status" ) - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn") - def test_delete_cluster_snapshot_wait(self, mock_get_conn, mock_get_cluster_snapshot_status): + @mock.patch.object(RedshiftHook, "conn") + def test_delete_cluster_snapshot_wait(self, mock_conn, mock_get_cluster_snapshot_status): mock_get_cluster_snapshot_status.return_value = None delete_snapshot = RedshiftDeleteClusterSnapshotOperator( task_id="test_snapshot", @@ -251,7 +251,7 @@ def test_delete_cluster_snapshot_wait(self, mock_get_conn, mock_get_cluster_snap snapshot_identifier="test_snapshot", ) delete_snapshot.execute(None) - mock_get_conn.return_value.delete_cluster_snapshot.assert_called_once_with( + mock_conn.return_value.delete_cluster_snapshot.assert_called_once_with( SnapshotClusterIdentifier="test_cluster", SnapshotIdentifier="test_snapshot", ) @@ -263,8 +263,8 @@ def test_delete_cluster_snapshot_wait(self, mock_get_conn, mock_get_cluster_snap @mock.patch( "airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_cluster_snapshot_status" ) - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn") - def test_delete_cluster_snapshot(self, mock_get_conn, mock_get_cluster_snapshot_status): + @mock.patch.object(RedshiftHook, "conn") + def test_delete_cluster_snapshot(self, mock_conn, mock_get_cluster_snapshot_status): delete_snapshot = RedshiftDeleteClusterSnapshotOperator( task_id="test_snapshot", cluster_identifier="test_cluster", @@ -272,7 +272,7 @@ def test_delete_cluster_snapshot(self, mock_get_conn, mock_get_cluster_snapshot_ wait_for_completion=False, ) delete_snapshot.execute(None) - mock_get_conn.return_value.delete_cluster_snapshot.assert_called_once_with( + mock_conn.return_value.delete_cluster_snapshot.assert_called_once_with( SnapshotClusterIdentifier="test_cluster", SnapshotIdentifier="test_snapshot", ) @@ -298,13 +298,13 @@ def test_init(self): assert redshift_operator.cluster_identifier == "test_cluster" assert redshift_operator.aws_conn_id == "aws_conn_test" - @mock.patch.object(RedshiftHook, "get_conn") - def test_resume_cluster_is_called_when_cluster_is_paused(self, mock_get_conn): + @mock.patch.object(RedshiftHook, "conn") + def test_resume_cluster_is_called_when_cluster_is_paused(self, mock_conn): redshift_operator = RedshiftResumeClusterOperator( task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" ) redshift_operator.execute(None) - mock_get_conn.return_value.resume_cluster.assert_called_once_with(ClusterIdentifier="test_cluster") + mock_conn.return_value.resume_cluster.assert_called_once_with(ClusterIdentifier="test_cluster") @mock.patch.object(RedshiftHook, "conn") @mock.patch("time.sleep", return_value=None) @@ -436,15 +436,15 @@ def test_init(self): assert redshift_operator.cluster_identifier == "test_cluster" assert redshift_operator.aws_conn_id == "aws_conn_test" - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn") - def test_pause_cluster_is_called_when_cluster_is_available(self, mock_get_conn): + @mock.patch.object(RedshiftHook, "conn") + def test_pause_cluster_is_called_when_cluster_is_available(self, mock_conn): redshift_operator = RedshiftPauseClusterOperator( task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" ) redshift_operator.execute(None) - mock_get_conn.return_value.pause_cluster.assert_called_once_with(ClusterIdentifier="test_cluster") + mock_conn.return_value.pause_cluster.assert_called_once_with(ClusterIdentifier="test_cluster") - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.conn") + @mock.patch.object(RedshiftHook, "conn") @mock.patch("time.sleep", return_value=None) def test_pause_cluster_multiple_attempts(self, mock_sleep, mock_conn): exception = boto3.client("redshift").exceptions.InvalidClusterStateFault({}, "test") @@ -462,7 +462,7 @@ def test_pause_cluster_multiple_attempts(self, mock_sleep, mock_conn): redshift_operator.execute(None) assert mock_conn.pause_cluster.call_count == 3 - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.conn") + @mock.patch.object(RedshiftHook, "conn") @mock.patch("time.sleep", return_value=None) def test_pause_cluster_multiple_attempts_fail(self, mock_sleep, mock_conn): exception = boto3.client("redshift").exceptions.InvalidClusterStateFault({}, "test") @@ -481,10 +481,10 @@ def test_pause_cluster_multiple_attempts_fail(self, mock_sleep, mock_conn): assert mock_conn.pause_cluster.call_count == 10 @mock.patch.object(RedshiftHook, "get_waiter") - @mock.patch.object(RedshiftHook, "get_conn") - def test_pause_cluster_wait_for_completion(self, mock_get_conn, mock_get_waiter): + @mock.patch.object(RedshiftHook, "conn") + def test_pause_cluster_wait_for_completion(self, mock_conn, mock_get_waiter): """Test Pause cluster operator with defer when deferrable param is true""" - mock_get_conn.return_value.pause_cluster.return_value = True + mock_conn.return_value.pause_cluster.return_value = True waiter = Mock() mock_get_waiter.return_value = waiter @@ -497,10 +497,10 @@ def test_pause_cluster_wait_for_completion(self, mock_get_conn, mock_get_waiter) waiter.wait.assert_called_once() @mock.patch.object(RedshiftHook, "cluster_status") - @mock.patch.object(RedshiftHook, "get_conn") - def test_pause_cluster_deferrable_mode(self, mock_get_conn, mock_cluster_status): + @mock.patch.object(RedshiftHook, "conn") + def test_pause_cluster_deferrable_mode(self, mock_conn, mock_cluster_status): """Test Pause cluster operator with defer when deferrable param is true""" - mock_get_conn.return_value.pause_cluster.return_value = True + mock_conn.return_value.pause_cluster.return_value = True mock_cluster_status.return_value = "available" redshift_operator = RedshiftPauseClusterOperator( @@ -516,12 +516,12 @@ def test_pause_cluster_deferrable_mode(self, mock_get_conn, mock_cluster_status) @mock.patch("airflow.providers.amazon.aws.operators.redshift_cluster.RedshiftPauseClusterOperator.defer") @mock.patch.object(RedshiftHook, "cluster_status") - @mock.patch.object(RedshiftHook, "get_conn") + @mock.patch.object(RedshiftHook, "conn") def test_pause_cluster_deferrable_mode_in_deleting_status( - self, mock_get_conn, mock_cluster_status, mock_defer + self, mock_conn, mock_cluster_status, mock_defer ): """Test Pause cluster operator with defer when deferrable param is true""" - mock_get_conn.return_value.pause_cluster.return_value = True + mock_conn.return_value.pause_cluster.return_value = True mock_cluster_status.return_value = "deleting" redshift_operator = RedshiftPauseClusterOperator( @@ -561,21 +561,21 @@ def test_template_fields(self): class TestDeleteClusterOperator: @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status") - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn") - def test_delete_cluster_with_wait_for_completion(self, mock_get_conn, mock_cluster_status): + @mock.patch.object(RedshiftHook, "conn") + def test_delete_cluster_with_wait_for_completion(self, mock_conn, mock_cluster_status): mock_cluster_status.return_value = "cluster_not_found" redshift_operator = RedshiftDeleteClusterOperator( task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" ) redshift_operator.execute(None) - mock_get_conn.return_value.delete_cluster.assert_called_once_with( + mock_conn.return_value.delete_cluster.assert_called_once_with( ClusterIdentifier="test_cluster", SkipFinalClusterSnapshot=True, FinalClusterSnapshotIdentifier="", ) - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn") - def test_delete_cluster_without_wait_for_completion(self, mock_get_conn): + @mock.patch.object(RedshiftHook, "conn") + def test_delete_cluster_without_wait_for_completion(self, mock_conn): redshift_operator = RedshiftDeleteClusterOperator( task_id="task_test", cluster_identifier="test_cluster", @@ -583,16 +583,16 @@ def test_delete_cluster_without_wait_for_completion(self, mock_get_conn): wait_for_completion=False, ) redshift_operator.execute(None) - mock_get_conn.return_value.delete_cluster.assert_called_once_with( + mock_conn.return_value.delete_cluster.assert_called_once_with( ClusterIdentifier="test_cluster", SkipFinalClusterSnapshot=True, FinalClusterSnapshotIdentifier="", ) - mock_get_conn.return_value.cluster_status.assert_not_called() + mock_conn.return_value.cluster_status.assert_not_called() @mock.patch.object(RedshiftHook, "delete_cluster") - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.conn") + @mock.patch.object(RedshiftHook, "conn") @mock.patch("time.sleep", return_value=None) def test_delete_cluster_multiple_attempts(self, _, mock_conn, mock_delete_cluster): exception = boto3.client("redshift").exceptions.InvalidClusterStateFault({}, "test") @@ -611,7 +611,7 @@ def test_delete_cluster_multiple_attempts(self, _, mock_conn, mock_delete_cluste assert mock_delete_cluster.call_count == 3 @mock.patch.object(RedshiftHook, "delete_cluster") - @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.conn") + @mock.patch.object(RedshiftHook, "conn") @mock.patch("time.sleep", return_value=None) def test_delete_cluster_multiple_attempts_fail(self, _, mock_conn, mock_delete_cluster): exception = boto3.client("redshift").exceptions.InvalidClusterStateFault({}, "test") From 7a314df9e121ca36712e69533cbf4e5ceb8cffa6 Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Wed, 2 Apr 2025 11:49:16 -0700 Subject: [PATCH 3/4] Fix operator tests for using conn instead of get_conn --- .../aws/operators/test_redshift_cluster.py | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py b/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py index 9142f66f89eb2..a4e6caa0ac303 100644 --- a/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py +++ b/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py @@ -251,7 +251,7 @@ def test_delete_cluster_snapshot_wait(self, mock_conn, mock_get_cluster_snapshot snapshot_identifier="test_snapshot", ) delete_snapshot.execute(None) - mock_conn.return_value.delete_cluster_snapshot.assert_called_once_with( + mock_conn.delete_cluster_snapshot.assert_called_once_with( SnapshotClusterIdentifier="test_cluster", SnapshotIdentifier="test_snapshot", ) @@ -272,7 +272,7 @@ def test_delete_cluster_snapshot(self, mock_conn, mock_get_cluster_snapshot_stat wait_for_completion=False, ) delete_snapshot.execute(None) - mock_conn.return_value.delete_cluster_snapshot.assert_called_once_with( + mock_conn.delete_cluster_snapshot.assert_called_once_with( SnapshotClusterIdentifier="test_cluster", SnapshotIdentifier="test_snapshot", ) @@ -304,7 +304,7 @@ def test_resume_cluster_is_called_when_cluster_is_paused(self, mock_conn): task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" ) redshift_operator.execute(None) - mock_conn.return_value.resume_cluster.assert_called_once_with(ClusterIdentifier="test_cluster") + mock_conn.resume_cluster.assert_called_once_with(ClusterIdentifier="test_cluster") @mock.patch.object(RedshiftHook, "conn") @mock.patch("time.sleep", return_value=None) @@ -442,7 +442,7 @@ def test_pause_cluster_is_called_when_cluster_is_available(self, mock_conn): task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" ) redshift_operator.execute(None) - mock_conn.return_value.pause_cluster.assert_called_once_with(ClusterIdentifier="test_cluster") + mock_conn.pause_cluster.assert_called_once_with(ClusterIdentifier="test_cluster") @mock.patch.object(RedshiftHook, "conn") @mock.patch("time.sleep", return_value=None) @@ -484,7 +484,7 @@ def test_pause_cluster_multiple_attempts_fail(self, mock_sleep, mock_conn): @mock.patch.object(RedshiftHook, "conn") def test_pause_cluster_wait_for_completion(self, mock_conn, mock_get_waiter): """Test Pause cluster operator with defer when deferrable param is true""" - mock_conn.return_value.pause_cluster.return_value = True + mock_conn.pause_cluster.return_value = True waiter = Mock() mock_get_waiter.return_value = waiter @@ -500,7 +500,7 @@ def test_pause_cluster_wait_for_completion(self, mock_conn, mock_get_waiter): @mock.patch.object(RedshiftHook, "conn") def test_pause_cluster_deferrable_mode(self, mock_conn, mock_cluster_status): """Test Pause cluster operator with defer when deferrable param is true""" - mock_conn.return_value.pause_cluster.return_value = True + mock_conn.pause_cluster.return_value = True mock_cluster_status.return_value = "available" redshift_operator = RedshiftPauseClusterOperator( @@ -521,7 +521,7 @@ def test_pause_cluster_deferrable_mode_in_deleting_status( self, mock_conn, mock_cluster_status, mock_defer ): """Test Pause cluster operator with defer when deferrable param is true""" - mock_conn.return_value.pause_cluster.return_value = True + mock_conn.pause_cluster.return_value = True mock_cluster_status.return_value = "deleting" redshift_operator = RedshiftPauseClusterOperator( @@ -568,7 +568,7 @@ def test_delete_cluster_with_wait_for_completion(self, mock_conn, mock_cluster_s task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" ) redshift_operator.execute(None) - mock_conn.return_value.delete_cluster.assert_called_once_with( + mock_conn.delete_cluster.assert_called_once_with( ClusterIdentifier="test_cluster", SkipFinalClusterSnapshot=True, FinalClusterSnapshotIdentifier="", @@ -583,13 +583,13 @@ def test_delete_cluster_without_wait_for_completion(self, mock_conn): wait_for_completion=False, ) redshift_operator.execute(None) - mock_conn.return_value.delete_cluster.assert_called_once_with( + mock_conn.delete_cluster.assert_called_once_with( ClusterIdentifier="test_cluster", SkipFinalClusterSnapshot=True, FinalClusterSnapshotIdentifier="", ) - mock_conn.return_value.cluster_status.assert_not_called() + mock_conn.cluster_status.assert_not_called() @mock.patch.object(RedshiftHook, "delete_cluster") @mock.patch.object(RedshiftHook, "conn") From b0f2bec16dcb987bd8e3e335bf306ae6b16f1b93 Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Wed, 2 Apr 2025 12:08:58 -0700 Subject: [PATCH 4/4] Fix overzealous find-and-replace --- .../src/airflow/providers/amazon/aws/hooks/redshift_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_cluster.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_cluster.py index d9a7feefa2d94..eff7af4caf519 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_cluster.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_cluster.py @@ -93,7 +93,7 @@ def cluster_status(self, cluster_identifier: str) -> str: return "cluster_not_found" async def cluster_status_async(self, cluster_identifier: str) -> str: - async with await self.get_async_conn as client: + async with await self.get_async_conn() as client: response = await client.describe_clusters(ClusterIdentifier=cluster_identifier) return response["Clusters"][0]["ClusterStatus"] if response else None