Skip to content

Scheduler transition error when a task (transitively) transitions memory -> erred #8548

@hendrikmakait

Description

@hendrikmakait

Describe the issue:

Under a certain pattern of conditions, the scheduler can raise an error during transition.

Minimal Complete Verifiable Example:

def block(x, in_event: Event, block_event: Event):
    """Signal that the task has started, wait to be released, then return ``x``.

    ``in_event`` is set first so that an observer can tell the task is
    running; the call then waits on ``block_event`` and finally returns
    ``x`` unchanged.
    """
    in_event.set()
    block_event.wait()
    return x

# Two worker specs with one thread each: the first offers resource "a", the
# second offers resource "b".  ``allowed-failures`` is set to 1 and
# ``Worker=Nanny`` is used; the test below kills ``a.process.process``
# (presumably the worker process managed by the nanny -- confirm).
@gen_cluster(
    client=True,
    nthreads=[("", 1, {"resources": {"a": 1}}), ("", 1, {"resources": {"b": 1}})],
    config={"distributed.scheduler.allowed-failures": 1},
    Worker=Nanny,
)
async def test_reduce_fan_out_pattern_deadlock(c, s, a, b):
    """Reproduce the scheduler transition error described in this issue.

    Task graph: ``f`` -> ``g`` -> (``h1``, ``h2``).  ``f``, ``g`` and ``h1``
    require resource "a"; ``h2`` requires resource "b".  Once ``g`` is in
    memory and both descendants are running, ``a`` is killed repeatedly
    while ``f`` is blocked, which per the report ends with ``f`` transitioning
    processing -> erred and an ``AssertionError`` in
    ``_transition_released_waiting``.
    """
    in_ancestor = Event()
    block_ancestor = Event()
    in_on_a_descendant = Event()
    in_on_b_descendant = Event()
    block_descendants = Event()
    # Start with block_ancestor set so the first run of "f" does not block.
    await block_ancestor.set()
    f = c.submit(block, 1, in_ancestor, block_ancestor, key="f", resources={"a": 1})
    g = c.submit(inc, f, key="g", resources={"a": 1})
    # Both descendants depend on "g" and block on the same event, but are
    # restricted to different resources ("a" vs "b").
    h1 = c.submit(block, g, in_on_a_descendant, block_descendants, key="h1", resources={"a": 1})
    h2 = c.submit(block, g, in_on_b_descendant, block_descendants, key="h2", resources={"b": 1})

    # Wait until "g" is in memory and both h1 and h2 have signalled that they
    # are running (they are still blocked on block_descendants).
    await asyncio.gather(wait_for_state("g", "memory", s), in_on_a_descendant.wait(), in_on_b_descendant.wait())
    # Reset the ancestor events: any later run of "f" will set in_ancestor
    # again and then block on block_ancestor.
    await asyncio.gather(in_ancestor.clear(), block_ancestor.clear())
    await a.process.process.kill()

    # "f" has signalled that it is running again; reset the signal and kill
    # once more while it is blocked.
    await in_ancestor.wait()
    await in_ancestor.clear()
    await a.process.process.kill()

    # Same again: a further kill while "f" is blocked.
    await in_ancestor.wait()
    await in_ancestor.clear()
    await a.process.process.kill()

    # Release the descendants and wait for their results.
    await block_descendants.set()
    await h2
    await h1

logs

  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 5283, in remove_worker
    self.transitions(recommendations, stimulus_id=stimulus_id)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 7877, in transitions
    self._transitions(recommendations, client_msgs, worker_msgs, stimulus_id)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 2062, in _transitions
    new_recs, new_cmsgs, new_wmsgs = self._transition(key, finish, stimulus_id)
                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 1968, in _transition
    b_recs, b_cmsgs, b_wmsgs = func(self, key, stimulus_id)
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 2085, in _transition_released_waiting
    assert dts.state not in {"forgotten", "erred"}
AssertionError

This is caused by the following sequence of transitions:

...,
Transition(
    key="f",
    start="processing",
    finish="erred",
    recommendations={"g": "erred"},
    stimulus_id="handle-worker-cleanup-1709224381.2813592",
    timestamp=1709224381.281585,
),
Transition(
    key="g",
    start="memory",
    finish="released",
    recommendations={"g": "waiting", "h2": "waiting"},
    stimulus_id="handle-worker-cleanup-1709224381.2813592",
    timestamp=1709224381.281642,
),
Transition(
    key="h2",
    start="processing",
    finish="released",
    recommendations={"h2": "waiting"},
    stimulus_id="handle-worker-cleanup-1709224381.311968",
    timestamp=1709224381.3127308,
),
Transition(
    key="h2",
    start="released",
    finish="waiting",
    recommendations={"g": "waiting"},
    stimulus_id="handle-worker-cleanup-1709224381.311968",
    timestamp=1709224381.314497,
),
...

Namely, g should be transitioned memory -> erred, which happens through a two-step transition: memory -> released, followed by released -> erred. The first step generates recommendations of its own ({"g": "waiting", "h2": "waiting"}), and these are applied before the second step that should transition g to erred.

Metadata

Metadata

Assignees

Labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions