From 27cc1922743c4c202f32dbe7eca14388f84407ed Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Mon, 1 May 2023 13:40:08 -0700 Subject: [PATCH 1/4] Add adaptive.utils.daskify --- adaptive/utils.py | 48 ++++++++++++++- .../tutorial/tutorial.advanced-topics.md | 59 +++++++++++++++++-- 2 files changed, 100 insertions(+), 7 deletions(-) diff --git a/adaptive/utils.py b/adaptive/utils.py index 7b2826a38..ff80f62f1 100644 --- a/adaptive/utils.py +++ b/adaptive/utils.py @@ -7,13 +7,17 @@ import os import pickle import warnings -from collections.abc import Iterator, Sequence +from collections.abc import Awaitable, Iterator, Sequence from contextlib import contextmanager +from functools import wraps from itertools import product -from typing import Any, Callable +from typing import TYPE_CHECKING, Any, Callable, TypeVar import cloudpickle +if TYPE_CHECKING: + from dask.distributed import Client as AsyncDaskClient + def named_product(**items: Sequence[Any]): names = items.keys() @@ -161,3 +165,43 @@ def map(self, fn, *iterable, timeout=None, chunksize=1): def shutdown(self, wait=True): pass + + +def _cache_key(args: tuple[Any], kwargs: dict[str, Any]) -> str: + arg_strings = [str(a) for a in args] + kwarg_strings = [f"{k}={v}" for k, v in sorted(kwargs.items())] + return "_".join(arg_strings + kwarg_strings) + + +T = TypeVar("T") + + +def daskify( + client: AsyncDaskClient, cache: bool = False +) -> Callable[[Callable[..., T]], Callable[..., Awaitable[T]]]: + from dask import delayed + + def _daskify(func: Callable[..., T]) -> Callable[..., Awaitable[T]]: + if cache: + func.cache = {} # type: ignore[attr-defined] + + delayed_func = delayed(func) + + @wraps(func) + async def wrapper(*args: Any, **kwargs: Any) -> T: + if cache: + key = _cache_key(args, kwargs) # type: ignore[arg-type] + future = func.cache.get(key) # type: ignore[attr-defined] + + if future is None: + future = client.compute(delayed_func(*args, **kwargs)) + func.cache[key] = future # type: ignore[attr-defined] + 
else: + future = client.compute(delayed_func(*args, **kwargs)) + + result = await future + return result + + return wrapper + + return _daskify diff --git a/docs/source/tutorial/tutorial.advanced-topics.md b/docs/source/tutorial/tutorial.advanced-topics.md index 2dfc6cf29..93e83f5ab 100644 --- a/docs/source/tutorial/tutorial.advanced-topics.md +++ b/docs/source/tutorial/tutorial.advanced-topics.md @@ -372,15 +372,15 @@ Adaptive by itself does not implement a way of sharing partial results between f Instead its implementation of parallel computation using executors is minimal by design. The appropriate way to implement custom parallelization is by using coroutines (asynchronous functions). + We illustrate this approach by using `dask.distributed` for parallel computations in part because it supports asynchronous operation out-of-the-box. -Let us consider a function `f(x)` which is composed by two parts: -a slow part `g` which can be reused by multiple inputs and shared across function evaluations and a fast part `h` that will be computed for every `x`. +We will focus on a function `f(x)` that consists of two distinct components: a slow part `g` that can be reused across multiple inputs and shared among various function evaluations, and a fast part `h` that is calculated for each `x` value. ```{code-cell} ipython3 import time -def f(x): +def f(x): # example function without caching """ Integer part of `x` repeats and should be reused Decimal part requires a new computation @@ -399,12 +399,61 @@ def h(x): return x**3 ``` +### Using `adaptive.utils.daskify` + +To simplify the process of using coroutines and caching with dask and Adaptive, we provide the {func}`adaptive.utils.daskify` decorator. This decorator can be used to parallelize functions with caching as well as functions without caching, making it a powerful tool for custom parallelization in Adaptive. 
+ +```{code-cell} ipython3 +import time + +from dask.distributed import Client + +import adaptive + +client = await Client(asynchronous=True) + + +# The g function has caching enabled +g = adaptive.utils.daskify(client, cache=True)(g) + +# Can be used like a decorator too: +# >>> @adaptive.utils.daskify(client, cache=True) +# ... def g(x): ... + +# The h function does not use caching +h = adaptive.utils.daskify(client)(h) + +# Now we need to rewrite `f(x)` to use `g` and `h` as coroutines + + +async def f_parallel(x): + g_result = await g(int(x)) + h_result = await h(x % 1) + return (g_result + h_result) ** 2 + + +learner = adaptive.Learner1D(f_parallel, bounds=(-3.5, 3.5)) +runner = adaptive.AsyncRunner(learner, loss_goal=0.01, ntasks=20) +runner.live_info() +``` + +Finally, we wait for the runner to finish, and then plot the result. + +```{code-cell} ipython3 +await runner.task +learner.plot() +``` + +## Step-by-step explanation of custom parallelization + +Now let's dive into a detailed explanation of the process to understand how the {func}`adaptive.utils.daskify` decorator works. + In order to combine reuse of values of `g` with adaptive, we need to convert `f` into a dask graph by using `dask.delayed`. ```{code-cell} ipython3 from dask import delayed -# Convert g and h to dask.Delayed objects +# Convert g and h to dask.Delayed objects, such that they run in the Client g, h = delayed(g), delayed(h) @@ -441,7 +490,7 @@ learner = adaptive.Learner1D(f_parallel, bounds=(-3.5, 3.5)) runner = adaptive.AsyncRunner(learner, loss_goal=0.01, ntasks=20) ``` -Finally we await for the runner to finish, and then plot the result. +Finally we wait for the runner to finish, and then plot the result. 
```{code-cell} ipython3 await runner.task From fb4b7593de4c9e179686ce84db97bc063e2a74bf Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Mon, 1 May 2023 14:25:57 -0700 Subject: [PATCH 2/4] Do not overwrite variables g, h --- .../source/tutorial/tutorial.advanced-topics.md | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/docs/source/tutorial/tutorial.advanced-topics.md b/docs/source/tutorial/tutorial.advanced-topics.md index 93e83f5ab..50a55d616 100644 --- a/docs/source/tutorial/tutorial.advanced-topics.md +++ b/docs/source/tutorial/tutorial.advanced-topics.md @@ -377,9 +377,6 @@ We illustrate this approach by using `dask.distributed` for parallel computation We will focus on a function `f(x)` that consists of two distinct components: a slow part `g` that can be reused across multiple inputs and shared among various function evaluations, and a fast part `h` that is calculated for each `x` value. ```{code-cell} ipython3 -import time - - def f(x): # example function without caching """ Integer part of `x` repeats and should be reused @@ -390,7 +387,9 @@ def f(x): # example function without caching def g(x): """Slow but reusable function""" - time.sleep(random.randrange(5)) + from time import sleep + + sleep(random.randrange(5)) return x**2 @@ -404,8 +403,6 @@ def h(x): To simplify the process of using coroutines and caching with dask and Adaptive, we provide the {func}`adaptive.utils.daskify` decorator. This decorator can be used to parallelize functions with caching as well as functions without caching, making it a powerful tool for custom parallelization in Adaptive. 
```{code-cell} ipython3 -import time - from dask.distributed import Client import adaptive @@ -414,21 +411,21 @@ client = await Client(asynchronous=True) # The g function has caching enabled -g = adaptive.utils.daskify(client, cache=True)(g) +g_dask = adaptive.utils.daskify(client, cache=True)(g) # Can be used like a decorator too: # >>> @adaptive.utils.daskify(client, cache=True) # ... def g(x): ... # The h function does not use caching -h = adaptive.utils.daskify(client)(h) +h_dask = adaptive.utils.daskify(client)(h) # Now we need to rewrite `f(x)` to use `g` and `h` as coroutines async def f_parallel(x): - g_result = await g(int(x)) - h_result = await h(x % 1) + g_result = await g_dask(int(x)) + h_result = await h_dask(x % 1) return (g_result + h_result) ** 2 From 01836e8d198893cf919ff23dbded85d185942a58 Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Mon, 1 May 2023 15:16:05 -0700 Subject: [PATCH 3/4] Fix header level --- docs/source/tutorial/tutorial.advanced-topics.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/tutorial/tutorial.advanced-topics.md b/docs/source/tutorial/tutorial.advanced-topics.md index 50a55d616..955fad3d0 100644 --- a/docs/source/tutorial/tutorial.advanced-topics.md +++ b/docs/source/tutorial/tutorial.advanced-topics.md @@ -441,7 +441,7 @@ await runner.task learner.plot() ``` -## Step-by-step explanation of custom parallelization +### Step-by-step explanation of custom parallelization Now let's dive into a detailed explanation of the process to understand how the {func}`adaptive.utils.daskify` decorator works. 
From f8367015523df63ed3ddc5d39bfdb0093fbcc8be Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Mon, 1 May 2023 15:22:00 -0700 Subject: [PATCH 4/4] Add link to TutorialAdvancedTopics --- docs/source/tutorial/tutorial.advanced-topics.md | 4 ++-- docs/source/tutorial/tutorial.parallelism.md | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/source/tutorial/tutorial.advanced-topics.md b/docs/source/tutorial/tutorial.advanced-topics.md index 955fad3d0..8ba0caf40 100644 --- a/docs/source/tutorial/tutorial.advanced-topics.md +++ b/docs/source/tutorial/tutorial.advanced-topics.md @@ -9,7 +9,7 @@ kernelspec: display_name: python3 name: python3 --- - +(TutorialAdvancedTopics)= # Advanced Topics ```{note} @@ -365,7 +365,7 @@ await runner.task # This is not needed in a notebook environment! # The result will only be set when the runner is done. timer.result() ``` - +(CustomParallelization)= ## Custom parallelization using coroutines Adaptive by itself does not implement a way of sharing partial results between function executions. diff --git a/docs/source/tutorial/tutorial.parallelism.md b/docs/source/tutorial/tutorial.parallelism.md index ef0963a3a..6b91f1266 100644 --- a/docs/source/tutorial/tutorial.parallelism.md +++ b/docs/source/tutorial/tutorial.parallelism.md @@ -57,6 +57,8 @@ runner.live_info() runner.live_plot(update_interval=0.1) ``` +Also check out the {ref}`Custom parallelization <CustomParallelization>` section in the {ref}`advanced topics tutorial <TutorialAdvancedTopics>` for more control over caching and parallelization. + ## `mpi4py.futures.MPIPoolExecutor` This makes sense if you want to run a `Learner` on a cluster non-interactively using a job script.