From e2f4cc2316bddabf3d4546d3a80ef0adf507cc34 Mon Sep 17 00:00:00 2001 From: Andrey Anshin Date: Fri, 3 May 2024 00:00:59 +0400 Subject: [PATCH] Fix deprecated calls in `cncf.kubernetes` provider --- .../executors/kubernetes_executor_utils.py | 4 +- .../kubernetes/kubernetes_helper_functions.py | 40 ++-- .../cncf/kubernetes/pod_generator.py | 20 +- .../cncf/kubernetes/template_rendering.py | 6 +- tests/deprecations_ignore.yml | 29 +-- .../executors/test_kubernetes_executor.py | 75 ++++---- .../cncf/kubernetes/operators/test_pod.py | 93 +++++---- .../operators/test_spark_kubernetes.py | 6 +- .../providers/cncf/kubernetes/test_client.py | 2 + .../test_kubernetes_helper_functions.py | 35 +++- .../cncf/kubernetes/test_pod_generator.py | 177 +++++++++--------- 11 files changed, 230 insertions(+), 257 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index db2a4c0a81135..3f544fe2feff8 100644 --- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -32,7 +32,7 @@ from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import ( annotations_for_logging_task_metadata, annotations_to_key, - create_pod_id, + create_unique_id, ) from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator from airflow.utils.log.logging_mixin import LoggingMixin @@ -413,7 +413,7 @@ def run_next(self, next_job: KubernetesJobType) -> None: pod = PodGenerator.construct_pod( namespace=self.namespace, scheduler_job_id=self.scheduler_job_id, - pod_id=create_pod_id(dag_id, task_id), + pod_id=create_unique_id(dag_id, task_id), dag_id=dag_id, task_id=task_id, kube_image=self.kube_config.kube_image, diff --git a/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py b/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py index 9129bdf77920f..c2e52b1a07ed0 100644 --- a/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py +++ b/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py @@ -19,10 +19,10 @@ import logging import secrets import string -import warnings from typing import TYPE_CHECKING import pendulum +from deprecated import deprecated from slugify import slugify from airflow.compat.functools import cache @@ -59,6 +59,10 @@ def add_unique_suffix(*, name: str, rand_len: int = 8, max_len: int = POD_NAME_M return name[: max_len - len(suffix)].strip("-.") + suffix +@deprecated( + reason="This function is deprecated. Please use `add_unique_suffix`", + category=AirflowProviderDeprecationWarning, +) def add_pod_suffix(*, pod_name: str, rand_len: int = 8, max_len: int = POD_NAME_MAX_LENGTH) -> str: """Add random string to pod name while staying under max length. @@ -67,14 +71,7 @@ def add_pod_suffix(*, pod_name: str, rand_len: int = 8, max_len: int = POD_NAME_ :param max_len: maximum length of the pod name :meta private: """ - warnings.warn( - "This function is deprecated. Please use `add_unique_suffix`.", - AirflowProviderDeprecationWarning, - stacklevel=2, - ) - - suffix = "-" + rand_str(rand_len) - return pod_name[: max_len - len(suffix)].strip("-.") + suffix + return add_unique_suffix(name=pod_name, rand_len=rand_len, max_len=max_len) def create_unique_id( @@ -109,6 +106,10 @@ def create_unique_id( return base_name +@deprecated( + reason="This function is deprecated. Please use `create_unique_id`.", + category=AirflowProviderDeprecationWarning, +) def create_pod_id( dag_id: str | None = None, task_id: str | None = None, @@ -125,26 +126,7 @@ def create_pod_id( :param unique: whether a random string suffix should be added :return: A valid identifier for a kubernetes pod name """ - warnings.warn( - "This function is deprecated. Please use `create_unique_id`.", - AirflowProviderDeprecationWarning, - stacklevel=2, - ) - - if not (dag_id or task_id): - raise ValueError("Must supply either dag_id or task_id.") - name = "" - if dag_id: - name += dag_id - if task_id: - if name: - name += "-" - name += task_id - base_name = slugify(name, lowercase=True)[:max_length].strip(".-") - if unique: - return add_pod_suffix(pod_name=base_name, rand_len=8, max_len=max_length) - else: - return base_name + return create_unique_id(dag_id=dag_id, task_id=task_id, max_length=max_length, unique=unique) def annotations_to_key(annotations: dict[str, str]) -> TaskInstanceKey: diff --git a/airflow/providers/cncf/kubernetes/pod_generator.py b/airflow/providers/cncf/kubernetes/pod_generator.py index 6e9cc32441851..a04eb5aa7e20e 100644 --- a/airflow/providers/cncf/kubernetes/pod_generator.py +++ b/airflow/providers/cncf/kubernetes/pod_generator.py @@ -45,7 +45,6 @@ ) from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import ( POD_NAME_MAX_LENGTH, - add_pod_suffix, add_unique_suffix, rand_str, ) @@ -156,12 +155,15 @@ def __init__( # Attach sidecar self.extract_xcom = extract_xcom - @deprecated(reason="This function is deprecated.", category=AirflowProviderDeprecationWarning) + @deprecated( + reason="This method is deprecated and will be removed in the future releases", + category=AirflowProviderDeprecationWarning, + ) def gen_pod(self) -> k8s.V1Pod: """Generate pod.""" result = self.ud_pod - result.metadata.name = add_pod_suffix(pod_name=result.metadata.name) + result.metadata.name = add_unique_suffix(name=result.metadata.name) if self.extract_xcom: result = self.add_xcom_sidecar(result) @@ -210,8 +212,8 @@ def from_obj(obj) -> dict | k8s.V1Pod | None: return k8s_object elif isinstance(k8s_legacy_object, dict): warnings.warn( - "Using a dictionary for the executor_config is deprecated and will soon be removed." - 'please use a `kubernetes.client.models.V1Pod` class with a "pod_override" key' + "Using a dictionary for the executor_config is deprecated and will soon be removed. " + 'Please use a `kubernetes.client.models.V1Pod` class with a "pod_override" key' " instead. ", category=AirflowProviderDeprecationWarning, stacklevel=2, @@ -575,7 +577,7 @@ def deserialize_model_dict(pod_dict: dict | None) -> k8s.V1Pod: @staticmethod @deprecated( - reason="This function is deprecated. Use `add_pod_suffix` in `kubernetes_helper_functions`.", + reason="This method is deprecated. Use `add_pod_suffix` in `kubernetes_helper_functions`.", category=AirflowProviderDeprecationWarning, ) def make_unique_pod_id(pod_id: str) -> str | None: @@ -595,12 +597,6 @@ def make_unique_pod_id(pod_id: str) -> str | None: :param pod_id: requested pod name :return: ``str`` valid Pod name of appropriate length """ - warnings.warn( - "This function is deprecated. Use `add_pod_suffix` in `kubernetes_helper_functions`.", - AirflowProviderDeprecationWarning, - stacklevel=2, - ) - if not pod_id: return None diff --git a/airflow/providers/cncf/kubernetes/template_rendering.py b/airflow/providers/cncf/kubernetes/template_rendering.py index c259a7258f792..c499dad24038c 100644 --- a/airflow/providers/cncf/kubernetes/template_rendering.py +++ b/airflow/providers/cncf/kubernetes/template_rendering.py @@ -24,9 +24,7 @@ from airflow.exceptions import AirflowException from airflow.providers.cncf.kubernetes.kube_config import KubeConfig -from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import ( - create_pod_id, -) +from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import create_unique_id from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator from airflow.utils.session import NEW_SESSION, provide_session @@ -43,7 +41,7 @@ def render_k8s_pod_yaml(task_instance: TaskInstance) -> dict | None: task_id=task_instance.task_id, map_index=task_instance.map_index, date=None, - pod_id=create_pod_id(task_instance.dag_id, task_instance.task_id), + pod_id=create_unique_id(task_instance.dag_id, task_instance.task_id), try_number=task_instance.try_number, kube_image=kube_config.kube_image, args=task_instance.command_as_list(), diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml index acbdf6c06b1a9..47eb89510439d 100644 --- a/tests/deprecations_ignore.yml +++ b/tests/deprecations_ignore.yml @@ -54,6 +54,7 @@ # CLI +# https://github.com/apache/airflow/issues/39199 - tests/cli/commands/test_kubernetes_command.py::TestGenerateDagYamlCommand::test_generate_dag_yaml @@ -456,34 +457,6 @@ - tests/providers/atlassian/jira/operators/test_jira.py::TestJiraOperator::test_project_issue_count - tests/providers/atlassian/jira/operators/test_jira.py::TestJiraOperator::test_update_issue - tests/providers/atlassian/jira/sensors/test_jira.py::TestJiraSensor::test_issue_label_set -- tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py::TestAirflowKubernetesScheduler::test_create_pod_id -- tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py::TestKubernetesExecutor::test_pod_template_file_override_in_executor_config -- tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py::TestKubernetesExecutor::test_run_next_exception_requeue -- tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py::TestKubernetesExecutor::test_run_next_pmh_error -- tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py::TestKubernetesExecutor::test_run_next_pod_reconciliation_error -- tests/providers/cncf/kubernetes/operators/test_pod.py::TestKubernetesPodOperator::test_execute_async_callbacks -- tests/providers/cncf/kubernetes/operators/test_pod.py::TestKubernetesPodOperator::test_mark_checked_if_not_deleted -- tests/providers/cncf/kubernetes/operators/test_pod.py::TestKubernetesPodOperator::test_pod_delete_after_await_container_error -- tests/providers/cncf/kubernetes/operators/test_pod.py::TestKubernetesPodOperatorAsync::test_async_create_pod_should_throw_exception -- tests/providers/cncf/kubernetes/operators/test_pod.py::TestKubernetesPodOperatorAsync::test_async_create_pod_with_skip_on_exit_code_should_skip -- tests/providers/cncf/kubernetes/operators/test_pod.py::TestKubernetesPodOperatorAsync::test_async_create_pod_xcom_push_should_execute_successfully -- tests/providers/cncf/kubernetes/operators/test_pod.py::TestKubernetesPodOperatorAsync::test_async_get_logs_should_execute_successfully -- tests/providers/cncf/kubernetes/operators/test_pod.py::TestKubernetesPodOperatorAsync::test_async_write_logs_should_execute_successfully -- tests/providers/cncf/kubernetes/operators/test_pod.py::test_async_kpo_wait_termination_before_cleanup_on_failure -- tests/providers/cncf/kubernetes/operators/test_pod.py::test_async_kpo_wait_termination_before_cleanup_on_success -- tests/providers/cncf/kubernetes/operators/test_pod.py::test_async_skip_kpo_wait_termination_with_timeout_event -- tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py::TestSparkKubernetesOperator::test_create_application_from_yaml_json -- tests/providers/cncf/kubernetes/test_kubernetes_helper_functions.py::TestCreatePodId::test_create_pod_id -- tests/providers/cncf/kubernetes/test_kubernetes_helper_functions.py::TestCreatePodId::test_create_pod_id_dag_and_task -- tests/providers/cncf/kubernetes/test_kubernetes_helper_functions.py::TestCreatePodId::test_create_pod_id_dag_only -- tests/providers/cncf/kubernetes/test_kubernetes_helper_functions.py::TestCreatePodId::test_create_pod_id_dag_too_long_non_unique -- tests/providers/cncf/kubernetes/test_kubernetes_helper_functions.py::TestCreatePodId::test_create_pod_id_dag_too_long_with_suffix -- tests/providers/cncf/kubernetes/test_kubernetes_helper_functions.py::TestCreatePodId::test_create_pod_id_task_only -- tests/providers/cncf/kubernetes/test_pod_generator.py::TestPodGenerator::test_from_obj -- tests/providers/cncf/kubernetes/test_pod_generator.py::TestPodGenerator::test_gen_pod_extract_xcom -- tests/providers/cncf/kubernetes/test_pod_generator.py::TestPodGenerator::test_pod_name_confirm_to_max_length -- tests/providers/cncf/kubernetes/test_pod_generator.py::TestPodGenerator::test_pod_name_is_valid -- tests/providers/cncf/kubernetes/test_template_rendering.py::test_render_k8s_pod_yaml - tests/providers/common/sql/hooks/test_dbapi.py::TestDbApiHook::test_insert_rows_executemany - tests/providers/common/sql/hooks/test_dbapi.py::TestDbApiHook::test_insert_rows_replace_executemany_hana_dialect - tests/providers/common/sql/hooks/test_dbapi.py::TestDbApiHook::test_instance_check_works_for_legacy_db_api_hook diff --git a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py index caf4bb1029f1b..9dffa3f778cca 100644 --- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -28,40 +28,36 @@ from kubernetes.client.rest import ApiException from urllib3 import HTTPResponse -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.models.taskinstancekey import TaskInstanceKey from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator +from airflow.providers.cncf.kubernetes import pod_generator +from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import ( + KubernetesExecutor, + PodReconciliationError, +) +from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import ( + ADOPTED, + POD_EXECUTOR_DONE_KEY, +) +from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import ( + AirflowKubernetesScheduler, + KubernetesJobWatcher, + ResourceVersion, + get_base_pod_from_template, +) +from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import ( + annotations_for_logging_task_metadata, + annotations_to_key, + create_unique_id, + get_logs_task_metadata, +) +from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator from airflow.utils import timezone from airflow.utils.state import State, TaskInstanceState from tests.test_utils.config import conf_vars -try: - from airflow.providers.cncf.kubernetes import pod_generator - from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import ( - KubernetesExecutor, - PodReconciliationError, - ) - from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import ( - ADOPTED, - POD_EXECUTOR_DONE_KEY, - ) - from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import ( - AirflowKubernetesScheduler, - KubernetesJobWatcher, - ResourceVersion, - create_pod_id, - get_base_pod_from_template, - ) - from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import ( - annotations_for_logging_task_metadata, - annotations_to_key, - get_logs_task_metadata, - ) - from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator -except ImportError: - AirflowKubernetesScheduler = None # type: ignore - class TestAirflowKubernetesScheduler: @staticmethod @@ -100,17 +96,12 @@ def _is_safe_label_value(value): regex = r"^[^a-z0-9A-Z]*|[^a-zA-Z0-9_\-\.]|[^a-z0-9A-Z]*$" return len(value) <= 63 and re.match(regex, value) - @pytest.mark.skipif( - AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed" - ) def test_create_pod_id(self): for dag_id, task_id in self._cases(): - pod_name = PodGenerator.make_unique_pod_id(create_pod_id(dag_id, task_id)) - assert self._is_valid_pod_id(pod_name) + with pytest.warns(AirflowProviderDeprecationWarning, match=r"deprecated\. Use `add_pod_suffix`"): + pod_name = PodGenerator.make_unique_pod_id(create_unique_id(dag_id, task_id)) + assert self._is_valid_pod_id(pod_name), f"dag_id={dag_id!r}, task_id={task_id!r}" - @pytest.mark.skipif( - AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed" - ) @mock.patch("airflow.providers.cncf.kubernetes.pod_generator.PodGenerator") @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubeConfig") def test_get_base_pod_from_template(self, mock_kubeconfig, mock_generator, data_file): @@ -144,14 +135,16 @@ def test_get_base_pod_from_template(self, mock_kubeconfig, mock_generator, data_ def test_make_safe_label_value(self): for dag_id, task_id in self._cases(): + case = f"dag_id={dag_id!r}, task_id={task_id!r}" safe_dag_id = pod_generator.make_safe_label_value(dag_id) - assert self._is_safe_label_value(safe_dag_id) + assert self._is_safe_label_value(safe_dag_id), case safe_task_id = pod_generator.make_safe_label_value(task_id) - assert self._is_safe_label_value(safe_task_id) - dag_id = "my_dag_id" - assert dag_id == pod_generator.make_safe_label_value(dag_id) - dag_id = "my_dag_id_" + "a" * 64 - assert "my_dag_id_" + "a" * 43 + "-0ce114c45" == pod_generator.make_safe_label_value(dag_id) + assert self._is_safe_label_value(safe_task_id), case + + dag_id = "my_dag_id" + assert dag_id == pod_generator.make_safe_label_value(dag_id) + dag_id = "my_dag_id_" + "a" * 64 + assert "my_dag_id_" + "a" * 43 + "-0ce114c45" == pod_generator.make_safe_label_value(dag_id) def test_execution_date_serialize_deserialize(self): datetime_obj = datetime.now() diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py index f9277bab01ca1..4a08e736f6f58 100644 --- a/tests/providers/cncf/kubernetes/operators/test_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_pod.py @@ -28,7 +28,12 @@ from kubernetes.client.rest import ApiException from urllib3 import HTTPResponse -from airflow.exceptions import AirflowException, AirflowSkipException, TaskDeferred +from airflow.exceptions import ( + AirflowException, + AirflowProviderDeprecationWarning, + AirflowSkipException, + TaskDeferred, +) from airflow.models import DAG, DagModel, DagRun, TaskInstance from airflow.models.xcom import XCom from airflow.providers.cncf.kubernetes import pod_generator @@ -39,7 +44,7 @@ ) from airflow.providers.cncf.kubernetes.secret import Secret from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger -from airflow.providers.cncf.kubernetes.utils.pod_manager import PodLoggingStatus, PodPhase +from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction, PodLoggingStatus, PodPhase from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults from airflow.utils import timezone from airflow.utils.session import create_session @@ -763,12 +768,10 @@ def test_pod_with_istio_delete_after_await_container_error( @pytest.mark.parametrize( "task_kwargs, should_be_deleted", [ - ({}, True), # default values - ({"is_delete_operator_pod": True}, True), # check b/c of is_delete_operator_pod - ({"is_delete_operator_pod": False}, False), # check b/c of is_delete_operator_pod - ({"on_finish_action": "delete_pod"}, True), - ({"on_finish_action": "delete_succeeded_pod"}, False), - ({"on_finish_action": "keep_pod"}, False), + pytest.param({}, True, id="default"), # default values + pytest.param({"on_finish_action": "delete_pod"}, True, id="delete-pod"), + pytest.param({"on_finish_action": "delete_succeeded_pod"}, False, id="delete-succeeded-pod"), + pytest.param({"on_finish_action": "keep_pod"}, False, id="keep-pod"), ], ) @patch(f"{POD_MANAGER_CLASS}.delete_pod") @@ -1295,31 +1298,36 @@ def test_wait_for_xcom_sidecar_iff_push_xcom(self, mock_await, mock_extract_xcom else: mock_await.assert_not_called() + @pytest.mark.parametrize( + "on_finish_action", + # Regardless what we provide in `on_finish_action` + # it doesn't take any affect if `is_delete_operator_pod` provided. + [*sorted(OnFinishAction.__members__.values()), None], + ) + @pytest.mark.parametrize( + "is_delete_operator_pod, expected_on_finish_action", + [ + pytest.param(True, "delete_pod", id="delete-operator-pod"), + pytest.param(False, "keep_pod", id="keep-operator-pod"), + ], + ) + def test_deprecated_is_delete_operator_pod( + self, is_delete_operator_pod, expected_on_finish_action, on_finish_action + ): + with pytest.warns(AirflowProviderDeprecationWarning, match="please use `on_finish_action`"): + op = KubernetesPodOperator( + task_id="task", + is_delete_operator_pod=is_delete_operator_pod, + on_finish_action=on_finish_action, + ) + assert op.is_delete_operator_pod == is_delete_operator_pod + assert op.on_finish_action == expected_on_finish_action + @pytest.mark.parametrize( "task_kwargs, should_fail, should_be_deleted", [ ({}, False, True), ({}, True, True), - ( - {"is_delete_operator_pod": True, "on_finish_action": "keep_pod"}, - False, - True, - ), # check backcompat of is_delete_operator_pod - ( - {"is_delete_operator_pod": True, "on_finish_action": "keep_pod"}, - True, - True, - ), # check b/c of is_delete_operator_pod - ( - {"is_delete_operator_pod": False, "on_finish_action": "delete_pod"}, - False, - False, - ), # check b/c of is_delete_operator_pod - ( - {"is_delete_operator_pod": False, "on_finish_action": "delete_pod"}, - True, - False, - ), # check b/c of is_delete_operator_pod ({"on_finish_action": "keep_pod"}, False, False), ({"on_finish_action": "keep_pod"}, True, False), ({"on_finish_action": "delete_pod"}, False, True), @@ -1586,7 +1594,7 @@ def test_execute_async_callbacks(self): do_xcom_push=False, callbacks=MockKubernetesPodOperatorCallback, ) - k.execute_complete( + k.trigger_reentry( context=create_context(k), event={ "status": "success", @@ -1765,7 +1773,7 @@ def run_pod_async(self, operator: KubernetesPodOperator, map_index: int = -1): remote_pod_mock.metadata.namespace = TEST_NAMESPACE self.await_pod_mock.return_value = remote_pod_mock - operator.execute_complete( + operator.trigger_reentry( context=context, event={ "status": "success", @@ -1843,7 +1851,7 @@ def test_async_create_pod_should_throw_exception(self, mocked_hook, mocked_clean ) with pytest.raises(AirflowException): - k.execute_complete( + k.trigger_reentry( context=None, event={ "status": "error", @@ -1925,9 +1933,9 @@ def test_async_create_pod_with_skip_on_exit_code_should_skip( if expected_exc: with pytest.raises(expected_exc): - k.execute_complete(context=context, event=event) + k.trigger_reentry(context=context, event=event) else: - k.execute_complete(context=context, event=event) + k.trigger_reentry(context=context, event=event) @pytest.mark.parametrize("do_xcom_push", [True, False]) @patch(KUB_OP_PATH.format("post_complete_action")) @@ -1948,7 +1956,7 @@ def test_async_create_pod_xcom_push_should_execute_successfully( context = create_context(k) context["ti"] = MagicMock() - k.execute_complete( + k.trigger_reentry( context=context, event={ "status": "success", @@ -2007,7 +2015,7 @@ def test_async_get_logs_should_execute_successfully( context = create_context(k) context["ti"] = MagicMock() - k.execute_complete( + k.trigger_reentry( context=context, event={ "status": "success", @@ -2120,6 +2128,15 @@ def test_trigger_error(self, find_pod, cleanup): }, ) + def test_deprecated_execute_complete(self): + fake_context = mock.sentinel.context + fake_event = mock.sentinel.event + with mock.patch.object(KubernetesPodOperator, "trigger_reentry") as mocked_trigger_reentry: + op = KubernetesPodOperator(task_id="test-task") + with pytest.warns(AirflowProviderDeprecationWarning, match="use `trigger_reentry` instead"): + op.execute_complete(fake_context, fake_event) + mocked_trigger_reentry.assert_called_once_with(context=fake_context, event=fake_event) + @pytest.mark.parametrize("do_xcom_push", [True, False]) @patch(KUB_OP_PATH.format("extract_xcom")) @@ -2147,7 +2164,7 @@ def test_async_kpo_wait_termination_before_cleanup_on_success( } k = KubernetesPodOperator(task_id="task", deferrable=True, do_xcom_push=do_xcom_push) - k.execute_complete({}, success_event) + k.trigger_reentry({}, success_event) # check if it gets the pod mocked_hook.return_value.get_pod.assert_called_once_with(TEST_NAME, TEST_NAMESPACE) @@ -2192,7 +2209,7 @@ def test_async_kpo_wait_termination_before_cleanup_on_failure( k = KubernetesPodOperator(task_id="task", deferrable=True, do_xcom_push=do_xcom_push) with pytest.raises(AirflowException): - k.execute_complete({"ti": ti_mock}, success_event) + k.trigger_reentry({"ti": ti_mock}, success_event) # check if it gets the pod mocked_hook.return_value.get_pod.assert_called_once_with(TEST_NAME, TEST_NAMESPACE) @@ -2238,7 +2255,7 @@ def test_async_skip_kpo_wait_termination_with_timeout_event(mock_manager, mocked # assert that the AirflowException is raised when the timeout event is present with pytest.raises(AirflowException): - k.execute_complete({"ti": ti_mock}, event) + k.trigger_reentry({"ti": ti_mock}, event) # assert that the await_pod_completion is not called mock_manager.await_pod_completion.assert_not_called() diff --git a/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py b/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py index 878a19e7992fd..eefb6347593ad 100644 --- a/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py +++ b/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py @@ -192,10 +192,8 @@ def create_context(task): @patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_start") @patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.create_pod") @patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator.client") -@patch( - "airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator.create_job_name" -) # , return_value='default') -@patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.cleanup") +@patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator.create_job_name") +@patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.cleanup") @patch("kubernetes.client.api.custom_objects_api.CustomObjectsApi.get_namespaced_custom_object_status") @patch("kubernetes.client.api.custom_objects_api.CustomObjectsApi.create_namespaced_custom_object") class TestSparkKubernetesOperator: diff --git a/tests/providers/cncf/kubernetes/test_client.py b/tests/providers/cncf/kubernetes/test_client.py index 740b34e2148eb..836bdefdac407 100644 --- a/tests/providers/cncf/kubernetes/test_client.py +++ b/tests/providers/cncf/kubernetes/test_client.py @@ -19,6 +19,7 @@ import socket from unittest import mock +import pytest from kubernetes.client import Configuration from urllib3.connection import HTTPConnection, HTTPSConnection @@ -61,6 +62,7 @@ def test_load_config_ssl_ca_cert(self, conf, config): conf.get.assert_called_with("kubernetes_executor", "ssl_ca_cert") assert client.api_client.configuration.ssl_ca_cert == "/path/to/ca.crt" + @pytest.mark.platform("linux") def test_enable_tcp_keepalive(self): socket_options = [ (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), diff --git a/tests/providers/cncf/kubernetes/test_kubernetes_helper_functions.py b/tests/providers/cncf/kubernetes/test_kubernetes_helper_functions.py index 436be7e96c05f..bfe4f98f3e148 100644 --- a/tests/providers/cncf/kubernetes/test_kubernetes_helper_functions.py +++ b/tests/providers/cncf/kubernetes/test_kubernetes_helper_functions.py @@ -18,15 +18,17 @@ from __future__ import annotations import re +from unittest import mock import pytest -from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import create_pod_id +from airflow.exceptions import AirflowProviderDeprecationWarning +from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import create_pod_id, create_unique_id pod_name_regex = r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$" -class TestCreatePodId: +class TestCreateUniqueId: @pytest.mark.parametrize( "val, expected", [ @@ -40,7 +42,7 @@ class TestCreatePodId: ], ) def test_create_pod_id_task_only(self, val, expected): - actual = create_pod_id(task_id=val, unique=False) + actual = create_unique_id(task_id=val, unique=False) assert actual == expected assert re.match(pod_name_regex, actual) @@ -57,7 +59,7 @@ def test_create_pod_id_task_only(self, val, expected): ], ) def test_create_pod_id_dag_only(self, val, expected): - actual = create_pod_id(dag_id=val, unique=False) + actual = create_unique_id(dag_id=val, unique=False) assert actual == expected assert re.match(pod_name_regex, actual) @@ -74,18 +76,18 @@ def test_create_pod_id_dag_only(self, val, expected): ], ) def test_create_pod_id_dag_and_task(self, dag_id, task_id, expected): - actual = create_pod_id(dag_id=dag_id, task_id=task_id, unique=False) + actual = create_unique_id(dag_id=dag_id, task_id=task_id, unique=False) assert actual == expected assert re.match(pod_name_regex, actual) def test_create_pod_id_dag_too_long_with_suffix(self): - actual = create_pod_id("0" * 254) + actual = create_unique_id("0" * 254) assert len(actual) == 63 assert re.match(r"0{54}-[a-z0-9]{8}", actual) assert re.match(pod_name_regex, actual) def test_create_pod_id_dag_too_long_non_unique(self): - actual = create_pod_id("0" * 254, unique=False) + actual = create_unique_id("0" * 254, unique=False) assert len(actual) == 63 assert re.match(r"0{63}", actual) assert re.match(pod_name_regex, actual) @@ -96,7 +98,7 @@ def test_create_pod_id(self, length, unique): """Test behavior of max_length and unique.""" dag_id = "dag-dag-dag-dag-dag-dag-dag-dag-dag-dag-dag-dag-dag-dag-dag-dag-" task_id = "task-task-task-task-task-task-task-task-task-task-task-task-task-task-task-task-task-" - actual = create_pod_id( + actual = create_unique_id( dag_id=dag_id, task_id=task_id, max_length=length, @@ -108,3 +110,20 @@ def test_create_pod_id(self, length, unique): assert re.match(r"-[a-z0-9]{8}", actual[-9:]) else: assert actual == base[:length] + + @pytest.mark.parametrize("dag_id", ["fake-dag", None]) + @pytest.mark.parametrize("task_id", ["fake-task", None]) + @pytest.mark.parametrize("max_length", [10, 42, None]) + @pytest.mark.parametrize("unique", [True, False]) + def test_back_compat_create_pod_id(self, dag_id, task_id, max_length, unique): + with mock.patch( + "airflow.providers.cncf.kubernetes.kubernetes_helper_functions.create_unique_id" + ) as mocked_create_unique_id: + with pytest.warns( + AirflowProviderDeprecationWarning, match=r"deprecated. Please use `create_unique_id`" + ): + create_pod_id(dag_id, task_id, max_length=max_length, unique=unique) + + mocked_create_unique_id.assert_called_once_with( + dag_id=dag_id, task_id=task_id, max_length=max_length, unique=unique + ) diff --git a/tests/providers/cncf/kubernetes/test_pod_generator.py b/tests/providers/cncf/kubernetes/test_pod_generator.py index 1a612d23edb4c..28c4323cbceb7 100644 --- a/tests/providers/cncf/kubernetes/test_pod_generator.py +++ b/tests/providers/cncf/kubernetes/test_pod_generator.py @@ -26,7 +26,7 @@ from kubernetes.client import ApiClient, models as k8s from airflow import __version__ -from airflow.exceptions import AirflowConfigException +from airflow.exceptions import AirflowConfigException, AirflowProviderDeprecationWarning from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import PodReconciliationError from airflow.providers.cncf.kubernetes.pod_generator import ( PodDefaultsDeprecated, @@ -170,7 +170,8 @@ def test_gen_pod_extract_xcom(self, mock_rand_str, data_file): template_file = data_file("pods/generator_base_with_secrets.yaml").as_posix() pod_generator = PodGenerator(pod_template_file=template_file, extract_xcom=True) - result = pod_generator.gen_pod() + with pytest.warns(AirflowProviderDeprecationWarning, match="deprecated and will be removed"): + result = pod_generator.gen_pod() container_two = { "name": "airflow-xcom-sidecar", "image": "alpine", @@ -194,37 +195,34 @@ def test_gen_pod_extract_xcom(self, mock_rand_str, data_file): expected_dict = self.k8s_client.sanitize_for_serialization(self.expected) assert result_dict == expected_dict - def test_from_obj(self): - result = PodGenerator.from_obj( - { - "pod_override": k8s.V1Pod( - api_version="v1", - kind="Pod", - metadata=k8s.V1ObjectMeta(name="foo", annotations={"test": "annotation"}), - spec=k8s.V1PodSpec( - containers=[ - k8s.V1Container( - name="base", - volume_mounts=[ - k8s.V1VolumeMount( - mount_path="/foo/", name="example-kubernetes-test-volume" - ) - ], - ) - ], - volumes=[ - k8s.V1Volume( - name="example-kubernetes-test-volume", - host_path=k8s.V1HostPathVolumeSource(path="/tmp/"), - ) - ], - ), - ) - } - ) - result = self.k8s_client.sanitize_for_serialization(result) + def test_from_obj_pod_override_object(self): + obj = { + "pod_override": k8s.V1Pod( + api_version="v1", + kind="Pod", + metadata=k8s.V1ObjectMeta(name="foo", annotations={"test": "annotation"}), + spec=k8s.V1PodSpec( + containers=[ + k8s.V1Container( + name="base", + volume_mounts=[ + k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume") + ], + ) + ], + volumes=[ + k8s.V1Volume( + name="example-kubernetes-test-volume", + host_path=k8s.V1HostPathVolumeSource(path="/tmp/"), + ) + ], + ), + ) + } + + result = PodGenerator.from_obj(obj) - assert { + assert self.k8s_client.sanitize_for_serialization(result) == { "apiVersion": "v1", "kind": "Pod", "metadata": { @@ -240,67 +238,33 @@ def test_from_obj(self): ], "volumes": [{"hostPath": {"path": "/tmp/"}, "name": "example-kubernetes-test-volume"}], }, - } == result - result = PodGenerator.from_obj( - { - "KubernetesExecutor": { - "annotations": {"test": "annotation"}, - "volumes": [ - { - "name": "example-kubernetes-test-volume", - "hostPath": {"path": "/tmp/"}, - }, - ], - "volume_mounts": [ - { - "mountPath": "/foo/", - "name": "example-kubernetes-test-volume", - }, - ], - } - } - ) - - result_from_pod = PodGenerator.from_obj( - { - "pod_override": k8s.V1Pod( - metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}), - spec=k8s.V1PodSpec( - containers=[ - k8s.V1Container( - name="base", - volume_mounts=[ - k8s.V1VolumeMount( - name="example-kubernetes-test-volume", mount_path="/foo/" - ) - ], - ) - ], - volumes=[k8s.V1Volume(name="example-kubernetes-test-volume", host_path="/tmp/")], - ), - ) - } - ) + } - result = self.k8s_client.sanitize_for_serialization(result) - result_from_pod = self.k8s_client.sanitize_for_serialization(result_from_pod) - expected_from_pod = { - "metadata": {"annotations": {"test": "annotation"}}, - "spec": { - "containers": [ + def test_from_obj_legacy(self): + obj = { + "KubernetesExecutor": { + "annotations": {"test": "annotation"}, + "volumes": [ { - "name": "base", - "volumeMounts": [{"mountPath": "/foo/", "name": "example-kubernetes-test-volume"}], - } + "name": "example-kubernetes-test-volume", + "hostPath": {"path": "/tmp/"}, + }, ], - "volumes": [{"hostPath": "/tmp/", "name": "example-kubernetes-test-volume"}], - }, + "volume_mounts": [ + { + "mountPath": "/foo/", + "name": "example-kubernetes-test-volume", + }, + ], + } } - assert ( - result_from_pod == expected_from_pod - ), "There was a discrepancy between KubernetesExecutor and pod_override" + with pytest.warns( + AirflowProviderDeprecationWarning, + match="Using a dictionary for the executor_config is deprecated and will soon be removed", + ): + result = PodGenerator.from_obj(obj) - assert { + assert self.k8s_client.sanitize_for_serialization(result) == { "apiVersion": "v1", "kind": "Pod", "metadata": { @@ -322,7 +286,32 @@ def test_from_obj(self): "imagePullSecrets": [], "volumes": [{"hostPath": {"path": "/tmp/"}, "name": "example-kubernetes-test-volume"}], }, - } == result + } + + def test_from_obj_both(self): + obj = { + "pod_override": k8s.V1Pod( + api_version="v1", + kind="Pod", + ), + "KubernetesExecutor": { + "annotations": {"test": "annotation"}, + "volumes": [ + { + "name": "example-kubernetes-test-volume", + "hostPath": {"path": "/tmp/"}, + }, + ], + "volume_mounts": [ + { + "mountPath": "/foo/", + "name": "example-kubernetes-test-volume", + }, + ], + }, + } + with pytest.raises(AirflowConfigException, match="Can not have both a legacy and new"): + PodGenerator.from_obj(obj) def test_reconcile_pods_empty_mutator_pod(self, data_file): template_file = data_file("pods/generator_base_with_secrets.yaml").as_posix() @@ -736,7 +725,10 @@ def test_deserialize_non_existent_model_file(self, caplog, tmp_path): ), ) def test_pod_name_confirm_to_max_length(self, input): - actual = PodGenerator.make_unique_pod_id(input) + with pytest.warns( + AirflowProviderDeprecationWarning, match="Use `add_pod_suffix` in `kubernetes_helper_functions`" + ): + actual = PodGenerator.make_unique_pod_id(input) assert len(actual) <= 100 actual_base, actual_suffix = actual.rsplit("-", maxsplit=1) # we limit pod id length to 100 @@ -767,7 +759,10 @@ def test_pod_name_is_valid(self, pod_id, expected_starts_with): `make_unique_pod_id` doesn't actually guarantee that the regex passes for any input. But I guess this test verifies that an otherwise valid pod_id doesn't get _screwed up_. """ - actual = PodGenerator.make_unique_pod_id(pod_id) + with pytest.warns( + AirflowProviderDeprecationWarning, match="Use `add_pod_suffix` in `kubernetes_helper_functions`" + ): + actual = PodGenerator.make_unique_pod_id(pod_id) assert len(actual) <= 253 assert actual == actual.lower(), "not lowercase" # verify using official k8s regex