-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Replace unittests in amazon provider tests by pure pytest
#27970
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replace unittests in amazon provider tests by pure pytest
#27970
Conversation
e9fbf46 to
191a6e6
Compare
191a6e6 to
48ce15b
Compare
o-nikolas
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's very difficult to review such a large PR for correctness, but I spot checked a handful of code and it seems like it looks good
Taragolis
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As @o-nikolas mention that this kind of PR very hard to check (without losing concentration).
I add comments for anything which can not be classified as 'straightforward changes'
|
|
||
| class TestEksCreateClusterOperator(unittest.TestCase): | ||
| def setUp(self) -> None: | ||
| class TestEksCreateClusterOperator: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- replace TestCase.subTest by parametrize tests
- Rename method
nodegroup_setUptonodegroup_setup - Rename method
fargate_profile_setuptofargate_profile_setup
| class TestEksCreateFargateProfileOperator(unittest.TestCase): | ||
| def setUp(self) -> None: | ||
| self.create_fargate_profile_params: CreateFargateProfileParams = dict( # type: ignore | ||
| class TestEksCreateFargateProfileOperator: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replace TestCase.subTest by parametrize tests
| mock_create_fargate_profile.assert_called_with(**convert_keys(parameters)) | ||
|
|
||
|
|
||
| class TestEksCreateNodegroupOperator: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replace TestCase.subTest by parametrize tests
|
|
||
| mock_list_nodegroups.assert_called_once |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This actually do nothing right now if convert to mock_list_nodegroups.assert_called_once() than it failed.
Remove this for now
| @pytest.mark.parametrize( | ||
| "job_flow_id, job_flow_name", | ||
| [ | ||
| pytest.param("j-8989898989", "test_cluster", id="both-specified"), | ||
| pytest.param(None, None, id="both-none"), | ||
| ], | ||
| ) | ||
| def test_validate_mutually_exclusive_args(self, job_flow_id, job_flow_name): | ||
| error_message = r"Exactly one of job_flow_id or job_flow_name must be specified\." | ||
| with pytest.raises(AirflowException, match=error_message): | ||
| EmrAddStepsOperator( | ||
| task_id="test_validate_mutually_exclusive_args", | ||
| job_flow_id=job_flow_id, | ||
| job_flow_name=job_flow_name, | ||
| ) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add this test from #27858
| self.create_transform_params = copy.deepcopy(CREATE_TRANSFORM_PARAMS) | ||
| self.create_model_params = copy.deepcopy(CREATE_MODEL_PARAMS) | ||
| self.config = {"Model": self.create_model_params, "Transform": self.create_transform_params} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After migrate to pytest some test failed due to mutability of test parameters so this parameters recreate for each test case.
| @mock.patch.object(AthenaHook, "poll_query_status", side_effect=("SUCCEEDED",)) | ||
| def test_poke_success(self, mock_poll_query_status): | ||
| assert self.sensor.poke({}) | ||
|
|
||
| @mock.patch.object(AthenaHook, "poll_query_status", side_effect=("RUNNING",)) | ||
| def test_poke_running(self, mock_poll_query_status): | ||
| assert not self.sensor.poke({}) | ||
|
|
||
| @mock.patch.object(AthenaHook, "poll_query_status", side_effect=("QUEUED",)) | ||
| def test_poke_queued(self, mock_poll_query_status): | ||
| assert not self.sensor.poke({}) | ||
|
|
||
| @mock.patch.object(AthenaHook, "poll_query_status", side_effect=("FAILED",)) | ||
| def test_poke_failed(self, mock_poll_query_status): | ||
| with pytest.raises(AirflowException) as ctx: | ||
| self.sensor.poke({}) | ||
| assert "Athena sensor failed" in str(ctx.value) | ||
|
|
||
| @mock.patch.object(AthenaHook, "poll_query_status", side_effect=("CANCELLED",)) | ||
| def test_poke_cancelled(self, mock_poll_query_status): | ||
| with pytest.raises(AirflowException) as ctx: | ||
| self.sensor.poke({}) | ||
| assert "Athena sensor failed" in str(ctx.value) | ||
| @pytest.mark.parametrize("poll_query_status", ["SUCCEEDED"]) | ||
| def test_poke_true_on_status(self, poll_query_status): | ||
| with mock.patch.object(AthenaHook, "poll_query_status", side_effect=[poll_query_status]): | ||
| assert self.sensor.poke({}) | ||
|
|
||
| @pytest.mark.parametrize("poll_query_status", ["RUNNING", "QUEUED"]) | ||
| def test_poke_false_on_status(self, poll_query_status): | ||
| with mock.patch.object(AthenaHook, "poll_query_status", side_effect=[poll_query_status]): | ||
| assert not self.sensor.poke({}) | ||
|
|
||
| @pytest.mark.parametrize("poll_query_status", ["FAILED", "CANCELLED"]) | ||
| def test_poke_raise_on_status(self, poll_query_status): | ||
| with mock.patch.object(AthenaHook, "poll_query_status", side_effect=[poll_query_status]): | ||
| with pytest.raises(AirflowException, match=r"Athena sensor failed"): | ||
| self.sensor.poke({}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Merge same tests into parametrize tests
| @mock.patch.object(DmsHook, "get_task_status", side_effect=("stopped",)) | ||
| def test_poke_stopped(self, mock_get_task_status): | ||
| assert self.sensor.poke(None) | ||
|
|
||
| @mock.patch.object(DmsHook, "get_task_status", side_effect=("running",)) | ||
| def test_poke_running(self, mock_get_task_status): | ||
| assert not self.sensor.poke(None) | ||
|
|
||
| @mock.patch.object(DmsHook, "get_task_status", side_effect=("starting",)) | ||
| def test_poke_starting(self, mock_get_task_status): | ||
| assert not self.sensor.poke(None) | ||
|
|
||
| @mock.patch.object(DmsHook, "get_task_status", side_effect=("ready",)) | ||
| def test_poke_ready(self, mock_get_task_status): | ||
| with pytest.raises(AirflowException) as ctx: | ||
| self.sensor.poke(None) | ||
| assert "Unexpected status: ready" in str(ctx.value) | ||
|
|
||
| @mock.patch.object(DmsHook, "get_task_status", side_effect=("creating",)) | ||
| def test_poke_creating(self, mock_get_task_status): | ||
| with pytest.raises(AirflowException) as ctx: | ||
| self.sensor.poke(None) | ||
| assert "Unexpected status: creating" in str(ctx.value) | ||
|
|
||
| @mock.patch.object(DmsHook, "get_task_status", side_effect=("failed",)) | ||
| def test_poke_failed(self, mock_get_task_status): | ||
| with pytest.raises(AirflowException) as ctx: | ||
| self.sensor.poke(None) | ||
| assert "Unexpected status: failed" in str(ctx.value) | ||
|
|
||
| @mock.patch.object(DmsHook, "get_task_status", side_effect=("deleting",)) | ||
| def test_poke_deleting(self, mock_get_task_status): | ||
| with pytest.raises(AirflowException) as ctx: | ||
| self.sensor.poke(None) | ||
| assert "Unexpected status: deleting" in str(ctx.value) | ||
| @pytest.mark.parametrize("task_status", ["stopped"]) | ||
| def test_poke_true_on_status(self, task_status): | ||
| with mock.patch.object(DmsHook, "get_task_status", side_effect=[task_status]): | ||
| assert self.sensor.poke({}) | ||
|
|
||
| @pytest.mark.parametrize("task_status", ["running", "starting"]) | ||
| def test_poke_false_on_status(self, task_status): | ||
| with mock.patch.object(DmsHook, "get_task_status", side_effect=[task_status]): | ||
| assert not self.sensor.poke({}) | ||
|
|
||
| @pytest.mark.parametrize("task_status", ["ready", "creating", "failed", "deleting"]) | ||
| def test_poke_raise_unexpected_status_on_status(self, task_status): | ||
| with mock.patch.object(DmsHook, "get_task_status", side_effect=[task_status]): | ||
| with pytest.raises(AirflowException, match=rf"Unexpected status: {task_status}"): | ||
| self.sensor.poke({}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Merge same tests into parametrize tests
| @mock.patch.object(EmrContainerHook, "check_query_status", side_effect=("PENDING",)) | ||
| def test_poke_pending(self, mock_check_query_status): | ||
| assert not self.sensor.poke(None) | ||
|
|
||
| @mock.patch.object(EmrContainerHook, "check_query_status", side_effect=("SUBMITTED",)) | ||
| def test_poke_submitted(self, mock_check_query_status): | ||
| assert not self.sensor.poke(None) | ||
|
|
||
| @mock.patch.object(EmrContainerHook, "check_query_status", side_effect=("RUNNING",)) | ||
| def test_poke_running(self, mock_check_query_status): | ||
| assert not self.sensor.poke(None) | ||
|
|
||
| @mock.patch.object(EmrContainerHook, "check_query_status", side_effect=("COMPLETED",)) | ||
| def test_poke_completed(self, mock_check_query_status): | ||
| assert self.sensor.poke(None) | ||
|
|
||
| @mock.patch.object(EmrContainerHook, "check_query_status", side_effect=("FAILED",)) | ||
| def test_poke_failed(self, mock_check_query_status): | ||
| with pytest.raises(AirflowException) as ctx: | ||
| self.sensor.poke(None) | ||
| assert "EMR Containers sensor failed" in str(ctx.value) | ||
|
|
||
| @mock.patch.object(EmrContainerHook, "check_query_status", side_effect=("CANCELLED",)) | ||
| def test_poke_cancelled(self, mock_check_query_status): | ||
| with pytest.raises(AirflowException) as ctx: | ||
| self.sensor.poke(None) | ||
| assert "EMR Containers sensor failed" in str(ctx.value) | ||
|
|
||
| @mock.patch.object(EmrContainerHook, "check_query_status", side_effect=("CANCEL_PENDING",)) | ||
| def test_poke_cancel_pending(self, mock_check_query_status): | ||
| with pytest.raises(AirflowException) as ctx: | ||
| self.sensor.poke(None) | ||
| assert "EMR Containers sensor failed" in str(ctx.value) | ||
| @pytest.mark.parametrize("query_status", ["COMPLETED"]) | ||
| def test_poke_true_on_query_status(self, query_status): | ||
| with mock.patch.object(EmrContainerHook, "check_query_status", side_effect=[query_status]): | ||
| assert self.sensor.poke({}) | ||
|
|
||
| @pytest.mark.parametrize("query_status", ["PENDING", "SUBMITTED", "RUNNING"]) | ||
| def test_poke_false_on_query_status(self, query_status): | ||
| with mock.patch.object(EmrContainerHook, "check_query_status", side_effect=[query_status]): | ||
| assert not self.sensor.poke({}) | ||
|
|
||
| @pytest.mark.parametrize("query_status", ["FAILED", "CANCELLED", "CANCEL_PENDING"]) | ||
| def test_poke_raise_on_query_status(self, query_status): | ||
| with mock.patch.object(EmrContainerHook, "check_query_status", side_effect=[query_status]): | ||
| with pytest.raises(AirflowException, match=r"EMR Containers sensor failed"): | ||
| assert not self.sensor.poke({}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Merge same tests into parametrize tests
| def setup_method(self): | ||
| self.create_training_params = copy.deepcopy(CREATE_TRAINING_PARAMS) | ||
| self.sagemaker = SageMakerTrainingOperator( | ||
| task_id="test_sagemaker_operator", | ||
| config=CREATE_TRAINING_PARAMS, | ||
| config=self.create_training_params, | ||
| wait_for_completion=False, | ||
| check_interval=5, | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After migrate to pytest some test failed due to mutability of test parameters so this parameters recreate for each test case.
| query_status = self.emr_containers.poll_query_status(job_id="job123456", max_polling_attempts=2) | ||
| query_status = self.emr_containers.poll_query_status( | ||
| job_id="job123456", max_polling_attempts=2, poll_interval=2 | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This simple change actually speedup test execution from 30 sec to 2 seconds
before
============================ slowest 100 durations =============================
30.03s call tests/providers/amazon/aws/hooks/test_emr_containers.py::TestEmrContainerHook::test_query_status_polling_with_timeout
after
2.01s call tests/providers/amazon/aws/hooks/test_emr_containers.py::TestEmrContainerHook::test_query_status_polling_with_timeout
|
I decide split this changes to multiple separate PR's
|
Migrate Amazon provider's tests to
pytest.All changes are more or less straightforward:
unittests.TestCaseclass and TestCase.assert* methodsparameterized.expandbypytest.mark.parametrize. Irequests_mock.mockdecorator byrequests_mockfixtureTestCase.subTestto parametrize testsSee additional findings, info about significant changes and potential follow up in comments