Refactor occupancy #7030
Conversation
Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files ±0, 15 suites ±0, 6h 30m 45s ⏱️ (+41m 46s)

For more details on these failures, see this check.

Results for commit a98049c. ± Comparison against base commit e892d0b.

♻️ This comment has been updated with latest results.
Very early preliminary results
distributed/stealing.py (outdated)

    duration = self.scheduler.get_task_duration(
        ts
    ) + self.scheduler.get_comm_cost(ts, ts.processing_on)
All the stealing fixes here are preliminary. I suspect we want to get #7026 done first.
    # TODO: occupancy no longer concats linearly so we can't easily
    # assume that the network cost would go down by that much
In different terms: "occupancy by task" is no longer constant, so we would need to recompute it whenever it is used for any decision.
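For illustration only, a minimal sketch of what recomputing a single task's contribution on demand could look like, reusing the `get_task_duration` / `get_comm_cost` calls from the stealing.py excerpt above; the helper name `occupancy_contribution` is made up and not part of this PR:

```python
def occupancy_contribution(scheduler, ts) -> float:
    """Hypothetical helper: derive one task's occupancy share on demand.

    Since occupancy no longer accumulates a constant per-task value, any
    decision based on a per-task number has to recompute it from the
    current duration estimate plus the network transfer cost.
    """
    ws = ts.processing_on
    if ws is None:
        return 0.0
    return scheduler.get_task_duration(ts) + scheduler.get_comm_cost(ts, ws)
```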
    @pytest.mark.skip("executing heartbeats not considered yet")
    @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
    async def test_correct_bad_time_estimate(c, s, *workers):
This is the one piece of functionality I couldn't restore so far. The problem is that `reevaluate_occupancy` did not only reevaluate the occupancy: if it detected a significant shift in occupancy, it also recalculated the steal time ratio for all tasks in processing.
With this PR there is no longer any place where occupancy is reevaluated, so this is no longer possible. A more natural approach would be to recalculate whenever a task group/prefix duration drifts (see the sketch below), but we'd need to track the tasks of a task group to make that work.
I'm currently not fully convinced that this is worth doing, particularly since it only affects tasks with large network transfers and small occupancy. As it stands right now, this would only affect tasks with a transfer-time-to-occupancy ratio of more than 257, which is typically only possible for lightning-fast tasks anyhow.
Before engaging with this, I would like to get #7026 (or a version of it) done.
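A rough sketch of the drift-triggered recalculation mentioned above; `tasks_by_prefix`, `recalculate`, and the drift threshold are hypothetical names and values for bookkeeping that would still need to be added:

```python
import math

DRIFT_FACTOR = 2.0  # assumed threshold: a 2x shift in the average counts as "significant"

def on_prefix_duration_update(stealing, prefix: str, old_avg: float, new_avg: float) -> None:
    """Recompute steal-time ratios only when a task prefix's average duration drifts."""
    if old_avg <= 0 or new_avg <= 0:
        return
    # |log2(new/old)| >= log2(DRIFT_FACTOR) means the average at least doubled or halved
    if abs(math.log2(new_avg / old_avg)) < math.log2(DRIFT_FACTOR):
        return
    # Requires tracking which processing tasks belong to the prefix (the
    # missing piece noted above); recalculate their steal-time ratios.
    for ts in stealing.tasks_by_prefix.get(prefix, ()):
        if ts.processing_on is not None:
            stealing.recalculate(ts)
```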
hendrikmakait left a comment:
I like this general change quite a bit! It should give us more useful occupancy estimates, and the way `reevaluate_occupancy` worked was messy. I have some nits and a concern regarding the handling of adding/removing replicas. Apart from that, I'd love to see an A/B test for this, since it's hard to judge whether this has a negative impact on runtimes. The regression in https://github.com/dask/distributed/pull/7030/files#r971979808 feels fine, and we should be able to find good ways of tackling it should the need arise.
    # Reference to scheduler task_groups
    scheduler_ref: weakref.ref[SchedulerState] | None
    task_groups_count: dict[str, int]
Suggested change:

    - task_groups_count: dict[str, int]
    + task_groups_count: defaultdict[str, int]
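For context, a small illustration of why the tighter `defaultdict` annotation fits this kind of counter, assuming the attribute is initialized as `defaultdict(int)` (the initialization is not shown in the excerpt above):

```python
from collections import defaultdict

# Missing keys start at 0, so callers can increment/decrement
# without guarding against KeyError.
task_groups_count: defaultdict[str, int] = defaultdict(int)

task_groups_count["inc"] += 1   # 0 -> 1 on first use
task_groups_count["inc"] += 1   # 1 -> 2
task_groups_count["inc"] -= 2   # back to 0
```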
distributed/stealing.py (outdated)

    if (
        ts not in self.key_stealable
        or ts.processing_on is not victim
        or not ts.processing_on
Isn't this condition (`not ts.processing_on`) implicit in `ts.processing_on is not victim`? If `ts.processing_on` is `None`, it already cannot be `victim`.
    assert self.scheduler_ref and (scheduler := self.scheduler_ref())
    nbytes = ts.get_nbytes()
    if ts in self.needs_what:
        del self.needs_what[ts]
I think there might be an issue with the removal of `self.needs_what[ts]` here while only incrementing it by one in `remove_replica`.
`needs_what` is a counter of how many tasks assigned to this worker require a particular key. As soon as we call `add_replica`, this counter drops immediately to zero.
The keys of `needs_what` are disjoint from those of `has_what`.
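To make that invariant concrete, here is a simplified, hypothetical model of the bookkeeping as described in this thread (not the actual `WorkerState` code); `remove_replica` mirrors the behaviour questioned above:

```python
class ReplicaBookkeeping:
    """Toy model: needs_what counts, per key, how many tasks assigned to this
    worker still need data that is not here yet; has_what holds present keys.
    The two key sets are kept disjoint."""

    def __init__(self) -> None:
        self.has_what: set[str] = set()
        self.needs_what: dict[str, int] = {}

    def assign_dependency(self, key: str) -> None:
        # A newly assigned task depends on `key`, which is not on the worker yet.
        if key not in self.has_what:
            self.needs_what[key] = self.needs_what.get(key, 0) + 1

    def add_replica(self, key: str) -> None:
        # Data arrived: the counter drops to zero immediately, keeping the
        # key sets of needs_what and has_what disjoint.
        self.needs_what.pop(key, None)
        self.has_what.add(key)

    def remove_replica(self, key: str) -> None:
        # As reviewed, the counter is only bumped by one here, which is the
        # part the comment above flags as potentially inconsistent.
        self.has_what.discard(key)
        self.needs_what[key] = self.needs_what.get(key, 0) + 1
```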
This is an implementation of the suggestion in #7027 (a rough sketch of the idea follows the benchmark note below).
Pros
Cons
Benchmarks: pending. Early results do not show a negative impact on scheduler performance.
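To illustrate the overall direction (deriving occupancy on demand from aggregate per-group statistics instead of maintaining a per-task running sum), here is a rough, hypothetical sketch; the parameter names and the fallback constant are assumptions, not this PR's actual API:

```python
def estimate_worker_occupancy(
    task_groups_count: dict[str, int],     # group/prefix -> tasks assigned to this worker
    average_duration: dict[str, float],    # group/prefix -> current average compute time (s)
    pending_transfer_bytes: float,         # bytes still to move to this worker
    bandwidth: float,                      # estimated bandwidth in bytes/s
    unknown_duration: float = 0.5,         # assumed fallback for unmeasured groups
) -> float:
    """Expected seconds of work queued on a worker.

    Because the estimate is recomputed from per-group counts and the latest
    duration averages, a drifting average is picked up the next time
    occupancy is read, with no per-task bookkeeping to keep in sync.
    """
    compute = sum(
        count * average_duration.get(group, unknown_duration)
        for group, count in task_groups_count.items()
    )
    network = pending_transfer_bytes / bandwidth if bandwidth > 0 else 0.0
    return compute + network
```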