diff --git a/crates/sof-observer/src/app/config/base.rs b/crates/sof-observer/src/app/config/base.rs index 55107f27..7e302c9d 100644 --- a/crates/sof-observer/src/app/config/base.rs +++ b/crates/sof-observer/src/app/config/base.rs @@ -3,7 +3,9 @@ use std::{num::NonZeroUsize, path::PathBuf}; use super::{read_bool_env, read_env_var}; use crate::{ framework::{DerivedStateReplayBackend, DerivedStateReplayDurability}, - runtime::{DerivedStateReplayConfig, DerivedStateRuntimeConfig, ShredTrustMode}, + runtime::{ + DerivedStateReplayConfig, DerivedStateRuntimeConfig, RuntimeDeliveryProfile, ShredTrustMode, + }, }; fn read_optional_bool_env(name: &str) -> Option { @@ -34,6 +36,22 @@ pub fn read_runtime_current_thread() -> bool { read_bool_env("SOF_RUNTIME_CURRENT_THREAD", false) } +pub fn read_runtime_delivery_profile() -> RuntimeDeliveryProfile { + let Some(value) = + read_env_var("SOF_RUNTIME_DELIVERY_PROFILE").filter(|value| !value.trim().is_empty()) + else { + return RuntimeDeliveryProfile::default(); + }; + RuntimeDeliveryProfile::from_config_value(&value).unwrap_or_else(|| { + tracing::warn!( + value, + default = RuntimeDeliveryProfile::default().as_str(), + "unknown runtime delivery profile; using default" + ); + RuntimeDeliveryProfile::default() + }) +} + pub fn read_runtime_core() -> Option { read_env_var("SOF_RUNTIME_CORE").and_then(|value| value.parse::().ok()) } diff --git a/crates/sof-observer/src/app/runtime/entrypoints.rs b/crates/sof-observer/src/app/runtime/entrypoints.rs index 371377df..deefa571 100644 --- a/crates/sof-observer/src/app/runtime/entrypoints.rs +++ b/crates/sof-observer/src/app/runtime/entrypoints.rs @@ -13,18 +13,44 @@ pub(crate) enum RuntimeEntrypointError { Runloop { reason: String }, } -pub(crate) fn run() -> Result<(), RuntimeEntrypointError> { - run_with_hosts( - PluginHostBuilder::new().build(), - RuntimeExtensionHostBuilder::new().build(), +fn profiled_default_plugin_and_extension_hosts() -> (PluginHost, RuntimeExtensionHost) { + let profile = read_runtime_delivery_profile(); + ( + profile.plugin_host_builder().build(), + profile.extension_host_builder().build(), + ) +} + +fn profiled_default_hosts() -> (PluginHost, RuntimeExtensionHost, DerivedStateHost) { + let (plugin_host, extension_host) = profiled_default_plugin_and_extension_hosts(); + ( + plugin_host, + extension_host, DerivedStateHost::builder().build(), ) } +fn profiled_default_plugin_host() -> PluginHost { + read_runtime_delivery_profile() + .plugin_host_builder() + .build() +} + +fn profiled_default_extension_host() -> RuntimeExtensionHost { + read_runtime_delivery_profile() + .extension_host_builder() + .build() +} + +pub(crate) fn run() -> Result<(), RuntimeEntrypointError> { + let (plugin_host, extension_host, derived_state_host) = profiled_default_hosts(); + run_with_hosts(plugin_host, extension_host, derived_state_host) +} + pub(crate) fn run_with_plugin_host(plugin_host: PluginHost) -> Result<(), RuntimeEntrypointError> { run_with_hosts( plugin_host, - RuntimeExtensionHostBuilder::new().build(), + profiled_default_extension_host(), DerivedStateHost::builder().build(), ) } @@ -33,7 +59,7 @@ pub(crate) fn run_with_extension_host( extension_host: RuntimeExtensionHost, ) -> Result<(), RuntimeEntrypointError> { run_with_hosts( - PluginHostBuilder::new().build(), + profiled_default_plugin_host(), extension_host, DerivedStateHost::builder().build(), ) @@ -42,11 +68,8 @@ pub(crate) fn run_with_extension_host( pub(crate) fn run_with_derived_state_host( derived_state_host: DerivedStateHost, ) -> Result<(), RuntimeEntrypointError> { - run_with_hosts( - PluginHostBuilder::new().build(), - RuntimeExtensionHostBuilder::new().build(), - derived_state_host, - ) + let (plugin_host, extension_host) = profiled_default_plugin_and_extension_hosts(); + run_with_hosts(plugin_host, extension_host, derived_state_host) } pub(crate) fn run_with_hosts( @@ -204,13 +227,8 @@ fn pin_runtime_thread_if_requested() { } pub(crate) async fn run_async() -> Result<(), RuntimeEntrypointError> { - run_async_with_hosts( - PluginHostBuilder::new().build(), - RuntimeExtensionHostBuilder::new().build(), - DerivedStateHost::builder().build(), - None, - ) - .await + let (plugin_host, extension_host, derived_state_host) = profiled_default_hosts(); + run_async_with_hosts(plugin_host, extension_host, derived_state_host, None).await } pub(crate) async fn run_async_with_plugin_host( @@ -218,7 +236,7 @@ pub(crate) async fn run_async_with_plugin_host( ) -> Result<(), RuntimeEntrypointError> { run_async_with_hosts( plugin_host, - RuntimeExtensionHostBuilder::new().build(), + profiled_default_extension_host(), DerivedStateHost::builder().build(), None, ) @@ -229,7 +247,7 @@ pub(crate) async fn run_async_with_extension_host( extension_host: RuntimeExtensionHost, ) -> Result<(), RuntimeEntrypointError> { run_async_with_hosts( - PluginHostBuilder::new().build(), + profiled_default_plugin_host(), extension_host, DerivedStateHost::builder().build(), None, @@ -240,13 +258,8 @@ pub(crate) async fn run_async_with_extension_host( pub(crate) async fn run_async_with_derived_state_host( derived_state_host: DerivedStateHost, ) -> Result<(), RuntimeEntrypointError> { - run_async_with_hosts( - PluginHostBuilder::new().build(), - RuntimeExtensionHostBuilder::new().build(), - derived_state_host, - None, - ) - .await + let (plugin_host, extension_host) = profiled_default_plugin_and_extension_hosts(); + run_async_with_hosts(plugin_host, extension_host, derived_state_host, None).await } pub(crate) async fn run_async_with_hosts( @@ -313,9 +326,10 @@ async fn run_async_with_hosts_and_optional_shutdown( pub(crate) async fn run_async_with_kernel_bypass_ingress( packet_ingest_rx: ingest::RawPacketBatchReceiver, ) -> Result<(), RuntimeEntrypointError> { + let (plugin_host, extension_host) = profiled_default_plugin_and_extension_hosts(); run_async_with_hosts_and_kernel_bypass_ingress( - PluginHostBuilder::new().build(), - RuntimeExtensionHostBuilder::new().build(), + plugin_host, + extension_host, DerivedStateHost::builder().build(), None, packet_ingest_rx, @@ -330,7 +344,7 @@ pub(crate) async fn run_async_with_plugin_host_and_kernel_bypass_ingress( ) -> Result<(), RuntimeEntrypointError> { run_async_with_hosts_and_kernel_bypass_ingress( plugin_host, - RuntimeExtensionHostBuilder::new().build(), + profiled_default_extension_host(), DerivedStateHost::builder().build(), None, packet_ingest_rx, @@ -344,7 +358,7 @@ pub(crate) async fn run_async_with_extension_host_and_kernel_bypass_ingress( packet_ingest_rx: ingest::RawPacketBatchReceiver, ) -> Result<(), RuntimeEntrypointError> { run_async_with_hosts_and_kernel_bypass_ingress( - PluginHostBuilder::new().build(), + profiled_default_plugin_host(), extension_host, DerivedStateHost::builder().build(), None, @@ -358,9 +372,10 @@ pub(crate) async fn run_async_with_derived_state_host_and_kernel_bypass_ingress( derived_state_host: DerivedStateHost, packet_ingest_rx: ingest::RawPacketBatchReceiver, ) -> Result<(), RuntimeEntrypointError> { + let (plugin_host, extension_host) = profiled_default_plugin_and_extension_hosts(); run_async_with_hosts_and_kernel_bypass_ingress( - PluginHostBuilder::new().build(), - RuntimeExtensionHostBuilder::new().build(), + plugin_host, + extension_host, derived_state_host, None, packet_ingest_rx, diff --git a/crates/sof-observer/src/app/runtime/prelude.rs b/crates/sof-observer/src/app/runtime/prelude.rs index b3bb6f02..ffdadc59 100644 --- a/crates/sof-observer/src/app/runtime/prelude.rs +++ b/crates/sof-observer/src/app/runtime/prelude.rs @@ -34,8 +34,8 @@ pub(super) use crate::{ framework::{ CheckpointBarrierReason, DatasetEvent, DerivedStateHost, DiskDerivedStateReplaySource, FeedWatermarks, InMemoryDerivedStateReplaySource, ObservedRecentBlockhashEvent, PluginHost, - PluginHostBuilder, RawPacketEvent, ReorgEvent, RuntimeExtensionDispatchMetrics, - RuntimeExtensionHost, RuntimeExtensionHostBuilder, ShredEvent, SlotStatusEvent, + RawPacketEvent, ReorgEvent, RuntimeExtensionDispatchMetrics, RuntimeExtensionHost, + ShredEvent, SlotStatusEvent, }, ingest, reassembly::dataset::DataSetReassembler, diff --git a/crates/sof-observer/src/runtime.rs b/crates/sof-observer/src/runtime.rs index 4283fad7..4bdf122a 100644 --- a/crates/sof-observer/src/runtime.rs +++ b/crates/sof-observer/src/runtime.rs @@ -1,7 +1,5 @@ #![allow(clippy::missing_docs_in_private_items)] -#[cfg(feature = "gossip-bootstrap")] -use std::time::Duration; use std::{ collections::{HashMap, HashSet, VecDeque, hash_map::DefaultHasher, hash_map::Entry}, future::Future, @@ -10,8 +8,8 @@ use std::{ path::PathBuf, pin::Pin, sync::Arc, - thread, - time::Instant, + thread::{self, available_parallelism}, + time::{Duration, Instant}, }; #[cfg(feature = "gossip-bootstrap")] @@ -20,8 +18,8 @@ use crate::app::config::read_observability_bind_addr; use crate::framework::host::TransactionDispatchScope; use crate::framework::{ DerivedStateHost, DerivedStateReplayBackend, DerivedStateReplayDurability, - ObservedRecentBlockhashEvent, PluginHost, RuntimeExtensionHost, SignatureBytes, - TransactionEvent, + ObservedRecentBlockhashEvent, PluginDispatchMode, PluginHost, PluginHostBuilder, + RuntimeExtensionHost, RuntimeExtensionHostBuilder, SignatureBytes, TransactionEvent, }; #[cfg(feature = "kernel-bypass")] use crate::ingest::{RawPacketBatchReceiver, RawPacketBatchSender, create_raw_packet_batch_queue}; @@ -52,6 +50,12 @@ type ShutdownSignal = Pin + Send + 'static>>; const PROVIDER_REPLAY_DEDUPE_CAPACITY: usize = 65_536; const PROVIDER_REPLAY_DEDUPE_SLOT_WINDOW: u64 = 4_096; const MAX_PROVIDER_SERIALIZED_TRANSACTION_BYTES: usize = PACKET_DATA_SIZE; +const DEFAULT_RUNTIME_DELIVERY_EVENT_QUEUE_CAPACITY: usize = 8_192; +const DEFAULT_RUNTIME_DELIVERY_TRANSACTION_DISPATCH_WORKERS_CAP: usize = 8; +const DEFAULT_DERIVED_STATE_REPLAY_MAX_ENVELOPES: usize = 8_192; +const DEFAULT_DERIVED_STATE_REPLAY_MAX_SESSIONS: usize = 4; +const DEFAULT_RUNTIME_EXTENSION_QUEUE_DEPTH_WARN: u64 = 4_096; +const DEFAULT_RUNTIME_EXTENSION_DROP_WARN_DELTA: u64 = 100; #[cfg(feature = "gossip-bootstrap")] const PROVIDER_GOSSIP_CONTROL_PLANE_POLL_MS: u64 = 250; #[cfg(feature = "gossip-bootstrap")] @@ -189,6 +193,31 @@ pub enum RuntimeDeliveryProfile { DeliveryDisciplined, } +/// Concrete runtime defaults selected by one [`RuntimeDeliveryProfile`]. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct RuntimeDeliveryProfileSettings { + /// Bounded queue capacity used for async plugin dispatch lanes. + pub plugin_event_queue_capacity: usize, + /// Callback execution strategy used by plugin dispatch workers. + pub plugin_dispatch_mode: PluginDispatchMode, + /// Accepted-transaction dispatch workers per transaction priority lane. + pub plugin_transaction_dispatch_workers: usize, + /// Bounded queue capacity used for runtime extension packet dispatch. + pub extension_event_queue_capacity: usize, + /// Deadline applied to extension startup hooks. + pub extension_startup_timeout: Duration, + /// Deadline applied to extension shutdown hooks. + pub extension_shutdown_timeout: Duration, + /// Runtime-owned derived-state replay tail size. + pub derived_state_replay_max_envelopes: usize, + /// Runtime-owned derived-state replay session retention. + pub derived_state_replay_max_sessions: usize, + /// Runtime extension queue-depth warning threshold. + pub runtime_extension_queue_depth_warn: u64, + /// Runtime extension drop-count warning delta. + pub runtime_extension_drop_warn_delta: u64, +} + impl RuntimeDeliveryProfile { /// Returns env-string representation used by `SOF_RUNTIME_DELIVERY_PROFILE`. #[must_use] @@ -203,13 +232,111 @@ impl RuntimeDeliveryProfile { /// Parses config/env string into one typed delivery profile. #[must_use] pub fn from_config_value(value: &str) -> Option { - match value { + match value.trim().to_ascii_lowercase().as_str() { "latency_optimized" | "latency-optimized" => Some(Self::LatencyOptimized), "balanced" => Some(Self::Balanced), "delivery_disciplined" | "delivery-disciplined" => Some(Self::DeliveryDisciplined), _ => None, } } + + /// Returns concrete runtime defaults for this profile. + #[must_use] + pub fn settings(self) -> RuntimeDeliveryProfileSettings { + let default_transaction_workers = default_runtime_delivery_transaction_workers(); + match self { + Self::LatencyOptimized => RuntimeDeliveryProfileSettings { + plugin_event_queue_capacity: DEFAULT_RUNTIME_DELIVERY_EVENT_QUEUE_CAPACITY, + plugin_dispatch_mode: PluginDispatchMode::Sequential, + plugin_transaction_dispatch_workers: default_transaction_workers, + extension_event_queue_capacity: DEFAULT_RUNTIME_DELIVERY_EVENT_QUEUE_CAPACITY, + extension_startup_timeout: Duration::from_secs(5), + extension_shutdown_timeout: Duration::from_secs(3), + derived_state_replay_max_envelopes: DEFAULT_DERIVED_STATE_REPLAY_MAX_ENVELOPES, + derived_state_replay_max_sessions: DEFAULT_DERIVED_STATE_REPLAY_MAX_SESSIONS, + runtime_extension_queue_depth_warn: DEFAULT_RUNTIME_EXTENSION_QUEUE_DEPTH_WARN, + runtime_extension_drop_warn_delta: DEFAULT_RUNTIME_EXTENSION_DROP_WARN_DELTA, + }, + Self::Balanced => RuntimeDeliveryProfileSettings { + plugin_event_queue_capacity: DEFAULT_RUNTIME_DELIVERY_EVENT_QUEUE_CAPACITY * 2, + plugin_dispatch_mode: PluginDispatchMode::Sequential, + plugin_transaction_dispatch_workers: default_transaction_workers, + extension_event_queue_capacity: DEFAULT_RUNTIME_DELIVERY_EVENT_QUEUE_CAPACITY * 2, + extension_startup_timeout: Duration::from_secs(5), + extension_shutdown_timeout: Duration::from_secs(6), + derived_state_replay_max_envelopes: DEFAULT_DERIVED_STATE_REPLAY_MAX_ENVELOPES * 2, + derived_state_replay_max_sessions: DEFAULT_DERIVED_STATE_REPLAY_MAX_SESSIONS + 2, + runtime_extension_queue_depth_warn: DEFAULT_RUNTIME_EXTENSION_QUEUE_DEPTH_WARN * 2, + runtime_extension_drop_warn_delta: DEFAULT_RUNTIME_EXTENSION_DROP_WARN_DELTA / 2, + }, + Self::DeliveryDisciplined => RuntimeDeliveryProfileSettings { + plugin_event_queue_capacity: DEFAULT_RUNTIME_DELIVERY_EVENT_QUEUE_CAPACITY * 4, + plugin_dispatch_mode: PluginDispatchMode::Sequential, + plugin_transaction_dispatch_workers: 1, + extension_event_queue_capacity: DEFAULT_RUNTIME_DELIVERY_EVENT_QUEUE_CAPACITY * 4, + extension_startup_timeout: Duration::from_secs(10), + extension_shutdown_timeout: Duration::from_secs(15), + derived_state_replay_max_envelopes: DEFAULT_DERIVED_STATE_REPLAY_MAX_ENVELOPES * 4, + derived_state_replay_max_sessions: DEFAULT_DERIVED_STATE_REPLAY_MAX_SESSIONS * 2, + runtime_extension_queue_depth_warn: DEFAULT_RUNTIME_EXTENSION_QUEUE_DEPTH_WARN * 4, + runtime_extension_drop_warn_delta: DEFAULT_RUNTIME_EXTENSION_DROP_WARN_DELTA / 10, + }, + } + } + + /// Applies this profile to one plugin-host builder. + #[must_use] + pub fn apply_to_plugin_host_builder(self, builder: PluginHostBuilder) -> PluginHostBuilder { + let settings = self.settings(); + builder + .with_event_queue_capacity(settings.plugin_event_queue_capacity) + .with_dispatch_mode(settings.plugin_dispatch_mode) + .with_transaction_dispatch_workers(settings.plugin_transaction_dispatch_workers) + } + + /// Returns a plugin-host builder using this profile's dispatch defaults. + #[must_use] + pub fn plugin_host_builder(self) -> PluginHostBuilder { + self.apply_to_plugin_host_builder(PluginHostBuilder::new()) + } + + /// Applies this profile to one runtime-extension-host builder. + #[must_use] + pub fn apply_to_extension_host_builder( + self, + builder: RuntimeExtensionHostBuilder, + ) -> RuntimeExtensionHostBuilder { + let settings = self.settings(); + builder + .with_event_queue_capacity(settings.extension_event_queue_capacity) + .with_startup_timeout(settings.extension_startup_timeout) + .with_shutdown_timeout(settings.extension_shutdown_timeout) + } + + /// Returns a runtime-extension-host builder using this profile's dispatch defaults. + #[must_use] + pub fn extension_host_builder(self) -> RuntimeExtensionHostBuilder { + self.apply_to_extension_host_builder(RuntimeExtensionHostBuilder::new()) + } + + /// Applies this profile's env-backed defaults to one setup bundle. + #[must_use] + pub fn apply_to_setup(self, setup: RuntimeSetup) -> RuntimeSetup { + let settings = self.settings(); + setup + .with_runtime_delivery_profile(self) + .with_derived_state_replay_max_envelopes(settings.derived_state_replay_max_envelopes) + .with_derived_state_replay_max_sessions(settings.derived_state_replay_max_sessions) + .with_runtime_extension_queue_depth_warn(settings.runtime_extension_queue_depth_warn) + .with_runtime_extension_drop_warn_delta(settings.runtime_extension_drop_warn_delta) + } +} + +fn default_runtime_delivery_transaction_workers() -> usize { + available_parallelism() + .map(usize::from) + .unwrap_or(1) + .clamp(1, DEFAULT_RUNTIME_DELIVERY_TRANSACTION_DISPATCH_WORKERS_CAP) } /// Explicit trust posture for raw-shred ingest. @@ -690,6 +817,24 @@ impl RuntimeSetup { self.with_env("SOF_LOG_DATASET_RECONSTRUCTION", enabled.to_string()) } + /// Sets `SOF_RUNTIME_EXTENSION_QUEUE_DEPTH_WARN`. + #[must_use] + pub fn with_runtime_extension_queue_depth_warn(self, queue_depth: u64) -> Self { + self.with_env( + "SOF_RUNTIME_EXTENSION_QUEUE_DEPTH_WARN", + queue_depth.to_string(), + ) + } + + /// Sets `SOF_RUNTIME_EXTENSION_DROP_WARN_DELTA`. + #[must_use] + pub fn with_runtime_extension_drop_warn_delta(self, drop_delta: u64) -> Self { + self.with_env( + "SOF_RUNTIME_EXTENSION_DROP_WARN_DELTA", + drop_delta.to_string(), + ) + } + /// Sets `SOF_DERIVED_STATE_CHECKPOINT_INTERVAL_MS`. /// /// Use `0` to disable periodic checkpoint barriers. @@ -1518,8 +1663,12 @@ pub fn run_with_hosts_and_setup( pub struct ObserverRuntime { /// Plugin host invoked by the packaged observer runtime. plugin_host: PluginHost, + /// Whether the plugin host was supplied explicitly by the embedder. + custom_plugin_host: bool, /// Runtime extension host invoked by the packaged observer runtime. extension_host: RuntimeExtensionHost, + /// Whether the extension host was supplied explicitly by the embedder. + custom_extension_host: bool, /// Derived-state host invoked by the packaged observer runtime. derived_state_host: DerivedStateHost, /// Programmatic setup overrides applied before startup. @@ -1560,6 +1709,7 @@ impl ObserverRuntime { #[must_use] pub fn with_plugin_host(mut self, plugin_host: PluginHost) -> Self { self.plugin_host = plugin_host; + self.custom_plugin_host = true; self } @@ -1567,6 +1717,7 @@ impl ObserverRuntime { #[must_use] pub fn with_extension_host(mut self, extension_host: RuntimeExtensionHost) -> Self { self.extension_host = extension_host; + self.custom_extension_host = true; self } @@ -1584,6 +1735,22 @@ impl ObserverRuntime { self } + /// Applies one runtime-level downstream delivery profile. + /// + /// This updates SOF-owned default hosts and env-backed runtime defaults. Explicitly supplied + /// plugin or extension hosts keep their caller-provided configuration. + #[must_use] + pub fn with_runtime_delivery_profile(mut self, profile: RuntimeDeliveryProfile) -> Self { + self.setup = profile.apply_to_setup(self.setup); + if !self.custom_plugin_host { + self.plugin_host = profile.plugin_host_builder().build(); + } + if !self.custom_extension_host { + self.extension_host = profile.extension_host_builder().build(); + } + self + } + /// Replaces the raw-shred runtime with a processed provider-stream ingress. /// /// Processed provider streams such as Yellowstone gRPC or LaserStream feed @@ -3483,16 +3650,20 @@ mod tests { Arc, Mutex, atomic::{AtomicUsize, Ordering}, }, + thread, time::Instant, }; use super::*; + use crate::app::config::read_runtime_delivery_profile; use crate::event::TxKind; use crate::framework::{ - DerivedStateCheckpoint, DerivedStateConsumer, DerivedStateConsumerConfig, + DatasetEvent, DerivedStateCheckpoint, DerivedStateConsumer, DerivedStateConsumerConfig, DerivedStateConsumerContext, DerivedStateConsumerFault, DerivedStateFeedEnvelope, - DerivedStateFeedEvent, ExtensionContext, ExtensionManifest, ObserverPlugin, PluginConfig, - PluginContext, RawPacketEvent, RuntimeExtension, TransactionInterest, TransactionPrefilter, + DerivedStateFeedEvent, ExtensionCapability, ExtensionContext, ExtensionManifest, + ExtensionSetupError, ObserverPlugin, PacketSubscription, PluginConfig, PluginContext, + RawPacketEvent, RuntimeExtension, RuntimePacketEvent, RuntimePacketSourceKind, + TransactionInterest, TransactionPrefilter, }; use async_trait::async_trait; use sof_gossip_tuning::{GossipTuningProfile, HostProfilePreset, IngestQueueMode}; @@ -3502,6 +3673,7 @@ mod tests { use solana_signature::Signature; use solana_signer::Signer; use solana_transaction::versioned::VersionedTransaction; + use tokio::time::sleep; fn with_runtime_env_overrides( overrides: impl IntoIterator, @@ -3553,6 +3725,12 @@ mod tests { transaction_count: Arc, recent_blockhash_count: Arc, } + struct RuntimeDeliveryDatasetCounterPlugin { + counter: Arc, + } + struct RuntimeDeliveryTransactionCounterPlugin { + counter: Arc, + } #[cfg(feature = "gossip-bootstrap")] struct ClusterTopologyOnlyPlugin; struct StartupCounterPlugin { @@ -3561,6 +3739,9 @@ mod tests { struct StartupCounterExtension { counter: Arc, } + struct RuntimeDeliveryPacketCounterExtension { + counter: Arc, + } struct AccountTouchDerivedStateConsumer; struct TransactionStatusDerivedStateConsumer; struct BlockMetaDerivedStateConsumer; @@ -3636,6 +3817,28 @@ mod tests { } } + #[async_trait] + impl ObserverPlugin for RuntimeDeliveryDatasetCounterPlugin { + fn config(&self) -> PluginConfig { + PluginConfig::new().with_dataset() + } + + async fn on_dataset(&self, _event: DatasetEvent) { + self.counter.fetch_add(1, Ordering::Relaxed); + } + } + + #[async_trait] + impl ObserverPlugin for RuntimeDeliveryTransactionCounterPlugin { + fn config(&self) -> PluginConfig { + PluginConfig::new().with_transaction() + } + + async fn on_transaction(&self, _event: &TransactionEvent) { + self.counter.fetch_add(1, Ordering::Relaxed); + } + } + #[async_trait] impl ObserverPlugin for StartupCounterPlugin { fn config(&self) -> PluginConfig { @@ -3655,12 +3858,33 @@ mod tests { async fn setup( &self, _ctx: ExtensionContext, - ) -> Result { + ) -> Result { self.counter.fetch_add(1, Ordering::Relaxed); Ok(ExtensionManifest::default()) } } + #[async_trait] + impl RuntimeExtension for RuntimeDeliveryPacketCounterExtension { + async fn setup( + &self, + _ctx: ExtensionContext, + ) -> Result { + Ok(ExtensionManifest { + capabilities: vec![ExtensionCapability::ObserveObserverIngress], + subscriptions: vec![PacketSubscription { + source_kind: Some(RuntimePacketSourceKind::ObserverIngress), + ..PacketSubscription::default() + }], + ..ExtensionManifest::default() + }) + } + + async fn on_packet_received(&self, _event: RuntimePacketEvent) { + self.counter.fetch_add(1, Ordering::Relaxed); + } + } + impl DerivedStateConsumer for AccountTouchDerivedStateConsumer { fn name(&self) -> &'static str { "account-touch-derived-state" @@ -4584,6 +4808,372 @@ mod tests { ); } + #[test] + fn runtime_delivery_profile_parses_config_values_tolerantly() { + assert_eq!( + RuntimeDeliveryProfile::from_config_value("LATENCY-OPTIMIZED"), + Some(RuntimeDeliveryProfile::LatencyOptimized) + ); + assert_eq!( + RuntimeDeliveryProfile::from_config_value(" balanced "), + Some(RuntimeDeliveryProfile::Balanced) + ); + assert_eq!( + RuntimeDeliveryProfile::from_config_value("DELIVERY_DISCIPLINED"), + Some(RuntimeDeliveryProfile::DeliveryDisciplined) + ); + assert_eq!(RuntimeDeliveryProfile::from_config_value("lossless"), None); + } + + #[test] + fn runtime_delivery_profile_reads_env_override() { + with_runtime_env_overrides( + [( + String::from("SOF_RUNTIME_DELIVERY_PROFILE"), + String::from("delivery-disciplined"), + )], + || { + assert_eq!( + read_runtime_delivery_profile(), + RuntimeDeliveryProfile::DeliveryDisciplined + ); + }, + ); + + with_runtime_env_overrides( + [( + String::from("SOF_RUNTIME_DELIVERY_PROFILE"), + String::from("unknown"), + )], + || { + assert_eq!( + read_runtime_delivery_profile(), + RuntimeDeliveryProfile::LatencyOptimized + ); + }, + ); + } + + #[test] + fn runtime_delivery_profile_settings_preserve_default_and_scale_discipline() { + let latency = RuntimeDeliveryProfile::LatencyOptimized.settings(); + let balanced = RuntimeDeliveryProfile::Balanced.settings(); + let disciplined = RuntimeDeliveryProfile::DeliveryDisciplined.settings(); + + assert_eq!( + latency.plugin_event_queue_capacity, + DEFAULT_RUNTIME_DELIVERY_EVENT_QUEUE_CAPACITY + ); + assert_eq!( + latency.derived_state_replay_max_envelopes, + DEFAULT_DERIVED_STATE_REPLAY_MAX_ENVELOPES + ); + assert!(balanced.plugin_event_queue_capacity > latency.plugin_event_queue_capacity); + assert!(disciplined.plugin_event_queue_capacity > balanced.plugin_event_queue_capacity); + assert!(balanced.extension_shutdown_timeout > latency.extension_shutdown_timeout); + assert!(disciplined.extension_shutdown_timeout > balanced.extension_shutdown_timeout); + assert!(latency.plugin_transaction_dispatch_workers >= 1); + assert!( + latency.plugin_transaction_dispatch_workers + <= DEFAULT_RUNTIME_DELIVERY_TRANSACTION_DISPATCH_WORKERS_CAP + ); + assert!(balanced.plugin_transaction_dispatch_workers >= 1); + assert!( + balanced.plugin_transaction_dispatch_workers + <= DEFAULT_RUNTIME_DELIVERY_TRANSACTION_DISPATCH_WORKERS_CAP + ); + assert_eq!(disciplined.plugin_transaction_dispatch_workers, 1); + assert_eq!( + disciplined.plugin_dispatch_mode, + PluginDispatchMode::Sequential + ); + } + + #[test] + fn runtime_delivery_profile_settings_have_coherent_pressure_defaults() { + let profiles = [ + RuntimeDeliveryProfile::LatencyOptimized, + RuntimeDeliveryProfile::Balanced, + RuntimeDeliveryProfile::DeliveryDisciplined, + ]; + + for profile in profiles { + let settings = profile.settings(); + assert_eq!( + settings.plugin_event_queue_capacity, + settings.extension_event_queue_capacity + ); + assert_eq!( + settings.derived_state_replay_max_envelopes, + settings.plugin_event_queue_capacity + ); + assert!(settings.derived_state_replay_max_sessions > 0); + assert!(settings.runtime_extension_queue_depth_warn > 0); + assert!( + settings.runtime_extension_queue_depth_warn + < u64::try_from(settings.extension_event_queue_capacity) + .expect("profile queue capacity should fit in u64") + ); + assert!(settings.runtime_extension_drop_warn_delta > 0); + assert!(settings.extension_startup_timeout >= Duration::from_secs(5)); + assert!(settings.extension_shutdown_timeout >= Duration::from_secs(3)); + } + } + + #[test] + fn runtime_delivery_profile_applies_env_backed_defaults_to_setup() { + let setup = RuntimeDeliveryProfile::DeliveryDisciplined.apply_to_setup(RuntimeSetup::new()); + let overrides = setup.env_overrides.into_iter().collect::>(); + + assert_eq!( + overrides.get("SOF_RUNTIME_DELIVERY_PROFILE"), + Some(&"delivery_disciplined".to_owned()) + ); + assert_eq!( + overrides.get("SOF_DERIVED_STATE_REPLAY_MAX_ENVELOPES"), + Some(&"32768".to_owned()) + ); + assert_eq!( + overrides.get("SOF_DERIVED_STATE_REPLAY_MAX_SESSIONS"), + Some(&"8".to_owned()) + ); + assert_eq!( + overrides.get("SOF_RUNTIME_EXTENSION_QUEUE_DEPTH_WARN"), + Some(&"16384".to_owned()) + ); + assert_eq!( + overrides.get("SOF_RUNTIME_EXTENSION_DROP_WARN_DELTA"), + Some(&"10".to_owned()) + ); + } + + #[test] + fn observer_runtime_profile_preserves_explicit_plugin_and_extension_hosts() { + let plugin_host = PluginHost::builder().build(); + let extension_host = RuntimeExtensionHost::builder().build(); + + let runtime = ObserverRuntime::new() + .with_plugin_host(plugin_host) + .with_extension_host(extension_host) + .with_runtime_delivery_profile(RuntimeDeliveryProfile::DeliveryDisciplined); + let overrides = runtime + .setup + .env_overrides + .into_iter() + .collect::>(); + + assert!(runtime.custom_plugin_host); + assert!(runtime.custom_extension_host); + assert_eq!( + overrides.get("SOF_RUNTIME_DELIVERY_PROFILE"), + Some(&"delivery_disciplined".to_owned()) + ); + } + + #[test] + #[ignore = "soak/benchmark fixture for runtime delivery profile plugin dataset dispatch"] + fn runtime_delivery_profile_plugin_dataset_dispatch_profile_fixture() { + let iterations = profile_iterations(20_000); + for profile in runtime_delivery_profiles() { + let counter = Arc::new(AtomicUsize::new(0)); + let settings = profile.settings(); + let host = profile + .plugin_host_builder() + .add_plugin(RuntimeDeliveryDatasetCounterPlugin { + counter: Arc::clone(&counter), + }) + .build(); + let chunk_size = settings + .plugin_event_queue_capacity + .saturating_div(2) + .max(1); + + let started_at = Instant::now(); + for iteration in 0..iterations { + host.on_dataset(runtime_delivery_dataset_event(iteration as u64)); + if (iteration + 1) % chunk_size == 0 { + assert!(wait_until_counter( + counter.as_ref(), + iteration + 1, + Duration::from_secs(10), + )); + } + } + assert!(wait_until_counter( + counter.as_ref(), + iterations, + Duration::from_secs(10), + )); + let elapsed = started_at.elapsed(); + + assert_eq!(host.dropped_event_count(), 0); + println!( + "runtime_delivery_profile_plugin_dataset_dispatch_profile_fixture profile={:?} iterations={} elapsed_us={} avg_ns_per_iteration={}", + profile, + iterations, + elapsed.as_micros(), + avg_ns_per_iteration(elapsed, iterations), + ); + } + } + + #[test] + #[ignore = "soak/benchmark fixture for runtime delivery profile transaction dispatch"] + fn runtime_delivery_profile_transaction_dispatch_profile_fixture() { + let iterations = profile_iterations(20_000); + let event = runtime_delivery_transaction_event(); + for profile in runtime_delivery_profiles() { + let counter = Arc::new(AtomicUsize::new(0)); + let settings = profile.settings(); + let host = profile + .plugin_host_builder() + .add_plugin(RuntimeDeliveryTransactionCounterPlugin { + counter: Arc::clone(&counter), + }) + .build(); + let chunk_size = settings + .plugin_event_queue_capacity + .saturating_div(2) + .max(1); + + let started_at = Instant::now(); + for iteration in 0..iterations { + host.on_transaction(event.clone()); + if (iteration + 1) % chunk_size == 0 { + assert!(wait_until_counter( + counter.as_ref(), + iteration + 1, + Duration::from_secs(10), + )); + } + } + assert!(wait_until_counter( + counter.as_ref(), + iterations, + Duration::from_secs(10), + )); + let elapsed = started_at.elapsed(); + + assert_eq!(host.dropped_event_count(), 0); + println!( + "runtime_delivery_profile_transaction_dispatch_profile_fixture profile={:?} iterations={} transaction_workers={} elapsed_us={} avg_ns_per_iteration={}", + profile, + iterations, + settings.plugin_transaction_dispatch_workers, + elapsed.as_micros(), + avg_ns_per_iteration(elapsed, iterations), + ); + } + } + + #[tokio::test] + #[ignore = "soak/benchmark fixture for runtime delivery profile extension dispatch"] + async fn runtime_delivery_profile_extension_dispatch_profile_fixture() { + let iterations = profile_iterations(20_000); + let source = "127.0.0.1:9001" + .parse::() + .expect("valid packet source"); + for profile in runtime_delivery_profiles() { + let counter = Arc::new(AtomicUsize::new(0)); + let settings = profile.settings(); + let host = profile + .extension_host_builder() + .add_extension(RuntimeDeliveryPacketCounterExtension { + counter: Arc::clone(&counter), + }) + .build(); + let startup = host.startup().await; + assert_eq!(startup.active_extensions, 1); + let chunk_size = settings + .extension_event_queue_capacity + .saturating_div(2) + .max(1); + + let started_at = Instant::now(); + for iteration in 0..iterations { + host.on_observer_packet(source, &[7_u8; 32]); + if (iteration + 1) % chunk_size == 0 { + assert!( + wait_until_counter_async( + counter.as_ref(), + iteration + 1, + Duration::from_secs(10), + ) + .await + ); + } + } + assert!( + wait_until_counter_async(counter.as_ref(), iterations, Duration::from_secs(10),) + .await + ); + let elapsed = started_at.elapsed(); + + assert_eq!(host.dropped_event_count(), 0); + host.shutdown().await; + println!( + "runtime_delivery_profile_extension_dispatch_profile_fixture profile={:?} iterations={} elapsed_us={} avg_ns_per_iteration={}", + profile, + iterations, + elapsed.as_micros(), + avg_ns_per_iteration(elapsed, iterations), + ); + } + } + + const fn runtime_delivery_profiles() -> [RuntimeDeliveryProfile; 3] { + [ + RuntimeDeliveryProfile::LatencyOptimized, + RuntimeDeliveryProfile::Balanced, + RuntimeDeliveryProfile::DeliveryDisciplined, + ] + } + + fn runtime_delivery_dataset_event(slot: u64) -> DatasetEvent { + DatasetEvent { + slot, + start_index: 0, + end_index: 0, + last_in_slot: false, + shreds: 1, + payload_len: 1, + tx_count: 1, + } + } + + fn runtime_delivery_transaction_event() -> TransactionEvent { + let ProviderStreamUpdate::Transaction(event) = sample_provider_transaction_update() else { + panic!("sample transaction helper returns transaction update"); + }; + event + } + + fn wait_until_counter(counter: &AtomicUsize, expected: usize, timeout: Duration) -> bool { + let deadline = Instant::now() + timeout; + while Instant::now() < deadline { + if counter.load(Ordering::Relaxed) >= expected { + return true; + } + thread::sleep(Duration::from_millis(10)); + } + counter.load(Ordering::Relaxed) >= expected + } + + async fn wait_until_counter_async( + counter: &AtomicUsize, + expected: usize, + timeout: Duration, + ) -> bool { + let deadline = Instant::now() + timeout; + while Instant::now() < deadline { + if counter.load(Ordering::Relaxed) >= expected { + return true; + } + sleep(Duration::from_millis(10)).await; + } + counter.load(Ordering::Relaxed) >= expected + } + #[test] fn disabled_observability_config_is_not_serialized() { let setup = diff --git a/docs/architecture/adr/0013-runtime-delivery-profiles.md b/docs/architecture/adr/0013-runtime-delivery-profiles.md index 49367fe7..6a4a9e6b 100644 --- a/docs/architecture/adr/0013-runtime-delivery-profiles.md +++ b/docs/architecture/adr/0013-runtime-delivery-profiles.md @@ -65,7 +65,8 @@ The profile selection MUST: - be represented as a typed enum in Rust - be mirrored as a typed enum in TypeScript and Python SDKs -- fail configuration loading on unknown values instead of silently falling back +- parse unknown values as invalid at the typed parser boundary +- make any environment/default fallback explicit and test-covered instead of hidden - apply at the runtime-instance level, not per hook or per plugin in the first version The profile selection MUST NOT: @@ -257,18 +258,49 @@ Allowed configuration entry points: - Rust config builders and typed config structs - generated TypeScript and Python SDK config types -- environment or file decoding layers that parse into the typed enum and reject unknown values +- environment or file decoding layers that parse into the typed enum before applying defaults The first version MUST NOT add: - per-hook profile selection - per-plugin fairness toggles - arbitrary queue-policy combinators -- hidden fallback from one profile to another +- undocumented fallback from one profile to another Future narrower overrides, if ever added, require a follow-up ADR and must be justified by a real measured workload rather than speculative flexibility. +### First Implementation Defaults + +The first Rust implementation maps the profiles to the following tested runtime defaults. These +values are implementation defaults, not wire-protocol guarantees, but they define the intended +operational shape of the initial profile set. + +| Profile | Plugin queue | Plugin dispatch | Transaction dispatch workers | Extension queue | Extension startup | Extension shutdown | Derived-state replay envelopes | Derived-state replay sessions | Extension queue warning | Extension drop warning | +| --- | ---: | --- | --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | +| `LatencyOptimized` | `8192` | sequential | `min(host parallelism, 8)` | `8192` | `5s` | `3s` | `8192` | `4` | `4096` | `100` | +| `Balanced` | `16384` | sequential | `min(host parallelism, 8)` | `16384` | `5s` | `6s` | `16384` | `6` | `8192` | `50` | +| `DeliveryDisciplined` | `32768` | sequential | `1` | `32768` | `10s` | `15s` | `32768` | `8` | `16384` | `10` | + +Configuration guidance: + +- Use `LatencyOptimized` when callback latency and newest-state freshness matter more than + downstream retention under pressure. +- Use `Balanced` for mixed services that can afford more buffering and want earlier drop warnings + without changing ingest ownership. +- Use `DeliveryDisciplined` for stateful or analytical consumers that want stronger lane-local + drain discipline and can pay the extra memory and latency budget. + +Manual soak/benchmark fixtures: + +```sh +cargo test -p sof runtime_delivery_profile_plugin_dataset_dispatch_profile_fixture --lib -- --ignored --nocapture +cargo test -p sof runtime_delivery_profile_transaction_dispatch_profile_fixture --lib -- --ignored --nocapture +cargo test -p sof runtime_delivery_profile_extension_dispatch_profile_fixture --lib -- --ignored --nocapture +``` + +Set `SOF_PROFILE_ITERATIONS` to raise or lower the event count for local soak runs. + ## Guarantees and Non-Guarantees The docs for this feature must define, per profile: @@ -354,7 +386,7 @@ Implementation work for this ADR is not complete until the following exist: - regression tests that prove the documented drop or drain behavior for each profile - explicit observability for pressure escalation and shedding decisions - docs that define profile guarantees and non-guarantees without marketing language -- TS/Python enum generation that preserves typed selection and typed failure on unknown values +- TS/Python enum generation that preserves typed selection and invalid-value handling - validation that no profile allows ingest to be stalled by slow downstream consumers - behavioral regression scenarios covering burst pressure on plugin lanes, slow consumer on a non-hot lane, shutdown with queued work, provider reconnect plus backlog, mixed transaction and diff --git a/docs/gitbook/operations/knob-registry.md b/docs/gitbook/operations/knob-registry.md index d302d78d..bad35ef3 100644 --- a/docs/gitbook/operations/knob-registry.md +++ b/docs/gitbook/operations/knob-registry.md @@ -74,6 +74,7 @@ Snapshot date: | `SOF_WORKER_THREADS` | host parallelism | all builds | Base worker parallelism used by the runtime. | | `SOF_RUNTIME_CURRENT_THREAD` | `false` | all builds | Run SOF on a dedicated current-thread Tokio runtime. | | `SOF_RUNTIME_CORE` | unset | all builds | Optional CPU core pin for the dedicated runtime thread when current-thread mode is enabled. | +| `SOF_RUNTIME_DELIVERY_PROFILE` | `latency_optimized` | all builds | Runtime-owned downstream delivery profile. Supported values: `latency_optimized`, `balanced`, `delivery_disciplined`. | | `SOF_DATASET_WORKERS` | `SOF_WORKER_THREADS` | all builds | Number of dataset reconstruction workers. | | `SOF_PACKET_WORKERS` | `SOF_WORKER_THREADS` | all builds | Number of packet verify/FEC/reassembly workers. | | `SOF_PACKET_WORKER_QUEUE_CAPACITY` | `256` | all builds | Queue depth per packet worker. | diff --git a/docs/operations/advanced-env.md b/docs/operations/advanced-env.md index 25205c6e..0d646878 100644 --- a/docs/operations/advanced-env.md +++ b/docs/operations/advanced-env.md @@ -81,6 +81,7 @@ Duplicate-shred mode guidance: | `SOF_WORKER_THREADS` | host parallelism | Oversizing can add contention and context-switch overhead. | | `SOF_RUNTIME_CURRENT_THREAD` | `false` | Runs SOF on a dedicated current-thread Tokio runtime instead of the default multithreaded runtime. Useful only when you are explicitly isolating one observer thread and measuring the result. | | `SOF_RUNTIME_CORE` | unset | Optional CPU core pin for the dedicated runtime thread when `SOF_RUNTIME_CURRENT_THREAD=true`. Incorrect pinning can hurt overall throughput on small hosts. | +| `SOF_RUNTIME_DELIVERY_PROFILE` | `latency_optimized` | Selects a tested downstream delivery posture: `latency_optimized` for freshest low-latency callbacks, `balanced` for more buffering and earlier drop warnings, or `delivery_disciplined` for stronger lane-local drain discipline with higher memory and latency cost. | | `SOF_DATASET_WORKERS` | `SOF_WORKER_THREADS` | Dataset reconstruction runs on Tokio's blocking pool; too high can still cause queue churn, extra memory pressure, and CPU contention without higher throughput. | | `SOF_PACKET_WORKERS` | `SOF_WORKER_THREADS` | Packet verification/FEC/reassembly fanout; too low underuses multi-core hosts, too high adds scheduling and cache overhead. | | `SOF_PACKET_WORKER_QUEUE_CAPACITY` | `256` | Queue depth per packet worker. Current full-queue policy is `drop_newest`, so raising this mostly trades packet loss for extra latency/memory unless workers also get faster. |