Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,47 @@ All notable changes to the AxonFlow Python SDK will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [6.5.0] - 2026-04-21

### Added

- **`retry_context` and `idempotency_key` support on the step gate** —
`StepGateResponse` now carries a `retry_context` object on every gate call with the
true `(workflow_id, step_id)` lifecycle: `gate_count`, `completion_count`,
`prior_completion_status` (`PriorCompletionStatus` enum —
`NONE` / `COMPLETED` / `GATED_NOT_COMPLETED`), `prior_output_available`,
`prior_output`, `prior_completion_at`, `first_attempt_at`, `last_attempt_at`,
`last_decision`, and `idempotency_key`. Prefer these fields to the legacy
`cached` / `decision_source` fields.
- **`client.step_gate(..., include_prior_output=False)`** — new keyword-only argument.
When `True`, the SDK sends `?include_prior_output=true` on the gate call and
`retry_context.prior_output` is populated when a prior `/complete` has landed.
Existing callers that omit the kwarg behave unchanged.
- **`StepGateRequest.idempotency_key`** — caller-supplied opaque business-level key
(max 255 chars). Immutable once recorded on the first gate call for a
`(workflow_id, step_id)`; subsequent gate/complete calls must pass the same key.
- **`MarkStepCompletedRequest.idempotency_key`** — must match the key set on the
corresponding gate call, if any. Mismatch (including missing-vs-set on either side)
surfaces as a typed `IdempotencyKeyMismatchError`.
- **`IdempotencyKeyMismatchError`** — typed exception raised by `step_gate` and
`mark_step_completed` when the platform returns HTTP 409 with
`error.code == "IDEMPOTENCY_KEY_MISMATCH"`. Surfaces `workflow_id`, `step_id`,
`expected_idempotency_key`, `received_idempotency_key`, and the human-readable `message`.
Exported from `axonflow` top-level.
- **`RetryContext`, `PriorCompletionStatus`** — exported pydantic model + enum.

### Deprecated

- **`StepGateResponse.cached`** and **`StepGateResponse.decision_source`** — still
populated but deprecated in favor of `retry_context.gate_count > 1` and
`retry_context.prior_completion_status`. Planned for removal in a future major version.

### Compatibility

Companion to the platform change that introduces `retry_context` on
`POST /api/v1/workflows/{workflow_id}/steps/{step_id}/gate`. Additive only — existing
callers that never set `idempotency_key` or `include_prior_output` see no behavior change.

## [6.4.0] - 2026-04-18

### Added
Expand Down
6 changes: 6 additions & 0 deletions axonflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
ConfigurationError,
ConnectionError,
ConnectorError,
IdempotencyKeyMismatchError,
PlanExecutionError,
PolicyViolationError,
RateLimitError,
Expand Down Expand Up @@ -242,7 +243,9 @@
PendingApproval,
PendingApprovalsResponse,
PolicyMatch,
PriorCompletionStatus,
RejectStepResponse,
RetryContext,
RetryPolicy,
StepGateRequest,
StepGateResponse,
Expand Down Expand Up @@ -404,6 +407,7 @@
"ConnectionError",
"TimeoutError",
"ConnectorError",
"IdempotencyKeyMismatchError",
"PlanExecutionError",
# Cost Controls types
"BudgetScope",
Expand Down Expand Up @@ -440,6 +444,8 @@
"CreateWorkflowResponse",
"StepGateRequest",
"StepGateResponse",
"RetryContext",
"PriorCompletionStatus",
"WorkflowStepInfo",
"WorkflowStatusResponse",
"ListWorkflowsOptions",
Expand Down
2 changes: 1 addition & 1 deletion axonflow/_version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Single source of truth for the AxonFlow SDK version."""

__version__ = "6.4.0"
__version__ = "6.5.0"
132 changes: 119 additions & 13 deletions axonflow/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
BudgetExceededError,
ConnectionError,
ConnectorError,
IdempotencyKeyMismatchError,
PlanExecutionError,
PolicyViolationError,
TimeoutError,
Expand Down Expand Up @@ -210,6 +211,7 @@
PendingApprovalsResponse,
RejectStepResponse,
ResumeFromCheckpointResponse,
RetryContext,
StepGateRequest,
StepGateResponse,
StepType,
Expand Down Expand Up @@ -255,6 +257,38 @@ def normalize_fractional_seconds(match: re.Match[str]) -> str:
T = TypeVar("T")


def _parse_idempotency_key_mismatch(
response: httpx.Response,
*,
workflow_id: str,
step_id: str,
) -> IdempotencyKeyMismatchError | None:
"""Inspect a 409 response body for IDEMPOTENCY_KEY_MISMATCH.

