From 61c20b62c9e4ee537189abbb19ff5baed4da62d2 Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Tue, 25 May 2021 10:53:06 +0200 Subject: [PATCH 1/6] Checkpointing WIP on improved plugin. --- distributed/plugins/autorestrictor.py | 139 ++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 distributed/plugins/autorestrictor.py diff --git a/distributed/plugins/autorestrictor.py b/distributed/plugins/autorestrictor.py new file mode 100644 index 00000000000..d86601ea643 --- /dev/null +++ b/distributed/plugins/autorestrictor.py @@ -0,0 +1,139 @@ +from collections import defaultdict + +from distributed import SchedulerPlugin +from dask.core import reverse_dict, get_dependencies +from dask.base import tokenize +from dask.order import graph_metrics, ndependencies + + +def install_plugin(dask_scheduler=None, **kwargs): + dask_scheduler.add_plugin(AutoRestrictor(**kwargs), idempotent=True) + + +def unravel_deps(hlg_deps, name, unravelled_deps=None): + """Recursively construct a set of all dependencies for a specific task.""" + + if unravelled_deps is None: + unravelled_deps = set() + + for dep in hlg_deps[name]: + unravelled_deps |= {dep} + unravel_deps(hlg_deps, dep, unravelled_deps) + + return unravelled_deps + + +def get_node_depths(dependencies, root_nodes, metrics): + + node_depths = {} + + for k in dependencies.keys(): + # Get dependencies per node. + deps = unravel_deps(dependencies, k) + # Associate nodes with root nodes. + roots = root_nodes & deps + offset = metrics[k][-1] + node_depths[k] = \ + max(metrics[r][-1] - offset for r in roots) if roots else 0 + + return node_depths + + +class AutoRestrictor(SchedulerPlugin): + + def update_graph(self, scheduler, dsk=None, keys=None, restrictions=None, + **kw): + """Processes dependencies to assign tasks to specific workers.""" + + workers = list(scheduler.workers.keys()) + n_worker = len(workers) + + tasks = scheduler.tasks + dependencies = kw["dependencies"] + dependents = reverse_dict(dependencies) + + _, total_dependencies = ndependencies(dependencies, dependents) + metrics = graph_metrics(dependencies, dependents, total_dependencies) + + # Terminal nodes have no dependents, root nodes have no dependencies. + terminal_nodes = {k for (k, v) in dependents.items() if not v} + root_nodes = {k for (k, v) in dependencies.items() if not v} + + # Figure out the depth of every task. Depth is defined as maximum + # distance from a root node. + node_depths = get_node_depths(dependencies, root_nodes, metrics) + max_depth = max(node_depths.values()) + + # If we have fewer terminal nodes than workers, we cannot utilise all + # the workers and are likely dealing with a reduction. We work our way + # back through the graph, starting at the deepest terminal nodes, and + # try to find a depth at which there was enough work to utilise all + # workers. + while len(terminal_nodes) < n_worker: + _terminal_nodes = terminal_nodes.copy() + for tn in _terminal_nodes: + if node_depths[tn] == max_depth: + terminal_nodes ^= set((tn,)) + terminal_nodes |= dependencies[tn] + max_depth -= 1 + if max_depth == -1: + raise ValueError("AutoRestrictor cannot determine a sensible " + "work assignment pattern. Falling back to " + "default behaviour.") + + roots_per_terminal = {} + terminal_dependencies = {} + terminal_dependents = {} + + for tn in terminal_nodes: + # Get dependencies per terminal node. + terminal_dependencies[tn] = unravel_deps(dependencies, tn) + # Get dependents per terminal node. TODO: This terminology is + # confusing - the terminal nodes are not necessarily the last. + terminal_dependents[tn] = unravel_deps(dependents, tn) + # Associate terminal nodes with root nodes. + roots_per_terminal[tn] = root_nodes & terminal_dependencies[tn] + + # Create a unique token for each set of terminal roots. TODO: This is + # very strict. What about nodes with very similar roots? Tokenization + # may be overkill too. + root_tokens = \ + {tokenize(*sorted(v)): v for v in roots_per_terminal.values()} + + hash_map = defaultdict(set) + group_offset = 0 + + # Associate terminal roots with a specific group if they are not a + # subset of another larger root set. TODO: This can likely be improved. + for k, v in root_tokens.items(): + if any([v < vv for vv in root_tokens.values()]): # Strict subset. + continue + else: + hash_map[k] |= set([group_offset]) + group_offset += 1 + + # If roots were a subset, they should share the annotation of their + # superset/s. + for k, v in root_tokens.items(): + shared_roots = \ + {kk: None for kk, vv in root_tokens.items() if v < vv} + if shared_roots: + hash_map[k] = \ + set().union(*[hash_map[kk] for kk in shared_roots.keys()]) + + for k, deps in terminal_dependencies.items(): + + # TODO: This can likely be improved. + group = hash_map[tokenize(*sorted(roots_per_terminal[k]))] + + # Set restrictions on a terminal node and its dependencies. + for tn in [k, *deps]: + try: + task = tasks[tn] + except KeyError: # Keys may not have an assosciated task. + continue + if task._worker_restrictions is None: + task._worker_restrictions = set() + task._worker_restrictions |= \ + {workers[g % n_worker] for g in group} + task._loose_restrictions = False From dd8918c3c9598d788e1339d2ac6abce2e0b979a8 Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Tue, 25 May 2021 14:13:29 +0200 Subject: [PATCH 2/6] Scheduler plugin should now correctly handle simple reductions. --- distributed/plugins/autorestrictor.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/distributed/plugins/autorestrictor.py b/distributed/plugins/autorestrictor.py index d86601ea643..5af285a9e25 100644 --- a/distributed/plugins/autorestrictor.py +++ b/distributed/plugins/autorestrictor.py @@ -1,7 +1,7 @@ from collections import defaultdict from distributed import SchedulerPlugin -from dask.core import reverse_dict, get_dependencies +from dask.core import reverse_dict from dask.base import tokenize from dask.order import graph_metrics, ndependencies @@ -121,13 +121,16 @@ def update_graph(self, scheduler, dsk=None, keys=None, restrictions=None, hash_map[k] = \ set().union(*[hash_map[kk] for kk in shared_roots.keys()]) - for k, deps in terminal_dependencies.items(): + for k in terminal_dependencies.keys(): + + tdp = terminal_dependencies[k] + tdn = terminal_dependents[k] if terminal_dependents[k] else set() # TODO: This can likely be improved. group = hash_map[tokenize(*sorted(roots_per_terminal[k]))] # Set restrictions on a terminal node and its dependencies. - for tn in [k, *deps]: + for tn in [k, *tdp, *tdn]: try: task = tasks[tn] except KeyError: # Keys may not have an assosciated task. From e3aea3b525091f6663224507ca82268716039026 Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Mon, 31 May 2021 11:05:40 +0200 Subject: [PATCH 3/6] Assign work to least occupied worker. --- distributed/plugins/autorestrictor.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/distributed/plugins/autorestrictor.py b/distributed/plugins/autorestrictor.py index 5af285a9e25..b9d513a3a02 100644 --- a/distributed/plugins/autorestrictor.py +++ b/distributed/plugins/autorestrictor.py @@ -121,13 +121,28 @@ def update_graph(self, scheduler, dsk=None, keys=None, restrictions=None, hash_map[k] = \ set().union(*[hash_map[kk] for kk in shared_roots.keys()]) + worker_weights = dict(zip(workers, (0,) * len(workers))) + assignments = {} + for k in terminal_dependencies.keys(): tdp = terminal_dependencies[k] - tdn = terminal_dependents[k] if terminal_dependents[k] else set() + tdn = terminal_dependents[k] + + # Assume that the amount of work required is proportional to the + # number of tasks involved. + weight = len(tdp) + len(tdn) # TODO: This can likely be improved. - group = hash_map[tokenize(*sorted(roots_per_terminal[k]))] + groups = hash_map[tokenize(*sorted(roots_per_terminal[k]))] + + for g in groups: + if g in assignments: + assignee = assignments[g] + else: + assignee = min(worker_weights, key=worker_weights.get) + worker_weights[assignee] += weight + assignments[g] = assignee # Set restrictions on a terminal node and its dependencies. for tn in [k, *tdp, *tdn]: @@ -137,6 +152,5 @@ def update_graph(self, scheduler, dsk=None, keys=None, restrictions=None, continue if task._worker_restrictions is None: task._worker_restrictions = set() - task._worker_restrictions |= \ - {workers[g % n_worker] for g in group} + task._worker_restrictions |= {assignments[g] for g in groups} task._loose_restrictions = False From 681ddea64a3bcbe703186ac993be5055ee2e5c0b Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Mon, 31 May 2021 12:13:22 +0200 Subject: [PATCH 4/6] Tidy up and handle nodes with no dependencies. --- distributed/plugins/autorestrictor.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/distributed/plugins/autorestrictor.py b/distributed/plugins/autorestrictor.py index b9d513a3a02..cecad8a5cc4 100644 --- a/distributed/plugins/autorestrictor.py +++ b/distributed/plugins/autorestrictor.py @@ -76,7 +76,7 @@ def update_graph(self, scheduler, dsk=None, keys=None, restrictions=None, terminal_nodes ^= set((tn,)) terminal_nodes |= dependencies[tn] max_depth -= 1 - if max_depth == -1: + if max_depth == 0: raise ValueError("AutoRestrictor cannot determine a sensible " "work assignment pattern. Falling back to " "default behaviour.") @@ -106,7 +106,7 @@ def update_graph(self, scheduler, dsk=None, keys=None, restrictions=None, # Associate terminal roots with a specific group if they are not a # subset of another larger root set. TODO: This can likely be improved. for k, v in root_tokens.items(): - if any([v < vv for vv in root_tokens.values()]): # Strict subset. + if any(v < vv for vv in root_tokens.values()): # Strict subset. continue else: hash_map[k] |= set([group_offset]) @@ -115,6 +115,8 @@ def update_graph(self, scheduler, dsk=None, keys=None, restrictions=None, # If roots were a subset, they should share the annotation of their # superset/s. for k, v in root_tokens.items(): + if not v: # Special case - no dependencies. + continue shared_roots = \ {kk: None for kk, vv in root_tokens.items() if v < vv} if shared_roots: @@ -134,16 +136,21 @@ def update_graph(self, scheduler, dsk=None, keys=None, restrictions=None, weight = len(tdp) + len(tdn) # TODO: This can likely be improved. - groups = hash_map[tokenize(*sorted(roots_per_terminal[k]))] + if tdp: + groups = hash_map[tokenize(*sorted(roots_per_terminal[k]))] + else: # Special case - no dependencies. + groups = set((group_offset,)) + group_offset += 1 for g in groups: if g in assignments: assignee = assignments[g] else: assignee = min(worker_weights, key=worker_weights.get) + assignments[g] = assignee worker_weights[assignee] += weight - assignments[g] = assignee + assignees = {assignments[g] for g in groups} # Set restrictions on a terminal node and its dependencies. for tn in [k, *tdp, *tdn]: try: @@ -152,5 +159,5 @@ def update_graph(self, scheduler, dsk=None, keys=None, restrictions=None, continue if task._worker_restrictions is None: task._worker_restrictions = set() - task._worker_restrictions |= {assignments[g] for g in groups} + task._worker_restrictions |= assignees task._loose_restrictions = False From 40a9faf7582997b21a51af675b32e27583e100e5 Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Mon, 31 May 2021 12:52:06 +0200 Subject: [PATCH 5/6] Clarify names, refine comments. --- distributed/plugins/autorestrictor.py | 77 ++++++++++++++------------- 1 file changed, 39 insertions(+), 38 deletions(-) diff --git a/distributed/plugins/autorestrictor.py b/distributed/plugins/autorestrictor.py index cecad8a5cc4..be0789e15b2 100644 --- a/distributed/plugins/autorestrictor.py +++ b/distributed/plugins/autorestrictor.py @@ -53,58 +53,58 @@ def update_graph(self, scheduler, dsk=None, keys=None, restrictions=None, dependents = reverse_dict(dependencies) _, total_dependencies = ndependencies(dependencies, dependents) + # TODO: Avoid calling graph metrics. metrics = graph_metrics(dependencies, dependents, total_dependencies) # Terminal nodes have no dependents, root nodes have no dependencies. - terminal_nodes = {k for (k, v) in dependents.items() if not v} + # Horizontal partition nodes are initialized as the terminal nodes. + part_nodes = {k for (k, v) in dependents.items() if not v} root_nodes = {k for (k, v) in dependencies.items() if not v} # Figure out the depth of every task. Depth is defined as maximum - # distance from a root node. + # distance from a root node. TODO: Optimize get_node_depths. node_depths = get_node_depths(dependencies, root_nodes, metrics) max_depth = max(node_depths.values()) - # If we have fewer terminal nodes than workers, we cannot utilise all + # If we have fewer partition nodes than workers, we cannot utilise all # the workers and are likely dealing with a reduction. We work our way # back through the graph, starting at the deepest terminal nodes, and # try to find a depth at which there was enough work to utilise all # workers. - while len(terminal_nodes) < n_worker: - _terminal_nodes = terminal_nodes.copy() - for tn in _terminal_nodes: - if node_depths[tn] == max_depth: - terminal_nodes ^= set((tn,)) - terminal_nodes |= dependencies[tn] + while len(part_nodes) < n_worker: + _part_nodes = part_nodes.copy() + for pn in _part_nodes: + if node_depths[pn] == max_depth: + part_nodes ^= set((pn,)) + part_nodes |= dependencies[pn] max_depth -= 1 if max_depth == 0: raise ValueError("AutoRestrictor cannot determine a sensible " "work assignment pattern. Falling back to " "default behaviour.") - roots_per_terminal = {} - terminal_dependencies = {} - terminal_dependents = {} + part_roots = {} + part_dependencies = {} + part_dependents = {} - for tn in terminal_nodes: - # Get dependencies per terminal node. - terminal_dependencies[tn] = unravel_deps(dependencies, tn) - # Get dependents per terminal node. TODO: This terminology is - # confusing - the terminal nodes are not necessarily the last. - terminal_dependents[tn] = unravel_deps(dependents, tn) - # Associate terminal nodes with root nodes. - roots_per_terminal[tn] = root_nodes & terminal_dependencies[tn] + for pn in part_nodes: + # Get dependencies per partition node. + part_dependencies[pn] = unravel_deps(dependencies, pn) + # Get dependents per partition node. + part_dependents[pn] = unravel_deps(dependents, pn) + # Associate partition nodes with root nodes. + part_roots[pn] = root_nodes & part_dependencies[pn] - # Create a unique token for each set of terminal roots. TODO: This is + # Create a unique token for each set of partition roots. TODO: This is # very strict. What about nodes with very similar roots? Tokenization # may be overkill too. - root_tokens = \ - {tokenize(*sorted(v)): v for v in roots_per_terminal.values()} + root_tokens = {tokenize(*sorted(v)): v for v in part_roots.values()} hash_map = defaultdict(set) group_offset = 0 - # Associate terminal roots with a specific group if they are not a - # subset of another larger root set. TODO: This can likely be improved. + # Associate partition roots with a specific group if they are not a + # subset of another, larger root set. for k, v in root_tokens.items(): if any(v < vv for vv in root_tokens.values()): # Strict subset. continue @@ -112,10 +112,10 @@ def update_graph(self, scheduler, dsk=None, keys=None, restrictions=None, hash_map[k] |= set([group_offset]) group_offset += 1 - # If roots were a subset, they should share the annotation of their + # If roots were a subset, they should share the group of their # superset/s. for k, v in root_tokens.items(): - if not v: # Special case - no dependencies. + if not v: # Special case - no dependencies. Handled below. continue shared_roots = \ {kk: None for kk, vv in root_tokens.items() if v < vv} @@ -126,20 +126,20 @@ def update_graph(self, scheduler, dsk=None, keys=None, restrictions=None, worker_weights = dict(zip(workers, (0,) * len(workers))) assignments = {} - for k in terminal_dependencies.keys(): + for pn in part_nodes: - tdp = terminal_dependencies[k] - tdn = terminal_dependents[k] + pdp = part_dependencies[pn] + pdn = part_dependents[pn] # Assume that the amount of work required is proportional to the - # number of tasks involved. - weight = len(tdp) + len(tdn) + # number of tasks involved. Add one for the task we are looking at. + weight = len(pdp) + len(pdn) + 1 # TODO: This can likely be improved. - if tdp: - groups = hash_map[tokenize(*sorted(roots_per_terminal[k]))] + if pdp: + groups = hash_map[tokenize(*sorted(part_roots[pn]))] else: # Special case - no dependencies. - groups = set((group_offset,)) + groups = {group_offset} group_offset += 1 for g in groups: @@ -151,10 +151,11 @@ def update_graph(self, scheduler, dsk=None, keys=None, restrictions=None, worker_weights[assignee] += weight assignees = {assignments[g] for g in groups} - # Set restrictions on a terminal node and its dependencies. - for tn in [k, *tdp, *tdn]: + # Set restrictions on a partition node, its depdendents and + # dependencies. + for task_name in [pn, *pdp, *pdn]: try: - task = tasks[tn] + task = tasks[task_name] except KeyError: # Keys may not have an assosciated task. continue if task._worker_restrictions is None: From b3c5df027dbf752ebb2276aab75353ab2d65601f Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Mon, 31 May 2021 15:59:25 +0200 Subject: [PATCH 6/6] Revise task assignment for improved balance. --- distributed/plugins/autorestrictor.py | 39 +++++++++++---------------- 1 file changed, 15 insertions(+), 24 deletions(-) diff --git a/distributed/plugins/autorestrictor.py b/distributed/plugins/autorestrictor.py index be0789e15b2..e1fe17e58a3 100644 --- a/distributed/plugins/autorestrictor.py +++ b/distributed/plugins/autorestrictor.py @@ -71,17 +71,15 @@ def update_graph(self, scheduler, dsk=None, keys=None, restrictions=None, # back through the graph, starting at the deepest terminal nodes, and # try to find a depth at which there was enough work to utilise all # workers. - while len(part_nodes) < n_worker: + while (len(part_nodes) < n_worker) & (max_depth > 0): _part_nodes = part_nodes.copy() for pn in _part_nodes: if node_depths[pn] == max_depth: part_nodes ^= set((pn,)) part_nodes |= dependencies[pn] max_depth -= 1 - if max_depth == 0: - raise ValueError("AutoRestrictor cannot determine a sensible " - "work assignment pattern. Falling back to " - "default behaviour.") + if max_depth <= 0: + return # In this case, there in nothing we can do - fall back. part_roots = {} part_dependencies = {} @@ -123,19 +121,13 @@ def update_graph(self, scheduler, dsk=None, keys=None, restrictions=None, hash_map[k] = \ set().union(*[hash_map[kk] for kk in shared_roots.keys()]) - worker_weights = dict(zip(workers, (0,) * len(workers))) - assignments = {} + task_groups = defaultdict(set) for pn in part_nodes: pdp = part_dependencies[pn] pdn = part_dependents[pn] - # Assume that the amount of work required is proportional to the - # number of tasks involved. Add one for the task we are looking at. - weight = len(pdp) + len(pdn) + 1 - - # TODO: This can likely be improved. if pdp: groups = hash_map[tokenize(*sorted(part_roots[pn]))] else: # Special case - no dependencies. @@ -143,22 +135,21 @@ def update_graph(self, scheduler, dsk=None, keys=None, restrictions=None, group_offset += 1 for g in groups: - if g in assignments: - assignee = assignments[g] - else: - assignee = min(worker_weights, key=worker_weights.get) - assignments[g] = assignee - worker_weights[assignee] += weight - - assignees = {assignments[g] for g in groups} - # Set restrictions on a partition node, its depdendents and - # dependencies. - for task_name in [pn, *pdp, *pdn]: + task_groups[g] |= pdp | pdn | {pn} + + worker_loads = {wkr: 0 for wkr in workers} + + for task_group in task_groups.values(): + + assignee = min(worker_loads, key=worker_loads.get) + worker_loads[assignee] += len(task_group) + + for task_name in task_group: try: task = tasks[task_name] except KeyError: # Keys may not have an assosciated task. continue if task._worker_restrictions is None: task._worker_restrictions = set() - task._worker_restrictions |= assignees + task._worker_restrictions |= {assignee} task._loose_restrictions = False