Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/architecture/ard/ARCHITECTURE_RISKS_HEXAGONAL_PLAN.md
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,11 @@ Phase 4 artifacts (implemented):
Exit criteria:
- distributed application flow has no direct UI/charts/sinks imports.

Phase 5 artifacts (implemented):
- Distributed run command + application execution port: `src/application/commands.rs`, `src/application/distributed_run.rs`, `src/entry/plan/types.rs`, `src/entry/plan/build.rs`, `src/entry/plan/execute.rs`
- Controller output adapter/event boundary for auto/manual flows: `src/distributed/controller/output.rs`, `src/distributed/controller/auto/setup.rs`, `src/distributed/controller/auto/events.rs`, `src/distributed/controller/auto/finalize.rs`, `src/distributed/controller/manual/run_lifecycle.rs`, `src/distributed/controller/manual/loop_handlers.rs`, `src/distributed/controller/manual/run_finalize.rs`, `src/distributed/controller/manual/state.rs`
- Shared aggregation narrowed to state aggregation primitives: `src/distributed/controller/shared/aggregation.rs`, `src/distributed/controller/shared/events.rs`

### Phase 6: Replay/compare slice extraction (1-2 weeks)
1. Split replay/compare use cases from terminal event loop logic.
2. Keep key event handling and rendering in adapters.
Expand Down
10 changes: 5 additions & 5 deletions src/adapters/cli/mapper.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::BTreeMap;

