diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 953d2706..5024f941 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -63,6 +63,7 @@ pub mod resource_manager; pub mod state; pub mod stats; pub mod telemetry; +pub mod timing; pub mod types; // Convenience re-exports for commonly used types @@ -91,6 +92,9 @@ pub use stats::{NodeStats, NodeStatsUpdate}; // Telemetry pub use telemetry::{TelemetryConfig, TelemetryEmitter, TelemetryEvent}; +// Timing helpers +pub use timing::*; + // Pin definitions pub use pins::{InputPin, OutputPin, PinCardinality}; diff --git a/crates/core/src/timing.rs b/crates/core/src/timing.rs new file mode 100644 index 00000000..670046a8 --- /dev/null +++ b/crates/core/src/timing.rs @@ -0,0 +1,208 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +//! Timing helpers and canonical semantics for media packets. +//! +//! # Timing contract +//! +//! - `timestamp_us` is media presentation time in microseconds, relative to the +//! stream's epoch (normally the first frame = 0). It is monotonic and +//! non-decreasing per stream. +//! - `duration_us` is the playback duration for the payload in microseconds and +//! should be set whenever it can be derived (decode, demux, resample). +//! - `sequence` is a monotonic counter scoped to the stream, useful for loss and +//! ordering detection when absolute time is absent. +//! - Nodes should preserve `timestamp_us` and `duration_us` across transforms, or +//! recompute them when they change payload timing (e.g., resampling). If timing +//! is unknown, leave fields as `None` rather than inventing values. +//! - Engines and transports may introduce buffering but must not reorder packets +//! with identical timestamps; late/drop policies should be explicit in nodes +//! (see pacer/mixer). +//! +//! This module provides lightweight helpers for duration math and monotonicity +//! checks to keep node implementations consistent. + +use crate::types::PacketMetadata; + +/// Microseconds per second constant. +pub const MICROS_PER_SECOND: u64 = 1_000_000; +/// Milliseconds per second constant. +pub const MILLIS_PER_SECOND: u64 = 1_000; + +/// Convert microseconds to milliseconds, rounding up (never under-reports duration). +#[must_use] +pub const fn duration_us_to_ms_ceil(duration_us: u64) -> u64 { + duration_us.saturating_add(999) / 1000 +} + +/// Convert frames-per-channel to duration in microseconds. +/// +/// Returns `None` if `sample_rate` is 0 to avoid divide-by-zero. +#[must_use] +pub fn frames_to_duration_us(frames_per_channel: u64, sample_rate: u32) -> Option<u64> { + if sample_rate == 0 { + return None; + } + Some(frames_per_channel.saturating_mul(MICROS_PER_SECOND) / u64::from(sample_rate)) +} + +/// Convert an interleaved sample count to duration in microseconds. +/// +/// Returns `None` if `channels` or `sample_rate` is 0. +#[must_use] +pub fn samples_to_duration_us(samples: usize, channels: u16, sample_rate: u32) -> Option<u64> { + if channels == 0 || sample_rate == 0 { + return None; + } + let frames_per_channel = samples as u64 / u64::from(channels); + frames_to_duration_us(frames_per_channel, sample_rate) +} + +/// Advance a timestamp by a duration, if both are present. +/// +/// - If `timestamp_us` is `Some` and `duration_us` is `Some`, returns +/// `timestamp_us + duration_us` (saturating). +/// - Otherwise, returns `timestamp_us` unchanged.
+#[must_use] +pub fn advance_timestamp(timestamp_us: Option<u64>, duration_us: Option<u64>) -> Option<u64> { + match (timestamp_us, duration_us) { + (Some(ts), Some(dur)) => Some(ts.saturating_add(dur)), + (ts, _) => ts, + } +} + +/// Returns true if `next` is monotonic (non-decreasing) with respect to `prev`. +/// +/// When either side is `None`, the check is treated as passing. +#[must_use] +pub fn is_monotonic(prev: Option<u64>, next: Option<u64>) -> bool { + match (prev, next) { + (Some(p), Some(n)) => n >= p, + _ => true, + } +} + +/// Extract the starting timestamp from metadata, or 0 if not present. +/// Useful when computing cumulative media time. +#[must_use] +pub fn starting_timestamp_or_zero(metadata: &Option<PacketMetadata>) -> u64 { + metadata.as_ref().and_then(|m| m.timestamp_us).unwrap_or(0) +} + +/// Infer a duration from consecutive timestamps; fall back to `default` when missing or non-positive. +#[must_use] +pub fn infer_duration_us(current_ts: u64, previous_ts: Option<u64>, default: u64) -> u64 { + previous_ts.and_then(|prev| current_ts.checked_sub(prev)).filter(|d| *d > 0).unwrap_or(default) +} + +/// Merge metadata from multiple inputs: min timestamp, max duration, max sequence. +#[must_use] +pub fn merge_metadata<'a, I: Iterator<Item = &'a PacketMetadata>>( + iter: I, +) -> Option<PacketMetadata> { + let mut ts = None; + let mut dur = None; + let mut seq = None; + for m in iter { + if let Some(t) = m.timestamp_us { + ts = Some(ts.map_or(t, |prev: u64| prev.min(t))); + } + if let Some(d) = m.duration_us { + dur = Some(dur.map_or(d, |prev: u64| prev.max(d))); + } + if let Some(s) = m.sequence { + seq = Some(seq.map_or(s, |prev: u64| prev.max(s))); + } + } + if ts.is_some() || dur.is_some() || seq.is_some() { + Some(PacketMetadata { timestamp_us: ts, duration_us: dur, sequence: seq }) + } else { + None + } +} + +/// A simple media clock that tracks media time in microseconds with an optional initial delay. +#[derive(Debug, Clone)] +pub struct MediaClock { + initial_delay_us: u64, + media_time_us: u64, + seeded: bool, +} + +impl MediaClock { + /// Create a new clock with an initial delay (milliseconds). + pub const fn new(initial_delay_ms: u64) -> Self { + Self { initial_delay_us: initial_delay_ms * 1000, media_time_us: 0, seeded: false } + } + + /// Seed the clock from an absolute media timestamp. Idempotent (first seed wins). + pub fn seed_from_timestamp_us(&mut self, timestamp_us: u64) { + if !self.seeded { + self.media_time_us = timestamp_us; + self.seeded = true; + } + } + + /// Advance by a duration (or default) and return the duration used (ms rounded up). + pub fn advance_by_duration_us(&mut self, duration_us: Option<u64>, default: u64) -> u64 { + let dur = duration_us.unwrap_or(default); + self.media_time_us = self.media_time_us.saturating_add(dur); + duration_us_to_ms_ceil(dur) + } + + /// Current media timestamp in microseconds (includes initial delay). + #[must_use] + pub const fn timestamp_us(&self) -> u64 { + self.initial_delay_us.saturating_add(self.media_time_us) + } + + /// Current media timestamp in milliseconds (rounded up). + #[must_use] + pub const fn timestamp_ms(&self) -> u64 { + duration_us_to_ms_ceil(self.timestamp_us()) + } + + /// Returns true if at a group boundary (e.g., for MoQ keyframes).
+ #[must_use] + pub fn is_group_boundary_ms(&self, group_duration_ms: u64) -> bool { + let t = self.timestamp_ms(); + group_duration_ms > 0 && t.is_multiple_of(group_duration_ms) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_frames_to_duration_us() { + assert_eq!(frames_to_duration_us(960, 48_000), Some(20_000)); + assert_eq!(frames_to_duration_us(0, 48_000), Some(0)); + assert_eq!(frames_to_duration_us(1_920, 48_000), Some(40_000)); + assert_eq!(frames_to_duration_us(1, 0), None); + } + + #[test] + fn test_samples_to_duration_us() { + assert_eq!(samples_to_duration_us(1_920, 2, 48_000), Some(20_000)); + assert_eq!(samples_to_duration_us(1_920, 0, 48_000), None); + assert_eq!(samples_to_duration_us(1_920, 2, 0), None); + } + + #[test] + fn test_advance_timestamp() { + assert_eq!(advance_timestamp(Some(1_000), Some(500)), Some(1_500)); + assert_eq!(advance_timestamp(Some(1_000), None), Some(1_000)); + assert_eq!(advance_timestamp(None, Some(500)), None); + } + + #[test] + fn test_is_monotonic() { + assert!(is_monotonic(Some(1), Some(1))); + assert!(is_monotonic(Some(1), Some(2))); + assert!(!is_monotonic(Some(2), Some(1))); + assert!(is_monotonic(None, Some(1))); + assert!(is_monotonic(Some(1), None)); + } +} diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 86e35c5e..d6aeef6c 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -38,7 +38,8 @@ pub struct AudioFormat { } /// Optional timing and sequencing metadata that can be attached to packets. -/// Used for pacing, synchronization, and A/V alignment. +/// Used for pacing, synchronization, and A/V alignment. See `timing` module for +/// canonical semantics (media-time epoch, monotonicity, and preservation rules). #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, TS)] #[ts(export)] pub struct PacketMetadata { diff --git a/crates/engine/src/dynamic_pin_distributor.rs b/crates/engine/src/dynamic_pin_distributor.rs index 48147ba2..333ae716 100644 --- a/crates/engine/src/dynamic_pin_distributor.rs +++ b/crates/engine/src/dynamic_pin_distributor.rs @@ -16,6 +16,8 @@ use std::time::Instant; use streamkit_core::types::Packet; use tokio::sync::mpsc; +const EWMA_ALPHA: f64 = 0.1; + /// Information about a downstream connection. 
struct OutputConnection { tx: mpsc::Sender<Packet>, @@ -44,8 +46,18 @@ pub struct PinDistributorActor { outputs_active_gauge: opentelemetry::metrics::Gauge<u64>, /// Telemetry: time spent blocked on downstream backpressure (send().await) send_wait_histogram: opentelemetry::metrics::Histogram<f64>, + /// Telemetry: depth of the distributor's incoming queue + queue_depth_gauge: opentelemetry::metrics::Gauge<u64>, + /// Telemetry: estimated backlog in bytes + queue_depth_bytes_gauge: opentelemetry::metrics::Gauge<u64>, + /// Telemetry: estimated backlog in media seconds (based on observed durations) + queue_depth_seconds_gauge: opentelemetry::metrics::Gauge<f64>, /// Pre-built metric labels - allocated once in new(), reused on every packet metric_labels: [opentelemetry::KeyValue; 2], + /// EWMA of packet size in bytes for this pin + avg_packet_size_bytes: f64, + /// EWMA of packet duration in seconds for this pin (when available) + avg_packet_duration_s: f64, } impl PinDistributorActor { @@ -81,6 +93,20 @@ impl PinDistributorActor { .f64_histogram("pin_distributor.send_wait_seconds") .with_description("Time spent waiting for downstream capacity (backpressure)") .build(); + let queue_depth_gauge = meter + .u64_gauge("pin_distributor.queue_depth") + .with_description("Current backlog of packets waiting to be distributed") + .build(); + let queue_depth_bytes_gauge = meter + .u64_gauge("pin_distributor.queue_depth_bytes") + .with_description("Estimated backlog (bytes) at pin distributor input") + .build(); + let queue_depth_seconds_gauge = meter + .f64_gauge("pin_distributor.queue_depth_seconds") + .with_description( + "Estimated backlog (seconds) at pin distributor input (from packet timing)", + ) + .build(); // Pre-build metric labels once - avoids allocation on every packet let metric_labels = [ @@ -100,7 +126,12 @@ impl PinDistributorActor { best_effort_drops_counter, outputs_active_gauge, send_wait_histogram, + queue_depth_gauge, + queue_depth_bytes_gauge, + queue_depth_seconds_gauge, metric_labels, + avg_packet_size_bytes: 0.0, + avg_packet_duration_s: 0.0, } } @@ -178,11 +209,41 @@ impl PinDistributorActor { /// /// For `Reliable` connections: synchronized backpressure - waits for slow consumers. /// For `BestEffort` connections: drops packets when buffer is full (no waiting).
- #[allow(clippy::cognitive_complexity)] // Fan-out with mode handling requires multiple paths + #[allow( + clippy::cognitive_complexity, + clippy::cast_precision_loss, + clippy::cast_possible_truncation, + clippy::cast_sign_loss + )] // Fan-out with mode handling requires multiple paths and metric estimation casts async fn distribute_packet(&mut self, packet: Packet) { use futures::stream::{FuturesUnordered, StreamExt}; use tokio::sync::mpsc::error::TrySendError; + let queue_len = self.data_rx.len() as u64; + self.queue_depth_gauge.record(queue_len, &self.metric_labels); + + let (pkt_bytes, pkt_duration_s) = Self::packet_stats(&packet); + self.avg_packet_size_bytes = if self.avg_packet_size_bytes == 0.0 { + pkt_bytes + } else { + self.avg_packet_size_bytes.mul_add(1.0 - EWMA_ALPHA, pkt_bytes * EWMA_ALPHA) + }; + if let Some(dur) = pkt_duration_s { + self.avg_packet_duration_s = if self.avg_packet_duration_s == 0.0 { + dur + } else { + self.avg_packet_duration_s.mul_add(1.0 - EWMA_ALPHA, dur * EWMA_ALPHA) + }; + } + let est_bytes = (self.avg_packet_size_bytes * queue_len as f64) as u64; + if est_bytes > 0 { + self.queue_depth_bytes_gauge.record(est_bytes, &self.metric_labels); + } + if self.avg_packet_duration_s > 0.0 { + let est_seconds = self.avg_packet_duration_s * queue_len as f64; + self.queue_depth_seconds_gauge.record(est_seconds, &self.metric_labels); + } + if self.outputs.is_empty() { // No outputs configured - drop packet and record metric // Use pre-built labels - no allocation on hot path @@ -363,4 +424,31 @@ impl PinDistributorActor { self.best_effort_drops_counter.add(best_effort_drops, &self.metric_labels); } } + + /// Extract approximate size in bytes and optional duration (seconds) for a packet. + #[allow(clippy::cast_precision_loss)] + fn packet_stats(packet: &Packet) -> (f64, Option<f64>) { + match packet { + Packet::Audio(frame) => { + let bytes = (frame.samples.len() * std::mem::size_of::<f32>()) as f64; + let dur_s = frame.duration_us().map(|us| us as f64 / 1_000_000.0); + (bytes, dur_s) + }, + Packet::Binary { data, metadata, .. } => { + let bytes = data.len() as f64; + let dur_s = + metadata.as_ref().and_then(|m| m.duration_us).map(|us| us as f64 / 1_000_000.0); + (bytes, dur_s) + }, + Packet::Text(t) => (t.len() as f64, None), + Packet::Transcription(t) => ( + t.text.len() as f64, + t.metadata.as_ref().and_then(|m| m.duration_us).map(|us| us as f64 / 1_000_000.0), + ), + Packet::Custom(c) => ( + c.data.to_string().len() as f64, + c.metadata.as_ref().and_then(|m| m.duration_us).map(|us| us as f64 / 1_000_000.0), + ), + } + } } diff --git a/crates/nodes/src/audio/filters/mixer.rs b/crates/nodes/src/audio/filters/mixer.rs index 456dc80a..355fc02e 100644 --- a/crates/nodes/src/audio/filters/mixer.rs +++ b/crates/nodes/src/audio/filters/mixer.rs @@ -941,6 +941,21 @@ impl AudioMixerNode { return Ok(()); } + let timestamp_us = mix_frames + .iter() + .filter_map(|f| f.metadata.as_ref().and_then(|m| m.timestamp_us)) + .min(); + let duration_us = + mix_frames.iter().filter_map(|f| f.metadata.as_ref().and_then(|m| m.duration_us)).max(); + let sequence = + mix_frames.iter().filter_map(|f| f.metadata.as_ref().and_then(|m| m.sequence)).max(); + let combined_metadata = + if timestamp_us.is_some() || duration_us.is_some() || sequence.is_some() { + Some(PacketMetadata { timestamp_us, duration_us, sequence }) + } else { + None + }; + // Determine output configuration.
// Output channels never decrease across the lifetime of the node, to avoid downstream // format flips when a higher-channel input ends. @@ -977,6 +992,7 @@ impl AudioMixerNode { output_channels, ); } + base.metadata.clone_from(&combined_metadata); base } else { // Fallback: allocate a fresh output buffer and mix all frames into it. @@ -991,7 +1007,7 @@ impl AudioMixerNode { // Preserve metadata from the first frame (timestamp, duration, etc.) // Use take() instead of clone() to avoid copying - we're about to clear the buffer anyway - let metadata = mix_frames.get_mut(0).and_then(|f| f.metadata.take()); + let metadata = combined_metadata.clone(); AudioFrame::with_metadata(sample_rate, output_channels, mixed_samples, metadata) }; @@ -2108,4 +2124,71 @@ mod tests { assert_state_stopped_eventually(&mut state_rx, std::time::Duration::from_secs(2)).await; node_handle.await.unwrap().unwrap(); } + + #[tokio::test] + async fn test_mix_and_send_combines_metadata() { + let node = AudioMixerNode::new(AudioMixerConfig::default()); + + let frame_a = AudioFrame::with_metadata( + 48_000, + 1, + vec![0.5f32; 4], + Some(PacketMetadata { + timestamp_us: Some(1), + duration_us: Some(20_000), + sequence: Some(1), + }), + ); + let frame_b = AudioFrame::with_metadata( + 48_000, + 1, + vec![0.25f32; 4], + Some(PacketMetadata { + timestamp_us: Some(2), + duration_us: Some(40_000), + sequence: Some(2), + }), + ); + + let (_tx_a, rx_a) = mpsc::channel(1); + let (_tx_b, rx_b) = mpsc::channel(1); + + let mut slots = vec![ + InputSlot { + name: Arc::from("a"), + rx: rx_a, + has_sent: true, + slow: false, + frame: Some(frame_a), + }, + InputSlot { + name: Arc::from("b"), + rx: rx_b, + has_sent: true, + slow: false, + frame: Some(frame_b), + }, + ]; + + let mut mix_frames = Vec::new(); + let (mock_sender, mut packet_rx) = + mpsc::channel::(4); + let mut output_sender = streamkit_core::OutputSender::new( + "mix_meta".to_string(), + streamkit_core::node::OutputRouting::Routed(mock_sender), + ); + + node.mix_and_send(&mut slots, &mut mix_frames, &mut output_sender, 2, false) + .await + .expect("mix should succeed"); + + let (_node, _pin, packet) = packet_rx.recv().await.expect("expected mixed packet"); + let Packet::Audio(frame) = packet else { + panic!("expected audio packet"); + }; + let meta = frame.metadata.expect("metadata expected"); + assert_eq!(meta.timestamp_us, Some(1)); + assert_eq!(meta.duration_us, Some(40_000)); + assert_eq!(meta.sequence, Some(2)); + } } diff --git a/crates/nodes/src/containers/webm.rs b/crates/nodes/src/containers/webm.rs index adf9564a..2f2d1f3b 100644 --- a/crates/nodes/src/containers/webm.rs +++ b/crates/nodes/src/containers/webm.rs @@ -10,10 +10,10 @@ use std::borrow::Cow; use std::io::{Cursor, Seek, SeekFrom, Write}; use std::sync::{Arc, Mutex}; use streamkit_core::stats::NodeStatsTracker; -use streamkit_core::types::{Packet, PacketType}; +use streamkit_core::types::{Packet, PacketMetadata, PacketType}; use streamkit_core::{ - state_helpers, InputPin, NodeContext, NodeRegistry, OutputPin, PinCardinality, ProcessorNode, - StreamKitError, + state_helpers, timing::MediaClock, InputPin, NodeContext, NodeRegistry, OutputPin, + PinCardinality, ProcessorNode, StreamKitError, }; use webm::mux::{AudioCodecId, SegmentBuilder, SegmentMode, Writer}; @@ -21,6 +21,8 @@ use webm::mux::{AudioCodecId, SegmentBuilder, SegmentMode, Writer}; /// Default chunk size for flushing buffers const DEFAULT_CHUNK_SIZE: usize = 65536; +/// Default frame duration when metadata is missing (20ms Opus frame). 
+const DEFAULT_FRAME_DURATION_US: u64 = 20_000; /// Opus codec lookahead at 48kHz in samples (typical libopus default). /// /// This is written to the OpusHead `pre_skip` field so decoders can trim encoder delay. @@ -383,7 +385,7 @@ impl ProcessorNode for WebMMuxerNode { // so we flush it after adding the first frame below let mut segment = builder.build(); - let mut current_timestamp_ns = 0u64; + let mut clock = MediaClock::new(0); let mut header_sent = false; tracing::info!("WebM segment built, entering receive loop to process incoming packets"); @@ -398,22 +400,32 @@ impl ProcessorNode for WebMMuxerNode { // data.len() // ); - // Calculate timestamp from metadata - // For Opus: timestamps should be in nanoseconds - if let Some(meta) = &metadata { - if let Some(timestamp_us) = meta.timestamp_us { - current_timestamp_ns = timestamp_us * 1000; - } else if let Some(duration_us) = meta.duration_us { - current_timestamp_ns += duration_us * 1000; - } else { - // Fallback: assume 20ms per packet (standard Opus frame) - current_timestamp_ns += 20_000_000; // 20ms in nanoseconds - } - } else { - // No metadata: fallback to assuming 20ms per packet - current_timestamp_ns = packet_count * 20_000_000; + // Calculate timestamp from metadata (microseconds). + let incoming_ts_us = metadata.as_ref().and_then(|m| m.timestamp_us); + let incoming_duration_us = metadata + .as_ref() + .and_then(|m| m.duration_us) + .or(Some(DEFAULT_FRAME_DURATION_US)); + + if let Some(ts) = incoming_ts_us { + clock.seed_from_timestamp_us(ts); + } else if clock.timestamp_us() == 0 { + clock.seed_from_timestamp_us(0); } + let presentation_ts_us = incoming_ts_us.unwrap_or_else(|| clock.timestamp_us()); + + // Advance clock for next frame + clock.advance_by_duration_us(incoming_duration_us, DEFAULT_FRAME_DURATION_US); + + let current_timestamp_ns = presentation_ts_us.saturating_mul(1000); + + let output_metadata = Some(PacketMetadata { + timestamp_us: Some(presentation_ts_us), + duration_us: incoming_duration_us, + sequence: metadata.as_ref().and_then(|m| m.sequence), + }); + // For audio, all frames are effectively "keyframes" (can start playback from any point) let is_keyframe = true; @@ -492,7 +504,7 @@ impl ProcessorNode for WebMMuxerNode { content_type: Some(Cow::Borrowed( "audio/webm; codecs=\"opus\"", )), - metadata: metadata.clone(), + metadata: output_metadata.clone(), }, ) .await diff --git a/crates/nodes/src/core/pacer.rs b/crates/nodes/src/core/pacer.rs index 55f251e8..f8ede39a 100644 --- a/crates/nodes/src/core/pacer.rs +++ b/crates/nodes/src/core/pacer.rs @@ -5,12 +5,14 @@ //! Pacer node - Paces packet output based on timing metadata or calculated durations use async_trait::async_trait; +use opentelemetry::global; +use opentelemetry::KeyValue; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::collections::VecDeque; use std::time::Duration; use streamkit_core::control::NodeControlMessage; -use streamkit_core::types::{AudioFrame, Packet, PacketType}; +use streamkit_core::types::{AudioFrame, Packet, PacketMetadata, PacketType}; use streamkit_core::{ config_helpers, state_helpers, stats::NodeStatsTracker, InputPin, NodeContext, OutputPin, PinCardinality, ProcessorNode, StreamKitError, @@ -133,6 +135,16 @@ impl PacerNode { fn adjust_for_speed(&self, duration: Duration) -> Duration { duration.div_f32(self.speed) } + + fn packet_metadata(packet: &Packet) -> Option<&PacketMetadata> { + match packet { + Packet::Audio(frame) => frame.metadata.as_ref(), + Packet::Binary { metadata, .. 
} => metadata.as_ref(), + Packet::Custom(custom) => custom.metadata.as_ref(), + Packet::Transcription(transcription) => transcription.metadata.as_ref(), + Packet::Text(_) => None, + } + } } #[async_trait] @@ -171,6 +183,17 @@ impl ProcessorNode for PacerNode { state_helpers::emit_running(&context.state_tx, &node_name); + let meter = global::meter("skit_nodes"); + let lateness_histogram = meter + .f64_histogram("pacer.lateness_seconds") + .with_description("Pacer observed send lateness vs. packet timestamp") + .build(); + let queue_gauge = meter + .u64_gauge("pacer.queue_depth") + .with_description("Pacer buffered packet count") + .build(); + let metric_labels = [KeyValue::new("node", node_name.clone())]; + // Internal bounded queue for backpressure control let mut packet_queue: VecDeque<Packet> = VecDeque::with_capacity(self.buffer_size); @@ -179,6 +202,7 @@ impl ProcessorNode for PacerNode { let mut packet_count = 0u64; let mut packets_sent = 0usize; let mut last_packet_time = Instant::now(); + let mut media_base: Option<Instant> = None; // Gap threshold for detecting new segments (e.g., between TTS sentences) // A gap longer than this resets the burst counter let segment_gap_threshold = Duration::from_millis(300); @@ -222,6 +246,7 @@ impl ProcessorNode for PacerNode { // Queue packet for pacing packet_queue.push_back(packet); + queue_gauge.record(packet_queue.len() as u64, &metric_labels); // If this is the first packet, or duration changed, create/recreate the interval if interval.is_none() || current_duration != Some(adjusted_duration) { @@ -265,8 +290,30 @@ impl ProcessorNode for PacerNode { }, if !packet_queue.is_empty() => { // Send the next packet from queue if let Some(packet) = packet_queue.pop_front() { + queue_gauge.record(packet_queue.len() as u64, &metric_labels); let is_burst = packets_sent < self.initial_burst_packets; + if let Some(meta) = Self::packet_metadata(&packet) { + if let Some(ts) = meta.timestamp_us { + let base = media_base.get_or_insert_with(|| { + Instant::now() + .checked_sub(Duration::from_micros(ts)) + .unwrap_or_else(Instant::now) + }); + let due = *base + Duration::from_micros(ts); + let now = Instant::now(); + if now < due { + tokio::time::sleep_until(due).await; + } else { + let lateness = now.duration_since(due); + if lateness > Duration::from_millis(50) { + lateness_histogram.record(lateness.as_secs_f64(), &metric_labels); + tracing::debug!(?lateness, "Pacer sending late relative to timestamp"); + } + } + } + } + if is_burst { tracing::debug!( "Initial burst: sending packet {}/{} at 10x speed", diff --git a/crates/nodes/src/transport/moq/constants.rs b/crates/nodes/src/transport/moq/constants.rs index fbbfa3ad..86ef398f 100644 --- a/crates/nodes/src/transport/moq/constants.rs +++ b/crates/nodes/src/transport/moq/constants.rs @@ -8,39 +8,6 @@ use streamkit_core::types::PacketMetadata; pub const DEFAULT_AUDIO_FRAME_DURATION_US: u64 = 20_000; -const fn duration_us_to_ms_ceil(duration_us: u64) -> u64 { - // hang::Timestamp is millisecond granularity; round up so we never claim - // a frame is shorter than it is (helps avoid drift/under-runs).
- duration_us.saturating_add(999) / 1000 -} - pub fn packet_duration_us(metadata: Option<&PacketMetadata>) -> Option { metadata.and_then(|m| m.duration_us).filter(|d| *d > 0) } - -#[derive(Debug, Clone)] -pub struct MediaClock { - initial_delay_ms: u64, - media_time_ms: u64, -} - -impl MediaClock { - pub const fn new(initial_delay_ms: u64) -> Self { - Self { initial_delay_ms, media_time_ms: 0 } - } - - pub const fn timestamp_ms(&self) -> u64 { - self.initial_delay_ms.saturating_add(self.media_time_ms) - } - - pub const fn is_group_boundary(&self, group_duration_ms: u64) -> bool { - group_duration_ms > 0 && self.media_time_ms.is_multiple_of(group_duration_ms) - } - - pub fn advance_by_duration_us(&mut self, duration_us: Option) -> u64 { - let duration_us = duration_us.unwrap_or(DEFAULT_AUDIO_FRAME_DURATION_US); - let frame_duration_ms = duration_us_to_ms_ceil(duration_us).max(1); - self.media_time_ms = self.media_time_ms.saturating_add(frame_duration_ms); - frame_duration_ms - } -} diff --git a/crates/nodes/src/transport/moq/peer.rs b/crates/nodes/src/transport/moq/peer.rs index 180c59fe..49e344e9 100644 --- a/crates/nodes/src/transport/moq/peer.rs +++ b/crates/nodes/src/transport/moq/peer.rs @@ -16,6 +16,7 @@ use serde::Deserialize; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; +use streamkit_core::timing::MediaClock; use streamkit_core::types::{Packet, PacketType}; use streamkit_core::{ state_helpers, stats::NodeStatsTracker, InputPin, NodeContext, OutputPin, PinCardinality, @@ -59,6 +60,7 @@ enum PublisherEvent { struct BidirectionalTaskConfig { input_broadcast: String, output_broadcast: String, + node_id: String, output_sender: streamkit_core::OutputSender, broadcast_rx: broadcast::Receiver, shutdown_rx: broadcast::Receiver<()>, @@ -293,14 +295,15 @@ impl ProcessorNode for MoqPeerNode { let sub_count = subscriber_count.clone(); let broadcast_rx = subscriber_broadcast_tx.subscribe(); - match Self::start_bidirectional_task( - conn, - BidirectionalTaskConfig { - input_broadcast: self.config.input_broadcast.clone(), - output_broadcast: self.config.output_broadcast.clone(), - output_sender: context.output_sender.clone(), - broadcast_rx, - shutdown_rx: shutdown_tx.subscribe(), + match Self::start_bidirectional_task( + conn, + BidirectionalTaskConfig { + input_broadcast: self.config.input_broadcast.clone(), + output_broadcast: self.config.output_broadcast.clone(), + node_id: node_name.clone(), + output_sender: context.output_sender.clone(), + broadcast_rx, + shutdown_rx: shutdown_tx.subscribe(), publisher_slot: publisher_slot.clone(), publisher_events: publisher_events_tx.clone(), subscriber_count: sub_count, @@ -398,6 +401,7 @@ impl ProcessorNode for MoqPeerNode { match Self::start_subscriber_task( conn, + node_name.clone(), self.config.output_broadcast.clone(), broadcast_rx, shutdown_tx.subscribe(), @@ -642,6 +646,7 @@ impl MoqPeerNode { Self::subscriber_send_loop( send_origin, config.output_broadcast, + config.node_id.clone(), config.broadcast_rx, &mut subscriber_shutdown_rx, config.output_group_duration_ms, @@ -968,6 +973,7 @@ impl MoqPeerNode { #[allow(clippy::too_many_arguments)] async fn start_subscriber_task( moq_connection: streamkit_core::moq_gateway::MoqConnection, + node_id: String, output_broadcast: String, broadcast_rx: broadcast::Receiver, mut shutdown_rx: broadcast::Receiver<()>, @@ -1006,6 +1012,7 @@ impl MoqPeerNode { let result = Self::subscriber_send_loop( send_origin, output_broadcast, + node_id, broadcast_rx, &mut shutdown_rx, 
output_group_duration_ms, @@ -1030,9 +1037,11 @@ impl MoqPeerNode { } /// Subscriber send loop - receives from broadcast channel and sends to client + #[allow(clippy::too_many_arguments)] async fn subscriber_send_loop( publish: moq_lite::OriginProducer, broadcast_name: String, + node_id: String, broadcast_rx: broadcast::Receiver, shutdown_rx: &mut broadcast::Receiver<()>, output_group_duration_ms: u64, @@ -1052,6 +1061,8 @@ impl MoqPeerNode { shutdown_rx, output_group_duration_ms, output_initial_delay_ms, + node_id, + broadcast_name, &stats_delta_tx, ) .await?; @@ -1119,19 +1130,32 @@ impl MoqPeerNode { } /// Run the main send loop, forwarding packets to the subscriber + #[allow(clippy::too_many_arguments)] async fn run_subscriber_send_loop( track_producer: &mut hang::TrackProducer, mut broadcast_rx: broadcast::Receiver, shutdown_rx: &mut broadcast::Receiver<()>, output_group_duration_ms: u64, output_initial_delay_ms: u64, + node_id: String, + broadcast_name: String, stats_delta_tx: &mpsc::Sender, ) -> Result { let mut packet_count: u64 = 0; let mut last_log = std::time::Instant::now(); let mut frame_count = 0u64; let group_duration_ms = output_group_duration_ms.max(1); - let mut clock = super::constants::MediaClock::new(output_initial_delay_ms); + let mut clock = MediaClock::new(output_initial_delay_ms); + let meter = opentelemetry::global::meter("skit_nodes"); + let gap_histogram = meter + .f64_histogram("moq.peer.inter_frame_ms") + .with_description("Gap between consecutive frames sent to subscribers") + .build(); + let metric_labels = [ + opentelemetry::KeyValue::new("node", node_id), + opentelemetry::KeyValue::new("broadcast", broadcast_name), + ]; + let mut last_ts_ms: Option = None; loop { tokio::select! { @@ -1144,6 +1168,9 @@ impl MoqPeerNode { &mut last_log, group_duration_ms, &mut clock, + &gap_histogram, + &metric_labels, + &mut last_ts_ms, stats_delta_tx, )? 
{ SendResult::Continue => {} @@ -1161,7 +1188,7 @@ } /// Handle a single broadcast receive result - #[allow(clippy::too_many_arguments)] + #[allow(clippy::too_many_arguments, clippy::cast_precision_loss)] fn handle_broadcast_recv( recv_result: Result, track_producer: &mut hang::TrackProducer, @@ -1169,7 +1196,10 @@ frame_count: &mut u64, last_log: &mut std::time::Instant, group_duration_ms: u64, - clock: &mut super::constants::MediaClock, + clock: &mut MediaClock, + gap_histogram: &opentelemetry::metrics::Histogram<f64>, + metric_labels: &[opentelemetry::KeyValue], + last_ts_ms: &mut Option<u64>, stats_delta_tx: &mpsc::Sender<NodeStatsDelta>, ) -> Result { match recv_result { @@ -1185,7 +1215,13 @@ let is_first = *packet_count == 1; let timestamp_ms = clock.timestamp_ms(); - let keyframe = is_first || clock.is_group_boundary(group_duration_ms); + let keyframe = is_first || clock.is_group_boundary_ms(group_duration_ms); + + if let Some(prev) = *last_ts_ms { + let gap = timestamp_ms.saturating_sub(prev); + gap_histogram.record(gap as f64, metric_labels); + } + *last_ts_ms = Some(timestamp_ms); let timestamp = hang::Timestamp::from_millis(timestamp_ms).map_err(|_| { StreamKitError::Runtime("MoQ frame timestamp overflow".to_string()) @@ -1202,7 +1238,10 @@ .try_send(NodeStatsDelta { errored: 1, ..Default::default() }); return Ok(SendResult::Stop); } - clock.advance_by_duration_us(broadcast_frame.duration_us); + clock.advance_by_duration_us( + broadcast_frame.duration_us, + super::constants::DEFAULT_AUDIO_FRAME_DURATION_US, + ); Ok(SendResult::Continue) }, Err(broadcast::error::RecvError::Lagged(n)) => { diff --git a/crates/nodes/src/transport/moq/pull.rs b/crates/nodes/src/transport/moq/pull.rs index 8b1c7a70..58b1f22a 100644 --- a/crates/nodes/src/transport/moq/pull.rs +++ b/crates/nodes/src/transport/moq/pull.rs @@ -4,6 +4,7 @@ //! MoQ Pull Node - subscribes to broadcasts from a MoQ server +use super::constants::DEFAULT_AUDIO_FRAME_DURATION_US; use async_trait::async_trait; use bytes::Buf; use moq_lite::coding::Decode; @@ -11,7 +12,8 @@ use moq_lite::AsPath; use schemars::JsonSchema; use serde::Deserialize; use std::time::Duration; -use streamkit_core::types::{Packet, PacketType}; +use streamkit_core::timing::MediaClock; +use streamkit_core::types::{Packet, PacketMetadata, PacketType}; use streamkit_core::{ state_helpers, stats::NodeStatsTracker, InputPin, NodeContext, OutputPin, PinCardinality, ProcessorNode, StreamKitError, }; @@ -241,11 +243,11 @@ enum StreamEndReason { impl MoqPullNode { fn strip_hang_timestamp_header( mut payload: bytes::Bytes, - ) -> Result<bytes::Bytes, moq_lite::Error> { + ) -> Result<(u64, bytes::Bytes), moq_lite::Error> { // hang protocol: frame payload is prefixed with a varint u64 timestamp in microseconds. - // We discard it here and forward the remaining bytes (Opus frame data). - let _timestamp_micros = u64::decode(&mut payload, moq_lite::lite::Version::Draft02)?; - Ok(payload.copy_to_bytes(payload.remaining())) + // We parse it and forward the remaining bytes (Opus frame data).
+ let timestamp_micros = u64::decode(&mut payload, moq_lite::lite::Version::Draft02)?; + Ok((timestamp_micros, payload.copy_to_bytes(payload.remaining()))) } async fn read_next_raw_moq( @@ -415,7 +417,7 @@ impl MoqPullNode { // MoQ connection state machine with multiplexed track handling and error recovery // High complexity is inherent to protocol handling (track management, object streaming, packet routing) - #[allow(clippy::cognitive_complexity)] + #[allow(clippy::cognitive_complexity, clippy::too_many_lines)] async fn run_connection( &self, context: &mut NodeContext, @@ -527,6 +529,8 @@ impl MoqPullNode { let mut current_group: Option = None; let mut session_packet_count: u32 = 0; + let mut last_timestamp_us: Option = None; + let mut clock = MediaClock::new(0); let mut consecutive_cancels: u32 = 0; let mut last_payload_at = tokio::time::Instant::now(); @@ -630,16 +634,34 @@ impl MoqPullNode { ); } - let data = match Self::strip_hang_timestamp_header(payload) { - Ok(data) => data, - Err(e) => { - tracing::warn!("Failed to decode frame timestamp: {e}"); - stats_tracker.discarded(); - continue; - }, + let (timestamp_us, data) = + match Self::strip_hang_timestamp_header(payload) { + Ok(result) => result, + Err(e) => { + tracing::warn!("Failed to decode frame timestamp: {e}"); + stats_tracker.discarded(); + continue; + }, + }; + if last_timestamp_us.is_none() { + clock.seed_from_timestamp_us(timestamp_us); + } + let duration_us = last_timestamp_us + .and_then(|prev| timestamp_us.checked_sub(prev)) + .filter(|d| *d > 0) + .or(Some(DEFAULT_AUDIO_FRAME_DURATION_US)); + let metadata = Some(PacketMetadata { + timestamp_us: Some(timestamp_us), + duration_us, + sequence: None, + }); + last_timestamp_us = Some(timestamp_us); + + let packet = Packet::Binary { + data, + content_type: None, + metadata: metadata.clone(), }; - let packet = - Packet::Binary { data, content_type: None, metadata: None }; if track_pin_registered && track_pin_name != "out" @@ -671,16 +693,31 @@ impl MoqPullNode { ); } - let data = match Self::strip_hang_timestamp_header(first_payload) { - Ok(data) => data, - Err(e) => { - tracing::warn!("Failed to decode frame timestamp: {e}"); - stats_tracker.discarded(); - continue; - }, - }; - - let packet = Packet::Binary { data, content_type: None, metadata: None }; + let (timestamp_us, data) = + match Self::strip_hang_timestamp_header(first_payload) { + Ok(result) => result, + Err(e) => { + tracing::warn!("Failed to decode frame timestamp: {e}"); + stats_tracker.discarded(); + continue; + }, + }; + if last_timestamp_us.is_none() { + clock.seed_from_timestamp_us(timestamp_us); + } + let duration_us = last_timestamp_us + .and_then(|prev| timestamp_us.checked_sub(prev)) + .filter(|d| *d > 0) + .or(Some(DEFAULT_AUDIO_FRAME_DURATION_US)); + let metadata = Some(PacketMetadata { + timestamp_us: Some(timestamp_us), + duration_us, + sequence: None, + }); + last_timestamp_us = Some(timestamp_us); + + let packet = + Packet::Binary { data, content_type: None, metadata: metadata.clone() }; if track_pin_registered && track_pin_name != "out" && context @@ -778,10 +815,11 @@ mod tests { buf.extend_from_slice(b"opus-frame-bytes"); let payload = buf.freeze(); - let stripped = match MoqPullNode::strip_hang_timestamp_header(payload) { + let (ts, stripped) = match MoqPullNode::strip_hang_timestamp_header(payload) { Ok(stripped) => stripped, Err(e) => panic!("decode failed: {e}"), }; + assert_eq!(ts, 123); assert_eq!(&stripped[..], b"opus-frame-bytes"); } } diff --git 
a/crates/nodes/src/transport/moq/push.rs b/crates/nodes/src/transport/moq/push.rs index b84a1201..bbaa0441 100644 --- a/crates/nodes/src/transport/moq/push.rs +++ b/crates/nodes/src/transport/moq/push.rs @@ -4,9 +4,12 @@ //! MoQ Push Node - publishes packets to a MoQ broadcast +use super::constants::DEFAULT_AUDIO_FRAME_DURATION_US; use async_trait::async_trait; +use opentelemetry::{global, KeyValue}; use schemars::JsonSchema; use serde::Deserialize; +use streamkit_core::timing::MediaClock; use streamkit_core::types::{Packet, PacketType}; use streamkit_core::{ packet_helpers, state_helpers, stats::NodeStatsTracker, InputPin, NodeContext, OutputPin, @@ -193,10 +196,20 @@ impl ProcessorNode for MoqPushNode { let mut input_rx = context.take_input("in")?; let mut packet_count: u64 = 0; - let mut clock = super::constants::MediaClock::new(self.config.initial_delay_ms); + let mut clock = MediaClock::new(self.config.initial_delay_ms); + let mut seeded_from_timestamp = false; // Stats tracking let mut stats_tracker = NodeStatsTracker::new(node_name.clone(), context.stats_tx.clone()); + let meter = global::meter("skit_nodes"); + let clock_offset_histogram = meter + .f64_histogram("moq.push.clock_offset_ms") + .with_description("Offset between outgoing MoQ timestamp and upstream packet timestamp") + .build(); + let metric_labels = [ + KeyValue::new("node", node_name.clone()), + KeyValue::new("broadcast", self.config.broadcast.clone()), + ]; // Read opus packets and write them to the MoQ track tracing::info!("MoqPushNode waiting for input packets..."); @@ -220,10 +233,24 @@ impl ProcessorNode for MoqPushNode { tracing::debug!(packet = packet_count, "MoQ publisher sending packet"); } - let duration_us = super::constants::packet_duration_us(metadata.as_ref()); - let timestamp_ms = clock.timestamp_ms(); + let duration_us = super::constants::packet_duration_us(metadata.as_ref()) + .or(Some(DEFAULT_AUDIO_FRAME_DURATION_US)); + let timestamp_ms = if let Some(meta_ts) = + metadata.as_ref().and_then(|m| m.timestamp_us) + { + if !seeded_from_timestamp { + clock.seed_from_timestamp_us(meta_ts); + seeded_from_timestamp = true; + } + meta_ts + .saturating_add(999) + / 1_000 + + self.config.initial_delay_ms + } else { + clock.timestamp_ms() + }; let keyframe = - is_first || clock.is_group_boundary(self.config.group_duration_ms); + is_first || clock.is_group_boundary_ms(self.config.group_duration_ms); let timestamp = hang::Timestamp::from_millis(timestamp_ms).map_err(|_| { StreamKitError::Runtime("MoQ frame timestamp overflow".to_string()) @@ -243,7 +270,16 @@ impl ProcessorNode for MoqPushNode { return Err(StreamKitError::Runtime(err_msg)); } - clock.advance_by_duration_us(duration_us); + if let Some(meta_ts) = metadata.as_ref().and_then(|m| m.timestamp_us) { + let meta_ms = meta_ts / 1_000; + let offset = timestamp_ms.saturating_sub(meta_ms); + #[allow(clippy::cast_precision_loss)] + { + clock_offset_histogram.record(offset as f64, &metric_labels); + } + } + + clock.advance_by_duration_us(duration_us, DEFAULT_AUDIO_FRAME_DURATION_US); stats_tracker.sent(); } else { tracing::warn!("MoqPushNode received non-binary packet, ignoring"); diff --git a/docs/src/content/docs/guides/performance.md b/docs/src/content/docs/guides/performance.md index 3115695a..d997c1fa 100644 --- a/docs/src/content/docs/guides/performance.md +++ b/docs/src/content/docs/guides/performance.md @@ -88,6 +88,14 @@ Oneshot pipelines use larger defaults than dynamic sessions because they don't n Many nodes expose their own buffering/throughput 
controls in pipeline YAML, for example: - `core::pacer` / `audio::pacer`: `buffer_size` (queue depth) +- Timing metrics (exposed via OpenTelemetry): + - `pacer.lateness_seconds` (labels: `node`) — observed send lateness vs. packet timestamps. + - `pacer.queue_depth` (labels: `node`) — current pacer buffer depth. + - `moq.push.clock_offset_ms` (labels: `node`, `broadcast`) — delta between outbound MoQ timestamp and upstream packet timestamp. + - `moq.peer.inter_frame_ms` (labels: `node`, `broadcast`) — inter-frame gap seen by MoQ subscribers. + - `pin_distributor.queue_depth` (labels: `node_id`, `pin_name`) — packet backlog at each dynamic pin distributor (data plane fan-out). + - `pin_distributor.queue_depth_bytes` (labels: `node_id`, `pin_name`) — estimated backlog in bytes (EWMA-based). + - `pin_distributor.queue_depth_seconds` (labels: `node_id`, `pin_name`) — estimated backlog in seconds from packet timing (EWMA-based). - `containers::ogg::muxer` / `containers::webm::muxer`: `chunk_size` (flush threshold) - `core::file_reader` / `core::file_writer` / `transport::http::fetcher`: `chunk_size` - `audio::mixer`: `sync_timeout_ms` and (in clocked mode) `jitter_buffer_frames` diff --git a/samples/grafana-dashboard.json b/samples/grafana-dashboard.json index cceb5896..cc34b54c 100644 --- a/samples/grafana-dashboard.json +++ b/samples/grafana-dashboard.json @@ -1018,112 +1018,6 @@ "type": "bargauge", "description": "Cumulative packets sent, showing only the top talkers." }, - { - "datasource": { - "type": "prometheus", - "uid": "${DS_PROMETHEUS}" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "Seconds", - "axisPlacement": "auto", - "barAlignment": 0, - "drawStyle": "line", - "fillOpacity": 20, - "gradientMode": "none", - "hideFrom": { - "tooltip": false, - "viz": false, - "legend": false - }, - "insertNulls": false, - "lineInterpolation": "smooth", - "lineWidth": 2, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "never", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - } - ] - }, - "unit": "s" - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 12, - "x": 0, - "y": 35 - }, - "id": 13, - "options": { - "legend": { - "calcs": [ - "lastNotNull", - "max" - ], - "displayMode": "table", - "placement": "right", - "showLegend": true - }, - "tooltip": { - "mode": "multi", - "sort": "desc" - } - }, - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "${DS_PROMETHEUS}" - }, - "editorMode": "code", - "expr": "histogram_quantile(0.95, sum(rate(node_execution_duration_bucket[5m])) by (le, node_kind))", - "legendFormat": "{{node_kind}} p95", - "range": true, - "refId": "A" - }, - { - "datasource": { - "type": "prometheus", - "uid": "${DS_PROMETHEUS}" - }, - "editorMode": "code", - "expr": "histogram_quantile(0.99, sum(rate(node_execution_duration_bucket[5m])) by (le, node_kind))", - "legendFormat": "{{node_kind}} p99", - "range": true, - "refId": "B" - } - ], - "title": "Node Runtime Duration (p95, p99)", - "type": "timeseries", - "description": "Node task runtime percentiles per node kind (measured from spawn until node exits). Long-lived dynamic sessions only emit these when sessions are destroyed/cleaned up." 
- }, { "datasource": { "type": "prometheus", @@ -2058,6 +1952,599 @@ "title": "Pin Distributor Outputs Active", "type": "timeseries", "description": "Active downstream outputs per distributor. Sum = total fan-out edges; max = most fanned-out single output pin." + }, + { + "id": 27, + "title": "Pacer Lateness p95 (s)", + "type": "timeseries", + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "seconds", + "axisPlacement": "auto", + "drawStyle": "line", + "lineWidth": 2, + "fillOpacity": 15, + "showPoints": "never", + "spanNulls": false, + "lineInterpolation": "smooth", + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 80 + }, + "options": { + "legend": { + "calcs": [ + "lastNotNull", + "max" + ], + "displayMode": "list", + "placement": "right", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "refId": "A", + "range": true, + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum by(le, node) (rate(pacer_lateness_seconds_bucket[5m])))", + "legendFormat": "{{node}}" + } + ] + }, + { + "id": 28, + "title": "Pacer Queue Depth", + "type": "timeseries", + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "packets", + "axisPlacement": "auto", + "drawStyle": "line", + "lineWidth": 2, + "fillOpacity": 10, + "showPoints": "never", + "spanNulls": true, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 80 + }, + "options": { + "legend": { + "calcs": [ + "lastNotNull", + "max" + ], + "displayMode": "list", + "placement": "right", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "refId": "A", + "range": true, + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "avg by(node) (pacer_queue_depth)", + "legendFormat": "{{node}}" + } + ] + }, + { + "id": 29, + "title": "MoQ Push Clock Offset p95 (ms)", + "type": "timeseries", + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "milliseconds", + "axisPlacement": "auto", + "drawStyle": "line", + "lineWidth": 2, + "fillOpacity": 15, + "showPoints": "never", + "spanNulls": false, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "ms" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 88 + }, + "options": { + "legend": { + "calcs": [ + "lastNotNull", + "max" + ], + "displayMode": "list", + "placement": "right", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "refId": "A", + 
"range": true, + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum by(le, node, broadcast) (rate(moq_push_clock_offset_ms_bucket[5m])))", + "legendFormat": "{{node}}/{{broadcast}}" + } + ] + }, + { + "id": 30, + "title": "MoQ Peer Inter-frame Gap p95 (ms)", + "type": "timeseries", + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "milliseconds", + "axisPlacement": "auto", + "drawStyle": "line", + "lineWidth": 2, + "fillOpacity": 15, + "showPoints": "never", + "spanNulls": false, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "ms" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 88 + }, + "options": { + "legend": { + "calcs": [ + "lastNotNull", + "max" + ], + "displayMode": "list", + "placement": "right", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "refId": "A", + "range": true, + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum by(le, node, broadcast) (rate(moq_peer_inter_frame_ms_bucket[5m])))", + "legendFormat": "{{node}}/{{broadcast}}" + } + ] + }, + { + "id": 31, + "title": "Pin Distributor Queue Depth", + "type": "timeseries", + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "packets", + "axisPlacement": "auto", + "drawStyle": "line", + "lineWidth": 2, + "fillOpacity": 10, + "showPoints": "never", + "spanNulls": true, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 96 + }, + "options": { + "legend": { + "calcs": [ + "lastNotNull", + "max" + ], + "displayMode": "list", + "placement": "right", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "refId": "A", + "range": true, + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "avg by(node_id, pin_name) (pin_distributor_queue_depth)", + "legendFormat": "{{node_id}}/{{pin_name}}" + } + ] + }, + { + "id": 32, + "title": "Pin Distributor Queue Depth (bytes)", + "type": "timeseries", + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "bytes", + "axisPlacement": "auto", + "drawStyle": "line", + "lineWidth": 2, + "fillOpacity": 10, + "showPoints": "never", + "spanNulls": true, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "bytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 104 + }, + "options": { + "legend": { + "calcs": [ + "lastNotNull", + "max" + ], + "displayMode": "list", + "placement": "right", + "showLegend": true + }, + "tooltip": { + "mode": 
"multi", + "sort": "desc" + } + }, + "targets": [ + { + "refId": "A", + "range": true, + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "avg by(node_id, pin_name) (pin_distributor_queue_depth_bytes)", + "legendFormat": "{{node_id}}/{{pin_name}}" + } + ] + }, + { + "id": 33, + "title": "Pin Distributor Queue Depth (seconds)", + "type": "timeseries", + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "seconds", + "axisPlacement": "auto", + "drawStyle": "line", + "lineWidth": 2, + "fillOpacity": 10, + "showPoints": "never", + "spanNulls": true, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 104 + }, + "options": { + "legend": { + "calcs": [ + "lastNotNull", + "max" + ], + "displayMode": "list", + "placement": "right", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "refId": "A", + "range": true, + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "avg by(node_id, pin_name) (pin_distributor_queue_depth_seconds)", + "legendFormat": "{{node_id}}/{{pin_name}}" + } + ] + }, + { + "id": 34, + "title": "Pin Distributor Send Wait p95 (s)", + "type": "timeseries", + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "seconds", + "axisPlacement": "auto", + "drawStyle": "line", + "lineWidth": 2, + "fillOpacity": 15, + "showPoints": "never", + "spanNulls": false, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 112 + }, + "options": { + "legend": { + "calcs": [ + "lastNotNull", + "max" + ], + "displayMode": "list", + "placement": "right", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "refId": "A", + "range": true, + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum by(le, node_id, pin_name) (rate(pin_distributor_send_wait_seconds_bucket[5m])))", + "legendFormat": "{{node_id}}/{{pin_name}}" + } + ] } ], "refresh": "10s",