-
-
Notifications
You must be signed in to change notification settings - Fork 748
Client story #5987
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Client story #5987
Conversation
|
This PR is based on top of #5954 and should be rebased when #5954 is merged. TODO:
|
Unit Test Results 12 files ± 0 12 suites ±0 6h 19m 36s ⏱️ - 16m 38s For more details on these failures, see this check. Results for commit 2888b26. ± Comparison against base commit 6dd928b. ♻️ This comment has been updated with latest results. |
|
From #5872
and #5987 (comment)
|
distributed/tests/test_client.py
Outdated
|
|
||
| async def get_story(self, *args, **kw): | ||
| await self.unblock_worker.wait() | ||
| return super().get_story(*args, **kw) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| return super().get_story(*args, **kw) | |
| return await super().get_story(*args, **kw) |
distributed/client.py
Outdated
| return await task | ||
| except Exception: | ||
| if on_error == "raise": | ||
| task.cancel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the task will always be done at this point - because of the return await task
distributed/scheduler.py
Outdated
| bits = [ | ||
| (ws, await self.rpc.connect(ws.address)) for ws in self.workers.values() | ||
| ] | ||
| tasks = [] | ||
|
|
||
| for _, worker_comm in bits: | ||
| coro = send_recv(comm=worker_comm, reply=True, op="get_story", keys=keys) | ||
| tasks.append(asyncio.ensure_future(coro)) | ||
|
|
||
| try: | ||
| worker_stories = await asyncio.gather( | ||
| *tasks, return_exceptions=on_error == "ignore" | ||
| ) | ||
| except Exception: | ||
| for task in tasks: | ||
| task.cancel() | ||
|
|
||
| raise | ||
| finally: | ||
| for worker_state, worker_comm in bits: | ||
| self.rpc.reuse(worker_state.address, worker_comm) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think reproducing the pattern from
distributed/distributed/scheduler.py
Lines 6155 to 6187 in 5c7d555
| async def send_message(addr): | |
| try: | |
| comm = await self.rpc.connect(addr) | |
| comm.name = "Scheduler Broadcast" | |
| try: | |
| resp = await send_recv( | |
| comm, close=True, serializers=serializers, **msg | |
| ) | |
| finally: | |
| self.rpc.reuse(addr, comm) | |
| return resp | |
| except Exception as e: | |
| logger.error(f"broadcast to {addr} failed: {e.__class__.__name__}: {e}") | |
| if on_error == "raise": | |
| raise | |
| elif on_error == "return": | |
| return e | |
| elif on_error == "return_pickle": | |
| return dumps(e, protocol=4) | |
| elif on_error == "ignore": | |
| return ERROR | |
| else: | |
| raise ValueError( | |
| "on_error must be 'raise', 'return', 'return_pickle', " | |
| f"or 'ignore'; got {on_error!r}" | |
| ) | |
| results = await All( | |
| [send_message(address) for address in addresses if address is not None] | |
| ) | |
| return {k: v for k, v in zip(workers, results) if v is not ERROR} |
|
Closed by #6161 |
Client.story- Support collecting cluster-wide story for a key or stimulus ID #5872pre-commit run --all-files