Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions Test/DurableTask.Core.Tests/TestTaskEntityDispatcher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using DurableTask.Core.Entities;
using DurableTask.Core.Entities.OperationFormat;
using DurableTask.Core.History;
using DurableTask.Core.Logging;
using DurableTask.Core.Middleware;
using DurableTask.Emulator;
using DurableTask.Test.Orchestrations;
using Microsoft.Extensions.Logging;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.Collections.Generic;
using static DurableTask.Core.TaskEntityDispatcher;

namespace DurableTask.Core.Tests
{
[TestClass]
public class TestTaskEntityDispatcher
{
/// <summary>
/// Utiliy function to create a TaskEntityDispatcher instance. To be expanded upon as per testing needs.
/// </summary>
/// <returns></returns>
private TaskEntityDispatcher GetTaskEntityDispatcher()
{
// TODO: these should probably be injectable parameters to this method,
// initialized with sensible defaults if not provided
var service = new LocalOrchestrationService();
ILoggerFactory loggerFactory = null;
var entityManager = new NameVersionObjectManager<TaskEntity>();
var entityMiddleware = new DispatchMiddlewarePipeline();
var logger = new LogHelper(loggerFactory?.CreateLogger("DurableTask.Core"));

TaskEntityDispatcher dispatcher = new TaskEntityDispatcher(
service, entityManager, entityMiddleware, logger, ErrorPropagationMode.UseFailureDetails);
return dispatcher;
}

/// <summary>
/// See: https://github.com/Azure/durabletask/pull/1080
/// This test is motivated by a regression where Entities
/// that scheduled sub-orchestrators would incorrectly set
/// the FireAndForget tag on the ExecutionStartedEvent, causing
/// them to then receive a SubOrchestrationCompleted event, which
/// they did not know how to handle. Eventually, this led to them deleting
/// their own state. This test checks against that case.
/// </summary>
[TestMethod]
public void TestEntityDoesNotSetFireAndForgetTags()
{
TaskEntityDispatcher dispatcher = GetTaskEntityDispatcher();

// Prepare effects
var effects = new WorkItemEffects();
effects.taskIdCounter = 0;
effects.InstanceMessages = new List<TaskMessage>();

// Prepare runtime state
var mockEntityStartEvent = new ExecutionStartedEvent(-1, null)
{
OrchestrationInstance = new OrchestrationInstance(),
Name = "testentity",
Version = "1.0",
};
var runtimeState = new OrchestrationRuntimeState();
runtimeState.AddEvent(mockEntityStartEvent);

// Prepare action.
// This mocks starting a new orchestration from an entity.
var action = new StartNewOrchestrationOperationAction()
{
InstanceId = "testsample",
Name = "test",
Version = "1.0",
Input = null,
};

// Invoke the dispatcher and obtain resulting event
dispatcher.ProcessSendStartMessage(effects, runtimeState, action);
HistoryEvent resultEvent = effects.InstanceMessages[0].Event;

Assert.IsInstanceOfType(resultEvent, typeof(ExecutionStartedEvent));
var executionStartedEvent = (ExecutionStartedEvent)resultEvent;

// The resulting event should contain a fire and forget tag
Copy link
Collaborator Author

@nytian nytian May 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be contain not not contain . Only when there is a fire and forget tag then the orchestration won't send back a OrchestrationCompleted event.

bool hasFireAndForgetTag = executionStartedEvent.Tags.ContainsKey(OrchestrationTags.FireAndForget);
Assert.IsTrue(hasFireAndForgetTag);
}
}
}
23 changes: 13 additions & 10 deletions src/DurableTask.Core/TaskEntityDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@
// ----------------------------------------------------------------------------------
namespace DurableTask.Core
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core.Common;
using DurableTask.Core.Entities;
using DurableTask.Core.Entities.EventFormat;
Expand All @@ -27,6 +22,11 @@ namespace DurableTask.Core
using DurableTask.Core.Middleware;
using DurableTask.Core.Tracing;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Dispatcher for orchestrations and entities to handle processing and renewing, completion of orchestration events.
Expand Down Expand Up @@ -182,7 +182,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
}
}

