From c845a6c958ce56fd595069a4c4a835ebf266942d Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 6 Nov 2020 17:22:26 -0500 Subject: [PATCH 01/11] Allow actors to call actors on the same worker --- distributed/actor.py | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/distributed/actor.py b/distributed/actor.py index dc49571d1db..db250a062a5 100644 --- a/distributed/actor.py +++ b/distributed/actor.py @@ -1,5 +1,6 @@ import asyncio import functools +from inspect import iscoroutinefunction import threading from queue import Queue @@ -118,13 +119,27 @@ def __dir__(self): return sorted(o) def __getattr__(self, key): - attr = getattr(self._cls, key) + print("###getattr", file=open("temp", "at")) if self._future and self._future.status not in ("finished", "pending"): raise ValueError( "Worker holding Actor was lost. Status: " + self._future.status ) + if self._worker and self._worker.address == self._address: + actor = self._worker.actors[self.key] + attr = getattr(actor, key) + + if iscoroutinefunction(attr): + return lambda *args, **kwargs: attr(*args, **kwargs) + + elif callable(attr): + return lambda *args, **kwargs: ActorFuture(None, None, result=attr(*args, **kwargs)) + else: + return attr + + attr = getattr(self._cls, key) + if callable(attr): @functools.wraps(attr) @@ -206,9 +221,14 @@ class ActorFuture: Actor """ - def __init__(self, q, io_loop): + def __init__(self, q, io_loop, result=None): self.q = q self.io_loop = io_loop + if result: + self._cached_result = result + + def __await__(self): + return self.result() def result(self, timeout=None): try: From 9081029041390112ddef666e76fa715e6d13c78a Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Mon, 9 Nov 2020 09:28:26 -0500 Subject: [PATCH 02/11] Make tests --- distributed/actor.py | 4 +++- distributed/tests/test_actor.py | 33 ++++++++++++++++++++++++++++++++- 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/distributed/actor.py b/distributed/actor.py index db250a062a5..fea9ba6fef2 100644 --- a/distributed/actor.py +++ b/distributed/actor.py @@ -134,7 +134,9 @@ def __getattr__(self, key): return lambda *args, **kwargs: attr(*args, **kwargs) elif callable(attr): - return lambda *args, **kwargs: ActorFuture(None, None, result=attr(*args, **kwargs)) + return lambda *args, **kwargs: ActorFuture( + None, None, result=attr(*args, **kwargs) + ) else: return attr diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py index 89233eaca24..76ac988783e 100644 --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py @@ -6,7 +6,7 @@ import dask from distributed import Actor, ActorFuture, Client, Future, wait, Nanny -from distributed.utils_test import gen_cluster +from distributed.utils_test import cluster, gen_cluster from distributed.utils_test import client, cluster_fixture, loop # noqa: F401 from distributed.metrics import time @@ -21,11 +21,25 @@ def increment(self): self.n += 1 return self.n + async def ainc(self): + self.n += 1 + return self.n + def add(self, x): self.n += x return self.n +class UsesCounter: + # An actor whose method argument is another actor + + def do_inc(self, ac): + return ac.increment().result() + + async def ado_inc(self, ac): + return await ac.ainc() + + class List: L = [] @@ -550,3 +564,20 @@ async def wait(self): await waiter.set() await c.gather(futures) + + +def test_one_thread_deadlock(): + with cluster(nworkers=2) as (cl, w): + client = Client(cl["address"]) + ac = client.submit(Counter, actor=True).result() + ac2 = client.submit(UsesCounter, actor=True, workers=[ac._address]).result() + + assert ac2.do_inc(ac).result() == 1 + + +@gen_cluster(client=True) +async def test_waiter(client, s, a, b): + ac = await client.submit(Counter, actor=True) + ac2 = await client.submit(UsesCounter, actor=True, workers=[ac._address]) + + assert (await ac2.ado_inc(ac)) == 1 From 8c1628cde2d72f881eb16a3c28e6d5a4a2ff71ae Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Mon, 9 Nov 2020 09:29:41 -0500 Subject: [PATCH 03/11] simplify --- distributed/actor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/actor.py b/distributed/actor.py index fea9ba6fef2..e2a09e8bc1c 100644 --- a/distributed/actor.py +++ b/distributed/actor.py @@ -131,7 +131,7 @@ def __getattr__(self, key): attr = getattr(actor, key) if iscoroutinefunction(attr): - return lambda *args, **kwargs: attr(*args, **kwargs) + return attr elif callable(attr): return lambda *args, **kwargs: ActorFuture( From 23269b8c449c61cbd2b19e461791883e4d4371d8 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Mon, 9 Nov 2020 09:46:25 -0500 Subject: [PATCH 04/11] Extra code path only for actors --- distributed/actor.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/distributed/actor.py b/distributed/actor.py index e2a09e8bc1c..ce2a0eece98 100644 --- a/distributed/actor.py +++ b/distributed/actor.py @@ -119,14 +119,18 @@ def __dir__(self): return sorted(o) def __getattr__(self, key): - print("###getattr", file=open("temp", "at")) if self._future and self._future.status not in ("finished", "pending"): raise ValueError( "Worker holding Actor was lost. Status: " + self._future.status ) - if self._worker and self._worker.address == self._address: + if ( + self._worker + and self._worker.address == self._address + and threading.current_thread().name.startswith("Dask-Actor-Threads") + ): + # actor calls actor on same worker actor = self._worker.actors[self.key] attr = getattr(actor, key) From 96a2e0283a2a6d7ae79231cb4e59288852a3abbd Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Mon, 9 Nov 2020 11:10:38 -0500 Subject: [PATCH 05/11] oops, rename --- distributed/tests/test_actor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py index 76ac988783e..26421d0385c 100644 --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py @@ -576,7 +576,7 @@ def test_one_thread_deadlock(): @gen_cluster(client=True) -async def test_waiter(client, s, a, b): +async def test_async_deadlock(client, s, a, b): ac = await client.submit(Counter, actor=True) ac2 = await client.submit(UsesCounter, actor=True, workers=[ac._address]) From a3d0cffd2c0978c4f4d850d16f0ce84842df6e21 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 10 Nov 2020 09:18:37 -0500 Subject: [PATCH 06/11] Allow actor exception to propagate --- distributed/actor.py | 26 +++++++++++++---- distributed/tests/test_actor.py | 23 ++++++++++++++++ distributed/worker.py | 49 +++++++++++++++++++-------------- 3 files changed, 72 insertions(+), 26 deletions(-) diff --git a/distributed/actor.py b/distributed/actor.py index ce2a0eece98..32f8d4e2b61 100644 --- a/distributed/actor.py +++ b/distributed/actor.py @@ -163,10 +163,17 @@ async def run_actor_function_on_worker(): await self._future else: raise OSError("Unable to contact Actor's worker") - return result["result"] + return result if self._asynchronous: - return asyncio.ensure_future(run_actor_function_on_worker()) + + async def unwrap(): + result = await run_actor_function_on_worker() + if "result" in result: + return result["result"] + raise result["exception"] + + return asyncio.ensure_future(unwrap()) else: # TODO: this mechanism is error prone # we should endeavor to make dask's standard code work here @@ -188,7 +195,10 @@ async def get_actor_attribute_from_worker(): x = await self._worker_rpc.actor_attribute( attribute=key, actor=self.key ) - return x["result"] + if "result" in x: + return x["result"] + else: + raise x["exception"] return self._sync(get_actor_attribute_from_worker) @@ -238,10 +248,16 @@ def __await__(self): def result(self, timeout=None): try: + if isinstance(self._cached_result, Exception): + raise self._cached_result return self._cached_result except AttributeError: - self._cached_result = self.q.get(timeout=timeout) - return self._cached_result + out = self.q.get(timeout=timeout) + if "result" in out: + self._cached_result = out["result"] + else: + self._cached_result = out["exception"] + return self.result() def __repr__(self): return "" diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py index 26421d0385c..309c1b3dbb2 100644 --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py @@ -581,3 +581,26 @@ async def test_async_deadlock(client, s, a, b): ac2 = await client.submit(UsesCounter, actor=True, workers=[ac._address]) assert (await ac2.ado_inc(ac)) == 1 + + +def test_exception(): + class MyException(Exception): + pass + + class Broken: + def method(self): + raise MyException + + @property + def prop(self): + raise MyException + + with cluster(nworkers=2) as (cl, w): + client = Client(cl["address"]) + ac = client.submit(Broken, actor=True).result() + acfut = ac.method() + with pytest.raises(MyException): + acfut.result() + + with pytest.raises(MyException): + ac.prop diff --git a/distributed/worker.py b/distributed/worker.py index b9bc9b63375..10f615a77ff 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -785,6 +785,7 @@ async def get_metrics(self): executing={ key: now - self.tasks[key].start_time for key in self.active_threads.values() + if key in self.tasks }, ) custom = {} @@ -2388,30 +2389,36 @@ async def actor_execute( func = getattr(actor, function) name = key_split(key) + "." + function - if iscoroutinefunction(func): - result = await func(*args, **kwargs) - elif separate_thread: - result = await self.executor_submit( - name, - apply_function_actor, - args=( - func, - args, - kwargs, - self.execution_state, + try: + if iscoroutinefunction(func): + result = await func(*args, **kwargs) + elif separate_thread: + result = await self.executor_submit( name, - self.active_threads, - self.active_threads_lock, - ), - executor=self.actor_executor, - ) - else: - result = func(*args, **kwargs) - return {"status": "OK", "result": to_serialize(result)} + apply_function_actor, + args=( + func, + args, + kwargs, + self.execution_state, + name, + self.active_threads, + self.active_threads_lock, + ), + executor=self.actor_executor, + ) + else: + result = func(*args, **kwargs) + return {"status": "OK", "result": to_serialize(result)} + except Exception as ex: + return {"status": "error", "exception": to_serialize(ex)} def actor_attribute(self, comm=None, actor=None, attribute=None): - value = getattr(self.actors[actor], attribute) - return {"status": "OK", "result": to_serialize(value)} + try: + value = getattr(self.actors[actor], attribute) + return {"status": "OK", "result": to_serialize(value)} + except Exception as ex: + return {"status": "error", "exception": to_serialize(ex)} def meets_resource_constraints(self, key): ts = self.tasks[key] From a1a2620dc3fb85b91ef875df735ed5fc5a045db7 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Mon, 7 Jun 2021 12:24:15 -0400 Subject: [PATCH 07/11] fix merge --- distributed/tests/test_actor.py | 1 - distributed/worker.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py index dec18be94b5..dc1e19a73b7 100644 --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py @@ -471,7 +471,6 @@ def f(block, ps=None): print(format_time(end - start)) -@pytest.mark.flaky(reruns=10, reruns_delay=5) @gen_cluster(client=True) async def test_compute(c, s, a, b): @dask.delayed diff --git a/distributed/worker.py b/distributed/worker.py index 87f36abd0c5..180ee8a4725 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2685,7 +2685,7 @@ async def actor_execute( self.active_threads, self.active_threads_lock, ), - executor=self.actor_executor, + executor=self.executors["actor"], ) else: result = func(*args, **kwargs) From 1e6e929b51d0ae41e601e99c7539e51b44bd18b7 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 11 Jun 2021 09:38:13 -0400 Subject: [PATCH 08/11] resolve comments --- distributed/actor.py | 6 +++--- distributed/tests/test_actor.py | 22 ++++++++++++++++++++++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/distributed/actor.py b/distributed/actor.py index b40d336cfa3..77b2cda67de 100644 --- a/distributed/actor.py +++ b/distributed/actor.py @@ -168,7 +168,7 @@ async def run_actor_function_on_worker(): async def unwrap(): result = await run_actor_function_on_worker() - if "result" in result: + if result["status"] == "OK": return result["result"] raise result["exception"] @@ -194,7 +194,7 @@ async def get_actor_attribute_from_worker(): x = await self._worker_rpc.actor_attribute( attribute=key, actor=self.key ) - if "result" in x: + if x["status"] == "OK": return x["result"] else: raise x["exception"] @@ -252,7 +252,7 @@ def result(self, timeout=None): return self._cached_result except AttributeError: out = self.q.get(timeout=timeout) - if "result" in out: + if out["status"] == "OK": self._cached_result = out["result"] else: self._cached_result = out["exception"] diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py index dc1e19a73b7..c7a86200c0f 100644 --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py @@ -609,3 +609,25 @@ def prop(self): with pytest.raises(MyException): ac.prop + + +@gen_cluster(client=True) +async def test_exception_async(client, s, a, b): + class MyException(Exception): + pass + + class Broken: + def method(self): + raise MyException + + @property + def prop(self): + raise MyException + + ac = await client.submit(Broken, actor=True) + acfut = ac.method() + with pytest.raises(MyException): + await acfut + + with pytest.raises(MyException): + await ac.prop From 95631d4cb6f2b8b6072e0a262a24a54c57871aeb Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 11 Jun 2021 15:13:55 -0400 Subject: [PATCH 09/11] simplify test_actor::test_compute --- distributed/tests/test_actor.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py index c7a86200c0f..6a84233b547 100644 --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py @@ -9,6 +9,7 @@ from distributed import Actor, ActorFuture, Client, Future, Nanny, wait from distributed.metrics import time from distributed.utils_test import ( # noqa: F401 + async_wait_for, client, cluster, cluster_fixture, @@ -490,10 +491,7 @@ def check(counter, blanks): result = await c.compute(final, actors=counter) assert result == 0 + 1 + 2 + 3 + 4 - start = time() - while a.data or b.data: - await asyncio.sleep(0.01) - assert time() < start + 30 + await async_wait_for(lambda: a.data or b.data, timeout=10) def test_compute_sync(client): From 16fd6d733f376c5ce8d24ecc4c33a6a988727819 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Sun, 13 Jun 2021 13:10:24 -0400 Subject: [PATCH 10/11] Responses --- distributed/tests/test_actor.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py index 67f4c9ea79b..8d178c05636 100644 --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py @@ -8,13 +8,7 @@ from distributed import Actor, ActorFuture, Client, Future, Nanny, wait from distributed.metrics import time -from distributed.utils_test import ( # noqa: F401 - async_wait_for, - cluster, - cluster_fixture, - gen_cluster, - loop, -) +from distributed.utils_test import cluster, gen_cluster class Counter: @@ -471,6 +465,7 @@ def f(block, ps=None): print(format_time(end - start)) +@pytest.mark.flaky(max_runs=10) @gen_cluster(client=True) async def test_compute(c, s, a, b): @dask.delayed @@ -490,7 +485,10 @@ def check(counter, blanks): result = await c.compute(final, actors=counter) assert result == 0 + 1 + 2 + 3 + 4 - await async_wait_for(lambda: a.data or b.data, timeout=10) + start = time() + while a.data or b.data: + await asyncio.sleep(0.01) + assert time() < start + 30 def test_compute_sync(client): From 28b6460f5cc029efbd99e81851aa3d0e59b88ecf Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Mon, 14 Jun 2021 11:52:10 -0400 Subject: [PATCH 11/11] Update distributed/tests/test_actor.py Co-authored-by: James Bourbeau --- distributed/tests/test_actor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py index 8d178c05636..851ee7e8b2a 100644 --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py @@ -465,7 +465,7 @@ def f(block, ps=None): print(format_time(end - start)) -@pytest.mark.flaky(max_runs=10) +@pytest.mark.flaky(reruns=10, reruns_delay=5) @gen_cluster(client=True) async def test_compute(c, s, a, b): @dask.delayed