Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 134 additions & 34 deletions src/dynamic_mixer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use std::time::Duration;
use crate::source::{Source, UniformSourceIterator};
use crate::Sample;

type DynSource<S> = Box<dyn Source<Item = S> + Send + 'static>;

/// Builds a new mixer.
///
/// You can choose the characteristics of the output thanks to this constructor. All the sounds
Expand All @@ -27,21 +29,32 @@ where
sample_rate,
});

let output = DynamicMixer {
current_sources: Vec::with_capacity(16),
input: input.clone(),
sample_count: 0,
still_pending: vec![],
still_current: vec![],
};
let output = input.clone().into();

(input, output)
}

/// A mixer that can be used as the global target of a sink. This should be used with `MixerDriver` and `MixerController`.
///
/// Note that this does _not_ require a `Source` implementation - as the inputs are determined by `MixerDriver`, the
/// mixer itself cannot specify e.g. frame length or total duration.
// TODO: Should we allow mixers to specify frame lenght,
// TODO: Add an "advanced" mixer interface and implement it for any `T: Mixer`, so that things like the
// pending sources queue can be configured - e.g. for single-threaded mixers.
pub trait Mixer: Iterator {
/// Adds a new source to mix to the existing ones.
fn drain_sources(
&mut self,
sample_count: usize,
sources: &mut Vec<DynSource<Self::Item>>,
);
}

/// The input of the mixer.
pub struct DynamicMixerController<S> {
has_pending: AtomicBool,
pending_sources: Mutex<Vec<Box<dyn Source<Item = S> + Send>>>,
// TODO: Make this configurable - we can probably use a lockfree queue
pending_sources: Mutex<Vec<DynSource<S>>>,
channels: u16,
sample_rate: u32,
}
Expand All @@ -60,32 +73,121 @@ where
self.pending_sources
.lock()
.unwrap()
.push(Box::new(uniform_source) as Box<_>);
.push(Box::new(uniform_source) as _);
self.has_pending.store(true, Ordering::SeqCst); // TODO: can we relax this ordering?
}
}

