Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
adcaf61
Bring entity logic into DurableTask.Core (first milestone) (#887)
sebastianburckhardt Aug 29, 2023
ac49392
Merge branch 'main' into feature/core-entities
sebastianburckhardt Sep 7, 2023
7eca55e
fix semantic merge conflict.
sebastianburckhardt Sep 7, 2023
ff23b97
Revise entity state and status format and access (#955)
sebastianburckhardt Sep 8, 2023
48d7d0d
Revise entity message format and external access (#956)
sebastianburckhardt Sep 8, 2023
d3e2057
make current critical section id publicly visible (#958)
sebastianburckhardt Sep 8, 2023
e043e38
remove orchestration tags from entity action (#952)
sebastianburckhardt Sep 8, 2023
73e5203
Rename OperationBatchRequest and OperationBatchResponse (#953)
sebastianburckhardt Sep 8, 2023
3f98043
Revise how entity batches are executed and handle failures (#954)
sebastianburckhardt Sep 8, 2023
3548d9f
revise operation result encoding and add more comments. (#965)
sebastianburckhardt Sep 13, 2023
e729bb6
revise entity backend properties and implement entity backend queries…
sebastianburckhardt Sep 21, 2023
8a907d0
Update versions for ADO feed (#973)
jviau Sep 21, 2023
f49f615
Add no-warn for NU5104 (#974)
jviau Sep 21, 2023
cb47d63
revise propagation path for entity parameters (#971)
sebastianburckhardt Oct 3, 2023
9b842ca
Revise entity queries (#981)
sebastianburckhardt Oct 5, 2023
f303e9d
fix bugs in tracking store implementation (#979)
sebastianburckhardt Oct 5, 2023
3ba55f4
add scheduled start time parameter to the start-new-orchestration ope…
sebastianburckhardt Oct 6, 2023
b50f47c
Merge branch 'main' into feature/core-entities
sebastianburckhardt Oct 6, 2023
42ab8c1
Revise serialization of entitymessages (#972)
sebastianburckhardt Oct 6, 2023
25c0e76
Rename includeStateless to includeTransient in entity queries (#985)
sebastianburckhardt Oct 9, 2023
da2eaf4
Rev to entities-preview.2 (#986)
jviau Oct 9, 2023
9957df7
fix null reference exception when running on older backends (#989)
sebastianburckhardt Oct 12, 2023
cc0214a
Prepare for public preview (#994)
jviau Oct 17, 2023
ca71117
Merge branch 'main' into feature/core-entities
jviau Oct 17, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
340 changes: 340 additions & 0 deletions Test/DurableTask.Core.Tests/MessageSorterTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,340 @@
// ---------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ---------------------------------------------------------------

namespace DurableTask.Core.Tests
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using DurableTask.Core.Entities;
using DurableTask.Core.Entities.EventFormat;
using Microsoft.VisualStudio.TestTools.UnitTesting;

[TestClass]
public class MessageSorterTests
{
private static readonly TimeSpan ReorderWindow = TimeSpan.FromMinutes(30);

[TestMethod]
public void SimpleInOrder()
{
var senderId = "A";
var receiverId = "B";

var senderSorter = new MessageSorter();

var message1 = Send(senderId, receiverId, "1", senderSorter, DateTime.UtcNow);
var message2 = Send(senderId, receiverId, "2", senderSorter, DateTime.UtcNow);
var message3 = Send(senderId, receiverId, "3", senderSorter, DateTime.UtcNow);

List<RequestMessage> batch;
MessageSorter receiverSorter = new MessageSorter();

// delivering the sequence in order produces 1 message each time
batch = receiverSorter.ReceiveInOrder(message1, ReorderWindow).ToList();
Assert.That.Single(batch).Input.Equals("1");
batch = receiverSorter.ReceiveInOrder(message2, ReorderWindow).ToList();
Assert.That.Single(batch).Input.Equals("2");
batch = receiverSorter.ReceiveInOrder(message3, ReorderWindow).ToList();
Assert.That.Single(batch).Input.Equals("3");

Assert.AreEqual(0, receiverSorter.NumberBufferedRequests);
}

[TestMethod]
public void WackySystemClock()
{
var senderId = "A";
var receiverId = "B";

var senderSorter = new MessageSorter();

// simulate system clock that goes backwards - mechanism should still guarantee monotonicitty
var message1 = Send(senderId, receiverId, "1", senderSorter, DateTime.UtcNow);
var message2 = Send(senderId, receiverId, "2", senderSorter, DateTime.UtcNow - TimeSpan.FromSeconds(1));
var message3 = Send(senderId, receiverId, "3", senderSorter, DateTime.UtcNow - TimeSpan.FromSeconds(2));

List<RequestMessage> batch;
MessageSorter receiverSorter = new MessageSorter();

// delivering the sequence in order produces 1 message each time
batch = receiverSorter.ReceiveInOrder(message1, ReorderWindow).ToList();
Assert.That.Single(batch).Input.Equals("1");
batch = receiverSorter.ReceiveInOrder(message2, ReorderWindow).ToList();
Assert.That.Single(batch).Input.Equals("2");
batch = receiverSorter.ReceiveInOrder(message3, ReorderWindow).ToList();
Assert.That.Single(batch).Input.Equals("3");

Assert.AreEqual(0, receiverSorter.NumberBufferedRequests);
}

[TestMethod]
public void DelayedElement()
{
var senderId = "A";
var receiverId = "B";

var senderSorter = new MessageSorter();

var message1 = Send(senderId, receiverId, "1", senderSorter, DateTime.UtcNow);
var message2 = Send(senderId, receiverId, "2", senderSorter, DateTime.UtcNow);
var message3 = Send(senderId, receiverId, "3", senderSorter, DateTime.UtcNow);

List<RequestMessage> batch;
MessageSorter receiverSorter;

// delivering first message last delays all messages until getting the first one
receiverSorter = new MessageSorter();
batch = receiverSorter.ReceiveInOrder(message2, ReorderWindow).ToList();
Assert.That.Empty(batch);
batch = receiverSorter.ReceiveInOrder(message3, ReorderWindow).ToList();
Assert.That.Empty(batch);
batch = receiverSorter.ReceiveInOrder(message1, ReorderWindow).ToList();
Assert.That.Collection(
batch,
first => first.Input.Equals("1"),
second => second.Input.Equals("2"),
third => third.Input.Equals("3"));

Assert.AreEqual(0, receiverSorter.NumberBufferedRequests);
}

[TestMethod]
public void NoFilteringOrSortingPastReorderWindow()
{
var senderId = "A";
var receiverId = "B";

var senderSorter = new MessageSorter();
var now = DateTime.UtcNow;

// last message is sent after an interval exceeding the reorder window
var message1 = Send(senderId, receiverId, "1", senderSorter, now);
var message2 = Send(senderId, receiverId, "2", senderSorter, now + TimeSpan.FromTicks(1));
var message3 = Send(senderId, receiverId, "3", senderSorter, now + TimeSpan.FromTicks(2) + ReorderWindow);

List<RequestMessage> batch;
MessageSorter receiverSorter = new MessageSorter();

// delivering the sequence in order produces 1 message each time
receiverSorter = new MessageSorter();
batch = receiverSorter.ReceiveInOrder(message1, ReorderWindow).ToList();
Assert.That.Single(batch).Input.Equals("1");
batch = receiverSorter.ReceiveInOrder(message2, ReorderWindow).ToList();
Assert.That.Single(batch).Input.Equals("2");
batch = receiverSorter.ReceiveInOrder(message3, ReorderWindow).ToList();
Assert.That.Single(batch).Input.Equals("3");

// duplicates are not filtered or sorted, but simply passed through, because we are past the reorder window
batch = receiverSorter.ReceiveInOrder(message2, ReorderWindow).ToList();
Assert.That.Single(batch).Input.Equals("2");
batch = receiverSorter.ReceiveInOrder(message1, ReorderWindow).ToList();
Assert.That.Single(batch).Input.Equals("1");

Assert.AreEqual(0, receiverSorter.NumberBufferedRequests);
}

[TestMethod]
public void DuplicatedElements()
{
var senderId = "A";
var receiverId = "B";

var senderSorter = new MessageSorter();

var message1 = Send(senderId, receiverId, "1", senderSorter, DateTime.UtcNow);
var message2 = Send(senderId, receiverId, "2", senderSorter, DateTime.UtcNow);
var message3 = Send(senderId, receiverId, "3", senderSorter, DateTime.UtcNow);

List<RequestMessage> batch;
MessageSorter receiverSorter;

// delivering first message last delays all messages until getting the first one
receiverSorter = new MessageSorter();
batch = receiverSorter.ReceiveInOrder(message2, ReorderWindow).ToList();
Assert.That.Empty(batch);
batch = receiverSorter.ReceiveInOrder(message2, ReorderWindow).ToList();
Assert.That.Empty(batch);
batch = receiverSorter.ReceiveInOrder(message1, ReorderWindow).ToList();
Assert.That.Collection(
batch,
first => first.Input.Equals("1"),
second => second.Input.Equals("2"));
batch = receiverSorter.ReceiveInOrder(message2, ReorderWindow).ToList();
Assert.That.Empty(batch);
batch = receiverSorter.ReceiveInOrder(message1, ReorderWindow).ToList();
Assert.That.Empty(batch);
batch = receiverSorter.ReceiveInOrder(message3, ReorderWindow).ToList();
Assert.That.Single(batch).Input.Equals("3");
batch = receiverSorter.ReceiveInOrder(message3, ReorderWindow).ToList();
Assert.That.Empty(batch);
batch = receiverSorter.ReceiveInOrder(message2, ReorderWindow).ToList();
Assert.That.Empty(batch);
batch = receiverSorter.ReceiveInOrder(message1, ReorderWindow).ToList();
Assert.That.Empty(batch);

Assert.AreEqual(0, receiverSorter.NumberBufferedRequests);
}

[TestMethod]
public void RandomShuffleAndDuplication()
{
var senderId = "A";
var receiverId = "B";

var senderSorter = new MessageSorter();
var receiverSorter = new MessageSorter();

var messageCount = 100;
var duplicateCount = 100;

// create a ordered sequence of messages
var messages = new List<RequestMessage>();
for (int i = 0; i < messageCount; i++)
{
messages.Add(Send(senderId, receiverId, i.ToString(), senderSorter, DateTime.UtcNow));
}

// add some random duplicates
var random = new Random(0);
for (int i = 0; i < duplicateCount; i++)
{
messages.Add(messages[random.Next(messageCount)]);
}

// shuffle the messages
Shuffle(messages, random);

// deliver all the messages
var deliveredMessages = new List<RequestMessage>();

foreach (var msg in messages)
{
foreach (var deliveredMessage in receiverSorter.ReceiveInOrder(msg, ReorderWindow))
{
deliveredMessages.Add(deliveredMessage);
}
}

// check that the delivered messages are the original sequence
Assert.AreEqual(messageCount, deliveredMessages.Count());
for (int i = 0; i < messageCount; i++)
{
Assert.AreEqual(i.ToString(), deliveredMessages[i].Input);
}

Assert.AreEqual(0, receiverSorter.NumberBufferedRequests);
}

/// <summary>
/// Tests that if messages get reordered beyond the supported reorder window,
/// we still deliver them all but they may now be out of order.
/// </summary>
[TestMethod]
public void RandomCollection()
{
var senderId = "A";
var receiverId = "B";

var senderSorter = new MessageSorter();
var receiverSorter = new MessageSorter();

var messageCount = 100;

var random = new Random(0);
var now = DateTime.UtcNow;

// create a ordered sequence of messages
var messages = new List<RequestMessage>();
for (int i = 0; i < messageCount; i++)
{
messages.Add(Send(senderId, receiverId, i.ToString(), senderSorter, now + TimeSpan.FromSeconds(random.Next(5)), TimeSpan.FromSeconds(10)));
}

// shuffle the messages
Shuffle(messages, random);

// add a final message
messages.Add(Send(senderId, receiverId, (messageCount + 1).ToString(), senderSorter, now + TimeSpan.FromSeconds(1000), TimeSpan.FromSeconds(10)));

// deliver all the messages
var deliveredMessages = new List<RequestMessage>();

for (int i = 0; i < messageCount; i++)
{
foreach (var deliveredMessage in receiverSorter.ReceiveInOrder(messages[i], TimeSpan.FromSeconds(10)))
{
deliveredMessages.Add(deliveredMessage);
}

Assert.AreEqual(i + 1, deliveredMessages.Count + receiverSorter.NumberBufferedRequests);
}

// receive the final messages
foreach (var deliveredMessage in receiverSorter.ReceiveInOrder(messages[messageCount], TimeSpan.FromSeconds(10)))
{
deliveredMessages.Add(deliveredMessage);
}

// check that all messages were delivered
Assert.AreEqual(messageCount + 1, deliveredMessages.Count());

Assert.AreEqual(0, receiverSorter.NumberBufferedRequests);
}

private static RequestMessage Send(string senderId, string receiverId, string input, MessageSorter sorter, DateTime now, TimeSpan? reorderWindow = null)
{
var msg = new RequestMessage()
{
Id = Guid.NewGuid(),
ParentInstanceId = senderId,
Input = input,
};
sorter.LabelOutgoingMessage(msg, receiverId, now, reorderWindow.HasValue ? reorderWindow.Value : ReorderWindow);
return msg;
}

private static void Shuffle<T>(IList<T> list, Random random)
{
int n = list.Count;
while (n > 1)
{
n--;
int k = random.Next(n + 1);
T value = list[k];
list[k] = list[n];
list[n] = value;
}
}
}

internal static class AssertExtensions
{
public static void Empty<T>(this Assert assert, IEnumerable<T> collection)
{
Assert.AreEqual(0, collection.Count());
}

public static T Single<T>(this Assert assert, IEnumerable<T> collection)
{
var e = collection.GetEnumerator();
Assert.IsTrue(e.MoveNext());
T element = e.Current;
Assert.IsFalse(e.MoveNext());
return element;
}

public static void Collection<T>(this Assert assert, IEnumerable<T> collection, params Action<T>[] elementInspectors)
{
var list = collection.ToList();
Assert.AreEqual(elementInspectors.Length, list.Count);
for(int i = 0; i < elementInspectors.Length; i++)
{
elementInspectors[i](list[i]);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
<Description>Orchestration message and runtime state is stored in Azure Service Fabric reliable collections.</Description>
<Authors>Microsoft</Authors>
<Platforms>AnyCPU;x64</Platforms>
<NoWarn>$(NoWarn);NU5104</NoWarn>
</PropertyGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net462'">
Expand Down
Loading