Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub mod resource_manager;
pub mod state;
pub mod stats;
pub mod telemetry;
pub mod timing;
pub mod types;

// Convenience re-exports for commonly used types
Expand Down Expand Up @@ -91,6 +92,9 @@ pub use stats::{NodeStats, NodeStatsUpdate};
// Telemetry
pub use telemetry::{TelemetryConfig, TelemetryEmitter, TelemetryEvent};

// Timing helpers
pub use timing::*;

// Pin definitions
pub use pins::{InputPin, OutputPin, PinCardinality};

Expand Down
208 changes: 208 additions & 0 deletions crates/core/src/timing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
// SPDX-FileCopyrightText: © 2025 StreamKit Contributors
//
// SPDX-License-Identifier: MPL-2.0

//! Timing helpers and canonical semantics for media packets.
//!
//! # Timing contract
//!
//! - `timestamp_us` is media presentation time in microseconds, relative to the
//! stream's epoch (normally the first frame = 0). It is monotonic and
//! non-decreasing per stream.
//! - `duration_us` is the playback duration for the payload in microseconds and
//! should be set whenever it can be derived (decode, demux, resample).
//! - `sequence` is a monotonic counter scoped to the stream, useful for loss and
//! ordering detection when absolute time is absent.
//! - Nodes should preserve `timestamp_us` and `duration_us` across transforms, or
//! recompute them when they change payload timing (e.g., resampling). If timing
//! is unknown, leave fields as `None` rather than inventing values.
//! - Engines and transports may introduce buffering but must not reorder packets
//! with identical timestamps; late/drop policies should be explicit in nodes
//! (see pacer/mixer).
//!
//! This module provides lightweight helpers for duration math and monotonicity
//! checks to keep node implementations consistent.

use crate::types::PacketMetadata;

/// Microseconds per second constant.
pub const MICROS_PER_SECOND: u64 = 1_000_000;
/// Milliseconds per second constant.
pub const MILLIS_PER_SECOND: u64 = 1_000;

/// Convert microseconds to milliseconds, rounding up (never under-reports duration).
#[must_use]
pub const fn duration_us_to_ms_ceil(duration_us: u64) -> u64 {
duration_us.saturating_add(999) / 1000
}

/// Convert frames-per-channel to duration in microseconds.
///
/// Returns `None` if `sample_rate` is 0 to avoid divide-by-zero.
#[must_use]
pub fn frames_to_duration_us(frames_per_channel: u64, sample_rate: u32) -> Option<u64> {
if sample_rate == 0 {
return None;
}
Some(frames_per_channel.saturating_mul(MICROS_PER_SECOND) / u64::from(sample_rate))
}

/// Convert an interleaved sample count to duration in microseconds.
///
/// Returns `None` if `channels` or `sample_rate` is 0.
#[must_use]
pub fn samples_to_duration_us(samples: usize, channels: u16, sample_rate: u32) -> Option<u64> {
if channels == 0 || sample_rate == 0 {
return None;
}
let frames_per_channel = samples as u64 / u64::from(channels);
frames_to_duration_us(frames_per_channel, sample_rate)
}

/// Advance a timestamp by a duration, if both are present.
///
/// - If `timestamp_us` is `Some` and `duration_us` is `Some`, returns
/// `timestamp_us + duration_us` (saturating).
/// - Otherwise, returns `timestamp_us` unchanged.
#[must_use]
pub fn advance_timestamp(timestamp_us: Option<u64>, duration_us: Option<u64>) -> Option<u64> {
match (timestamp_us, duration_us) {
(Some(ts), Some(dur)) => Some(ts.saturating_add(dur)),
(ts, _) => ts,
}
}

/// Returns true if `next` is monotonic (non-decreasing) with respect to `prev`.
///
/// When either side is `None`, the check is treated as passing.
#[must_use]
pub fn is_monotonic(prev: Option<u64>, next: Option<u64>) -> bool {
match (prev, next) {
(Some(p), Some(n)) => n >= p,
_ => true,
}
}

/// Extract the starting timestamp from metadata, or 0 if not present.
/// Useful when computing cumulative media time.
#[must_use]
pub fn starting_timestamp_or_zero(metadata: &Option<PacketMetadata>) -> u64 {
metadata.as_ref().and_then(|m| m.timestamp_us).unwrap_or(0)
}

/// Infer a duration from consecutive timestamps; fall back to `default` when missing or non-positive.
#[must_use]
pub fn infer_duration_us(current_ts: u64, previous_ts: Option<u64>, default: u64) -> u64 {
previous_ts.and_then(|prev| current_ts.checked_sub(prev)).filter(|d| *d > 0).unwrap_or(default)
}

