@hayesgb hayesgb commented Sep 10, 2022

Using the reproducer from @adbreind in #6368 as a starting point for:

from distributed import Client, LocalCluster
from time import time
import numpy as np


def run_test():
    runtime = []
    def foo():
        return [1.5] * 1_000_000

    # with LocalCluster(n_workers=2, threads_per_worker=1, memory_limit='8GiB') as cluster:
    for i in range(5):
        with Client() as client:
            s = time()
            res = client.submit(foo).result()
            runtime.append(time() - s)
    print(f"Run time (in seconds) for 5 runs is: {runtime}, and mean runtime:  {np.mean(runtime)} seconds")

if __name__ == "__main__":
    run_test()

On current main, I get:
Run time (in seconds) for 5 runs is: [13.804176807403564, 13.784174680709839, 13.835507869720459, 13.706598997116089, 13.749552011489868], and mean runtime: 13.776002073287964 seconds

While on this branch, I get:
Run time (in seconds) for 5 runs is: [0.15462398529052734, 0.1298370361328125, 0.1314990520477295, 0.13030385971069336, 0.12935090065002441], and mean runtime: 0.13512296676635743 seconds

What is happening?
When serializing collections, we prefer to use pickle and recurse into the collections, serializing each object in the collection separately. This decision was motivated by Blockwise-IO work as described by @rjzamora. While it makes sense, it also has the unfortunate consequence of making it expensive to serialize collections in general.
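To make the cost concrete, here is a minimal sketch that uses only the standard-library `pickle` module. It is not the code path in distributed; `serialize_each` and `serialize_whole` are hypothetical helpers that only contrast "one payload per element" with "one payload for the whole list".

```python
import pickle


def serialize_each(items):
    """Recurse into the collection: one pickle payload per element."""
    return [pickle.dumps(item) for item in items]


def serialize_whole(items):
    """Treat the collection as a single object: one pickle payload in total."""
    return [pickle.dumps(items)]


data = [1.5] * 10_000

per_element = serialize_each(data)
single = serialize_whole(data)

print(len(per_element), "payloads when recursing")
print(len(single), "payload when pickling the list as a whole")

# Both strategies round-trip to the same list.
assert [pickle.loads(p) for p in per_element] == data
assert pickle.loads(single[0]) == data
```

The number of payloads grows with the length of the list in the first case, which is the per-object overhead this PR tries to avoid for simple lists.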

Here we create a Dispatch() method for lists that converts a list to a NumPy array, which can then be serialized. We also add infer_if_recurse_to_serialize_list. Now that lists can be serialized either recursively using pickle or with dask_serialize, we offload the decision about whether to enable iterate_collection to infer_if_recurse_to_serialize_list.
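As a rough illustration of that decision, the sketch below inspects only the first element of a list. It is an assumption modeled on the snippet quoted in the review, not the body of infer_if_recurse_to_serialize_list; the name `should_recurse`, the `simple_types` parameter, and the empty-list behaviour are all hypothetical, and the NumPy conversion step is left out.

```python
def should_recurse(items, simple_types=(str, int, float)):
    """Hypothetical heuristic: recurse unless the first element is a plain scalar.

    Returns True when each element should be serialized separately and
    False when the whole list can be handed to a single serializer.
    """
    if not items:
        # Assumption: an empty list has nothing to inspect, so keep the default.
        return True
    # Exact type check, so subclasses such as bool are not treated as int.
    return type(items[0]) not in simple_types


print(should_recurse([1.5] * 3))        # list of plain floats
print(should_recurse([object(), 2.0]))  # list starting with an arbitrary object
```

Looking at a single element keeps the check cheap, at the price of misjudging heterogeneous lists; a real implementation would need to decide how to handle that case.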

We also need to handle the case where a Serialize object must itself be serialized. To handle this, we add an iterate_collection attribute.

@GPUtester
Collaborator

Can one of the admins verify this patch?

Admins can comment ok to test to allow this one PR to run or add to allowlist to allow all future PRs from the same author to run.

@hayesgb hayesgb marked this pull request as draft September 10, 2022 00:24
@hayesgb hayesgb changed the title from "[WIP] Dispatch some lists" to "Serialize some lists using Dispatch" Sep 10, 2022
@github-actions
Contributor

github-actions bot commented Sep 10, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files (±0), 15 suites (±0), duration 6h 20m 2s ⏱️ (+4m 47s)
3 117 tests (+16): 3 031 ✔️ passed (+17), 84 💤 skipped (-1), 2 failed (±0)
23 066 runs (+112): 22 163 ✔️ passed (+115), 899 💤 skipped (-5), 4 failed (+2)

For more details on these failures, see this check.

Results for commit 886108d. ± Comparison against base commit 1fd07f0.

♻️ This comment has been updated with latest results.

@hayesgb
Author

hayesgb commented Sep 12, 2022

cc: @madsbk -- Wondering if you would mind taking a look at this PR. Also interested in your thoughts on dropping the requirement to use pickle with protocol=4. xref: rapidsai/dask-cuda#746

@hayesgb
Author

hayesgb commented Sep 12, 2022

cc: @ian-r-rose

Contributor

@madsbk madsbk left a comment

Thanks @hayesgb, overall I think this is a good idea, but we have to make sure that we are not serializing anything needed by the scheduler, such as task graph keys.

Comment on lines 211 to 215
    if is_dask_collection(first_val) or typename(type(first_val)) not in [
        "str",
        "int",
        "float",
    ]:

Contributor

Suggested change
    - if is_dask_collection(first_val) or typename(type(first_val)) not in [
    -     "str",
    -     "int",
    -     "float",
    - ]:
    + if is_dask_collection(first_val) or isinstance(first_val, (str, int, float)):

Author

Using isinstance(first_val, (str, int, float)) causes test failures in shuffle.

Additionally, this seems more consistent with the convention of using type(x), which is the preferred approach in serialize.py. I could see replacing with type(x) in [int, float] if that makes more sense...

Thoughts?

Contributor

@madsbk madsbk Sep 15, 2022

This makes me a bit nervous.
AFAIK, type(x) is used here only to keep subclasses of list, set, tuple, and dict from being converted to their base class by msgpack. By disabling iterate_collection and using Dispatch on the whole collection, we make sure the subclasses are preserved.
Do you have a reproducer for the shuffle failing?

PS: I think this is another good reason why we need to re-design and simplify the serialization in Dask.
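For readers following the type(x) versus isinstance discussion, the difference can be shown with plain Python. This is a generic sketch, not code from serialize.py; `TaggedList` and `is_exact_builtin` are hypothetical names used only for illustration.

```python
class TaggedList(list):
    """A list subclass, standing in for any user-defined collection type."""


def is_exact_builtin(x, types=(str, int, float)):
    # Exact match on the type: subclasses do not qualify.
    return type(x) in types


samples = [1, 1.5, "a", True]
for value in samples:
    print(repr(value), is_exact_builtin(value), isinstance(value, (str, int, float)))

# bool is a subclass of int, so the two checks disagree on True.
assert isinstance(True, int) and type(True) is not int

# The same distinction applies to collections.
tagged = TaggedList([1, 2])
assert isinstance(tagged, list) and type(tagged) is not list
```

An exact type check rejects subclasses, while isinstance accepts them, so swapping one for the other can change which objects take a given serialization path.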

Comment on lines +355 to +357
      and iterate_collection is True
      or type(x) is dict
    - and iterate_collection
    + and iterate_collection is True
Contributor

Any reason for this semantically?

Author

Nothing other than readability.
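As a side note on the semantics question, `flag is True` and a bare truthiness test are not interchangeable in general. The sketch below is generic Python with hypothetical helper names; it says nothing about which values iterate_collection actually takes in distributed.

```python
def gate_truthy(flag):
    # Accepts any truthy value: True, 1, "yes", non-empty containers, ...
    return bool(flag)


def gate_identity(flag):
    # Accepts only the singleton True.
    return flag is True


for flag in (True, False, 1, "yes", None, []):
    print(repr(flag), gate_truthy(flag), gate_identity(flag))

# `and` binds tighter than `or`, so a chain like the one under review
# groups as (a and b) or (c and d).
a, b, c, d = False, True, True, True
assert (a and b or c and d) == ((a and b) or (c and d))
assert (a and (b or c) and d) is False
```

If the flag is always a real bool, both forms behave the same and the choice is purely stylistic, which matches the answer given above.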

@hayesgb
Author

hayesgb commented Sep 14, 2022

@madsbk -- Can we drop the requirement to use protocol=4 now that distributed requires python>=3.8?

@hayesgb hayesgb marked this pull request as ready for review September 14, 2022 18:46
Member

@jrbourbeau jrbourbeau left a comment

cc @rjzamora @jakirkham if either of you get a chance to look at this. FYI for others, NVIDIA folks are off Sep 15-16 for a company-wide holiday, so it may be next week before pings are seen.

@madsbk
Contributor

madsbk commented Sep 15, 2022

> @madsbk -- Can we drop the requirement to use protocol=4 now that distributed requires python>=3.8?

Yes
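For context on why the Python version matters here: pickle protocol 5, available from Python 3.8, supports out-of-band buffers, which protocol 4 does not. The following is a minimal standard-library sketch of that feature, not a description of how distributed uses it; the payload size is arbitrary.

```python
import pickle

payload = bytearray(b"x" * 1_000_000)

# Protocol 4: the payload bytes are copied into the pickle stream.
in_band = pickle.dumps(payload, protocol=4)

# Protocol 5: a PickleBuffer can be handed to buffer_callback instead,
# so the stream itself only carries a small placeholder.
buffers = []
out_of_band = pickle.dumps(
    pickle.PickleBuffer(payload), protocol=5, buffer_callback=buffers.append
)

print(len(in_band), len(out_of_band), len(buffers))

# The collected buffers must be supplied again when loading.
restored = pickle.loads(out_of_band, buffers=buffers)
assert bytes(memoryview(restored)) == bytes(payload)
```

Keeping large buffers out of the pickle stream lets a transport layer send them without an extra copy, which is the usual motivation for moving past protocol 4.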

hayesgb and others added 2 commits September 15, 2022 05:54
Co-authored-by: Mads R. B. Kristensen <madsbk@gmail.com>
Development

Successfully merging this pull request may close these issues.

Future submit()/result() takes very long time to complete with 8MB Python object
