diff --git a/sdk_v2/cs/src/AssemblyInfo.cs b/sdk_v2/cs/src/AssemblyInfo.cs index 9bebe71b..987f9de6 100644 --- a/sdk_v2/cs/src/AssemblyInfo.cs +++ b/sdk_v2/cs/src/AssemblyInfo.cs @@ -7,4 +7,5 @@ using System.Runtime.CompilerServices; [assembly: InternalsVisibleTo("Microsoft.AI.Foundry.Local.Tests")] +[assembly: InternalsVisibleTo("AudioStreamTest")] [assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] // for Mock of ICoreInterop diff --git a/sdk_v2/cs/src/Detail/CoreInterop.cs b/sdk_v2/cs/src/Detail/CoreInterop.cs index 8411473b..c5eba7ec 100644 --- a/sdk_v2/cs/src/Detail/CoreInterop.cs +++ b/sdk_v2/cs/src/Detail/CoreInterop.cs @@ -158,6 +158,31 @@ private static unsafe partial void CoreExecuteCommandWithCallback(RequestBuffer* nint callbackPtr, // NativeCallbackFn pointer nint userData); + [LibraryImport(LibraryName, EntryPoint = "execute_command_with_binary")] + [UnmanagedCallConv(CallConvs = new[] { typeof(System.Runtime.CompilerServices.CallConvCdecl) })] + private static unsafe partial void CoreExecuteCommandWithBinary(StreamingRequestBuffer* nativeRequest, + ResponseBuffer* nativeResponse); + + // --- Audio streaming P/Invoke imports (kept for future dedicated entry points) --- + + [LibraryImport(LibraryName, EntryPoint = "audio_stream_start")] + [UnmanagedCallConv(CallConvs = new[] { typeof(System.Runtime.CompilerServices.CallConvCdecl) })] + private static unsafe partial void CoreAudioStreamStart( + RequestBuffer* request, + ResponseBuffer* response); + + [LibraryImport(LibraryName, EntryPoint = "audio_stream_push")] + [UnmanagedCallConv(CallConvs = new[] { typeof(System.Runtime.CompilerServices.CallConvCdecl) })] + private static unsafe partial void CoreAudioStreamPush( + StreamingRequestBuffer* request, + ResponseBuffer* response); + + [LibraryImport(LibraryName, EntryPoint = "audio_stream_stop")] + [UnmanagedCallConv(CallConvs = new[] { typeof(System.Runtime.CompilerServices.CallConvCdecl) })] + private static unsafe partial void CoreAudioStreamStop( + RequestBuffer* request, + ResponseBuffer* response); + // helper to capture exceptions in callbacks internal class CallbackHelper { @@ -331,4 +356,94 @@ public Task ExecuteCommandWithCallbackAsync(string commandName, CoreIn return Task.Run(() => ExecuteCommandWithCallback(commandName, commandInput, callback), ct); } + /// + /// Marshal a ResponseBuffer from unmanaged memory into a managed Response and free the unmanaged memory. + /// + private Response MarshalResponse(ResponseBuffer response) + { + Response result = new(); + + if (response.Data != IntPtr.Zero && response.DataLength > 0) + { + byte[] managedResponse = new byte[response.DataLength]; + Marshal.Copy(response.Data, managedResponse, 0, response.DataLength); + result.Data = System.Text.Encoding.UTF8.GetString(managedResponse); + } + + if (response.Error != IntPtr.Zero && response.ErrorLength > 0) + { + result.Error = Marshal.PtrToStringUTF8(response.Error, response.ErrorLength)!; + } + + Marshal.FreeHGlobal(response.Data); + Marshal.FreeHGlobal(response.Error); + + return result; + } + + // --- Audio streaming managed implementations --- + // Route through the existing execute_command / execute_command_with_binary entry points. + // The Core handles audio_stream_start / audio_stream_stop as command cases in ExecuteCommandManaged, + // and audio_stream_push as a command case in ExecuteCommandWithBinaryManaged. + + public Response StartAudioStream(CoreInteropRequest request) + { + return ExecuteCommand("audio_stream_start", request); + } + + public Response PushAudioData(CoreInteropRequest request, ReadOnlyMemory audioData) + { + try + { + var commandInputJson = request.ToJson(); + byte[] commandBytes = System.Text.Encoding.UTF8.GetBytes("audio_stream_push"); + byte[] inputBytes = System.Text.Encoding.UTF8.GetBytes(commandInputJson); + + IntPtr commandPtr = Marshal.AllocHGlobal(commandBytes.Length); + Marshal.Copy(commandBytes, 0, commandPtr, commandBytes.Length); + + IntPtr inputPtr = Marshal.AllocHGlobal(inputBytes.Length); + Marshal.Copy(inputBytes, 0, inputPtr, inputBytes.Length); + + // Pin the managed audio data so GC won't move it during the native call + using var audioHandle = audioData.Pin(); + + unsafe + { + var reqBuf = new StreamingRequestBuffer + { + Command = commandPtr, + CommandLength = commandBytes.Length, + Data = inputPtr, + DataLength = inputBytes.Length, + BinaryData = (nint)audioHandle.Pointer, + BinaryDataLength = audioData.Length + }; + + ResponseBuffer response = default; + + try + { + CoreExecuteCommandWithBinary(&reqBuf, &response); + } + finally + { + Marshal.FreeHGlobal(commandPtr); + Marshal.FreeHGlobal(inputPtr); + } + + return MarshalResponse(response); + } + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + throw new FoundryLocalException("Error executing audio_stream_push", ex, _logger); + } + } + + public Response StopAudioStream(CoreInteropRequest request) + { + return ExecuteCommand("audio_stream_stop", request); + } + } diff --git a/sdk_v2/cs/src/Detail/ICoreInterop.cs b/sdk_v2/cs/src/Detail/ICoreInterop.cs index 1fff9dde..b493dfb7 100644 --- a/sdk_v2/cs/src/Detail/ICoreInterop.cs +++ b/sdk_v2/cs/src/Detail/ICoreInterop.cs @@ -51,4 +51,21 @@ Task ExecuteCommandAsync(string commandName, CoreInteropRequest? comma Task ExecuteCommandWithCallbackAsync(string commandName, CoreInteropRequest? commandInput, CallbackFn callback, CancellationToken? ct = null); + + // --- Audio streaming session support --- + + [StructLayout(LayoutKind.Sequential)] + protected unsafe struct StreamingRequestBuffer + { + public nint Command; + public int CommandLength; + public nint Data; // JSON params + public int DataLength; + public nint BinaryData; // raw PCM audio bytes + public int BinaryDataLength; + } + + Response StartAudioStream(CoreInteropRequest request); + Response PushAudioData(CoreInteropRequest request, ReadOnlyMemory audioData); + Response StopAudioStream(CoreInteropRequest request); } diff --git a/sdk_v2/cs/src/Detail/JsonSerializationContext.cs b/sdk_v2/cs/src/Detail/JsonSerializationContext.cs index 894f9454..3cc079f3 100644 --- a/sdk_v2/cs/src/Detail/JsonSerializationContext.cs +++ b/sdk_v2/cs/src/Detail/JsonSerializationContext.cs @@ -33,6 +33,9 @@ namespace Microsoft.AI.Foundry.Local.Detail; [JsonSerializable(typeof(IList))] [JsonSerializable(typeof(PropertyDefinition))] [JsonSerializable(typeof(IList))] +// --- NEW: Audio streaming types --- +[JsonSerializable(typeof(AudioStreamTranscriptionResult))] +[JsonSerializable(typeof(CoreErrorResponse))] [JsonSourceGenerationOptions(DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull, WriteIndented = false)] internal partial class JsonSerializationContext : JsonSerializerContext diff --git a/sdk_v2/cs/src/OpenAI/AudioClient.cs b/sdk_v2/cs/src/OpenAI/AudioClient.cs index 5475185c..1f44996b 100644 --- a/sdk_v2/cs/src/OpenAI/AudioClient.cs +++ b/sdk_v2/cs/src/OpenAI/AudioClient.cs @@ -6,9 +6,6 @@ namespace Microsoft.AI.Foundry.Local; -using System.Runtime.CompilerServices; -using System.Threading.Channels; - using Betalgo.Ranul.OpenAI.ObjectModels.RequestModels; using Betalgo.Ranul.OpenAI.ObjectModels.ResponseModels; @@ -46,6 +43,16 @@ public record AudioSettings /// public AudioSettings Settings { get; } = new(); + /// + /// Create a real-time streaming transcription session. + /// Audio data is pushed in as PCM chunks and transcription results are returned as an async stream. + /// + /// A streaming session that must be disposed when done. + public AudioTranscriptionStreamSession CreateStreamingSession() + { + return new AudioTranscriptionStreamSession(_modelId); + } + /// /// Transcribe audio from a file. /// @@ -63,28 +70,6 @@ public async Task TranscribeAudioAsync(string .ConfigureAwait(false); } - /// - /// Transcribe audio from a file with streamed output. - /// - /// - /// Path to file containing audio recording. - /// Supported formats: mp3 - /// - /// Cancellation token. - /// An asynchronous enumerable of transcription responses. - public async IAsyncEnumerable TranscribeAudioStreamingAsync( - string audioFilePath, [EnumeratorCancellation] CancellationToken ct) - { - var enumerable = Utils.CallWithExceptionHandling( - () => TranscribeAudioStreamingImplAsync(audioFilePath, ct), - "Error during streaming audio transcription.", _logger).ConfigureAwait(false); - - await foreach (var item in enumerable) - { - yield return item; - } - } - private async Task TranscribeAudioImplAsync(string audioFilePath, CancellationToken? ct) { @@ -107,93 +92,4 @@ private async Task TranscribeAudioImplAsync(st return output; } - - private async IAsyncEnumerable TranscribeAudioStreamingImplAsync( - string audioFilePath, [EnumeratorCancellation] CancellationToken ct) - { - var openaiRequest = AudioTranscriptionCreateRequestExtended.FromUserInput(_modelId, audioFilePath, Settings); - - var request = new CoreInteropRequest - { - Params = new Dictionary - { - { "OpenAICreateRequest", openaiRequest.ToJson() }, - } - }; - - var channel = Channel.CreateUnbounded( - new UnboundedChannelOptions - { - SingleWriter = true, - SingleReader = true, - AllowSynchronousContinuations = true - }); - - // The callback will push ChatResponse objects into the channel. - // The channel reader will return the values to the user. - // This setup prevents the user from blocking the thread generating the responses. - _ = Task.Run(async () => - { - try - { - var failed = false; - - var res = await _coreInterop.ExecuteCommandWithCallbackAsync( - "audio_transcribe", - request, - async (callbackData) => - { - try - { - if (!failed) - { - var audioCompletion = callbackData.ToAudioTranscription(_logger); - await channel.Writer.WriteAsync(audioCompletion); - } - } - catch (Exception ex) - { - // propagate exception to reader - channel.Writer.TryComplete( - new FoundryLocalException( - "Error processing streaming audio transcription callback data.", ex, _logger)); - failed = true; - } - }, - ct - ).ConfigureAwait(false); - - // If the native layer returned an error (e.g. missing audio file, invalid model) - // without invoking any callbacks, propagate it so the caller sees an exception - // instead of an empty stream. - if (res.Error != null) - { - channel.Writer.TryComplete( - new FoundryLocalException($"Error from audio_transcribe command: {res.Error}", _logger)); - return; - } - - // use TryComplete as an exception in the callback may have already closed the channel - _ = channel.Writer.TryComplete(); - } - // Ignore cancellation exceptions so we don't convert them into errors - catch (Exception ex) when (ex is not OperationCanceledException) - { - channel.Writer.TryComplete( - new FoundryLocalException("Error executing streaming chat completion.", ex, _logger)); - } - catch (OperationCanceledException) - { - // Complete the channel on cancellation but don't turn it into an error - channel.Writer.TryComplete(); - } - }, ct); - - // Start reading from the channel as items arrive. - // This will continue until ExecuteCommandWithCallbackAsync completes and closes the channel. - await foreach (var item in channel.Reader.ReadAllAsync(ct)) - { - yield return item; - } - } } diff --git a/sdk_v2/cs/src/OpenAI/AudioStreamTranscriptionTypes.cs b/sdk_v2/cs/src/OpenAI/AudioStreamTranscriptionTypes.cs new file mode 100644 index 00000000..02c4169e --- /dev/null +++ b/sdk_v2/cs/src/OpenAI/AudioStreamTranscriptionTypes.cs @@ -0,0 +1,74 @@ +namespace Microsoft.AI.Foundry.Local; + +using System.Text.Json; +using System.Text.Json.Serialization; +using Microsoft.AI.Foundry.Local.Detail; + +public record AudioStreamTranscriptionResult +{ + /// + /// Whether this is a final or partial (interim) result. + /// - Nemotron models always return true (every result is final). + /// - Other models (e.g., Azure Embedded) may return false for interim + /// hypotheses that will be replaced by a subsequent final result. + /// + [JsonPropertyName("is_final")] + public bool IsFinal { get; init; } + + /// + /// Newly transcribed text from this audio chunk only (incremental hypothesis). + /// This is NOT the full accumulated transcript — each result contains only + /// the text decoded from the most recent audio chunk. + /// + [JsonPropertyName("text")] + public string Text { get; init; } = string.Empty; + + /// Start time offset of this segment in the audio stream (seconds). + [JsonPropertyName("start_time")] + public double? StartTime { get; init; } + + /// End time offset of this segment in the audio stream (seconds). + [JsonPropertyName("end_time")] + public double? EndTime { get; init; } + + /// Confidence score (0.0 - 1.0) if available. + [JsonPropertyName("confidence")] + public float? Confidence { get; init; } + + internal static AudioStreamTranscriptionResult FromJson(string json) + { + return JsonSerializer.Deserialize(json, + JsonSerializationContext.Default.AudioStreamTranscriptionResult) + ?? throw new FoundryLocalException("Failed to deserialize AudioStreamTranscriptionResult"); + } +} + +internal record CoreErrorResponse +{ + [JsonPropertyName("code")] + public string Code { get; init; } = ""; + + [JsonPropertyName("message")] + public string Message { get; init; } = ""; + + [JsonPropertyName("isTransient")] + public bool IsTransient { get; init; } + + /// + /// Attempt to parse a native error string as structured JSON. + /// Returns null if the error is not valid JSON or doesn't match the schema, + /// which should be treated as a permanent/unknown error. + /// + internal static CoreErrorResponse? TryParse(string errorString) + { + try + { + return JsonSerializer.Deserialize(errorString, + JsonSerializationContext.Default.CoreErrorResponse); + } + catch + { + return null; // unstructured error — treat as permanent + } + } +} \ No newline at end of file diff --git a/sdk_v2/cs/src/OpenAI/AudioStreamingClient.cs b/sdk_v2/cs/src/OpenAI/AudioStreamingClient.cs new file mode 100644 index 00000000..f0a1904d --- /dev/null +++ b/sdk_v2/cs/src/OpenAI/AudioStreamingClient.cs @@ -0,0 +1,405 @@ +// -------------------------------------------------------------------------------------------------------------------- +// +// Copyright (c) Microsoft. All rights reserved. +// +// -------------------------------------------------------------------------------------------------------------------- + +namespace Microsoft.AI.Foundry.Local; + +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +using System.Globalization; +using System.Threading.Channels; +using Microsoft.AI.Foundry.Local.Detail; +using Microsoft.Extensions.Logging; + + +/// +/// Session for real-time audio streaming ASR (Automatic Speech Recognition). +/// Audio data from a microphone (or other source) is pushed in as PCM chunks, +/// and transcription results are returned as an async stream. +/// +/// Created via . +/// +/// Thread safety: PushAudioAsync can be called from any thread (including high-frequency +/// audio callbacks). Pushes are internally serialized via a bounded channel to prevent +/// unbounded memory growth and ensure ordering. +/// + + +public sealed class AudioTranscriptionStreamSession : IAsyncDisposable +{ + private readonly string _modelId; + private readonly ICoreInterop _coreInterop = FoundryLocalManager.Instance.CoreInterop; + private readonly ILogger _logger = FoundryLocalManager.Instance.Logger; + + // Session state — protected by _lock + private readonly AsyncLock _lock = new(); + private string? _sessionHandle; + private bool _started; + private bool _stopped; + + // Output channel: native callback writes, user reads via GetTranscriptionStream + private Channel? _outputChannel; + + // Internal push queue: user writes audio chunks, background loop drains to native core. + // Bounded to prevent unbounded memory growth if native core is slower than real-time. + private Channel>? _pushChannel; + private Task? _pushLoopTask; + + // Dedicated CTS for the push loop — decoupled from StartAsync's caller token. + // Cancelled only during StopAsync/DisposeAsync to allow clean drain. + private CancellationTokenSource? _sessionCts; + + // Snapshot of settings captured at StartAsync — prevents mutation after session starts. + private AudioStreamTranscriptionOptions? _activeSettings; + + /// + /// Audio format settings for the streaming session. + /// Must be configured before calling . + /// Settings are frozen once the session starts. + /// + public record AudioStreamTranscriptionOptions + { + /// PCM sample rate in Hz. Default: 16000. + public int SampleRate { get; set; } = 16000; + + /// Number of audio channels. Default: 1 (mono). + public int Channels { get; set; } = 1; + + /// Optional BCP-47 language hint (e.g., "en", "zh"). + public string? Language { get; set; } + + /// + /// Maximum number of audio chunks buffered in the internal push queue. + /// If the queue is full, AppendAsync will asynchronously wait. + /// Default: 100 (~3 seconds of audio at typical chunk sizes). + /// + public int PushQueueCapacity { get; set; } = 100; + + internal AudioStreamTranscriptionOptions Snapshot() => this with { }; // record copy + } + + public AudioStreamTranscriptionOptions Settings { get; } = new(); + + internal AudioTranscriptionStreamSession(string modelId) + { + _modelId = modelId; + } + + /// + /// Start a real-time audio streaming session. + /// Must be called before or . + /// Settings are frozen after this call. + /// + /// Cancellation token. + public async Task StartAsync(CancellationToken ct = default) + { + using var disposable = await _lock.LockAsync().ConfigureAwait(false); + + if (_started) + { + throw new FoundryLocalException("Streaming session already started. Call StopAsync first."); + } + + // Freeze settings + _activeSettings = Settings.Snapshot(); + + _outputChannel = Channel.CreateUnbounded( + new UnboundedChannelOptions + { + SingleWriter = true, // only the native callback writes + SingleReader = true, + AllowSynchronousContinuations = true + }); + + _pushChannel = Channel.CreateBounded>( + new BoundedChannelOptions(_activeSettings.PushQueueCapacity) + { + SingleReader = true, // only the push loop reads + SingleWriter = false, // multiple threads may push audio data + FullMode = BoundedChannelFullMode.Wait + }); + + var request = new CoreInteropRequest + { + Params = new Dictionary + { + { "Model", _modelId }, + { "SampleRate", _activeSettings.SampleRate.ToString(CultureInfo.InvariantCulture) }, + { "Channels", _activeSettings.Channels.ToString(CultureInfo.InvariantCulture) }, + } + }; + + if (_activeSettings.Language != null) + { + request.Params["Language"] = _activeSettings.Language; + } + + // StartAudioStream uses existing execute_command entry point — synchronous P/Invoke + var response = await Task.Run( + () => _coreInterop.StartAudioStream(request), ct) + .ConfigureAwait(false); + + if (response.Error != null) + { + _outputChannel.Writer.TryComplete(); + throw new FoundryLocalException( + $"Error starting audio stream session: {response.Error}", _logger); + } + + _sessionHandle = response.Data + ?? throw new FoundryLocalException("Native core did not return a session handle.", _logger); + _started = true; + _stopped = false; + + // Use a dedicated CTS for the push loop — NOT the caller's ct. +#pragma warning disable IDISP003 // Dispose previous before re-assigning + _sessionCts = new CancellationTokenSource(); +#pragma warning restore IDISP003 +#pragma warning disable IDISP013 // Await in using + _pushLoopTask = Task.Run(() => PushLoopAsync(_sessionCts.Token), CancellationToken.None); +#pragma warning restore IDISP013 + } + + /// + /// Push a chunk of raw PCM audio data to the streaming session. + /// Can be called from any thread (including audio device callbacks). + /// Chunks are internally queued and serialized to the native core. + /// + /// Raw PCM audio bytes matching the configured format. + /// Cancellation token. + public async ValueTask AppendAsync(ReadOnlyMemory pcmData, CancellationToken ct = default) + { + if (!_started || _stopped) + { + throw new FoundryLocalException("No active streaming session. Call StartAsync first."); + } + + // Copy the data to avoid issues if the caller reuses the buffer (e.g. NAudio reuses e.Buffer) + var copy = new byte[pcmData.Length]; + pcmData.CopyTo(copy); + + await _pushChannel!.Writer.WriteAsync(copy, ct).ConfigureAwait(false); + } + + /// + /// Internal loop that drains the push queue and sends chunks to native core one at a time. + /// Implements retry for transient native errors and terminates the session on permanent failures. + /// + private async Task PushLoopAsync(CancellationToken ct) + { + const int maxRetries = 3; + var initialRetryDelay = TimeSpan.FromMilliseconds(50); + + try + { + await foreach (var audioData in _pushChannel!.Reader.ReadAllAsync(ct).ConfigureAwait(false)) + { + var request = new CoreInteropRequest + { + Params = new Dictionary { { "SessionHandle", _sessionHandle! } } + }; + + var pushed = false; + for (int attempt = 0; attempt <= maxRetries && !pushed; attempt++) + { + var response = _coreInterop.PushAudioData(request, audioData); + + if (response.Error == null) + { + pushed = true; + + // Parse transcription result from push response and surface it + if (!string.IsNullOrEmpty(response.Data)) + { + try + { + var transcription = AudioStreamTranscriptionResult.FromJson(response.Data); + if (!string.IsNullOrEmpty(transcription.Text)) + { + _outputChannel?.Writer.TryWrite(transcription); + } + } + catch (Exception parseEx) + { + // Non-fatal: log and continue if response isn't a transcription result + _logger.LogDebug(parseEx, "Could not parse push response as transcription result"); + } + } + + continue; + } + + // Parse structured error to determine transient vs permanent + var errorInfo = CoreErrorResponse.TryParse(response.Error); + + if (errorInfo?.IsTransient == true && attempt < maxRetries) + { + var delay = initialRetryDelay * Math.Pow(2, attempt); + _logger.LogWarning( + "Transient push error (attempt {Attempt}/{Max}): {Code}. Retrying in {Delay}ms", + attempt + 1, maxRetries, errorInfo.Code, delay.TotalMilliseconds); + await Task.Delay(delay, ct).ConfigureAwait(false); + continue; + } + + // Permanent error or retries exhausted — terminate the session + var fatalEx = new FoundryLocalException( + $"Push failed permanently (code={errorInfo?.Code ?? "UNKNOWN"}): {response.Error}", + _logger); + _logger.LogError("Terminating push loop due to permanent push failure: {Error}", + response.Error); + _outputChannel?.Writer.TryComplete(fatalEx); + return; // exit push loop + } + } + } + catch (OperationCanceledException) + { + // Expected on cancellation — push loop exits cleanly + } + catch (Exception ex) + { + _logger.LogError(ex, "Push loop terminated with unexpected error"); + _outputChannel?.Writer.TryComplete( + new FoundryLocalException("Push loop terminated unexpectedly.", ex, _logger)); + } + } + + /// + /// Get the async stream of transcription results. + /// Results arrive as the native ASR engine processes audio data. + /// + /// Cancellation token. + /// Async enumerable of transcription results. + public async IAsyncEnumerable GetTranscriptionStream( + [EnumeratorCancellation] CancellationToken ct = default) + { + if (_outputChannel == null) + { + throw new FoundryLocalException("No active streaming session. Call StartAsync first."); + } + + await foreach (var item in _outputChannel.Reader.ReadAllAsync(ct).ConfigureAwait(false)) + { + yield return item; + } + } + + /// + /// Signal end-of-audio and stop the streaming session. + /// Any remaining buffered audio in the push queue will be drained to native core first. + /// Final results are delivered through before it completes. + /// + /// Cancellation token. + public async Task StopAsync(CancellationToken ct = default) + { + using var disposable = await _lock.LockAsync().ConfigureAwait(false); + + if (!_started || _stopped) + { + return; // already stopped or never started + } + + _stopped = true; + + // 1. Complete the push channel so the push loop drains remaining items and exits + _pushChannel?.Writer.TryComplete(); + + // 2. Wait for the push loop to finish draining + if (_pushLoopTask != null) + { + await _pushLoopTask.ConfigureAwait(false); + } + + // 3. Cancel the session CTS (no-op if push loop already exited) + _sessionCts?.Cancel(); + + // 4. Tell native core to flush and finalize. + // This MUST happen even if ct is cancelled — otherwise native session leaks. + var request = new CoreInteropRequest + { + Params = new Dictionary { { "SessionHandle", _sessionHandle! } } + }; + + ICoreInterop.Response? response = null; + try + { + response = await Task.Run( + () => _coreInterop.StopAudioStream(request), ct) + .ConfigureAwait(false); + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + // ct fired, but we MUST still stop the native session to avoid a leak. + _logger.LogWarning("StopAsync cancelled — performing best-effort native session stop."); + try + { + response = await Task.Run( + () => _coreInterop.StopAudioStream(request)) + .ConfigureAwait(false); + } + catch (Exception cleanupEx) + { + _logger.LogError(cleanupEx, "Best-effort native session stop failed."); + } + + throw; // Re-throw the cancellation after cleanup + } + finally + { + // Parse final transcription from stop response before completing the channel + if (response?.Data != null) + { + try + { + var finalResult = AudioStreamTranscriptionResult.FromJson(response.Data); + if (!string.IsNullOrEmpty(finalResult.Text)) + { + _outputChannel?.Writer.TryWrite(finalResult); + } + } + catch (Exception parseEx) + { + _logger.LogDebug(parseEx, "Could not parse stop response as transcription result"); + } + } + + _sessionHandle = null; + _started = false; + _sessionCts?.Dispose(); + _sessionCts = null; + + // Complete the output channel AFTER writing final result + _outputChannel?.Writer.TryComplete(); + } + + if (response?.Error != null) + { + throw new FoundryLocalException( + $"Error stopping audio stream session: {response.Error}", _logger); + } + } + + public async ValueTask DisposeAsync() + { + try + { + if (_started && !_stopped) + { + await StopAsync().ConfigureAwait(false); + } + } + catch (Exception ex) + { + // DisposeAsync must never throw — log and swallow + _logger.LogWarning(ex, "Error during DisposeAsync cleanup."); + } + finally + { + _sessionCts?.Dispose(); + _lock.Dispose(); + } + } +} \ No newline at end of file diff --git a/sdk_v2/cs/test/FoundryLocal.Tests/ModelTests.cs b/sdk_v2/cs/test/FoundryLocal.Tests/ModelTests.cs index b5a49657..0e2ea1dc 100644 --- a/sdk_v2/cs/test/FoundryLocal.Tests/ModelTests.cs +++ b/sdk_v2/cs/test/FoundryLocal.Tests/ModelTests.cs @@ -52,3 +52,5 @@ public async Task GetLastestVersion_Works() await Assert.That(latestB).IsEqualTo(variants[1]); } } + + diff --git a/sdk_v2/cs/test/FoundryLocal.Tests/Utils.cs b/sdk_v2/cs/test/FoundryLocal.Tests/Utils.cs index 55808da9..6da59baf 100644 --- a/sdk_v2/cs/test/FoundryLocal.Tests/Utils.cs +++ b/sdk_v2/cs/test/FoundryLocal.Tests/Utils.cs @@ -1,452 +1,74 @@ -// -------------------------------------------------------------------------------------------------------------------- -// -// Copyright (c) Microsoft. All rights reserved. -// -// -------------------------------------------------------------------------------------------------------------------- - -namespace Microsoft.AI.Foundry.Local.Tests; - -using System; -using System.Collections.Generic; -using System.Runtime.CompilerServices; -using System.Text.Json; - -using Microsoft.AI.Foundry.Local.Detail; -using Microsoft.Extensions.Configuration; +using Microsoft.AI.Foundry.Local; using Microsoft.Extensions.Logging; -using Microsoft.VisualStudio.TestPlatform.TestHost; +var loggerFactory = LoggerFactory.Create(b => b.AddConsole().SetMinimumLevel(LogLevel.Debug)); +var logger = loggerFactory.CreateLogger("AudioStreamTest"); -using Moq; +// Point to the directory containing Core + ORT DLLs +var corePath = @"C:\Users\ruiren\Desktop\audio-stream-test\Microsoft.AI.Foundry.Local.Core.dll"; -internal static class Utils +var config = new Configuration { - internal struct TestCatalogInfo - { - internal readonly List TestCatalog { get; } - internal readonly string ModelListJson { get; } - - internal TestCatalogInfo(bool includeCuda) - { - - TestCatalog = Utils.BuildTestCatalog(includeCuda); - ModelListJson = JsonSerializer.Serialize(TestCatalog, JsonSerializationContext.Default.ListModelInfo); - } - } - - internal static readonly TestCatalogInfo TestCatalog = new(true); - - [Before(Assembly)] - public static void AssemblyInit(AssemblyHookContext _) - { - using var loggerFactory = LoggerFactory.Create(builder => - { - builder - .AddConsole() - .SetMinimumLevel(LogLevel.Debug); - }); - - ILogger logger = loggerFactory.CreateLogger(); - - // Read configuration from appsettings.Test.json - logger.LogDebug("Reading configuration from appsettings.Test.json"); - var configuration = new ConfigurationBuilder() - .SetBasePath(Directory.GetCurrentDirectory()) - .AddJsonFile("appsettings.Test.json", optional: true, reloadOnChange: false) - .Build(); - - var testModelCacheDirName = "test-data-shared"; - string testDataSharedPath; - if (Path.IsPathRooted(testModelCacheDirName) || - testModelCacheDirName.Contains(Path.DirectorySeparatorChar) || - testModelCacheDirName.Contains(Path.AltDirectorySeparatorChar)) - { - // It's a relative or complete filepath, resolve from current directory - testDataSharedPath = Path.GetFullPath(testModelCacheDirName); - } - else - { - // It's just a directory name, combine with repo root parent - testDataSharedPath = Path.GetFullPath(Path.Combine(GetRepoRoot(), "..", testModelCacheDirName)); - } - - logger.LogInformation("Using test model cache directory: {testDataSharedPath}", testDataSharedPath); - - if (!Directory.Exists(testDataSharedPath)) - { - throw new DirectoryNotFoundException($"Test model cache directory does not exist: {testDataSharedPath}"); - - } - - var config = new Configuration - { - AppName = "FoundryLocalSdkTest", - LogLevel = Local.LogLevel.Debug, - Web = new Configuration.WebService - { - Urls = "http://127.0.0.1:0" - }, - ModelCacheDir = testDataSharedPath, - LogsDir = Path.Combine(GetRepoRoot(), "sdk_v2", "cs", "logs") - }; - - // Initialize the singleton instance. - FoundryLocalManager.CreateAsync(config, logger).GetAwaiter().GetResult(); - - // standalone instance for testing individual components that skips the 'initialize' command - CoreInterop = new CoreInterop(logger); - } - - internal static ICoreInterop CoreInterop { get; private set; } = default!; - - internal static Mock CreateCapturingLoggerMock(List sink) - { - var mock = new Mock(); - mock.Setup(x => x.Log( - It.IsAny(), - It.IsAny(), - It.IsAny(), - It.IsAny(), - (Func)It.IsAny())) - .Callback((LogLevel level, EventId id, object state, Exception? ex, Delegate formatter) => - { - var message = formatter.DynamicInvoke(state, ex) as string; - sink.Add($"{level}: {message}"); - }); - - return mock; - } - - internal sealed record InteropCommandInterceptInfo - { - public string CommandName { get; init; } = default!; - public string? CommandInput { get; init; } - public string ResponseData { get; init; } = default!; - public string? ResponseError { get; init; } - } - - internal static Mock CreateCoreInteropWithIntercept(ICoreInterop coreInterop, - List intercepts) - { - var mock = new Mock(); - var interceptNames = new HashSet(StringComparer.InvariantCulture); - - foreach (var intercept in intercepts) - { - if (!interceptNames.Add(intercept.CommandName)) - { - throw new ArgumentException($"Duplicate intercept for command {intercept.CommandName}"); - } - - mock.Setup(x => x.ExecuteCommand(It.Is(s => s == intercept.CommandName), It.IsAny())) - .Returns(new ICoreInterop.Response - { - Data = intercept.ResponseData, - Error = intercept.ResponseError - }); - - mock.Setup(x => x.ExecuteCommandAsync(It.Is(s => s == intercept.CommandName), - It.IsAny(), - It.IsAny())) - .ReturnsAsync(new ICoreInterop.Response - { - Data = intercept.ResponseData, - Error = intercept.ResponseError - }); - } - - mock.Setup(x => x.ExecuteCommand(It.Is(s => !interceptNames.Contains(s)), - It.IsAny())) - .Returns((string commandName, CoreInteropRequest? commandInput) => - coreInterop.ExecuteCommand(commandName, commandInput)); - - mock.Setup(x => x.ExecuteCommandAsync(It.Is(s => !interceptNames.Contains(s)), - It.IsAny(), - It.IsAny())) - .Returns((string commandName, CoreInteropRequest? commandInput, CancellationToken? ct) => - coreInterop.ExecuteCommandAsync(commandName, commandInput, ct)); - - return mock; - } - - internal static bool IsRunningInCI() + AppName = "AudioStreamTest", + LogLevel = Microsoft.AI.Foundry.Local.LogLevel.Debug, + AdditionalSettings = new Dictionary { - var azureDevOps = Environment.GetEnvironmentVariable("TF_BUILD"); - var githubActions = Environment.GetEnvironmentVariable("GITHUB_ACTIONS"); - var isCI = string.Equals(azureDevOps, "True", StringComparison.OrdinalIgnoreCase) || - string.Equals(githubActions, "true", StringComparison.OrdinalIgnoreCase); - - return isCI; + { "FoundryLocalCorePath", corePath } } +}; - private static List BuildTestCatalog(bool includeCuda = true) - { - // Mirrors MOCK_CATALOG_DATA ordering and fields (Python tests) - var common = new - { - ProviderType = "AzureFoundry", - Version = 1, - ModelType = "ONNX", - PromptTemplate = (PromptTemplate?)null, - Publisher = "Microsoft", - Task = "chat-completion", - FileSizeMb = 10403, - ModelSettings = new ModelSettings { Parameters = [] }, - SupportsToolCalling = false, - License = "MIT", - LicenseDescription = "License…", - MaxOutputTokens = 1024L, - MinFLVersion = "1.0.0", - }; - - var list = new List - { - // model-1 generic-gpu, generic-cpu:2, generic-cpu:1 - new() - { - Id = "model-1-generic-gpu:1", - Name = "model-1-generic-gpu", - DisplayName = "model-1-generic-gpu", - Uri = "azureml://registries/azureml/models/model-1-generic-gpu/versions/1", - Runtime = new Runtime { DeviceType = DeviceType.GPU, ExecutionProvider = "WebGpuExecutionProvider" }, - Alias = "model-1", - // ParentModelUri = "azureml://registries/azureml/models/model-1/versions/1", - ProviderType = common.ProviderType, Version = common.Version, ModelType = common.ModelType, - PromptTemplate = common.PromptTemplate, Publisher = common.Publisher, Task = common.Task, - FileSizeMb = common.FileSizeMb, ModelSettings = common.ModelSettings, - SupportsToolCalling = common.SupportsToolCalling, License = common.License, - LicenseDescription = common.LicenseDescription, MaxOutputTokens = common.MaxOutputTokens, - MinFLVersion = common.MinFLVersion - }, - new() - { - Id = "model-1-generic-cpu:2", - Name = "model-1-generic-cpu", - DisplayName = "model-1-generic-cpu", - Uri = "azureml://registries/azureml/models/model-1-generic-cpu/versions/2", - Runtime = new Runtime { DeviceType = DeviceType.CPU, ExecutionProvider = "CPUExecutionProvider" }, - Alias = "model-1", - // ParentModelUri = "azureml://registries/azureml/models/model-1/versions/2", - ProviderType = common.ProviderType, - Version = common.Version, ModelType = common.ModelType, - PromptTemplate = common.PromptTemplate, - Publisher = common.Publisher, Task = common.Task, - FileSizeMb = common.FileSizeMb - 10, // smaller so default chosen in test that sorts on this - ModelSettings = common.ModelSettings, - SupportsToolCalling = common.SupportsToolCalling, - License = common.License, - LicenseDescription = common.LicenseDescription, - MaxOutputTokens = common.MaxOutputTokens, - MinFLVersion = common.MinFLVersion - }, - new() - { - Id = "model-1-generic-cpu:1", - Name = "model-1-generic-cpu", - DisplayName = "model-1-generic-cpu", - Uri = "azureml://registries/azureml/models/model-1-generic-cpu/versions/1", - Runtime = new Runtime { DeviceType = DeviceType.CPU, ExecutionProvider = "CPUExecutionProvider" }, - Alias = "model-1", - //ParentModelUri = "azureml://registries/azureml/models/model-1/versions/1", - ProviderType = common.ProviderType, - Version = common.Version, - ModelType = common.ModelType, - PromptTemplate = common.PromptTemplate, - Publisher = common.Publisher, Task = common.Task, - FileSizeMb = common.FileSizeMb, - ModelSettings = common.ModelSettings, - SupportsToolCalling = common.SupportsToolCalling, - License = common.License, - LicenseDescription = common.LicenseDescription, - MaxOutputTokens = common.MaxOutputTokens, - MinFLVersion = common.MinFLVersion - }, +Console.WriteLine("=== Initializing FoundryLocalManager ==="); +await FoundryLocalManager.CreateAsync(config, logger); +var manager = FoundryLocalManager.Instance; - // model-2 npu:2, npu:1, generic-cpu:1 - new() - { - Id = "model-2-npu:2", - Name = "model-2-npu", - DisplayName = "model-2-npu", - Uri = "azureml://registries/azureml/models/model-2-npu/versions/2", - Runtime = new Runtime { DeviceType = DeviceType.NPU, ExecutionProvider = "QNNExecutionProvider" }, - Alias = "model-2", - //ParentModelUri = "azureml://registries/azureml/models/model-2/versions/2", - ProviderType = common.ProviderType, - Version = common.Version, ModelType = common.ModelType, - PromptTemplate = common.PromptTemplate, - Publisher = common.Publisher, Task = common.Task, - FileSizeMb = common.FileSizeMb, - ModelSettings = common.ModelSettings, - SupportsToolCalling = common.SupportsToolCalling, - License = common.License, - LicenseDescription = common.LicenseDescription, - MaxOutputTokens = common.MaxOutputTokens, - MinFLVersion = common.MinFLVersion - }, - new() - { - Id = "model-2-npu:1", - Name = "model-2-npu", - DisplayName = "model-2-npu", - Uri = "azureml://registries/azureml/models/model-2-npu/versions/1", - Runtime = new Runtime { DeviceType = DeviceType.NPU, ExecutionProvider = "QNNExecutionProvider" }, - Alias = "model-2", - //ParentModelUri = "azureml://registries/azureml/models/model-2/versions/1", - ProviderType = common.ProviderType, - Version = common.Version, ModelType = common.ModelType, - PromptTemplate = common.PromptTemplate, - Publisher = common.Publisher, Task = common.Task, - FileSizeMb = common.FileSizeMb, - ModelSettings = common.ModelSettings, - SupportsToolCalling = common.SupportsToolCalling, - License = common.License, - LicenseDescription = common.LicenseDescription, - MaxOutputTokens = common.MaxOutputTokens, - MinFLVersion = common.MinFLVersion - }, - new() - { - Id = "model-2-generic-cpu:1", - Name = "model-2-generic-cpu", - DisplayName = "model-2-generic-cpu", - Uri = "azureml://registries/azureml/models/model-2-generic-cpu/versions/1", - Runtime = new Runtime { DeviceType = DeviceType.CPU, ExecutionProvider = "CPUExecutionProvider" }, - Alias = "model-2", - //ParentModelUri = "azureml://registries/azureml/models/model-2/versions/1", - ProviderType = common.ProviderType, - Version = common.Version, ModelType = common.ModelType, - PromptTemplate = common.PromptTemplate, - Publisher = common.Publisher, Task = common.Task, - FileSizeMb = common.FileSizeMb, - ModelSettings = common.ModelSettings, - SupportsToolCalling = common.SupportsToolCalling, - License = common.License, - LicenseDescription = common.LicenseDescription, - MaxOutputTokens = common.MaxOutputTokens, - MinFLVersion = common.MinFLVersion - }, - }; +Console.WriteLine("=== Getting Catalog ==="); +var catalog = await manager.GetCatalogAsync(); +var models = await catalog.ListModelsAsync(); +Console.WriteLine($"Found {models.Count} models"); - // model-3 cuda-gpu (optional), generic-gpu, generic-cpu - if (includeCuda) - { - list.Add(new ModelInfo - { - Id = "model-3-cuda-gpu:1", - Name = "model-3-cuda-gpu", - DisplayName = "model-3-cuda-gpu", - Uri = "azureml://registries/azureml/models/model-3-cuda-gpu/versions/1", - Runtime = new Runtime { DeviceType = DeviceType.GPU, ExecutionProvider = "CUDAExecutionProvider" }, - Alias = "model-3", - //ParentModelUri = "azureml://registries/azureml/models/model-3/versions/1", - ProviderType = common.ProviderType, - Version = common.Version, - ModelType = common.ModelType, - PromptTemplate = common.PromptTemplate, - Publisher = common.Publisher, - Task = common.Task, - FileSizeMb = common.FileSizeMb, - ModelSettings = common.ModelSettings, - SupportsToolCalling = common.SupportsToolCalling, - License = common.License, - LicenseDescription = common.LicenseDescription, - MaxOutputTokens = common.MaxOutputTokens, - MinFLVersion = common.MinFLVersion - }); - } - - list.AddRange(new[] - { - new ModelInfo - { - Id = "model-3-generic-gpu:1", - Name = "model-3-generic-gpu", - DisplayName = "model-3-generic-gpu", - Uri = "azureml://registries/azureml/models/model-3-generic-gpu/versions/1", - Runtime = new Runtime { DeviceType = DeviceType.GPU, ExecutionProvider = "WebGpuExecutionProvider" }, - Alias = "model-3", - //ParentModelUri = "azureml://registries/azureml/models/model-3/versions/1", - ProviderType = common.ProviderType, - Version = common.Version, ModelType = common.ModelType, - PromptTemplate = common.PromptTemplate, - Publisher = common.Publisher, Task = common.Task, - FileSizeMb = common.FileSizeMb, - ModelSettings = common.ModelSettings, - SupportsToolCalling = common.SupportsToolCalling, - License = common.License, - LicenseDescription = common.LicenseDescription, - MaxOutputTokens = common.MaxOutputTokens, - MinFLVersion = common.MinFLVersion - }, - new ModelInfo - { - Id = "model-3-generic-cpu:1", - Name = "model-3-generic-cpu", - DisplayName = "model-3-generic-cpu", - Uri = "azureml://registries/azureml/models/model-3-generic-cpu/versions/1", - Runtime = new Runtime { DeviceType = DeviceType.CPU, ExecutionProvider = "CPUExecutionProvider" }, - Alias = "model-3", - //ParentModelUri = "azureml://registries/azureml/models/model-3/versions/1", - ProviderType = common.ProviderType, - Version = common.Version, - ModelType = common.ModelType, - PromptTemplate = common.PromptTemplate, - Publisher = common.Publisher, Task = common.Task, - FileSizeMb = common.FileSizeMb, - ModelSettings = common.ModelSettings, - SupportsToolCalling = common.SupportsToolCalling, - License = common.License, - LicenseDescription = common.LicenseDescription, - MaxOutputTokens = common.MaxOutputTokens, - MinFLVersion = common.MinFLVersion - } - }); - - // model-4 generic-gpu (nullable prompt) - list.Add(new ModelInfo - { - Id = "model-4-generic-gpu:1", - Name = "model-4-generic-gpu", - DisplayName = "model-4-generic-gpu", - Uri = "azureml://registries/azureml/models/model-4-generic-gpu/versions/1", - Runtime = new Runtime { DeviceType = DeviceType.GPU, ExecutionProvider = "WebGpuExecutionProvider" }, - Alias = "model-4", - //ParentModelUri = "azureml://registries/azureml/models/model-4/versions/1", - ProviderType = common.ProviderType, - Version = common.Version, - ModelType = common.ModelType, - PromptTemplate = null, - Publisher = common.Publisher, - Task = common.Task, - FileSizeMb = common.FileSizeMb, - ModelSettings = common.ModelSettings, - SupportsToolCalling = common.SupportsToolCalling, - License = common.License, - LicenseDescription = common.LicenseDescription, - MaxOutputTokens = common.MaxOutputTokens, - MinFLVersion = common.MinFLVersion - }); - - return list; - } - - private static string GetSourceFilePath([CallerFilePath] string path = "") => path; - - // Gets the root directory of the foundry-local-sdk repository by finding the .git directory. - private static string GetRepoRoot() - { - var sourceFile = GetSourceFilePath(); - var dir = new DirectoryInfo(Path.GetDirectoryName(sourceFile)!); +// Find and load a whisper model +var model = await catalog.GetModelAsync("whisper-tiny"); +if (model == null) +{ + Console.WriteLine("whisper-tiny not found. Available models:"); + foreach (var m in models) + Console.WriteLine($" - {m.Alias}"); + return; +} - while (dir != null) - { - if (Directory.Exists(Path.Combine(dir.FullName, ".git"))) - return dir.FullName; +Console.WriteLine($"=== Downloading {model.Alias} ==="); +await model.DownloadAsync(p => Console.Write($"\r Progress: {p:F1}%")); +Console.WriteLine(); + +Console.WriteLine($"=== Loading {model.Alias} ==="); +await model.LoadAsync(); +Console.WriteLine("Model loaded."); + +Console.WriteLine("=== Creating streaming session ==="); +var audioClient = await model.GetAudioClientAsync(); +var streamingClient = audioClient.CreateStreamingSession(); +streamingClient.Settings.SampleRate = 16000; +streamingClient.Settings.Channels = 1; +streamingClient.Settings.BitsPerSample = 16; +streamingClient.Settings.Language = "en"; + +Console.WriteLine("=== Starting streaming session ==="); +await streamingClient.StartAsync(); +Console.WriteLine("Session started!"); + +// Push some fake PCM data (silence — 100ms at 16kHz 16-bit mono = 3200 bytes) +var fakePcm = new byte[3200]; +Console.WriteLine("=== Pushing audio chunks ==="); +for (int i = 0; i < 5; i++) +{ + await streamingClient.AppendAsync(fakePcm); + Console.WriteLine($" Pushed chunk {i + 1}"); +} - dir = dir.Parent; - } +Console.WriteLine("=== Stopping session ==="); +await streamingClient.StopAsync(); +Console.WriteLine("Session stopped."); - throw new InvalidOperationException("Could not find git repository root from test file location"); - } -} +Console.WriteLine("=== Unloading model ==="); +await model.UnloadAsync(); +Console.WriteLine("Done! All plumbing works end-to-end."); \ No newline at end of file