Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 70 additions & 70 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,76 +539,6 @@ def ncores(self):
return self._nthreads


class TaskGroup:
"""Collection tracking all tasks within a group

Keys often have a structure like ``("x-123", 0)``
A group takes the first section, like ``"x-123"``

.. attribute:: name: str

The name of a group of tasks.
For a task like ``("x-123", 0)`` this is the text ``"x-123"``

.. attribute:: states: Dict[str, int]

The number of tasks in each state,
like ``{"memory": 10, "processing": 3, "released": 4, ...}``

.. attribute:: dependencies: Set[TaskGroup]

The other TaskGroups on which this one depends

.. attribute:: nbytes_total: int

The total number of bytes that this task group has produced

.. attribute:: nbytes_in_memory: int

The number of bytes currently stored by this TaskGroup

.. attribute:: duration: float

The total amount of time spent on all tasks in this TaskGroup

.. attribute:: types: Set[str]

The result types of this TaskGroup

See also
--------
TaskPrefix
"""

def __init__(self, name):
self.name = name
self.states = {state: 0 for state in ALL_TASK_STATES}
self.states["forgotten"] = 0
self.dependencies = set()
self.nbytes_total = 0
self.nbytes_in_memory = 0
self.duration = 0
self.types = set()

def add(self, ts):
self.states[ts.state] += 1
ts.group = self

def __repr__(self):
return (
"<"
+ (self.name or "no-group")
+ ": "
+ ", ".join(
"%s: %d" % (k, v) for (k, v) in sorted(self.states.items()) if v
)
+ ">"
)

def __len__(self):
return sum(self.states.values())


class TaskPrefix:
"""Collection tracking all tasks within a group

Expand Down Expand Up @@ -703,6 +633,76 @@ def types(self):
return set().union(*[tg.types for tg in self.groups])


class TaskGroup:
"""Collection tracking all tasks within a group

Keys often have a structure like ``("x-123", 0)``
A group takes the first section, like ``"x-123"``

.. attribute:: name: str

The name of a group of tasks.
For a task like ``("x-123", 0)`` this is the text ``"x-123"``

.. attribute:: states: Dict[str, int]

The number of tasks in each state,
like ``{"memory": 10, "processing": 3, "released": 4, ...}``

.. attribute:: dependencies: Set[TaskGroup]

The other TaskGroups on which this one depends

.. attribute:: nbytes_total: int

The total number of bytes that this task group has produced

.. attribute:: nbytes_in_memory: int

The number of bytes currently stored by this TaskGroup

.. attribute:: duration: float

The total amount of time spent on all tasks in this TaskGroup

.. attribute:: types: Set[str]

The result types of this TaskGroup

See also
--------
TaskPrefix
"""

def __init__(self, name):
self.name = name
self.states = {state: 0 for state in ALL_TASK_STATES}
self.states["forgotten"] = 0
self.dependencies = set()
self.nbytes_total = 0
self.nbytes_in_memory = 0
self.duration = 0
self.types = set()

def add(self, ts):
self.states[ts.state] += 1
ts.group = self

def __repr__(self):
return (
"<"
+ (self.name or "no-group")
+ ": "
+ ", ".join(
"%s: %d" % (k, v) for (k, v) in sorted(self.states.items()) if v
)
+ ">"
)

def __len__(self):
return sum(self.states.values())


class TaskState:
"""
A simple object holding information about a task.
Expand Down