diff --git a/dwctl/src/lib.rs b/dwctl/src/lib.rs index e58c690a8..9581e38ec 100644 --- a/dwctl/src/lib.rs +++ b/dwctl/src/lib.rs @@ -902,7 +902,7 @@ fn create_cors_layer(config: &Config) -> anyhow::Result { /// /// - `state`: Mutable application state (metrics recorder may be initialized here) /// - `onwards_router`: Pre-configured router for AI request proxying -/// - `analytics_sender`: Optional sender for analytics records (from background services) +/// - `analytics_enabled`: Whether analytics is enabled /// - `metrics_recorder`: Optional GenAI metrics recorder (created before background services) /// /// # Returns @@ -916,7 +916,7 @@ fn create_cors_layer(config: &Config) -> anyhow::Result { pub async fn build_router( state: &mut AppState, onwards_router: Router, - analytics_sender: Option, + analytics_enabled: bool, metrics_recorder: Option, strict_mode: bool, ) -> anyhow::Result { @@ -929,10 +929,10 @@ pub async fn build_router( // Both require the RequestLoggerLayer to capture request/response data, but use // different handlers to process that data. let request_logging_enabled = state.outlet_db.is_some() && state.config.enable_request_logging; - let analytics_enabled = state.config.enable_analytics; let outlet_layer = if request_logging_enabled || analytics_enabled { // Store the metrics recorder in state (created earlier in Application::new) + let metrics_recorder_for_analytics = metrics_recorder.clone(); state.metrics_recorder = metrics_recorder; // Build handler chain based on config @@ -950,9 +950,11 @@ pub async fn build_router( } // Add AnalyticsHandler for analytics/billing if enabled - // The batcher is spawned in setup_background_services and managed by BackgroundServices - if let Some(sender) = analytics_sender { - let analytics_handler = request_logging::AnalyticsHandler::new(sender, uuid::Uuid::new_v4(), state.config.clone()); + // The handler owns an AnalyticsWriter that does batch enrichment + transactional writes + if analytics_enabled { + let pool = state.db.write().clone(); + let analytics_handler = + request_logging::AnalyticsHandler::new(pool, uuid::Uuid::new_v4(), state.config.clone(), metrics_recorder_for_analytics); multi_handler = multi_handler.with(analytics_handler); } @@ -964,7 +966,9 @@ pub async fn build_router( capture_request_body: true, capture_response_body: true, path_filter: None, // No path filter needed - applied directly to ai_router - ..Default::default() + // Larger buffer since outlet is now the only queue for billing data. + // Default is 4096; we use 16384 to reduce drop risk under load. + channel_capacity: 16384, }; Some(RequestLoggerLayer::new(outlet_config, multi_handler)) } @@ -1483,8 +1487,8 @@ pub struct BackgroundServices { onwards_sender: Option>, #[allow(dead_code)] // Used in sync_onwards_config method strict_mode: bool, - /// Sender for analytics records (if analytics is enabled) - analytics_sender: Option, + /// Whether analytics is enabled (handler is created in build_router) + analytics_enabled: bool, // JoinSet is cancel-safe - can be polled in select! without losing tasks background_tasks: tokio::task::JoinSet>, // Map task IDs to names for logging @@ -1616,7 +1620,6 @@ async fn setup_background_services( outlet_pool: Option, config: Config, shutdown_token: tokio_util::sync::CancellationToken, - metrics_recorder: Option, ) -> anyhow::Result { use fusillade::manager::postgres::BatchInsertStrategy; let drop_guard = shutdown_token.clone().drop_guard(); @@ -1986,20 +1989,7 @@ async fn setup_background_services( }); } - // Start analytics batcher if enabled - let analytics_sender = if config.enable_analytics { - let (batcher, sender) = request_logging::AnalyticsBatcher::new(pool.clone(), config.clone(), metrics_recorder); - - let batcher_shutdown = shutdown_token.clone(); - background_tasks.spawn("analytics-batcher", async move { - batcher.run(batcher_shutdown).await; - Ok(()) - }); - - Some(sender) - } else { - None - }; + let analytics_enabled = config.enable_analytics; let (background_tasks, task_names) = background_tasks.into_parts(); @@ -2009,7 +1999,7 @@ async fn setup_background_services( onwards_targets: initial_targets, onwards_sender, strict_mode: config.onwards.strict_mode, - analytics_sender, + analytics_enabled, background_tasks, task_names, shutdown_token, @@ -2077,7 +2067,6 @@ impl Application { let shutdown_token = tokio_util::sync::CancellationToken::new(); // Create GenAI metrics recorder if both metrics and analytics are enabled - // This is created here (before background services) so the analytics batcher can use it let metrics_recorder = if config.enable_metrics && config.enable_analytics { let gen_ai_registry = prometheus::Registry::new(); Some(GenAiMetrics::new(&gen_ai_registry).map_err(|e| anyhow::anyhow!("Failed to create GenAI metrics: {}", e))?) @@ -2094,7 +2083,6 @@ impl Application { outlet_pools.as_ref().map(|p| (**p).clone()), config.clone(), shutdown_token.clone(), - metrics_recorder.clone(), ) .await?; @@ -2138,7 +2126,7 @@ impl Application { let router = build_router( &mut app_state, onwards_router, - bg_services.analytics_sender.clone(), + bg_services.analytics_enabled, metrics_recorder, bg_services.onwards_targets.strict_mode, ) diff --git a/dwctl/src/request_logging/analytics_handler.rs b/dwctl/src/request_logging/analytics_handler.rs index d4179a2ed..5f9f7f9f1 100644 --- a/dwctl/src/request_logging/analytics_handler.rs +++ b/dwctl/src/request_logging/analytics_handler.rs @@ -1,42 +1,33 @@ //! Analytics request handler for AI proxy requests. //! -//! This module provides [`AnalyticsHandler`], a standalone implementation of the [`outlet::RequestHandler`] +//! This module provides [`AnalyticsHandler`], an implementation of the [`outlet::RequestHandler`] //! trait that handles analytics, billing (credit deduction), and Prometheus metrics recording. //! -//! # Decoupling from Request Logging -//! -//! Previously, analytics was coupled to request logging via outlet-postgres. The analytics logic -//! lived inside a "serializer" callback, meaning if request logging was disabled, no analytics -//! would be recorded either. -//! -//! This handler can be used independently or composed with other handlers (like PostgresHandler -//! for request logging) using [`outlet::MultiHandler`]. -//! //! # Architecture //! -//! The handler does minimal work per-request - it extracts raw metrics and sends them to a -//! background batcher via a channel. The batcher handles: -//! - Batch enrichment (user lookup, model/tariff lookup) -//! - Transactional writes (analytics + credits in single transaction) +//! The handler overrides `handle_response_batch` to process outlet's batched responses +//! directly, with no intermediate channel or accumulation loop: //! -//! This design keeps the hot path fast while ensuring data consistency. +//! ```text +//! outlet background task +//! → accumulates responses into batch +//! → calls handle_response_batch(&[(req, res)]) +//! → extracts RawAnalyticsRecords +//! → calls AnalyticsWriter.flush() +//! → batch enrichment (user lookup, pricing lookup) +//! → transactional write (analytics + credits) +//! → retry with exponential backoff +//! → metrics recording +//! ``` //! //! # Example //! //! ```ignore //! use outlet::{MultiHandler, RequestLoggerConfig, RequestLoggerLayer}; -//! use dwctl::request_logging::{AnalyticsHandler, AnalyticsBatcher}; +//! use dwctl::request_logging::AnalyticsHandler; //! -//! // Create batcher and get sender -//! let (batcher, sender) = AnalyticsBatcher::new(pool, config.analytics.clone()); +//! let analytics = AnalyticsHandler::new(pool, instance_id, config, metrics_recorder); //! -//! // Spawn batcher background task -//! tokio::spawn(batcher.run(cancellation_token)); -//! -//! // Create handler with sender -//! let analytics = AnalyticsHandler::new(sender, instance_id, config, metrics_recorder); -//! -//! // Use with MultiHandler for composition //! let handler = MultiHandler::new() //! .with(postgres_handler) // request logging //! .with(analytics); // analytics/billing @@ -45,144 +36,147 @@ //! ``` use crate::config::Config; +use crate::metrics::MetricsRecorder; use crate::request_logging::AiResponse; -use crate::request_logging::batcher::{AnalyticsSender, RawAnalyticsRecord}; +use crate::request_logging::batcher::{AnalyticsWriter, RawAnalyticsRecord}; use crate::request_logging::serializers::{Auth, UsageMetrics, parse_ai_response}; use crate::request_logging::utils::{extract_header_as_string, extract_header_as_uuid}; -use metrics::counter; use outlet::{RequestData, RequestHandler, ResponseData}; use serde_json::Value; -use tracing::{Instrument, info_span, warn}; +use sqlx::PgPool; +use tracing::{Instrument, info_span}; use uuid::Uuid; -/// A request handler that sends analytics data to a background batcher. -/// -/// This handler implements [`outlet::RequestHandler`] and can be used standalone or composed -/// with other handlers using [`outlet::MultiHandler`]. -/// -/// The handler does minimal work per-request: -/// 1. Parses the AI response to extract token usage -/// 2. Extracts raw data from request headers (bearer token, fusillade metadata) -/// 3. Sends `RawAnalyticsRecord` to the batcher via channel +/// A request handler that processes analytics data via batched writes. /// -/// All database operations (enrichment, writes) happen in the background batcher. -pub struct AnalyticsHandler { - sender: AnalyticsSender, +/// This handler implements [`outlet::RequestHandler`] and overrides `handle_response_batch` +/// to process outlet's batched responses directly. For each batch, it extracts raw metrics, +/// then delegates to [`AnalyticsWriter`] for enrichment and transactional database writes. +pub struct AnalyticsHandler +where + M: MetricsRecorder + Clone + Send + Sync + 'static, +{ + writer: AnalyticsWriter, instance_id: Uuid, config: Config, } -impl AnalyticsHandler { +impl AnalyticsHandler +where + M: MetricsRecorder + Clone + Send + Sync + 'static, +{ /// Creates a new analytics handler. /// /// # Arguments /// - /// * `sender` - Channel sender to the analytics batcher + /// * `pool` - Database connection pool for analytics writes /// * `instance_id` - Unique identifier for this service instance /// * `config` - Application configuration - pub fn new(sender: AnalyticsSender, instance_id: Uuid, config: Config) -> Self { + /// * `metrics_recorder` - Optional metrics recorder for Prometheus metrics + pub fn new(pool: PgPool, instance_id: Uuid, config: Config, metrics_recorder: Option) -> Self { + let writer = AnalyticsWriter::new(pool, config.clone(), metrics_recorder); Self { - sender, + writer, instance_id, config, } } + + /// Extract a [`RawAnalyticsRecord`] from a request/response pair. + fn extract_record(&self, request_data: &RequestData, response_data: &ResponseData) -> RawAnalyticsRecord { + // Try to parse the response - may fail for error responses (4xx, 5xx) + let parse_result = parse_ai_response(request_data, response_data); + + // Use parsed response for metrics, or fallback to Other for error responses + let metrics_response = match &parse_result { + Ok(response) => response.clone(), + Err(_) => AiResponse::Other(Value::Null), + }; + + // Extract basic metrics - captures status_code, duration, model from request, etc. + let metrics = UsageMetrics::extract(self.instance_id, request_data, response_data, &metrics_response, &self.config); + + // Extract auth information from headers + let auth = Auth::from_request(request_data, &self.config); + + // Extract fusillade batch metadata from headers + let fusillade_batch_id = extract_header_as_uuid(request_data, "x-fusillade-batch-id"); + let fusillade_request_id = extract_header_as_uuid(request_data, "x-fusillade-request-id"); + let custom_id = extract_header_as_string(request_data, "x-fusillade-custom-id"); + let batch_completion_window = extract_header_as_string(request_data, "x-fusillade-batch-completion-window"); + let batch_request_source = extract_header_as_string(request_data, "x-fusillade-batch-request-source").unwrap_or_default(); + + // Extract batch creation timestamp for pricing lookup + let batch_created_at = extract_header_as_string(request_data, "x-fusillade-batch-created-at") + .and_then(|s| s.parse::>().ok()); + + // Extract bearer token from auth + let bearer_token = match &auth { + Auth::ApiKey { bearer_token } => Some(bearer_token.clone()), + Auth::None => None, + }; + + RawAnalyticsRecord { + instance_id: metrics.instance_id, + correlation_id: metrics.correlation_id, + timestamp: metrics.timestamp, + method: metrics.method, + uri: metrics.uri, + request_model: metrics.request_model, + response_model: metrics.response_model, + status_code: metrics.status_code, + duration_ms: metrics.duration_ms, + duration_to_first_byte_ms: metrics.duration_to_first_byte_ms, + prompt_tokens: metrics.prompt_tokens, + completion_tokens: metrics.completion_tokens, + total_tokens: metrics.total_tokens, + response_type: metrics.response_type, + server_address: metrics.server_address, + server_port: metrics.server_port, + bearer_token, + fusillade_batch_id, + fusillade_request_id, + custom_id, + batch_completion_window, + batch_created_at, + batch_request_source, + trace_id: request_data.trace_id.clone(), + } + } } -impl RequestHandler for AnalyticsHandler { +impl RequestHandler for AnalyticsHandler +where + M: MetricsRecorder + Clone + Send + Sync + 'static, +{ /// No-op for request phase - analytics only needs response data. - async fn handle_request(&self, _data: RequestData) { - // Analytics doesn't need the request phase - } + async fn handle_request(&self, _data: RequestData) {} - /// Extracts raw analytics data and sends to background batcher. + /// Extracts analytics data and flushes a single record. /// - /// This method does minimal work per-request: - /// 1. Parses the AI response to extract token usage - /// 2. Extracts raw data from headers (bearer token, fusillade metadata) - /// 3. Sends `RawAnalyticsRecord` to batcher via channel - /// - /// All database work (enrichment, writes, credit deduction) happens in the batcher. + /// In practice, outlet 0.8+ always calls `handle_response_batch` instead. + /// This method exists as a fallback for direct single-item calls. async fn handle_response(&self, request_data: RequestData, response_data: ResponseData) { - let correlation_id = request_data.correlation_id; - let span = info_span!( - "dwctl.analytics_handler", - correlation_id = correlation_id, - status = %response_data.status - ); - - async { - // Try to parse the response - may fail for error responses (4xx, 5xx) - let parse_result = parse_ai_response(&request_data, &response_data); - - // Use parsed response for metrics, or fallback to Other for error responses - let metrics_response = match &parse_result { - Ok(response) => response.clone(), - Err(_) => AiResponse::Other(Value::Null), - }; - - // Extract basic metrics - captures status_code, duration, model from request, etc. - let metrics = UsageMetrics::extract(self.instance_id, &request_data, &response_data, &metrics_response, &self.config); - - // Extract auth information from headers - let auth = Auth::from_request(&request_data, &self.config); - - // Extract fusillade batch metadata from headers - let fusillade_batch_id = extract_header_as_uuid(&request_data, "x-fusillade-batch-id"); - let fusillade_request_id = extract_header_as_uuid(&request_data, "x-fusillade-request-id"); - let custom_id = extract_header_as_string(&request_data, "x-fusillade-custom-id"); - let batch_completion_window = extract_header_as_string(&request_data, "x-fusillade-batch-completion-window"); - let batch_request_source = extract_header_as_string(&request_data, "x-fusillade-batch-request-source").unwrap_or_default(); + let record = self.extract_record(&request_data, &response_data); + self.writer.flush(&[record]).await; + } - // Extract batch creation timestamp for pricing lookup - // This ensures batch requests are priced as of batch creation, not processing time - let batch_created_at = extract_header_as_string(&request_data, "x-fusillade-batch-created-at") - .and_then(|s| s.parse::>().ok()); + /// Extracts analytics data from a batch of responses and flushes to the database. + /// + /// This is the primary entry point — outlet's background task dispatches accumulated + /// responses as batches. For each pair, we extract a `RawAnalyticsRecord`, then + /// delegate to the writer for batch enrichment and transactional writes. + async fn handle_response_batch(&self, batch: &[(RequestData, ResponseData)]) { + if batch.is_empty() { + return; + } - // Extract bearer token from auth - let bearer_token = match &auth { - Auth::ApiKey { bearer_token } => Some(bearer_token.clone()), - Auth::None => None, - }; + let span = info_span!("dwctl.analytics_handler_batch", batch_size = batch.len(),); - // Build the raw record (no DB enrichment) - // Note: request_origin is computed in the batcher after api_key_purpose is resolved - let record = RawAnalyticsRecord { - instance_id: metrics.instance_id, - correlation_id: metrics.correlation_id, - timestamp: metrics.timestamp, - method: metrics.method, - uri: metrics.uri, - request_model: metrics.request_model, - response_model: metrics.response_model, - status_code: metrics.status_code, - duration_ms: metrics.duration_ms, - duration_to_first_byte_ms: metrics.duration_to_first_byte_ms, - prompt_tokens: metrics.prompt_tokens, - completion_tokens: metrics.completion_tokens, - total_tokens: metrics.total_tokens, - response_type: metrics.response_type, - server_address: metrics.server_address, - server_port: metrics.server_port, - bearer_token, - fusillade_batch_id, - fusillade_request_id, - custom_id, - batch_completion_window, - batch_created_at, - batch_request_source, - trace_id: request_data.trace_id.clone(), - }; + async { + let records: Vec = batch.iter().map(|(req, res)| self.extract_record(req, res)).collect(); - // Send to batcher (non-blocking, just puts in channel) - if let Err(e) = self.sender.send(record).await { - counter!("dwctl_analytics_send_errors_total").increment(1); - warn!( - correlation_id = correlation_id, - error = %e, - "Failed to send analytics record to batcher - channel may be full or closed" - ); - } + self.writer.flush(&records).await; } .instrument(span) .await; @@ -195,7 +189,6 @@ mod tests { use axum::http::{Method, StatusCode, Uri}; use std::collections::HashMap; use std::time::{Duration, SystemTime}; - use tokio::sync::mpsc; fn create_test_request_data() -> RequestData { RequestData { @@ -222,16 +215,6 @@ mod tests { } } - #[test] - fn test_analytics_handler_creation() { - // Create a channel for testing - let (tx, _rx) = mpsc::channel::(100); - let config = Config::default(); - - // Verify the handler can be constructed - let _handler = AnalyticsHandler::new(tx, Uuid::new_v4(), config); - } - #[test] fn test_request_data_creation() { let data = create_test_request_data(); @@ -245,22 +228,4 @@ mod tests { assert_eq!(data.correlation_id, 123); assert_eq!(data.status, StatusCode::OK); } - - #[tokio::test] - async fn test_handler_sends_to_channel() { - let (tx, mut rx) = mpsc::channel::(100); - let config = Config::default(); - let handler = AnalyticsHandler::new(tx, Uuid::new_v4(), config); - - // Call handle_response - let request_data = create_test_request_data(); - let response_data = create_test_response_data(); - handler.handle_response(request_data, response_data).await; - - // Verify record was sent to channel - let record = rx.try_recv().expect("Should have received a record"); - assert_eq!(record.correlation_id, 123); - assert_eq!(record.method, "POST"); - assert!(record.uri.contains("chat/completions")); - } } diff --git a/dwctl/src/request_logging/batcher.rs b/dwctl/src/request_logging/batcher.rs index 0b0055677..8dfd0d169 100644 --- a/dwctl/src/request_logging/batcher.rs +++ b/dwctl/src/request_logging/batcher.rs @@ -1,33 +1,31 @@ -//! Analytics batching system for efficient database writes. +//! Analytics writer for efficient batched database writes. //! -//! This module provides [`AnalyticsBatcher`] which accumulates analytics records -//! and writes them to the database in batches, significantly reducing per-request -//! database overhead. +//! This module provides [`AnalyticsWriter`] which enriches and writes analytics records +//! to the database in batches, significantly reducing per-request database overhead. //! //! # Architecture //! //! ```text -//! Request → AnalyticsHandler (extract only) → Channel → AnalyticsBatcher -//! ↓ -//! [Accumulate in buffer] -//! ↓ -//! [Flush immediately (write-through)] -//! ↓ -//! Phase 1: Batch enrich -//! - Token → user_id lookup -//! - Model → pricing lookup -//! ↓ -//! Phase 2: Batch write (transaction) -//! - INSERT http_analytics -//! - INSERT credit_transactions -//! ↓ -//! Phase 3: Record metrics +//! outlet background task +//! → accumulates responses into batch +//! → calls AnalyticsHandler.handle_response_batch() +//! → extracts RawAnalyticsRecords from batch +//! → calls AnalyticsWriter.flush() +//! Phase 1: Batch enrich +//! - Token → user_id lookup +//! - Model → pricing lookup +//! Phase 2: Batch write (transaction) +//! - INSERT http_analytics +//! - INSERT credit_transactions +//! Phase 3: Record metrics //! ``` //! //! # Key Design Decisions //! -//! - **All DB work in batcher**: The handler sends unenriched `RawAnalyticsRecord`s. -//! Enrichment (user lookup, pricing lookup) happens in the batcher via batch queries. +//! - **Outlet handles batching**: The outlet middleware accumulates request/response +//! pairs and dispatches them as batches. No separate channel or accumulation loop. +//! - **All DB work in writer**: The handler extracts unenriched `RawAnalyticsRecord`s. +//! Enrichment (user lookup, pricing lookup) happens in the writer via batch queries. //! - **Transactional writes**: Analytics and credit inserts happen in a single transaction. //! Either both succeed or both roll back. //! - **Batch enrichment**: User and pricing lookups are batched using `IN` clauses, @@ -46,14 +44,10 @@ use sqlx::PgPool; use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; -use tokio::sync::{RwLock, mpsc}; -use tokio_util::sync::CancellationToken; -use tracing::{Instrument, debug, error, info, info_span, trace, warn}; +use tokio::sync::RwLock; +use tracing::{Instrument, debug, error, info_span, trace, warn}; use uuid::Uuid; -/// Channel buffer size - how many records can be queued before backpressure -const CHANNEL_BUFFER_SIZE: usize = 10_000; - /// Raw analytics record sent through the channel (unenriched). /// /// This contains only data that can be extracted from the request/response @@ -112,24 +106,19 @@ struct EnrichedRecord { output_price_per_token: Option, } -/// Sender handle for submitting analytics records to the batcher -pub type AnalyticsSender = mpsc::Sender; - -/// Analytics batcher that accumulates records and writes them in batches. +/// Analytics writer that enriches and writes batches of records to the database. /// /// This significantly reduces database overhead by: /// 1. Batching enrichment queries (user lookup, pricing lookup) /// 2. Batching INSERT operations (analytics, credits) /// 3. Using a single transaction for consistency /// 4. Retrying failed batches with exponential backoff -pub struct AnalyticsBatcher +pub struct AnalyticsWriter where M: MetricsRecorder + Clone + Send + Sync + 'static, { pool: PgPool, metrics_recorder: Option, - receiver: mpsc::Receiver, - batch_size: usize, max_retries: u32, retry_base_delay: std::time::Duration, /// Global rate limiter for onwards sync notifications. @@ -139,35 +128,25 @@ where onwards_sync_notification_interval: Duration, } -impl AnalyticsBatcher +impl AnalyticsWriter where M: MetricsRecorder + Clone + Send + Sync + 'static, { - /// Creates a new analytics batcher and returns the batcher along with a sender. + /// Creates a new analytics writer. /// /// # Arguments /// /// * `pool` - Database connection pool for batch writes /// * `config` - Application configuration (includes batch settings) /// * `metrics_recorder` - Optional metrics recorder for Prometheus metrics - /// - /// # Returns - /// - /// A tuple of (batcher, sender) where the sender is used by AnalyticsHandler - /// to submit records. - pub fn new(pool: PgPool, config: Config, metrics_recorder: Option) -> (Self, AnalyticsSender) { - let (sender, receiver) = mpsc::channel(CHANNEL_BUFFER_SIZE); - - let batch_size = config.analytics.batch_size; + pub fn new(pool: PgPool, config: Config, metrics_recorder: Option) -> Self { let max_retries = config.analytics.max_retries; let retry_base_delay = std::time::Duration::from_millis(config.analytics.retry_base_delay_ms); let onwards_sync_notification_interval = Duration::from_millis(config.analytics.balance_notification_interval_milliseconds); - let batcher = Self { + Self { pool, metrics_recorder, - receiver, - batch_size, max_retries, retry_base_delay, last_onwards_sync_notification: Arc::new(RwLock::new( @@ -176,108 +155,35 @@ where .unwrap_or_else(Instant::now), )), onwards_sync_notification_interval, - }; - - (batcher, sender) - } - - /// Runs the batcher's background write loop. - /// - /// This should be spawned as a tokio task. The strategy is: - /// 1. Block until at least one record arrives - /// 2. Non-blocking drain of all available records in the channel - /// 3. Write the batch immediately - /// 4. Repeat - /// - /// This minimizes latency at low load (single record → immediate write) while - /// getting batching efficiency at high load (records queue while writing → bigger batch). - pub async fn run(mut self, shutdown_token: CancellationToken) { - info!( - max_batch_size = self.batch_size, - max_retries = self.max_retries, - retry_base_delay_ms = self.retry_base_delay.as_millis() as u64, - "Analytics batcher started (write-through mode with retry)" - ); - - let mut buffer: Vec = Vec::with_capacity(self.batch_size); - - loop { - // Step 1: Wait for at least one record OR shutdown - tokio::select! { - biased; // Check shutdown first - - _ = shutdown_token.cancelled() => { - info!("Shutdown signal received, draining analytics channel"); - self.receiver.close(); - // Drain remaining records in batches to avoid OOM with large backlogs - while let Some(record) = self.receiver.recv().await { - buffer.push(record); - if buffer.len() >= self.batch_size { - self.flush_batch(&mut buffer).await; - } - } - if !buffer.is_empty() { - self.flush_batch(&mut buffer).await; - } - info!("Analytics batcher shutdown complete"); - break; - } - - maybe_record = self.receiver.recv() => { - match maybe_record { - Some(record) => buffer.push(record), - None => { - // Channel closed (all senders dropped) - info!("Analytics channel closed, shutting down batcher"); - if !buffer.is_empty() { - self.flush_batch(&mut buffer).await; - } - break; - } - } - } - } - - // Step 2: Non-blocking drain of all available records (up to batch_size) - while buffer.len() < self.batch_size { - match self.receiver.try_recv() { - Ok(record) => buffer.push(record), - Err(_) => break, // Channel empty or closed - } - } - - // Step 3: Write immediately - self.flush_batch(&mut buffer).await; } } - /// Flushes the buffer to the database with retry on failure. + /// Flushes records to the database with retry on failure. /// /// This performs: /// 1. Batch enrichment (user lookup, pricing lookup) - no retry, data issues won't fix themselves /// 2. Transactional write (analytics + credits) - retried with exponential backoff /// 3. Metrics recording - async fn flush_batch(&self, buffer: &mut Vec) { - if buffer.is_empty() { + pub async fn flush(&self, records: &[RawAnalyticsRecord]) { + if records.is_empty() { return; } - let batch_size = buffer.len(); + let batch_size = records.len(); let span = info_span!("dwctl.flush_analytics_batch", batch_size = batch_size); async { let start = std::time::Instant::now(); // Collect correlation IDs for log correlation - let correlation_ids: Vec = buffer.iter().map(|r| r.correlation_id).collect(); + let correlation_ids: Vec = records.iter().map(|r| r.correlation_id).collect(); // Phase 1: Batch enrich (no retry - enrichment failures are usually data issues) - let enriched = match self.enrich_batch(buffer).await { + let enriched = match self.enrich_batch(records).await { Ok(enriched) => enriched, Err(e) => { error!(error = %e, batch_size = batch_size, ?correlation_ids, "Failed to enrich analytics batch"); counter!("dwctl_analytics_batch_errors_total", "phase" => "enrich").increment(1); - buffer.clear(); return; } }; @@ -328,7 +234,6 @@ where "Failed to write analytics batch after all retries, dropping batch" ); counter!("dwctl_analytics_batch_errors_total", "phase" => "write").increment(1); - buffer.clear(); return; } @@ -357,8 +262,6 @@ where ?correlation_ids, "Flushed analytics batch" ); - - buffer.clear(); } .instrument(span) .await; @@ -1159,7 +1062,7 @@ mod tests { } } - /// Helper to call find_best_tariff without needing a full batcher + /// Helper to call find_best_tariff without needing a full writer fn find_tariff( tariffs: &[TariffInfo], api_key_purpose: Option<&ApiKeyPurpose>, @@ -1632,22 +1535,11 @@ mod integration_tests { } } - /// Run the batcher with given records and wait for completion - async fn run_batcher_with_records(pool: &PgPool, records: Vec) { + /// Flush records through the analytics writer + async fn flush_records(pool: &PgPool, records: &[RawAnalyticsRecord]) { let config = crate::test::utils::create_test_config(); - let (batcher, sender) = AnalyticsBatcher::::new(pool.clone(), config, None); - - // Send all records - for record in records { - sender.send(record).await.unwrap(); - } - - // Drop sender to close channel - drop(sender); - - // Run batcher until channel is drained - let shutdown = CancellationToken::new(); - batcher.run(shutdown).await; + let writer = AnalyticsWriter::::new(pool.clone(), config, None); + writer.flush(records).await; } #[sqlx::test] @@ -1669,7 +1561,7 @@ mod integration_tests { let record = create_raw_record("gpt-4-test", Some(api_key), 1000, 500); // Run batcher - run_batcher_with_records(&pool, vec![record]).await; + flush_records(&pool, &[record]).await; // Verify: Balance should be deducted let mut conn = pool.acquire().await.unwrap(); @@ -1728,7 +1620,7 @@ mod integration_tests { let realtime_record = create_raw_record("gpt-4-turbo-test", Some(realtime_key), 1000, 500); // Run batcher - run_batcher_with_records(&pool, vec![batch_record, realtime_record]).await; + flush_records(&pool, &[batch_record, realtime_record]).await; // Expected costs: // Batch: (1000 * 0.00005) + (500 * 0.00010) = 0.05 + 0.05 = 0.10 @@ -1778,7 +1670,7 @@ mod integration_tests { let record = create_raw_record("gpt-4-fallback-test", Some(batch_key), 1000, 500); // Run batcher - run_batcher_with_records(&pool, vec![record]).await; + flush_records(&pool, &[record]).await; // Expected: Should fall back to realtime pricing // Cost: (1000 * 0.00015) + (500 * 0.00030) = 0.15 + 0.15 = 0.30 @@ -1810,7 +1702,7 @@ mod integration_tests { let record = create_raw_record("gpt-4-no-tariff", Some(api_key), 1000, 500); // Run batcher - run_batcher_with_records(&pool, vec![record]).await; + flush_records(&pool, &[record]).await; // Verify: Balance should NOT be deducted (no pricing) let mut conn = pool.acquire().await.unwrap(); @@ -1851,7 +1743,7 @@ mod integration_tests { let record = create_raw_record("gpt-4-unauth-test", None, 1000, 500); // Run batcher - should not panic or create transactions - run_batcher_with_records(&pool, vec![record]).await; + flush_records(&pool, &[record]).await; // Verify: Analytics record was created let count = sqlx::query_scalar!("SELECT COUNT(*) FROM http_analytics WHERE model = 'gpt-4-unauth-test'") @@ -1901,7 +1793,7 @@ mod integration_tests { let record = create_raw_record("gpt-4-depletion-test", Some(api_key), 1000, 500); // Run batcher - this should trigger balance depletion notification - run_batcher_with_records(&pool, vec![record]).await; + flush_records(&pool, &[record]).await; // Should receive notification for balance depletion let notification = timeout(Duration::from_secs(2), listener.recv()) @@ -1971,15 +1863,10 @@ mod integration_tests { let mut config = crate::test::utils::create_test_config(); config.analytics.balance_notification_interval_milliseconds = 100; - // Run batcher with all 3 records - should trigger 3 depletions but only 1 notification + // Flush all 3 records - should trigger 3 depletions but only 1 notification // due to rate limiting (interval is 100ms) - let (batcher, sender) = AnalyticsBatcher::::new(pool.clone(), config, None); - for record in [record1, record2, record3] { - sender.send(record).await.unwrap(); - } - drop(sender); - let shutdown = tokio_util::sync::CancellationToken::new(); - batcher.run(shutdown).await; + let writer = AnalyticsWriter::::new(pool.clone(), config, None); + writer.flush(&[record1, record2, record3]).await; // Should receive ONLY ONE notification despite 3 balance depletions let first_notification = timeout(Duration::from_secs(2), listener.recv()) diff --git a/dwctl/src/request_logging/mod.rs b/dwctl/src/request_logging/mod.rs index 2e8578ad8..a0a688865 100644 --- a/dwctl/src/request_logging/mod.rs +++ b/dwctl/src/request_logging/mod.rs @@ -6,5 +6,4 @@ pub mod stream_usage; mod utils; pub use analytics_handler::AnalyticsHandler; -pub use batcher::AnalyticsBatcher; pub use models::{AiRequest, AiResponse, ParsedAIRequest}; diff --git a/dwctl/src/test/mod.rs b/dwctl/src/test/mod.rs index 3b0e16fcd..efce49085 100644 --- a/dwctl/src/test/mod.rs +++ b/dwctl/src/test/mod.rs @@ -893,7 +893,7 @@ async fn test_request_logging_disabled(pool: PgPool) { .limiters(limiters) .build(); let onwards_router = axum::Router::new(); // Empty onwards router for testing - let router = super::build_router(&mut app_state, onwards_router, None, None, false) + let router = super::build_router(&mut app_state, onwards_router, false, None, false) .await .expect("Failed to build router"); @@ -1232,7 +1232,7 @@ async fn test_build_router_with_metrics_disabled(pool: PgPool) { .build(); let onwards_router = axum::Router::new(); - let router = super::build_router(&mut app_state, onwards_router, None, None, false) + let router = super::build_router(&mut app_state, onwards_router, false, None, false) .await .expect("Failed to build router"); let server = axum_test::TestServer::new(router).expect("Failed to create test server"); @@ -1263,7 +1263,7 @@ async fn test_build_router_with_metrics_enabled(pool: PgPool) { .build(); let onwards_router = axum::Router::new(); - let router = super::build_router(&mut app_state, onwards_router, None, None, false) + let router = super::build_router(&mut app_state, onwards_router, false, None, false) .await .expect("Failed to build router"); let server = axum_test::TestServer::new(router).expect("Failed to create test server");