-
-
Notifications
You must be signed in to change notification settings - Fork 748
Forbid collections of futures to be passed as arguments #7500
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
fjetter
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: There are a handful of tests that still use the old Scheduler.update_graph but I will adapt them accordingly
| def __setstate__(self, state): | ||
| key, address = state | ||
| try: | ||
| c = Client.current(allow_global=False) | ||
| except ValueError: | ||
| c = get_client(address) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the actual change I want to do
| c._send_to_scheduler( | ||
| { | ||
| "op": "update-graph", | ||
| "tasks": {}, | ||
| "keys": [stringify(self.key)], | ||
| "client": c.id, | ||
| } | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed this message. I believe this is entirely redundant since the initialized future will already let the scheduler know that it exists. This is the only place where this message is submitted, therefore I started to consolidate the two update_graph methods on the scheduler
| self, | ||
| client=None, | ||
| tasks=None, | ||
| keys=None, | ||
| dependencies=None, | ||
| restrictions=None, | ||
| priority=None, | ||
| loose_restrictions=None, | ||
| resources=None, | ||
| submitting_task=None, | ||
| retries=None, | ||
| user_priority=0, | ||
| actors=None, | ||
| fifo_timeout=0, | ||
| annotations=None, | ||
| code=None, | ||
| stimulus_id=None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed a couple of unused arguments and started to clean up the signature which led me to add type annotations. All of the changes in this method are merely there to make mypy happy but should not alter any behavior
| @gen_cluster(client=True) | ||
| async def test_serialize_collections_of_futures(c, s, a, b): | ||
| pd = pytest.importorskip("pandas") | ||
| dd = pytest.importorskip("dask.dataframe") | ||
| from dask.dataframe.utils import assert_eq | ||
|
|
||
| df = pd.DataFrame({"x": [1, 2, 3]}) | ||
| ddf = dd.from_pandas(df, npartitions=2).persist() | ||
| future = await c.scatter(ddf) | ||
|
|
||
| ddf2 = await future | ||
| df2 = await c.compute(ddf2) | ||
|
|
||
| assert_eq(df, df2) | ||
|
|
||
|
|
||
| def test_serialize_collections_of_futures_sync(c): | ||
| pd = pytest.importorskip("pandas") | ||
| dd = pytest.importorskip("dask.dataframe") | ||
| from dask.dataframe.utils import assert_eq | ||
|
|
||
| df = pd.DataFrame({"x": [1, 2, 3]}) | ||
| ddf = dd.from_pandas(df, npartitions=2).persist() | ||
| future = c.scatter(ddf) | ||
|
|
||
| result = future.result() | ||
| assert_eq(result.compute(), df) | ||
|
|
||
| assert future.type == dd.DataFrame | ||
| assert c.submit(lambda x, y: assert_eq(x.compute(), y), future, df).result() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe these tests are just wrong
| except NoCurrentClient: | ||
| raise NoCurrentClient( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know we typically do not work with custom exception types. However, in this case this gives us the possibility to easily tell the user what's going wrong which would otherwise much more messy.
|
FWIW I believe I can split off the "remove implicit client instantiation" from the update_graph refactoring. |
Unit Test ResultsSee test report for an extended history of previous test failures. This is useful for diagnosing flaky tests. 24 files ± 0 24 suites ±0 10h 2m 6s ⏱️ - 44m 19s For more details on these failures, see this check. Results for commit f2e3696. ± Comparison against base commit 1c6fb84. ♻️ This comment has been updated with latest results. |
9e9d4a5 to
d2e5127
Compare
|
Broke out the update_Graph refactoring to #7502 |
| with temp_default_client(ci), pytest.raises(NoCurrentClient): | ||
| future2 = pickle.loads(pickle.dumps(future)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a subtle difference between default clients and current clients. Current client is stricter and better in almost all circumstances. temp_default is also only used in testing.
I think this change makes everything much more predictable and should not have an effect on actual UX
| with pytest.raises(NoCurrentClient, match=r"Future.*argument.*persist"): | ||
| future = c.submit(f, x) | ||
| result = await future |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If users actually provide a collection as an argument they will receive a helpful exception message. unfortunately we can only raise once the task is deserialized on the worker. That's pretty late but the best I can do right now and still much better than spurious failures as described in #7498
| async def test_retire_state_change(c, s, a, b): | ||
| np = pytest.importorskip("numpy") | ||
| y = c.map(lambda x: x**2, range(10)) | ||
| await c.scatter(y) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No idea why this scatter is here. Am I missing something? Should this be a "replicate" or why would somebody scatter a future??
@jrbourbeau any ideas?
|
isntead #7580 |
Closes #7498