Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 16 additions & 28 deletions dwctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ fn create_cors_layer(config: &Config) -> anyhow::Result<CorsLayer> {
///
/// - `state`: Mutable application state (metrics recorder may be initialized here)
/// - `onwards_router`: Pre-configured router for AI request proxying
/// - `analytics_sender`: Optional sender for analytics records (from background services)
/// - `analytics_enabled`: Whether analytics is enabled
/// - `metrics_recorder`: Optional GenAI metrics recorder (created before background services)
///
/// # Returns
Expand All @@ -916,7 +916,7 @@ fn create_cors_layer(config: &Config) -> anyhow::Result<CorsLayer> {
pub async fn build_router(
state: &mut AppState,
onwards_router: Router,
analytics_sender: Option<request_logging::batcher::AnalyticsSender>,
analytics_enabled: bool,
metrics_recorder: Option<GenAiMetrics>,
strict_mode: bool,
) -> anyhow::Result<Router> {
Expand All @@ -929,10 +929,10 @@ pub async fn build_router(
// Both require the RequestLoggerLayer to capture request/response data, but use
// different handlers to process that data.
let request_logging_enabled = state.outlet_db.is_some() && state.config.enable_request_logging;
let analytics_enabled = state.config.enable_analytics;

let outlet_layer = if request_logging_enabled || analytics_enabled {
// Store the metrics recorder in state (created earlier in Application::new)
let metrics_recorder_for_analytics = metrics_recorder.clone();
state.metrics_recorder = metrics_recorder;

// Build handler chain based on config
Expand All @@ -950,9 +950,11 @@ pub async fn build_router(
}

// Add AnalyticsHandler for analytics/billing if enabled
// The batcher is spawned in setup_background_services and managed by BackgroundServices
if let Some(sender) = analytics_sender {
let analytics_handler = request_logging::AnalyticsHandler::new(sender, uuid::Uuid::new_v4(), state.config.clone());
// The handler owns an AnalyticsWriter that does batch enrichment + transactional writes
if analytics_enabled {
let pool = state.db.write().clone();
let analytics_handler =
request_logging::AnalyticsHandler::new(pool, uuid::Uuid::new_v4(), state.config.clone(), metrics_recorder_for_analytics);
multi_handler = multi_handler.with(analytics_handler);
}

Expand All @@ -964,7 +966,9 @@ pub async fn build_router(
capture_request_body: true,
capture_response_body: true,
path_filter: None, // No path filter needed - applied directly to ai_router
..Default::default()
// Larger buffer since outlet is now the only queue for billing data.
// Default is 4096; we use 16384 to reduce drop risk under load.
channel_capacity: 16384,
};
Some(RequestLoggerLayer::new(outlet_config, multi_handler))
}
Expand Down Expand Up @@ -1483,8 +1487,8 @@ pub struct BackgroundServices {
onwards_sender: Option<tokio::sync::watch::Sender<onwards::target::Targets>>,
#[allow(dead_code)] // Used in sync_onwards_config method
strict_mode: bool,
/// Sender for analytics records (if analytics is enabled)
analytics_sender: Option<request_logging::batcher::AnalyticsSender>,
/// Whether analytics is enabled (handler is created in build_router)
analytics_enabled: bool,
// JoinSet is cancel-safe - can be polled in select! without losing tasks
background_tasks: tokio::task::JoinSet<anyhow::Result<()>>,
// Map task IDs to names for logging
Expand Down Expand Up @@ -1616,7 +1620,6 @@ async fn setup_background_services(
outlet_pool: Option<PgPool>,
config: Config,
shutdown_token: tokio_util::sync::CancellationToken,
metrics_recorder: Option<GenAiMetrics>,
) -> anyhow::Result<BackgroundServices> {
use fusillade::manager::postgres::BatchInsertStrategy;
let drop_guard = shutdown_token.clone().drop_guard();
Expand Down Expand Up @@ -1986,20 +1989,7 @@ async fn setup_background_services(
});
}

// Start analytics batcher if enabled
let analytics_sender = if config.enable_analytics {
let (batcher, sender) = request_logging::AnalyticsBatcher::new(pool.clone(), config.clone(), metrics_recorder);

let batcher_shutdown = shutdown_token.clone();
background_tasks.spawn("analytics-batcher", async move {
batcher.run(batcher_shutdown).await;
Ok(())
});

Some(sender)
} else {
None
};
let analytics_enabled = config.enable_analytics;

let (background_tasks, task_names) = background_tasks.into_parts();

Expand All @@ -2009,7 +1999,7 @@ async fn setup_background_services(
onwards_targets: initial_targets,
onwards_sender,
strict_mode: config.onwards.strict_mode,
analytics_sender,
analytics_enabled,
background_tasks,
task_names,
shutdown_token,
Expand Down Expand Up @@ -2077,7 +2067,6 @@ impl Application {
let shutdown_token = tokio_util::sync::CancellationToken::new();

// Create GenAI metrics recorder if both metrics and analytics are enabled
// This is created here (before background services) so the analytics batcher can use it
let metrics_recorder = if config.enable_metrics && config.enable_analytics {
let gen_ai_registry = prometheus::Registry::new();
Some(GenAiMetrics::new(&gen_ai_registry).map_err(|e| anyhow::anyhow!("Failed to create GenAI metrics: {}", e))?)
Expand All @@ -2094,7 +2083,6 @@ impl Application {
outlet_pools.as_ref().map(|p| (**p).clone()),
config.clone(),
shutdown_token.clone(),
metrics_recorder.clone(),
)
.await?;

Expand Down Expand Up @@ -2138,7 +2126,7 @@ impl Application {
let router = build_router(
&mut app_state,
onwards_router,
bg_services.analytics_sender.clone(),
bg_services.analytics_enabled,
metrics_recorder,
bg_services.onwards_targets.strict_mode,
)
Expand Down
Loading
Loading