Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions app/agent_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1525,6 +1525,59 @@ async def _handle_chat_message(self, payload: Dict):
# Local TUI/CLI message
platform = "CraftBot TUI"

# Direct reply bypass - skip routing LLM when target_session_id is provided
target_session_id = payload.get("target_session_id")
if target_session_id:
logger.info(f"[CHAT] Direct reply to session {target_session_id}")

# Fire the target trigger directly, bypassing routing LLM
fired = await self.triggers.fire(
target_session_id, message=chat_content, platform=platform
)

if fired:
logger.info(f"[CHAT] Successfully resumed session {target_session_id}")

# Reset task status from "waiting" to "running" when user replies
if self.ui_controller:
from app.ui_layer.events import UIEvent, UIEventType

self.ui_controller.event_bus.emit(
UIEvent(
type=UIEventType.TASK_UPDATE,
data={
"task_id": target_session_id,
"status": "running",
},
)
)

# Check if there are still other tasks waiting
triggers = await self.triggers.list_triggers()
has_waiting_tasks = any(
getattr(t, 'waiting_for_reply', False)
for t in triggers
if t.session_id != target_session_id
)
if not has_waiting_tasks:
self.ui_controller.event_bus.emit(
UIEvent(
type=UIEventType.AGENT_STATE_CHANGED,
data={
"state": "working",
"status_message": "Agent is working...",
},
)
)

return # Task will resume with user message in event stream

# If fire() returns False, no waiting trigger found for this session
# Fall through to normal routing (conversation mode)
logger.warning(
f"[CHAT] Session {target_session_id} not found or expired, falling through to normal routing"
)

# Check active tasks — route message to matching session if possible
# Use active_task_ids from state_manager (not just triggers in queue) to ensure
# all running tasks are visible for routing, not just those waiting in queue
Expand Down
8 changes: 7 additions & 1 deletion app/ui_layer/adapters/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,10 @@ def _handle_agent_message(self, event: UIEvent) -> None:
agent_name = onboarding_manager.state.agent_name or "Agent"
asyncio.create_task(
self._display_chat_message(
agent_name, event.data.get("message", ""), "agent"
agent_name,
event.data.get("message", ""),
"agent",
task_session_id=event.task_id,
)
)

Expand Down Expand Up @@ -438,6 +441,7 @@ async def _display_chat_message(
label: str,
message: str,
style: str,
task_session_id: Optional[str] = None,
) -> None:
"""
Display a chat message.
Expand All @@ -446,6 +450,7 @@ async def _display_chat_message(
label: Message sender label
message: Message content
style: Style identifier
task_session_id: Optional task session ID for reply feature
"""
import time

Expand All @@ -455,6 +460,7 @@ async def _display_chat_message(
content=message,
style=style,
timestamp=time.time(),
task_session_id=task_session_id,
)
)

Expand Down
60 changes: 55 additions & 5 deletions app/ui_layer/adapters/browser_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ async def append_message(self, message: ChatMessage) -> None:
style=message.style,
timestamp=message.timestamp,
attachments=attachments_data,
task_session_id=message.task_session_id,
)
self._storage.insert_message(stored)
except Exception:
Expand Down Expand Up @@ -265,6 +266,10 @@ async def append_message(self, message: ChatMessage) -> None:
for att in message.attachments
]

# Include task session ID for reply feature
if message.task_session_id:
message_data["taskSessionId"] = message.task_session_id

await self._adapter._broadcast({
"type": "chat_message",
"data": message_data,
Expand Down Expand Up @@ -676,6 +681,36 @@ def metrics_collector(self) -> MetricsCollector:
"""Get the metrics collector for dashboard data."""
return self._metrics_collector

async def submit_message(
self,
message: str,
reply_context: Optional[Dict[str, Any]] = None
) -> None:
"""
Submit a message from the user with optional reply context.

Overrides base class to handle reply-to-chat/task feature.
Appends reply context to the message before routing to the agent.

Args:
message: The user's input message
reply_context: Optional dict with {sessionId?: str, originalMessage: str}
"""
agent_context = message

# Add reply context note (similar to attachment_note pattern)
if reply_context and reply_context.get("originalMessage"):
reply_note = f"\n\n[REPLYING TO PREVIOUS AGENT MESSAGE]:\n{reply_context['originalMessage']}"
agent_context = message + reply_note

# Pass to controller with target session ID if replying
target_session_id = reply_context.get("sessionId") if reply_context else None
await self._controller.submit_message(
agent_context,
self._adapter_id,
target_session_id=target_session_id
)

def _handle_task_start(self, event: UIEvent) -> None:
"""Handle task start event with metrics tracking."""
# Call parent implementation
Expand Down Expand Up @@ -874,16 +909,17 @@ async def _handle_ws_message(self, data: Dict[str, Any]) -> None:
msg_type = data.get("type")

if msg_type == "message":
# User sent a message (may include attachments)
# User sent a message (may include attachments and/or reply context)
content = data.get("content", "")
attachments = data.get("attachments", [])
reply_context = data.get("replyContext") # {sessionId?: str, originalMessage: str}

if attachments:
# Message with attachments - use custom handler
await self._handle_chat_message_with_attachments(content, attachments)
await self._handle_chat_message_with_attachments(content, attachments, reply_context)
elif content:
# Regular message without attachments - use normal flow
await self.submit_message(content)
await self.submit_message(content, reply_context)

elif msg_type == "chat_attachment_upload":
# Upload attachment for chat message
Expand Down Expand Up @@ -3660,8 +3696,13 @@ async def _handle_file_download(self, file_path: str) -> None:
},
})

