Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion src/cortex-engine/src/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ use futures::Stream;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;

/// Maximum number of events to buffer before dropping old ones.
/// Prevents unbounded memory growth if drain_events() is not called regularly.
const MAX_BUFFER_SIZE: usize = 10_000;

/// Token usage for streaming.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct StreamTokenUsage {
Expand Down Expand Up @@ -213,7 +217,7 @@ impl StreamProcessor {
Self {
state: StreamState::Idle,
content: StreamContent::new(),
buffer: VecDeque::new(),
buffer: VecDeque::with_capacity(1024), // Pre-allocate reasonable capacity
start_time: None,
first_token_time: None,
last_event_time: None,
Expand Down Expand Up @@ -284,6 +288,10 @@ impl StreamProcessor {
}
}

// Enforce buffer size limit to prevent unbounded memory growth
if self.buffer.len() >= MAX_BUFFER_SIZE {
self.buffer.pop_front();
}
self.buffer.push_back(event);
}

Expand Down
1 change: 1 addition & 0 deletions src/cortex-protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ uuid = { workspace = true, features = ["serde", "v4"] }
chrono = { workspace = true }
strum_macros = "0.27"
base64 = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
pretty_assertions = { workspace = true }
Expand Down
49 changes: 49 additions & 0 deletions src/cortex-protocol/src/protocol/message_parts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,45 @@ pub enum ToolState {
},
}


impl ToolState {
/// Check if transitioning to the given state is valid.
///
/// Valid transitions:
/// - Pending -> Running, Completed, Error
/// - Running -> Completed, Error
/// - Completed -> (terminal, no transitions)
/// - Error -> (terminal, no transitions)
///
/// State machine:
/// ```text
/// Pending -> Running -> Completed
/// | |
/// | +-> Error
/// +-> Completed
/// +-> Error
/// ```
pub fn can_transition_to(&self, target: &ToolState) -> bool {
match (self, target) {
// From Pending, can go to any non-Pending state
(ToolState::Pending { .. }, ToolState::Running { .. }) => true,
(ToolState::Pending { .. }, ToolState::Completed { .. }) => true,
(ToolState::Pending { .. }, ToolState::Error { .. }) => true,

// From Running, can go to Completed or Error
(ToolState::Running { .. }, ToolState::Completed { .. }) => true,
(ToolState::Running { .. }, ToolState::Error { .. }) => true,

// Terminal states cannot transition
(ToolState::Completed { .. }, _) => false,
(ToolState::Error { .. }, _) => false,

// Any other transition is invalid
_ => false,
}
}
}

/// Subtask execution status.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, JsonSchema)]
#[serde(rename_all = "snake_case")]
Expand Down Expand Up @@ -552,6 +591,8 @@ impl MessageWithParts {
}

/// Update a tool state by call ID.
///
/// Logs a warning if the state transition is invalid (e.g., from a terminal state).
pub fn update_tool_state(&mut self, call_id: &str, new_state: ToolState) -> bool {
for part in &mut self.parts {
if let MessagePart::Tool {
Expand All @@ -561,6 +602,14 @@ impl MessageWithParts {
} = &mut part.part
{
if cid == call_id {
if !state.can_transition_to(&new_state) {
tracing::warn!(
"Invalid ToolState transition from {:?} to {:?} for call_id {}",
state,
new_state,
call_id
);
}
*state = new_state;
Comment on lines +605 to 613
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The state transition validation logs a warning but still updates the state. This means invalid transitions (e.g., from terminal states) are allowed at runtime. Consider whether invalid transitions should be rejected instead of just logged.

Suggested change
if !state.can_transition_to(&new_state) {
tracing::warn!(
"Invalid ToolState transition from {:?} to {:?} for call_id {}",
state,
new_state,
call_id
);
}
*state = new_state;
if cid == call_id {
if !state.can_transition_to(&new_state) {
tracing::warn!(
"Invalid ToolState transition from {:?} to {:?} for call_id {}",
state,
new_state,
call_id
);
return false;
}
*state = new_state;
return true;
}
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/cortex-protocol/src/protocol/message_parts.rs
Line: 605:613

Comment:
The state transition validation logs a warning but still updates the state. This means invalid transitions (e.g., from terminal states) are allowed at runtime. Consider whether invalid transitions should be rejected instead of just logged.

```suggestion
                if cid == call_id {
                    if !state.can_transition_to(&new_state) {
                        tracing::warn!(
                            "Invalid ToolState transition from {:?} to {:?} for call_id {}",
                            state,
                            new_state,
                            call_id
                        );
                        return false;
                    }
                    *state = new_state;
                    return true;
                }
```

How can I resolve this? If you propose a fix, please make it concise.

return true;
}
Expand Down
Loading