-
Notifications
You must be signed in to change notification settings - Fork 94
Change minimum Python to 3.9 and add Trio sample #162
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| import uuid | ||
|
|
||
| import trio_asyncio | ||
| from temporalio.client import Client | ||
| from temporalio.worker import Worker | ||
|
|
||
| from trio_async import activities, workflows | ||
|
|
||
|
|
||
| async def test_workflow_with_trio(client: Client): | ||
| @trio_asyncio.aio_as_trio | ||
| async def inside_trio(client: Client) -> list[str]: | ||
| # Create Trio thread executor | ||
| with trio_asyncio.TrioExecutor(max_workers=200) as thread_executor: | ||
| task_queue = f"tq-{uuid.uuid4()}" | ||
| # Run worker | ||
| async with Worker( | ||
| client, | ||
| task_queue=task_queue, | ||
| activities=[ | ||
| activities.say_hello_activity_async, | ||
| activities.say_hello_activity_sync, | ||
| ], | ||
| workflows=[workflows.SayHelloWorkflow], | ||
| activity_executor=thread_executor, | ||
| workflow_task_executor=thread_executor, | ||
| ): | ||
| # Run workflow and return result | ||
| return await client.execute_workflow( | ||
| workflows.SayHelloWorkflow.run, | ||
| "some-user", | ||
| id=f"wf-{uuid.uuid4()}", | ||
| task_queue=task_queue, | ||
| ) | ||
|
|
||
| result = trio_asyncio.run(inside_trio, client) | ||
| assert result == [ | ||
| "Hello, some-user! (from asyncio)", | ||
| "Hello, some-user! (from thread)", | ||
| ] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| # Trio Async Sample | ||
|
|
||
| This sample shows how to use Temporal asyncio with [Trio](https://trio.readthedocs.io) using | ||
| [Trio asyncio](https://trio-asyncio.readthedocs.io). Specifically it demonstrates using a traditional Temporal client | ||
| and worker in a Trio setting, and how Trio-based code can run in both asyncio async activities and threaded sync | ||
| activities. | ||
|
|
||
| For this sample, the optional `trio_async` dependency group must be included. To include, run: | ||
|
|
||
| poetry install --with trio_async | ||
|
|
||
| To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the | ||
| worker: | ||
|
|
||
| poetry run python worker.py | ||
|
|
||
| This will start the worker. Then, in another terminal, run the following to execute the workflow: | ||
|
|
||
| poetry run python starter.py | ||
|
|
||
| The starter should complete with: | ||
|
|
||
| INFO:root:Workflow result: ['Hello, Temporal! (from asyncio)', 'Hello, Temporal! (from thread)'] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| import asyncio | ||
| import time | ||
|
|
||
| import trio | ||
| import trio_asyncio | ||
| from temporalio import activity | ||
|
|
||
|
|
||
| # An asyncio-based async activity | ||
| @activity.defn | ||
| async def say_hello_activity_async(name: str) -> str: | ||
| # Demonstrate a sleep in both asyncio and Trio, showing that both asyncio | ||
| # and Trio primitives can be used | ||
|
|
||
| # First asyncio | ||
| activity.logger.info("Sleeping in asyncio") | ||
| await asyncio.sleep(0.1) | ||
|
|
||
| # Now Trio. We have to invoke the function separately decorated. | ||
| # We cannot use the @trio_as_aio decorator on the activity itself because | ||
| # it doesn't use functools wrap or similar so it doesn't respond to things | ||
| # like __name__ that @activity.defn needs. | ||
| return await say_hello_in_trio_from_asyncio(name) | ||
|
|
||
|
|
||
| @trio_asyncio.trio_as_aio | ||
| async def say_hello_in_trio_from_asyncio(name: str) -> str: | ||
| activity.logger.info("Sleeping in Trio (from asyncio)") | ||
| await trio.sleep(0.1) | ||
| return f"Hello, {name}! (from asyncio)" | ||
|
|
||
|
|
||
| # A thread-based sync activity | ||
| @activity.defn | ||
| def say_hello_activity_sync(name: str) -> str: | ||
| # Demonstrate a sleep in both threaded and Trio, showing that both | ||
| # primitives can be used | ||
|
|
||
| # First, thread-blocking | ||
| activity.logger.info("Sleeping normally") | ||
| time.sleep(0.1) | ||
|
|
||
| # Now Trio. We have to use Trio's thread sync tools to run trio calls from | ||
| # a different thread. | ||
| return trio.from_thread.run(say_hello_in_trio_from_sync, name) | ||
|
|
||
|
|
||
| async def say_hello_in_trio_from_sync(name: str) -> str: | ||
| activity.logger.info("Sleeping in Trio (from thread)") | ||
| await trio.sleep(0.1) | ||
| return f"Hello, {name}! (from thread)" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| import logging | ||
|
|
||
| import trio_asyncio | ||
| from temporalio.client import Client | ||
|
|
||
| from trio_async import workflows | ||
|
|
||
|
|
||
| @trio_asyncio.aio_as_trio # Note this decorator which allows asyncio primitives | ||
| async def main(): | ||
| logging.basicConfig(level=logging.INFO) | ||
|
|
||
| # Connect client | ||
| client = await Client.connect("localhost:7233") | ||
|
|
||
| # Execute the workflow | ||
| result = await client.execute_workflow( | ||
| workflows.SayHelloWorkflow.run, | ||
| "Temporal", | ||
| id=f"trio-async-workflow-id", | ||
| task_queue="trio-async-task-queue", | ||
| ) | ||
| logging.info(f"Workflow result: {result}") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| # Note how we're using Trio event loop, not asyncio | ||
| trio_asyncio.run(main) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,66 @@ | ||
| import asyncio | ||
| import logging | ||
| import os | ||
| import sys | ||
|
|
||
| import trio_asyncio | ||
| from temporalio.client import Client | ||
| from temporalio.worker import Worker | ||
|
|
||
| from trio_async import activities, workflows | ||
|
|
||
|
|
||
| @trio_asyncio.aio_as_trio # Note this decorator which allows asyncio primitives | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As above, would be good to expand and clarify this. Are we saying
?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I can adjust. I do not believe it is true that it's anything from the asyncio module because I didn't go over everything in that module. Is there another way to phrase it? Or is what's there ok? |
||
| async def main(): | ||
| logging.basicConfig(level=logging.INFO) | ||
|
|
||
| # Connect client | ||
| client = await Client.connect("localhost:7233") | ||
|
|
||
| # Temporal runs threaded activities and workflow tasks via run_in_executor. | ||
| # Due to how trio_asyncio works, you can only do run_in_executor with their | ||
| # specific executor. We make sure to give it 200 max since we are using it | ||
| # for both activities and workflow tasks and by default the worker supports | ||
| # 100 max concurrent activity tasks and 100 max concurrent workflow tasks. | ||
| with trio_asyncio.TrioExecutor(max_workers=200) as thread_executor: | ||
|
|
||
| # Run a worker for the workflow | ||
| async with Worker( | ||
| client, | ||
| task_queue="trio-async-task-queue", | ||
| activities=[ | ||
| activities.say_hello_activity_async, | ||
| activities.say_hello_activity_sync, | ||
| ], | ||
| workflows=[workflows.SayHelloWorkflow], | ||
| activity_executor=thread_executor, | ||
| workflow_task_executor=thread_executor, | ||
| ): | ||
| # Wait until interrupted | ||
| logging.info("Worker started, ctrl+c to exit") | ||
| try: | ||
| await asyncio.Future() | ||
| except asyncio.CancelledError: | ||
| # Ignore, happens on ctrl+C | ||
| pass | ||
| finally: | ||
| logging.info("Shutting down") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| # Note how we're using Trio event loop, not asyncio | ||
| try: | ||
| trio_asyncio.run(main) | ||
| except KeyboardInterrupt: | ||
| # Ignore ctrl+c | ||
| pass | ||
| except BaseException as err: | ||
| # On Python 3.11+ Trio represents keyboard interrupt inside an exception | ||
| # group | ||
| is_interrupt = ( | ||
| sys.version_info >= (3, 11) | ||
| and isinstance(err, BaseExceptionGroup) | ||
| and err.subgroup(KeyboardInterrupt) | ||
| ) | ||
| if not is_interrupt: | ||
| raise | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you expand this comment slightly, are we basically saying
?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, "allows asyncio primitives" was meant to imply "needed to allow asyncio primitives", otherwise if it wasn't needed we could remove the decorator. I can rephrase if needed (though don't want to effectively re-document https://trio-asyncio.readthedocs.io/en/latest/usage.html#calling-asyncio-from-trio). I admit I didn't go over all of
asynciomodule to confirm you can use nothing without this, so not sure the statement about "anything" is correct.