Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions providers/cncf/kubernetes/docs/operators.rst
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,40 @@ Also for this action you can use operator in the deferrable mode:
:start-after: [START howto_operator_k8s_write_xcom_async]
:end-before: [END howto_operator_k8s_write_xcom_async]


Run command in KubernetesPodOperator from TaskFlow
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
With the usage of the ``@task.kubernetes_cmd`` decorator, you can run a command returned by a function
in a ``KubernetesPodOperator`` simplifying it's connection to the TaskFlow.

Difference between ``@task.kubernetes`` and ``@task.kubernetes_cmd``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
``@task.kubernetes`` decorator is designed to run a Python function inside a Kubernetes pod using KPO.
It does this by serializing the function into a temporary Python script that is executed inside the container.
This is well-suited for cases where you want to isolate Python code execution and manage complex dependencies,
as described in the :doc:`TaskFlow documentation <apache-airflow:tutorial/taskflow>`.

In contrast, ``@task.kubernetes_cmd`` decorator allows the decorated function to return
a shell command (as a list of strings), which is then passed as cmds or arguments to
``KubernetesPodOperator``.
This enables executing arbitrary commands available inside a Kubernetes pod --
without needing to wrap it in Python code.

A key benefit here is that Python excels at composing and templating these commands.
Shell commands can be dynamically generated using Python's string formatting, templating,
extra function calls and logic. This makes it a flexible tool for orchestrating complex pipelines
where the task is to invoke CLI-based operations in containers without the need to leave
a TaskFlow context.

How does this decorator work?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
See the following examples on how the decorator works:

.. exampleinclude:: /../tests/system/cncf/kubernetes/example_kubernetes_cmd_decorator.py
:language: python
:start-after: [START howto_decorator_kubernetes_cmd]
:end-before: [END howto_decorator_kubernetes_cmd]

Include error message in email alert
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
2 changes: 2 additions & 0 deletions providers/cncf/kubernetes/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ connection-types:
task-decorators:
- class-name: airflow.providers.cncf.kubernetes.decorators.kubernetes.kubernetes_task
name: kubernetes
- class-name: airflow.providers.cncf.kubernetes.decorators.kubernetes_cmd.kubernetes_cmd_task
name: kubernetes_cmd

config:
local_kubernetes_executor:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import warnings
from collections.abc import Sequence
from typing import TYPE_CHECKING, Callable

from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.bases.decorator import DecoratedOperator, TaskDecorator, task_decorator_factory
else:
from airflow.decorators.base import ( # type: ignore[no-redef]
DecoratedOperator,
TaskDecorator,
task_decorator_factory,
)
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.utils.context import context_merge
from airflow.utils.operator_helpers import determine_kwargs

if TYPE_CHECKING:
from airflow.utils.context import Context


class _KubernetesCmdDecoratedOperator(DecoratedOperator, KubernetesPodOperator):
custom_operator_name = "@task.kubernetes_cmd"

template_fields: Sequence[str] = KubernetesPodOperator.template_fields
overwrite_rtif_after_execution: bool = True

def __init__(self, *, python_callable: Callable, args_only: bool = False, **kwargs) -> None:
self.args_only = args_only

cmds = kwargs.pop("cmds", None)
arguments = kwargs.pop("arguments", None)

if cmds is not None or arguments is not None:
warnings.warn(
f"The `cmds` and `arguments` are unused in {self.custom_operator_name} decorator. "
"You should return a list of commands or image entrypoint arguments with "
"args_only=True from the python_callable.",
UserWarning,
stacklevel=3,
)

# If the name was not provided, we generate operator name from the python_callable
# we also instruct operator to add a random suffix to avoid collisions by default
op_name = kwargs.pop("name", f"k8s-airflow-pod-{python_callable.__name__}")
random_name_suffix = kwargs.pop("random_name_suffix", True)

super().__init__(
python_callable=python_callable,
name=op_name,
random_name_suffix=random_name_suffix,
cmds=None,
arguments=None,
**kwargs,
)

