
Conversation

@fjetter (Member) commented Nov 18, 2019

Issue description

We're observing stability issues when gathering data. What we can currently reconstruct is the following:

  1. During gathering, a connection issue occurs which we see as an EnvironmentError (root cause unknown; the worker seems healthy otherwise).
  2. The data collection logic (gather_from_workers) flags the worker for which the connection fails as missing.
  3. The missing worker is closed forcefully here. Since this worker holds the final result, we lose the result as well. Since the dependencies of the final task were already transitioned to released earlier on, this amounts to a full data loss.
  4. The lost key, i.e. the entire graph, is rescheduled.

Root cause

This line removes workers if they are considered missing. As the comment already notes, this is extreme, and to be honest I don't follow the reasoning behind it.
In my opinion, this is not the place for dead-worker cleanup; that responsibility should be deferred to the mechanisms intended for it, e.g. the worker TTL. If there is a reason for this implementation, I'll happily learn something new and will add a comment there.

Changes in this PR

My intention is to establish better robustness in failure scenarios. The proposed implementation gets rid of the worker closing and relies on another mechanism to clean up workers that are actually dead or misbehaving.
If we face a connection issue, we retry the previously failed workers until "no improvement" is detected, but at least three times (there is obviously room for refinement). For happy-path scenarios this behaves just like before, but for connection failures we should be much more robust. A minimal sketch of the idea is shown below.
I hard-coded the backoff and retry count since I wasn't sure whether this would make a good configuration parameter.
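
For illustration only, a minimal sketch of the retry-until-no-improvement loop, assuming a fetch(who_has) coroutine that returns (data, missing_keys, missing_workers) roughly like gather_from_workers; this is not the actual diff:

    import asyncio

    async def gather_until_no_improvement(fetch, who_has, retries=3, backoff=0.25):
        # Sketch: retry previously failed workers until a full pass brings no
        # improvement for `retries` consecutive attempts.
        data, missing_keys, missing_workers = await fetch(who_has)
        attempts_without_progress = 0
        while missing_keys and attempts_without_progress < retries:
            before = (set(missing_keys), set(missing_workers))
            await asyncio.sleep(backoff)  # hard-coded backoff, as in the PR description
            new_data, missing_keys, missing_workers = await fetch(
                {key: who_has[key] for key in missing_keys}
            )
            data.update(new_data)
            if (set(missing_keys), set(missing_workers)) == before:
                attempts_without_progress += 1  # "no improvement" detected
            else:
                attempts_without_progress = 0  # progress was made, reset the counter
        return data, missing_keys, missing_workers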

Open Question

I'm wondering how deep in the stack we should patch this. In this particular instance we have a call chain of
gather_from_workers -> get_data_from_worker -> send_recv/comm.read/comm.write
where one could argue that even comm.read/comm.write should already retry, but I'm not sure about the implications, or whether it was intentionally not implemented this way.

@mrocklin (Member) left a comment

In principle this seems fine to me. I'm curious what happens if a worker has genuinely died / stalled in a way where it appears to still be alive. I'm not sure that we actually enforce TTL timeouts on dask workers today.

    iteration = 0
    while missing_workers:
        missing_workers_begin = missing_workers.copy()
        missed_data, missing_keys, missing_workers = await gather_from_workers(

Why the name missed_data? It seems like this is the data that we aren't missing.

@fjetter (Member, Author) commented Nov 20, 2019

I'm curious what happens if a worker has genuinely died / stalled in a way where it appears to still be alive.

The remove_worker within gather is the only place in the entire scheduler where, upon a connection issue, the worker is actually requested to close (not only removed from the scheduler).
Genuinely dead workers are probably handled by the methods mentioned below. If the worker is in a weird semi-dead state, I wonder whether the close request would even work. Either way, a more sensible approach seems appropriate.

There are currently two other places where workers are removed if a connection breaks.

As far as I can tell, these two instances rely on the same long-running connection, so it is essentially the same error scenario (the duplication might even cause issues, since one handles the removal immediately while the other schedules a callback... not my point here, though).
The difference, compared to the removal within the gather method, is that they will not close the worker, since closing the worker relies on the stream_comm. Both worker_send and handle_worker only trigger the error handling / worker removal if the stream_comm is broken or nonexistent.
Therefore, the scheduler essentially only closes workers via worker_ttl (or upon explicit request, e.g. restart, retire_worker, ...).

I'm not sure that we actually enforce TTL timeouts on dask workers today

No, it isn't enforced, and it isn't even enabled by default. My personal feeling is that users should be strongly encouraged to enable either the TTL or a worker lifetime so that workers are cycled eventually, but I guess this depends on the use case. An illustrative example of opting in follows below.
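
For reference, one way to opt in to these safeguards; the configuration key and CLI flags below are given as I recall them and should be checked against the installed version:

    import dask

    # Scheduler-side TTL: workers whose heartbeat is older than this get removed.
    dask.config.set({"distributed.scheduler.worker-ttl": "5 minutes"})

    # Alternatively, cycle workers proactively from the command line, e.g.:
    #   dask-worker <scheduler-address> --lifetime "1 hour" --lifetime-restart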

Either way, it's hard for me to imagine how the scheduler could close a worker that behaves completely out of line. This is where the nanny, or even an external cluster manager, must intervene.

@fjetter force-pushed the robust_gather branch 5 times, most recently from 794f40b to 44df3e7 on November 20, 2019 at 17:03
@fjetter (Member, Author) commented Nov 20, 2019

I pushed the retry logic down to worker.get_data_from_worker, where the retry is much simpler to implement and where many call sites benefit from it. A rough sketch of the idea follows below.
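
A rough, hypothetical sketch of that kind of low-level retry; this is not the code that landed in get_data_from_worker, and the names and parameters are placeholders:

    import asyncio

    def retry_comm(retries=3, delay=0.1, exceptions=(OSError,)):
        # Retry a flaky coroutine a few times before letting the error propagate.
        def decorator(coro):
            async def wrapper(*args, **kwargs):
                for attempt in range(retries + 1):
                    try:
                        return await coro(*args, **kwargs)
                    except exceptions:
                        if attempt == retries:
                            raise  # out of retries, surface the original error
                        await asyncio.sleep(delay)  # hard-coded backoff
            return wrapper
        return decorator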

@mrocklin (Member)

Thanks for the update @fjetter . The retry logic looks good to me. I hope that this improves things.

I'm still uncertain about removing the remove_worker call, but that's mostly because I haven't had the time to review the implications of this closely (my apologies also for the delayed response). Given this, I think that we have two options:

  1. We merge this as-is, and you promise to help out if removing those lines ends up harming other use cases in the near-to-moderate future
  2. We keep the remove_worker lines, and just add the retry logic, which will hopefully avoid us having to call it in the future.

Thoughts?

@fjetter (Member, Author) commented Nov 25, 2019

you promise to help out if removing those lines ends up harming other use cases in the near-to-moderate future

If the removal of remove_worker poses issues I will help out. We have a strong interest in this working properly.

We keep the remove_worker lines, and just add the retry logic

I fully agree with the code comment "This is extreme" and think this should not be there in its current form.
Probably a better way to handle these situations would be to replace the current self.remove_worker(address=worker) with self.remove_worker(address=worker, close=False). This would have the following effect (see the sketch after this list):

  • close=False: do not send the close signal to the worker but only remove it from internal scheduler state. This allows the worker to reconnect, and we wouldn't lose state.
  • safe=False (i.e. keep the default): increase the suspicious count for all tasks processing on the worker. Without this, I'm not sure the suspicious count is increased anywhere at all, which effectively disables the "suspicious tasks" feature.
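
A hedged sketch of that proposal; the surrounding error handling is paraphrased, the name missing_workers is taken from the diff snippet above, and logger is assumed, so none of this is the final code:

    # In the scheduler's gather error handling (paraphrased):
    for worker in missing_workers:
        logger.warning("Worker %s failed to serve data, removing it from scheduler state", worker)
        # close=False: drop the worker from internal scheduler state without
        # telling it to shut down, so it can reconnect and re-register its keys.
        # safe stays at its default (False) so tasks processing on this worker
        # still get their suspicious count incremented.
        self.remove_worker(address=worker, close=False)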

@mrocklin (Member)

If the removal of remove_worker poses issues I will help out. We have a strong interest in this working properly.

OK then, I'm happy to defer to your judgement here. My guess is that at this point you're probably as familiar (or more familiar) with this code as I am :). I'm happy to merge this as-is, or wait until you add the close=False/safe=False change, whichever you think is best.

@mrocklin (Member)

It looks like there is a linting failure, and also some intermittent failures. The test_workspace_concurrency and test_nanny_terminate failures are known (my apologies for not fixing these yet), but the test_gather_failing_cnn_error failure is new.

@fjetter (Member, Author) commented Nov 26, 2019

but the test_gather_failing_cnn_error failure is new.

Fixed

I added a rather long and complicated test case to capture the scenario we are facing. The test verifies that the worker is actually allowed to reconnect and may register its keys again, so that they don't need to be computed again. If the test turns out to be too specific or flaky, or poses issues in the future due to the specific log messages, we can of course iterate on it.

What I also see is that in this specific scenario we get a
Unexpected worker completed task, likely due to work stealing. Expected: %s, Got: %s, Key: %s
log message. I didn't include this in the test because it feels more like an unwanted side effect than intended behaviour. To my understanding it shouldn't harm anyone, though (see transition_processing_memory).

Cause: upon removing the worker from the scheduler, it is also removed from the TaskState's processing_on attribute. Once the worker reconnects and offers the result, this is "unexpected" to the scheduler, but it doesn't look harmful.

@mrocklin (Member)

OK. Merging this in. Thank you for this fix @fjetter ! It's great having you around.

@mrocklin merged commit 1d9aaac into dask:master on Nov 26, 2019
@amerkel2 (Contributor) commented Dec 2, 2019

Thanks for working on this! Since we are seeing this problem quite frequently: When can we expect the next release including this fix?

@mrocklin (Member) commented Dec 2, 2019

Dask tends to do a release every couple of weeks. My guess would be this coming Friday.

@amerkel2 (Contributor) commented Dec 3, 2019

Thanks for the info, that would be great!
