From 0a13589a76645ab6d8c7b6832f1dae2c1a056951 Mon Sep 17 00:00:00 2001 From: insomnes Date: Wed, 19 Feb 2025 23:48:55 +0100 Subject: [PATCH 01/21] Start for @task.kuberenetes_cmd --- providers/cncf/kubernetes/provider.yaml | 2 + .../kubernetes/decorators/kubernetes_cmd.py | 116 ++++++ .../cncf/kubernetes/get_provider_info.py | 6 +- .../decorators/test_kubernetes_cmd.py | 370 ++++++++++++++++++ 4 files changed, 493 insertions(+), 1 deletion(-) create mode 100644 providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py create mode 100644 providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py diff --git a/providers/cncf/kubernetes/provider.yaml b/providers/cncf/kubernetes/provider.yaml index 095f6b0c10ad4..6c2d51fe20a7e 100644 --- a/providers/cncf/kubernetes/provider.yaml +++ b/providers/cncf/kubernetes/provider.yaml @@ -147,6 +147,8 @@ connection-types: task-decorators: - class-name: airflow.providers.cncf.kubernetes.decorators.kubernetes.kubernetes_task name: kubernetes + - class-name: airflow.providers.cncf.kubernetes.decorators.kubernetes_cmd.kubernetes_task_cmd + name: kubernetes_cmd config: local_kubernetes_executor: diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py new file mode 100644 index 0000000000000..5480eb7cd0e9d --- /dev/null +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py @@ -0,0 +1,116 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging +from collections.abc import Sequence +from typing import TYPE_CHECKING, Callable + +from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator + +if TYPE_CHECKING: + from airflow.utils.context import Context + + +logger = logging.getLogger(__name__) + + +class _KubernetesCmdDecoratedOperator(DecoratedOperator, KubernetesPodOperator): + custom_operator_name = "@task.kubernetes_cmd" + + # `cmds` and `arguments` are used internally by the operator + template_fields: Sequence[str] = tuple( + {"op_args", "op_kwargs", *KubernetesPodOperator.template_fields} - {"cmds", "arguments"} + ) + + # Since we won't mutate the arguments, we should just do the shallow copy + # there are some cases we can't deepcopy the objects (e.g protobuf). 
+ shallow_copy_attrs: Sequence[str] = ("python_callable",) + + def __init__(self, namespace: str | None = None, args_only: bool = False, **kwargs) -> None: + self.args_only = args_only + + # If the name was not provided, we generate operator name from the python_callable + # we also instruct operator to add a random suffix to avoid collisions by default + op_name = kwargs.pop("name", f"k8s-airflow-pod-{kwargs['python_callable'].__name__}") + random_name_suffix = kwargs.pop("random_name_suffix", True) + if kwargs.pop("cmds", None) is not None: + logger.warning( + "The 'cmds' and 'arguments' are unused in @task.kubernetes_cmd decorator. " + "You should return a list of commands or default cmd arguments with " + "args_only=True from the python_callable." + ) + + super().__init__( + namespace=namespace, + name=op_name, + random_name_suffix=random_name_suffix, + cmds=["placeholder-command"], + arguments=["placeholder-argument"], + **kwargs, + ) + + def _generate_cmds(self) -> list[str]: + generated_cmds = self.python_callable(*self.op_args, **self.op_kwargs) + func_name = self.python_callable.__name__ + if not isinstance(generated_cmds, list): + raise TypeError( + f"Expected python_callable to return a list of strings, but got {type(generated_cmds)}" + ) + if not all(isinstance(cmd, str) for cmd in generated_cmds): + raise TypeError(f"Expected {func_name} to return a list of strings, but got {generated_cmds}") + if not generated_cmds: + raise ValueError(f"The {func_name} returned an empty list of commands") + + return generated_cmds + + def execute(self, context: Context): + if self.args_only: + self.cmds = [] + self.arguments = self._generate_cmds() + else: + self.cmds = self._generate_cmds() + self.arguments = [] + return super().execute(context) + + +def kubernetes_task_cmd( + python_callable: Callable | None = None, + multiple_outputs: bool | None = None, + **kwargs, +) -> TaskDecorator: + """ + Kubernetes cmd operator decorator. + + This wraps a function which should return command to be executed + in K8s using KubernetesPodOperator. The function should return a list of strings. + If args_only is set to True, the function should return a list of arguments for + container default command. Also accepts any argument that KubernetesPodOperator + will via ``kwargs``. Can be reused in a single DAG. + + :param python_callable: Function to decorate + :param multiple_outputs: if set, function return value will be + unrolled to multiple XCom values. Dict will unroll to xcom values with + keys as XCom keys. Defaults to False. 
+ """ + return task_decorator_factory( + python_callable=python_callable, + multiple_outputs=multiple_outputs, + decorated_operator_class=_KubernetesCmdDecoratedOperator, + **kwargs, + ) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py index 6426b82404e3f..9d690a1cb95cb 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py @@ -85,7 +85,11 @@ def get_provider_info(): { "class-name": "airflow.providers.cncf.kubernetes.decorators.kubernetes.kubernetes_task", "name": "kubernetes", - } + }, + { + "class-name": "airflow.providers.cncf.kubernetes.decorators.kubernetes_cmd.kubernetes_task_cmd", + "name": "kubernetes_cmd", + }, ], "config": { "local_kubernetes_executor": { diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py new file mode 100644 index 0000000000000..41a21f7846f94 --- /dev/null +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py @@ -0,0 +1,370 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from unittest import mock + +import pytest + +from airflow.decorators import setup, task, teardown +from airflow.utils import timezone + +pytestmark = pytest.mark.db_test + + +DEFAULT_DATE = timezone.datetime(2021, 9, 1) + +KPO_MODULE = "airflow.providers.cncf.kubernetes.operators.pod" +POD_MANAGER_CLASS = "airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager" +HOOK_CLASS = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesHook" +XCOM_IMAGE = "XCOM_IMAGE" + + +@pytest.fixture(autouse=True) +def mock_create_pod() -> mock.Mock: + return mock.patch(f"{POD_MANAGER_CLASS}.create_pod").start() + + +@pytest.fixture(autouse=True) +def mock_await_pod_start() -> mock.Mock: + return mock.patch(f"{POD_MANAGER_CLASS}.await_pod_start").start() + + +@pytest.fixture(autouse=True) +def await_xcom_sidecar_container_start() -> mock.Mock: + return mock.patch(f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start").start() + + +@pytest.fixture(autouse=True) +def extract_xcom() -> mock.Mock: + f = mock.patch(f"{POD_MANAGER_CLASS}.extract_xcom").start() + f.return_value = '{"key1": "value1", "key2": "value2"}' + return f + + +@pytest.fixture(autouse=True) +def mock_await_pod_completion() -> mock.Mock: + f = mock.patch(f"{POD_MANAGER_CLASS}.await_pod_completion").start() + f.return_value = mock.MagicMock(**{"status.phase": "Succeeded"}) + return f + + +@pytest.fixture(autouse=True) +def mock_hook(): + return mock.patch(HOOK_CLASS).start() + + +# Without this patch each time pod manager would try to extract logs from the pod +# and log an error about it's inability to get containers for the log +# {pod_manager.py:572} ERROR - Could not retrieve containers for the pod: ... +@pytest.fixture(autouse=True) +def mock_fetch_logs() -> mock.Mock: + f = mock.patch(f"{POD_MANAGER_CLASS}.fetch_requested_container_logs").start() + f.return_value = "logs" + return f + + +@pytest.mark.parametrize( + "args_only", + [True, False], +) +def test_basic_kubernetes_cmd( + dag_maker, + session, + args_only: bool, + mock_create_pod: mock.Mock, + mock_hook: mock.Mock, +) -> None: + with dag_maker(session=session) as dag: + + @task.kubernetes_cmd( + image="python:3.10-slim-buster", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + namespace="default", + args_only=args_only, + ) + def hello(): + return ["echo", "Hello world!"] + + hello() + + dr = dag_maker.create_dagrun() + (ti,) = dr.task_instances + session.add(ti) + session.commit() + dag.get_task("hello").execute(context=ti.get_template_context(session=session)) + mock_hook.assert_called_once_with( + conn_id="kubernetes_default", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + ) + assert mock_create_pod.call_count == 1 + + containers = mock_create_pod.call_args.kwargs["pod"].spec.containers + assert len(containers) == 1 + expected_command = ["echo", "Hello world!"] + expected_args = [] + if args_only: + expected_args = expected_command + expected_command = [] + assert containers[0].command == expected_command + assert containers[0].args == expected_args + + +@pytest.mark.parametrize( + "func_return, exception", + [ + ("string", TypeError), + (42, TypeError), + (None, TypeError), + (("a", "b"), TypeError), + ([], ValueError), + ], +) +def test_kubernetes_cmd_wrong_cmd( + dag_maker, + session, + func_return, + exception, + mock_create_pod: mock.Mock, + mock_hook: mock.Mock, +) -> None: + with dag_maker(session=session) as dag: + + @task.kubernetes_cmd( + 
image="python:3.10-slim-buster", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + namespace="default", + ) + def hello(): + return func_return + + hello() + + dr = dag_maker.create_dagrun() + (ti,) = dr.task_instances + session.add(ti) + session.commit() + with pytest.raises(exception): + dag.get_task("hello").execute(context=ti.get_template_context(session=session)) + + +def test_kubernetes_cmd_with_input_output( + dag_maker, session, mock_create_pod: mock.Mock, mock_hook: mock.Mock +) -> None: + with dag_maker(session=session) as dag: + + @task.kubernetes_cmd( + image="python:3.10-slim-buster", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + namespace="default", + ) + def f(arg1, arg2, kwarg1=None, kwarg2=None): + return [ + "echo", + f"arg1={arg1}", + f"arg2={arg2}", + f"kwarg1={kwarg1}", + f"kwarg2={kwarg2}", + ] + + f.override(task_id="my_task_id", do_xcom_push=True)("arg1", "arg2", kwarg1="kwarg1") + + mock_hook.return_value.get_xcom_sidecar_container_image.return_value = XCOM_IMAGE + mock_hook.return_value.get_xcom_sidecar_container_resources.return_value = { + "requests": {"cpu": "1m", "memory": "10Mi"}, + "limits": {"cpu": "1m", "memory": "50Mi"}, + } + + dr = dag_maker.create_dagrun() + (ti,) = dr.task_instances + session.add(dr) + session.commit() + dag.get_task("my_task_id").execute(context=ti.get_template_context(session=session)) + + mock_hook.assert_called_once_with( + conn_id="kubernetes_default", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + ) + assert mock_create_pod.call_count == 1 + assert mock_hook.return_value.get_xcom_sidecar_container_image.call_count == 1 + assert mock_hook.return_value.get_xcom_sidecar_container_resources.call_count == 1 + + containers = mock_create_pod.call_args.kwargs["pod"].spec.containers + + # First container is main one with command + assert len(containers) == 2 + assert containers[0].command == ["echo", "arg1=arg1", "arg2=arg2", "kwarg1=kwarg1", "kwarg2=None"] + assert len(containers[0].args) == 0 + + # Second container is xcom image + assert containers[1].image == XCOM_IMAGE + assert containers[1].volume_mounts[0].mount_path == "/airflow/xcom" + + +def test_kubernetes_with_marked_as_setup( + dag_maker, session, mock_create_pod: mock.Mock, mock_hook: mock.Mock +) -> None: + with dag_maker(session=session) as dag: + + @setup + @task.kubernetes_cmd( + image="python:3.10-slim-buster", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + ) + def f(): + return ["echo", "Hello world!"] + + f() + + assert len(dag.task_group.children) == 1 + setup_task = dag.task_group.children["f"] + assert setup_task.is_setup + + +def test_kubernetes_with_marked_as_teardown( + dag_maker, session, mock_create_pod: mock.Mock, mock_hook: mock.Mock +) -> None: + with dag_maker(session=session) as dag: + + @teardown + @task.kubernetes_cmd( + image="python:3.10-slim-buster", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + ) + def f(): + return ["echo", "Hello world!"] + + f() + + assert len(dag.task_group.children) == 1 + teardown_task = dag.task_group.children["f"] + assert teardown_task.is_teardown + + +@pytest.mark.parametrize( + "name", + ["no_name_in_args", None, "test_task_name"], + ids=["no_name_in_args", "name_set_to_None", "with_name"], +) +@pytest.mark.parametrize( + "random_name_suffix", + [True, False], + ids=["rand_suffix", "no_rand_suffix"], +) +def test_pod_naming( + dag_maker, + 
session, + mock_create_pod: mock.Mock, + name: str | None, + random_name_suffix: bool, +) -> None: + """ + Idea behind this test is to check naming conventions are respected in various + decorator arguments combinations scenarios. + + @task.kubernetes_cmd differs from KubernetesPodOperator in a way that it distinguishes + between no name argument was provided and name was set to None. + In the first case, the operator name is generated from the python_callable name, + in the second case default KubernetesPodOperator behavior is preserved. + """ + extra_kwargs = {"name": name} + if name == "no_name_in_args": + extra_kwargs.pop("name") + + with dag_maker(session=session) as dag: + + @task.kubernetes_cmd( + image="python:3.10-slim-buster", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + random_name_suffix=random_name_suffix, + namespace="default", + **extra_kwargs, # type: ignore + ) + def task_function_name(): + return ["42"] + + task_function_name() + + dr = dag_maker.create_dagrun() + (ti,) = dr.task_instances + session.add(ti) + session.commit() + + task_id = "task_function_name" + op = dag.get_task(task_id) + if name is not None: + assert isinstance(op.name, str) + + # If name was explicitly set to None, we expect the operator name to be None + if name is None: + assert op.name is None + # If name was not provided in decorator, it would be generated: + # f"k8s-airflow-pod-{python_callable.__name__}" + elif name == "no_name_in_args": + assert op.name == f"k8s-airflow-pod-{task_id}" + # Otherwise, we expect the name to be exactly the same as provided + else: + assert op.name == name + + op.execute(context=ti.get_template_context(session=session)) + pod_meta = mock_create_pod.call_args.kwargs["pod"].metadata + assert isinstance(pod_meta.name, str) + + # After execution pod names should not contain underscores + task_id_normalized = task_id.replace("_", "-") + + def check_op_name(name_arg: str | None) -> str: + if name_arg is None: + assert op.name is None + return task_id_normalized + + assert isinstance(op.name, str) + if name_arg == "no_name_in_args": + generated_name = f"k8s-airflow-pod-{task_id_normalized}" + assert op.name == generated_name + return generated_name + + normalized_name = name_arg.replace("_", "-") + assert op.name == normalized_name + + return normalized_name + + def check_pod_name(name_base: str): + if random_name_suffix: + assert pod_meta.name.startswith(f"{name_base}") + assert pod_meta.name != name_base + else: + assert pod_meta.name == name_base + + pod_name = check_op_name(name) + check_pod_name(pod_name) From 6a00dd040c6a7949380ee38298f92c17ba222235 Mon Sep 17 00:00:00 2001 From: insomnes Date: Sat, 1 Mar 2025 13:01:57 +0100 Subject: [PATCH 02/21] Update decorators tests --- .../cncf/kubernetes/decorators/conftest.py | 70 ++++++ .../kubernetes/decorators/test_kubernetes.py | 197 +-------------- .../decorators/test_kubernetes_cmd.py | 213 ++--------------- .../decorators/test_kubernetes_commons.py | 225 ++++++++++++++++++ 4 files changed, 312 insertions(+), 393 deletions(-) create mode 100644 providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/conftest.py create mode 100644 providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/conftest.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/conftest.py new file mode 100644 index 0000000000000..2c55a11fa1ce5 --- /dev/null +++ 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/conftest.py @@ -0,0 +1,70 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from unittest import mock + +import pytest + +KPO_MODULE = "airflow.providers.cncf.kubernetes.operators.pod" +POD_MANAGER_CLASS = "airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager" +HOOK_CLASS = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesHook" + + +@pytest.fixture(autouse=True) +def mock_create_pod() -> mock.Mock: + return mock.patch(f"{POD_MANAGER_CLASS}.create_pod").start() + + +@pytest.fixture(autouse=True) +def mock_await_pod_start() -> mock.Mock: + return mock.patch(f"{POD_MANAGER_CLASS}.await_pod_start").start() + + +@pytest.fixture(autouse=True) +def await_xcom_sidecar_container_start() -> mock.Mock: + return mock.patch(f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start").start() + + +@pytest.fixture(autouse=True) +def extract_xcom() -> mock.Mock: + f = mock.patch(f"{POD_MANAGER_CLASS}.extract_xcom").start() + f.return_value = '{"key1": "value1", "key2": "value2"}' + return f + + +@pytest.fixture(autouse=True) +def mock_await_pod_completion() -> mock.Mock: + f = mock.patch(f"{POD_MANAGER_CLASS}.await_pod_completion").start() + f.return_value = mock.MagicMock(**{"status.phase": "Succeeded"}) + return f + + +@pytest.fixture(autouse=True) +def mock_hook(): + return mock.patch(HOOK_CLASS).start() + + +# Without this patch each time pod manager would try to extract logs from the pod +# and log an error about it's inability to get containers for the log +# {pod_manager.py:572} ERROR - Could not retrieve containers for the pod: ... 
+@pytest.fixture(autouse=True) +def mock_fetch_logs() -> mock.Mock: + f = mock.patch(f"{POD_MANAGER_CLASS}.fetch_requested_container_logs").start() + f.return_value = "logs" + return f diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py index 528bce1780e2a..a373a1a11c9e7 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py @@ -22,64 +22,14 @@ import pytest -from airflow.decorators import setup, task, teardown -from airflow.utils import timezone +from airflow.decorators import task pytestmark = pytest.mark.db_test -DEFAULT_DATE = timezone.datetime(2021, 9, 1) - -KPO_MODULE = "airflow.providers.cncf.kubernetes.operators.pod" -POD_MANAGER_CLASS = "airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager" -HOOK_CLASS = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesHook" XCOM_IMAGE = "XCOM_IMAGE" -@pytest.fixture(autouse=True) -def mock_create_pod() -> mock.Mock: - return mock.patch(f"{POD_MANAGER_CLASS}.create_pod").start() - - -@pytest.fixture(autouse=True) -def mock_await_pod_start() -> mock.Mock: - return mock.patch(f"{POD_MANAGER_CLASS}.await_pod_start").start() - - -@pytest.fixture(autouse=True) -def await_xcom_sidecar_container_start() -> mock.Mock: - return mock.patch(f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start").start() - - -@pytest.fixture(autouse=True) -def extract_xcom() -> mock.Mock: - f = mock.patch(f"{POD_MANAGER_CLASS}.extract_xcom").start() - f.return_value = '{"key1": "value1", "key2": "value2"}' - return f - - -@pytest.fixture(autouse=True) -def mock_await_pod_completion() -> mock.Mock: - f = mock.patch(f"{POD_MANAGER_CLASS}.await_pod_completion").start() - f.return_value = mock.MagicMock(**{"status.phase": "Succeeded"}) - return f - - -@pytest.fixture(autouse=True) -def mock_hook(): - return mock.patch(HOOK_CLASS).start() - - -# Without this patch each time pod manager would try to extract logs from the pod -# and log an error about it's inability to get containers for the log -# {pod_manager.py:572} ERROR - Could not retrieve containers for the pod: ... 
-@pytest.fixture(autouse=True) -def mock_fetch_logs() -> mock.Mock: - f = mock.patch(f"{POD_MANAGER_CLASS}.fetch_requested_container_logs").start() - f.return_value = "logs" - return f - - def test_basic_kubernetes(dag_maker, session, mock_create_pod: mock.Mock, mock_hook: mock.Mock) -> None: with dag_maker(session=session) as dag: @@ -181,148 +131,3 @@ def f(arg1, arg2, kwarg1=None, kwarg2=None): # Second container is xcom image assert containers[1].image == XCOM_IMAGE assert containers[1].volume_mounts[0].mount_path == "/airflow/xcom" - - -def test_kubernetes_with_marked_as_setup( - dag_maker, session, mock_create_pod: mock.Mock, mock_hook: mock.Mock -) -> None: - with dag_maker(session=session) as dag: - - @setup - @task.kubernetes( - image="python:3.10-slim-buster", - in_cluster=False, - cluster_context="default", - config_file="/tmp/fake_file", - ) - def f(): - return {"key1": "value1", "key2": "value2"} - - f() - - assert len(dag.task_group.children) == 1 - setup_task = dag.task_group.children["f"] - assert setup_task.is_setup - - -def test_kubernetes_with_marked_as_teardown( - dag_maker, session, mock_create_pod: mock.Mock, mock_hook: mock.Mock -) -> None: - with dag_maker(session=session) as dag: - - @teardown - @task.kubernetes( - image="python:3.10-slim-buster", - in_cluster=False, - cluster_context="default", - config_file="/tmp/fake_file", - ) - def f(): - return {"key1": "value1", "key2": "value2"} - - f() - - assert len(dag.task_group.children) == 1 - teardown_task = dag.task_group.children["f"] - assert teardown_task.is_teardown - - -@pytest.mark.parametrize( - "name", - ["no_name_in_args", None, "test_task_name"], - ids=["no_name_in_args", "name_set_to_None", "with_name"], -) -@pytest.mark.parametrize( - "random_name_suffix", - [True, False], - ids=["rand_suffix", "no_rand_suffix"], -) -def test_pod_naming( - dag_maker, - session, - mock_create_pod: mock.Mock, - name: str | None, - random_name_suffix: bool, -) -> None: - """ - Idea behind this test is to check naming conventions are respected in various - decorator arguments combinations scenarios. - - @task.kubernetes differs from KubernetesPodOperator in a way that it distinguishes - between no name argument was provided and name was set to None. - In the first case, the operator name is generated from the python_callable name, - in the second case default KubernetesPodOperator behavior is preserved. 
- """ - extra_kwargs = {"name": name} - if name == "no_name_in_args": - extra_kwargs.pop("name") - - with dag_maker(session=session) as dag: - - @task.kubernetes( - image="python:3.10-slim-buster", - in_cluster=False, - cluster_context="default", - config_file="/tmp/fake_file", - random_name_suffix=random_name_suffix, - namespace="default", - **extra_kwargs, # type: ignore - ) - def task_function_name(): - return 42 - - task_function_name() - - dr = dag_maker.create_dagrun() - (ti,) = dr.task_instances - session.add(ti) - session.commit() - - task_id = "task_function_name" - op = dag.get_task(task_id) - if name is not None: - assert isinstance(op.name, str) - - # If name was explicitly set to None, we expect the operator name to be None - if name is None: - assert op.name is None - # If name was not provided in decorator, it would be generated: - # f"k8s-airflow-pod-{python_callable.__name__}" - elif name == "no_name_in_args": - assert op.name == f"k8s-airflow-pod-{task_id}" - # Otherwise, we expect the name to be exactly the same as provided - else: - assert op.name == name - - op.execute(context=ti.get_template_context(session=session)) - pod_meta = mock_create_pod.call_args.kwargs["pod"].metadata - assert isinstance(pod_meta.name, str) - - # After execution pod names should not contain underscores - task_id_normalized = task_id.replace("_", "-") - - def check_op_name(name_arg: str | None) -> str: - if name_arg is None: - assert op.name is None - return task_id_normalized - - assert isinstance(op.name, str) - if name_arg == "no_name_in_args": - generated_name = f"k8s-airflow-pod-{task_id_normalized}" - assert op.name == generated_name - return generated_name - - normalized_name = name_arg.replace("_", "-") - assert op.name == normalized_name - - return normalized_name - - def check_pod_name(name_base: str): - if random_name_suffix: - assert pod_meta.name.startswith(f"{name_base}") - assert pod_meta.name != name_base - else: - assert pod_meta.name == name_base - - pod_name = check_op_name(name) - check_pod_name(pod_name) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py index 41a21f7846f94..91553131ff636 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py @@ -16,68 +16,19 @@ # under the License. 
from __future__ import annotations +import contextlib from unittest import mock import pytest -from airflow.decorators import setup, task, teardown -from airflow.utils import timezone +from airflow.decorators import task pytestmark = pytest.mark.db_test -DEFAULT_DATE = timezone.datetime(2021, 9, 1) - -KPO_MODULE = "airflow.providers.cncf.kubernetes.operators.pod" -POD_MANAGER_CLASS = "airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager" -HOOK_CLASS = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesHook" XCOM_IMAGE = "XCOM_IMAGE" -@pytest.fixture(autouse=True) -def mock_create_pod() -> mock.Mock: - return mock.patch(f"{POD_MANAGER_CLASS}.create_pod").start() - - -@pytest.fixture(autouse=True) -def mock_await_pod_start() -> mock.Mock: - return mock.patch(f"{POD_MANAGER_CLASS}.await_pod_start").start() - - -@pytest.fixture(autouse=True) -def await_xcom_sidecar_container_start() -> mock.Mock: - return mock.patch(f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start").start() - - -@pytest.fixture(autouse=True) -def extract_xcom() -> mock.Mock: - f = mock.patch(f"{POD_MANAGER_CLASS}.extract_xcom").start() - f.return_value = '{"key1": "value1", "key2": "value2"}' - return f - - -@pytest.fixture(autouse=True) -def mock_await_pod_completion() -> mock.Mock: - f = mock.patch(f"{POD_MANAGER_CLASS}.await_pod_completion").start() - f.return_value = mock.MagicMock(**{"status.phase": "Succeeded"}) - return f - - -@pytest.fixture(autouse=True) -def mock_hook(): - return mock.patch(HOOK_CLASS).start() - - -# Without this patch each time pod manager would try to extract logs from the pod -# and log an error about it's inability to get containers for the log -# {pod_manager.py:572} ERROR - Could not retrieve containers for the pod: ... -@pytest.fixture(autouse=True) -def mock_fetch_logs() -> mock.Mock: - f = mock.patch(f"{POD_MANAGER_CLASS}.fetch_requested_container_logs").start() - f.return_value = "logs" - return f - - @pytest.mark.parametrize( "args_only", [True, False], @@ -136,6 +87,17 @@ def hello(): (None, TypeError), (("a", "b"), TypeError), ([], ValueError), + (["echo", 123], TypeError), + (["echo", "Hello world!"], None), + ], + ids=[ + "iterable_str", + "int", + "None", + "tuple", + "empty_list", + "mixed_list", + "valid_list", ], ) def test_kubernetes_cmd_wrong_cmd( @@ -164,7 +126,9 @@ def hello(): (ti,) = dr.task_instances session.add(ti) session.commit() - with pytest.raises(exception): + + context_manager = pytest.raises(exception) if exception else contextlib.nullcontext() + with context_manager: dag.get_task("hello").execute(context=ti.get_template_context(session=session)) @@ -223,148 +187,3 @@ def f(arg1, arg2, kwarg1=None, kwarg2=None): # Second container is xcom image assert containers[1].image == XCOM_IMAGE assert containers[1].volume_mounts[0].mount_path == "/airflow/xcom" - - -def test_kubernetes_with_marked_as_setup( - dag_maker, session, mock_create_pod: mock.Mock, mock_hook: mock.Mock -) -> None: - with dag_maker(session=session) as dag: - - @setup - @task.kubernetes_cmd( - image="python:3.10-slim-buster", - in_cluster=False, - cluster_context="default", - config_file="/tmp/fake_file", - ) - def f(): - return ["echo", "Hello world!"] - - f() - - assert len(dag.task_group.children) == 1 - setup_task = dag.task_group.children["f"] - assert setup_task.is_setup - - -def test_kubernetes_with_marked_as_teardown( - dag_maker, session, mock_create_pod: mock.Mock, mock_hook: mock.Mock -) -> None: - with dag_maker(session=session) as dag: - - @teardown - 
@task.kubernetes_cmd( - image="python:3.10-slim-buster", - in_cluster=False, - cluster_context="default", - config_file="/tmp/fake_file", - ) - def f(): - return ["echo", "Hello world!"] - - f() - - assert len(dag.task_group.children) == 1 - teardown_task = dag.task_group.children["f"] - assert teardown_task.is_teardown - - -@pytest.mark.parametrize( - "name", - ["no_name_in_args", None, "test_task_name"], - ids=["no_name_in_args", "name_set_to_None", "with_name"], -) -@pytest.mark.parametrize( - "random_name_suffix", - [True, False], - ids=["rand_suffix", "no_rand_suffix"], -) -def test_pod_naming( - dag_maker, - session, - mock_create_pod: mock.Mock, - name: str | None, - random_name_suffix: bool, -) -> None: - """ - Idea behind this test is to check naming conventions are respected in various - decorator arguments combinations scenarios. - - @task.kubernetes_cmd differs from KubernetesPodOperator in a way that it distinguishes - between no name argument was provided and name was set to None. - In the first case, the operator name is generated from the python_callable name, - in the second case default KubernetesPodOperator behavior is preserved. - """ - extra_kwargs = {"name": name} - if name == "no_name_in_args": - extra_kwargs.pop("name") - - with dag_maker(session=session) as dag: - - @task.kubernetes_cmd( - image="python:3.10-slim-buster", - in_cluster=False, - cluster_context="default", - config_file="/tmp/fake_file", - random_name_suffix=random_name_suffix, - namespace="default", - **extra_kwargs, # type: ignore - ) - def task_function_name(): - return ["42"] - - task_function_name() - - dr = dag_maker.create_dagrun() - (ti,) = dr.task_instances - session.add(ti) - session.commit() - - task_id = "task_function_name" - op = dag.get_task(task_id) - if name is not None: - assert isinstance(op.name, str) - - # If name was explicitly set to None, we expect the operator name to be None - if name is None: - assert op.name is None - # If name was not provided in decorator, it would be generated: - # f"k8s-airflow-pod-{python_callable.__name__}" - elif name == "no_name_in_args": - assert op.name == f"k8s-airflow-pod-{task_id}" - # Otherwise, we expect the name to be exactly the same as provided - else: - assert op.name == name - - op.execute(context=ti.get_template_context(session=session)) - pod_meta = mock_create_pod.call_args.kwargs["pod"].metadata - assert isinstance(pod_meta.name, str) - - # After execution pod names should not contain underscores - task_id_normalized = task_id.replace("_", "-") - - def check_op_name(name_arg: str | None) -> str: - if name_arg is None: - assert op.name is None - return task_id_normalized - - assert isinstance(op.name, str) - if name_arg == "no_name_in_args": - generated_name = f"k8s-airflow-pod-{task_id_normalized}" - assert op.name == generated_name - return generated_name - - normalized_name = name_arg.replace("_", "-") - assert op.name == normalized_name - - return normalized_name - - def check_pod_name(name_base: str): - if random_name_suffix: - assert pod_meta.name.startswith(f"{name_base}") - assert pod_meta.name != name_base - else: - assert pod_meta.name == name_base - - pod_name = check_op_name(name) - check_pod_name(pod_name) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py new file mode 100644 index 0000000000000..13671d8eaf4e5 --- /dev/null +++ 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py @@ -0,0 +1,225 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import Callable +from unittest import mock + +import pytest + +from airflow.decorators import setup, task, teardown + +pytestmark = pytest.mark.db_test + + +TASK_FUNCTION_NAME_ID = "task_function_name" + + +def _kubernetes_func(): + return {"key1": "value1", "key2": "value2"} + + +def _kubernetes_cmd_func(): + return ["echo", "Hello world!"] + + +def _get_decorator_func(decorator_name: str) -> Callable: + if decorator_name == "kubernetes": + return _kubernetes_func + if decorator_name == "kubernetes_cmd": + return _kubernetes_cmd_func + raise ValueError(f"Unknown decorator {decorator_name}") + + +def _prepare_task( + task_decorator: Callable, + decorator_name: str, + **decorator_kwargs, +) -> Callable: + func_to_use = _get_decorator_func(decorator_name) + + @task_decorator( + image="python:3.10-slim-buster", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + **decorator_kwargs, + ) + def task_function_name(): + return func_to_use() + + return task_function_name + + +@pytest.mark.parametrize( + "task_decorator,decorator_name", + [ + (task.kubernetes, "kubernetes"), + (task.kubernetes_cmd, "kubernetes_cmd"), + ], + ids=["kubernetes", "kubernetes_cmd"], +) +def test_decorators_with_marked_as_setup( + task_decorator, + decorator_name, + dag_maker, + session, + mock_create_pod: mock.Mock, + mock_hook: mock.Mock, +) -> None: + with dag_maker(session=session) as dag: + task_function_name = setup(_prepare_task(task_decorator, decorator_name)) + task_function_name() + + assert len(dag.task_group.children) == 1 + setup_task = dag.task_group.children[TASK_FUNCTION_NAME_ID] + assert setup_task.is_setup + + +@pytest.mark.parametrize( + "task_decorator,decorator_name", + [ + (task.kubernetes, "kubernetes"), + (task.kubernetes_cmd, "kubernetes_cmd"), + ], + ids=["kubernetes", "kubernetes_cmd"], +) +def test_decorators_with_marked_as_teardown( + task_decorator, + decorator_name, + dag_maker, + session, + mock_create_pod: mock.Mock, + mock_hook: mock.Mock, +) -> None: + with dag_maker(session=session) as dag: + task_function_name = teardown(_prepare_task(task_decorator, decorator_name)) + task_function_name() + + assert len(dag.task_group.children) == 1 + teardown_task = dag.task_group.children[TASK_FUNCTION_NAME_ID] + assert teardown_task.is_teardown + + +@pytest.mark.parametrize( + "name", + ["no_name_in_args", None, "test_task_name"], + ids=["no_name_in_args", "name_set_to_None", "with_name"], +) +@pytest.mark.parametrize( + "random_name_suffix", + [True, False], + ids=["rand_suffix", "no_rand_suffix"], +) +@pytest.mark.parametrize( 
+ "task_decorator,decorator_name", + [ + (task.kubernetes, "kubernetes"), + (task.kubernetes_cmd, "kubernetes_cmd"), + ], + ids=["kubernetes", "kubernetes_cmd"], +) +def test_pod_naming( + task_decorator, + decorator_name, + dag_maker, + session, + mock_create_pod: mock.Mock, + name: str | None, + random_name_suffix: bool, +) -> None: + """ + Idea behind this test is to check naming conventions are respected in various + decorator arguments combinations scenarios. + + @task.kubernetes[_cmd] differs from KubernetesPodOperator in a way that it distinguishes + between no name argument was provided and name was set to None. + In the first case, the operator name is generated from the python_callable name, + in the second case default KubernetesPodOperator behavior is preserved. + """ + extra_kwargs = {"name": name} + if name == "no_name_in_args": + extra_kwargs.pop("name") + + decorator_kwargs = { + "random_name_suffix": random_name_suffix, + "namespace": "default", + **extra_kwargs, + } + + with dag_maker(session=session) as dag: + task_function_name = _prepare_task( + task_decorator, + decorator_name, + **decorator_kwargs, + ) + + task_function_name() + + dr = dag_maker.create_dagrun() + (ti,) = dr.task_instances + session.add(ti) + session.commit() + + task_id = TASK_FUNCTION_NAME_ID + op = dag.get_task(task_id) + if name is not None: + assert isinstance(op.name, str) + + # If name was explicitly set to None, we expect the operator name to be None + if name is None: + assert op.name is None + # If name was not provided in decorator, it would be generated: + # f"k8s-airflow-pod-{python_callable.__name__}" + elif name == "no_name_in_args": + assert op.name == f"k8s-airflow-pod-{task_id}" + # Otherwise, we expect the name to be exactly the same as provided + else: + assert op.name == name + + op.execute(context=ti.get_template_context(session=session)) + pod_meta = mock_create_pod.call_args.kwargs["pod"].metadata + assert isinstance(pod_meta.name, str) + + # After execution pod names should not contain underscores + task_id_normalized = task_id.replace("_", "-") + + def check_op_name(name_arg: str | None) -> str: + if name_arg is None: + assert op.name is None + return task_id_normalized + + assert isinstance(op.name, str) + if name_arg == "no_name_in_args": + generated_name = f"k8s-airflow-pod-{task_id_normalized}" + assert op.name == generated_name + return generated_name + + normalized_name = name_arg.replace("_", "-") + assert op.name == normalized_name + + return normalized_name + + def check_pod_name(name_base: str): + if random_name_suffix: + assert pod_meta.name.startswith(f"{name_base}") + assert pod_meta.name != name_base + else: + assert pod_meta.name == name_base + + pod_name = check_op_name(name) + check_pod_name(pod_name) From 6da7d566b70bf3306534f047f300f955580d7d00 Mon Sep 17 00:00:00 2001 From: insomnes Date: Sun, 2 Mar 2025 10:13:18 +0100 Subject: [PATCH 03/21] Add usage example / system test --- .../example_kubernetes_cmd_decorator.py | 78 +++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 providers/cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_cmd_decorator.py diff --git a/providers/cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_cmd_decorator.py b/providers/cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_cmd_decorator.py new file mode 100644 index 0000000000000..6faba199187e8 --- /dev/null +++ b/providers/cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_cmd_decorator.py @@ -0,0 +1,78 @@ 
+# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime + +from airflow import DAG +from airflow.decorators import task + +with DAG( + dag_id="example_kubernetes_cmd_decorator", + schedule=None, + start_date=datetime(2021, 1, 1), + tags=["example", "cncf", "kubernetes"], + catchup=False, +) as dag: + # [START howto_decorator_kubernetes_cmd] + @task + def foo() -> str: + return "foo" + + @task + def bar() -> str: + return "bar" + + @task.kubernetes_cmd( + image="bash:5.2", + name="full_cmd", + in_cluster=False, + ) + def execute_in_k8s_pod_full_cmd(foo_result: str, bar_result: str) -> list[str]: + return ["echo", "-e", f"With full cmd:\\t{foo_result}\\t{bar_result}"] + + # The args_only parameter is used to indicate that the decorated function will + # return a list of arguments to be passed as arguments to the container entrypoint: + # in this case, the `bash` command + @task.kubernetes_cmd(args_only=True, image="bash:5.2", in_cluster=False) + def execute_in_k8s_pod_args_only(foo_result: str, bar_result: str) -> list[str]: + return ["-c", f"echo -e 'With args only:\\t{foo_result}\\t{bar_result}'"] + + # Templating can be used in the return value and in python callable arguments + @task.kubernetes_cmd(image="bash:5.2", name="full_cmd", in_cluster=False) + def apply_templating(message: str) -> list[str]: + full_message = "Templated task_id: {{ task.task_id }}, dag_id: " + + full_message += message + return ["echo", full_message] + + foo_result = foo() + bar_result = bar() + + full_cmd_instance = execute_in_k8s_pod_full_cmd(foo_result, bar_result) + args_instance = execute_in_k8s_pod_args_only(foo_result, bar_result) + + [full_cmd_instance, args_instance] >> apply_templating("{{ dag.dag_id }}") + + # [END howto_decorator_kubernetes_cmd] + + +from tests_common.test_utils.system_tests import get_test_run + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) From d8c3b29aa5f7e109c0dac66597abf0b5c703e7bf Mon Sep 17 00:00:00 2001 From: insomnes Date: Sun, 2 Mar 2025 10:16:25 +0100 Subject: [PATCH 04/21] Warnings on cmds and arguments, update flow --- .../kubernetes/decorators/kubernetes_cmd.py | 57 +++++++++---------- 1 file changed, 26 insertions(+), 31 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py index 5480eb7cd0e9d..c92f6ae838fda 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py @@ -16,57 +16,56 @@ # under the 
License. from __future__ import annotations -import logging +import warnings from collections.abc import Sequence from typing import TYPE_CHECKING, Callable from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator +from airflow.utils.context import context_merge +from airflow.utils.operator_helpers import determine_kwargs if TYPE_CHECKING: from airflow.utils.context import Context -logger = logging.getLogger(__name__) - - class _KubernetesCmdDecoratedOperator(DecoratedOperator, KubernetesPodOperator): custom_operator_name = "@task.kubernetes_cmd" - # `cmds` and `arguments` are used internally by the operator - template_fields: Sequence[str] = tuple( - {"op_args", "op_kwargs", *KubernetesPodOperator.template_fields} - {"cmds", "arguments"} - ) - - # Since we won't mutate the arguments, we should just do the shallow copy - # there are some cases we can't deepcopy the objects (e.g protobuf). - shallow_copy_attrs: Sequence[str] = ("python_callable",) + template_fields: Sequence[str] = tuple({"op_args", "op_kwargs", *KubernetesPodOperator.template_fields}) - def __init__(self, namespace: str | None = None, args_only: bool = False, **kwargs) -> None: + def __init__(self, args_only: bool = False, **kwargs) -> None: self.args_only = args_only + cmds = kwargs.pop("cmds", None) + arguments = kwargs.pop("arguments", None) + + if cmds is not None or arguments is not None: + warnings.warn( + f"The `cmds` and `arguments` are unused in {self.custom_operator_name} decorator. " + "You should return a list of commands or image entrypoint arguments with " + "args_only=True from the python_callable.", + UserWarning, + stacklevel=3, + ) + # If the name was not provided, we generate operator name from the python_callable # we also instruct operator to add a random suffix to avoid collisions by default op_name = kwargs.pop("name", f"k8s-airflow-pod-{kwargs['python_callable'].__name__}") random_name_suffix = kwargs.pop("random_name_suffix", True) - if kwargs.pop("cmds", None) is not None: - logger.warning( - "The 'cmds' and 'arguments' are unused in @task.kubernetes_cmd decorator. " - "You should return a list of commands or default cmd arguments with " - "args_only=True from the python_callable." 
- ) super().__init__( - namespace=namespace, name=op_name, random_name_suffix=random_name_suffix, - cmds=["placeholder-command"], - arguments=["placeholder-argument"], + cmds=None, + arguments=None, **kwargs, ) - def _generate_cmds(self) -> list[str]: - generated_cmds = self.python_callable(*self.op_args, **self.op_kwargs) + def _generate_cmds(self, context: Context) -> list[str]: + context_merge(context, self.op_kwargs) + kwargs = determine_kwargs(self.python_callable, self.op_args, context) + generated_cmds = self.python_callable(*self.op_args, **kwargs) func_name = self.python_callable.__name__ if not isinstance(generated_cmds, list): raise TypeError( @@ -80,18 +79,18 @@ def _generate_cmds(self) -> list[str]: return generated_cmds def execute(self, context: Context): + generated = self._generate_cmds(context) if self.args_only: self.cmds = [] - self.arguments = self._generate_cmds() + self.arguments = generated else: - self.cmds = self._generate_cmds() + self.cmds = generated self.arguments = [] return super().execute(context) def kubernetes_task_cmd( python_callable: Callable | None = None, - multiple_outputs: bool | None = None, **kwargs, ) -> TaskDecorator: """ @@ -104,13 +103,9 @@ def kubernetes_task_cmd( will via ``kwargs``. Can be reused in a single DAG. :param python_callable: Function to decorate - :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. Dict will unroll to xcom values with - keys as XCom keys. Defaults to False. """ return task_decorator_factory( python_callable=python_callable, - multiple_outputs=multiple_outputs, decorated_operator_class=_KubernetesCmdDecoratedOperator, **kwargs, ) From ee1e5817d1d9e51f178c0df1d007312faad069f2 Mon Sep 17 00:00:00 2001 From: insomnes Date: Sun, 2 Mar 2025 10:51:43 +0100 Subject: [PATCH 05/21] Red tests --- .../decorators/test_kubernetes_cmd.py | 125 +++++++++++++++++- 1 file changed, 123 insertions(+), 2 deletions(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py index 91553131ff636..ae36773233a9b 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py @@ -17,6 +17,7 @@ from __future__ import annotations import contextlib +import warnings from unittest import mock import pytest @@ -40,6 +41,7 @@ def test_basic_kubernetes_cmd( mock_create_pod: mock.Mock, mock_hook: mock.Mock, ) -> None: + expected = ["echo", "Hello world!"] with dag_maker(session=session) as dag: @task.kubernetes_cmd( @@ -51,7 +53,7 @@ def test_basic_kubernetes_cmd( args_only=args_only, ) def hello(): - return ["echo", "Hello world!"] + return expected hello() @@ -70,11 +72,13 @@ def hello(): containers = mock_create_pod.call_args.kwargs["pod"].spec.containers assert len(containers) == 1 - expected_command = ["echo", "Hello world!"] + + expected_command = expected expected_args = [] if args_only: expected_args = expected_command expected_command = [] + assert containers[0].command == expected_command assert containers[0].args == expected_args @@ -187,3 +191,120 @@ def f(arg1, arg2, kwarg1=None, kwarg2=None): # Second container is xcom image assert containers[1].image == XCOM_IMAGE assert containers[1].volume_mounts[0].mount_path == "/airflow/xcom" + + +@pytest.mark.parametrize( + "cmds", + [None, ["ignored_cmd"], "ignored_cmd"], +) 
+@pytest.mark.parametrize( + "arguments", + [None, ["ignored_arg"], "ignored_arg"], +) +@pytest.mark.parametrize( + "args_only", + [True, False], +) +def test_ignored_decorator_parameters( + dag_maker, + session, + cmds: list[str], + arguments: list[str], + args_only: bool, + mock_create_pod: mock.Mock, + mock_hook: mock.Mock, +) -> None: + """Verify setting `cmds` or `arguments` for a @task.kubernetes_cmd function is ignored.""" + + expected = ["func", "return"] + with dag_maker(session=session) as dag: + + @task.kubernetes_cmd( + image="python:3.10-slim-buster", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + namespace="default", + cmds=cmds, + arguments=arguments, + args_only=args_only, + ) + def hello(): + return expected + + hello_task = hello() + + assert hello_task.operator.cmds == [] + assert hello_task.operator.arguments == [] + + with warnings.catch_warnings(): + warnings.simplefilter("error", category=UserWarning) + dr = dag_maker.create_dagrun() + (ti,) = dr.task_instances + session.add(ti) + session.commit() + dag.get_task("hello").execute(context=ti.get_template_context(session=session)) + + containers = mock_create_pod.call_args.kwargs["pod"].spec.containers + assert len(containers) == 1 + + expected_command = expected + expected_args = [] + if args_only: + expected_args = expected_command + expected_command = [] + assert containers[0].command == expected_command + assert containers[0].args == expected_args + + +@pytest.mark.parametrize( + argnames=["command", "expected_command", "expected_return_val"], + argvalues=[ + pytest.param( + ["echo", "hello", "world"], + ["echo", "hello", "world"], + ["echo", "hello", "world"], + id="not_templated", + ), + pytest.param(["echo", "{{ ti.task_id }}"], ["echo", "hello"], ["echo", "hello"], id="templated"), + ], +) +def test_rendering_kubernetes_cmd( + dag_maker, + session, + command, + expected_command, + expected_return_val, + mock_create_pod: mock.Mock, + mock_hook: mock.Mock, +) -> None: + with dag_maker(session=session) as dag: + + @task.kubernetes_cmd( + image="python:3.10-slim-buster", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + namespace="default", + ) + def hello(): + return command + + hello() + + dr = dag_maker.create_dagrun() + (ti,) = dr.task_instances + session.add(ti) + session.commit() + retr = dag.get_task("hello").execute(context=ti.get_template_context(session=session)) + mock_hook.assert_called_once_with( + conn_id="kubernetes_default", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + ) + containers = mock_create_pod.call_args.kwargs["pod"].spec.containers + assert len(containers) == 1 + + assert containers[0].command == expected_command + assert retr == expected_return_val From e23566a9a42b13cd3923e912ea57782e07416002 Mon Sep 17 00:00:00 2001 From: insomnes Date: Sun, 2 Mar 2025 16:19:46 +0100 Subject: [PATCH 06/21] Save state --- .../kubernetes/decorators/kubernetes_cmd.py | 49 ++++++--- providers/cncf/kubernetes/tests/conftest.py | 1 + .../cncf/kubernetes/decorators/conftest.py | 2 + .../decorators/test_kubernetes_cmd.py | 101 +++++++++++++++--- 4 files changed, 122 insertions(+), 31 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py index c92f6ae838fda..b9e3b6addde3d 100644 --- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py @@ -17,24 +17,37 @@ from __future__ import annotations import warnings -from collections.abc import Sequence -from typing import TYPE_CHECKING, Callable +from collections.abc import Collection, Mapping, Sequence +from typing import TYPE_CHECKING, Any, Callable from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator +from airflow.providers.cncf.kubernetes.template_rendering import refresh_rendered_fields from airflow.utils.context import context_merge from airflow.utils.operator_helpers import determine_kwargs if TYPE_CHECKING: - from airflow.utils.context import Context + try: + from airflow.sdk.definitions.context import Context + except ImportError: + # TODO: Remove once provider drops support for Airflow 2 + from airflow.utils.context import Context class _KubernetesCmdDecoratedOperator(DecoratedOperator, KubernetesPodOperator): custom_operator_name = "@task.kubernetes_cmd" - template_fields: Sequence[str] = tuple({"op_args", "op_kwargs", *KubernetesPodOperator.template_fields}) - - def __init__(self, args_only: bool = False, **kwargs) -> None: + template_fields: Sequence[str] = (*DecoratedOperator.template_fields, *KubernetesPodOperator.template_fields) + + def __init__( + self, + *, + python_callable: Callable, + op_args: Collection[Any] | None = None, + op_kwargs: Mapping[str, Any] | None = None, + args_only: bool = False, + **kwargs + ) -> None: self.args_only = args_only cmds = kwargs.pop("cmds", None) @@ -51,10 +64,11 @@ def __init__(self, args_only: bool = False, **kwargs) -> None: # If the name was not provided, we generate operator name from the python_callable # we also instruct operator to add a random suffix to avoid collisions by default - op_name = kwargs.pop("name", f"k8s-airflow-pod-{kwargs['python_callable'].__name__}") + op_name = kwargs.pop("name", f"k8s-airflow-pod-{python_callable.__name__}") random_name_suffix = kwargs.pop("random_name_suffix", True) super().__init__( + python_callable=python_callable, name=op_name, random_name_suffix=random_name_suffix, cmds=None, @@ -62,6 +76,17 @@ def __init__(self, args_only: bool = False, **kwargs) -> None: **kwargs, ) + def execute(self, context: Context): + generated = self._generate_cmds(context) + if self.args_only: + self.cmds = [] + self.arguments = generated + else: + self.cmds = generated + self.arguments = [] + refresh_rendered_fields(context["task_instance"]) + return super().execute(context) + def _generate_cmds(self, context: Context) -> list[str]: context_merge(context, self.op_kwargs) kwargs = determine_kwargs(self.python_callable, self.op_args, context) @@ -78,16 +103,6 @@ def _generate_cmds(self, context: Context) -> list[str]: return generated_cmds - def execute(self, context: Context): - generated = self._generate_cmds(context) - if self.args_only: - self.cmds = [] - self.arguments = generated - else: - self.cmds = generated - self.arguments = [] - return super().execute(context) - def kubernetes_task_cmd( python_callable: Callable | None = None, diff --git a/providers/cncf/kubernetes/tests/conftest.py b/providers/cncf/kubernetes/tests/conftest.py index f56ccce0a3f69..6ef555e84ff65 100644 --- a/providers/cncf/kubernetes/tests/conftest.py +++ b/providers/cncf/kubernetes/tests/conftest.py @@ -17,3 +17,4 
@@ from __future__ import annotations pytest_plugins = "tests_common.pytest_plugin" +from tests_common.pytest_plugin diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/conftest.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/conftest.py index 2c55a11fa1ce5..db97cc64e31fd 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/conftest.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/conftest.py @@ -21,6 +21,8 @@ import pytest +pytest_plugins = "tests_common.pytest_plugin" + KPO_MODULE = "airflow.providers.cncf.kubernetes.operators.pod" POD_MANAGER_CLASS = "airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager" HOOK_CLASS = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesHook" diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py index ae36773233a9b..d5ae19cfe1016 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py @@ -41,6 +41,7 @@ def test_basic_kubernetes_cmd( mock_create_pod: mock.Mock, mock_hook: mock.Mock, ) -> None: + """Test basic @task.kubernetes_cmd flow.""" expected = ["echo", "Hello world!"] with dag_maker(session=session) as dag: @@ -87,6 +88,7 @@ def hello(): "func_return, exception", [ ("string", TypeError), + (True, TypeError), (42, TypeError), (None, TypeError), (("a", "b"), TypeError), @@ -96,6 +98,7 @@ def hello(): ], ids=[ "iterable_str", + "bool", "int", "None", "tuple", @@ -112,6 +115,7 @@ def test_kubernetes_cmd_wrong_cmd( mock_create_pod: mock.Mock, mock_hook: mock.Mock, ) -> None: + """Test that @task.kubernetes_cmd raises an error if the python_callable returns an invalid value.""" with dag_maker(session=session) as dag: @task.kubernetes_cmd( @@ -139,6 +143,7 @@ def hello(): def test_kubernetes_cmd_with_input_output( dag_maker, session, mock_create_pod: mock.Mock, mock_hook: mock.Mock ) -> None: + """Verify @task.kubernetes_cmd will run XCom container if do_xcom_push is set.""" with dag_maker(session=session) as dag: @task.kubernetes_cmd( @@ -193,6 +198,69 @@ def f(arg1, arg2, kwarg1=None, kwarg2=None): assert containers[1].volume_mounts[0].mount_path == "/airflow/xcom" +@pytest.mark.parametrize( + argnames=["cmds", "arguments"], + argvalues=[ + pytest.param( + ["ignored_cmd"], + None, + id="cmd-only", + ), + pytest.param( + None, + ["ignored_arg"], + id="arg-only", + ), + pytest.param( + ["ignored_cmd"], + ["ignored_arg"], + id="cmd-and-arg", + ), + pytest.param( + None, + None, + id="none", + ), + ], +) +def test_ignored_decorator_parameters_warning( + dag_maker, + session, + cmds: list[str] | None, + arguments: list[str] | None, + mock_create_pod: mock.Mock, + mock_hook: mock.Mock, +) -> None: + """Verify setting `cmds` or `arguments` for a @task.kubernetes_cmd function is ignored.""" + print("\nIN TEST") + print(cmds, arguments) + with dag_maker(session=session) as dag: + + @task.kubernetes_cmd( + image="python:3.10-slim-buster", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + namespace="default", + cmds=cmds, + arguments=arguments, + ) + def hello(): + return ["echo", "Hello world!"] + + hello_task = hello() + + context_manager = pytest.warns(UserWarning) + if cmds is None and arguments is None: + context_manager = contextlib.nullcontext() + 
with pytest.warns(UserWarning): + dr = dag_maker.create_dagrun() + (ti,) = dr.task_instances + session.add(ti) + session.commit() + dag.get_task("hello").execute(context=ti.get_template_context(session=session)) + + @pytest.mark.parametrize( "cmds", [None, ["ignored_cmd"], "ignored_cmd"], @@ -208,8 +276,8 @@ def f(arg1, arg2, kwarg1=None, kwarg2=None): def test_ignored_decorator_parameters( dag_maker, session, - cmds: list[str], - arguments: list[str], + cmds: list[str] | None, + arguments: list[str] | None, args_only: bool, mock_create_pod: mock.Mock, mock_hook: mock.Mock, @@ -258,23 +326,28 @@ def hello(): @pytest.mark.parametrize( - argnames=["command", "expected_command", "expected_return_val"], + argnames=["command", "op_arg", "expected_command"], argvalues=[ pytest.param( - ["echo", "hello", "world"], - ["echo", "hello", "world"], + ["echo", "hello"], + "world", ["echo", "hello", "world"], id="not_templated", ), - pytest.param(["echo", "{{ ti.task_id }}"], ["echo", "hello"], ["echo", "hello"], id="templated"), + pytest.param( + ["echo", "{{ ti.task_id }}"], + "{{ ti.dag_id }}", + ["echo", "hello", "test_dag"], + id="templated" + ), ], ) def test_rendering_kubernetes_cmd( dag_maker, session, - command, - expected_command, - expected_return_val, + command: list[str], + op_arg: str, + expected_command: list[str], mock_create_pod: mock.Mock, mock_hook: mock.Mock, ) -> None: @@ -287,16 +360,17 @@ def test_rendering_kubernetes_cmd( config_file="/tmp/fake_file", namespace="default", ) - def hello(): - return command + def hello(add_to_command: str): + return command + [add_to_command] - hello() + hello(op_arg) dr = dag_maker.create_dagrun() (ti,) = dr.task_instances session.add(ti) session.commit() - retr = dag.get_task("hello").execute(context=ti.get_template_context(session=session)) + dag.get_task("hello").execute(context=ti.get_template_context(session=session)) + mock_hook.assert_called_once_with( conn_id="kubernetes_default", in_cluster=False, @@ -307,4 +381,3 @@ def hello(): assert len(containers) == 1 assert containers[0].command == expected_command - assert retr == expected_return_val From d435916427e9b6b42cdde6c6cbbf58da4d984438 Mon Sep 17 00:00:00 2001 From: insomnes Date: Sun, 2 Mar 2025 19:11:02 +0100 Subject: [PATCH 07/21] Finaliaze decorator changes --- .../kubernetes/decorators/kubernetes_cmd.py | 18 +- providers/cncf/kubernetes/tests/conftest.py | 1 - .../example_kubernetes_cmd_decorator.py | 6 +- .../cncf/kubernetes/decorators/conftest.py | 72 --- .../kubernetes/decorators/test_kubernetes.py | 182 +++--- .../decorators/test_kubernetes_cmd.py | 557 +++++++----------- .../decorators/test_kubernetes_commons.py | 355 ++++++----- 7 files changed, 518 insertions(+), 673 deletions(-) delete mode 100644 providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/conftest.py diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py index b9e3b6addde3d..aca77e6021cb5 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py @@ -17,8 +17,8 @@ from __future__ import annotations import warnings -from collections.abc import Collection, Mapping, Sequence -from typing import TYPE_CHECKING, Any, Callable +from collections.abc import Sequence +from typing import TYPE_CHECKING, Callable from 
airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator @@ -37,17 +37,9 @@ class _KubernetesCmdDecoratedOperator(DecoratedOperator, KubernetesPodOperator): custom_operator_name = "@task.kubernetes_cmd" - template_fields: Sequence[str] = (*DecoratedOperator.template_fields, *KubernetesPodOperator.template_fields) - - def __init__( - self, - *, - python_callable: Callable, - op_args: Collection[Any] | None = None, - op_kwargs: Mapping[str, Any] | None = None, - args_only: bool = False, - **kwargs - ) -> None: + template_fields: Sequence[str] = KubernetesPodOperator.template_fields + + def __init__(self, *, python_callable: Callable, args_only: bool = False, **kwargs) -> None: self.args_only = args_only cmds = kwargs.pop("cmds", None) diff --git a/providers/cncf/kubernetes/tests/conftest.py b/providers/cncf/kubernetes/tests/conftest.py index 6ef555e84ff65..f56ccce0a3f69 100644 --- a/providers/cncf/kubernetes/tests/conftest.py +++ b/providers/cncf/kubernetes/tests/conftest.py @@ -17,4 +17,3 @@ from __future__ import annotations pytest_plugins = "tests_common.pytest_plugin" -from tests_common.pytest_plugin diff --git a/providers/cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_cmd_decorator.py b/providers/cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_cmd_decorator.py index 6faba199187e8..6e4abf574593a 100644 --- a/providers/cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_cmd_decorator.py +++ b/providers/cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_cmd_decorator.py @@ -53,12 +53,10 @@ def execute_in_k8s_pod_full_cmd(foo_result: str, bar_result: str) -> list[str]: def execute_in_k8s_pod_args_only(foo_result: str, bar_result: str) -> list[str]: return ["-c", f"echo -e 'With args only:\\t{foo_result}\\t{bar_result}'"] - # Templating can be used in the return value and in python callable arguments + # Templating can be used in the returned command @task.kubernetes_cmd(image="bash:5.2", name="full_cmd", in_cluster=False) def apply_templating(message: str) -> list[str]: - full_message = "Templated task_id: {{ task.task_id }}, dag_id: " - - full_message += message + full_message = "Templated task_id: {{ task.task_id }}, dag_id: " + message return ["echo", full_message] foo_result = foo() diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/conftest.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/conftest.py deleted file mode 100644 index db97cc64e31fd..0000000000000 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/conftest.py +++ /dev/null @@ -1,72 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
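# (Illustrative aside, not part of the patch) The autouse fixtures deleted below are
# not lost: this same patch re-creates the equivalent mock.patch(...).start() calls as
# instance attributes in TestKubernetesDecoratorsBase.setup(), e.g.
#     self.mock_create_pod = mock.patch(f"{POD_MANAGER_CLASS}.create_pod").start()
# so the decorator tests keep the same KubernetesHook / PodManager stubbing.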
- -from __future__ import annotations - -from unittest import mock - -import pytest - -pytest_plugins = "tests_common.pytest_plugin" - -KPO_MODULE = "airflow.providers.cncf.kubernetes.operators.pod" -POD_MANAGER_CLASS = "airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager" -HOOK_CLASS = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesHook" - - -@pytest.fixture(autouse=True) -def mock_create_pod() -> mock.Mock: - return mock.patch(f"{POD_MANAGER_CLASS}.create_pod").start() - - -@pytest.fixture(autouse=True) -def mock_await_pod_start() -> mock.Mock: - return mock.patch(f"{POD_MANAGER_CLASS}.await_pod_start").start() - - -@pytest.fixture(autouse=True) -def await_xcom_sidecar_container_start() -> mock.Mock: - return mock.patch(f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start").start() - - -@pytest.fixture(autouse=True) -def extract_xcom() -> mock.Mock: - f = mock.patch(f"{POD_MANAGER_CLASS}.extract_xcom").start() - f.return_value = '{"key1": "value1", "key2": "value2"}' - return f - - -@pytest.fixture(autouse=True) -def mock_await_pod_completion() -> mock.Mock: - f = mock.patch(f"{POD_MANAGER_CLASS}.await_pod_completion").start() - f.return_value = mock.MagicMock(**{"status.phase": "Succeeded"}) - return f - - -@pytest.fixture(autouse=True) -def mock_hook(): - return mock.patch(HOOK_CLASS).start() - - -# Without this patch each time pod manager would try to extract logs from the pod -# and log an error about it's inability to get containers for the log -# {pod_manager.py:572} ERROR - Could not retrieve containers for the pod: ... -@pytest.fixture(autouse=True) -def mock_fetch_logs() -> mock.Mock: - f = mock.patch(f"{POD_MANAGER_CLASS}.fetch_requested_container_logs").start() - f.return_value = "logs" - return f diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py index a373a1a11c9e7..7d6753cea8351 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py @@ -18,116 +18,104 @@ import base64 import pickle -from unittest import mock - -import pytest from airflow.decorators import task +from unit.cncf.kubernetes.decorators.test_kubernetes_commons import TestKubernetesDecoratorsBase -pytestmark = pytest.mark.db_test +XCOM_IMAGE = "XCOM_IMAGE" -XCOM_IMAGE = "XCOM_IMAGE" +class TestKubernetesDecorator(TestKubernetesDecoratorsBase): + def test_basic_kubernetes(self): + """Test basic proper KubernetesPodOperator creation from @task.kubernetes decorator""" + with self.dag: + + @task.kubernetes( + image="python:3.10-slim-buster", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + namespace="default", + ) + def f(): + import random + + return [random.random() for _ in range(100)] + k8s_task = f() -def test_basic_kubernetes(dag_maker, session, mock_create_pod: mock.Mock, mock_hook: mock.Mock) -> None: - with dag_maker(session=session) as dag: + self.execute_task(k8s_task) - @task.kubernetes( - image="python:3.10-slim-buster", + self.mock_hook.assert_called_once_with( + conn_id="kubernetes_default", in_cluster=False, cluster_context="default", config_file="/tmp/fake_file", - namespace="default", ) - def f(): - import random - - return [random.random() for _ in range(100)] - - f() - - dr = dag_maker.create_dagrun() - (ti,) = dr.task_instances - session.add(ti) - session.commit() - 
dag.get_task("f").execute(context=ti.get_template_context(session=session)) - mock_hook.assert_called_once_with( - conn_id="kubernetes_default", - in_cluster=False, - cluster_context="default", - config_file="/tmp/fake_file", - ) - assert mock_create_pod.call_count == 1 - - containers = mock_create_pod.call_args.kwargs["pod"].spec.containers - assert len(containers) == 1 - assert containers[0].command[0] == "bash" - assert len(containers[0].args) == 0 - assert containers[0].env[0].name == "__PYTHON_SCRIPT" - assert containers[0].env[0].value - assert containers[0].env[1].name == "__PYTHON_INPUT" - - # Ensure we pass input through a b64 encoded env var - decoded_input = pickle.loads(base64.b64decode(containers[0].env[1].value)) - assert decoded_input == {"args": [], "kwargs": {}} - - -def test_kubernetes_with_input_output( - dag_maker, session, mock_create_pod: mock.Mock, mock_hook: mock.Mock -) -> None: - with dag_maker(session=session) as dag: - - @task.kubernetes( - image="python:3.10-slim-buster", + assert self.mock_create_pod.call_count == 1 + + containers = self.mock_create_pod.call_args.kwargs["pod"].spec.containers + assert len(containers) == 1 + assert containers[0].command[0] == "bash" + assert len(containers[0].args) == 0 + assert containers[0].env[0].name == "__PYTHON_SCRIPT" + assert containers[0].env[0].value + assert containers[0].env[1].name == "__PYTHON_INPUT" + + # Ensure we pass input through a b64 encoded env var + decoded_input = pickle.loads(base64.b64decode(containers[0].env[1].value)) + assert decoded_input == {"args": [], "kwargs": {}} + + def test_kubernetes_with_input_output(self): + """Verify @task.kubernetes will run XCom container if do_xcom_push is set.""" + with self.dag: + + @task.kubernetes( + image="python:3.10-slim-buster", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + namespace="default", + ) + def f(arg1, arg2, kwarg1=None, kwarg2=None): + return {"key1": "value1", "key2": "value2"} + + k8s_task = f.override(task_id="my_task_id", do_xcom_push=True)("arg1", "arg2", kwarg1="kwarg1") + + self.mock_hook.return_value.get_xcom_sidecar_container_image.return_value = XCOM_IMAGE + self.mock_hook.return_value.get_xcom_sidecar_container_resources.return_value = { + "requests": {"cpu": "1m", "memory": "10Mi"}, + "limits": {"cpu": "1m", "memory": "50Mi"}, + } + + self.execute_task(k8s_task) + assert self.mock_create_pod.call_count == 1 + + self.mock_hook.assert_called_once_with( + conn_id="kubernetes_default", in_cluster=False, cluster_context="default", config_file="/tmp/fake_file", - namespace="default", ) - def f(arg1, arg2, kwarg1=None, kwarg2=None): - return {"key1": "value1", "key2": "value2"} - - f.override(task_id="my_task_id", do_xcom_push=True)("arg1", "arg2", kwarg1="kwarg1") - - mock_hook.return_value.get_xcom_sidecar_container_image.return_value = XCOM_IMAGE - mock_hook.return_value.get_xcom_sidecar_container_resources.return_value = { - "requests": {"cpu": "1m", "memory": "10Mi"}, - "limits": {"cpu": "1m", "memory": "50Mi"}, - } - - dr = dag_maker.create_dagrun() - (ti,) = dr.task_instances - session.add(dr) - session.commit() - dag.get_task("my_task_id").execute(context=ti.get_template_context(session=session)) - - mock_hook.assert_called_once_with( - conn_id="kubernetes_default", - in_cluster=False, - cluster_context="default", - config_file="/tmp/fake_file", - ) - assert mock_create_pod.call_count == 1 - assert mock_hook.return_value.get_xcom_sidecar_container_image.call_count == 1 - assert 
mock_hook.return_value.get_xcom_sidecar_container_resources.call_count == 1 - - containers = mock_create_pod.call_args.kwargs["pod"].spec.containers - - # First container is Python script - assert len(containers) == 2 - assert containers[0].command[0] == "bash" - assert len(containers[0].args) == 0 - - assert containers[0].env[0].name == "__PYTHON_SCRIPT" - assert containers[0].env[0].value - assert containers[0].env[1].name == "__PYTHON_INPUT" - assert containers[0].env[1].value - - # Ensure we pass input through a b64 encoded env var - decoded_input = pickle.loads(base64.b64decode(containers[0].env[1].value)) - assert decoded_input == {"args": ("arg1", "arg2"), "kwargs": {"kwarg1": "kwarg1"}} - - # Second container is xcom image - assert containers[1].image == XCOM_IMAGE - assert containers[1].volume_mounts[0].mount_path == "/airflow/xcom" + assert self.mock_hook.return_value.get_xcom_sidecar_container_image.call_count == 1 + assert self.mock_hook.return_value.get_xcom_sidecar_container_resources.call_count == 1 + + containers = self.mock_create_pod.call_args.kwargs["pod"].spec.containers + + # First container is Python script + assert len(containers) == 2 + assert containers[0].command[0] == "bash" + assert len(containers[0].args) == 0 + + assert containers[0].env[0].name == "__PYTHON_SCRIPT" + assert containers[0].env[0].value + assert containers[0].env[1].name == "__PYTHON_INPUT" + assert containers[0].env[1].value + + # Ensure we pass input through a b64 encoded env var + decoded_input = pickle.loads(base64.b64decode(containers[0].env[1].value)) + assert decoded_input == {"args": ("arg1", "arg2"), "kwargs": {"kwarg1": "kwarg1"}} + + # Second container is xcom image + assert containers[1].image == XCOM_IMAGE + assert containers[1].volume_mounts[0].mount_path == "/airflow/xcom" diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py index d5ae19cfe1016..96568ec795269 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py @@ -17,367 +17,258 @@ from __future__ import annotations import contextlib -import warnings -from unittest import mock import pytest from airflow.decorators import task - -pytestmark = pytest.mark.db_test - +from unit.cncf.kubernetes.decorators.test_kubernetes_commons import DAG_ID, TestKubernetesDecoratorsBase XCOM_IMAGE = "XCOM_IMAGE" -@pytest.mark.parametrize( - "args_only", - [True, False], -) -def test_basic_kubernetes_cmd( - dag_maker, - session, - args_only: bool, - mock_create_pod: mock.Mock, - mock_hook: mock.Mock, -) -> None: - """Test basic @task.kubernetes_cmd flow.""" - expected = ["echo", "Hello world!"] - with dag_maker(session=session) as dag: - - @task.kubernetes_cmd( - image="python:3.10-slim-buster", +class TestKubernetesDecorator(TestKubernetesDecoratorsBase): + @pytest.mark.parametrize( + "args_only", + [True, False], + ) + def test_basic_kubernetes(self, args_only: bool): + """Test basic proper KubernetesPodOperator creation from @task.kubernetes_cmd decorator""" + expected = ["echo", "Hello world!"] + with self.dag: + + @task.kubernetes_cmd( + image="python:3.10-slim-buster", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + namespace="default", + args_only=args_only, + ) + def hello(): + return expected + + k8s_task = hello() + + 
self.execute_task(k8s_task) + + self.mock_hook.assert_called_once_with( + conn_id="kubernetes_default", in_cluster=False, cluster_context="default", config_file="/tmp/fake_file", - namespace="default", - args_only=args_only, ) - def hello(): - return expected - - hello() - - dr = dag_maker.create_dagrun() - (ti,) = dr.task_instances - session.add(ti) - session.commit() - dag.get_task("hello").execute(context=ti.get_template_context(session=session)) - mock_hook.assert_called_once_with( - conn_id="kubernetes_default", - in_cluster=False, - cluster_context="default", - config_file="/tmp/fake_file", + assert self.mock_create_pod.call_count == 1 + + containers = self.mock_create_pod.call_args.kwargs["pod"].spec.containers + assert len(containers) == 1 + + expected_command = expected + expected_args = [] + if args_only: + expected_args = expected_command + expected_command = [] + + assert containers[0].command == expected_command + assert containers[0].args == expected_args + + @pytest.mark.parametrize( + "func_return, exception", + [ + pytest.param("string", TypeError, id="iterable_str"), + pytest.param(True, TypeError, id="bool"), + pytest.param(42, TypeError, id="int"), + pytest.param(None, TypeError, id="None"), + pytest.param(("a", "b"), TypeError, id="tuple"), + pytest.param([], ValueError, id="empty_list"), + pytest.param(["echo", 123], TypeError, id="mixed_list"), + pytest.param(["echo", "Hello world!"], None, id="valid_list"), + ], ) - assert mock_create_pod.call_count == 1 - - containers = mock_create_pod.call_args.kwargs["pod"].spec.containers - assert len(containers) == 1 - - expected_command = expected - expected_args = [] - if args_only: - expected_args = expected_command - expected_command = [] - - assert containers[0].command == expected_command - assert containers[0].args == expected_args - - -@pytest.mark.parametrize( - "func_return, exception", - [ - ("string", TypeError), - (True, TypeError), - (42, TypeError), - (None, TypeError), - (("a", "b"), TypeError), - ([], ValueError), - (["echo", 123], TypeError), - (["echo", "Hello world!"], None), - ], - ids=[ - "iterable_str", - "bool", - "int", - "None", - "tuple", - "empty_list", - "mixed_list", - "valid_list", - ], -) -def test_kubernetes_cmd_wrong_cmd( - dag_maker, - session, - func_return, - exception, - mock_create_pod: mock.Mock, - mock_hook: mock.Mock, -) -> None: - """Test that @task.kubernetes_cmd raises an error if the python_callable returns an invalid value.""" - with dag_maker(session=session) as dag: - - @task.kubernetes_cmd( - image="python:3.10-slim-buster", + def test_kubernetes_cmd_wrong_cmd( + self, + func_return, + exception, + ): + """ + Test that @task.kubernetes_cmd raises an error if the python_callable returns + an invalid value. 
+ """ + with self.dag: + + @task.kubernetes_cmd( + image="python:3.10-slim-buster", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + namespace="default", + ) + def hello(): + return func_return + + k8s_task = hello() + + context_manager = pytest.raises(exception) if exception else contextlib.nullcontext() + with context_manager: + self.execute_task(k8s_task) + + def test_kubernetes_cmd_with_input_output(self): + """Verify @task.kubernetes_cmd will run XCom container if do_xcom_push is set.""" + with self.dag: + + @task.kubernetes_cmd( + image="python:3.10-slim-buster", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + namespace="default", + ) + def f(arg1: str, arg2: str, kwarg1: str | None = None, kwarg2: str | None = None): + return [ + "echo", + f"arg1={arg1}", + f"arg2={arg2}", + f"kwarg1={kwarg1}", + f"kwarg2={kwarg2}", + ] + + k8s_task = f.override(task_id="my_task_id", do_xcom_push=True)("arg1", "arg2", kwarg1="kwarg1") + + self.mock_hook.return_value.get_xcom_sidecar_container_image.return_value = XCOM_IMAGE + self.mock_hook.return_value.get_xcom_sidecar_container_resources.return_value = { + "requests": {"cpu": "1m", "memory": "10Mi"}, + "limits": {"cpu": "1m", "memory": "50Mi"}, + } + self.execute_task(k8s_task) + + self.mock_hook.assert_called_once_with( + conn_id="kubernetes_default", in_cluster=False, cluster_context="default", config_file="/tmp/fake_file", - namespace="default", ) - def hello(): - return func_return + assert self.mock_create_pod.call_count == 1 + assert self.mock_hook.return_value.get_xcom_sidecar_container_image.call_count == 1 + assert self.mock_hook.return_value.get_xcom_sidecar_container_resources.call_count == 1 - hello() + containers = self.mock_create_pod.call_args.kwargs["pod"].spec.containers - dr = dag_maker.create_dagrun() - (ti,) = dr.task_instances - session.add(ti) - session.commit() + # First container is main one with command + assert len(containers) == 2 + assert containers[0].command == ["echo", "arg1=arg1", "arg2=arg2", "kwarg1=kwarg1", "kwarg2=None"] + assert len(containers[0].args) == 0 - context_manager = pytest.raises(exception) if exception else contextlib.nullcontext() - with context_manager: - dag.get_task("hello").execute(context=ti.get_template_context(session=session)) + # Second container is xcom image + assert containers[1].image == XCOM_IMAGE + assert containers[1].volume_mounts[0].mount_path == "/airflow/xcom" - -def test_kubernetes_cmd_with_input_output( - dag_maker, session, mock_create_pod: mock.Mock, mock_hook: mock.Mock -) -> None: - """Verify @task.kubernetes_cmd will run XCom container if do_xcom_push is set.""" - with dag_maker(session=session) as dag: - - @task.kubernetes_cmd( - image="python:3.10-slim-buster", - in_cluster=False, - cluster_context="default", - config_file="/tmp/fake_file", - namespace="default", - ) - def f(arg1, arg2, kwarg1=None, kwarg2=None): - return [ - "echo", - f"arg1={arg1}", - f"arg2={arg2}", - f"kwarg1={kwarg1}", - f"kwarg2={kwarg2}", - ] - - f.override(task_id="my_task_id", do_xcom_push=True)("arg1", "arg2", kwarg1="kwarg1") - - mock_hook.return_value.get_xcom_sidecar_container_image.return_value = XCOM_IMAGE - mock_hook.return_value.get_xcom_sidecar_container_resources.return_value = { - "requests": {"cpu": "1m", "memory": "10Mi"}, - "limits": {"cpu": "1m", "memory": "50Mi"}, - } - - dr = dag_maker.create_dagrun() - (ti,) = dr.task_instances - session.add(dr) - session.commit() - 
dag.get_task("my_task_id").execute(context=ti.get_template_context(session=session)) - - mock_hook.assert_called_once_with( - conn_id="kubernetes_default", - in_cluster=False, - cluster_context="default", - config_file="/tmp/fake_file", + @pytest.mark.parametrize( + "cmds", + [None, ["ignored_cmd"], "ignored_cmd"], ) - assert mock_create_pod.call_count == 1 - assert mock_hook.return_value.get_xcom_sidecar_container_image.call_count == 1 - assert mock_hook.return_value.get_xcom_sidecar_container_resources.call_count == 1 - - containers = mock_create_pod.call_args.kwargs["pod"].spec.containers - - # First container is main one with command - assert len(containers) == 2 - assert containers[0].command == ["echo", "arg1=arg1", "arg2=arg2", "kwarg1=kwarg1", "kwarg2=None"] - assert len(containers[0].args) == 0 - - # Second container is xcom image - assert containers[1].image == XCOM_IMAGE - assert containers[1].volume_mounts[0].mount_path == "/airflow/xcom" - - -@pytest.mark.parametrize( - argnames=["cmds", "arguments"], - argvalues=[ - pytest.param( - ["ignored_cmd"], - None, - id="cmd-only", - ), - pytest.param( - None, - ["ignored_arg"], - id="arg-only", - ), - pytest.param( - ["ignored_cmd"], - ["ignored_arg"], - id="cmd-and-arg", - ), - pytest.param( - None, - None, - id="none", - ), - ], -) -def test_ignored_decorator_parameters_warning( - dag_maker, - session, - cmds: list[str] | None, - arguments: list[str] | None, - mock_create_pod: mock.Mock, - mock_hook: mock.Mock, -) -> None: - """Verify setting `cmds` or `arguments` for a @task.kubernetes_cmd function is ignored.""" - print("\nIN TEST") - print(cmds, arguments) - with dag_maker(session=session) as dag: - - @task.kubernetes_cmd( - image="python:3.10-slim-buster", - in_cluster=False, - cluster_context="default", - config_file="/tmp/fake_file", - namespace="default", - cmds=cmds, - arguments=arguments, - ) - def hello(): - return ["echo", "Hello world!"] - - hello_task = hello() - - context_manager = pytest.warns(UserWarning) - if cmds is None and arguments is None: - context_manager = contextlib.nullcontext() - with pytest.warns(UserWarning): - dr = dag_maker.create_dagrun() - (ti,) = dr.task_instances - session.add(ti) - session.commit() - dag.get_task("hello").execute(context=ti.get_template_context(session=session)) - - -@pytest.mark.parametrize( - "cmds", - [None, ["ignored_cmd"], "ignored_cmd"], -) -@pytest.mark.parametrize( - "arguments", - [None, ["ignored_arg"], "ignored_arg"], -) -@pytest.mark.parametrize( - "args_only", - [True, False], -) -def test_ignored_decorator_parameters( - dag_maker, - session, - cmds: list[str] | None, - arguments: list[str] | None, - args_only: bool, - mock_create_pod: mock.Mock, - mock_hook: mock.Mock, -) -> None: - """Verify setting `cmds` or `arguments` for a @task.kubernetes_cmd function is ignored.""" - - expected = ["func", "return"] - with dag_maker(session=session) as dag: - - @task.kubernetes_cmd( - image="python:3.10-slim-buster", - in_cluster=False, - cluster_context="default", - config_file="/tmp/fake_file", - namespace="default", - cmds=cmds, - arguments=arguments, - args_only=args_only, - ) - def hello(): - return expected - - hello_task = hello() + @pytest.mark.parametrize( + "arguments", + [None, ["ignored_arg"], "ignored_arg"], + ) + @pytest.mark.parametrize( + "args_only", + [True, False], + ) + def test_ignored_decorator_parameters( + self, + cmds: list[str] | None, + arguments: list[str] | None, + args_only: bool, + ) -> None: + """ + Test setting `cmds` or `arguments` from 
decorator does not affect the operator. + And the warning is shown only if `cmds` or `arguments` are not None. + """ + context_manager = pytest.warns(UserWarning, match="The `cmds` and `arguments` are unused") + # Don't warn if both `cmds` and `arguments` are None + if cmds is None and arguments is None: + context_manager = contextlib.nullcontext() # type: ignore + + expected = ["func", "return"] + with self.dag: + # We need to suppress the warning about `cmds` and `arguments` being unused + with context_manager: + + @task.kubernetes_cmd( + image="python:3.10-slim-buster", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + namespace="default", + cmds=cmds, + arguments=arguments, + args_only=args_only, + ) + def hello(): + return expected + + hello_task = hello() assert hello_task.operator.cmds == [] assert hello_task.operator.arguments == [] - with warnings.catch_warnings(): - warnings.simplefilter("error", category=UserWarning) - dr = dag_maker.create_dagrun() - (ti,) = dr.task_instances - session.add(ti) - session.commit() - dag.get_task("hello").execute(context=ti.get_template_context(session=session)) - - containers = mock_create_pod.call_args.kwargs["pod"].spec.containers - assert len(containers) == 1 - - expected_command = expected - expected_args = [] - if args_only: - expected_args = expected_command - expected_command = [] - assert containers[0].command == expected_command - assert containers[0].args == expected_args - - -@pytest.mark.parametrize( - argnames=["command", "op_arg", "expected_command"], - argvalues=[ - pytest.param( - ["echo", "hello"], - "world", - ["echo", "hello", "world"], - id="not_templated", - ), - pytest.param( - ["echo", "{{ ti.task_id }}"], - "{{ ti.dag_id }}", - ["echo", "hello", "test_dag"], - id="templated" - ), - ], -) -def test_rendering_kubernetes_cmd( - dag_maker, - session, - command: list[str], - op_arg: str, - expected_command: list[str], - mock_create_pod: mock.Mock, - mock_hook: mock.Mock, -) -> None: - with dag_maker(session=session) as dag: - - @task.kubernetes_cmd( - image="python:3.10-slim-buster", + self.execute_task(hello_task) + containers = self.mock_create_pod.call_args.kwargs["pod"].spec.containers + assert len(containers) == 1 + + expected_command = expected + expected_args = [] + if args_only: + expected_args = expected_command + expected_command = [] + assert containers[0].command == expected_command + assert containers[0].args == expected_args + + @pytest.mark.parametrize( + argnames=["command", "op_arg", "expected_command"], + argvalues=[ + pytest.param( + ["echo", "hello"], + "world", + ["echo", "hello", "world"], + id="not_templated", + ), + pytest.param( + ["echo", "{{ ti.task_id }}"], "{{ ti.dag_id }}", ["echo", "hello", DAG_ID], id="templated" + ), + ], + ) + def test_rendering_kubernetes_cmd( + self, + command: list[str], + op_arg: str, + expected_command: list[str], + ): + """Test that templating works in function arguments""" + with self.dag: + + @task.kubernetes_cmd( + image="python:3.10-slim-buster", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + namespace="default", + ) + def hello(add_to_command: str): + print("ADDING TO COMMAND", add_to_command) + return command + [add_to_command] + + hello_task = hello(op_arg) + + self.execute_task(hello_task) + + self.mock_hook.assert_called_once_with( + conn_id="kubernetes_default", in_cluster=False, cluster_context="default", config_file="/tmp/fake_file", - namespace="default", ) - def hello(add_to_command: str): 
- return command + [add_to_command] - - hello(op_arg) - - dr = dag_maker.create_dagrun() - (ti,) = dr.task_instances - session.add(ti) - session.commit() - dag.get_task("hello").execute(context=ti.get_template_context(session=session)) - - mock_hook.assert_called_once_with( - conn_id="kubernetes_default", - in_cluster=False, - cluster_context="default", - config_file="/tmp/fake_file", - ) - containers = mock_create_pod.call_args.kwargs["pod"].spec.containers - assert len(containers) == 1 + containers = self.mock_create_pod.call_args.kwargs["pod"].spec.containers + assert len(containers) == 1 - assert containers[0].command == expected_command + assert containers[0].command == expected_command + assert containers[0].args == [] diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py index 13671d8eaf4e5..cc5b4158bbbaf 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py @@ -22,11 +22,13 @@ import pytest from airflow.decorators import setup, task, teardown +from airflow.utils import timezone -pytestmark = pytest.mark.db_test - +from tests_common.test_utils.db import clear_db_dags, clear_db_runs, clear_rendered_ti_fields TASK_FUNCTION_NAME_ID = "task_function_name" +DEFAULT_DATE = timezone.datetime(2023, 1, 1) +DAG_ID = "k8s_deco_test_dag" def _kubernetes_func(): @@ -65,161 +67,208 @@ def task_function_name(): return task_function_name -@pytest.mark.parametrize( - "task_decorator,decorator_name", - [ - (task.kubernetes, "kubernetes"), - (task.kubernetes_cmd, "kubernetes_cmd"), - ], - ids=["kubernetes", "kubernetes_cmd"], -) -def test_decorators_with_marked_as_setup( - task_decorator, - decorator_name, - dag_maker, - session, - mock_create_pod: mock.Mock, - mock_hook: mock.Mock, -) -> None: - with dag_maker(session=session) as dag: - task_function_name = setup(_prepare_task(task_decorator, decorator_name)) - task_function_name() - - assert len(dag.task_group.children) == 1 - setup_task = dag.task_group.children[TASK_FUNCTION_NAME_ID] - assert setup_task.is_setup - - -@pytest.mark.parametrize( - "task_decorator,decorator_name", - [ - (task.kubernetes, "kubernetes"), - (task.kubernetes_cmd, "kubernetes_cmd"), - ], - ids=["kubernetes", "kubernetes_cmd"], -) -def test_decorators_with_marked_as_teardown( - task_decorator, - decorator_name, - dag_maker, - session, - mock_create_pod: mock.Mock, - mock_hook: mock.Mock, -) -> None: - with dag_maker(session=session) as dag: - task_function_name = teardown(_prepare_task(task_decorator, decorator_name)) - task_function_name() - - assert len(dag.task_group.children) == 1 - teardown_task = dag.task_group.children[TASK_FUNCTION_NAME_ID] - assert teardown_task.is_teardown - - -@pytest.mark.parametrize( - "name", - ["no_name_in_args", None, "test_task_name"], - ids=["no_name_in_args", "name_set_to_None", "with_name"], -) -@pytest.mark.parametrize( - "random_name_suffix", - [True, False], - ids=["rand_suffix", "no_rand_suffix"], -) -@pytest.mark.parametrize( - "task_decorator,decorator_name", - [ - (task.kubernetes, "kubernetes"), - (task.kubernetes_cmd, "kubernetes_cmd"), - ], - ids=["kubernetes", "kubernetes_cmd"], -) -def test_pod_naming( - task_decorator, - decorator_name, - dag_maker, - session, - mock_create_pod: mock.Mock, - name: str | None, - random_name_suffix: bool, -) 
-> None: - """ - Idea behind this test is to check naming conventions are respected in various - decorator arguments combinations scenarios. - - @task.kubernetes[_cmd] differs from KubernetesPodOperator in a way that it distinguishes - between no name argument was provided and name was set to None. - In the first case, the operator name is generated from the python_callable name, - in the second case default KubernetesPodOperator behavior is preserved. - """ - extra_kwargs = {"name": name} - if name == "no_name_in_args": - extra_kwargs.pop("name") - - decorator_kwargs = { - "random_name_suffix": random_name_suffix, - "namespace": "default", - **extra_kwargs, - } - - with dag_maker(session=session) as dag: - task_function_name = _prepare_task( - task_decorator, - decorator_name, - **decorator_kwargs, - ) +KPO_MODULE = "airflow.providers.cncf.kubernetes.operators.pod" +POD_MANAGER_CLASS = "airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager" +HOOK_CLASS = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesHook" - task_function_name() - - dr = dag_maker.create_dagrun() - (ti,) = dr.task_instances - session.add(ti) - session.commit() - - task_id = TASK_FUNCTION_NAME_ID - op = dag.get_task(task_id) - if name is not None: - assert isinstance(op.name, str) - - # If name was explicitly set to None, we expect the operator name to be None - if name is None: - assert op.name is None - # If name was not provided in decorator, it would be generated: - # f"k8s-airflow-pod-{python_callable.__name__}" - elif name == "no_name_in_args": - assert op.name == f"k8s-airflow-pod-{task_id}" - # Otherwise, we expect the name to be exactly the same as provided - else: - assert op.name == name - - op.execute(context=ti.get_template_context(session=session)) - pod_meta = mock_create_pod.call_args.kwargs["pod"].metadata - assert isinstance(pod_meta.name, str) - - # After execution pod names should not contain underscores - task_id_normalized = task_id.replace("_", "-") - - def check_op_name(name_arg: str | None) -> str: - if name_arg is None: - assert op.name is None - return task_id_normalized - assert isinstance(op.name, str) - if name_arg == "no_name_in_args": - generated_name = f"k8s-airflow-pod-{task_id_normalized}" - assert op.name == generated_name - return generated_name +@pytest.mark.db_test +class TestKubernetesDecoratorsBase: + @pytest.fixture(autouse=True) + def setup(self, dag_maker): + self.dag_maker = dag_maker + + with dag_maker(dag_id=DAG_ID) as dag: + ... 
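# A minimal usage sketch (illustrative only, not part of the patch): the empty
# `with dag_maker(dag_id=DAG_ID) as dag: ...` block above only builds a bare DAG;
# concrete tests re-enter it to register their decorated task and then run it
# through execute_task(), roughly:
#
#     with self.dag:
#         @task.kubernetes_cmd(image="python:3.10-slim-buster", in_cluster=False)
#         def hello():
#             return ["echo", "hello"]
#         hello_task = hello()
#     self.execute_task(hello_task)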
+ + self.dag = dag - normalized_name = name_arg.replace("_", "-") - assert op.name == normalized_name + self.mock_create_pod = mock.patch(f"{POD_MANAGER_CLASS}.create_pod").start() + self.mock_await_pod_start = mock.patch(f"{POD_MANAGER_CLASS}.await_pod_start").start() + self.mock_await_xcom_sidecar_container_start = mock.patch( + f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start" + ).start() - return normalized_name + self.mock_extract_xcom = mock.patch(f"{POD_MANAGER_CLASS}.extract_xcom").start() + self.mock_extract_xcom.return_value = '{"key1": "value1", "key2": "value2"}' - def check_pod_name(name_base: str): - if random_name_suffix: - assert pod_meta.name.startswith(f"{name_base}") - assert pod_meta.name != name_base + self.mock_await_pod_completion = mock.patch(f"{POD_MANAGER_CLASS}.await_pod_completion").start() + self.mock_await_pod_completion.return_value = mock.MagicMock(**{"status.phase": "Succeeded"}) + self.mock_hook = mock.patch(HOOK_CLASS).start() + + # Without this patch each time pod manager would try to extract logs from the pod + # and log an error about it's inability to get containers for the log + # {pod_manager.py:572} ERROR - Could not retrieve containers for the pod: ... + self.mock_fetch_logs = mock.patch(f"{POD_MANAGER_CLASS}.fetch_requested_container_logs").start() + self.mock_fetch_logs.return_value = "logs" + + def teardown_method(self): + clear_db_runs() + clear_db_dags() + clear_rendered_ti_fields() + + def execute_task(self, task): + session = self.dag_maker.session + dag_run = self.dag_maker.create_dagrun( + run_id=f"k8s_decorator_test_{DEFAULT_DATE.date()}", session=session + ) + ti = dag_run.get_task_instance(task.operator.task_id, session=session) + return_val = task.operator.execute(context=ti.get_template_context(session=session)) + + return ti, return_val + + +def parametrize_kubernetes_decorators_commons(cls): + for name, method in cls.__dict__.items(): + if not name.startswith("test_") or not callable(method): + continue + new_method = pytest.mark.parametrize( + "task_decorator,decorator_name", + [ + (task.kubernetes, "kubernetes"), + (task.kubernetes_cmd, "kubernetes_cmd"), + ], + ids=["kubernetes", "kubernetes_cmd"], + )(method) + setattr(cls, name, new_method) + + return cls + + +@parametrize_kubernetes_decorators_commons +class TestKubernetesDecoratorsCommons(TestKubernetesDecoratorsBase): + def test_k8s_decorator_init(self, task_decorator, decorator_name): + """Test the initialization of the @task.kubernetes[_cmd] decorated task.""" + + with self.dag: + + @task_decorator( + image="python:3.10-slim-buster", + in_cluster=False, + cluster_context="default", + ) + def k8s_task_function() -> list[str]: + return ["return", "value"] + + k8s_task = k8s_task_function() + + assert k8s_task.operator.task_id == "k8s_task_function" + assert k8s_task.operator.image == "python:3.10-slim-buster" + + expected_cmds = ["placeholder-command"] if decorator_name == "kubernetes" else [] + assert k8s_task.operator.cmds == expected_cmds + assert k8s_task.operator.random_name_suffix is True + + def test_decorators_with_marked_as_setup(self, task_decorator, decorator_name): + """Test the @task.kubernetes[_cmd] decorated task works with setup decorator.""" + with self.dag: + task_function_name = setup(_prepare_task(task_decorator, decorator_name)) + task_function_name() + + assert len(self.dag.task_group.children) == 1 + setup_task = self.dag.task_group.children[TASK_FUNCTION_NAME_ID] + assert setup_task.is_setup + + def 
test_decorators_with_marked_as_teardown(self, task_decorator, decorator_name): + """Test the @task.kubernetes[_cmd] decorated task works with teardown decorator.""" + with self.dag: + task_function_name = teardown(_prepare_task(task_decorator, decorator_name)) + task_function_name() + + assert len(self.dag.task_group.children) == 1 + teardown_task = self.dag.task_group.children[TASK_FUNCTION_NAME_ID] + assert teardown_task.is_teardown + + @pytest.mark.parametrize( + "name", + ["no_name_in_args", None, "test_task_name"], + ids=["no_name_in_args", "name_set_to_None", "with_name"], + ) + @pytest.mark.parametrize( + "random_name_suffix", + [True, False], + ids=["rand_suffix", "no_rand_suffix"], + ) + def test_pod_naming( + self, + task_decorator, + decorator_name, + name: str | None, + random_name_suffix: bool, + ) -> None: + """ + Idea behind this test is to check naming conventions are respected in various + decorator arguments combinations scenarios. + + @task.kubernetes[_cmd] differs from KubernetesPodOperator in a way that it distinguishes + between no name argument was provided and name was set to None. + In the first case, the operator name is generated from the python_callable name, + in the second case default KubernetesPodOperator behavior is preserved. + """ + extra_kwargs = {"name": name} + if name == "no_name_in_args": + extra_kwargs.pop("name") + + decorator_kwargs = { + "random_name_suffix": random_name_suffix, + "namespace": "default", + **extra_kwargs, + } + + with self.dag: + task_function_name = _prepare_task( + task_decorator, + decorator_name, + **decorator_kwargs, + ) + + k8s_task = task_function_name() + + task_id = TASK_FUNCTION_NAME_ID + op = self.dag.get_task(task_id) + if name is not None: + assert isinstance(op.name, str) + + # If name was explicitly set to None, we expect the operator name to be None + if name is None: + assert op.name is None + # If name was not provided in decorator, it would be generated: + # f"k8s-airflow-pod-{python_callable.__name__}" + elif name == "no_name_in_args": + assert op.name == f"k8s-airflow-pod-{task_id}" + # Otherwise, we expect the name to be exactly the same as provided else: - assert pod_meta.name == name_base + assert op.name == name + + self.execute_task(k8s_task) + pod_meta = self.mock_create_pod.call_args.kwargs["pod"].metadata + assert isinstance(pod_meta.name, str) + + # After execution pod names should not contain underscores + task_id_normalized = task_id.replace("_", "-") + + def check_op_name(name_arg: str | None) -> str: + if name_arg is None: + assert op.name is None + return task_id_normalized + + assert isinstance(op.name, str) + if name_arg == "no_name_in_args": + generated_name = f"k8s-airflow-pod-{task_id_normalized}" + assert op.name == generated_name + return generated_name + + normalized_name = name_arg.replace("_", "-") + assert op.name == normalized_name + + return normalized_name + + def check_pod_name(name_base: str): + if random_name_suffix: + assert pod_meta.name.startswith(f"{name_base}") + assert pod_meta.name != name_base + else: + assert pod_meta.name == name_base - pod_name = check_op_name(name) - check_pod_name(pod_name) + pod_name = check_op_name(name) + check_pod_name(pod_name) From 6e6344fda520d5b5006976cf97822e2c45c2a044 Mon Sep 17 00:00:00 2001 From: insomnes Date: Sun, 2 Mar 2025 19:46:51 +0100 Subject: [PATCH 08/21] Update tests and docs --- providers/cncf/kubernetes/docs/operators.rst | 14 +++++ .../example_kubernetes_cmd_decorator.py | 7 ++- .../decorators/test_kubernetes_cmd.py | 55 
++++++++++++++++++- 3 files changed, 71 insertions(+), 5 deletions(-) diff --git a/providers/cncf/kubernetes/docs/operators.rst b/providers/cncf/kubernetes/docs/operators.rst index 7188da11f2061..1782f3f0ab3d5 100644 --- a/providers/cncf/kubernetes/docs/operators.rst +++ b/providers/cncf/kubernetes/docs/operators.rst @@ -182,6 +182,20 @@ Also for this action you can use operator in the deferrable mode: :start-after: [START howto_operator_k8s_write_xcom_async] :end-before: [END howto_operator_k8s_write_xcom_async] + +Run command in KubernetesPodOperator from a task flow +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +With the usage of the ``@task.kubernetes_cmd`` decorator, you can run a command returned by a function +in a ``KubernetesPodOperator`` simplifying it's connection to the task flow. + +See the following example on how this works: + +.. exampleinclude:: /../../providers/cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_cmd_decorator.py + :language: python + :start-after: [START howto_decorator_kubernetes_cmd] + :end-before: [END howto_decorator_kubernetes_cmd] + + Include error message in email alert ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/providers/cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_cmd_decorator.py b/providers/cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_cmd_decorator.py index 6e4abf574593a..45de1cb557120 100644 --- a/providers/cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_cmd_decorator.py +++ b/providers/cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_cmd_decorator.py @@ -53,10 +53,11 @@ def execute_in_k8s_pod_full_cmd(foo_result: str, bar_result: str) -> list[str]: def execute_in_k8s_pod_args_only(foo_result: str, bar_result: str) -> list[str]: return ["-c", f"echo -e 'With args only:\\t{foo_result}\\t{bar_result}'"] - # Templating can be used in the returned command - @task.kubernetes_cmd(image="bash:5.2", name="full_cmd", in_cluster=False) + # Templating can be used in the returned command and all other templated fields in + # the decorator parameters. 
+ @task.kubernetes_cmd(image="bash:5.2", name="my-pod-{{ ti.task_id }}", in_cluster=False) def apply_templating(message: str) -> list[str]: - full_message = "Templated task_id: {{ task.task_id }}, dag_id: " + message + full_message = "Templated task_id: {{ ti.task_id }}, dag_id: " + message return ["echo", full_message] foo_result = foo() diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py index 96568ec795269..d8d6b78cf9636 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py @@ -21,12 +21,13 @@ import pytest from airflow.decorators import task +from airflow.exceptions import AirflowSkipException from unit.cncf.kubernetes.decorators.test_kubernetes_commons import DAG_ID, TestKubernetesDecoratorsBase XCOM_IMAGE = "XCOM_IMAGE" -class TestKubernetesDecorator(TestKubernetesDecoratorsBase): +class TestKubernetesCmdDecorator(TestKubernetesDecoratorsBase): @pytest.mark.parametrize( "args_only", [True, False], @@ -254,7 +255,6 @@ def test_rendering_kubernetes_cmd( namespace="default", ) def hello(add_to_command: str): - print("ADDING TO COMMAND", add_to_command) return command + [add_to_command] hello_task = hello(op_arg) @@ -272,3 +272,54 @@ def hello(add_to_command: str): assert containers[0].command == expected_command assert containers[0].args == [] + + def test_rendering_kubernetes_cmd_decorator_params(self): + """Test that templating works in decorator parameters""" + with self.dag: + + @task.kubernetes_cmd( + image="python:{{ dag.dag_id }}", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + namespace="default", + kubernetes_conn_id="kubernetes_{{ dag.dag_id }}", + ) + def hello(): + return ["echo", "Hello world!"] + + hello_task = hello() + + self.execute_task(hello_task) + + self.mock_hook.assert_called_once_with( + conn_id="kubernetes_" + DAG_ID, + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + ) + containers = self.mock_create_pod.call_args.kwargs["pod"].spec.containers + assert len(containers) == 1 + + assert containers[0].image == f"python:{DAG_ID}" + + def test_airflow_skip(self): + """Test that the operator is skipped if the task is skipped""" + with self.dag: + + @task.kubernetes_cmd( + image="python:3.10-slim-buster", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + namespace="default", + ) + def hello(): + raise AirflowSkipException("This task should be skipped") + + hello_task = hello() + + with pytest.raises(AirflowSkipException): + self.execute_task(hello_task) + self.mock_hook.assert_not_called() + self.mock_create_pod.assert_not_called() From 9d79679b0484140ef54a44fd1fc7891e8b0bf4d8 Mon Sep 17 00:00:00 2001 From: insomnes Date: Mon, 3 Mar 2025 16:03:33 +0100 Subject: [PATCH 09/21] Add extra context tests --- .../decorators/test_kubernetes_cmd.py | 62 ++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py index d8d6b78cf9636..f31e7641ad972 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py +++ 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py @@ -244,7 +244,7 @@ def test_rendering_kubernetes_cmd( op_arg: str, expected_command: list[str], ): - """Test that templating works in function arguments""" + """Test that templating works in function return value""" with self.dag: @task.kubernetes_cmd( @@ -273,6 +273,66 @@ def hello(add_to_command: str): assert containers[0].command == expected_command assert containers[0].args == [] + def test_basic_context_works(self): + """Test that decorator works with context as kwargs unpcacked in function arguments""" + with self.dag: + + @task.kubernetes_cmd( + image="python:3.10-slim-buster", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + namespace="default", + ) + def hello(**context): + return ["echo", context["ti"].task_id, context["dag_run"].dag_id] + + hello_task = hello() + + self.execute_task(hello_task) + + self.mock_hook.assert_called_once_with( + conn_id="kubernetes_default", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + ) + containers = self.mock_create_pod.call_args.kwargs["pod"].spec.containers + assert len(containers) == 1 + + assert containers[0].command == ["echo", "hello", DAG_ID] + assert containers[0].args == [] + + def test_named_context_variables(self): + """Test that decorator works with specific context variable as kwargs in function arguments""" + with self.dag: + + @task.kubernetes_cmd( + image="python:3.10-slim-buster", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + namespace="default", + ) + def hello(ti=None, dag_run=None): + return ["echo", ti.task_id, dag_run.dag_id] + + hello_task = hello() + + self.execute_task(hello_task) + + self.mock_hook.assert_called_once_with( + conn_id="kubernetes_default", + in_cluster=False, + cluster_context="default", + config_file="/tmp/fake_file", + ) + containers = self.mock_create_pod.call_args.kwargs["pod"].spec.containers + assert len(containers) == 1 + + assert containers[0].command == ["echo", "hello", DAG_ID] + assert containers[0].args == [] + def test_rendering_kubernetes_cmd_decorator_params(self): """Test that templating works in decorator parameters""" with self.dag: From ae24a5e09ece711df3859cc48fa81cd69bfaae9e Mon Sep 17 00:00:00 2001 From: insomnes Date: Tue, 25 Mar 2025 19:53:42 +0100 Subject: [PATCH 10/21] refactor: use render_templates from ti and overwrite_rtif_after_execution --- .../providers/cncf/kubernetes/decorators/kubernetes_cmd.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py index aca77e6021cb5..9da7e1e1856ad 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py @@ -22,7 +22,6 @@ from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator -from airflow.providers.cncf.kubernetes.template_rendering import refresh_rendered_fields from airflow.utils.context import context_merge from airflow.utils.operator_helpers import determine_kwargs @@ -38,6 +37,7 @@ class _KubernetesCmdDecoratedOperator(DecoratedOperator, KubernetesPodOperator): 
custom_operator_name = "@task.kubernetes_cmd" template_fields: Sequence[str] = KubernetesPodOperator.template_fields + overwrite_rtif_after_execution: bool = True def __init__(self, *, python_callable: Callable, args_only: bool = False, **kwargs) -> None: self.args_only = args_only @@ -76,7 +76,7 @@ def execute(self, context: Context): else: self.cmds = generated self.arguments = [] - refresh_rendered_fields(context["task_instance"]) + context["ti"].render_templates() # type: ignore[attr-defined] return super().execute(context) def _generate_cmds(self, context: Context) -> list[str]: From a591c389a51d591fe2c68ddcee42f5865970c9d6 Mon Sep 17 00:00:00 2001 From: insomnes Date: Tue, 25 Mar 2025 22:14:36 +0100 Subject: [PATCH 11/21] style: run ruff on test files for static checks --- .../tests/unit/cncf/kubernetes/decorators/test_kubernetes.py | 1 + .../tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py | 1 + 2 files changed, 2 insertions(+) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py index 7d6753cea8351..a371f40f2b5cc 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py @@ -20,6 +20,7 @@ import pickle from airflow.decorators import task + from unit.cncf.kubernetes.decorators.test_kubernetes_commons import TestKubernetesDecoratorsBase XCOM_IMAGE = "XCOM_IMAGE" diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py index f31e7641ad972..80812d9d0b60f 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py @@ -22,6 +22,7 @@ from airflow.decorators import task from airflow.exceptions import AirflowSkipException + from unit.cncf.kubernetes.decorators.test_kubernetes_commons import DAG_ID, TestKubernetesDecoratorsBase XCOM_IMAGE = "XCOM_IMAGE" From 78f2c4b0d128dd20751144a1bb8584d03cdb794a Mon Sep 17 00:00:00 2001 From: insomnes Date: Tue, 25 Mar 2025 22:59:36 +0100 Subject: [PATCH 12/21] fix: add @overload to kubernetes decorator This should fix docstring and type-checking for decorator --- task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi | 3 +++ 1 file changed, 3 insertions(+) diff --git a/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi b/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi index f3ebe6cecf427..4614d764db07f 100644 --- a/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi +++ b/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi @@ -495,6 +495,7 @@ class TaskDecoratorCollection: or a list of names of labels to set with empty values (e.g. ``["label1", "label2"]``) """ # [END decorator_signature] + @overload def kubernetes( self, *, @@ -667,6 +668,8 @@ class TaskDecoratorCollection: :param progress_callback: Callback function for receiving k8s container logs. """ @overload + def kubernetes(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ... 
+ @overload def sensor( # type: ignore[misc] self, *, From a1a54187a131b4c89584bcc86072b117d5e492b3 Mon Sep 17 00:00:00 2001 From: insomnes Date: Tue, 25 Mar 2025 23:10:15 +0100 Subject: [PATCH 13/21] feat: add parameters description and doc-string inside pyi --- .../sdk/definitions/decorators/__init__.pyi | 170 ++++++++++++++++++ 1 file changed, 170 insertions(+) diff --git a/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi b/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi index 4614d764db07f..7d98cc7baf9cc 100644 --- a/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi +++ b/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi @@ -670,6 +670,176 @@ class TaskDecoratorCollection: @overload def kubernetes(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ... @overload + def kubernetes_cmd( + self, + *, + args_only: bool = False, # Added by _KubernetesCmdDecoratedOperator. + # 'cmds' filled by _KubernetesCmdDecoratedOperator. + # 'arguments' filled by _KubernetesCmdDecoratedOperator. + kubernetes_conn_id: str | None = ..., + namespace: str | None = None, + image: str | None = None, + name: str | None = None, + random_name_suffix: bool = ..., + ports: list[k8s.V1ContainerPort] | None = None, + volume_mounts: list[k8s.V1VolumeMount] | None = None, + volumes: list[k8s.V1Volume] | None = None, + env_vars: list[k8s.V1EnvVar] | dict[str, str] | None = None, + env_from: list[k8s.V1EnvFromSource] | None = None, + secrets: list[Secret] | None = None, + in_cluster: bool | None = None, + cluster_context: str | None = None, + labels: dict | None = None, + reattach_on_restart: bool = ..., + startup_timeout_seconds: int = ..., + startup_check_interval_seconds: int = ..., + get_logs: bool = True, + container_logs: Iterable[str] | str | Literal[True] = ..., + image_pull_policy: str | None = None, + annotations: dict | None = None, + container_resources: k8s.V1ResourceRequirements | None = None, + affinity: k8s.V1Affinity | None = None, + config_file: str | None = None, + node_selector: dict | None = None, + image_pull_secrets: list[k8s.V1LocalObjectReference] | None = None, + service_account_name: str | None = None, + hostnetwork: bool = False, + host_aliases: list[k8s.V1HostAlias] | None = None, + tolerations: list[k8s.V1Toleration] | None = None, + security_context: k8s.V1PodSecurityContext | dict | None = None, + container_security_context: k8s.V1SecurityContext | dict | None = None, + dnspolicy: str | None = None, + dns_config: k8s.V1PodDNSConfig | None = None, + hostname: str | None = None, + subdomain: str | None = None, + schedulername: str | None = None, + full_pod_spec: k8s.V1Pod | None = None, + init_containers: list[k8s.V1Container] | None = None, + log_events_on_failure: bool = False, + do_xcom_push: bool = False, + pod_template_file: str | None = None, + pod_template_dict: dict | None = None, + priority_class_name: str | None = None, + pod_runtime_info_envs: list[k8s.V1EnvVar] | None = None, + termination_grace_period: int | None = None, + configmaps: list[str] | None = None, + skip_on_exit_code: int | Container[int] | None = None, + base_container_name: str | None = None, + base_container_status_polling_interval: float = ..., + deferrable: bool = ..., + poll_interval: float = ..., + log_pod_spec_on_failure: bool = ..., + on_finish_action: str = ..., + termination_message_policy: str = ..., + active_deadline_seconds: int | None = None, + progress_callback: Callable[[str], None] | None = None, + **kwargs, + ) -> 
TaskDecorator: + """Create a decorator to convert a callable to a Kubernetes Pod task. + + :param args_only: If True, the decorated function should return a list arguments + to be passed to the entrypoint of the container image. Defaults to False. + :param kubernetes_conn_id: The Kubernetes cluster's + :ref:`connection ID `. + :param namespace: Namespace to run within Kubernetes. Defaults to *default*. + :param image: Docker image to launch. Defaults to *hub.docker.com*, but + a fully qualified URL will point to a custom repository. (templated) + :param name: Name of the pod to run. This will be used (plus a random + suffix if *random_name_suffix* is *True*) to generate a pod ID + (DNS-1123 subdomain, containing only ``[a-z0-9.-]``). Defaults to + ``k8s-airflow-pod-{python_callable.__name__}``. + :param random_name_suffix: If *True*, will generate a random suffix. + :param ports: Ports for the launched pod. + :param volume_mounts: *volumeMounts* for the launched pod. + :param volumes: Volumes for the launched pod. Includes *ConfigMaps* and + *PersistentVolumes*. + :param env_vars: Environment variables initialized in the container. + (templated) + :param env_from: List of sources to populate environment variables in + the container. + :param secrets: Kubernetes secrets to inject in the container. They can + be exposed as environment variables or files in a volume. + :param in_cluster: Run kubernetes client with *in_cluster* configuration. + :param cluster_context: Context that points to the Kubernetes cluster. + Ignored when *in_cluster* is *True*. If *None*, current-context will + be used. + :param reattach_on_restart: If the worker dies while the pod is running, + reattach and monitor during the next try. If *False*, always create + a new pod for each try. + :param labels: Labels to apply to the pod. (templated) + :param startup_timeout_seconds: Timeout in seconds to startup the pod. + :param startup_check_interval_seconds: interval in seconds to check if the pod has already started + :param get_logs: Get the stdout of the container as logs of the tasks. + :param container_logs: list of containers whose logs will be published to stdout + Takes a sequence of containers, a single container name or True. + If True, all the containers logs are published. Works in conjunction with ``get_logs`` param. + The default value is the base container. + :param image_pull_policy: Specify a policy to cache or always pull an + image. + :param annotations: Non-identifying metadata you can attach to the pod. + Can be a large range of data, and can include characters that are + not permitted by labels. + :param container_resources: Resources for the launched pod. + :param affinity: Affinity scheduling rules for the launched pod. + :param config_file: The path to the Kubernetes config file. If not + specified, default value is ``~/.kube/config``. (templated) + :param node_selector: A dict containing a group of scheduling rules. + :param image_pull_secrets: Any image pull secrets to be given to the + pod. If more than one secret is required, provide a comma separated + list, e.g. ``secret_a,secret_b``. + :param service_account_name: Name of the service account. + :param hostnetwork: If *True*, enable host networking on the pod. + :param host_aliases: A list of host aliases to apply to the containers in the pod. + :param tolerations: A list of Kubernetes tolerations. + :param security_context: Security options the pod should run with + (PodSecurityContext). 
+ :param container_security_context: security options the container should run with. + :param dnspolicy: DNS policy for the pod. + :param dns_config: dns configuration (ip addresses, searches, options) for the pod. + :param hostname: hostname for the pod. + :param subdomain: subdomain for the pod. + :param schedulername: Specify a scheduler name for the pod + :param full_pod_spec: The complete podSpec + :param init_containers: Init containers for the launched pod. + :param log_events_on_failure: Log the pod's events if a failure occurs. + :param do_xcom_push: If *True*, the content of + ``/airflow/xcom/return.json`` in the container will also be pushed + to an XCom when the container completes. + :param pod_template_file: Path to pod template file (templated) + :param pod_template_dict: pod template dictionary (templated) + :param priority_class_name: Priority class name for the launched pod. + :param pod_runtime_info_envs: A list of environment variables + to be set in the container. + :param termination_grace_period: Termination grace period if task killed + in UI, defaults to kubernetes default + :param configmaps: A list of names of config maps from which it collects + ConfigMaps to populate the environment variables with. The contents + of the target ConfigMap's Data field will represent the key-value + pairs as environment variables. Extends env_from. + :param skip_on_exit_code: If task exits with this exit code, leave the task + in ``skipped`` state (default: None). If set to ``None``, any non-zero + exit code will be treated as a failure. + :param base_container_name: The name of the base container in the pod. This container's logs + will appear as part of this task's logs if get_logs is True. Defaults to None. If None, + will consult the class variable BASE_CONTAINER_NAME (which defaults to "base") for the base + container name to use. + :param base_container_status_polling_interval: Polling period in seconds to check for the pod base + container status. + :param deferrable: Run operator in the deferrable mode. + :param poll_interval: Polling period in seconds to check for the status. Used only in deferrable mode. + :param log_pod_spec_on_failure: Log the pod's specification if a failure occurs + :param on_finish_action: What to do when the pod reaches its final state, or the execution is interrupted. + If "delete_pod", the pod will be deleted regardless its state; if "delete_succeeded_pod", + only succeeded pod will be deleted. You can set to "keep_pod" to keep the pod. + :param termination_message_policy: The termination message policy of the base container. + Default value is "File" + :param active_deadline_seconds: The active_deadline_seconds which matches to active_deadline_seconds + in V1PodSpec. + :param progress_callback: Callback function for receiving k8s container logs. + """ + @overload + def kubernetes_cmd(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ... 
+ @overload def sensor( # type: ignore[misc] self, *, From 0ceb062708a4356a6cb16740ab15902ef4ceb46a Mon Sep 17 00:00:00 2001 From: insomnes Date: Tue, 25 Mar 2025 23:14:33 +0100 Subject: [PATCH 14/21] refactor: rename function to kubernetes_cmd_task --- providers/cncf/kubernetes/provider.yaml | 2 +- .../providers/cncf/kubernetes/decorators/kubernetes_cmd.py | 2 +- .../src/airflow/providers/cncf/kubernetes/get_provider_info.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/providers/cncf/kubernetes/provider.yaml b/providers/cncf/kubernetes/provider.yaml index 6c2d51fe20a7e..3df6ef515a29a 100644 --- a/providers/cncf/kubernetes/provider.yaml +++ b/providers/cncf/kubernetes/provider.yaml @@ -147,7 +147,7 @@ connection-types: task-decorators: - class-name: airflow.providers.cncf.kubernetes.decorators.kubernetes.kubernetes_task name: kubernetes - - class-name: airflow.providers.cncf.kubernetes.decorators.kubernetes_cmd.kubernetes_task_cmd + - class-name: airflow.providers.cncf.kubernetes.decorators.kubernetes_cmd.kubernetes_cmd_task name: kubernetes_cmd config: diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py index 9da7e1e1856ad..9de12288a47d3 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py @@ -96,7 +96,7 @@ def _generate_cmds(self, context: Context) -> list[str]: return generated_cmds -def kubernetes_task_cmd( +def kubernetes_cmd_task( python_callable: Callable | None = None, **kwargs, ) -> TaskDecorator: diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py index 9d690a1cb95cb..821f95cf614cb 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py @@ -87,7 +87,7 @@ def get_provider_info(): "name": "kubernetes", }, { - "class-name": "airflow.providers.cncf.kubernetes.decorators.kubernetes_cmd.kubernetes_task_cmd", + "class-name": "airflow.providers.cncf.kubernetes.decorators.kubernetes_cmd.kubernetes_cmd_task", "name": "kubernetes_cmd", }, ], From a5f95348c20274add4f99e15bde6fecc84f86c32 Mon Sep 17 00:00:00 2001 From: insomnes Date: Tue, 25 Mar 2025 23:17:39 +0100 Subject: [PATCH 15/21] docs: update deco doc-string in pyi --- task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi b/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi index 7d98cc7baf9cc..30e921f2f4881 100644 --- a/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi +++ b/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi @@ -735,7 +735,7 @@ class TaskDecoratorCollection: progress_callback: Callable[[str], None] | None = None, **kwargs, ) -> TaskDecorator: - """Create a decorator to convert a callable to a Kubernetes Pod task. + """Create a decorator to run a command returned by callable in a Kubernetes pod. :param args_only: If True, the decorated function should return a list arguments to be passed to the entrypoint of the container image. Defaults to False. 
From 44996983963b5f3bcd1829be71d3bcc85bc6d296 Mon Sep 17 00:00:00 2001 From: insomnes Date: Tue, 8 Apr 2025 10:20:28 +0200 Subject: [PATCH 16/21] fix: proper imports for `DAG` and task --- .../system/cncf/kubernetes/example_kubernetes_cmd_decorator.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/providers/cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_cmd_decorator.py b/providers/cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_cmd_decorator.py index 45de1cb557120..3235dffe0f3c6 100644 --- a/providers/cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_cmd_decorator.py +++ b/providers/cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_cmd_decorator.py @@ -19,8 +19,7 @@ from datetime import datetime -from airflow import DAG -from airflow.decorators import task +from airflow.sdk import DAG, task with DAG( dag_id="example_kubernetes_cmd_decorator", From 0c6df8079782d70352d12cbf2a32b6f7d937d255 Mon Sep 17 00:00:00 2001 From: insomnes Date: Tue, 8 Apr 2025 10:23:48 +0200 Subject: [PATCH 17/21] docs: fix include in `operators.rst` --- providers/cncf/kubernetes/docs/operators.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/cncf/kubernetes/docs/operators.rst b/providers/cncf/kubernetes/docs/operators.rst index 1782f3f0ab3d5..aaf98843105c3 100644 --- a/providers/cncf/kubernetes/docs/operators.rst +++ b/providers/cncf/kubernetes/docs/operators.rst @@ -190,7 +190,7 @@ in a ``KubernetesPodOperator`` simplifying it's connection to the task flow. See the following example on how this works: -.. exampleinclude:: /../../providers/cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_cmd_decorator.py +.. exampleinclude:: /../tests/system/cncf/kubernetes/example_kubernetes_cmd_decorator.py :language: python :start-after: [START howto_decorator_kubernetes_cmd] :end-before: [END howto_decorator_kubernetes_cmd] From 557fe25c521ba69ce3f7ed895e63d491e255b792 Mon Sep 17 00:00:00 2001 From: insomnes Date: Tue, 8 Apr 2025 10:50:09 +0200 Subject: [PATCH 18/21] fix: proper imports in kubernetes_cmd decorator module for V2 / V3 --- .../kubernetes/decorators/kubernetes_cmd.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py index 9de12288a47d3..a65efad1ae6cf 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py @@ -20,17 +20,22 @@ from collections.abc import Sequence from typing import TYPE_CHECKING, Callable -from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory +from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.bases.decorator import DecoratedOperator, TaskDecorator, task_decorator_factory +else: + from airflow.decorators.base import ( # type: ignore[no-redef] + DecoratedOperator, + TaskDecorator, + task_decorator_factory, + ) from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator from airflow.utils.context import context_merge from airflow.utils.operator_helpers import determine_kwargs if TYPE_CHECKING: - try: - from airflow.sdk.definitions.context import Context - except ImportError: - #
TODO: Remove once provider drops support for Airflow 2 - from airflow.utils.context import Context + from airflow.utils.context import Context class _KubernetesCmdDecoratedOperator(DecoratedOperator, KubernetesPodOperator): From 240078f63d88cfd2e85048dc735adf61631d06b9 Mon Sep 17 00:00:00 2001 From: insomnes Date: Tue, 8 Apr 2025 11:00:40 +0200 Subject: [PATCH 19/21] fix: proper imports for decorators in tests --- .../tests/unit/cncf/kubernetes/decorators/test_kubernetes.py | 2 +- .../unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py | 2 +- .../unit/cncf/kubernetes/decorators/test_kubernetes_commons.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py index a371f40f2b5cc..67a5a725635fd 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py @@ -19,7 +19,7 @@ import base64 import pickle -from airflow.decorators import task +from airflow.sdk import task from unit.cncf.kubernetes.decorators.test_kubernetes_commons import TestKubernetesDecoratorsBase diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py index 80812d9d0b60f..465299c7e2236 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py @@ -20,8 +20,8 @@ import pytest -from airflow.decorators import task from airflow.exceptions import AirflowSkipException +from airflow.sdk import task from unit.cncf.kubernetes.decorators.test_kubernetes_commons import DAG_ID, TestKubernetesDecoratorsBase diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py index cc5b4158bbbaf..4ff56dd552fa3 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py @@ -21,7 +21,7 @@ import pytest -from airflow.decorators import setup, task, teardown +from airflow.sdk import setup, task, teardown from airflow.utils import timezone from tests_common.test_utils.db import clear_db_dags, clear_db_runs, clear_rendered_ti_fields From 125420bddb467e10f4a057439b1d78189ced7eeb Mon Sep 17 00:00:00 2001 From: insomnes Date: Tue, 8 Apr 2025 18:46:58 +0200 Subject: [PATCH 20/21] test: use `if AIRFLOW_V_3_0_PLUS` for compat in deco tests --- .../unit/cncf/kubernetes/decorators/test_kubernetes.py | 6 +++++- .../cncf/kubernetes/decorators/test_kubernetes_cmd.py | 6 +++++- .../cncf/kubernetes/decorators/test_kubernetes_commons.py | 8 +++++++- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py index 67a5a725635fd..b5c4448e19301 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py @@ -19,8 +19,12 @@ import base64 import 
pickle -from airflow.sdk import task +from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_0_PLUS +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import task +else: + from airflow.decorators import task from unit.cncf.kubernetes.decorators.test_kubernetes_commons import TestKubernetesDecoratorsBase XCOM_IMAGE = "XCOM_IMAGE" diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py index 465299c7e2236..9236ecf86e56a 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_cmd.py @@ -21,8 +21,12 @@ import pytest from airflow.exceptions import AirflowSkipException -from airflow.sdk import task +from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_0_PLUS +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import task +else: + from airflow.decorators import task from unit.cncf.kubernetes.decorators.test_kubernetes_commons import DAG_ID, TestKubernetesDecoratorsBase XCOM_IMAGE = "XCOM_IMAGE" diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py index 4ff56dd552fa3..16db4b120fb27 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py @@ -21,7 +21,13 @@ import pytest -from airflow.sdk import setup, task, teardown +from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import setup, task, teardown +else: + from airflow.decorators import setup, task, teardown + from airflow.utils import timezone from tests_common.test_utils.db import clear_db_dags, clear_db_runs, clear_rendered_ti_fields From 125420bddb467e10f4a057439b1d78189ced7eeb Mon Sep 17 00:00:00 2001 From: insomnes Date: Tue, 15 Apr 2025 17:12:56 +0200 Subject: [PATCH 21/21] docs: add subsection on difference with @task.kubernetes --- providers/cncf/kubernetes/docs/operators.rst | 32 ++++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/providers/cncf/kubernetes/docs/operators.rst b/providers/cncf/kubernetes/docs/operators.rst index aaf98843105c3..3dead677793a7 100644 --- a/providers/cncf/kubernetes/docs/operators.rst +++ b/providers/cncf/kubernetes/docs/operators.rst @@ -183,19 +183,39 @@ Also for this action you can use operator in the deferrable mode: :end-before: [END howto_operator_k8s_write_xcom_async] -Run command in KubernetesPodOperator from a task flow -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Run command in KubernetesPodOperator from TaskFlow +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ With the usage of the ``@task.kubernetes_cmd`` decorator, you can run a command returned by a function -in a ``KubernetesPodOperator`` simplifying it's connection to the task flow. - -See the following example on how this works: +in a ``KubernetesPodOperator``, simplifying its connection to TaskFlow. + +Difference between ``@task.kubernetes`` and ``@task.kubernetes_cmd`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The ``@task.kubernetes`` decorator is designed to run a Python function inside a Kubernetes pod using KPO.
+It does this by serializing the function into a temporary Python script that is executed inside the container. +This is well-suited for cases where you want to isolate Python code execution and manage complex dependencies, +as described in the :doc:`TaskFlow documentation `. + +In contrast, the ``@task.kubernetes_cmd`` decorator allows the decorated function to return +a shell command (as a list of strings), which is then passed as cmds or arguments to +``KubernetesPodOperator``. +This enables executing arbitrary commands available inside a Kubernetes pod -- +without needing to wrap them in Python code. + +A key benefit here is that Python excels at composing and templating these commands. +Shell commands can be dynamically generated using Python's string formatting, templating, +function calls, and extra logic. This makes it a flexible tool for orchestrating complex pipelines +where the task is to invoke CLI-based operations in containers without the need to leave +a TaskFlow context. + +How does this decorator work? +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +See the following examples of how the decorator works: .. exampleinclude:: /../tests/system/cncf/kubernetes/example_kubernetes_cmd_decorator.py :language: python :start-after: [START howto_decorator_kubernetes_cmd] :end-before: [END howto_decorator_kubernetes_cmd] - Include error message in email alert ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
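
For quick reference, below is a minimal usage sketch of the decorator introduced by this series. It is not taken from the patches above: the DAG id, task name, image, and parameter values are illustrative, and it assumes Airflow 3 with this provider installed (on Airflow 2 the imports would come from ``airflow`` / ``airflow.decorators`` instead of ``airflow.sdk``).

    from datetime import datetime

    from airflow.sdk import DAG, task

    with DAG(
        dag_id="kubernetes_cmd_usage_sketch",  # illustrative name, not from the patches
        start_date=datetime(2025, 1, 1),
        schedule=None,
    ):

        @task.kubernetes_cmd(image="bash:5.2", namespace="default")
        def build_command(name: str) -> list[str]:
            # The callable runs on the worker and only builds the command;
            # the returned list is executed inside the pod launched by KubernetesPodOperator.
            return ["echo", f"Hello, {name}!"]

        build_command("Airflow")

With ``args_only=True`` the returned list would instead be passed as the container's arguments, leaving the image's default entrypoint in place.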