Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 68 additions & 3 deletions src/Abstractions/Entities/EntityInstanceId.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,80 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Text.Json;
using System.Text.Json.Serialization;

namespace Microsoft.DurableTask.Entities;

/// <summary>
/// Represents the ID of an entity.
/// </summary>
/// <param name="Name">The name of the entity.</param>
/// <param name="Key">The key for this entity.</param>
public readonly record struct EntityInstanceId(string Name, string Key)
[JsonConverter(typeof(EntityInstanceId.JsonConverter))]
public readonly record struct EntityInstanceId
{
/// <summary>
/// Initializes a new instance of the <see cref="EntityInstanceId"/> class.
/// </summary>
/// <param name="name">The entity name.</param>
/// <param name="key">The entity key.</param>
public EntityInstanceId(string name, string key)
{
Check.NotNullOrEmpty(name);
if (name.Contains('@'))
{
throw new ArgumentException("entity names may not contain `@` characters.", nameof(name));
}

Check.NotNull(key);
this.Name = name.ToLowerInvariant();
this.Key = key;
}

/// <summary>
/// Gets the entity name. Entity names are normalized to lower case.
/// </summary>
public string Name { get; }

/// <summary>
/// Gets the entity key.
/// </summary>
public string Key { get; }

/// <summary>
/// Constructs a <see cref="EntityInstanceId"/> from a string containing the instance ID.
/// </summary>
/// <param name="instanceId">The string representation of the entity ID.</param>
/// <returns>the constructed entity instance ID.</returns>
public static EntityInstanceId FromString(string instanceId)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: put static methods before instance.

{
Check.NotNullOrEmpty(instanceId);
var pos = instanceId.IndexOf('@', 1);
if (pos <= 0 || instanceId[0] != '@')
{
throw new ArgumentException($"Instance ID '{instanceId}' is not a valid entity ID.", nameof(instanceId));
}

var entityName = instanceId.Substring(1, pos - 1);
var entityKey = instanceId.Substring(pos + 1);
return new EntityInstanceId(entityName, entityKey);
}

/// <inheritdoc/>
public override string ToString() => $"@{this.Name}@{this.Key}";

/// <summary>
/// We override the default json conversion so we can use a more compact string representation for entity instance ids.
/// </summary>
class JsonConverter : JsonConverter<EntityInstanceId>
{
public override EntityInstanceId Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
return EntityInstanceId.FromString(reader.GetString()!);
}

public override void Write(Utf8JsonWriter writer, EntityInstanceId value, JsonSerializerOptions options)
{
writer.WriteStringValue(value.ToString()!);
}
}
}
7 changes: 7 additions & 0 deletions src/Client/Core/DurableTaskClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ public DataConverter DataConverter
}
}

/// <summary>
/// Gets or sets a value indicating whether this client should support entities. If true, all instance ids starting with '@' are reserved for entities,
/// and validation checks are performed where appropriate.
/// </summary>
public bool EnableEntitySupport { get; set; }

/// <summary>
/// Gets a value indicating whether <see cref="DataConverter" /> was explicitly set or not.
/// </summary>
Expand All @@ -67,6 +73,7 @@ internal void ApplyTo(DurableTaskClientOptions other)
{
// Make sure to keep this up to date as values are added.
other.DataConverter = this.DataConverter;
other.EnableEntitySupport = this.EnableEntitySupport;
}
}
}
21 changes: 18 additions & 3 deletions src/Client/Core/Entities/EntityQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,24 @@ public string? InstanceIdStartsWith
get => this.instanceIdStartsWith;
init
{
// prefix '@' if filter value provided and not already prefixed with '@'.
this.instanceIdStartsWith = value?.Length > 0 && value[0] != '@'
? $"@{value}" : value;
if (value != null)
{
// prefix '@' if filter value provided and not already prefixed with '@'.
string prefix = value.Length == 0 || value[0] != '@' ? $"@{value}" : value;

// check if there is a name-key separator in the string
int pos = prefix.IndexOf('@', 1);
if (pos != -1)
{
// selectively normalize only the part up until that separator
this.instanceIdStartsWith = prefix.Substring(0, pos).ToLowerInvariant() + prefix.Substring(pos);
}
else
{
// normalize the entire prefix
this.instanceIdStartsWith = prefix.ToLowerInvariant();
}
}
}
}

