From a8d85f9fd1642eebe231f7848d6089da42558a57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A8=D0=BA=D0=B0=D0=B1=D0=B5=D1=80=D0=B4=D0=B0=20=D0=92?= =?UTF-8?q?=D0=B0=D0=B4=D0=B8=D0=BC=20=D0=9C=D0=B8=D0=BA=D0=BE=D0=BB=D0=B0?= =?UTF-8?q?=D0=B9=D0=BE=D0=B2=D0=B8=D1=87?= Date: Wed, 24 Apr 2024 18:48:50 +0300 Subject: [PATCH] Remove unnecessary validation from cncf provider. --- .../operators/custom_object_launcher.py | 3 +- .../operators/test_custom_object_launcher.py | 28 ++++++++++++------- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py b/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py index 439c51e3cb3d3..77d99a0fba01e 100644 --- a/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py +++ b/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py @@ -60,12 +60,11 @@ def validate(self): if self.spec.get("dynamicAllocation", {}).get("enabled"): if not all( [ - self.spec["dynamicAllocation"].get("initialExecutors"), self.spec["dynamicAllocation"].get("minExecutors"), self.spec["dynamicAllocation"].get("maxExecutors"), ] ): - raise AirflowException("Make sure initial/min/max value for dynamic allocation is passed") + raise AirflowException("Make sure min/max value for dynamic allocation is passed") def update_resources(self): if self.spec["driver"].get("container_resources"): diff --git a/tests/providers/cncf/kubernetes/operators/test_custom_object_launcher.py b/tests/providers/cncf/kubernetes/operators/test_custom_object_launcher.py index d33fdd6048f43..3a57fdefdbd8e 100644 --- a/tests/providers/cncf/kubernetes/operators/test_custom_object_launcher.py +++ b/tests/providers/cncf/kubernetes/operators/test_custom_object_launcher.py @@ -64,6 +64,22 @@ def test_spark_job_spec_dynamicAllocation_enabled(self): assert spark_job_spec.spec["dynamicAllocation"]["enabled"] + def test_spark_job_spec_dynamicAllocation_enabled_with_default_initial_executors(self): + entries = { + "spec": { + "dynamicAllocation": { + "enabled": True, + "minExecutors": 1, + "maxExecutors": 2, + }, + "driver": {}, + "executor": {}, + } + } + spark_job_spec = SparkJobSpec(**entries) + + assert spark_job_spec.spec["dynamicAllocation"]["enabled"] + def test_spark_job_spec_dynamicAllocation_enabled_with_invalid_config(self): entries = { "spec": { @@ -78,19 +94,11 @@ def test_spark_job_spec_dynamicAllocation_enabled_with_invalid_config(self): } } - cloned_entries = entries.copy() - cloned_entries["spec"]["dynamicAllocation"]["initialExecutors"] = None - with pytest.raises( - AirflowException, - match="Make sure initial/min/max value for dynamic allocation is passed", - ): - SparkJobSpec(**cloned_entries) - cloned_entries = entries.copy() cloned_entries["spec"]["dynamicAllocation"]["minExecutors"] = None with pytest.raises( AirflowException, - match="Make sure initial/min/max value for dynamic allocation is passed", + match="Make sure min/max value for dynamic allocation is passed", ): SparkJobSpec(**cloned_entries) @@ -98,7 +106,7 @@ def test_spark_job_spec_dynamicAllocation_enabled_with_invalid_config(self): cloned_entries["spec"]["dynamicAllocation"]["maxExecutors"] = None with pytest.raises( AirflowException, - match="Make sure initial/min/max value for dynamic allocation is passed", + match="Make sure min/max value for dynamic allocation is passed", ): SparkJobSpec(**cloned_entries)