48 commits
abeef28
agent-*-advanced: wire up durable-resume branch of databricks-ai-bridge
dhruv0811 Apr 16, 2026
83a8e7e
Wire UI through LongRunningAgentServer's background+resume contract
dhruv0811 Apr 20, 2026
fa550af
Route AI SDK through Express /invocations proxy so background-rewrite…
dhruv0811 Apr 20, 2026
911f18b
Log response_id on first SSE event from Express proxy
dhruv0811 Apr 20, 2026
399ffde
Match API_PROXY + AGENT_BACKEND_URL in app.yaml too
dhruv0811 Apr 20, 2026
04e6b1b
start_app: point API_PROXY at the Express proxy, keep AGENT_BACKEND_U…
dhruv0811 Apr 20, 2026
948f7b4
Proxy: accept response_id from top-level, nested response.id, or id= …
dhruv0811 Apr 20, 2026
cea0508
Proxy: log upstream close + each resume-fetch attempt + final stream …
dhruv0811 Apr 20, 2026
cafd07d
Surface databricks_ai_bridge [durable] INFO logs into apps output
dhruv0811 Apr 20, 2026
4cbc677
Two-bubble UX on resume + match durable-resume wiring across templates
dhruv0811 Apr 20, 2026
a9c94c4
Heal orphan tool_calls in the OpenAI Session on every turn
dhruv0811 Apr 20, 2026
7f15d2f
Sanitize OpenAI session: dedupe + inject synthetic outputs in-place
dhruv0811 Apr 20, 2026
61dcfc0
Replace interrupted text with attempt 2's on durable resume
dhruv0811 Apr 20, 2026
75dae5d
Add debug logs for durable-resume data-resumed event propagation
dhruv0811 Apr 20, 2026
47a063f
Catch-all log in onData to trace which data parts reach client
dhruv0811 Apr 20, 2026
c27c016
Wipe text in-place on data-resumed instead of removing the part
dhruv0811 Apr 20, 2026
bfd8f6e
Post-stream truncate as belt-and-suspenders for durable-resume text wipe
dhruv0811 Apr 20, 2026
931ab0f
Post-stream truncate must create new part + message refs
dhruv0811 Apr 20, 2026
412e14f
Mid-stream text replacement via render-time slice in Messages
dhruv0811 Apr 20, 2026
89f096b
Merge remote-tracking branch 'origin/main' into dhruv0811/durable-exe…
dhruv0811 Apr 20, 2026
b17eec8
Remove debug console.logs from durable-resume UI path
dhruv0811 Apr 20, 2026
20f87cb
Use library-side durable-resume repair helpers
dhruv0811 Apr 21, 2026
0374ff4
Simplify: langgraph repair via middleware, UI minimal reset on resume
dhruv0811 Apr 21, 2026
e9b4064
debug: log response.resumed detection in chat.ts onChunk
dhruv0811 Apr 21, 2026
337c39f
debug: log every dataPart in chat.tsx onData to diagnose UI drop
dhruv0811 Apr 21, 2026
d144ec0
debug: log setMessages wipe details on data-resumed
dhruv0811 Apr 21, 2026
467d2ed
UI: render-time slice for durable-resume text wipe
dhruv0811 Apr 21, 2026
695dcfb
Simplify /invocations proxy: drop interruption suffix + writeEvent he…
dhruv0811 Apr 21, 2026
25202c7
Advanced templates: strip user-space durability code
dhruv0811 Apr 22, 2026
a912d8d
debug: disable UI wipe on data-resumed for observing inheritance
dhruv0811 Apr 22, 2026
4421511
Remove UI text-refresh plumbing now that server-side inheritance hand…
dhruv0811 Apr 23, 2026
7683079
Strip app-templates PR to the bare minimum durability surface
dhruv0811 Apr 23, 2026
5f3c507
chat: cap resume attempts per turn, don't reset on each chunk
dhruv0811 Apr 23, 2026
0ddbd60
Openai template: stable — per-type + per-call-id stream id tracking
dhruv0811 Apr 23, 2026
24140b3
Revert template-side durable-resume hardening; bridge fix suffices
dhruv0811 Apr 23, 2026
db9fb45
Refactor /invocations pumpStream into SSE parsing helpers
dhruv0811 Apr 23, 2026
9f7e95d
Default API_PROXY/AGENT_BACKEND_URL in chatbot, drop from advanced yaml
dhruv0811 Apr 23, 2026
ec73bc9
Restore API_PROXY line on advanced yaml files to match main
dhruv0811 Apr 23, 2026
0517e23
Fold getApiProxyUrl into request-context.ts
dhruv0811 Apr 23, 2026
0d30827
Templates: pin to prose-recovery bridge branch + LangGraph thread_id …
dhruv0811 Apr 28, 2026
140399b
Chatbot: capture rotated conversation_id from response.resumed sentinel
dhruv0811 Apr 28, 2026
31d87d6
agent-openai-advanced: trust session as authoritative for cross-turn …
dhruv0811 Apr 28, 2026
1aee3af
agent-langgraph-advanced: trust checkpointer state as authoritative f…
dhruv0811 Apr 28, 2026
dfa14ce
Revert per-template dedup hooks
dhruv0811 Apr 28, 2026
018492f
chatbot: drop non-null assertion in wrapResponseToCaptureRotation
dhruv0811 Apr 28, 2026
9d4af20
Revert agent_langgraph_memory from _MANAGED_SCHEMAS
dhruv0811 Apr 28, 2026
2a5c1cf
Revert "Revert agent_langgraph_memory from _MANAGED_SCHEMAS"
dhruv0811 Apr 28, 2026
8ecf2b7
Move UI-echo dedup into per-template handlers
dhruv0811 Apr 30, 2026
16 changes: 15 additions & 1 deletion agent-langgraph-advanced/agent_server/agent.py
@@ -40,7 +40,7 @@
logging.getLogger("mlflow.utils.autologging_utils").setLevel(logging.ERROR)
sp_workspace_client = WorkspaceClient()

-LLM_ENDPOINT_NAME = "databricks-claude-sonnet-4-5"
+LLM_ENDPOINT_NAME = "databricks-gpt-5-2"
LAKEBASE_CONFIG = init_lakebase_config()


@@ -123,6 +123,20 @@ async def stream_handler(
# For on-behalf-of user authentication, pass get_user_workspace_client() to init_agent.
agent = await init_agent(store=store, checkpointer=checkpointer)

# When the checkpointer already has prior turns for this thread,
[Review comment from the PR author]: Can we move this logic into a function equivalent to `deduplicate_input` in the openai template, matching it in both location and usage?

# the chat client's full-history echo is redundant — `add_messages`
# would append duplicates (it dedupes by `id`, but MLflow's
# `responses_to_cc` doesn't preserve IDs, so dedup never fires).
# Forward only the latest user message; the checkpointer prepends
# the rest.
state = await agent.aget_state(config)
if state and state.values.get("messages") and input_state["messages"]:
last_user = next(
(m for m in reversed(input_state["messages"]) if m.get("role") == "user"),
None,
)
input_state["messages"] = [last_user] if last_user else []

async for event in process_agent_astream_events(
agent.astream(input_state, config, stream_mode=["updates", "messages"])
):
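The review comment above asks for a `deduplicate_input`-style helper; the core rule in this hunk can be lifted into a standalone function. A minimal sketch, with the LangGraph state access replaced by plain message lists so it stands alone (the function name is an assumption, not code from this PR):

```python
def dedupe_against_checkpointer(prior_messages, input_messages):
    """When the checkpointer already holds prior turns, drop the chat
    client's full-history echo and forward only the latest user message;
    the checkpointer prepends the rest of the history itself."""
    if not (prior_messages and input_messages):
        # First turn (or empty input): nothing persisted to dedupe against.
        return input_messages
    last_user = next(
        (m for m in reversed(input_messages) if m.get("role") == "user"),
        None,
    )
    return [last_user] if last_user else []
```

In the template this would be called with `state.values.get("messages")` and `input_state["messages"]` just before `agent.astream(...)`.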
10 changes: 10 additions & 0 deletions agent-langgraph-advanced/agent_server/start_server.py
@@ -41,6 +41,16 @@ def transform_stream_event(self, event, response_id):
poll_interval_seconds=float(os.getenv("POLL_INTERVAL_SECONDS", "1.0")),
)

log_level = os.getenv("LOG_LEVEL", "INFO")
[Review comment from the PR author]: This feels like a very messy way to propagate logs; can we see if it's possible to clean this up? Also, make the comments bridge-specific rather than durable-specific, since all bridge logs will be surfaced by this.

_lvl = getattr(logging, log_level.upper(), logging.INFO)
logging.getLogger("agent_server").setLevel(_lvl)
# Surface [durable] lifecycle logs from LongRunningAgentServer into apps logs.
logging.getLogger("databricks_ai_bridge").setLevel(_lvl)
if not logging.getLogger().handlers:
logging.basicConfig(level=_lvl, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
else:
logging.getLogger().setLevel(_lvl)

# Define the app as a module level variable to enable multiple workers
app = agent_server.app # noqa: F841
setup_mlflow_git_based_version_tracking()
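The same log-level block is duplicated across both templates' start_server.py; it could be collapsed into a single helper along these lines (hypothetical function, not part of the PR):

```python
import logging
import os


def configure_bridge_logging(default: str = "INFO") -> int:
    """Raise the agent_server and databricks_ai_bridge loggers to LOG_LEVEL
    so bridge lifecycle logs reach apps output, and make sure the root
    logger actually emits at that level."""
    lvl = getattr(logging, os.getenv("LOG_LEVEL", default).upper(), logging.INFO)
    for name in ("agent_server", "databricks_ai_bridge"):
        logging.getLogger(name).setLevel(lvl)
    root = logging.getLogger()
    if root.handlers:
        root.setLevel(lvl)
    else:
        # No handlers yet (uvicorn configures only 'uvicorn.*'): install one.
        logging.basicConfig(
            level=lvl, format="%(asctime)s %(levelname)s %(name)s: %(message)s"
        )
    return lvl
```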
6 changes: 6 additions & 0 deletions agent-langgraph-advanced/pyproject.toml
@@ -39,6 +39,12 @@ setup = [
[tool.uv]
default-groups = ["dev", "setup"]

# TEMPORARY: point at the open PR branch while ML-64230 durable-resume
# changes are in review. Revert to the registry release once merged.
[tool.uv.sources]
databricks-ai-bridge = { git = "https://github.com/databricks/databricks-ai-bridge", branch = "dhruv0811/durable-execution-prose-recovery" }
databricks-langchain = { git = "https://github.com/databricks/databricks-ai-bridge", branch = "dhruv0811/durable-execution-prose-recovery", subdirectory = "integrations/langchain" }


[tool.pytest.ini_options]
base_url = "http://localhost:8000"
35 changes: 31 additions & 4 deletions agent-langgraph-advanced/scripts/start_app.py

Some generated files are not rendered by default.

13 changes: 12 additions & 1 deletion agent-openai-advanced/agent_server/start_server.py
@@ -56,7 +56,18 @@ def transform_stream_event(self, event, response_id):
)

log_level = os.getenv("LOG_LEVEL", "INFO")
-logging.getLogger("agent_server").setLevel(getattr(logging, log_level.upper(), logging.INFO))
+_lvl = getattr(logging, log_level.upper(), logging.INFO)
+logging.getLogger("agent_server").setLevel(_lvl)
+# Surface [durable] lifecycle logs from LongRunningAgentServer into apps logs.
+# These are INFO-level in databricks_ai_bridge but the library logger defaults
+# to WARNING unless the host process sets it explicitly.
+logging.getLogger("databricks_ai_bridge").setLevel(_lvl)
+# Ensure the root handler actually emits at this level too. uvicorn sets up
+# its own handlers for 'uvicorn.*' but leaves root untouched.
+if not logging.getLogger().handlers:
+    logging.basicConfig(level=_lvl, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
+else:
+    logging.getLogger().setLevel(_lvl)

# Wrap the existing lifespan to ensure session tables are created before serving requests
_original_lifespan = agent_server.app.router.lifespan_context
23 changes: 16 additions & 7 deletions agent-openai-advanced/agent_server/utils.py
@@ -29,6 +29,7 @@ class LakebaseConfig:
autoscaling_endpoint: Optional[str]
autoscaling_project: Optional[str]
autoscaling_branch: Optional[str]
memory_schema: Optional[str] = None

@property
def description(self) -> str:
@@ -103,13 +104,15 @@ def init_lakebase_config() -> LakebaseConfig:
" Option 3 (provisioned): LAKEBASE_INSTANCE_NAME=<your-instance-name>\n"
)

memory_schema = os.getenv("LAKEBASE_AGENT_MEMORY_SCHEMA") or None

# Priority: endpoint > project+branch > instance_name (mutually exclusive in the library)
if endpoint:
-    return LakebaseConfig(instance_name=None, autoscaling_endpoint=endpoint, autoscaling_project=None, autoscaling_branch=None)
+    return LakebaseConfig(instance_name=None, autoscaling_endpoint=endpoint, autoscaling_project=None, autoscaling_branch=None, memory_schema=memory_schema)
elif has_autoscaling:
-    return LakebaseConfig(instance_name=None, autoscaling_endpoint=None, autoscaling_project=project, autoscaling_branch=branch)
+    return LakebaseConfig(instance_name=None, autoscaling_endpoint=None, autoscaling_project=project, autoscaling_branch=branch, memory_schema=memory_schema)
else:
-    return LakebaseConfig(instance_name=resolve_lakebase_instance_name(raw_name), autoscaling_endpoint=None, autoscaling_project=None, autoscaling_branch=None)
+    return LakebaseConfig(instance_name=resolve_lakebase_instance_name(raw_name), autoscaling_endpoint=None, autoscaling_project=None, autoscaling_branch=None, memory_schema=memory_schema)


def get_lakebase_access_error_message(lakebase_description: str) -> str:
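The hunk above keeps the documented priority of endpoint > project+branch > instance_name; reduced to a pure function, the selection logic looks roughly like this (the tuple return shape is illustrative, not the template's API):

```python
def resolve_lakebase_target(endpoint=None, project=None, branch=None, instance_name=None):
    """Pick exactly one Lakebase target, in priority order:
    autoscaling endpoint > autoscaling project+branch > provisioned instance."""
    if endpoint:
        return ("autoscaling_endpoint", endpoint)
    if project and branch:
        return ("autoscaling_project_branch", (project, branch))
    if instance_name:
        return ("provisioned_instance", instance_name)
    raise ValueError("No Lakebase target configured")
```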
@@ -187,9 +190,12 @@ async def deduplicate_input(request: ResponsesAgentRequest, session: AsyncDatabr
"""Return the input messages to pass to the Runner, avoiding duplication with session history.

When a client sends the full conversation history AND the session already has
-that history persisted, passing everything through would duplicate messages.
-If the session already covers the prior turns, only the latest message is needed
-since the session will prepend the full history automatically.
+that history persisted, passing everything through would duplicate messages
+in the LLM call (Runner combines session items + input items, and the OpenAI
+SDK's `_dedupe_key` doesn't dedupe role-bearing items — see
+`agents/run_internal/items.py:224-250`). If the session already has any
+items, the prior turns are persisted there and we only need to forward the
+latest user message.
"""
messages = [i.model_dump() for i in request.input]
# Normalize assistant message content from string to structured list format.
@@ -204,7 +210,10 @@
):
msg["content"] = [{"type": "output_text", "text": msg["content"], "annotations": []}]
session_items = await session.get_items()
-if len(session_items) >= len(messages) - 1:
+# Trust the session as authoritative for prior turns. Forward only the
+# latest message (the new user turn). The Runner will prepend session
+# history on the LLM call automatically.
+if session_items and len(messages) > 1:
return [messages[-1]]
return messages

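The behavioral difference between the old length-based gate and the new session-authoritative gate is easiest to see with both reduced to pure predicates (a sketch for comparison, not code from the PR):

```python
def old_rule(session_items, messages):
    # Old gate: dedupe only when the session looks at least as long as the echo.
    return len(session_items) >= len(messages) - 1


def new_rule(session_items, messages):
    # New gate: any persisted items mean prior turns live in the session,
    # so a multi-message input must be a client-side history echo.
    return bool(session_items) and len(messages) > 1
```

When the session stores fewer items than the echoed chat history (for example after sanitization collapses duplicates), only the new rule still dedupes.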
6 changes: 6 additions & 0 deletions agent-openai-advanced/pyproject.toml
@@ -43,6 +43,12 @@ setup = [
[tool.uv]
default-groups = ["dev", "setup"]

# TEMPORARY: point at the open PR branch while ML-64230 durable-resume
# changes are in review. Revert to the registry release once merged.
[tool.uv.sources]
databricks-ai-bridge = { git = "https://github.com/databricks/databricks-ai-bridge", branch = "dhruv0811/durable-execution-prose-recovery" }
databricks-openai = { git = "https://github.com/databricks/databricks-ai-bridge", branch = "dhruv0811/durable-execution-prose-recovery", subdirectory = "integrations/openai" }

[tool.pytest.ini_options]
base_url = "http://localhost:8000"
addopts = "-n 7"
35 changes: 31 additions & 4 deletions agent-openai-advanced/scripts/start_app.py

Some generated files are not rendered by default.
