Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk_v2/cs/src/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("Microsoft.AI.Foundry.Local.Tests")]
[assembly: InternalsVisibleTo("AudioStreamTest")]
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] // for Mock of ICoreInterop
115 changes: 115 additions & 0 deletions sdk_v2/cs/src/Detail/CoreInterop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,31 @@ private static unsafe partial void CoreExecuteCommandWithCallback(RequestBuffer*
nint callbackPtr, // NativeCallbackFn pointer
nint userData);

// Synchronous native call that carries an extra raw binary payload (PCM audio)
// alongside the usual command name + JSON params. The Core dispatches on the
// command embedded in the StreamingRequestBuffer.
[LibraryImport(LibraryName, EntryPoint = "execute_command_with_binary")]
[UnmanagedCallConv(CallConvs = new[] { typeof(System.Runtime.CompilerServices.CallConvCdecl) })]
private static unsafe partial void CoreExecuteCommandWithBinary(StreamingRequestBuffer* nativeRequest,
    ResponseBuffer* nativeResponse);

// --- Audio streaming P/Invoke imports (kept for future dedicated entry points) ---
// NOTE(review): these three imports are currently unused by the managed wrappers below,
// which route through execute_command / execute_command_with_binary instead — confirm
// the native library actually exports these symbols before switching over.

// Opens a streaming transcription session in the native Core.
[LibraryImport(LibraryName, EntryPoint = "audio_stream_start")]
[UnmanagedCallConv(CallConvs = new[] { typeof(System.Runtime.CompilerServices.CallConvCdecl) })]
private static unsafe partial void CoreAudioStreamStart(
    RequestBuffer* request,
    ResponseBuffer* response);

// Pushes one chunk of raw audio bytes into an open session.
[LibraryImport(LibraryName, EntryPoint = "audio_stream_push")]
[UnmanagedCallConv(CallConvs = new[] { typeof(System.Runtime.CompilerServices.CallConvCdecl) })]
private static unsafe partial void CoreAudioStreamPush(
    StreamingRequestBuffer* request,
    ResponseBuffer* response);

// Closes a streaming transcription session and flushes any pending results.
[LibraryImport(LibraryName, EntryPoint = "audio_stream_stop")]
[UnmanagedCallConv(CallConvs = new[] { typeof(System.Runtime.CompilerServices.CallConvCdecl) })]
private static unsafe partial void CoreAudioStreamStop(
    RequestBuffer* request,
    ResponseBuffer* response);

// helper to capture exceptions in callbacks
internal class CallbackHelper
{
Expand Down Expand Up @@ -331,4 +356,94 @@ public Task<Response> ExecuteCommandWithCallbackAsync(string commandName, CoreIn
return Task.Run(() => ExecuteCommandWithCallback(commandName, commandInput, callback), ct);
}

/// <summary>
/// Marshal a ResponseBuffer from unmanaged memory into a managed Response and free the unmanaged memory.
/// </summary>
/// <param name="response">
/// Native buffer whose Data/Error members point at UTF-8 text.
/// NOTE(review): freeing with FreeHGlobal assumes the native side allocates these
/// with an HGlobal-compatible allocator — preserved from the existing code; confirm.
/// </param>
/// <returns>A managed <see cref="Response"/> with decoded payload and error text.</returns>
private Response MarshalResponse(ResponseBuffer response)
{
    Response result = new();

    try
    {
        if (response.Data != IntPtr.Zero && response.DataLength > 0)
        {
            // Decode the UTF-8 payload directly from unmanaged memory; avoids the
            // intermediate byte[] copy and matches how Error is decoded below.
            result.Data = Marshal.PtrToStringUTF8(response.Data, response.DataLength)!;
        }

        if (response.Error != IntPtr.Zero && response.ErrorLength > 0)
        {
            result.Error = Marshal.PtrToStringUTF8(response.Error, response.ErrorLength)!;
        }
    }
    finally
    {
        // Always release the unmanaged buffers, even if string decoding throws.
        // FreeHGlobal is documented as a no-op for IntPtr.Zero.
        Marshal.FreeHGlobal(response.Data);
        Marshal.FreeHGlobal(response.Error);
    }

    return result;
}

// --- Audio streaming managed implementations ---
// Route through the existing execute_command / execute_command_with_binary entry points.
// The Core handles audio_stream_start / audio_stream_stop as command cases in ExecuteCommandManaged,
// and audio_stream_push as a command case in ExecuteCommandWithBinaryManaged.

/// <summary>
/// Begin an audio streaming session in the native Core.
/// Routed through the shared execute_command entry point under the
/// "audio_stream_start" command name.
/// </summary>
/// <param name="request">Session parameters (model id, options) as a Core request.</param>
/// <returns>The Core's response, including any error reported by the native layer.</returns>
public Response StartAudioStream(CoreInteropRequest request) =>
    ExecuteCommand("audio_stream_start", request);

/// <summary>
/// Push one chunk of raw audio bytes into an open streaming session via the
/// execute_command_with_binary native entry point ("audio_stream_push").
/// </summary>
/// <param name="request">Session parameters serialized to JSON for the Core.</param>
/// <param name="audioData">Raw audio bytes for this chunk; pinned for the duration of the native call.</param>
/// <returns>The Core's response for this chunk.</returns>
/// <exception cref="FoundryLocalException">Wraps any non-cancellation failure.</exception>
public Response PushAudioData(CoreInteropRequest request, ReadOnlyMemory<byte> audioData)
{
    try
    {
        var commandInputJson = request.ToJson();
        byte[] commandBytes = System.Text.Encoding.UTF8.GetBytes("audio_stream_push");
        byte[] inputBytes = System.Text.Encoding.UTF8.GetBytes(commandInputJson);

        IntPtr commandPtr = IntPtr.Zero;
        IntPtr inputPtr = IntPtr.Zero;

        try
        {
            // Allocate inside the try so a failure after the first allocation
            // (second AllocHGlobal, Copy, or Pin) cannot leak the first buffer.
            commandPtr = Marshal.AllocHGlobal(commandBytes.Length);
            Marshal.Copy(commandBytes, 0, commandPtr, commandBytes.Length);

            inputPtr = Marshal.AllocHGlobal(inputBytes.Length);
            Marshal.Copy(inputBytes, 0, inputPtr, inputBytes.Length);

            // Pin the managed audio data so the GC cannot move it during the native call.
            using var audioHandle = audioData.Pin();

            unsafe
            {
                var reqBuf = new StreamingRequestBuffer
                {
                    Command = commandPtr,
                    CommandLength = commandBytes.Length,
                    Data = inputPtr,
                    DataLength = inputBytes.Length,
                    BinaryData = (nint)audioHandle.Pointer,
                    BinaryDataLength = audioData.Length
                };

                ResponseBuffer response = default;
                CoreExecuteCommandWithBinary(&reqBuf, &response);

                // MarshalResponse also frees the response's unmanaged buffers.
                return MarshalResponse(response);
            }
        }
        finally
        {
            // Free whatever was successfully allocated; FreeHGlobal(IntPtr.Zero) is a no-op.
            Marshal.FreeHGlobal(inputPtr);
            Marshal.FreeHGlobal(commandPtr);
        }
    }
    catch (Exception ex) when (ex is not OperationCanceledException)
    {
        throw new FoundryLocalException("Error executing audio_stream_push", ex, _logger);
    }
}

/// <summary>
/// End an audio streaming session in the native Core.
/// Routed through the shared execute_command entry point under the
/// "audio_stream_stop" command name.
/// </summary>
/// <param name="request">Session parameters identifying the stream to stop.</param>
/// <returns>The Core's response, including any final error state.</returns>
public Response StopAudioStream(CoreInteropRequest request) =>
    ExecuteCommand("audio_stream_stop", request);

}
17 changes: 17 additions & 0 deletions sdk_v2/cs/src/Detail/ICoreInterop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,21 @@ Task<Response> ExecuteCommandAsync(string commandName, CoreInteropRequest? comma
Task<Response> ExecuteCommandWithCallbackAsync(string commandName, CoreInteropRequest? commandInput,
CallbackFn callback,
CancellationToken? ct = null);

// --- Audio streaming session support ---

/// <summary>
/// Native-layout request buffer for commands that carry a raw binary payload
/// in addition to the command name and JSON parameters. Field order must match
/// the corresponding native struct (Sequential layout).
/// All pointer fields reference memory owned by the caller for the duration of the call.
/// </summary>
[StructLayout(LayoutKind.Sequential)]
protected unsafe struct StreamingRequestBuffer
{
    public nint Command;          // UTF-8 command name (not NUL-terminated; length below)
    public int CommandLength;     // byte length of Command
    public nint Data; // JSON params
    public int DataLength;        // byte length of Data
    public nint BinaryData; // raw PCM audio bytes
    public int BinaryDataLength;  // byte length of BinaryData
}

/// <summary>Open a streaming audio session in the Core.</summary>
Response StartAudioStream(CoreInteropRequest request);
/// <summary>Push one chunk of raw audio bytes into an open session.</summary>
Response PushAudioData(CoreInteropRequest request, ReadOnlyMemory<byte> audioData);
/// <summary>Close a streaming audio session.</summary>
Response StopAudioStream(CoreInteropRequest request);
}
3 changes: 3 additions & 0 deletions sdk_v2/cs/src/Detail/JsonSerializationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ namespace Microsoft.AI.Foundry.Local.Detail;
[JsonSerializable(typeof(IList<FunctionDefinition>))]
[JsonSerializable(typeof(PropertyDefinition))]
[JsonSerializable(typeof(IList<PropertyDefinition>))]
// --- NEW: Audio streaming types ---
[JsonSerializable(typeof(AudioStreamTranscriptionResult))]
[JsonSerializable(typeof(CoreErrorResponse))]
[JsonSourceGenerationOptions(DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
WriteIndented = false)]
internal partial class JsonSerializationContext : JsonSerializerContext
Expand Down
124 changes: 10 additions & 114 deletions sdk_v2/cs/src/OpenAI/AudioClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@

