Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/architecture/ard/ARCHITECTURE_RISKS_HEXAGONAL_PLAN.md
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,11 @@ Phase 3 artifacts (implemented):
Exit criteria:
- Application no longer depends on `protocol/runtime` internals.

Phase 4 artifacts (implemented):
- Transport adapter boundary contract: `src/protocol/traits.rs`
- Registry-driven builtin transport adapter wiring: `src/protocol/builtins.rs`, `src/protocol/registry.rs`, `src/protocol/runtime.rs`
- Domain protocol key flow in planning/local-run adapter seams: `src/entry/plan/build.rs`, `src/application/local_run.rs`, `src/app/runner/core/mod.rs`

### Phase 5: Distributed slice extraction (2-3 weeks)
1. Introduce `DistributedRunCommand` and domain state models.
2. Move controller/agent workflows into application services.
Expand Down
23 changes: 2 additions & 21 deletions src/adapters/cli/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,30 +52,11 @@ fn to_run_config(args: &TesterArgs) -> RunConfig {
}

const fn map_protocol(protocol: CliProtocol) -> ProtocolKind {
match protocol {
CliProtocol::Http => ProtocolKind::Http,
CliProtocol::GrpcUnary => ProtocolKind::GrpcUnary,
CliProtocol::GrpcStreaming => ProtocolKind::GrpcStreaming,
CliProtocol::Websocket => ProtocolKind::Websocket,
CliProtocol::Tcp => ProtocolKind::Tcp,
CliProtocol::Udp => ProtocolKind::Udp,
CliProtocol::Quic => ProtocolKind::Quic,
CliProtocol::Mqtt => ProtocolKind::Mqtt,
CliProtocol::Enet => ProtocolKind::Enet,
CliProtocol::Kcp => ProtocolKind::Kcp,
CliProtocol::Raknet => ProtocolKind::Raknet,
}
protocol.to_domain()
}

const fn map_load_mode(load_mode: CliLoadMode) -> LoadMode {
match load_mode {
CliLoadMode::Arrival => LoadMode::Arrival,
CliLoadMode::Step => LoadMode::Step,
CliLoadMode::Ramp => LoadMode::Ramp,
CliLoadMode::Jitter => LoadMode::Jitter,
CliLoadMode::Burst => LoadMode::Burst,
CliLoadMode::Soak => LoadMode::Soak,
}
load_mode.to_domain()
}

