From cb035208274b7330fda7812df34092ad53d8a724 Mon Sep 17 00:00:00 2001 From: 2h-kim Date: Fri, 2 Dec 2022 09:02:26 +0000 Subject: [PATCH 01/10] Fix EmrAddStepsOperator wait_for_completion parameter not working --- airflow/providers/amazon/aws/operators/emr.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py index 63659e8188b9d..83c95a8c9e29f 100644 --- a/airflow/providers/amazon/aws/operators/emr.py +++ b/airflow/providers/amazon/aws/operators/emr.py @@ -112,7 +112,7 @@ def execute(self, context: Context) -> list[str]: if isinstance(steps, str): steps = ast.literal_eval(steps) - return emr_hook.add_job_flow_steps(job_flow_id=job_flow_id, steps=steps, wait_for_completion=True) + return emr_hook.add_job_flow_steps(job_flow_id=job_flow_id, steps=steps, wait_for_completion=self.wait_for_completion) class EmrEksCreateClusterOperator(BaseOperator): From 4452aedbc2eb35ef9e7d8c81a05c0647f2676ea4 Mon Sep 17 00:00:00 2001 From: 2h-kim Date: Fri, 2 Dec 2022 09:21:59 +0000 Subject: [PATCH 02/10] Feature modified : code style changed --- airflow/providers/amazon/aws/operators/emr.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py index 83c95a8c9e29f..a902b41575521 100644 --- a/airflow/providers/amazon/aws/operators/emr.py +++ b/airflow/providers/amazon/aws/operators/emr.py @@ -109,10 +109,11 @@ def execute(self, context: Context) -> list[str]: # steps may arrive as a string representing a list # e.g. 
if we used XCom or a file then: steps="[{ step1 }, { step2 }]" steps = self.steps + wait_for_completion = self.wait_for_completion if isinstance(steps, str): steps = ast.literal_eval(steps) - return emr_hook.add_job_flow_steps(job_flow_id=job_flow_id, steps=steps, wait_for_completion=self.wait_for_completion) + return emr_hook.add_job_flow_steps(job_flow_id=job_flow_id, steps=steps, wait_for_completion=wait_for_completion) class EmrEksCreateClusterOperator(BaseOperator): From 4ee7f6d64efc1375bc4d956410bcbdfec6ad1083 Mon Sep 17 00:00:00 2001 From: 2h-kim Date: Sun, 4 Dec 2022 16:16:00 +0900 Subject: [PATCH 03/10] Feature add: unittest for wait_for_completion --- .../aws/operators/test_emr_add_steps.py | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/providers/amazon/aws/operators/test_emr_add_steps.py b/tests/providers/amazon/aws/operators/test_emr_add_steps.py index 657d27aebc630..8b7b21a7ea10e 100644 --- a/tests/providers/amazon/aws/operators/test_emr_add_steps.py +++ b/tests/providers/amazon/aws/operators/test_emr_add_steps.py @@ -19,6 +19,7 @@ import json import os +import time import unittest from datetime import timedelta from unittest.mock import MagicMock, call, patch @@ -192,3 +193,22 @@ def test_init_with_nonexistent_cluster_name(self): with pytest.raises(AirflowException) as ctx: operator.execute(self.mock_context) assert str(ctx.value) == f"No cluster found for name: {cluster_name}" + + def test_wait_for_completion(self): + def check_wait_for_completion(**kwargs): + return kwargs.get('wait_for_completion') + + wait_for_completion = False + with patch( + "airflow.providers.amazon.aws.hooks.emr.EmrHook.add_job_flow_steps" + ) as mock_add_job_flow_steps: + mock_add_job_flow_steps.side_effect = check_wait_for_completion + operator = EmrAddStepsOperator( + task_id="test_check_wait_for_completion_task", + job_flow_id="j-8989898989", + aws_conn_id="aws_default", + dag=DAG("test_dag_id", default_args=self.args), + 
wait_for_completion=wait_for_completion + ) + + assert operator.execute(self.mock_context) == wait_for_completion From c126f047f50a0fd4dbef896c55087ae654e41135 Mon Sep 17 00:00:00 2001 From: 2h-kim Date: Sun, 4 Dec 2022 16:24:43 +0900 Subject: [PATCH 04/10] Feature modified : remove time library it isn't used --- tests/providers/amazon/aws/operators/test_emr_add_steps.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/providers/amazon/aws/operators/test_emr_add_steps.py b/tests/providers/amazon/aws/operators/test_emr_add_steps.py index 8b7b21a7ea10e..a8872cd7e3384 100644 --- a/tests/providers/amazon/aws/operators/test_emr_add_steps.py +++ b/tests/providers/amazon/aws/operators/test_emr_add_steps.py @@ -19,7 +19,6 @@ import json import os -import time import unittest from datetime import timedelta from unittest.mock import MagicMock, call, patch From 2fb60d0f2094f6fa0a1539262ea30942cb35f52d Mon Sep 17 00:00:00 2001 From: 2h-kim Date: Mon, 5 Dec 2022 11:25:51 +0900 Subject: [PATCH 05/10] revised --- tests/providers/amazon/aws/operators/test_emr_add_steps.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/providers/amazon/aws/operators/test_emr_add_steps.py b/tests/providers/amazon/aws/operators/test_emr_add_steps.py index a8872cd7e3384..f95fe42f74154 100644 --- a/tests/providers/amazon/aws/operators/test_emr_add_steps.py +++ b/tests/providers/amazon/aws/operators/test_emr_add_steps.py @@ -195,7 +195,7 @@ def test_init_with_nonexistent_cluster_name(self): def test_wait_for_completion(self): def check_wait_for_completion(**kwargs): - return kwargs.get('wait_for_completion') + return kwargs.get("wait_for_completion") wait_for_completion = False with patch( @@ -203,11 +203,11 @@ def check_wait_for_completion(**kwargs): ) as mock_add_job_flow_steps: mock_add_job_flow_steps.side_effect = check_wait_for_completion operator = EmrAddStepsOperator( - task_id="test_check_wait_for_completion_task", + task_id="test_task", 
job_flow_id="j-8989898989", aws_conn_id="aws_default", dag=DAG("test_dag_id", default_args=self.args), - wait_for_completion=wait_for_completion + wait_for_completion=wait_for_completion, ) assert operator.execute(self.mock_context) == wait_for_completion From b0f4c6f2fe2194ff95fee427cb2f59a1421e4be7 Mon Sep 17 00:00:00 2001 From: 2h-kim Date: Mon, 5 Dec 2022 12:28:09 +0900 Subject: [PATCH 06/10] Feature modified: change flake8 format --- airflow/providers/amazon/aws/operators/emr.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py index a902b41575521..3c0e90bc1730f 100644 --- a/airflow/providers/amazon/aws/operators/emr.py +++ b/airflow/providers/amazon/aws/operators/emr.py @@ -113,7 +113,9 @@ def execute(self, context: Context) -> list[str]: if isinstance(steps, str): steps = ast.literal_eval(steps) - return emr_hook.add_job_flow_steps(job_flow_id=job_flow_id, steps=steps, wait_for_completion=wait_for_completion) + return emr_hook.add_job_flow_steps( + job_flow_id=job_flow_id, steps=steps, wait_for_completion=wait_for_completion + ) class EmrEksCreateClusterOperator(BaseOperator): From c689159e47466a81a5ba4a6e3cdae9f3bba330e7 Mon Sep 17 00:00:00 2001 From: 2h-kim <110394535+2h-kim@users.noreply.github.com> Date: Wed, 7 Dec 2022 10:13:41 +0900 Subject: [PATCH 07/10] Update tests/providers/amazon/aws/operators/test_emr_add_steps.py Co-authored-by: Vincent <97131062+vincbeck@users.noreply.github.com> --- .../aws/operators/test_emr_add_steps.py | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/tests/providers/amazon/aws/operators/test_emr_add_steps.py b/tests/providers/amazon/aws/operators/test_emr_add_steps.py index 7ce7b7ee1f26a..3adb829b8bba9 100644 --- a/tests/providers/amazon/aws/operators/test_emr_add_steps.py +++ b/tests/providers/amazon/aws/operators/test_emr_add_steps.py @@ -208,21 +208,19 @@ def 
test_init_with_nonexistent_cluster_name(self): operator.execute(self.mock_context) assert str(ctx.value) == f"No cluster found for name: {cluster_name}" - def test_wait_for_completion(self): - def check_wait_for_completion(**kwargs): - return kwargs.get("wait_for_completion") - - wait_for_completion = False - with patch( - "airflow.providers.amazon.aws.hooks.emr.EmrHook.add_job_flow_steps" - ) as mock_add_job_flow_steps: - mock_add_job_flow_steps.side_effect = check_wait_for_completion - operator = EmrAddStepsOperator( - task_id="test_task", - job_flow_id="j-8989898989", - aws_conn_id="aws_default", - dag=DAG("test_dag_id", default_args=self.args), - wait_for_completion=wait_for_completion, - ) + @mock.patch.object(EmrHook, "add_job_flow_steps") + def test_wait_for_completion(self, mock_add_job_flow_steps): + job_flow_id = "j-8989898989" + operator = EmrAddStepsOperator( + task_id="test_task", + job_flow_id=job_flow_id, + aws_conn_id="aws_default", + dag=DAG("test_dag_id", default_args=self.args), + wait_for_completion=False, + ) - assert operator.execute(self.mock_context) == wait_for_completion + mock_add_job_flow_steps.assert_called_once_with( + job_flow_id=job_flow_id, + steps=[], + wait_for_completion=False, + ) From b211f95f5de3d06261a2e87b6f24d4df69a5c0ac Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Wed, 7 Dec 2022 13:48:46 +0800 Subject: [PATCH 08/10] Fix mock usage in test --- tests/providers/amazon/aws/operators/test_emr_add_steps.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/providers/amazon/aws/operators/test_emr_add_steps.py b/tests/providers/amazon/aws/operators/test_emr_add_steps.py index 3adb829b8bba9..c088c3bc2e951 100644 --- a/tests/providers/amazon/aws/operators/test_emr_add_steps.py +++ b/tests/providers/amazon/aws/operators/test_emr_add_steps.py @@ -208,7 +208,7 @@ def test_init_with_nonexistent_cluster_name(self): operator.execute(self.mock_context) assert str(ctx.value) == f"No cluster found for name: 
{cluster_name}" - @mock.patch.object(EmrHook, "add_job_flow_steps") + @patch("airflow.providers.amazon.aws.hooks.emr.EmrHook.add_job_flow_steps") def test_wait_for_completion(self, mock_add_job_flow_steps): job_flow_id = "j-8989898989" operator = EmrAddStepsOperator( @@ -218,6 +218,7 @@ def test_wait_for_completion(self, mock_add_job_flow_steps): dag=DAG("test_dag_id", default_args=self.args), wait_for_completion=False, ) + operator.execute(self.mock_context) mock_add_job_flow_steps.assert_called_once_with( job_flow_id=job_flow_id, From 700e82e8c7e252ae08ea892bc3704237a07c37d1 Mon Sep 17 00:00:00 2001 From: 2h-kim Date: Sun, 4 Dec 2022 16:24:43 +0900 Subject: [PATCH 09/10] Feature modified : remove time library it isn't used --- tests/providers/amazon/aws/operators/test_emr_add_steps.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/providers/amazon/aws/operators/test_emr_add_steps.py b/tests/providers/amazon/aws/operators/test_emr_add_steps.py index c088c3bc2e951..1983c3985e735 100644 --- a/tests/providers/amazon/aws/operators/test_emr_add_steps.py +++ b/tests/providers/amazon/aws/operators/test_emr_add_steps.py @@ -19,6 +19,10 @@ import json import os +<<<<<<< HEAD +======= +import unittest +>>>>>>> c126f047f (Feature modified : remove time library it isn't used) from datetime import timedelta from unittest.mock import MagicMock, call, patch From 72c7ca4c93c4fd01fefcdf25cb1964b9baa16662 Mon Sep 17 00:00:00 2001 From: 2h-kim Date: Fri, 9 Dec 2022 10:57:18 +0900 Subject: [PATCH 10/10] revised --- tests/providers/amazon/aws/operators/test_emr_add_steps.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/providers/amazon/aws/operators/test_emr_add_steps.py b/tests/providers/amazon/aws/operators/test_emr_add_steps.py index 1983c3985e735..c088c3bc2e951 100644 --- a/tests/providers/amazon/aws/operators/test_emr_add_steps.py +++ b/tests/providers/amazon/aws/operators/test_emr_add_steps.py @@ -19,10 +19,6 @@ import json import os -<<<<<<< HEAD 
-======= -import unittest ->>>>>>> c126f047f (Feature modified : remove time library it isn't used) from datetime import timedelta from unittest.mock import MagicMock, call, patch