From 1754bfdae5797aef0984a0019f6bfe7ffb7a128c Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Tue, 5 Dec 2023 17:01:43 -0800 Subject: [PATCH 1/6] Add support for Capacity Providers --- .../aws/executors/ecs/ecs_executor_config.py | 21 +++++++++++++++ .../amazon/aws/executors/ecs/utils.py | 2 +- airflow/providers/amazon/provider.yaml | 19 ++++++++++++- .../aws/executors/ecs/test_ecs_executor.py | 27 +++++++++++++++++++ 4 files changed, 67 insertions(+), 2 deletions(-) diff --git a/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py b/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py index f0fa97852a0d6..c9345ad81aa4d 100644 --- a/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py +++ b/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py @@ -40,6 +40,7 @@ camelize_dict_keys, parse_assign_public_ip, ) +from airflow.providers.amazon.aws.hooks.ecs import EcsHook from airflow.utils.helpers import prune_dict @@ -60,6 +61,26 @@ def build_task_kwargs() -> dict: task_kwargs = _fetch_config_values() task_kwargs.update(_fetch_templated_kwargs()) + try: + has_launch_type: bool = task_kwargs.get("launch_type") is not None + has_capacity_provider: bool = task_kwargs.get("capacity_provider_strategy") is not None + + if has_capacity_provider and has_launch_type: + raise ValueError( + "capacity_provider_strategy and launch_type are mutually exclusive, you can not provide both." + ) + elif not (has_capacity_provider or has_launch_type): + # Default API behavior if neither is provided is to fall back on the default capacity + # provider if it exists. Since it is not a required value, check if there is one + # before using it, and if there is not then use the FARGATE launch_type as + # the final fallback. + cluster = EcsHook().conn.describe_clusters(clusters=[task_kwargs["cluster"]])["clusters"][0] + if not cluster.get("defaultCapacityProviderStrategy"): + task_kwargs["launch_type"] = "FARGATE" + + except IndexError: + pass + # There can only be 1 count of these containers task_kwargs["count"] = 1 # type: ignore # There could be a generic approach to the below, but likely more convoluted then just manually ensuring diff --git a/airflow/providers/amazon/aws/executors/ecs/utils.py b/airflow/providers/amazon/aws/executors/ecs/utils.py index 4966fa3d2b8bc..7913bdf22719c 100644 --- a/airflow/providers/amazon/aws/executors/ecs/utils.py +++ b/airflow/providers/amazon/aws/executors/ecs/utils.py @@ -44,7 +44,6 @@ "conn_id": "aws_default", "max_run_task_attempts": "3", "assign_public_ip": "False", - "launch_type": "FARGATE", "platform_version": "LATEST", "check_health_on_startup": "True", } @@ -81,6 +80,7 @@ class RunTaskKwargsConfigKeys(BaseConfigKeys): """Keys loaded into the config which are valid ECS run_task kwargs.""" ASSIGN_PUBLIC_IP = "assign_public_ip" + CAPACITY_PROVIDER_STRATEGY = "capacity_provider_strategy" CLUSTER = "cluster" LAUNCH_TYPE = "launch_type" PLATFORM_VERSION = "platform_version" diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml index d0f379e73df4f..80124e89f1154 100644 --- a/airflow/providers/amazon/provider.yaml +++ b/airflow/providers/amazon/provider.yaml @@ -828,6 +828,19 @@ config: type: string example: "ecs_executor_cluster" default: ~ + capacity_provider_strategy: + description: | + The capacity provider strategy to use for the task. + + If a capacityProviderStrategy is specified, the launchType parameter must be omitted. 
If + no capacityProviderStrategy or launchType is specified, the defaultCapacityProviderStrategy + for the cluster is used, if present. + + When you use cluster auto scaling, you must specify capacityProviderStrategy and not launchType. + version_added: "8.13" + type: string + example: "[{'capacityProvider': 'cp1', 'weight': 5}, {'capacityProvider': 'cp2', 'weight': 1}]" + default: ~ container_name: description: | Name of the container that will be used to execute Airflow tasks via the ECS executor. @@ -843,6 +856,10 @@ config: Launch type can either be 'FARGATE' OR 'EC2'. For more info see url to Boto3 docs above. + If a launchType is specified, the capacityProviderStrategy parameter must be omitted. If + no capacityProviderStrategy or launchType is specified, the defaultCapacityProviderStrategy + for the cluster is used, if present. + If the launch type is EC2, the executor will attempt to place tasks on empty EC2 instances. If there are no EC2 instances available, no task is placed and this function will be called again in the next heart-beat. @@ -852,7 +869,7 @@ config: version_added: "8.10" type: string example: "FARGATE" - default: "FARGATE" + default: ~ platform_version: description: | The platform version the task uses. A platform version is only specified diff --git a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py index 600f2597d3c34..0b842ca0da7c8 100644 --- a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py +++ b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py @@ -1086,3 +1086,30 @@ def test_start_health_check_config(self, set_env_vars): executor.start() ecs_mock.stop_task.assert_not_called() + + def test_providing_both_capacity_provider_and_launch_type_fails(self, set_env_vars): + os.environ[ + f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.CAPACITY_PROVIDER_STRATEGY}".upper() + ] = "[{'capacityProvider': 'cp1', 'weight': 5}, {'capacityProvider': 'cp2', 'weight': 1}]" + expected_error = ( + "capacity_provider_strategy and launch_type are mutually exclusive, you can not provide both." 
+ ) + + with pytest.raises(ValueError, match=expected_error): + AwsEcsExecutor() + + def test_providing_capacity_provider(self, set_env_vars): + valid_capacity_provider = \ + "[{'capacityProvider': 'cp1', 'weight': 5}, {'capacityProvider': 'cp2', 'weight': 1}]" + + os.environ[ + f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.CAPACITY_PROVIDER_STRATEGY}".upper() + ] = valid_capacity_provider + os.environ.pop(f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.LAUNCH_TYPE}".upper()) + + from airflow.providers.amazon.aws.executors.ecs import ecs_executor_config + task_kwargs = ecs_executor_config.build_task_kwargs() + + assert "launchType" not in task_kwargs + assert task_kwargs["capacityProviderStrategy"] == valid_capacity_provider + From cf3b2289828a60fc23378158e4de3483b4a4645b Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Wed, 10 Jan 2024 11:46:28 -0800 Subject: [PATCH 2/6] Add support for Capacity Providers --- .../amazon/aws/executors/ecs/ecs_executor_config.py | 2 +- .../amazon/aws/executors/ecs/test_ecs_executor.py | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py b/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py index c9345ad81aa4d..70c61dd9631fc 100644 --- a/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py +++ b/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py @@ -78,7 +78,7 @@ def build_task_kwargs() -> dict: if not cluster.get("defaultCapacityProviderStrategy"): task_kwargs["launch_type"] = "FARGATE" - except IndexError: + except KeyError: pass # There can only be 1 count of these containers diff --git a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py index 0b842ca0da7c8..5229402459e78 100644 --- a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py +++ b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py @@ -47,6 +47,7 @@ _recursive_flatten_dict, parse_assign_public_ip, ) +from airflow.providers.amazon.aws.hooks.ecs import EcsHook from airflow.utils.helpers import convert_camel_to_snake from airflow.utils.state import State, TaskInstanceState @@ -943,8 +944,10 @@ def test_provided_values_override_defaults(self, assign_subnets): assert task_kwargs["platformVersion"] == templated_version - def test_count_can_not_be_modified_by_the_user(self, assign_subnets): + @mock.patch.object(EcsHook, "conn") + def test_count_can_not_be_modified_by_the_user(self, mock_conn, assign_subnets): """The ``count`` parameter must always be 1; verify that the user can not override this value.""" + mock_conn.describe_clusters.return_value = {"clusters": [{"status": "ACTIVE"}]} templated_version = "1" templated_cluster = "templated_cluster_name" @@ -1099,8 +1102,9 @@ def test_providing_both_capacity_provider_and_launch_type_fails(self, set_env_va AwsEcsExecutor() def test_providing_capacity_provider(self, set_env_vars): - valid_capacity_provider = \ + valid_capacity_provider = ( "[{'capacityProvider': 'cp1', 'weight': 5}, {'capacityProvider': 'cp2', 'weight': 1}]" + ) os.environ[ f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.CAPACITY_PROVIDER_STRATEGY}".upper() @@ -1108,8 +1112,8 @@ def test_providing_capacity_provider(self, set_env_vars): os.environ.pop(f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.LAUNCH_TYPE}".upper()) from airflow.providers.amazon.aws.executors.ecs import ecs_executor_config + task_kwargs = ecs_executor_config.build_task_kwargs() assert 
"launchType" not in task_kwargs assert task_kwargs["capacityProviderStrategy"] == valid_capacity_provider - From 868c607584ef505fdc1dc8b1875e7194cb28379f Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Wed, 10 Jan 2024 12:26:04 -0800 Subject: [PATCH 3/6] Fix release version and docstring --- airflow/providers/amazon/provider.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml index 80124e89f1154..977adc408f3c3 100644 --- a/airflow/providers/amazon/provider.yaml +++ b/airflow/providers/amazon/provider.yaml @@ -832,12 +832,12 @@ config: description: | The capacity provider strategy to use for the task. - If a capacityProviderStrategy is specified, the launchType parameter must be omitted. If - no capacityProviderStrategy or launchType is specified, the defaultCapacityProviderStrategy + If a Capacity Provider Strategy is specified, the Launch Type parameter must be omitted. If + no Capacity Provider Strategy or Launch Type is specified, the Default CapacityProvider Strategy for the cluster is used, if present. - When you use cluster auto scaling, you must specify capacityProviderStrategy and not launchType. - version_added: "8.13" + When you use cluster auto scaling, you must specify Capacity Provider Strategy and not Launch Type. + version_added: "8.17" type: string example: "[{'capacityProvider': 'cp1', 'weight': 5}, {'capacityProvider': 'cp2', 'weight': 1}]" default: ~ From d460f18df737b00400d80641b17140b971532ad5 Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Wed, 10 Jan 2024 12:30:31 -0800 Subject: [PATCH 4/6] docstring revision --- airflow/providers/amazon/provider.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml index 977adc408f3c3..09af54bcc9a6e 100644 --- a/airflow/providers/amazon/provider.yaml +++ b/airflow/providers/amazon/provider.yaml @@ -856,8 +856,8 @@ config: Launch type can either be 'FARGATE' OR 'EC2'. For more info see url to Boto3 docs above. - If a launchType is specified, the capacityProviderStrategy parameter must be omitted. If - no capacityProviderStrategy or launchType is specified, the defaultCapacityProviderStrategy + If a Launch Type is specified, the Capacity Provider Strategy parameter must be omitted. If + no Capacity Provider Strategy or Launch Type is specified, the Default Capacity Provider Strategy for the cluster is used, if present. 
If the launch type is EC2, the executor will attempt to place tasks on From f18fc3d7fb21e209dcef85bf01cdd30f55b34efd Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Wed, 10 Jan 2024 14:25:05 -0800 Subject: [PATCH 5/6] Niko suggestions --- .../providers/amazon/aws/executors/ecs/ecs_executor_config.py | 4 ++-- tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py | 4 +--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py b/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py index 70c61dd9631fc..4ac58410ba63b 100644 --- a/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py +++ b/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py @@ -62,8 +62,8 @@ def build_task_kwargs() -> dict: task_kwargs.update(_fetch_templated_kwargs()) try: - has_launch_type: bool = task_kwargs.get("launch_type") is not None - has_capacity_provider: bool = task_kwargs.get("capacity_provider_strategy") is not None + has_launch_type: bool = "launch_type" in task_kwargs + has_capacity_provider: bool = "capacity_provider_strategy" in task_kwargs if has_capacity_provider and has_launch_type: raise ValueError( diff --git a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py index 5229402459e78..ca92cdfba4d2d 100644 --- a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py +++ b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py @@ -945,10 +945,8 @@ def test_provided_values_override_defaults(self, assign_subnets): assert task_kwargs["platformVersion"] == templated_version @mock.patch.object(EcsHook, "conn") - def test_count_can_not_be_modified_by_the_user(self, mock_conn, assign_subnets): + def test_count_can_not_be_modified_by_the_user(self, _, assign_subnets): """The ``count`` parameter must always be 1; verify that the user can not override this value.""" - mock_conn.describe_clusters.return_value = {"clusters": [{"status": "ACTIVE"}]} - templated_version = "1" templated_cluster = "templated_cluster_name" provided_run_task_kwargs = { From 056d88704f66c4e9851cd644ad190394ac6c0acc Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Wed, 10 Jan 2024 15:28:34 -0800 Subject: [PATCH 6/6] Improved testing and PR suggestions --- .../aws/executors/ecs/ecs_executor_config.py | 34 ++++++++----------- .../aws/executors/ecs/test_ecs_executor.py | 32 +++++++++++++++++ 2 files changed, 47 insertions(+), 19 deletions(-) diff --git a/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py b/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py index 4ac58410ba63b..4f57b72d96aef 100644 --- a/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py +++ b/airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py @@ -61,25 +61,21 @@ def build_task_kwargs() -> dict: task_kwargs = _fetch_config_values() task_kwargs.update(_fetch_templated_kwargs()) - try: - has_launch_type: bool = "launch_type" in task_kwargs - has_capacity_provider: bool = "capacity_provider_strategy" in task_kwargs - - if has_capacity_provider and has_launch_type: - raise ValueError( - "capacity_provider_strategy and launch_type are mutually exclusive, you can not provide both." - ) - elif not (has_capacity_provider or has_launch_type): - # Default API behavior if neither is provided is to fall back on the default capacity - # provider if it exists. 
Since it is not a required value, check if there is one
-            # before using it, and if there is not then use the FARGATE launch_type as
-            # the final fallback.
-            cluster = EcsHook().conn.describe_clusters(clusters=[task_kwargs["cluster"]])["clusters"][0]
-            if not cluster.get("defaultCapacityProviderStrategy"):
-                task_kwargs["launch_type"] = "FARGATE"
-
-    except KeyError:
-        pass
+    has_launch_type: bool = "launch_type" in task_kwargs
+    has_capacity_provider: bool = "capacity_provider_strategy" in task_kwargs
+
+    if has_capacity_provider and has_launch_type:
+        raise ValueError(
+            "capacity_provider_strategy and launch_type are mutually exclusive, you can not provide both."
+        )
+    elif "cluster" in task_kwargs and not (has_capacity_provider or has_launch_type):
+        # Default API behavior if neither is provided is to fall back on the default capacity
+        # provider if it exists. Since it is not a required value, check if there is one
+        # before using it, and if there is not then use the FARGATE launch_type as
+        # the final fallback.
+        cluster = EcsHook().conn.describe_clusters(clusters=[task_kwargs["cluster"]])["clusters"][0]
+        if not cluster.get("defaultCapacityProviderStrategy"):
+            task_kwargs["launch_type"] = "FARGATE"
 
     # There can only be 1 count of these containers
     task_kwargs["count"] = 1  # type: ignore
diff --git a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
index ca92cdfba4d2d..78c3c1bc2811e 100644
--- a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
+++ b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
@@ -1100,6 +1100,8 @@ def test_providing_both_capacity_provider_and_launch_type_fails(self, set_env_va
         AwsEcsExecutor()
 
     def test_providing_capacity_provider(self, set_env_vars):
+        # If a capacity provider strategy is supplied without a launch type, use the strategy.
+
         valid_capacity_provider = (
            "[{'capacityProvider': 'cp1', 'weight': 5}, {'capacityProvider': 'cp2', 'weight': 1}]"
        )
@@ -1115,3 +1117,33 @@ def test_providing_capacity_provider(self, set_env_vars):
 
        assert "launchType" not in task_kwargs
        assert task_kwargs["capacityProviderStrategy"] == valid_capacity_provider
+
+    @mock.patch.object(EcsHook, "conn")
+    def test_providing_no_capacity_provider_no_launch_type_with_cluster_default(self, mock_conn, set_env_vars):
+        # If no capacity provider strategy is supplied and no launch type, but the
+        # cluster has a default capacity provider strategy, use the cluster's default.
+        mock_conn.describe_clusters.return_value = {
+            "clusters": [{"defaultCapacityProviderStrategy": ["some_strategy"]}]
+        }
+        os.environ.pop(f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.LAUNCH_TYPE}".upper())
+
+        from airflow.providers.amazon.aws.executors.ecs import ecs_executor_config
+
+        task_kwargs = ecs_executor_config.build_task_kwargs()
+        assert "launchType" not in task_kwargs
+        assert "capacityProviderStrategy" not in task_kwargs
+        mock_conn.describe_clusters.assert_called_once()
+
+    @mock.patch.object(EcsHook, "conn")
+    def test_providing_no_capacity_provider_no_launch_type_no_cluster_default(self, mock_conn, set_env_vars):
+        # If no capacity provider strategy is supplied and no launch type, and the cluster
+        # does not have a default capacity provider strategy, use the FARGATE launch type.
+
+        mock_conn.describe_clusters.return_value = {"clusters": [{"status": "ACTIVE"}]}
+
+        os.environ.pop(f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.LAUNCH_TYPE}".upper())
+
+        from airflow.providers.amazon.aws.executors.ecs import ecs_executor_config
+
+        task_kwargs = ecs_executor_config.build_task_kwargs()
+        assert task_kwargs["launchType"] == "FARGATE"
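Usage note: a minimal sketch of how the new option can be exercised end to end, following the same environment-variable pattern the tests above use. The cluster name and capacity provider names are placeholders, the import paths are assumed to match those used by the tests, and the remaining required ECS executor settings (region, container name, task definition, subnets, and so on) are assumed to be configured already.

import os

from airflow.providers.amazon.aws.executors.ecs.utils import AllEcsConfigKeys, CONFIG_GROUP_NAME

# Supply a capacity provider strategy instead of a launch type. Setting LAUNCH_TYPE
# at the same time would make build_task_kwargs() raise the ValueError added above.
os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.CLUSTER}".upper()] = "my_ecs_cluster"
os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.CAPACITY_PROVIDER_STRATEGY}".upper()] = (
    "[{'capacityProvider': 'cp1', 'weight': 5}, {'capacityProvider': 'cp2', 'weight': 1}]"
)

from airflow.providers.amazon.aws.executors.ecs import ecs_executor_config

task_kwargs = ecs_executor_config.build_task_kwargs()
# The strategy is passed through and no launch type (or default-strategy lookup) is needed.
assert "launchType" not in task_kwargs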