From c045bf38eb964685458e0bbf8113260b8fc73f62 Mon Sep 17 00:00:00 2001 From: ruiren_microsoft Date: Thu, 5 Mar 2026 10:24:31 -0800 Subject: [PATCH 1/5] support audio streaming-csharp --- sdk_v2/cs/src/Detail/CoreInterop.cs | 188 +++++++++ sdk_v2/cs/src/Detail/ICoreInterop.cs | 23 + .../cs/src/Detail/JsonSerializationContext.cs | 3 + sdk_v2/cs/src/IModel.cs | 7 + sdk_v2/cs/src/Model.cs | 5 + sdk_v2/cs/src/ModelVariant.cs | 17 + .../OpenAI/AudioStreamTranscriptionTypes.cs | 65 +++ sdk_v2/cs/src/OpenAI/AudioStreamingClient.cs | 399 ++++++++++++++++++ .../AudioStreamingClientTests.cs | 221 ++++++++++ 9 files changed, 928 insertions(+) create mode 100644 sdk_v2/cs/src/OpenAI/AudioStreamTranscriptionTypes.cs create mode 100644 sdk_v2/cs/src/OpenAI/AudioStreamingClient.cs create mode 100644 sdk_v2/cs/test/FoundryLocal.Tests/AudioStreamingClientTests.cs diff --git a/sdk_v2/cs/src/Detail/CoreInterop.cs b/sdk_v2/cs/src/Detail/CoreInterop.cs index 8411473b..a178bdca 100644 --- a/sdk_v2/cs/src/Detail/CoreInterop.cs +++ b/sdk_v2/cs/src/Detail/CoreInterop.cs @@ -158,6 +158,28 @@ private static unsafe partial void CoreExecuteCommandWithCallback(RequestBuffer* nint callbackPtr, // NativeCallbackFn pointer nint userData); + // --- Audio streaming P/Invoke imports --- + + [LibraryImport(LibraryName, EntryPoint = "audio_stream_start")] + [UnmanagedCallConv(CallConvs = new[] { typeof(System.Runtime.CompilerServices.CallConvCdecl) })] + private static unsafe partial void CoreAudioStreamStart( + RequestBuffer* request, + ResponseBuffer* response, + nint callbackPtr, + nint userData); + + [LibraryImport(LibraryName, EntryPoint = "audio_stream_push")] + [UnmanagedCallConv(CallConvs = new[] { typeof(System.Runtime.CompilerServices.CallConvCdecl) })] + private static unsafe partial void CoreAudioStreamPush( + StreamingRequestBuffer* request, + ResponseBuffer* response); + + [LibraryImport(LibraryName, EntryPoint = "audio_stream_stop")] + [UnmanagedCallConv(CallConvs = new[] { typeof(System.Runtime.CompilerServices.CallConvCdecl) })] + private static unsafe partial void CoreAudioStreamStop( + RequestBuffer* request, + ResponseBuffer* response); + // helper to capture exceptions in callbacks internal class CallbackHelper { @@ -331,4 +353,170 @@ public Task ExecuteCommandWithCallbackAsync(string commandName, CoreIn return Task.Run(() => ExecuteCommandWithCallback(commandName, commandInput, callback), ct); } + // --- Audio streaming managed implementations --- + + public AudioStreamSession StartAudioStream(CoreInteropRequest request, CallbackFn transcriptionCallback) + { + try + { + var commandInputJson = request.ToJson(); + byte[] commandBytes = System.Text.Encoding.UTF8.GetBytes("audio_stream_start"); + byte[] inputBytes = System.Text.Encoding.UTF8.GetBytes(commandInputJson); + + IntPtr commandPtr = Marshal.AllocHGlobal(commandBytes.Length); + Marshal.Copy(commandBytes, 0, commandPtr, commandBytes.Length); + + IntPtr inputPtr = Marshal.AllocHGlobal(inputBytes.Length); + Marshal.Copy(inputBytes, 0, inputPtr, inputBytes.Length); + + var reqBuf = new RequestBuffer + { + Command = commandPtr, + CommandLength = commandBytes.Length, + Data = inputPtr, + DataLength = inputBytes.Length + }; + + ResponseBuffer response = default; + + var helper = new CallbackHelper(transcriptionCallback); + var funcPtr = Marshal.GetFunctionPointerForDelegate(handleCallbackDelegate); + var helperHandle = GCHandle.Alloc(helper); + var helperPtr = GCHandle.ToIntPtr(helperHandle); + + try + { + unsafe + { + CoreAudioStreamStart(&reqBuf, &response, funcPtr, helperPtr); + } + } + catch + { + // Free on failure — native core never saw the handle + helperHandle.Free(); + throw; + } + finally + { + Marshal.FreeHGlobal(commandPtr); + Marshal.FreeHGlobal(inputPtr); + } + + // Marshal response inline (matching existing ExecuteCommandImpl pattern) + Response result = new(); + if (response.Data != IntPtr.Zero && response.DataLength > 0) + { + byte[] managedResponse = new byte[response.DataLength]; + Marshal.Copy(response.Data, managedResponse, 0, response.DataLength); + result.Data = System.Text.Encoding.UTF8.GetString(managedResponse); + } + if (response.Error != IntPtr.Zero && response.ErrorLength > 0) + { + result.Error = Marshal.PtrToStringUTF8(response.Error, response.ErrorLength)!; + } + Marshal.FreeHGlobal(response.Data); + Marshal.FreeHGlobal(response.Error); + + // Return the GCHandle alongside the response — caller is responsible for + // keeping it alive during the session and freeing it in StopAudioStream. + return new AudioStreamSession(result, helperHandle); + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + throw new FoundryLocalException("Error executing audio_stream_start", ex, _logger); + } + } + + public Response PushAudioData(CoreInteropRequest request, ReadOnlyMemory audioData) + { + try + { + var commandInputJson = request.ToJson(); + byte[] commandBytes = System.Text.Encoding.UTF8.GetBytes("audio_stream_push"); + byte[] inputBytes = System.Text.Encoding.UTF8.GetBytes(commandInputJson); + + IntPtr commandPtr = Marshal.AllocHGlobal(commandBytes.Length); + Marshal.Copy(commandBytes, 0, commandPtr, commandBytes.Length); + + IntPtr inputPtr = Marshal.AllocHGlobal(inputBytes.Length); + Marshal.Copy(inputBytes, 0, inputPtr, inputBytes.Length); + + // Pin the managed audio data so GC won't move it during the native call + using var audioHandle = audioData.Pin(); + + unsafe + { + var reqBuf = new StreamingRequestBuffer + { + Command = commandPtr, + CommandLength = commandBytes.Length, + Data = inputPtr, + DataLength = inputBytes.Length, + BinaryData = (nint)audioHandle.Pointer, + BinaryDataLength = audioData.Length + }; + + ResponseBuffer response = default; + + try + { + CoreAudioStreamPush(&reqBuf, &response); + } + finally + { + Marshal.FreeHGlobal(commandPtr); + Marshal.FreeHGlobal(inputPtr); + } + + // Marshal response inline + Response result = new(); + if (response.Data != IntPtr.Zero && response.DataLength > 0) + { + byte[] managedResponse = new byte[response.DataLength]; + Marshal.Copy(response.Data, managedResponse, 0, response.DataLength); + result.Data = System.Text.Encoding.UTF8.GetString(managedResponse); + } + if (response.Error != IntPtr.Zero && response.ErrorLength > 0) + { + result.Error = Marshal.PtrToStringUTF8(response.Error, response.ErrorLength)!; + } + Marshal.FreeHGlobal(response.Data); + Marshal.FreeHGlobal(response.Error); + + return result; + } + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + throw new FoundryLocalException("Error executing audio_stream_push", ex, _logger); + } + } + + public Response StopAudioStream(CoreInteropRequest request, GCHandle callbackHandle) + { + try + { + var result = ExecuteCommand("audio_stream_stop", request); + + // Free the GCHandle that was keeping the callback delegate alive. + // After this point, the native core must not invoke the callback. + if (callbackHandle.IsAllocated) + { + callbackHandle.Free(); + } + + return result; + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + // Still free the handle on failure to avoid leaks + if (callbackHandle.IsAllocated) + { + callbackHandle.Free(); + } + throw new FoundryLocalException("Error executing audio_stream_stop", ex, _logger); + } + } + } diff --git a/sdk_v2/cs/src/Detail/ICoreInterop.cs b/sdk_v2/cs/src/Detail/ICoreInterop.cs index 1fff9dde..cd342ce5 100644 --- a/sdk_v2/cs/src/Detail/ICoreInterop.cs +++ b/sdk_v2/cs/src/Detail/ICoreInterop.cs @@ -51,4 +51,27 @@ Task ExecuteCommandAsync(string commandName, CoreInteropRequest? comma Task ExecuteCommandWithCallbackAsync(string commandName, CoreInteropRequest? commandInput, CallbackFn callback, CancellationToken? ct = null); + + // --- Audio streaming session support --- + + [StructLayout(LayoutKind.Sequential)] + protected unsafe struct StreamingRequestBuffer + { + public nint Command; + public int CommandLength; + public nint Data; // JSON params + public int DataLength; + public nint BinaryData; // raw PCM audio bytes + public int BinaryDataLength; + } + + /// + /// Returned by StartAudioStream. Holds the session handle and the GCHandle + /// that must remain alive for the callback lifetime. + /// + internal record AudioStreamSession(Response Response, GCHandle CallbackHandle); + + AudioStreamSession StartAudioStream(CoreInteropRequest request, CallbackFn transcriptionCallback); + Response PushAudioData(CoreInteropRequest request, ReadOnlyMemory audioData); + Response StopAudioStream(CoreInteropRequest request, GCHandle callbackHandle); } diff --git a/sdk_v2/cs/src/Detail/JsonSerializationContext.cs b/sdk_v2/cs/src/Detail/JsonSerializationContext.cs index 894f9454..3cc079f3 100644 --- a/sdk_v2/cs/src/Detail/JsonSerializationContext.cs +++ b/sdk_v2/cs/src/Detail/JsonSerializationContext.cs @@ -33,6 +33,9 @@ namespace Microsoft.AI.Foundry.Local.Detail; [JsonSerializable(typeof(IList))] [JsonSerializable(typeof(PropertyDefinition))] [JsonSerializable(typeof(IList))] +// --- NEW: Audio streaming types --- +[JsonSerializable(typeof(AudioStreamTranscriptionResult))] +[JsonSerializable(typeof(CoreErrorResponse))] [JsonSourceGenerationOptions(DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull, WriteIndented = false)] internal partial class JsonSerializationContext : JsonSerializerContext diff --git a/sdk_v2/cs/src/IModel.cs b/sdk_v2/cs/src/IModel.cs index c3acba61..20eca014 100644 --- a/sdk_v2/cs/src/IModel.cs +++ b/sdk_v2/cs/src/IModel.cs @@ -67,4 +67,11 @@ Task DownloadAsync(Action? downloadProgress = null, /// Optional cancellation token. /// OpenAI.AudioClient Task GetAudioClientAsync(CancellationToken? ct = null); + + /// + /// Get a real-time audio streaming client for ASR. + /// + /// Optional cancellation token. + /// OpenAIAudioStreamingClient for real-time transcription. + Task GetAudioStreamingClientAsync(CancellationToken? ct = null); } diff --git a/sdk_v2/cs/src/Model.cs b/sdk_v2/cs/src/Model.cs index 83bcef69..ffe8bb1c 100644 --- a/sdk_v2/cs/src/Model.cs +++ b/sdk_v2/cs/src/Model.cs @@ -114,6 +114,11 @@ public async Task GetAudioClientAsync(CancellationToken? ct = return await SelectedVariant.GetAudioClientAsync(ct).ConfigureAwait(false); } + public async Task GetAudioStreamingClientAsync(CancellationToken? ct = null) + { + return await SelectedVariant.GetAudioStreamingClientAsync(ct).ConfigureAwait(false); + } + public async Task UnloadAsync(CancellationToken? ct = null) { await SelectedVariant.UnloadAsync(ct).ConfigureAwait(false); diff --git a/sdk_v2/cs/src/ModelVariant.cs b/sdk_v2/cs/src/ModelVariant.cs index 6ca7cda7..d5285c1c 100644 --- a/sdk_v2/cs/src/ModelVariant.cs +++ b/sdk_v2/cs/src/ModelVariant.cs @@ -190,4 +190,21 @@ private async Task GetAudioClientImplAsync(CancellationToken? return new OpenAIAudioClient(Id); } + + public async Task GetAudioStreamingClientAsync(CancellationToken? ct = null) + { + return await Utils.CallWithExceptionHandling(() => GetAudioStreamingClientImplAsync(ct), + "Error getting audio streaming client for model", _logger) + .ConfigureAwait(false); + } + + private async Task GetAudioStreamingClientImplAsync(CancellationToken? ct = null) + { + if (!await IsLoadedAsync(ct)) + { + throw new FoundryLocalException($"Model {Id} is not loaded. Call LoadAsync first."); + } + + return new OpenAIAudioStreamingClient(Id); + } } diff --git a/sdk_v2/cs/src/OpenAI/AudioStreamTranscriptionTypes.cs b/sdk_v2/cs/src/OpenAI/AudioStreamTranscriptionTypes.cs new file mode 100644 index 00000000..7736cb47 --- /dev/null +++ b/sdk_v2/cs/src/OpenAI/AudioStreamTranscriptionTypes.cs @@ -0,0 +1,65 @@ +namespace Microsoft.AI.Foundry.Local; + +using System.Text.Json; +using System.Text.Json.Serialization; +using Microsoft.AI.Foundry.Local.Detail; + +public record AudioStreamTranscriptionResult +{ + /// Whether this is a partial (interim) or final result for this segment. + [JsonPropertyName("is_final")] + public bool IsFinal { get; init; } + + /// The transcribed text. + [JsonPropertyName("text")] + public string Text { get; init; } = string.Empty; + + /// Start time offset of this segment in the audio stream (seconds). + [JsonPropertyName("start_time")] + public double? StartTime { get; init; } + + /// End time offset of this segment in the audio stream (seconds). + [JsonPropertyName("end_time")] + public double? EndTime { get; init; } + + /// Confidence score (0.0 - 1.0) if available. + [JsonPropertyName("confidence")] + public float? Confidence { get; init; } + + internal static AudioStreamTranscriptionResult FromJson(string json) + { + return JsonSerializer.Deserialize(json, + JsonSerializationContext.Default.AudioStreamTranscriptionResult) + ?? throw new FoundryLocalException("Failed to deserialize AudioStreamTranscriptionResult"); + } +} + +internal record CoreErrorResponse +{ + [JsonPropertyName("code")] + public string Code { get; init; } = ""; + + [JsonPropertyName("message")] + public string Message { get; init; } = ""; + + [JsonPropertyName("isTransient")] + public bool IsTransient { get; init; } + + /// + /// Attempt to parse a native error string as structured JSON. + /// Returns null if the error is not valid JSON or doesn't match the schema, + /// which should be treated as a permanent/unknown error. + /// + internal static CoreErrorResponse? TryParse(string errorString) + { + try + { + return JsonSerializer.Deserialize(errorString, + JsonSerializationContext.Default.CoreErrorResponse); + } + catch + { + return null; // unstructured error — treat as permanent + } + } +} \ No newline at end of file diff --git a/sdk_v2/cs/src/OpenAI/AudioStreamingClient.cs b/sdk_v2/cs/src/OpenAI/AudioStreamingClient.cs new file mode 100644 index 00000000..27e1bbea --- /dev/null +++ b/sdk_v2/cs/src/OpenAI/AudioStreamingClient.cs @@ -0,0 +1,399 @@ +// -------------------------------------------------------------------------------------------------------------------- +// +// Copyright (c) Microsoft. All rights reserved. +// +// -------------------------------------------------------------------------------------------------------------------- + +namespace Microsoft.AI.Foundry.Local; + +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +using System.Globalization; +using System.Threading.Channels; +using Microsoft.AI.Foundry.Local.Detail; +using Microsoft.Extensions.Logging; + + +/// +/// Client for real-time audio streaming ASR (Automatic Speech Recognition). +/// Audio data from a microphone (or other source) is pushed in as PCM chunks, +/// and partial transcription results are returned as an async stream. +/// +/// Thread safety: PushAudioDataAsync can be called from any thread (including high-frequency +/// audio callbacks). Pushes are internally serialized via a bounded channel to prevent +/// unbounded memory growth and ensure ordering. +/// + + +public sealed class OpenAIAudioStreamingClient : IAsyncDisposable +{ + private readonly string _modelId; + private readonly ICoreInterop _coreInterop = FoundryLocalManager.Instance.CoreInterop; + private readonly ILogger _logger = FoundryLocalManager.Instance.Logger; + + // Session state — protected by _lock + private readonly AsyncLock _lock = new(); + private string? _sessionHandle; + private GCHandle _callbackHandle; + private bool _started; + private bool _stopped; + + // Output channel: native callback writes, user reads via GetTranscriptionStream + private Channel? _outputChannel; + + // Internal push queue: user writes audio chunks, background loop drains to native core. + // Bounded to prevent unbounded memory growth if native core is slower than real-time. + private Channel>? _pushChannel; + private Task? _pushLoopTask; + + // Dedicated CTS for the push loop — decoupled from StartAsync's caller token. + // Cancelled only during StopAsync/DisposeAsync to allow clean drain. + private CancellationTokenSource? _sessionCts; + + // Stored as a field so the delegate is not garbage collected while native core holds a reference. + private ICoreInterop.CallbackFn? _transcriptionCallback; + + // Snapshot of settings captured at StartAsync — prevents mutation after session starts. + private StreamingAudioSettings? _activeSettings; + + /// + /// Audio format settings for the streaming session. + /// Must be configured before calling . + /// Settings are frozen once the session starts. + /// + public record StreamingAudioSettings + { + /// PCM sample rate in Hz. Default: 16000. + public int SampleRate { get; set; } = 16000; + + /// Number of audio channels. Default: 1 (mono). + public int Channels { get; set; } = 1; + + /// Bits per sample. Default: 16. + public int BitsPerSample { get; set; } = 16; + + /// Optional BCP-47 language hint (e.g., "en", "zh"). + public string? Language { get; set; } + + /// + /// Maximum number of audio chunks buffered in the internal push queue. + /// If the queue is full, PushAudioDataAsync will asynchronously wait. + /// Default: 100 (~3 seconds of audio at typical chunk sizes). + /// + public int PushQueueCapacity { get; set; } = 100; + + internal StreamingAudioSettings Snapshot() => this with { }; // record copy + } + + public StreamingAudioSettings Settings { get; } = new(); + + internal OpenAIAudioStreamingClient(string modelId) + { + _modelId = modelId; + } + + /// + /// Start a real-time audio streaming session. + /// Must be called before or . + /// Settings are frozen after this call. + /// + /// Cancellation token. + public async Task StartAsync(CancellationToken ct = default) + { + using var disposable = await _lock.LockAsync().ConfigureAwait(false); + + if (_started) + { + throw new FoundryLocalException("Streaming session already started. Call StopAsync first."); + } + + // Freeze settings + _activeSettings = Settings.Snapshot(); + + _outputChannel = Channel.CreateUnbounded( + new UnboundedChannelOptions + { + SingleWriter = true, // only the native callback writes + SingleReader = true, + AllowSynchronousContinuations = true + }); + + _pushChannel = Channel.CreateBounded>( + new BoundedChannelOptions(_activeSettings.PushQueueCapacity) + { + SingleReader = true, // only the push loop reads + SingleWriter = false, // multiple threads may push audio data + FullMode = BoundedChannelFullMode.Wait + }); + + var request = new CoreInteropRequest + { + Params = new Dictionary + { + { "Model", _modelId }, + { "SampleRate", _activeSettings.SampleRate.ToString(CultureInfo.InvariantCulture) }, + { "Channels", _activeSettings.Channels.ToString(CultureInfo.InvariantCulture) }, + { "BitsPerSample", _activeSettings.BitsPerSample.ToString(CultureInfo.InvariantCulture) }, + } + }; + + if (_activeSettings.Language != null) + { + request.Params["Language"] = _activeSettings.Language; + } + + // Store the callback as a field so the delegate is rooted for the session lifetime. + _transcriptionCallback = (callbackData) => + { + try + { + var result = AudioStreamTranscriptionResult.FromJson(callbackData); + // TryWrite always succeeds on unbounded channels + _outputChannel.Writer.TryWrite(result); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error processing audio stream transcription callback"); + _outputChannel.Writer.TryComplete( + new FoundryLocalException("Error processing audio streaming callback.", ex, _logger)); + } + }; + + // StartAudioStream is synchronous (P/Invoke) — run on thread pool + var session = await Task.Run( + () => _coreInterop.StartAudioStream(request, _transcriptionCallback), ct) + .ConfigureAwait(false); + + if (session.Response.Error != null) + { + // Free handle on failure + if (session.CallbackHandle.IsAllocated) + { + session.CallbackHandle.Free(); + } + _outputChannel.Writer.TryComplete(); + throw new FoundryLocalException( + $"Error starting audio stream session: {session.Response.Error}", _logger); + } + + _sessionHandle = session.Response.Data + ?? throw new FoundryLocalException("Native core did not return a session handle.", _logger); + _callbackHandle = session.CallbackHandle; + _started = true; + _stopped = false; + + // Use a dedicated CTS for the push loop — NOT the caller's ct. +#pragma warning disable IDISP003 // Dispose previous before re-assigning + _sessionCts = new CancellationTokenSource(); +#pragma warning restore IDISP003 +#pragma warning disable IDISP013 // Await in using + _pushLoopTask = Task.Run(() => PushLoopAsync(_sessionCts.Token), CancellationToken.None); +#pragma warning restore IDISP013 + } + + /// + /// Push a chunk of raw PCM audio data to the streaming session. + /// Can be called from any thread (including audio device callbacks). + /// Chunks are internally queued and serialized to the native core. + /// + /// Raw PCM audio bytes matching the configured format. + /// Cancellation token. + public async ValueTask PushAudioDataAsync(ReadOnlyMemory pcmData, CancellationToken ct = default) + { + if (!_started || _stopped) + { + throw new FoundryLocalException("No active streaming session. Call StartAsync first."); + } + + // Copy the data to avoid issues if the caller reuses the buffer (e.g. NAudio reuses e.Buffer) + var copy = new byte[pcmData.Length]; + pcmData.CopyTo(copy); + + await _pushChannel!.Writer.WriteAsync(copy, ct).ConfigureAwait(false); + } + + /// + /// Internal loop that drains the push queue and sends chunks to native core one at a time. + /// Implements retry for transient native errors and terminates the session on permanent failures. + /// + private async Task PushLoopAsync(CancellationToken ct) + { + const int maxRetries = 3; + var initialRetryDelay = TimeSpan.FromMilliseconds(50); + + try + { + await foreach (var audioData in _pushChannel!.Reader.ReadAllAsync(ct).ConfigureAwait(false)) + { + var request = new CoreInteropRequest + { + Params = new Dictionary { { "SessionHandle", _sessionHandle! } } + }; + + var pushed = false; + for (int attempt = 0; attempt <= maxRetries && !pushed; attempt++) + { + var response = _coreInterop.PushAudioData(request, audioData); + + if (response.Error == null) + { + pushed = true; + continue; + } + + // Parse structured error to determine transient vs permanent + var errorInfo = CoreErrorResponse.TryParse(response.Error); + + if (errorInfo?.IsTransient == true && attempt < maxRetries) + { + var delay = initialRetryDelay * Math.Pow(2, attempt); + _logger.LogWarning( + "Transient push error (attempt {Attempt}/{Max}): {Code}. Retrying in {Delay}ms", + attempt + 1, maxRetries, errorInfo.Code, delay.TotalMilliseconds); + await Task.Delay(delay, ct).ConfigureAwait(false); + continue; + } + + // Permanent error or retries exhausted — terminate the session + var fatalEx = new FoundryLocalException( + $"Push failed permanently (code={errorInfo?.Code ?? "UNKNOWN"}): {response.Error}", + _logger); + _logger.LogError("Terminating push loop due to permanent push failure: {Error}", + response.Error); + _outputChannel?.Writer.TryComplete(fatalEx); + return; // exit push loop + } + } + } + catch (OperationCanceledException) + { + // Expected on cancellation — push loop exits cleanly + } + catch (Exception ex) + { + _logger.LogError(ex, "Push loop terminated with unexpected error"); + _outputChannel?.Writer.TryComplete( + new FoundryLocalException("Push loop terminated unexpectedly.", ex, _logger)); + } + } + + /// + /// Get the async stream of transcription results. + /// Results arrive as the native ASR engine processes audio data. + /// + /// Cancellation token. + /// Async enumerable of transcription results. + public async IAsyncEnumerable GetTranscriptionStream( + [EnumeratorCancellation] CancellationToken ct = default) + { + if (_outputChannel == null) + { + throw new FoundryLocalException("No active streaming session. Call StartAsync first."); + } + + await foreach (var item in _outputChannel.Reader.ReadAllAsync(ct).ConfigureAwait(false)) + { + yield return item; + } + } + + /// + /// Signal end-of-audio and stop the streaming session. + /// Any remaining buffered audio in the push queue will be drained to native core first. + /// Final results are delivered through before it completes. + /// + /// Cancellation token. + public async Task StopAsync(CancellationToken ct = default) + { + using var disposable = await _lock.LockAsync().ConfigureAwait(false); + + if (!_started || _stopped) + { + return; // already stopped or never started + } + + _stopped = true; + + // 1. Complete the push channel so the push loop drains remaining items and exits + _pushChannel?.Writer.TryComplete(); + + // 2. Wait for the push loop to finish draining + if (_pushLoopTask != null) + { + await _pushLoopTask.ConfigureAwait(false); + } + + // 3. Cancel the session CTS (no-op if push loop already exited) + _sessionCts?.Cancel(); + + // 4. Tell native core to flush and finalize. + // This MUST happen even if ct is cancelled — otherwise native session leaks. + var request = new CoreInteropRequest + { + Params = new Dictionary { { "SessionHandle", _sessionHandle! } } + }; + + ICoreInterop.Response? response = null; + try + { + response = await Task.Run( + () => _coreInterop.StopAudioStream(request, _callbackHandle), ct) + .ConfigureAwait(false); + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + // ct fired, but we MUST still stop the native session to avoid a leak. + _logger.LogWarning("StopAsync cancelled — performing best-effort native session stop."); + try + { + response = await Task.Run( + () => _coreInterop.StopAudioStream(request, _callbackHandle)) + .ConfigureAwait(false); + } + catch (Exception cleanupEx) + { + _logger.LogError(cleanupEx, "Best-effort native session stop failed."); + } + + throw; // Re-throw the cancellation after cleanup + } + finally + { + _sessionHandle = null; + _transcriptionCallback = null; + _started = false; + _sessionCts?.Dispose(); + _sessionCts = null; + + // 5. Complete the output channel AFTER StopAudioStream returns + _outputChannel?.Writer.TryComplete(); + } + + if (response?.Error != null) + { + throw new FoundryLocalException( + $"Error stopping audio stream session: {response.Error}", _logger); + } + } + + public async ValueTask DisposeAsync() + { + try + { + if (_started && !_stopped) + { + await StopAsync().ConfigureAwait(false); + } + } + catch (Exception ex) + { + // DisposeAsync must never throw — log and swallow + _logger.LogWarning(ex, "Error during DisposeAsync cleanup."); + } + finally + { + _sessionCts?.Dispose(); + _lock.Dispose(); + } + } +} \ No newline at end of file diff --git a/sdk_v2/cs/test/FoundryLocal.Tests/AudioStreamingClientTests.cs b/sdk_v2/cs/test/FoundryLocal.Tests/AudioStreamingClientTests.cs new file mode 100644 index 00000000..3a0e2ef7 --- /dev/null +++ b/sdk_v2/cs/test/FoundryLocal.Tests/AudioStreamingClientTests.cs @@ -0,0 +1,221 @@ +// -------------------------------------------------------------------------------------------------------------------- +// +// Copyright (c) Microsoft. All rights reserved. +// +// -------------------------------------------------------------------------------------------------------------------- + +namespace Microsoft.AI.Foundry.Local.Tests; + +using System.Threading.Tasks; +using Microsoft.AI.Foundry.Local.Detail; + +/// +/// Unit tests for audio streaming types and settings. +/// These test the serialization, deserialization, and settings behavior +/// without requiring the native library or a loaded model. +/// +internal sealed class AudioStreamingClientTests +{ + // --- AudioStreamTranscriptionResult deserialization tests --- + + [Test] + public async Task AudioStreamTranscriptionResult_FromJson_FinalResult_AllFields() + { + var json = """{"text":"hello world","is_final":true,"start_time":0.0,"end_time":1.5,"confidence":0.95}"""; + + var result = AudioStreamTranscriptionResult.FromJson(json); + + await Assert.That(result).IsNotNull(); + await Assert.That(result.Text).IsEqualTo("hello world"); + await Assert.That(result.IsFinal).IsTrue(); + await Assert.That(result.StartTime).IsEqualTo(0.0); + await Assert.That(result.EndTime).IsEqualTo(1.5); + await Assert.That(result.Confidence).IsEqualTo(0.95f); + } + + [Test] + public async Task AudioStreamTranscriptionResult_FromJson_PartialResult_OptionalFieldsNull() + { + var json = """{"text":"hel","is_final":false}"""; + + var result = AudioStreamTranscriptionResult.FromJson(json); + + await Assert.That(result).IsNotNull(); + await Assert.That(result.Text).IsEqualTo("hel"); + await Assert.That(result.IsFinal).IsFalse(); + await Assert.That(result.StartTime).IsNull(); + await Assert.That(result.EndTime).IsNull(); + await Assert.That(result.Confidence).IsNull(); + } + + [Test] + public async Task AudioStreamTranscriptionResult_FromJson_EmptyText() + { + var json = """{"text":"","is_final":false}"""; + + var result = AudioStreamTranscriptionResult.FromJson(json); + + await Assert.That(result).IsNotNull(); + await Assert.That(result.Text).IsEqualTo(string.Empty); + await Assert.That(result.IsFinal).IsFalse(); + } + + [Test] + public async Task AudioStreamTranscriptionResult_FromJson_InvalidJson_Throws() + { + FoundryLocalException? caught = null; + try + { + AudioStreamTranscriptionResult.FromJson("not valid json"); + } + catch (FoundryLocalException ex) + { + caught = ex; + } + catch (System.Text.Json.JsonException) + { + // Also acceptable — JsonSerializer may throw before our wrapper + caught = new FoundryLocalException("json parse error"); + } + + await Assert.That(caught).IsNotNull(); + } + + [Test] + public async Task AudioStreamTranscriptionResult_FromJson_EmptyJson_Throws() + { + FoundryLocalException? caught = null; + try + { + AudioStreamTranscriptionResult.FromJson(""); + } + catch (FoundryLocalException ex) + { + caught = ex; + } + catch (System.Text.Json.JsonException) + { + caught = new FoundryLocalException("json parse error"); + } + + await Assert.That(caught).IsNotNull(); + } + + // --- CoreErrorResponse parsing tests --- + + [Test] + public async Task CoreErrorResponse_TryParse_TransientError_Succeeds() + { + var json = """{"code":"ASR_BACKEND_OVERLOADED","message":"try again later","isTransient":true}"""; + + var error = CoreErrorResponse.TryParse(json); + + await Assert.That(error).IsNotNull(); + await Assert.That(error!.Code).IsEqualTo("ASR_BACKEND_OVERLOADED"); + await Assert.That(error.Message).IsEqualTo("try again later"); + await Assert.That(error.IsTransient).IsTrue(); + } + + [Test] + public async Task CoreErrorResponse_TryParse_PermanentError_Succeeds() + { + var json = """{"code":"ASR_SESSION_NOT_FOUND","message":"session gone","isTransient":false}"""; + + var error = CoreErrorResponse.TryParse(json); + + await Assert.That(error).IsNotNull(); + await Assert.That(error!.Code).IsEqualTo("ASR_SESSION_NOT_FOUND"); + await Assert.That(error.IsTransient).IsFalse(); + } + + [Test] + public async Task CoreErrorResponse_TryParse_InvalidJson_ReturnsNull() + { + var error = CoreErrorResponse.TryParse("not json at all"); + + await Assert.That(error).IsNull(); + } + + [Test] + public async Task CoreErrorResponse_TryParse_EmptyString_ReturnsNull() + { + var error = CoreErrorResponse.TryParse(""); + + await Assert.That(error).IsNull(); + } + + [Test] + public async Task CoreErrorResponse_TryParse_ValidJsonWrongShape_ReturnsDefaultValues() + { + // Valid JSON but no matching fields — should deserialize with defaults + var json = """{"unrelated":"field"}"""; + + var error = CoreErrorResponse.TryParse(json); + + await Assert.That(error).IsNotNull(); + await Assert.That(error!.Code).IsEqualTo(""); + await Assert.That(error.IsTransient).IsFalse(); + } + + // --- StreamingAudioSettings tests --- + + [Test] + public async Task StreamingAudioSettings_Defaults_AreCorrect() + { + var settings = new OpenAIAudioStreamingClient.StreamingAudioSettings(); + + await Assert.That(settings.SampleRate).IsEqualTo(16000); + await Assert.That(settings.Channels).IsEqualTo(1); + await Assert.That(settings.BitsPerSample).IsEqualTo(16); + await Assert.That(settings.Language).IsNull(); + await Assert.That(settings.PushQueueCapacity).IsEqualTo(100); + } + + [Test] + public async Task StreamingAudioSettings_Snapshot_IsIndependentCopy() + { + var settings = new OpenAIAudioStreamingClient.StreamingAudioSettings + { + SampleRate = 44100, + Channels = 2, + BitsPerSample = 32, + Language = "zh", + PushQueueCapacity = 50 + }; + + var snapshot = settings.Snapshot(); + + // Modify original after snapshot + settings.SampleRate = 8000; + settings.Channels = 1; + settings.Language = "fr"; + settings.PushQueueCapacity = 200; + + // Snapshot should retain original values + await Assert.That(snapshot.SampleRate).IsEqualTo(44100); + await Assert.That(snapshot.Channels).IsEqualTo(2); + await Assert.That(snapshot.BitsPerSample).IsEqualTo(32); + await Assert.That(snapshot.Language).IsEqualTo("zh"); + await Assert.That(snapshot.PushQueueCapacity).IsEqualTo(50); + } + + [Test] + public async Task StreamingAudioSettings_Snapshot_DoesNotAffectOriginal() + { + var settings = new OpenAIAudioStreamingClient.StreamingAudioSettings + { + SampleRate = 16000, + Language = "en" + }; + + var snapshot = settings.Snapshot(); + + // Modify snapshot + snapshot.SampleRate = 48000; + snapshot.Language = "de"; + + // Original should be unaffected + await Assert.That(settings.SampleRate).IsEqualTo(16000); + await Assert.That(settings.Language).IsEqualTo("en"); + } +} From 397093637f243e8029b0e14b4540ade1b4ae2310 Mon Sep 17 00:00:00 2001 From: ruiren_microsoft Date: Thu, 5 Mar 2026 13:49:49 -0800 Subject: [PATCH 2/5] delete dll mock test --- .../AudioStreamingClientTests.cs | 221 ------------------ 1 file changed, 221 deletions(-) delete mode 100644 sdk_v2/cs/test/FoundryLocal.Tests/AudioStreamingClientTests.cs diff --git a/sdk_v2/cs/test/FoundryLocal.Tests/AudioStreamingClientTests.cs b/sdk_v2/cs/test/FoundryLocal.Tests/AudioStreamingClientTests.cs deleted file mode 100644 index 3a0e2ef7..00000000 --- a/sdk_v2/cs/test/FoundryLocal.Tests/AudioStreamingClientTests.cs +++ /dev/null @@ -1,221 +0,0 @@ -// -------------------------------------------------------------------------------------------------------------------- -// -// Copyright (c) Microsoft. All rights reserved. -// -// -------------------------------------------------------------------------------------------------------------------- - -namespace Microsoft.AI.Foundry.Local.Tests; - -using System.Threading.Tasks; -using Microsoft.AI.Foundry.Local.Detail; - -/// -/// Unit tests for audio streaming types and settings. -/// These test the serialization, deserialization, and settings behavior -/// without requiring the native library or a loaded model. -/// -internal sealed class AudioStreamingClientTests -{ - // --- AudioStreamTranscriptionResult deserialization tests --- - - [Test] - public async Task AudioStreamTranscriptionResult_FromJson_FinalResult_AllFields() - { - var json = """{"text":"hello world","is_final":true,"start_time":0.0,"end_time":1.5,"confidence":0.95}"""; - - var result = AudioStreamTranscriptionResult.FromJson(json); - - await Assert.That(result).IsNotNull(); - await Assert.That(result.Text).IsEqualTo("hello world"); - await Assert.That(result.IsFinal).IsTrue(); - await Assert.That(result.StartTime).IsEqualTo(0.0); - await Assert.That(result.EndTime).IsEqualTo(1.5); - await Assert.That(result.Confidence).IsEqualTo(0.95f); - } - - [Test] - public async Task AudioStreamTranscriptionResult_FromJson_PartialResult_OptionalFieldsNull() - { - var json = """{"text":"hel","is_final":false}"""; - - var result = AudioStreamTranscriptionResult.FromJson(json); - - await Assert.That(result).IsNotNull(); - await Assert.That(result.Text).IsEqualTo("hel"); - await Assert.That(result.IsFinal).IsFalse(); - await Assert.That(result.StartTime).IsNull(); - await Assert.That(result.EndTime).IsNull(); - await Assert.That(result.Confidence).IsNull(); - } - - [Test] - public async Task AudioStreamTranscriptionResult_FromJson_EmptyText() - { - var json = """{"text":"","is_final":false}"""; - - var result = AudioStreamTranscriptionResult.FromJson(json); - - await Assert.That(result).IsNotNull(); - await Assert.That(result.Text).IsEqualTo(string.Empty); - await Assert.That(result.IsFinal).IsFalse(); - } - - [Test] - public async Task AudioStreamTranscriptionResult_FromJson_InvalidJson_Throws() - { - FoundryLocalException? caught = null; - try - { - AudioStreamTranscriptionResult.FromJson("not valid json"); - } - catch (FoundryLocalException ex) - { - caught = ex; - } - catch (System.Text.Json.JsonException) - { - // Also acceptable — JsonSerializer may throw before our wrapper - caught = new FoundryLocalException("json parse error"); - } - - await Assert.That(caught).IsNotNull(); - } - - [Test] - public async Task AudioStreamTranscriptionResult_FromJson_EmptyJson_Throws() - { - FoundryLocalException? caught = null; - try - { - AudioStreamTranscriptionResult.FromJson(""); - } - catch (FoundryLocalException ex) - { - caught = ex; - } - catch (System.Text.Json.JsonException) - { - caught = new FoundryLocalException("json parse error"); - } - - await Assert.That(caught).IsNotNull(); - } - - // --- CoreErrorResponse parsing tests --- - - [Test] - public async Task CoreErrorResponse_TryParse_TransientError_Succeeds() - { - var json = """{"code":"ASR_BACKEND_OVERLOADED","message":"try again later","isTransient":true}"""; - - var error = CoreErrorResponse.TryParse(json); - - await Assert.That(error).IsNotNull(); - await Assert.That(error!.Code).IsEqualTo("ASR_BACKEND_OVERLOADED"); - await Assert.That(error.Message).IsEqualTo("try again later"); - await Assert.That(error.IsTransient).IsTrue(); - } - - [Test] - public async Task CoreErrorResponse_TryParse_PermanentError_Succeeds() - { - var json = """{"code":"ASR_SESSION_NOT_FOUND","message":"session gone","isTransient":false}"""; - - var error = CoreErrorResponse.TryParse(json); - - await Assert.That(error).IsNotNull(); - await Assert.That(error!.Code).IsEqualTo("ASR_SESSION_NOT_FOUND"); - await Assert.That(error.IsTransient).IsFalse(); - } - - [Test] - public async Task CoreErrorResponse_TryParse_InvalidJson_ReturnsNull() - { - var error = CoreErrorResponse.TryParse("not json at all"); - - await Assert.That(error).IsNull(); - } - - [Test] - public async Task CoreErrorResponse_TryParse_EmptyString_ReturnsNull() - { - var error = CoreErrorResponse.TryParse(""); - - await Assert.That(error).IsNull(); - } - - [Test] - public async Task CoreErrorResponse_TryParse_ValidJsonWrongShape_ReturnsDefaultValues() - { - // Valid JSON but no matching fields — should deserialize with defaults - var json = """{"unrelated":"field"}"""; - - var error = CoreErrorResponse.TryParse(json); - - await Assert.That(error).IsNotNull(); - await Assert.That(error!.Code).IsEqualTo(""); - await Assert.That(error.IsTransient).IsFalse(); - } - - // --- StreamingAudioSettings tests --- - - [Test] - public async Task StreamingAudioSettings_Defaults_AreCorrect() - { - var settings = new OpenAIAudioStreamingClient.StreamingAudioSettings(); - - await Assert.That(settings.SampleRate).IsEqualTo(16000); - await Assert.That(settings.Channels).IsEqualTo(1); - await Assert.That(settings.BitsPerSample).IsEqualTo(16); - await Assert.That(settings.Language).IsNull(); - await Assert.That(settings.PushQueueCapacity).IsEqualTo(100); - } - - [Test] - public async Task StreamingAudioSettings_Snapshot_IsIndependentCopy() - { - var settings = new OpenAIAudioStreamingClient.StreamingAudioSettings - { - SampleRate = 44100, - Channels = 2, - BitsPerSample = 32, - Language = "zh", - PushQueueCapacity = 50 - }; - - var snapshot = settings.Snapshot(); - - // Modify original after snapshot - settings.SampleRate = 8000; - settings.Channels = 1; - settings.Language = "fr"; - settings.PushQueueCapacity = 200; - - // Snapshot should retain original values - await Assert.That(snapshot.SampleRate).IsEqualTo(44100); - await Assert.That(snapshot.Channels).IsEqualTo(2); - await Assert.That(snapshot.BitsPerSample).IsEqualTo(32); - await Assert.That(snapshot.Language).IsEqualTo("zh"); - await Assert.That(snapshot.PushQueueCapacity).IsEqualTo(50); - } - - [Test] - public async Task StreamingAudioSettings_Snapshot_DoesNotAffectOriginal() - { - var settings = new OpenAIAudioStreamingClient.StreamingAudioSettings - { - SampleRate = 16000, - Language = "en" - }; - - var snapshot = settings.Snapshot(); - - // Modify snapshot - snapshot.SampleRate = 48000; - snapshot.Language = "de"; - - // Original should be unaffected - await Assert.That(settings.SampleRate).IsEqualTo(16000); - await Assert.That(settings.Language).IsEqualTo("en"); - } -} From ef2e9e04e6be1f9e2320df0fe757fffaedc20ba9 Mon Sep 17 00:00:00 2001 From: ruiren_microsoft Date: Thu, 5 Mar 2026 15:51:16 -0800 Subject: [PATCH 3/5] update core api --- sdk_v2/cs/src/Detail/CoreInterop.cs | 119 ++----------------- sdk_v2/cs/src/Detail/ICoreInterop.cs | 10 +- sdk_v2/cs/src/OpenAI/AudioStreamingClient.cs | 44 ++----- 3 files changed, 19 insertions(+), 154 deletions(-) diff --git a/sdk_v2/cs/src/Detail/CoreInterop.cs b/sdk_v2/cs/src/Detail/CoreInterop.cs index a178bdca..7def104f 100644 --- a/sdk_v2/cs/src/Detail/CoreInterop.cs +++ b/sdk_v2/cs/src/Detail/CoreInterop.cs @@ -160,26 +160,12 @@ private static unsafe partial void CoreExecuteCommandWithCallback(RequestBuffer* // --- Audio streaming P/Invoke imports --- - [LibraryImport(LibraryName, EntryPoint = "audio_stream_start")] + [LibraryImport(LibraryName, EntryPoint = "execute_command_with_binary")] [UnmanagedCallConv(CallConvs = new[] { typeof(System.Runtime.CompilerServices.CallConvCdecl) })] - private static unsafe partial void CoreAudioStreamStart( - RequestBuffer* request, - ResponseBuffer* response, - nint callbackPtr, - nint userData); - - [LibraryImport(LibraryName, EntryPoint = "audio_stream_push")] - [UnmanagedCallConv(CallConvs = new[] { typeof(System.Runtime.CompilerServices.CallConvCdecl) })] - private static unsafe partial void CoreAudioStreamPush( + private static unsafe partial void CoreExecuteCommandWithBinary( StreamingRequestBuffer* request, ResponseBuffer* response); - [LibraryImport(LibraryName, EntryPoint = "audio_stream_stop")] - [UnmanagedCallConv(CallConvs = new[] { typeof(System.Runtime.CompilerServices.CallConvCdecl) })] - private static unsafe partial void CoreAudioStreamStop( - RequestBuffer* request, - ResponseBuffer* response); - // helper to capture exceptions in callbacks internal class CallbackHelper { @@ -355,77 +341,10 @@ public Task ExecuteCommandWithCallbackAsync(string commandName, CoreIn // --- Audio streaming managed implementations --- - public AudioStreamSession StartAudioStream(CoreInteropRequest request, CallbackFn transcriptionCallback) + public Response StartAudioStream(CoreInteropRequest request) { - try - { - var commandInputJson = request.ToJson(); - byte[] commandBytes = System.Text.Encoding.UTF8.GetBytes("audio_stream_start"); - byte[] inputBytes = System.Text.Encoding.UTF8.GetBytes(commandInputJson); - - IntPtr commandPtr = Marshal.AllocHGlobal(commandBytes.Length); - Marshal.Copy(commandBytes, 0, commandPtr, commandBytes.Length); - - IntPtr inputPtr = Marshal.AllocHGlobal(inputBytes.Length); - Marshal.Copy(inputBytes, 0, inputPtr, inputBytes.Length); - - var reqBuf = new RequestBuffer - { - Command = commandPtr, - CommandLength = commandBytes.Length, - Data = inputPtr, - DataLength = inputBytes.Length - }; - - ResponseBuffer response = default; - - var helper = new CallbackHelper(transcriptionCallback); - var funcPtr = Marshal.GetFunctionPointerForDelegate(handleCallbackDelegate); - var helperHandle = GCHandle.Alloc(helper); - var helperPtr = GCHandle.ToIntPtr(helperHandle); - - try - { - unsafe - { - CoreAudioStreamStart(&reqBuf, &response, funcPtr, helperPtr); - } - } - catch - { - // Free on failure — native core never saw the handle - helperHandle.Free(); - throw; - } - finally - { - Marshal.FreeHGlobal(commandPtr); - Marshal.FreeHGlobal(inputPtr); - } - - // Marshal response inline (matching existing ExecuteCommandImpl pattern) - Response result = new(); - if (response.Data != IntPtr.Zero && response.DataLength > 0) - { - byte[] managedResponse = new byte[response.DataLength]; - Marshal.Copy(response.Data, managedResponse, 0, response.DataLength); - result.Data = System.Text.Encoding.UTF8.GetString(managedResponse); - } - if (response.Error != IntPtr.Zero && response.ErrorLength > 0) - { - result.Error = Marshal.PtrToStringUTF8(response.Error, response.ErrorLength)!; - } - Marshal.FreeHGlobal(response.Data); - Marshal.FreeHGlobal(response.Error); - - // Return the GCHandle alongside the response — caller is responsible for - // keeping it alive during the session and freeing it in StopAudioStream. - return new AudioStreamSession(result, helperHandle); - } - catch (Exception ex) when (ex is not OperationCanceledException) - { - throw new FoundryLocalException("Error executing audio_stream_start", ex, _logger); - } + // Uses existing execute_command entry point with "audio_stream_start" command + return ExecuteCommand("audio_stream_start", request); } public Response PushAudioData(CoreInteropRequest request, ReadOnlyMemory audioData) @@ -461,7 +380,7 @@ public Response PushAudioData(CoreInteropRequest request, ReadOnlyMemory a try { - CoreAudioStreamPush(&reqBuf, &response); + CoreExecuteCommandWithBinary(&reqBuf, &response); } finally { @@ -493,30 +412,10 @@ public Response PushAudioData(CoreInteropRequest request, ReadOnlyMemory a } } - public Response StopAudioStream(CoreInteropRequest request, GCHandle callbackHandle) + public Response StopAudioStream(CoreInteropRequest request) { - try - { - var result = ExecuteCommand("audio_stream_stop", request); - - // Free the GCHandle that was keeping the callback delegate alive. - // After this point, the native core must not invoke the callback. - if (callbackHandle.IsAllocated) - { - callbackHandle.Free(); - } - - return result; - } - catch (Exception ex) when (ex is not OperationCanceledException) - { - // Still free the handle on failure to avoid leaks - if (callbackHandle.IsAllocated) - { - callbackHandle.Free(); - } - throw new FoundryLocalException("Error executing audio_stream_stop", ex, _logger); - } + // Uses existing execute_command entry point with "audio_stream_stop" command + return ExecuteCommand("audio_stream_stop", request); } } diff --git a/sdk_v2/cs/src/Detail/ICoreInterop.cs b/sdk_v2/cs/src/Detail/ICoreInterop.cs index cd342ce5..b493dfb7 100644 --- a/sdk_v2/cs/src/Detail/ICoreInterop.cs +++ b/sdk_v2/cs/src/Detail/ICoreInterop.cs @@ -65,13 +65,7 @@ protected unsafe struct StreamingRequestBuffer public int BinaryDataLength; } - /// - /// Returned by StartAudioStream. Holds the session handle and the GCHandle - /// that must remain alive for the callback lifetime. - /// - internal record AudioStreamSession(Response Response, GCHandle CallbackHandle); - - AudioStreamSession StartAudioStream(CoreInteropRequest request, CallbackFn transcriptionCallback); + Response StartAudioStream(CoreInteropRequest request); Response PushAudioData(CoreInteropRequest request, ReadOnlyMemory audioData); - Response StopAudioStream(CoreInteropRequest request, GCHandle callbackHandle); + Response StopAudioStream(CoreInteropRequest request); } diff --git a/sdk_v2/cs/src/OpenAI/AudioStreamingClient.cs b/sdk_v2/cs/src/OpenAI/AudioStreamingClient.cs index 27e1bbea..303362e3 100644 --- a/sdk_v2/cs/src/OpenAI/AudioStreamingClient.cs +++ b/sdk_v2/cs/src/OpenAI/AudioStreamingClient.cs @@ -34,7 +34,6 @@ public sealed class OpenAIAudioStreamingClient : IAsyncDisposable // Session state — protected by _lock private readonly AsyncLock _lock = new(); private string? _sessionHandle; - private GCHandle _callbackHandle; private bool _started; private bool _stopped; @@ -50,9 +49,6 @@ public sealed class OpenAIAudioStreamingClient : IAsyncDisposable // Cancelled only during StopAsync/DisposeAsync to allow clean drain. private CancellationTokenSource? _sessionCts; - // Stored as a field so the delegate is not garbage collected while native core holds a reference. - private ICoreInterop.CallbackFn? _transcriptionCallback; - // Snapshot of settings captured at StartAsync — prevents mutation after session starts. private StreamingAudioSettings? _activeSettings; @@ -142,43 +138,20 @@ public async Task StartAsync(CancellationToken ct = default) request.Params["Language"] = _activeSettings.Language; } - // Store the callback as a field so the delegate is rooted for the session lifetime. - _transcriptionCallback = (callbackData) => - { - try - { - var result = AudioStreamTranscriptionResult.FromJson(callbackData); - // TryWrite always succeeds on unbounded channels - _outputChannel.Writer.TryWrite(result); - } - catch (Exception ex) - { - _logger.LogError(ex, "Error processing audio stream transcription callback"); - _outputChannel.Writer.TryComplete( - new FoundryLocalException("Error processing audio streaming callback.", ex, _logger)); - } - }; - - // StartAudioStream is synchronous (P/Invoke) — run on thread pool - var session = await Task.Run( - () => _coreInterop.StartAudioStream(request, _transcriptionCallback), ct) + // StartAudioStream uses existing execute_command entry point — synchronous P/Invoke + var response = await Task.Run( + () => _coreInterop.StartAudioStream(request), ct) .ConfigureAwait(false); - if (session.Response.Error != null) + if (response.Error != null) { - // Free handle on failure - if (session.CallbackHandle.IsAllocated) - { - session.CallbackHandle.Free(); - } _outputChannel.Writer.TryComplete(); throw new FoundryLocalException( - $"Error starting audio stream session: {session.Response.Error}", _logger); + $"Error starting audio stream session: {response.Error}", _logger); } - _sessionHandle = session.Response.Data + _sessionHandle = response.Data ?? throw new FoundryLocalException("Native core did not return a session handle.", _logger); - _callbackHandle = session.CallbackHandle; _started = true; _stopped = false; @@ -337,7 +310,7 @@ public async Task StopAsync(CancellationToken ct = default) try { response = await Task.Run( - () => _coreInterop.StopAudioStream(request, _callbackHandle), ct) + () => _coreInterop.StopAudioStream(request), ct) .ConfigureAwait(false); } catch (OperationCanceledException) when (ct.IsCancellationRequested) @@ -347,7 +320,7 @@ public async Task StopAsync(CancellationToken ct = default) try { response = await Task.Run( - () => _coreInterop.StopAudioStream(request, _callbackHandle)) + () => _coreInterop.StopAudioStream(request)) .ConfigureAwait(false); } catch (Exception cleanupEx) @@ -360,7 +333,6 @@ public async Task StopAsync(CancellationToken ct = default) finally { _sessionHandle = null; - _transcriptionCallback = null; _started = false; _sessionCts?.Dispose(); _sessionCts = null; From 535b73596b567cbd90e73ee40060ddc9e5b643d8 Mon Sep 17 00:00:00 2001 From: ruiren_microsoft Date: Tue, 10 Mar 2026 18:09:38 -0700 Subject: [PATCH 4/5] update sdk --- sdk_v2/cs/src/AssemblyInfo.cs | 1 + sdk_v2/cs/src/Detail/CoreInterop.cs | 136 ++++- sdk_v2/cs/src/IModel.cs | 7 - sdk_v2/cs/src/Model.cs | 5 - sdk_v2/cs/src/ModelVariant.cs | 17 - sdk_v2/cs/src/OpenAI/AudioClient.cs | 124 +---- .../OpenAI/AudioStreamTranscriptionTypes.cs | 13 +- sdk_v2/cs/src/OpenAI/AudioStreamingClient.cs | 68 ++- .../cs/test/FoundryLocal.Tests/ModelTests.cs | 2 + sdk_v2/cs/test/FoundryLocal.Tests/Utils.cs | 500 +++--------------- 10 files changed, 249 insertions(+), 624 deletions(-) diff --git a/sdk_v2/cs/src/AssemblyInfo.cs b/sdk_v2/cs/src/AssemblyInfo.cs index 9bebe71b..987f9de6 100644 --- a/sdk_v2/cs/src/AssemblyInfo.cs +++ b/sdk_v2/cs/src/AssemblyInfo.cs @@ -7,4 +7,5 @@ using System.Runtime.CompilerServices; [assembly: InternalsVisibleTo("Microsoft.AI.Foundry.Local.Tests")] +[assembly: InternalsVisibleTo("AudioStreamTest")] [assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] // for Mock of ICoreInterop diff --git a/sdk_v2/cs/src/Detail/CoreInterop.cs b/sdk_v2/cs/src/Detail/CoreInterop.cs index 7def104f..e4c88e9b 100644 --- a/sdk_v2/cs/src/Detail/CoreInterop.cs +++ b/sdk_v2/cs/src/Detail/CoreInterop.cs @@ -160,12 +160,24 @@ private static unsafe partial void CoreExecuteCommandWithCallback(RequestBuffer* // --- Audio streaming P/Invoke imports --- - [LibraryImport(LibraryName, EntryPoint = "execute_command_with_binary")] + [LibraryImport(LibraryName, EntryPoint = "audio_stream_start")] [UnmanagedCallConv(CallConvs = new[] { typeof(System.Runtime.CompilerServices.CallConvCdecl) })] - private static unsafe partial void CoreExecuteCommandWithBinary( + private static unsafe partial void CoreAudioStreamStart( + RequestBuffer* request, + ResponseBuffer* response); + + [LibraryImport(LibraryName, EntryPoint = "audio_stream_push")] + [UnmanagedCallConv(CallConvs = new[] { typeof(System.Runtime.CompilerServices.CallConvCdecl) })] + private static unsafe partial void CoreAudioStreamPush( StreamingRequestBuffer* request, ResponseBuffer* response); + [LibraryImport(LibraryName, EntryPoint = "audio_stream_stop")] + [UnmanagedCallConv(CallConvs = new[] { typeof(System.Runtime.CompilerServices.CallConvCdecl) })] + private static unsafe partial void CoreAudioStreamStop( + RequestBuffer* request, + ResponseBuffer* response); + // helper to capture exceptions in callbacks internal class CallbackHelper { @@ -339,12 +351,71 @@ public Task ExecuteCommandWithCallbackAsync(string commandName, CoreIn return Task.Run(() => ExecuteCommandWithCallback(commandName, commandInput, callback), ct); } + /// + /// Marshal a ResponseBuffer from unmanaged memory into a managed Response and free the unmanaged memory. + /// + private Response MarshalResponse(ResponseBuffer response) + { + Response result = new(); + + if (response.Data != IntPtr.Zero && response.DataLength > 0) + { + byte[] managedResponse = new byte[response.DataLength]; + Marshal.Copy(response.Data, managedResponse, 0, response.DataLength); + result.Data = System.Text.Encoding.UTF8.GetString(managedResponse); + } + + if (response.Error != IntPtr.Zero && response.ErrorLength > 0) + { + result.Error = Marshal.PtrToStringUTF8(response.Error, response.ErrorLength)!; + } + + Marshal.FreeHGlobal(response.Data); + Marshal.FreeHGlobal(response.Error); + + return result; + } + // --- Audio streaming managed implementations --- public Response StartAudioStream(CoreInteropRequest request) { - // Uses existing execute_command entry point with "audio_stream_start" command - return ExecuteCommand("audio_stream_start", request); + try + { + var commandInputJson = request.ToJson(); + byte[] inputBytes = System.Text.Encoding.UTF8.GetBytes(commandInputJson); + + IntPtr inputPtr = Marshal.AllocHGlobal(inputBytes.Length); + Marshal.Copy(inputBytes, 0, inputPtr, inputBytes.Length); + + unsafe + { + var reqBuf = new RequestBuffer + { + Command = IntPtr.Zero, + CommandLength = 0, + Data = inputPtr, + DataLength = inputBytes.Length + }; + + ResponseBuffer response = default; + + try + { + CoreAudioStreamStart(&reqBuf, &response); + } + finally + { + Marshal.FreeHGlobal(inputPtr); + } + + return MarshalResponse(response); + } + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + throw new FoundryLocalException("Error executing audio_stream_start", ex, _logger); + } } public Response PushAudioData(CoreInteropRequest request, ReadOnlyMemory audioData) @@ -380,7 +451,7 @@ public Response PushAudioData(CoreInteropRequest request, ReadOnlyMemory a try { - CoreExecuteCommandWithBinary(&reqBuf, &response); + CoreAudioStreamPush(&reqBuf, &response); } finally { @@ -388,22 +459,7 @@ public Response PushAudioData(CoreInteropRequest request, ReadOnlyMemory a Marshal.FreeHGlobal(inputPtr); } - // Marshal response inline - Response result = new(); - if (response.Data != IntPtr.Zero && response.DataLength > 0) - { - byte[] managedResponse = new byte[response.DataLength]; - Marshal.Copy(response.Data, managedResponse, 0, response.DataLength); - result.Data = System.Text.Encoding.UTF8.GetString(managedResponse); - } - if (response.Error != IntPtr.Zero && response.ErrorLength > 0) - { - result.Error = Marshal.PtrToStringUTF8(response.Error, response.ErrorLength)!; - } - Marshal.FreeHGlobal(response.Data); - Marshal.FreeHGlobal(response.Error); - - return result; + return MarshalResponse(response); } } catch (Exception ex) when (ex is not OperationCanceledException) @@ -414,8 +470,42 @@ public Response PushAudioData(CoreInteropRequest request, ReadOnlyMemory a public Response StopAudioStream(CoreInteropRequest request) { - // Uses existing execute_command entry point with "audio_stream_stop" command - return ExecuteCommand("audio_stream_stop", request); + try + { + var commandInputJson = request.ToJson(); + byte[] inputBytes = System.Text.Encoding.UTF8.GetBytes(commandInputJson); + + IntPtr inputPtr = Marshal.AllocHGlobal(inputBytes.Length); + Marshal.Copy(inputBytes, 0, inputPtr, inputBytes.Length); + + unsafe + { + var reqBuf = new RequestBuffer + { + Command = IntPtr.Zero, + CommandLength = 0, + Data = inputPtr, + DataLength = inputBytes.Length + }; + + ResponseBuffer response = default; + + try + { + CoreAudioStreamStop(&reqBuf, &response); + } + finally + { + Marshal.FreeHGlobal(inputPtr); + } + + return MarshalResponse(response); + } + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + throw new FoundryLocalException("Error executing audio_stream_stop", ex, _logger); + } } } diff --git a/sdk_v2/cs/src/IModel.cs b/sdk_v2/cs/src/IModel.cs index 20eca014..c3acba61 100644 --- a/sdk_v2/cs/src/IModel.cs +++ b/sdk_v2/cs/src/IModel.cs @@ -67,11 +67,4 @@ Task DownloadAsync(Action? downloadProgress = null, /// Optional cancellation token. /// OpenAI.AudioClient Task GetAudioClientAsync(CancellationToken? ct = null); - - /// - /// Get a real-time audio streaming client for ASR. - /// - /// Optional cancellation token. - /// OpenAIAudioStreamingClient for real-time transcription. - Task GetAudioStreamingClientAsync(CancellationToken? ct = null); } diff --git a/sdk_v2/cs/src/Model.cs b/sdk_v2/cs/src/Model.cs index ffe8bb1c..83bcef69 100644 --- a/sdk_v2/cs/src/Model.cs +++ b/sdk_v2/cs/src/Model.cs @@ -114,11 +114,6 @@ public async Task GetAudioClientAsync(CancellationToken? ct = return await SelectedVariant.GetAudioClientAsync(ct).ConfigureAwait(false); } - public async Task GetAudioStreamingClientAsync(CancellationToken? ct = null) - { - return await SelectedVariant.GetAudioStreamingClientAsync(ct).ConfigureAwait(false); - } - public async Task UnloadAsync(CancellationToken? ct = null) { await SelectedVariant.UnloadAsync(ct).ConfigureAwait(false); diff --git a/sdk_v2/cs/src/ModelVariant.cs b/sdk_v2/cs/src/ModelVariant.cs index d5285c1c..6ca7cda7 100644 --- a/sdk_v2/cs/src/ModelVariant.cs +++ b/sdk_v2/cs/src/ModelVariant.cs @@ -190,21 +190,4 @@ private async Task GetAudioClientImplAsync(CancellationToken? return new OpenAIAudioClient(Id); } - - public async Task GetAudioStreamingClientAsync(CancellationToken? ct = null) - { - return await Utils.CallWithExceptionHandling(() => GetAudioStreamingClientImplAsync(ct), - "Error getting audio streaming client for model", _logger) - .ConfigureAwait(false); - } - - private async Task GetAudioStreamingClientImplAsync(CancellationToken? ct = null) - { - if (!await IsLoadedAsync(ct)) - { - throw new FoundryLocalException($"Model {Id} is not loaded. Call LoadAsync first."); - } - - return new OpenAIAudioStreamingClient(Id); - } } diff --git a/sdk_v2/cs/src/OpenAI/AudioClient.cs b/sdk_v2/cs/src/OpenAI/AudioClient.cs index 5475185c..1f44996b 100644 --- a/sdk_v2/cs/src/OpenAI/AudioClient.cs +++ b/sdk_v2/cs/src/OpenAI/AudioClient.cs @@ -6,9 +6,6 @@ namespace Microsoft.AI.Foundry.Local; -using System.Runtime.CompilerServices; -using System.Threading.Channels; - using Betalgo.Ranul.OpenAI.ObjectModels.RequestModels; using Betalgo.Ranul.OpenAI.ObjectModels.ResponseModels; @@ -46,6 +43,16 @@ public record AudioSettings /// public AudioSettings Settings { get; } = new(); + /// + /// Create a real-time streaming transcription session. + /// Audio data is pushed in as PCM chunks and transcription results are returned as an async stream. + /// + /// A streaming session that must be disposed when done. + public AudioTranscriptionStreamSession CreateStreamingSession() + { + return new AudioTranscriptionStreamSession(_modelId); + } + /// /// Transcribe audio from a file. /// @@ -63,28 +70,6 @@ public async Task TranscribeAudioAsync(string .ConfigureAwait(false); } - /// - /// Transcribe audio from a file with streamed output. - /// - /// - /// Path to file containing audio recording. - /// Supported formats: mp3 - /// - /// Cancellation token. - /// An asynchronous enumerable of transcription responses. - public async IAsyncEnumerable TranscribeAudioStreamingAsync( - string audioFilePath, [EnumeratorCancellation] CancellationToken ct) - { - var enumerable = Utils.CallWithExceptionHandling( - () => TranscribeAudioStreamingImplAsync(audioFilePath, ct), - "Error during streaming audio transcription.", _logger).ConfigureAwait(false); - - await foreach (var item in enumerable) - { - yield return item; - } - } - private async Task TranscribeAudioImplAsync(string audioFilePath, CancellationToken? ct) { @@ -107,93 +92,4 @@ private async Task TranscribeAudioImplAsync(st return output; } - - private async IAsyncEnumerable TranscribeAudioStreamingImplAsync( - string audioFilePath, [EnumeratorCancellation] CancellationToken ct) - { - var openaiRequest = AudioTranscriptionCreateRequestExtended.FromUserInput(_modelId, audioFilePath, Settings); - - var request = new CoreInteropRequest - { - Params = new Dictionary - { - { "OpenAICreateRequest", openaiRequest.ToJson() }, - } - }; - - var channel = Channel.CreateUnbounded( - new UnboundedChannelOptions - { - SingleWriter = true, - SingleReader = true, - AllowSynchronousContinuations = true - }); - - // The callback will push ChatResponse objects into the channel. - // The channel reader will return the values to the user. - // This setup prevents the user from blocking the thread generating the responses. - _ = Task.Run(async () => - { - try - { - var failed = false; - - var res = await _coreInterop.ExecuteCommandWithCallbackAsync( - "audio_transcribe", - request, - async (callbackData) => - { - try - { - if (!failed) - { - var audioCompletion = callbackData.ToAudioTranscription(_logger); - await channel.Writer.WriteAsync(audioCompletion); - } - } - catch (Exception ex) - { - // propagate exception to reader - channel.Writer.TryComplete( - new FoundryLocalException( - "Error processing streaming audio transcription callback data.", ex, _logger)); - failed = true; - } - }, - ct - ).ConfigureAwait(false); - - // If the native layer returned an error (e.g. missing audio file, invalid model) - // without invoking any callbacks, propagate it so the caller sees an exception - // instead of an empty stream. - if (res.Error != null) - { - channel.Writer.TryComplete( - new FoundryLocalException($"Error from audio_transcribe command: {res.Error}", _logger)); - return; - } - - // use TryComplete as an exception in the callback may have already closed the channel - _ = channel.Writer.TryComplete(); - } - // Ignore cancellation exceptions so we don't convert them into errors - catch (Exception ex) when (ex is not OperationCanceledException) - { - channel.Writer.TryComplete( - new FoundryLocalException("Error executing streaming chat completion.", ex, _logger)); - } - catch (OperationCanceledException) - { - // Complete the channel on cancellation but don't turn it into an error - channel.Writer.TryComplete(); - } - }, ct); - - // Start reading from the channel as items arrive. - // This will continue until ExecuteCommandWithCallbackAsync completes and closes the channel. - await foreach (var item in channel.Reader.ReadAllAsync(ct)) - { - yield return item; - } - } } diff --git a/sdk_v2/cs/src/OpenAI/AudioStreamTranscriptionTypes.cs b/sdk_v2/cs/src/OpenAI/AudioStreamTranscriptionTypes.cs index 7736cb47..02c4169e 100644 --- a/sdk_v2/cs/src/OpenAI/AudioStreamTranscriptionTypes.cs +++ b/sdk_v2/cs/src/OpenAI/AudioStreamTranscriptionTypes.cs @@ -6,11 +6,20 @@ namespace Microsoft.AI.Foundry.Local; public record AudioStreamTranscriptionResult { - /// Whether this is a partial (interim) or final result for this segment. + /// + /// Whether this is a final or partial (interim) result. + /// - Nemotron models always return true (every result is final). + /// - Other models (e.g., Azure Embedded) may return false for interim + /// hypotheses that will be replaced by a subsequent final result. + /// [JsonPropertyName("is_final")] public bool IsFinal { get; init; } - /// The transcribed text. + /// + /// Newly transcribed text from this audio chunk only (incremental hypothesis). + /// This is NOT the full accumulated transcript — each result contains only + /// the text decoded from the most recent audio chunk. + /// [JsonPropertyName("text")] public string Text { get; init; } = string.Empty; diff --git a/sdk_v2/cs/src/OpenAI/AudioStreamingClient.cs b/sdk_v2/cs/src/OpenAI/AudioStreamingClient.cs index 303362e3..f0a1904d 100644 --- a/sdk_v2/cs/src/OpenAI/AudioStreamingClient.cs +++ b/sdk_v2/cs/src/OpenAI/AudioStreamingClient.cs @@ -15,17 +15,19 @@ namespace Microsoft.AI.Foundry.Local; /// -/// Client for real-time audio streaming ASR (Automatic Speech Recognition). +/// Session for real-time audio streaming ASR (Automatic Speech Recognition). /// Audio data from a microphone (or other source) is pushed in as PCM chunks, -/// and partial transcription results are returned as an async stream. +/// and transcription results are returned as an async stream. /// -/// Thread safety: PushAudioDataAsync can be called from any thread (including high-frequency +/// Created via . +/// +/// Thread safety: PushAudioAsync can be called from any thread (including high-frequency /// audio callbacks). Pushes are internally serialized via a bounded channel to prevent /// unbounded memory growth and ensure ordering. /// -public sealed class OpenAIAudioStreamingClient : IAsyncDisposable +public sealed class AudioTranscriptionStreamSession : IAsyncDisposable { private readonly string _modelId; private readonly ICoreInterop _coreInterop = FoundryLocalManager.Instance.CoreInterop; @@ -50,14 +52,14 @@ public sealed class OpenAIAudioStreamingClient : IAsyncDisposable private CancellationTokenSource? _sessionCts; // Snapshot of settings captured at StartAsync — prevents mutation after session starts. - private StreamingAudioSettings? _activeSettings; + private AudioStreamTranscriptionOptions? _activeSettings; /// /// Audio format settings for the streaming session. /// Must be configured before calling . /// Settings are frozen once the session starts. /// - public record StreamingAudioSettings + public record AudioStreamTranscriptionOptions { /// PCM sample rate in Hz. Default: 16000. public int SampleRate { get; set; } = 16000; @@ -65,32 +67,29 @@ public record StreamingAudioSettings /// Number of audio channels. Default: 1 (mono). public int Channels { get; set; } = 1; - /// Bits per sample. Default: 16. - public int BitsPerSample { get; set; } = 16; - /// Optional BCP-47 language hint (e.g., "en", "zh"). public string? Language { get; set; } /// /// Maximum number of audio chunks buffered in the internal push queue. - /// If the queue is full, PushAudioDataAsync will asynchronously wait. + /// If the queue is full, AppendAsync will asynchronously wait. /// Default: 100 (~3 seconds of audio at typical chunk sizes). /// public int PushQueueCapacity { get; set; } = 100; - internal StreamingAudioSettings Snapshot() => this with { }; // record copy + internal AudioStreamTranscriptionOptions Snapshot() => this with { }; // record copy } - public StreamingAudioSettings Settings { get; } = new(); + public AudioStreamTranscriptionOptions Settings { get; } = new(); - internal OpenAIAudioStreamingClient(string modelId) + internal AudioTranscriptionStreamSession(string modelId) { _modelId = modelId; } /// /// Start a real-time audio streaming session. - /// Must be called before or . + /// Must be called before or . /// Settings are frozen after this call. /// /// Cancellation token. @@ -129,7 +128,6 @@ public async Task StartAsync(CancellationToken ct = default) { "Model", _modelId }, { "SampleRate", _activeSettings.SampleRate.ToString(CultureInfo.InvariantCulture) }, { "Channels", _activeSettings.Channels.ToString(CultureInfo.InvariantCulture) }, - { "BitsPerSample", _activeSettings.BitsPerSample.ToString(CultureInfo.InvariantCulture) }, } }; @@ -171,7 +169,7 @@ public async Task StartAsync(CancellationToken ct = default) /// /// Raw PCM audio bytes matching the configured format. /// Cancellation token. - public async ValueTask PushAudioDataAsync(ReadOnlyMemory pcmData, CancellationToken ct = default) + public async ValueTask AppendAsync(ReadOnlyMemory pcmData, CancellationToken ct = default) { if (!_started || _stopped) { @@ -211,6 +209,25 @@ private async Task PushLoopAsync(CancellationToken ct) if (response.Error == null) { pushed = true; + + // Parse transcription result from push response and surface it + if (!string.IsNullOrEmpty(response.Data)) + { + try + { + var transcription = AudioStreamTranscriptionResult.FromJson(response.Data); + if (!string.IsNullOrEmpty(transcription.Text)) + { + _outputChannel?.Writer.TryWrite(transcription); + } + } + catch (Exception parseEx) + { + // Non-fatal: log and continue if response isn't a transcription result + _logger.LogDebug(parseEx, "Could not parse push response as transcription result"); + } + } + continue; } @@ -332,12 +349,29 @@ public async Task StopAsync(CancellationToken ct = default) } finally { + // Parse final transcription from stop response before completing the channel + if (response?.Data != null) + { + try + { + var finalResult = AudioStreamTranscriptionResult.FromJson(response.Data); + if (!string.IsNullOrEmpty(finalResult.Text)) + { + _outputChannel?.Writer.TryWrite(finalResult); + } + } + catch (Exception parseEx) + { + _logger.LogDebug(parseEx, "Could not parse stop response as transcription result"); + } + } + _sessionHandle = null; _started = false; _sessionCts?.Dispose(); _sessionCts = null; - // 5. Complete the output channel AFTER StopAudioStream returns + // Complete the output channel AFTER writing final result _outputChannel?.Writer.TryComplete(); } diff --git a/sdk_v2/cs/test/FoundryLocal.Tests/ModelTests.cs b/sdk_v2/cs/test/FoundryLocal.Tests/ModelTests.cs index b5a49657..0e2ea1dc 100644 --- a/sdk_v2/cs/test/FoundryLocal.Tests/ModelTests.cs +++ b/sdk_v2/cs/test/FoundryLocal.Tests/ModelTests.cs @@ -52,3 +52,5 @@ public async Task GetLastestVersion_Works() await Assert.That(latestB).IsEqualTo(variants[1]); } } + + diff --git a/sdk_v2/cs/test/FoundryLocal.Tests/Utils.cs b/sdk_v2/cs/test/FoundryLocal.Tests/Utils.cs index 55808da9..6da59baf 100644 --- a/sdk_v2/cs/test/FoundryLocal.Tests/Utils.cs +++ b/sdk_v2/cs/test/FoundryLocal.Tests/Utils.cs @@ -1,452 +1,74 @@ -// -------------------------------------------------------------------------------------------------------------------- -// -// Copyright (c) Microsoft. All rights reserved. -// -// -------------------------------------------------------------------------------------------------------------------- - -namespace Microsoft.AI.Foundry.Local.Tests; - -using System; -using System.Collections.Generic; -using System.Runtime.CompilerServices; -using System.Text.Json; - -using Microsoft.AI.Foundry.Local.Detail; -using Microsoft.Extensions.Configuration; +using Microsoft.AI.Foundry.Local; using Microsoft.Extensions.Logging; -using Microsoft.VisualStudio.TestPlatform.TestHost; +var loggerFactory = LoggerFactory.Create(b => b.AddConsole().SetMinimumLevel(LogLevel.Debug)); +var logger = loggerFactory.CreateLogger("AudioStreamTest"); -using Moq; +// Point to the directory containing Core + ORT DLLs +var corePath = @"C:\Users\ruiren\Desktop\audio-stream-test\Microsoft.AI.Foundry.Local.Core.dll"; -internal static class Utils +var config = new Configuration { - internal struct TestCatalogInfo - { - internal readonly List TestCatalog { get; } - internal readonly string ModelListJson { get; } - - internal TestCatalogInfo(bool includeCuda) - { - - TestCatalog = Utils.BuildTestCatalog(includeCuda); - ModelListJson = JsonSerializer.Serialize(TestCatalog, JsonSerializationContext.Default.ListModelInfo); - } - } - - internal static readonly TestCatalogInfo TestCatalog = new(true); - - [Before(Assembly)] - public static void AssemblyInit(AssemblyHookContext _) - { - using var loggerFactory = LoggerFactory.Create(builder => - { - builder - .AddConsole() - .SetMinimumLevel(LogLevel.Debug); - }); - - ILogger logger = loggerFactory.CreateLogger(); - - // Read configuration from appsettings.Test.json - logger.LogDebug("Reading configuration from appsettings.Test.json"); - var configuration = new ConfigurationBuilder() - .SetBasePath(Directory.GetCurrentDirectory()) - .AddJsonFile("appsettings.Test.json", optional: true, reloadOnChange: false) - .Build(); - - var testModelCacheDirName = "test-data-shared"; - string testDataSharedPath; - if (Path.IsPathRooted(testModelCacheDirName) || - testModelCacheDirName.Contains(Path.DirectorySeparatorChar) || - testModelCacheDirName.Contains(Path.AltDirectorySeparatorChar)) - { - // It's a relative or complete filepath, resolve from current directory - testDataSharedPath = Path.GetFullPath(testModelCacheDirName); - } - else - { - // It's just a directory name, combine with repo root parent - testDataSharedPath = Path.GetFullPath(Path.Combine(GetRepoRoot(), "..", testModelCacheDirName)); - } - - logger.LogInformation("Using test model cache directory: {testDataSharedPath}", testDataSharedPath); - - if (!Directory.Exists(testDataSharedPath)) - { - throw new DirectoryNotFoundException($"Test model cache directory does not exist: {testDataSharedPath}"); - - } - - var config = new Configuration - { - AppName = "FoundryLocalSdkTest", - LogLevel = Local.LogLevel.Debug, - Web = new Configuration.WebService - { - Urls = "http://127.0.0.1:0" - }, - ModelCacheDir = testDataSharedPath, - LogsDir = Path.Combine(GetRepoRoot(), "sdk_v2", "cs", "logs") - }; - - // Initialize the singleton instance. - FoundryLocalManager.CreateAsync(config, logger).GetAwaiter().GetResult(); - - // standalone instance for testing individual components that skips the 'initialize' command - CoreInterop = new CoreInterop(logger); - } - - internal static ICoreInterop CoreInterop { get; private set; } = default!; - - internal static Mock CreateCapturingLoggerMock(List sink) - { - var mock = new Mock(); - mock.Setup(x => x.Log( - It.IsAny(), - It.IsAny(), - It.IsAny(), - It.IsAny(), - (Func)It.IsAny())) - .Callback((LogLevel level, EventId id, object state, Exception? ex, Delegate formatter) => - { - var message = formatter.DynamicInvoke(state, ex) as string; - sink.Add($"{level}: {message}"); - }); - - return mock; - } - - internal sealed record InteropCommandInterceptInfo - { - public string CommandName { get; init; } = default!; - public string? CommandInput { get; init; } - public string ResponseData { get; init; } = default!; - public string? ResponseError { get; init; } - } - - internal static Mock CreateCoreInteropWithIntercept(ICoreInterop coreInterop, - List intercepts) - { - var mock = new Mock(); - var interceptNames = new HashSet(StringComparer.InvariantCulture); - - foreach (var intercept in intercepts) - { - if (!interceptNames.Add(intercept.CommandName)) - { - throw new ArgumentException($"Duplicate intercept for command {intercept.CommandName}"); - } - - mock.Setup(x => x.ExecuteCommand(It.Is(s => s == intercept.CommandName), It.IsAny())) - .Returns(new ICoreInterop.Response - { - Data = intercept.ResponseData, - Error = intercept.ResponseError - }); - - mock.Setup(x => x.ExecuteCommandAsync(It.Is(s => s == intercept.CommandName), - It.IsAny(), - It.IsAny())) - .ReturnsAsync(new ICoreInterop.Response - { - Data = intercept.ResponseData, - Error = intercept.ResponseError - }); - } - - mock.Setup(x => x.ExecuteCommand(It.Is(s => !interceptNames.Contains(s)), - It.IsAny())) - .Returns((string commandName, CoreInteropRequest? commandInput) => - coreInterop.ExecuteCommand(commandName, commandInput)); - - mock.Setup(x => x.ExecuteCommandAsync(It.Is(s => !interceptNames.Contains(s)), - It.IsAny(), - It.IsAny())) - .Returns((string commandName, CoreInteropRequest? commandInput, CancellationToken? ct) => - coreInterop.ExecuteCommandAsync(commandName, commandInput, ct)); - - return mock; - } - - internal static bool IsRunningInCI() + AppName = "AudioStreamTest", + LogLevel = Microsoft.AI.Foundry.Local.LogLevel.Debug, + AdditionalSettings = new Dictionary { - var azureDevOps = Environment.GetEnvironmentVariable("TF_BUILD"); - var githubActions = Environment.GetEnvironmentVariable("GITHUB_ACTIONS"); - var isCI = string.Equals(azureDevOps, "True", StringComparison.OrdinalIgnoreCase) || - string.Equals(githubActions, "true", StringComparison.OrdinalIgnoreCase); - - return isCI; + { "FoundryLocalCorePath", corePath } } +}; - private static List BuildTestCatalog(bool includeCuda = true) - { - // Mirrors MOCK_CATALOG_DATA ordering and fields (Python tests) - var common = new - { - ProviderType = "AzureFoundry", - Version = 1, - ModelType = "ONNX", - PromptTemplate = (PromptTemplate?)null, - Publisher = "Microsoft", - Task = "chat-completion", - FileSizeMb = 10403, - ModelSettings = new ModelSettings { Parameters = [] }, - SupportsToolCalling = false, - License = "MIT", - LicenseDescription = "License…", - MaxOutputTokens = 1024L, - MinFLVersion = "1.0.0", - }; - - var list = new List - { - // model-1 generic-gpu, generic-cpu:2, generic-cpu:1 - new() - { - Id = "model-1-generic-gpu:1", - Name = "model-1-generic-gpu", - DisplayName = "model-1-generic-gpu", - Uri = "azureml://registries/azureml/models/model-1-generic-gpu/versions/1", - Runtime = new Runtime { DeviceType = DeviceType.GPU, ExecutionProvider = "WebGpuExecutionProvider" }, - Alias = "model-1", - // ParentModelUri = "azureml://registries/azureml/models/model-1/versions/1", - ProviderType = common.ProviderType, Version = common.Version, ModelType = common.ModelType, - PromptTemplate = common.PromptTemplate, Publisher = common.Publisher, Task = common.Task, - FileSizeMb = common.FileSizeMb, ModelSettings = common.ModelSettings, - SupportsToolCalling = common.SupportsToolCalling, License = common.License, - LicenseDescription = common.LicenseDescription, MaxOutputTokens = common.MaxOutputTokens, - MinFLVersion = common.MinFLVersion - }, - new() - { - Id = "model-1-generic-cpu:2", - Name = "model-1-generic-cpu", - DisplayName = "model-1-generic-cpu", - Uri = "azureml://registries/azureml/models/model-1-generic-cpu/versions/2", - Runtime = new Runtime { DeviceType = DeviceType.CPU, ExecutionProvider = "CPUExecutionProvider" }, - Alias = "model-1", - // ParentModelUri = "azureml://registries/azureml/models/model-1/versions/2", - ProviderType = common.ProviderType, - Version = common.Version, ModelType = common.ModelType, - PromptTemplate = common.PromptTemplate, - Publisher = common.Publisher, Task = common.Task, - FileSizeMb = common.FileSizeMb - 10, // smaller so default chosen in test that sorts on this - ModelSettings = common.ModelSettings, - SupportsToolCalling = common.SupportsToolCalling, - License = common.License, - LicenseDescription = common.LicenseDescription, - MaxOutputTokens = common.MaxOutputTokens, - MinFLVersion = common.MinFLVersion - }, - new() - { - Id = "model-1-generic-cpu:1", - Name = "model-1-generic-cpu", - DisplayName = "model-1-generic-cpu", - Uri = "azureml://registries/azureml/models/model-1-generic-cpu/versions/1", - Runtime = new Runtime { DeviceType = DeviceType.CPU, ExecutionProvider = "CPUExecutionProvider" }, - Alias = "model-1", - //ParentModelUri = "azureml://registries/azureml/models/model-1/versions/1", - ProviderType = common.ProviderType, - Version = common.Version, - ModelType = common.ModelType, - PromptTemplate = common.PromptTemplate, - Publisher = common.Publisher, Task = common.Task, - FileSizeMb = common.FileSizeMb, - ModelSettings = common.ModelSettings, - SupportsToolCalling = common.SupportsToolCalling, - License = common.License, - LicenseDescription = common.LicenseDescription, - MaxOutputTokens = common.MaxOutputTokens, - MinFLVersion = common.MinFLVersion - }, +Console.WriteLine("=== Initializing FoundryLocalManager ==="); +await FoundryLocalManager.CreateAsync(config, logger); +var manager = FoundryLocalManager.Instance; - // model-2 npu:2, npu:1, generic-cpu:1 - new() - { - Id = "model-2-npu:2", - Name = "model-2-npu", - DisplayName = "model-2-npu", - Uri = "azureml://registries/azureml/models/model-2-npu/versions/2", - Runtime = new Runtime { DeviceType = DeviceType.NPU, ExecutionProvider = "QNNExecutionProvider" }, - Alias = "model-2", - //ParentModelUri = "azureml://registries/azureml/models/model-2/versions/2", - ProviderType = common.ProviderType, - Version = common.Version, ModelType = common.ModelType, - PromptTemplate = common.PromptTemplate, - Publisher = common.Publisher, Task = common.Task, - FileSizeMb = common.FileSizeMb, - ModelSettings = common.ModelSettings, - SupportsToolCalling = common.SupportsToolCalling, - License = common.License, - LicenseDescription = common.LicenseDescription, - MaxOutputTokens = common.MaxOutputTokens, - MinFLVersion = common.MinFLVersion - }, - new() - { - Id = "model-2-npu:1", - Name = "model-2-npu", - DisplayName = "model-2-npu", - Uri = "azureml://registries/azureml/models/model-2-npu/versions/1", - Runtime = new Runtime { DeviceType = DeviceType.NPU, ExecutionProvider = "QNNExecutionProvider" }, - Alias = "model-2", - //ParentModelUri = "azureml://registries/azureml/models/model-2/versions/1", - ProviderType = common.ProviderType, - Version = common.Version, ModelType = common.ModelType, - PromptTemplate = common.PromptTemplate, - Publisher = common.Publisher, Task = common.Task, - FileSizeMb = common.FileSizeMb, - ModelSettings = common.ModelSettings, - SupportsToolCalling = common.SupportsToolCalling, - License = common.License, - LicenseDescription = common.LicenseDescription, - MaxOutputTokens = common.MaxOutputTokens, - MinFLVersion = common.MinFLVersion - }, - new() - { - Id = "model-2-generic-cpu:1", - Name = "model-2-generic-cpu", - DisplayName = "model-2-generic-cpu", - Uri = "azureml://registries/azureml/models/model-2-generic-cpu/versions/1", - Runtime = new Runtime { DeviceType = DeviceType.CPU, ExecutionProvider = "CPUExecutionProvider" }, - Alias = "model-2", - //ParentModelUri = "azureml://registries/azureml/models/model-2/versions/1", - ProviderType = common.ProviderType, - Version = common.Version, ModelType = common.ModelType, - PromptTemplate = common.PromptTemplate, - Publisher = common.Publisher, Task = common.Task, - FileSizeMb = common.FileSizeMb, - ModelSettings = common.ModelSettings, - SupportsToolCalling = common.SupportsToolCalling, - License = common.License, - LicenseDescription = common.LicenseDescription, - MaxOutputTokens = common.MaxOutputTokens, - MinFLVersion = common.MinFLVersion - }, - }; +Console.WriteLine("=== Getting Catalog ==="); +var catalog = await manager.GetCatalogAsync(); +var models = await catalog.ListModelsAsync(); +Console.WriteLine($"Found {models.Count} models"); - // model-3 cuda-gpu (optional), generic-gpu, generic-cpu - if (includeCuda) - { - list.Add(new ModelInfo - { - Id = "model-3-cuda-gpu:1", - Name = "model-3-cuda-gpu", - DisplayName = "model-3-cuda-gpu", - Uri = "azureml://registries/azureml/models/model-3-cuda-gpu/versions/1", - Runtime = new Runtime { DeviceType = DeviceType.GPU, ExecutionProvider = "CUDAExecutionProvider" }, - Alias = "model-3", - //ParentModelUri = "azureml://registries/azureml/models/model-3/versions/1", - ProviderType = common.ProviderType, - Version = common.Version, - ModelType = common.ModelType, - PromptTemplate = common.PromptTemplate, - Publisher = common.Publisher, - Task = common.Task, - FileSizeMb = common.FileSizeMb, - ModelSettings = common.ModelSettings, - SupportsToolCalling = common.SupportsToolCalling, - License = common.License, - LicenseDescription = common.LicenseDescription, - MaxOutputTokens = common.MaxOutputTokens, - MinFLVersion = common.MinFLVersion - }); - } - - list.AddRange(new[] - { - new ModelInfo - { - Id = "model-3-generic-gpu:1", - Name = "model-3-generic-gpu", - DisplayName = "model-3-generic-gpu", - Uri = "azureml://registries/azureml/models/model-3-generic-gpu/versions/1", - Runtime = new Runtime { DeviceType = DeviceType.GPU, ExecutionProvider = "WebGpuExecutionProvider" }, - Alias = "model-3", - //ParentModelUri = "azureml://registries/azureml/models/model-3/versions/1", - ProviderType = common.ProviderType, - Version = common.Version, ModelType = common.ModelType, - PromptTemplate = common.PromptTemplate, - Publisher = common.Publisher, Task = common.Task, - FileSizeMb = common.FileSizeMb, - ModelSettings = common.ModelSettings, - SupportsToolCalling = common.SupportsToolCalling, - License = common.License, - LicenseDescription = common.LicenseDescription, - MaxOutputTokens = common.MaxOutputTokens, - MinFLVersion = common.MinFLVersion - }, - new ModelInfo - { - Id = "model-3-generic-cpu:1", - Name = "model-3-generic-cpu", - DisplayName = "model-3-generic-cpu", - Uri = "azureml://registries/azureml/models/model-3-generic-cpu/versions/1", - Runtime = new Runtime { DeviceType = DeviceType.CPU, ExecutionProvider = "CPUExecutionProvider" }, - Alias = "model-3", - //ParentModelUri = "azureml://registries/azureml/models/model-3/versions/1", - ProviderType = common.ProviderType, - Version = common.Version, - ModelType = common.ModelType, - PromptTemplate = common.PromptTemplate, - Publisher = common.Publisher, Task = common.Task, - FileSizeMb = common.FileSizeMb, - ModelSettings = common.ModelSettings, - SupportsToolCalling = common.SupportsToolCalling, - License = common.License, - LicenseDescription = common.LicenseDescription, - MaxOutputTokens = common.MaxOutputTokens, - MinFLVersion = common.MinFLVersion - } - }); - - // model-4 generic-gpu (nullable prompt) - list.Add(new ModelInfo - { - Id = "model-4-generic-gpu:1", - Name = "model-4-generic-gpu", - DisplayName = "model-4-generic-gpu", - Uri = "azureml://registries/azureml/models/model-4-generic-gpu/versions/1", - Runtime = new Runtime { DeviceType = DeviceType.GPU, ExecutionProvider = "WebGpuExecutionProvider" }, - Alias = "model-4", - //ParentModelUri = "azureml://registries/azureml/models/model-4/versions/1", - ProviderType = common.ProviderType, - Version = common.Version, - ModelType = common.ModelType, - PromptTemplate = null, - Publisher = common.Publisher, - Task = common.Task, - FileSizeMb = common.FileSizeMb, - ModelSettings = common.ModelSettings, - SupportsToolCalling = common.SupportsToolCalling, - License = common.License, - LicenseDescription = common.LicenseDescription, - MaxOutputTokens = common.MaxOutputTokens, - MinFLVersion = common.MinFLVersion - }); - - return list; - } - - private static string GetSourceFilePath([CallerFilePath] string path = "") => path; - - // Gets the root directory of the foundry-local-sdk repository by finding the .git directory. - private static string GetRepoRoot() - { - var sourceFile = GetSourceFilePath(); - var dir = new DirectoryInfo(Path.GetDirectoryName(sourceFile)!); +// Find and load a whisper model +var model = await catalog.GetModelAsync("whisper-tiny"); +if (model == null) +{ + Console.WriteLine("whisper-tiny not found. Available models:"); + foreach (var m in models) + Console.WriteLine($" - {m.Alias}"); + return; +} - while (dir != null) - { - if (Directory.Exists(Path.Combine(dir.FullName, ".git"))) - return dir.FullName; +Console.WriteLine($"=== Downloading {model.Alias} ==="); +await model.DownloadAsync(p => Console.Write($"\r Progress: {p:F1}%")); +Console.WriteLine(); + +Console.WriteLine($"=== Loading {model.Alias} ==="); +await model.LoadAsync(); +Console.WriteLine("Model loaded."); + +Console.WriteLine("=== Creating streaming session ==="); +var audioClient = await model.GetAudioClientAsync(); +var streamingClient = audioClient.CreateStreamingSession(); +streamingClient.Settings.SampleRate = 16000; +streamingClient.Settings.Channels = 1; +streamingClient.Settings.BitsPerSample = 16; +streamingClient.Settings.Language = "en"; + +Console.WriteLine("=== Starting streaming session ==="); +await streamingClient.StartAsync(); +Console.WriteLine("Session started!"); + +// Push some fake PCM data (silence — 100ms at 16kHz 16-bit mono = 3200 bytes) +var fakePcm = new byte[3200]; +Console.WriteLine("=== Pushing audio chunks ==="); +for (int i = 0; i < 5; i++) +{ + await streamingClient.AppendAsync(fakePcm); + Console.WriteLine($" Pushed chunk {i + 1}"); +} - dir = dir.Parent; - } +Console.WriteLine("=== Stopping session ==="); +await streamingClient.StopAsync(); +Console.WriteLine("Session stopped."); - throw new InvalidOperationException("Could not find git repository root from test file location"); - } -} +Console.WriteLine("=== Unloading model ==="); +await model.UnloadAsync(); +Console.WriteLine("Done! All plumbing works end-to-end."); \ No newline at end of file From f5bd9162c30928435e7bd5e39876313c378a9ad0 Mon Sep 17 00:00:00 2001 From: ruiren_microsoft Date: Thu, 12 Mar 2026 18:52:41 -0700 Subject: [PATCH 5/5] update the api --- sdk_v2/cs/src/Detail/CoreInterop.cs | 86 ++++------------------------- 1 file changed, 12 insertions(+), 74 deletions(-) diff --git a/sdk_v2/cs/src/Detail/CoreInterop.cs b/sdk_v2/cs/src/Detail/CoreInterop.cs index e4c88e9b..c5eba7ec 100644 --- a/sdk_v2/cs/src/Detail/CoreInterop.cs +++ b/sdk_v2/cs/src/Detail/CoreInterop.cs @@ -158,7 +158,12 @@ private static unsafe partial void CoreExecuteCommandWithCallback(RequestBuffer* nint callbackPtr, // NativeCallbackFn pointer nint userData); - // --- Audio streaming P/Invoke imports --- + [LibraryImport(LibraryName, EntryPoint = "execute_command_with_binary")] + [UnmanagedCallConv(CallConvs = new[] { typeof(System.Runtime.CompilerServices.CallConvCdecl) })] + private static unsafe partial void CoreExecuteCommandWithBinary(StreamingRequestBuffer* nativeRequest, + ResponseBuffer* nativeResponse); + + // --- Audio streaming P/Invoke imports (kept for future dedicated entry points) --- [LibraryImport(LibraryName, EntryPoint = "audio_stream_start")] [UnmanagedCallConv(CallConvs = new[] { typeof(System.Runtime.CompilerServices.CallConvCdecl) })] @@ -377,45 +382,13 @@ private Response MarshalResponse(ResponseBuffer response) } // --- Audio streaming managed implementations --- + // Route through the existing execute_command / execute_command_with_binary entry points. + // The Core handles audio_stream_start / audio_stream_stop as command cases in ExecuteCommandManaged, + // and audio_stream_push as a command case in ExecuteCommandWithBinaryManaged. public Response StartAudioStream(CoreInteropRequest request) { - try - { - var commandInputJson = request.ToJson(); - byte[] inputBytes = System.Text.Encoding.UTF8.GetBytes(commandInputJson); - - IntPtr inputPtr = Marshal.AllocHGlobal(inputBytes.Length); - Marshal.Copy(inputBytes, 0, inputPtr, inputBytes.Length); - - unsafe - { - var reqBuf = new RequestBuffer - { - Command = IntPtr.Zero, - CommandLength = 0, - Data = inputPtr, - DataLength = inputBytes.Length - }; - - ResponseBuffer response = default; - - try - { - CoreAudioStreamStart(&reqBuf, &response); - } - finally - { - Marshal.FreeHGlobal(inputPtr); - } - - return MarshalResponse(response); - } - } - catch (Exception ex) when (ex is not OperationCanceledException) - { - throw new FoundryLocalException("Error executing audio_stream_start", ex, _logger); - } + return ExecuteCommand("audio_stream_start", request); } public Response PushAudioData(CoreInteropRequest request, ReadOnlyMemory audioData) @@ -451,7 +424,7 @@ public Response PushAudioData(CoreInteropRequest request, ReadOnlyMemory a try { - CoreAudioStreamPush(&reqBuf, &response); + CoreExecuteCommandWithBinary(&reqBuf, &response); } finally { @@ -470,42 +443,7 @@ public Response PushAudioData(CoreInteropRequest request, ReadOnlyMemory a public Response StopAudioStream(CoreInteropRequest request) { - try - { - var commandInputJson = request.ToJson(); - byte[] inputBytes = System.Text.Encoding.UTF8.GetBytes(commandInputJson); - - IntPtr inputPtr = Marshal.AllocHGlobal(inputBytes.Length); - Marshal.Copy(inputBytes, 0, inputPtr, inputBytes.Length); - - unsafe - { - var reqBuf = new RequestBuffer - { - Command = IntPtr.Zero, - CommandLength = 0, - Data = inputPtr, - DataLength = inputBytes.Length - }; - - ResponseBuffer response = default; - - try - { - CoreAudioStreamStop(&reqBuf, &response); - } - finally - { - Marshal.FreeHGlobal(inputPtr); - } - - return MarshalResponse(response); - } - } - catch (Exception ex) when (ex is not OperationCanceledException) - { - throw new FoundryLocalException("Error executing audio_stream_stop", ex, _logger); - } + return ExecuteCommand("audio_stream_stop", request); } }