
Conversation

@hendrikmakait
Member

@hendrikmakait hendrikmakait commented Jan 19, 2023

Supersedes #7487 and finishes it up.

  • Tests added / passed
  • Passes pre-commit run --all-files

(f3.key, "resumed", "released", "cancelled", {}),
(f3.key, "cancelled", "waiting", "executing", {}),
(f3.key, "executing", "error", "error", {}),
# FIXME: (distributed#7489)
Member Author

Instead of accepting the erred task, the scheduler should reject the result and reschedule the computation (#7489)
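
For context, a conceptual sketch of the idea referenced here, assuming a scheduler-side run_id check as described in #7489. All names (SchedulerTaskState, handle_task_erred) are hypothetical illustrations, not distributed's actual scheduler code:

from dataclasses import dataclass


@dataclass
class SchedulerTaskState:
    # Hypothetical stand-in for the scheduler's per-task record
    key: str
    run_id: int
    state: str = "processing"


def handle_task_erred(ts: SchedulerTaskState, msg_run_id: int) -> str:
    """Sketch: decide how to react to a worker reporting an erred task."""
    if msg_run_id != ts.run_id:
        # Stale result from an earlier attempt: reject the error and
        # reschedule the computation instead of accepting the erred state.
        return "reschedule"
    ts.state = "erred"
    return "erred"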

@hendrikmakait hendrikmakait marked this pull request as draft January 19, 2023 15:36
@hendrikmakait hendrikmakait marked this pull request as ready for review January 20, 2023 12:22
@hendrikmakait hendrikmakait self-assigned this Jan 20, 2023
@github-actions
Contributor

github-actions bot commented Jan 20, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

       24 files  ±0        24 suites  ±0   10h 27m 31s ⏱️ +26m 45s
  3 329 tests  +2     3 220 ✔️ +1     105 💤 -1    4 ❌ +2
 39 252 runs  +24   37 357 ✔️ +22   1 890 💤 -1    5 ❌ +3

For more details on these failures, see this check.

Results for commit 3a655f1. ± Comparison against base commit fae59c4.

♻️ This comment has been updated with latest results.

@fjetter
Member

fjetter commented Jan 20, 2023

The codecov is interesting. There are some indirect code coverage changes reported. Basically _transition_resumed_waiting is not covered anymore. That's interesting 🤔

Might of course just be flaky, but there is also a real possibility that this code path is no longer reachable. I was hoping that, with the consistency guarantees the run_id provides, the cancelled/resumed states would no longer be required. This may be the first glimpse of that.

@fjetter
Member

fjetter commented Jan 20, 2023

I can confirm that distributed/tests/test_cancelled_state.py::test_resumed_cancelled_handle_compute[True-True] is triggering this code path on main

@fjetter
Member

fjetter commented Jan 20, 2023

https://app.codecov.io/gh/dask/distributed/blob/main/distributed/worker_state_machine.py

Parts of this transition were already uncovered:
[screenshot: codecov coverage view of _transition_resumed_waiting]

From what I can tell,

  • the first branch (initial state resumed(executing->fetch)) is only covered by distributed/tests/test_cancelled_state.py::test_resumed_cancelled_handle_compute[True-True]
  • the second one (initial state resumed(long-running->fetch)) is covered by the tests in distributed/tests/test_cancelled_state.py::test_secede_cancelled_or_resumed_workerstate
  • the third one was never covered. I went back ~6 months, to where the code looked very different, but similar code sections were already skipped back then.

@fjetter
Member

fjetter commented Jan 20, 2023

The last code branch is indeed impossible. It could only trigger if the scheduler asked a worker to compute a task twice without any additional intermediate messages.
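
To make that concrete, here is an illustrative sketch, in the same low-level WorkerState style as the test further down in this thread; the exact event sequence and the claim that only a back-to-back second compute request would reach the third branch are assumptions drawn from the comment above, not a test from this PR:

from distributed.worker_state_machine import ComputeTaskEvent, FreeKeysEvent

# Hypothetical sketch; ws is the usual WorkerState test fixture and ws2 a
# second worker address, as in the test below.
ws2 = "127.0.0.1:2"
ws.handle_stimulus(
    # y depends on x, so x is fetched from ws2 -> x ends up in flight
    ComputeTaskEvent.dummy("y", who_has={"x": [ws2]}, stimulus_id="s1"),
    # y is freed while x is still in flight -> x becomes cancelled(flight)
    FreeKeysEvent(keys=["y"], stimulus_id="s2"),
    # the scheduler now asks this worker to compute x itself
    # -> x becomes resumed(flight->waiting)
    ComputeTaskEvent.dummy("x", stimulus_id="s3"),
)
# Only a second compute request for x, with no FreeKeysEvent in between, would
# recommend "waiting" again and reach the third branch:
ws.handle_stimulus(ComputeTaskEvent.dummy("x", stimulus_id="s4"))

Since the scheduler never sends two compute-task messages for the same key back-to-back, that final stimulus cannot occur in practice.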

@fjetter
Member

fjetter commented Jan 20, 2023

This is a low-level test that covers the above branches and shows what is happening and why that is OK. This is effectively the scenario you are describing in #7490 (comment), and I believe this is the only way to trigger it.
This transition path becomes impossible if FreeKeysEvent(keys=["y"], stimulus_id="s4") is allowed to release x, i.e. if we remove resumed from PROCESSING.

import pytest

from distributed.worker_state_machine import (
    ComputeTaskEvent,
    Execute,
    FreeKeysEvent,
    SecedeEvent,
)


@pytest.mark.parametrize("secede", [True, False])
def test_compute_free_fetch_compute(ws, secede):
    ws2 = "127.0.0.1:2"
    instructions = ws.handle_stimulus(ComputeTaskEvent.dummy("x", stimulus_id="s1"))
    # Note: A future implementation could also allow the task to be executed
    # again. Right now, the scheduler should reschedule the task because of the
    # wrong run_id.
    if secede:
        ws.handle_stimulus(
            SecedeEvent(
                key="x",
                compute_duration=1.0,
                stimulus_id="secede",
            )
        )
    assert len(instructions) == 1
    assert isinstance(instructions[0], Execute)
    instructions = ws.handle_stimulus(
        # x is released for whatever reason (e.g. client cancellation)
        FreeKeysEvent(keys=["x"], stimulus_id="s2"),
        # x was computed somewhere else
        ComputeTaskEvent.dummy("y", who_has={"x": [ws2]}, stimulus_id="s3"),
        # x was lost / no known replicas, therefore y is cancelled
        FreeKeysEvent(keys=["y"], stimulus_id="s4"),
        ComputeTaskEvent.dummy("x", stimulus_id="s5"),
    )
    assert len(ws.tasks) == 1
    assert ws.tasks["x"].state == ("executing" if not secede else "long-running")

Member

@fjetter fjetter left a comment

good to go once CI is done and green-ish

Comment on lines -2213 to -2253
def _transition_resumed_waiting(
    self, ts: TaskState, *, stimulus_id: str
) -> RecsInstrs:
    """
    See also
    --------
    _transition_cancelled_fetch
    _transition_cancelled_or_resumed_long_running
    _transition_cancelled_waiting
    _transition_resumed_fetch
    """
    # None of the exit events of execute or gather_dep recommend a transition to
    # waiting
    assert not ts.done
    if ts.previous == "executing":
        assert ts.next == "fetch"
        # We're back where we started. We should forget about the entire
        # cancellation attempt
        ts.state = "executing"
        ts.next = None
        ts.previous = None
        return {}, []

    elif ts.previous == "long-running":
        assert ts.next == "fetch"
        # Same as executing, and in addition send the LongRunningMsg in arrears
        # Note that, if the task seceded before it was cancelled, this will cause
        # the message to be sent twice.
        ts.state = "long-running"
        ts.next = None
        ts.previous = None
        smsg = LongRunningMsg(
            key=ts.key, compute_duration=None, stimulus_id=stimulus_id
        )
        return {}, [smsg]

    else:
        assert ts.previous == "flight"
        assert ts.next == "waiting"
        return {}, []

Member

<3

@hendrikmakait
Member Author

I haven't seen test_memory flake before, but CI looks green-ish, and failures appear to be unrelated.

Member

@fjetter fjetter left a comment

A couple of nits but the PR can go in

Co-authored-by: Florian Jetter <fjetter@users.noreply.github.com>