def execute(self, context: Context):
generated = self._generate_cmds(context)
if self.args_only:
self.cmds = []
self.arguments = generated
else:
self.cmds = generated
self.arguments = []
context["ti"].render_templates() # type: ignore[attr-defined]
return super().execute(context)

def _generate_cmds(self, context: Context) -> list[str]:
context_merge(context, self.op_kwargs)
kwargs = determine_kwargs(self.python_callable, self.op_args, context)
generated_cmds = self.python_callable(*self.op_args, **kwargs)
func_name = self.python_callable.__name__
if not isinstance(generated_cmds, list):
raise TypeError(
f"Expected python_callable to return a list of strings, but got {type(generated_cmds)}"
)
if not all(isinstance(cmd, str) for cmd in generated_cmds):
raise TypeError(f"Expected {func_name} to return a list of strings, but got {generated_cmds}")
if not generated_cmds:
raise ValueError(f"The {func_name} returned an empty list of commands")

return generated_cmds


def kubernetes_cmd_task(
python_callable: Callable | None = None,
**kwargs,
) -> TaskDecorator:
"""
Kubernetes cmd operator decorator.

This wraps a function which should return command to be executed
in K8s using KubernetesPodOperator. The function should return a list of strings.
If args_only is set to True, the function should return a list of arguments for
container default command. Also accepts any argument that KubernetesPodOperator
will via ``kwargs``. Can be reused in a single DAG.

:param python_callable: Function to decorate
"""
return task_decorator_factory(
python_callable=python_callable,
decorated_operator_class=_KubernetesCmdDecoratedOperator,
**kwargs,
)
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,11 @@ def get_provider_info():
{
"class-name": "airflow.providers.cncf.kubernetes.decorators.kubernetes.kubernetes_task",
"name": "kubernetes",
}
},
{
"class-name": "airflow.providers.cncf.kubernetes.decorators.kubernetes_cmd.kubernetes_cmd_task",
"name": "kubernetes_cmd",
},
],
"config": {
"local_kubernetes_executor": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from datetime import datetime

from airflow.sdk import DAG, task

with DAG(
dag_id="example_kubernetes_cmd_decorator",
schedule=None,
start_date=datetime(2021, 1, 1),
tags=["example", "cncf", "kubernetes"],
catchup=False,
) as dag:
# [START howto_decorator_kubernetes_cmd]
@task
def foo() -> str:
return "foo"

@task
def bar() -> str:
return "bar"

@task.kubernetes_cmd(
image="bash:5.2",
name="full_cmd",
in_cluster=False,
)
def execute_in_k8s_pod_full_cmd(foo_result: str, bar_result: str) -> list[str]:
return ["echo", "-e", f"With full cmd:\\t{foo_result}\\t{bar_result}"]

# The args_only parameter is used to indicate that the decorated function will
# return a list of arguments to be passed as arguments to the container entrypoint:
# in this case, the `bash` command
@task.kubernetes_cmd(args_only=True, image="bash:5.2", in_cluster=False)
def execute_in_k8s_pod_args_only(foo_result: str, bar_result: str) -> list[str]:
return ["-c", f"echo -e 'With args only:\\t{foo_result}\\t{bar_result}'"]

# Templating can be used in the returned command and all other templated fields in
# the decorator parameters.
@task.kubernetes_cmd(image="bash:5.2", name="my-pod-{{ ti.task_id }}", in_cluster=False)
def apply_templating(message: str) -> list[str]:
full_message = "Templated task_id: {{ ti.task_id }}, dag_id: " + message
return ["echo", full_message]

foo_result = foo()
bar_result = bar()

full_cmd_instance = execute_in_k8s_pod_full_cmd(foo_result, bar_result)
args_instance = execute_in_k8s_pod_args_only(foo_result, bar_result)

[full_cmd_instance, args_instance] >> apply_templating("{{ dag.dag_id }}")

# [END howto_decorator_kubernetes_cmd]


from tests_common.test_utils.system_tests import get_test_run

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
Loading