diff --git a/CHANGELOG.md b/CHANGELOG.md index b956d6b..9066400 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,47 @@ All notable changes to the AxonFlow Python SDK will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [6.5.0] - 2026-04-21 + +### Added + +- **`retry_context` and `idempotency_key` support on the step gate** — + `StepGateResponse` now carries a `retry_context` object on every gate call with the + true `(workflow_id, step_id)` lifecycle: `gate_count`, `completion_count`, + `prior_completion_status` (`PriorCompletionStatus` enum — + `NONE` / `COMPLETED` / `GATED_NOT_COMPLETED`), `prior_output_available`, + `prior_output`, `prior_completion_at`, `first_attempt_at`, `last_attempt_at`, + `last_decision`, and `idempotency_key`. Prefer these fields to the legacy + `cached` / `decision_source` fields. +- **`client.step_gate(..., include_prior_output=False)`** — new keyword-only argument. + When `True`, the SDK sends `?include_prior_output=true` on the gate call and + `retry_context.prior_output` is populated when a prior `/complete` has landed. + Existing callers that omit the kwarg behave unchanged. +- **`StepGateRequest.idempotency_key`** — caller-supplied opaque business-level key + (max 255 chars). Immutable once recorded on the first gate call for a + `(workflow_id, step_id)`; subsequent gate/complete calls must pass the same key. +- **`MarkStepCompletedRequest.idempotency_key`** — must match the key set on the + corresponding gate call, if any. Mismatch (including missing-vs-set on either side) + surfaces as a typed `IdempotencyKeyMismatchError`. +- **`IdempotencyKeyMismatchError`** — typed exception raised by `step_gate` and + `mark_step_completed` when the platform returns HTTP 409 with + `error.code == "IDEMPOTENCY_KEY_MISMATCH"`. Surfaces `workflow_id`, `step_id`, + `expected_idempotency_key`, `received_idempotency_key`, and the human-readable `message`. + Exported from `axonflow` top-level. +- **`RetryContext`, `PriorCompletionStatus`** — exported pydantic model + enum. + +### Deprecated + +- **`StepGateResponse.cached`** and **`StepGateResponse.decision_source`** — still + populated but deprecated in favor of `retry_context.gate_count > 1` and + `retry_context.prior_completion_status`. Planned for removal in a future major version. + +### Compatibility + +Companion to the platform change that introduces `retry_context` on +`POST /api/v1/workflows/{workflow_id}/steps/{step_id}/gate`. Additive only — existing +callers that never set `idempotency_key` or `include_prior_output` see no behavior change. + ## [6.4.0] - 2026-04-18 ### Added diff --git a/axonflow/__init__.py b/axonflow/__init__.py index 6381f03..ce05ec3 100644 --- a/axonflow/__init__.py +++ b/axonflow/__init__.py @@ -59,6 +59,7 @@ ConfigurationError, ConnectionError, ConnectorError, + IdempotencyKeyMismatchError, PlanExecutionError, PolicyViolationError, RateLimitError, @@ -242,7 +243,9 @@ PendingApproval, PendingApprovalsResponse, PolicyMatch, + PriorCompletionStatus, RejectStepResponse, + RetryContext, RetryPolicy, StepGateRequest, StepGateResponse, @@ -404,6 +407,7 @@ "ConnectionError", "TimeoutError", "ConnectorError", + "IdempotencyKeyMismatchError", "PlanExecutionError", # Cost Controls types "BudgetScope", @@ -440,6 +444,8 @@ "CreateWorkflowResponse", "StepGateRequest", "StepGateResponse", + "RetryContext", + "PriorCompletionStatus", "WorkflowStepInfo", "WorkflowStatusResponse", "ListWorkflowsOptions", diff --git a/axonflow/_version.py b/axonflow/_version.py index ac5b762..404dfff 100644 --- a/axonflow/_version.py +++ b/axonflow/_version.py @@ -1,3 +1,3 @@ """Single source of truth for the AxonFlow SDK version.""" -__version__ = "6.4.0" +__version__ = "6.5.0" diff --git a/axonflow/client.py b/axonflow/client.py index 4b605c9..4dbbc5d 100644 --- a/axonflow/client.py +++ b/axonflow/client.py @@ -84,6 +84,7 @@ BudgetExceededError, ConnectionError, ConnectorError, + IdempotencyKeyMismatchError, PlanExecutionError, PolicyViolationError, TimeoutError, @@ -210,6 +211,7 @@ PendingApprovalsResponse, RejectStepResponse, ResumeFromCheckpointResponse, + RetryContext, StepGateRequest, StepGateResponse, StepType, @@ -255,6 +257,38 @@ def normalize_fractional_seconds(match: re.Match[str]) -> str: T = TypeVar("T") +def _parse_idempotency_key_mismatch( + response: httpx.Response, + *, + workflow_id: str, + step_id: str, +) -> IdempotencyKeyMismatchError | None: + """Inspect a 409 response body for IDEMPOTENCY_KEY_MISMATCH. + + Returns a typed :class:`IdempotencyKeyMismatchError` if the body matches the + contract shape (``error.code == "IDEMPOTENCY_KEY_MISMATCH"``), otherwise ``None``. + """ + try: + payload = response.json() + except ValueError: + return None + if not isinstance(payload, dict): + return None + err = payload.get("error") + if not isinstance(err, dict) or err.get("code") != "IDEMPOTENCY_KEY_MISMATCH": + return None + details = err.get("details") or {} + if not isinstance(details, dict): + details = {} + return IdempotencyKeyMismatchError( + message=str(err.get("message", "idempotency_key mismatch")), + workflow_id=str(details.get("workflow_id") or workflow_id), + step_id=str(details.get("step_id") or step_id), + expected_idempotency_key=str(details.get("expected_idempotency_key", "")), + received_idempotency_key=str(details.get("received_idempotency_key", "")), + ) + + def _parse_version(v: str) -> tuple[int, ...]: """Parse a semver string into a tuple of ints for correct numeric comparison.""" parts: list[int] = [] @@ -3999,6 +4033,8 @@ async def step_gate( workflow_id: str, step_id: str, request: StepGateRequest, + *, + include_prior_output: bool = False, ) -> StepGateResponse: """Check if a workflow step is allowed to proceed (step gate). @@ -4009,10 +4045,18 @@ async def step_gate( workflow_id: Workflow ID step_id: Unique step identifier (you provide this) request: Step gate request with step details + include_prior_output: When True, sends ``?include_prior_output=true`` and + ``retry_context.prior_output`` is populated when a prior /complete has + landed. Default False because prior output may be large and/or contain + sensitive data. Returns: Gate decision: allow, block, or require_approval + Raises: + IdempotencyKeyMismatchError: If ``request.idempotency_key`` conflicts with + the key recorded on an earlier gate call for this (workflow_id, step_id). + Example: >>> gate = await client.step_gate( ... "wf_123", @@ -4021,15 +4065,17 @@ async def step_gate( ... step_name="Generate Code", ... step_type=StepType.LLM_CALL, ... model="gpt-4", - ... provider="openai" - ... ) + ... provider="openai", + ... idempotency_key="payment:wire:acct4471:invoice-7721", + ... ), + ... include_prior_output=True, ... ) >>> if gate.decision == GateDecision.BLOCK: ... raise Exception(f"Step blocked: {gate.reason}") - >>> elif gate.decision == GateDecision.REQUIRE_APPROVAL: - ... print(f"Waiting for approval: {gate.approval_url}") + >>> if gate.retry_context and gate.retry_context.prior_completion_status == "completed": + ... prior = gate.retry_context.prior_output # previous result, if any """ - body = { + body: dict[str, Any] = { "step_name": request.step_name, "step_type": request.step_type.value, "step_input": request.step_input, @@ -4046,6 +4092,8 @@ async def step_gate( body["tool_context"] = tc if request.retry_policy is not None: body["retry_policy"] = request.retry_policy.value + if request.idempotency_key is not None: + body["idempotency_key"] = request.idempotency_key if self._config.debug: self._logger.debug( @@ -4055,15 +4103,21 @@ async def step_gate( step_type=request.step_type.value, ) - response = await self._orchestrator_request( - "POST", - f"/api/v1/workflows/{workflow_id}/steps/{step_id}/gate", - json_data=body, + path = f"/api/v1/workflows/{workflow_id}/steps/{step_id}/gate" + if include_prior_output: + path += "?include_prior_output=true" + response = await self._step_request_with_idempotency_check( + path, body, workflow_id=workflow_id, step_id=step_id ) if not isinstance(response, dict): msg = "Unexpected response type from step gate" raise TypeError(msg) + retry_context = None + rc_raw = response.get("retry_context") + if isinstance(rc_raw, dict): + retry_context = RetryContext.model_validate(rc_raw) + return StepGateResponse( decision=GateDecision(response["decision"]), step_id=response["step_id"], @@ -4074,6 +4128,7 @@ async def step_gate( policies_matched=response.get("policies_matched"), cached=response.get("cached", False), decision_source=response.get("decision_source"), + retry_context=retry_context, ) async def mark_step_completed( @@ -4091,6 +4146,10 @@ async def mark_step_completed( step_id: Step ID request: Optional completion request with output data + Raises: + IdempotencyKeyMismatchError: If ``request.idempotency_key`` does not match the + key recorded on the earlier gate call for this (workflow_id, step_id). + Example: >>> await client.mark_step_completed( ... "wf_123", @@ -4107,16 +4166,54 @@ async def mark_step_completed( body["tokens_out"] = request.tokens_out if request.cost_usd is not None: body["cost_usd"] = request.cost_usd + if request.idempotency_key is not None: + body["idempotency_key"] = request.idempotency_key - await self._orchestrator_request( - "POST", + await self._step_request_with_idempotency_check( f"/api/v1/workflows/{workflow_id}/steps/{step_id}/complete", - json_data=body, + body, + workflow_id=workflow_id, + step_id=step_id, ) if self._config.debug: self._logger.debug("Step marked completed", workflow_id=workflow_id, step_id=step_id) + async def _step_request_with_idempotency_check( + self, + path: str, + body: dict[str, Any], + *, + workflow_id: str, + step_id: str, + ) -> dict[str, Any] | list[Any] | None: + """POST to a step gate/complete endpoint, mapping 409 IDEMPOTENCY_KEY_MISMATCH to + IdempotencyKeyMismatchError. All other errors are handled like _orchestrator_request. + """ + url = f"{self._config.endpoint}{path}" + try: + response = await self._http_client.request("POST", url, json=body) + response.raise_for_status() + if response.status_code == 204: # noqa: PLR2004 + return None + result: dict[str, Any] | list[Any] = response.json() + return result # noqa: TRY300 + except httpx.ConnectError as e: + msg = f"Failed to connect to Orchestrator: {e}" + raise ConnectionError(msg) from e + except httpx.TimeoutException as e: + msg = f"Request timed out: {e}" + raise TimeoutError(msg) from e + except httpx.HTTPStatusError as e: + if e.response.status_code == 409: # noqa: PLR2004 + idem = _parse_idempotency_key_mismatch( + e.response, workflow_id=workflow_id, step_id=step_id + ) + if idem is not None: + raise idem from e + msg = f"HTTP {e.response.status_code}: {e.response.text}" + raise AxonFlowError(msg) from e + async def complete_workflow(self, workflow_id: str) -> None: """Complete a workflow successfully. @@ -7109,9 +7206,18 @@ def step_gate( workflow_id: str, step_id: str, request: StepGateRequest, + *, + include_prior_output: bool = False, ) -> StepGateResponse: """Check policy gate for a workflow step.""" - return self._run_sync(self._async_client.step_gate(workflow_id, step_id, request)) + return self._run_sync( + self._async_client.step_gate( + workflow_id, + step_id, + request, + include_prior_output=include_prior_output, + ) + ) def mark_step_completed( self, diff --git a/axonflow/exceptions.py b/axonflow/exceptions.py index ab51a73..e4e5d4a 100644 --- a/axonflow/exceptions.py +++ b/axonflow/exceptions.py @@ -162,3 +162,38 @@ def __init__( self.plan_id = plan_id self.expected_version = expected_version self.current_version = current_version + + +class IdempotencyKeyMismatchError(AxonFlowError): + """Idempotency key mismatch on a step gate or complete call (HTTP 409). + + Raised when an ``idempotency_key`` on a ``/gate`` or ``/complete`` request conflicts + with the key recorded on an earlier gate call for the same ``(workflow_id, step_id)``. + Maps to HTTP 409 with ``error.code == "IDEMPOTENCY_KEY_MISMATCH"``. + + ``expected_idempotency_key`` is the empty string ``""`` when the gate call had no + key but complete did; conversely ``received_idempotency_key`` is ``""`` when complete + omitted a key that gate had set. + """ + + def __init__( + self, + message: str, + workflow_id: str, + step_id: str, + expected_idempotency_key: str, + received_idempotency_key: str, + ) -> None: + super().__init__( + message, + details={ + "workflow_id": workflow_id, + "step_id": step_id, + "expected_idempotency_key": expected_idempotency_key, + "received_idempotency_key": received_idempotency_key, + }, + ) + self.workflow_id = workflow_id + self.step_id = step_id + self.expected_idempotency_key = expected_idempotency_key + self.received_idempotency_key = received_idempotency_key diff --git a/axonflow/workflow.py b/axonflow/workflow.py index c20722d..c3d3739 100644 --- a/axonflow/workflow.py +++ b/axonflow/workflow.py @@ -114,6 +114,79 @@ class RetryPolicy(str, Enum): """Force fresh policy evaluation regardless of prior decision.""" +class PriorCompletionStatus(str, Enum): + """State of the prior gate+complete cycle for a step.""" + + NONE = "none" + """First gate call, no prior gates on this step.""" + COMPLETED = "completed" + """A prior gate call and a prior /complete both landed for this (workflow_id, step_id).""" + GATED_NOT_COMPLETED = "gated_not_completed" + """A prior gate landed but no /complete has followed for this (workflow_id, step_id).""" + + +class RetryContext(BaseModel): + """First-class state signal returned on every step gate response. + + Replaces the ambiguous ``cached: bool`` field. Callers should migrate off + ``cached`` and ``decision_source`` to the richer fields here. + """ + + gate_count: int = Field( + ..., + ge=1, + description=( + "Number of /gate calls for this (workflow_id, step_id), including the current call." + ), + ) + completion_count: int = Field( + ..., + ge=0, + description="Number of successful /complete calls for this (workflow_id, step_id).", + ) + prior_completion_status: PriorCompletionStatus = Field( + ..., description="Whether a prior gate+complete cycle has landed." + ) + prior_output_available: bool = Field( + ..., + description='True iff prior_completion_status == "completed".', + ) + prior_output: dict[str, Any] | None = Field( + default=None, + description=( + "Output from the prior /complete, or None. Non-None only when the gate call was made " + "with include_prior_output=True AND prior_completion_status == 'completed'." + ), + ) + prior_completion_at: datetime | None = Field( + default=None, + description="Timestamp of the prior /complete, if any.", + ) + first_attempt_at: datetime = Field( + ..., + description="Timestamp of the first gate call for this (workflow_id, step_id).", + ) + last_attempt_at: datetime = Field( + ..., + description="Timestamp of the current gate call.", + ) + last_decision: GateDecision = Field( + ..., + description=( + "Decision of the immediately prior gate call. On first call, equals the current " + "call's decision." + ), + ) + idempotency_key: str = Field( + default="", + description=( + "Key the caller set on this step (from the first gate call that supplied one), or " + 'empty string "" if the caller never supplied one. Always present — never None — ' + "per the wire contract (WCP_RETRY_IDEMPOTENCY_WIRE_CONTRACT.md §3). Immutable once set." + ), + ) + + class StepGateRequest(BaseModel): """Request to check if a step is allowed to proceed.""" @@ -132,6 +205,16 @@ class StepGateRequest(BaseModel): description='Retry behavior: "idempotent" (default) returns cached decision, ' '"reevaluate" forces fresh evaluation', ) + idempotency_key: str | None = Field( + default=None, + max_length=255, + description=( + "Caller-supplied opaque business-level key. Once set on the first gate call for a " + "(workflow_id, step_id), it is immutable — subsequent gate/complete calls must pass " + "the same key or raise IdempotencyKeyMismatchError. Echoed on " + "retry_context.idempotency_key." + ), + ) class StepGateResponse(BaseModel): @@ -160,11 +243,25 @@ class StepGateResponse(BaseModel): ) cached: bool = Field( default=False, - description="Whether this response was served from a prior decision", + description=( + "[DEPRECATED] Use retry_context.gate_count > 1 instead. " + "Will be removed in a future major version." + ), ) decision_source: str | None = Field( default=None, - description='How the decision was produced: "fresh" or "cached"', + description=( + "[DEPRECATED] Use retry_context.prior_completion_status instead. " + "Will be removed in a future major version." + ), + ) + retry_context: RetryContext | None = Field( + default=None, + description=( + "First-class state signal for (workflow_id, step_id). Always present on every gate " + "response from platform v7.3.0+. Nullable in the SDK model only so older platform " + "responses that omit it still parse; expect it populated in practice." + ), ) def is_allowed(self) -> bool: @@ -288,6 +385,14 @@ class MarkStepCompletedRequest(BaseModel): tokens_in: int | None = Field(default=None, ge=0, description="Input tokens consumed") tokens_out: int | None = Field(default=None, ge=0, description="Output tokens produced") cost_usd: float | None = Field(default=None, ge=0, description="Cost in USD") + idempotency_key: str | None = Field( + default=None, + max_length=255, + description=( + "Must match the key passed on the corresponding gate call, if any. Mismatch " + "(including missing-vs-set on either side) raises IdempotencyKeyMismatchError." + ), + ) class AbortWorkflowRequest(BaseModel): diff --git a/examples/wcp_retry_idempotency.py b/examples/wcp_retry_idempotency.py new file mode 100644 index 0000000..3c78cae --- /dev/null +++ b/examples/wcp_retry_idempotency.py @@ -0,0 +1,210 @@ +"""WCP retry_context + idempotency_key E2E example (Issue #1673 Phase 1 + 2). + +Exercises the new SDK surface end-to-end against a running v7.3.0 enterprise +stack. Every assertion fails the process on mismatch. + +Run: + source /tmp/axonflow-e2e-env.sh + export AXONFLOW_BASE_URL=http://localhost:8080 + python examples/wcp_retry_idempotency.py +""" + +from __future__ import annotations + +import asyncio +import os +import sys + +from axonflow import AxonFlow +from axonflow.exceptions import IdempotencyKeyMismatchError +from axonflow.workflow import ( + CreateWorkflowRequest, + MarkStepCompletedRequest, + StepGateRequest, + StepType, +) + + +def must_env(name: str) -> str: + v = os.environ.get(name) + if not v: + print(f"missing env: {name}", file=sys.stderr) + sys.exit(1) + return v + + +def banner(msg: str) -> None: + print() + print("━━━", msg, "━━━") + + +def fail(msg: str) -> None: + print(f"FAIL: {msg}", file=sys.stderr) + sys.exit(1) + + +def assert_eq(label: str, want: object, got: object) -> None: + if want != got: + fail(f"{label}: want {want!r}, got {got!r}") + + +def assert_true(label: str, cond: bool) -> None: + if not cond: + fail(f"assertion failed: {label}") + + +async def act1(client: AxonFlow) -> None: + wf = await client.create_workflow(CreateWorkflowRequest(workflow_name="py-sdk-retry-context")) + print(f"workflow: {wf.workflow_id}") + + # 1) First gate — first-call invariants + first = await client.step_gate( + wf.workflow_id, + "step-1", + StepGateRequest(step_name="first-step", step_type=StepType.TOOL_CALL), + ) + rc = first.retry_context + assert rc is not None, "retry_context missing on first gate" + assert_eq("first gate_count", 1, rc.gate_count) + assert_eq("first completion_count", 0, rc.completion_count) + assert_eq("first prior_completion_status", "none", rc.prior_completion_status) + assert_true("first !prior_output_available", not rc.prior_output_available) + assert_eq( + "first last_decision (first-call invariant)", + first.decision.value, + rc.last_decision.value if hasattr(rc.last_decision, "value") else rc.last_decision, + ) + assert_eq("first FirstAttemptAt == LastAttemptAt", rc.first_attempt_at, rc.last_attempt_at) + print(" first gate invariants ✔") + + # 2) Complete, then re-gate + await client.mark_step_completed( + wf.workflow_id, + "step-1", + MarkStepCompletedRequest(output={"transfer_id": "TXN-py-1", "amount": 500}), + ) + re_gate = await client.step_gate( + wf.workflow_id, + "step-1", + StepGateRequest(step_type=StepType.TOOL_CALL), + ) + rc = re_gate.retry_context + assert rc is not None + assert_eq("re-gate post-complete gate_count", 2, rc.gate_count) + assert_eq("re-gate post-complete completion_count", 1, rc.completion_count) + assert_eq( + "re-gate post-complete prior_completion_status", "completed", rc.prior_completion_status + ) + assert_true("re-gate post-complete prior_output_available", rc.prior_output_available) + assert_true("re-gate post-complete prior_output omitted by default", rc.prior_output is None) + assert_true("re-gate post-complete cached==True", re_gate.cached is True) + print(" re-gate post-complete ✔") + + # 3) Gate on step-2 without completion (agent-crash simulation) + await client.step_gate( + wf.workflow_id, + "step-2", + StepGateRequest(step_name="second-step", step_type=StepType.TOOL_CALL), + ) + re_gate2 = await client.step_gate( + wf.workflow_id, + "step-2", + StepGateRequest(step_type=StepType.TOOL_CALL), + ) + assert re_gate2.retry_context is not None + assert_eq( + "gated_not_completed status", + "gated_not_completed", + re_gate2.retry_context.prior_completion_status, + ) + assert_eq("gated_not_completed completion_count", 0, re_gate2.retry_context.completion_count) + print(" gated_not_completed ✔") + + # 4) include_prior_output=True recovers the payload + with_prior = await client.step_gate( + wf.workflow_id, + "step-1", + StepGateRequest(step_type=StepType.TOOL_CALL), + include_prior_output=True, + ) + assert with_prior.retry_context is not None + assert_true("prior_output populated", with_prior.retry_context.prior_output is not None) + assert_eq( + "prior_output[transfer_id]", + "TXN-py-1", + with_prior.retry_context.prior_output["transfer_id"], + ) + print(" prior_output recovery ✔") + + +async def act2(client: AxonFlow) -> None: + wf = await client.create_workflow(CreateWorkflowRequest(workflow_name="py-sdk-idempotency-key")) + print(f"workflow: {wf.workflow_id}") + + original_key = "payment:wire:py-sdk-invoice-1" + + # 5) Gate with key — retry_context.idempotency_key echoes + first = await client.step_gate( + wf.workflow_id, + "step-1", + StepGateRequest( + step_name="wire", step_type=StepType.TOOL_CALL, idempotency_key=original_key + ), + ) + assert first.retry_context is not None + assert_eq( + "retry_context.idempotency_key echo", original_key, first.retry_context.idempotency_key + ) + print(" key round-trip ✔") + + # 6) Re-gate with different key → IdempotencyKeyMismatchError + try: + await client.step_gate( + wf.workflow_id, + "step-1", + StepGateRequest( + step_type=StepType.TOOL_CALL, idempotency_key="payment:wire:different-2" + ), + ) + fail("expected IdempotencyKeyMismatchError on gate with different key") + except IdempotencyKeyMismatchError as err: + assert_eq("mismatch expected_key", original_key, err.expected_idempotency_key) + assert_eq("mismatch received_key", "payment:wire:different-2", err.received_idempotency_key) + assert_true("mismatch workflow_id populated", err.workflow_id.startswith("wf_")) + assert_eq("mismatch step_id", "step-1", err.step_id) + print(" typed 409 error ✔") + + # 7) Complete with matching key + await client.mark_step_completed( + wf.workflow_id, + "step-1", + MarkStepCompletedRequest(output={"transfer_id": "TXN-K1"}, idempotency_key=original_key), + ) + print(" complete with matching key ✔") + + +async def main() -> None: + endpoint = os.environ.get("AXONFLOW_BASE_URL", "http://localhost:8080") + client_id = must_env("AXONFLOW_CLIENT_ID") + client_secret = must_env("AXONFLOW_CLIENT_SECRET") + + client = AxonFlow( + endpoint=endpoint, + client_id=client_id, + client_secret=client_secret, + ) + + try: + banner("Act 1 — retry_context (Python SDK)") + await act1(client) + + banner("Act 2 — idempotency_key (Python SDK)") + await act2(client) + + banner("All assertions passed ✔") + finally: + await client.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml index 1385c47..7036a20 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "axonflow" -version = "6.4.0" +version = "6.5.0" description = "AxonFlow Python SDK - Enterprise AI Governance in 3 Lines of Code" readme = "README.md" license = {text = "MIT"} @@ -148,6 +148,7 @@ ignore = [ "axonflow/adapters/langchain.py" = ["PLC0415", "A002", "BLE001", "S110", "SIM105"] # Lazy import of langchain-core; `input` matches LangChain interface; broad except is intentional for non-fatal audit "axonflow/adapters/tool_wrapper.py" = ["PLC0415", "A002", "TRY301"] # Lazy import of langchain-core; `input` matches BaseTool interface; raise in _run_coro_sync try block "examples/**/*.py" = ["T201", "ANN", "S106", "ERA001", "BLE001", "PLC0415", "F541"] +"examples/wcp_retry_idempotency.py" = ["T201", "ANN", "S101", "S106", "ERA001", "BLE001", "PLC0415", "F541", "E501"] # E2E validation script: bare asserts + long assertion strings are intentional "scripts/**/*.py" = ["T201", "ANN", "S106", "BLE001", "UP045", "DTZ005", "PTH123"] [tool.ruff.lint.isort] diff --git a/tests/test_retry_context_idempotency.py b/tests/test_retry_context_idempotency.py new file mode 100644 index 0000000..308ecd1 --- /dev/null +++ b/tests/test_retry_context_idempotency.py @@ -0,0 +1,297 @@ +"""Unit tests for WCP retry_context + idempotency_key (#1673 Phase 1 + 2). + +Mirrors the six shapes from §6.8 of WCP_RETRY_IDEMPOTENCY_WIRE_CONTRACT.md. +""" + +from __future__ import annotations + +import pytest +from pytest_httpx import HTTPXMock + +from axonflow import AxonFlow, IdempotencyKeyMismatchError +from axonflow.workflow import ( + MarkStepCompletedRequest, + PriorCompletionStatus, + StepGateRequest, + StepType, +) + +GATE_URL = "https://test.axonflow.com/api/v1/workflows/wf_1/steps/step_1/gate" +COMPLETE_URL = "https://test.axonflow.com/api/v1/workflows/wf_1/steps/step_1/complete" + + +# --- Test a: first-call shape ------------------------------------------------ + + +@pytest.mark.asyncio +async def test_first_call_retry_context_shape(client: AxonFlow, httpx_mock: HTTPXMock) -> None: + now = "2026-04-21T15:30:45.123Z" + httpx_mock.add_response( + url=GATE_URL, + json={ + "decision": "allow", + "step_id": "step_1", + "cached": False, + "decision_source": "fresh", + "retry_context": { + "gate_count": 1, + "completion_count": 0, + "prior_completion_status": "none", + "prior_output_available": False, + "prior_output": None, + "prior_completion_at": None, + "first_attempt_at": now, + "last_attempt_at": now, + "last_decision": "allow", + "idempotency_key": "", + }, + }, + ) + + gate = await client.step_gate("wf_1", "step_1", StepGateRequest(step_type=StepType.LLM_CALL)) + rc = gate.retry_context + assert rc is not None + assert rc.gate_count == 1 + assert rc.completion_count == 0 + assert rc.prior_completion_status is PriorCompletionStatus.NONE + assert rc.prior_output_available is False + assert rc.prior_output is None + assert rc.prior_completion_at is None + assert rc.first_attempt_at == rc.last_attempt_at + assert rc.last_decision.value == gate.decision.value + assert rc.idempotency_key == "" + + +# --- Test b: second-call after completion ------------------------------------ + + +@pytest.mark.asyncio +async def test_second_call_after_completion(client: AxonFlow, httpx_mock: HTTPXMock) -> None: + httpx_mock.add_response( + url=GATE_URL, + json={ + "decision": "allow", + "step_id": "step_1", + "retry_context": { + "gate_count": 2, + "completion_count": 1, + "prior_completion_status": "completed", + "prior_output_available": True, + "prior_output": None, + "prior_completion_at": "2026-04-21T15:30:30.000Z", + "first_attempt_at": "2026-04-21T15:30:00.000Z", + "last_attempt_at": "2026-04-21T15:31:00.000Z", + "last_decision": "allow", + "idempotency_key": "", + }, + }, + ) + + gate = await client.step_gate("wf_1", "step_1", StepGateRequest(step_type=StepType.LLM_CALL)) + rc = gate.retry_context + assert rc is not None + assert rc.gate_count == 2 + assert rc.completion_count == 1 + assert rc.prior_completion_status is PriorCompletionStatus.COMPLETED + assert rc.prior_output_available is True + assert rc.prior_completion_at is not None + assert rc.first_attempt_at != rc.last_attempt_at + + +# --- Test c: second-call without completion ---------------------------------- + + +@pytest.mark.asyncio +async def test_second_call_without_completion(client: AxonFlow, httpx_mock: HTTPXMock) -> None: + httpx_mock.add_response( + url=GATE_URL, + json={ + "decision": "allow", + "step_id": "step_1", + "retry_context": { + "gate_count": 2, + "completion_count": 0, + "prior_completion_status": "gated_not_completed", + "prior_output_available": False, + "prior_output": None, + "prior_completion_at": None, + "first_attempt_at": "2026-04-21T15:30:00.000Z", + "last_attempt_at": "2026-04-21T15:31:00.000Z", + "last_decision": "allow", + "idempotency_key": "", + }, + }, + ) + + gate = await client.step_gate("wf_1", "step_1", StepGateRequest(step_type=StepType.LLM_CALL)) + rc = gate.retry_context + assert rc is not None + assert rc.gate_count == 2 + assert rc.completion_count == 0 + assert rc.prior_completion_status is PriorCompletionStatus.GATED_NOT_COMPLETED + assert rc.prior_output_available is False + assert rc.prior_completion_at is None + + +# --- Test d: include_prior_output=True --------------------------------------- + + +@pytest.mark.asyncio +async def test_include_prior_output_sends_query_param_and_populates_output( + client: AxonFlow, httpx_mock: HTTPXMock +) -> None: + prior_output = {"result": "ok", "score": 0.92} + httpx_mock.add_response( + url=GATE_URL + "?include_prior_output=true", + json={ + "decision": "allow", + "step_id": "step_1", + "retry_context": { + "gate_count": 2, + "completion_count": 1, + "prior_completion_status": "completed", + "prior_output_available": True, + "prior_output": prior_output, + "prior_completion_at": "2026-04-21T15:30:30.000Z", + "first_attempt_at": "2026-04-21T15:30:00.000Z", + "last_attempt_at": "2026-04-21T15:31:00.000Z", + "last_decision": "allow", + "idempotency_key": "", + }, + }, + ) + + gate = await client.step_gate( + "wf_1", + "step_1", + StepGateRequest(step_type=StepType.LLM_CALL), + include_prior_output=True, + ) + assert gate.retry_context is not None + assert gate.retry_context.prior_output == prior_output + + # Confirm the query param was actually sent + requests = httpx_mock.get_requests() + assert any("include_prior_output=true" in str(r.url) for r in requests), ( + f"Expected include_prior_output=true on query string, got {[str(r.url) for r in requests]}" + ) + + +# --- Test e: idempotency_key round-trip -------------------------------------- + + +@pytest.mark.asyncio +async def test_idempotency_key_round_trip(client: AxonFlow, httpx_mock: HTTPXMock) -> None: + key = "payment:wire:acct4471:invoice-7721" + httpx_mock.add_response( + url=GATE_URL, + json={ + "decision": "allow", + "step_id": "step_1", + "retry_context": { + "gate_count": 1, + "completion_count": 0, + "prior_completion_status": "none", + "prior_output_available": False, + "prior_output": None, + "prior_completion_at": None, + "first_attempt_at": "2026-04-21T15:30:00.000Z", + "last_attempt_at": "2026-04-21T15:30:00.000Z", + "last_decision": "allow", + "idempotency_key": key, + }, + }, + ) + httpx_mock.add_response( + url=COMPLETE_URL, + status_code=204, + ) + + gate = await client.step_gate( + "wf_1", + "step_1", + StepGateRequest(step_type=StepType.LLM_CALL, idempotency_key=key), + ) + assert gate.retry_context is not None + assert gate.retry_context.idempotency_key == key + + await client.mark_step_completed( + "wf_1", + "step_1", + MarkStepCompletedRequest(output={"ok": True}, idempotency_key=key), + ) + + requests = httpx_mock.get_requests() + import json as _json + + gate_body = _json.loads(requests[0].content) + complete_body = _json.loads(requests[1].content) + assert gate_body["idempotency_key"] == key + assert complete_body["idempotency_key"] == key + + +# --- Test f: 409 IDEMPOTENCY_KEY_MISMATCH ------------------------------------ + + +@pytest.mark.asyncio +async def test_mark_step_completed_409_raises_typed_error( + client: AxonFlow, httpx_mock: HTTPXMock +) -> None: + httpx_mock.add_response( + url=COMPLETE_URL, + status_code=409, + json={ + "error": { + "code": "IDEMPOTENCY_KEY_MISMATCH", + "message": "idempotency_key on complete does not match the key recorded on gate", + "details": { + "workflow_id": "wf_1", + "step_id": "step_1", + "expected_idempotency_key": "a", + "received_idempotency_key": "b", + }, + } + }, + ) + + with pytest.raises(IdempotencyKeyMismatchError) as excinfo: + await client.mark_step_completed( + "wf_1", + "step_1", + MarkStepCompletedRequest(idempotency_key="b"), + ) + + err = excinfo.value + assert err.workflow_id == "wf_1" + assert err.step_id == "step_1" + assert err.expected_idempotency_key == "a" + assert err.received_idempotency_key == "b" + + +@pytest.mark.asyncio +async def test_step_gate_409_raises_typed_error(client: AxonFlow, httpx_mock: HTTPXMock) -> None: + httpx_mock.add_response( + url=GATE_URL, + status_code=409, + json={ + "error": { + "code": "IDEMPOTENCY_KEY_MISMATCH", + "message": "mismatch", + "details": { + "workflow_id": "wf_1", + "step_id": "step_1", + "expected_idempotency_key": "a", + "received_idempotency_key": "b", + }, + } + }, + ) + + with pytest.raises(IdempotencyKeyMismatchError) as excinfo: + await client.step_gate( + "wf_1", + "step_1", + StepGateRequest(step_type=StepType.LLM_CALL, idempotency_key="b"), + ) + assert excinfo.value.expected_idempotency_key == "a" + assert excinfo.value.received_idempotency_key == "b"