AMM ReduceReplicas to iterate only on replicated tasks #5297
ping @fjetter @gjoseph92 - this has potential performance implications on the whole scheduler, so I'd very much like your review.
fjetter
left a comment
LGTM. I don't have any performance concerns
distributed/scheduler.py
Outdated
def stimulus_missing_data(self, key=None):
"""Mark that certain keys have gone missing. Recover."""
parent: SchedulerState = cast(SchedulerState, self)
with log_errors():
logger.debug("Stimulus missing data %s, %s", key, worker)
recommendations: dict = {}
client_msgs: dict = {}
worker_msgs: dict = {}
logger.debug("Stimulus missing data: %r", key)
ts: TaskState = parent._tasks.get(key)
if ts is None or ts._state == "memory":
return recommendations, client_msgs, worker_msgs
cts: TaskState = parent._tasks.get(cause)
if cts is not None and cts._state == "memory": # couldn't find this
ws: WorkerState
cts_nbytes: Py_ssize_t = cts.get_nbytes()
for ws in cts._who_has: # TODO: this behavior is extreme
del ws._has_what[ts]
ws._nbytes -= cts_nbytes
cts._who_has.clear()
recommendations[cause] = "released"
if key:
recommendations[key] = "released"
parent._transitions(recommendations, client_msgs, worker_msgs)
recommendations = {}
if parent._validate:
assert cause not in self.who_has
return recommendations, client_msgs, worker_msgs
return {}, {}, {}
return {key: "released"}, {}, {}
I found myself more than once in a situation where I wanted to delete this stimulus handler since I believe most of it is dead code. However, I don't see the connection to the current PR. Is this somehow related? Can this change be split off?
Actually I can't. If you read the old code, it does something clearly wrong - on lines 4783:4787, it deletes ts from ws.has_what but it removes cts.nbytes from ws.nbytes; it should have removed ts.nbytes instead. With nothing invoking this code path, I had no idea what the original intent of the logic was, so I opted for cutting the method down to just what is actually invoked by handle_release_data.
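To make the mismatch concrete, here is a minimal sketch using plain dicts and sets rather than the scheduler's real TaskState / WorkerState classes. The keys and sizes are invented for illustration. It shows how removing one key while subtracting another key's size lets the worker's byte counter drift away from the data it actually holds.

```python
# Hypothetical sizes; "ts" and "cts" stand in for the two tasks discussed above.
nbytes = {"ts": 10, "cts": 100}

# A worker that holds both keys, with a byte counter that starts out consistent.
worker_has_what = {"ts", "cts"}
worker_nbytes = sum(nbytes[k] for k in worker_has_what)

# Mismatched update: drop one key, but subtract the other key's size.
worker_has_what.remove("ts")
worker_nbytes -= nbytes["cts"]

# What the counter should be, given what the worker still holds.
expected = sum(nbytes[k] for k in worker_has_what)
drift = worker_nbytes - expected
```

A non-zero `drift` means the counter no longer reflects the worker's contents, which is the kind of silent corruption that is hard to spot when nothing exercises the code path.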
If you prefer, I can remove the stimulus_missing_data method altogether and inline the surviving code into handle_release_data?
Maybe that's reasonable, since it's not really a stimulus function anymore either and it's only a few lines.
I'm fine with moving the code to handle_release_data
Done; please double-check that the new code is functionally identical to the previous one.
ws._nbytes += ts_nbytes
ws._has_what[ts] = None
ts._who_has.add(ws)
if ws not in ts._who_has:
I noticed changes like these in a few different places. You replaced a ts not in ws._has_what with a ws not in ts._who_has. Assuming the state machine is not corrupt, these are equivalent statements. What motivated this change?
pure aesthetics and consistency. Nothing beyond that.
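The equivalence mentioned above holds only while the two mappings mirror each other. As a rough sketch, with plain dicts standing in for the scheduler state and a helper name that is purely hypothetical, the invariant can be checked like this:

```python
def replica_maps_consistent(who_has: dict, has_what: dict) -> bool:
    """Return True if both mappings describe the same (task, worker) pairs.

    who_has maps task key -> set of workers; has_what maps worker -> set of task keys.
    """
    forward = {(t, w) for t, workers in who_has.items() for w in workers}
    backward = {(t, w) for w, tasks in has_what.items() for t in tasks}
    return forward == backward


# Consistent state: "x" lives on workers "a" and "b", "y" only on "a".
who_has = {"x": {"a", "b"}, "y": {"a"}}
has_what = {"a": {"x", "y"}, "b": {"x"}}
ok = replica_maps_consistent(who_has, has_what)

# Corrupt state: worker "b" forgets "x" but who_has is not updated.
has_what["b"].remove("x")
broken = replica_maps_consistent(who_has, has_what)
```

When the check passes, `"x" not in has_what["b"]` and `"b" not in who_has["x"]` always give the same answer, so choosing one form over the other is a matter of style.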
gjoseph92
left a comment
LGTM as well. Nice to have this consolidated into a function anyway.
@ccall
def remove_replica(self, ts: TaskState, ws: WorkerState):
"""Note that a worker no longer holds a replica of a task"""
Nit: maybe add an if self._validate check to this and to remove_all_replicas, to catch any issues in future tests?
both remove_replica and remove_all_replicas already raise KeyError. Replaced the discard() with remove() for extra safety.
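For readers less familiar with the distinction, this short sketch uses only built-in Python set behaviour (the worker names are made up). It shows why remove() is the stricter choice here.

```python
who_has = {"worker-a"}

# discard() silently ignores a missing element, so a bookkeeping bug goes unnoticed.
who_has.discard("worker-b")

# remove() raises KeyError for a missing element, surfacing the inconsistency at once.
try:
    who_has.remove("worker-b")
except KeyError:
    raised = True
else:
    raised = False
```

Failing loudly at the point of corruption is usually easier to debug than discovering a mismatched state several transitions later.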
assert ws not in ts._who_has
assert ts not in ws._has_what
ws._nbytes += ts.get_nbytes()
Perhaps some mysterious magic type annotations are necessary to get this function to Cythonize well? I have no idea.
Added explicit declaration of nbytes.
I also noticed that replacing

    del ws._has_what[ts]
    ts._who_has.remove(ws)
    if len(ts._who_has) == 1:
        self._replicated_tasks.remove(ts)

with

    wh: set = ts._who_has
    hw: dict = ws._has_what
    del hw[ts]
    wh.remove(ws)
    if len(wh) == 1:
        rt: set = self._replicated_tasks
        rt.remove(ts)

marginally improves the Cython code, at the cost of greatly reduced readability, so I'd rather not do it.
Force-pushed from 8f82a93 to 8306013.
All code review critiques have been incorporated. The new
fjetter
left a comment
I reviewed the changes around handle_missing_data / stimulus_missing_data again and agree that they should be identical. The missing-data system has evolved quite dramatically over time, and I have also found several places on the worker side where the code no longer makes much sense. I believe this is genuinely dead code.
Test failures appear to be unrelated as well.
Encapsulate changes to ts._who_has / ws._has_what.
Track in-memory tasks with more than one replica apart from all the others.
Change the AMM ReduceReplicas policy to iterate only on this subset, which in the vast majority of cases should be much smaller than the set of all tasks in the cluster.
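The three points above can be sketched roughly as follows. This is a simplified illustration, not the scheduler's actual code: the classes are minimal stand-ins, the ReplicaTracker name is hypothetical, and the Cython annotations and validation hooks are omitted. The method names mirror those discussed in the review.

```python
class TaskState:
    def __init__(self, key: str, nbytes: int) -> None:
        self.key = key
        self.nbytes = nbytes
        self.who_has: set = set()  # workers holding a replica


class WorkerState:
    def __init__(self, address: str) -> None:
        self.address = address
        self.nbytes = 0
        self.has_what: dict = {}  # task -> None, used as an ordered set


class ReplicaTracker:
    """Hypothetical owner of all changes to who_has / has_what."""

    def __init__(self) -> None:
        self.replicated_tasks: set = set()  # tasks with more than one replica

    def add_replica(self, ts: TaskState, ws: WorkerState) -> None:
        assert ws not in ts.who_has
        ws.nbytes += ts.nbytes
        ws.has_what[ts] = None
        ts.who_has.add(ws)
        if len(ts.who_has) == 2:  # just became replicated
            self.replicated_tasks.add(ts)

    def remove_replica(self, ts: TaskState, ws: WorkerState) -> None:
        ws.nbytes -= ts.nbytes
        del ws.has_what[ts]    # KeyError if the maps are out of sync
        ts.who_has.remove(ws)  # KeyError if the maps are out of sync
        if len(ts.who_has) == 1:  # no longer replicated
            self.replicated_tasks.remove(ts)

    def remove_all_replicas(self, ts: TaskState) -> None:
        for ws in ts.who_has:
            ws.nbytes -= ts.nbytes
            del ws.has_what[ts]
        if len(ts.who_has) > 1:
            self.replicated_tasks.remove(ts)
        ts.who_has.clear()


state = ReplicaTracker()
a, b = WorkerState("a"), WorkerState("b")
x, y = TaskState("x", 100), TaskState("y", 50)
state.add_replica(x, a)
state.add_replica(x, b)
state.add_replica(y, a)

# A ReduceReplicas-style policy would only need to look at this subset.
candidates = [ts.key for ts in state.replicated_tasks]
```

Because every mutation goes through these three methods, the replicated subset stays up to date at constant cost per change, and a policy that drops surplus replicas can skip every task that has a single copy.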
CC @mrocklin @fjetter @jrbourbeau @gjoseph92