Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
c2a3ead
test_scheduler.py succeeds
sjperkins Apr 1, 2022
5d10407
Working test_worker.py and test_client.py
sjperkins Apr 1, 2022
e30be1b
Support transition_log in http output
sjperkins Apr 1, 2022
308568e
Rename assert_worker_story assert_story
sjperkins Apr 1, 2022
b017a12
If possible, defer to STIMULUS_ID when sending messages
sjperkins Apr 1, 2022
dd6efcb
Support passing stimulus_id in Scheduler handlers
sjperkins Apr 1, 2022
40f87f7
Transmit stimulus_id's from client
sjperkins Apr 1, 2022
52697de
Generate new stimulus_id on completion/failure of Worker.execute
sjperkins Apr 1, 2022
17f069a
Use decorator to manage stimulus injection
sjperkins Apr 4, 2022
36e9ef7
Merge branch 'main' into stimulus-ids-contextvars
sjperkins Apr 4, 2022
ef4e29c
Enable github tmate
sjperkins Apr 4, 2022
08a6812
Target specific test case
sjperkins Apr 4, 2022
1ce45f0
bump
sjperkins Apr 4, 2022
222b24d
Explicitly specify sync/async stimulus_handler
sjperkins Apr 4, 2022
55e5216
Assert with is_coroutine_function
sjperkins Apr 4, 2022
22e3300
Document sync parameter
sjperkins Apr 4, 2022
9c42c3f
Revert "bump"
sjperkins Apr 4, 2022
a517fcb
Revert "Target specific test case"
sjperkins Apr 4, 2022
e29ad73
Revert "Enable github tmate"
sjperkins Apr 4, 2022
8458969
comments
sjperkins Apr 4, 2022
caf9a1d
Template stimulus_id var in dashboard
sjperkins Apr 4, 2022
a57d9c1
Pass stimulus_id to Client._decref
sjperkins Apr 4, 2022
b311bef
stimulus_handler changes
sjperkins Apr 4, 2022
1cf4032
worker changes
sjperkins Apr 4, 2022
9b789f6
Merge branch 'main' into stimulus-ids-contextvars
sjperkins Apr 5, 2022
4433754
Use a contextmanager instead of a decorator
sjperkins Apr 5, 2022
046b49a
Remove default stimulus_id's throughout the scheduler
sjperkins Apr 5, 2022
9241c72
RuntimeError -> AssertionError
sjperkins Apr 5, 2022
789080d
Move STIMULUS_ID ctxvar to utils
fjetter Apr 5, 2022
93f8aa7
use ctx.run for gather_dep
fjetter Apr 5, 2022
c780413
add reason kwarg
fjetter Apr 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 62 additions & 26 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
from distributed.sizeof import sizeof
from distributed.threadpoolexecutor import rejoin
from distributed.utils import (
STIMULUS_ID,
All,
Any,
CancelledError,
Expand All @@ -93,6 +94,7 @@
import_term,
log_errors,
no_default,
set_default_stimulus,
sync,
thread_state,
)
Expand Down Expand Up @@ -472,6 +474,7 @@ def __setstate__(self, state):
"tasks": {},
"keys": [stringify(self.key)],
"client": c.id,
"stimulus_id": f"client-update-graph-{time()}",
}
)

Expand Down Expand Up @@ -1367,7 +1370,7 @@ def _inc_ref(self, key):
self.refcount[key] += 1

def _dec_ref(self, key):
with self._refcount_lock:
with self._refcount_lock, set_default_stimulus(f"client-dec-ref-{time()}"):
self.refcount[key] -= 1
if self.refcount[key] == 0:
del self.refcount[key]
Expand All @@ -1381,7 +1384,12 @@ def _release_key(self, key):
st.cancel()
if self.status != "closed":
self._send_to_scheduler(
{"op": "client-releases-keys", "keys": [key], "client": self.id}
{
"op": "client-releases-keys",
"keys": [key],
"client": self.id,
"stimulus_id": STIMULUS_ID.get(),
}
)

async def _handle_report(self):
Expand Down Expand Up @@ -1527,31 +1535,33 @@ async def _close(self, fast=False):
):
await self.scheduler_comm.close()

for key in list(self.futures):
self._release_key(key=key)
with set_default_stimulus(f"client-close-{time()}"):

if self._start_arg is None:
with suppress(AttributeError):
await self.cluster.close()
for key in list(self.futures):
self._release_key(key=key)

await self.rpc.close()
if self._start_arg is None:
with suppress(AttributeError):
await self.cluster.close()

self.status = "closed"
await self.rpc.close()

if _get_global_client() is self:
_set_global_client(None)
self.status = "closed"

if (
handle_report_task is not None
and handle_report_task is not current_task
):
with suppress(TimeoutError, asyncio.CancelledError):
await asyncio.wait_for(handle_report_task, 0 if fast else 2)
if _get_global_client() is self:
_set_global_client(None)

with suppress(AttributeError):
await self.scheduler.close_rpc()
if (
handle_report_task is not None
and handle_report_task is not current_task
):
with suppress(TimeoutError, asyncio.CancelledError):
await asyncio.wait_for(handle_report_task, 0 if fast else 2)

self.scheduler = None
with suppress(AttributeError):
await self.scheduler.close_rpc()

