Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions crates/sof-observer/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,41 @@ pub struct RuntimeObservabilityConfig {
pub bind_addr: Option<SocketAddr>,
}

/// Typed runtime-level downstream delivery bias.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum RuntimeDeliveryProfile {
/// Preserve current SOF behavior: tight bounded queues and freshness-first shedding.
#[default]
LatencyOptimized,
/// Keep ingest bounded while allowing more buffering and earlier pressure signaling.
Balanced,
/// Favor stronger downstream ordering and drain discipline on runtime-owned non-hot lanes.
DeliveryDisciplined,
}

impl RuntimeDeliveryProfile {
/// Returns env-string representation used by `SOF_RUNTIME_DELIVERY_PROFILE`.
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::LatencyOptimized => "latency_optimized",
Self::Balanced => "balanced",
Self::DeliveryDisciplined => "delivery_disciplined",
}
}

/// Parses config/env string into one typed delivery profile.
#[must_use]
pub fn from_config_value(value: &str) -> Option<Self> {
match value {
"latency_optimized" | "latency-optimized" => Some(Self::LatencyOptimized),
"balanced" => Some(Self::Balanced),
"delivery_disciplined" | "delivery-disciplined" => Some(Self::DeliveryDisciplined),
_ => None,
}
}
}

/// Explicit trust posture for raw-shred ingest.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ShredTrustMode {
Expand Down Expand Up @@ -382,6 +417,12 @@ impl RuntimeSetup {
}
}

/// Sets `SOF_RUNTIME_DELIVERY_PROFILE`.
#[must_use]
pub fn with_runtime_delivery_profile(self, profile: RuntimeDeliveryProfile) -> Self {
self.with_env("SOF_RUNTIME_DELIVERY_PROFILE", profile.as_str())
}

/// Sets `SOF_GOSSIP_ENTRYPOINT` from a list of entrypoints.
#[must_use]
pub fn with_gossip_entrypoints<I, S>(self, gossip_entrypoints: I) -> Self
Expand Down Expand Up @@ -4530,6 +4571,19 @@ mod tests {
);
}

#[test]
fn typed_runtime_delivery_profile_uses_expected_strings() {
let setup =
RuntimeSetup::new().with_runtime_delivery_profile(RuntimeDeliveryProfile::Balanced);
assert_eq!(
setup.env_overrides.last(),
Some(&(
String::from("SOF_RUNTIME_DELIVERY_PROFILE"),
String::from("balanced"),
))
);
}

#[test]
fn disabled_observability_config_is_not_serialized() {
let setup =
Expand Down
21 changes: 21 additions & 0 deletions crates/sof-support/src/bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use std::time::Duration;

/// Reads a positive profiling iteration count from `SOF_PROFILE_ITERATIONS`.
#[must_use]
pub fn profile_iterations(default: usize) -> usize {
crate::env_support::read_positive_usize("SOF_PROFILE_ITERATIONS", default)
}

/// Returns the average nanoseconds spent per iteration.
#[must_use]
pub fn avg_ns_per_iteration<I>(elapsed: Duration, iterations: I) -> u128
where
I: TryInto<u128>,
{
let iterations = iterations
.try_into()
.ok()
.filter(|value| *value > 0)
.unwrap_or(1);
elapsed.as_nanos().checked_div(iterations).unwrap_or(0)
}
27 changes: 27 additions & 0 deletions crates/sof-support/src/bytes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use sof_types::{PubkeyBytes, SignatureBytes};

/// Converts one 64-byte signature slice into `SignatureBytes`.
///
/// # Errors
///
/// Returns the error produced by `on_error` when `bytes` is not exactly 64 bytes long.
pub fn signature_bytes_from_slice<E, F>(bytes: &[u8], on_error: F) -> Result<SignatureBytes, E>
where
F: FnOnce() -> E,
{
let raw: [u8; 64] = bytes.try_into().map_err(|_error| on_error())?;
Ok(SignatureBytes::from(raw))
}

/// Converts one 32-byte pubkey slice into `PubkeyBytes`.
///
/// # Errors
///
/// Returns the error produced by `on_error` when `bytes` is not exactly 32 bytes long.
pub fn pubkey_bytes_from_slice<E, F>(bytes: &[u8], on_error: F) -> Result<PubkeyBytes, E>
where
F: FnOnce() -> E,
{
let raw: [u8; 32] = bytes.try_into().map_err(|_error| on_error())?;
Ok(PubkeyBytes::from(raw))
}
32 changes: 32 additions & 0 deletions crates/sof-support/src/collections_support.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::collections::HashMap;

/// Prunes one slot-keyed map down to the retained recent window once the threshold is crossed.
pub fn prune_recent_slots<T>(
slot_states: &mut HashMap<u64, T>,
slot: u64,
retained_lag: u64,
prune_threshold: usize,
) {
if slot_states.len() <= prune_threshold {
return;
}
let slot_floor = slot.saturating_sub(retained_lag);
slot_states.retain(|tracked_slot, _| *tracked_slot >= slot_floor);
}

#[cfg(test)]
mod tests {
use super::prune_recent_slots;

#[test]
fn prune_recent_slots_drops_old_entries_after_threshold() {
let mut slot_states = (0_u64..10_u64).map(|slot| (slot, slot)).collect();

prune_recent_slots(&mut slot_states, 9, 3, 4);

assert_eq!(slot_states.len(), 4);
assert!(!slot_states.contains_key(&5));
assert!(slot_states.contains_key(&6));
assert!(slot_states.contains_key(&9));
}
}
11 changes: 11 additions & 0 deletions crates/sof-support/src/env_support.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use std::env;

/// Reads one positive `usize` from an environment variable, or returns `default`.
#[must_use]
pub fn read_positive_usize(name: &str, default: usize) -> usize {
env::var(name)
.ok()
.and_then(|value| value.parse::<usize>().ok())
.filter(|value| *value > 0)
.unwrap_or(default)
}
Loading
Loading