Description
When a worker offers multiple resources, e.g. GPU=2 and CPU=4, a task may remain stuck in the constrained state even though the resource it needs is available.
The use case is:
- a task requiring a depleted resource is enqueued on the worker
- a task requiring a non-depleted resource is enqueued afterwards. The task should transition to executing immediately; instead it remains in the queue.
This is because Worker.constrained is a single deque shared by all tasks with resource requirements, and the worker stops processing it at the first task whose resources can't be satisfied, leaving every task behind it in the queue.
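The head-of-line blocking described above can be illustrated with a small standalone model. This is only a sketch and not the actual Worker code: the function names `start_head_only`, `start_any_fitting` and `make_queue` are hypothetical, and the queue holds plain `(key, required_resources)` tuples instead of task state objects. The second function shows one possible alternative (scanning past blocked tasks), offered as an assumption rather than as the intended fix.

```python
from collections import deque


def start_head_only(queue, available):
    """Start tasks from the left of the queue; stop at the first one that doesn't fit."""
    started = []
    while queue:
        key, needs = queue[0]
        if any(available.get(r, 0) < n for r, n in needs.items()):
            break  # head-of-line blocking: everything behind this task is stuck too
        queue.popleft()
        for r, n in needs.items():
            available[r] -= n
        started.append(key)
    return started


def start_any_fitting(queue, available):
    """Scan the whole queue in order and start every task whose resources fit."""
    started = []
    remaining = deque()
    while queue:
        key, needs = queue.popleft()
        if all(available.get(r, 0) >= n for r, n in needs.items()):
            for r, n in needs.items():
                available[r] -= n
            started.append(key)
        else:
            remaining.append((key, needs))  # keep blocked tasks in their original order
    queue.extend(remaining)
    return started


def make_queue():
    # Same ordering as in the scenario: two tasks competing for A, then one needing B
    return deque([("x1", {"A": 1}), ("x2", {"A": 1}), ("y", {"B": 1})])


print(start_head_only(make_queue(), {"A": 1, "B": 1}))    # ['x1']
print(start_any_fitting(make_queue(), {"A": 1, "B": 1}))  # ['x1', 'y']
```

With one unit each of A and B free, the head-only strategy starts x1, stops at x2, and never reaches y, even though B is idle.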
Reproducer:
import asyncio

from distributed import Event, wait
from distributed.utils_test import gen_cluster, inc


@gen_cluster(client=True, nthreads=[("", 2, {"resources": {"A": 1, "B": 1}})])
async def test_multi_resources(c, s, a):
    # Let all tasks pile up in the Worker before they can be processed
    evc = Event()
    clog = c.submit(lambda ev: ev.wait(), evc, resources={"A": 1, "B": 1}, key="clog")

    evx = Event()
    x1 = c.submit(lambda ev: ev.wait(), evx, resources={"A": 1}, key="x1")
    x2 = c.submit(lambda ev: ev.wait(), evx, resources={"A": 1}, key="x2")
    # y must not depend on evx: it is awaited before evx is set, and inc takes one argument
    y = c.submit(inc, 1, resources={"B": 1}, key="y")

    while len(a.tasks) < 4:
        await asyncio.sleep(0.01)
    assert a.tasks["clog"].state == "executing"
    assert a.tasks["x1"].state == "constrained"
    assert a.tasks["x2"].state == "constrained"
    assert a.tasks["y"].state == "constrained"

    await evc.set()
    # x1 constrained -> executing
    # x2 remains in queue as it uses the same resource as x1
    # y constrained -> executing, despite being lower priority than x2, because it uses
    # a different resource
    await y
    await evx.set()
    await wait([x1, x2])

The above reproducer hangs on await y.
Related issues: