From 768aa0b7a121bb2aea268ad2d8a62911c32ad522 Mon Sep 17 00:00:00 2001 From: Estefania Tenorio <8483207+esttenorio@users.noreply.github.com> Date: Wed, 5 Mar 2025 15:06:58 -0800 Subject: [PATCH 01/10] hooking up uninitialization for localruntime --- .../Process.Abstractions/KernelProcessStep.cs | 9 +++ .../KernelProcessStepContext.cs | 15 ++++ .../Process.Abstractions/KernelProxyStep.cs | 10 +++ .../Process.LocalRuntime/LocalProcess.cs | 4 + .../Process.LocalRuntime/LocalStep.cs | 40 +++++++++- .../Runtime.Local/LocalProxyTests.cs | 73 ++++++++++++------- 6 files changed, 122 insertions(+), 29 deletions(-) diff --git a/dotnet/src/Experimental/Process.Abstractions/KernelProcessStep.cs b/dotnet/src/Experimental/Process.Abstractions/KernelProcessStep.cs index c3162340bb35..861f6bbde6bf 100644 --- a/dotnet/src/Experimental/Process.Abstractions/KernelProcessStep.cs +++ b/dotnet/src/Experimental/Process.Abstractions/KernelProcessStep.cs @@ -14,6 +14,15 @@ public virtual ValueTask ActivateAsync(KernelProcessStepState state) { return default; } + + /// + /// Function triggered when deinitializing the KernelProcessStep + /// + /// + public virtual ValueTask DeactivateAsync(KernelProcessStepContext context) + { + return default; + } } /// diff --git a/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepContext.cs b/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepContext.cs index a3a4f614ee27..e3990ddac77f 100644 --- a/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepContext.cs +++ b/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepContext.cs @@ -72,4 +72,19 @@ public async Task EmitExternalEventAsync(KernelProcessProxyMessage processEventD await this._externalMessageChannel.EmitExternalEventAsync(processEventData.ExternalTopicName, processEventData).ConfigureAwait(false); } + + /// + /// Closes connection with external messaging channel + /// + /// + /// + public async Task CloseExternalEventChannelAsync() + { + if (this._externalMessageChannel == null) + { + throw new KernelException("External message channel not configured for step"); + } + + await this._externalMessageChannel.Uninitialize().ConfigureAwait(false); + } } diff --git a/dotnet/src/Experimental/Process.Abstractions/KernelProxyStep.cs b/dotnet/src/Experimental/Process.Abstractions/KernelProxyStep.cs index 0d9d161fdf8e..c91a6b918153 100644 --- a/dotnet/src/Experimental/Process.Abstractions/KernelProxyStep.cs +++ b/dotnet/src/Experimental/Process.Abstractions/KernelProxyStep.cs @@ -20,6 +20,16 @@ public static class Functions public const string EmitExternalEvent = nameof(EmitExternalEvent); } + /// + /// On deactivation, external communication channel must be closed + /// + /// instance of + /// + public override async ValueTask DeactivateAsync(KernelProcessStepContext context) + { + await context.CloseExternalEventChannelAsync().ConfigureAwait(false); + } + /// /// Step function used to emit events externally /// diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs index 583bfcf4565a..36fadcde8059 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs @@ -409,5 +409,9 @@ public void Dispose() this._joinableTaskContext.Dispose(); this._joinableTaskContext.Dispose(); this._processCancelSource?.Dispose(); + foreach (var step in this._steps) + { + step.Dispose(); + } } } diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs index 53a7a4f69a3b..3028d144d20f 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs @@ -15,7 +15,7 @@ namespace Microsoft.SemanticKernel; /// /// Represents a step in a process that is running in-process. /// -internal class LocalStep : IKernelProcessMessageChannel +internal class LocalStep : IKernelProcessMessageChannel, IDisposable { private readonly Queue _outgoingEventQueue = new(); private readonly Lazy _initializeTask; @@ -61,6 +61,11 @@ public LocalStep(KernelProcessStepInfo stepInfo, Kernel kernel, string? parentPr this._eventNamespace = $"{this._stepInfo.State.Name}_{this._stepInfo.State.Id}"; } + ~LocalStep() + { + this.Dispose(); + } + /// /// The Id of the parent process if one exists. /// @@ -283,6 +288,25 @@ protected virtual async ValueTask InitializeStepAsync() await activateTask.ConfigureAwait(false); } + /// + /// Deinitializes the step + /// + protected virtual async ValueTask DeinitializeStepAsync() + { + // Instantiate an instance of the inner step object + KernelProcessStep stepInstance = (KernelProcessStep)ActivatorUtilities.CreateInstance(this._kernel.Services, this._stepInfo.InnerStepType); + + MethodInfo methodInfo = + this._stepInfo.InnerStepType.GetMethod(nameof(KernelProcessStep.DeactivateAsync)) ?? + throw new KernelException("The DeactivateAsync method for the KernelProcessStep could not be found.").Log(this._logger); + var context = new KernelProcessStepContext(this, this.ExternalMessageChannel); + ValueTask deactivateTask = + (ValueTask?)methodInfo.Invoke(stepInstance, [context]) ?? + throw new KernelException("The DeactivateAsync method failed to complete.").Log(this._logger); + + await deactivateTask.ConfigureAwait(false); + } + /// /// Invokes the provides function with the provided kernel and arguments. /// @@ -329,4 +353,18 @@ protected ProcessEvent ScopedEvent(ProcessEvent localEvent) Verify.NotNull(localEvent, nameof(localEvent)); return localEvent with { Namespace = $"{this.Name}_{this.Id}" }; } + + /// + /// Dispose of step resources + /// + public void Dispose() + { + if (this._initializeTask.IsValueCreated) + { + // Ensure initialization is complete + this._initializeTask.Value.AsTask().Wait(); + } + + this.DeinitializeStepAsync().AsTask().Wait(); + } } diff --git a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProxyTests.cs b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProxyTests.cs index d08f9de9d3e6..6989a4414a2c 100644 --- a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProxyTests.cs +++ b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProxyTests.cs @@ -37,24 +37,32 @@ public async Task ProcessWithProxyWithSingleTopicCalledTwiceAsync() Kernel kernel = new(); // Act - LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, null, this._startProcessEvent, externalMessageChannel: mockProxyClient); - - // Assert - Assert.NotNull(mockProxyClient); - Assert.Equal(1, mockProxyClient.InitializationCounter); - Assert.Single(mockProxyClient.CloudEvents); - Assert.Equal(this._topic1, mockProxyClient.CloudEvents[0].TopicName); - Assert.Equal("1", mockProxyClient.CloudEvents[0].Data?.EventData); - - // Act - await processContext.SendEventAsync(new() { Id = this._startProcessEvent, Data = null }); - - // Assert - Assert.NotNull(mockProxyClient); - Assert.Equal(1, mockProxyClient.InitializationCounter); - Assert.Equal(2, mockProxyClient.CloudEvents.Count); - Assert.Equal(this._topic1, mockProxyClient.CloudEvents[1].TopicName); - Assert.Equal("2", mockProxyClient.CloudEvents[1].Data?.EventData); + using (LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, null, this._startProcessEvent, externalMessageChannel: mockProxyClient)) + { + // Assert + var runningProcessId = await processContext.GetProcessIdAsync(); + + Assert.NotNull(mockProxyClient); + Assert.Equal(1, mockProxyClient.InitializationCounter); + Assert.Equal(0, mockProxyClient.UninitializationCounter); + Assert.Single(mockProxyClient.CloudEvents); + Assert.Equal(this._topic1, mockProxyClient.CloudEvents[0].TopicName); + Assert.Equal(runningProcessId, mockProxyClient.CloudEvents[0].Data?.ProcessId); + Assert.Equal("1", mockProxyClient.CloudEvents[0].Data?.EventData); + + // Act + await processContext.SendEventAsync(new() { Id = this._startProcessEvent, Data = null }); + + // Assert + Assert.NotNull(mockProxyClient); + Assert.Equal(1, mockProxyClient.InitializationCounter); + Assert.Equal(0, mockProxyClient.UninitializationCounter); + Assert.Equal(2, mockProxyClient.CloudEvents.Count); + Assert.Equal(this._topic1, mockProxyClient.CloudEvents[1].TopicName); + Assert.Equal(runningProcessId, mockProxyClient.CloudEvents[0].Data?.ProcessId); + Assert.Equal("2", mockProxyClient.CloudEvents[1].Data?.EventData); + } + Assert.Equal(1, mockProxyClient.UninitializationCounter); } /// @@ -114,18 +122,27 @@ public async Task ProcessWithCyclesAndProxyWithTwoTopicsAsync() Kernel kernel = new(); // Act - LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, null, this._startProcessEvent, externalMessageChannel: mockProxyClient); + using (LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, null, this._startProcessEvent, externalMessageChannel: mockProxyClient)) + { + // Assert + var runningProcessId = await processContext.GetProcessIdAsync(); + + + Assert.NotNull(mockProxyClient); + Assert.True(0 < mockProxyClient.InitializationCounter); + Assert.Equal(0, mockProxyClient.UninitializationCounter); + Assert.Equal(3, mockProxyClient.CloudEvents.Count); + Assert.Equal(this._topic1, mockProxyClient.CloudEvents[0].TopicName); + Assert.Equal(runningProcessId, mockProxyClient.CloudEvents[0].Data?.ProcessId); + Assert.Equal("1", mockProxyClient.CloudEvents[0].Data?.EventData); + Assert.Equal(this._topic1, mockProxyClient.CloudEvents[1].TopicName); + Assert.Equal("2", mockProxyClient.CloudEvents[1].Data?.EventData); + Assert.Equal(this._topic2, mockProxyClient.CloudEvents[2].TopicName); + Assert.Equal("2", mockProxyClient.CloudEvents[2].Data?.EventData); + } // Assert - Assert.NotNull(mockProxyClient); - Assert.True(0 < mockProxyClient.InitializationCounter); - Assert.Equal(3, mockProxyClient.CloudEvents.Count); - Assert.Equal(this._topic1, mockProxyClient.CloudEvents[0].TopicName); - Assert.Equal("1", mockProxyClient.CloudEvents[0].Data?.EventData); - Assert.Equal(this._topic1, mockProxyClient.CloudEvents[1].TopicName); - Assert.Equal("2", mockProxyClient.CloudEvents[1].Data?.EventData); - Assert.Equal(this._topic2, mockProxyClient.CloudEvents[2].TopicName); - Assert.Equal("2", mockProxyClient.CloudEvents[2].Data?.EventData); + Assert.Equal(1, mockProxyClient.UninitializationCounter); } private async Task RunProcessAsync(Kernel kernel, KernelProcess process, object? input, string inputEvent, IExternalKernelProcessMessageChannel? externalMessageChannel) From 988eaf8c9ad244d83b985738029ddb6fa91165a3 Mon Sep 17 00:00:00 2001 From: Estefania Tenorio <8483207+esttenorio@users.noreply.github.com> Date: Thu, 6 Mar 2025 09:51:53 -0800 Subject: [PATCH 02/10] fixing build errors --- .../Process.LocalRuntime/LocalProcess.cs | 7 +-- .../Process.LocalRuntime/LocalStep.cs | 46 +++++++++++++------ 2 files changed, 32 insertions(+), 21 deletions(-) diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs index 36fadcde8059..c4edb5e6dd65 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs @@ -18,8 +18,6 @@ namespace Microsoft.SemanticKernel; internal sealed class LocalProcess : LocalStep, IDisposable { - private readonly JoinableTaskFactory _joinableTaskFactory; - private readonly JoinableTaskContext _joinableTaskContext; private readonly Channel _externalEventChannel; private readonly Lazy _initializeTask; @@ -46,8 +44,6 @@ internal LocalProcess(KernelProcess process, Kernel kernel) this._process = process; this._initializeTask = new Lazy(this.InitializeProcessAsync); this._externalEventChannel = Channel.CreateUnbounded(); - this._joinableTaskContext = new JoinableTaskContext(); - this._joinableTaskFactory = new JoinableTaskFactory(this._joinableTaskContext); this._logger = this._kernel.LoggerFactory?.CreateLogger(this.Name) ?? new NullLogger(); } @@ -403,11 +399,10 @@ internal override async Task ToKernelProcessStepInfoAsync #endregion - public void Dispose() + public override void Dispose() { this._externalEventChannel.Writer.Complete(); this._joinableTaskContext.Dispose(); - this._joinableTaskContext.Dispose(); this._processCancelSource?.Dispose(); foreach (var step in this._steps) { diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs index 3028d144d20f..ca2efdbcd209 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs @@ -9,6 +9,7 @@ using Microsoft.Extensions.Logging.Abstractions; using Microsoft.SemanticKernel.Process.Internal; using Microsoft.SemanticKernel.Process.Runtime; +using Microsoft.VisualStudio.Threading; namespace Microsoft.SemanticKernel; @@ -22,6 +23,9 @@ internal class LocalStep : IKernelProcessMessageChannel, IDisposable private readonly KernelProcessStepInfo _stepInfo; private readonly ILogger _logger; + private bool _disposed = false; + private KernelProcessStep? _stepInstance = null; + protected readonly Kernel _kernel; protected readonly Dictionary _functions = []; @@ -30,6 +34,9 @@ internal class LocalStep : IKernelProcessMessageChannel, IDisposable protected Dictionary?>? _initialInputs = []; protected Dictionary> _outputEdges; + internal readonly JoinableTaskFactory _joinableTaskFactory; + internal readonly JoinableTaskContext _joinableTaskContext; + internal readonly string _eventNamespace; /// @@ -59,6 +66,8 @@ public LocalStep(KernelProcessStepInfo stepInfo, Kernel kernel, string? parentPr this._logger = this._kernel.LoggerFactory?.CreateLogger(this._stepInfo.InnerStepType) ?? new NullLogger(); this._outputEdges = this._stepInfo.Edges.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.ToList()); this._eventNamespace = $"{this._stepInfo.State.Name}_{this._stepInfo.State.Id}"; + this._joinableTaskContext = new JoinableTaskContext(); + this._joinableTaskFactory = new JoinableTaskFactory(this._joinableTaskContext); } ~LocalStep() @@ -251,8 +260,8 @@ internal virtual async Task HandleMessageAsync(ProcessMessage message) protected virtual async ValueTask InitializeStepAsync() { // Instantiate an instance of the inner step object - KernelProcessStep stepInstance = (KernelProcessStep)ActivatorUtilities.CreateInstance(this._kernel.Services, this._stepInfo.InnerStepType); - var kernelPlugin = KernelPluginFactory.CreateFromObject(stepInstance, pluginName: this._stepInfo.State.Name); + this._stepInstance = (KernelProcessStep)ActivatorUtilities.CreateInstance(this._kernel.Services, this._stepInfo.InnerStepType); + var kernelPlugin = KernelPluginFactory.CreateFromObject(this._stepInstance, pluginName: this._stepInfo.State.Name); // Load the kernel functions foreach (KernelFunction f in kernelPlugin) @@ -281,10 +290,10 @@ protected virtual async ValueTask InitializeStepAsync() this._stepState = stateObject; ValueTask activateTask = - (ValueTask?)methodInfo.Invoke(stepInstance, [stateObject]) ?? + (ValueTask?)methodInfo.Invoke(this._stepInstance, [stateObject]) ?? throw new KernelException("The ActivateAsync method failed to complete.").Log(this._logger); - await stepInstance.ActivateAsync(stateObject).ConfigureAwait(false); + await this._stepInstance.ActivateAsync(stateObject).ConfigureAwait(false); await activateTask.ConfigureAwait(false); } @@ -293,15 +302,12 @@ protected virtual async ValueTask InitializeStepAsync() /// protected virtual async ValueTask DeinitializeStepAsync() { - // Instantiate an instance of the inner step object - KernelProcessStep stepInstance = (KernelProcessStep)ActivatorUtilities.CreateInstance(this._kernel.Services, this._stepInfo.InnerStepType); - MethodInfo methodInfo = - this._stepInfo.InnerStepType.GetMethod(nameof(KernelProcessStep.DeactivateAsync)) ?? - throw new KernelException("The DeactivateAsync method for the KernelProcessStep could not be found.").Log(this._logger); + this._stepInfo.InnerStepType.GetMethod(nameof(KernelProcessStep.DeactivateAsync), BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance | BindingFlags.Static) ?? + throw new KernelException($"The DeactivateAsync method for the KernelProcessStep could not be found for {this.Name}").Log(this._logger); var context = new KernelProcessStepContext(this, this.ExternalMessageChannel); ValueTask deactivateTask = - (ValueTask?)methodInfo.Invoke(stepInstance, [context]) ?? + (ValueTask?)methodInfo.Invoke(this._stepInstance, [context]) ?? throw new KernelException("The DeactivateAsync method failed to complete.").Log(this._logger); await deactivateTask.ConfigureAwait(false); @@ -357,14 +363,24 @@ protected ProcessEvent ScopedEvent(ProcessEvent localEvent) /// /// Dispose of step resources /// - public void Dispose() + public virtual void Dispose() + { + this.Dispose(true); + GC.SuppressFinalize(this); + } + + private void Dispose(bool disposing) { - if (this._initializeTask.IsValueCreated) + if (this._disposed) { - // Ensure initialization is complete - this._initializeTask.Value.AsTask().Wait(); + return; } - this.DeinitializeStepAsync().AsTask().Wait(); + if (disposing) + { + this._joinableTaskFactory.Run(() => this.DeinitializeStepAsync().AsTask()); + } + this._joinableTaskContext.Dispose(); + this._disposed = true; } } From fe759b83cd256dfb36a0205cf1999812d672cc0d Mon Sep 17 00:00:00 2001 From: Estefania Tenorio <8483207+esttenorio@users.noreply.github.com> Date: Thu, 6 Mar 2025 10:00:26 -0800 Subject: [PATCH 03/10] fixing build errors --- dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs | 2 ++ .../Process.UnitTests/Runtime.Local/LocalProxyTests.cs | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs index c4edb5e6dd65..bce201d22c73 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs @@ -399,6 +399,7 @@ internal override async Task ToKernelProcessStepInfoAsync #endregion +#pragma warning disable CA2215 public override void Dispose() { this._externalEventChannel.Writer.Complete(); @@ -409,4 +410,5 @@ public override void Dispose() step.Dispose(); } } +#pragma warning restore CA2215 } diff --git a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProxyTests.cs b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProxyTests.cs index 6989a4414a2c..77e9a7c018d2 100644 --- a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProxyTests.cs +++ b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProxyTests.cs @@ -127,7 +127,6 @@ public async Task ProcessWithCyclesAndProxyWithTwoTopicsAsync() // Assert var runningProcessId = await processContext.GetProcessIdAsync(); - Assert.NotNull(mockProxyClient); Assert.True(0 < mockProxyClient.InitializationCounter); Assert.Equal(0, mockProxyClient.UninitializationCounter); From bcfbc0d447eedf42334a7c8409e329aa1333eb76 Mon Sep 17 00:00:00 2001 From: Estefania Tenorio <8483207+esttenorio@users.noreply.github.com> Date: Thu, 6 Mar 2025 10:17:09 -0800 Subject: [PATCH 04/10] fixing unittest --- .../Process.LocalRuntime/LocalStep.cs | 15 +++++++++------ .../Runtime.Local/LocalProcessTests.cs | 17 +++++++++-------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs index ca2efdbcd209..91ed4504fbd0 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs @@ -303,14 +303,17 @@ protected virtual async ValueTask InitializeStepAsync() protected virtual async ValueTask DeinitializeStepAsync() { MethodInfo methodInfo = - this._stepInfo.InnerStepType.GetMethod(nameof(KernelProcessStep.DeactivateAsync), BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance | BindingFlags.Static) ?? + this._stepInfo.InnerStepType.GetMethod(nameof(KernelProcessStep.DeactivateAsync), BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance) ?? throw new KernelException($"The DeactivateAsync method for the KernelProcessStep could not be found for {this.Name}").Log(this._logger); - var context = new KernelProcessStepContext(this, this.ExternalMessageChannel); - ValueTask deactivateTask = - (ValueTask?)methodInfo.Invoke(this._stepInstance, [context]) ?? - throw new KernelException("The DeactivateAsync method failed to complete.").Log(this._logger); + if (this._stepInstance != null) + { + var context = new KernelProcessStepContext(this, this.ExternalMessageChannel); + ValueTask deactivateTask = + (ValueTask?)methodInfo.Invoke(this._stepInstance, [context]) ?? + throw new KernelException("The DeactivateAsync method failed to complete.").Log(this._logger); - await deactivateTask.ConfigureAwait(false); + await deactivateTask.ConfigureAwait(false); + } } /// diff --git a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs index d90478f450c3..6d298f181b55 100644 --- a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs +++ b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs @@ -26,15 +26,16 @@ public async Task ExecuteAsyncInitializesCorrectlyAsync() ], []); var mockKernel = new Kernel(); - using var localProcess = new LocalProcess(mockKernelProcess, mockKernel); - - // Act - await localProcess.StartAsync(); + using (var localProcess = new LocalProcess(mockKernelProcess, mockKernel)) + { + // Act + await localProcess.StartAsync(); - // Assert - Assert.Equal(2, localProcess._steps.Count); - Assert.Contains(localProcess._steps, s => s.Name == "Step1"); - Assert.Contains(localProcess._steps, s => s.Name == "Step2"); + // Assert + Assert.Equal(2, localProcess._steps.Count); + Assert.Contains(localProcess._steps, s => s.Name == "Step1"); + Assert.Contains(localProcess._steps, s => s.Name == "Step2"); + } } /// From d7059bf17ab52646008a370fcac3b73ebc12ade8 Mon Sep 17 00:00:00 2001 From: Estefania Tenorio <8483207+esttenorio@users.noreply.github.com> Date: Fri, 7 Mar 2025 10:09:12 -0800 Subject: [PATCH 05/10] addressing comments suggestion --- .../Process.LocalRuntime/LocalProcess.cs | 18 +++++-- .../Process.LocalRuntime/LocalStep.cs | 53 +++---------------- 2 files changed, 21 insertions(+), 50 deletions(-) diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs index bce201d22c73..3f28c89f35c6 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs @@ -18,6 +18,8 @@ namespace Microsoft.SemanticKernel; internal sealed class LocalProcess : LocalStep, IDisposable { + private readonly JoinableTaskFactory _joinableTaskFactory; + private readonly JoinableTaskContext _joinableTaskContext; private readonly Channel _externalEventChannel; private readonly Lazy _initializeTask; @@ -44,6 +46,8 @@ internal LocalProcess(KernelProcess process, Kernel kernel) this._process = process; this._initializeTask = new Lazy(this.InitializeProcessAsync); this._externalEventChannel = Channel.CreateUnbounded(); + this._joinableTaskContext = new JoinableTaskContext(); + this._joinableTaskFactory = new JoinableTaskFactory(this._joinableTaskContext); this._logger = this._kernel.LoggerFactory?.CreateLogger(this.Name) ?? new NullLogger(); } @@ -399,16 +403,20 @@ internal override async Task ToKernelProcessStepInfoAsync #endregion -#pragma warning disable CA2215 - public override void Dispose() + /// + internal override async ValueTask DeinitializeStepAsync() + { + await Task.Run(() => this.Dispose()).ConfigureAwait(false); + } + + public void Dispose() { this._externalEventChannel.Writer.Complete(); this._joinableTaskContext.Dispose(); - this._processCancelSource?.Dispose(); foreach (var step in this._steps) { - step.Dispose(); + this._joinableTaskFactory.Run(() => step.DeinitializeStepAsync().AsTask()); } + this._processCancelSource?.Dispose(); } -#pragma warning restore CA2215 } diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs index 91ed4504fbd0..63125c06ded6 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs @@ -9,21 +9,19 @@ using Microsoft.Extensions.Logging.Abstractions; using Microsoft.SemanticKernel.Process.Internal; using Microsoft.SemanticKernel.Process.Runtime; -using Microsoft.VisualStudio.Threading; namespace Microsoft.SemanticKernel; /// /// Represents a step in a process that is running in-process. /// -internal class LocalStep : IKernelProcessMessageChannel, IDisposable +internal class LocalStep : IKernelProcessMessageChannel { private readonly Queue _outgoingEventQueue = new(); private readonly Lazy _initializeTask; private readonly KernelProcessStepInfo _stepInfo; private readonly ILogger _logger; - private bool _disposed = false; private KernelProcessStep? _stepInstance = null; protected readonly Kernel _kernel; @@ -34,9 +32,6 @@ internal class LocalStep : IKernelProcessMessageChannel, IDisposable protected Dictionary?>? _initialInputs = []; protected Dictionary> _outputEdges; - internal readonly JoinableTaskFactory _joinableTaskFactory; - internal readonly JoinableTaskContext _joinableTaskContext; - internal readonly string _eventNamespace; /// @@ -66,13 +61,6 @@ public LocalStep(KernelProcessStepInfo stepInfo, Kernel kernel, string? parentPr this._logger = this._kernel.LoggerFactory?.CreateLogger(this._stepInfo.InnerStepType) ?? new NullLogger(); this._outputEdges = this._stepInfo.Edges.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.ToList()); this._eventNamespace = $"{this._stepInfo.State.Name}_{this._stepInfo.State.Id}"; - this._joinableTaskContext = new JoinableTaskContext(); - this._joinableTaskFactory = new JoinableTaskFactory(this._joinableTaskContext); - } - - ~LocalStep() - { - this.Dispose(); } /// @@ -300,17 +288,16 @@ protected virtual async ValueTask InitializeStepAsync() /// /// Deinitializes the step /// - protected virtual async ValueTask DeinitializeStepAsync() + internal virtual async ValueTask DeinitializeStepAsync() { - MethodInfo methodInfo = - this._stepInfo.InnerStepType.GetMethod(nameof(KernelProcessStep.DeactivateAsync), BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance) ?? - throw new KernelException($"The DeactivateAsync method for the KernelProcessStep could not be found for {this.Name}").Log(this._logger); - if (this._stepInstance != null) + MethodInfo? derivedMethod = this._stepInfo.InnerStepType.GetMethod(nameof(KernelProcessStep.DeactivateAsync), BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance); + + var context = new KernelProcessStepContext(this, this.ExternalMessageChannel); + if (derivedMethod != null && this._stepInstance != null) { - var context = new KernelProcessStepContext(this, this.ExternalMessageChannel); ValueTask deactivateTask = - (ValueTask?)methodInfo.Invoke(this._stepInstance, [context]) ?? - throw new KernelException("The DeactivateAsync method failed to complete.").Log(this._logger); + (ValueTask?)derivedMethod.Invoke(this._stepInstance, [context]) ?? + throw new KernelException($"The derived DeactivateAsync method failed to complete for step {this.Name}.").Log(this._logger); await deactivateTask.ConfigureAwait(false); } @@ -362,28 +349,4 @@ protected ProcessEvent ScopedEvent(ProcessEvent localEvent) Verify.NotNull(localEvent, nameof(localEvent)); return localEvent with { Namespace = $"{this.Name}_{this.Id}" }; } - - /// - /// Dispose of step resources - /// - public virtual void Dispose() - { - this.Dispose(true); - GC.SuppressFinalize(this); - } - - private void Dispose(bool disposing) - { - if (this._disposed) - { - return; - } - - if (disposing) - { - this._joinableTaskFactory.Run(() => this.DeinitializeStepAsync().AsTask()); - } - this._joinableTaskContext.Dispose(); - this._disposed = true; - } } From 56a12ce19e4439541abcd819f24c3abab5765858 Mon Sep 17 00:00:00 2001 From: Estefania Tenorio <8483207+esttenorio@users.noreply.github.com> Date: Fri, 7 Mar 2025 10:29:25 -0800 Subject: [PATCH 06/10] removing unnecessary changes --- .../Runtime.Local/LocalProcessTests.cs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs index 6d298f181b55..61823233abd6 100644 --- a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs +++ b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs @@ -26,16 +26,14 @@ public async Task ExecuteAsyncInitializesCorrectlyAsync() ], []); var mockKernel = new Kernel(); - using (var localProcess = new LocalProcess(mockKernelProcess, mockKernel)) - { - // Act - await localProcess.StartAsync(); + using var localProcess = new LocalProcess(mockKernelProcess, mockKernel); + // Act + await localProcess.StartAsync(); - // Assert - Assert.Equal(2, localProcess._steps.Count); - Assert.Contains(localProcess._steps, s => s.Name == "Step1"); - Assert.Contains(localProcess._steps, s => s.Name == "Step2"); - } + // Assert + Assert.Equal(2, localProcess._steps.Count); + Assert.Contains(localProcess._steps, s => s.Name == "Step1"); + Assert.Contains(localProcess._steps, s => s.Name == "Step2"); } /// From 3d1151b8b6c185168a66b9693250f610a43daa46 Mon Sep 17 00:00:00 2001 From: Estefania Tenorio <8483207+esttenorio@users.noreply.github.com> Date: Fri, 7 Mar 2025 13:34:29 -0800 Subject: [PATCH 07/10] addressing comments --- .../Process.Abstractions/KernelProcessStep.cs | 2 +- .../KernelProcessStepContext.cs | 37 +------------ .../KernelProcessStepExternalContext.cs | 54 +++++++++++++++++++ .../Process.Abstractions/KernelProxyStep.cs | 4 +- .../Process.LocalRuntime/LocalProcess.cs | 2 +- .../Process.LocalRuntime/LocalProxy.cs | 23 ++++++++ .../Process.LocalRuntime/LocalStep.cs | 10 ++-- .../process/Abstractions/StepExtensions.cs | 6 ++- 8 files changed, 91 insertions(+), 47 deletions(-) create mode 100644 dotnet/src/Experimental/Process.Abstractions/KernelProcessStepExternalContext.cs diff --git a/dotnet/src/Experimental/Process.Abstractions/KernelProcessStep.cs b/dotnet/src/Experimental/Process.Abstractions/KernelProcessStep.cs index 861f6bbde6bf..f8c706fdb5e0 100644 --- a/dotnet/src/Experimental/Process.Abstractions/KernelProcessStep.cs +++ b/dotnet/src/Experimental/Process.Abstractions/KernelProcessStep.cs @@ -19,7 +19,7 @@ public virtual ValueTask ActivateAsync(KernelProcessStepState state) /// Function triggered when deinitializing the KernelProcessStep /// /// - public virtual ValueTask DeactivateAsync(KernelProcessStepContext context) + public virtual ValueTask DeactivateAsync() { return default; } diff --git a/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepContext.cs b/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepContext.cs index e3990ddac77f..e652e0adb367 100644 --- a/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepContext.cs +++ b/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepContext.cs @@ -10,17 +10,14 @@ namespace Microsoft.SemanticKernel; public sealed class KernelProcessStepContext { private readonly IKernelProcessMessageChannel _stepMessageChannel; - private readonly IExternalKernelProcessMessageChannel? _externalMessageChannel; /// /// Initializes a new instance of the class. /// /// An instance of . - /// An instance of - public KernelProcessStepContext(IKernelProcessMessageChannel channel, IExternalKernelProcessMessageChannel? externalMessageChannel = null) + public KernelProcessStepContext(IKernelProcessMessageChannel channel) { this._stepMessageChannel = channel; - this._externalMessageChannel = externalMessageChannel; } /// @@ -55,36 +52,4 @@ public ValueTask EmitEventAsync( Visibility = visibility }); } - - /// - /// Emit an external event to through a - /// component if connected from within the SK process - /// - /// data containing event details - /// - /// - public async Task EmitExternalEventAsync(KernelProcessProxyMessage processEventData) - { - if (this._externalMessageChannel == null) - { - throw new KernelException($"External message channel not configured for step with topic {processEventData.ExternalTopicName}"); - } - - await this._externalMessageChannel.EmitExternalEventAsync(processEventData.ExternalTopicName, processEventData).ConfigureAwait(false); - } - - /// - /// Closes connection with external messaging channel - /// - /// - /// - public async Task CloseExternalEventChannelAsync() - { - if (this._externalMessageChannel == null) - { - throw new KernelException("External message channel not configured for step"); - } - - await this._externalMessageChannel.Uninitialize().ConfigureAwait(false); - } } diff --git a/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepExternalContext.cs b/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepExternalContext.cs new file mode 100644 index 000000000000..b20c6de61c27 --- /dev/null +++ b/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepExternalContext.cs @@ -0,0 +1,54 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Threading.Tasks; + +namespace Microsoft.SemanticKernel; + +/// +/// Provides step related functionality for Kernel Functions running in a step to emit events externally. +/// +public class KernelProcessStepExternalContext +{ + private readonly IExternalKernelProcessMessageChannel? _externalMessageChannel; + + /// + /// Initializes a new instance of the class. + /// + /// An instance of + public KernelProcessStepExternalContext(IExternalKernelProcessMessageChannel? externalMessageChannel = null) + { + this._externalMessageChannel = externalMessageChannel; + } + + /// + /// Emit an external event to through a + /// component if connected from within the SK process + /// + /// data containing event details + /// + /// + public async Task EmitExternalEventAsync(KernelProcessProxyMessage processEventData) + { + if (this._externalMessageChannel == null) + { + throw new KernelException($"External message channel not configured for step with topic {processEventData.ExternalTopicName}"); + } + + await this._externalMessageChannel.EmitExternalEventAsync(processEventData.ExternalTopicName, processEventData).ConfigureAwait(false); + } + + /// + /// Closes connection with external messaging channel + /// + /// + /// + public async Task CloseExternalEventChannelAsync() + { + if (this._externalMessageChannel == null) + { + throw new KernelException("External message channel not configured for step"); + } + + await this._externalMessageChannel.Uninitialize().ConfigureAwait(false); + } +} diff --git a/dotnet/src/Experimental/Process.Abstractions/KernelProxyStep.cs b/dotnet/src/Experimental/Process.Abstractions/KernelProxyStep.cs index c91a6b918153..50619efef346 100644 --- a/dotnet/src/Experimental/Process.Abstractions/KernelProxyStep.cs +++ b/dotnet/src/Experimental/Process.Abstractions/KernelProxyStep.cs @@ -25,7 +25,7 @@ public static class Functions /// /// instance of /// - public override async ValueTask DeactivateAsync(KernelProcessStepContext context) + public async ValueTask DeactivateAsync(KernelProcessStepExternalContext context) { await context.CloseExternalEventChannelAsync().ConfigureAwait(false); } @@ -37,7 +37,7 @@ public override async ValueTask DeactivateAsync(KernelProcessStepContext context /// event data passed to proxy step /// [KernelFunction(Functions.EmitExternalEvent)] - public Task EmitExternalEventAsync(KernelProcessStepContext context, KernelProcessProxyMessage proxyEvent) + public Task EmitExternalEventAsync(KernelProcessStepExternalContext context, KernelProcessProxyMessage proxyEvent) { Verify.NotNull(proxyEvent.ExternalTopicName, nameof(proxyEvent.ExternalTopicName)); return context.EmitExternalEventAsync(proxyEvent); diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs index 3f28c89f35c6..b0b774043577 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs @@ -404,7 +404,7 @@ internal override async Task ToKernelProcessStepInfoAsync #endregion /// - internal override async ValueTask DeinitializeStepAsync() + public override async ValueTask DeinitializeStepAsync() { await Task.Run(() => this.Dispose()).ConfigureAwait(false); } diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalProxy.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalProxy.cs index 9ea020c17fcb..30f8e53c5dbb 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalProxy.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalProxy.cs @@ -1,9 +1,11 @@ // Copyright (c) Microsoft. All rights reserved. using System.Collections.Generic; using System.Linq; +using System.Reflection; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.SemanticKernel.Process; using Microsoft.SemanticKernel.Process.Internal; using Microsoft.SemanticKernel.Process.Runtime; @@ -77,4 +79,25 @@ protected override async ValueTask InitializeStepAsync() await base.InitializeStepAsync().ConfigureAwait(false); this._isInitialized = true; } + + public override async ValueTask DeinitializeStepAsync() + { + // Proxy Step is the only step it uses an overloaded DeactivateAsync with parameters + MethodInfo? derivedMethod = this._stepInfo.InnerStepType.GetMethod( + nameof(KernelProxyStep.DeactivateAsync), + BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance, + binder: null, + types: [typeof(KernelProcessStepExternalContext)], + modifiers: null); + + if (derivedMethod != null && this._stepInstance != null) + { + var context = new KernelProcessStepExternalContext(this.ExternalMessageChannel); + ValueTask deactivateTask = + (ValueTask?)derivedMethod.Invoke(this._stepInstance, [context]) ?? + throw new KernelException($"The derived DeactivateAsync method failed to complete for step {this.Name}.").Log(this._logger); + + await deactivateTask.ConfigureAwait(false); + } + } } diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs index 63125c06ded6..faa5f9ce822b 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs @@ -19,11 +19,8 @@ internal class LocalStep : IKernelProcessMessageChannel { private readonly Queue _outgoingEventQueue = new(); private readonly Lazy _initializeTask; - private readonly KernelProcessStepInfo _stepInfo; private readonly ILogger _logger; - private KernelProcessStep? _stepInstance = null; - protected readonly Kernel _kernel; protected readonly Dictionary _functions = []; @@ -32,6 +29,8 @@ internal class LocalStep : IKernelProcessMessageChannel protected Dictionary?>? _initialInputs = []; protected Dictionary> _outputEdges; + internal KernelProcessStep? _stepInstance = null; + internal readonly KernelProcessStepInfo _stepInfo; internal readonly string _eventNamespace; /// @@ -288,15 +287,14 @@ protected virtual async ValueTask InitializeStepAsync() /// /// Deinitializes the step /// - internal virtual async ValueTask DeinitializeStepAsync() + public virtual async ValueTask DeinitializeStepAsync() { MethodInfo? derivedMethod = this._stepInfo.InnerStepType.GetMethod(nameof(KernelProcessStep.DeactivateAsync), BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance); - var context = new KernelProcessStepContext(this, this.ExternalMessageChannel); if (derivedMethod != null && this._stepInstance != null) { ValueTask deactivateTask = - (ValueTask?)derivedMethod.Invoke(this._stepInstance, [context]) ?? + (ValueTask?)derivedMethod.Invoke(this._stepInstance, []) ?? throw new KernelException($"The derived DeactivateAsync method failed to complete for step {this.Name}.").Log(this._logger); await deactivateTask.ConfigureAwait(false); diff --git a/dotnet/src/InternalUtilities/process/Abstractions/StepExtensions.cs b/dotnet/src/InternalUtilities/process/Abstractions/StepExtensions.cs index fead79cde844..9db4592f4407 100644 --- a/dotnet/src/InternalUtilities/process/Abstractions/StepExtensions.cs +++ b/dotnet/src/InternalUtilities/process/Abstractions/StepExtensions.cs @@ -131,7 +131,11 @@ public static void InitializeUserState(this KernelProcessStepState stateObject, // and are instantiated here. if (param.ParameterType == typeof(KernelProcessStepContext)) { - inputs[kvp.Key]![param.Name] = new KernelProcessStepContext(channel, externalMessageChannel); + inputs[kvp.Key]![param.Name] = new KernelProcessStepContext(channel); + } + else if (param.ParameterType == typeof(KernelProcessStepExternalContext)) + { + inputs[kvp.Key]![param.Name] = new KernelProcessStepExternalContext(externalMessageChannel); } else { From a81bc3aaca57f2085051102d3b0994648e6e446c Mon Sep 17 00:00:00 2001 From: Estefania Tenorio <8483207+esttenorio@users.noreply.github.com> Date: Mon, 10 Mar 2025 15:30:05 -0700 Subject: [PATCH 08/10] addressing offline comments + making sure proxy message has correct processId piped --- .../Process.Abstractions/KernelProcessStep.cs | 9 -- .../LocalKernelProcessContext.cs | 13 +- .../Process.LocalRuntime/LocalProcess.cs | 20 ++- .../Process.LocalRuntime/LocalProxy.cs | 7 +- .../Process.LocalRuntime/LocalStep.cs | 14 +- .../Runtime.Local/LocalProcessTests.cs | 10 +- .../Runtime.Local/LocalProxyTests.cs | 144 +++++++++++++++--- 7 files changed, 165 insertions(+), 52 deletions(-) diff --git a/dotnet/src/Experimental/Process.Abstractions/KernelProcessStep.cs b/dotnet/src/Experimental/Process.Abstractions/KernelProcessStep.cs index f8c706fdb5e0..c3162340bb35 100644 --- a/dotnet/src/Experimental/Process.Abstractions/KernelProcessStep.cs +++ b/dotnet/src/Experimental/Process.Abstractions/KernelProcessStep.cs @@ -14,15 +14,6 @@ public virtual ValueTask ActivateAsync(KernelProcessStepState state) { return default; } - - /// - /// Function triggered when deinitializing the KernelProcessStep - /// - /// - public virtual ValueTask DeactivateAsync() - { - return default; - } } /// diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs index b6ec2c240ada..09cc6aa6df9e 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs @@ -2,6 +2,7 @@ using System; using System.Threading.Tasks; using Microsoft.SemanticKernel.Process; +using Microsoft.VisualStudio.Threading; namespace Microsoft.SemanticKernel; @@ -10,6 +11,9 @@ namespace Microsoft.SemanticKernel; /// public sealed class LocalKernelProcessContext : KernelProcessContext, IDisposable { + private readonly JoinableTaskFactory _joinableTaskFactory; + private readonly JoinableTaskContext _joinableTaskContext; + private readonly LocalProcess _localProcess; private readonly Kernel _kernel; @@ -25,6 +29,9 @@ internal LocalKernelProcessContext(KernelProcess process, Kernel kernel, Process EventProxy = eventProxy, ExternalMessageChannel = externalMessageChannel, }; + + this._joinableTaskContext = new JoinableTaskContext(); + this._joinableTaskFactory = new JoinableTaskFactory(this._joinableTaskContext); } internal Task StartWithEventAsync(KernelProcessEvent initialEvent, Kernel? kernel = null) => @@ -53,7 +60,11 @@ public override Task SendEventAsync(KernelProcessEvent processEvent) => /// /// Disposes of the resources used by the process. /// - public void Dispose() => this._localProcess.Dispose(); + public void Dispose() + { + this._joinableTaskFactory.Run(() => this._localProcess.DisposeAsync().AsTask()); + this._joinableTaskContext.Dispose(); + } /// public override Task GetExternalMessageChannelAsync() diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs index b0b774043577..cf0cded345ab 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs @@ -16,7 +16,7 @@ namespace Microsoft.SemanticKernel; internal delegate bool ProcessEventProxy(ProcessEvent processEvent); -internal sealed class LocalProcess : LocalStep, IDisposable +internal sealed class LocalProcess : LocalStep, System.IAsyncDisposable { private readonly JoinableTaskFactory _joinableTaskFactory; private readonly JoinableTaskContext _joinableTaskContext; @@ -49,8 +49,15 @@ internal LocalProcess(KernelProcess process, Kernel kernel) this._joinableTaskContext = new JoinableTaskContext(); this._joinableTaskFactory = new JoinableTaskFactory(this._joinableTaskContext); this._logger = this._kernel.LoggerFactory?.CreateLogger(this.Name) ?? new NullLogger(); + // if parent id is null this is the root process + this.RootProcessId = this.ParentProcessId == null ? this.Id : null; } + /// + /// The Id of the root process. + /// + internal string? RootProcessId { get; init; } + /// /// Starts the process with an initial event and an optional kernel. /// @@ -196,6 +203,7 @@ private ValueTask InitializeProcessAsync() new LocalProcess(processStep, this._kernel) { ParentProcessId = this.Id, + RootProcessId = this.RootProcessId, EventProxy = this.EventProxy, ExternalMessageChannel = this.ExternalMessageChannel, }; @@ -213,7 +221,7 @@ private ValueTask InitializeProcessAsync() localStep = new LocalProxy(proxyStep, this._kernel) { - ParentProcessId = this.Id, + ParentProcessId = this.RootProcessId, EventProxy = this.EventProxy, ExternalMessageChannel = this.ExternalMessageChannel, }; @@ -404,18 +412,18 @@ internal override async Task ToKernelProcessStepInfoAsync #endregion /// - public override async ValueTask DeinitializeStepAsync() + public override async Task DeinitializeStepAsync() { - await Task.Run(() => this.Dispose()).ConfigureAwait(false); + await this.DisposeAsync().ConfigureAwait(false); } - public void Dispose() + public async ValueTask DisposeAsync() { this._externalEventChannel.Writer.Complete(); this._joinableTaskContext.Dispose(); foreach (var step in this._steps) { - this._joinableTaskFactory.Run(() => step.DeinitializeStepAsync().AsTask()); + await step.DeinitializeStepAsync().ConfigureAwait(false); } this._processCancelSource?.Dispose(); } diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalProxy.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalProxy.cs index 30f8e53c5dbb..221ffcd2b371 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalProxy.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalProxy.cs @@ -80,9 +80,12 @@ protected override async ValueTask InitializeStepAsync() this._isInitialized = true; } - public override async ValueTask DeinitializeStepAsync() + /// + /// Deinitialization of the Proxy Step, calling + /// + /// + public override async Task DeinitializeStepAsync() { - // Proxy Step is the only step it uses an overloaded DeactivateAsync with parameters MethodInfo? derivedMethod = this._stepInfo.InnerStepType.GetMethod( nameof(KernelProxyStep.DeactivateAsync), BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance, diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs index faa5f9ce822b..68840b8849c9 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs @@ -287,18 +287,10 @@ protected virtual async ValueTask InitializeStepAsync() /// /// Deinitializes the step /// - public virtual async ValueTask DeinitializeStepAsync() + public virtual Task DeinitializeStepAsync() { - MethodInfo? derivedMethod = this._stepInfo.InnerStepType.GetMethod(nameof(KernelProcessStep.DeactivateAsync), BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance); - - if (derivedMethod != null && this._stepInstance != null) - { - ValueTask deactivateTask = - (ValueTask?)derivedMethod.Invoke(this._stepInstance, []) ?? - throw new KernelException($"The derived DeactivateAsync method failed to complete for step {this.Name}.").Log(this._logger); - - await deactivateTask.ConfigureAwait(false); - } + this._logger.LogInformation("Step {Name} has deinitialized", this.Name); + return Task.CompletedTask; } /// diff --git a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs index 61823233abd6..e409d45eb893 100644 --- a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs +++ b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs @@ -26,7 +26,7 @@ public async Task ExecuteAsyncInitializesCorrectlyAsync() ], []); var mockKernel = new Kernel(); - using var localProcess = new LocalProcess(mockKernelProcess, mockKernel); + await using var localProcess = new LocalProcess(mockKernelProcess, mockKernel); // Act await localProcess.StartAsync(); @@ -40,7 +40,7 @@ public async Task ExecuteAsyncInitializesCorrectlyAsync() /// Validates that the assigns and Id to the process if one is not already set. /// [Fact] - public void ProcessWithMissingIdIsAssignedAnId() + public async Task ProcessWithMissingIdIsAssignedAnIdAsync() { // Arrange var mockKernel = new Kernel(); @@ -52,7 +52,7 @@ public void ProcessWithMissingIdIsAssignedAnId() ], []); // Act - using var localProcess = new LocalProcess(mockKernelProcess, mockKernel); + await using var localProcess = new LocalProcess(mockKernelProcess, mockKernel); // Assert Assert.NotEmpty(localProcess.Id); @@ -62,7 +62,7 @@ public void ProcessWithMissingIdIsAssignedAnId() /// Validates that the assigns and Id to the process if one is not already set. /// [Fact] - public void ProcessWithAssignedIdIsNotOverwrittenId() + public async Task ProcessWithAssignedIdIsNotOverwrittenIdAsync() { // Arrange var mockKernel = new Kernel(); @@ -74,7 +74,7 @@ public void ProcessWithAssignedIdIsNotOverwrittenId() ], []); // Act - using var localProcess = new LocalProcess(mockKernelProcess, mockKernel); + await using var localProcess = new LocalProcess(mockKernelProcess, mockKernel); // Assert Assert.NotEmpty(localProcess.Id); diff --git a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProxyTests.cs b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProxyTests.cs index 77e9a7c018d2..b5969571da0c 100644 --- a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProxyTests.cs +++ b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProxyTests.cs @@ -85,8 +85,8 @@ public void ProcessWithProxyFailsToCreateDueMissingTopicRegistration() } /// - /// Validates the result as the first step in the process - /// and with a step as the map operation. + /// Validates the emits different topics from + /// different steps /// [Fact] public async Task ProcessWithCyclesAndProxyWithTwoTopicsAsync() @@ -94,29 +94,96 @@ public async Task ProcessWithCyclesAndProxyWithTwoTopicsAsync() // Arrange CommonSteps.CountStep.Index = 0; var mockProxyClient = new MockCloudEventClient(); - ProcessBuilder process = new(nameof(ProcessWithCyclesAndProxyWithTwoTopicsAsync)); + ProcessBuilder process = this.GetSampleProcessWithProxyEmittingTwoTopics(nameof(ProcessWithCyclesAndProxyWithTwoTopicsAsync)); + KernelProcess processInstance = process.Build(); + Kernel kernel = new(); - var counterStep = process.AddStepFromType(); - var evenNumberStep = process.AddStepFromType(); - var proxyStep = process.AddProxyStep([this._topic1, this._topic2]); + // Act + using (LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, null, this._startProcessEvent, externalMessageChannel: mockProxyClient)) + { + // Assert + var runningProcessId = await processContext.GetProcessIdAsync(); + + Assert.NotNull(mockProxyClient); + Assert.True(0 < mockProxyClient.InitializationCounter); + Assert.Equal(0, mockProxyClient.UninitializationCounter); + Assert.Equal(3, mockProxyClient.CloudEvents.Count); + Assert.Equal(this._topic1, mockProxyClient.CloudEvents[0].TopicName); + Assert.Equal(runningProcessId, mockProxyClient.CloudEvents[0].Data?.ProcessId); + Assert.Equal("1", mockProxyClient.CloudEvents[0].Data?.EventData); + Assert.Equal(this._topic1, mockProxyClient.CloudEvents[1].TopicName); + Assert.Equal("2", mockProxyClient.CloudEvents[1].Data?.EventData); + Assert.Equal(this._topic2, mockProxyClient.CloudEvents[2].TopicName); + Assert.Equal("2", mockProxyClient.CloudEvents[2].Data?.EventData); + } + + // Assert + Assert.Equal(1, mockProxyClient.UninitializationCounter); + } + + /// + /// Validates the emits different topics from + /// different steps from a nested process + /// + [Fact] + public async Task ProcessWithProxyIn2LevelsNestedProcessEmitsTwoTopicsAsync() + { + // Arrange + CommonSteps.CountStep.Index = 0; + var mockProxyClient = new MockCloudEventClient(); + ProcessBuilder process = new(nameof(ProcessWithProxyIn2LevelsNestedProcessEmitsTwoTopicsAsync)); + var innerProcess = process.AddStepFromProcess(this.GetSampleProcessWithProxyEmittingTwoTopics($"Inner-{nameof(ProcessWithProxyIn2LevelsNestedProcessEmitsTwoTopicsAsync)}")); process .OnInputEvent(this._startProcessEvent) - .SendEventTo(new(counterStep)); + .SendEventTo(innerProcess.WhereInputEventIs(this._startProcessEvent)); - counterStep - .OnFunctionResult() - .EmitExternalEvent(proxyStep, this._topic1) - .SendEventTo(new(evenNumberStep)); + KernelProcess processInstance = process.Build(); + Kernel kernel = new(); - // request another number if number is odd - evenNumberStep - .OnEvent(CommonSteps.EvenNumberDetectorStep.OutputEvents.OddNumber) - .SendEventTo(new(counterStep)); + // Act + using (LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, null, this._startProcessEvent, externalMessageChannel: mockProxyClient)) + { + // Assert + var runningProcessId = await processContext.GetProcessIdAsync(); - evenNumberStep - .OnEvent(CommonSteps.EvenNumberDetectorStep.OutputEvents.EvenNumber) - .EmitExternalEvent(proxyStep, this._topic2); + Assert.NotNull(mockProxyClient); + Assert.True(0 < mockProxyClient.InitializationCounter); + Assert.Equal(0, mockProxyClient.UninitializationCounter); + Assert.Equal(3, mockProxyClient.CloudEvents.Count); + Assert.Equal(this._topic1, mockProxyClient.CloudEvents[0].TopicName); + Assert.Equal(runningProcessId, mockProxyClient.CloudEvents[0].Data?.ProcessId); + Assert.Equal("1", mockProxyClient.CloudEvents[0].Data?.EventData); + Assert.Equal(this._topic1, mockProxyClient.CloudEvents[1].TopicName); + Assert.Equal("2", mockProxyClient.CloudEvents[1].Data?.EventData); + Assert.Equal(this._topic2, mockProxyClient.CloudEvents[2].TopicName); + Assert.Equal("2", mockProxyClient.CloudEvents[2].Data?.EventData); + } + + // Assert + Assert.Equal(1, mockProxyClient.UninitializationCounter); + } + + /// + /// Validates the emits different topics from + /// different steps from a deep nested process + /// + [Fact] + public async Task ProcessWithProxyIn4LevelsNestedProcessEmitsTwoTopicsAsync() + { + // Arrange + CommonSteps.CountStep.Index = 0; + var mockProxyClient = new MockCloudEventClient(); + ProcessBuilder process = new(nameof(ProcessWithProxyIn4LevelsNestedProcessEmitsTwoTopicsAsync)); + var innerProcess = process.AddStepFromProcess( + this.GetNestedProcess( + processName: $"Inner1-{nameof(ProcessWithProxyIn4LevelsNestedProcessEmitsTwoTopicsAsync)}", + internalProcess: this.GetSampleProcessWithProxyEmittingTwoTopics($"Inner2-{nameof(ProcessWithProxyIn4LevelsNestedProcessEmitsTwoTopicsAsync)}"), + inputEventName: this._startProcessEvent)); + + process + .OnInputEvent(this._startProcessEvent) + .SendEventTo(innerProcess.WhereInputEventIs(this._startProcessEvent)); KernelProcess processInstance = process.Build(); Kernel kernel = new(); @@ -144,6 +211,47 @@ public async Task ProcessWithCyclesAndProxyWithTwoTopicsAsync() Assert.Equal(1, mockProxyClient.UninitializationCounter); } + private ProcessBuilder GetNestedProcess(string processName, ProcessBuilder internalProcess, string inputEventName) + { + ProcessBuilder process = new(processName); + var innerProcess = process.AddStepFromProcess(this.GetSampleProcessWithProxyEmittingTwoTopics($"Inner-{processName}")); + + process + .OnInputEvent(inputEventName) + .SendEventTo(innerProcess.WhereInputEventIs(inputEventName)); + + return process; + } + + private ProcessBuilder GetSampleProcessWithProxyEmittingTwoTopics(string processName) + { + ProcessBuilder process = new(processName); + + var counterStep = process.AddStepFromType(); + var evenNumberStep = process.AddStepFromType(); + var proxyStep = process.AddProxyStep([this._topic1, this._topic2]); + + process + .OnInputEvent(this._startProcessEvent) + .SendEventTo(new(counterStep)); + + counterStep + .OnFunctionResult() + .EmitExternalEvent(proxyStep, this._topic1) + .SendEventTo(new(evenNumberStep)); + + // request another number if number is odd + evenNumberStep + .OnEvent(CommonSteps.EvenNumberDetectorStep.OutputEvents.OddNumber) + .SendEventTo(new(counterStep)); + + evenNumberStep + .OnEvent(CommonSteps.EvenNumberDetectorStep.OutputEvents.EvenNumber) + .EmitExternalEvent(proxyStep, this._topic2); + + return process; + } + private async Task RunProcessAsync(Kernel kernel, KernelProcess process, object? input, string inputEvent, IExternalKernelProcessMessageChannel? externalMessageChannel) { return From 87dd4bfabc3ca6d7ed8eaf387db5fb252f5e891c Mon Sep 17 00:00:00 2001 From: Estefania Tenorio <8483207+esttenorio@users.noreply.github.com> Date: Tue, 11 Mar 2025 10:48:18 -0700 Subject: [PATCH 09/10] changing local context to use IAsyncDisposable --- .../Program.cs | 2 +- .../Step00/Step00_Processes.cs | 2 +- .../Step01/Step01_Processes.cs | 2 +- .../Step02/Step02a_AccountOpening.cs | 6 +++--- .../Step02/Step02b_AccountOpening.cs | 6 +++--- .../Step03/Step03a_FoodPreparation.cs | 2 +- .../Step03/Step03b_FoodOrdering.cs | 2 +- .../Step04/Step04_AgentOrchestration.cs | 2 +- .../Step05/Step05_MapReduce.cs | 2 +- .../LocalKernelProcessContext.cs | 16 +++++++++++----- .../Process.LocalRuntime/LocalMap.cs | 2 +- .../Runtime.Local/LocalMapTests.cs | 18 +++++++++--------- .../Runtime.Local/LocalProcessTests.cs | 6 +++--- .../Runtime.Local/LocalProxyTests.cs | 8 ++++---- 14 files changed, 41 insertions(+), 35 deletions(-) diff --git a/dotnet/samples/Demos/ProcessFrameworkWithAspire/ProcessFramework.Aspire/ProcessFramework.Aspire.ProcessOrchestrator/Program.cs b/dotnet/samples/Demos/ProcessFrameworkWithAspire/ProcessFramework.Aspire/ProcessFramework.Aspire.ProcessOrchestrator/Program.cs index 0dac1b69d041..de3d58905e35 100644 --- a/dotnet/samples/Demos/ProcessFrameworkWithAspire/ProcessFramework.Aspire/ProcessFramework.Aspire.ProcessOrchestrator/Program.cs +++ b/dotnet/samples/Demos/ProcessFrameworkWithAspire/ProcessFramework.Aspire/ProcessFramework.Aspire.ProcessOrchestrator/Program.cs @@ -77,7 +77,7 @@ .StopProcess(); var process = processBuilder.Build(); - using var runningProcess = await process.StartAsync( + await using var runningProcess = await process.StartAsync( kernel, new KernelProcessEvent { Id = ProcessEvents.TranslateDocument, Data = "COME I FORNITORI INFLUENZANO I TUOI COSTI Quando scegli un piano di assicurazione sanitaria, uno dei fattori più importanti da considerare è la rete di fornitori in convenzione disponibili con il piano. Northwind Standard offre un'ampia varietà di fornitori in convenzione, tra cui medici di base, specialisti, ospedali e farmacie. Questo ti permette di scegliere un fornitore comodo per te e la tua famiglia, contribuendo al contempo a mantenere bassi i tuoi costi. Se scegli un fornitore in convenzione con il tuo piano, pagherai generalmente copay e franchigie più basse rispetto a un fornitore fuori rete. Inoltre, molti servizi, come l'assistenza preventiva, possono essere coperti senza alcun costo aggiuntivo se ricevuti da un fornitore in convenzione. È importante notare, tuttavia, che Northwind Standard non copre i servizi di emergenza, l'assistenza per la salute mentale e l'abuso di sostanze, né i servizi fuori rete. Questo significa che potresti dover pagare di tasca tua per questi servizi se ricevuti da un fornitore fuori rete. Quando scegli un fornitore in convenzione, ci sono alcuni suggerimenti da tenere a mente. Verifica che il fornitore sia in convenzione con il tuo piano. Puoi confermarlo chiamando l'ufficio del fornitore e chiedendo se è in rete con Northwind Standard. Puoi anche utilizzare lo strumento di ricerca fornitori sul sito web di Northwind Health per verificare la copertura. Assicurati che il fornitore stia accettando nuovi pazienti. Alcuni fornitori potrebbero essere in convenzione ma non accettare nuovi pazienti. Considera la posizione del fornitore. Se il fornitore è troppo lontano, potrebbe essere difficile raggiungere gli appuntamenti. Valuta gli orari dell'ufficio del fornitore. Se lavori durante il giorno, potresti aver bisogno di trovare un fornitore con orari serali o nel fine settimana. Scegliere un fornitore in convenzione può aiutarti a risparmiare sui costi sanitari. Seguendo i suggerimenti sopra e facendo ricerche sulle opzioni disponibili, puoi trovare un fornitore conveniente, accessibile e in rete con il tuo piano Northwind Standard." } ); diff --git a/dotnet/samples/GettingStartedWithProcesses/Step00/Step00_Processes.cs b/dotnet/samples/GettingStartedWithProcesses/Step00/Step00_Processes.cs index cf4333daba87..72f8c2306140 100644 --- a/dotnet/samples/GettingStartedWithProcesses/Step00/Step00_Processes.cs +++ b/dotnet/samples/GettingStartedWithProcesses/Step00/Step00_Processes.cs @@ -59,7 +59,7 @@ public async Task UseSimplestProcessAsync() KernelProcess kernelProcess = process.Build(); // Start the process with an initial external event - using var runningProcess = await kernelProcess.StartAsync( + await using var runningProcess = await kernelProcess.StartAsync( kernel, new KernelProcessEvent() { diff --git a/dotnet/samples/GettingStartedWithProcesses/Step01/Step01_Processes.cs b/dotnet/samples/GettingStartedWithProcesses/Step01/Step01_Processes.cs index 84b3d7584eb4..fe2fab2edcec 100644 --- a/dotnet/samples/GettingStartedWithProcesses/Step01/Step01_Processes.cs +++ b/dotnet/samples/GettingStartedWithProcesses/Step01/Step01_Processes.cs @@ -77,7 +77,7 @@ public async Task UseSimpleProcessAsync() Console.WriteLine($"Diagram generated at: {generatedImagePath}"); // Start the process with an initial external event - using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = ChatBotEvents.StartProcess, Data = null }); + await using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = ChatBotEvents.StartProcess, Data = null }); } /// diff --git a/dotnet/samples/GettingStartedWithProcesses/Step02/Step02a_AccountOpening.cs b/dotnet/samples/GettingStartedWithProcesses/Step02/Step02a_AccountOpening.cs index 1564dc679eec..2ee5bb33db1f 100644 --- a/dotnet/samples/GettingStartedWithProcesses/Step02/Step02a_AccountOpening.cs +++ b/dotnet/samples/GettingStartedWithProcesses/Step02/Step02a_AccountOpening.cs @@ -145,7 +145,7 @@ public async Task UseAccountOpeningProcessSuccessfulInteractionAsync() { Kernel kernel = CreateKernelWithChatCompletion(); KernelProcess kernelProcess = SetupAccountOpeningProcess(); - using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); + await using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); } /// @@ -156,7 +156,7 @@ public async Task UseAccountOpeningProcessFailureDueToCreditScoreFailureAsync() { Kernel kernel = CreateKernelWithChatCompletion(); KernelProcess kernelProcess = SetupAccountOpeningProcess(); - using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); + await using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); } /// @@ -167,6 +167,6 @@ public async Task UseAccountOpeningProcessFailureDueToFraudFailureAsync() { Kernel kernel = CreateKernelWithChatCompletion(); KernelProcess kernelProcess = SetupAccountOpeningProcess(); - using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); + await using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); } } diff --git a/dotnet/samples/GettingStartedWithProcesses/Step02/Step02b_AccountOpening.cs b/dotnet/samples/GettingStartedWithProcesses/Step02/Step02b_AccountOpening.cs index b14b659cd20f..581e41cf76d0 100644 --- a/dotnet/samples/GettingStartedWithProcesses/Step02/Step02b_AccountOpening.cs +++ b/dotnet/samples/GettingStartedWithProcesses/Step02/Step02b_AccountOpening.cs @@ -112,7 +112,7 @@ public async Task UseAccountOpeningProcessSuccessfulInteractionAsync() { Kernel kernel = CreateKernelWithChatCompletion(); KernelProcess kernelProcess = SetupAccountOpeningProcess(); - using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); + await using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); } /// @@ -123,7 +123,7 @@ public async Task UseAccountOpeningProcessFailureDueToCreditScoreFailureAsync() { Kernel kernel = CreateKernelWithChatCompletion(); KernelProcess kernelProcess = SetupAccountOpeningProcess(); - using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); + await using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); } /// @@ -134,6 +134,6 @@ public async Task UseAccountOpeningProcessFailureDueToFraudFailureAsync() { Kernel kernel = CreateKernelWithChatCompletion(); KernelProcess kernelProcess = SetupAccountOpeningProcess(); - using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); + await using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); } } diff --git a/dotnet/samples/GettingStartedWithProcesses/Step03/Step03a_FoodPreparation.cs b/dotnet/samples/GettingStartedWithProcesses/Step03/Step03a_FoodPreparation.cs index 0a5f4192c17a..99d2f2f4e122 100644 --- a/dotnet/samples/GettingStartedWithProcesses/Step03/Step03a_FoodPreparation.cs +++ b/dotnet/samples/GettingStartedWithProcesses/Step03/Step03a_FoodPreparation.cs @@ -252,7 +252,7 @@ protected async Task UsePrepareSpecificProductAsync(ProcessBuilder processBuilde // Assert Console.WriteLine($"=== Start SK Process '{processBuilder.Name}' ==="); - using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() + await using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = externalTriggerEvent, Data = new List() }); diff --git a/dotnet/samples/GettingStartedWithProcesses/Step03/Step03b_FoodOrdering.cs b/dotnet/samples/GettingStartedWithProcesses/Step03/Step03b_FoodOrdering.cs index a17f04f43578..e5077d7bea69 100644 --- a/dotnet/samples/GettingStartedWithProcesses/Step03/Step03b_FoodOrdering.cs +++ b/dotnet/samples/GettingStartedWithProcesses/Step03/Step03b_FoodOrdering.cs @@ -45,7 +45,7 @@ protected async Task UsePrepareFoodOrderProcessSingleItemAsync(FoodItem foodItem Kernel kernel = CreateKernelWithChatCompletion(); KernelProcess kernelProcess = SingleFoodItemProcess.CreateProcess().Build(); - using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() + await using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = SingleFoodItemProcess.ProcessEvents.SingleOrderReceived, Data = foodItem diff --git a/dotnet/samples/GettingStartedWithProcesses/Step04/Step04_AgentOrchestration.cs b/dotnet/samples/GettingStartedWithProcesses/Step04/Step04_AgentOrchestration.cs index 8ee3fb3adad6..20e506958bd3 100644 --- a/dotnet/samples/GettingStartedWithProcesses/Step04/Step04_AgentOrchestration.cs +++ b/dotnet/samples/GettingStartedWithProcesses/Step04/Step04_AgentOrchestration.cs @@ -65,7 +65,7 @@ private async Task RunProcessAsync(KernelProcess process) Kernel kernel = SetupKernel(history); // Execute process - using LocalKernelProcessContext localProcess = + await using LocalKernelProcessContext localProcess = await process.StartAsync( kernel, new KernelProcessEvent() diff --git a/dotnet/samples/GettingStartedWithProcesses/Step05/Step05_MapReduce.cs b/dotnet/samples/GettingStartedWithProcesses/Step05/Step05_MapReduce.cs index ca8d33818d53..4154fd486f9b 100644 --- a/dotnet/samples/GettingStartedWithProcesses/Step05/Step05_MapReduce.cs +++ b/dotnet/samples/GettingStartedWithProcesses/Step05/Step05_MapReduce.cs @@ -44,7 +44,7 @@ public async Task RunMapReduceAsync() // Execute the process Kernel kernel = new(); - using LocalKernelProcessContext localProcess = + await using LocalKernelProcessContext localProcess = await process.StartAsync( kernel, new KernelProcessEvent diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs index 09cc6aa6df9e..d744f87a2062 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs @@ -1,5 +1,4 @@ // Copyright (c) Microsoft. All rights reserved. -using System; using System.Threading.Tasks; using Microsoft.SemanticKernel.Process; using Microsoft.VisualStudio.Threading; @@ -9,7 +8,7 @@ namespace Microsoft.SemanticKernel; /// /// Provides context and actions on a process that is running locally. /// -public sealed class LocalKernelProcessContext : KernelProcessContext, IDisposable +public sealed class LocalKernelProcessContext : KernelProcessContext, System.IAsyncDisposable { private readonly JoinableTaskFactory _joinableTaskFactory; private readonly JoinableTaskContext _joinableTaskContext; @@ -60,10 +59,17 @@ public override Task SendEventAsync(KernelProcessEvent processEvent) => /// /// Disposes of the resources used by the process. /// - public void Dispose() + public async ValueTask DisposeAsync() { - this._joinableTaskFactory.Run(() => this._localProcess.DisposeAsync().AsTask()); - this._joinableTaskContext.Dispose(); + await this._localProcess.DisposeAsync().ConfigureAwait(false); + if (this._joinableTaskContext is System.IAsyncDisposable asyncDisposableContext) + { + await asyncDisposableContext.DisposeAsync().ConfigureAwait(false); + } + else + { + this._joinableTaskContext.Dispose(); + } } /// diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalMap.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalMap.cs index 9065ad94e19d..b0f80e890a56 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalMap.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalMap.cs @@ -98,7 +98,7 @@ internal override async Task HandleMessageAsync(ProcessMessage message) { foreach (var operation in mapOperations) { - operation.ProcessContext.Dispose(); + await operation.ProcessContext.DisposeAsync().ConfigureAwait(false); } } } diff --git a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalMapTests.cs b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalMapTests.cs index e9532394d56b..eb454c1e1858 100644 --- a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalMapTests.cs +++ b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalMapTests.cs @@ -38,7 +38,7 @@ public async Task ProcessMapResultAsFirstAsync() Kernel kernel = new(); // Act - using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + await using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); // Assert UnionState unionState = await GetUnionStateAsync(processContext); @@ -68,7 +68,7 @@ public async Task ProcessMapResultFilterEventAsync() Kernel kernel = new(); // Act - using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + await using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); // Assert UnionState unionState = await GetUnionStateAsync(processContext); @@ -99,7 +99,7 @@ public async Task ProcessMapResultWithTransformAsync() Kernel kernel = new(); // Act - using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + await using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); // Assert UnionState unionState = await GetUnionStateAsync(processContext); @@ -130,7 +130,7 @@ public async Task ProcessMapResultOperationTargetAsync() Kernel kernel = new(); // Act - using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + await using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); // Assert UnionState unionState = await GetUnionStateAsync(processContext); @@ -166,7 +166,7 @@ public async Task ProcessMapResultAsTargetAsync() Kernel kernel = new(); // Act - using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + await using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); // Assert UnionState unionState = await GetUnionStateAsync(processContext); @@ -200,7 +200,7 @@ public async Task ProcessMapResultMultiEventAsync() Kernel kernel = new(); // Act - using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + await using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); // Assert UnionState unionState = await GetUnionStateAsync(processContext); @@ -238,7 +238,7 @@ public async Task ProcessMapResultProcessOperationAsync() Kernel kernel = new(); // Act - using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + await using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); // Assert UnionState unionState = await GetUnionStateAsync(processContext); @@ -315,7 +315,7 @@ public async Task ProcessMapResultWithTargetExtraAsync() Kernel kernel = new(); // Act - using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + await using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); // Assert UnionState unionState = await GetUnionStateAsync(processContext); @@ -363,7 +363,7 @@ public async Task ProcessMapResultForNestedMapAsync() [1, 2, 3, 4, 5], [1, 2, 3, 4, 5], ]; - using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, input, "Start"); + await using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, input, "Start"); // Assert UnionState unionState = await GetUnionStateAsync(processContext); diff --git a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs index e409d45eb893..770eab991394 100644 --- a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs +++ b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs @@ -100,7 +100,7 @@ public async Task ProcessFunctionErrorHandledAsync() Kernel kernel = new(); // Act - using LocalKernelProcessContext runningProcess = await processInstance.StartAsync(kernel, new KernelProcessEvent() { Id = "Start" }); + await using LocalKernelProcessContext runningProcess = await processInstance.StartAsync(kernel, new KernelProcessEvent() { Id = "Start" }); // Assert Assert.True(kernel.Data.ContainsKey("error-function")); @@ -126,7 +126,7 @@ public async Task ProcessGlobalErrorHandledAsync() Kernel kernel = new(); // Act - using LocalKernelProcessContext runningProcess = await processInstance.StartAsync(kernel, new KernelProcessEvent() { Id = "Start" }); + await using LocalKernelProcessContext runningProcess = await processInstance.StartAsync(kernel, new KernelProcessEvent() { Id = "Start" }); // Assert Assert.True(kernel.Data.ContainsKey("error-global")); @@ -153,7 +153,7 @@ public async Task FunctionErrorHandlerTakesPrecedenceAsync() Kernel kernel = new(); // Act - using LocalKernelProcessContext runningProcess = await processInstance.StartAsync(kernel, new KernelProcessEvent() { Id = "Start" }); + await using LocalKernelProcessContext runningProcess = await processInstance.StartAsync(kernel, new KernelProcessEvent() { Id = "Start" }); // Assert Assert.False(kernel.Data.ContainsKey("error-global")); diff --git a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProxyTests.cs b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProxyTests.cs index b5969571da0c..ab732fcaec1a 100644 --- a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProxyTests.cs +++ b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProxyTests.cs @@ -37,7 +37,7 @@ public async Task ProcessWithProxyWithSingleTopicCalledTwiceAsync() Kernel kernel = new(); // Act - using (LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, null, this._startProcessEvent, externalMessageChannel: mockProxyClient)) + await using (LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, null, this._startProcessEvent, externalMessageChannel: mockProxyClient)) { // Assert var runningProcessId = await processContext.GetProcessIdAsync(); @@ -99,7 +99,7 @@ public async Task ProcessWithCyclesAndProxyWithTwoTopicsAsync() Kernel kernel = new(); // Act - using (LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, null, this._startProcessEvent, externalMessageChannel: mockProxyClient)) + await using (LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, null, this._startProcessEvent, externalMessageChannel: mockProxyClient)) { // Assert var runningProcessId = await processContext.GetProcessIdAsync(); @@ -142,7 +142,7 @@ public async Task ProcessWithProxyIn2LevelsNestedProcessEmitsTwoTopicsAsync() Kernel kernel = new(); // Act - using (LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, null, this._startProcessEvent, externalMessageChannel: mockProxyClient)) + await using (LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, null, this._startProcessEvent, externalMessageChannel: mockProxyClient)) { // Assert var runningProcessId = await processContext.GetProcessIdAsync(); @@ -189,7 +189,7 @@ public async Task ProcessWithProxyIn4LevelsNestedProcessEmitsTwoTopicsAsync() Kernel kernel = new(); // Act - using (LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, null, this._startProcessEvent, externalMessageChannel: mockProxyClient)) + await using (LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, null, this._startProcessEvent, externalMessageChannel: mockProxyClient)) { // Assert var runningProcessId = await processContext.GetProcessIdAsync(); From 2b0f958d31b644cf0e6344782e2162925814532e Mon Sep 17 00:00:00 2001 From: Estefania Tenorio <8483207+esttenorio@users.noreply.github.com> Date: Tue, 11 Mar 2025 15:59:24 -0700 Subject: [PATCH 10/10] removing unnecessary JoinableTaskFactory --- .../LocalKernelProcessContext.cs | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs index d744f87a2062..fd15fa16dd2a 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs @@ -1,7 +1,6 @@ // Copyright (c) Microsoft. All rights reserved. using System.Threading.Tasks; using Microsoft.SemanticKernel.Process; -using Microsoft.VisualStudio.Threading; namespace Microsoft.SemanticKernel; @@ -10,9 +9,6 @@ namespace Microsoft.SemanticKernel; /// public sealed class LocalKernelProcessContext : KernelProcessContext, System.IAsyncDisposable { - private readonly JoinableTaskFactory _joinableTaskFactory; - private readonly JoinableTaskContext _joinableTaskContext; - private readonly LocalProcess _localProcess; private readonly Kernel _kernel; @@ -28,9 +24,6 @@ internal LocalKernelProcessContext(KernelProcess process, Kernel kernel, Process EventProxy = eventProxy, ExternalMessageChannel = externalMessageChannel, }; - - this._joinableTaskContext = new JoinableTaskContext(); - this._joinableTaskFactory = new JoinableTaskFactory(this._joinableTaskContext); } internal Task StartWithEventAsync(KernelProcessEvent initialEvent, Kernel? kernel = null) => @@ -62,14 +55,6 @@ public override Task SendEventAsync(KernelProcessEvent processEvent) => public async ValueTask DisposeAsync() { await this._localProcess.DisposeAsync().ConfigureAwait(false); - if (this._joinableTaskContext is System.IAsyncDisposable asyncDisposableContext) - { - await asyncDisposableContext.DisposeAsync().ConfigureAwait(false); - } - else - { - this._joinableTaskContext.Dispose(); - } } ///