Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,328 changes: 2,328 additions & 0 deletions cross-sdk-design.md

Large diffs are not rendered by default.

185 changes: 185 additions & 0 deletions standalone-activity-todo.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
# Standalone Activities Implementation - Remaining Work

This document tracks the remaining work for the Python SDK implementation of Standalone Activities.

Reference: See `cross-sdk-design.md` for the full cross-SDK design specification.

---

## Status Overview

| Category | Status |
|----------|--------|
| Core client methods (`start_activity`, `execute_activity`, `list_activities`, `count_activities`, `get_activity_handle`) | ✅ Complete |
| `ActivityHandle` with `result()`, `describe()`, `cancel()`, `terminate()` | ✅ Complete |
| Type-safe overloads (matching workflow activities) | ✅ Complete |
| `ActivityExecution` and `ActivityExecutionDescription` dataclasses | ⚠️ Missing fields |
| Interceptor support | ✅ Complete |
| `activity.Info` changes | ✅ Complete |
| `ActivitySerializationContext` changes | ✅ Complete |
| Basic tests | ✅ Complete |
| Type checking tests | ✅ Complete |

---

## Must Complete (Missing from Spec)

### 1. `get_activity_handle()` Implementation
- **Status:** ✅ Complete
- **Location:** `temporalio/client.py`
- **Description:** Returns a handle to an existing standalone activity by ID, allowing callers to get results, describe, cancel, or terminate an activity they didn't start.
- **Test:** `tests/test_activity.py::test_get_activity_handle`

### 2. Missing Field: `ActivityExecution.state_transition_count`
- **Status:** ❌ Not implemented
- **Location:** `temporalio/client.py` - `ActivityExecution` dataclass
- **Spec:** `state_transition_count: Optional[int]` - not always present on List operation, see proto docs
- **Effort:** Low

### 3. Missing Field: `ActivityExecutionDescription.eager_execution_requested`
- **Status:** ❌ Not implemented
- **Location:** `temporalio/client.py` - `ActivityExecutionDescription` dataclass
- **Spec:** `eager_execution_requested: bool`
- **Effort:** Low

### 4. Missing Field: `ActivityExecutionDescription.paused`
- **Status:** ❌ Not implemented
- **Location:** `temporalio/client.py` - `ActivityExecutionDescription` dataclass
- **Spec:** `paused: bool`
- **Effort:** Low

### 5. Type Fix: `ActivityExecutionCountAggregationGroup.group_values`
- **Status:** ❌ Incorrect type
- **Location:** `temporalio/client.py` - `ActivityExecutionCountAggregationGroup` dataclass
- **Current:** `Sequence[Any]`
- **Spec:** `Sequence[temporalio.common.SearchAttributeValue]`
- **Effort:** Low

---

## Intentionally Deferred (Significant Refactoring Required)

### 1. `GetActivityResultInput` and `get_activity_result` Interceptor Method
- **Status:** 🔄 Deferred
- **Description:** The current implementation caches the result directly in `ActivityHandle._known_outcome` and doesn't expose a separate interceptor point for getting results. Adding this would require refactoring the result caching logic.
- **Decision:** Intentional - not blocking release

### 2. `ActivityExecutionDescription` Does Not Extend `ActivityExecution`
- **Status:** 🔄 Deferred
- **Description:** Python frozen dataclasses don't support inheritance well. The two classes are separate with duplicated fields.
- **Decision:** Stylistic difference, doesn't affect functionality

### 3. Typed Overload Methods
- **Status:** 🔄 Deferred
- **Description:** The following methods are not implemented:
- `start_activity_class`
- `start_activity_method`
- `execute_activity_class`
- `execute_activity_method`
- **Decision:** Optional per spec - provides better type inference for class-based and method-based activity definitions

---

## Spec Documentation Needed

These items are implemented but not documented in the spec. The spec should be updated to include them.

### 1. `PendingActivityState` Enum
- **Location:** `temporalio/common.py`
- **Description:** Added to support `ActivityExecutionDescription.run_state`
- **Action:** Add to Python section of `cross-sdk-design.md`

### 2. `ActivityFailedError` Exception Class
- **Location:** `temporalio/client.py`
- **Description:** New error class for standalone activity failures, distinct from workflow `ActivityError` which has required history event fields
- **Action:** Add to Python section of `cross-sdk-design.md`

