LongRunningAgentServer: Durable prose-recovery + always-rotate #425
Conversation
Force-pushed 9354a2b → 62b5380
…tate

Alternative to PR #416 — same durable-execution capability (heartbeat, CAS claim, retrieve endpoints, conversation_id rotation) but with a prose-recovery resume mechanism instead of structured carry-forward. Forked from main, scoped tightly to what prose recovery actually requires. All defenses live in ONE place (the bridge), SDK-agnostic.

What's in this PR (vs main)
===========================

Durable execution infrastructure (mirrors #416):
- long_running/db.py — durability column migrations
- long_running/models.py — durability columns on Response / Message
- long_running/repository.py — claim_stale_response (CAS), heartbeat, ResponseInfo, get_messages tagged with attempt_number
- long_running/settings.py — heartbeat interval/threshold
- long_running/server.py — heartbeat, _try_claim_and_resume, _rotate_conversation_id, _inject_conversation_id, retrieve endpoint with stream resume, /_debug/kill_task, [durable] lifecycle logs

Resume mechanism (DIFFERS from #416):
- _build_prose_recovery_message: walks the prior attempt's events and returns one Responses-API user message containing a flat prose narrative ("Called f(...) and got: ...", "Called g(...) — interrupted before result", "Was generating: ..."). Replaces #416's structured walker (~600 LOC → ~80 LOC).
- _try_claim_and_resume appends the prose message to original_request.input[] instead of carrying forward structured tool pairs + synthetic [INTERRUPTED] outputs.
- The response.resumed SSE sentinel includes the rotated conversation_id so cooperating clients can use the rotated session for subsequent turns.

SDK-agnostic UI-echo dedup (replaces #416's per-SDK adapter wrappers):
- _trim_echoed_history: if request.input contains an assistant message, the client is echoing prior conversation history → trim to the latest user message (sketched below). The SDK's own session/checkpointer storage is the source of truth for prior turns; the echo is redundant.
- Wired into _handle_invocations_request; runs on every fresh POST. Resume-built input has only role:user items, so the trim is a no-op.
- A single ~25 LOC function in the bridge replaces the per-SDK dedup the templates used to carry (OpenAI session.get_items() comparison; LangGraph agent.aget_state() comparison).

What's NOT in this PR (vs #416)
===============================
- integrations/openai/.../session.py: identical to main. No AsyncDatabricksSession.get_items wrap.
- integrations/langchain/.../checkpoint.py: identical to main. No _build_tool_resume_repair, no _repair_loaded_checkpoint_tuple, no aget_tuple/get_tuple overrides.
- src/databricks_ai_bridge/tool_repair.py: not added. No sanitize_tool_items, no _sanitize_request_input, no auto_sanitize_input setting.

Always-rotate makes the SDK wrappers unnecessary (the rotated session is clean). The trim hook makes per-SDK dedup unnecessary (it handles UI echo at the request boundary, before the handler runs). All dedup logic now lives in one SDK-agnostic place.
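A minimal sketch of the trim's shape — the function name and trigger condition come from this description; the body is an assumption, not the actual implementation:

```python
from typing import Any


def _trim_echoed_history(items: list[Any]) -> list[Any]:
    """If the client echoed prior turns (any assistant message present),
    keep only the latest user message onward; the SDK's session storage is
    the source of truth for earlier turns."""

    def _role(item: Any) -> str | None:
        return item.get("role") if isinstance(item, dict) else getattr(item, "role", None)

    if not any(_role(it) == "assistant" for it in items):
        return items  # fresh input: nothing echoed, nothing to trim
    for idx in range(len(items) - 1, -1, -1):
        if _role(items[idx]) == "user":
            return items[idx:]  # latest user message (and anything after it)
    return items
```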
The trade
=========

| Axis | #416 (structured) | This PR (prose) |
|---|---|---|
| Files vs main | 12 | 7 |
| Total LOC vs main | +2293 | +1709 |
| Resume builder LOC | ~600 | ~80 |
| SDK adapter wrappers | required (~200 LOC) | none |
| Input sanitizer | yes (~150 LOC) | none |
| Per-template dedup hooks | yes (per SDK) | none (SDK-agnostic in bridge) |
| Resume input shape | structured pairs + [INTERRUPTED] | single prose user message |
| Cache prefix on resume | ~95% hit | ~0% hit beyond [system] |
| Cache prefix on subsequent turns | natural structured prefix | [system]+[prose] for conversation lifetime |
| Multi-turn fix | read-time repair on aget_tuple/get_items | always-rotate redirect via chatbot alias |
| New infra needed | none | chatbot alias capture from response.resumed |

Trade-offs detailed in app-templates/durable-recovery-recommendation.md.

Tests: 118 pass (91 long_running_server + 27 long_running_db).

Status
======
POC for review against #416. Not intended to merge unless empirical data on cache cost and tool-use quality justifies the trade vs #416.

Companion PR: app-templates#204.

Co-authored-by: Isaac
Force-pushed 62b5380 → 8753772
Standalone documentation of the durable-resume design — for future maintainers to understand the system without reading the PR diff.

Sections:
- §1 Module purpose, capabilities, guarantees, non-goals
- §2 Four customer journeys with sequence diagrams (author writes agent, pod crashes mid-tool, subsequent turn after crash, multi-pod contention)
- §3 Architecture: storage layout (ER diagram), key flows (flowchart), prose-recovery construction (flowchart), UI-echo dedup (flowchart), heartbeat tuning, CAS atomicity (sequence diagram)
- §4 Author-side requirements: what's invisible, chat-UI alias cooperation, exposed settings
- §5 TaskFlow migration mapping (today → TaskFlow primitives, what stays, what gets deleted, sequencing)

Mermaid diagrams: 4 sequence diagrams, 1 ER, 4 flowcharts.

Co-authored-by: Isaac
bbqiu
left a comment
okay i left a few comments, but i think we should revisit the overall approach
it seems like there are lots of little hacks/cases that we added for compatibility with each of the specific langgraph / openai agents examples that we have. taking a step back, i think the server should be responsible for:
- heartbeat per response that is running
- loop that scans for stale heartbeats (should have some random delay + jitter to decrease multiple readers of the db at once)
- resumption logic that rotates the conversation ID + summarizes existing things
a. for followup messages after rotation, the dedupe should be in agent logic. this case of history from the client + history from internal agent state should be handled
Context (long_running/models.py):

```python
class Message(Base):
    """Stream events and output items for a response."""
```
note for myself to come back later
Context (long_running/server.py):

```python
def _trim_echoed_history(items: list[Any]) -> list[Any]:
```
this deduping should already happen within the agent right? this doesn't belong here
this is not very different from a client submitting more info than what is already in the session thread
Did some analysis with claude on why this is needed. I agree that this might not be the best place for it — happy to discuss whether we want to move this into the OpenAI / LangGraph wrappers or the app templates themselves.
`agent-openai-advanced/agent_server/utils.py:189` does have a `deduplicate_input` function — but its heuristic (`len(session_items) >= len(messages) - 1`) breaks under always-rotate. The OpenAI SDK's own `deduplicate_input_items_preferring_latest` (`agents/run_internal/items.py:280`) does dedup tool calls by `call_id`, but `_dedupe_key` (line 224) explicitly returns `None` for any item with a `role` field — so user/assistant message echo isn't covered. LangGraph's `add_messages` dedupes by id, but MLflow's `responses_to_cc` doesn't preserve IDs (`mlflow/types/responses.py:315-383`), so cross-boundary dedup never fires there either.
After discussion on Slack, we decided to move the dedup logic into the app-templates instead!
Context (long_running/server.py):

```python
def _rotate_conversation_id(
```
i think we can simplify this to create a new UUID for the convo thread. are there benefits to keeping the naming scheme?
We get some debug readability from this. Say the user looks into their conv store and sees the json dump + our injected system prompt — they might be confused as to where it came from. The id having an attempt-N part helps provide some visibility into this.
Per review on PR #425. Skipping #3 (trim) and #4 (conv_id naming) — addressed separately.

#1 Drop owner_pod_id; ownership via heartbeat CAS on attempt_number
- Remove owner_pod_id column from the Response model.
- heartbeat_response(response_id, expected_attempt_number) CAS-checks the attempt_number column. If a heartbeat write returns 0 rows, the prior owner has been bumped by another pod's claim and the heartbeat task knows to stop.
- The _heartbeat() context manager takes attempt_number; passes it through from _run_background_stream / _run_background_invoke.
- claim_stale_response() no longer takes a pod parameter.
- _POD_LOG_ID retained for log-line identity only (not stored in the DB).

#2 Simplify prose recovery to json.dumps the events array
- _build_prose_recovery_message: was a ~110 LOC structural walker (function_call/output pairs, narrative messages, partial-text reassembly). Now ~15 LOC: filter events by prior_attempt_number, json.dumps them, wrap in a directive prompt asking the model to figure out what's done vs interrupted.

#5 Drop _inject_conversation_id
- The function was a defensive injection of response_id into context.conversation_id when no client anchor was supplied. With rotation handling resume, and templates / chatbot consistently setting conv_id, the injection was redundant.

Top-level review: proactive stale-scan loop with jitter
- New _stale_response_scanner_loop: every ~30s ± 50% jitter, queries responses for in_progress rows with stale heartbeats and tries to claim+resume them. The proactive counterpart to the lazy-on-GET claim; ensures crashed responses get recovered even if no client polls. (Sketched below.)
- find_stale_response_ids repository function with LIMIT 50.
- Spawned in the FastAPI lifespan alongside init_db; cancelled on shutdown.
- Settings: stale_scan_interval_seconds=30.0, stale_scan_jitter_fraction=0.5.

#6 Document /_debug/kill_task in AGENTS.md
- New §4.4 explaining the test-only debug endpoint, env-var gating, and what state it leaves the row in.

AGENTS.md updates:
- ER diagram: drop owner_pod_id, annotate attempt_number as the CAS guard.
- New §3.5 documenting the proactive scanner with a mermaid flowchart.
- §4.3 includes the new scanner settings.

Tests: 110 pass. Ruff/format/ty all clean.

Co-authored-by: Isaac
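A minimal sketch of the scanner's shape, assuming the settings and repository helper named above — the import path and loop body are assumptions, not the actual code:

```python
import asyncio
import random

from databricks_ai_bridge.long_running.repository import find_stale_response_ids  # assumed path


async def _stale_response_scanner_loop(server) -> None:
    # Proactive counterpart to the lazy-on-GET claim: wake every ~interval
    # (randomized by ±jitter so pods don't all hit the DB at once), find
    # in_progress rows with stale heartbeats, and try to claim + resume them.
    interval = server.settings.stale_scan_interval_seconds  # 30.0
    jitter = server.settings.stale_scan_jitter_fraction     # 0.5
    while True:
        await asyncio.sleep(interval * random.uniform(1 - jitter, 1 + jitter))
        for response_id in await find_stale_response_ids(limit=50):
            # CAS claim inside: at most one pod wins each row; losers no-op.
            await server._try_claim_and_resume(response_id)
```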
The endpoint was gated by an env-var check at route-registration time:

```python
if os.getenv("LONG_RUNNING_ENABLE_DEBUG_KILL") == "1":
    @self.app.post("/_debug/kill_task/{response_id}")
    ...
```

This worked locally but failed on Databricks Apps, where the env var appears in os.environ after the FastAPI app object is built. The endpoint was never registered, returning 404 even with the env var properly set in the deployment.
Move the env check inside the handler:

```python
@self.app.post("/_debug/kill_task/{response_id}")
async def _debug_kill_task(response_id: str):
    if os.getenv("LONG_RUNNING_ENABLE_DEBUG_KILL") != "1":
        raise HTTPException(404, "Debug kill endpoint is disabled.")
    ...
```

The route is always registered; the env var is checked per-request. Same security posture (404 when disabled), and it works regardless of when env vars become visible to the process.
Per design discussion with Bryan offline. Echo dedup is an agent-layer concern (the agent owns its SDK session/checkpointer, knows what's already persisted, and decides how to combine that with caller input). The bridge should be minimal: heartbeat, scan loop, CAS claim, conv_id rotation, prose recovery — no manipulation of request.input shape.

Removed:
- The _trim_echoed_history function (was ~30 LOC at module top of server.py)
- Both call sites in _handle_invocations_request (non-background path) and _handle_background_request (background+DB path, post-storage)
- The TestTrimEchoedHistory test class

Templates now do dedup themselves (separate PR / templates branch; see the sketch below):
- agent-openai-advanced/utils.py: deduplicate_input checks session items
- agent-langgraph-advanced/agent.py: stream_handler probes aget_state

Updated AGENTS.md §3.4 to reflect the new placement; removed the §3.4 mermaid diagram and the bullet referencing _trim_echoed_history in §5.

Tests: 104 pass (was 110; 6 trim-specific tests removed).

Co-authored-by: Isaac
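A rough sketch of what the template-side dedup could look like in the OpenAI-Agents flavor — the function name comes from the commit above; the body is an assumption:

```python
import json
from typing import Any


async def deduplicate_input(session: Any, items: list[dict]) -> list[dict]:
    # The agent owns its SDK session, so it can compare incoming items
    # against what is already persisted and keep only the new ones —
    # handling a chat client that echoes the full conversation history.
    stored = await session.get_items()
    seen = {json.dumps(item, sort_keys=True, default=str) for item in stored}
    return [
        item
        for item in items
        if json.dumps(item, sort_keys=True, default=str) not in seen
    ]
```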
Updates AGENTS.md to describe only the current design:
- §1 guarantees: prose recovery is json.dumps(events) + directive (not narrative)
- CUJ 2 sequence diagram: heartbeat is an implicit-ownership CAS on attempt_number; no owner_pod_id mentions
- CUJ 4 multi-pod contention: same
- §3.3 resume input flowchart: simplified to the filter+json.dumps shape
- §3.7 (was duplicated as 3.6): claim atomicity diagram updated to match the actual SQL (no owner_pod_id column)
- §5 TaskFlow mapping: removed reference to the deleted _inject_conversation_id
- Quick reference: dropped _trim_echoed_history and _inject_conversation_id; added _stale_response_scanner_loop; pointed UI-echo dedup to the templates

No new content, just trimming stale references to functions / columns that have been removed.
Context (long_running/server.py):

```python
    stream_event=error_event,
    attempt_number=attempt,
)
await update_response_status(response_id, "failed")
```
(nit for an edge case) whenever we write terminal status to the db, we should check that the current attempt number == the attempt number in the db (meaning this pod still owns the current response)
claude's comment w/ more details:
## Guard terminal status writes on `attempt_number`
Every place we write a terminal status (`completed` / `failed`) should verify this pod still owns the run by gating on the `attempt_number` the task was started with. Without this guard, a pod whose heartbeat stalled and lost ownership via `claim_stale_response` can clobber the row's status while another pod is mid-resume — pollers and streamers stop early on a stale `completed`/`failed`.
**Sites that need the guard:**
- `_do_background_stream` final `update_response_status(response_id, "completed")` (`server.py:937`)
- `_do_background_invoke` final `update_response_status(response_id, "completed")` (`server.py:1018`)
- `_task_scope` error cleanup `update_response_status(response_id, "failed")` (`server.py:821`)
- `_deferred_mark_failed` `update_response_status(response_id, "failed")` (`server.py:94`)
- Pre-flight `update_response_status(response_id, "failed")` when no stream/invoke fn is registered (`server.py:871`, `:978`)
**Shape:**
```python
async def update_response_status(
    response_id: str,
    status: str,
    *,
    expected_attempt_number: int | None = None,
    expected_current_status: str | None = None,
) -> bool:
    stmt = update(Response).where(Response.response_id == response_id)
    if expected_attempt_number is not None:
        stmt = stmt.where(Response.attempt_number == expected_attempt_number)
    if expected_current_status is not None:
        stmt = stmt.where(Response.status == expected_current_status)
    ...
```
Thread `attempt_number` through to every callsite. It's already in scope inside `_run_background_stream` / `_run_background_invoke`; `_task_scope` and `_deferred_mark_failed` need it passed in (currently they only get `response_id`).
A `False` return is the caller's signal "you no longer own this run" — log it and exit cleanly, don't escalate to an error.
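For illustration, a hypothetical caller-side shape consuming that boolean — `update_response_status` is the function sketched above; everything else here is assumed:

```python
import logging

logger = logging.getLogger(__name__)


async def _finish(response_id: str, attempt_number: int) -> None:
    # Terminal write is a CAS on attempt_number; False means another pod
    # claimed the response while we were finishing — back off quietly.
    still_owner = await update_response_status(
        response_id, "completed", expected_attempt_number=attempt_number
    )
    if not still_owner:
        logger.info(
            "[durable] lost ownership of %s at attempt %s; exiting cleanly",
            response_id, attempt_number,
        )
```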
Address Bryan's nit (PR #425, comment PRRC_kwDONA8Fvc685ROc): when writing terminal status to the DB, check that the current attempt_number matches the one this pod thinks it owns — prevents a stale background task (deferred-fail timer, or post-cleanup) from clobbering an in-progress state that another pod has already claimed for resume.

Targeted at the truly-racy paths only (sleep + write):
- _deferred_mark_failed: takes a new owning_attempt_number kwarg. Reads the current attempt before writing; skips both the error-event append AND the status update if ownership has moved. The status update also passes expected_attempt_number to the repository for belt-and-suspenders CAS at the SQL layer.
- _task_scope: takes a new attempt_number kwarg. The inline error-cleanup passes it as expected_attempt_number; the two _deferred_mark_failed fallback paths pass it as owning_attempt_number.
- _run_background_stream / _run_background_invoke wire their attempt_number through to _task_scope.

Repository:
- update_response_status gains an optional expected_attempt_number kwarg that adds `WHERE attempt_number = :expected` to the UPDATE.

The success-path writes (status=completed in _do_background_stream / _do_background_invoke) and the stuck-row force-fail in _handle_retrieve_request are intentionally not CAS'd here — they happen synchronously after the handler exits while the heartbeat is still alive, so the race window is tiny. Can revisit if observed in prod.

Tests: 105 pass (was 104 — added an ownership-changed test for _deferred_mark_failed; updated 2 existing tests for the new kwarg).
CI's `ruff format --check` flagged three calls in test_long_running_server.py that fit on one line but my local formatter had wrapped onto multiple lines (different ruff config / line-length resolution between local and CI). Collapse them.
bbqiu
left a comment
lgtm after fixing the last nit
Context (long_running/server.py):

```python
    attempt_number=attempt_number,
)

await update_response_status(response_id, "completed")
```
should we only do this with a matching attempt number?
## Summary

Adds durable, crash-resumable execution to MLflow `ResponsesAgent` handlers via `LongRunningAgentServer`. A pod that dies mid-stream is seamlessly recovered by another pod via prose recovery — the resumed attempt receives a single user message containing the prior attempt's stream events as JSON plus a directive to figure out what's done and continue.

The bridge is minimal: heartbeat, proactive stale scan, atomic CAS claim, conversation_id rotation, prose-recovery message construction. Echo dedup (when the chat client echoes the full conversation history) is the agent handler's responsibility, since the agent owns its SDK session/checkpointer state.
Full design documentation in `src/databricks_ai_bridge/long_running/AGENTS.md`, including 4 customer journeys, sequence diagrams, an ER diagram, and the planned TaskFlow migration mapping.

## High-level flow
```mermaid
flowchart TD
    A["Client: POST /responses<br/>background:true"] --> B["Bridge: store FULL original_request,<br/>spawn handler, heartbeat every 3s"]
    B --> C{"Run completes?"}
    C -->|yes| D["status=completed<br/>events streamable via GET"]
    C -->|pod crashes| E["heartbeat stops"]
    E -->|"stale > 10s"| F1{"stale detected by:"}
    F1 --> F2["client GET /responses/{id}"]
    F1 --> F3["scanner loop ~30s ± 50% jitter"]
    F2 --> G["CAS claim attempt+=1<br/>build prose recovery msg<br/>rotate conv_id ::attempt-N"]
    F3 --> G
    G --> H["Re-invoke handler on<br/>fresh rotated SDK session"]
    H --> C
    G -.->|"response.resumed sentinel<br/>carries rotated conv_id"| I["Chatbot captures rotation,<br/>uses rotated id on next turn<br/>→ lands on clean rotated session"]
```

## What the bridge does
**Heartbeat + atomic CAS claim.** The owning pod writes `heartbeat_at` every 3s. The heartbeat write is CAS-checked on `attempt_number` — if another pod has bumped the row to `attempt_number = N+1`, the prior pod's heartbeat at attempt N returns rowcount=0 and stops. Ownership is implicit: the pod that last successfully heartbeats at the current attempt is the de facto owner. There is no `owner_pod_id` column.

When `GET /responses/{id}` (or the proactive scanner) sees the row's heartbeat is older than 10s, the new pod atomically claims via a CAS update on `attempt_number` (sketched below); Postgres row locking ensures exactly one pod wins.
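A minimal sketch of the claim's shape — the real statement lives in `long_running/repository.py::claim_stale_response`; the model import path, column names, and exact predicates here are assumptions:

```python
from datetime import datetime, timedelta, timezone

from sqlalchemy import update

from databricks_ai_bridge.long_running.models import Response  # assumed path


async def claim_stale_response(session, response_id: str, stale_after: float) -> bool:
    # CAS: bump attempt_number only if the row is in_progress AND its
    # heartbeat is stale. rowcount tells us whether this pod won the claim.
    cutoff = datetime.now(timezone.utc) - timedelta(seconds=stale_after)
    stmt = (
        update(Response)
        .where(Response.response_id == response_id)
        .where(Response.status == "in_progress")
        .where(Response.heartbeat_at < cutoff)
        .values(
            attempt_number=Response.attempt_number + 1,
            heartbeat_at=datetime.now(timezone.utc),
        )
    )
    result = await session.execute(stmt)
    return result.rowcount == 1  # True → this pod owns the new attempt
```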
**Prose recovery.** On claim, the new pod filters the message log to the prior attempt's stream events, `json.dumps`-serializes them, and wraps them in a single user message (a sketch of its shape follows). The model reads this and decides what to redo, what to skip. SDK-agnostic — no provider-specific tool-pair structure, no synthetic tool events, no per-SDK adapter code.
**Always-rotate `conversation_id`.** On resume, `context.conversation_id` is rotated to `{base}::attempt-N` so the SDK opens a fresh thread/session for the resumed attempt. The original session, which may carry orphan `tool_use` / partial checkpoint state from the crashed attempt, is never read again.
response.resumedsentinel; cooperating chat clients (see app-templates#204) capture it and use the rotated session for subsequent turns.Proactive stale-scan loop. Each pod runs a background scanner every ~30s (± 50% jitter) that queries for in-progress responses with stale heartbeats and tries to claim+resume them. Crashed responses get recovered even when no client polls. CAS-claim semantics ensure only one pod wins on contention.
**Stream resume.** `GET /responses/{id}?stream=true&starting_after=N` replays the message log past the cursor; a `response.resumed` sentinel marks attempt boundaries.
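For illustration, a hypothetical client-side resume loop — the endpoint shape comes from above; the SSE parsing and function are assumptions:

```python
import httpx


async def resume_stream(base_url: str, response_id: str, last_seen_cursor: int):
    # Replay everything past the cursor; the server keeps streaming live
    # events (and any response.resumed sentinels) after the replay.
    url = f"{base_url}/responses/{response_id}"
    params = {"stream": "true", "starting_after": last_seen_cursor}
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("GET", url, params=params) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    yield line[len("data: "):]
```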
## What is NOT in the bridge

- `integrations/openai/.../session.py` and `integrations/langchain/.../checkpoint.py` are byte-identical to main. No `aget_tuple` / `get_items` overrides.
- No manipulation of `request.input` shape. When the chat client echoes the full prior conversation, the agent handler is responsible for deduping its input against the SDK's session/checkpointer state. See companion PR app-templates#204 for the per-template ~10-line implementations.

## Files
- `long_running/server.py` — `LongRunningAgentServer` (heartbeat, claim, retrieve, rotation, scanner)
- `long_running/server.py::_build_prose_recovery_message`
- `long_running/repository.py`
- `long_running/db.py`, `models.py`
- `long_running/settings.py`
- `long_running/AGENTS.md`
- `tests/databricks_ai_bridge/test_long_running_*.py`

## Settings
- `heartbeat_interval_seconds`
- `heartbeat_stale_threshold_seconds`
- `task_timeout_seconds`
- `poll_interval_seconds`
- `stale_scan_interval_seconds`
- `stale_scan_jitter_fraction`

## Test plan
- Test suites pass (`tests/databricks_ai_bridge/test_long_running_server.py`, `test_long_running_db.py`)
- `ruff check`, `ruff format --check`, `ty check` clean
- `dhruv-lg-adv-durable-prose` (LangGraph + Claude Sonnet 4.5) — multi-tool kill mid-deep_research + multi-turn ✅
- `dhruv-oai-adv-durable-prose` (OpenAI Agents SDK + GPT-5) — same matrix ✅
- `dhruv-oai-cl-durable-prose` (OpenAI Agents SDK + Claude Sonnet 4.5) — same matrix ✅

## Companion PR
app-templates#204 — chatbot proxy alias capture (cooperation for cross-turn rotation) + per-template UI-echo dedup.
## Known follow-ups (non-blocking)
- `DELETE /responses/{id}` cancellation (currently 501).
- `LISTEN/NOTIFY` replacing the 1s poll loop in stream retrieve.
- `heartbeat_miss_count`, `claim_latency_seconds`, `attempts_per_response`, `stale_scan_iterations`.
LongRunningAgentServer's HTTP surface and@invoke()/@stream()contract stay the same when the engine internals swap.Text.Only.-.Prose.mov
Tool.Calling.Multiturn.-.Prose.mov