[Bug] Backfill job fails to run when there are tasks run into rescheduling state. #17305
|
Anyone familiar with this code block is welcome to help improve the unit test. The change has been running in our production environment for two weeks and has been stable. |
|
Could you rebase this to the latest main? We had some disruptions in CI that caused the tests to fail in this PR. |
Force-pushed from 2ad74ef to 1d7062a.
Sure, I will rebase it. Thanks for your kind reminder. |
|
CI seems to pass (the failure is due to insufficient resources and is not related to this PR). I have no idea if this is the right change, so let's see what people more familiar with this part of the code have to say… |
Sure, thank you a lot. |
|
Hey, it looks like we hit the same problem described in #13322. We would be really grateful if any of the reviewers could look into this patch (cc @ashb @kaxil @XD-DENG). The backfill feature is heavily used in our production release of Airflow, and this bug is blocking us from moving to Airflow 2.x. Thanks to all the contributors who are working really hard on this project. |
|
Since this is a bug fix, please could you add a unit test to prevent future regressions of this? |
Ehh... I have tried, but it's difficult to write a unit test for it. |
|
Hi, |
Maybe there are other cases to cause the issue... You can print the content of |
Hi, so I was not clear. Fix from PR resolves issue with until timeout is reached. |
Is it in a rescheduling state? If so, that is expected, because the attempt sequence number is only updated when the task fails. The rescheduling state only means that the upstream condition is not yet met, and the task will try again until the upstream is done. |
You are right, I messed up some concepts. |
|
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease. |
|
I am merging this to get it in 2.2.0rc1 end of today but we should write a test for it |
This PR adds a unittest for the PR apache#17305
The backfill job fails to run when there are tasks that run into the rescheduling state.
The error log from issue #13322 is as follows:
Traceback (most recent call last):
  File "/opt/conda/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/opt/conda/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/opt/conda/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/cli.py", line 89, in wrapper
    return f(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/cli/commands/dag_command.py", line 103, in dag_backfill
    dag.run(
  File "/opt/conda/lib/python3.8/site-packages/airflow/models/dag.py", line 1701, in run
    job.run()
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 237, in run
    self._execute()
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 799, in _execute
    self._execute_for_run_dates(
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 722, in _execute_for_run_dates
    processed_dag_run_dates = self._process_backfill_task_instances(
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 620, in _process_backfill_task_instances
    self._update_counters(ti_status=ti_status)
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 211, in _update_counters
    ti_status.running.pop(key)
KeyError: TaskInstanceKey(dag_id='dag_id', task_id='task_name', execution_date=datetime.datetime(2020, 12, 15, 0, 0, tzinfo=Timezone('UTC')), try_number=2)
The root cause is that the field try_number doesn't increase when the task runs into the rescheduling state, but there is a reduce operation on try_number.
Currently, I can't think of a good unit test for it, so I am only posting the code here to help anyone affected by the problem.
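To make the failure mode concrete, here is a minimal, self-contained sketch of why popping a dictionary entry with a key whose try_number has drifted raises KeyError. It is not the patch from this PR and does not use Airflow: TaskKey and find_running_key are hypothetical names, and the stored try_number of 1 is an assumed value chosen only to differ from the try_number=2 shown in the traceback.

```python
from typing import Dict, NamedTuple, Optional


class TaskKey(NamedTuple):
    """Hypothetical stand-in for a task instance key (not Airflow's class)."""

    dag_id: str
    task_id: str
    try_number: int


def find_running_key(running: Dict[TaskKey, str], key: TaskKey) -> Optional[TaskKey]:
    """Return the stored key for the same task, ignoring try_number."""
    for stored in running:
        if (stored.dag_id, stored.task_id) == (key.dag_id, key.task_id):
            return stored
    return None


# Assumed values: the entry was stored with try_number=1, the lookup uses 2.
running = {TaskKey("dag_id", "task_name", 1): "task instance"}
lookup = TaskKey("dag_id", "task_name", 2)

try:
    running.pop(lookup)
    mismatch_raised = False
except KeyError:
    # Same task, different try_number: the tuples compare unequal,
    # so the dictionary has no entry for the lookup key.
    mismatch_raised = True

# Resolving the stored key first lets the removal succeed.
stored_key = find_running_key(running, lookup)
removed = running.pop(stored_key) if stored_key is not None else None
```

Matching on the task identity alone is only one way to illustrate the mismatch; a real fix would need to use whichever key is consistent with how the rescheduled task's try_number is adjusted.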