async def _handle_chat_message_with_attachments(self, content: str, attachments: List[Dict[str, Any]]) -> None:
"""Handle user chat message with attachments."""
async def _handle_chat_message_with_attachments(
self,
content: str,
attachments: List[Dict[str, Any]],
reply_context: Optional[Dict[str, Any]] = None
) -> None:
"""Handle user chat message with attachments and optional reply context."""
import uuid
from app.ui_layer.state.ui_state import AgentStateType
from app.ui_layer.events import UIEvent, UIEventType
Expand Down Expand Up @@ -3726,6 +3767,11 @@ async def _handle_chat_message_with_attachments(self, content: str, attachments:
# (This is what the agent sees in the event stream - includes file paths)
agent_context = content + attachment_note

# Add reply context note (similar to attachment_note pattern)
if reply_context and reply_context.get("originalMessage"):
reply_note = f"\n\n[REPLYING TO PREVIOUS AGENT MESSAGE]:\n{reply_context['originalMessage']}"
agent_context = agent_context + reply_note

if not agent_context.strip():
return

Expand All @@ -3751,6 +3797,10 @@ async def _handle_chat_message_with_attachments(self, content: str, attachments:
"sender": {"id": self._adapter_id or "user", "type": "user"},
"gui_mode": self._controller._state_store.state.gui_mode,
}
# Include target session ID if replying to a specific session
if reply_context and reply_context.get("sessionId"):
payload["target_session_id"] = reply_context["sessionId"]

await self._controller._agent._handle_chat_message(payload)

except Exception as e:
Expand Down
40 changes: 37 additions & 3 deletions app/ui_layer/browser/frontend/src/contexts/WebSocketContext.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,20 @@ interface PendingAttachment {
content: string // base64
}

// Reply target for reply-to-chat/task feature
interface ReplyTarget {
type: 'chat' | 'task'
sessionId?: string // May be undefined for old messages without session tracking
displayName: string // Truncated preview for UI display
originalContent: string // Full content for agent context
}

// Reply context sent with message
interface ReplyContext {
sessionId?: string
originalMessage: string
}

interface WebSocketState {
connected: boolean
messages: ChatMessage[]
Expand All @@ -29,10 +43,12 @@ interface WebSocketState {
onboardingLoading: boolean
// Unread message tracking
lastSeenMessageId: string | null
// Reply state for reply-to-chat/task feature
replyTarget: ReplyTarget | null
}

interface WebSocketContextType extends WebSocketState {
sendMessage: (content: string, attachments?: PendingAttachment[]) => void
sendMessage: (content: string, attachments?: PendingAttachment[], replyContext?: ReplyContext) => void
sendCommand: (command: string) => void
clearMessages: () => void
cancelTask: (taskId: string) => void
Expand All @@ -46,6 +62,9 @@ interface WebSocketContextType extends WebSocketState {
goBackOnboardingStep: () => void
// Unread message tracking
markMessagesAsSeen: () => void
// Reply-to-chat/task methods
setReplyTarget: (target: ReplyTarget) => void
clearReplyTarget: () => void
}

// Initialize lastSeenMessageId from localStorage
Expand Down Expand Up @@ -86,6 +105,8 @@ const defaultState: WebSocketState = {
onboardingLoading: false,
// Unread message tracking
lastSeenMessageId: getInitialLastSeenMessageId(),
// Reply state
replyTarget: null,
}

const WebSocketContext = createContext<WebSocketContextType | undefined>(undefined)
Expand Down Expand Up @@ -462,12 +483,13 @@ export function WebSocketProvider({ children }: { children: ReactNode }) {
}
}, [connect])

const sendMessage = useCallback((content: string, attachments?: PendingAttachment[]) => {
const sendMessage = useCallback((content: string, attachments?: PendingAttachment[], replyContext?: ReplyContext) => {
if (wsRef.current?.readyState === WebSocket.OPEN) {
wsRef.current.send(JSON.stringify({
type: 'message',
content,
attachments: attachments || []
attachments: attachments || [],
replyContext: replyContext || null,
}))
}
}, [])
Expand Down Expand Up @@ -557,6 +579,16 @@ export function WebSocketProvider({ children }: { children: ReactNode }) {
})
}, [])

// Set reply target for reply-to-chat/task feature
const setReplyTarget = useCallback((target: ReplyTarget) => {
setState(prev => ({ ...prev, replyTarget: target }))
}, [])

// Clear reply target
const clearReplyTarget = useCallback(() => {
setState(prev => ({ ...prev, replyTarget: null }))
}, [])

return (
<WebSocketContext.Provider
value={{
Expand All @@ -573,6 +605,8 @@ export function WebSocketProvider({ children }: { children: ReactNode }) {
skipOnboardingStep,
goBackOnboardingStep,
markMessagesAsSeen,
setReplyTarget,
clearReplyTarget,
}}
>
{children}
Expand Down
Loading