/// The output of the mixer. Implements `Source`.
pub struct DynamicMixer<S> {
pub type DynamicMixer<S> = MixerDriver<S, BasicMixer<S>>;

/// A basic summing mixer.
pub struct BasicMixer<S> {
// The current iterator that produces samples.
current_sources: Vec<Box<dyn Source<Item = S> + Send>>,
current_sources: Vec<DynSource<S>>,

// A temporary vec used in start_pending_sources.
still_pending: Vec<DynSource<S>>,

// A temporary vec used in sum_current_sources.
still_current: Vec<DynSource<S>>,
}

impl<S> Default for BasicMixer<S> {
fn default() -> Self {
Self {
current_sources: Vec::with_capacity(16),
still_pending: vec![],
still_current: vec![],
}
}
}

/// The output of the mixer. Implements `Source`.
pub struct MixerDriver<S, M: ?Sized> {
// The pending sounds.
// TODO: Should this be `Weak`, since a new controller cannot be created once the
// original one has been dropped.
input: Arc<DynamicMixerController<S>>,

// The number of samples produced so far.
sample_count: usize,

// A temporary vec used in start_pending_sources.
still_pending: Vec<Box<dyn Source<Item = S> + Send>>,
mixer: M,
}

// A temporary vec used in sum_current_sources.
still_current: Vec<Box<dyn Source<Item = S> + Send>>,
impl<S, M> MixerDriver<S, M> {
/// Create a new mixer driver, given an existing mixer.
///
/// If the mixer implements `Default`, you can use `MixerDriver::new`.
pub fn from_mixer(
mixer: M,
channels: u16,
sample_rate: u32,
) -> (Arc<DynamicMixerController<S>>, Self) {
let controller = Arc::new(DynamicMixerController {
has_pending: AtomicBool::new(false),
pending_sources: Mutex::new(Vec::new()),
channels,
sample_rate,
});

let mixer = Self {
mixer,
input: controller.clone(),
sample_count: 0,
};

(controller, mixer)
}
}

impl<S> Source for DynamicMixer<S>
impl<S, M> MixerDriver<S, M>
where
S: Sample + Send + 'static,
M: Default,
{
/// Create a new mixer driver, along with a controller to control the mixer by.
pub fn new(channels: u16, sample_rate: u32) -> (Arc<DynamicMixerController<S>>, Self) {
Self::from_mixer(Default::default(), channels, sample_rate)
}
}

impl<S, M> From<Arc<DynamicMixerController<S>>> for MixerDriver<S, M>
where
M: Default,
{
fn from(value: Arc<DynamicMixerController<S>>) -> Self {
Self {
mixer: Default::default(),
input: value,
sample_count: 0,
}
}
}

impl<M> Iterator for MixerDriver<M::Item, M>
where
M: Mixer + ?Sized,
{
type Item = M::Item;

fn next(&mut self) -> Option<Self::Item> {
if self.input.has_pending.load(Ordering::SeqCst) {
let mut pending = self.input.pending_sources.lock().unwrap(); // TODO: relax ordering?
self.mixer.drain_sources(self.sample_count, &mut *pending);
let has_pending = !pending.is_empty();
self.input.has_pending.store(has_pending, Ordering::SeqCst); // TODO: relax ordering?
}

self.sample_count += 1;

self.mixer.next()
}
}

impl<M> Source for MixerDriver<M::Item, M>
where
M: Mixer + ?Sized,
M::Item: Sample,
{
#[inline]
fn current_frame_len(&self) -> Option<usize> {
Expand All @@ -108,20 +210,14 @@ where
}
}

impl<S> Iterator for DynamicMixer<S>
impl<S> Iterator for BasicMixer<S>
where
S: Sample + Send + 'static,
{
type Item = S;

#[inline]
fn next(&mut self) -> Option<S> {
if self.input.has_pending.load(Ordering::SeqCst) {
self.start_pending_sources();
}

self.sample_count += 1;

let sum = self.sum_current_sources();

if self.current_sources.is_empty() {
Expand All @@ -137,32 +233,36 @@ where
}
}

impl<S> DynamicMixer<S>
impl<S> Mixer for BasicMixer<S>
where
S: Sample + Send + 'static,
{
// Samples from the #next() function are interlaced for each of the channels.
// We need to ensure we start playing sources so that their samples are
// in-step with the modulo of the samples produced so far. Otherwise, the
// sound will play on the wrong channels, e.g. left / right will be reversed.
fn start_pending_sources(&mut self) {
let mut pending = self.input.pending_sources.lock().unwrap(); // TODO: relax ordering?

fn drain_sources(
&mut self,
sample_count: usize,
pending: &mut Vec<DynSource<S>>,
) {
for source in pending.drain(..) {
let in_step = self.sample_count % source.channels() as usize == 0;
let in_step = sample_count % source.channels() as usize == 0;

if in_step {
self.current_sources.push(source);
} else {
self.still_pending.push(source);
}
}
std::mem::swap(&mut self.still_pending, &mut pending);

let has_pending = !pending.is_empty();
self.input.has_pending.store(has_pending, Ordering::SeqCst); // TODO: relax ordering?
std::mem::swap(&mut self.still_pending, pending);
}
}

impl<S> BasicMixer<S>
where
S: Sample + Send + 'static,
{
fn sum_current_sources(&mut self) -> S {
let mut sum = S::zero_value();

Expand Down
41 changes: 33 additions & 8 deletions src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::any::Any;
use std::io::{Read, Seek};
use std::marker::Sync;
use std::sync::{Arc, Weak};
Expand All @@ -10,12 +11,12 @@ use crate::source::Source;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{Sample, SupportedStreamConfig};

/// `cpal::Stream` container. Also see the more useful `OutputStreamHandle`.
/// Hosted stream container, usually a wrapper around `cpal::Stream`. Also see the more useful `OutputStreamHandle`.
///
/// If this is dropped playback will end & attached `OutputStreamHandle`s will no longer work.
pub struct OutputStream {
pub struct OutputStream<S: ?Sized = cpal::Stream> {
mixer: Arc<DynamicMixerController<f32>>,
_stream: cpal::Stream,
_stream: S,
}

/// More flexible handle to a `OutputStream` that provides playback.
Expand Down Expand Up @@ -44,11 +45,7 @@ impl OutputStream {
) -> Result<(Self, OutputStreamHandle), StreamError> {
let (mixer, _stream) = device.try_new_output_stream_config(config)?;
_stream.play()?;
let out = Self { mixer, _stream };
let handle = OutputStreamHandle {
mixer: Arc::downgrade(&out.mixer),
};
Ok((out, handle))
Ok(Self::new(mixer, _stream))
}

/// Return a new stream & handle using the default output device.
Expand All @@ -75,6 +72,34 @@ impl OutputStream {
}
}

impl<T> OutputStream<T> {
/// Create a new `OutputStream` from a mixer controller, plus any data that should be dropped when this stream is dropped.
///
/// This can be used to create an output stream that writes to an intermediate mixer.
pub fn new(
mixer: Arc<DynamicMixerController<f32>>,
extra_data: T,
) -> (Self, OutputStreamHandle) {
let out = Self {
mixer,
_stream: extra_data,
};
let handle = OutputStreamHandle {
mixer: Arc::downgrade(&out.mixer),
};
(out, handle)
}
}

impl<T> OutputStream<T>
where
T: 'static,
{
pub fn into_any(self) -> Box<OutputStream<dyn Any>> {
Box::new(self) as _
}
}

impl OutputStreamHandle {
/// Plays a source with a device until it ends.
pub fn play_raw<S>(&self, source: S) -> Result<(), PlayError>
Expand Down