Returns a typed :class:`IdempotencyKeyMismatchError` if the body matches the
contract shape (``error.code == "IDEMPOTENCY_KEY_MISMATCH"``), otherwise ``None``.
"""
try:
payload = response.json()
except ValueError:
return None
if not isinstance(payload, dict):
return None
err = payload.get("error")
if not isinstance(err, dict) or err.get("code") != "IDEMPOTENCY_KEY_MISMATCH":
return None
details = err.get("details") or {}
if not isinstance(details, dict):
details = {}
return IdempotencyKeyMismatchError(
message=str(err.get("message", "idempotency_key mismatch")),
workflow_id=str(details.get("workflow_id") or workflow_id),
step_id=str(details.get("step_id") or step_id),
expected_idempotency_key=str(details.get("expected_idempotency_key", "")),
received_idempotency_key=str(details.get("received_idempotency_key", "")),
)


def _parse_version(v: str) -> tuple[int, ...]:
"""Parse a semver string into a tuple of ints for correct numeric comparison."""
parts: list[int] = []
Expand Down Expand Up @@ -3999,6 +4033,8 @@ async def step_gate(
workflow_id: str,
step_id: str,
request: StepGateRequest,
*,
include_prior_output: bool = False,
) -> StepGateResponse:
"""Check if a workflow step is allowed to proceed (step gate).

Expand All @@ -4009,10 +4045,18 @@ async def step_gate(
workflow_id: Workflow ID
step_id: Unique step identifier (you provide this)
request: Step gate request with step details
include_prior_output: When True, sends ``?include_prior_output=true`` and
``retry_context.prior_output`` is populated when a prior /complete has
landed. Default False because prior output may be large and/or contain
sensitive data.

Returns:
Gate decision: allow, block, or require_approval

Raises:
IdempotencyKeyMismatchError: If ``request.idempotency_key`` conflicts with
the key recorded on an earlier gate call for this (workflow_id, step_id).

Example:
>>> gate = await client.step_gate(
... "wf_123",
Expand All @@ -4021,15 +4065,17 @@ async def step_gate(
... step_name="Generate Code",
... step_type=StepType.LLM_CALL,
... model="gpt-4",
... provider="openai"
... )
... provider="openai",
... idempotency_key="payment:wire:acct4471:invoice-7721",
... ),
... include_prior_output=True,
... )
>>> if gate.decision == GateDecision.BLOCK:
... raise Exception(f"Step blocked: {gate.reason}")
>>> elif gate.decision == GateDecision.REQUIRE_APPROVAL:
... print(f"Waiting for approval: {gate.approval_url}")
>>> if gate.retry_context and gate.retry_context.prior_completion_status == "completed":
... prior = gate.retry_context.prior_output # previous result, if any
"""
body = {
body: dict[str, Any] = {
"step_name": request.step_name,
"step_type": request.step_type.value,
"step_input": request.step_input,
Expand All @@ -4046,6 +4092,8 @@ async def step_gate(
body["tool_context"] = tc
if request.retry_policy is not None:
body["retry_policy"] = request.retry_policy.value
if request.idempotency_key is not None:
body["idempotency_key"] = request.idempotency_key

if self._config.debug:
self._logger.debug(
Expand All @@ -4055,15 +4103,21 @@ async def step_gate(
step_type=request.step_type.value,
)

response = await self._orchestrator_request(
"POST",
f"/api/v1/workflows/{workflow_id}/steps/{step_id}/gate",
json_data=body,
path = f"/api/v1/workflows/{workflow_id}/steps/{step_id}/gate"
if include_prior_output:
path += "?include_prior_output=true"
response = await self._step_request_with_idempotency_check(
path, body, workflow_id=workflow_id, step_id=step_id
)
if not isinstance(response, dict):
msg = "Unexpected response type from step gate"
raise TypeError(msg)

retry_context = None
rc_raw = response.get("retry_context")
if isinstance(rc_raw, dict):
retry_context = RetryContext.model_validate(rc_raw)

return StepGateResponse(
decision=GateDecision(response["decision"]),
step_id=response["step_id"],
Expand All @@ -4074,6 +4128,7 @@ async def step_gate(
policies_matched=response.get("policies_matched"),
cached=response.get("cached", False),
decision_source=response.get("decision_source"),
retry_context=retry_context,
)

async def mark_step_completed(
Expand All @@ -4091,6 +4146,10 @@ async def mark_step_completed(
step_id: Step ID
request: Optional completion request with output data

Raises:
IdempotencyKeyMismatchError: If ``request.idempotency_key`` does not match the
key recorded on the earlier gate call for this (workflow_id, step_id).

Example:
>>> await client.mark_step_completed(
... "wf_123",
Expand All @@ -4107,16 +4166,54 @@ async def mark_step_completed(
body["tokens_out"] = request.tokens_out
if request.cost_usd is not None:
body["cost_usd"] = request.cost_usd
if request.idempotency_key is not None:
body["idempotency_key"] = request.idempotency_key

await self._orchestrator_request(
"POST",
await self._step_request_with_idempotency_check(
f"/api/v1/workflows/{workflow_id}/steps/{step_id}/complete",
json_data=body,
body,
workflow_id=workflow_id,
step_id=step_id,
)

if self._config.debug:
self._logger.debug("Step marked completed", workflow_id=workflow_id, step_id=step_id)

async def _step_request_with_idempotency_check(
self,
path: str,
body: dict[str, Any],
*,
workflow_id: str,
step_id: str,
) -> dict[str, Any] | list[Any] | None:
"""POST to a step gate/complete endpoint, mapping 409 IDEMPOTENCY_KEY_MISMATCH to
IdempotencyKeyMismatchError. All other errors are handled like _orchestrator_request.
"""
url = f"{self._config.endpoint}{path}"
try:
response = await self._http_client.request("POST", url, json=body)
response.raise_for_status()
if response.status_code == 204: # noqa: PLR2004
return None
result: dict[str, Any] | list[Any] = response.json()
return result # noqa: TRY300
except httpx.ConnectError as e:
msg = f"Failed to connect to Orchestrator: {e}"
raise ConnectionError(msg) from e
except httpx.TimeoutException as e:
msg = f"Request timed out: {e}"
raise TimeoutError(msg) from e
except httpx.HTTPStatusError as e:
if e.response.status_code == 409: # noqa: PLR2004
idem = _parse_idempotency_key_mismatch(
e.response, workflow_id=workflow_id, step_id=step_id
)
if idem is not None:
raise idem from e
msg = f"HTTP {e.response.status_code}: {e.response.text}"
raise AxonFlowError(msg) from e

async def complete_workflow(self, workflow_id: str) -> None:
"""Complete a workflow successfully.

