diff --git a/src/content/docs/workers/languages/python/examples.mdx b/src/content/docs/workers/languages/python/examples.mdx index 47b8249680832ab..382c1b53802e505 100644 --- a/src/content/docs/workers/languages/python/examples.mdx +++ b/src/content/docs/workers/languages/python/examples.mdx @@ -213,20 +213,20 @@ from workers import WorkflowEntrypoint class MyWorkflow(WorkflowEntrypoint): async def run(self, event, step): - @step.do("dependency a") + @step.do() async def step_a(): # do some work return 10 - @step.do("dependency b") + @step.do() async def step_b(): # do some work return 20 - @step.do("my final step", depends=[step_a, step_b], concurrent=True) - async def my_final_step(result_a=0, result_b=0): + @step.do(concurrent=True) + async def my_final_step(step_a, step_b): # should return 30 - return result_a + result_b + return step_a + step_b await my_final_step() ``` diff --git a/src/content/docs/workflows/python/dag.mdx b/src/content/docs/workflows/python/dag.mdx index 8bdba984a0e6cf3..e5f081863718129 100644 --- a/src/content/docs/workflows/python/dag.mdx +++ b/src/content/docs/workflows/python/dag.mdx @@ -5,10 +5,10 @@ sidebar: order: 4 --- -The Python Workflows SDK supports DAG workflows in a declarative way, using the `step.do` decorator with the `depends` parameter to define dependencies (other steps that must complete before this step can run). +The Python Workflows SDK supports DAG workflows in a declarative way, using `step.do` and parameter names to define dependencies (other steps that must complete before a step can run). ```python -from workers import Response, WorkflowEntrypoint +from workers import Response, WorkflowEntrypoint, WorkerEntrypoint class PythonWorkflowStarter(WorkflowEntrypoint): async def run(self, event, step): @@ -20,30 +20,39 @@ class PythonWorkflowStarter(WorkflowEntrypoint): await step.sleep('demo sleep', '10 seconds') - @step.do('dependency1') + @step.do() async def dep_1(): # does stuff print('executing dep1') + return 'dep1' - @step.do('dependency2') + @step.do() async def dep_2(): # does stuff print('executing dep2') + return 'dep2' - @step.do('demo do', depends=[dep_1, dep_2], concurrent=True) - async def final_step(res1, res2): + @step.do(concurrent=True) + async def final_step(dep_1, dep_2): # does stuff - print('something') + print(f'{dep_1} {dep_2}') await await_step(final_step) -async def on_fetch(request, env): - await env.MY_WORKFLOW.create() - return Response("Hello world!") +class Default(WorkerEntrypoint): + async def fetch(self, request): + await self.env.MY_WORKFLOW.create() + return Response("Hello world!") ``` -On this example, `dep_1` and `dep_2` are run concurrently before execution of `final_step`, which depends on both of them. +In this example, `dep_1` and `dep_2` are run concurrently before execution of `final_step`, which depends on both of them. -Having `concurrent=True` allows the dependencies to be resolved concurrently. If one of the callables passed to `depends` has already completed, it will be skipped and its return value will be reused. +Having `concurrent=True` allows dependencies to be resolved concurrently. If a dependency has already completed, it will be skipped and its return value will be reused. + +:::note + +Older compatibility behavior supports explicit dependency lists with `depends=[...]`. For new workflows, use parameter names to express step dependencies. + +::: This pattern is useful for diamond shaped workflows, where a step depends on two or more other steps that can run concurrently. diff --git a/src/content/docs/workflows/python/index.mdx b/src/content/docs/workflows/python/index.mdx index 023ddaad0db9721..0b9f09fd8f64bae 100644 --- a/src/content/docs/workflows/python/index.mdx +++ b/src/content/docs/workflows/python/index.mdx @@ -31,7 +31,7 @@ class MyWorkflow(WorkflowEntrypoint): For example, a Workflow may be defined as: ```python -from workers import Response, WorkflowEntrypoint +from workers import Response, WorkflowEntrypoint, WorkerEntrypoint class PythonWorkflowStarter(WorkflowEntrypoint): async def run(self, event, step): @@ -49,9 +49,10 @@ class PythonWorkflowStarter(WorkflowEntrypoint): await step_1() await step_2() -async def on_fetch(request, env): - await env.MY_WORKFLOW.create() - return Response("Hello world!") +class Default(WorkerEntrypoint): + async def fetch(self, request): + await self.env.MY_WORKFLOW.create() + return Response("Hello world!") ``` You must add both `python_workflows` and `python_workers` compatibility flags to your Wrangler configuration file. @@ -64,7 +65,6 @@ You must add both `python_workflows` and `python_workers` compatibility flags to "main": "src/entry.py", "compatibility_flags": [ "python_workers", - "experimental", "python_workflows" ], "compatibility_date": "$today", diff --git a/src/content/docs/workflows/python/python-workers-api.mdx b/src/content/docs/workflows/python/python-workers-api.mdx index 3573cb02df6b591..aded391f4657b58 100644 --- a/src/content/docs/workflows/python/python-workers-api.mdx +++ b/src/content/docs/workflows/python/python-workers-api.mdx @@ -21,18 +21,30 @@ class MyWorkflow(WorkflowEntrypoint): ## WorkflowStep -* step.do(name, depends=[], concurrent=False, config=None) is a decorator that allows you to define a step in a workflow. - * `name` - the name of the step. - * `depends` - an optional list of steps that must complete before this step can run. See [DAG Workflows](/workflows/python/dag). - * `concurrent` - an optional boolean that indicates whether this step can run concurrently with other steps. - * `config` - an optional [`WorkflowStepConfig`](/workflows/build/workers-api/#workflowstepconfig) for configuring [step specific retry behaviour](/workflows/build/sleeping-and-retrying/). This is passed as a Python dictionary and then type translated into a `WorkflowStepConfig` object. +{/* prettier-ignore */} +- step.do(name=None, *, concurrent=False, config=None) — a decorator that allows you to define a step in a workflow. + - `name` — an optional name for the step. If omitted, the function name (`func.__name__`) is used. + - `concurrent` — an optional boolean that indicates whether dependencies for this step can run concurrently. + - `config` — an optional [`WorkflowStepConfig`](/workflows/build/workers-api/#workflowstepconfig) for configuring [step specific retry behaviour](/workflows/build/sleeping-and-retrying/). This is passed as a Python dictionary and then type translated into a `WorkflowStepConfig` object. + + All parameters except `name` are keyword-only. + +Dependencies are resolved implicitly by parameter name. If a step function parameter name matches a previously declared step function, its result is injected into the step. + +If you define a `ctx` parameter, the step context is injected into that argument. + +:::note + +Older compatibility behavior supports explicit dependency lists with `depends=[...]`. For new workflows, prefer implicit dependency resolution by parameter name. + +::: ```python from workers import WorkflowEntrypoint class MyWorkflow(WorkflowEntrypoint): async def run(self, event, step): - @step.do("my first step") + @step.do() async def my_first_step(): # do some work return "Hello World!" @@ -45,20 +57,20 @@ Note that the decorator doesn't make the call to the step, it just returns a cal When returning state from a step, you must make sure that the returned value is serializable. Since steps run through an FFI layer, the returned value gets type translated via [FFI.](https://pyodide.org/en/stable/usage/api/python-api/ffi.html#pyodide.ffi.to_js) Refer to [Pyodide's documentation](https://pyodide.org/en/stable/usage/type-conversions.html#type-translations-pyproxy-to-js) regarding type conversions for more information. -* step.sleep(name, duration) - - * `name` - the name of the step. - * `duration` - the duration to sleep until, in either seconds or as a `WorkflowDuration` compatible string. +{/* prettier-ignore */} +- step.sleep(name, duration) + - `name` — the name of the step. + - `duration` — the duration to sleep until, in either seconds or as a `WorkflowDuration` compatible string. ```python async def run(self, event, step): await step.sleep("my-sleep-step", "10 seconds") ``` -* step.sleep_until(name, timestamp) - - * `name` - the name of the step. - * `timestamp` - a `datetime.datetime` object or seconds from the Unix epoch to sleep the workflow instance until. +{/* prettier-ignore */} +- step.sleep_until(name, timestamp) + - `name` — the name of the step. + - `timestamp` — a `datetime.datetime` object or seconds from the Unix epoch to sleep the workflow instance until. ```python import datetime @@ -67,11 +79,11 @@ async def run(self, event, step): await step.sleep_until("my-sleep-step", datetime.datetime.now() + datetime.timedelta(seconds=10)) ``` -* step.wait_for_event(name, event_type, timeout="24 hours") - - * `name` - the name of the step. - * `event_type` - the type of event to wait for. - * `timeout` - the timeout for the `wait_for_event` call. The default timeout is 24 hours. +{/* prettier-ignore */} +- step.wait_for_event(name, event_type, timeout="24 hours") + - `name` — the name of the step. + - `event_type` — the type of event to wait for. + - `timeout` — the timeout for the `wait_for_event` call. The default timeout is 24 hours. ```python async def run(self, event, step): @@ -82,10 +94,10 @@ async def run(self, event, step): The `event` parameter is a dictionary that contains the payload passed to the workflow instance, along with other metadata: -* payload - the payload passed to the workflow instance. -* timestamp - the timestamp that the workflow was triggered. -* instanceId - the ID of the current workflow instance. -* workflowName - the name of the workflow. +- payload - the payload passed to the workflow instance. +- timestamp - the timestamp that the workflow was triggered. +- instanceId - the ID of the current workflow instance. +- workflowName - the name of the workflow. ## Error Handling @@ -141,6 +153,20 @@ class DemoWorkflowClass(WorkflowEntrypoint): pass ``` +### Access step context (`ctx`) + +```python +from workers import WorkflowEntrypoint + +class CtxWorkflow(WorkflowEntrypoint): + async def run(self, event, step): + @step.do() + async def read_context(ctx): + return ctx["attempt"] + + return await read_context() +``` + ### Create an instance via binding Note that `env` is a JavaScript object exposed to the Python script via [JsProxy](https://pyodide.org/en/stable/usage/api/python-api/ffi.html#pyodide.ffi.JsProxy). You can access the binding like you would on a JavaScript worker. Refer to the [Workflow binding documentation](/workflows/build/workers-api/#workflow) to learn more about the methods available.