From 9c989b88db006c74dfbb7924e38efa9672faf2fa Mon Sep 17 00:00:00 2001 From: sebastianburckhardt Date: Thu, 21 Sep 2023 14:53:53 -0700 Subject: [PATCH 1/4] pass entity parameters to orchestration executor --- src/Shared/Grpc/ProtoUtils.cs | 20 ++++++++++++++++++++ src/Worker/Grpc/GrpcOrchestrationRunner.cs | 2 +- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/src/Shared/Grpc/ProtoUtils.cs b/src/Shared/Grpc/ProtoUtils.cs index 63c0e1307..f4d0ef49f 100644 --- a/src/Shared/Grpc/ProtoUtils.cs +++ b/src/Shared/Grpc/ProtoUtils.cs @@ -7,6 +7,7 @@ using System.Text; using DurableTask.Core; using DurableTask.Core.Command; +using DurableTask.Core.Entities; using DurableTask.Core.Entities.OperationFormat; using DurableTask.Core.History; using Google.Protobuf; @@ -631,6 +632,25 @@ internal static OrchestrationStatus ToCore(this P.OrchestrationStatus status) return batchResult; } + /// + /// Converts the gRPC representation of orchestrator entity parameters to the DT.Core representation. + /// + /// The DT.Core representation. + /// The gRPC representation. + [return: NotNullIfNotNull("parameters")] + internal static TaskOrchestrationEntityParameters? ToCore(this P.OrchestratorEntityParameters? parameters) + { + if (parameters == null) + { + return null; + } + + return new TaskOrchestrationEntityParameters() + { + EntityMessageReorderWindow = parameters.EntityMessageReorderWindow.ToTimeSpan(), + }; + } + /// /// Gets the approximate byte count for a . /// diff --git a/src/Worker/Grpc/GrpcOrchestrationRunner.cs b/src/Worker/Grpc/GrpcOrchestrationRunner.cs index 478509168..7f39f0ff8 100644 --- a/src/Worker/Grpc/GrpcOrchestrationRunner.cs +++ b/src/Worker/Grpc/GrpcOrchestrationRunner.cs @@ -113,7 +113,7 @@ public static string LoadAndRun( ? DurableTaskShimFactory.Default : ActivatorUtilities.GetServiceOrCreateInstance(services); TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, parent); - TaskOrchestrationExecutor executor = new(runtimeState, shim, BehaviorOnContinueAsNew.Carryover); + TaskOrchestrationExecutor executor = new(runtimeState, shim, BehaviorOnContinueAsNew.Carryover, request.EntityParameters.ToCore()); OrchestratorExecutionResult result = executor.Execute(); P.OrchestratorResponse response = ProtoUtils.ConstructOrchestratorResponse( From 919e878ace0a1b933fa8bd44bf2c7e465aa99f1a Mon Sep 17 00:00:00 2001 From: sebastianburckhardt Date: Thu, 21 Sep 2023 14:54:11 -0700 Subject: [PATCH 2/4] remove erroneous check --- src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index 73d920745..b9e1484fb 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -202,11 +202,6 @@ public override async Task CreateTimer(DateTime fireAt, CancellationToken cancel /// public override Task WaitForExternalEvent(string eventName, CancellationToken cancellationToken = default) { - if (typeof(T) == typeof(OperationResult)) - { - throw new ArgumentException($"the type {nameof(OperationResult)} cannot be used for application-defined events", nameof(T)); - } - // Return immediately if this external event has already arrived. if (this.externalEventBuffer.TryTake(eventName, out string? bufferedEventPayload)) { From 9946c3e43ca4fe7830b2bdf6ec87913cce9ab98f Mon Sep 17 00:00:00 2001 From: sebastianburckhardt Date: Thu, 21 Sep 2023 14:54:45 -0700 Subject: [PATCH 3/4] pass EntityMessageEvent as the original object --- src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs index c43118d0d..289bb3101 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs @@ -76,7 +76,7 @@ public override async Task LockEntitiesAsync(IEnumerable(criticalSectionId.ToString()); @@ -155,7 +155,7 @@ public void ExitCriticalSection(Guid? matchCriticalSectionId = null) // releaseMessage.EventContent); } - this.wrapper.innerContext.SendEvent(releaseMessage.TargetInstance, releaseMessage.EventName, releaseMessage.ContentAsObject()); + this.wrapper.innerContext.SendEvent(releaseMessage.TargetInstance, releaseMessage.EventName, releaseMessage); } } } @@ -212,7 +212,7 @@ Guid SendOperationMessage(string instanceId, string operationName, object? input // entityMessageEvent.ToString()); } - this.wrapper.innerContext.SendEvent(entityMessageEvent.TargetInstance, entityMessageEvent.EventName, entityMessageEvent.ContentAsObject()); + this.wrapper.innerContext.SendEvent(entityMessageEvent.TargetInstance, entityMessageEvent.EventName, entityMessageEvent); return guid; } From ee242143524c7e8c5d7044df31935ed31e8030f3 Mon Sep 17 00:00:00 2001 From: sebastianburckhardt Date: Thu, 21 Sep 2023 15:33:04 -0700 Subject: [PATCH 4/4] latest proto --- eng/proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eng/proto b/eng/proto index dad676a26..76cf3b856 160000 --- a/eng/proto +++ b/eng/proto @@ -1 +1 @@ -Subproject commit dad676a26b2fdc23e3dabb3ee7380e646d581410 +Subproject commit 76cf3b85656282deb5700524e8baa641706ecddb