Skip to content

Scheduler crashing when expanding TimeDeltaSensor #52845

@atul-astronomer

Description

@atul-astronomer

Apache Airflow version

3.0.3rc2

If "Other Airflow 2 version" selected, which one?

No response

What happened?

Version: 3.0.3rc2

Scheduler crashing when expanding TimeDeltaSensor

[2025-07-04T06:07:14.139+0000] {scheduler_job_runner.py:1019} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1015, in _execute
    self._run_scheduler_loop()
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1296, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1396, in _do_scheduling
    callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/utils/retries.py", line 93, in wrapped_function
    for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/tenacity/__init__.py", line 445, in __iter__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/tenacity/__init__.py", line 378, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/tenacity/__init__.py", line 400, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
                                     ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/opt/airflow/airflow-core/src/airflow/utils/retries.py", line 102, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1795, in _schedule_all_dag_runs
    callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1922, in _schedule_dag_run
    dag_run.schedule_tis(schedulable_tis, session, max_tis_per_query=self.job.max_tis_per_query)
  File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 99, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/models/dagrun.py", line 1899, in schedule_tis
    start_from_trigger = ti.task.expand_start_from_trigger(context=context, session=session)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/models/mappedoperator.py", line 76, in expand_start_from_trigger
    mapped_kwargs, _ = self._expand_mapped_kwargs(context)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/mappedoperator.py", line 693, in _expand_mapped_kwargs
    return self._get_specified_expand_input().resolve(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'SchedulerDictOfListsExpandInput' object has no attribute 'resolve'
[2025-07-04T06:07:14.152+0000] {scheduler_job_runner.py:1031} INFO - Exited execute loop
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 10, in <module>
    sys.exit(main())
             ^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/__main__.py", line 55, in main
    args.func(args)
  File "/opt/airflow/airflow-core/src/airflow/cli/cli_config.py", line 48, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/utils/cli.py", line 112, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 52, in scheduler
    run_command_with_daemon_option(
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 55, in <lambda>
    callback=lambda: _run_scheduler_job(args),
                     ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 43, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 101, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 347, in run_job
    return execute_job(job, execute_callable=execute_callable)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 376, in execute_job
    ret = execute_callable()
          ^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1015, in _execute
    self._run_scheduler_loop()
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1296, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1396, in _do_scheduling
    callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/utils/retries.py", line 93, in wrapped_function
    for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/tenacity/__init__.py", line 445, in __iter__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/tenacity/__init__.py", line 378, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/tenacity/__init__.py", line 400, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
                                     ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/opt/airflow/airflow-core/src/airflow/utils/retries.py", line 102, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1795, in _schedule_all_dag_runs
    callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1922, in _schedule_dag_run
    dag_run.schedule_tis(schedulable_tis, session, max_tis_per_query=self.job.max_tis_per_query)
  File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 99, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/models/dagrun.py", line 1899, in schedule_tis
    start_from_trigger = ti.task.expand_start_from_trigger(context=context, session=session)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/models/mappedoperator.py", line 76, in expand_start_from_trigger
    mapped_kwargs, _ = self._expand_mapped_kwargs(context)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/mappedoperator.py", line 693, in _expand_mapped_kwargs
    return self._get_specified_expand_input().resolve(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'SchedulerDictOfListsExpandInput' object has no attribute 'resolve'

What you think should happen instead?

No response

How to reproduce

Run the below dag on 3.0.3rc2:

from datetime import datetime, timedelta
from time import sleep

from airflow.sdk import DAG
from airflow.decorators import task
from airflow.models.taskinstance import TaskInstance
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.sensors.date_time import DateTimeSensor, DateTimeSensorAsync
from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync

delays = [30, 60, 90]


@task
def get_delays():
    return delays


@task
def get_wakes(delay, **context):
    "Wake {delay} seconds after the task starts"
    ti: TaskInstance = context["ti"]
    return (ti.start_date + timedelta(seconds=delay)).isoformat()


with DAG(
    dag_id="datetime_mapped",
    start_date=datetime(1970, 1, 1),
    schedule=None,
    tags=["taskmap"] 
) as dag:

    wake_times = get_wakes.expand(delay=get_delays())

    DateTimeSensor.partial(task_id="expanded_datetime").expand(target_time=wake_times)
    TimeDeltaSensor.partial(task_id="expanded_timedelta").expand(
        delta=list(map(lambda x: timedelta(seconds=x), [30, 60, 90]))
    )

    DateTimeSensorAsync.partial(task_id="expanded_datetime_async").expand(
        target_time=wake_times
    )
    TimeDeltaSensorAsync.partial(task_id="expanded_timedelta_async").expand(
        delta=list(map(lambda x: timedelta(seconds=x), [30, 60, 90]))
    )

    TimeDeltaSensor(task_id="static_timedelta", delta=timedelta(seconds=90))
    DateTimeSensor(
        task_id="static_datetime",
        target_time="{{macros.datetime.now() + macros.timedelta(seconds=90)}}",
    )

    PythonOperator(task_id="op_sleep_90", python_callable=lambda: sleep(90))

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions