From 38d08fa8996edfaa0cae628e6611f94850c95968 Mon Sep 17 00:00:00 2001 From: kyungjunleeme Date: Wed, 2 Jul 2025 16:23:24 +0900 Subject: [PATCH 1/3] CHG: message modification --- .../airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py index 9b82e64876ea3..624fb433cdcd4 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py @@ -100,7 +100,7 @@ def __init__( def dry_run(self) -> None: if not AIRFLOW_V_3_0_PLUS: - raise NotImplementedError("Not implemented for Airflow 3.") + raise NotImplementedError("dry_run() is only supported in Airflow 3.0+.") super().dry_run() sftp_files: list[SftpFile] = self.get_sftp_files_map() for file in sftp_files: From 9c16c8a436c68691a7fb4499298f5f25ec01cb2a Mon Sep 17 00:00:00 2001 From: kyungjunleeme Date: Wed, 2 Jul 2025 16:29:15 +0900 Subject: [PATCH 2/3] ADD: dry_run test code --- .../azure/transfers/test_sftp_to_wasb.py | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/transfers/test_sftp_to_wasb.py b/providers/microsoft/azure/tests/unit/microsoft/azure/transfers/test_sftp_to_wasb.py index a389abf84673b..49d5fcbc68dbb 100644 --- a/providers/microsoft/azure/tests/unit/microsoft/azure/transfers/test_sftp_to_wasb.py +++ b/providers/microsoft/azure/tests/unit/microsoft/azure/transfers/test_sftp_to_wasb.py @@ -24,6 +24,8 @@ from airflow.exceptions import AirflowException from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SftpFile, SFTPToWasbOperator +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS + TASK_ID = "test-gcs-to-sftp-operator" WASB_CONN_ID = "wasb_default" SFTP_CONN_ID = "ssh_default" @@ -56,6 +58,53 @@ def test_init(self): assert operator.blob_prefix == BLOB_PREFIX assert operator.create_container is False + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="dry_run only exists in Airflow 3.0+") + @mock.patch("airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook") + @mock.patch("airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook") + def test_dry_run_logs_and_skips_real_action(self, mock_sftp_hook, mock_wasb_hook, caplog): + mock_sftp_hook.return_value.get_tree_map.return_value = [ + ["main_dir/test_object3.json"], # files + [], # dirs + [], # links + ] + operator = SFTPToWasbOperator( + task_id=TASK_ID, + sftp_source_path=SOURCE_PATH_NO_WILDCARD, + sftp_conn_id=SFTP_CONN_ID, + container_name=CONTAINER_NAME, + wasb_conn_id=WASB_CONN_ID, + blob_prefix="sponge-bob/", + move_object=True, + ) + + caplog.clear() + with caplog.at_level("INFO"): + operator.dry_run() + + assert "Process will upload file from (SFTP) main_dir/test_object3.json" in caplog.text + assert "as sponge-bob/test_object3.json" in caplog.text + assert "Executing delete of" in caplog.text + + assert not mock_wasb_hook.return_value.load_file.called + assert not mock_sftp_hook.return_value.delete_file.called + + @mock.patch("airflow.providers.microsoft.azure.transfers.sftp_to_wasb.AIRFLOW_V_3_0_PLUS", False) + def test_dry_run_raises_not_implemented(mock_version_tuple): + operator = SFTPToWasbOperator( + task_id="test-task", + sftp_source_path="main_dir/test_*.json", + sftp_conn_id="sftp_default", + container_name="test-container", + wasb_conn_id="wasb_default", + blob_prefix="sponge-bob/", + move_object=False, + ) + + with pytest.raises(NotImplementedError) as exc_info: + operator.dry_run() + + assert "dry_run() is only supported in Airflow 3.0+." in str(exc_info.value) + @mock.patch("airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook", autospec=True) def test_execute_more_than_one_wildcard_exception(self, mock_hook): operator = SFTPToWasbOperator( From 9ae346b08fcb203adb7c8a53da987019d33b0e45 Mon Sep 17 00:00:00 2001 From: kyungjunleeme Date: Thu, 3 Jul 2025 14:47:17 +0900 Subject: [PATCH 3/3] CHG: caplog -> patch.object style --- .../azure/transfers/test_sftp_to_wasb.py | 43 ++++++++++++------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/transfers/test_sftp_to_wasb.py b/providers/microsoft/azure/tests/unit/microsoft/azure/transfers/test_sftp_to_wasb.py index 49d5fcbc68dbb..cb3d85e3501c6 100644 --- a/providers/microsoft/azure/tests/unit/microsoft/azure/transfers/test_sftp_to_wasb.py +++ b/providers/microsoft/azure/tests/unit/microsoft/azure/transfers/test_sftp_to_wasb.py @@ -59,14 +59,14 @@ def test_init(self): assert operator.create_container is False @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="dry_run only exists in Airflow 3.0+") - @mock.patch("airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook") - @mock.patch("airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook") - def test_dry_run_logs_and_skips_real_action(self, mock_sftp_hook, mock_wasb_hook, caplog): - mock_sftp_hook.return_value.get_tree_map.return_value = [ - ["main_dir/test_object3.json"], # files - [], # dirs - [], # links - ] + @pytest.mark.parametrize( + "move_object, expect_delete_log", + [ + (True, True), + (False, False), + ], + ) + def test_dry_run_logs_and_skips_real_action(self, move_object, expect_delete_log): operator = SFTPToWasbOperator( task_id=TASK_ID, sftp_source_path=SOURCE_PATH_NO_WILDCARD, @@ -74,19 +74,30 @@ def test_dry_run_logs_and_skips_real_action(self, mock_sftp_hook, mock_wasb_hook container_name=CONTAINER_NAME, wasb_conn_id=WASB_CONN_ID, blob_prefix="sponge-bob/", - move_object=True, + move_object=move_object, ) + with ( + mock.patch.object(operator.log, "info") as mock_info, + mock.patch("airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook"), + mock.patch("airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook") as mock_sftp_hook, + ): + mock_sftp_hook.return_value.get_tree_map.return_value = [ + ["main_dir/test_object3.json"], + [], + [], + ] - caplog.clear() - with caplog.at_level("INFO"): operator.dry_run() - assert "Process will upload file from (SFTP) main_dir/test_object3.json" in caplog.text - assert "as sponge-bob/test_object3.json" in caplog.text - assert "Executing delete of" in caplog.text + logged_messages = [call.args[0] for call in mock_info.call_args_list] + assert "Dry run" in logged_messages + assert "Process will upload file from (SFTP) %s to wasb://%s as %s" in logged_messages - assert not mock_wasb_hook.return_value.load_file.called - assert not mock_sftp_hook.return_value.delete_file.called + delete_log = "Executing delete of %s" + if expect_delete_log: + assert delete_log in logged_messages + else: + assert delete_log not in logged_messages @mock.patch("airflow.providers.microsoft.azure.transfers.sftp_to_wasb.AIRFLOW_V_3_0_PLUS", False) def test_dry_run_raises_not_implemented(mock_version_tuple):