Fix waiting setup tasks when they are not a direct upstream#33570
Fix waiting setup tasks when they are not a direct upstream#33570hussein-awala wants to merge 24 commits intoapache:mainfrom
Conversation
|
good catch. bummer we missed this one. will take a look here. |
|
|
||
| upstream_done = done >= upstream | ||
| setup_done = (success_setup + skipped_setup + failed_setup) >= upstream_setup | ||
| is_tear_down = task.is_teardown or trigger_rule == TR.ALL_DONE_SETUP_SUCCESS |
There was a problem hiding this comment.
i don't think you need to check both is_teardown and trigger rule.
it should be sufficient to check the trigger rule.
and with that, it may be unnecessary to create this variable
|
@hussein-awala i noticed there are some failing tests. is this ready for review or are you still working through some edge cases? |
It was supposed to be ready (I tested it and it worked perfectly as expected), but there seems to be a problem with the mapped operators (which I didn't test), and I didn't have time to check it. I'll resume working on it this weekend, and I'll draft it in the meantime. |
|
Cool, so one thing @hussein-awala ... I think in order to ensure consistent behavior, we must also add something like this to function or perhaps better, do the check when setting upstream / downstream. The reason is that, what you are doing (and needfully so) is essentially to check that upstream setups are done and successful before the downstream (which is not directly connected) may run. BUT, if a user uses a trigger rule such as Other point... on the topic of scope of the setup.... I think your logic for |
- update the method which find all upstream setup tasks - update some code according to code review
|
i'm gonna collaborate on this with you if that's alright since time is short. |
|
oh darn cannot rebase let's see if i can just add |
tests/models/test_taskinstance.py
Outdated
| ), | ||
| ], | ||
| ) | ||
| def test_check_task_dependencies( |
There was a problem hiding this comment.
i think that some of these tests, where you've made substantial changes, i think we should just make a new test.
it's very important that we're confident that we're not breaking anything here, so rather than make such significant changes to the test, i think it's preferable to add a new one alongside. i'll try to do this right now.
There was a problem hiding this comment.
It was my first thought, but I tried to avoid duplicating the code. But yeah +1
tests/models/test_taskinstance.py
Outdated
| # successes, skipped, failed, upstream_failed, removed, done | ||
| @pytest.mark.parametrize( | ||
| "trigger_rule, upstream_setups, upstream_states, flag_upstream_failed, expect_state, expect_passed", | ||
| "trigger_rule, direct_upstream_setups, indirect_upstream_setups, upstream_states," |
There was a problem hiding this comment.
there are so many changes to tests it makes it a bit hard to review.
| done: int | ||
| success_setup: int | ||
| skipped_setup: int | ||
| failed_setup: int |
There was a problem hiding this comment.
until now, everything in this class is direct upstreams. is failed_setup direct only or does it include indirect too? if it includes indirect too, it should probably be clarified through a more precise variable name. but perhaps better would be to avoid mixing direct and indirect in the same class if it can be avoided. perhaps we can just add the information through an optional argument in calculate or something. this would also make the diff easier to deal with.
There was a problem hiding this comment.
the other issue is, there are more states than just "failed" that we need to account for. there is also upstream_failed, skipped, etc -- essentially anything other than success.
There was a problem hiding this comment.
is failed_setup direct only or does it include indirect too?
yes it includes indirect too. The same for success_setup and skipped_setup
There was a problem hiding this comment.
For me there is 4 different cases:
- running (all the unfinished states) -> we should wait
- success -> we can run the task
- failed (failed or upstream_failed) -> fail
- skipped with any fail -> we should skipp
So failed here includes failed and upstream_failed:
return _UpstreamTIStates(
success=counter.get(TaskInstanceState.SUCCESS, 0),
skipped=counter.get(TaskInstanceState.SKIPPED, 0),
failed=counter.get(TaskInstanceState.FAILED, 0),
upstream_failed=counter.get(TaskInstanceState.UPSTREAM_FAILED, 0),
removed=counter.get(TaskInstanceState.REMOVED, 0),
done=sum(counter.values()),
success_setup=setup_counter.get(TaskInstanceState.SUCCESS, 0),
skipped_setup=setup_counter.get(TaskInstanceState.SKIPPED, 0),
failed_setup=setup_counter.get(TaskInstanceState.FAILED, 0)
+ setup_counter.get(TaskInstanceState.UPSTREAM_FAILED, 0),
)|
ok yeah that worked... ok so yeah, pushed a few changes, mainly the only changes i made were with respect to making the diff easier to follow. added a few comments. will continue to review. |
| return | ||
| if ti.task.trigger_rule == TR.ALWAYS: | ||
| if ti.task.trigger_rule == TR.ALWAYS and not setup_upstream_tasks: | ||
| # even with ALWAYS trigger rule, we still need to check setup tasks |
There was a problem hiding this comment.
i think always should probably keep the same behavior. it says in the docs No dependencies at all, run this task at any time. it used to be called "dummy" trigger rule. so i think we just keep the absolute short circuit here.
There was a problem hiding this comment.
it's really a non-sensical rule. with it, the deps are just for show. no one should ever really use this trigger rule. perhaps it's there for testing 🤷
| yield self._passing_status(reason="The task had a always trigger rule set.") | ||
| return | ||
| yield from self._evaluate_trigger_rule(ti=ti, dep_context=dep_context, session=session) | ||
| yield from self._evaluate_trigger_rule( |
There was a problem hiding this comment.
given that there's no need to evaluate whether there are upstream setups for the purpose of TR.ALWAYS, i think we no longer need this new param setup_upstream_tasks in _evaluate_trigger_rule
Co-authored-by: Daniel Standish <15932138+dstandish@users.noreply.github.com>
…tead of tasks list
…getting all tasks
| for down_task in task.downstream_list: | ||
| if not down_task.is_teardown and down_task.trigger_rule != TriggerRule.ALL_SUCCESS: | ||
| # this is required to ensure consistent clearing behavior when upstream | ||
| raise ValueError("Setup tasks must be followed with trigger rule ALL_SUCCESS.") |
There was a problem hiding this comment.
Since we’ve drilled this deep, can this show what the offending task is for clarity?
| @dataclass | ||
| class _UpstreamTIStates: |
There was a problem hiding this comment.
Is this just to have the default? Dataclass is pretty significantly slower and not really worthwhile here since the class is unpacked pretty much immediately. Better to stick to a named tuple.
| if ti.task.is_teardown: | ||
| setup_upstream_tasks = [task for task in ti.task.upstream_list if task.is_setup] | ||
| else: | ||
| setup_upstream_tasks = list(ti.task.get_upstreams_only_setups()) |
There was a problem hiding this comment.
| if ti.task.is_teardown: | |
| setup_upstream_tasks = [task for task in ti.task.upstream_list if task.is_setup] | |
| else: | |
| setup_upstream_tasks = list(ti.task.get_upstreams_only_setups()) | |
| if ti.task.is_teardown: | |
| setup_upstream_tasks = (task for task in ti.task.upstream_list if task.is_setup) | |
| else: | |
| setup_upstream_tasks = ti.task.get_upstreams_only_setups() |
No need to build a list here as far as I can tell
| elif not upstream_setup or setup_done: | ||
| # if there are no upstream setup tasks or all of them are done, | ||
| # and we haven't set a new state, then we can check the upstream tasks |
There was a problem hiding this comment.
I’d do
elif upstream_setup and not setup_down:
passand dedent the entire block below. (Not sure if I got the boolean negation right)
|
Fixed by an alternative solution by #33903 |
closes: #33561
This PR force the TI to wait all the upstream setup tasks event if they are not a direct upstream. Please check #33561 for more details.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in newsfragments.