Skip to content

Conversation

@SultanOrazbayev
Copy link
Contributor

When defining priorities manually, somehow they are added with a negative sign. In the example below, last task should have the highest priority, but ends up with a lower priority (-4):

from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)

def test_func(i):
    return

futures = [client.submit(test_func, i, priority=i) for i in range(5)]

When defining priorities manually, somehow they are added with a negative sign. In the example below, last task should have the highest priority, but ends up with the lower priority (-5):

```python
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)

def test_func(i):
    return

futures = [client.submit(test_func, i, priority=i) for i in range(5)]
```
Copy link
Member

@jrbourbeau jrbourbeau left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @SultanOrazbayev. IIUC the - is intentional as when the scheduler constructs task state transition recommendations we add tasks to the recommendations dictionary in reverse order of their priority

for ts in sorted(runnables, key=operator.attrgetter("priority"), reverse=True):
if ts.state == "released" and ts.run_spec:
recommendations[ts.key] = "waiting"

This is so the highest priority tasks, those with the largest priority= value in client.submit, will be added to the recommendations dict last. This is important because later on when we go to actually transition the tasks (e.g. to the "waiting" state) using recommendations.popitem

while recommendations:
key, finish = recommendations.popitem()
keys.add(key)
new = self.transition(key, finish)

higher priority tasks will be processed first.

@jrbourbeau
Copy link
Member

@SultanOrazbayev I should have asked, did you find tasks were being run in the opposite order you expected?

@SultanOrazbayev
Copy link
Contributor Author

SultanOrazbayev commented Oct 23, 2020

@SultanOrazbayev I should have asked, did you find tasks were being run in the opposite order you expected?

Thank you for the quick feedback, I noticed the reverse ordering in the code but got confused by the negative priority values in the task details of the dashboard. With regards to the task completion... I think the order of completion is not correct, but maybe I am doing it the wrong way. So, consider a modification of the above:

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=1, resources={'foo': 1})
client = Client(cluster)

def test_func(i):
    import time
    time.sleep(i)
    return i

futures = [client.submit(test_func, i, priority=i, resources={'foo': 1}) for i in range(5)]

This should complete the longest running task first, but in the dashboard I see that the completion of the first task is instant. Or, am I coding it wrong?

@mrocklin
Copy link
Member

mrocklin commented Oct 27, 2020 via email

@SultanOrazbayev
Copy link
Contributor Author

Probably the scheduler just got the first task first and so started running it right away. The scheduler then got the subsequent tasks a fraction of a millisecond later but the first task was already running.

OK, thanks! Just to make sure I understand it correctly, with futures it's not possible to affect the order of execution? Changing priority sign in the code below gives the same execution order (in the order of submission):

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=1, resources={'foo': 1})
client = Client(cluster)

# a small delay is added to check if the scheduler needs more time to accept all tasks before sorting them
def test_func(i):
    import time
    time.sleep(1+i)
    return i

# executed in the order of submission (priorities are in the same order)
futures = [client.submit(test_func, i, priority=-i, resources={'foo': 1}) for i in range(5)]

# also executed in the order of submission, even though priorities of later tasks are higher
futures = [client.submit(test_func, i, priority=i, resources={'foo': 1}) for i in range(5)]

@mrocklin
Copy link
Member

mrocklin commented Oct 27, 2020 via email

@SultanOrazbayev
Copy link
Contributor Author

Hmm, after removing resources, the order of execution is affected by the priorities (so things run as expected). Not sure if this is a feature.

# this executes in the order of priorities (long-running task first)
futures = [client.submit(test_func, i, priority=i) for i in range(5)]

@SultanOrazbayev
Copy link
Contributor Author

I got confused by the negative sign in the task dashboard, but right now priorities work as expected. The only problem I have is combining resources and priority tags, but this is more a matter of understanding how to use resources, so I opened a discussion here: #4209

@mrocklin
Copy link
Member

mrocklin commented Oct 31, 2020 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants