Merged
10 changes: 5 additions & 5 deletions src/content/docs/workers/languages/python/examples.mdx
@@ -213,20 +213,20 @@ from workers import WorkflowEntrypoint

class MyWorkflow(WorkflowEntrypoint):
    async def run(self, event, step):
-        @step.do("dependency a")
+        @step.do()
        async def step_a():
            # do some work
            return 10

-        @step.do("dependency b")
+        @step.do()
        async def step_b():
            # do some work
            return 20

-        @step.do("my final step", depends=[step_a, step_b], concurrent=True)
-        async def my_final_step(result_a=0, result_b=0):
+        @step.do(concurrent=True)
+        async def my_final_step(step_a, step_b):
            # should return 30
-            return result_a + result_b
+            return step_a + step_b

        await my_final_step()
```
33 changes: 21 additions & 12 deletions src/content/docs/workflows/python/dag.mdx
@@ -5,10 +5,10 @@ sidebar:
order: 4
---

-The Python Workflows SDK supports DAG workflows in a declarative way, using the `step.do` decorator with the `depends` parameter to define dependencies (other steps that must complete before this step can run).
+The Python Workflows SDK supports DAG workflows in a declarative way, using `step.do` and parameter names to define dependencies (other steps that must complete before a step can run).

```python
-from workers import Response, WorkflowEntrypoint
+from workers import Response, WorkflowEntrypoint, WorkerEntrypoint

class PythonWorkflowStarter(WorkflowEntrypoint):
    async def run(self, event, step):
@@ -20,30 +20,39 @@ class PythonWorkflowStarter(WorkflowEntrypoint):

        await step.sleep('demo sleep', '10 seconds')

-        @step.do('dependency1')
+        @step.do()
        async def dep_1():
            # does stuff
+            print('executing dep1')
            return 'dep1'

-        @step.do('dependency2')
+        @step.do()
        async def dep_2():
            # does stuff
+            print('executing dep2')
            return 'dep2'

-        @step.do('demo do', depends=[dep_1, dep_2], concurrent=True)
-        async def final_step(res1, res2):
+        @step.do(concurrent=True)
+        async def final_step(dep_1, dep_2):
            # does stuff
-            print('something')
+            print(f'{dep_1} {dep_2}')

        await final_step()

-async def on_fetch(request, env):
-    await env.MY_WORKFLOW.create()
-    return Response("Hello world!")
+class Default(WorkerEntrypoint):
+    async def fetch(self, request):
+        await self.env.MY_WORKFLOW.create()
+        return Response("Hello world!")
```

-On this example, `dep_1` and `dep_2` are run concurrently before execution of `final_step`, which depends on both of them.
+In this example, `dep_1` and `dep_2` are run concurrently before execution of `final_step`, which depends on both of them.

-Having `concurrent=True` allows the dependencies to be resolved concurrently. If one of the callables passed to `depends` has already completed, it will be skipped and its return value will be reused.
+Having `concurrent=True` allows dependencies to be resolved concurrently. If a dependency has already completed, it will be skipped and its return value will be reused.

+:::note
+
+Older compatibility behavior supports explicit dependency lists with `depends=[...]`. For new workflows, use parameter names to express step dependencies.
+
+:::

This pattern is useful for diamond-shaped workflows, where a step depends on two or more other steps that can run concurrently.
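The resolution order described above can be sketched outside the Workflows runtime with plain `asyncio`. This is only an illustration of the semantics, not the Workflows SDK: the coroutine names are reused from the example, and `asyncio.gather` stands in for what `concurrent=True` does when resolving the two branches of the diamond.

```python
import asyncio

# Two independent "steps" (plain coroutines here, not real Workflow steps).
async def dep_1():
    return "dep1"

async def dep_2():
    return "dep2"

# The final step receives both results, mirroring how the runtime injects
# dependency results into matching parameter names.
async def final_step(dep_1_result, dep_2_result):
    return f"{dep_1_result} {dep_2_result}"

async def main():
    # With concurrent=True, both branches of the diamond resolve at once;
    # asyncio.gather models that ordering here.
    r1, r2 = await asyncio.gather(dep_1(), dep_2())
    return await final_step(r1, r2)

print(asyncio.run(main()))
```

The key property the sketch shows: the dependent step cannot start until both branches have produced a value, but the branches themselves do not wait on each other.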
10 changes: 5 additions & 5 deletions src/content/docs/workflows/python/index.mdx
@@ -31,7 +31,7 @@ class MyWorkflow(WorkflowEntrypoint):
For example, a Workflow may be defined as:

```python
-from workers import Response, WorkflowEntrypoint
+from workers import Response, WorkflowEntrypoint, WorkerEntrypoint

class PythonWorkflowStarter(WorkflowEntrypoint):
    async def run(self, event, step):
@@ -49,9 +49,10 @@ class PythonWorkflowStarter(WorkflowEntrypoint):
        await step_1()
        await step_2()

-async def on_fetch(request, env):
-    await env.MY_WORKFLOW.create()
-    return Response("Hello world!")
+class Default(WorkerEntrypoint):
+    async def fetch(self, request):
+        await self.env.MY_WORKFLOW.create()
+        return Response("Hello world!")
```

You must add both `python_workflows` and `python_workers` compatibility flags to your Wrangler configuration file.
@@ -64,7 +65,6 @@ You must add both `python_workflows` and `python_workers` compatibility flags to
  "main": "src/entry.py",
  "compatibility_flags": [
    "python_workers",
-    "experimental",
    "python_workflows"
  ],
  "compatibility_date": "$today",
72 changes: 49 additions & 23 deletions src/content/docs/workflows/python/python-workers-api.mdx
@@ -21,18 +21,30 @@ class MyWorkflow(WorkflowEntrypoint):

## WorkflowStep

-* <code>step.do(name, depends=[], concurrent=False, config=None)</code> is a decorator that allows you to define a step in a workflow.
-  * `name` - the name of the step.
-  * `depends` - an optional list of steps that must complete before this step can run. See [DAG Workflows](/workflows/python/dag).
-  * `concurrent` - an optional boolean that indicates whether this step can run concurrently with other steps.
-  * `config` - an optional [`WorkflowStepConfig`](/workflows/build/workers-api/#workflowstepconfig) for configuring [step specific retry behaviour](/workflows/build/sleeping-and-retrying/). This is passed as a Python dictionary and then type translated into a `WorkflowStepConfig` object.
+{/* prettier-ignore */}
+- <code>step.do(name=None, *, concurrent=False, config=None)</code> — a decorator that allows you to define a step in a workflow.
+  - `name` — an optional name for the step. If omitted, the function name (`func.__name__`) is used.
+  - `concurrent` — an optional boolean that indicates whether dependencies for this step can run concurrently.
+  - `config` — an optional [`WorkflowStepConfig`](/workflows/build/workers-api/#workflowstepconfig) for configuring [step specific retry behaviour](/workflows/build/sleeping-and-retrying/). This is passed as a Python dictionary and then type translated into a `WorkflowStepConfig` object.
+
+All parameters except `name` are keyword-only.
+
+Dependencies are resolved implicitly by parameter name. If a step function parameter name matches a previously declared step function, its result is injected into the step.
+
+If you define a `ctx` parameter, the step context is injected into that argument.
+
+:::note
+
+Older compatibility behavior supports explicit dependency lists with `depends=[...]`. For new workflows, prefer implicit dependency resolution by parameter name.
+
+:::

```python
from workers import WorkflowEntrypoint

class MyWorkflow(WorkflowEntrypoint):
    async def run(self, event, step):
-        @step.do("my first step")
+        @step.do()
        async def my_first_step():
            # do some work
            return "Hello World!"
@@ -45,20 +57,20 @@ Note that the decorator doesn't make the call to the step, it just returns a callable
When returning state from a step, you must make sure that the returned value is serializable. Since steps run through an FFI layer, the returned value gets type translated via [FFI.](https://pyodide.org/en/stable/usage/api/python-api/ffi.html#pyodide.ffi.to_js)
Refer to [Pyodide's documentation](https://pyodide.org/en/stable/usage/type-conversions.html#type-translations-pyproxy-to-js) regarding type conversions for more information.
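As a rough sanity check (this is an assumption on my part, not the documented FFI contract), JSON-serializability is a reasonable proxy for what survives the translation: built-in dicts, lists, strings, numbers, and booleans convert cleanly, while arbitrary Python objects do not.

```python
import json

# Built-in types round-trip cleanly through serialization.
good_result = {"count": 10, "items": ["a", "b"], "done": True}
assert json.loads(json.dumps(good_result)) == good_result

class Opaque:
    """Stand-in for a non-serializable object a step should not return."""

# An arbitrary object is not serializable; returning something like this
# from a step would fail the type translation.
try:
    json.dumps({"handle": Opaque()})
except TypeError:
    print("not serializable")
```

If a step needs to hand complex state to a later step, convert it to plain built-in types before returning it.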

-* <code>step.sleep(name, duration)</code>
-
-  * `name` - the name of the step.
-  * `duration` - the duration to sleep until, in either seconds or as a `WorkflowDuration` compatible string.
+{/* prettier-ignore */}
+- <code>step.sleep(name, duration)</code>
+  - `name` - the name of the step.
+  - `duration` - the duration to sleep until, in either seconds or as a `WorkflowDuration` compatible string.

```python
async def run(self, event, step):
    await step.sleep("my-sleep-step", "10 seconds")
```

-* <code>step.sleep_until(name, timestamp)</code>
-
-  * `name` - the name of the step.
-  * `timestamp` - a `datetime.datetime` object or seconds from the Unix epoch to sleep the workflow instance until.
+{/* prettier-ignore */}
+- <code>step.sleep_until(name, timestamp)</code>
+  - `name` - the name of the step.
+  - `timestamp` - a `datetime.datetime` object or seconds from the Unix epoch to sleep the workflow instance until.

```python
import datetime
@@ -67,11 +79,11 @@ async def run(self, event, step):
    await step.sleep_until("my-sleep-step", datetime.datetime.now() + datetime.timedelta(seconds=10))
```

-* <code>step.wait_for_event(name, event_type, timeout="24 hours")</code>
-
-  * `name` - the name of the step.
-  * `event_type` - the type of event to wait for.
-  * `timeout` - the timeout for the `wait_for_event` call. The default timeout is 24 hours.
+{/* prettier-ignore */}
+- <code>step.wait_for_event(name, event_type, timeout="24 hours")</code>
+  - `name` - the name of the step.
+  - `event_type` - the type of event to wait for.
+  - `timeout` - the timeout for the `wait_for_event` call. The default timeout is 24 hours.

```python
async def run(self, event, step):
@@ -82,10 +94,10 @@ async def run(self, event, step):

The `event` parameter is a dictionary that contains the payload passed to the workflow instance, along with other metadata:

-* <code>payload</code> - the payload passed to the workflow instance.
-* <code>timestamp</code> - the timestamp that the workflow was triggered.
-* <code>instanceId</code> - the ID of the current workflow instance.
-* <code>workflowName</code> - the name of the workflow.
+- <code>payload</code> - the payload passed to the workflow instance.
+- <code>timestamp</code> - the timestamp that the workflow was triggered.
+- <code>instanceId</code> - the ID of the current workflow instance.
+- <code>workflowName</code> - the name of the workflow.
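For illustration, here is a stand-in dictionary with those fields. Only the field names come from the list above; the values are made up, and in the runtime `event` is a mapping exposed over FFI rather than a plain dict.

```python
# Hypothetical event mirroring the fields listed above (values invented).
event = {
    "payload": {"user_id": 42},           # data passed when creating the instance
    "timestamp": "2025-01-01T00:00:00Z",  # when the workflow was triggered
    "instanceId": "instance-abc",         # ID of the current instance
    "workflowName": "my-workflow",        # name of the workflow
}

# Inside run(self, event, step) you would read the payload like this:
user_id = event["payload"]["user_id"]
print(user_id)
```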

## Error Handling

@@ -141,6 +153,20 @@ class DemoWorkflowClass(WorkflowEntrypoint):
pass
```

+### Access step context (`ctx`)
+
+```python
+from workers import WorkflowEntrypoint
+
+class CtxWorkflow(WorkflowEntrypoint):
+    async def run(self, event, step):
+        @step.do()
+        async def read_context(ctx):
+            return ctx["attempt"]
+
+        return await read_context()
+```

### Create an instance via binding

Note that `env` is a JavaScript object exposed to the Python script via [JsProxy](https://pyodide.org/en/stable/usage/api/python-api/ffi.html#pyodide.ffi.JsProxy). You can access the binding like you would on a JavaScript worker. Refer to the [Workflow binding documentation](/workflows/build/workers-api/#workflow) to learn more about the methods available.