From ab46d1c62b6087b58c46a4b1acf9536835e8e299 Mon Sep 17 00:00:00 2001 From: Celestial Date: Sat, 14 Feb 2026 12:33:59 +0100 Subject: [PATCH] chore: implement phase 4 protocol adapter boundary --- .../ard/ARCHITECTURE_RISKS_HEXAGONAL_PLAN.md | 5 + src/adapters/cli/mapper.rs | 23 +--- src/app/runner/core/mod.rs | 7 +- src/application/local_run.rs | 23 +++- src/args/types.rs | 29 ++++ src/domain/mod.rs | 2 +- src/domain/run.rs | 32 ++--- src/entry/plan/build.rs | 16 ++- src/lib.rs | 1 + src/protocol/builtins.rs | 128 +++++++++++++++--- src/protocol/examples/chat_websocket.rs | 10 +- src/protocol/examples/game_udp.rs | 10 +- src/protocol/examples/telemetry_mqtt.rs | 10 +- src/protocol/mod.rs | 2 +- src/protocol/registry.rs | 16 +-- src/protocol/runtime.rs | 59 ++++---- src/protocol/runtime/tests/datagram_mqtt.rs | 16 ++- .../runtime/tests/scheme_resolution.rs | 8 +- .../runtime/tests/transport_http_grpc.rs | 8 +- src/protocol/tests.rs | 46 ++++--- src/protocol/traits.rs | 35 ++++- 21 files changed, 338 insertions(+), 148 deletions(-) diff --git a/docs/architecture/ard/ARCHITECTURE_RISKS_HEXAGONAL_PLAN.md b/docs/architecture/ard/ARCHITECTURE_RISKS_HEXAGONAL_PLAN.md index b83ae9f..36620ea 100644 --- a/docs/architecture/ard/ARCHITECTURE_RISKS_HEXAGONAL_PLAN.md +++ b/docs/architecture/ard/ARCHITECTURE_RISKS_HEXAGONAL_PLAN.md @@ -303,6 +303,11 @@ Phase 3 artifacts (implemented): Exit criteria: - Application no longer depends on `protocol/runtime` internals. +Phase 4 artifacts (implemented): +- Transport adapter boundary contract: `src/protocol/traits.rs` +- Registry-driven builtin transport adapter wiring: `src/protocol/builtins.rs`, `src/protocol/registry.rs`, `src/protocol/runtime.rs` +- Domain protocol key flow in planning/local-run adapter seams: `src/entry/plan/build.rs`, `src/application/local_run.rs`, `src/app/runner/core/mod.rs` + ### Phase 5: Distributed slice extraction (2-3 weeks) 1. Introduce `DistributedRunCommand` and domain state models. 2. Move controller/agent workflows into application services. diff --git a/src/adapters/cli/mapper.rs b/src/adapters/cli/mapper.rs index 7a35c07..d991ece 100644 --- a/src/adapters/cli/mapper.rs +++ b/src/adapters/cli/mapper.rs @@ -52,30 +52,11 @@ fn to_run_config(args: &TesterArgs) -> RunConfig { } const fn map_protocol(protocol: CliProtocol) -> ProtocolKind { - match protocol { - CliProtocol::Http => ProtocolKind::Http, - CliProtocol::GrpcUnary => ProtocolKind::GrpcUnary, - CliProtocol::GrpcStreaming => ProtocolKind::GrpcStreaming, - CliProtocol::Websocket => ProtocolKind::Websocket, - CliProtocol::Tcp => ProtocolKind::Tcp, - CliProtocol::Udp => ProtocolKind::Udp, - CliProtocol::Quic => ProtocolKind::Quic, - CliProtocol::Mqtt => ProtocolKind::Mqtt, - CliProtocol::Enet => ProtocolKind::Enet, - CliProtocol::Kcp => ProtocolKind::Kcp, - CliProtocol::Raknet => ProtocolKind::Raknet, - } + protocol.to_domain() } const fn map_load_mode(load_mode: CliLoadMode) -> LoadMode { - match load_mode { - CliLoadMode::Arrival => LoadMode::Arrival, - CliLoadMode::Step => LoadMode::Step, - CliLoadMode::Ramp => LoadMode::Ramp, - CliLoadMode::Jitter => LoadMode::Jitter, - CliLoadMode::Burst => LoadMode::Burst, - CliLoadMode::Soak => LoadMode::Soak, - } + load_mode.to_domain() } fn map_scenario(scenario: &CliScenario) -> Scenario { diff --git a/src/app/runner/core/mod.rs b/src/app/runner/core/mod.rs index 27c8a6c..38a0c57 100644 --- a/src/app/runner/core/mod.rs +++ b/src/app/runner/core/mod.rs @@ -10,6 +10,7 @@ use crate::{ OutputPort, ShutdownPort, TrafficPort, }, args::TesterArgs, + domain::run::ProtocolKind, error::AppResult, metrics::{self, Metrics}, protocol, @@ -32,7 +33,8 @@ pub(crate) async fn run_local( stream_tx: Option>, external_shutdown: Option>, ) -> AppResult { - let command = LocalRunExecutionCommand::new(args, stream_tx, external_shutdown); + let protocol = args.protocol.to_domain(); + let command = LocalRunExecutionCommand::new(protocol, args, stream_tx, external_shutdown); let shutdown_adapter = RuntimeShutdownAdapter; let traffic_adapter = RuntimeTrafficAdapter; let metrics_adapter = RuntimeMetricsAdapter; @@ -96,12 +98,13 @@ struct RuntimeTrafficAdapter; impl TrafficPort for RuntimeTrafficAdapter { fn setup_request_sender( &self, + protocol: ProtocolKind, args: &TesterArgs, shutdown_tx: &ShutdownSender, metrics_tx: &mpsc::Sender, log_sink: Option<&std::sync::Arc>, ) -> AppResult> { - protocol::setup_request_sender(args, shutdown_tx, metrics_tx, log_sink) + protocol::setup_request_sender(protocol, args, shutdown_tx, metrics_tx, log_sink) } } diff --git a/src/application/local_run.rs b/src/application/local_run.rs index 39d299a..d2a8da1 100644 --- a/src/application/local_run.rs +++ b/src/application/local_run.rs @@ -8,6 +8,7 @@ use tracing::{info, warn}; use crate::app::logs; use crate::args::TesterArgs; +use crate::domain::run::ProtocolKind; use crate::error::{AppError, AppResult, ValidationError}; use crate::metrics::{self, Metrics}; use crate::shutdown::{ShutdownReceiver, ShutdownSender}; @@ -29,6 +30,7 @@ pub(crate) struct RunOutcome { } pub(crate) struct LocalRunExecutionCommand { + protocol: ProtocolKind, args: TesterArgs, stream_tx: Option>, external_shutdown: Option>, @@ -37,11 +39,13 @@ pub(crate) struct LocalRunExecutionCommand { impl LocalRunExecutionCommand { #[must_use] pub(crate) const fn new( + protocol: ProtocolKind, args: TesterArgs, stream_tx: Option>, external_shutdown: Option>, ) -> Self { Self { + protocol, args, stream_tx, external_shutdown, @@ -52,11 +56,17 @@ impl LocalRunExecutionCommand { pub(crate) fn into_parts( self, ) -> ( + ProtocolKind, TesterArgs, Option>, Option>, ) { - (self.args, self.stream_tx, self.external_shutdown) + ( + self.protocol, + self.args, + self.stream_tx, + self.external_shutdown, + ) } } @@ -80,6 +90,7 @@ pub(crate) trait ShutdownPort { pub(crate) trait TrafficPort { fn setup_request_sender( &self, + protocol: ProtocolKind, args: &TesterArgs, shutdown_tx: &ShutdownSender, metrics_tx: &mpsc::Sender, @@ -178,7 +189,7 @@ where TMetrics: MetricsPort, TOutput: OutputPort + Sync, { - let (args, stream_tx, external_shutdown) = command.into_parts(); + let (protocol, args, stream_tx, external_shutdown) = command.into_parts(); #[cfg(feature = "wasm")] let mut plugin_host = WasmPluginHost::from_paths(&args.plugin)?; @@ -240,6 +251,7 @@ where .await?; let request_sender_handle = match traffic_port.setup_request_sender( + protocol, &args, &shutdown_tx, &metrics_tx, @@ -369,6 +381,7 @@ mod tests { impl TrafficPort for FakeTrafficPort { fn setup_request_sender( &self, + _protocol: ProtocolKind, _args: &TesterArgs, _shutdown_tx: &ShutdownSender, _metrics_tx: &mpsc::Sender, @@ -506,7 +519,8 @@ mod tests { splash_cancelled: false, finalize_called: finalize_called.clone(), }; - let command = LocalRunExecutionCommand::new(parse_args()?, None, None); + let args = parse_args()?; + let command = LocalRunExecutionCommand::new(args.protocol.to_domain(), args, None, None); let outcome = execute( command, @@ -534,7 +548,8 @@ mod tests { splash_cancelled: true, finalize_called: finalize_called.clone(), }; - let command = LocalRunExecutionCommand::new(parse_args()?, None, None); + let args = parse_args()?; + let command = LocalRunExecutionCommand::new(args.protocol.to_domain(), args, None, None); let result = execute( command, diff --git a/src/args/types.rs b/src/args/types.rs index f9e010c..7a48a89 100644 --- a/src/args/types.rs +++ b/src/args/types.rs @@ -131,6 +131,23 @@ impl Protocol { Protocol::Raknet => "raknet", } } + + #[must_use] + pub const fn to_domain(self) -> crate::domain::run::ProtocolKind { + match self { + Protocol::Http => crate::domain::run::ProtocolKind::Http, + Protocol::GrpcUnary => crate::domain::run::ProtocolKind::GrpcUnary, + Protocol::GrpcStreaming => crate::domain::run::ProtocolKind::GrpcStreaming, + Protocol::Websocket => crate::domain::run::ProtocolKind::Websocket, + Protocol::Tcp => crate::domain::run::ProtocolKind::Tcp, + Protocol::Udp => crate::domain::run::ProtocolKind::Udp, + Protocol::Quic => crate::domain::run::ProtocolKind::Quic, + Protocol::Mqtt => crate::domain::run::ProtocolKind::Mqtt, + Protocol::Enet => crate::domain::run::ProtocolKind::Enet, + Protocol::Kcp => crate::domain::run::ProtocolKind::Kcp, + Protocol::Raknet => crate::domain::run::ProtocolKind::Raknet, + } + } } #[derive(Debug, Clone, Copy, ValueEnum, Deserialize, Serialize, PartialEq, Eq)] @@ -156,6 +173,18 @@ impl LoadMode { LoadMode::Soak => "soak", } } + + #[must_use] + pub const fn to_domain(self) -> crate::domain::run::LoadMode { + match self { + LoadMode::Arrival => crate::domain::run::LoadMode::Arrival, + LoadMode::Step => crate::domain::run::LoadMode::Step, + LoadMode::Ramp => crate::domain::run::LoadMode::Ramp, + LoadMode::Jitter => crate::domain::run::LoadMode::Jitter, + LoadMode::Burst => crate::domain::run::LoadMode::Burst, + LoadMode::Soak => crate::domain::run::LoadMode::Soak, + } + } } #[derive(Debug, Clone, Copy, PartialEq, Eq)] diff --git a/src/domain/mod.rs b/src/domain/mod.rs index bcb3b38..9137f27 100644 --- a/src/domain/mod.rs +++ b/src/domain/mod.rs @@ -1 +1 @@ -pub(crate) mod run; +pub mod run; diff --git a/src/domain/run.rs b/src/domain/run.rs index 99aa039..cb98803 100644 --- a/src/domain/run.rs +++ b/src/domain/run.rs @@ -1,5 +1,5 @@ #[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub(crate) enum ProtocolKind { +pub enum ProtocolKind { Http, GrpcUnary, GrpcStreaming, @@ -15,7 +15,7 @@ pub(crate) enum ProtocolKind { impl ProtocolKind { #[must_use] - pub(crate) const fn as_str(self) -> &'static str { + pub const fn as_str(self) -> &'static str { match self { ProtocolKind::Http => "http", ProtocolKind::GrpcUnary => "grpc-unary", @@ -33,7 +33,7 @@ impl ProtocolKind { } #[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub(crate) enum LoadMode { +pub enum LoadMode { Arrival, Step, Ramp, @@ -44,7 +44,7 @@ pub(crate) enum LoadMode { impl LoadMode { #[must_use] - pub(crate) const fn as_str(self) -> &'static str { + pub const fn as_str(self) -> &'static str { match self { LoadMode::Arrival => "arrival", LoadMode::Step => "step", @@ -57,23 +57,23 @@ impl LoadMode { } #[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) struct Scenario { - pub(crate) base_url: Option, - pub(crate) vars_count: usize, - pub(crate) step_count: usize, +pub struct Scenario { + pub base_url: Option, + pub vars_count: usize, + pub step_count: usize, } #[derive(Debug, Clone)] -pub(crate) struct RunConfig { - pub(crate) protocol: ProtocolKind, - pub(crate) load_mode: LoadMode, - pub(crate) target_url: Option, - pub(crate) scenario: Option, +pub struct RunConfig { + pub protocol: ProtocolKind, + pub load_mode: LoadMode, + pub target_url: Option, + pub scenario: Option, } impl RunConfig { #[must_use] - pub(crate) fn scenario_step_count(&self) -> usize { + pub fn scenario_step_count(&self) -> usize { self.scenario .as_ref() .map(|scenario| scenario.step_count) @@ -81,7 +81,7 @@ impl RunConfig { } #[must_use] - pub(crate) fn scenario_vars_count(&self) -> usize { + pub fn scenario_vars_count(&self) -> usize { self.scenario .as_ref() .map(|scenario| scenario.vars_count) @@ -89,7 +89,7 @@ impl RunConfig { } #[must_use] - pub(crate) fn scenario_base_url(&self) -> Option<&str> { + pub fn scenario_base_url(&self) -> Option<&str> { self.scenario .as_ref() .and_then(|scenario| scenario.base_url.as_deref()) diff --git a/src/entry/plan/build.rs b/src/entry/plan/build.rs index 52b92d4..6c208e2 100644 --- a/src/entry/plan/build.rs +++ b/src/entry/plan/build.rs @@ -205,11 +205,13 @@ fn validate_db_logging(args: &TesterArgs) -> AppResult<()> { } fn validate_protocol_support(args: &TesterArgs) -> AppResult<()> { + let protocol = args.protocol.to_domain(); + let load_mode = args.load_mode.to_domain(); let registry = protocol_registry(); - let Some(adapter) = registry.adapter(args.protocol) else { + let Some(adapter) = registry.adapter(protocol) else { let supported = registry.executable_protocols_csv(); return Err(AppError::validation(ValidationError::UnsupportedProtocol { - protocol: args.protocol.as_str().to_owned(), + protocol: protocol.as_str().to_owned(), supported, })); }; @@ -218,18 +220,18 @@ fn validate_protocol_support(args: &TesterArgs) -> AppResult<()> { adapter.display_name(), adapter.supports_stateful_connections() ); - if !registry.supports_execution(args.protocol) { + if !registry.supports_execution(protocol) { let supported = registry.executable_protocols_csv(); return Err(AppError::validation(ValidationError::UnsupportedProtocol { - protocol: args.protocol.as_str().to_owned(), + protocol: protocol.as_str().to_owned(), supported, })); } - if !registry.supports_load_mode(args.protocol, args.load_mode) { + if !registry.supports_load_mode(protocol, load_mode) { return Err(AppError::validation( ValidationError::UnsupportedLoadModeForProtocol { - protocol: args.protocol.as_str().to_owned(), - load_mode: args.load_mode.as_str().to_owned(), + protocol: protocol.as_str().to_owned(), + load_mode: load_mode.as_str().to_owned(), }, )); } diff --git a/src/lib.rs b/src/lib.rs index 30a5452..e58cff8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,7 @@ //! grows. pub mod args; pub mod config; +pub mod domain; pub mod error; pub mod http; pub mod metrics; diff --git a/src/protocol/builtins.rs b/src/protocol/builtins.rs index b0eadf6..af231b7 100644 --- a/src/protocol/builtins.rs +++ b/src/protocol/builtins.rs @@ -1,6 +1,15 @@ -use crate::args::{LoadMode, Protocol}; +use std::sync::Arc; -use super::ProtocolAdapter; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; + +use crate::args::TesterArgs; +use crate::domain::run::{LoadMode, ProtocolKind}; +use crate::error::AppResult; +use crate::metrics::{LogSink, Metrics}; +use crate::shutdown::ShutdownSender; + +use super::{ProtocolAdapter, TransportAdapter}; const ALL_LOAD_MODES: &[LoadMode] = &[ LoadMode::Arrival, @@ -15,20 +24,29 @@ const ARRIVAL_RAMP_ONLY: &[LoadMode] = &[LoadMode::Arrival, LoadMode::Ramp]; #[derive(Clone)] pub(super) struct StaticProtocolAdapter { - protocol: Protocol, + protocol: ProtocolKind, display_name: &'static str, executes_traffic: bool, supports_stateful_connections: bool, supported_load_modes: &'static [LoadMode], + setup_request_sender: SetupRequestSenderFn, } +type SetupRequestSenderFn = fn( + &TesterArgs, + &ShutdownSender, + &mpsc::Sender, + Option<&Arc>, +) -> AppResult>; + impl StaticProtocolAdapter { const fn new( - protocol: Protocol, + protocol: ProtocolKind, display_name: &'static str, executes_traffic: bool, supports_stateful_connections: bool, supported_load_modes: &'static [LoadMode], + setup_request_sender: SetupRequestSenderFn, ) -> Self { Self { protocol, @@ -36,68 +54,134 @@ impl StaticProtocolAdapter { executes_traffic, supports_stateful_connections, supported_load_modes, + setup_request_sender, } } pub(super) const fn http() -> Self { - Self::new(Protocol::Http, "HTTP", true, true, ALL_LOAD_MODES) + Self::new( + ProtocolKind::Http, + "HTTP", + true, + true, + ALL_LOAD_MODES, + crate::http::setup_request_sender, + ) } pub(super) const fn grpc_unary() -> Self { Self::new( - Protocol::GrpcUnary, + ProtocolKind::GrpcUnary, "gRPC Unary", true, true, ARRIVAL_RAMP_ONLY, + super::runtime::setup_grpc_unary_sender, ) } pub(super) const fn grpc_streaming() -> Self { Self::new( - Protocol::GrpcStreaming, + ProtocolKind::GrpcStreaming, "gRPC Streaming", true, true, ALL_LOAD_MODES, + super::runtime::setup_grpc_streaming_sender, ) } pub(super) const fn websocket() -> Self { - Self::new(Protocol::Websocket, "WebSocket", true, true, ALL_LOAD_MODES) + Self::new( + ProtocolKind::Websocket, + "WebSocket", + true, + true, + ALL_LOAD_MODES, + super::runtime::setup_websocket_sender, + ) } pub(super) const fn tcp() -> Self { - Self::new(Protocol::Tcp, "TCP", true, true, ALL_LOAD_MODES) + Self::new( + ProtocolKind::Tcp, + "TCP", + true, + true, + ALL_LOAD_MODES, + super::runtime::setup_tcp_sender, + ) } pub(super) const fn udp() -> Self { - Self::new(Protocol::Udp, "UDP", true, false, ALL_LOAD_MODES) + Self::new( + ProtocolKind::Udp, + "UDP", + true, + false, + ALL_LOAD_MODES, + super::runtime::setup_udp_sender, + ) } pub(super) const fn quic() -> Self { - Self::new(Protocol::Quic, "QUIC", true, true, ALL_LOAD_MODES) + Self::new( + ProtocolKind::Quic, + "QUIC", + true, + true, + ALL_LOAD_MODES, + super::runtime::setup_quic_sender, + ) } pub(super) const fn mqtt() -> Self { - Self::new(Protocol::Mqtt, "MQTT", true, true, SOAK_BURST_ONLY) + Self::new( + ProtocolKind::Mqtt, + "MQTT", + true, + true, + SOAK_BURST_ONLY, + super::runtime::setup_mqtt_sender, + ) } pub(super) const fn enet() -> Self { - Self::new(Protocol::Enet, "ENet", true, true, ALL_LOAD_MODES) + Self::new( + ProtocolKind::Enet, + "ENet", + true, + true, + ALL_LOAD_MODES, + super::runtime::setup_enet_sender, + ) } pub(super) const fn kcp() -> Self { - Self::new(Protocol::Kcp, "KCP", true, true, ALL_LOAD_MODES) + Self::new( + ProtocolKind::Kcp, + "KCP", + true, + true, + ALL_LOAD_MODES, + super::runtime::setup_kcp_sender, + ) } pub(super) const fn raknet() -> Self { - Self::new(Protocol::Raknet, "RakNet", true, true, ALL_LOAD_MODES) + Self::new( + ProtocolKind::Raknet, + "RakNet", + true, + true, + ALL_LOAD_MODES, + super::runtime::setup_raknet_sender, + ) } } impl ProtocolAdapter for StaticProtocolAdapter { - fn protocol(&self) -> Protocol { + fn protocol(&self) -> ProtocolKind { self.protocol } @@ -118,6 +202,18 @@ impl ProtocolAdapter for StaticProtocolAdapter { } } +impl TransportAdapter for StaticProtocolAdapter { + fn setup_request_sender( + &self, + args: &TesterArgs, + shutdown_tx: &ShutdownSender, + metrics_tx: &mpsc::Sender, + log_sink: Option<&Arc>, + ) -> AppResult> { + (self.setup_request_sender)(args, shutdown_tx, metrics_tx, log_sink) + } +} + pub(super) const fn builtins() -> [StaticProtocolAdapter; 11] { [ StaticProtocolAdapter::http(), diff --git a/src/protocol/examples/chat_websocket.rs b/src/protocol/examples/chat_websocket.rs index df66c3e..89a08de 100644 --- a/src/protocol/examples/chat_websocket.rs +++ b/src/protocol/examples/chat_websocket.rs @@ -1,6 +1,6 @@ -use crate::args::{LoadMode, Protocol}; +use crate::domain::run::{LoadMode, ProtocolKind}; -use crate::protocol::ProtocolAdapter; +use crate::protocol::{ProtocolAdapter, TransportAdapter}; const LOAD_MODES: &[LoadMode] = &[ LoadMode::Arrival, @@ -14,8 +14,8 @@ const LOAD_MODES: &[LoadMode] = &[ pub struct ChatWebSocketPlugin; impl ProtocolAdapter for ChatWebSocketPlugin { - fn protocol(&self) -> Protocol { - Protocol::Websocket + fn protocol(&self) -> ProtocolKind { + ProtocolKind::Websocket } fn display_name(&self) -> &'static str { @@ -34,3 +34,5 @@ impl ProtocolAdapter for ChatWebSocketPlugin { LOAD_MODES } } + +impl TransportAdapter for ChatWebSocketPlugin {} diff --git a/src/protocol/examples/game_udp.rs b/src/protocol/examples/game_udp.rs index 6573e68..f7cd472 100644 --- a/src/protocol/examples/game_udp.rs +++ b/src/protocol/examples/game_udp.rs @@ -1,6 +1,6 @@ -use crate::args::{LoadMode, Protocol}; +use crate::domain::run::{LoadMode, ProtocolKind}; -use crate::protocol::ProtocolAdapter; +use crate::protocol::{ProtocolAdapter, TransportAdapter}; const LOAD_MODES: &[LoadMode] = &[LoadMode::Arrival, LoadMode::Jitter, LoadMode::Burst]; @@ -8,8 +8,8 @@ const LOAD_MODES: &[LoadMode] = &[LoadMode::Arrival, LoadMode::Jitter, LoadMode: pub struct GameUdpPlugin; impl ProtocolAdapter for GameUdpPlugin { - fn protocol(&self) -> Protocol { - Protocol::Udp + fn protocol(&self) -> ProtocolKind { + ProtocolKind::Udp } fn display_name(&self) -> &'static str { @@ -28,3 +28,5 @@ impl ProtocolAdapter for GameUdpPlugin { LOAD_MODES } } + +impl TransportAdapter for GameUdpPlugin {} diff --git a/src/protocol/examples/telemetry_mqtt.rs b/src/protocol/examples/telemetry_mqtt.rs index fa0e6f9..100e032 100644 --- a/src/protocol/examples/telemetry_mqtt.rs +++ b/src/protocol/examples/telemetry_mqtt.rs @@ -1,6 +1,6 @@ -use crate::args::{LoadMode, Protocol}; +use crate::domain::run::{LoadMode, ProtocolKind}; -use crate::protocol::ProtocolAdapter; +use crate::protocol::{ProtocolAdapter, TransportAdapter}; const LOAD_MODES: &[LoadMode] = &[LoadMode::Soak, LoadMode::Burst]; @@ -8,8 +8,8 @@ const LOAD_MODES: &[LoadMode] = &[LoadMode::Soak, LoadMode::Burst]; pub struct TelemetryMqttPlugin; impl ProtocolAdapter for TelemetryMqttPlugin { - fn protocol(&self) -> Protocol { - Protocol::Mqtt + fn protocol(&self) -> ProtocolKind { + ProtocolKind::Mqtt } fn display_name(&self) -> &'static str { @@ -28,3 +28,5 @@ impl ProtocolAdapter for TelemetryMqttPlugin { LOAD_MODES } } + +impl TransportAdapter for TelemetryMqttPlugin {} diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 0bbf784..b7b79d5 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -12,4 +12,4 @@ mod tests; pub(crate) use registry::ProtocolRegistry; pub use registry::protocol_registry; pub use runtime::setup_request_sender; -pub use traits::{ProtocolAdapter, ProtocolAdapterError}; +pub use traits::{ProtocolAdapter, ProtocolAdapterError, TransportAdapter}; diff --git a/src/protocol/registry.rs b/src/protocol/registry.rs index c36817b..3578258 100644 --- a/src/protocol/registry.rs +++ b/src/protocol/registry.rs @@ -1,13 +1,13 @@ use std::sync::{Arc, OnceLock}; -use crate::args::{LoadMode, Protocol}; +use crate::domain::run::{LoadMode, ProtocolKind}; use super::builtins; -use super::{ProtocolAdapter, ProtocolAdapterError}; +use super::{ProtocolAdapterError, TransportAdapter}; #[derive(Clone)] pub struct ProtocolRegistry { - adapters: Vec>, + adapters: Vec>, } impl ProtocolRegistry { @@ -35,7 +35,7 @@ impl ProtocolRegistry { /// registered. pub fn register_adapter

