diff --git a/src/cortex-engine/src/streaming.rs b/src/cortex-engine/src/streaming.rs
index 35bfcef..52c2a83 100644
--- a/src/cortex-engine/src/streaming.rs
+++ b/src/cortex-engine/src/streaming.rs
@@ -15,6 +15,10 @@ use futures::Stream;
 use serde::{Deserialize, Serialize};
 use tokio::sync::mpsc;
 
+/// Maximum number of events kept in the buffer; at this limit the oldest event is dropped for each new one.
+/// Prevents unbounded memory growth if drain_events() is not called regularly.
+const MAX_BUFFER_SIZE: usize = 10_000;
+
 /// Token usage for streaming.
 #[derive(Debug, Clone, Default, Serialize, Deserialize)]
 pub struct StreamTokenUsage {
@@ -213,7 +217,7 @@ impl StreamProcessor {
         Self {
             state: StreamState::Idle,
             content: StreamContent::new(),
-            buffer: VecDeque::new(),
+            buffer: VecDeque::with_capacity(1024), // Initial capacity only; the hard cap is MAX_BUFFER_SIZE
             start_time: None,
             first_token_time: None,
             last_event_time: None,
@@ -284,6 +288,10 @@ impl StreamProcessor {
             }
         }
 
+        // Enforce the size limit: evict the oldest event so the push below never exceeds MAX_BUFFER_SIZE
+        if self.buffer.len() >= MAX_BUFFER_SIZE {
+            self.buffer.pop_front();
+        }
         self.buffer.push_back(event);
     }
 