### 3. `ActivityExecutionDescription.input` Field
- **Location:** `temporalio/client.py`
- **Description:** Extra field providing deserialized activity input. Useful for debugging.
- **Action:** Decide if spec should include this or if it's Python-specific

---

## Minor Type Differences to Review

| Field | Spec | Implementation | Notes |
|-------|------|----------------|-------|
| `ActivityHandle.activity_run_id` | `Optional[str]` | ✅ `str \| None` | Fixed - now matches spec |
| `ActivityExecutionDescription.retry_policy` | `Optional` | Not optional | Should verify proto field optionality |

---

## Test Coverage

### Existing Tests (`tests/test_activity.py`)
- ✅ `test_describe` - Describe a running activity
- ✅ `test_get_result` - Get result after activity completes
- ✅ `test_get_activity_handle` - Get handle by ID, with/without run_id and result_type
- ✅ `test_manual_completion` - Complete activity manually via async handle
- ✅ `test_manual_cancellation` - Cancel activity then report cancellation via async handle
- ✅ `test_manual_failure` - Fail activity manually via async handle
- ✅ `test_manual_heartbeat` - Heartbeat from async handle

### Additional Tests Needed

#### Functional Tests
- [ ] Test `list_activities()` with various queries
- [ ] Test `count_activities()` with various queries
- [ ] Test activity ID reuse policies
- [ ] Test activity ID conflict policies
- [ ] Test search attributes on activities
- [ ] Test priority on activities
- [ ] Test retry policy behavior
- [ ] Test cancellation flow (worker-side)
- [ ] Test termination flow

#### Overload/API Variation Tests
Tests for different ways to call `start_activity`/`execute_activity`:
- ✅ Activity by callable (typed): `client.start_activity(my_activity, args=[arg1, arg2], ...)`
- ✅ Activity by name (string): `client.start_activity("my_activity", args=[arg1], result_type=MyResult, ...)`
- ✅ Single arg as positional: `client.start_activity(my_activity, arg1, ...)`
- ✅ Async activity function (`async def my_activity`)
- ✅ Sync activity function (`def my_activity`)
- ✅ With explicit `result_type` parameter
- ✅ Without `result_type` (inferred from callable)

#### Type Checking Tests (using `tests/test_type_errors.py` machinery)

✅ **DONE** - Created `tests/test_activity_type_errors.py`

**Working type checks:**
- Infers `ActivityHandle[ReturnType]` from typed async/sync activity callables
- Catches wrong type assignments for `handle.result()` and `execute_activity()` results
- Catches missing required parameters (`id`, `task_queue`)
- Catches `ActivityHandle` type parameter mismatches
- Catches wrong argument types with type-safe single-param overloads

**Overloads implemented** (matching workflow activity overloads):
1. `CallableAsyncNoParam[ReturnType]` - async, no params
2. `CallableSyncNoParam[ReturnType]` - sync, no params
3. `CallableAsyncSingleParam[ParamType, ReturnType]` with `arg: ParamType` - async, typed single param
4. `CallableSyncSingleParam[ParamType, ReturnType]` with `arg: ParamType` - sync, typed single param
5. `Callable[..., Awaitable[ReturnType]]` with `args: Sequence[Any]` - async, multi-param
6. `Callable[..., ReturnType]` with `args: Sequence[Any]` - sync, multi-param
7. `str` with `arg: Any`, `args: Sequence[Any]` - string name

---

## Notes

### Breaking Change: `activity.Info` Fields Now Optional
The following fields in `activity.Info` are now `str | None` instead of `str`:
- `workflow_id`
- `workflow_namespace` (deprecated, use `namespace`)
- `workflow_run_id`
- `workflow_type`

For standalone activities, these will be `None`. Code accessing these fields should check for `None` or use the new `in_workflow` property.

### New Field: `activity.Info.namespace`
A new non-optional `namespace` field has been added that is always set, regardless of whether the activity is standalone or workflow-triggered.

### Deprecation: `activity.Info.workflow_namespace`
This field is deprecated in favor of `namespace`. Both fields have the same value when set.

