diff --git a/distributed/actor.py b/distributed/actor.py index 0facdda4cb8..77b2cda67de 100644 --- a/distributed/actor.py +++ b/distributed/actor.py @@ -162,10 +162,17 @@ async def run_actor_function_on_worker(): await self._future else: raise OSError("Unable to contact Actor's worker") - return result["result"] + return result if self._asynchronous: - return asyncio.ensure_future(run_actor_function_on_worker()) + + async def unwrap(): + result = await run_actor_function_on_worker() + if result["status"] == "OK": + return result["result"] + raise result["exception"] + + return asyncio.ensure_future(unwrap()) else: # TODO: this mechanism is error prone # we should endeavor to make dask's standard code work here @@ -187,7 +194,10 @@ async def get_actor_attribute_from_worker(): x = await self._worker_rpc.actor_attribute( attribute=key, actor=self.key ) - return x["result"] + if x["status"] == "OK": + return x["result"] + else: + raise x["exception"] return self._sync(get_actor_attribute_from_worker) @@ -237,10 +247,16 @@ def __await__(self): def result(self, timeout=None): try: + if isinstance(self._cached_result, Exception): + raise self._cached_result return self._cached_result except AttributeError: - self._cached_result = self.q.get(timeout=timeout) - return self._cached_result + out = self.q.get(timeout=timeout) + if out["status"] == "OK": + self._cached_result = out["result"] + else: + self._cached_result = out["exception"] + return self.result() def __repr__(self): return "" diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py index d2298c1c2c2..851ee7e8b2a 100644 --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py @@ -581,3 +581,48 @@ async def test_async_deadlock(client, s, a, b): ac2 = await client.submit(UsesCounter, actor=True, workers=[ac._address]) assert (await ac2.ado_inc(ac)) == 1 + + +def test_exception(): + class MyException(Exception): + pass + + class Broken: + def method(self): + raise MyException + + @property + def prop(self): + raise MyException + + with cluster(nworkers=2) as (cl, w): + client = Client(cl["address"]) + ac = client.submit(Broken, actor=True).result() + acfut = ac.method() + with pytest.raises(MyException): + acfut.result() + + with pytest.raises(MyException): + ac.prop + + +@gen_cluster(client=True) +async def test_exception_async(client, s, a, b): + class MyException(Exception): + pass + + class Broken: + def method(self): + raise MyException + + @property + def prop(self): + raise MyException + + ac = await client.submit(Broken, actor=True) + acfut = ac.method() + with pytest.raises(MyException): + await acfut + + with pytest.raises(MyException): + await ac.prop diff --git a/distributed/worker.py b/distributed/worker.py index e802af68c71..94bb351b4cc 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2830,30 +2830,36 @@ async def actor_execute( func = getattr(actor, function) name = key_split(key) + "." + function - if iscoroutinefunction(func): - result = await func(*args, **kwargs) - elif separate_thread: - result = await self.executor_submit( - name, - apply_function_actor, - args=( - func, - args, - kwargs, - self.execution_state, + try: + if iscoroutinefunction(func): + result = await func(*args, **kwargs) + elif separate_thread: + result = await self.executor_submit( name, - self.active_threads, - self.active_threads_lock, - ), - executor=self.executors["actor"], - ) - else: - result = func(*args, **kwargs) - return {"status": "OK", "result": to_serialize(result)} + apply_function_actor, + args=( + func, + args, + kwargs, + self.execution_state, + name, + self.active_threads, + self.active_threads_lock, + ), + executor=self.executors["actor"], + ) + else: + result = func(*args, **kwargs) + return {"status": "OK", "result": to_serialize(result)} + except Exception as ex: + return {"status": "error", "exception": to_serialize(ex)} def actor_attribute(self, comm=None, actor=None, attribute=None): - value = getattr(self.actors[actor], attribute) - return {"status": "OK", "result": to_serialize(value)} + try: + value = getattr(self.actors[actor], attribute) + return {"status": "OK", "result": to_serialize(value)} + except Exception as ex: + return {"status": "error", "exception": to_serialize(ex)} def meets_resource_constraints(self, key): ts = self.tasks[key]