use crate::application::commands::{
AgentRunCommand, ControllerRunCommand, LocalRunCommand, ReplayRunCommand, ServiceCommand,
DistributedRunCommand, LocalRunCommand, ReplayRunCommand, ServiceCommand,
};
use crate::args::{
LoadMode as CliLoadMode, Protocol as CliProtocol, Scenario as CliScenario, TesterArgs,
Expand Down Expand Up @@ -32,14 +32,14 @@ pub(crate) const fn to_service_command(args: TesterArgs) -> ServiceCommand {
pub(crate) fn to_controller_run_command(
args: TesterArgs,
scenarios: Option<BTreeMap<String, ScenarioConfig>>,
) -> ControllerRunCommand {
) -> DistributedRunCommand {
let run_config = to_run_config(&args);
ControllerRunCommand::new(run_config, args, scenarios)
DistributedRunCommand::new_controller(run_config, args, scenarios)
}

pub(crate) fn to_agent_run_command(args: TesterArgs) -> AgentRunCommand {
pub(crate) fn to_agent_run_command(args: TesterArgs) -> DistributedRunCommand {
let run_config = to_run_config(&args);
AgentRunCommand::new(run_config, args)
DistributedRunCommand::new_agent(run_config, args)
}

fn to_run_config(args: &TesterArgs) -> RunConfig {
Expand Down
61 changes: 29 additions & 32 deletions src/application/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,52 +78,41 @@ impl ServiceCommand {
}

#[derive(Debug)]
pub(crate) struct ControllerRunCommand {
pub(crate) enum DistributedRunMode {
Controller {
scenarios: Option<BTreeMap<String, ScenarioConfig>>,
},
Agent,
}

#[derive(Debug)]
pub(crate) struct DistributedRunCommand {
run_config: RunConfig,
args: TesterArgs,
scenarios: Option<BTreeMap<String, ScenarioConfig>>,
mode: DistributedRunMode,
}

impl ControllerRunCommand {
impl DistributedRunCommand {
#[must_use]
pub(crate) const fn new(
pub(crate) const fn new_controller(
run_config: RunConfig,
args: TesterArgs,
scenarios: Option<BTreeMap<String, ScenarioConfig>>,
) -> Self {
Self {
run_config,
args,
scenarios,
mode: DistributedRunMode::Controller { scenarios },
}
}

#[must_use]
pub(crate) const fn run_config(&self) -> &RunConfig {
&self.run_config
}

#[must_use]
pub(crate) const fn no_color(&self) -> bool {
self.args.no_color
}

#[must_use]
pub(crate) fn into_parts(self) -> (TesterArgs, Option<BTreeMap<String, ScenarioConfig>>) {
(self.args, self.scenarios)
}
}

#[derive(Debug)]
pub(crate) struct AgentRunCommand {
run_config: RunConfig,
args: TesterArgs,
}

impl AgentRunCommand {
#[must_use]
pub(crate) const fn new(run_config: RunConfig, args: TesterArgs) -> Self {
Self { run_config, args }
pub(crate) const fn new_agent(run_config: RunConfig, args: TesterArgs) -> Self {
Self {
run_config,
args,
mode: DistributedRunMode::Agent,
}
}

#[must_use]
Expand All @@ -137,7 +126,15 @@ impl AgentRunCommand {
}

#[must_use]
pub(crate) fn into_args(self) -> TesterArgs {
self.args
pub(crate) const fn mode_name(&self) -> &'static str {
match self.mode {
DistributedRunMode::Controller { .. } => "controller",
DistributedRunMode::Agent => "agent",
}
}

#[must_use]
pub(crate) fn into_parts(self) -> (TesterArgs, DistributedRunMode) {
(self.args, self.mode)
}
}
35 changes: 35 additions & 0 deletions src/application/distributed_run.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use std::collections::BTreeMap;

use async_trait::async_trait;

use crate::application::commands::{DistributedRunCommand, DistributedRunMode};
use crate::args::TesterArgs;
use crate::config::types::ScenarioConfig;
use crate::error::AppResult;

#[async_trait]
pub(crate) trait DistributedRunPort {
async fn run_controller(
&self,
args: &TesterArgs,
scenarios: Option<BTreeMap<String, ScenarioConfig>>,
) -> AppResult<()>;

async fn run_agent(&self, args: TesterArgs) -> AppResult<()>;
}

pub(crate) async fn execute<TPort>(
command: DistributedRunCommand,
distributed_port: &TPort,
) -> AppResult<()>
where
TPort: DistributedRunPort + Sync,
{
let (args, mode) = command.into_parts();
match mode {
DistributedRunMode::Controller { scenarios } => {
distributed_port.run_controller(&args, scenarios).await
}
DistributedRunMode::Agent => distributed_port.run_agent(args).await,
}
}
1 change: 1 addition & 0 deletions src/application/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub(crate) mod commands;
pub(crate) mod distributed_run;
pub(crate) mod local_run;
1 change: 1 addition & 0 deletions src/distributed/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod control;
mod http;
mod load;
mod manual;
mod output;
mod runner;
mod shared;

Expand Down
60 changes: 23 additions & 37 deletions src/distributed/controller/auto/events.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::{HashMap, HashSet};

use tokio::sync::mpsc;
use tokio::time::MissedTickBehavior;

use crate::args::TesterArgs;
use crate::metrics::AggregatedMetricSample;

use super::super::output::{DistributedOutputState, OutputEvent, handle_output_event};
use super::super::shared::{
AgentEvent, AgentSnapshot, event_agent_id, handle_agent_event, record_aggregated_sample,
resolve_heartbeat_check_interval, resolve_sink_interval, update_ui, write_streaming_sinks,
AgentEvent, AgentSnapshot, event_agent_id, handle_agent_event,
resolve_heartbeat_check_interval, resolve_sink_interval,
};
use super::setup::AutoRunSetup;
use crate::distributed::protocol::{WireMessage, read_message};

pub(super) struct AutoRunOutcome {
pub(super) run_id: String,
pub(super) shutdown_tx: Option<crate::shutdown::ShutdownSender>,
pub(super) output_state: DistributedOutputState,
pub(super) agent_states: HashMap<String, AgentSnapshot>,
pub(super) aggregated_samples: Vec<AggregatedMetricSample>,
pub(super) runtime_errors: Vec<String>,
pub(super) channel_closed: bool,
pub(super) pending_agents: HashSet<String>,
Expand All @@ -30,10 +29,7 @@ pub(super) async fn collect_auto_run_events(
let AutoRunSetup {
run_id,
agents,
ui_tx,
shutdown_tx,
charts_enabled,
sink_updates_enabled,
mut output_state,
heartbeat_timeout,
report_deadline,
} = setup;
Expand All @@ -42,12 +38,8 @@ pub(super) async fn collect_auto_run_events(
let mut agent_states: HashMap<String, AgentSnapshot> = HashMap::new();
let mut pending_agents: HashSet<String> =
agents.iter().map(|agent| agent.agent_id.clone()).collect();
let mut ui_latency_window: VecDeque<(u64, u64)> = VecDeque::new();
let mut ui_rps_window: VecDeque<(u64, u64)> = VecDeque::new();
let mut aggregated_samples: Vec<AggregatedMetricSample> = Vec::new();
let mut sink_interval = tokio::time::interval(resolve_sink_interval(args.sinks.as_ref()));
sink_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
let mut sink_dirty = false;
let mut channel_closed = false;
let mut heartbeat_interval =
tokio::time::interval(resolve_heartbeat_check_interval(heartbeat_timeout));
Expand Down Expand Up @@ -186,36 +178,32 @@ pub(super) async fn collect_auto_run_events(
&mut pending_agents,
&mut agent_states,
&mut runtime_errors,
&mut sink_dirty,
);
if is_disconnected {
disconnected_agents.insert(agent_id.clone());
last_seen.remove(agent_id.as_str());
}
if charts_enabled {
record_aggregated_sample(&mut aggregated_samples, &agent_states);
}
if let Some(ui_tx) = ui_tx.as_ref() {
update_ui(
ui_tx,
args,
&agent_states,
&mut ui_latency_window,
&mut ui_rps_window,
);
}
handle_output_event(
args,
&mut output_state,
&agent_states,
&mut runtime_errors,
OutputEvent::AgentStateUpdated,
)
.await;
if pending_agents.is_empty() {
break;
}
}
_ = sink_interval.tick() => {
if sink_updates_enabled && sink_dirty {
if let Err(err) = write_streaming_sinks(args, &agent_states).await {
runtime_errors.push(err.to_string());
} else {
sink_dirty = false;
}
}
handle_output_event(
args,
&mut output_state,
&agent_states,
&mut runtime_errors,
OutputEvent::SinkTick,
)
.await;
}
_ = heartbeat_interval.tick() => {
let now = tokio::time::Instant::now();
Expand All @@ -237,7 +225,6 @@ pub(super) async fn collect_auto_run_events(
&mut pending_agents,
&mut agent_states,
&mut runtime_errors,
&mut sink_dirty,
);
last_seen.remove(&agent_id);
}
Expand All @@ -251,9 +238,8 @@ pub(super) async fn collect_auto_run_events(

AutoRunOutcome {
run_id,
shutdown_tx,
output_state,
agent_states,
aggregated_samples,
runtime_errors,
channel_closed,
pending_agents,
Expand Down
Loading