@amoghrajesh amoghrajesh commented Mar 9, 2025

closes: #47377

We recently ran into this issue with the Celery executor (#47349). As part of that effort we raised bug #47377, which was to potentially handle the same problem for custom executors that use the Task SDK.

executor_config is a "precondition" of a task workload: it is fulfilled to create the working environment in which the task runs, so it needn't, and shouldn't, be passed to the executor. Pydantic supports field-level exclusion (reference: https://docs.pydantic.dev/2.10/concepts/serialization/#model-and-field-level-include-and-exclude), and we can use this to exclude executor_config from serialisation.
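
For illustration, here is a minimal sketch of the field-level exclusion mentioned above. The model name TaskInstanceSketch and the FakePodOverride class are hypothetical stand-ins, not Airflow's real workload classes; only `Field(exclude=True)` and `model_dump_json()` are actual pydantic APIs.

```python
from typing import Any, Optional

from pydantic import BaseModel, Field


class FakePodOverride:
    """Hypothetical stand-in for a non-serializable object such as V1Pod."""


class TaskInstanceSketch(BaseModel):
    """Hypothetical, trimmed-down model; not Airflow's real workload class."""

    task_id: str
    # exclude=True keeps the value on the model instance but omits it
    # from model_dump() and model_dump_json().
    executor_config: Optional[dict[str, Any]] = Field(default=None, exclude=True)


ti = TaskInstanceSketch(
    task_id="virtualenv_python",
    executor_config={"pod_override": FakePodOverride()},
)

# Serialisation succeeds because the excluded field is never visited.
payload = ti.model_dump_json()
print(payload)
```

The value stays available on the instance (ti.executor_config), so code that builds the task's environment can still read it; it is only left out of the serialised payload.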

Consider a task like this one, which sets a Kubernetes pod_override in executor_config:

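    from kubernetes.client import models as k8s

    from airflow.providers.standard.operators.python import PythonVirtualenvOperator

    # callable_virtualenv is assumed to be defined elsewhere in the DAG file
    task = PythonVirtualenvOperator(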
        task_id="virtualenv_python",
        python_callable=callable_virtualenv,
        requirements=["colorama==0.4.0"],
        system_site_packages=False,
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            resources=k8s.V1ResourceRequirements(
                                requests={
                                    "cpu": "100m",
                                    "memory": "384Mi",
                                },
                                limits={
                                    "cpu": 1,
                                    "memory": "500Mi",
                                }
                            )
                        )
                    ]
                )
            )
        }
    )

Before the fix, the scheduler crashed with this error:

    callback()
  File "/opt/airflow/airflow/cli/commands/local_commands/scheduler_command.py", line 55, in <lambda>
    callback=lambda: _run_scheduler_job(args),
  File "/opt/airflow/airflow/cli/commands/local_commands/scheduler_command.py", line 43, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/opt/airflow/airflow/utils/session.py", line 101, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/jobs/job.py", line 342, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/opt/airflow/airflow/jobs/job.py", line 371, in execute_job
    ret = execute_callable()
  File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 935, in _execute
    self._run_scheduler_loop()
  File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 1067, in _run_scheduler_loop
    executor.heartbeat()
  File "/opt/airflow/airflow/traces/tracer.py", line 54, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/executors/base_executor.py", line 250, in heartbeat
    self.trigger_tasks(open_slots)
  File "/opt/airflow/airflow/traces/tracer.py", line 54, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/executors/base_executor.py", line 407, in trigger_tasks
    self._process_workloads(workloads)  # type: ignore[attr-defined]
  File "/opt/airflow/providers/celery/src/airflow/providers/celery/executors/celery_executor.py", line 281, in _process_workloads
    self._send_tasks(tasks)
  File "/opt/airflow/providers/celery/src/airflow/providers/celery/executors/celery_executor.py", line 290, in _send_tasks
    key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send)
  File "/opt/airflow/providers/celery/src/airflow/providers/celery/executors/celery_executor.py", line 329, in _send_tasks_to_celery
    return list(map(send_task_to_executor, task_tuples_to_send))
  File "/opt/airflow/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py", line 267, in send_task_to_executor
    args = (args.model_dump_json(),)
  File "/usr/local/lib/python3.9/site-packages/pydantic/main.py", line 477, in model_dump_json
    return self.__pydantic_serializer__.to_json(
pydantic_core._pydantic_core.PydanticSerializationError: Unable to serialize unknown type: <class 'kubernetes.client.models.v1_pod.V1Pod'>
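
The failure at the bottom of the traceback can be reproduced outside Airflow. The sketch below assumes a hypothetical WorkloadSketch model and an OpaqueObject class standing in for V1Pod; it only demonstrates that pydantic raises PydanticSerializationError when a field that is not excluded holds an object it cannot serialise.

```python
from typing import Any

from pydantic import BaseModel
from pydantic_core import PydanticSerializationError


class OpaqueObject:
    """Hypothetical stand-in for kubernetes.client.models.V1Pod."""


class WorkloadSketch(BaseModel):
    """Hypothetical model; executor_config is NOT excluded here."""

    executor_config: dict[str, Any]


# Validation accepts the object because the value type is Any.
workload = WorkloadSketch(executor_config={"pod_override": OpaqueObject()})

try:
    workload.model_dump_json()
    error_message = None
except PydanticSerializationError as exc:
    # pydantic has no JSON representation for OpaqueObject.
    error_message = str(exc)

print(error_message)
```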

After the fix, the DAG runs fine (screenshot omitted) and the scheduler no longer crashes.


@amoghrajesh amoghrajesh added the area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK label Mar 9, 2025
@amoghrajesh amoghrajesh requested review from ashb and eladkal March 9, 2025 13:27
@boring-cyborg boring-cyborg bot added area:Executors-core LocalExecutor & SequentialExecutor area:providers provider:celery labels Mar 9, 2025
@amoghrajesh amoghrajesh removed the area:Executors-core LocalExecutor & SequentialExecutor label Mar 9, 2025
@vatsrahul1001 vatsrahul1001 left a comment

LGTM!

@hussein-awala hussein-awala merged commit e35cf2f into apache:main Mar 9, 2025
66 checks passed
azharizz pushed a commit to azharizz/airflow that referenced this pull request Mar 9, 2025
nailo2c pushed a commit to nailo2c/airflow that referenced this pull request Apr 4, 2025
Labels

area:providers area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK provider:celery

Development

Successfully merging this pull request may close these issues.

Avoid scheduler crash when executor_config is passed for executors using task sdk

4 participants