
fix(streaming): add buffer size limit to StreamProcessor #84

Closed
echobt wants to merge 1 commit into main from fix/streaming-buffer-limit

Conversation

@echobt (Contributor) commented Feb 4, 2026

Summary

Adds a maximum buffer size limit to StreamProcessor to prevent unbounded memory growth during long streaming sessions.

Problem

The StreamProcessor's event buffer (VecDeque) has no size limit. If drain_events() is not called regularly, the buffer can grow indefinitely during long streams, potentially causing memory exhaustion.

Changes

  • Added MAX_BUFFER_SIZE constant (10,000 events)
  • Modified process() to drop oldest events when buffer is full
  • Pre-allocated buffer capacity in new() for better performance
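
For reference, the change amounts to roughly the following. This is a minimal, self-contained sketch: the real `StreamProcessor` and `StreamEvent` in `cortex-engine` carry more state, so the struct bodies here are assumptions.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the real event type in cortex-engine.
struct StreamEvent {
    payload: String,
}

/// Maximum number of buffered events before the oldest are dropped.
const MAX_BUFFER_SIZE: usize = 10_000;

struct StreamProcessor {
    buffer: VecDeque<StreamEvent>,
}

impl StreamProcessor {
    fn new() -> Self {
        Self {
            // Pre-allocate a reasonable starting capacity (the PR uses 1,024).
            buffer: VecDeque::with_capacity(1_024),
        }
    }

    fn process(&mut self, event: StreamEvent) {
        // Enforce the buffer size limit to prevent unbounded memory growth:
        // once the cap is reached, the oldest event is evicted first (FIFO).
        if self.buffer.len() >= MAX_BUFFER_SIZE {
            self.buffer.pop_front();
        }
        self.buffer.push_back(event);
    }

    fn drain_events(&mut self) -> Vec<StreamEvent> {
        // Empties the buffer and hands the accumulated events to the caller.
        self.buffer.drain(..).collect()
    }
}
```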

Testing

  • Verified code compiles with cargo check -p cortex-engine

Verification

cargo check -p cortex-engine

@greptile-apps bot commented Feb 4, 2026

Greptile Overview

Greptile Summary

Added memory safety guard to StreamProcessor by introducing a 10,000-event buffer limit. When the buffer reaches capacity, the oldest events are silently dropped to prevent unbounded memory growth during long streaming sessions.

Key Changes:

  • Introduced MAX_BUFFER_SIZE constant set to 10,000 events
  • Added overflow protection in process() method that drops oldest events when limit is reached
  • Pre-allocated buffer capacity to 1,024 for improved performance

Impact:
The change prevents potential OOM issues in scenarios where drain_events() is not called frequently enough. The implementation is straightforward and follows a standard circular buffer pattern by dropping the oldest data first (FIFO).

Confidence Score: 4/5

  • Safe to merge with minor consideration for observability improvements
  • The implementation correctly prevents unbounded memory growth with a simple and effective approach. The logic is sound: dropping the oldest events when the buffer is full is standard practice. Minor consideration: events are dropped silently, without logging or metrics, which could make debugging harder in production scenarios where event loss occurs.
  • No files require special attention - the single change is straightforward and low-risk

Important Files Changed

| Filename | Overview |
| --- | --- |
| src/cortex-engine/src/streaming.rs | Added buffer size limit (10K events) to prevent unbounded memory growth; silently drops oldest events when full |

Sequence Diagram

sequenceDiagram
    participant Caller
    participant StreamProcessor
    participant Buffer as VecDeque<StreamEvent>
    
    Note over StreamProcessor: MAX_BUFFER_SIZE = 10,000
    
    Caller->>StreamProcessor: process(event)
    StreamProcessor->>Buffer: check len()
    
    alt Buffer length >= 10,000
        Buffer->>Buffer: pop_front() (drop oldest)
        Note over Buffer: Oldest event discarded
    end
    
    StreamProcessor->>Buffer: push_back(new event)
    Note over Buffer: Buffer maintains max 10K events
    
    Caller->>StreamProcessor: drain_events()
    StreamProcessor->>Buffer: drain(..)
    Buffer-->>Caller: Vec<StreamEvent>
    Note over Buffer: Buffer emptied


@greptile-apps bot left a comment


1 file reviewed, 1 comment


Comment on lines +291 to 295
// Enforce buffer size limit to prevent unbounded memory growth
if self.buffer.len() >= MAX_BUFFER_SIZE {
    self.buffer.pop_front();
}
self.buffer.push_back(event);


Consider logging when dropping events - silent data loss could be hard to debug

Suggested change
-// Enforce buffer size limit to prevent unbounded memory growth
-if self.buffer.len() >= MAX_BUFFER_SIZE {
-    self.buffer.pop_front();
-}
-self.buffer.push_back(event);
+// Enforce buffer size limit to prevent unbounded memory growth
+if self.buffer.len() >= MAX_BUFFER_SIZE {
+    self.buffer.pop_front();
+    // Consider: log::warn!("StreamProcessor buffer full, dropping oldest event");
+}

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
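
If a warning per dropped event would be too noisy at this volume, one possible variant is to count drops and log only occasionally. This is not part of the PR: the counter and the free-function form below are illustrative assumptions (it does assume the `log` crate, which the reviewer's suggestion also uses).

```rust
use std::collections::VecDeque;

/// Sketch of a rate-limited variant of the reviewer's suggestion: count the
/// drops and only warn occasionally instead of on every dropped event.
/// `dropped_events` is an assumed new counter, not a field in the current PR.
fn push_bounded<T>(
    buffer: &mut VecDeque<T>,
    event: T,
    max_len: usize,
    dropped_events: &mut u64,
) {
    if buffer.len() >= max_len {
        buffer.pop_front();
        *dropped_events += 1;
        // Log on the first drop and then every 1,000 drops to avoid log spam.
        if *dropped_events == 1 || *dropped_events % 1_000 == 0 {
            log::warn!(
                "stream buffer full; {} events dropped so far",
                *dropped_events
            );
        }
    }
    buffer.push_back(event);
}
```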


@echobt (Contributor, Author) commented Feb 4, 2026

Closing to consolidate: This streaming buffer fix will be merged with PR #85 (ToolState transition validation) into a consolidated protocol robustness PR.

@echobt echobt closed this Feb 4, 2026
echobt added a commit that referenced this pull request Feb 4, 2026
## Summary

This PR consolidates **2 protocol robustness improvements**.

### Included PRs:
- #84: Add buffer size limit to StreamProcessor
- #85: Add ToolState transition validation

### Key Changes:
- Added MAX_BUFFER_SIZE constant (10,000 events) for StreamProcessor
- Modified process() to drop oldest events when buffer is full
- Pre-allocated buffer capacity in new() for better performance
- Added can_transition_to() method to ToolState enum
- Updated update_tool_state to log warnings on invalid transitions
- Documented valid state machine transitions

### Files Modified:
- src/cortex-engine/src/streaming.rs
- src/cortex-protocol/src/protocol/message_parts.rs

Closes #84, #85
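
For context on the second half of the consolidated PR, a `can_transition_to()` check of the kind described above might look roughly like the sketch below. The variant names and the `update_tool_state` signature are illustrative assumptions; the actual enum lives in src/cortex-protocol/src/protocol/message_parts.rs and may differ.

```rust
/// Illustrative sketch only; the real ToolState variants may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ToolState {
    Pending,
    Running,
    Completed,
    Failed,
}

impl ToolState {
    /// Returns true if moving from `self` to `next` is a valid transition.
    fn can_transition_to(self, next: ToolState) -> bool {
        use ToolState::*;
        matches!(
            (self, next),
            (Pending, Running) | (Running, Completed) | (Running, Failed)
        )
    }
}

/// Hypothetical caller: apply valid transitions, log invalid ones instead
/// of applying them (mirroring "log warnings on invalid transitions").
fn update_tool_state(current: &mut ToolState, next: ToolState) {
    if current.can_transition_to(next) {
        *current = next;
    } else {
        log::warn!("invalid ToolState transition: {:?} -> {:?}", current, next);
    }
}
```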