namespace Microsoft.AI.Foundry.Local;

using System.Runtime.CompilerServices;
using System.Threading.Channels;

using Betalgo.Ranul.OpenAI.ObjectModels.RequestModels;
using Betalgo.Ranul.OpenAI.ObjectModels.ResponseModels;

Expand Down Expand Up @@ -46,6 +43,16 @@ public record AudioSettings
/// </summary>
public AudioSettings Settings { get; } = new();

/// <summary>
/// Open a real-time streaming transcription session for this model.
/// PCM audio chunks are pushed into the session and transcription results
/// are consumed as an async stream.
/// </summary>
/// <returns>The new session; callers must dispose it when finished.</returns>
public AudioTranscriptionStreamSession CreateStreamingSession() => new(_modelId);

/// <summary>
/// Transcribe audio from a file.
/// </summary>
Expand All @@ -63,28 +70,6 @@ public async Task<AudioCreateTranscriptionResponse> TranscribeAudioAsync(string
.ConfigureAwait(false);
}

/// <summary>
/// Transcribe audio from a file with streamed output.
/// </summary>
/// <param name="audioFilePath">
/// Path to file containing audio recording.
/// Supported formats: mp3
/// </param>
/// <param name="ct">Cancellation token.</param>
/// <returns>An asynchronous enumerable of transcription responses.</returns>
public async IAsyncEnumerable<AudioCreateTranscriptionResponse> TranscribeAudioStreamingAsync(
string audioFilePath, [EnumeratorCancellation] CancellationToken ct)
{
var enumerable = Utils.CallWithExceptionHandling(
() => TranscribeAudioStreamingImplAsync(audioFilePath, ct),
"Error during streaming audio transcription.", _logger).ConfigureAwait(false);

await foreach (var item in enumerable)
{
yield return item;
}
}

private async Task<AudioCreateTranscriptionResponse> TranscribeAudioImplAsync(string audioFilePath,
CancellationToken? ct)
{
Expand All @@ -107,93 +92,4 @@ private async Task<AudioCreateTranscriptionResponse> TranscribeAudioImplAsync(st

return output;
}

private async IAsyncEnumerable<AudioCreateTranscriptionResponse> TranscribeAudioStreamingImplAsync(
string audioFilePath, [EnumeratorCancellation] CancellationToken ct)
{
var openaiRequest = AudioTranscriptionCreateRequestExtended.FromUserInput(_modelId, audioFilePath, Settings);

var request = new CoreInteropRequest
{
Params = new Dictionary<string, string>
{
{ "OpenAICreateRequest", openaiRequest.ToJson() },
}
};

var channel = Channel.CreateUnbounded<AudioCreateTranscriptionResponse>(
new UnboundedChannelOptions
{
SingleWriter = true,
SingleReader = true,
AllowSynchronousContinuations = true
});

// The callback will push ChatResponse objects into the channel.
// The channel reader will return the values to the user.
// This setup prevents the user from blocking the thread generating the responses.
_ = Task.Run(async () =>
{
try
{
var failed = false;

var res = await _coreInterop.ExecuteCommandWithCallbackAsync(
"audio_transcribe",
request,
async (callbackData) =>
{
try
{
if (!failed)
{
var audioCompletion = callbackData.ToAudioTranscription(_logger);
await channel.Writer.WriteAsync(audioCompletion);
}
}
catch (Exception ex)
{
// propagate exception to reader
channel.Writer.TryComplete(
new FoundryLocalException(
"Error processing streaming audio transcription callback data.", ex, _logger));
failed = true;
}
},
ct
).ConfigureAwait(false);

// If the native layer returned an error (e.g. missing audio file, invalid model)
// without invoking any callbacks, propagate it so the caller sees an exception
// instead of an empty stream.
if (res.Error != null)
{
channel.Writer.TryComplete(
new FoundryLocalException($"Error from audio_transcribe command: {res.Error}", _logger));
return;
}

// use TryComplete as an exception in the callback may have already closed the channel
_ = channel.Writer.TryComplete();
}
// Ignore cancellation exceptions so we don't convert them into errors
catch (Exception ex) when (ex is not OperationCanceledException)
{
channel.Writer.TryComplete(
new FoundryLocalException("Error executing streaming chat completion.", ex, _logger));
}
catch (OperationCanceledException)
{
// Complete the channel on cancellation but don't turn it into an error
channel.Writer.TryComplete();
}
}, ct);

// Start reading from the channel as items arrive.
// This will continue until ExecuteCommandWithCallbackAsync completes and closes the channel.
await foreach (var item in channel.Reader.ReadAllAsync(ct))
{
yield return item;
}
}
}
74 changes: 74 additions & 0 deletions sdk_v2/cs/src/OpenAI/AudioStreamTranscriptionTypes.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
namespace Microsoft.AI.Foundry.Local;

