Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions src/DurableTask.Core/Entities/ClientEntityHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static class ClientEntityHelpers
/// <param name="input">The serialized input for the operation.</param>
/// <param name="scheduledTimeUtc">The time to schedule this signal, or null if not a scheduled signal</param>
/// <returns>The event to send.</returns>
public static EventToSend EmitOperationSignal(OrchestrationInstance targetInstance, Guid requestId, string operationName, string input, (DateTime Original, DateTime Capped)? scheduledTimeUtc)
public static EntityMessageEvent EmitOperationSignal(OrchestrationInstance targetInstance, Guid requestId, string operationName, string? input, (DateTime Original, DateTime Capped)? scheduledTimeUtc)
{
var request = new RequestMessage()
{
Expand All @@ -46,13 +46,11 @@ public static EventToSend EmitOperationSignal(OrchestrationInstance targetInstan
Input = input,
};

var jrequest = JToken.FromObject(request, Serializer.InternalSerializer);

var eventName = scheduledTimeUtc.HasValue
? EntityMessageEventNames.ScheduledRequestMessageEventName(scheduledTimeUtc.Value.Capped)
: EntityMessageEventNames.RequestMessageEventName;

return new EventToSend(eventName, jrequest, targetInstance);
return new EntityMessageEvent(eventName, request, targetInstance);
}

/// <summary>
Expand All @@ -61,16 +59,15 @@ public static EventToSend EmitOperationSignal(OrchestrationInstance targetInstan
/// <param name="targetInstance">The target instance.</param>
/// <param name="lockOwnerInstanceId">The instance id of the entity to be unlocked.</param>
/// <returns>The event to send.</returns>
public static EventToSend EmitUnlockForOrphanedLock(OrchestrationInstance targetInstance, string lockOwnerInstanceId)
public static EntityMessageEvent EmitUnlockForOrphanedLock(OrchestrationInstance targetInstance, string lockOwnerInstanceId)
{
var message = new ReleaseMessage()
{
ParentInstanceId = lockOwnerInstanceId,
Id = "fix-orphaned-lock", // we don't know the original id but it does not matter
};

var jmessage = JToken.FromObject(message, Serializer.InternalSerializer);
return new EventToSend(EntityMessageEventNames.ReleaseMessageEventName, jmessage, targetInstance);
return new EntityMessageEvent(EntityMessageEventNames.ReleaseMessageEventName, message, targetInstance);
}

/// <summary>
Expand Down
112 changes: 112 additions & 0 deletions src/DurableTask.Core/Entities/EntityMessageEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------
#nullable enable
using System;
using DurableTask.Core.Entities.EventFormat;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace DurableTask.Core.Entities
{
/// <summary>
/// Encapsulates events that represent a message sent to or from an entity.
/// </summary>
public readonly struct EntityMessageEvent
{
readonly string eventName;
readonly EntityMessage message;
readonly OrchestrationInstance target;

internal EntityMessageEvent(string eventName, EntityMessage message, OrchestrationInstance target)
{
this.eventName = eventName;
this.message = message;
this.target = target;
}

/// <inheritdoc/>
public override string ToString()
{
return this.message.ToString();
}

/// <summary>
/// The name of the event.
/// </summary>
public string EventName => this.eventName;

/// <summary>
/// The target instance for the event.
/// </summary>
public OrchestrationInstance TargetInstance => this.target;

/// <summary>
/// Returns the content of this event, as an object that can be serialized later.
/// </summary>
/// <returns></returns>
public object ContentAsObject()
{
// we pre-serialize this now to avoid interference from the application-defined serialization settings
return JObject.FromObject(message, Serializer.InternalSerializer);
}

/// <summary>
/// Returns the content of this event, as a serialized string.
/// </summary>
/// <returns></returns>
public string ContentAsString()
{
return JsonConvert.SerializeObject(message, Serializer.InternalSerializerSettings);
}

/// <summary>
/// Returns this event in the form of a TaskMessage.
/// </summary>
/// <returns></returns>
public TaskMessage AsTaskMessage()
{
return new TaskMessage
{
OrchestrationInstance = this.target,
Event = new History.EventRaisedEvent(-1, this.ContentAsString())
{
Name = this.eventName
}
};
}

/// <summary>
/// Utility function to compute a capped scheduled time, given a scheduled time, a timestamp representing the current time, and the maximum delay.
/// </summary>
/// <param name="nowUtc">a timestamp representing the current time</param>
/// <param name="scheduledUtcTime">the scheduled time, or null if none.</param>
/// <param name="maxDelay">The maximum delay supported by the backend.</param>
/// <returns>the capped scheduled time, or null if none.</returns>
public static (DateTime original, DateTime capped)? GetCappedScheduledTime(DateTime nowUtc, TimeSpan maxDelay, DateTime? scheduledUtcTime)
{
if (!scheduledUtcTime.HasValue)
{
return null;
}

if ((scheduledUtcTime - nowUtc) <= maxDelay)
{
return (scheduledUtcTime.Value, scheduledUtcTime.Value);
}
else
{
return (scheduledUtcTime.Value, nowUtc + maxDelay);
}
}
}
}
28 changes: 28 additions & 0 deletions src/DurableTask.Core/Entities/EventFormat/EntityMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------
#nullable enable
namespace DurableTask.Core.Entities.EventFormat
{
using System.Runtime.Serialization;

/// <summary>
/// The format of entity messages is kept json-deserialization-compatible with the original format.
/// </summary>
[DataContract]
internal abstract class EntityMessage
{
public abstract string GetShortDescription();

public override string ToString() => this.GetShortDescription();
}
}
4 changes: 2 additions & 2 deletions src/DurableTask.Core/Entities/EventFormat/ReleaseMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ namespace DurableTask.Core.Entities.EventFormat
using System.Runtime.Serialization;

[DataContract]
internal class ReleaseMessage
internal class ReleaseMessage : EntityMessage
{
[DataMember(Name = "parent")]
public string? ParentInstanceId { get; set; }

[DataMember(Name = "id")]
public string? Id { get; set; }

public override string ToString()
public override string GetShortDescription()
{
return $"[Release lock {Id} by {ParentInstanceId}]";
}
Expand Down
4 changes: 2 additions & 2 deletions src/DurableTask.Core/Entities/EventFormat/RequestMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace DurableTask.Core.Entities.EventFormat
/// A message sent to an entity, such as operation, signal, lock, or continue messages.
/// </summary>
[DataContract]
internal class RequestMessage
internal class RequestMessage : EntityMessage
{
/// <summary>
/// The name of the operation being called (if this is an operation message) or <c>null</c>
Expand Down Expand Up @@ -99,7 +99,7 @@ internal class RequestMessage
public bool IsLockRequest => LockSet != null;

/// <inheritdoc/>
public override string ToString()
public override string GetShortDescription()
{
if (IsLockRequest)
{
Expand Down
14 changes: 10 additions & 4 deletions src/DurableTask.Core/Entities/EventFormat/ResponseMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ namespace DurableTask.Core.Entities.EventFormat
using System.Runtime.Serialization;

[DataContract]
internal class ResponseMessage
internal class ResponseMessage : EntityMessage
{
public const string LockAcquisitionCompletion = "Lock Acquisition Completed";

[DataMember(Name = "result")]
public string? Result { get; set; }

Expand All @@ -30,15 +32,19 @@ internal class ResponseMessage
[IgnoreDataMember]
public bool IsErrorResult => this.ErrorMessage != null;

public override string ToString()
public override string GetShortDescription()
{
if (this.IsErrorResult)
{
return $"[ErrorResponse {this.Result}]";
return $"[OperationFailed {this.ErrorMessage} {this.FailureDetails}]";
}
else if (this.Result == LockAcquisitionCompletion)
{
return "[LockAcquisitionComplete]";
}
else
{
return $"[Response {this.Result}]";
return $"[OperationSuccessful ({Result?.Length ?? 0} chars)]";
}
}
}
Expand Down
49 changes: 0 additions & 49 deletions src/DurableTask.Core/Entities/EventToSend.cs

This file was deleted.

27 changes: 10 additions & 17 deletions src/DurableTask.Core/Entities/OrchestrationEntityContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public void RecoverLockAfterCall(string targetInstanceId)
/// <summary>
/// Get release messages for all locks in the critical section, and release them
/// </summary>
public IEnumerable<EventToSend> EmitLockReleaseMessages()
public IEnumerable<EntityMessageEvent> EmitLockReleaseMessages()
{
if (this.IsInsideCriticalSection)
{
Expand All @@ -195,8 +195,7 @@ public IEnumerable<EventToSend> EmitLockReleaseMessages()
foreach (var entityId in this.criticalSectionLocks!)
{
var instance = new OrchestrationInstance() { InstanceId = entityId.ToString() };
var jmessage = JObject.FromObject(message, Serializer.InternalSerializer);
yield return new EventToSend(EntityMessageEventNames.ReleaseMessageEventName, jmessage, instance);
yield return new EntityMessageEvent(EntityMessageEventNames.ReleaseMessageEventName, message, instance);
}

this.criticalSectionLocks = null;
Expand All @@ -215,13 +214,13 @@ public IEnumerable<EventToSend> EmitLockReleaseMessages()
/// <param name="scheduledTimeUtc">A time for which to schedule the delivery, or null if this is not a scheduled message</param>
/// <param name="input">The operation input</param>
/// <returns>The event to send.</returns>
public EventToSend EmitRequestMessage(
public EntityMessageEvent EmitRequestMessage(
OrchestrationInstance target,
string operationName,
bool oneWay,
Guid operationId,
(DateTime original, DateTime capped)? scheduledTimeUtc,
string input)
(DateTime Original, DateTime Capped)? scheduledTimeUtc,
string? input)
{
var request = new RequestMessage()
{
Expand All @@ -230,16 +229,13 @@ public EventToSend EmitRequestMessage(
Id = operationId,
IsSignal = oneWay,
Operation = operationName,
ScheduledTime = scheduledTimeUtc?.original,
ScheduledTime = scheduledTimeUtc?.Original,
Input = input,
};

this.AdjustOutgoingMessage(target.InstanceId, request, scheduledTimeUtc?.capped, out string eventName);
this.AdjustOutgoingMessage(target.InstanceId, request, scheduledTimeUtc?.Capped, out string eventName);

// we pre-serialize to JObject so we can avoid exposure to application-specific serialization settings
var jrequest = JObject.FromObject(request, Serializer.InternalSerializer);

return new EventToSend(eventName, jrequest, target);
return new EntityMessageEvent(eventName, request, target);
}

/// <summary>
Expand All @@ -248,7 +244,7 @@ public EventToSend EmitRequestMessage(
/// <param name="lockRequestId">A unique request id.</param>
/// <param name="entities">All the entities that are to be acquired.</param>
/// <returns>The event to send.</returns>
public EventToSend EmitAcquireMessage(Guid lockRequestId, EntityId[] entities)
public EntityMessageEvent EmitAcquireMessage(Guid lockRequestId, EntityId[] entities)
{
// All the entities in entity[] need to be locked, but to avoid deadlock, the locks have to be acquired
// sequentially, in order. So, we send the lock request to the first entity; when the first lock
Expand Down Expand Up @@ -285,10 +281,7 @@ public EventToSend EmitAcquireMessage(Guid lockRequestId, EntityId[] entities)

this.AdjustOutgoingMessage(target.InstanceId, request, null, out string eventName);

// we pre-serialize to JObject so we can avoid exposure to application-specific serialization settings
var jrequest = JObject.FromObject(request, Serializer.InternalSerializer);

return new EventToSend(eventName, jrequest, target);
return new EntityMessageEvent(eventName, request, target);
}

/// <summary>
Expand Down
3 changes: 2 additions & 1 deletion src/DurableTask.Core/TaskEntityDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,8 @@ void SendLockResponseMessage(WorkItemEffects effects, OrchestrationInstance targ
{
var message = new ResponseMessage()
{
Result = "Lock Acquisition Completed", // ignored by receiver but shows up in traces
// content is ignored by receiver but helps with tracing
Result = ResponseMessage.LockAcquisitionCompletion,
};
this.ProcessSendEventMessage(effects, target, EntityMessageEventNames.ResponseMessageEventName(requestId), message);
}
Expand Down