88 changes: 76 additions & 12 deletions distributed/tests/test_cancelled_state.py
@@ -32,6 +32,8 @@
GatherDepNetworkFailureEvent,
GatherDepSuccessEvent,
LongRunningMsg,
ReleaseWorkerDataMsg,
RemoveReplicasEvent,
RescheduleEvent,
SecedeEvent,
TaskFinishedMsg,
@@ -457,7 +459,7 @@ async def test_resumed_cancelled_handle_compute(
Given the history of a task
executing -> cancelled -> resumed(fetch)

A handle_compute should properly restore executing.
The scheduler should reject the result upon completion and reschedule the task.
"""
# This test is heavily using set_restrictions to simulate certain scheduler
# decisions of placing keys
@@ -535,12 +537,12 @@ async def release_all_futures():
(f3.key, "ready", "executing", "executing", {}),
(f3.key, "executing", "released", "cancelled", {}),
(f3.key, "cancelled", "fetch", "resumed", {}),
(f3.key, "resumed", "memory", "memory", {}),
(f3.key, "resumed", "released", "cancelled", {}),
(
f3.key,
"cancelled",
"memory",
"released",
"released",
{f2.key: "released", f3.key: "forgotten"},
),
(f3.key, "released", "forgotten", "forgotten", {f2.key: "forgotten"}),
@@ -558,11 +560,15 @@ async def release_all_futures():
(f3.key, "ready", "executing", "executing", {}),
(f3.key, "executing", "released", "cancelled", {}),
(f3.key, "cancelled", "fetch", "resumed", {}),
(f3.key, "resumed", "error", "released", {f3.key: "fetch"}),
(f3.key, "fetch", "flight", "flight", {}),
(f3.key, "flight", "missing", "missing", {}),
(f3.key, "missing", "waiting", "waiting", {f2.key: "fetch"}),
(f3.key, "waiting", "ready", "ready", {f3.key: "executing"}),
(f3.key, "resumed", "released", "cancelled", {}),
(
f3.key,
"cancelled",
"error",
"released",
{f2.key: "released", f3.key: "forgotten"},
),
(f3.key, "released", "forgotten", "forgotten", {f2.key: "forgotten"}),
(f3.key, "ready", "executing", "executing", {}),
(f3.key, "executing", "memory", "memory", {}),
],
@@ -577,7 +583,8 @@ async def release_all_futures():
(f3.key, "ready", "executing", "executing", {}),
(f3.key, "executing", "released", "cancelled", {}),
(f3.key, "cancelled", "fetch", "resumed", {}),
(f3.key, "resumed", "waiting", "executing", {}),
(f3.key, "resumed", "released", "cancelled", {}),
(f3.key, "cancelled", "waiting", "executing", {}),
(f3.key, "executing", "memory", "memory", {}),
(
f3.key,
@@ -602,8 +609,10 @@ async def release_all_futures():
(f3.key, "ready", "executing", "executing", {}),
(f3.key, "executing", "released", "cancelled", {}),
(f3.key, "cancelled", "fetch", "resumed", {}),
(f3.key, "resumed", "waiting", "executing", {}),
(f3.key, "resumed", "released", "cancelled", {}),
(f3.key, "cancelled", "waiting", "executing", {}),
(f3.key, "executing", "error", "error", {}),
# FIXME: (distributed#7489)
Review comment (Member Author): Instead of accepting the erred task, the scheduler should reject the result and reschedule the computation (#7489)
],
)
else:
@@ -917,11 +926,11 @@ def test_workerstate_flight_failure_to_executing(ws, block_queue):
assert ws.tasks["x"].state == "executing"


def test_workerstate_resumed_fetch_to_executing(ws_with_running_task):
def test_workerstate_resumed_fetch_to_cancelled_to_executing(ws_with_running_task):
"""Test state loops:

- executing -> cancelled -> resumed(fetch) -> cancelled -> executing
- executing -> long-running -> cancelled -> resumed(fetch) -> long-running
- executing -> long-running -> cancelled -> resumed(fetch) -> cancelled -> long-running

See also: test_workerstate_resumed_waiting_to_flight
"""
@@ -944,6 +953,32 @@ def test_workerstate_resumed_fetch_to_executing(ws_with_running_task):
assert ws.tasks["x"].state == prev_state


def test_workerstate_resumed_fetch_to_executing(ws_with_running_task):
"""See test_resumed_cancelled_handle_compute for end-to-end version"""
ws = ws_with_running_task
ws2 = "127.0.0.1:2"

prev_state = ws.tasks["x"].state

instructions = ws.handle_stimulus(
# x is released for whatever reason (e.g. client cancellation)
FreeKeysEvent(keys=["x"], stimulus_id="s1"),
# x was computed somewhere else
ComputeTaskEvent.dummy("y", who_has={"x": [ws2]}, stimulus_id="s2"),
# x was lost / no known replicas, therefore y is cancelled
FreeKeysEvent(keys=["y"], stimulus_id="s3"),
ComputeTaskEvent.dummy("x", stimulus_id="s4"),
)
if prev_state == "executing":
assert not instructions
else:
assert instructions == [
LongRunningMsg(key="x", compute_duration=None, stimulus_id="s4")
]
assert len(ws.tasks) == 1
assert ws.tasks["x"].state == prev_state


def test_workerstate_resumed_waiting_to_flight(ws):
"""Test state loop:

@@ -1165,3 +1200,32 @@ def f(ev1, ev2, ev3, ev4):
await ev4.set()
assert await x == 2
assert not ws.processing


def test_workerstate_remove_replica_of_cancelled_task_dependency(ws):
"""If a dependency was fetched, but the task gets freed by the scheduler
before the add-keys message arrives, the scheduler sends a remove-replica
message to the worker, which should then release the dependency.

Read: https://github.com/dask/distributed/pull/7487#issuecomment-1387277900

See test_resumed_cancelled_handle_compute for end-to-end version
"""
ws2 = "127.0.0.1:2"
instructions = ws.handle_stimulus(
ComputeTaskEvent.dummy("y", who_has={"x": [ws2]}, stimulus_id="s1"),
GatherDepSuccessEvent(
worker=ws2, total_nbytes=1, data={"x": 123}, stimulus_id="s2"
),
FreeKeysEvent(keys=["y"], stimulus_id="s3"),
)
assert ws.tasks["x"].state == "memory"
assert ws.tasks["y"].state == "cancelled"

# Test that the worker accepts the RemoveReplicasEvent and
# subsequently releases the data
instructions = ws.handle_stimulus(
RemoveReplicasEvent(keys=["x"], stimulus_id="s4"),
)
assert ws.tasks["x"].state == "released"
assert instructions == [ReleaseWorkerDataMsg(stimulus_id="s4", key="x")]
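
For contrast with the unit test above, and as a stand-in for the end-to-end coverage removed from test_worker.py below, here is a minimal sketch of the opposite, rejection path. It relies on the behaviour documented in the deleted test_remove_replicas_while_computing: when a dependent task still needs the key, the worker is expected to refuse the remove-replicas request, keep the data, and re-announce the key to the scheduler via an add-keys message. The test name is hypothetical, and the exact reply message type is deliberately not asserted here.

def test_workerstate_remove_replica_of_needed_dependency_sketch(ws):
    # Hedged sketch, not part of this PR: dependency "x" is still needed by the
    # unfinished dependent "y", so the remove-replicas request should be
    # rejected and the replica kept on the worker.
    ws2 = "127.0.0.1:2"
    ws.handle_stimulus(
        # y depends on x, which initially lives on another worker
        ComputeTaskEvent.dummy("y", who_has={"x": [ws2]}, stimulus_id="s1"),
        # the dependency arrives and lands in memory
        GatherDepSuccessEvent(
            worker=ws2, total_nbytes=1, data={"x": 123}, stimulus_id="s2"
        ),
        # scheduler asks to drop the replica while y has not finished yet
        RemoveReplicasEvent(keys=["x"], stimulus_id="s3"),
    )
    # the worker must not release a dependency of an unfinished task
    assert ws.tasks["x"].state == "memory"
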
105 changes: 1 addition & 104 deletions distributed/tests/test_worker.py
@@ -21,7 +21,7 @@

import psutil
import pytest
from tlz import first, merge, pluck, sliding_window
from tlz import first, pluck, sliding_window
from tornado.ioloop import IOLoop

import dask
@@ -3026,109 +3026,6 @@ async def test_remove_replicas_simple(c, s, a, b):
assert all(s.tasks[f.key].who_has == {s.workers[a.address]} for f in futs)


@gen_cluster(
client=True,
nthreads=[("", 1), ("", 6)], # Up to 5 threads of b will get stuck; read below
config=merge(NO_AMM, {"distributed.comm.recent-messages-log-length": 1_000}),
)
async def test_remove_replicas_while_computing(c, s, a, b):
futs = c.map(inc, range(10), workers=[a.address])
dependents_event = distributed.Event()

def some_slow(x, event):
if x % 2:
event.wait()
return x + 1

# All interesting things will happen on b
dependents = c.map(some_slow, futs, event=dependents_event, workers=[b.address])

while not any(f.key in b.state.tasks for f in dependents):
await asyncio.sleep(0.01)

# The scheduler removes keys from who_has/has_what immediately
# Make sure the worker responds to the rejection and the scheduler corrects
# the state
ws = s.workers[b.address]

def ws_has_futs(aggr_func):
nonlocal futs
return aggr_func(s.tasks[fut.key] in ws.has_what for fut in futs)

# Wait for all futs to transfer over
while not ws_has_futs(all):
await asyncio.sleep(0.01)

# Wait for some dependent tasks to be done. No more than half of the dependents can
# finish, as the others are blocked on dependents_event.
# Note: for this to work reliably regardless of scheduling order, we need to have 6+
# threads. At the moment of writing it works with 2 because futures of Client.map
# are always scheduled from left to right, but we'd rather not rely on this
# assumption.
while not any(fut.status == "finished" for fut in dependents):
await asyncio.sleep(0.01)
assert not all(fut.status == "finished" for fut in dependents)

# Try removing the initial keys
s.request_remove_replicas(
b.address, [fut.key for fut in futs], stimulus_id=f"test-{time()}"
)
# Scheduler removed all keys immediately...
assert not ws_has_futs(any)
# ... but the state is properly restored for all tasks for which the dependent task
# isn't done yet
while not ws_has_futs(any):
await asyncio.sleep(0.01)

# Let the remaining dependent tasks complete
await dependents_event.set()
await wait(dependents)
assert ws_has_futs(any) and not ws_has_futs(all)

# If a request is rejected, the worker responds with an add-keys message to
# reenlist the key in the schedulers state system to avoid race conditions,
# see also https://github.com/dask/distributed/issues/5265
rejections = set()
for msg in b.state.log:
if msg[0] == "remove-replica-rejected":
rejections.update(msg[1])
assert rejections

def answer_sent(key):
for batch in b.batched_stream.recent_message_log:
for msg in batch:
if "op" in msg and msg["op"] == "add-keys" and key in msg["keys"]:
return True
return False

for rejected_key in rejections:
assert answer_sent(rejected_key)

# Now that all dependent tasks are done, futs replicas may be removed.
# They might be already gone due to the above remove replica calls
s.request_remove_replicas(
b.address,
[fut.key for fut in futs if ws in s.tasks[fut.key].who_has],
stimulus_id=f"test-{time()}",
)

while any(
b.state.tasks[f.key].state != "released" for f in futs if f.key in b.state.tasks
):
await asyncio.sleep(0.01)

# The scheduler actually gets notified about the removed replica
while ws_has_futs(any):
await asyncio.sleep(0.01)
# A replica is still on workers[0]
assert all(len(s.tasks[f.key].who_has) == 1 for f in futs)

del dependents, futs

while any(w.state.tasks for w in (a, b)):
await asyncio.sleep(0.01)


@gen_cluster(client=True, nthreads=[("", 1)] * 3, config=NO_AMM)
async def test_who_has_consistent_remove_replicas(c, s, *workers):
a = workers[0]