Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 59 additions & 2 deletions distributed/collections.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
from __future__ import annotations

import dataclasses
import heapq
import itertools
import weakref
from collections import OrderedDict, UserDict
from collections.abc import Callable, Hashable, Iterator
from typing import MutableSet # TODO move to collections.abc (requires Python >=3.9)
from typing import Any, TypeVar, cast
from typing import ( # TODO move to collections.abc (requires Python >=3.9)
Any,
Container,
MutableSet,
TypeVar,
cast,
)

T = TypeVar("T", bound=Hashable)

Expand Down Expand Up @@ -199,3 +205,54 @@ def clear(self) -> None:
self._data.clear()
self._heap.clear()
self._sorted = True


# NOTE: only used in Scheduler; if work stealing is ever removed,
# this could be moved to `scheduler.py`.
@dataclasses.dataclass
class Occupancy:
cpu: float
network: float

def __add__(self, other: Any) -> Occupancy:
if isinstance(other, type(self)):
return type(self)(self.cpu + other.cpu, self.network + other.network)
return NotImplemented

def __iadd__(self, other: Any) -> Occupancy:
if isinstance(other, type(self)):
self.cpu += other.cpu
self.network += other.network
return self
return NotImplemented

def __sub__(self, other: Any) -> Occupancy:
if isinstance(other, type(self)):
return type(self)(self.cpu - other.cpu, self.network - other.network)
return NotImplemented

def __isub__(self, other: Any) -> Occupancy:
if isinstance(other, type(self)):
self.cpu -= other.cpu
self.network -= other.network
return self
return NotImplemented

def __bool__(self) -> bool:
return self.cpu != 0 or self.network != 0

def __eq__(self, other: Any) -> bool:
if isinstance(other, type(self)):
return self.cpu == other.cpu and self.network == other.network
return NotImplemented

def clear(self) -> None:
self.cpu = 0.0
self.network = 0.0

def _to_dict(self, *, exclude: Container[str] = ()) -> dict[str, float]:
return {"cpu": self.cpu, "network": self.network}

@property
def total(self) -> float:
return self.cpu + self.network
55 changes: 34 additions & 21 deletions distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,14 @@ def __init__(self, scheduler, **kwargs):
self.scheduler = scheduler
self.source = ColumnDataSource(
{
"occupancy": [0, 0],
"worker": ["a", "b"],
"x": [0.0, 0.1],
"y": [1, 2],
"ms": [1, 2],
"color": ["red", "blue"],
"escaped_worker": ["a", "b"],
"occupancy_network": [],
"occupancy_cpu": [],
"occupancy_network_ms": [],
"occupancy_cpu_ms": [],
"worker": [],
"y": [],
"color": [],
"escaped_worker": [],
}
)

Expand All @@ -139,10 +140,14 @@ def __init__(self, scheduler, **kwargs):
min_border_bottom=50,
**kwargs,
)
rect = self.root.rect(
source=self.source, x="x", width="ms", y="y", height=0.9, color="color"
self.root.hbar_stack(
["occupancy_network_ms", "occupancy_cpu_ms"],
source=self.source,
y="y",
height=0.9,
fill_alpha=[0.8, 1.0],
color="color",
)
rect.nonselection_glyph = None

self.root.xaxis.minor_tick_line_alpha = 0
self.root.yaxis.visible = False
Expand All @@ -153,7 +158,9 @@ def __init__(self, scheduler, **kwargs):
tap = TapTool(callback=OpenURL(url="./info/worker/@escaped_worker.html"))

hover = HoverTool()
hover.tooltips = "@worker : @occupancy s."
hover.tooltips = (
"@worker : network: @occupancy_network s, cpu: @occupancy_cpu s."
)
hover.point_policy = "follow_mouse"
self.root.add_tools(hover, tap)

Expand All @@ -163,10 +170,14 @@ def update(self):
workers = self.scheduler.workers.values()

y = list(range(len(workers)))
occupancy = [ws.occupancy for ws in workers]
ms = [occ * 1000 for occ in occupancy]
x = [occ / 500 for occ in occupancy]
total = sum(occupancy)
occupancy_network, occupancy_cpu = zip(
*((ws.occupancy.network, ws.occupancy.cpu) for ws in workers)
)
occupancy_network = np.array(occupancy_network)
occupancy_cpu = np.array(occupancy_cpu)
total_network = occupancy_network.sum()
total_cpu = occupancy_cpu.sum()
total = total_network + total_cpu
color = []
for ws in workers:
if ws in self.scheduler.idle:
Expand All @@ -178,20 +189,22 @@ def update(self):

if total:
self.root.title.text = (
f"Occupancy -- total time: {format_time(total)} "
f"wall time: {format_time(total / self.scheduler.total_nthreads)}"
f"Total time: {format_time(total)}, "
f"wall time: {format_time(total / self.scheduler.total_nthreads)}, "
f"{total_network / total:.0%} network"
)
else:
self.root.title.text = "Occupancy"

if occupancy:
if workers:
result = {
"occupancy": occupancy,
"occupancy_network": occupancy_network,
"occupancy_cpu": occupancy_cpu,
"occupancy_network_ms": occupancy_network * 1000,
"occupancy_cpu_ms": occupancy_cpu * 1000,
"worker": [ws.address for ws in workers],
"ms": ms,
"color": color,
"escaped_worker": [escape.url_escape(ws.address) for ws in workers],
"x": x,
"y": y,
}

Expand Down
6 changes: 4 additions & 2 deletions distributed/http/templates/worker-table.html
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
<th> Cores </th>
<th> Memory </th>
<th> Memory use </th>
<th> Occupancy </th>
<th> Network Occupancy </th>
<th> CPU Occupancy </th>
<th> Processing </th>
<th> In-memory </th>
<th> Services</th>
Expand All @@ -19,7 +20,8 @@
<td> {{ ws.nthreads }} </td>
<td> {{ format_bytes(ws.memory_limit) if ws.memory_limit is not None else "" }} </td>
<td> <progress class="progress" value="{{ ws.metrics['memory'] }}" max="{{ ws.memory_limit }}"></progress> </td>
<td> {{ format_time(ws.occupancy) }} </td>
<td> {{ format_time(ws.occupancy.network) }} </td>
<td> {{ format_time(ws.occupancy.cpu) }} </td>
<td> {{ len(ws.processing) }} </td>
<td> {{ len(ws.has_what) }} </td>
{% if 'dashboard' in ws.services %}
Expand Down
Loading