Expand Down
29 changes: 26 additions & 3 deletions src/Client/Grpc/GrpcDurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public sealed class GrpcDurableTaskClient : DurableTaskClient
readonly ILogger logger;
readonly TaskHubSidecarServiceClient sidecarClient;
readonly GrpcDurableTaskClientOptions options;
readonly DurableEntityClient entityClient;
readonly DurableEntityClient? entityClient;
AsyncDisposable asyncDisposable;

/// <summary>
Expand Down Expand Up @@ -49,11 +49,16 @@ public GrpcDurableTaskClient(string name, GrpcDurableTaskClientOptions options,
this.options = Check.NotNull(options);
this.asyncDisposable = GetCallInvoker(options, out CallInvoker callInvoker);
this.sidecarClient = new TaskHubSidecarServiceClient(callInvoker);
this.entityClient = new GrpcDurableEntityClient(this.Name, this.DataConverter, this.sidecarClient, logger);

if (this.options.EnableEntitySupport)
{
this.entityClient = new GrpcDurableEntityClient(this.Name, this.DataConverter, this.sidecarClient, logger);
}
}

/// <inheritdoc/>
public override DurableEntityClient Entities => this.entityClient;
public override DurableEntityClient Entities => this.entityClient
?? throw new NotSupportedException($"Durable entities are disabled because {nameof(DurableTaskClientOptions)}.{nameof(DurableTaskClientOptions.EnableEntitySupport)}=false");

DataConverter DataConverter => this.options.DataConverter;

