diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8bcb6c9e..a784ac8b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -12,20 +12,13 @@ jobs: strategy: fail-fast: true matrix: - python: ["3.8", "3.12"] + python: ["3.9", "3.12"] os: [ubuntu-latest, macos-intel, macos-arm, windows-latest] include: - os: macos-intel runsOn: macos-13 - os: macos-arm runsOn: macos-14 - # macOS ARM 3.8 does not have an available Python build at - # https://raw.githubusercontent.com/actions/python-versions/main/versions-manifest.json. - # See https://github.com/actions/setup-python/issues/808 and - # https://github.com/actions/python-versions/pull/259. - exclude: - - os: macos-arm - python: "3.8" runs-on: ${{ matrix.runsOn || matrix.os }} steps: - name: Print build information @@ -39,7 +32,7 @@ jobs: # Using fixed Poetry version until # https://github.com/python-poetry/poetry/pull/7694 is fixed - run: python -m pip install --upgrade wheel "poetry==1.4.0" poethepoet - - run: poetry install --with pydantic --with dsl --with encryption + - run: poetry install --with pydantic --with dsl --with encryption --with trio_async - run: poe lint - run: mkdir junit-xml - run: poe test -s -o log_cli_level=DEBUG --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}.xml diff --git a/README.md b/README.md index 15ed5939..03f2a6cf 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ This is the set of Python samples for the [Python SDK](https://github.com/tempor Prerequisites: -* Python >= 3.8 +* Python >= 3.9 * [Poetry](https://python-poetry.org) * [Temporal CLI installed](https://docs.temporal.io/cli#install) * [Local Temporal server running](https://docs.temporal.io/cli/server#start-dev) @@ -72,6 +72,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models. * [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule. * [sentry](sentry) - Report errors to Sentry. +* [trio_async](trio_async) - Use asyncio Temporal in Trio-based environments. * [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers. * [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code. diff --git a/poetry.lock b/poetry.lock index 03ce9f96..9a9da81b 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1806,6 +1806,20 @@ files = [ {file = "orjson-3.10.7.tar.gz", hash = "sha256:75ef0640403f945f3a1f9f6400686560dbfb0fb5b16589ad62cd477043c4eee3"}, ] +[[package]] +name = "outcome" +version = "1.3.0.post0" +description = "Capture the outcome of Python function calls." +optional = false +python-versions = ">=3.7" +files = [ + {file = "outcome-1.3.0.post0-py2.py3-none-any.whl", hash = "sha256:e771c5ce06d1415e356078d3bdd68523f284b4ce5419828922b6871e65eda82b"}, + {file = "outcome-1.3.0.post0.tar.gz", hash = "sha256:9dcf02e65f2971b80047b377468e72a268e15c0af3cf1238e6ff14f7f91143b8"}, +] + +[package.dependencies] +attrs = ">=19.2.0" + [[package]] name = "packaging" version = "23.2" @@ -2575,6 +2589,17 @@ files = [ {file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"}, ] +[[package]] +name = "sortedcontainers" +version = "2.4.0" +description = "Sorted Containers -- Sorted List, Sorted Dict, Sorted Set" +optional = false +python-versions = "*" +files = [ + {file = "sortedcontainers-2.4.0-py2.py3-none-any.whl", hash = "sha256:a163dcaede0f1c021485e957a39245190e74249897e2ae4b2aa38595db237ee0"}, + {file = "sortedcontainers-2.4.0.tar.gz", hash = "sha256:25caa5a06cc30b6b83d11423433f65d1f9d76c4c6a0c90e3379eaa43b9bfdb88"}, +] + [[package]] name = "sqlalchemy" version = "2.0.35" @@ -2805,6 +2830,44 @@ notebook = ["ipywidgets (>=6)"] slack = ["slack-sdk"] telegram = ["requests"] +[[package]] +name = "trio" +version = "0.28.0" +description = "A friendly Python library for async concurrency and I/O" +optional = false +python-versions = ">=3.9" +files = [ + {file = "trio-0.28.0-py3-none-any.whl", hash = "sha256:56d58977acc1635735a96581ec70513cc781b8b6decd299c487d3be2a721cd94"}, + {file = "trio-0.28.0.tar.gz", hash = "sha256:4e547896fe9e8a5658e54e4c7c5fa1db748cbbbaa7c965e7d40505b928c73c05"}, +] + +[package.dependencies] +attrs = ">=23.2.0" +cffi = {version = ">=1.14", markers = "os_name == \"nt\" and implementation_name != \"pypy\""} +exceptiongroup = {version = "*", markers = "python_version < \"3.11\""} +idna = "*" +outcome = "*" +sniffio = ">=1.3.0" +sortedcontainers = "*" + +[[package]] +name = "trio-asyncio" +version = "0.15.0" +description = "A re-implementation of the asyncio mainloop on top of Trio" +optional = false +python-versions = ">=3.8" +files = [ + {file = "trio_asyncio-0.15.0-py3-none-any.whl", hash = "sha256:7dad5a5edcc7c90c5b80b777dcaef11c22668ce7ddc374633068c2b35d683d62"}, + {file = "trio_asyncio-0.15.0.tar.gz", hash = "sha256:061e31a71fb039d5074f064ec868dc0e6759e6cca33bf3080733a20ee9667781"}, +] + +[package.dependencies] +exceptiongroup = {version = ">=1.0.0", markers = "python_version < \"3.11\""} +greenlet = "*" +outcome = "*" +sniffio = ">=1.3.0" +trio = ">=0.22.0" + [[package]] name = "types-protobuf" version = "5.28.0.20240924" @@ -3435,5 +3498,5 @@ testing = ["coverage[toml]", "zope.event", "zope.testing"] [metadata] lock-version = "2.0" -python-versions = "^3.8" -content-hash = "18ee8c512339f34e5cd8361b1057df06e6d45eedd70d6ff70b893f0229f4bb78" +python-versions = "^3.9" +content-hash = "3bcbba92814feab9cc46897967ef51d408271019eb12d3c940f2a0d91a690428" diff --git a/pyproject.toml b/pyproject.toml index 3b51617c..c7343507 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ packages = [ "Bug Tracker" = "https://github.com/temporalio/samples-python/issues" [tool.poetry.dependencies] -python = "^3.8" +python = "^3.9" temporalio = "^1.9.0" [tool.poetry.dev-dependencies] @@ -71,6 +71,10 @@ dependencies = { pydantic = "^1.10.4" } optional = true dependencies = { sentry-sdk = "^1.11.0" } +[tool.poetry.group.trio_async] +optional = true +dependencies = { trio = "^0.28.0", trio-asyncio = "^0.15.0" } + [tool.poe.tasks] format = [{cmd = "black ."}, {cmd = "isort ."}] lint = [{cmd = "black --check ."}, {cmd = "isort --check-only ."}, {ref = "lint-types" }] diff --git a/tests/trio_async/__init__.py b/tests/trio_async/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/trio_async/workflow_test.py b/tests/trio_async/workflow_test.py new file mode 100644 index 00000000..e6981bee --- /dev/null +++ b/tests/trio_async/workflow_test.py @@ -0,0 +1,40 @@ +import uuid + +import trio_asyncio +from temporalio.client import Client +from temporalio.worker import Worker + +from trio_async import activities, workflows + + +async def test_workflow_with_trio(client: Client): + @trio_asyncio.aio_as_trio + async def inside_trio(client: Client) -> list[str]: + # Create Trio thread executor + with trio_asyncio.TrioExecutor(max_workers=200) as thread_executor: + task_queue = f"tq-{uuid.uuid4()}" + # Run worker + async with Worker( + client, + task_queue=task_queue, + activities=[ + activities.say_hello_activity_async, + activities.say_hello_activity_sync, + ], + workflows=[workflows.SayHelloWorkflow], + activity_executor=thread_executor, + workflow_task_executor=thread_executor, + ): + # Run workflow and return result + return await client.execute_workflow( + workflows.SayHelloWorkflow.run, + "some-user", + id=f"wf-{uuid.uuid4()}", + task_queue=task_queue, + ) + + result = trio_asyncio.run(inside_trio, client) + assert result == [ + "Hello, some-user! (from asyncio)", + "Hello, some-user! (from thread)", + ] diff --git a/trio_async/README.md b/trio_async/README.md new file mode 100644 index 00000000..01bc2870 --- /dev/null +++ b/trio_async/README.md @@ -0,0 +1,23 @@ +# Trio Async Sample + +This sample shows how to use Temporal asyncio with [Trio](https://trio.readthedocs.io) using +[Trio asyncio](https://trio-asyncio.readthedocs.io). Specifically it demonstrates using a traditional Temporal client +and worker in a Trio setting, and how Trio-based code can run in both asyncio async activities and threaded sync +activities. + +For this sample, the optional `trio_async` dependency group must be included. To include, run: + + poetry install --with trio_async + +To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the +worker: + + poetry run python worker.py + +This will start the worker. Then, in another terminal, run the following to execute the workflow: + + poetry run python starter.py + +The starter should complete with: + + INFO:root:Workflow result: ['Hello, Temporal! (from asyncio)', 'Hello, Temporal! (from thread)'] \ No newline at end of file diff --git a/trio_async/__init__.py b/trio_async/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/trio_async/activities.py b/trio_async/activities.py new file mode 100644 index 00000000..c253c93e --- /dev/null +++ b/trio_async/activities.py @@ -0,0 +1,51 @@ +import asyncio +import time + +import trio +import trio_asyncio +from temporalio import activity + + +# An asyncio-based async activity +@activity.defn +async def say_hello_activity_async(name: str) -> str: + # Demonstrate a sleep in both asyncio and Trio, showing that both asyncio + # and Trio primitives can be used + + # First asyncio + activity.logger.info("Sleeping in asyncio") + await asyncio.sleep(0.1) + + # Now Trio. We have to invoke the function separately decorated. + # We cannot use the @trio_as_aio decorator on the activity itself because + # it doesn't use functools wrap or similar so it doesn't respond to things + # like __name__ that @activity.defn needs. + return await say_hello_in_trio_from_asyncio(name) + + +@trio_asyncio.trio_as_aio +async def say_hello_in_trio_from_asyncio(name: str) -> str: + activity.logger.info("Sleeping in Trio (from asyncio)") + await trio.sleep(0.1) + return f"Hello, {name}! (from asyncio)" + + +# A thread-based sync activity +@activity.defn +def say_hello_activity_sync(name: str) -> str: + # Demonstrate a sleep in both threaded and Trio, showing that both + # primitives can be used + + # First, thread-blocking + activity.logger.info("Sleeping normally") + time.sleep(0.1) + + # Now Trio. We have to use Trio's thread sync tools to run trio calls from + # a different thread. + return trio.from_thread.run(say_hello_in_trio_from_sync, name) + + +async def say_hello_in_trio_from_sync(name: str) -> str: + activity.logger.info("Sleeping in Trio (from thread)") + await trio.sleep(0.1) + return f"Hello, {name}! (from thread)" diff --git a/trio_async/starter.py b/trio_async/starter.py new file mode 100644 index 00000000..67f7568b --- /dev/null +++ b/trio_async/starter.py @@ -0,0 +1,28 @@ +import logging + +import trio_asyncio +from temporalio.client import Client + +from trio_async import workflows + + +@trio_asyncio.aio_as_trio # Note this decorator which allows asyncio primitives +async def main(): + logging.basicConfig(level=logging.INFO) + + # Connect client + client = await Client.connect("localhost:7233") + + # Execute the workflow + result = await client.execute_workflow( + workflows.SayHelloWorkflow.run, + "Temporal", + id=f"trio-async-workflow-id", + task_queue="trio-async-task-queue", + ) + logging.info(f"Workflow result: {result}") + + +if __name__ == "__main__": + # Note how we're using Trio event loop, not asyncio + trio_asyncio.run(main) diff --git a/trio_async/worker.py b/trio_async/worker.py new file mode 100644 index 00000000..29f059b4 --- /dev/null +++ b/trio_async/worker.py @@ -0,0 +1,66 @@ +import asyncio +import logging +import os +import sys + +import trio_asyncio +from temporalio.client import Client +from temporalio.worker import Worker + +from trio_async import activities, workflows + + +@trio_asyncio.aio_as_trio # Note this decorator which allows asyncio primitives +async def main(): + logging.basicConfig(level=logging.INFO) + + # Connect client + client = await Client.connect("localhost:7233") + + # Temporal runs threaded activities and workflow tasks via run_in_executor. + # Due to how trio_asyncio works, you can only do run_in_executor with their + # specific executor. We make sure to give it 200 max since we are using it + # for both activities and workflow tasks and by default the worker supports + # 100 max concurrent activity tasks and 100 max concurrent workflow tasks. + with trio_asyncio.TrioExecutor(max_workers=200) as thread_executor: + + # Run a worker for the workflow + async with Worker( + client, + task_queue="trio-async-task-queue", + activities=[ + activities.say_hello_activity_async, + activities.say_hello_activity_sync, + ], + workflows=[workflows.SayHelloWorkflow], + activity_executor=thread_executor, + workflow_task_executor=thread_executor, + ): + # Wait until interrupted + logging.info("Worker started, ctrl+c to exit") + try: + await asyncio.Future() + except asyncio.CancelledError: + # Ignore, happens on ctrl+C + pass + finally: + logging.info("Shutting down") + + +if __name__ == "__main__": + # Note how we're using Trio event loop, not asyncio + try: + trio_asyncio.run(main) + except KeyboardInterrupt: + # Ignore ctrl+c + pass + except BaseException as err: + # On Python 3.11+ Trio represents keyboard interrupt inside an exception + # group + is_interrupt = ( + sys.version_info >= (3, 11) + and isinstance(err, BaseExceptionGroup) + and err.subgroup(KeyboardInterrupt) + ) + if not is_interrupt: + raise diff --git a/trio_async/workflows.py b/trio_async/workflows.py new file mode 100644 index 00000000..adcb4761 --- /dev/null +++ b/trio_async/workflows.py @@ -0,0 +1,30 @@ +from datetime import timedelta + +from temporalio import workflow + +with workflow.unsafe.imports_passed_through(): + from trio_async.activities import say_hello_activity_async, say_hello_activity_sync + + +@workflow.defn +class SayHelloWorkflow: + @workflow.run + async def run(self, name: str) -> list[str]: + # Workflows don't use default asyncio event loop or Trio, they use a + # custom event loop. Therefore Trio primitives should never be used in a + # workflow, only asyncio helpers (which delegate to the custom loop). + return [ + # That these are two different activities for async or sync means + # nothing to the workflow, we just have both to demonstrate the + # activity side + await workflow.execute_activity( + say_hello_activity_async, + name, + start_to_close_timeout=timedelta(minutes=5), + ), + await workflow.execute_activity( + say_hello_activity_sync, + name, + start_to_close_timeout=timedelta(minutes=5), + ), + ]