diff --git a/distributed/scheduler.py b/distributed/scheduler.py index c538056e7dc..19963fb7654 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3720,7 +3720,6 @@ def __init__( worker_handlers = { "task-finished": self.handle_task_finished, "task-erred": self.handle_task_erred, - "release": self.handle_release_data, "release-worker-data": self.release_worker_data, "add-keys": self.add_keys, "missing-data": self.handle_missing_data, @@ -4739,70 +4738,23 @@ def stimulus_task_erred( parent: SchedulerState = cast(SchedulerState, self) logger.debug("Stimulus task erred %s, %s", key, worker) - recommendations: dict = {} - client_msgs: dict = {} - worker_msgs: dict = {} - ts: TaskState = parent._tasks.get(key) - if ts is None: - return recommendations, client_msgs, worker_msgs - - if ts._state == "processing": - retries: Py_ssize_t = ts._retries - r: tuple - if retries > 0: - ts._retries = retries - 1 - r = parent._transition(key, "waiting") - else: - r = parent._transition( - key, - "erred", - cause=key, - exception=exception, - traceback=traceback, - worker=worker, - **kwargs, - ) - recommendations, client_msgs, worker_msgs = r - - return recommendations, client_msgs, worker_msgs - - def stimulus_missing_data( - self, cause=None, key=None, worker=None, ensure=True, **kwargs - ): - """Mark that certain keys have gone missing. Recover.""" - parent: SchedulerState = cast(SchedulerState, self) - with log_errors(): - logger.debug("Stimulus missing data %s, %s", key, worker) - - recommendations: dict = {} - client_msgs: dict = {} - worker_msgs: dict = {} - - ts: TaskState = parent._tasks.get(key) - if ts is None or ts._state == "memory": - return recommendations, client_msgs, worker_msgs - cts: TaskState = parent._tasks.get(cause) - - if cts is not None and cts._state == "memory": # couldn't find this - ws: WorkerState - cts_nbytes: Py_ssize_t = cts.get_nbytes() - for ws in cts._who_has: # TODO: this behavior is extreme - del ws._has_what[ts] - ws._nbytes -= cts_nbytes - cts._who_has.clear() - recommendations[cause] = "released" - - if key: - recommendations[key] = "released" - - parent._transitions(recommendations, client_msgs, worker_msgs) - recommendations = {} + if ts is None or ts._state != "processing": + return {}, {}, {} - if parent._validate: - assert cause not in self.who_has - - return recommendations, client_msgs, worker_msgs + if ts._retries > 0: + ts._retries -= 1 + return parent._transition(key, "waiting") + else: + return parent._transition( + key, + "erred", + cause=key, + exception=exception, + traceback=traceback, + worker=worker, + **kwargs, + ) def stimulus_retry(self, comm=None, keys=None, client=None): parent: SchedulerState = cast(SchedulerState, self) @@ -5346,25 +5298,6 @@ def handle_task_erred(self, key=None, **msg): self.send_all(client_msgs, worker_msgs) - def handle_release_data(self, key=None, worker=None, client=None, **msg): - parent: SchedulerState = cast(SchedulerState, self) - ts: TaskState = parent._tasks.get(key) - if ts is None: - return - ws: WorkerState = parent._workers_dv.get(worker) - if ws is None or ts._processing_on != ws: - return - - recommendations: dict - client_msgs: dict - worker_msgs: dict - - r: tuple = self.stimulus_missing_data(key=key, ensure=False, **msg) - recommendations, client_msgs, worker_msgs = r - parent._transitions(recommendations, client_msgs, worker_msgs) - - self.send_all(client_msgs, worker_msgs) - def handle_missing_data(self, key=None, errant_worker=None, **kwargs): parent: SchedulerState = cast(SchedulerState, self) logger.debug("handle missing data key=%s worker=%s", key, errant_worker)