class WorkItemEffects
internal class WorkItemEffects
{
public List<TaskMessage> ActivityMessages;
public List<TaskMessage> TimerMessages;
Expand Down Expand Up @@ -563,6 +563,9 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, out SchedulerState sc
}

break;

default:
throw new EntitySchedulerException($"The entity with instanceId '{instanceId}' received an unexpected event type of type '{e.EventType}'. This is not a valid entity message. This is a framework-internal error, please report this issue in the GitHub repo: 'https://github.com/Azure/durabletask/'");
}
}

Expand Down Expand Up @@ -791,7 +794,7 @@ void ProcessSendEventMessage(WorkItemEffects effects, OrchestrationInstance dest
});
}

void ProcessSendStartMessage(WorkItemEffects effects, OrchestrationRuntimeState runtimeState, StartNewOrchestrationOperationAction action)
internal void ProcessSendStartMessage(WorkItemEffects effects, OrchestrationRuntimeState runtimeState, StartNewOrchestrationOperationAction action)
{
OrchestrationInstance destination = new OrchestrationInstance()
{
Expand All @@ -801,8 +804,8 @@ void ProcessSendStartMessage(WorkItemEffects effects, OrchestrationRuntimeState
var executionStartedEvent = new ExecutionStartedEvent(-1, action.Input)
{
Tags = OrchestrationTags.MergeTags(
runtimeState.Tags,
new Dictionary<string, string>() { { OrchestrationTags.FireAndForget, "" } }),
newTags: new Dictionary<string, string>() { { OrchestrationTags.FireAndForget, "" } },
existingTags: runtimeState.Tags),
OrchestrationInstance = destination,
ScheduledStartTime = action.ScheduledStartTime,
ParentInstance = new ParentInstance
Expand Down Expand Up @@ -881,4 +884,4 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ =>
return result;
}
}
}
}
34 changes: 33 additions & 1 deletion src/DurableTask.Emulator/LocalOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ namespace DurableTask.Emulator
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core.Entities;

/// <summary>
/// Fully functional in-proc orchestration service for testing
/// </summary>
public class LocalOrchestrationService : IOrchestrationService, IOrchestrationServiceClient, IDisposable
public class LocalOrchestrationService : IOrchestrationService, IOrchestrationServiceClient, IEntityOrchestrationService, IDisposable
{
// ReSharper disable once NotAccessedField.Local
Dictionary<string, byte[]> sessionState;
Expand Down Expand Up @@ -669,5 +670,36 @@ void Dispose(bool disposing)
this.cancellationTokenSource.Dispose();
}
}

/// <inheritdoc />
/// Test only for core entities. The value is set as default.
EntityBackendProperties IEntityOrchestrationService.EntityBackendProperties => new EntityBackendProperties()
{
EntityMessageReorderWindow = TimeSpan.FromMinutes(30),
MaxEntityOperationBatchSize = null,
MaxConcurrentTaskEntityWorkItems = 100,
SupportsImplicitEntityDeletion = false, // not supported by this backend
MaximumSignalDelayTime = TimeSpan.FromDays(6),
UseSeparateQueueForEntityWorkItems = false,
};

/// <inheritdoc />
EntityBackendQueries IEntityOrchestrationService.EntityBackendQueries => null;

/// <inheritdoc />
Task<TaskOrchestrationWorkItem> IEntityOrchestrationService.LockNextEntityWorkItemAsync(
TimeSpan receiveTimeout,
CancellationToken cancellationToken)
{
return this.LockNextTaskOrchestrationWorkItemAsync(receiveTimeout, cancellationToken);
}

/// <inheritdoc />
Task<TaskOrchestrationWorkItem> IEntityOrchestrationService.LockNextOrchestrationWorkItemAsync(
TimeSpan receiveTimeout,
CancellationToken cancellationToken)
{
return this.LockNextTaskOrchestrationWorkItemAsync(receiveTimeout, cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.9.0" />
<PackageReference Include="MSTest.TestAdapter" Version="1.3.2" />
<PackageReference Include="MSTest.TestFramework" Version="1.3.2" />
<PackageReference Include="System.Data.SqlClient" Version="4.8.5" />
<PackageReference Include="System.Data.SqlClient" Version="4.8.6" />
</ItemGroup>

<ItemGroup>
Expand Down