From 06b88f12719140f6f38548f73b88fb700535103a Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Fri, 19 Feb 2021 19:20:54 -0800 Subject: [PATCH] Move `new_task` to `SchedulerState` --- distributed/scheduler.py | 60 ++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index c13f5292538..f447661268e 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1764,6 +1764,33 @@ def __pdict__(self): "host_info": self._host_info, } + @ccall + @exceptval(check=False) + def new_task(self, key: str, spec: object, state: str) -> TaskState: + """ Create a new task, and associated states """ + ts: TaskState = TaskState(key, spec) + ts._state = state + + tp: TaskPrefix + prefix_key = key_split(key) + tp = self._task_prefixes.get(prefix_key) + if tp is None: + self._task_prefixes[prefix_key] = tp = TaskPrefix(prefix_key) + ts._prefix = tp + + tg: TaskGroup + group_key = ts._group_key + tg = self._task_groups.get(group_key) + if tg is None: + self._task_groups[group_key] = tg = TaskGroup(group_key) + tg._prefix = tp + tp._groups.append(tg) + tg.add(ts) + + self._tasks[key] = ts + + return ts + def transition_released_waiting(self, key): try: ts: TaskState = self._tasks[key] @@ -3771,7 +3798,7 @@ def update_graph( # XXX Have a method get_task_state(self, k) ? ts = parent._tasks.get(k) if ts is None: - ts = self.new_task(k, tasks.get(k), "released") + ts = parent.new_task(k, tasks.get(k), "released") elif not ts._run_spec: ts._run_spec = tasks.get(k) @@ -3945,33 +3972,6 @@ def update_graph( # TODO: balance workers - def new_task(self, key: str, spec: object, state: str): - """ Create a new task, and associated states """ - parent: SchedulerState = cast(SchedulerState, self) - - ts: TaskState = TaskState(key, spec) - ts._state = state - - tp: TaskPrefix - prefix_key = key_split(key) - tp = parent._task_prefixes.get(prefix_key) - if tp is None: - parent._task_prefixes[prefix_key] = tp = TaskPrefix(prefix_key) - ts._prefix = tp - - tg: TaskGroup - group_key = ts._group_key - tg = parent._task_groups.get(group_key) - if tg is None: - parent._task_groups[group_key] = tg = TaskGroup(group_key) - tg._prefix = tp - tp._groups.append(tg) - tg.add(ts) - - parent._tasks[key] = ts - - return ts - def stimulus_task_finished(self, key=None, worker=None, **kwargs): """ Mark that a task has finished execution on a particular worker """ parent: SchedulerState = cast(SchedulerState, self) @@ -4267,7 +4267,7 @@ def client_desires_keys(self, keys=None, client=None): ts = parent._tasks.get(k) if ts is None: # For publish, queues etc. - ts = self.new_task(k, None, "released") + ts = parent.new_task(k, None, "released") ts._who_wants.add(cs) cs._wants_what.add(ts) @@ -5585,7 +5585,7 @@ def update_data( for key, workers in who_has.items(): ts: TaskState = parent._tasks.get(key) if ts is None: - ts: TaskState = self.new_task(key, None, "memory") + ts: TaskState = parent.new_task(key, None, "memory") ts.state = "memory" if key in nbytes: ts.set_nbytes(nbytes[key])