(&mut self, adapter: P) -> Result<(), ProtocolAdapterError> where - P: ProtocolAdapter + 'static, + P: TransportAdapter + 'static, { let protocol = adapter.protocol(); if self @@ -51,7 +51,7 @@ impl ProtocolRegistry { Ok(()) } - pub fn adapter(&self, protocol: Protocol) -> Option<&dyn ProtocolAdapter> { + pub fn adapter(&self, protocol: ProtocolKind) -> Option<&dyn TransportAdapter> { self.adapters .iter() .find(|adapter| adapter.protocol() == protocol) @@ -59,14 +59,14 @@ impl ProtocolRegistry { } #[must_use] - pub fn supports_execution(&self, protocol: Protocol) -> bool { + pub fn supports_execution(&self, protocol: ProtocolKind) -> bool { self.adapter(protocol) - .map(ProtocolAdapter::executes_traffic) + .map(|adapter| adapter.executes_traffic()) .unwrap_or(false) } #[must_use] - pub fn supports_load_mode(&self, protocol: Protocol, load_mode: LoadMode) -> bool { + pub fn supports_load_mode(&self, protocol: ProtocolKind, load_mode: LoadMode) -> bool { self.adapter(protocol) .map(|adapter| adapter.supported_load_modes().contains(&load_mode)) .unwrap_or(false) diff --git a/src/protocol/runtime.rs b/src/protocol/runtime.rs index 2e76a9a..8af0b69 100644 --- a/src/protocol/runtime.rs +++ b/src/protocol/runtime.rs @@ -16,6 +16,7 @@ use tokio::task::JoinHandle; use url::Url; use crate::args::TesterArgs; +use crate::domain::run::ProtocolKind; use crate::error::{AppError, AppResult, ValidationError}; use crate::metrics::{LogSink, Metrics}; use crate::shutdown::ShutdownSender; @@ -33,37 +34,31 @@ use transports::{tcp_request_once, websocket_request_once}; /// /// Returns an error when protocol settings are invalid or unsupported. pub fn setup_request_sender( + protocol: ProtocolKind, args: &TesterArgs, shutdown_tx: &ShutdownSender, metrics_tx: &mpsc::Sender, log_sink: Option<&Arc>, ) -> AppResult> { - match args.protocol { - crate::args::Protocol::Http => { - crate::http::setup_request_sender(args, shutdown_tx, metrics_tx, log_sink) - } - crate::args::Protocol::Tcp => setup_tcp_sender(args, shutdown_tx, metrics_tx, log_sink), - crate::args::Protocol::Udp => setup_udp_sender(args, shutdown_tx, metrics_tx, log_sink), - crate::args::Protocol::Websocket => { - setup_websocket_sender(args, shutdown_tx, metrics_tx, log_sink) - } - crate::args::Protocol::GrpcUnary => { - setup_grpc_unary_sender(args, shutdown_tx, metrics_tx, log_sink) - } - crate::args::Protocol::GrpcStreaming => { - setup_grpc_streaming_sender(args, shutdown_tx, metrics_tx, log_sink) - } - crate::args::Protocol::Quic => setup_quic_sender(args, shutdown_tx, metrics_tx, log_sink), - crate::args::Protocol::Mqtt => setup_mqtt_sender(args, shutdown_tx, metrics_tx, log_sink), - crate::args::Protocol::Enet => setup_enet_sender(args, shutdown_tx, metrics_tx, log_sink), - crate::args::Protocol::Kcp => setup_kcp_sender(args, shutdown_tx, metrics_tx, log_sink), - crate::args::Protocol::Raknet => { - setup_raknet_sender(args, shutdown_tx, metrics_tx, log_sink) - } + let registry = super::protocol_registry(); + let Some(adapter) = registry.adapter(protocol) else { + let supported = registry.executable_protocols_csv(); + return Err(AppError::validation(ValidationError::UnsupportedProtocol { + protocol: protocol.as_str().to_owned(), + supported, + })); + }; + if !adapter.executes_traffic() { + let supported = registry.executable_protocols_csv(); + return Err(AppError::validation(ValidationError::UnsupportedProtocol { + protocol: protocol.as_str().to_owned(), + supported, + })); } + adapter.setup_request_sender(args, shutdown_tx, metrics_tx, log_sink) } -fn setup_tcp_sender( +pub(super) fn setup_tcp_sender( args: &TesterArgs, shutdown_tx: &ShutdownSender, metrics_tx: &mpsc::Sender, @@ -86,7 +81,7 @@ fn setup_tcp_sender( )) } -fn setup_udp_sender( +pub(super) fn setup_udp_sender( args: &TesterArgs, shutdown_tx: &ShutdownSender, metrics_tx: &mpsc::Sender, @@ -102,7 +97,7 @@ fn setup_udp_sender( ) } -fn setup_quic_sender( +pub(super) fn setup_quic_sender( args: &TesterArgs, shutdown_tx: &ShutdownSender, metrics_tx: &mpsc::Sender, @@ -118,7 +113,7 @@ fn setup_quic_sender( ) } -fn setup_enet_sender( +pub(super) fn setup_enet_sender( args: &TesterArgs, shutdown_tx: &ShutdownSender, metrics_tx: &mpsc::Sender, @@ -134,7 +129,7 @@ fn setup_enet_sender( ) } -fn setup_kcp_sender( +pub(super) fn setup_kcp_sender( args: &TesterArgs, shutdown_tx: &ShutdownSender, metrics_tx: &mpsc::Sender, @@ -150,7 +145,7 @@ fn setup_kcp_sender( ) } -fn setup_raknet_sender( +pub(super) fn setup_raknet_sender( args: &TesterArgs, shutdown_tx: &ShutdownSender, metrics_tx: &mpsc::Sender, @@ -171,7 +166,7 @@ fn setup_raknet_sender( ) } -fn setup_mqtt_sender( +pub(super) fn setup_mqtt_sender( args: &TesterArgs, shutdown_tx: &ShutdownSender, metrics_tx: &mpsc::Sender, @@ -205,7 +200,7 @@ fn setup_mqtt_sender( )) } -fn setup_websocket_sender( +pub(super) fn setup_websocket_sender( args: &TesterArgs, shutdown_tx: &ShutdownSender, metrics_tx: &mpsc::Sender, @@ -228,7 +223,7 @@ fn setup_websocket_sender( )) } -fn setup_grpc_unary_sender( +pub(super) fn setup_grpc_unary_sender( args: &TesterArgs, shutdown_tx: &ShutdownSender, metrics_tx: &mpsc::Sender, @@ -237,7 +232,7 @@ fn setup_grpc_unary_sender( setup_grpc_sender(args, shutdown_tx, metrics_tx, log_sink, false) } -fn setup_grpc_streaming_sender( +pub(super) fn setup_grpc_streaming_sender( args: &TesterArgs, shutdown_tx: &ShutdownSender, metrics_tx: &mpsc::Sender, diff --git a/src/protocol/runtime/tests/datagram_mqtt.rs b/src/protocol/runtime/tests/datagram_mqtt.rs index 709ac99..90c2102 100644 --- a/src/protocol/runtime/tests/datagram_mqtt.rs +++ b/src/protocol/runtime/tests/datagram_mqtt.rs @@ -42,7 +42,13 @@ fn datagram_protocols_emit_success_metric() -> AppResult<()> { let (shutdown_tx, _) = broadcast::channel::<()>(SHUTDOWN_CHANNEL_CAPACITY); let (metrics_tx, mut metrics_rx) = mpsc::channel::(8); - let sender_task = setup_request_sender(&args, &shutdown_tx, &metrics_tx, None)?; + let sender_task = setup_request_sender( + args.protocol.to_domain(), + &args, + &shutdown_tx, + &metrics_tx, + None, + )?; let metric = wait_metric(&mut metrics_rx, protocol).await?; if metric.timed_out { return Err(AppError::validation(format!( @@ -95,7 +101,13 @@ fn mqtt_protocol_emits_success_metric() -> AppResult<()> { let (shutdown_tx, _) = broadcast::channel::<()>(SHUTDOWN_CHANNEL_CAPACITY); let (metrics_tx, mut metrics_rx) = mpsc::channel::(8); - let sender_task = setup_request_sender(&args, &shutdown_tx, &metrics_tx, None)?; + let sender_task = setup_request_sender( + args.protocol.to_domain(), + &args, + &shutdown_tx, + &metrics_tx, + None, + )?; let metric = wait_metric(&mut metrics_rx, "mqtt").await?; if metric.timed_out { return Err(AppError::validation("Unexpected timeout for mqtt")); diff --git a/src/protocol/runtime/tests/scheme_resolution.rs b/src/protocol/runtime/tests/scheme_resolution.rs index ce9868f..5074786 100644 --- a/src/protocol/runtime/tests/scheme_resolution.rs +++ b/src/protocol/runtime/tests/scheme_resolution.rs @@ -68,7 +68,13 @@ fn planned_protocols_reject_unsupported_url_schemes() -> AppResult<()> { let args = parse_args(protocol, load_mode, url)?; let (shutdown_tx, _) = broadcast::channel::<()>(SHUTDOWN_CHANNEL_CAPACITY); let (metrics_tx, _metrics_rx) = mpsc::channel::(2); - let err = match setup_request_sender(&args, &shutdown_tx, &metrics_tx, None) { + let err = match setup_request_sender( + args.protocol.to_domain(), + &args, + &shutdown_tx, + &metrics_tx, + None, + ) { Ok(_) => { return Err(AppError::validation( "Expected unsupported scheme error but sender setup succeeded", diff --git a/src/protocol/runtime/tests/transport_http_grpc.rs b/src/protocol/runtime/tests/transport_http_grpc.rs index a2c93e4..4d202b9 100644 --- a/src/protocol/runtime/tests/transport_http_grpc.rs +++ b/src/protocol/runtime/tests/transport_http_grpc.rs @@ -52,7 +52,13 @@ fn transport_and_http_protocols_emit_success_metric() -> AppResult<()> { let (shutdown_tx, _) = broadcast::channel::<()>(SHUTDOWN_CHANNEL_CAPACITY); let (metrics_tx, mut metrics_rx) = mpsc::channel::(8); - let sender_task = setup_request_sender(&args, &shutdown_tx, &metrics_tx, None)?; + let sender_task = setup_request_sender( + args.protocol.to_domain(), + &args, + &shutdown_tx, + &metrics_tx, + None, + )?; let metric = wait_metric(&mut metrics_rx, label).await?; if metric.timed_out { return Err(AppError::validation(format!( diff --git a/src/protocol/tests.rs b/src/protocol/tests.rs index 72b62b6..15c3570 100644 --- a/src/protocol/tests.rs +++ b/src/protocol/tests.rs @@ -1,12 +1,12 @@ use super::*; -use crate::args::{LoadMode, Protocol}; +use crate::domain::run::{LoadMode, ProtocolKind}; #[derive(Clone)] struct FakeAdapter; impl ProtocolAdapter for FakeAdapter { - fn protocol(&self) -> Protocol { - Protocol::Http + fn protocol(&self) -> ProtocolKind { + ProtocolKind::Http } fn display_name(&self) -> &'static str { @@ -33,68 +33,70 @@ impl ProtocolAdapter for FakeAdapter { } } +impl TransportAdapter for FakeAdapter {} + #[test] fn builtins_register_http_as_executable() { let registry = ProtocolRegistry::with_builtins(); - assert!(registry.adapter(Protocol::Http).is_some()); - assert!(registry.supports_execution(Protocol::Http)); - assert!(registry.supports_load_mode(Protocol::Http, LoadMode::Soak)); + assert!(registry.adapter(ProtocolKind::Http).is_some()); + assert!(registry.supports_execution(ProtocolKind::Http)); + assert!(registry.supports_load_mode(ProtocolKind::Http, LoadMode::Soak)); } #[test] fn builtins_mark_websocket_as_executable() { let registry = ProtocolRegistry::with_builtins(); - assert!(registry.adapter(Protocol::Websocket).is_some()); - assert!(registry.supports_execution(Protocol::Websocket)); + assert!(registry.adapter(ProtocolKind::Websocket).is_some()); + assert!(registry.supports_execution(ProtocolKind::Websocket)); } #[test] fn builtins_mark_grpc_unary_as_executable() { let registry = ProtocolRegistry::with_builtins(); - assert!(registry.adapter(Protocol::GrpcUnary).is_some()); - assert!(registry.supports_execution(Protocol::GrpcUnary)); + assert!(registry.adapter(ProtocolKind::GrpcUnary).is_some()); + assert!(registry.supports_execution(ProtocolKind::GrpcUnary)); } #[test] fn builtins_mark_grpc_streaming_as_executable() { let registry = ProtocolRegistry::with_builtins(); - assert!(registry.adapter(Protocol::GrpcStreaming).is_some()); - assert!(registry.supports_execution(Protocol::GrpcStreaming)); + assert!(registry.adapter(ProtocolKind::GrpcStreaming).is_some()); + assert!(registry.supports_execution(ProtocolKind::GrpcStreaming)); } #[test] fn builtins_mark_quic_as_executable() { let registry = ProtocolRegistry::with_builtins(); - assert!(registry.adapter(Protocol::Quic).is_some()); - assert!(registry.supports_execution(Protocol::Quic)); + assert!(registry.adapter(ProtocolKind::Quic).is_some()); + assert!(registry.supports_execution(ProtocolKind::Quic)); } #[test] fn builtins_mark_mqtt_as_executable() { let registry = ProtocolRegistry::with_builtins(); - assert!(registry.adapter(Protocol::Mqtt).is_some()); - assert!(registry.supports_execution(Protocol::Mqtt)); + assert!(registry.adapter(ProtocolKind::Mqtt).is_some()); + assert!(registry.supports_execution(ProtocolKind::Mqtt)); } #[test] fn builtins_mark_enet_as_executable() { let registry = ProtocolRegistry::with_builtins(); - assert!(registry.adapter(Protocol::Enet).is_some()); - assert!(registry.supports_execution(Protocol::Enet)); + assert!(registry.adapter(ProtocolKind::Enet).is_some()); + assert!(registry.supports_execution(ProtocolKind::Enet)); } #[test] fn builtins_mark_kcp_as_executable() { let registry = ProtocolRegistry::with_builtins(); - assert!(registry.adapter(Protocol::Kcp).is_some()); - assert!(registry.supports_execution(Protocol::Kcp)); + assert!(registry.adapter(ProtocolKind::Kcp).is_some()); + assert!(registry.supports_execution(ProtocolKind::Kcp)); } #[test] fn builtins_mark_raknet_as_executable() { let registry = ProtocolRegistry::with_builtins(); - assert!(registry.adapter(Protocol::Raknet).is_some()); - assert!(registry.supports_execution(Protocol::Raknet)); + assert!(registry.adapter(ProtocolKind::Raknet).is_some()); + assert!(registry.supports_execution(ProtocolKind::Raknet)); } #[test] diff --git a/src/protocol/traits.rs b/src/protocol/traits.rs index d5bfae5..a292620 100644 --- a/src/protocol/traits.rs +++ b/src/protocol/traits.rs @@ -1,4 +1,13 @@ -use crate::args::{LoadMode, Protocol}; +use std::sync::Arc; + +use tokio::sync::mpsc; +use tokio::task::JoinHandle; + +use crate::args::TesterArgs; +use crate::domain::run::{LoadMode, ProtocolKind}; +use crate::error::{AppError, AppResult, ValidationError}; +use crate::metrics::{LogSink, Metrics}; +use crate::shutdown::ShutdownSender; #[derive(Debug, Clone, PartialEq, Eq)] pub struct ProtocolAdapterError { @@ -6,9 +15,31 @@ pub struct ProtocolAdapterError { } pub trait ProtocolAdapter: Send + Sync { - fn protocol(&self) -> Protocol; + fn protocol(&self) -> ProtocolKind; fn display_name(&self) -> &'static str; fn executes_traffic(&self) -> bool; fn supports_stateful_connections(&self) -> bool; fn supported_load_modes(&self) -> &'static [LoadMode]; } + +pub trait TransportAdapter: ProtocolAdapter { + /// Creates the request sender task for this protocol adapter. + /// + /// # Errors + /// + /// Returns `UnsupportedProtocol` when the adapter does not provide + /// traffic execution behavior. + fn setup_request_sender( + &self, + args: &TesterArgs, + shutdown_tx: &ShutdownSender, + metrics_tx: &mpsc::Sender, + log_sink: Option<&Arc>, + ) -> AppResult> { + let _ = (args, shutdown_tx, metrics_tx, log_sink); + Err(AppError::validation(ValidationError::UnsupportedProtocol { + protocol: self.protocol().as_str().to_owned(), + supported: String::new(), + })) + } +}