From c1875ea7831062e51dc94c44a423e0bb580c9b61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Tue, 18 Apr 2023 11:43:41 -0700 Subject: [PATCH 1/5] add a stop operator to emr serverless --- airflow/providers/amazon/aws/operators/emr.py | 94 ++++++++++++++----- .../aws/operators/test_emr_serverless.py | 30 +++++- .../amazon/aws/example_emr_serverless.py | 11 +++ 3 files changed, 109 insertions(+), 26 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py index 6bd1df4b3ecff..8d80dfe731fd0 100644 --- a/airflow/providers/amazon/aws/operators/emr.py +++ b/airflow/providers/amazon/aws/operators/emr.py @@ -1010,21 +1010,21 @@ def execute(self, context: Context) -> dict: return response["jobRunId"] -class EmrServerlessDeleteApplicationOperator(BaseOperator): +class EmrServerlessStopApplicationOperator(BaseOperator): """ - Operator to delete EMR Serverless application + Operator to stop an EMR Serverless application .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:EmrServerlessDeleteApplicationOperator` - :param application_id: ID of the EMR Serverless application to delete. - :param wait_for_completion: If true, wait for the Application to start before returning. Default to True + :param application_id: ID of the EMR Serverless application to stop. + :param wait_for_completion: If true, wait for the Application to stop before returning. Default to True :param aws_conn_id: AWS connection to use :param waiter_countdown: Total amount of time, in seconds, the operator will wait for - the application be deleted. Defaults to 25 minutes. + the application to be stopped. Defaults to 5 minutes. :param waiter_check_interval_seconds: Number of seconds between polling the state of the application. - Defaults to 60 seconds. + Defaults to 30 seconds. 
""" template_fields: Sequence[str] = ("application_id",) @@ -1034,8 +1034,8 @@ def __init__( application_id: str, wait_for_completion: bool = True, aws_conn_id: str = "aws_default", - waiter_countdown: int = 25 * 60, - waiter_check_interval_seconds: int = 60, + waiter_countdown: int = 5 * 60, + waiter_check_interval_seconds: int = 30, **kwargs, ): self.aws_conn_id = aws_conn_id @@ -1054,28 +1054,76 @@ def execute(self, context: Context) -> None: self.log.info("Stopping application: %s", self.application_id) self.hook.conn.stop_application(applicationId=self.application_id) - # This should be replaced with a boto waiter when available. - waiter( - get_state_callable=self.hook.conn.get_application, - get_state_args={ - "applicationId": self.application_id, - }, - parse_response=["application", "state"], - desired_state=EmrServerlessHook.APPLICATION_FAILURE_STATES, - failure_states=set(), - object_type="application", - action="stopped", - countdown=self.waiter_countdown, - check_interval_seconds=self.waiter_check_interval_seconds, + if self.wait_for_completion: + # This should be replaced with a boto waiter when available. + waiter( + get_state_callable=self.hook.conn.get_application, + get_state_args={ + "applicationId": self.application_id, + }, + parse_response=["application", "state"], + desired_state=EmrServerlessHook.APPLICATION_FAILURE_STATES, + failure_states=set(), + object_type="application", + action="stopped", + countdown=self.waiter_countdown, + check_interval_seconds=self.waiter_check_interval_seconds, + ) + self.log.info("EMR serverless application %s stopped successfully", self.application_id) + + +class EmrServerlessDeleteApplicationOperator(EmrServerlessStopApplicationOperator): + """ + Operator to delete EMR Serverless application + + .. 
seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:EmrServerlessDeleteApplicationOperator` + + :param application_id: ID of the EMR Serverless application to delete. + :param wait_for_completion: If true, wait for the Application to be deleted before returning. + Defaults to True. Note that this operator will always wait for the application to be STOPPED first. + :param aws_conn_id: AWS connection to use + :param waiter_countdown: Total amount of time, in seconds, the operator will wait for each step: + first for the application to be stopped, then for it to be deleted. Defaults to 25 minutes. + :param waiter_check_interval_seconds: Number of seconds between polling the state of the application. + Defaults to 60 seconds. + """ + + template_fields: Sequence[str] = ("application_id",) + + def __init__( + self, + application_id: str, + wait_for_completion: bool = True, + aws_conn_id: str = "aws_default", + waiter_countdown: int = 25 * 60, + waiter_check_interval_seconds: int = 60, + **kwargs, + ): + self.wait_for_delete_completion = wait_for_completion + # super stops the app + super().__init__( + application_id=application_id, + # when deleting an app, we always need to wait for it to stop before we can call delete() + wait_for_completion=True, + aws_conn_id=aws_conn_id, + waiter_countdown=waiter_countdown, + waiter_check_interval_seconds=waiter_check_interval_seconds, + **kwargs, ) - self.log.info("Deleting application: %s", self.application_id) + def execute(self, context: Context) -> None: + # super stops the app (or makes sure it's already stopped) + super().execute(context) + + self.log.info("Now deleting application: %s", self.application_id) response = self.hook.conn.delete_application(applicationId=self.application_id) if response["ResponseMetadata"]["HTTPStatusCode"] != 200: raise AirflowException(f"Application deletion failed: {response}") - if self.wait_for_completion: + if self.wait_for_delete_completion: 
# This should be replaced with a boto waiter when available. waiter( get_state_callable=self.hook.conn.get_application, diff --git a/tests/providers/amazon/aws/operators/test_emr_serverless.py b/tests/providers/amazon/aws/operators/test_emr_serverless.py index 976bcee63fb17..698710f0a1c7d 100644 --- a/tests/providers/amazon/aws/operators/test_emr_serverless.py +++ b/tests/providers/amazon/aws/operators/test_emr_serverless.py @@ -17,6 +17,7 @@ from __future__ import annotations from unittest import mock +from unittest.mock import MagicMock from uuid import UUID import pytest @@ -26,6 +27,7 @@ EmrServerlessCreateApplicationOperator, EmrServerlessDeleteApplicationOperator, EmrServerlessStartJobOperator, + EmrServerlessStopApplicationOperator, ) task_id = "test_emr_serverless_task_id" @@ -606,14 +608,13 @@ def test_delete_application_without_wait_for_completion_successfully(self, mock_ operator.execute(None) - assert operator.wait_for_completion is False mock_waiter.assert_called_once() mock_conn.stop_application.assert_called_once() mock_conn.delete_application.assert_called_once_with(applicationId=application_id_delete_operator) @mock.patch("airflow.providers.amazon.aws.operators.emr.waiter") @mock.patch("airflow.providers.amazon.aws.hooks.emr.EmrServerlessHook.conn") - def test_delete_application_failed_deleteion(self, mock_conn, mock_waiter): + def test_delete_application_failed_deletion(self, mock_conn, mock_waiter): mock_waiter.return_value = True mock_conn.stop_application.return_value = {} mock_conn.delete_application.return_value = {"ResponseMetadata": {"HTTPStatusCode": 400}} @@ -626,7 +627,30 @@ def test_delete_application_failed_deleteion(self, mock_conn, mock_waiter): assert "Application deletion failed:" in str(ex_message.value) - assert operator.wait_for_completion is True mock_waiter.assert_called_once() mock_conn.stop_application.assert_called_once() mock_conn.delete_application.assert_called_once_with(applicationId=application_id_delete_operator) + + 
+class TestEmrServerlessStopOperator: + @mock.patch("airflow.providers.amazon.aws.operators.emr.waiter") + @mock.patch("airflow.providers.amazon.aws.hooks.emr.EmrServerlessHook.conn") + def test_stop(self, mock_conn: MagicMock, mock_waiter: MagicMock): + operator = EmrServerlessStopApplicationOperator(task_id=task_id, application_id="test") + + operator.execute(None) + + mock_waiter.assert_called_once() + mock_conn.stop_application.assert_called_once() + + @mock.patch("airflow.providers.amazon.aws.operators.emr.waiter") + @mock.patch("airflow.providers.amazon.aws.hooks.emr.EmrServerlessHook.conn") + def test_stop_no_wait(self, mock_conn: MagicMock, mock_waiter: MagicMock): + operator = EmrServerlessStopApplicationOperator( + task_id=task_id, application_id="test", wait_for_completion=False + ) + + operator.execute(None) + + mock_waiter.assert_not_called() + mock_conn.stop_application.assert_called_once() diff --git a/tests/system/providers/amazon/aws/example_emr_serverless.py b/tests/system/providers/amazon/aws/example_emr_serverless.py index 6d8a669c3e97e..8bd20a0dccc3c 100644 --- a/tests/system/providers/amazon/aws/example_emr_serverless.py +++ b/tests/system/providers/amazon/aws/example_emr_serverless.py @@ -26,6 +26,7 @@ EmrServerlessCreateApplicationOperator, EmrServerlessDeleteApplicationOperator, EmrServerlessStartJobOperator, + EmrServerlessStopApplicationOperator, ) from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator from airflow.providers.amazon.aws.sensors.emr import EmrServerlessApplicationSensor, EmrServerlessJobSensor @@ -108,6 +109,15 @@ job_run_id=start_job.output, ) # [END howto_sensor_emr_serverless_job] + wait_for_job.poke_interval = 10 + + # [START howto_operator_emr_serverless_stop_application] + stop_app = EmrServerlessStopApplicationOperator( + task_id="stop_application", + application_id=emr_serverless_app_id, + ) + # [END howto_operator_emr_serverless_stop_application] + 
stop_app.waiter_check_interval_seconds = 1 # [START howto_operator_emr_serverless_delete_application] delete_app = EmrServerlessDeleteApplicationOperator( @@ -134,6 +144,7 @@ wait_for_app_creation, start_job, wait_for_job, + stop_app, # TEST TEARDOWN delete_app, delete_s3_bucket, From 569f2d85bed16c407dfb30b6819c413fa5d7b584 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Tue, 18 Apr 2023 13:28:52 -0700 Subject: [PATCH 2/5] update doc --- airflow/providers/amazon/aws/operators/emr.py | 2 +- .../operators/emr/emr_serverless.rst | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py index 8d80dfe731fd0..f5f23cccf6531 100644 --- a/airflow/providers/amazon/aws/operators/emr.py +++ b/airflow/providers/amazon/aws/operators/emr.py @@ -1016,7 +1016,7 @@ class EmrServerlessStopApplicationOperator(BaseOperator): .. seealso:: For more information on how to use this operator, take a look at the guide: - :ref:`howto/operator:EmrServerlessDeleteApplicationOperator` + :ref:`howto/operator:EmrServerlessStopApplicationOperator` :param application_id: ID of the EMR Serverless application to stop. :param wait_for_completion: If true, wait for the Application to stop before returning. Default to True diff --git a/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst b/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst index 6a30f52825cb4..8bc11f04a1fdf 100644 --- a/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst +++ b/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst @@ -63,6 +63,18 @@ start an EMR Serverless Job. .. 
_howto/operator:EmrServerlessDeleteApplicationOperator: +Stop an EMR Serverless Application +================================== + +You can use :class:`~airflow.providers.amazon.aws.operators.emr.EmrServerlessStopApplicationOperator` to +stop an EMR Serverless Application. + +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_serverless.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_emr_serverless_stop_application] + :end-before: [END howto_operator_emr_serverless_stop_application] + Delete an EMR Serverless Application ==================================== From b6cc6d36784aac5cbea552069696b1379111bd6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= <114772123+vandonr-amz@users.noreply.github.com> Date: Tue, 18 Apr 2023 14:27:23 -0700 Subject: [PATCH 3/5] update label in doc Co-authored-by: Vincent <97131062+vincbeck@users.noreply.github.com> --- .../operators/emr/emr_serverless.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst b/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst index 8bc11f04a1fdf..a9cfbcce49bea 100644 --- a/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst +++ b/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst @@ -61,7 +61,7 @@ start an EMR Serverless Job. :start-after: [START howto_operator_emr_serverless_start_job] :end-before: [END howto_operator_emr_serverless_start_job] -.. _howto/operator:EmrServerlessDeleteApplicationOperator: +.. 
_howto/operator:EmrServerlessStopApplicationOperator: Stop an EMR Serverless Application ================================== From 2f93e8685b0a65c25cb9fd09e712ad6345801cae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= <114772123+vandonr-amz@users.noreply.github.com> Date: Tue, 18 Apr 2023 14:27:35 -0700 Subject: [PATCH 4/5] update label in doc Co-authored-by: Vincent <97131062+vincbeck@users.noreply.github.com> --- .../operators/emr/emr_serverless.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst b/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst index a9cfbcce49bea..033a172e0b984 100644 --- a/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst +++ b/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst @@ -75,6 +75,7 @@ stop an EMR Serverless Application. :start-after: [START howto_operator_emr_serverless_stop_application] :end-before: [END howto_operator_emr_serverless_stop_application] +.. _howto/operator:EmrServerlessDeleteApplicationOperator: Delete an EMR Serverless Application ==================================== From 2372235225cb0abd8bee571cf17c21137a21d245 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= <114772123+vandonr-amz@users.noreply.github.com> Date: Wed, 19 Apr 2023 09:57:02 -0700 Subject: [PATCH 5/5] fix missing newline in doc Co-authored-by: Vincent <97131062+vincbeck@users.noreply.github.com> --- .../operators/emr/emr_serverless.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst b/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst index 033a172e0b984..4f74690da00c9 100644 --- a/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst +++ b/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst @@ -76,6 +76,7 @@ stop an EMR Serverless Application. 
:end-before: [END howto_operator_emr_serverless_stop_application] .. _howto/operator:EmrServerlessDeleteApplicationOperator: + Delete an EMR Serverless Application ====================================