48 changes: 38 additions & 10 deletions temporalio/activity.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should go ahead and put here the expected changes to activity runtime. Specifically I assume all workflow_-prefixed fields of Info will become optional. I would also recommend either a "kind" enumerate for activities, or add an is_standalone akin to is_local so users can know it's not the traditional activity.

Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,8 @@
from typing import (
TYPE_CHECKING,
Any,
List,
NoReturn,
Optional,
Tuple,
Type,
Union,
overload,
)

Expand Down Expand Up @@ -109,16 +105,26 @@ class Info:
heartbeat_details: Sequence[Any]
heartbeat_timeout: timedelta | None
is_local: bool
namespace: str
"""Namespace the activity is running in."""
schedule_to_close_timeout: timedelta | None
scheduled_time: datetime
start_to_close_timeout: timedelta | None
started_time: datetime
task_queue: str
task_token: bytes
workflow_id: str
workflow_namespace: str
workflow_run_id: str
workflow_type: str
workflow_id: str | None
"""ID of the workflow that started this activity. None for standalone activities."""
workflow_namespace: str | None
"""Namespace of the workflow that started this activity. None for standalone activities.

.. deprecated::
Use :py:attr:`namespace` instead.
"""
workflow_run_id: str | None
"""Run ID of the workflow that started this activity. None for standalone activities."""
workflow_type: str | None
"""Type of the workflow that started this activity. None for standalone activities."""
priority: temporalio.common.Priority
retry_policy: temporalio.common.RetryPolicy | None
"""The retry policy of this activity.
Expand All @@ -127,14 +133,22 @@ class Info:
If the value is None, it means the server didn't send information about retry policy (e.g. due to old server
version), but it may still be defined server-side."""

activity_run_id: str | None = None
"""Run ID of this standalone activity. None for workflow activities."""

@property
def in_workflow(self) -> bool:
"""Whether this activity was started by a workflow (vs. standalone)."""
return self.workflow_id is not None

# TODO(cretz): Consider putting identity on here for "worker_id" for logger?

def _logger_details(self) -> Mapping[str, Any]:
return {
"activity_id": self.activity_id,
"activity_type": self.activity_type,
"attempt": self.attempt,
"namespace": self.workflow_namespace,
"namespace": self.namespace,
"task_queue": self.task_queue,
"workflow_id": self.workflow_id,
"workflow_run_id": self.workflow_run_id,
Expand Down Expand Up @@ -243,7 +257,7 @@ def metric_meter(self) -> temporalio.common.MetricMeter:
info = self.info()
self._metric_meter = self.runtime_metric_meter.with_additional_attributes(
{
"namespace": info.workflow_namespace,
"namespace": info.namespace,
"task_queue": info.task_queue,
"activity_type": info.activity_type,
}
Expand Down Expand Up @@ -582,6 +596,20 @@ def must_from_callable(fn: Callable) -> _Definition:
f"Activity {fn_name} missing attributes, was it decorated with @activity.defn?"
)

@classmethod
def get_name_and_result_type(
Copy link
Member

@cretz cretz Oct 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're extracting this common logic out of workflow_instance.py, can we update workflow_instance.py use this too? Also, then can we get rid of must_from_callable and inline it into this method since it won't be called anywhere anymore?

cls, name_or_run_fn: str | Callable[..., Any]
) -> tuple[str, Type | None]:
if isinstance(name_or_run_fn, str):
return name_or_run_fn, None
elif callable(name_or_run_fn):
defn = cls.must_from_callable(name_or_run_fn)
if not defn.name:
raise ValueError(f"Activity {name_or_run_fn} definition has no name")
return defn.name, defn.ret_type
else:
raise TypeError("Activity must be a string or callable")

@staticmethod
def _apply_to_callable(
fn: Callable,
Expand Down
10 changes: 9 additions & 1 deletion temporalio/api/activity/v1/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
from .message_pb2 import ActivityOptions
from .message_pb2 import (
ActivityExecutionInfo,
ActivityExecutionListInfo,
ActivityExecutionOutcome,
ActivityOptions,
)

__all__ = [
"ActivityExecutionInfo",
"ActivityExecutionListInfo",
"ActivityExecutionOutcome",
"ActivityOptions",
]
Loading
Loading