diff --git a/src/DurableTask.Core/Entities/ClientEntityHelpers.cs b/src/DurableTask.Core/Entities/ClientEntityHelpers.cs index c6c0783e7..94bc0512a 100644 --- a/src/DurableTask.Core/Entities/ClientEntityHelpers.cs +++ b/src/DurableTask.Core/Entities/ClientEntityHelpers.cs @@ -33,7 +33,7 @@ public static class ClientEntityHelpers /// The serialized input for the operation. /// The time to schedule this signal, or null if not a scheduled signal /// The event to send. - public static EventToSend EmitOperationSignal(OrchestrationInstance targetInstance, Guid requestId, string operationName, string input, (DateTime Original, DateTime Capped)? scheduledTimeUtc) + public static EntityMessageEvent EmitOperationSignal(OrchestrationInstance targetInstance, Guid requestId, string operationName, string? input, (DateTime Original, DateTime Capped)? scheduledTimeUtc) { var request = new RequestMessage() { @@ -46,13 +46,11 @@ public static EventToSend EmitOperationSignal(OrchestrationInstance targetInstan Input = input, }; - var jrequest = JToken.FromObject(request, Serializer.InternalSerializer); - var eventName = scheduledTimeUtc.HasValue ? EntityMessageEventNames.ScheduledRequestMessageEventName(scheduledTimeUtc.Value.Capped) : EntityMessageEventNames.RequestMessageEventName; - return new EventToSend(eventName, jrequest, targetInstance); + return new EntityMessageEvent(eventName, request, targetInstance); } /// @@ -61,7 +59,7 @@ public static EventToSend EmitOperationSignal(OrchestrationInstance targetInstan /// The target instance. /// The instance id of the entity to be unlocked. /// The event to send. - public static EventToSend EmitUnlockForOrphanedLock(OrchestrationInstance targetInstance, string lockOwnerInstanceId) + public static EntityMessageEvent EmitUnlockForOrphanedLock(OrchestrationInstance targetInstance, string lockOwnerInstanceId) { var message = new ReleaseMessage() { @@ -69,8 +67,7 @@ public static EventToSend EmitUnlockForOrphanedLock(OrchestrationInstance target Id = "fix-orphaned-lock", // we don't know the original id but it does not matter }; - var jmessage = JToken.FromObject(message, Serializer.InternalSerializer); - return new EventToSend(EntityMessageEventNames.ReleaseMessageEventName, jmessage, targetInstance); + return new EntityMessageEvent(EntityMessageEventNames.ReleaseMessageEventName, message, targetInstance); } /// diff --git a/src/DurableTask.Core/Entities/EntityMessageEvent.cs b/src/DurableTask.Core/Entities/EntityMessageEvent.cs new file mode 100644 index 000000000..924d5c7fd --- /dev/null +++ b/src/DurableTask.Core/Entities/EntityMessageEvent.cs @@ -0,0 +1,112 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- +#nullable enable +using System; +using DurableTask.Core.Entities.EventFormat; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; + +namespace DurableTask.Core.Entities +{ + /// + /// Encapsulates events that represent a message sent to or from an entity. + /// + public readonly struct EntityMessageEvent + { + readonly string eventName; + readonly EntityMessage message; + readonly OrchestrationInstance target; + + internal EntityMessageEvent(string eventName, EntityMessage message, OrchestrationInstance target) + { + this.eventName = eventName; + this.message = message; + this.target = target; + } + + /// + public override string ToString() + { + return this.message.ToString(); + } + + /// + /// The name of the event. + /// + public string EventName => this.eventName; + + /// + /// The target instance for the event. + /// + public OrchestrationInstance TargetInstance => this.target; + + /// + /// Returns the content of this event, as an object that can be serialized later. + /// + /// + public object ContentAsObject() + { + // we pre-serialize this now to avoid interference from the application-defined serialization settings + return JObject.FromObject(message, Serializer.InternalSerializer); + } + + /// + /// Returns the content of this event, as a serialized string. + /// + /// + public string ContentAsString() + { + return JsonConvert.SerializeObject(message, Serializer.InternalSerializerSettings); + } + + /// + /// Returns this event in the form of a TaskMessage. + /// + /// + public TaskMessage AsTaskMessage() + { + return new TaskMessage + { + OrchestrationInstance = this.target, + Event = new History.EventRaisedEvent(-1, this.ContentAsString()) + { + Name = this.eventName + } + }; + } + + /// + /// Utility function to compute a capped scheduled time, given a scheduled time, a timestamp representing the current time, and the maximum delay. + /// + /// a timestamp representing the current time + /// the scheduled time, or null if none. + /// The maximum delay supported by the backend. + /// the capped scheduled time, or null if none. + public static (DateTime original, DateTime capped)? GetCappedScheduledTime(DateTime nowUtc, TimeSpan maxDelay, DateTime? scheduledUtcTime) + { + if (!scheduledUtcTime.HasValue) + { + return null; + } + + if ((scheduledUtcTime - nowUtc) <= maxDelay) + { + return (scheduledUtcTime.Value, scheduledUtcTime.Value); + } + else + { + return (scheduledUtcTime.Value, nowUtc + maxDelay); + } + } + } +} \ No newline at end of file diff --git a/src/DurableTask.Core/Entities/EventFormat/EntityMessage.cs b/src/DurableTask.Core/Entities/EventFormat/EntityMessage.cs new file mode 100644 index 000000000..a07f333bc --- /dev/null +++ b/src/DurableTask.Core/Entities/EventFormat/EntityMessage.cs @@ -0,0 +1,28 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- +#nullable enable +namespace DurableTask.Core.Entities.EventFormat +{ + using System.Runtime.Serialization; + + /// + /// The format of entity messages is kept json-deserialization-compatible with the original format. + /// + [DataContract] + internal abstract class EntityMessage + { + public abstract string GetShortDescription(); + + public override string ToString() => this.GetShortDescription(); + } +} diff --git a/src/DurableTask.Core/Entities/EventFormat/ReleaseMessage.cs b/src/DurableTask.Core/Entities/EventFormat/ReleaseMessage.cs index 433a56e33..455c9b4fc 100644 --- a/src/DurableTask.Core/Entities/EventFormat/ReleaseMessage.cs +++ b/src/DurableTask.Core/Entities/EventFormat/ReleaseMessage.cs @@ -16,7 +16,7 @@ namespace DurableTask.Core.Entities.EventFormat using System.Runtime.Serialization; [DataContract] - internal class ReleaseMessage + internal class ReleaseMessage : EntityMessage { [DataMember(Name = "parent")] public string? ParentInstanceId { get; set; } @@ -24,7 +24,7 @@ internal class ReleaseMessage [DataMember(Name = "id")] public string? Id { get; set; } - public override string ToString() + public override string GetShortDescription() { return $"[Release lock {Id} by {ParentInstanceId}]"; } diff --git a/src/DurableTask.Core/Entities/EventFormat/RequestMessage.cs b/src/DurableTask.Core/Entities/EventFormat/RequestMessage.cs index 2fbaf6099..e46d1759f 100644 --- a/src/DurableTask.Core/Entities/EventFormat/RequestMessage.cs +++ b/src/DurableTask.Core/Entities/EventFormat/RequestMessage.cs @@ -20,7 +20,7 @@ namespace DurableTask.Core.Entities.EventFormat /// A message sent to an entity, such as operation, signal, lock, or continue messages. /// [DataContract] - internal class RequestMessage + internal class RequestMessage : EntityMessage { /// /// The name of the operation being called (if this is an operation message) or null @@ -99,7 +99,7 @@ internal class RequestMessage public bool IsLockRequest => LockSet != null; /// - public override string ToString() + public override string GetShortDescription() { if (IsLockRequest) { diff --git a/src/DurableTask.Core/Entities/EventFormat/ResponseMessage.cs b/src/DurableTask.Core/Entities/EventFormat/ResponseMessage.cs index eb3a17d47..03de350de 100644 --- a/src/DurableTask.Core/Entities/EventFormat/ResponseMessage.cs +++ b/src/DurableTask.Core/Entities/EventFormat/ResponseMessage.cs @@ -16,8 +16,10 @@ namespace DurableTask.Core.Entities.EventFormat using System.Runtime.Serialization; [DataContract] - internal class ResponseMessage + internal class ResponseMessage : EntityMessage { + public const string LockAcquisitionCompletion = "Lock Acquisition Completed"; + [DataMember(Name = "result")] public string? Result { get; set; } @@ -30,15 +32,19 @@ internal class ResponseMessage [IgnoreDataMember] public bool IsErrorResult => this.ErrorMessage != null; - public override string ToString() + public override string GetShortDescription() { if (this.IsErrorResult) { - return $"[ErrorResponse {this.Result}]"; + return $"[OperationFailed {this.ErrorMessage} {this.FailureDetails}]"; + } + else if (this.Result == LockAcquisitionCompletion) + { + return "[LockAcquisitionComplete]"; } else { - return $"[Response {this.Result}]"; + return $"[OperationSuccessful ({Result?.Length ?? 0} chars)]"; } } } diff --git a/src/DurableTask.Core/Entities/EventToSend.cs b/src/DurableTask.Core/Entities/EventToSend.cs deleted file mode 100644 index 2ce2b7b4d..000000000 --- a/src/DurableTask.Core/Entities/EventToSend.cs +++ /dev/null @@ -1,49 +0,0 @@ -// ---------------------------------------------------------------------------------- -// Copyright Microsoft Corporation -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// http://www.apache.org/licenses/LICENSE-2.0 -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// ---------------------------------------------------------------------------------- -#nullable enable -namespace DurableTask.Core.Entities -{ - /// - /// The data associated with sending an event to an orchestration. - /// - public readonly struct EventToSend - { - /// - /// The name of the event. - /// - public readonly string EventName { get; } - - /// - /// The content of the event. - /// - public readonly object EventContent { get; } - - /// - /// The target instance for the event. - /// - public readonly OrchestrationInstance TargetInstance { get; } - - /// - /// Construct an entity message event with the given members. - /// - /// The name of the event. - /// The content of the event. - /// The target of the event. - public EventToSend(string name, object content, OrchestrationInstance target) - { - EventName = name; - EventContent = content; - TargetInstance = target; - } - } -} \ No newline at end of file diff --git a/src/DurableTask.Core/Entities/OrchestrationEntityContext.cs b/src/DurableTask.Core/Entities/OrchestrationEntityContext.cs index 1791eb334..164dae1dc 100644 --- a/src/DurableTask.Core/Entities/OrchestrationEntityContext.cs +++ b/src/DurableTask.Core/Entities/OrchestrationEntityContext.cs @@ -182,7 +182,7 @@ public void RecoverLockAfterCall(string targetInstanceId) /// /// Get release messages for all locks in the critical section, and release them /// - public IEnumerable EmitLockReleaseMessages() + public IEnumerable EmitLockReleaseMessages() { if (this.IsInsideCriticalSection) { @@ -195,8 +195,7 @@ public IEnumerable EmitLockReleaseMessages() foreach (var entityId in this.criticalSectionLocks!) { var instance = new OrchestrationInstance() { InstanceId = entityId.ToString() }; - var jmessage = JObject.FromObject(message, Serializer.InternalSerializer); - yield return new EventToSend(EntityMessageEventNames.ReleaseMessageEventName, jmessage, instance); + yield return new EntityMessageEvent(EntityMessageEventNames.ReleaseMessageEventName, message, instance); } this.criticalSectionLocks = null; @@ -215,13 +214,13 @@ public IEnumerable EmitLockReleaseMessages() /// A time for which to schedule the delivery, or null if this is not a scheduled message /// The operation input /// The event to send. - public EventToSend EmitRequestMessage( + public EntityMessageEvent EmitRequestMessage( OrchestrationInstance target, string operationName, bool oneWay, Guid operationId, - (DateTime original, DateTime capped)? scheduledTimeUtc, - string input) + (DateTime Original, DateTime Capped)? scheduledTimeUtc, + string? input) { var request = new RequestMessage() { @@ -230,16 +229,13 @@ public EventToSend EmitRequestMessage( Id = operationId, IsSignal = oneWay, Operation = operationName, - ScheduledTime = scheduledTimeUtc?.original, + ScheduledTime = scheduledTimeUtc?.Original, Input = input, }; - this.AdjustOutgoingMessage(target.InstanceId, request, scheduledTimeUtc?.capped, out string eventName); + this.AdjustOutgoingMessage(target.InstanceId, request, scheduledTimeUtc?.Capped, out string eventName); - // we pre-serialize to JObject so we can avoid exposure to application-specific serialization settings - var jrequest = JObject.FromObject(request, Serializer.InternalSerializer); - - return new EventToSend(eventName, jrequest, target); + return new EntityMessageEvent(eventName, request, target); } /// @@ -248,7 +244,7 @@ public EventToSend EmitRequestMessage( /// A unique request id. /// All the entities that are to be acquired. /// The event to send. - public EventToSend EmitAcquireMessage(Guid lockRequestId, EntityId[] entities) + public EntityMessageEvent EmitAcquireMessage(Guid lockRequestId, EntityId[] entities) { // All the entities in entity[] need to be locked, but to avoid deadlock, the locks have to be acquired // sequentially, in order. So, we send the lock request to the first entity; when the first lock @@ -285,10 +281,7 @@ public EventToSend EmitAcquireMessage(Guid lockRequestId, EntityId[] entities) this.AdjustOutgoingMessage(target.InstanceId, request, null, out string eventName); - // we pre-serialize to JObject so we can avoid exposure to application-specific serialization settings - var jrequest = JObject.FromObject(request, Serializer.InternalSerializer); - - return new EventToSend(eventName, jrequest, target); + return new EntityMessageEvent(eventName, request, target); } /// diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index 8a51c8342..88df1ac54 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -717,7 +717,8 @@ void SendLockResponseMessage(WorkItemEffects effects, OrchestrationInstance targ { var message = new ResponseMessage() { - Result = "Lock Acquisition Completed", // ignored by receiver but shows up in traces + // content is ignored by receiver but helps with tracing + Result = ResponseMessage.LockAcquisitionCompletion, }; this.ProcessSendEventMessage(effects, target, EntityMessageEventNames.ResponseMessageEventName(requestId), message); }