diff --git a/Test/DurableTask.Core.Tests/TestTaskEntityDispatcher.cs b/Test/DurableTask.Core.Tests/TestTaskEntityDispatcher.cs new file mode 100644 index 000000000..3e5032156 --- /dev/null +++ b/Test/DurableTask.Core.Tests/TestTaskEntityDispatcher.cs @@ -0,0 +1,88 @@ +using DurableTask.Core.Entities; +using DurableTask.Core.Entities.OperationFormat; +using DurableTask.Core.History; +using DurableTask.Core.Logging; +using DurableTask.Core.Middleware; +using DurableTask.Emulator; +using DurableTask.Test.Orchestrations; +using Microsoft.Extensions.Logging; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using System.Collections.Generic; +using static DurableTask.Core.TaskEntityDispatcher; + +namespace DurableTask.Core.Tests +{ + [TestClass] + public class TestTaskEntityDispatcher + { + /// + /// Utiliy function to create a TaskEntityDispatcher instance. To be expanded upon as per testing needs. + /// + /// + private TaskEntityDispatcher GetTaskEntityDispatcher() + { + // TODO: these should probably be injectable parameters to this method, + // initialized with sensible defaults if not provided + var service = new LocalOrchestrationService(); + ILoggerFactory loggerFactory = null; + var entityManager = new NameVersionObjectManager(); + var entityMiddleware = new DispatchMiddlewarePipeline(); + var logger = new LogHelper(loggerFactory?.CreateLogger("DurableTask.Core")); + + TaskEntityDispatcher dispatcher = new TaskEntityDispatcher( + service, entityManager, entityMiddleware, logger, ErrorPropagationMode.UseFailureDetails); + return dispatcher; + } + + /// + /// See: https://github.com/Azure/durabletask/pull/1080 + /// This test is motivated by a regression where Entities + /// that scheduled sub-orchestrators would incorrectly set + /// the FireAndForget tag on the ExecutionStartedEvent, causing + /// them to then receive a SubOrchestrationCompleted event, which + /// they did not know how to handle. Eventually, this led to them deleting + /// their own state. This test checks against that case. + /// + [TestMethod] + public void TestEntityDoesNotSetFireAndForgetTags() + { + TaskEntityDispatcher dispatcher = GetTaskEntityDispatcher(); + + // Prepare effects + var effects = new WorkItemEffects(); + effects.taskIdCounter = 0; + effects.InstanceMessages = new List(); + + // Prepare runtime state + var mockEntityStartEvent = new ExecutionStartedEvent(-1, null) + { + OrchestrationInstance = new OrchestrationInstance(), + Name = "testentity", + Version = "1.0", + }; + var runtimeState = new OrchestrationRuntimeState(); + runtimeState.AddEvent(mockEntityStartEvent); + + // Prepare action. + // This mocks starting a new orchestration from an entity. + var action = new StartNewOrchestrationOperationAction() + { + InstanceId = "testsample", + Name = "test", + Version = "1.0", + Input = null, + }; + + // Invoke the dispatcher and obtain resulting event + dispatcher.ProcessSendStartMessage(effects, runtimeState, action); + HistoryEvent resultEvent = effects.InstanceMessages[0].Event; + + Assert.IsInstanceOfType(resultEvent, typeof(ExecutionStartedEvent)); + var executionStartedEvent = (ExecutionStartedEvent)resultEvent; + + // The resulting event should contain a fire and forget tag + bool hasFireAndForgetTag = executionStartedEvent.Tags.ContainsKey(OrchestrationTags.FireAndForget); + Assert.IsTrue(hasFireAndForgetTag); + } + } +} diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index 8e6dd7d8d..a7ecb5267 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -12,11 +12,6 @@ // ---------------------------------------------------------------------------------- namespace DurableTask.Core { - using System; - using System.Collections.Generic; - using System.Diagnostics; - using System.Threading; - using System.Threading.Tasks; using DurableTask.Core.Common; using DurableTask.Core.Entities; using DurableTask.Core.Entities.EventFormat; @@ -27,6 +22,11 @@ namespace DurableTask.Core using DurableTask.Core.Middleware; using DurableTask.Core.Tracing; using Newtonsoft.Json; + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Threading; + using System.Threading.Tasks; /// /// Dispatcher for orchestrations and entities to handle processing and renewing, completion of orchestration events. @@ -182,7 +182,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) } } - class WorkItemEffects + internal class WorkItemEffects { public List ActivityMessages; public List TimerMessages; @@ -563,6 +563,9 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, out SchedulerState sc } break; + + default: + throw new EntitySchedulerException($"The entity with instanceId '{instanceId}' received an unexpected event type of type '{e.EventType}'. This is not a valid entity message. This is a framework-internal error, please report this issue in the GitHub repo: 'https://github.com/Azure/durabletask/'"); } } @@ -791,7 +794,7 @@ void ProcessSendEventMessage(WorkItemEffects effects, OrchestrationInstance dest }); } - void ProcessSendStartMessage(WorkItemEffects effects, OrchestrationRuntimeState runtimeState, StartNewOrchestrationOperationAction action) + internal void ProcessSendStartMessage(WorkItemEffects effects, OrchestrationRuntimeState runtimeState, StartNewOrchestrationOperationAction action) { OrchestrationInstance destination = new OrchestrationInstance() { @@ -801,8 +804,8 @@ void ProcessSendStartMessage(WorkItemEffects effects, OrchestrationRuntimeState var executionStartedEvent = new ExecutionStartedEvent(-1, action.Input) { Tags = OrchestrationTags.MergeTags( - runtimeState.Tags, - new Dictionary() { { OrchestrationTags.FireAndForget, "" } }), + newTags: new Dictionary() { { OrchestrationTags.FireAndForget, "" } }, + existingTags: runtimeState.Tags), OrchestrationInstance = destination, ScheduledStartTime = action.ScheduledStartTime, ParentInstance = new ParentInstance @@ -881,4 +884,4 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ => return result; } } -} \ No newline at end of file +} diff --git a/src/DurableTask.Emulator/LocalOrchestrationService.cs b/src/DurableTask.Emulator/LocalOrchestrationService.cs index a8781df09..1331299ce 100644 --- a/src/DurableTask.Emulator/LocalOrchestrationService.cs +++ b/src/DurableTask.Emulator/LocalOrchestrationService.cs @@ -25,11 +25,12 @@ namespace DurableTask.Emulator using System.Text; using System.Threading; using System.Threading.Tasks; + using DurableTask.Core.Entities; /// /// Fully functional in-proc orchestration service for testing /// - public class LocalOrchestrationService : IOrchestrationService, IOrchestrationServiceClient, IDisposable + public class LocalOrchestrationService : IOrchestrationService, IOrchestrationServiceClient, IEntityOrchestrationService, IDisposable { // ReSharper disable once NotAccessedField.Local Dictionary sessionState; @@ -669,5 +670,36 @@ void Dispose(bool disposing) this.cancellationTokenSource.Dispose(); } } + + /// + /// Test only for core entities. The value is set as default. + EntityBackendProperties IEntityOrchestrationService.EntityBackendProperties => new EntityBackendProperties() + { + EntityMessageReorderWindow = TimeSpan.FromMinutes(30), + MaxEntityOperationBatchSize = null, + MaxConcurrentTaskEntityWorkItems = 100, + SupportsImplicitEntityDeletion = false, // not supported by this backend + MaximumSignalDelayTime = TimeSpan.FromDays(6), + UseSeparateQueueForEntityWorkItems = false, + }; + + /// + EntityBackendQueries IEntityOrchestrationService.EntityBackendQueries => null; + + /// + Task IEntityOrchestrationService.LockNextEntityWorkItemAsync( + TimeSpan receiveTimeout, + CancellationToken cancellationToken) + { + return this.LockNextTaskOrchestrationWorkItemAsync(receiveTimeout, cancellationToken); + } + + /// + Task IEntityOrchestrationService.LockNextOrchestrationWorkItemAsync( + TimeSpan receiveTimeout, + CancellationToken cancellationToken) + { + return this.LockNextTaskOrchestrationWorkItemAsync(receiveTimeout, cancellationToken); + } } } diff --git a/test/DurableTask.SqlServer.Tests/DurableTask.SqlServer.Tests.csproj b/test/DurableTask.SqlServer.Tests/DurableTask.SqlServer.Tests.csproj index cf578db09..e698b6266 100644 --- a/test/DurableTask.SqlServer.Tests/DurableTask.SqlServer.Tests.csproj +++ b/test/DurableTask.SqlServer.Tests/DurableTask.SqlServer.Tests.csproj @@ -23,7 +23,7 @@ - +