Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 7 additions & 25 deletions examples/wgpu_room/src/sine_track.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,7 @@ pub struct SineParameters {

impl Default for SineParameters {
fn default() -> Self {
Self {
sample_rate: 48000,
freq: 440.0,
amplitude: 1.0,
num_channels: 2,
}
Self { sample_rate: 48000, freq: 440.0, amplitude: 1.0, num_channels: 2 }
}
}

Expand All @@ -46,7 +41,7 @@ impl SineTrack {
AudioSourceOptions::default(),
params.sample_rate,
params.num_channels,
None,
1000,
),
params,
room,
Expand All @@ -65,28 +60,18 @@ impl SineTrack {
RtcAudioSource::Native(self.rtc_source.clone()),
);

let task = tokio::spawn(Self::track_task(
close_rx,
self.rtc_source.clone(),
self.params.clone(),
));
let task =
tokio::spawn(Self::track_task(close_rx, self.rtc_source.clone(), self.params.clone()));

self.room
.local_participant()
.publish_track(
LocalTrack::Audio(track.clone()),
TrackPublishOptions {
source: TrackSource::Microphone,
..Default::default()
},
TrackPublishOptions { source: TrackSource::Microphone, ..Default::default() },
)
.await?;

let handle = TrackHandle {
close_tx,
track,
task,
};
let handle = TrackHandle { close_tx, track, task };

self.handle = Some(handle);
Ok(())
Expand All @@ -96,10 +81,7 @@ impl SineTrack {
if let Some(handle) = self.handle.take() {
handle.close_tx.send(()).ok();
handle.task.await.ok();
self.room
.local_participant()
.unpublish_track(&handle.track.sid())
.await?;
self.room.local_participant().unpublish_track(&handle.track.sid()).await?;
}

Ok(())
Expand Down
12 changes: 6 additions & 6 deletions libwebrtc/src/audio_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,22 @@ pub mod native {
options: AudioSourceOptions,
sample_rate: u32,
num_channels: u32,
enable_queue: Option<bool>,
queue_size_ms: u32,
) -> NativeAudioSource {
Self {
handle: imp_as::NativeAudioSource::new(
options,
sample_rate,
num_channels,
enable_queue,
queue_size_ms,
),
}
}

pub fn clear_buffer(&self) {
self.handle.clear_buffer()
}

pub async fn capture_frame(&self, frame: &AudioFrame<'_>) -> Result<(), RtcError> {
self.handle.capture_frame(frame).await
}
Expand All @@ -94,9 +98,5 @@ pub mod native {
pub fn num_channels(&self) -> u32 {
self.handle.num_channels()
}

pub fn enable_queue(&self) -> bool {
self.handle.enable_queue()
}
}
}
159 changes: 47 additions & 112 deletions libwebrtc/src/native/audio_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,106 +12,43 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{sync::Arc, time::Duration};

use cxx::SharedPtr;
use livekit_runtime::interval;
use tokio::sync::{
mpsc::{self, error::TryRecvError},
Mutex as AsyncMutex,
};
use tokio::sync::oneshot;
use webrtc_sys::audio_track as sys_at;

use crate::{audio_frame::AudioFrame, audio_source::AudioSourceOptions, RtcError, RtcErrorType};

const BUFFER_SIZE_MS: usize = 50;

#[derive(Clone)]
pub struct NativeAudioSource {
sys_handle: SharedPtr<sys_at::ffi::AudioTrackSource>,
inner: Arc<AsyncMutex<AudioSourceInner>>,
sample_rate: u32,
num_channels: u32,
samples_10ms: usize,
// whether to queue audio frames or send them immediately
// defaults to true
enable_queue: bool,
po_tx: mpsc::Sender<Vec<i16>>,
}

struct AudioSourceInner {
buf: Box<[i16]>,

// Amount of data from the previous frame that hasn't been sent to the libwebrtc source
// (because it requires 10ms of data)
len: usize,
queue_size_samples: u32,
}

impl NativeAudioSource {
pub fn new(
options: AudioSourceOptions,
sample_rate: u32,
num_channels: u32,
enable_queue: Option<bool>,
queue_size_ms: u32,
) -> NativeAudioSource {
let samples_10ms = (sample_rate / 100 * num_channels) as usize;
let (po_tx, mut po_rx) = mpsc::channel(BUFFER_SIZE_MS / 10);

let source = Self {
sys_handle: sys_at::ffi::new_audio_track_source(options.into()),
inner: Arc::new(AsyncMutex::new(AudioSourceInner {
buf: vec![0; samples_10ms].into_boxed_slice(),
len: 0,
})),
sample_rate,
num_channels,
samples_10ms,
enable_queue: enable_queue.unwrap_or(true),
po_tx,
};

livekit_runtime::spawn({
let source = source.clone();
async move {
let mut interval = interval(Duration::from_millis(10));
interval.set_missed_tick_behavior(livekit_runtime::MissedTickBehavior::Delay);
let blank_data = vec![0; samples_10ms];
let enable_queue = source.enable_queue;

loop {
if enable_queue {
interval.tick().await;
}

let frame = po_rx.try_recv();
if let Err(TryRecvError::Disconnected) = frame {
break;
}

if let Err(TryRecvError::Empty) = frame {
if enable_queue {
source.sys_handle.on_captured_frame(
&blank_data,
sample_rate,
num_channels,
blank_data.len() / num_channels as usize,
);
}
continue;
}

let frame = frame.unwrap();
source.sys_handle.on_captured_frame(
&frame,
sample_rate,
num_channels,
frame.len() / num_channels as usize,
);
}
}
});

source
assert!(queue_size_ms % 10 == 0, "queue_size_ms must be a multiple of 10");

print!(
"new audio source {} {} {} {}",
sample_rate, num_channels, queue_size_ms, options.echo_cancellation
);

let sys_handle = sys_at::ffi::new_audio_track_source(
options.into(),
sample_rate.try_into().unwrap(),
num_channels.try_into().unwrap(),
queue_size_ms.try_into().unwrap(),
);

let queue_size_samples = (queue_size_ms * sample_rate / 1000) * num_channels;
Self { sys_handle, sample_rate, num_channels, queue_size_samples }
}

pub fn sys_handle(&self) -> SharedPtr<sys_at::ffi::AudioTrackSource> {
Expand All @@ -134,8 +71,8 @@ impl NativeAudioSource {
self.num_channels
}

pub fn enable_queue(&self) -> bool {
self.enable_queue
pub fn clear_buffer(&self) {
self.sys_handle.clear_buffer();
}

pub async fn capture_frame(&self, frame: &AudioFrame<'_>) -> Result<(), RtcError> {
Expand All @@ -146,38 +83,36 @@ impl NativeAudioSource {
});
}

let mut inner = self.inner.lock().await;
let mut samples = 0;
// split frames into 10ms chunks
loop {
let remaining_samples = frame.data.len() - samples;
if remaining_samples == 0 {
break;
}
extern "C" fn lk_audio_source_complete(userdata: *const sys_at::SourceContext) {
let tx = unsafe { Box::from_raw(userdata as *mut oneshot::Sender<()>) };
let _ = tx.send(());
}

if (inner.len != 0 && remaining_samples > 0) || remaining_samples < self.samples_10ms {
let missing_len = self.samples_10ms - inner.len;
let to_add = missing_len.min(remaining_samples);
let start = inner.len;
inner.buf[start..start + to_add]
.copy_from_slice(&frame.data[samples..samples + to_add]);
inner.len += to_add;
samples += to_add;

if inner.len == self.samples_10ms {
let data = inner.buf.clone().to_vec();
let _ = self.po_tx.send(data).await;
inner.len = 0;
// iterate over chunks of self._queue_size_samples
for chunk in frame.data.chunks(self.queue_size_samples as usize) {
let nb_frames = chunk.len() / self.num_channels as usize;
let (tx, rx) = oneshot::channel::<()>();
let ctx = Box::new(tx);
let ctx_ptr = Box::into_raw(ctx) as *const sys_at::SourceContext;

unsafe {
if !self.sys_handle.capture_frame(
chunk,
self.sample_rate,
self.num_channels,
nb_frames,
ctx_ptr,
sys_at::CompleteCallback(lk_audio_source_complete),
) {
return Err(RtcError {
error_type: RtcErrorType::InvalidState,
message: "failed to capture frame".to_owned(),
});
}
continue;
}

if remaining_samples >= self.samples_10ms {
// TODO(theomonnom): avoid copying
let data = frame.data[samples..samples + self.samples_10ms].to_vec();
let _ = self.po_tx.send(data).await;
samples += self.samples_10ms;
}
let _ = rx.await;
println!("captured frame");
}

Ok(())
Expand Down
7 changes: 6 additions & 1 deletion livekit-ffi/protocol/audio_frame.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ message NewAudioSourceRequest {
optional AudioSourceOptions options = 2;
uint32 sample_rate = 3;
uint32 num_channels = 4;
optional bool enable_queue = 5;
uint32 queue_size_ms = 5;
}
message NewAudioSourceResponse { OwnedAudioSource source = 1; }

Expand All @@ -64,6 +64,11 @@ message CaptureAudioFrameCallback {
optional string error = 2;
}

message ClearAudioBufferRequest {
uint64 source_handle = 1;
}
message ClearAudioBufferResponse {}

// Create a new AudioResampler
message NewAudioResamplerRequest {}
message NewAudioResamplerResponse {
Expand Down
Loading