From fff9b9b0bb236b291bdc6ad9b3b3eed61dc827ef Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Sat, 27 Sep 2025 23:48:24 +0200 Subject: [PATCH 01/11] feat: make Stream implement Send on macOS --- CHANGELOG.md | 1 + src/host/coreaudio/macos/device.rs | 143 +++++++++++----------- src/host/coreaudio/macos/enumerate.rs | 3 - src/host/coreaudio/macos/mod.rs | 164 ++++++++++++++++++++++++-- 4 files changed, 222 insertions(+), 89 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dd5d47ac0..e500b0362 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ - CoreAudio: Update `mach2` to 0.5. - CoreAudio: Configure device buffer to ensure predictable callback buffer sizes. - CoreAudio: Fix timestamp accuracy. +- CoreAudio: Make `Stream` implement `Send` on macOS by refactoring internal synchronization. - Emscripten: Add `BufferSize::Fixed` validation against supported range. - iOS: Fix example by properly activating audio session. - iOS: Add complete AVAudioSession integration for device enumeration and buffer size control. diff --git a/src/host/coreaudio/macos/device.rs b/src/host/coreaudio/macos/device.rs index 0d5150fcd..501b5b083 100644 --- a/src/host/coreaudio/macos/device.rs +++ b/src/host/coreaudio/macos/device.rs @@ -18,13 +18,12 @@ use objc2_audio_toolbox::{ }; use objc2_core_audio::{ kAudioDevicePropertyAvailableNominalSampleRates, kAudioDevicePropertyBufferFrameSize, - kAudioDevicePropertyBufferFrameSizeRange, kAudioDevicePropertyDeviceIsAlive, - kAudioDevicePropertyNominalSampleRate, kAudioDevicePropertyStreamConfiguration, - kAudioDevicePropertyStreamFormat, kAudioObjectPropertyElementMaster, - kAudioObjectPropertyScopeGlobal, kAudioObjectPropertyScopeInput, - kAudioObjectPropertyScopeOutput, AudioDeviceID, AudioObjectGetPropertyData, - AudioObjectGetPropertyDataSize, AudioObjectID, AudioObjectPropertyAddress, - AudioObjectPropertyScope, AudioObjectSetPropertyData, + kAudioDevicePropertyBufferFrameSizeRange, kAudioDevicePropertyNominalSampleRate, + kAudioDevicePropertyStreamConfiguration, kAudioDevicePropertyStreamFormat, + kAudioObjectPropertyElementMaster, kAudioObjectPropertyScopeGlobal, + kAudioObjectPropertyScopeInput, kAudioObjectPropertyScopeOutput, AudioDeviceID, + AudioObjectGetPropertyData, AudioObjectGetPropertyDataSize, AudioObjectID, + AudioObjectPropertyAddress, AudioObjectPropertyScope, AudioObjectSetPropertyData, }; use objc2_core_audio_types::{ AudioBuffer, AudioBufferList, AudioStreamBasicDescription, AudioValueRange, @@ -36,7 +35,6 @@ pub use super::enumerate::{ use std::fmt; use std::mem::{self}; use std::ptr::{null, NonNull}; -use std::rc::Rc; use std::slice; use std::sync::mpsc::{channel, RecvTimeoutError}; use std::sync::{Arc, Mutex}; @@ -253,45 +251,6 @@ fn get_io_buffer_frame_size_range( }) } -/// Register the on-disconnect callback. -/// This will both stop the stream and call the error callback with DeviceNotAvailable. -/// This function should only be called once per stream. -fn add_disconnect_listener( - stream: &Stream, - error_callback: Arc>, -) -> Result<(), BuildStreamError> -where - E: FnMut(StreamError) + Send + 'static, -{ - let stream_inner_weak = Rc::downgrade(&stream.inner); - let mut stream_inner = stream.inner.borrow_mut(); - stream_inner._disconnect_listener = Some(AudioObjectPropertyListener::new( - stream_inner.device_id, - AudioObjectPropertyAddress { - mSelector: kAudioDevicePropertyDeviceIsAlive, - mScope: kAudioObjectPropertyScopeGlobal, - mElement: kAudioObjectPropertyElementMaster, - }, - move || { - if let Some(stream_inner_strong) = stream_inner_weak.upgrade() { - match stream_inner_strong.try_borrow_mut() { - Ok(mut stream_inner) => { - let _ = stream_inner.pause(); - } - Err(_) => { - // Could not acquire mutable borrow. This can occur if there are - // overlapping borrows, if the stream is already in use, or if a panic - // occurred during a previous borrow. Still notify about device - // disconnection even if we can't pause. - } - } - (error_callback.lock().unwrap())(StreamError::DeviceNotAvailable); - } - }, - )?); - Ok(()) -} - impl DeviceTrait for Device { type SupportedInputConfigs = SupportedInputConfigs; type SupportedOutputConfigs = SupportedOutputConfigs; @@ -750,21 +709,38 @@ impl Device { Ok(()) })?; - let stream = Stream::new(StreamInner { - playing: true, - _disconnect_listener: None, - audio_unit, - device_id: self.audio_device_id, - _loopback_device: loopback_aggregate, - }); - - // If we didn't request the default device, stop the stream if the - // device disconnects. - if !is_default_device(self) { - add_disconnect_listener(&stream, error_callback_disconnect)?; - } + // Create error callback for stream - either dummy or real based on device type + let error_callback_for_stream: super::ErrorCallback = if is_default_device(self) { + Box::new(|_: StreamError| {}) + } else { + let error_callback_clone = error_callback_disconnect.clone(); + Box::new(move |err: StreamError| { + if let Ok(mut cb) = error_callback_clone.lock() { + cb(err); + } + }) + }; + + let stream = Stream::new( + StreamInner { + playing: true, + audio_unit, + device_id: self.audio_device_id, + _loopback_device: loopback_aggregate, + }, + error_callback_for_stream, + )?; - stream.inner.borrow_mut().audio_unit.start()?; + stream + .inner + .lock() + .map_err(|_| BuildStreamError::BackendSpecific { + err: BackendSpecificError { + description: "Failed to acquire stream lock".to_string(), + }, + })? + .audio_unit + .start()?; Ok(stream) } @@ -838,21 +814,38 @@ impl Device { Ok(()) })?; - let stream = Stream::new(StreamInner { - playing: true, - _disconnect_listener: None, - audio_unit, - device_id: self.audio_device_id, - _loopback_device: None, - }); - - // If we didn't request the default device, stop the stream if the - // device disconnects. - if !is_default_device(self) { - add_disconnect_listener(&stream, error_callback_disconnect)?; - } + // Create error callback for stream - either dummy or real based on device type + let error_callback_for_stream: super::ErrorCallback = if is_default_device(self) { + Box::new(|_: StreamError| {}) + } else { + let error_callback_clone = error_callback_disconnect.clone(); + Box::new(move |err: StreamError| { + if let Ok(mut cb) = error_callback_clone.lock() { + cb(err); + } + }) + }; + + let stream = Stream::new( + StreamInner { + playing: true, + audio_unit, + device_id: self.audio_device_id, + _loopback_device: None, + }, + error_callback_for_stream, + )?; - stream.inner.borrow_mut().audio_unit.start()?; + stream + .inner + .lock() + .map_err(|_| BuildStreamError::BackendSpecific { + err: BackendSpecificError { + description: "Failed to acquire stream lock".to_string(), + }, + })? + .audio_unit + .start()?; Ok(stream) } diff --git a/src/host/coreaudio/macos/enumerate.rs b/src/host/coreaudio/macos/enumerate.rs index 806044883..4f337352f 100644 --- a/src/host/coreaudio/macos/enumerate.rs +++ b/src/host/coreaudio/macos/enumerate.rs @@ -73,9 +73,6 @@ impl Devices { } } -unsafe impl Send for Devices {} -unsafe impl Sync for Devices {} - impl Iterator for Devices { type Item = Device; fn next(&mut self) -> Option { diff --git a/src/host/coreaudio/macos/mod.rs b/src/host/coreaudio/macos/mod.rs index e29d05768..1a9be759c 100644 --- a/src/host/coreaudio/macos/mod.rs +++ b/src/host/coreaudio/macos/mod.rs @@ -7,11 +7,14 @@ use crate::traits::{HostTrait, StreamTrait}; use crate::{BackendSpecificError, DevicesError, PauseStreamError, PlayStreamError}; use coreaudio::audio_unit::AudioUnit; use objc2_core_audio::AudioDeviceID; -use std::cell::RefCell; -use std::rc::Rc; +use std::sync::{Arc, Mutex, Weak}; pub use self::enumerate::{default_input_device, default_output_device, Devices}; +use objc2_core_audio::{ + kAudioDevicePropertyDeviceIsAlive, kAudioObjectPropertyElementMain, + kAudioObjectPropertyScopeGlobal, AudioObjectPropertyAddress, +}; use property_listener::AudioObjectPropertyListener; mod device; @@ -52,11 +55,85 @@ impl HostTrait for Host { } } +/// Type alias for the error callback to reduce complexity +type ErrorCallback = Box; + +/// Manages device disconnection listener separately from StreamInner to allow Stream to be Send. +/// The AudioObjectPropertyListener contains non-Send closures, so we keep it separate and +/// manage its lifetime independently. +struct DisconnectManager { + _listener: AudioObjectPropertyListener, +} + +impl DisconnectManager { + /// Create a new DisconnectManager that monitors device disconnection + /// + /// # Safety + /// + /// The caller must ensure: + /// - `device_id` is a valid AudioDeviceID + /// - The error callback is Send + 'static + fn new( + device_id: AudioDeviceID, + stream_weak: Weak>, + error_callback: Arc>, + ) -> Result, crate::BuildStreamError> { + let property_address = AudioObjectPropertyAddress { + mSelector: kAudioDevicePropertyDeviceIsAlive, + mScope: kAudioObjectPropertyScopeGlobal, + mElement: kAudioObjectPropertyElementMain, + }; + + let callback = { + let stream_weak = stream_weak.clone(); + let error_callback = error_callback.clone(); + move || { + // Check if stream still exists + if let Some(stream_arc) = stream_weak.upgrade() { + // First, try to pause the stream to stop playback + match stream_arc.lock() { + Ok(mut stream_inner) => { + let _ = stream_inner.pause(); + } + Err(_) => { + // Could not acquire lock. This can occur if there are + // overlapping locks, if the stream is already in use, or if a panic + // occurred during a previous lock. Still notify about device + // disconnection even if we can't pause. + } + } + + // Call the error callback to notify about device disconnection + if let Ok(mut cb) = error_callback.lock() { + cb(crate::StreamError::DeviceNotAvailable); + } + } + } + }; + + let listener = AudioObjectPropertyListener::new(device_id, property_address, callback) + .map_err(|e| { + let description = format!("Failed to create disconnect listener: {e}"); + crate::BuildStreamError::BackendSpecific { + err: crate::BackendSpecificError { description }, + } + })?; + + Ok(Arc::new(DisconnectManager { + _listener: listener, + })) + } +} + +// SAFETY: DisconnectManager is only used to keep the AudioObjectPropertyListener alive +// and is never actually accessed across threads. The AudioObjectPropertyListener +// itself handles thread safety for the CoreAudio callback internally. +unsafe impl Send for DisconnectManager {} +unsafe impl Sync for DisconnectManager {} + struct StreamInner { playing: bool, audio_unit: AudioUnit, - /// Manage the lifetime of the closure that handles device disconnection. - _disconnect_listener: Option, // Track the device with which the audio unit was spawned. // // We must do this so that we can avoid changing the device sample rate if there is already @@ -96,26 +173,54 @@ impl StreamInner { #[derive(Clone)] pub struct Stream { - inner: Rc>, + inner: Arc>, + // Manages the device disconnection listener separately to allow Stream to be Send. + // The DisconnectManager contains the non-Send AudioObjectPropertyListener. + _disconnect_manager: Arc, } impl Stream { - fn new(inner: StreamInner) -> Self { - Self { - inner: Rc::new(RefCell::new(inner)), - } + fn new( + inner: StreamInner, + error_callback: ErrorCallback, + ) -> Result { + let device_id = inner.device_id; + let inner_arc = Arc::new(Mutex::new(inner)); + let weak_inner = Arc::downgrade(&inner_arc); + + let error_callback = Arc::new(Mutex::new(error_callback)); + let disconnect_manager = DisconnectManager::new(device_id, weak_inner, error_callback)?; + + Ok(Self { + inner: inner_arc, + _disconnect_manager: disconnect_manager, + }) } } impl StreamTrait for Stream { fn play(&self) -> Result<(), PlayStreamError> { - let mut stream = self.inner.borrow_mut(); + let mut stream = self + .inner + .lock() + .map_err(|_| PlayStreamError::BackendSpecific { + err: BackendSpecificError { + description: "Failed to acquire stream lock".to_string(), + }, + })?; stream.play() } fn pause(&self) -> Result<(), PauseStreamError> { - let mut stream = self.inner.borrow_mut(); + let mut stream = self + .inner + .lock() + .map_err(|_| PauseStreamError::BackendSpecific { + err: BackendSpecificError { + description: "Failed to acquire stream lock".to_string(), + }, + })?; stream.pause() } @@ -123,6 +228,43 @@ impl StreamTrait for Stream { #[cfg(test)] mod test { + #[test] + fn test_stream_thread_transfer() { + let host = default_host(); + let device = host.default_output_device().unwrap(); + + let mut supported_configs_range = device.supported_output_configs().unwrap(); + let supported_config = supported_configs_range + .next() + .unwrap() + .with_max_sample_rate(); + let config = supported_config.config(); + + let stream = device + .build_output_stream( + &config, + write_silence::, + move |err| println!("Error: {err}"), + None, + ) + .unwrap(); + + // Move stream to another thread and back - this should compile and work + let handle = std::thread::spawn(move || { + // Stream is now owned by this thread + stream.play().unwrap(); + std::thread::sleep(std::time::Duration::from_millis(100)); + stream.pause().unwrap(); + stream // Return stream back to main thread + }); + + let stream = handle.join().unwrap(); + // Stream is back in main thread + stream.play().unwrap(); + std::thread::sleep(std::time::Duration::from_millis(100)); + stream.pause().unwrap(); + } + use crate::{ default_host, traits::{DeviceTrait, HostTrait, StreamTrait}, From ac4624a444364ea70c0228b29352a4045b22e11a Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Sun, 28 Sep 2025 20:18:01 +0200 Subject: [PATCH 02/11] refactor: move disconnect handling to dedicated thread --- src/host/coreaudio/macos/mod.rs | 94 ++++++++++++++++----------------- 1 file changed, 47 insertions(+), 47 deletions(-) diff --git a/src/host/coreaudio/macos/mod.rs b/src/host/coreaudio/macos/mod.rs index 1a9be759c..3d0ce6345 100644 --- a/src/host/coreaudio/macos/mod.rs +++ b/src/host/coreaudio/macos/mod.rs @@ -7,7 +7,7 @@ use crate::traits::{HostTrait, StreamTrait}; use crate::{BackendSpecificError, DevicesError, PauseStreamError, PlayStreamError}; use coreaudio::audio_unit::AudioUnit; use objc2_core_audio::AudioDeviceID; -use std::sync::{Arc, Mutex, Weak}; +use std::sync::{mpsc, Arc, Mutex, Weak}; pub use self::enumerate::{default_input_device, default_output_device, Devices}; @@ -58,38 +58,50 @@ impl HostTrait for Host { /// Type alias for the error callback to reduce complexity type ErrorCallback = Box; -/// Manages device disconnection listener separately from StreamInner to allow Stream to be Send. -/// The AudioObjectPropertyListener contains non-Send closures, so we keep it separate and -/// manage its lifetime independently. +/// Manages device disconnection listener on a dedicated thread to ensure the +/// AudioObjectPropertyListener is always created and dropped on the same thread. +/// This avoids potential threading issues with CoreAudio APIs. struct DisconnectManager { - _listener: AudioObjectPropertyListener, + _shutdown_tx: mpsc::Sender<()>, } impl DisconnectManager { - /// Create a new DisconnectManager that monitors device disconnection - /// - /// # Safety - /// - /// The caller must ensure: - /// - `device_id` is a valid AudioDeviceID - /// - The error callback is Send + 'static + /// Create a new DisconnectManager that monitors device disconnection on a dedicated thread fn new( device_id: AudioDeviceID, stream_weak: Weak>, error_callback: Arc>, - ) -> Result, crate::BuildStreamError> { - let property_address = AudioObjectPropertyAddress { - mSelector: kAudioDevicePropertyDeviceIsAlive, - mScope: kAudioObjectPropertyScopeGlobal, - mElement: kAudioObjectPropertyElementMain, - }; - - let callback = { - let stream_weak = stream_weak.clone(); - let error_callback = error_callback.clone(); - move || { + ) -> Result { + let (shutdown_tx, shutdown_rx) = mpsc::channel(); + let (disconnect_tx, disconnect_rx) = mpsc::channel(); + + // Spawn dedicated thread to own the AudioObjectPropertyListener + let disconnect_tx_clone = disconnect_tx.clone(); + std::thread::spawn(move || { + let property_address = AudioObjectPropertyAddress { + mSelector: kAudioDevicePropertyDeviceIsAlive, + mScope: kAudioObjectPropertyScopeGlobal, + mElement: kAudioObjectPropertyElementMain, + }; + + // Create the listener on this dedicated thread + let _listener = + AudioObjectPropertyListener::new(device_id, property_address, move || { + let _ = disconnect_tx_clone.send(()); + }) + .unwrap(); + + // Drop the listener on this thread after receiving a shutdown signal + let _ = shutdown_rx.recv(); + }); + + // Handle disconnect events on the main thread pool + let stream_weak_clone = stream_weak.clone(); + let error_callback_clone = error_callback.clone(); + std::thread::spawn(move || { + while disconnect_rx.recv().is_ok() { // Check if stream still exists - if let Some(stream_arc) = stream_weak.upgrade() { + if let Some(stream_arc) = stream_weak_clone.upgrade() { // First, try to pause the stream to stop playback match stream_arc.lock() { Ok(mut stream_inner) => { @@ -104,33 +116,22 @@ impl DisconnectManager { } // Call the error callback to notify about device disconnection - if let Ok(mut cb) = error_callback.lock() { + if let Ok(mut cb) = error_callback_clone.lock() { cb(crate::StreamError::DeviceNotAvailable); } + } else { + // Stream is gone, exit the handler thread + break; } } - }; - - let listener = AudioObjectPropertyListener::new(device_id, property_address, callback) - .map_err(|e| { - let description = format!("Failed to create disconnect listener: {e}"); - crate::BuildStreamError::BackendSpecific { - err: crate::BackendSpecificError { description }, - } - })?; + }); - Ok(Arc::new(DisconnectManager { - _listener: listener, - })) + Ok(DisconnectManager { + _shutdown_tx: shutdown_tx, + }) } } -// SAFETY: DisconnectManager is only used to keep the AudioObjectPropertyListener alive -// and is never actually accessed across threads. The AudioObjectPropertyListener -// itself handles thread safety for the CoreAudio callback internally. -unsafe impl Send for DisconnectManager {} -unsafe impl Sync for DisconnectManager {} - struct StreamInner { playing: bool, audio_unit: AudioUnit, @@ -171,12 +172,11 @@ impl StreamInner { } } -#[derive(Clone)] pub struct Stream { inner: Arc>, // Manages the device disconnection listener separately to allow Stream to be Send. // The DisconnectManager contains the non-Send AudioObjectPropertyListener. - _disconnect_manager: Arc, + _disconnect_manager: DisconnectManager, } impl Stream { @@ -205,7 +205,7 @@ impl StreamTrait for Stream { .lock() .map_err(|_| PlayStreamError::BackendSpecific { err: BackendSpecificError { - description: "Failed to acquire stream lock".to_string(), + description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(), }, })?; @@ -218,7 +218,7 @@ impl StreamTrait for Stream { .lock() .map_err(|_| PauseStreamError::BackendSpecific { err: BackendSpecificError { - description: "Failed to acquire stream lock".to_string(), + description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(), }, })?; From e8e8a194aa92127c53cba34e16dd5aa28def3585 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Sun, 28 Sep 2025 20:43:15 +0200 Subject: [PATCH 03/11] docs: note removal of Clone from Stream --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e500b0362..879c3dc57 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ - CoreAudio: Configure device buffer to ensure predictable callback buffer sizes. - CoreAudio: Fix timestamp accuracy. - CoreAudio: Make `Stream` implement `Send` on macOS by refactoring internal synchronization. +- CoreAudio: Remove `Clone` impl from `Stream`. - Emscripten: Add `BufferSize::Fixed` validation against supported range. - iOS: Fix example by properly activating audio session. - iOS: Add complete AVAudioSession integration for device enumeration and buffer size control. From b13827a05adf562f70e645d649330c9c779856ad Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Sun, 28 Sep 2025 21:01:33 +0200 Subject: [PATCH 04/11] feat: make CoreAudio Stream Send on iOS --- CHANGELOG.md | 2 +- src/host/coreaudio/ios/mod.rs | 24 ++++++++++++---- src/host/coreaudio/mod.rs | 53 +++++++++++++++++++++++++++++++++++ 3 files changed, 73 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 879c3dc57..f6646ab37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,7 +22,7 @@ - CoreAudio: Update `mach2` to 0.5. - CoreAudio: Configure device buffer to ensure predictable callback buffer sizes. - CoreAudio: Fix timestamp accuracy. -- CoreAudio: Make `Stream` implement `Send` on macOS by refactoring internal synchronization. +- CoreAudio: Make `Stream` implement `Send`. - CoreAudio: Remove `Clone` impl from `Stream`. - Emscripten: Add `BufferSize::Fixed` validation against supported range. - iOS: Fix example by properly activating audio session. diff --git a/src/host/coreaudio/ios/mod.rs b/src/host/coreaudio/ios/mod.rs index a52914410..521cfb376 100644 --- a/src/host/coreaudio/ios/mod.rs +++ b/src/host/coreaudio/ios/mod.rs @@ -1,6 +1,6 @@ //! CoreAudio implementation for iOS using AVAudioSession and RemoteIO Audio Units. -use std::cell::RefCell; +use std::sync::Mutex; use coreaudio::audio_unit::render_callback::data; use coreaudio::audio_unit::{render_callback, AudioUnit, Element, Scope}; @@ -212,20 +212,27 @@ impl DeviceTrait for Device { } pub struct Stream { - inner: RefCell, + inner: Mutex, } impl Stream { fn new(inner: StreamInner) -> Self { Self { - inner: RefCell::new(inner), + inner: Mutex::new(inner), } } } impl StreamTrait for Stream { fn play(&self) -> Result<(), PlayStreamError> { - let mut stream = self.inner.borrow_mut(); + let mut stream = self + .inner + .lock() + .map_err(|_| PlayStreamError::BackendSpecific { + err: BackendSpecificError { + description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(), + }, + })?; if !stream.playing { if let Err(e) = stream.audio_unit.start() { @@ -239,7 +246,14 @@ impl StreamTrait for Stream { } fn pause(&self) -> Result<(), PauseStreamError> { - let mut stream = self.inner.borrow_mut(); + let mut stream = self + .inner + .lock() + .map_err(|_| PauseStreamError::BackendSpecific { + err: BackendSpecificError { + description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(), + }, + })?; if stream.playing { if let Err(e) = stream.audio_unit.stop() { diff --git a/src/host/coreaudio/mod.rs b/src/host/coreaudio/mod.rs index ab4950c82..b98040036 100644 --- a/src/host/coreaudio/mod.rs +++ b/src/host/coreaudio/mod.rs @@ -126,3 +126,56 @@ impl From for DefaultStreamConfigError { } pub(crate) type OSStatus = i32; + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + default_host, + traits::{DeviceTrait, HostTrait, StreamTrait}, + Sample, + }; + + #[test] + fn test_stream_thread_transfer() { + let host = default_host(); + let device = host.default_output_device().unwrap(); + + let mut supported_configs_range = device.supported_output_configs().unwrap(); + let supported_config = supported_configs_range + .next() + .unwrap() + .with_max_sample_rate(); + let config = supported_config.config(); + + let stream = device + .build_output_stream( + &config, + write_silence::, + move |err| println!("Error: {err}"), + None, + ) + .unwrap(); + + // Move stream to another thread and back - this should compile and work + let handle = std::thread::spawn(move || { + // Stream is now owned by this thread + stream.play().unwrap(); + std::thread::sleep(std::time::Duration::from_millis(100)); + stream.pause().unwrap(); + stream // Return stream back to main thread + }); + + let stream = handle.join().unwrap(); + // Stream is back in main thread + stream.play().unwrap(); + std::thread::sleep(std::time::Duration::from_millis(100)); + stream.pause().unwrap(); + } + + fn write_silence(data: &mut [T], _: &crate::OutputCallbackInfo) { + for sample in data.iter_mut() { + *sample = Sample::EQUILIBRIUM; + } + } +} From d7ef5a0f0a5b7b37bc39abb892130ad1ac87f571 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Sun, 28 Sep 2025 21:04:51 +0200 Subject: [PATCH 05/11] refactor: remove unused import in coreaudio tests --- src/host/coreaudio/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/host/coreaudio/mod.rs b/src/host/coreaudio/mod.rs index b98040036..7390967f2 100644 --- a/src/host/coreaudio/mod.rs +++ b/src/host/coreaudio/mod.rs @@ -129,7 +129,6 @@ pub(crate) type OSStatus = i32; #[cfg(test)] mod tests { - use super::*; use crate::{ default_host, traits::{DeviceTrait, HostTrait, StreamTrait}, From 3a7bfa074c30a85fca7b16a3fc7d5bab7dfc3837 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Mon, 6 Oct 2025 22:02:13 +0200 Subject: [PATCH 06/11] refactor: improve error callback handling and disconnect listener robustness - Use try_lock to avoid panics and recover from poisoned locks when invoking error callbacks - Skip error callback if lock is busy - Enhance disconnect listener thread to report creation errors and ensure proper synchronization - Update stream lock error message for clarity --- src/host/coreaudio/macos/device.rs | 50 +++++++++++++++++++---- src/host/coreaudio/macos/mod.rs | 65 ++++++++++++++++++++---------- 2 files changed, 86 insertions(+), 29 deletions(-) diff --git a/src/host/coreaudio/macos/device.rs b/src/host/coreaudio/macos/device.rs index 501b5b083..49a949cd5 100644 --- a/src/host/coreaudio/macos/device.rs +++ b/src/host/coreaudio/macos/device.rs @@ -686,7 +686,17 @@ impl Device { let callback = match host_time_to_stream_instant(args.time_stamp.mHostTime) { Err(err) => { - (error_callback.lock().unwrap())(err.into()); + // Try to invoke error callback, recovering from poison if needed + match error_callback.try_lock() { + Ok(mut cb) => cb(err.into()), + Err(std::sync::TryLockError::Poisoned(guard)) => { + // Recover from poisoned lock to still report this error + guard.into_inner()(err.into()); + } + Err(std::sync::TryLockError::WouldBlock) => { + // Skip if callback is busy + } + } return Err(()); } Ok(cb) => cb, @@ -715,8 +725,15 @@ impl Device { } else { let error_callback_clone = error_callback_disconnect.clone(); Box::new(move |err: StreamError| { - if let Ok(mut cb) = error_callback_clone.lock() { - cb(err); + match error_callback_clone.try_lock() { + Ok(mut cb) => cb(err), + Err(std::sync::TryLockError::Poisoned(guard)) => { + // Recover from poisoned lock to still report this error + guard.into_inner()(err); + } + Err(std::sync::TryLockError::WouldBlock) => { + // Skip if callback is busy + } } }) }; @@ -736,7 +753,7 @@ impl Device { .lock() .map_err(|_| BuildStreamError::BackendSpecific { err: BackendSpecificError { - description: "Failed to acquire stream lock".to_string(), + description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(), }, })? .audio_unit @@ -791,7 +808,17 @@ impl Device { let callback = match host_time_to_stream_instant(args.time_stamp.mHostTime) { Err(err) => { - (error_callback.lock().unwrap())(err.into()); + // Try to invoke error callback, recovering from poison if needed + match error_callback.try_lock() { + Ok(mut cb) => cb(err.into()), + Err(std::sync::TryLockError::Poisoned(guard)) => { + // Recover from poisoned lock to still report this error + guard.into_inner()(err.into()); + } + Err(std::sync::TryLockError::WouldBlock) => { + // Skip if callback is busy + } + } return Err(()); } Ok(cb) => cb, @@ -820,8 +847,15 @@ impl Device { } else { let error_callback_clone = error_callback_disconnect.clone(); Box::new(move |err: StreamError| { - if let Ok(mut cb) = error_callback_clone.lock() { - cb(err); + match error_callback_clone.try_lock() { + Ok(mut cb) => cb(err), + Err(std::sync::TryLockError::Poisoned(guard)) => { + // Recover from poisoned lock to still report this error + guard.into_inner()(err); + } + Err(std::sync::TryLockError::WouldBlock) => { + // Skip if callback is busy + } } }) }; @@ -841,7 +875,7 @@ impl Device { .lock() .map_err(|_| BuildStreamError::BackendSpecific { err: BackendSpecificError { - description: "Failed to acquire stream lock".to_string(), + description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(), }, })? .audio_unit diff --git a/src/host/coreaudio/macos/mod.rs b/src/host/coreaudio/macos/mod.rs index 3d0ce6345..d63870a9d 100644 --- a/src/host/coreaudio/macos/mod.rs +++ b/src/host/coreaudio/macos/mod.rs @@ -61,6 +61,12 @@ type ErrorCallback = Box; /// Manages device disconnection listener on a dedicated thread to ensure the /// AudioObjectPropertyListener is always created and dropped on the same thread. /// This avoids potential threading issues with CoreAudio APIs. +/// +/// When a device disconnects, this manager: +/// 1. Attempts to pause the stream to stop audio I/O +/// 2. Calls the error callback with `StreamError::DeviceNotAvailable` +/// +/// The dedicated thread architecture ensures `Stream` can implement `Send`. struct DisconnectManager { _shutdown_tx: mpsc::Sender<()>, } @@ -74,6 +80,7 @@ impl DisconnectManager { ) -> Result { let (shutdown_tx, shutdown_rx) = mpsc::channel(); let (disconnect_tx, disconnect_rx) = mpsc::channel(); + let (ready_tx, ready_rx) = mpsc::channel(); // Spawn dedicated thread to own the AudioObjectPropertyListener let disconnect_tx_clone = disconnect_tx.clone(); @@ -85,16 +92,29 @@ impl DisconnectManager { }; // Create the listener on this dedicated thread - let _listener = - AudioObjectPropertyListener::new(device_id, property_address, move || { - let _ = disconnect_tx_clone.send(()); - }) - .unwrap(); - - // Drop the listener on this thread after receiving a shutdown signal - let _ = shutdown_rx.recv(); + match AudioObjectPropertyListener::new(device_id, property_address, move || { + let _ = disconnect_tx_clone.send(()); + }) { + Ok(_listener) => { + let _ = ready_tx.send(Ok(())); + // Drop the listener on this thread after receiving a shutdown signal + let _ = shutdown_rx.recv(); + } + Err(e) => { + let _ = ready_tx.send(Err(e)); + } + } }); + // Wait for listener creation to complete or fail + ready_rx + .recv() + .map_err(|_| crate::BuildStreamError::BackendSpecific { + err: BackendSpecificError { + description: "Disconnect listener thread terminated unexpectedly".to_string(), + }, + })??; + // Handle disconnect events on the main thread pool let stream_weak_clone = stream_weak.clone(); let error_callback_clone = error_callback.clone(); @@ -103,21 +123,24 @@ impl DisconnectManager { // Check if stream still exists if let Some(stream_arc) = stream_weak_clone.upgrade() { // First, try to pause the stream to stop playback - match stream_arc.lock() { - Ok(mut stream_inner) => { - let _ = stream_inner.pause(); - } - Err(_) => { - // Could not acquire lock. This can occur if there are - // overlapping locks, if the stream is already in use, or if a panic - // occurred during a previous lock. Still notify about device - // disconnection even if we can't pause. - } + if let Ok(mut stream_inner) = stream_arc.try_lock() { + let _ = stream_inner.pause(); } - // Call the error callback to notify about device disconnection - if let Ok(mut cb) = error_callback_clone.lock() { - cb(crate::StreamError::DeviceNotAvailable); + // Always try to notify about device disconnection + match error_callback_clone.try_lock() { + Ok(mut cb) => { + cb(crate::StreamError::DeviceNotAvailable); + } + Err(std::sync::TryLockError::WouldBlock) => { + // Error callback is being invoked - skip this notification + } + Err(std::sync::TryLockError::Poisoned(guard)) => { + // Error callback panicked - try to recover and still notify + // This is critical: device disconnected AND callback is broken + let mut cb = guard.into_inner(); + cb(crate::StreamError::DeviceNotAvailable); + } } } else { // Stream is gone, exit the handler thread From 5ac779cb3c86019c66981c5aed76a4b4e4aa4b0c Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Mon, 6 Oct 2025 22:42:24 +0200 Subject: [PATCH 07/11] refactor: move error callback handling into helper function Consolidate repeated error callback logic into invoke_error_callback for better readability and maintainability. Update usages in device.rs and disconnect manager. Add documentation for the helper. --- src/host/coreaudio/macos/device.rs | 63 ++++++------------------------ src/host/coreaudio/macos/mod.rs | 42 +++++++++++++------- 2 files changed, 40 insertions(+), 65 deletions(-) diff --git a/src/host/coreaudio/macos/device.rs b/src/host/coreaudio/macos/device.rs index 49a949cd5..154a4b1f1 100644 --- a/src/host/coreaudio/macos/device.rs +++ b/src/host/coreaudio/macos/device.rs @@ -40,6 +40,7 @@ use std::sync::mpsc::{channel, RecvTimeoutError}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; +use super::invoke_error_callback; use super::property_listener::AudioObjectPropertyListener; use coreaudio::audio_unit::macos_helpers::get_device_name; /// Attempt to set the device sample rate to the provided rate. @@ -669,16 +670,14 @@ impl Device { type Args = render_callback::Args; audio_unit.set_input_callback(move |args: Args| unsafe { - let ptr = (*args.data.data).mBuffers.as_ptr(); - let len = (*args.data.data).mNumberBuffers as usize; - let buffers: &[AudioBuffer] = slice::from_raw_parts(ptr, len); - - // TODO: Perhaps loop over all buffers instead? + // SAFETY: We configure the stream format as interleaved (via asbd_from_config which + // does not set kAudioFormatFlagIsNonInterleaved). Interleaved format always has + // exactly one buffer containing all channels, so mBuffers[0] is always valid. let AudioBuffer { mNumberChannels: channels, mDataByteSize: data_byte_size, mData: data, - } = buffers[0]; + } = (*args.data.data).mBuffers[0]; let data = data as *mut (); let len = data_byte_size as usize / bytes_per_channel; @@ -686,17 +685,7 @@ impl Device { let callback = match host_time_to_stream_instant(args.time_stamp.mHostTime) { Err(err) => { - // Try to invoke error callback, recovering from poison if needed - match error_callback.try_lock() { - Ok(mut cb) => cb(err.into()), - Err(std::sync::TryLockError::Poisoned(guard)) => { - // Recover from poisoned lock to still report this error - guard.into_inner()(err.into()); - } - Err(std::sync::TryLockError::WouldBlock) => { - // Skip if callback is busy - } - } + invoke_error_callback(&error_callback, err.into()); return Err(()); } Ok(cb) => cb, @@ -725,16 +714,7 @@ impl Device { } else { let error_callback_clone = error_callback_disconnect.clone(); Box::new(move |err: StreamError| { - match error_callback_clone.try_lock() { - Ok(mut cb) => cb(err), - Err(std::sync::TryLockError::Poisoned(guard)) => { - // Recover from poisoned lock to still report this error - guard.into_inner()(err); - } - Err(std::sync::TryLockError::WouldBlock) => { - // Skip if callback is busy - } - } + invoke_error_callback(&error_callback_clone, err); }) }; @@ -793,9 +773,9 @@ impl Device { type Args = render_callback::Args; audio_unit.set_render_callback(move |args: Args| unsafe { - // If `run()` is currently running, then a callback will be available from this list. - // Otherwise, we just fill the buffer with zeroes and return. - + // SAFETY: We configure the stream format as interleaved (via asbd_from_config which + // does not set kAudioFormatFlagIsNonInterleaved). Interleaved format always has + // exactly one buffer containing all channels, so mBuffers[0] is always valid. let AudioBuffer { mNumberChannels: channels, mDataByteSize: data_byte_size, @@ -808,17 +788,7 @@ impl Device { let callback = match host_time_to_stream_instant(args.time_stamp.mHostTime) { Err(err) => { - // Try to invoke error callback, recovering from poison if needed - match error_callback.try_lock() { - Ok(mut cb) => cb(err.into()), - Err(std::sync::TryLockError::Poisoned(guard)) => { - // Recover from poisoned lock to still report this error - guard.into_inner()(err.into()); - } - Err(std::sync::TryLockError::WouldBlock) => { - // Skip if callback is busy - } - } + invoke_error_callback(&error_callback, err.into()); return Err(()); } Ok(cb) => cb, @@ -847,16 +817,7 @@ impl Device { } else { let error_callback_clone = error_callback_disconnect.clone(); Box::new(move |err: StreamError| { - match error_callback_clone.try_lock() { - Ok(mut cb) => cb(err), - Err(std::sync::TryLockError::Poisoned(guard)) => { - // Recover from poisoned lock to still report this error - guard.into_inner()(err); - } - Err(std::sync::TryLockError::WouldBlock) => { - // Skip if callback is busy - } - } + invoke_error_callback(&error_callback_clone, err); }) }; diff --git a/src/host/coreaudio/macos/mod.rs b/src/host/coreaudio/macos/mod.rs index d63870a9d..eb44eb27a 100644 --- a/src/host/coreaudio/macos/mod.rs +++ b/src/host/coreaudio/macos/mod.rs @@ -58,6 +58,30 @@ impl HostTrait for Host { /// Type alias for the error callback to reduce complexity type ErrorCallback = Box; +/// Invoke error callback, recovering from poisoned mutex if needed. +/// Returns true if callback was invoked, false if skipped due to WouldBlock. +#[inline] +fn invoke_error_callback(error_callback: &Arc>, err: crate::StreamError) -> bool +where + E: FnMut(crate::StreamError) + Send, +{ + match error_callback.try_lock() { + Ok(mut cb) => { + cb(err); + true + } + Err(std::sync::TryLockError::Poisoned(guard)) => { + // Recover from poisoned lock to still report this error + guard.into_inner()(err); + true + } + Err(std::sync::TryLockError::WouldBlock) => { + // Skip if callback is busy + false + } + } +} + /// Manages device disconnection listener on a dedicated thread to ensure the /// AudioObjectPropertyListener is always created and dropped on the same thread. /// This avoids potential threading issues with CoreAudio APIs. @@ -128,20 +152,10 @@ impl DisconnectManager { } // Always try to notify about device disconnection - match error_callback_clone.try_lock() { - Ok(mut cb) => { - cb(crate::StreamError::DeviceNotAvailable); - } - Err(std::sync::TryLockError::WouldBlock) => { - // Error callback is being invoked - skip this notification - } - Err(std::sync::TryLockError::Poisoned(guard)) => { - // Error callback panicked - try to recover and still notify - // This is critical: device disconnected AND callback is broken - let mut cb = guard.into_inner(); - cb(crate::StreamError::DeviceNotAvailable); - } - } + invoke_error_callback( + &error_callback_clone, + crate::StreamError::DeviceNotAvailable, + ); } else { // Stream is gone, exit the handler thread break; From ea34010b0ec677b37caf44850fd4f462b926f617 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Fri, 10 Oct 2025 22:33:14 +0200 Subject: [PATCH 08/11] feat: add compile-time Send assertions for Stream types in all hosts Removes redundant thread transfer tests for CoreAudio streams. --- src/host/aaudio/mod.rs | 3 ++ src/host/alsa/mod.rs | 3 ++ src/host/asio/stream.rs | 3 ++ src/host/coreaudio/macos/mod.rs | 37 ----------------------- src/host/coreaudio/mod.rs | 53 ++------------------------------- src/host/emscripten/mod.rs | 3 ++ src/host/jack/stream.rs | 3 ++ src/host/mod.rs | 10 +++++++ src/host/null/mod.rs | 3 ++ src/host/wasapi/stream.rs | 3 ++ src/host/webaudio/mod.rs | 3 ++ 11 files changed, 36 insertions(+), 88 deletions(-) diff --git a/src/host/aaudio/mod.rs b/src/host/aaudio/mod.rs index 658568731..a1e48964b 100644 --- a/src/host/aaudio/mod.rs +++ b/src/host/aaudio/mod.rs @@ -49,6 +49,9 @@ pub enum Stream { // TODO: Is this still in-progress? https://github.com/rust-mobile/ndk/pull/497 unsafe impl Send for Stream {} +// Compile-time assertion that Stream is Send +crate::assert_stream_send!(Stream); + pub type SupportedInputConfigs = VecIntoIter; pub type SupportedOutputConfigs = VecIntoIter; pub type Devices = VecIntoIter; diff --git a/src/host/alsa/mod.rs b/src/host/alsa/mod.rs index f270777ef..075a26e9b 100644 --- a/src/host/alsa/mod.rs +++ b/src/host/alsa/mod.rs @@ -624,6 +624,9 @@ pub struct Stream { trigger: TriggerSender, } +// Compile-time assertion that Stream is Send +crate::assert_stream_send!(Stream); + struct StreamWorkerContext { descriptors: Box<[libc::pollfd]>, transfer_buffer: Box<[u8]>, diff --git a/src/host/asio/stream.rs b/src/host/asio/stream.rs index 5539316a1..b2f76b9ae 100644 --- a/src/host/asio/stream.rs +++ b/src/host/asio/stream.rs @@ -22,6 +22,9 @@ pub struct Stream { callback_id: sys::CallbackId, } +// Compile-time assertion that Stream is Send +crate::assert_stream_send!(Stream); + impl Stream { pub fn play(&self) -> Result<(), PlayStreamError> { self.playing.store(true, Ordering::SeqCst); diff --git a/src/host/coreaudio/macos/mod.rs b/src/host/coreaudio/macos/mod.rs index eb44eb27a..a7a025166 100644 --- a/src/host/coreaudio/macos/mod.rs +++ b/src/host/coreaudio/macos/mod.rs @@ -265,43 +265,6 @@ impl StreamTrait for Stream { #[cfg(test)] mod test { - #[test] - fn test_stream_thread_transfer() { - let host = default_host(); - let device = host.default_output_device().unwrap(); - - let mut supported_configs_range = device.supported_output_configs().unwrap(); - let supported_config = supported_configs_range - .next() - .unwrap() - .with_max_sample_rate(); - let config = supported_config.config(); - - let stream = device - .build_output_stream( - &config, - write_silence::, - move |err| println!("Error: {err}"), - None, - ) - .unwrap(); - - // Move stream to another thread and back - this should compile and work - let handle = std::thread::spawn(move || { - // Stream is now owned by this thread - stream.play().unwrap(); - std::thread::sleep(std::time::Duration::from_millis(100)); - stream.pause().unwrap(); - stream // Return stream back to main thread - }); - - let stream = handle.join().unwrap(); - // Stream is back in main thread - stream.play().unwrap(); - std::thread::sleep(std::time::Duration::from_millis(100)); - stream.pause().unwrap(); - } - use crate::{ default_host, traits::{DeviceTrait, HostTrait, StreamTrait}, diff --git a/src/host/coreaudio/mod.rs b/src/host/coreaudio/mod.rs index 7390967f2..ed276eca0 100644 --- a/src/host/coreaudio/mod.rs +++ b/src/host/coreaudio/mod.rs @@ -127,54 +127,5 @@ impl From for DefaultStreamConfigError { pub(crate) type OSStatus = i32; -#[cfg(test)] -mod tests { - use crate::{ - default_host, - traits::{DeviceTrait, HostTrait, StreamTrait}, - Sample, - }; - - #[test] - fn test_stream_thread_transfer() { - let host = default_host(); - let device = host.default_output_device().unwrap(); - - let mut supported_configs_range = device.supported_output_configs().unwrap(); - let supported_config = supported_configs_range - .next() - .unwrap() - .with_max_sample_rate(); - let config = supported_config.config(); - - let stream = device - .build_output_stream( - &config, - write_silence::, - move |err| println!("Error: {err}"), - None, - ) - .unwrap(); - - // Move stream to another thread and back - this should compile and work - let handle = std::thread::spawn(move || { - // Stream is now owned by this thread - stream.play().unwrap(); - std::thread::sleep(std::time::Duration::from_millis(100)); - stream.pause().unwrap(); - stream // Return stream back to main thread - }); - - let stream = handle.join().unwrap(); - // Stream is back in main thread - stream.play().unwrap(); - std::thread::sleep(std::time::Duration::from_millis(100)); - stream.pause().unwrap(); - } - - fn write_silence(data: &mut [T], _: &crate::OutputCallbackInfo) { - for sample in data.iter_mut() { - *sample = Sample::EQUILIBRIUM; - } - } -} +// Compile-time assertion that Stream is Send +crate::assert_stream_send!(Stream); diff --git a/src/host/emscripten/mod.rs b/src/host/emscripten/mod.rs index 7a548746e..85280514b 100644 --- a/src/host/emscripten/mod.rs +++ b/src/host/emscripten/mod.rs @@ -34,6 +34,9 @@ pub struct Stream { audio_ctxt: AudioContext, } +// Compile-time assertion that Stream is Send +crate::assert_stream_send!(Stream); + // Index within the `streams` array of the events loop. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamId(usize); diff --git a/src/host/jack/stream.rs b/src/host/jack/stream.rs index 0d48530aa..a77c3240e 100644 --- a/src/host/jack/stream.rs +++ b/src/host/jack/stream.rs @@ -21,6 +21,9 @@ pub struct Stream { output_port_names: Vec, } +// Compile-time assertion that Stream is Send +crate::assert_stream_send!(Stream); + impl Stream { // TODO: Return error messages pub fn new_input( diff --git a/src/host/mod.rs b/src/host/mod.rs index 0c61a5910..1aecf93ab 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -28,3 +28,13 @@ pub(crate) mod null; pub(crate) mod wasapi; #[cfg(all(target_arch = "wasm32", feature = "wasm-bindgen"))] pub(crate) mod webaudio; + +/// Compile-time assertion that a type implements Send. +/// Use this macro in each host module to ensure Stream is Send. +#[macro_export] +macro_rules! assert_stream_send { + ($t:ty) => { + const fn _assert_stream_send() {} + const _: () = _assert_stream_send::<$t>(); + }; +} diff --git a/src/host/null/mod.rs b/src/host/null/mod.rs index 0a9752b70..16d83fb32 100644 --- a/src/host/null/mod.rs +++ b/src/host/null/mod.rs @@ -19,6 +19,9 @@ pub struct Host; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Stream; +// Compile-time assertion that Stream is Send +crate::assert_stream_send!(Stream); + #[derive(Clone)] pub struct SupportedInputConfigs; #[derive(Clone)] diff --git a/src/host/wasapi/stream.rs b/src/host/wasapi/stream.rs index 6b2d1c207..6aa8d1f00 100644 --- a/src/host/wasapi/stream.rs +++ b/src/host/wasapi/stream.rs @@ -31,6 +31,9 @@ pub struct Stream { pending_scheduled_event: Foundation::HANDLE, } +// Compile-time assertion that Stream is Send +crate::assert_stream_send!(Stream); + struct RunContext { // Streams that have been created in this event loop. stream: StreamInner, diff --git a/src/host/webaudio/mod.rs b/src/host/webaudio/mod.rs index b4d840cae..89801a4e1 100644 --- a/src/host/webaudio/mod.rs +++ b/src/host/webaudio/mod.rs @@ -31,6 +31,9 @@ pub struct Stream { buffer_size_frames: usize, } +// Compile-time assertion that Stream is Send +crate::assert_stream_send!(Stream); + pub type SupportedInputConfigs = ::std::vec::IntoIter; pub type SupportedOutputConfigs = ::std::vec::IntoIter; From ddfac5e95a1eb8be7939fa809559ffa4eaea0497 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Fri, 10 Oct 2025 22:58:03 +0200 Subject: [PATCH 09/11] feat: impl Send for Stream on WASAPI --- src/host/wasapi/stream.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/host/wasapi/stream.rs b/src/host/wasapi/stream.rs index 6aa8d1f00..81f73d53c 100644 --- a/src/host/wasapi/stream.rs +++ b/src/host/wasapi/stream.rs @@ -31,6 +31,10 @@ pub struct Stream { pending_scheduled_event: Foundation::HANDLE, } +// Windows Event HANDLEs are safe to send between threads - they are designed for synchronization. +// See: https://learn.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-createeventa +unsafe impl Send for Stream {} + // Compile-time assertion that Stream is Send crate::assert_stream_send!(Stream); From 684f8aa6dc6b94fe643eadc2401c11a938284e3d Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Fri, 10 Oct 2025 23:00:29 +0200 Subject: [PATCH 10/11] feat: impl Send for Stream on WASM --- src/host/emscripten/mod.rs | 3 +++ src/host/webaudio/mod.rs | 3 +++ 2 files changed, 6 insertions(+) diff --git a/src/host/emscripten/mod.rs b/src/host/emscripten/mod.rs index 85280514b..dafc041af 100644 --- a/src/host/emscripten/mod.rs +++ b/src/host/emscripten/mod.rs @@ -34,6 +34,9 @@ pub struct Stream { audio_ctxt: AudioContext, } +// WASM runs in a single-threaded environment, so Send is safe by design. +unsafe impl Send for Stream {} + // Compile-time assertion that Stream is Send crate::assert_stream_send!(Stream); diff --git a/src/host/webaudio/mod.rs b/src/host/webaudio/mod.rs index 89801a4e1..ec4ed560f 100644 --- a/src/host/webaudio/mod.rs +++ b/src/host/webaudio/mod.rs @@ -31,6 +31,9 @@ pub struct Stream { buffer_size_frames: usize, } +// WASM runs in a single-threaded environment, so Send is safe by design. +unsafe impl Send for Stream {} + // Compile-time assertion that Stream is Send crate::assert_stream_send!(Stream); From 4d23d9e9b1e3ac9caed8d831c547d3424c2b7bd4 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Sat, 11 Oct 2025 19:33:31 +0200 Subject: [PATCH 11/11] docs: update changelog for WASAPI and Wasm Stream Send impls --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f6646ab37..bf88d2465 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,7 +31,9 @@ - WASAPI: Expose `IMMDevice` from WASAPI host Device. - WASAPI: Add `I24` and `U24` sample format support (24-bit samples stored in 4 bytes). - WASAPI: Update `windows` to >= 0.58, <= 0.62. +- WASAPI: Make `Stream` implement `Send`. - Wasm: Removed optional `wee-alloc` feature for security reasons. +- Wasm: Make `Stream` implement `Send`. - WebAudio: Add `BufferSize::Fixed` validation against supported range. # Version 0.16.0 (2025-06-07)