Expand Down Expand Up @@ -7109,9 +7206,18 @@ def step_gate(
workflow_id: str,
step_id: str,
request: StepGateRequest,
*,
include_prior_output: bool = False,
) -> StepGateResponse:
"""Check policy gate for a workflow step."""
return self._run_sync(self._async_client.step_gate(workflow_id, step_id, request))
return self._run_sync(
self._async_client.step_gate(
workflow_id,
step_id,
request,
include_prior_output=include_prior_output,
)
)

def mark_step_completed(
self,
Expand Down
35 changes: 35 additions & 0 deletions axonflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,38 @@ def __init__(
self.plan_id = plan_id
self.expected_version = expected_version
self.current_version = current_version


class IdempotencyKeyMismatchError(AxonFlowError):
"""Idempotency key mismatch on a step gate or complete call (HTTP 409).

Raised when an ``idempotency_key`` on a ``/gate`` or ``/complete`` request conflicts
with the key recorded on an earlier gate call for the same ``(workflow_id, step_id)``.
Maps to HTTP 409 with ``error.code == "IDEMPOTENCY_KEY_MISMATCH"``.

``expected_idempotency_key`` is the empty string ``""`` when the gate call had no
key but complete did; conversely ``received_idempotency_key`` is ``""`` when complete
omitted a key that gate had set.
"""

def __init__(
self,
message: str,
workflow_id: str,
step_id: str,
expected_idempotency_key: str,
received_idempotency_key: str,
) -> None:
super().__init__(
message,
details={
"workflow_id": workflow_id,
"step_id": step_id,
"expected_idempotency_key": expected_idempotency_key,
"received_idempotency_key": received_idempotency_key,
},
)
self.workflow_id = workflow_id
self.step_id = step_id
self.expected_idempotency_key = expected_idempotency_key
self.received_idempotency_key = received_idempotency_key
Loading
Loading