From c21343a95c3f18682d4d724adfd080aebacc05e8 Mon Sep 17 00:00:00 2001 From: Jack Fransham Date: Tue, 5 Mar 2024 14:22:09 +0100 Subject: [PATCH 1/3] Make `DynamicMixer` and `OutputStream` generic This commit adds a new trait, `Mixer`, and refactors some of the code around generic mixers in order to support it. This is to make more of the code reusable without rewriting - specifically, to support a feature that I would like to implement in `bevy_audio` where audio sources can send audio to an intermediate mixer. The motivation for this was to add a global limiter to all game audio, so that loud sounds did not cause clipping on the master bus, but another possible usecase would be to split in-game audio and menu audio into separate "worlds" and apply, for example, a low-pass filter while in the pause screen. --- src/dynamic_mixer.rs | 164 ++++++++++++++++++++++++++++++++++--------- src/stream.rs | 41 ++++++++--- 2 files changed, 164 insertions(+), 41 deletions(-) diff --git a/src/dynamic_mixer.rs b/src/dynamic_mixer.rs index 045aed8e..9e64c80a 100644 --- a/src/dynamic_mixer.rs +++ b/src/dynamic_mixer.rs @@ -27,21 +27,32 @@ where sample_rate, }); - let output = DynamicMixer { - current_sources: Vec::with_capacity(16), - input: input.clone(), - sample_count: 0, - still_pending: vec![], - still_current: vec![], - }; + let output = input.clone().into(); (input, output) } +/// A mixer that can be used as the global target of a sink. This should be used with `MixerDriver` and `MixerController`. +/// +/// Note that this does _not_ require a `Source` implementation - as the inputs are determined by `MixerDriver`, the +/// mixer itself cannot specify e.g. frame length or total duration. +// TODO: Should we allow mixers to specify frame lenght, +// TODO: Add an "advanced" mixer interface and implement it for any `T: Mixer`, so that things like the +// pending sources queue can be configured - e.g. for single-threaded mixers. +pub trait Mixer: Iterator { + /// Adds a new source to mix to the existing ones. + fn drain_sources( + &mut self, + sample_count: usize, + sources: &mut Vec + Send>>, + ); +} + /// The input of the mixer. pub struct DynamicMixerController { has_pending: AtomicBool, - pending_sources: Mutex + Send>>>, + // TODO: Make this configurable - we can probably use a lockfree queue + pending_sources: Mutex + Send + 'static>>>, channels: u16, sample_rate: u32, } @@ -60,32 +71,121 @@ where self.pending_sources .lock() .unwrap() - .push(Box::new(uniform_source) as Box<_>); + .push(Box::new(uniform_source) as _); self.has_pending.store(true, Ordering::SeqCst); // TODO: can we relax this ordering? } } -/// The output of the mixer. Implements `Source`. -pub struct DynamicMixer { +pub type DynamicMixer = MixerDriver>; + +/// A basic summing mixer. +pub struct BasicMixer { // The current iterator that produces samples. current_sources: Vec + Send>>, + // A temporary vec used in start_pending_sources. + still_pending: Vec + Send>>, + + // A temporary vec used in sum_current_sources. + still_current: Vec + Send>>, +} + +impl Default for BasicMixer { + fn default() -> Self { + Self { + current_sources: Vec::with_capacity(16), + still_pending: vec![], + still_current: vec![], + } + } +} + +/// The output of the mixer. Implements `Source`. +pub struct MixerDriver { // The pending sounds. + // TODO: Should this be `Weak`, since a new controller cannot be created once the + // original one has been dropped. input: Arc>, // The number of samples produced so far. sample_count: usize, - // A temporary vec used in start_pending_sources. - still_pending: Vec + Send>>, + mixer: M, +} - // A temporary vec used in sum_current_sources. - still_current: Vec + Send>>, +impl MixerDriver { + /// Create a new mixer driver, given an existing mixer. + /// + /// If the mixer implements `Default`, you can use `MixerDriver::new`. + pub fn from_mixer( + mixer: M, + channels: u16, + sample_rate: u32, + ) -> (Arc>, Self) { + let controller = Arc::new(DynamicMixerController { + has_pending: AtomicBool::new(false), + pending_sources: Mutex::new(Vec::new()), + channels, + sample_rate, + }); + + let mixer = Self { + mixer, + input: controller.clone(), + sample_count: 0, + }; + + (controller, mixer) + } } -impl Source for DynamicMixer +impl MixerDriver where - S: Sample + Send + 'static, + M: Default, +{ + /// Create a new mixer driver, along with a controller to control the mixer by. + pub fn new(channels: u16, sample_rate: u32) -> (Arc>, Self) { + Self::from_mixer(Default::default(), channels, sample_rate) + } +} + +impl From>> for MixerDriver +where + M: Default, +{ + fn from(value: Arc>) -> Self { + Self { + mixer: Default::default(), + input: value, + sample_count: 0, + } + } +} + +impl Iterator for MixerDriver +where + M: Mixer + ?Sized, +{ + type Item = M::Item; + + fn next(&mut self) -> Option { + if self.input.has_pending.load(Ordering::SeqCst) { + let mut pending = self.input.pending_sources.lock().unwrap(); // TODO: relax ordering? + self.mixer.drain_sources(self.sample_count, &mut *pending); + let has_pending = !pending.is_empty(); + self.input.has_pending.store(has_pending, Ordering::SeqCst); // TODO: relax ordering? + } + + self.sample_count += 1; + + self.mixer.next() + } +} + +impl Source for MixerDriver +where + M: Mixer + ?Sized, + M::Item: Sample, { #[inline] fn current_frame_len(&self) -> Option { @@ -108,7 +208,7 @@ where } } -impl Iterator for DynamicMixer +impl Iterator for BasicMixer where S: Sample + Send + 'static, { @@ -116,12 +216,6 @@ where #[inline] fn next(&mut self) -> Option { - if self.input.has_pending.load(Ordering::SeqCst) { - self.start_pending_sources(); - } - - self.sample_count += 1; - let sum = self.sum_current_sources(); if self.current_sources.is_empty() { @@ -137,7 +231,7 @@ where } } -impl DynamicMixer +impl Mixer for BasicMixer where S: Sample + Send + 'static, { @@ -145,11 +239,13 @@ where // We need to ensure we start playing sources so that their samples are // in-step with the modulo of the samples produced so far. Otherwise, the // sound will play on the wrong channels, e.g. left / right will be reversed. - fn start_pending_sources(&mut self) { - let mut pending = self.input.pending_sources.lock().unwrap(); // TODO: relax ordering? - + fn drain_sources( + &mut self, + sample_count: usize, + pending: &mut Vec + Send>>, + ) { for source in pending.drain(..) { - let in_step = self.sample_count % source.channels() as usize == 0; + let in_step = sample_count % source.channels() as usize == 0; if in_step { self.current_sources.push(source); @@ -157,12 +253,14 @@ where self.still_pending.push(source); } } - std::mem::swap(&mut self.still_pending, &mut pending); - - let has_pending = !pending.is_empty(); - self.input.has_pending.store(has_pending, Ordering::SeqCst); // TODO: relax ordering? + std::mem::swap(&mut self.still_pending, pending); } +} +impl BasicMixer +where + S: Sample + Send + 'static, +{ fn sum_current_sources(&mut self) -> S { let mut sum = S::zero_value(); diff --git a/src/stream.rs b/src/stream.rs index 59a4c917..681c9e06 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,3 +1,4 @@ +use std::any::Any; use std::io::{Read, Seek}; use std::marker::Sync; use std::sync::{Arc, Weak}; @@ -10,12 +11,12 @@ use crate::source::Source; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::{Sample, SupportedStreamConfig}; -/// `cpal::Stream` container. Also see the more useful `OutputStreamHandle`. +/// Hosted stream container, usually a wrapper around `cpal::Stream`. Also see the more useful `OutputStreamHandle`. /// /// If this is dropped playback will end & attached `OutputStreamHandle`s will no longer work. -pub struct OutputStream { +pub struct OutputStream { mixer: Arc>, - _stream: cpal::Stream, + _stream: S, } /// More flexible handle to a `OutputStream` that provides playback. @@ -44,11 +45,7 @@ impl OutputStream { ) -> Result<(Self, OutputStreamHandle), StreamError> { let (mixer, _stream) = device.try_new_output_stream_config(config)?; _stream.play()?; - let out = Self { mixer, _stream }; - let handle = OutputStreamHandle { - mixer: Arc::downgrade(&out.mixer), - }; - Ok((out, handle)) + Ok(Self::new(mixer, _stream)) } /// Return a new stream & handle using the default output device. @@ -75,6 +72,34 @@ impl OutputStream { } } +impl OutputStream { + /// Create a new `OutputStream` from a mixer controller, plus any data that should be dropped when this stream is dropped. + /// + /// This can be used to create an output stream that writes to an intermediate mixer. + pub fn new( + mixer: Arc>, + extra_data: T, + ) -> (Self, OutputStreamHandle) { + let out = Self { + mixer, + _stream: extra_data, + }; + let handle = OutputStreamHandle { + mixer: Arc::downgrade(&out.mixer), + }; + (out, handle) + } +} + +impl OutputStream +where + T: 'static, +{ + pub fn into_any(self) -> Box> { + Box::new(self) as _ + } +} + impl OutputStreamHandle { /// Plays a source with a device until it ends. pub fn play_raw(&self, source: S) -> Result<(), PlayError> From fc08c0787617e5e72427edbc0b10cf2501b750c4 Mon Sep 17 00:00:00 2001 From: Jack Fransham Date: Tue, 5 Mar 2024 16:42:42 +0100 Subject: [PATCH 2/3] Fix mixer not implementing `sync` --- src/dynamic_mixer.rs | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/src/dynamic_mixer.rs b/src/dynamic_mixer.rs index 9e64c80a..2def7997 100644 --- a/src/dynamic_mixer.rs +++ b/src/dynamic_mixer.rs @@ -7,6 +7,8 @@ use std::time::Duration; use crate::source::{Source, UniformSourceIterator}; use crate::Sample; +type DynSource = Box + Send + Sync + 'static>; + /// Builds a new mixer. /// /// You can choose the characteristics of the output thanks to this constructor. All the sounds @@ -18,7 +20,7 @@ pub fn mixer( sample_rate: u32, ) -> (Arc>, DynamicMixer) where - S: Sample + Send + 'static, + S: Sample + Send + Sync + 'static, { let input = Arc::new(DynamicMixerController { has_pending: AtomicBool::new(false), @@ -44,7 +46,7 @@ pub trait Mixer: Iterator { fn drain_sources( &mut self, sample_count: usize, - sources: &mut Vec + Send>>, + sources: &mut Vec>, ); } @@ -52,20 +54,20 @@ pub trait Mixer: Iterator { pub struct DynamicMixerController { has_pending: AtomicBool, // TODO: Make this configurable - we can probably use a lockfree queue - pending_sources: Mutex + Send + 'static>>>, + pending_sources: Mutex>>, channels: u16, sample_rate: u32, } impl DynamicMixerController where - S: Sample + Send + 'static, + S: Sample + Send + Sync + 'static, { /// Adds a new source to mix to the existing ones. #[inline] pub fn add(&self, source: T) where - T: Source + Send + 'static, + T: Source + Send + Sync + 'static, { let uniform_source = UniformSourceIterator::new(source, self.channels, self.sample_rate); self.pending_sources @@ -81,13 +83,13 @@ pub type DynamicMixer = MixerDriver>; /// A basic summing mixer. pub struct BasicMixer { // The current iterator that produces samples. - current_sources: Vec + Send>>, + current_sources: Vec>, // A temporary vec used in start_pending_sources. - still_pending: Vec + Send>>, + still_pending: Vec>, // A temporary vec used in sum_current_sources. - still_current: Vec + Send>>, + still_current: Vec>, } impl Default for BasicMixer { @@ -210,7 +212,7 @@ where impl Iterator for BasicMixer where - S: Sample + Send + 'static, + S: Sample + Send + Sync + 'static, { type Item = S; @@ -233,7 +235,7 @@ where impl Mixer for BasicMixer where - S: Sample + Send + 'static, + S: Sample + Send + Sync + 'static, { // Samples from the #next() function are interlaced for each of the channels. // We need to ensure we start playing sources so that their samples are @@ -242,7 +244,7 @@ where fn drain_sources( &mut self, sample_count: usize, - pending: &mut Vec + Send>>, + pending: &mut Vec>, ) { for source in pending.drain(..) { let in_step = sample_count % source.channels() as usize == 0; @@ -259,7 +261,7 @@ where impl BasicMixer where - S: Sample + Send + 'static, + S: Sample + Send + Sync + 'static, { fn sum_current_sources(&mut self) -> S { let mut sum = S::zero_value(); From 1031592d0e4b8091e333efd0fd9d02851299d012 Mon Sep 17 00:00:00 2001 From: Jack Fransham Date: Tue, 5 Mar 2024 17:02:54 +0100 Subject: [PATCH 3/3] Remove sync bounds from mixer --- src/dynamic_mixer.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/dynamic_mixer.rs b/src/dynamic_mixer.rs index 2def7997..2466209e 100644 --- a/src/dynamic_mixer.rs +++ b/src/dynamic_mixer.rs @@ -7,7 +7,7 @@ use std::time::Duration; use crate::source::{Source, UniformSourceIterator}; use crate::Sample; -type DynSource = Box + Send + Sync + 'static>; +type DynSource = Box + Send + 'static>; /// Builds a new mixer. /// @@ -20,7 +20,7 @@ pub fn mixer( sample_rate: u32, ) -> (Arc>, DynamicMixer) where - S: Sample + Send + Sync + 'static, + S: Sample + Send + 'static, { let input = Arc::new(DynamicMixerController { has_pending: AtomicBool::new(false), @@ -61,13 +61,13 @@ pub struct DynamicMixerController { impl DynamicMixerController where - S: Sample + Send + Sync + 'static, + S: Sample + Send + 'static, { /// Adds a new source to mix to the existing ones. #[inline] pub fn add(&self, source: T) where - T: Source + Send + Sync + 'static, + T: Source + Send + 'static, { let uniform_source = UniformSourceIterator::new(source, self.channels, self.sample_rate); self.pending_sources @@ -212,7 +212,7 @@ where impl Iterator for BasicMixer where - S: Sample + Send + Sync + 'static, + S: Sample + Send + 'static, { type Item = S; @@ -235,7 +235,7 @@ where impl Mixer for BasicMixer where - S: Sample + Send + Sync + 'static, + S: Sample + Send + 'static, { // Samples from the #next() function are interlaced for each of the channels. // We need to ensure we start playing sources so that their samples are @@ -261,7 +261,7 @@ where impl BasicMixer where - S: Sample + Send + Sync + 'static, + S: Sample + Send + 'static, { fn sum_current_sources(&mut self) -> S { let mut sum = S::zero_value();