Skip to content

Scheduler crash when clear a previous run of a normal task that is now a mapped task #27478

@raphaelauv

Description

@raphaelauv

Apache Airflow version

2.4.2

What happened

I have clear a task A that was a normal task but that is now a mapped task

[2022-11-02 23:33:20 +0000] [17] [INFO] Worker exiting (pid: 17)
2022-11-02T23:33:20.390911528Z Traceback (most recent call last):
2022-11-02T23:33:20.390935788Z   File "/usr/local/bin/airflow", line 8, in <module>
2022-11-02T23:33:20.390939798Z     sys.exit(main())
2022-11-02T23:33:20.390942302Z   File "/usr/local/lib/python3.10/site-packages/airflow/__main__.py", line 39, in main
2022-11-02T23:33:20.390944924Z     args.func(args)
2022-11-02T23:33:20.390947345Z   File "/usr/local/lib/python3.10/site-packages/airflow/cli/cli_parser.py", line 52, in command
2022-11-02T23:33:20.390949893Z     return func(*args, **kwargs)
2022-11-02T23:33:20.390952237Z   File "/usr/local/lib/python3.10/site-packages/airflow/utils/cli.py", line 103, in wrapper
2022-11-02T23:33:20.390954862Z     return f(*args, **kwargs)
2022-11-02T23:33:20.390957163Z   File "/usr/local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 85, in scheduler
2022-11-02T23:33:20.390959672Z     _run_scheduler_job(args=args)
2022-11-02T23:33:20.390961979Z   File "/usr/local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 50, in _run_scheduler_job
2022-11-02T23:33:20.390964496Z     job.run()
2022-11-02T23:33:20.390966931Z   File "/usr/local/lib/python3.10/site-packages/airflow/jobs/base_job.py", line 247, in run
2022-11-02T23:33:20.390969441Z     self._execute()
2022-11-02T23:33:20.390971778Z   File "/usr/local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 746, in _execute
2022-11-02T23:33:20.390974368Z     self._run_scheduler_loop()
2022-11-02T23:33:20.390976612Z   File "/usr/local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 866, in _run_scheduler_loop
2022-11-02T23:33:20.390979125Z     num_queued_tis = self._do_scheduling(session)
2022-11-02T23:33:20.390981458Z   File "/usr/local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 946, in _do_scheduling
2022-11-02T23:33:20.390984819Z     callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
2022-11-02T23:33:20.390988440Z   File "/usr/local/lib/python3.10/site-packages/airflow/utils/retries.py", line 78, in wrapped_function
2022-11-02T23:33:20.390991893Z     for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
2022-11-02T23:33:20.391008515Z   File "/usr/local/lib/python3.10/site-packages/tenacity/__init__.py", line 384, in __iter__
2022-11-02T23:33:20.391012668Z     do = self.iter(retry_state=retry_state)
2022-11-02T23:33:20.391016220Z   File "/usr/local/lib/python3.10/site-packages/tenacity/__init__.py", line 351, in iter
2022-11-02T23:33:20.391019633Z     return fut.result()
2022-11-02T23:33:20.391022534Z   File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, in result
2022-11-02T23:33:20.391025820Z     return self.__get_result()
2022-11-02T23:33:20.391029555Z   File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
2022-11-02T23:33:20.391033787Z     raise self._exception
2022-11-02T23:33:20.391037611Z   File "/usr/local/lib/python3.10/site-packages/airflow/utils/retries.py", line 87, in wrapped_function
2022-11-02T23:33:20.391040339Z     return func(*args, **kwargs)
2022-11-02T23:33:20.391042660Z   File "/usr/local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1234, in _schedule_all_dag_runs
2022-11-02T23:33:20.391045166Z     for dag_run in dag_runs:
2022-11-02T23:33:20.391047413Z   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/query.py", line 2887, in __iter__
2022-11-02T23:33:20.391049815Z     return self._iter().__iter__()
2022-11-02T23:33:20.391052252Z   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/query.py", line 2894, in _iter
2022-11-02T23:33:20.391054786Z     result = self.session.execute(
2022-11-02T23:33:20.391057119Z   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1688, in execute
2022-11-02T23:33:20.391059741Z     conn = self._connection_for_bind(bind)
2022-11-02T23:33:20.391062247Z   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1529, in _connection_for_bind
2022-11-02T23:33:20.391065901Z     return self._transaction._connection_for_bind(
2022-11-02T23:33:20.391069140Z   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 721, in _connection_for_bind
2022-11-02T23:33:20.391078064Z     self._assert_active()
2022-11-02T23:33:20.391081939Z   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 601, in _assert_active
2022-11-02T23:33:20.391085250Z     raise sa_exc.PendingRollbackError(
2022-11-02T23:33:20.391087747Z sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.errors.ForeignKeyViolation) update or delete on table "task_instance" violates foreign key constraint "task_fail_ti_fkey" on table "task_fail"
2022-11-02T23:33:20.391091226Z DETAIL:  Key (dag_id, task_id, run_id, map_index)=(kubernetes_dag, task-one, scheduled__2022-11-01T00:00:00+00:00, -1) is still referenced from table "task_fail".
2022-11-02T23:33:20.391093987Z 
2022-11-02T23:33:20.391102116Z [SQL: UPDATE task_instance SET map_index=%(map_index)s WHERE task_instance.dag_id = %(task_instance_dag_id)s AND task_instance.task_id = %(task_instance_task_id)s AND task_instance.run_id = %(task_instance_run_id)s AND task_instance.map_index = %(task_instance_map_index)s]
2022-11-02T23:33:20.391105554Z [parameters: {'map_index': 0, 'task_instance_dag_id': 'kubernetes_dag', 'task_instance_task_id': 'task-one', 'task_instance_run_id': 'scheduled__2022-11-01T00:00:00+00:00', 'task_instance_map_index': -1}]
2022-11-02T23:33:20.391108241Z (Background on this error at: https://sqlalche.me/e/14/gkpj) (Background on this error at: https://sqlalche.me/e/14/7s2a)
2022-11-02T23:33:20.489698500Z [2022-11-02 23:33:20 +0000] [7] [INFO] Shutting down: Master

What you think should happen instead

Airflow should evaluate the existing and previous runs as mapped task of 1 task

cause I can't see the logs anymore of a task that is now a mapped task

How to reproduce

dag with a normal task A
run dag
task A success
edit dag to make task A a mapped task ( without changing name of task )
clear task
scheduler crash

Operating System

ubuntu 22.04

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions