Skip to content

Commit 7dfe5cf

Browse files
committed
handle errors properly
1 parent 802f67f commit 7dfe5cf

File tree

1 file changed

+50
-50
lines changed

1 file changed

+50
-50
lines changed

distributed/scheduler.py

Lines changed: 50 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -4542,14 +4542,57 @@ async def update_graph(
45424542
start = time()
45434543
async with self._update_graph_lock:
45444544
try:
4545-
graph = deserialize(graph_header, graph_frames).data
4546-
del graph_header, graph_frames
4547-
except Exception as e:
4548-
msg = """\
4549-
Error during deserialization of the task graph. This frequently occurs if the Scheduler and Client have different environments. For more information, see https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments
4550-
"""
4551-
raise RuntimeError(textwrap.dedent(msg)) from e
4545+
try:
4546+
graph = deserialize(graph_header, graph_frames).data
4547+
del graph_header, graph_frames
4548+
except Exception as e:
4549+
msg = """\
4550+
Error during deserialization of the task graph. This frequently occurs if the Scheduler and Client have different environments. For more information, see https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments
4551+
"""
4552+
raise RuntimeError(textwrap.dedent(msg)) from e
4553+
else:
4554+
(
4555+
dsk,
4556+
dependencies,
4557+
annotations_by_type,
4558+
) = await offload(
4559+
_materialize_graph,
4560+
graph=graph,
4561+
global_annotations=annotations or {},
4562+
)
4563+
del graph
4564+
if not internal_priority:
4565+
# Removing all non-local keys before calling order()
4566+
dsk_keys = set(
4567+
dsk
4568+
) # intersection() of sets is much faster than dict_keys
4569+
stripped_deps = {
4570+
k: v.intersection(dsk_keys)
4571+
for k, v in dependencies.items()
4572+
if k in dsk_keys
4573+
}
4574+
internal_priority = await offload(
4575+
dask.order.order, dsk=dsk, dependencies=stripped_deps
4576+
)
45524577

4578+
self._create_taskstate_from_graph(
4579+
dsk=dsk,
4580+
client=client,
4581+
dependencies=dependencies,
4582+
keys=set(keys),
4583+
ordered=internal_priority or {},
4584+
submitting_task=submitting_task,
4585+
user_priority=user_priority,
4586+
actors=actors,
4587+
fifo_timeout=fifo_timeout,
4588+
code=code,
4589+
annotations_by_type=annotations_by_type,
4590+
# FIXME: This is just used to attach to Computation
4591+
# objects. This should be removed
4592+
global_annotations=annotations,
4593+
start=start,
4594+
stimulus_id=stimulus_id or f"update-graph-{start}",
4595+
)
45534596
except RuntimeError as e:
45544597
err = error_message(e)
45554598
for key in keys:
@@ -4561,49 +4604,6 @@ async def update_graph(
45614604
"traceback": err["traceback"],
45624605
}
45634606
)
4564-
else:
4565-
(
4566-
dsk,
4567-
dependencies,
4568-
annotations_by_type,
4569-
) = await offload(
4570-
_materialize_graph,
4571-
graph=graph,
4572-
global_annotations=annotations or {},
4573-
)
4574-
del graph
4575-
if not internal_priority:
4576-
# Removing all non-local keys before calling order()
4577-
dsk_keys = set(
4578-
dsk
4579-
) # intersection() of sets is much faster than dict_keys
4580-
stripped_deps = {
4581-
k: v.intersection(dsk_keys)
4582-
for k, v in dependencies.items()
4583-
if k in dsk_keys
4584-
}
4585-
internal_priority = await offload(
4586-
dask.order.order, dsk=dsk, dependencies=stripped_deps
4587-
)
4588-
4589-
self._create_taskstate_from_graph(
4590-
dsk=dsk,
4591-
client=client,
4592-
dependencies=dependencies,
4593-
keys=set(keys),
4594-
ordered=internal_priority or {},
4595-
submitting_task=submitting_task,
4596-
user_priority=user_priority,
4597-
actors=actors,
4598-
fifo_timeout=fifo_timeout,
4599-
code=code,
4600-
annotations_by_type=annotations_by_type,
4601-
# FIXME: This is just used to attach to Computation objects. This
4602-
# should be removed
4603-
global_annotations=annotations,
4604-
start=start,
4605-
stimulus_id=stimulus_id or f"update-graph-{start}",
4606-
)
46074607
end = time()
46084608
self.digest_metric("update-graph-duration", end - start)
46094609

0 commit comments

Comments
 (0)