app-server: Replay pending item requests on thread/resume#12560
app-server: Replay pending item requests on thread/resume#12560euroelessar merged 14 commits intomainfrom
thread/resume#12560Conversation
d0a1dc7 to
593d7d9
Compare
Replay pending client requests after `thread/resume` and emit resolved notifications when those requests clear so approval/input UI state stays in sync after reconnects and across subscribed clients. Affected RPCs: - `item/commandExecution/requestApproval` - `item/fileChange/requestApproval` - `item/tool/requestUserInput` Motivation: - Resumed clients need to see pending approval/input requests that were already outstanding before the reconnect. - Clients also need an explicit signal when a pending request resolves or is cleared so stale UI can be removed on turn start, completion, or interruption. Implementation notes: - Track pending client requests in `ThreadState` and replay them after `thread/resume` attaches the connection. - Reuse the original JSON-RPC request id for replays and resend prerequisite notifications like `item/started` for pending file change approvals. - Emit `item/commandExecution/approvalResolved`, `item/fileChange/approvalResolved`, and `item/tool/requestUserInputResolved` when pending requests are answered or cleared by lifecycle cleanup. - Update the app-server protocol schema, generated TypeScript bindings, and README docs for the replay/resolution flow. High-level test plan: - Added automated coverage for replaying pending command execution and file change approval requests on `thread/resume`. - Added automated coverage for resolved notifications in command approval, file change approval, request_user_input, turn start, and turn interrupt flows. - Verified schema/docs updates in the relevant protocol and app-server tests. Manual testing: - Not run.
593d7d9 to
60ee51e
Compare
| CommandExecutionApprovalResolved => "item/commandExecution/approvalResolved" (v2::CommandExecutionApprovalResolvedNotification), | ||
| FileChangeOutputDelta => "item/fileChange/outputDelta" (v2::FileChangeOutputDeltaNotification), | ||
| FileChangeApprovalResolved => "item/fileChange/approvalResolved" (v2::FileChangeApprovalResolvedNotification), | ||
| ToolRequestUserInputResolved => "item/tool/requestUserInputResolved" (v2::ToolRequestUserInputResolvedNotification), |
There was a problem hiding this comment.
ooc, why do we need these? can't clients resolve things based on receiving item/completed?
There was a problem hiding this comment.
i.e. server will send:
item/startednotification for file change / command execution / etc.item/commandExecution/requestApprovalrequest- a client responds
item/completednotification
There was a problem hiding this comment.
- we don't have
item/started/item/completedevents for user input at all, as there is no corresponding item, iiuc - in case of file changes, they are synthesized, but not always? e.g. I don't think we never emit
item/completedin cases like turn interruption and such (which could be a bug); also harness is allowed to send multiple requests for the same item, so there is no one-to-one mapping between requests & items for file changes approvals in particular, it's many-to-one
due to points above it's just easier (from protocol perspective & client-side consumption) and more consistent to have dedicated events
as a side note, it should be fine to merge these events into just single one if you prefer, I've only split them to mirror having multiple independent rpcs
There was a problem hiding this comment.
ah ok, we might want to add an item for representing the user input tool call. I had to do that recently for dynamic tool calls: #12732
but you're right, we're going to move to a world soon where there are multiple approvals for a CommandExecution so we can't rely on item/completed for that either. proceed :)
There was a problem hiding this comment.
naming nit: since our requests are like item/commandExecution/requestApproval should we name these notifications item/commandExecution/requestApproval/resolved?
There was a problem hiding this comment.
basically just appending /resolved to the corresponding server method name
| notifications_before_request: &[ServerNotification], | ||
| ) -> bool { | ||
| // Hold the callback map lock until the replay is queued so an already-resolved | ||
| // request cannot be replayed to a resumed client. |
There was a problem hiding this comment.
stresses me out - prefer to make this not needed for correctness, or use try_send
I don't fully understand what race this prevents - even if we send an already-resolved request event that should be ok right? since the client will observe the resolution as well right after (due to ordering guarantee provided by the thread listener task)
There was a problem hiding this comment.
yeah, I agree, we already serialize resolution properly so should be fine, simplified
owenlin0
left a comment
There was a problem hiding this comment.
trying my best to review but it's getting quite hard to follow :(
| connection_ids: Option<&[ConnectionId]>, | ||
| request: ServerRequestPayload, | ||
| thread_id: Option<ThreadId>, | ||
| tracked_request: bool, |
There was a problem hiding this comment.
what makes a request "tracked" or not?
There was a problem hiding this comment.
it's essentially a workaround to avoid closing rpcs for dynamic tool & v1, as I wanted to maintain status quo in their behavior
alternatively could check for the type (but that feels more hackier), or spend more type drilling into them to make sure they are not broken by the change
There was a problem hiding this comment.
bit the bullet and gone through all the remaining server->client requests to add proper handling of abortion into them, so no need for tracked requests anymore
| .await | ||
| } | ||
|
|
||
| pub(crate) async fn send_request_with_server_request( |
There was a problem hiding this comment.
what's the difference between this and send_request?
There was a problem hiding this comment.
this one was returning a request id as well, merged them into one & updated all callsites for simplicity
| outgoing.send_server_notification(notification).await; | ||
| } | ||
|
|
||
| pub(crate) async fn abort_pending_client_requests(outgoing: &ThreadScopedOutgoingMessageSender) { |
There was a problem hiding this comment.
can this be a method on ThreadScopedOutgoingMessageSender?
| } = event; | ||
| match msg { | ||
| EventMsg::TurnStarted(_) => { | ||
| outgoing.abort_pending_client_requests().await; |
There was a problem hiding this comment.
nit: maybe worth a comment on why we need this, still not super clear to me 🤷
| @@ -29,13 +31,24 @@ pub(crate) struct PendingThreadResumeRequest { | |||
| } | |||
|
|
|||
| pub(crate) enum ThreadListenerCommand { | |||
There was a problem hiding this comment.
prob worth a comment on what this (and the variants) are used for
| #[derive(Default, Clone)] | ||
| pub(crate) struct TurnSummary { | ||
| pub(crate) file_change_started: HashSet<String>, | ||
| pub(crate) file_change_started: HashMap<String, Vec<FileUpdateChange>>, |
There was a problem hiding this comment.
curious, why did we need to change it to a hashmap here?
| code: INTERNAL_ERROR_CODE, | ||
| message: "client request resolved because the turn state was changed" | ||
| .to_string(), | ||
| data: Some(serde_json::json!({ "reason": "turnTransition" })), |
There was a problem hiding this comment.
can import the const you defined: TURN_TRANSITION_PENDING_REQUEST_ERROR_REASON
Replay pending client requests after
thread/resumeand emit resolved notifications when those requests clear so approval/input UI state stays in sync after reconnects and across subscribed clients.Affected RPCs:
item/commandExecution/requestApprovalitem/fileChange/requestApprovalitem/tool/requestUserInputMotivation:
Implementation notes:
OutgoingMessageSenderin order to replay them afterthread/resumeattaches the connection, using original request ids.item/commandExecution/approvalResolved,item/fileChange/approvalResolved, anditem/tool/requestUserInputResolvedwhen pending requests are answered or cleared by lifecycle cleanup.High-level test plan:
thread/resume.Manual testing: