Description
I'd like for @task.kubernetes to derive the task_id / pod_name based on the decorated function's name. To achieve this now, I would need to pass it into the decorator args, which is a bit repetitive:
@task.kubernetes(task_id="some_func", name=f"{some_prefix}_some_func")
def some_func():
    pass

We managed to do something like the above, but it was more involved than expected:
from typing import Any, Callable

# Assumed import paths (Airflow 2.x with the cncf.kubernetes provider); they may differ in other versions.
from airflow.decorators.base import TaskDecorator, task_decorator_factory
from airflow.providers.cncf.kubernetes.decorators.kubernetes import _KubernetesDecoratedOperator


# inheriting from a private class, not ideal
class DefaultKubernetesDecoratedOperator(_KubernetesDecoratedOperator):
    custom_operator_name = "@default_kubernetes_task"  # the operator removes the decorator so this is required

    def __init__(self, **kwargs: Any) -> None:
        # default the task_id to the decorated function's name
        if "task_id" not in kwargs and "python_callable" in kwargs:
            task_id = kwargs["python_callable"].__name__
            kwargs["task_id"] = task_id
        # default the pod name to "<prefix>-<task_id>"; get_prefix() is our own helper (not shown)
        if "task_id" in kwargs and "name" not in kwargs:
            name = f"{get_prefix()}-{kwargs['task_id']}"
            kwargs["name"] = name
        super().__init__(**kwargs)


def default_kubernetes_task(
    python_callable: Callable[[], None] | None = None,
    multiple_outputs: bool | None = None,
    **kwargs: Any,
) -> TaskDecorator:
    return task_decorator_factory(
        python_callable=python_callable,
        multiple_outputs=multiple_outputs,
        decorated_operator_class=DefaultKubernetesDecoratedOperator,
        **kwargs,
    )

Initially I tried wrapping task.kubernetes itself, but the source code scrubbing relies on a hardcoded decorator name, so the custom decorator is left in the source and the pod fails with NameError: name 'default_kubernetes_task' is not defined:
from typing import Any, Callable

from airflow.decorators import task


def default_kubernetes_task(
    **task_kwargs: Any,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    def decorator(func: Callable[..., Any]) -> Any:
        # default the task_id to the decorated function's name
        if "task_id" not in task_kwargs:
            task_kwargs["task_id"] = func.__name__
        # default the pod name to "<prefix>-<task_id>"; get_prefix() is our own helper (not shown)
        if "name" not in task_kwargs:
            name = f"{get_prefix()}-{task_kwargs['task_id']}"
            task_kwargs["name"] = name
        return task.kubernetes(**task_kwargs)(func)

    return decorator

It'd be nice if there were a more official way to achieve the above (or something similar). Perhaps:
- @task.kubernetes could accept an override for the decorated_operator_class?
- a list of custom operator names to remove could be set in the configuration?
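
To illustrate the second option, here is a rough, self-contained sketch of decorator scrubbing driven by a list of names rather than a single hardcoded one. This is not Airflow's actual implementation: `strip_decorators` and its `decorator_names` parameter are hypothetical, and the sketch only handles decorators written on a single line. It is meant to show why a wrapper with a different name survives scrubbing, and how a configurable list would avoid that.

```python
import re


def strip_decorators(source: str, decorator_names: list[str]) -> str:
    """Drop every line that applies one of the named decorators.

    Hypothetical helper: each name includes the leading "@", and only
    single-line decorators are recognised.
    """
    kept = []
    for line in source.splitlines():
        stripped = line.strip()
        # Match the full decorator name, followed by "(", whitespace, or end of line,
        # so "@task.kubernetes" does not also match "@task.kubernetes_cmd".
        if any(
            re.match(rf"{re.escape(name)}(\(|\s|$)", stripped)
            for name in decorator_names
        ):
            continue
        kept.append(line)
    return "\n".join(kept)


SOURCE = '''@default_kubernetes_task(image="python:3.12")
def some_func():
    pass
'''

# Only the built-in name is known: the custom decorator stays in the source,
# which is what would lead to the NameError inside the pod.
hardcoded = strip_decorators(SOURCE, ["@task.kubernetes"])

# With the custom name added to the list, the decorator line is removed.
configurable = strip_decorators(
    SOURCE, ["@task.kubernetes", "@default_kubernetes_task"]
)
```

With something along these lines, the simple wrapper approach above might work without subclassing the private operator class, provided the wrapper's name could be registered somewhere the scrubbing step can see it.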
Use case/motivation
I'd like for @task.kubernetes to derive the task_id / pod_name based on the decorated function's name, but more generally it could be useful to allow task.kubernetes to be overridden?
Related issues
No response
Are you willing to submit a PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct