Skip to content
Merged
26 changes: 21 additions & 5 deletions distributed/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,17 @@ async def run_actor_function_on_worker():
await self._future
else:
raise OSError("Unable to contact Actor's worker")
return result["result"]
return result

if self._asynchronous:
return asyncio.ensure_future(run_actor_function_on_worker())

async def unwrap():
result = await run_actor_function_on_worker()
if result["status"] == "OK":
return result["result"]
raise result["exception"]

return asyncio.ensure_future(unwrap())
else:
# TODO: this mechanism is error prone
# we should endeavor to make dask's standard code work here
Expand All @@ -187,7 +194,10 @@ async def get_actor_attribute_from_worker():
x = await self._worker_rpc.actor_attribute(
attribute=key, actor=self.key
)
return x["result"]
if x["status"] == "OK":
return x["result"]
else:
raise x["exception"]

return self._sync(get_actor_attribute_from_worker)

Expand Down Expand Up @@ -237,10 +247,16 @@ def __await__(self):

def result(self, timeout=None):
try:
if isinstance(self._cached_result, Exception):
raise self._cached_result
return self._cached_result
except AttributeError:
self._cached_result = self.q.get(timeout=timeout)
return self._cached_result
out = self.q.get(timeout=timeout)
if out["status"] == "OK":
self._cached_result = out["result"]
else:
self._cached_result = out["exception"]
return self.result()

def __repr__(self):
return "<ActorFuture>"
45 changes: 45 additions & 0 deletions distributed/tests/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,3 +581,48 @@ async def test_async_deadlock(client, s, a, b):
ac2 = await client.submit(UsesCounter, actor=True, workers=[ac._address])

assert (await ac2.ado_inc(ac)) == 1


def test_exception():
class MyException(Exception):
pass

class Broken:
def method(self):
raise MyException

@property
def prop(self):
raise MyException

with cluster(nworkers=2) as (cl, w):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to use cluster here instead of @gen_cluster like most of the other test in this module?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was to test the sync API, which is more typical for actors. I added an async version immediately below (could remove this one, if you like).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was to test the sync API, which is more typical for actors. I added an async version immediately below (could remove this one, if you like).

To be clear here, the sync api is more common for everything. We prefer the async tests because they are faster/easier on CI and allow for greater debuggability. Adding sync tests too is fine if we want to be extra careful, but in general we prefer async tests. In general if async tests I usually have confidence that sync works just as well, unless I'm explicitly writing code to handle synchronization.

What's here is great though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's here is great though.

I don't mind keeping it or not. Writing the sync version first probably reflects how I initially tested by hand. It was some time ago, so I can't remember if there was any other reason, given that the async version is effectively identical and works just fine.

client = Client(cl["address"])
ac = client.submit(Broken, actor=True).result()
acfut = ac.method()
with pytest.raises(MyException):
acfut.result()

with pytest.raises(MyException):
ac.prop


@gen_cluster(client=True)
async def test_exception_async(client, s, a, b):
class MyException(Exception):
pass

class Broken:
def method(self):
raise MyException

@property
def prop(self):
raise MyException

ac = await client.submit(Broken, actor=True)
acfut = ac.method()
with pytest.raises(MyException):
await acfut

with pytest.raises(MyException):
await ac.prop
48 changes: 27 additions & 21 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2830,30 +2830,36 @@ async def actor_execute(
func = getattr(actor, function)
name = key_split(key) + "." + function

if iscoroutinefunction(func):
result = await func(*args, **kwargs)
elif separate_thread:
result = await self.executor_submit(
name,
apply_function_actor,
args=(
func,
args,
kwargs,
self.execution_state,
try:
if iscoroutinefunction(func):
result = await func(*args, **kwargs)
elif separate_thread:
result = await self.executor_submit(
name,
self.active_threads,
self.active_threads_lock,
),
executor=self.executors["actor"],
)
else:
result = func(*args, **kwargs)
return {"status": "OK", "result": to_serialize(result)}
apply_function_actor,
args=(
func,
args,
kwargs,
self.execution_state,
name,
self.active_threads,
self.active_threads_lock,
),
executor=self.executors["actor"],
)
else:
result = func(*args, **kwargs)
return {"status": "OK", "result": to_serialize(result)}
except Exception as ex:
return {"status": "error", "exception": to_serialize(ex)}

def actor_attribute(self, comm=None, actor=None, attribute=None):
value = getattr(self.actors[actor], attribute)
return {"status": "OK", "result": to_serialize(value)}
try:
value = getattr(self.actors[actor], attribute)
return {"status": "OK", "result": to_serialize(value)}
except Exception as ex:
return {"status": "error", "exception": to_serialize(ex)}

def meets_resource_constraints(self, key):
ts = self.tasks[key]
Expand Down