Pickle WorkerState #6623
Unit Test Results
See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
15 files ±0   15 suites ±0   10h 6m 8s ⏱️ -33m 37s
For more details on these failures, see this check.
Results for commit eb308ba. ± Comparison against base commit dc019ed.
            return f"<{type(self).__name__}: {len(self)} items>"

        def __reduce__(self) -> tuple[Callable, tuple]:
            heap = [(k, i, v) for k, i, vref in self._heap if (v := vref()) in self._data]
is the worker state ever pickled from a different thread than where it's being mutated?
Suggested change:

    - heap = [(k, i, v) for k, i, vref in self._heap if (v := vref()) in self._data]
    + heap = [(k, i, v) for k, i, vref in self._heap.copy() if (v := vref()) in self._data]
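For readers without the full diff, the `__reduce__` under review can be sketched roughly as follows. This is a simplified, hypothetical stand-in (`MiniHeapSet`, `Item`, `by_priority` and `_unpickle` are illustrative names, not necessarily what the PR uses). It assumes only what the diff line shows: `_heap` holds `(key, counter, weakref)` tuples, `_data` is the set of live members, and stale heap entries are filtered out when the object is reduced.

```python
import heapq
import weakref
from typing import Any, Callable


class Item:
    """Hypothetical element type: hashable and weak-referenceable."""

    def __init__(self, name: str, priority: int):
        self.name = name
        self.priority = priority

    def __eq__(self, other: object) -> bool:
        return isinstance(other, Item) and self.name == other.name

    def __hash__(self) -> int:
        return hash(self.name)


def by_priority(item: Item) -> int:
    # Module-level function so that pickle can store it by reference
    return item.priority


class MiniHeapSet:
    """Simplified stand-in for a set that is also ordered by a heap."""

    def __init__(self, key: Callable[[Item], Any]):
        self.key = key
        self._data: set = set()
        self._heap: list = []  # entries are (key, insertion counter, weakref)
        self._inc = 0

    def __len__(self) -> int:
        return len(self._data)

    def add(self, value: Item) -> None:
        if value in self._data:
            return
        self._data.add(value)
        heapq.heappush(self._heap, (self.key(value), self._inc, weakref.ref(value)))
        self._inc += 1

    def discard(self, value: Item) -> None:
        # Lazy deletion: the heap entry stays behind until it reaches the top
        self._data.discard(value)

    def peek(self) -> Item:
        while True:
            value = self._heap[0][2]()
            if value in self._data:
                return value
            heapq.heappop(self._heap)  # drop a stale entry

    def __reduce__(self) -> tuple:
        # Weakrefs cannot be pickled: dereference them and skip stale entries
        heap = [(k, i, v) for k, i, vref in self._heap if (v := vref()) in self._data]
        return MiniHeapSet._unpickle, (self.key, self._inc, heap)

    @staticmethod
    def _unpickle(key: Callable, inc: int, heap: list) -> "MiniHeapSet":
        self = object.__new__(MiniHeapSet)
        self.key = key
        self._data = {v for _, _, v in heap}
        self._inc = inc
        self._heap = [(k, i, weakref.ref(v)) for k, i, v in heap]
        heapq.heapify(self._heap)  # filtering may have broken the heap invariant
        return self
```

`pickle.loads(pickle.dumps(hs))` goes through the same `__reduce__` / `_unpickle` pair, provided the classes and the key function are importable on the unpickling side.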
Unlikely but not impossible.
The standard pattern (beyond the current benchmarking work in dask/dask-benchmarks#50) would be something like client.run or a newer version of client.dump_cluster_state, all of which would execute on the main thread.
However, we cannot protect against improper usage, e.g.

    def task():
        return pickle.dumps(get_worker().state)

    client.submit(task)

is valid dask but would pickle in another thread. I don't think we should protect or guard against this if it is expensive. _heap is a list here, isn't it? A shallow copy should be fine, but I'm not convinced it is necessary.
None of the class is thread-safe; I don't see why pickling should be an exception.
Also, there's no guarantee that list.copy() will hold the GIL - if it does, it's an implementation detail.
P.S. the multithreading issue would be prevented with either client.run or an async task.
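As a standard-library analogy for the point above (not dask code): a coroutine runs on the event loop's own thread, while a plain function handed to an executor runs on a pool thread. The sketch assumes, as the comments in this thread imply, that dask treats async tasks and `client.run` like the former and ordinary `client.submit` tasks like the latter. `sync_task`, `async_task` and `main` are illustrative names.

```python
import asyncio
import threading


def sync_task() -> int:
    # Plain function: the executor runs it on a worker thread
    return threading.get_ident()


async def async_task() -> int:
    # Coroutine: runs on the thread that drives the event loop
    return threading.get_ident()


async def main() -> tuple:
    loop = asyncio.get_running_loop()
    loop_thread = threading.get_ident()
    pool_thread = await loop.run_in_executor(None, sync_task)
    coro_thread = await async_task()
    return loop_thread, pool_thread, coro_thread
```

Running `asyncio.run(main())` returns three thread identifiers. The first and third match, so state that is only mutated on the loop thread can be read from a coroutine without racing. The second differs, which is the cross-thread case discussed above.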
list and dict copying is atomic
We should perhaps explicitly document which non-trivial operations are atomic (for example, list and dict copying) and whether atomicity is part of the language specification or a CPython implementation detail.
For as long as it's not explicitly documented, we must treat it as an implementation detail we can't rely upon.
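If cross-thread access ever did need to be supported, the portable alternative to relying on `list.copy()` being atomic would be an explicit lock around both mutation and snapshotting. The following is a minimal sketch of that idea only; `GuardedHeap` is a hypothetical name, and nothing in this PR adds such a lock.

```python
import threading


class GuardedHeap:
    """Hypothetical wrapper: every access to the list goes through one lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._heap: list = []

    def push(self, entry) -> None:
        with self._lock:
            self._heap.append(entry)

    def snapshot(self) -> list:
        # The copy is taken while holding the lock, so it is consistent
        # regardless of whether the interpreter copies lists atomically
        with self._lock:
            return list(self._heap)
```

The cost is one lock acquisition per mutation, which is the kind of overhead the comments above are reluctant to pay for a class that is not otherwise thread-safe.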
            )
        )
        ws.handle_stimulus(UpdateDataEvent(data={"y": 123}, report=False, stimulus_id="s"))
        ws2 = pickle.loads(pickle.dumps(ws))
that's very exciting
Co-authored-by: Thomas Grainger <tagrain@gmail.com>
Provide initial support for pickling the WorkerState.
This is not (yet) used for dumping; consensus should be reached before we do anything about it.
For the time being, this is necessary because of dask/dask-benchmarks#50.