Fix deadlock when mapped task with removed upstream is rerun #26518
ephraimbuddy merged 3 commits into apache:main
Conversation
When a DAG with a mapped downstream task that depends on a mapped upstream task that has had some map indexes removed is rerun, we run into a deadlock, because the trigger rule evaluation does not account for removed task instances. The fix for the deadlock is to account for removed task instances where possible in the trigger rules. In this fix, I added a case where, if flag_upstream_failed is set, the downstream of a removed task instance will also be removed. That is, if the upstream with map index 3 is removed, then the downstream with map index 3 will also be removed if flag_upstream_failed is set to True.
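A minimal sketch of the accounting problem (plain Python, not the actual Airflow implementation; all names here are illustrative): trigger-rule evaluation tallies upstream task-instance states, and if "removed" is not counted as a terminal state, the downstream's upstreams never all appear done, so the scheduler waits forever:

```python
from collections import Counter

# Hypothetical upstream states for a mapped task being rerun after the
# upstream's map length shrank from 5 to 3: indexes 3 and 4 were removed.
upstream_states = ["success", "success", "success", "removed", "removed"]

def all_done_ignoring_removed(states):
    """Buggy accounting: 'removed' does not count as done, so the
    downstream never sees all upstreams finished -> deadlock."""
    counts = Counter(states)
    done = counts["success"] + counts["failed"] + counts["skipped"]
    return done >= len(states)

def all_done_counting_removed(states):
    """Fixed accounting: removed instances count toward 'done'."""
    counts = Counter(states)
    done = (counts["success"] + counts["failed"]
            + counts["skipped"] + counts["removed"])
    return done >= len(states)

print(all_done_ignoring_removed(upstream_states))   # False (never satisfied)
print(all_done_counting_removed(upstream_states))   # True
```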
Here's how you can reproduce this bug in main:

```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    'bug_test',
    schedule='@once',
    start_date=datetime(2022, 1, 1),
    max_active_runs=1,
)
def test_scheduler_bug():
    @task
    def do_something(i):
        return 6

    @task
    def do_something_else(i):
        import logging

        log = logging.getLogger('airflow.task')
        log.info("I'll never run")

    # After the run, reduce this range to 2
    nums = do_something.expand(i=[i + 1 for i in range(5)])
    do_something_else.expand(i=nums)


TEST_DAG = test_scheduler_bug()
```
```python
elif skipped:
    changed = ti.set_state(State.SKIPPED, session)
elif removed and successes and ti.map_index > -1:
    if ti.map_index >= successes:
```
Why do we compare the map index with the number of upstream successes? That seems odd.
Hmm, yes now you mention it this feels like it's going to break in some other cases.
Like, what if there is one mapped upstream in the failed state and one in the removed state? This would erroneously remove it, I think?
@ephraimbuddy Could you take another look at this PR/case please?
> Hmm, yes now you mention it this feels like it's going to break in some other cases. Like what if there is one mapped upstream in the failed state and one in the removed state? This would erroneously remove it, I think?
In this case, successes will be 0 and failed will be 1, so the condition will not be reached and the task instance will be marked as upstream_failed. The same applies when we have skipped task instances: the condition to mark the task instance as removed will not be reached.
The condition for the task to be marked removed is that we have some removed task instances and some successful task instances, no failed, no skipped, and the task is mapped. So if we get here and the map_index of the task instance is >= the number of successful task instances, it means the task instance's upstream was removed, because indexes go from -1 upwards; it's not possible to remove map_index 1 and still have map_index 3.
If we have 5 mapped tasks (0, 1, 2, 3, 4) and we remove 2, we will have 3 mapped tasks (0, 1, 2). If these 3 are successful (successes=3), then the removed ones are those with map index greater than or equal to 3, i.e. 3 and 4.
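That contiguity argument can be sketched directly (illustrative Python, not Airflow code): map indexes run from 0 upward with no gaps, so shrinking a mapped task only drops the highest indexes, and any index >= the success count must belong to a removed upstream instance:

```python
# Upstream originally expanded to 5 mapped instances (indexes 0..4),
# then shrank to 3 (indexes 0..2), and the 3 survivors all succeeded.
original_indexes = list(range(5))
successes = 3  # surviving instances 0, 1, 2

# Indexes below the success count survived; the rest were removed.
kept = [i for i in original_indexes if i < successes]
removed = [i for i in original_indexes if i >= successes]
print(kept)     # [0, 1, 2]
print(removed)  # [3, 4]
```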
What if a task has multiple upstreams?
`[a, b] >> mapped_task(list_gen)`, for instance?
Edit: `[a, b] >> mapped_task.map(list_gen)`, for instance?
And `a` succeeds, `b` fails, and `list_gen` is reduced to returning only a single item?
This would apply:
airflow/airflow/ti_deps/deps/trigger_rule_dep.py, lines 165 to 166 in 0c7b4cb
Line 169 is only satisfied if we have removed, successes, no failed, no skipped, and a mapped task.
Ahh, good. This is important enough functionality (it's the very core of Airflow) that we should add test cases covering things like this.
It does seem like it's covered here:
airflow/tests/models/test_taskinstance.py, lines 1065 to 1216 in 7d6d182
Another option is removing this part altogether. It's not part of the deadlock issue, but I feel it's good to have.
My reason is this:
If on the first run the upstream was 3 and the downstream was 3 too (the upstream created the downstream), we have 3 -> 3 successes.
Then we reduce the upstream to 2, meaning one task is removed, and we clear and rerun the DAG. Without this part of the change, we would end up running all 3 of the downstream instances: upstream (2 successful, 1 removed), downstream (3 successful).
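A plain-Python walk-through of that scenario (illustrative only, not Airflow code), showing how propagating removal keeps the downstream in step with the shrunken upstream on rerun:

```python
# First run: upstream expanded to 3 mapped instances, all succeeded,
# and created 3 downstream instances that also succeeded.
upstream = {0: "success", 1: "success", 2: "success"}
downstream = {0: "success", 1: "success", 2: "success"}

# Rerun after the upstream length shrinks to 2: index 2 is now removed.
upstream = {0: "success", 1: "success", 2: "removed"}

# Without propagating removal, all 3 downstream instances would rerun.
# With the fix, downstream indexes >= the upstream success count (2)
# are marked removed instead.
successes = sum(1 for s in upstream.values() if s == "success")
downstream = {i: ("removed" if i >= successes else "success")
              for i in downstream}
print(downstream)  # {0: 'success', 1: 'success', 2: 'removed'}
```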
It's not this one specific line; they are all like this, and that's the worry.
The test you highlighted doesn't use mapped tasks, so I don't think it covers the case I highlighted. Edit: sorry, my original example didn't have a map. Added that.
(cherry picked from commit e91637f)