using System.Text.Json;
using System.Text.Json.Serialization;
using Microsoft.AI.Foundry.Local.Detail;

/// <summary>
/// One transcription result produced by a streaming audio session.
/// Each instance covers only the most recent audio chunk — callers accumulate
/// the full transcript themselves.
/// </summary>
public record AudioStreamTranscriptionResult
{
    /// <summary>
    /// Whether this is a final or partial (interim) result.
    /// - Nemotron models always return <c>true</c> (every result is final).
    /// - Other models (e.g., Azure Embedded) may return <c>false</c> for interim
    ///   hypotheses that will be replaced by a subsequent final result.
    /// </summary>
    [JsonPropertyName("is_final")]
    public bool IsFinal { get; init; }

    /// <summary>
    /// Newly transcribed text from this audio chunk only (incremental hypothesis),
    /// NOT the full accumulated transcript.
    /// </summary>
    [JsonPropertyName("text")]
    public string Text { get; init; } = string.Empty;

    /// <summary>Start time offset of this segment in the audio stream (seconds).</summary>
    [JsonPropertyName("start_time")]
    public double? StartTime { get; init; }

    /// <summary>End time offset of this segment in the audio stream (seconds).</summary>
    [JsonPropertyName("end_time")]
    public double? EndTime { get; init; }

    /// <summary>Confidence score (0.0 - 1.0) if available.</summary>
    [JsonPropertyName("confidence")]
    public float? Confidence { get; init; }

    /// <summary>
    /// Deserialize a result from the Core's JSON payload.
    /// Throws <see cref="FoundryLocalException"/> when the payload deserializes to null.
    /// </summary>
    internal static AudioStreamTranscriptionResult FromJson(string json)
    {
        var parsed = JsonSerializer.Deserialize(json,
            JsonSerializationContext.Default.AudioStreamTranscriptionResult);

        if (parsed is null)
        {
            throw new FoundryLocalException("Failed to deserialize AudioStreamTranscriptionResult");
        }

        return parsed;
    }
}

/// <summary>
/// Structured error payload the native Core may return in place of a plain error string.
/// </summary>
internal record CoreErrorResponse
{
    /// <summary>Machine-readable error code from the Core.</summary>
    [JsonPropertyName("code")]
    public string Code { get; init; } = "";

    /// <summary>Human-readable error description.</summary>
    [JsonPropertyName("message")]
    public string Message { get; init; } = "";

    /// <summary>True when the Core marks the error as retryable.</summary>
    [JsonPropertyName("isTransient")]
    public bool IsTransient { get; init; }

    /// <summary>
    /// Attempt to parse a native error string as structured JSON.
    /// Returns null if the error is not valid JSON or doesn't match the schema,
    /// which should be treated as a permanent/unknown error.
    /// </summary>
    internal static CoreErrorResponse? TryParse(string errorString)
    {
        // Guard so a null/empty error string is treated like any other
        // unstructured error rather than throwing from Deserialize.
        if (string.IsNullOrEmpty(errorString))
        {
            return null;
        }

        try
        {
            return JsonSerializer.Deserialize(errorString,
                JsonSerializationContext.Default.CoreErrorResponse);
        }
        catch (JsonException)
        {
            // Malformed JSON — unstructured error; treat as permanent.
            // Narrowed from a bare catch so genuine programming errors still surface.
            return null;
        }
    }
}
Loading
Loading