From f44f7b49a162d2600c0012fec9afd93b42da6fc8 Mon Sep 17 00:00:00 2001 From: Celestial Date: Sat, 14 Feb 2026 12:57:51 +0100 Subject: [PATCH] chore: extract phase 5 distributed slice seams --- .../ard/ARCHITECTURE_RISKS_HEXAGONAL_PLAN.md | 5 + src/adapters/cli/mapper.rs | 10 +- src/application/commands.rs | 61 +++-- src/application/distributed_run.rs | 35 +++ src/application/mod.rs | 1 + src/distributed/controller.rs | 1 + src/distributed/controller/auto/events.rs | 60 ++--- src/distributed/controller/auto/finalize.rs | 85 +------ src/distributed/controller/auto/setup.rs | 42 +--- .../controller/manual/loop_handlers.rs | 47 ++-- .../controller/manual/run_finalize.rs | 72 +----- .../controller/manual/run_lifecycle.rs | 35 +-- src/distributed/controller/manual/state.rs | 17 +- src/distributed/controller/output.rs | 218 ++++++++++++++++++ src/distributed/controller/shared.rs | 4 +- .../controller/shared/aggregation.rs | 49 +--- src/distributed/controller/shared/events.rs | 3 - src/entry/plan/build.rs | 4 +- src/entry/plan/execute.rs | 37 ++- src/entry/plan/types.rs | 5 +- 20 files changed, 394 insertions(+), 397 deletions(-) create mode 100644 src/application/distributed_run.rs create mode 100644 src/distributed/controller/output.rs diff --git a/docs/architecture/ard/ARCHITECTURE_RISKS_HEXAGONAL_PLAN.md b/docs/architecture/ard/ARCHITECTURE_RISKS_HEXAGONAL_PLAN.md index 36620ea..58583b6 100644 --- a/docs/architecture/ard/ARCHITECTURE_RISKS_HEXAGONAL_PLAN.md +++ b/docs/architecture/ard/ARCHITECTURE_RISKS_HEXAGONAL_PLAN.md @@ -316,6 +316,11 @@ Phase 4 artifacts (implemented): Exit criteria: - distributed application flow has no direct UI/charts/sinks imports. 
+Phase 5 artifacts (implemented): +- Distributed run command + application execution port: `src/application/commands.rs`, `src/application/distributed_run.rs`, `src/entry/plan/types.rs`, `src/entry/plan/build.rs`, `src/entry/plan/execute.rs` +- Controller output adapter/event boundary for auto/manual flows: `src/distributed/controller/output.rs`, `src/distributed/controller/auto/setup.rs`, `src/distributed/controller/auto/events.rs`, `src/distributed/controller/auto/finalize.rs`, `src/distributed/controller/manual/run_lifecycle.rs`, `src/distributed/controller/manual/loop_handlers.rs`, `src/distributed/controller/manual/run_finalize.rs`, `src/distributed/controller/manual/state.rs` +- Shared aggregation narrowed to state aggregation primitives: `src/distributed/controller/shared/aggregation.rs`, `src/distributed/controller/shared/events.rs` + ### Phase 6: Replay/compare slice extraction (1-2 weeks) 1. Split replay/compare use cases from terminal event loop logic. 2. Keep key event handling and rendering in adapters. 
diff --git a/src/adapters/cli/mapper.rs b/src/adapters/cli/mapper.rs index d991ece..c19fa91 100644 --- a/src/adapters/cli/mapper.rs +++ b/src/adapters/cli/mapper.rs @@ -1,7 +1,7 @@ use std::collections::BTreeMap; use crate::application::commands::{ - AgentRunCommand, ControllerRunCommand, LocalRunCommand, ReplayRunCommand, ServiceCommand, + DistributedRunCommand, LocalRunCommand, ReplayRunCommand, ServiceCommand, }; use crate::args::{ LoadMode as CliLoadMode, Protocol as CliProtocol, Scenario as CliScenario, TesterArgs, @@ -32,14 +32,14 @@ pub(crate) const fn to_service_command(args: TesterArgs) -> ServiceCommand { pub(crate) fn to_controller_run_command( args: TesterArgs, scenarios: Option>, -) -> ControllerRunCommand { +) -> DistributedRunCommand { let run_config = to_run_config(&args); - ControllerRunCommand::new(run_config, args, scenarios) + DistributedRunCommand::new_controller(run_config, args, scenarios) } -pub(crate) fn to_agent_run_command(args: TesterArgs) -> AgentRunCommand { +pub(crate) fn to_agent_run_command(args: TesterArgs) -> DistributedRunCommand { let run_config = to_run_config(&args); - AgentRunCommand::new(run_config, args) + DistributedRunCommand::new_agent(run_config, args) } fn to_run_config(args: &TesterArgs) -> RunConfig { diff --git a/src/application/commands.rs b/src/application/commands.rs index 698b951..fecef71 100644 --- a/src/application/commands.rs +++ b/src/application/commands.rs @@ -78,15 +78,23 @@ impl ServiceCommand { } #[derive(Debug)] -pub(crate) struct ControllerRunCommand { +pub(crate) enum DistributedRunMode { + Controller { + scenarios: Option>, + }, + Agent, +} + +#[derive(Debug)] +pub(crate) struct DistributedRunCommand { run_config: RunConfig, args: TesterArgs, - scenarios: Option>, + mode: DistributedRunMode, } -impl ControllerRunCommand { +impl DistributedRunCommand { #[must_use] - pub(crate) const fn new( + pub(crate) const fn new_controller( run_config: RunConfig, args: TesterArgs, scenarios: Option>, @@ -94,36 
+102,17 @@ impl ControllerRunCommand { Self { run_config, args, - scenarios, + mode: DistributedRunMode::Controller { scenarios }, } } #[must_use] - pub(crate) const fn run_config(&self) -> &RunConfig { - &self.run_config - } - - #[must_use] - pub(crate) const fn no_color(&self) -> bool { - self.args.no_color - } - - #[must_use] - pub(crate) fn into_parts(self) -> (TesterArgs, Option>) { - (self.args, self.scenarios) - } -} - -#[derive(Debug)] -pub(crate) struct AgentRunCommand { - run_config: RunConfig, - args: TesterArgs, -} - -impl AgentRunCommand { - #[must_use] - pub(crate) const fn new(run_config: RunConfig, args: TesterArgs) -> Self { - Self { run_config, args } + pub(crate) const fn new_agent(run_config: RunConfig, args: TesterArgs) -> Self { + Self { + run_config, + args, + mode: DistributedRunMode::Agent, + } } #[must_use] @@ -137,7 +126,15 @@ impl AgentRunCommand { } #[must_use] - pub(crate) fn into_args(self) -> TesterArgs { - self.args + pub(crate) const fn mode_name(&self) -> &'static str { + match self.mode { + DistributedRunMode::Controller { .. 
} => "controller", + DistributedRunMode::Agent => "agent", + } + } + + #[must_use] + pub(crate) fn into_parts(self) -> (TesterArgs, DistributedRunMode) { + (self.args, self.mode) } } diff --git a/src/application/distributed_run.rs b/src/application/distributed_run.rs new file mode 100644 index 0000000..f469d26 --- /dev/null +++ b/src/application/distributed_run.rs @@ -0,0 +1,35 @@ +use std::collections::BTreeMap; + +use async_trait::async_trait; + +use crate::application::commands::{DistributedRunCommand, DistributedRunMode}; +use crate::args::TesterArgs; +use crate::config::types::ScenarioConfig; +use crate::error::AppResult; + +#[async_trait] +pub(crate) trait DistributedRunPort { + async fn run_controller( + &self, + args: &TesterArgs, + scenarios: Option>, + ) -> AppResult<()>; + + async fn run_agent(&self, args: TesterArgs) -> AppResult<()>; +} + +pub(crate) async fn execute( + command: DistributedRunCommand, + distributed_port: &TPort, +) -> AppResult<()> +where + TPort: DistributedRunPort + Sync, +{ + let (args, mode) = command.into_parts(); + match mode { + DistributedRunMode::Controller { scenarios } => { + distributed_port.run_controller(&args, scenarios).await + } + DistributedRunMode::Agent => distributed_port.run_agent(args).await, + } +} diff --git a/src/application/mod.rs b/src/application/mod.rs index 2e3292c..20f2795 100644 --- a/src/application/mod.rs +++ b/src/application/mod.rs @@ -1,2 +1,3 @@ pub(crate) mod commands; +pub(crate) mod distributed_run; pub(crate) mod local_run; diff --git a/src/distributed/controller.rs b/src/distributed/controller.rs index 9220624..15d978a 100644 --- a/src/distributed/controller.rs +++ b/src/distributed/controller.rs @@ -4,6 +4,7 @@ mod control; mod http; mod load; mod manual; +mod output; mod runner; mod shared; diff --git a/src/distributed/controller/auto/events.rs b/src/distributed/controller/auto/events.rs index 35c7cc3..743fd50 100644 --- a/src/distributed/controller/auto/events.rs +++ 
b/src/distributed/controller/auto/events.rs @@ -1,23 +1,22 @@ -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::{HashMap, HashSet}; use tokio::sync::mpsc; use tokio::time::MissedTickBehavior; use crate::args::TesterArgs; -use crate::metrics::AggregatedMetricSample; +use super::super::output::{DistributedOutputState, OutputEvent, handle_output_event}; use super::super::shared::{ - AgentEvent, AgentSnapshot, event_agent_id, handle_agent_event, record_aggregated_sample, - resolve_heartbeat_check_interval, resolve_sink_interval, update_ui, write_streaming_sinks, + AgentEvent, AgentSnapshot, event_agent_id, handle_agent_event, + resolve_heartbeat_check_interval, resolve_sink_interval, }; use super::setup::AutoRunSetup; use crate::distributed::protocol::{WireMessage, read_message}; pub(super) struct AutoRunOutcome { pub(super) run_id: String, - pub(super) shutdown_tx: Option, + pub(super) output_state: DistributedOutputState, pub(super) agent_states: HashMap, - pub(super) aggregated_samples: Vec, pub(super) runtime_errors: Vec, pub(super) channel_closed: bool, pub(super) pending_agents: HashSet, @@ -30,10 +29,7 @@ pub(super) async fn collect_auto_run_events( let AutoRunSetup { run_id, agents, - ui_tx, - shutdown_tx, - charts_enabled, - sink_updates_enabled, + mut output_state, heartbeat_timeout, report_deadline, } = setup; @@ -42,12 +38,8 @@ pub(super) async fn collect_auto_run_events( let mut agent_states: HashMap = HashMap::new(); let mut pending_agents: HashSet = agents.iter().map(|agent| agent.agent_id.clone()).collect(); - let mut ui_latency_window: VecDeque<(u64, u64)> = VecDeque::new(); - let mut ui_rps_window: VecDeque<(u64, u64)> = VecDeque::new(); - let mut aggregated_samples: Vec = Vec::new(); let mut sink_interval = tokio::time::interval(resolve_sink_interval(args.sinks.as_ref())); sink_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - let mut sink_dirty = false; let mut channel_closed = false; let mut heartbeat_interval 
= tokio::time::interval(resolve_heartbeat_check_interval(heartbeat_timeout)); @@ -186,36 +178,32 @@ pub(super) async fn collect_auto_run_events( &mut pending_agents, &mut agent_states, &mut runtime_errors, - &mut sink_dirty, ); if is_disconnected { disconnected_agents.insert(agent_id.clone()); last_seen.remove(agent_id.as_str()); } - if charts_enabled { - record_aggregated_sample(&mut aggregated_samples, &agent_states); - } - if let Some(ui_tx) = ui_tx.as_ref() { - update_ui( - ui_tx, - args, - &agent_states, - &mut ui_latency_window, - &mut ui_rps_window, - ); - } + handle_output_event( + args, + &mut output_state, + &agent_states, + &mut runtime_errors, + OutputEvent::AgentStateUpdated, + ) + .await; if pending_agents.is_empty() { break; } } _ = sink_interval.tick() => { - if sink_updates_enabled && sink_dirty { - if let Err(err) = write_streaming_sinks(args, &agent_states).await { - runtime_errors.push(err.to_string()); - } else { - sink_dirty = false; - } - } + handle_output_event( + args, + &mut output_state, + &agent_states, + &mut runtime_errors, + OutputEvent::SinkTick, + ) + .await; } _ = heartbeat_interval.tick() => { let now = tokio::time::Instant::now(); @@ -237,7 +225,6 @@ pub(super) async fn collect_auto_run_events( &mut pending_agents, &mut agent_states, &mut runtime_errors, - &mut sink_dirty, ); last_seen.remove(&agent_id); } @@ -251,9 +238,8 @@ pub(super) async fn collect_auto_run_events( AutoRunOutcome { run_id, - shutdown_tx, + output_state, agent_states, - aggregated_samples, runtime_errors, channel_closed, pending_agents, diff --git a/src/distributed/controller/auto/finalize.rs b/src/distributed/controller/auto/finalize.rs index 94aa345..1fe6d0f 100644 --- a/src/distributed/controller/auto/finalize.rs +++ b/src/distributed/controller/auto/finalize.rs @@ -1,40 +1,23 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use crate::args::TesterArgs; use crate::error::{AppError, AppResult, DistributedError}; -use 
crate::metrics::AggregatedMetricSample; -use crate::sinks::config::SinkStats; -use crate::sinks::writers::write_sinks; -use super::super::shared::{AgentSnapshot, aggregate_snapshots, write_aggregated_charts}; +use super::super::output::finalize_output; use super::events::AutoRunOutcome; -use crate::distributed::summary::{ - Percentiles, SummaryPercentiles, compute_summary_stats, print_summary, -}; pub(super) async fn finalize_auto_run(args: &TesterArgs, outcome: AutoRunOutcome) -> AppResult<()> { let AutoRunOutcome { run_id: _run_id, - shutdown_tx, + mut output_state, agent_states, - aggregated_samples, mut runtime_errors, channel_closed, pending_agents, } = outcome; append_channel_closure_errors(channel_closed, &pending_agents, &mut runtime_errors); - append_summary_errors( - args, - &agent_states, - &aggregated_samples, - &mut runtime_errors, - ) - .await; - - if let Some(shutdown_tx) = shutdown_tx.as_ref() { - drop(shutdown_tx.send(())); - } + finalize_output(args, &mut output_state, &agent_states, &mut runtime_errors).await; if !runtime_errors.is_empty() { eprintln!("Runtime errors:"); @@ -63,63 +46,3 @@ fn append_channel_closure_errors( } } } - -async fn append_summary_errors( - args: &TesterArgs, - agent_states: &HashMap, - aggregated_samples: &[AggregatedMetricSample], - runtime_errors: &mut Vec, -) { - if agent_states.is_empty() { - runtime_errors.push("No successful agent reports received.".to_owned()); - return; - } - - let Ok((summary, merged_hist, success_hist)) = aggregate_snapshots(agent_states) else { - runtime_errors.push("Failed to aggregate agent summaries.".to_owned()); - return; - }; - let (p50, p90, p99) = merged_hist.percentiles(); - let (success_p50, success_p90, success_p99) = success_hist.percentiles(); - let stats = compute_summary_stats(&summary); - let mut charts_output_path: Option = None; - if !args.no_charts { - match write_aggregated_charts(aggregated_samples, args).await { - Ok(path) => charts_output_path = path, - Err(err) => 
runtime_errors.push(err.to_string()), - } - } - - let percentiles = SummaryPercentiles { - all: Percentiles { p50, p90, p99 }, - ok: Percentiles { - p50: success_p50, - p90: success_p90, - p99: success_p99, - }, - }; - - print_summary(&summary, percentiles, args, charts_output_path.as_deref()); - - if let Some(sinks) = args.sinks.as_ref() { - let sink_stats = SinkStats { - duration: summary.duration, - total_requests: summary.total_requests, - successful_requests: summary.successful_requests, - error_requests: summary.error_requests, - timeout_requests: summary.timeout_requests, - min_latency_ms: summary.min_latency_ms, - max_latency_ms: summary.max_latency_ms, - avg_latency_ms: summary.avg_latency_ms, - p50_latency_ms: p50, - p90_latency_ms: p90, - p99_latency_ms: p99, - success_rate_x100: stats.success_rate_x100, - avg_rps_x100: stats.avg_rps_x100, - avg_rpm_x100: stats.avg_rpm_x100, - }; - if let Err(err) = write_sinks(sinks, &sink_stats).await { - runtime_errors.push(format!("Sinks: {}", err)); - } - } -} diff --git a/src/distributed/controller/auto/setup.rs b/src/distributed/controller/auto/setup.rs index 12fffab..097eed4 100644 --- a/src/distributed/controller/auto/setup.rs +++ b/src/distributed/controller/auto/setup.rs @@ -1,16 +1,14 @@ -use std::io::IsTerminal; use std::time::Duration; -use tokio::sync::watch; use tokio::time::Instant; use tracing::{debug, info}; use crate::args::TesterArgs; use crate::error::{AppError, AppResult, DistributedError}; -use crate::ui::{model::UiData, render::setup_render_ui}; use super::super::agent::{AgentConn, accept_agent}; use super::super::load::apply_load_share; +use super::super::output::{DistributedOutputState, setup_output_state}; use super::super::shared::{DEFAULT_START_AFTER_MS, REPORT_GRACE_SECS, resolve_agent_wait_timeout}; use crate::distributed::protocol::{ConfigMessage, StartMessage, WireMessage, send_message}; use crate::distributed::utils::build_run_id; @@ -19,10 +17,7 @@ use 
crate::distributed::wire::build_wire_args; pub(super) struct AutoRunSetup { pub(super) run_id: String, pub(super) agents: Vec, - pub(super) ui_tx: Option>, - pub(super) shutdown_tx: Option, - pub(super) charts_enabled: bool, - pub(super) sink_updates_enabled: bool, + pub(super) output_state: DistributedOutputState, pub(super) heartbeat_timeout: Duration, pub(super) report_deadline: Instant, } @@ -72,9 +67,7 @@ pub(super) async fn prepare_auto_run(args: &TesterArgs) -> AppResult AppResult ( - Option>, - Option, -) { - let ui_enabled = - args.distributed_stream_summaries && !args.no_ui && std::io::stdout().is_terminal(); - if !ui_enabled { - return (None, None); - } - - let target_duration = Duration::from_secs(args.target_duration.get()); - let (shutdown_tx, _) = crate::system::shutdown_handlers::shutdown_channel(); - let (ui_tx, _) = watch::channel(UiData { - target_duration, - ui_window_ms: args.ui_window_ms.get(), - no_color: args.no_color, - ..UiData::default() - }); - let _ui_handle = setup_render_ui(&shutdown_tx, &ui_tx); - (Some(ui_tx), Some(shutdown_tx)) -} - fn compute_weights(agents: &[AgentConn]) -> Vec { agents.iter().map(|agent| agent.weight).collect() } diff --git a/src/distributed/controller/manual/loop_handlers.rs b/src/distributed/controller/manual/loop_handlers.rs index 8a8419c..ead884a 100644 --- a/src/distributed/controller/manual/loop_handlers.rs +++ b/src/distributed/controller/manual/loop_handlers.rs @@ -9,10 +9,8 @@ use crate::args::TesterArgs; use crate::error::{AppError, AppResult, DistributedError}; use super::super::control::{ControlCommand, ControlError, ControlResponse}; -use super::super::shared::{ - AgentEvent, event_agent_id, handle_agent_event, record_aggregated_sample, update_ui, - write_streaming_sinks, -}; +use super::super::output::{OutputEvent, handle_output_event}; +use super::super::shared::{AgentEvent, event_agent_id, handle_agent_event}; use super::run_finalize::finalize_manual_run; use super::run_lifecycle::request_stop; 
use super::state::{ManualAgent, ManualRunState}; @@ -63,20 +61,21 @@ pub(super) async fn handle_active_run( let Some(event) = event else { return Err(AppError::distributed(DistributedError::AgentEventChannelClosed)); }; - on_event(args, state, event, disconnected_agents, last_seen, agent_pool); + on_event(args, state, event, disconnected_agents, last_seen, agent_pool).await; if state.pending_agents.is_empty() { finish_run = true; finish_error = finalize_manual_run(args, state).await.err(); } } _ = state.sink_interval.tick() => { - if state.sink_updates_enabled && state.sink_dirty { - if let Err(err) = write_streaming_sinks(args, &state.agent_states).await { - state.runtime_errors.push(err.to_string()); - } else { - state.sink_dirty = false; - } - } + handle_output_event( + args, + &mut state.output_state, + &state.agent_states, + &mut state.runtime_errors, + OutputEvent::SinkTick, + ) + .await; } _ = heartbeat_interval.tick() => { process_heartbeat_timeouts(state, heartbeat_timeout, last_seen, disconnected_agents, agent_pool); @@ -102,7 +101,7 @@ pub(super) async fn handle_active_run( Ok(finish_run) } -fn on_event( +async fn on_event( args: &TesterArgs, state: &mut ManualRunState, event: AgentEvent, @@ -129,7 +128,6 @@ fn on_event( &mut state.pending_agents, &mut state.agent_states, &mut state.runtime_errors, - &mut state.sink_dirty, ); if is_disconnected { disconnected_agents.insert(agent_id.clone()); @@ -144,18 +142,14 @@ fn on_event( next }); } - if state.charts_enabled { - record_aggregated_sample(&mut state.aggregated_samples, &state.agent_states); - } - if let Some(ui_tx) = state.ui_tx.as_ref() { - update_ui( - ui_tx, - args, - &state.agent_states, - &mut state.ui_latency_window, - &mut state.ui_rps_window, - ); - } + handle_output_event( + args, + &mut state.output_state, + &state.agent_states, + &mut state.runtime_errors, + OutputEvent::AgentStateUpdated, + ) + .await; } fn process_heartbeat_timeouts( @@ -192,7 +186,6 @@ fn process_heartbeat_timeouts( &mut 
state.pending_agents, &mut state.agent_states, &mut state.runtime_errors, - &mut state.sink_dirty, ); last_seen.rcu(|current| { let mut next = current.clone(); diff --git a/src/distributed/controller/manual/run_finalize.rs b/src/distributed/controller/manual/run_finalize.rs index 8372e49..176587a 100644 --- a/src/distributed/controller/manual/run_finalize.rs +++ b/src/distributed/controller/manual/run_finalize.rs @@ -1,76 +1,20 @@ use crate::args::TesterArgs; use crate::error::{AppError, AppResult, DistributedError}; -use crate::sinks::config::SinkStats; -use super::super::shared::{aggregate_snapshots, write_aggregated_charts}; +use super::super::output::finalize_output; use super::state::ManualRunState; -use crate::distributed::summary::{ - Percentiles, SummaryPercentiles, compute_summary_stats, print_summary, -}; pub(super) async fn finalize_manual_run( args: &TesterArgs, state: &mut ManualRunState, ) -> AppResult<()> { - if state.agent_states.is_empty() { - state - .runtime_errors - .push("No successful agent reports received.".to_owned()); - } else if let Ok((summary, merged_hist, success_hist)) = - aggregate_snapshots(&state.agent_states) - { - let (p50, p90, p99) = merged_hist.percentiles(); - let (success_p50, success_p90, success_p99) = success_hist.percentiles(); - let stats = compute_summary_stats(&summary); - let mut charts_output_path: Option = None; - if state.charts_enabled { - match write_aggregated_charts(&state.aggregated_samples, args).await { - Ok(path) => charts_output_path = path, - Err(err) => state.runtime_errors.push(err.to_string()), - } - } - - let percentiles = SummaryPercentiles { - all: Percentiles { p50, p90, p99 }, - ok: Percentiles { - p50: success_p50, - p90: success_p90, - p99: success_p99, - }, - }; - - print_summary(&summary, percentiles, args, charts_output_path.as_deref()); - - if let Some(sinks) = args.sinks.as_ref() { - let sink_stats = SinkStats { - duration: summary.duration, - total_requests: summary.total_requests, - 
successful_requests: summary.successful_requests, - error_requests: summary.error_requests, - timeout_requests: summary.timeout_requests, - min_latency_ms: summary.min_latency_ms, - max_latency_ms: summary.max_latency_ms, - avg_latency_ms: summary.avg_latency_ms, - p50_latency_ms: p50, - p90_latency_ms: p90, - p99_latency_ms: p99, - success_rate_x100: stats.success_rate_x100, - avg_rps_x100: stats.avg_rps_x100, - avg_rpm_x100: stats.avg_rpm_x100, - }; - if let Err(err) = crate::sinks::writers::write_sinks(sinks, &sink_stats).await { - state.runtime_errors.push(format!("Sinks: {}", err)); - } - } - } else { - state - .runtime_errors - .push("Failed to aggregate agent summaries.".to_owned()); - } - - if let Some(shutdown_tx) = state.shutdown_tx.as_ref() { - drop(shutdown_tx.send(())); - } + finalize_output( + args, + &mut state.output_state, + &state.agent_states, + &mut state.runtime_errors, + ) + .await; if !state.runtime_errors.is_empty() { eprintln!("Runtime errors:"); diff --git a/src/distributed/controller/manual/run_lifecycle.rs b/src/distributed/controller/manual/run_lifecycle.rs index 1c1d698..c4e5654 100644 --- a/src/distributed/controller/manual/run_lifecycle.rs +++ b/src/distributed/controller/manual/run_lifecycle.rs @@ -1,17 +1,15 @@ -use std::collections::{HashMap, HashSet, VecDeque}; -use std::io::IsTerminal; +use std::collections::{HashMap, HashSet}; use std::time::Duration; use arcshift::ArcShift; -use tokio::sync::watch; use tokio::time::{Instant, MissedTickBehavior}; use crate::args::{Scenario, TesterArgs}; use crate::config::apply::scenario::{ScenarioDefaults, parse_scenario}; -use crate::ui::{model::UiData, render::setup_render_ui}; use super::super::control::{ControlError, ControlStartRequest}; use super::super::load::apply_load_share; +use super::super::output::setup_output_state; use super::super::shared::{DEFAULT_START_AFTER_MS, REPORT_GRACE_SECS, resolve_sink_interval}; use super::state::{ManualAgent, ManualRunState, ScenarioState}; use 
crate::distributed::protocol::{ConfigMessage, StartMessage, StopMessage, WireMessage}; @@ -109,25 +107,7 @@ pub(super) async fn start_manual_run( )); } - let ui_enabled = - args.distributed_stream_summaries && !args.no_ui && std::io::stdout().is_terminal(); - let (ui_tx, shutdown_tx, _ui_handle) = if ui_enabled { - let target_duration = Duration::from_secs(args.target_duration.get()); - let (shutdown_tx, _) = crate::system::shutdown_handlers::shutdown_channel(); - let (ui_tx, _) = watch::channel(UiData { - target_duration, - ui_window_ms: args.ui_window_ms.get(), - no_color: args.no_color, - ..UiData::default() - }); - let handle = setup_render_ui(&shutdown_tx, &ui_tx); - (Some(ui_tx), Some(shutdown_tx), Some(handle)) - } else { - (None, None, None) - }; - - let sink_updates_enabled = args.distributed_stream_summaries && args.sinks.is_some(); - let charts_enabled = !args.no_charts && args.distributed_stream_summaries; + let output_state = setup_output_state(args); let mut sink_interval = tokio::time::interval(resolve_sink_interval(args.sinks.as_ref())); sink_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); @@ -142,17 +122,10 @@ pub(super) async fn start_manual_run( run_id, pending_agents, agent_states: HashMap::new(), - aggregated_samples: Vec::new(), runtime_errors: Vec::new(), - sink_dirty: false, - sink_updates_enabled, sink_interval, - ui_tx, - shutdown_tx, - ui_latency_window: VecDeque::new(), - ui_rps_window: VecDeque::new(), + output_state, deadline: report_deadline, - charts_enabled, }) } diff --git a/src/distributed/controller/manual/state.rs b/src/distributed/controller/manual/state.rs index 6832922..08be91c 100644 --- a/src/distributed/controller/manual/state.rs +++ b/src/distributed/controller/manual/state.rs @@ -1,17 +1,15 @@ -use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::time::Duration; use arcshift::ArcShift; -use tokio::sync::{mpsc, watch}; +use 
tokio::sync::mpsc; use tokio::time::Instant; use crate::args::{Scenario, TesterArgs}; use crate::config::types::ScenarioConfig; -use crate::metrics::AggregatedMetricSample; -use crate::shutdown::ShutdownSender; -use crate::ui::model::UiData; use super::super::control::{ControlError, ControlStartRequest}; +use super::super::output::DistributedOutputState; use super::super::shared::AgentEvent; use crate::distributed::protocol::WireMessage; @@ -26,17 +24,10 @@ pub(super) struct ManualRunState { pub(super) run_id: String, pub(super) pending_agents: HashSet, pub(super) agent_states: HashMap, - pub(super) aggregated_samples: Vec, pub(super) runtime_errors: Vec, - pub(super) sink_dirty: bool, - pub(super) sink_updates_enabled: bool, pub(super) sink_interval: tokio::time::Interval, - pub(super) ui_tx: Option>, - pub(super) shutdown_tx: Option, - pub(super) ui_latency_window: VecDeque<(u64, u64)>, - pub(super) ui_rps_window: VecDeque<(u64, u64)>, + pub(super) output_state: DistributedOutputState, pub(super) deadline: Instant, - pub(super) charts_enabled: bool, } pub(super) struct ScenarioState { diff --git a/src/distributed/controller/output.rs b/src/distributed/controller/output.rs new file mode 100644 index 0000000..7303532 --- /dev/null +++ b/src/distributed/controller/output.rs @@ -0,0 +1,218 @@ +use std::collections::{HashMap, VecDeque}; +use std::io::IsTerminal; +use std::time::Duration; + +use tokio::sync::watch; + +use crate::args::TesterArgs; +use crate::charts; +use crate::distributed::summary::{ + Percentiles, SummaryPercentiles, compute_summary_stats, print_summary, +}; +use crate::error::AppResult; +use crate::metrics::AggregatedMetricSample; +use crate::shutdown::ShutdownSender; +use crate::sinks::config::SinkStats; +use crate::sinks::writers::write_sinks; +use crate::ui::{model::UiData, render::setup_render_ui}; + +use super::shared::{AgentSnapshot, aggregate_snapshots, record_aggregated_sample, update_ui}; + +pub(in crate::distributed::controller) enum 
OutputEvent { + AgentStateUpdated, + SinkTick, +} + +pub(in crate::distributed::controller) struct DistributedOutputState { + charts_enabled: bool, + sink_updates_enabled: bool, + sink_dirty: bool, + aggregated_samples: Vec, + ui_tx: Option>, + shutdown_tx: Option, + ui_latency_window: VecDeque<(u64, u64)>, + ui_rps_window: VecDeque<(u64, u64)>, +} + +pub(in crate::distributed::controller) fn setup_output_state( + args: &TesterArgs, +) -> DistributedOutputState { + let streaming_enabled = args.distributed_stream_summaries; + let ui_enabled = streaming_enabled && !args.no_ui && std::io::stdout().is_terminal(); + let (ui_tx, shutdown_tx) = if ui_enabled { + let target_duration = Duration::from_secs(args.target_duration.get()); + let (shutdown_tx, _) = crate::system::shutdown_handlers::shutdown_channel(); + let (ui_tx, _) = watch::channel(UiData { + target_duration, + ui_window_ms: args.ui_window_ms.get(), + no_color: args.no_color, + ..UiData::default() + }); + let _ui_handle = setup_render_ui(&shutdown_tx, &ui_tx); + (Some(ui_tx), Some(shutdown_tx)) + } else { + (None, None) + }; + + DistributedOutputState { + charts_enabled: !args.no_charts && streaming_enabled, + sink_updates_enabled: streaming_enabled && args.sinks.is_some(), + sink_dirty: false, + aggregated_samples: Vec::new(), + ui_tx, + shutdown_tx, + ui_latency_window: VecDeque::new(), + ui_rps_window: VecDeque::new(), + } +} + +pub(in crate::distributed::controller) async fn handle_output_event( + args: &TesterArgs, + state: &mut DistributedOutputState, + agent_states: &HashMap, + runtime_errors: &mut Vec, + event: OutputEvent, +) { + match event { + OutputEvent::AgentStateUpdated => { + if state.charts_enabled { + record_aggregated_sample(&mut state.aggregated_samples, agent_states); + } + if let Some(ui_tx) = state.ui_tx.as_ref() { + update_ui( + ui_tx, + args, + agent_states, + &mut state.ui_latency_window, + &mut state.ui_rps_window, + ); + } + if state.sink_updates_enabled { + state.sink_dirty = true; 
+ } + } + OutputEvent::SinkTick => { + if state.sink_updates_enabled && state.sink_dirty { + if let Err(err) = write_streaming_sinks(args, agent_states).await { + runtime_errors.push(err.to_string()); + } else { + state.sink_dirty = false; + } + } + } + } +} + +pub(in crate::distributed::controller) async fn finalize_output( + args: &TesterArgs, + state: &mut DistributedOutputState, + agent_states: &HashMap, + runtime_errors: &mut Vec, +) { + if agent_states.is_empty() { + runtime_errors.push("No successful agent reports received.".to_owned()); + send_shutdown_signal(state); + return; + } + + let Ok((summary, merged_hist, success_hist)) = aggregate_snapshots(agent_states) else { + runtime_errors.push("Failed to aggregate agent summaries.".to_owned()); + send_shutdown_signal(state); + return; + }; + + let (p50, p90, p99) = merged_hist.percentiles(); + let (success_p50, success_p90, success_p99) = success_hist.percentiles(); + let stats = compute_summary_stats(&summary); + let mut charts_output_path: Option = None; + if state.charts_enabled { + match write_aggregated_charts(&state.aggregated_samples, args).await { + Ok(path) => charts_output_path = path, + Err(err) => runtime_errors.push(err.to_string()), + } + } + + let percentiles = SummaryPercentiles { + all: Percentiles { p50, p90, p99 }, + ok: Percentiles { + p50: success_p50, + p90: success_p90, + p99: success_p99, + }, + }; + + print_summary(&summary, percentiles, args, charts_output_path.as_deref()); + + if let Some(sinks) = args.sinks.as_ref() { + let sink_stats = SinkStats { + duration: summary.duration, + total_requests: summary.total_requests, + successful_requests: summary.successful_requests, + error_requests: summary.error_requests, + timeout_requests: summary.timeout_requests, + min_latency_ms: summary.min_latency_ms, + max_latency_ms: summary.max_latency_ms, + avg_latency_ms: summary.avg_latency_ms, + p50_latency_ms: p50, + p90_latency_ms: p90, + p99_latency_ms: p99, + success_rate_x100: 
stats.success_rate_x100, + avg_rps_x100: stats.avg_rps_x100, + avg_rpm_x100: stats.avg_rpm_x100, + }; + if let Err(err) = write_sinks(sinks, &sink_stats).await { + runtime_errors.push(format!("Sinks: {}", err)); + } + } + + send_shutdown_signal(state); +} + +fn send_shutdown_signal(state: &DistributedOutputState) { + if let Some(shutdown_tx) = state.shutdown_tx.as_ref() { + drop(shutdown_tx.send(())); + } +} + +async fn write_streaming_sinks( + args: &TesterArgs, + agent_states: &HashMap, +) -> AppResult<()> { + if agent_states.is_empty() { + return Ok(()); + } + let (summary, merged_hist, _success_hist) = aggregate_snapshots(agent_states)?; + let (p50, p90, p99) = merged_hist.percentiles(); + let stats = compute_summary_stats(&summary); + + if let Some(sinks) = args.sinks.as_ref() { + let sink_stats = SinkStats { + duration: summary.duration, + total_requests: summary.total_requests, + successful_requests: summary.successful_requests, + error_requests: summary.error_requests, + timeout_requests: summary.timeout_requests, + min_latency_ms: summary.min_latency_ms, + max_latency_ms: summary.max_latency_ms, + avg_latency_ms: summary.avg_latency_ms, + p50_latency_ms: p50, + p90_latency_ms: p90, + p99_latency_ms: p99, + success_rate_x100: stats.success_rate_x100, + avg_rps_x100: stats.avg_rps_x100, + avg_rpm_x100: stats.avg_rpm_x100, + }; + write_sinks(sinks, &sink_stats).await?; + } + Ok(()) +} + +async fn write_aggregated_charts( + samples: &[AggregatedMetricSample], + args: &TesterArgs, +) -> AppResult> { + if args.no_charts || samples.len() < 2 { + return Ok(None); + } + charts::plot_aggregated_metrics(samples, args).await +} diff --git a/src/distributed/controller/shared.rs b/src/distributed/controller/shared.rs index 46bb3d1..62e0f3c 100644 --- a/src/distributed/controller/shared.rs +++ b/src/distributed/controller/shared.rs @@ -3,9 +3,7 @@ mod events; mod timing; mod ui; -pub(super) use aggregation::{ - aggregate_snapshots, record_aggregated_sample, 
write_aggregated_charts, write_streaming_sinks, -}; +pub(super) use aggregation::{aggregate_snapshots, record_aggregated_sample}; pub(super) use events::{AgentEvent, AgentSnapshot, event_agent_id, handle_agent_event}; pub(super) use timing::{ DEFAULT_START_AFTER_MS, REPORT_GRACE_SECS, resolve_agent_wait_timeout, diff --git a/src/distributed/controller/shared/aggregation.rs b/src/distributed/controller/shared/aggregation.rs index d3ea470..3a425a4 100644 --- a/src/distributed/controller/shared/aggregation.rs +++ b/src/distributed/controller/shared/aggregation.rs @@ -1,48 +1,11 @@ use std::collections::HashMap; -use crate::args::TesterArgs; -use crate::charts; use crate::error::AppResult; use crate::metrics::{AggregatedMetricSample, LatencyHistogram}; -use crate::sinks::config::SinkStats; -use crate::sinks::writers::write_sinks; -use super::super::super::summary::{compute_summary_stats, merge_summaries}; +use super::super::super::summary::merge_summaries; use super::events::AgentSnapshot; -pub(in crate::distributed::controller) async fn write_streaming_sinks( - args: &TesterArgs, - agent_states: &HashMap, -) -> AppResult<()> { - if agent_states.is_empty() { - return Ok(()); - } - let (summary, merged_hist, _success_hist) = aggregate_snapshots(agent_states)?; - let (p50, p90, p99) = merged_hist.percentiles(); - let stats = compute_summary_stats(&summary); - - if let Some(sinks) = args.sinks.as_ref() { - let sink_stats = SinkStats { - duration: summary.duration, - total_requests: summary.total_requests, - successful_requests: summary.successful_requests, - error_requests: summary.error_requests, - timeout_requests: summary.timeout_requests, - min_latency_ms: summary.min_latency_ms, - max_latency_ms: summary.max_latency_ms, - avg_latency_ms: summary.avg_latency_ms, - p50_latency_ms: p50, - p90_latency_ms: p90, - p99_latency_ms: p99, - success_rate_x100: stats.success_rate_x100, - avg_rps_x100: stats.avg_rps_x100, - avg_rpm_x100: stats.avg_rpm_x100, - }; - 
write_sinks(sinks, &sink_stats).await?; - } - Ok(()) -} - pub(in crate::distributed::controller) fn aggregate_snapshots( agent_states: &HashMap, ) -> AppResult<( @@ -100,13 +63,3 @@ pub(in crate::distributed::controller) fn record_aggregated_sample( samples.push(sample); } - -pub(in crate::distributed::controller) async fn write_aggregated_charts( - samples: &[AggregatedMetricSample], - args: &TesterArgs, -) -> AppResult> { - if args.no_charts || samples.len() < 2 { - return Ok(None); - } - charts::plot_aggregated_metrics(samples, args).await -} diff --git a/src/distributed/controller/shared/events.rs b/src/distributed/controller/shared/events.rs index feee787..ebae01b 100644 --- a/src/distributed/controller/shared/events.rs +++ b/src/distributed/controller/shared/events.rs @@ -40,7 +40,6 @@ pub(in crate::distributed::controller) fn handle_agent_event( pending_agents: &mut HashSet, agent_states: &mut HashMap, runtime_errors: &mut Vec, - sink_dirty: &mut bool, ) { match event { AgentEvent::Stream { agent_id, message } => { @@ -91,7 +90,6 @@ pub(in crate::distributed::controller) fn handle_agent_event( success_histogram, }, ); - *sink_dirty = true; } Err(err) => runtime_errors.push(format!( "Agent {} histogram decode failed: {}", @@ -154,7 +152,6 @@ pub(in crate::distributed::controller) fn handle_agent_event( success_histogram, }, ); - *sink_dirty = true; } Err(err) => runtime_errors.push(format!( "Agent {} histogram decode failed: {}", diff --git a/src/entry/plan/build.rs b/src/entry/plan/build.rs index 6c208e2..0af7f39 100644 --- a/src/entry/plan/build.rs +++ b/src/entry/plan/build.rs @@ -108,7 +108,7 @@ pub(crate) fn build_plan(mut args: TesterArgs, matches: &ArgMatches) -> AppResul } if args.controller_listen.is_some() { - return Ok(RunPlan::Controller(to_controller_run_command( + return Ok(RunPlan::Distributed(to_controller_run_command( args, scenario_registry, ))); @@ -124,7 +124,7 @@ pub(crate) fn build_plan(mut args: TesterArgs, matches: &ArgMatches) -> 
AppResul
     }
 
     if args.agent_join.is_some() {
-        return Ok(RunPlan::Agent(to_agent_run_command(args)));
+        return Ok(RunPlan::Distributed(to_agent_run_command(args)));
     }
 
     Ok(RunPlan::Local(to_local_run_command(args)?))
diff --git a/src/entry/plan/execute.rs b/src/entry/plan/execute.rs
index a3deeaf..43e371d 100644
--- a/src/entry/plan/execute.rs
+++ b/src/entry/plan/execute.rs
@@ -1,7 +1,13 @@
+use std::collections::BTreeMap;
+
+use async_trait::async_trait;
 use rand::distributions::Distribution;
 use rand::thread_rng;
 
 use crate::app::{self, run_cleanup, run_compare, run_local, run_replay};
+use crate::application::distributed_run::{self, DistributedRunPort};
+use crate::args::TesterArgs;
+use crate::config::types::ScenarioConfig;
 use crate::domain::run::RunConfig;
 use crate::error::{AppError, AppResult, ValidationError};
 use crate::system::banner;
@@ -23,18 +29,12 @@ pub(crate) async fn execute_plan(plan: RunPlan) -> AppResult<()> {
             crate::service::handle_service_action(command.as_args())?;
             Ok(())
         }
-        RunPlan::Controller(command) => {
-            log_run_command("controller", command.run_config());
+        RunPlan::Distributed(command) => {
+            log_run_command(command.mode_name(), command.run_config());
             banner::print_cli_banner(command.no_color());
             println!();
-            let (args, scenarios) = command.into_parts();
-            crate::distributed::run_controller(&args, scenarios).await
-        }
-        RunPlan::Agent(command) => {
-            log_run_command("agent", command.run_config());
-            banner::print_cli_banner(command.no_color());
-            println!();
-            crate::distributed::run_agent(command.into_args()).await
+            let distributed_port = RuntimeDistributedPort;
+            distributed_run::execute(command, &distributed_port).await
         }
         RunPlan::Local(command) => {
             log_run_command("local", command.run_config());
@@ -54,6 +54,23 @@ pub(crate) async fn execute_plan(plan: RunPlan) -> AppResult<()> {
     }
 }
 
+struct RuntimeDistributedPort; // field-less adapter handed to distributed_run::execute by execute_plan
+// DistributedRunPort implementation: each method only forwards to the matching crate::distributed function.
+#[async_trait]
+impl DistributedRunPort for RuntimeDistributedPort {
+    async fn run_controller( // forwards to crate::distributed::run_controller; args stay borrowed
+        &self,
+        args: &TesterArgs,
+        scenarios: Option>, // NOTE(review): generic arguments look stripped in this rendering; confirm in the real file
+    ) -> AppResult<()> {
+        crate::distributed::run_controller(args, scenarios).await
+    }
+    /// Forwards to `crate::distributed::run_agent`, handing over `args` by value.
+    async fn run_agent(&self, args: TesterArgs) -> AppResult<()> {
+        crate::distributed::run_agent(args).await
+    }
+}
+
 fn log_run_command(kind: &str, run_config: &RunConfig) {
     let target = run_config.target_url.as_deref().unwrap_or("");
     let scenario_steps = run_config.scenario_step_count();
diff --git a/src/entry/plan/types.rs b/src/entry/plan/types.rs
index fe0baa0..90081b5 100644
--- a/src/entry/plan/types.rs
+++ b/src/entry/plan/types.rs
@@ -1,5 +1,5 @@
 use crate::application::commands::{
-    AgentRunCommand, ControllerRunCommand, LocalRunCommand, ReplayRunCommand, ServiceCommand,
+    DistributedRunCommand, LocalRunCommand, ReplayRunCommand, ServiceCommand,
 };
 use crate::args::{CleanupArgs, CompareArgs};
 
@@ -15,7 +15,6 @@ pub(in crate::entry) enum RunPlan {
     Replay(ReplayRunCommand),
     DumpUrls(DumpUrlsPlan),
     Service(ServiceCommand),
-    Controller(ControllerRunCommand),
-    Agent(AgentRunCommand),
+    Distributed(DistributedRunCommand),
     Local(LocalRunCommand),
 }