self.scheduler = None

self.status = "closed"

Expand Down Expand Up @@ -2115,7 +2125,11 @@ async def _gather_remote(self, direct, local_worker):
response["data"].update(data2)

else: # ask scheduler to gather data for us
response = await retry_operation(self.scheduler.gather, keys=keys)
response = await retry_operation(
self.scheduler.gather,
keys=keys,
stimulus_id=f"client-gather-remote-{time()}",
)

return response

Expand Down Expand Up @@ -2201,6 +2215,8 @@ async def _scatter(
d = await self._scatter(keymap(stringify, data), workers, broadcast)
return {k: d[stringify(k)] for k in data}

stimulus_id = f"client-scatter-{time()}"

if isinstance(data, type(range(0))):
data = list(data)
input_type = type(data)
Expand Down Expand Up @@ -2242,6 +2258,7 @@ async def _scatter(
who_has={key: [local_worker.address] for key in data},
nbytes=valmap(sizeof, data),
client=self.id,
stimulus_id=stimulus_id,
)

else:
Expand All @@ -2264,7 +2281,10 @@ async def _scatter(
)

await self.scheduler.update_data(
who_has=who_has, nbytes=nbytes, client=self.id
who_has=who_has,
nbytes=nbytes,
client=self.id,
stimulus_id=stimulus_id,
)
else:
await self.scheduler.scatter(
Expand All @@ -2273,6 +2293,7 @@ async def _scatter(
client=self.id,
broadcast=broadcast,
timeout=timeout,
stimulus_id=stimulus_id,
)

out = {k: Future(k, self, inform=False) for k in data}
Expand Down Expand Up @@ -2396,7 +2417,12 @@ def scatter(

async def _cancel(self, futures, force=False):
keys = list({stringify(f.key) for f in futures_of(futures)})
await self.scheduler.cancel(keys=keys, client=self.id, force=force)
await self.scheduler.cancel(
keys=keys,
client=self.id,
force=force,
stimulus_id=f"client-cancel-{time()}",
)
for k in keys:
st = self.futures.pop(k, None)
if st is not None:
Expand All @@ -2423,7 +2449,9 @@ def cancel(self, futures, asynchronous=None, force=False):

async def _retry(self, futures):
keys = list({stringify(f.key) for f in futures_of(futures)})
response = await self.scheduler.retry(keys=keys, client=self.id)
response = await self.scheduler.retry(
keys=keys, client=self.id, stimulus_id=f"client-retry-{time()}"
)
for key in response:
st = self.futures[key]
st.retry()
Expand Down Expand Up @@ -2922,6 +2950,7 @@ def _graph_to_futures(
"fifo_timeout": fifo_timeout,
"actors": actors,
"code": self._get_computation_code(),
"stimulus_id": f"client-update-graph-hlg-{time()}",
}
)
return futures
Expand Down Expand Up @@ -3424,7 +3453,9 @@ async def _rebalance(self, futures=None, workers=None):
keys = list({stringify(f.key) for f in self.futures_of(futures)})
else:
keys = None
result = await self.scheduler.rebalance(keys=keys, workers=workers)
result = await self.scheduler.rebalance(
keys=keys, workers=workers, stimulus_id=f"client-rebalance-{time()}"
)
if result["status"] == "partial-fail":
raise KeyError(f"Could not rebalance keys: {result['keys']}")
assert result["status"] == "OK", result
Expand Down Expand Up @@ -3459,7 +3490,11 @@ async def _replicate(self, futures, n=None, workers=None, branching_factor=2):
await _wait(futures)
keys = {stringify(f.key) for f in futures}
await self.scheduler.replicate(
keys=list(keys), n=n, workers=workers, branching_factor=branching_factor
keys=list(keys),
n=n,
workers=workers,
branching_factor=branching_factor,
stimulus_id=f"client-replicate-{time()}",
)

def replicate(self, futures, n=None, workers=None, branching_factor=2, **kwargs):
Expand Down Expand Up @@ -4177,6 +4212,7 @@ def retire_workers(
self.scheduler.retire_workers,
workers=workers,
close_workers=close_workers,
stimulus_id=f"client-retire-workers-{time()}",
**kwargs,
)

Expand Down
5 changes: 4 additions & 1 deletion distributed/http/templates/task.html
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,18 @@ <h3 class="title is-5"> Transition Log </h3>
<th> Key </th>
<th> Start </th>
<th> Finish </th>
<th> Stimulus ID </th>
<th> Recommended Key </th>
<th> Recommended Action </th>
</thead>

{% for key, start, finish, recommendations, transition_time in scheduler.story(Task) %}
{% for key, start, finish, recommendations, stimulus_id, transition_time in scheduler.story(Task) %}
<tr>
<td> {{ fromtimestamp(transition_time) }} </td>
<td> <a href="{{ url_escape(key) }}.html">{{key}}</a> </td>
<td> {{ start }} </td>
<td> {{ finish }} </td>
<td> {{ stimulus_id }} </td>
<td> </td>
<td> </td>
</tr>
Expand All @@ -137,6 +139,7 @@ <h3 class="title is-5"> Transition Log </h3>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> <a href="{{ url_escape(key2) }}.html">{{key2}}</a> </td>
<td> {{ rec }} </td>
</tr>
Expand Down
Loading