Expand All @@ -70,6 +75,8 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
StartOrchestrationOptions? options = null,
CancellationToken cancellation = default)
{
Check.NotEntity(this.options.EnableEntitySupport, options?.InstanceId);

var request = new P.CreateInstanceRequest
{
Name = orchestratorName.Name,
Expand Down Expand Up @@ -103,6 +110,8 @@ public override async Task RaiseEventAsync(
Check.NotNullOrEmpty(instanceId);
Check.NotNullOrEmpty(eventName);

Check.NotEntity(this.options.EnableEntitySupport, instanceId);

P.RaiseEventRequest request = new()
{
InstanceId = instanceId,
Expand All @@ -118,6 +127,8 @@ public override async Task TerminateInstanceAsync(
string instanceId, object? output = null, CancellationToken cancellation = default)
{
Check.NotNullOrEmpty(instanceId);
Check.NotEntity(this.options.EnableEntitySupport, instanceId);

this.logger.TerminatingInstance(instanceId);

string? serializedOutput = this.DataConverter.Serialize(output);
Expand All @@ -134,6 +145,8 @@ await this.sidecarClient.TerminateInstanceAsync(
public override async Task SuspendInstanceAsync(
string instanceId, string? reason = null, CancellationToken cancellation = default)
{
Check.NotEntity(this.options.EnableEntitySupport, instanceId);

P.SuspendRequest request = new()
{
InstanceId = instanceId,
Expand All @@ -155,6 +168,8 @@ public override async Task SuspendInstanceAsync(
public override async Task ResumeInstanceAsync(
string instanceId, string? reason = null, CancellationToken cancellation = default)
{
Check.NotEntity(this.options.EnableEntitySupport, instanceId);

P.ResumeRequest request = new()
{
InstanceId = instanceId,
Expand All @@ -176,6 +191,8 @@ public override async Task ResumeInstanceAsync(
public override async Task<OrchestrationMetadata?> GetInstancesAsync(
string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default)
{
Check.NotEntity(this.options.EnableEntitySupport, instanceId);

if (string.IsNullOrEmpty(instanceId))
{
throw new ArgumentNullException(nameof(instanceId));
Expand All @@ -201,6 +218,8 @@ public override async Task ResumeInstanceAsync(
/// <inheritdoc/>
public override AsyncPageable<OrchestrationMetadata> GetAllInstancesAsync(OrchestrationQuery? filter = null)
{
Check.NotEntity(this.options.EnableEntitySupport, filter?.InstanceIdPrefix);

return Pageable.Create(async (continuation, pageSize, cancellation) =>
{
P.QueryInstancesRequest request = new()
Expand Down Expand Up @@ -250,6 +269,8 @@ public override AsyncPageable<OrchestrationMetadata> GetAllInstancesAsync(Orches
public override async Task<OrchestrationMetadata> WaitForInstanceStartAsync(
string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default)
{
Check.NotEntity(this.options.EnableEntitySupport, instanceId);

this.logger.WaitingForInstanceStart(instanceId, getInputsAndOutputs);

P.GetInstanceRequest request = new()
Expand All @@ -275,6 +296,8 @@ public override async Task<OrchestrationMetadata> WaitForInstanceStartAsync(
public override async Task<OrchestrationMetadata> WaitForInstanceCompletionAsync(
string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default)
{
Check.NotEntity(this.options.EnableEntitySupport, instanceId);

this.logger.WaitingForInstanceCompletion(instanceId, getInputsAndOutputs);

P.GetInstanceRequest request = new()
Expand Down
14 changes: 14 additions & 0 deletions src/Shared/Core/Validation/Check.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,20 @@ public static string NotNullOrEmpty(
return argument;
}

/// <summary>
/// Checks that, if entity support is enabled, the given string is not an entity instance id, and throws an <see cref="ArgumentException"/> otherwise.
/// </summary>
/// <param name="entitySupportEnabled">Whether entity support is enabled.</param>
/// <param name="instanceId">The instance id.</param>
/// <param name="argument">The name of the argument.</param>
public static void NotEntity(bool entitySupportEnabled, string? instanceId, [CallerArgumentExpression("instanceId")] string? argument = default)
{
if (entitySupportEnabled && instanceId?.Length > 0 && instanceId[0] == '@')
{
throw new ArgumentException("Instance IDs starting with '@' are reserved for entities, and must not be used for orchestrations, when entity support is enabled.", argument);
}
}

/// <summary>
/// Checks if the supplied type is a concrete non-abstract type and implements the provided generic type.
/// Throws <see cref="ArgumentException" /> if the conditions are not met.
Expand Down
7 changes: 7 additions & 0 deletions src/Worker/Core/DurableTaskWorkerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public DataConverter DataConverter
}
}

/// <summary>
/// Gets or sets a value indicating whether this client should support entities. If true, all instance ids starting with '@' are reserved for entities,
/// and validation checks are performed where appropriate.
/// </summary>
public bool EnableEntitySupport { get; set; }

/// <summary>
/// Gets or sets the maximum timer interval for the
/// <see cref="TaskOrchestrationContext.CreateTimer(TimeSpan, CancellationToken)"/> method.
Expand Down Expand Up @@ -99,6 +105,7 @@ internal void ApplyTo(DurableTaskWorkerOptions other)
// Make sure to keep this up to date as values are added.
other.DataConverter = this.DataConverter;
other.MaximumTimerInterval = this.MaximumTimerInterval;
other.EnableEntitySupport = this.EnableEntitySupport;
}
}
}
5 changes: 3 additions & 2 deletions src/Worker/Core/Shims/TaskEntityShim.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,7 @@ public void Reset()

public override void SignalEntity(EntityInstanceId id, string operationName, object? input = null, SignalEntityOptions? options = null)
{
Check.NotNullOrEmpty(id.Name);
Check.NotNull(id.Key);
Check.NotDefault(id);

this.operationActions.Add(new SendSignalOperationAction()
{
Expand All @@ -209,6 +208,8 @@ public override void SignalEntity(EntityInstanceId id, string operationName, obj

public override void StartOrchestration(TaskName name, object? input = null, StartOrchestrationOptions? options = null)
{
Check.NotEntity(true, options?.InstanceId);

this.operationActions.Add(new StartNewOrchestrationOperationAction()
{
Name = name.Name,
Expand Down
23 changes: 22 additions & 1 deletion src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,24 @@ public TaskOrchestrationContextWrapper(

/// <inheritdoc/>
public override TaskOrchestrationEntityFeature Entities
=> this.entityFeature ??= new TaskOrchestrationEntityContext(this);
{
get
{
if (this.entityFeature == null)
{
if (this.invocationContext.Options.EnableEntitySupport)
{
this.entityFeature = new TaskOrchestrationEntityContext(this);
}
else
{
throw new NotSupportedException($"Durable entities are disabled because {nameof(DurableTaskWorkerOptions)}.{nameof(DurableTaskWorkerOptions.EnableEntitySupport)}=false");
}
}

return this.entityFeature;
}
}

/// <summary>
/// Gets the DataConverter to use for inputs, outputs, and entity states.
Expand Down Expand Up @@ -134,6 +151,8 @@ public override async Task<TResult> CallSubOrchestratorAsync<TResult>(
=> options is SubOrchestrationOptions derived ? derived.InstanceId : null;
string instanceId = GetInstanceId(options) ?? this.NewGuid().ToString("N");

Check.NotEntity(this.invocationContext.Options.EnableEntitySupport, instanceId);

// if this orchestration uses entities, first validate that the suborchsestration call is allowed in the current context
if (this.entityFeature != null && !this.entityFeature.EntityContext.ValidateSuborchestrationTransition(out string? errorMsg))
{
Expand Down Expand Up @@ -233,6 +252,8 @@ public override Task<T> WaitForExternalEvent<T>(string eventName, CancellationTo
/// <inheritdoc/>
public override void SendEvent(string instanceId, string eventName, object eventData)
{
Check.NotEntity(this.invocationContext.Options.EnableEntitySupport, instanceId);

this.innerContext.SendEvent(new OrchestrationInstance { InstanceId = instanceId }, eventName, eventData);
}

Expand Down
12 changes: 3 additions & 9 deletions src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,7 @@ public override async Task<IAsyncDisposable> LockEntitiesAsync(IEnumerable<Entit
/// <inheritdoc/>
public override async Task<TResult> CallEntityAsync<TResult>(EntityInstanceId id, string operationName, object? input = null, CallEntityOptions? options = null)
{
Check.NotNullOrEmpty(id.Name);
Check.NotNull(id.Key);

Check.NotDefault(id);
OperationResult operationResult = await this.CallEntityInternalAsync(id, operationName, input);

if (operationResult.IsError)
Expand All @@ -107,9 +105,7 @@ public override async Task<TResult> CallEntityAsync<TResult>(EntityInstanceId id
/// <inheritdoc/>
public override async Task CallEntityAsync(EntityInstanceId id, string operationName, object? input = null, CallEntityOptions? options = null)
{
Check.NotNullOrEmpty(id.Name);
Check.NotNull(id.Key);

Check.NotDefault(id);
OperationResult operationResult = await this.CallEntityInternalAsync(id, operationName, input);

if (operationResult.IsError)
Expand All @@ -121,9 +117,7 @@ public override async Task CallEntityAsync(EntityInstanceId id, string operation
/// <inheritdoc/>
public override Task SignalEntityAsync(EntityInstanceId id, string operationName, object? input = null, SignalEntityOptions? options = null)
{
Check.NotNullOrEmpty(id.Name);
Check.NotNull(id.Key);

Check.NotDefault(id);
this.SendOperationMessage(id.ToString(), operationName, input, oneWay: true, scheduledTime: options?.SignalTime);
return Task.CompletedTask;
}
Expand Down