diff --git a/dotnet/samples/Demos/ProcessFrameworkWithAspire/ProcessFramework.Aspire/ProcessFramework.Aspire.ProcessOrchestrator/Program.cs b/dotnet/samples/Demos/ProcessFrameworkWithAspire/ProcessFramework.Aspire/ProcessFramework.Aspire.ProcessOrchestrator/Program.cs index 0dac1b69d041..de3d58905e35 100644 --- a/dotnet/samples/Demos/ProcessFrameworkWithAspire/ProcessFramework.Aspire/ProcessFramework.Aspire.ProcessOrchestrator/Program.cs +++ b/dotnet/samples/Demos/ProcessFrameworkWithAspire/ProcessFramework.Aspire/ProcessFramework.Aspire.ProcessOrchestrator/Program.cs @@ -77,7 +77,7 @@ .StopProcess(); var process = processBuilder.Build(); - using var runningProcess = await process.StartAsync( + await using var runningProcess = await process.StartAsync( kernel, new KernelProcessEvent { Id = ProcessEvents.TranslateDocument, Data = "COME I FORNITORI INFLUENZANO I TUOI COSTI Quando scegli un piano di assicurazione sanitaria, uno dei fattori più importanti da considerare è la rete di fornitori in convenzione disponibili con il piano. Northwind Standard offre un'ampia varietà di fornitori in convenzione, tra cui medici di base, specialisti, ospedali e farmacie. Questo ti permette di scegliere un fornitore comodo per te e la tua famiglia, contribuendo al contempo a mantenere bassi i tuoi costi. Se scegli un fornitore in convenzione con il tuo piano, pagherai generalmente copay e franchigie più basse rispetto a un fornitore fuori rete. Inoltre, molti servizi, come l'assistenza preventiva, possono essere coperti senza alcun costo aggiuntivo se ricevuti da un fornitore in convenzione. È importante notare, tuttavia, che Northwind Standard non copre i servizi di emergenza, l'assistenza per la salute mentale e l'abuso di sostanze, né i servizi fuori rete. Questo significa che potresti dover pagare di tasca tua per questi servizi se ricevuti da un fornitore fuori rete. Quando scegli un fornitore in convenzione, ci sono alcuni suggerimenti da tenere a mente. Verifica che il fornitore sia in convenzione con il tuo piano. Puoi confermarlo chiamando l'ufficio del fornitore e chiedendo se è in rete con Northwind Standard. Puoi anche utilizzare lo strumento di ricerca fornitori sul sito web di Northwind Health per verificare la copertura. Assicurati che il fornitore stia accettando nuovi pazienti. Alcuni fornitori potrebbero essere in convenzione ma non accettare nuovi pazienti. Considera la posizione del fornitore. Se il fornitore è troppo lontano, potrebbe essere difficile raggiungere gli appuntamenti. Valuta gli orari dell'ufficio del fornitore. Se lavori durante il giorno, potresti aver bisogno di trovare un fornitore con orari serali o nel fine settimana. Scegliere un fornitore in convenzione può aiutarti a risparmiare sui costi sanitari. Seguendo i suggerimenti sopra e facendo ricerche sulle opzioni disponibili, puoi trovare un fornitore conveniente, accessibile e in rete con il tuo piano Northwind Standard." } ); diff --git a/dotnet/samples/GettingStartedWithProcesses/Step00/Step00_Processes.cs b/dotnet/samples/GettingStartedWithProcesses/Step00/Step00_Processes.cs index cf4333daba87..72f8c2306140 100644 --- a/dotnet/samples/GettingStartedWithProcesses/Step00/Step00_Processes.cs +++ b/dotnet/samples/GettingStartedWithProcesses/Step00/Step00_Processes.cs @@ -59,7 +59,7 @@ public async Task UseSimplestProcessAsync() KernelProcess kernelProcess = process.Build(); // Start the process with an initial external event - using var runningProcess = await kernelProcess.StartAsync( + await using var runningProcess = await kernelProcess.StartAsync( kernel, new KernelProcessEvent() { diff --git a/dotnet/samples/GettingStartedWithProcesses/Step01/Step01_Processes.cs b/dotnet/samples/GettingStartedWithProcesses/Step01/Step01_Processes.cs index 84b3d7584eb4..fe2fab2edcec 100644 --- a/dotnet/samples/GettingStartedWithProcesses/Step01/Step01_Processes.cs +++ b/dotnet/samples/GettingStartedWithProcesses/Step01/Step01_Processes.cs @@ -77,7 +77,7 @@ public async Task UseSimpleProcessAsync() Console.WriteLine($"Diagram generated at: {generatedImagePath}"); // Start the process with an initial external event - using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = ChatBotEvents.StartProcess, Data = null }); + await using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = ChatBotEvents.StartProcess, Data = null }); } /// diff --git a/dotnet/samples/GettingStartedWithProcesses/Step02/Step02a_AccountOpening.cs b/dotnet/samples/GettingStartedWithProcesses/Step02/Step02a_AccountOpening.cs index 1564dc679eec..2ee5bb33db1f 100644 --- a/dotnet/samples/GettingStartedWithProcesses/Step02/Step02a_AccountOpening.cs +++ b/dotnet/samples/GettingStartedWithProcesses/Step02/Step02a_AccountOpening.cs @@ -145,7 +145,7 @@ public async Task UseAccountOpeningProcessSuccessfulInteractionAsync() { Kernel kernel = CreateKernelWithChatCompletion(); KernelProcess kernelProcess = SetupAccountOpeningProcess(); - using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); + await using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); } /// @@ -156,7 +156,7 @@ public async Task UseAccountOpeningProcessFailureDueToCreditScoreFailureAsync() { Kernel kernel = CreateKernelWithChatCompletion(); KernelProcess kernelProcess = SetupAccountOpeningProcess(); - using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); + await using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); } /// @@ -167,6 +167,6 @@ public async Task UseAccountOpeningProcessFailureDueToFraudFailureAsync() { Kernel kernel = CreateKernelWithChatCompletion(); KernelProcess kernelProcess = SetupAccountOpeningProcess(); - using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); + await using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); } } diff --git a/dotnet/samples/GettingStartedWithProcesses/Step02/Step02b_AccountOpening.cs b/dotnet/samples/GettingStartedWithProcesses/Step02/Step02b_AccountOpening.cs index b14b659cd20f..581e41cf76d0 100644 --- a/dotnet/samples/GettingStartedWithProcesses/Step02/Step02b_AccountOpening.cs +++ b/dotnet/samples/GettingStartedWithProcesses/Step02/Step02b_AccountOpening.cs @@ -112,7 +112,7 @@ public async Task UseAccountOpeningProcessSuccessfulInteractionAsync() { Kernel kernel = CreateKernelWithChatCompletion(); KernelProcess kernelProcess = SetupAccountOpeningProcess(); - using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); + await using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); } /// @@ -123,7 +123,7 @@ public async Task UseAccountOpeningProcessFailureDueToCreditScoreFailureAsync() { Kernel kernel = CreateKernelWithChatCompletion(); KernelProcess kernelProcess = SetupAccountOpeningProcess(); - using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); + await using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); } /// @@ -134,6 +134,6 @@ public async Task UseAccountOpeningProcessFailureDueToFraudFailureAsync() { Kernel kernel = CreateKernelWithChatCompletion(); KernelProcess kernelProcess = SetupAccountOpeningProcess(); - using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); + await using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = AccountOpeningEvents.StartProcess, Data = null }); } } diff --git a/dotnet/samples/GettingStartedWithProcesses/Step03/Step03a_FoodPreparation.cs b/dotnet/samples/GettingStartedWithProcesses/Step03/Step03a_FoodPreparation.cs index 0a5f4192c17a..99d2f2f4e122 100644 --- a/dotnet/samples/GettingStartedWithProcesses/Step03/Step03a_FoodPreparation.cs +++ b/dotnet/samples/GettingStartedWithProcesses/Step03/Step03a_FoodPreparation.cs @@ -252,7 +252,7 @@ protected async Task UsePrepareSpecificProductAsync(ProcessBuilder processBuilde // Assert Console.WriteLine($"=== Start SK Process '{processBuilder.Name}' ==="); - using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() + await using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = externalTriggerEvent, Data = new List() }); diff --git a/dotnet/samples/GettingStartedWithProcesses/Step03/Step03b_FoodOrdering.cs b/dotnet/samples/GettingStartedWithProcesses/Step03/Step03b_FoodOrdering.cs index a17f04f43578..e5077d7bea69 100644 --- a/dotnet/samples/GettingStartedWithProcesses/Step03/Step03b_FoodOrdering.cs +++ b/dotnet/samples/GettingStartedWithProcesses/Step03/Step03b_FoodOrdering.cs @@ -45,7 +45,7 @@ protected async Task UsePrepareFoodOrderProcessSingleItemAsync(FoodItem foodItem Kernel kernel = CreateKernelWithChatCompletion(); KernelProcess kernelProcess = SingleFoodItemProcess.CreateProcess().Build(); - using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() + await using var runningProcess = await kernelProcess.StartAsync(kernel, new KernelProcessEvent() { Id = SingleFoodItemProcess.ProcessEvents.SingleOrderReceived, Data = foodItem diff --git a/dotnet/samples/GettingStartedWithProcesses/Step04/Step04_AgentOrchestration.cs b/dotnet/samples/GettingStartedWithProcesses/Step04/Step04_AgentOrchestration.cs index 8ee3fb3adad6..20e506958bd3 100644 --- a/dotnet/samples/GettingStartedWithProcesses/Step04/Step04_AgentOrchestration.cs +++ b/dotnet/samples/GettingStartedWithProcesses/Step04/Step04_AgentOrchestration.cs @@ -65,7 +65,7 @@ private async Task RunProcessAsync(KernelProcess process) Kernel kernel = SetupKernel(history); // Execute process - using LocalKernelProcessContext localProcess = + await using LocalKernelProcessContext localProcess = await process.StartAsync( kernel, new KernelProcessEvent() diff --git a/dotnet/samples/GettingStartedWithProcesses/Step05/Step05_MapReduce.cs b/dotnet/samples/GettingStartedWithProcesses/Step05/Step05_MapReduce.cs index ca8d33818d53..4154fd486f9b 100644 --- a/dotnet/samples/GettingStartedWithProcesses/Step05/Step05_MapReduce.cs +++ b/dotnet/samples/GettingStartedWithProcesses/Step05/Step05_MapReduce.cs @@ -44,7 +44,7 @@ public async Task RunMapReduceAsync() // Execute the process Kernel kernel = new(); - using LocalKernelProcessContext localProcess = + await using LocalKernelProcessContext localProcess = await process.StartAsync( kernel, new KernelProcessEvent diff --git a/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepContext.cs b/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepContext.cs index a3a4f614ee27..e652e0adb367 100644 --- a/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepContext.cs +++ b/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepContext.cs @@ -10,17 +10,14 @@ namespace Microsoft.SemanticKernel; public sealed class KernelProcessStepContext { private readonly IKernelProcessMessageChannel _stepMessageChannel; - private readonly IExternalKernelProcessMessageChannel? _externalMessageChannel; /// /// Initializes a new instance of the class. /// /// An instance of . - /// An instance of - public KernelProcessStepContext(IKernelProcessMessageChannel channel, IExternalKernelProcessMessageChannel? externalMessageChannel = null) + public KernelProcessStepContext(IKernelProcessMessageChannel channel) { this._stepMessageChannel = channel; - this._externalMessageChannel = externalMessageChannel; } /// @@ -55,21 +52,4 @@ public ValueTask EmitEventAsync( Visibility = visibility }); } - - /// - /// Emit an external event to through a - /// component if connected from within the SK process - /// - /// data containing event details - /// - /// - public async Task EmitExternalEventAsync(KernelProcessProxyMessage processEventData) - { - if (this._externalMessageChannel == null) - { - throw new KernelException($"External message channel not configured for step with topic {processEventData.ExternalTopicName}"); - } - - await this._externalMessageChannel.EmitExternalEventAsync(processEventData.ExternalTopicName, processEventData).ConfigureAwait(false); - } } diff --git a/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepExternalContext.cs b/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepExternalContext.cs new file mode 100644 index 000000000000..b20c6de61c27 --- /dev/null +++ b/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepExternalContext.cs @@ -0,0 +1,54 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Threading.Tasks; + +namespace Microsoft.SemanticKernel; + +/// +/// Provides step related functionality for Kernel Functions running in a step to emit events externally. +/// +public class KernelProcessStepExternalContext +{ + private readonly IExternalKernelProcessMessageChannel? _externalMessageChannel; + + /// + /// Initializes a new instance of the class. + /// + /// An instance of + public KernelProcessStepExternalContext(IExternalKernelProcessMessageChannel? externalMessageChannel = null) + { + this._externalMessageChannel = externalMessageChannel; + } + + /// + /// Emit an external event to through a + /// component if connected from within the SK process + /// + /// data containing event details + /// + /// + public async Task EmitExternalEventAsync(KernelProcessProxyMessage processEventData) + { + if (this._externalMessageChannel == null) + { + throw new KernelException($"External message channel not configured for step with topic {processEventData.ExternalTopicName}"); + } + + await this._externalMessageChannel.EmitExternalEventAsync(processEventData.ExternalTopicName, processEventData).ConfigureAwait(false); + } + + /// + /// Closes connection with external messaging channel + /// + /// + /// + public async Task CloseExternalEventChannelAsync() + { + if (this._externalMessageChannel == null) + { + throw new KernelException("External message channel not configured for step"); + } + + await this._externalMessageChannel.Uninitialize().ConfigureAwait(false); + } +} diff --git a/dotnet/src/Experimental/Process.Abstractions/KernelProxyStep.cs b/dotnet/src/Experimental/Process.Abstractions/KernelProxyStep.cs index 0d9d161fdf8e..50619efef346 100644 --- a/dotnet/src/Experimental/Process.Abstractions/KernelProxyStep.cs +++ b/dotnet/src/Experimental/Process.Abstractions/KernelProxyStep.cs @@ -20,6 +20,16 @@ public static class Functions public const string EmitExternalEvent = nameof(EmitExternalEvent); } + /// + /// On deactivation, external communication channel must be closed + /// + /// instance of + /// + public async ValueTask DeactivateAsync(KernelProcessStepExternalContext context) + { + await context.CloseExternalEventChannelAsync().ConfigureAwait(false); + } + /// /// Step function used to emit events externally /// @@ -27,7 +37,7 @@ public static class Functions /// event data passed to proxy step /// [KernelFunction(Functions.EmitExternalEvent)] - public Task EmitExternalEventAsync(KernelProcessStepContext context, KernelProcessProxyMessage proxyEvent) + public Task EmitExternalEventAsync(KernelProcessStepExternalContext context, KernelProcessProxyMessage proxyEvent) { Verify.NotNull(proxyEvent.ExternalTopicName, nameof(proxyEvent.ExternalTopicName)); return context.EmitExternalEventAsync(proxyEvent); diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs index b6ec2c240ada..fd15fa16dd2a 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs @@ -1,5 +1,4 @@ // Copyright (c) Microsoft. All rights reserved. -using System; using System.Threading.Tasks; using Microsoft.SemanticKernel.Process; @@ -8,7 +7,7 @@ namespace Microsoft.SemanticKernel; /// /// Provides context and actions on a process that is running locally. /// -public sealed class LocalKernelProcessContext : KernelProcessContext, IDisposable +public sealed class LocalKernelProcessContext : KernelProcessContext, System.IAsyncDisposable { private readonly LocalProcess _localProcess; private readonly Kernel _kernel; @@ -53,7 +52,10 @@ public override Task SendEventAsync(KernelProcessEvent processEvent) => /// /// Disposes of the resources used by the process. /// - public void Dispose() => this._localProcess.Dispose(); + public async ValueTask DisposeAsync() + { + await this._localProcess.DisposeAsync().ConfigureAwait(false); + } /// public override Task GetExternalMessageChannelAsync() diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalMap.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalMap.cs index 9065ad94e19d..b0f80e890a56 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalMap.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalMap.cs @@ -98,7 +98,7 @@ internal override async Task HandleMessageAsync(ProcessMessage message) { foreach (var operation in mapOperations) { - operation.ProcessContext.Dispose(); + await operation.ProcessContext.DisposeAsync().ConfigureAwait(false); } } } diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs index 583bfcf4565a..cf0cded345ab 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs @@ -16,7 +16,7 @@ namespace Microsoft.SemanticKernel; internal delegate bool ProcessEventProxy(ProcessEvent processEvent); -internal sealed class LocalProcess : LocalStep, IDisposable +internal sealed class LocalProcess : LocalStep, System.IAsyncDisposable { private readonly JoinableTaskFactory _joinableTaskFactory; private readonly JoinableTaskContext _joinableTaskContext; @@ -49,8 +49,15 @@ internal LocalProcess(KernelProcess process, Kernel kernel) this._joinableTaskContext = new JoinableTaskContext(); this._joinableTaskFactory = new JoinableTaskFactory(this._joinableTaskContext); this._logger = this._kernel.LoggerFactory?.CreateLogger(this.Name) ?? new NullLogger(); + // if parent id is null this is the root process + this.RootProcessId = this.ParentProcessId == null ? this.Id : null; } + /// + /// The Id of the root process. + /// + internal string? RootProcessId { get; init; } + /// /// Starts the process with an initial event and an optional kernel. /// @@ -196,6 +203,7 @@ private ValueTask InitializeProcessAsync() new LocalProcess(processStep, this._kernel) { ParentProcessId = this.Id, + RootProcessId = this.RootProcessId, EventProxy = this.EventProxy, ExternalMessageChannel = this.ExternalMessageChannel, }; @@ -213,7 +221,7 @@ private ValueTask InitializeProcessAsync() localStep = new LocalProxy(proxyStep, this._kernel) { - ParentProcessId = this.Id, + ParentProcessId = this.RootProcessId, EventProxy = this.EventProxy, ExternalMessageChannel = this.ExternalMessageChannel, }; @@ -403,11 +411,20 @@ internal override async Task ToKernelProcessStepInfoAsync #endregion - public void Dispose() + /// + public override async Task DeinitializeStepAsync() + { + await this.DisposeAsync().ConfigureAwait(false); + } + + public async ValueTask DisposeAsync() { this._externalEventChannel.Writer.Complete(); this._joinableTaskContext.Dispose(); - this._joinableTaskContext.Dispose(); + foreach (var step in this._steps) + { + await step.DeinitializeStepAsync().ConfigureAwait(false); + } this._processCancelSource?.Dispose(); } } diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalProxy.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalProxy.cs index 9ea020c17fcb..221ffcd2b371 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalProxy.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalProxy.cs @@ -1,9 +1,11 @@ // Copyright (c) Microsoft. All rights reserved. using System.Collections.Generic; using System.Linq; +using System.Reflection; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.SemanticKernel.Process; using Microsoft.SemanticKernel.Process.Internal; using Microsoft.SemanticKernel.Process.Runtime; @@ -77,4 +79,28 @@ protected override async ValueTask InitializeStepAsync() await base.InitializeStepAsync().ConfigureAwait(false); this._isInitialized = true; } + + /// + /// Deinitialization of the Proxy Step, calling + /// + /// + public override async Task DeinitializeStepAsync() + { + MethodInfo? derivedMethod = this._stepInfo.InnerStepType.GetMethod( + nameof(KernelProxyStep.DeactivateAsync), + BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance, + binder: null, + types: [typeof(KernelProcessStepExternalContext)], + modifiers: null); + + if (derivedMethod != null && this._stepInstance != null) + { + var context = new KernelProcessStepExternalContext(this.ExternalMessageChannel); + ValueTask deactivateTask = + (ValueTask?)derivedMethod.Invoke(this._stepInstance, [context]) ?? + throw new KernelException($"The derived DeactivateAsync method failed to complete for step {this.Name}.").Log(this._logger); + + await deactivateTask.ConfigureAwait(false); + } + } } diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs index 53a7a4f69a3b..68840b8849c9 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs @@ -19,7 +19,6 @@ internal class LocalStep : IKernelProcessMessageChannel { private readonly Queue _outgoingEventQueue = new(); private readonly Lazy _initializeTask; - private readonly KernelProcessStepInfo _stepInfo; private readonly ILogger _logger; protected readonly Kernel _kernel; @@ -30,6 +29,8 @@ internal class LocalStep : IKernelProcessMessageChannel protected Dictionary?>? _initialInputs = []; protected Dictionary> _outputEdges; + internal KernelProcessStep? _stepInstance = null; + internal readonly KernelProcessStepInfo _stepInfo; internal readonly string _eventNamespace; /// @@ -246,8 +247,8 @@ internal virtual async Task HandleMessageAsync(ProcessMessage message) protected virtual async ValueTask InitializeStepAsync() { // Instantiate an instance of the inner step object - KernelProcessStep stepInstance = (KernelProcessStep)ActivatorUtilities.CreateInstance(this._kernel.Services, this._stepInfo.InnerStepType); - var kernelPlugin = KernelPluginFactory.CreateFromObject(stepInstance, pluginName: this._stepInfo.State.Name); + this._stepInstance = (KernelProcessStep)ActivatorUtilities.CreateInstance(this._kernel.Services, this._stepInfo.InnerStepType); + var kernelPlugin = KernelPluginFactory.CreateFromObject(this._stepInstance, pluginName: this._stepInfo.State.Name); // Load the kernel functions foreach (KernelFunction f in kernelPlugin) @@ -276,13 +277,22 @@ protected virtual async ValueTask InitializeStepAsync() this._stepState = stateObject; ValueTask activateTask = - (ValueTask?)methodInfo.Invoke(stepInstance, [stateObject]) ?? + (ValueTask?)methodInfo.Invoke(this._stepInstance, [stateObject]) ?? throw new KernelException("The ActivateAsync method failed to complete.").Log(this._logger); - await stepInstance.ActivateAsync(stateObject).ConfigureAwait(false); + await this._stepInstance.ActivateAsync(stateObject).ConfigureAwait(false); await activateTask.ConfigureAwait(false); } + /// + /// Deinitializes the step + /// + public virtual Task DeinitializeStepAsync() + { + this._logger.LogInformation("Step {Name} has deinitialized", this.Name); + return Task.CompletedTask; + } + /// /// Invokes the provides function with the provided kernel and arguments. /// diff --git a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalMapTests.cs b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalMapTests.cs index e9532394d56b..eb454c1e1858 100644 --- a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalMapTests.cs +++ b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalMapTests.cs @@ -38,7 +38,7 @@ public async Task ProcessMapResultAsFirstAsync() Kernel kernel = new(); // Act - using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + await using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); // Assert UnionState unionState = await GetUnionStateAsync(processContext); @@ -68,7 +68,7 @@ public async Task ProcessMapResultFilterEventAsync() Kernel kernel = new(); // Act - using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + await using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); // Assert UnionState unionState = await GetUnionStateAsync(processContext); @@ -99,7 +99,7 @@ public async Task ProcessMapResultWithTransformAsync() Kernel kernel = new(); // Act - using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + await using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); // Assert UnionState unionState = await GetUnionStateAsync(processContext); @@ -130,7 +130,7 @@ public async Task ProcessMapResultOperationTargetAsync() Kernel kernel = new(); // Act - using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + await using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); // Assert UnionState unionState = await GetUnionStateAsync(processContext); @@ -166,7 +166,7 @@ public async Task ProcessMapResultAsTargetAsync() Kernel kernel = new(); // Act - using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + await using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); // Assert UnionState unionState = await GetUnionStateAsync(processContext); @@ -200,7 +200,7 @@ public async Task ProcessMapResultMultiEventAsync() Kernel kernel = new(); // Act - using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + await using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); // Assert UnionState unionState = await GetUnionStateAsync(processContext); @@ -238,7 +238,7 @@ public async Task ProcessMapResultProcessOperationAsync() Kernel kernel = new(); // Act - using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + await using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); // Assert UnionState unionState = await GetUnionStateAsync(processContext); @@ -315,7 +315,7 @@ public async Task ProcessMapResultWithTargetExtraAsync() Kernel kernel = new(); // Act - using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + await using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); // Assert UnionState unionState = await GetUnionStateAsync(processContext); @@ -363,7 +363,7 @@ public async Task ProcessMapResultForNestedMapAsync() [1, 2, 3, 4, 5], [1, 2, 3, 4, 5], ]; - using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, input, "Start"); + await using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, input, "Start"); // Assert UnionState unionState = await GetUnionStateAsync(processContext); diff --git a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs index d90478f450c3..770eab991394 100644 --- a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs +++ b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs @@ -26,8 +26,7 @@ public async Task ExecuteAsyncInitializesCorrectlyAsync() ], []); var mockKernel = new Kernel(); - using var localProcess = new LocalProcess(mockKernelProcess, mockKernel); - + await using var localProcess = new LocalProcess(mockKernelProcess, mockKernel); // Act await localProcess.StartAsync(); @@ -41,7 +40,7 @@ public async Task ExecuteAsyncInitializesCorrectlyAsync() /// Validates that the assigns and Id to the process if one is not already set. /// [Fact] - public void ProcessWithMissingIdIsAssignedAnId() + public async Task ProcessWithMissingIdIsAssignedAnIdAsync() { // Arrange var mockKernel = new Kernel(); @@ -53,7 +52,7 @@ public void ProcessWithMissingIdIsAssignedAnId() ], []); // Act - using var localProcess = new LocalProcess(mockKernelProcess, mockKernel); + await using var localProcess = new LocalProcess(mockKernelProcess, mockKernel); // Assert Assert.NotEmpty(localProcess.Id); @@ -63,7 +62,7 @@ public void ProcessWithMissingIdIsAssignedAnId() /// Validates that the assigns and Id to the process if one is not already set. /// [Fact] - public void ProcessWithAssignedIdIsNotOverwrittenId() + public async Task ProcessWithAssignedIdIsNotOverwrittenIdAsync() { // Arrange var mockKernel = new Kernel(); @@ -75,7 +74,7 @@ public void ProcessWithAssignedIdIsNotOverwrittenId() ], []); // Act - using var localProcess = new LocalProcess(mockKernelProcess, mockKernel); + await using var localProcess = new LocalProcess(mockKernelProcess, mockKernel); // Assert Assert.NotEmpty(localProcess.Id); @@ -101,7 +100,7 @@ public async Task ProcessFunctionErrorHandledAsync() Kernel kernel = new(); // Act - using LocalKernelProcessContext runningProcess = await processInstance.StartAsync(kernel, new KernelProcessEvent() { Id = "Start" }); + await using LocalKernelProcessContext runningProcess = await processInstance.StartAsync(kernel, new KernelProcessEvent() { Id = "Start" }); // Assert Assert.True(kernel.Data.ContainsKey("error-function")); @@ -127,7 +126,7 @@ public async Task ProcessGlobalErrorHandledAsync() Kernel kernel = new(); // Act - using LocalKernelProcessContext runningProcess = await processInstance.StartAsync(kernel, new KernelProcessEvent() { Id = "Start" }); + await using LocalKernelProcessContext runningProcess = await processInstance.StartAsync(kernel, new KernelProcessEvent() { Id = "Start" }); // Assert Assert.True(kernel.Data.ContainsKey("error-global")); @@ -154,7 +153,7 @@ public async Task FunctionErrorHandlerTakesPrecedenceAsync() Kernel kernel = new(); // Act - using LocalKernelProcessContext runningProcess = await processInstance.StartAsync(kernel, new KernelProcessEvent() { Id = "Start" }); + await using LocalKernelProcessContext runningProcess = await processInstance.StartAsync(kernel, new KernelProcessEvent() { Id = "Start" }); // Assert Assert.False(kernel.Data.ContainsKey("error-global")); diff --git a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProxyTests.cs b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProxyTests.cs index d08f9de9d3e6..ab732fcaec1a 100644 --- a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProxyTests.cs +++ b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProxyTests.cs @@ -37,24 +37,32 @@ public async Task ProcessWithProxyWithSingleTopicCalledTwiceAsync() Kernel kernel = new(); // Act - LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, null, this._startProcessEvent, externalMessageChannel: mockProxyClient); + await using (LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, null, this._startProcessEvent, externalMessageChannel: mockProxyClient)) + { + // Assert + var runningProcessId = await processContext.GetProcessIdAsync(); - // Assert - Assert.NotNull(mockProxyClient); - Assert.Equal(1, mockProxyClient.InitializationCounter); - Assert.Single(mockProxyClient.CloudEvents); - Assert.Equal(this._topic1, mockProxyClient.CloudEvents[0].TopicName); - Assert.Equal("1", mockProxyClient.CloudEvents[0].Data?.EventData); + Assert.NotNull(mockProxyClient); + Assert.Equal(1, mockProxyClient.InitializationCounter); + Assert.Equal(0, mockProxyClient.UninitializationCounter); + Assert.Single(mockProxyClient.CloudEvents); + Assert.Equal(this._topic1, mockProxyClient.CloudEvents[0].TopicName); + Assert.Equal(runningProcessId, mockProxyClient.CloudEvents[0].Data?.ProcessId); + Assert.Equal("1", mockProxyClient.CloudEvents[0].Data?.EventData); - // Act - await processContext.SendEventAsync(new() { Id = this._startProcessEvent, Data = null }); + // Act + await processContext.SendEventAsync(new() { Id = this._startProcessEvent, Data = null }); - // Assert - Assert.NotNull(mockProxyClient); - Assert.Equal(1, mockProxyClient.InitializationCounter); - Assert.Equal(2, mockProxyClient.CloudEvents.Count); - Assert.Equal(this._topic1, mockProxyClient.CloudEvents[1].TopicName); - Assert.Equal("2", mockProxyClient.CloudEvents[1].Data?.EventData); + // Assert + Assert.NotNull(mockProxyClient); + Assert.Equal(1, mockProxyClient.InitializationCounter); + Assert.Equal(0, mockProxyClient.UninitializationCounter); + Assert.Equal(2, mockProxyClient.CloudEvents.Count); + Assert.Equal(this._topic1, mockProxyClient.CloudEvents[1].TopicName); + Assert.Equal(runningProcessId, mockProxyClient.CloudEvents[0].Data?.ProcessId); + Assert.Equal("2", mockProxyClient.CloudEvents[1].Data?.EventData); + } + Assert.Equal(1, mockProxyClient.UninitializationCounter); } /// @@ -77,8 +85,8 @@ public void ProcessWithProxyFailsToCreateDueMissingTopicRegistration() } /// - /// Validates the result as the first step in the process - /// and with a step as the map operation. + /// Validates the emits different topics from + /// different steps /// [Fact] public async Task ProcessWithCyclesAndProxyWithTwoTopicsAsync() @@ -86,7 +94,138 @@ public async Task ProcessWithCyclesAndProxyWithTwoTopicsAsync() // Arrange CommonSteps.CountStep.Index = 0; var mockProxyClient = new MockCloudEventClient(); - ProcessBuilder process = new(nameof(ProcessWithCyclesAndProxyWithTwoTopicsAsync)); + ProcessBuilder process = this.GetSampleProcessWithProxyEmittingTwoTopics(nameof(ProcessWithCyclesAndProxyWithTwoTopicsAsync)); + KernelProcess processInstance = process.Build(); + Kernel kernel = new(); + + // Act + await using (LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, null, this._startProcessEvent, externalMessageChannel: mockProxyClient)) + { + // Assert + var runningProcessId = await processContext.GetProcessIdAsync(); + + Assert.NotNull(mockProxyClient); + Assert.True(0 < mockProxyClient.InitializationCounter); + Assert.Equal(0, mockProxyClient.UninitializationCounter); + Assert.Equal(3, mockProxyClient.CloudEvents.Count); + Assert.Equal(this._topic1, mockProxyClient.CloudEvents[0].TopicName); + Assert.Equal(runningProcessId, mockProxyClient.CloudEvents[0].Data?.ProcessId); + Assert.Equal("1", mockProxyClient.CloudEvents[0].Data?.EventData); + Assert.Equal(this._topic1, mockProxyClient.CloudEvents[1].TopicName); + Assert.Equal("2", mockProxyClient.CloudEvents[1].Data?.EventData); + Assert.Equal(this._topic2, mockProxyClient.CloudEvents[2].TopicName); + Assert.Equal("2", mockProxyClient.CloudEvents[2].Data?.EventData); + } + + // Assert + Assert.Equal(1, mockProxyClient.UninitializationCounter); + } + + /// + /// Validates the emits different topics from + /// different steps from a nested process + /// + [Fact] + public async Task ProcessWithProxyIn2LevelsNestedProcessEmitsTwoTopicsAsync() + { + // Arrange + CommonSteps.CountStep.Index = 0; + var mockProxyClient = new MockCloudEventClient(); + ProcessBuilder process = new(nameof(ProcessWithProxyIn2LevelsNestedProcessEmitsTwoTopicsAsync)); + var innerProcess = process.AddStepFromProcess(this.GetSampleProcessWithProxyEmittingTwoTopics($"Inner-{nameof(ProcessWithProxyIn2LevelsNestedProcessEmitsTwoTopicsAsync)}")); + + process + .OnInputEvent(this._startProcessEvent) + .SendEventTo(innerProcess.WhereInputEventIs(this._startProcessEvent)); + + KernelProcess processInstance = process.Build(); + Kernel kernel = new(); + + // Act + await using (LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, null, this._startProcessEvent, externalMessageChannel: mockProxyClient)) + { + // Assert + var runningProcessId = await processContext.GetProcessIdAsync(); + + Assert.NotNull(mockProxyClient); + Assert.True(0 < mockProxyClient.InitializationCounter); + Assert.Equal(0, mockProxyClient.UninitializationCounter); + Assert.Equal(3, mockProxyClient.CloudEvents.Count); + Assert.Equal(this._topic1, mockProxyClient.CloudEvents[0].TopicName); + Assert.Equal(runningProcessId, mockProxyClient.CloudEvents[0].Data?.ProcessId); + Assert.Equal("1", mockProxyClient.CloudEvents[0].Data?.EventData); + Assert.Equal(this._topic1, mockProxyClient.CloudEvents[1].TopicName); + Assert.Equal("2", mockProxyClient.CloudEvents[1].Data?.EventData); + Assert.Equal(this._topic2, mockProxyClient.CloudEvents[2].TopicName); + Assert.Equal("2", mockProxyClient.CloudEvents[2].Data?.EventData); + } + + // Assert + Assert.Equal(1, mockProxyClient.UninitializationCounter); + } + + /// + /// Validates the emits different topics from + /// different steps from a deep nested process + /// + [Fact] + public async Task ProcessWithProxyIn4LevelsNestedProcessEmitsTwoTopicsAsync() + { + // Arrange + CommonSteps.CountStep.Index = 0; + var mockProxyClient = new MockCloudEventClient(); + ProcessBuilder process = new(nameof(ProcessWithProxyIn4LevelsNestedProcessEmitsTwoTopicsAsync)); + var innerProcess = process.AddStepFromProcess( + this.GetNestedProcess( + processName: $"Inner1-{nameof(ProcessWithProxyIn4LevelsNestedProcessEmitsTwoTopicsAsync)}", + internalProcess: this.GetSampleProcessWithProxyEmittingTwoTopics($"Inner2-{nameof(ProcessWithProxyIn4LevelsNestedProcessEmitsTwoTopicsAsync)}"), + inputEventName: this._startProcessEvent)); + + process + .OnInputEvent(this._startProcessEvent) + .SendEventTo(innerProcess.WhereInputEventIs(this._startProcessEvent)); + + KernelProcess processInstance = process.Build(); + Kernel kernel = new(); + + // Act + await using (LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, null, this._startProcessEvent, externalMessageChannel: mockProxyClient)) + { + // Assert + var runningProcessId = await processContext.GetProcessIdAsync(); + + Assert.NotNull(mockProxyClient); + Assert.True(0 < mockProxyClient.InitializationCounter); + Assert.Equal(0, mockProxyClient.UninitializationCounter); + Assert.Equal(3, mockProxyClient.CloudEvents.Count); + Assert.Equal(this._topic1, mockProxyClient.CloudEvents[0].TopicName); + Assert.Equal(runningProcessId, mockProxyClient.CloudEvents[0].Data?.ProcessId); + Assert.Equal("1", mockProxyClient.CloudEvents[0].Data?.EventData); + Assert.Equal(this._topic1, mockProxyClient.CloudEvents[1].TopicName); + Assert.Equal("2", mockProxyClient.CloudEvents[1].Data?.EventData); + Assert.Equal(this._topic2, mockProxyClient.CloudEvents[2].TopicName); + Assert.Equal("2", mockProxyClient.CloudEvents[2].Data?.EventData); + } + + // Assert + Assert.Equal(1, mockProxyClient.UninitializationCounter); + } + + private ProcessBuilder GetNestedProcess(string processName, ProcessBuilder internalProcess, string inputEventName) + { + ProcessBuilder process = new(processName); + var innerProcess = process.AddStepFromProcess(this.GetSampleProcessWithProxyEmittingTwoTopics($"Inner-{processName}")); + + process + .OnInputEvent(inputEventName) + .SendEventTo(innerProcess.WhereInputEventIs(inputEventName)); + + return process; + } + + private ProcessBuilder GetSampleProcessWithProxyEmittingTwoTopics(string processName) + { + ProcessBuilder process = new(processName); var counterStep = process.AddStepFromType(); var evenNumberStep = process.AddStepFromType(); @@ -110,22 +249,7 @@ public async Task ProcessWithCyclesAndProxyWithTwoTopicsAsync() .OnEvent(CommonSteps.EvenNumberDetectorStep.OutputEvents.EvenNumber) .EmitExternalEvent(proxyStep, this._topic2); - KernelProcess processInstance = process.Build(); - Kernel kernel = new(); - - // Act - LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, null, this._startProcessEvent, externalMessageChannel: mockProxyClient); - - // Assert - Assert.NotNull(mockProxyClient); - Assert.True(0 < mockProxyClient.InitializationCounter); - Assert.Equal(3, mockProxyClient.CloudEvents.Count); - Assert.Equal(this._topic1, mockProxyClient.CloudEvents[0].TopicName); - Assert.Equal("1", mockProxyClient.CloudEvents[0].Data?.EventData); - Assert.Equal(this._topic1, mockProxyClient.CloudEvents[1].TopicName); - Assert.Equal("2", mockProxyClient.CloudEvents[1].Data?.EventData); - Assert.Equal(this._topic2, mockProxyClient.CloudEvents[2].TopicName); - Assert.Equal("2", mockProxyClient.CloudEvents[2].Data?.EventData); + return process; } private async Task RunProcessAsync(Kernel kernel, KernelProcess process, object? input, string inputEvent, IExternalKernelProcessMessageChannel? externalMessageChannel) diff --git a/dotnet/src/InternalUtilities/process/Abstractions/StepExtensions.cs b/dotnet/src/InternalUtilities/process/Abstractions/StepExtensions.cs index fead79cde844..9db4592f4407 100644 --- a/dotnet/src/InternalUtilities/process/Abstractions/StepExtensions.cs +++ b/dotnet/src/InternalUtilities/process/Abstractions/StepExtensions.cs @@ -131,7 +131,11 @@ public static void InitializeUserState(this KernelProcessStepState stateObject, // and are instantiated here. if (param.ParameterType == typeof(KernelProcessStepContext)) { - inputs[kvp.Key]![param.Name] = new KernelProcessStepContext(channel, externalMessageChannel); + inputs[kvp.Key]![param.Name] = new KernelProcessStepContext(channel); + } + else if (param.ParameterType == typeof(KernelProcessStepExternalContext)) + { + inputs[kvp.Key]![param.Name] = new KernelProcessStepExternalContext(externalMessageChannel); } else {