Description
When using a multitude of annotations to specify where a task shall end up, certain incompatibilities arise, in particular with resources. The test at the bottom of this issue causes a KeyError since the task is prematurely transitioned from waiting to processing on a worker that does not offer the required resource.
(What I actually wanted to express in this test is "never execute on B", which is why I chose this construction, but that is not the point here.)
I believe there is a semantic misalignment here, since the resources are actually treated as a hard requirement regardless of whether allow_other_workers is set or not. I believe this is sane behaviour, since otherwise we cannot really tell whether or not we should subtract anything. I'm wondering whether the meaning of the allow_other_workers flag should be redefined. For some of the fields this may make sense (e.g. workers), but for things like host restrictions or resource restrictions it doesn't feel sensible to me.
The exception below should be "easily" fixable, but the intended behaviour is unclear to me.
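For illustration only, a guard along the following lines would avoid the KeyError, although silently skipping the subtraction is exactly the kind of semantics question raised above. This is a sketch, not a proposed fix; the names ws._used_resources and resource_restrictions are taken from the traceback and the docstring excerpt further down, not from the actual implementation of the method.

def consume_resources(self, ts, ws):
    # Sketch: only account for resources that the chosen worker actually
    # advertises. `ws._used_resources` is the mapping that raises the
    # KeyError below; `resource_restrictions` is the task attribute
    # documented in the scheduler excerpt below.
    for r, required in ts.resource_restrictions.items():
        if r in ws._used_resources:
            ws._used_resources[r] += required
        # else: the worker was picked despite not offering resource `r`
        # (loose restrictions); skipping the bookkeeping here merely hides
        # the mismatch rather than resolving it.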
Looking at our own documentation, it is already clear that there is some misalignment. I would argue that the definition of allow_other_workers makes sense, but the loose_restrictions label is a bit ill-defined, or at the very least not in alignment with the flag.
See distributed/client.py, lines 1520 to 1522 (27e5925):

    allow_other_workers : bool (defaults to False)
        Used with ``workers``. Indicates whether or not the computations
        may be performed on workers that are not in the `workers` set(s).
and distributed/scheduler.py, lines 1240 to 1251 (27e5925):

    .. attribute:: loose_restrictions: bool

       If ``False``, each of :attr:`host_restrictions`,
       :attr:`worker_restrictions` and :attr:`resource_restrictions` is
       a hard constraint: if no worker is available satisfying those
       restrictions, the task cannot go into the "processing" state and
       will instead go into the "no-worker" state.

       If ``True``, the above restrictions are mere preferences: if no worker
       is available satisfying those restrictions, the task can still go
       into the "processing" state and be sent for execution to another
       connected worker.
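To make the mismatch concrete, this is roughly how I read the submit call from the reproducer mapping onto the scheduler-side state described in the second excerpt. The attribute names are taken from the quoted docstrings; treat this as an untested sketch rather than verified output.

f = c.submit(
    inc, 1, workers=[a.address], allow_other_workers=True, resources={"A": 1}
)
ts = s.tasks[f.key]
# workers=[a.address]        -> ts.worker_restrictions
# resources={"A": 1}         -> ts.resource_restrictions
# allow_other_workers=True   -> ts.loose_restrictions is True, which per the
#                               scheduler docstring loosens *all* of the
#                               restrictions above, including the resource one.

Running the reproducer produces the following output: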
Traceback (most recent call last):
File "/Users/fjetter/workspace/distributed-main/distributed/scheduler.py", line 2378, in transition_waiting_processing
self.consume_resources(ts, ws)
File "/Users/fjetter/workspace/distributed-main/distributed/scheduler.py", line 3148, in consume_resources
ws._used_resources[r] += required
KeyError: 'A'
distributed.scheduler - ERROR - Error transitioning 'inc-aa226ae3a9f799819e1e685fba467442' from 'waiting' to 'processing'
Traceback (most recent call last):
File "/Users/fjetter/workspace/distributed-main/distributed/scheduler.py", line 2035, in _transition
a: tuple = func(key, *args, **kwargs)
File "/Users/fjetter/workspace/distributed-main/distributed/scheduler.py", line 2378, in transition_waiting_processing
self.consume_resources(ts, ws)
File "/Users/fjetter/workspace/distributed-main/distributed/scheduler.py", line 3148, in consume_resources
ws._used_resources[r] += required
KeyError: 'A'
distributed.utils - ERROR - 'A'
Reproducer:

from operator import add

from distributed import Worker
from distributed.utils_test import gen_cluster, inc


@gen_cluster(
    client=True, nthreads=[("127.0.0.1", 1)], worker_kwargs={"resources": {"A": 1}}
)
async def test_no_worker_recovers(c, s, a):
    s.periodic_callbacks["stealing"].stop()
    b = await Worker(s.address, name="b")

    # f and g are restricted to worker a (the only worker offering resource A),
    # but the restrictions are loose because of allow_other_workers=True
    f = c.submit(
        inc, 1, workers=[a.address], allow_other_workers=True, resources={"A": 1}
    )
    g = c.submit(
        inc, 2, resources={"A": 1}, workers=[a.address], allow_other_workers=True
    )
    await f
    await g
    assert f.key in a.tasks
    assert g.key in a.tasks
    assert f.key != g.key

    # h depends on f and g and is pinned to worker b
    h = c.submit(add, f, g, workers=[b.address])

    # Closing a forces f and g to be recomputed; the only remaining worker (b)
    # does not offer resource A, which triggers the KeyError above
    await a.close()
    x = await Worker(s.address, resources={"A": 1}, name="x")

    res = await h
    assert res == 5