36 changes: 31 additions & 5 deletions distributed/active_memory_manager.py
@@ -46,14 +46,20 @@ class ActiveMemoryManagerExtension:
``distributed.scheduler.active-memory-manager``.
"""

#: Back-reference to the scheduler holding this extension
scheduler: Scheduler
#: All active policies
policies: set[ActiveMemoryManagerPolicy]
#: Memory measure to use. Must be one of the attributes or properties of
#: :class:`distributed.scheduler.MemoryState`.
measure: str
#: Interval, in seconds, between automatic runs
interval: float

# These attributes only exist within the scope of self.run()
# Current memory (in bytes) allocated on each worker, plus/minus pending actions
#: Current memory (in bytes) allocated on each worker, plus/minus pending actions
#: This attribute only exists within the scope of self.run().
workers_memory: dict[WorkerState, int]
# Pending replications and deletions for each task
#: Pending replications and deletions for each task
#: This attribute only exists within the scope of self.run().
pending: dict[TaskState, tuple[set[WorkerState], set[WorkerState]]]

def __init__(
@@ -63,6 +69,7 @@ def __init__(
# away on the fly a specialized manager, separate from the main one.
policies: set[ActiveMemoryManagerPolicy] | None = None,
*,
measure: str | None = None,
register: bool = True,
start: bool | None = None,
interval: float | None = None,
@@ -83,6 +90,23 @@ def __init__(
for policy in policies:
self.add_policy(policy)

if not measure:
measure = dask.config.get(
"distributed.scheduler.active-memory-manager.measure"
)
mem = scheduler.memory
measure_domain = {
name
for name in dir(mem)
if not name.startswith("_") and isinstance(getattr(mem, name), int)
}
if not isinstance(measure, str) or measure not in measure_domain:
raise ValueError(
"distributed.scheduler.active-memory-manager.measure "
"must be one of " + ", ".join(sorted(measure_domain))
)
self.measure = measure

if register:
scheduler.extensions["amm"] = self
scheduler.handlers["amm_handler"] = self.amm_handler
@@ -92,6 +116,7 @@ def __init__(
dask.config.get("distributed.scheduler.active-memory-manager.interval")
)
self.interval = interval

if start is None:
start = dask.config.get("distributed.scheduler.active-memory-manager.start")
if start:
@@ -140,8 +165,9 @@ def run_once(self) -> None:
assert not hasattr(self, "pending")

self.pending = {}
measure = self.measure
self.workers_memory = {
w: w.memory.optimistic for w in self.scheduler.workers.values()
ws: getattr(ws.memory, measure) for ws in self.scheduler.workers.values()
}
try:
# populate self.pending
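The validation in `__init__` derives the set of legal measures by introspecting `MemoryState` rather than hard-coding a list: any public attribute or property that evaluates to an `int` qualifies, and `run_once()` then reads the configured attribute off each worker. Below is a minimal, self-contained sketch of that pattern; `FakeMemoryState` and all byte counts are made up for illustration.

```python
# Self-contained sketch of the introspection used in __init__ above.
# FakeMemoryState stands in for distributed.scheduler.MemoryState; only
# the filtering and getattr logic mirror the change in this PR.
class FakeMemoryState:
    def __init__(self) -> None:
        self.process = 100_000_000  # made-up byte counts
        self.managed_in_memory = 60_000_000

    @property
    def optimistic(self) -> int:
        # In the real class this is derived from the other measures.
        return 80_000_000

    def clear(self) -> None:  # methods are not ints, so they are excluded
        pass


mem = FakeMemoryState()
measure_domain = {
    name
    for name in dir(mem)
    if not name.startswith("_") and isinstance(getattr(mem, name), int)
}
assert measure_domain == {"process", "managed_in_memory", "optimistic"}

# run_once() then reads the configured measure off each worker's memory:
measure = "optimistic"
workers_memory = {"worker-1": getattr(mem, measure)}  # keyed by WorkerState in reality
assert workers_memory == {"worker-1": 80_000_000}
```

Deriving the domain by introspection keeps the runtime check in lockstep with whatever measures `MemoryState` exposes, now or in the future.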
10 changes: 9 additions & 1 deletion distributed/distributed-schema.yaml
@@ -277,7 +277,7 @@ properties:

active-memory-manager:
type: object
required: [start, interval, policies]
required: [start, interval, measure, policies]
additionalProperties: false
properties:
start:
@@ -287,6 +287,14 @@ properties:
type: string
description:
Time expression, e.g. "2s". Run the AMM cycle every <interval>.
measure:
enum:
- process
- optimistic
- managed
- managed_in_memory
description:
Memory measure to use. Must be one of the attributes or properties of distributed.scheduler.MemoryState.
policies:
type: array
items:
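As a hedged usage sketch, one of the enum values above can be selected from Python before the cluster starts; the two-worker local cluster below is purely illustrative.

```python
import dask
from distributed import Client, LocalCluster

# Select the measure before the scheduler (and its AMM extension) is created.
with dask.config.set(
    {"distributed.scheduler.active-memory-manager.measure": "managed"}
):
    with LocalCluster(n_workers=2) as cluster, Client(cluster) as client:
        # The AMM is not started by default (start: false), so trigger a
        # single cycle by hand; it reads WorkerState.memory.managed.
        client.amm.run_once()
```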
10 changes: 8 additions & 2 deletions distributed/distributed.yaml
@@ -67,11 +67,17 @@ distributed:
# you'll have to either manually start it with client.amm.start() or run it once
# with client.amm.run_once().
start: false

# Once started, run the AMM cycle every <interval>
interval: 2s

# Memory measure to use. Must be one of the attributes of
# distributed.scheduler.MemoryState.
measure: optimistic

# Policies that should be executed at every cycle. Any additional keys in each
# object are passed as keyword arguments to the policy constructor.
policies:
- class: distributed.active_memory_manager.ReduceReplicas

worker:
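The same defaults can also be set programmatically; a sketch, with values mirroring the YAML above:

```python
import dask

# Hedged sketch: the configuration above expressed from Python rather than
# YAML. Any extra keys in a policy mapping (none here) would be forwarded
# verbatim as keyword arguments to the policy class constructor.
dask.config.set(
    {
        "distributed.scheduler.active-memory-manager": {
            "start": False,
            "interval": "2s",
            "measure": "optimistic",
            "policies": [
                {"class": "distributed.active_memory_manager.ReduceReplicas"},
            ],
        }
    }
)
```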