fn map_scenario(scenario: &CliScenario) -> Scenario {
Expand Down
7 changes: 5 additions & 2 deletions src/app/runner/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
OutputPort, ShutdownPort, TrafficPort,
},
args::TesterArgs,
domain::run::ProtocolKind,
error::AppResult,
metrics::{self, Metrics},
protocol,
Expand All @@ -32,7 +33,8 @@ pub(crate) async fn run_local(
stream_tx: Option<mpsc::UnboundedSender<metrics::StreamSnapshot>>,
external_shutdown: Option<watch::Receiver<bool>>,
) -> AppResult<RunOutcome> {
let command = LocalRunExecutionCommand::new(args, stream_tx, external_shutdown);
let protocol = args.protocol.to_domain();
let command = LocalRunExecutionCommand::new(protocol, args, stream_tx, external_shutdown);
let shutdown_adapter = RuntimeShutdownAdapter;
let traffic_adapter = RuntimeTrafficAdapter;
let metrics_adapter = RuntimeMetricsAdapter;
Expand Down Expand Up @@ -96,12 +98,13 @@ struct RuntimeTrafficAdapter;
impl TrafficPort for RuntimeTrafficAdapter {
fn setup_request_sender(
&self,
protocol: ProtocolKind,
args: &TesterArgs,
shutdown_tx: &ShutdownSender,
metrics_tx: &mpsc::Sender<Metrics>,
log_sink: Option<&std::sync::Arc<metrics::LogSink>>,
) -> AppResult<tokio::task::JoinHandle<()>> {
protocol::setup_request_sender(args, shutdown_tx, metrics_tx, log_sink)
protocol::setup_request_sender(protocol, args, shutdown_tx, metrics_tx, log_sink)
}
}

Expand Down
23 changes: 19 additions & 4 deletions src/application/local_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tracing::{info, warn};

use crate::app::logs;
use crate::args::TesterArgs;
use crate::domain::run::ProtocolKind;
use crate::error::{AppError, AppResult, ValidationError};
use crate::metrics::{self, Metrics};
use crate::shutdown::{ShutdownReceiver, ShutdownSender};
Expand All @@ -29,6 +30,7 @@ pub(crate) struct RunOutcome {
}

pub(crate) struct LocalRunExecutionCommand {
protocol: ProtocolKind,
args: TesterArgs,
stream_tx: Option<mpsc::UnboundedSender<metrics::StreamSnapshot>>,
external_shutdown: Option<watch::Receiver<bool>>,
Expand All @@ -37,11 +39,13 @@ pub(crate) struct LocalRunExecutionCommand {
impl LocalRunExecutionCommand {
#[must_use]
pub(crate) const fn new(
protocol: ProtocolKind,
args: TesterArgs,
stream_tx: Option<mpsc::UnboundedSender<metrics::StreamSnapshot>>,
external_shutdown: Option<watch::Receiver<bool>>,
) -> Self {
Self {
protocol,
args,
stream_tx,
external_shutdown,
Expand All @@ -52,11 +56,17 @@ impl LocalRunExecutionCommand {
pub(crate) fn into_parts(
self,
) -> (
ProtocolKind,
TesterArgs,
Option<mpsc::UnboundedSender<metrics::StreamSnapshot>>,
Option<watch::Receiver<bool>>,
) {
(self.args, self.stream_tx, self.external_shutdown)
(
self.protocol,
self.args,
self.stream_tx,
self.external_shutdown,
)
}
}

Expand All @@ -80,6 +90,7 @@ pub(crate) trait ShutdownPort {
pub(crate) trait TrafficPort {
fn setup_request_sender(
&self,
protocol: ProtocolKind,
args: &TesterArgs,
shutdown_tx: &ShutdownSender,
metrics_tx: &mpsc::Sender<Metrics>,
Expand Down Expand Up @@ -178,7 +189,7 @@ where
TMetrics: MetricsPort,
TOutput: OutputPort + Sync,
{
let (args, stream_tx, external_shutdown) = command.into_parts();
let (protocol, args, stream_tx, external_shutdown) = command.into_parts();

#[cfg(feature = "wasm")]
let mut plugin_host = WasmPluginHost::from_paths(&args.plugin)?;
Expand Down Expand Up @@ -240,6 +251,7 @@ where
.await?;

let request_sender_handle = match traffic_port.setup_request_sender(
protocol,
&args,
&shutdown_tx,
&metrics_tx,
Expand Down Expand Up @@ -369,6 +381,7 @@ mod tests {
impl TrafficPort for FakeTrafficPort {
fn setup_request_sender(
&self,
_protocol: ProtocolKind,
_args: &TesterArgs,
_shutdown_tx: &ShutdownSender,
_metrics_tx: &mpsc::Sender<Metrics>,
Expand Down Expand Up @@ -506,7 +519,8 @@ mod tests {
splash_cancelled: false,
finalize_called: finalize_called.clone(),
};
let command = LocalRunExecutionCommand::new(parse_args()?, None, None);
let args = parse_args()?;
let command = LocalRunExecutionCommand::new(args.protocol.to_domain(), args, None, None);

let outcome = execute(
command,
Expand Down Expand Up @@ -534,7 +548,8 @@ mod tests {
splash_cancelled: true,
finalize_called: finalize_called.clone(),
};
let command = LocalRunExecutionCommand::new(parse_args()?, None, None);
let args = parse_args()?;
let command = LocalRunExecutionCommand::new(args.protocol.to_domain(), args, None, None);

let result = execute(
command,
Expand Down
29 changes: 29 additions & 0 deletions src/args/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,23 @@ impl Protocol {
Protocol::Raknet => "raknet",
}
}

#[must_use]
pub const fn to_domain(self) -> crate::domain::run::ProtocolKind {
match self {
Protocol::Http => crate::domain::run::ProtocolKind::Http,
Protocol::GrpcUnary => crate::domain::run::ProtocolKind::GrpcUnary,
Protocol::GrpcStreaming => crate::domain::run::ProtocolKind::GrpcStreaming,
Protocol::Websocket => crate::domain::run::ProtocolKind::Websocket,
Protocol::Tcp => crate::domain::run::ProtocolKind::Tcp,
Protocol::Udp => crate::domain::run::ProtocolKind::Udp,
Protocol::Quic => crate::domain::run::ProtocolKind::Quic,
Protocol::Mqtt => crate::domain::run::ProtocolKind::Mqtt,
Protocol::Enet => crate::domain::run::ProtocolKind::Enet,
Protocol::Kcp => crate::domain::run::ProtocolKind::Kcp,
Protocol::Raknet => crate::domain::run::ProtocolKind::Raknet,
}
}
}

#[derive(Debug, Clone, Copy, ValueEnum, Deserialize, Serialize, PartialEq, Eq)]
Expand All @@ -156,6 +173,18 @@ impl LoadMode {
LoadMode::Soak => "soak",
}
}

#[must_use]
pub const fn to_domain(self) -> crate::domain::run::LoadMode {
match self {
LoadMode::Arrival => crate::domain::run::LoadMode::Arrival,
LoadMode::Step => crate::domain::run::LoadMode::Step,
LoadMode::Ramp => crate::domain::run::LoadMode::Ramp,
LoadMode::Jitter => crate::domain::run::LoadMode::Jitter,
LoadMode::Burst => crate::domain::run::LoadMode::Burst,
LoadMode::Soak => crate::domain::run::LoadMode::Soak,
}
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down
2 changes: 1 addition & 1 deletion src/domain/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pub(crate) mod run;
pub mod run;
32 changes: 16 additions & 16 deletions src/domain/run.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ProtocolKind {
pub enum ProtocolKind {
Http,
GrpcUnary,
GrpcStreaming,
Expand All @@ -15,7 +15,7 @@ pub(crate) enum ProtocolKind {

impl ProtocolKind {
#[must_use]
pub(crate) const fn as_str(self) -> &'static str {
pub const fn as_str(self) -> &'static str {
match self {
ProtocolKind::Http => "http",
ProtocolKind::GrpcUnary => "grpc-unary",
Expand All @@ -33,7 +33,7 @@ impl ProtocolKind {
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum LoadMode {
pub enum LoadMode {
Arrival,
Step,
Ramp,
Expand All @@ -44,7 +44,7 @@ pub(crate) enum LoadMode {

impl LoadMode {
#[must_use]
pub(crate) const fn as_str(self) -> &'static str {
pub const fn as_str(self) -> &'static str {
match self {
LoadMode::Arrival => "arrival",
LoadMode::Step => "step",
Expand All @@ -57,39 +57,39 @@ impl LoadMode {
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct Scenario {
pub(crate) base_url: Option<String>,
pub(crate) vars_count: usize,
pub(crate) step_count: usize,
pub struct Scenario {
pub base_url: Option<String>,
pub vars_count: usize,
pub step_count: usize,
}

#[derive(Debug, Clone)]
pub(crate) struct RunConfig {
pub(crate) protocol: ProtocolKind,
pub(crate) load_mode: LoadMode,
pub(crate) target_url: Option<String>,
pub(crate) scenario: Option<Scenario>,
pub struct RunConfig {
pub protocol: ProtocolKind,
pub load_mode: LoadMode,
pub target_url: Option<String>,
pub scenario: Option<Scenario>,
}

impl RunConfig {
#[must_use]
pub(crate) fn scenario_step_count(&self) -> usize {
pub fn scenario_step_count(&self) -> usize {
self.scenario
.as_ref()
.map(|scenario| scenario.step_count)
.unwrap_or(0)
}

#[must_use]
pub(crate) fn scenario_vars_count(&self) -> usize {
pub fn scenario_vars_count(&self) -> usize {
self.scenario
.as_ref()
.map(|scenario| scenario.vars_count)
.unwrap_or(0)
}

#[must_use]
pub(crate) fn scenario_base_url(&self) -> Option<&str> {
pub fn scenario_base_url(&self) -> Option<&str> {
self.scenario
.as_ref()
.and_then(|scenario| scenario.base_url.as_deref())
Expand Down
16 changes: 9 additions & 7 deletions src/entry/plan/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,13 @@ fn validate_db_logging(args: &TesterArgs) -> AppResult<()> {
}

fn validate_protocol_support(args: &TesterArgs) -> AppResult<()> {
let protocol = args.protocol.to_domain();
let load_mode = args.load_mode.to_domain();
let registry = protocol_registry();
let Some(adapter) = registry.adapter(args.protocol) else {
let Some(adapter) = registry.adapter(protocol) else {
let supported = registry.executable_protocols_csv();
return Err(AppError::validation(ValidationError::UnsupportedProtocol {
protocol: args.protocol.as_str().to_owned(),
protocol: protocol.as_str().to_owned(),
supported,
}));
};
Expand All @@ -218,18 +220,18 @@ fn validate_protocol_support(args: &TesterArgs) -> AppResult<()> {
adapter.display_name(),
adapter.supports_stateful_connections()
);
if !registry.supports_execution(args.protocol) {
if !registry.supports_execution(protocol) {
let supported = registry.executable_protocols_csv();
return Err(AppError::validation(ValidationError::UnsupportedProtocol {
protocol: args.protocol.as_str().to_owned(),
protocol: protocol.as_str().to_owned(),
supported,
}));
}
if !registry.supports_load_mode(args.protocol, args.load_mode) {
if !registry.supports_load_mode(protocol, load_mode) {
return Err(AppError::validation(
ValidationError::UnsupportedLoadModeForProtocol {
protocol: args.protocol.as_str().to_owned(),
load_mode: args.load_mode.as_str().to_owned(),
protocol: protocol.as_str().to_owned(),
load_mode: load_mode.as_str().to_owned(),
},
));
}
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//! grows.
pub mod args;
pub mod config;
pub mod domain;
pub mod error;
pub mod http;
pub mod metrics;
Expand Down
Loading