/// Merge metadata from multiple inputs: min timestamp, max duration, max sequence.
#[must_use]
pub fn merge_metadata<'a, I: Iterator<Item = &'a PacketMetadata>>(
iter: I,
) -> Option<PacketMetadata> {
let mut ts = None;
let mut dur = None;
let mut seq = None;
for m in iter {
if let Some(t) = m.timestamp_us {
ts = Some(ts.map_or(t, |prev: u64| prev.min(t)));
}
if let Some(d) = m.duration_us {
dur = Some(dur.map_or(d, |prev: u64| prev.max(d)));
}
if let Some(s) = m.sequence {
seq = Some(seq.map_or(s, |prev: u64| prev.max(s)));
}
}
if ts.is_some() || dur.is_some() || seq.is_some() {
Some(PacketMetadata { timestamp_us: ts, duration_us: dur, sequence: seq })
} else {
None
}
}

/// A simple media clock that tracks media time in microseconds with an optional initial delay.
#[derive(Debug, Clone)]
pub struct MediaClock {
initial_delay_us: u64,
media_time_us: u64,
seeded: bool,
}

impl MediaClock {
/// Create a new clock with an initial delay (microseconds).
pub const fn new(initial_delay_ms: u64) -> Self {
Self { initial_delay_us: initial_delay_ms * 1000, media_time_us: 0, seeded: false }
}

/// Seed the clock from an absolute media timestamp. Idempotent (first seed wins).
pub fn seed_from_timestamp_us(&mut self, timestamp_us: u64) {
if !self.seeded {
self.media_time_us = timestamp_us;
self.seeded = true;
}
}

/// Advance by a duration (or default) and return the duration used (ms rounded up).
pub fn advance_by_duration_us(&mut self, duration_us: Option<u64>, default: u64) -> u64 {
let dur = duration_us.unwrap_or(default);
self.media_time_us = self.media_time_us.saturating_add(dur);
duration_us_to_ms_ceil(dur)
}

/// Current media timestamp in microseconds (includes initial delay).
#[must_use]
pub const fn timestamp_us(&self) -> u64 {
self.initial_delay_us.saturating_add(self.media_time_us)
}

/// Current media timestamp in milliseconds (rounded up).
#[must_use]
pub const fn timestamp_ms(&self) -> u64 {
duration_us_to_ms_ceil(self.timestamp_us())
}

/// Returns true if at a group boundary (e.g., for MoQ keyframes).
#[must_use]
pub fn is_group_boundary_ms(&self, group_duration_ms: u64) -> bool {
let t = self.timestamp_ms();
group_duration_ms > 0 && t.is_multiple_of(group_duration_ms)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_frames_to_duration_us() {
assert_eq!(frames_to_duration_us(960, 48_000), Some(20_000));
assert_eq!(frames_to_duration_us(0, 48_000), Some(0));
assert_eq!(frames_to_duration_us(1_920, 48_000), Some(40_000));
assert_eq!(frames_to_duration_us(1, 0), None);
}

#[test]
fn test_samples_to_duration_us() {
assert_eq!(samples_to_duration_us(1_920, 2, 48_000), Some(20_000));
assert_eq!(samples_to_duration_us(1_920, 0, 48_000), None);
assert_eq!(samples_to_duration_us(1_920, 2, 0), None);
}

#[test]
fn test_advance_timestamp() {
assert_eq!(advance_timestamp(Some(1_000), Some(500)), Some(1_500));
assert_eq!(advance_timestamp(Some(1_000), None), Some(1_000));
assert_eq!(advance_timestamp(None, Some(500)), None);
}

#[test]
fn test_is_monotonic() {
assert!(is_monotonic(Some(1), Some(1)));
assert!(is_monotonic(Some(1), Some(2)));
assert!(!is_monotonic(Some(2), Some(1)));
assert!(is_monotonic(None, Some(1)));
assert!(is_monotonic(Some(1), None));
}
}
3 changes: 2 additions & 1 deletion crates/core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ pub struct AudioFormat {
}

/// Optional timing and sequencing metadata that can be attached to packets.
/// Used for pacing, synchronization, and A/V alignment.
/// Used for pacing, synchronization, and A/V alignment. See `timing` module for
/// canonical semantics (media-time epoch, monotonicity, and preservation rules).
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, TS)]
#[ts(export)]
pub struct PacketMetadata {
Expand Down
90 changes: 89 additions & 1 deletion crates/engine/src/dynamic_pin_distributor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::time::Instant;
use streamkit_core::types::Packet;
use tokio::sync::mpsc;

const EWMA_ALPHA: f64 = 0.1;

/// Information about a downstream connection.
struct OutputConnection {
tx: mpsc::Sender<Packet>,
Expand Down Expand Up @@ -44,8 +46,18 @@ pub struct PinDistributorActor {
outputs_active_gauge: opentelemetry::metrics::Gauge<u64>,
/// Telemetry: time spent blocked on downstream backpressure (send().await)
send_wait_histogram: opentelemetry::metrics::Histogram<f64>,
/// Telemetry: depth of the distributor's incoming queue
queue_depth_gauge: opentelemetry::metrics::Gauge<u64>,
/// Telemetry: estimated backlog in bytes
queue_depth_bytes_gauge: opentelemetry::metrics::Gauge<u64>,
/// Telemetry: estimated backlog in media seconds (based on observed durations)
queue_depth_seconds_gauge: opentelemetry::metrics::Gauge<f64>,
/// Pre-built metric labels - allocated once in new(), reused on every packet
metric_labels: [opentelemetry::KeyValue; 2],
/// EWMA of packet size in bytes for this pin
avg_packet_size_bytes: f64,
/// EWMA of packet duration in seconds for this pin (when available)
avg_packet_duration_s: f64,
}

impl PinDistributorActor {
Expand Down Expand Up @@ -81,6 +93,20 @@ impl PinDistributorActor {
.f64_histogram("pin_distributor.send_wait_seconds")
.with_description("Time spent waiting for downstream capacity (backpressure)")
.build();
let queue_depth_gauge = meter
.u64_gauge("pin_distributor.queue_depth")
.with_description("Current backlog of packets waiting to be distributed")
.build();
let queue_depth_bytes_gauge = meter
.u64_gauge("pin_distributor.queue_depth_bytes")
.with_description("Estimated backlog (bytes) at pin distributor input")
.build();
let queue_depth_seconds_gauge = meter
.f64_gauge("pin_distributor.queue_depth_seconds")
.with_description(
"Estimated backlog (seconds) at pin distributor input (from packet timing)",
)
.build();

// Pre-build metric labels once - avoids allocation on every packet
let metric_labels = [
Expand All @@ -100,7 +126,12 @@ impl PinDistributorActor {
best_effort_drops_counter,
outputs_active_gauge,
send_wait_histogram,
queue_depth_gauge,
queue_depth_bytes_gauge,
queue_depth_seconds_gauge,
metric_labels,
avg_packet_size_bytes: 0.0,
avg_packet_duration_s: 0.0,
}
}

Expand Down Expand Up @@ -178,11 +209,41 @@ impl PinDistributorActor {
///
/// For `Reliable` connections: synchronized backpressure - waits for slow consumers.
/// For `BestEffort` connections: drops packets when buffer is full (no waiting).
#[allow(clippy::cognitive_complexity)] // Fan-out with mode handling requires multiple paths
#[allow(
clippy::cognitive_complexity,
clippy::cast_precision_loss,
clippy::cast_possible_truncation,
clippy::cast_sign_loss
)] // Fan-out with mode handling requires multiple paths and metric estimation casts
async fn distribute_packet(&mut self, packet: Packet) {
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::mpsc::error::TrySendError;

let queue_len = self.data_rx.len() as u64;
self.queue_depth_gauge.record(queue_len, &self.metric_labels);

let (pkt_bytes, pkt_duration_s) = Self::packet_stats(&packet);
self.avg_packet_size_bytes = if self.avg_packet_size_bytes == 0.0 {
pkt_bytes
} else {
self.avg_packet_size_bytes.mul_add(1.0 - EWMA_ALPHA, pkt_bytes * EWMA_ALPHA)
};
if let Some(dur) = pkt_duration_s {
self.avg_packet_duration_s = if self.avg_packet_duration_s == 0.0 {
dur
} else {
self.avg_packet_duration_s.mul_add(1.0 - EWMA_ALPHA, dur * EWMA_ALPHA)
};
}
let est_bytes = (self.avg_packet_size_bytes * queue_len as f64) as u64;
if est_bytes > 0 {
self.queue_depth_bytes_gauge.record(est_bytes, &self.metric_labels);
}
if self.avg_packet_duration_s > 0.0 {
let est_seconds = self.avg_packet_duration_s * queue_len as f64;
self.queue_depth_seconds_gauge.record(est_seconds, &self.metric_labels);
}

if self.outputs.is_empty() {
// No outputs configured - drop packet and record metric
// Use pre-built labels - no allocation on hot path
Expand Down Expand Up @@ -363,4 +424,31 @@ impl PinDistributorActor {
self.best_effort_drops_counter.add(best_effort_drops, &self.metric_labels);
}
}

/// Extract approximate size in bytes and optional duration (seconds) for a packet.
#[allow(clippy::cast_precision_loss)]
fn packet_stats(packet: &Packet) -> (f64, Option<f64>) {
match packet {
Packet::Audio(frame) => {
let bytes = (frame.samples.len() * std::mem::size_of::<f32>()) as f64;
let dur_s = frame.duration_us().map(|us| us as f64 / 1_000_000.0);
(bytes, dur_s)
},
Packet::Binary { data, metadata, .. } => {
let bytes = data.len() as f64;
let dur_s =
metadata.as_ref().and_then(|m| m.duration_us).map(|us| us as f64 / 1_000_000.0);
(bytes, dur_s)
},
Packet::Text(t) => (t.len() as f64, None),
Packet::Transcription(t) => (
t.text.len() as f64,
t.metadata.as_ref().and_then(|m| m.duration_us).map(|us| us as f64 / 1_000_000.0),
),
Packet::Custom(c) => (
c.data.to_string().len() as f64,
c.metadata.as_ref().and_then(|m| m.duration_us).map(|us| us as f64 / 1_000_000.0),
),
}
}
}
Loading
Loading