
Conversation


github-actions bot commented Jul 12, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

       19 files  −1          19 suites  −1      11h 14m 52s ⏱️ −44m 0s
 3 763 tests  +9        3 650 ✔️ +7         106 💤 ±0      6 ❌ +1    1 🔥 +1
35 034 runs  −1 280    33 312 ✔️ −1 249   1 712 💤 −36     9 ❌ +4    1 🔥 +1

For more details on these failures and errors, see this check.

Results for commit 41f4f8e. ± Comparison against base commit 9255987.

This pull request removes 1 test and adds 10 tests. Note that renamed tests count towards both.

Removed:
distributed.tests.test_worker ‑ test_gather_missing_workers_replicated[True]

Added:
distributed.tests.test_client ‑ test_gather_race_vs_AMM[False]
distributed.tests.test_client ‑ test_gather_race_vs_AMM[True]
distributed.tests.test_utils_comm ‑ test_gather_from_workers_busy
distributed.tests.test_utils_comm ‑ test_gather_from_workers_missing_replicas
distributed.tests.test_utils_comm ‑ test_gather_from_workers_serialization_error[pickle]
distributed.tests.test_utils_comm ‑ test_gather_from_workers_serialization_error[unpickle]
distributed.tests.test_worker ‑ test_gather_missing_workers_replicated[True0]
distributed.tests.test_worker ‑ test_gather_missing_workers_replicated[True1]
distributed.tests.test_worker ‑ test_gather_missing_workers_replicated[True2]
distributed.tests.test_worker ‑ test_gather_missing_workers_replicated[True3]

♻️ This comment has been updated with latest results.

@crusaderky crusaderky force-pushed the gather branch 4 times, most recently from 5869dfd to b0c9f6e Compare July 18, 2023 16:43
Comment on lines 2327 to 2331
response = await retry_operation(
    self.scheduler.gather, keys=missing_keys
)
if response["status"] == "OK":
    response["data"].update(data)
@crusaderky (Collaborator, Author) — Jul 19, 2023:

This PR does not change the previous behaviour. See

fjetter (Member):

This PR introduces significant changes to how we gather data. I strongly suggest separating aesthetic refactoring from functional changes, especially since this diff covers both logical changes (who_has) and aesthetic refactorings (if/else). This makes reviewing much harder than it should be.

crusaderky (Collaborator, Author):

I cannot see any aesthetic refactoring in this PR that could be moved to a separate PR while preserving functionality.

@crusaderky (Collaborator, Author) commented Jul 19, 2023:

This is ready for review

@crusaderky crusaderky marked this pull request as ready for review July 19, 2023 13:49
@crusaderky crusaderky force-pushed the gather branch 4 times, most recently from 90bbdab to fd83b33 Compare July 19, 2023 21:02
@hendrikmakait hendrikmakait self-requested a review July 20, 2023 07:18
Comment on lines 2330 to 2331
if response["status"] == "OK":
    response["data"].update(data)
Member:

While this isn't new, I think it would be good to add a test to ensure that the refactoring works as expected.

crusaderky (Collaborator, Author):

I would like to get rid of the whole functionality in #7993, so it would be throwaway work.

Comment on lines 141 to 143
for key in d[address]:
    missing_keys.add(key)
    del to_gather[key]
Member:

Potentially only one of these keys caused the exception. Would it make sense to attempt gathering them individually, to keep the number of missing keys to a minimum?

We may also consider returning these as erring keys rather than missing ones. However, that is out of scope.

crusaderky (Collaborator, Author):

Changed the code to mark the keys as erring, meaning that gather() won't try again.
However, note that there is no memory->error transition in the scheduler; this is the same problem as with gather_dep (#6705).

Yes, we could try to fetch the keys one by one, but it feels like a substantial complication and ultimately overkill for what should, in theory, be a rare-ish problem that is easily weeded out during development.
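
To make the missing-vs-erring distinction concrete, here is a rough, self-contained sketch; the function, the `get_data` callback, and the return structure are invented for illustration and are not taken from this PR:

```python
from __future__ import annotations

from collections.abc import Awaitable, Callable
from typing import Any


async def fetch_from_worker(
    address: str,
    keys: list[str],
    get_data: Callable[[str, list[str]], Awaitable[dict[str, Any]]],
) -> tuple[dict[str, Any], set[str], set[str]]:
    """Fetch ``keys`` from one worker and classify failures.

    Returns (data, missing_keys, failed_keys):
    - missing_keys: network trouble; the keys may still be available on
      another replica, so the caller can retry or report them to the scheduler
    - failed_keys: a (de)serialization error; retrying won't help, so
      gather() gives up on these ("erring" keys)
    """
    data: dict[str, Any] = {}
    missing: set[str] = set()
    failed: set[str] = set()
    try:
        data.update(await get_data(address, keys))
    except OSError:
        missing.update(keys)
    except Exception:
        failed.update(keys)
    return data, missing, failed
```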

@fjetter (Member) left a comment:

There is a lot happening in this PR. Can we break this up a bit?


Comment on lines 34 to 37
who_has: Callable[
    [list[str]],
    Mapping[str, Collection[str]] | Awaitable[Mapping[str, Collection[str]]],
],
fjetter (Member):

I'm not convinced this callback structure is the right approach here. I find this makes the entire mechanism even harder to reason about. Besides, this bypasses (for better or worse) missing/update_who_has mechanics on the worker.
I believe this kind of logic should be handled a layer further up the stack instead of throwing this all into this function.
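
Not code from this PR, but for illustration: a callback typed like the `who_has` parameter above may return either a plain mapping or an awaitable, so a consumer has to normalise the result, roughly like this:

```python
from __future__ import annotations

import inspect
from collections.abc import Awaitable, Callable, Collection, Mapping


async def resolve_who_has(
    who_has: Callable[
        [list[str]],
        Mapping[str, Collection[str]] | Awaitable[Mapping[str, Collection[str]]],
    ],
    keys: list[str],
) -> Mapping[str, Collection[str]]:
    """Call a who_has callback that may be either synchronous or asynchronous."""
    result = who_has(keys)
    if inspect.isawaitable(result):
        result = await result
    return result
```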

crusaderky (Collaborator, Author):

The method on the worker exclusively serves replicate, which we should plan to replace with AMM anyway. The original method bypasses the whole worker missing/update_who_has system, and I chose not to change that logic since it would be very complicated and ultimately throwaway.

Comment on lines 2873 to 2878
with captured_logger("distributed.scheduler") as sched_logger:
    with captured_logger("distributed.client") as client_logger:
        assert await c.gather(x, direct=False) == 2

assert s.tasks[fin.key].who_has == {s.workers[b.address]}
assert a.state.executed_count == 2
assert b.state.executed_count >= 1
# ^ leave room for a future switch from `remove_worker` to `retire_workers`
assert sched_logger.getvalue() == "Couldn't gather keys: {'x': 'memory'}\n" * 3
assert "Couldn't gather 1 keys, rescheduling" in client_logger.getvalue()
fjetter (Member):

Putting aside for a moment what changes you are proposing here: just reading this test, I believe this behaviour is wrong.

This test is fetching data via the scheduler but is running into connection failures. However, the Worker is still alive (otherwise the BatchedSend would have broken and the Worker would have been removed). Despite knowing that the Worker is alive and merely struggling to connect, it reschedules the key? This feels like an unwarranted escalation.

crusaderky (Collaborator, Author):

This is the previous behaviour; I did not touch or investigate it.
More specifically, I exclusively amended Client._gather_remote and didn't go into the several layers' worth of wrappers around it.
Happy to look into them, but that should be left to a later PR.

@crusaderky (Collaborator, Author):

As discussed online and offline:

@crusaderky (Collaborator, Author):

@hendrikmakait @fjetter
This is in theory complete and ready for a second round of review.
However, I'm seeing a lot of instability in CI.

These failures I've never seen before; I'll investigate:

  • 1 out of 10 runs failed: test_TaskStreamPlugin (distributed.diagnostics.tests.test_task_stream)
  • 1 out of 10 runs failed: test_gather_after_failed_worker (distributed.tests.test_failed_workers)
  • 1 out of 10 runs failed: test_dont_steal_fast_tasks_compute_time (distributed.tests.test_steal)

These are known offenders but potentially impacted by this PR; I'll investigate:

  • 1 out of 10 runs failed: test_gather_then_submit_after_failed_workers (distributed.tests.test_failed_workers)
  • 1 out of 10 runs failed: test_tell_workers_when_peers_have_left (distributed.tests.test_scheduler)
  • 2 out of 10 runs failed: test_chaos_rechunk (distributed.tests.test_stress)

These are known offenders which shouldn't be impacted:

  • 1 out of 8 runs failed: test_closed_input_only_worker_during_transfer (distributed.shuffle.tests.test_shuffle)
  • 1 out of 8 runs failed: test_closed_worker_during_transfer (distributed.shuffle.tests.test_shuffle)
  • 1 out of 8 runs failed: test_crashed_worker_during_transfer (distributed.shuffle.tests.test_shuffle)
  • 2 out of 8 runs failed: test_restarting_during_transfer_raises_killed_worker (distributed.shuffle.tests.test_shuffle)
  • 1 out of 10 runs failed: test_file_descriptors_dont_leak[Nanny] (distributed.tests.test_client)

This reverts commit 3a67188.
@crusaderky (Collaborator, Author):

I've run all tests above 10 times on each CI environment, on main and on this PR, and counted the number of failures, and I can see no regression; this is ready for final review and merge.

On a side note, this was an extremely time-consuming activity and we should urgently pen in the time to fix CI.

| test | Failures in main | Failures in this PR |
| --- | ---: | ---: |
| diagnostics/tests/test_task_stream.py::test_TaskStreamPlugin | 1 | 0 |
| shuffle/tests/test_shuffle.py::test_clean_after_close | 1 | 0 |
| shuffle/tests/test_shuffle.py::test_closed_input_only_worker_during_transfer | 0 | 1 |
| shuffle/tests/test_shuffle.py::test_closed_worker_during_transfer | 29 | 35 |
| shuffle/tests/test_shuffle.py::test_crashed_worker_during_transfer | 6 | 1 |
| shuffle/tests/test_shuffle.py::test_restarting_during_transfer_raises_killed_worker | 38 | 33 |
| tests/test_client.py::test_file_descriptors_dont_leak | 9 | 10 |
| tests/test_failed_workers.py::test_gather_after_failed_worker | 4 | 2 |
| tests/test_failed_workers.py::test_gather_then_submit_after_failed_workers | 9 | 1 |
| tests/test_scheduler.py::test_tell_workers_when_peers_have_left | 1 | 1 |
| tests/test_stress.py::test_chaos_rechunk | 0 | 1 |

@hendrikmakait hendrikmakait self-requested a review August 7, 2023 16:09
@fjetter (Member) left a comment:

The comment below is not essential. I think it's good practice to clean up our asyncio usage, but considering this hasn't bothered us before, I assume this coroutine is just never cancelled.

Comment on lines -89 to +98
-for worker, c in coroutines.items():
+for address, task in tasks.items():
     try:
-        r = await c
+        r = await task
fjetter (Member):

I know this is the same as before, but the task scheduling pattern here is problematic in the event of cancellation.

If gather_from_workers is cancelled after the tasks were created, only the task we're currently awaiting is cancelled; all others will continue running, and we'll get a "never awaited foo" warning.

I guess this coroutine function is never actually cancelled, which is why we never ran into this...

The correct approach here would be to use asyncio.gather (or, even better but not backwards compatible, asyncio's TaskGroup in Python 3.11+).

I think the changes should be straightforward, something like

results = await asyncio.gather(*tasks.values(), return_exceptions=True)
for addr, res in zip(tasks, results):
    if isinstance(res, OSError):
        ...
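
For reference, a minimal runnable sketch of the TaskGroup alternative mentioned above (Python 3.11+ only; `fetch_one`, `fetch_all`, and the addresses are made up for illustration):

```python
import asyncio


async def fetch_one(address: str) -> str:
    """Stand-in for fetching data from a single worker."""
    await asyncio.sleep(0.01)
    return f"data from {address}"


async def fetch_all(addresses: list[str]) -> dict[str, str]:
    # If fetch_all is cancelled while waiting, the TaskGroup cancels every
    # child task, so nothing keeps running in the background.
    # If a child raises, the remaining children are cancelled and the
    # failures are re-raised as an ExceptionGroup.
    async with asyncio.TaskGroup() as tg:
        tasks = {addr: tg.create_task(fetch_one(addr)) for addr in addresses}
    return {addr: task.result() for addr, task in tasks.items()}


print(asyncio.run(fetch_all(["tcp://10.0.0.1:1234", "tcp://10.0.0.2:1234"])))
```

Unlike sequentially awaiting pre-created tasks, cancelling fetch_all here cancels every child task, which avoids the orphaned-task problem described above.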

crusaderky (Collaborator, Author):

I'll address it in a follow-up

@crusaderky crusaderky merged commit 229a16f into dask:main Aug 9, 2023
@crusaderky crusaderky deleted the gather branch August 9, 2023 14:56

Successfully merging this pull request may close these issues:

  • gather() prints confusing error messages
  • Race condition between gather() and AMM
  • Client/Scheduler gather not robust to busy worker
