diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index e2f7be331..9f5d15047 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -281,7 +281,7 @@ public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew #region IEntityOrchestrationService - EntityBackendProperties IEntityOrchestrationService.GetEntityBackendProperties() + EntityBackendProperties IEntityOrchestrationService.EntityBackendProperties => new EntityBackendProperties() { EntityMessageReorderWindow = TimeSpan.FromMinutes(this.settings.EntityMessageReorderWindowInMinutes), @@ -289,28 +289,24 @@ EntityBackendProperties IEntityOrchestrationService.GetEntityBackendProperties() MaxConcurrentTaskEntityWorkItems = this.settings.MaxConcurrentTaskEntityWorkItems, SupportsImplicitEntityDeletion = false, // not supported by this backend MaximumSignalDelayTime = TimeSpan.FromDays(6), + UseSeparateQueueForEntityWorkItems = this.settings.UseSeparateQueueForEntityWorkItems, }; - bool IEntityOrchestrationService.ProcessEntitiesSeparately() - { - if (this.settings.UseSeparateQueueForEntityWorkItems) - { - this.orchestrationSessionManager.ProcessEntitiesSeparately = true; - return true; - } - else - { - return false; - } - } + EntityBackendQueries IEntityOrchestrationService.EntityBackendQueries + => new EntityTrackingStoreQueries( + this.messageManager, + this.trackingStore, + this.EnsureTaskHubAsync, + ((IEntityOrchestrationService)this).EntityBackendProperties, + this.SendTaskOrchestrationMessageAsync); Task IEntityOrchestrationService.LockNextOrchestrationWorkItemAsync( TimeSpan receiveTimeout, CancellationToken cancellationToken) { - if (!orchestrationSessionManager.ProcessEntitiesSeparately) + if (!this.settings.UseSeparateQueueForEntityWorkItems) { - throw new InvalidOperationException("backend was not configured for separate entity processing"); + throw new InvalidOperationException("Internal configuration is inconsistent. Backend is using single queue for orchestration/entity dispatch, but frontend is pulling from individual queues."); } return this.LockNextTaskOrchestrationWorkItemAsync(false, cancellationToken); } @@ -319,9 +315,9 @@ Task IEntityOrchestrationService.LockNextEntityWorkIt TimeSpan receiveTimeout, CancellationToken cancellationToken) { - if (!orchestrationSessionManager.ProcessEntitiesSeparately) + if (!this.settings.UseSeparateQueueForEntityWorkItems) { - throw new InvalidOperationException("backend was not configured for separate entity processing"); + throw new InvalidOperationException("Internal configuration is inconsistent. Backend is using single queue for orchestration/entity dispatch, but frontend is pulling from individual queues."); } return this.LockNextTaskOrchestrationWorkItemAsync(entitiesOnly: true, cancellationToken); } @@ -680,6 +676,10 @@ public Task LockNextTaskOrchestrationWorkItemAsync( TimeSpan receiveTimeout, CancellationToken cancellationToken) { + if (this.settings.UseSeparateQueueForEntityWorkItems) + { + throw new InvalidOperationException("Internal configuration is inconsistent. Backend is using separate queues for orchestration/entity dispatch, but frontend is pulling from single queue."); + } return LockNextTaskOrchestrationWorkItemAsync(entitiesOnly: false, cancellationToken); } @@ -2103,6 +2103,7 @@ private static OrchestrationInstanceStatusQueryCondition ToAzureStorageCondition TaskHubNames = condition.TaskHubNames, InstanceIdPrefix = condition.InstanceIdPrefix, FetchInput = condition.FetchInputsAndOutputs, + ExcludeEntities = condition.ExcludeEntities, }; } diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs index 819480bde..609a8b35d 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs @@ -315,7 +315,7 @@ internal LogHelper Logger /// /// Whether to use separate work item queues for entities and orchestrators. /// This defaults to false, to avoid issues when using this provider from code that does not support separate dispatch. - /// Consumers that support separate dispatch should explicitly set this to true. + /// Consumers that require separate dispatch (such as the new out-of-proc v2 SDKs) must set this to true. /// public bool UseSeparateQueueForEntityWorkItems { get; set; } = false; } diff --git a/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs b/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs new file mode 100644 index 000000000..e6a146833 --- /dev/null +++ b/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs @@ -0,0 +1,258 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ----------------------------------------------------------------------------------using System; +#nullable enable +namespace DurableTask.AzureStorage +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Linq; + using System.Runtime.Serialization.Json; + using System.Text; + using System.Threading; + using System.Threading.Tasks; + using DurableTask.AzureStorage.Tracking; + using DurableTask.Core; + using DurableTask.Core.Entities; + + class EntityTrackingStoreQueries : EntityBackendQueries + { + readonly MessageManager messageManager; + readonly ITrackingStore trackingStore; + readonly Func ensureTaskHub; + readonly EntityBackendProperties properties; + readonly Func sendEvent; + + static TimeSpan timeLimitForCleanEntityStorageLoop = TimeSpan.FromSeconds(5); + + public EntityTrackingStoreQueries( + MessageManager messageManager, + ITrackingStore trackingStore, + Func ensureTaskHub, + EntityBackendProperties properties, + Func sendEvent) + { + this.messageManager = messageManager; + this.trackingStore = trackingStore; + this.ensureTaskHub = ensureTaskHub; + this.properties = properties; + this.sendEvent = sendEvent; + } + + public async override Task GetEntityAsync( + EntityId id, + bool includeState = false, + bool includeDeleted = false, + CancellationToken cancellation = default(CancellationToken)) + { + await this.ensureTaskHub(); + OrchestrationState? state = (await this.trackingStore.GetStateAsync(id.ToString(), allExecutions: false, fetchInput: includeState)).FirstOrDefault(); + return await this.GetEntityMetadataAsync(state, includeDeleted, includeState); + } + + public async override Task QueryEntitiesAsync(EntityQuery filter, CancellationToken cancellation) + { + var condition = new OrchestrationInstanceStatusQueryCondition() + { + InstanceId = null, + InstanceIdPrefix = filter.InstanceIdStartsWith, + CreatedTimeFrom = filter.LastModifiedFrom ?? default(DateTime), + CreatedTimeTo = filter.LastModifiedTo ?? default(DateTime), + FetchInput = filter.IncludeState, + FetchOutput = false, + ExcludeEntities = false, + }; + + await this.ensureTaskHub(); + + List entityResult; + string? continuationToken = filter.ContinuationToken; + var stopwatch = new Stopwatch(); + stopwatch.Start(); + + do + { + DurableStatusQueryResult result = await this.trackingStore.GetStateAsync(condition, filter.PageSize ?? 100, continuationToken, cancellation); + entityResult = await ConvertResultsAsync(result.OrchestrationState); + continuationToken = result.ContinuationToken; + } + while ( // continue query right away if the page is completely empty, but never in excess of 100ms + continuationToken != null + && entityResult.Count == 0 + && stopwatch.ElapsedMilliseconds <= 100); + + return new EntityQueryResult() + { + Results = entityResult, + ContinuationToken = continuationToken, + }; + + async ValueTask> ConvertResultsAsync(IEnumerable states) + { + entityResult = new List(); + foreach (OrchestrationState entry in states) + { + EntityMetadata? entityMetadata = await this.GetEntityMetadataAsync(entry, filter.IncludeDeleted, filter.IncludeState); + if (entityMetadata.HasValue) + { + entityResult.Add(entityMetadata.Value); + } + } + return entityResult; + } + } + + public async override Task CleanEntityStorageAsync(CleanEntityStorageRequest request = default(CleanEntityStorageRequest), CancellationToken cancellation = default(CancellationToken)) + { + DateTime now = DateTime.UtcNow; + string? continuationToken = request.ContinuationToken; + int emptyEntitiesRemoved = 0; + int orphanedLocksReleased = 0; + var stopwatch = Stopwatch.StartNew(); + + var condition = new OrchestrationInstanceStatusQueryCondition() + { + InstanceIdPrefix = "@", + FetchInput = false, + FetchOutput = false, + ExcludeEntities = false, + }; + + await this.ensureTaskHub(); + + // list all entities (without fetching the input) and for each one that requires action, + // perform that action. Waits for all actions to finish after each page. + do + { + DurableStatusQueryResult page = await this.trackingStore.GetStateAsync(condition, 100, continuationToken, cancellation); + + var tasks = new List(); + foreach (OrchestrationState state in page.OrchestrationState) + { + EntityStatus? status = ClientEntityHelpers.GetEntityStatus(state.Status); + if (status != null) + { + if (request.ReleaseOrphanedLocks && status.LockedBy != null) + { + tasks.Add(CheckForOrphanedLockAndFixIt(state, status.LockedBy)); + } + + if (request.RemoveEmptyEntities) + { + bool isEmptyEntity = !status.EntityExists && status.LockedBy == null && status.QueueSize == 0; + bool safeToRemoveWithoutBreakingMessageSorterLogic = + (now - state.LastUpdatedTime > this.properties.EntityMessageReorderWindow); + if (isEmptyEntity && safeToRemoveWithoutBreakingMessageSorterLogic) + { + tasks.Add(DeleteIdleOrchestrationEntity(state)); + } + } + } + } + + async Task DeleteIdleOrchestrationEntity(OrchestrationState state) + { + PurgeHistoryResult result = await this.trackingStore.PurgeInstanceHistoryAsync(state.OrchestrationInstance.InstanceId); + Interlocked.Add(ref emptyEntitiesRemoved, result.InstancesDeleted); + } + + async Task CheckForOrphanedLockAndFixIt(OrchestrationState state, string lockOwner) + { + OrchestrationState? ownerState + = (await this.trackingStore.GetStateAsync(lockOwner, allExecutions: false, fetchInput: false)).FirstOrDefault(); + + bool OrchestrationIsRunning(OrchestrationStatus? status) + => status != null && (status == OrchestrationStatus.Running || status == OrchestrationStatus.Suspended); + + if (! OrchestrationIsRunning(ownerState?.OrchestrationStatus)) + { + // the owner is not a running orchestration. Send a lock release. + var targetInstance = new OrchestrationInstance() { InstanceId = lockOwner }; + EntityMessageEvent eventToSend = ClientEntityHelpers.EmitUnlockForOrphanedLock(targetInstance, lockOwner); + await this.sendEvent(eventToSend.AsTaskMessage()); + Interlocked.Increment(ref orphanedLocksReleased); + } + } + + await Task.WhenAll(tasks); + } + while (continuationToken != null & stopwatch.Elapsed <= timeLimitForCleanEntityStorageLoop); + + return new CleanEntityStorageResult() + { + EmptyEntitiesRemoved = emptyEntitiesRemoved, + OrphanedLocksReleased = orphanedLocksReleased, + ContinuationToken = continuationToken, + }; + } + + async ValueTask GetEntityMetadataAsync(OrchestrationState? state, bool includeDeleted, bool includeState) + { + if (state == null) + { + return null; + } + + if (!includeState) + { + if (!includeDeleted) + { + // it is possible that this entity was logically deleted even though its orchestration was not purged yet. + // we can check this efficiently (i.e. without deserializing anything) by looking at just the custom status + if (!EntityStatus.TestEntityExists(state.Status)) + { + return null; + } + } + + return new EntityMetadata() + { + EntityId = EntityId.FromString(state.OrchestrationInstance.InstanceId), + LastModifiedTime = state.CreatedTime, + SerializedState = null, // we were instructed to not include the state + }; + } + else + { + // first, retrieve the entity scheduler state (= input of the orchestration state), possibly from blob storage. + string serializedSchedulerState; + if (MessageManager.TryGetLargeMessageReference(state.Input, out Uri blobUrl)) + { + serializedSchedulerState = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobUrl); + } + else + { + serializedSchedulerState = state.Input; + } + + // next, extract the entity state from the scheduler state + string? serializedEntityState = ClientEntityHelpers.GetEntityState(serializedSchedulerState); + + // return the result to the user + if (!includeDeleted && serializedEntityState == null) + { + return null; + } + else + { + return new EntityMetadata() + { + EntityId = EntityId.FromString(state.OrchestrationInstance.InstanceId), + LastModifiedTime = state.CreatedTime, + SerializedState = serializedEntityState, + }; + } + } + } + } +} diff --git a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs index 4c0486573..2e2b7282d 100644 --- a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs +++ b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs @@ -58,14 +58,6 @@ public OrchestrationSessionManager( internal IEnumerable Queues => this.ownedControlQueues.Values; - /// - /// Recent versions of DurableTask.Core can be configured to use a separate pipeline for processing entity work items, - /// while older versions use a single pipeline for both orchestration and entity work items. To support both scenarios, - /// this property can be modified prior to starting the orchestration service. If set to true, the work items that are ready for - /// processing are stored in and , respectively. - /// - internal bool ProcessEntitiesSeparately { get; set; } - public void AddQueue(string partitionId, ControlQueue controlQueue, CancellationToken cancellationToken) { if (this.ownedControlQueues.TryAdd(partitionId, controlQueue)) @@ -529,7 +521,8 @@ async Task ScheduleOrchestrationStatePrefetch( batch.TrackingStoreContext = history.TrackingStoreContext; } - if (this.ProcessEntitiesSeparately && DurableTask.Core.Common.Entities.IsEntityInstance(batch.OrchestrationInstanceId)) + if (this.settings.UseSeparateQueueForEntityWorkItems + && DurableTask.Core.Common.Entities.IsEntityInstance(batch.OrchestrationInstanceId)) { this.entitiesReadyForProcessingQueue.Enqueue(node); } diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index c6b287263..abeaf7c86 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -430,7 +430,7 @@ public override async Task> GetStateAsync(string insta } /// - async Task FetchInstanceStatusInternalAsync(string instanceId, bool fetchInput) + internal async Task FetchInstanceStatusInternalAsync(string instanceId, bool fetchInput) { if (instanceId == null) { diff --git a/src/DurableTask.AzureStorage/Tracking/OrchestrationInstanceStatusQueryCondition.cs b/src/DurableTask.AzureStorage/Tracking/OrchestrationInstanceStatusQueryCondition.cs index ae2ff74b9..46415f98f 100644 --- a/src/DurableTask.AzureStorage/Tracking/OrchestrationInstanceStatusQueryCondition.cs +++ b/src/DurableTask.AzureStorage/Tracking/OrchestrationInstanceStatusQueryCondition.cs @@ -68,6 +68,11 @@ public class OrchestrationInstanceStatusQueryCondition /// public bool FetchOutput { get; set; } = true; + /// + /// Whether to exclude entities from the results. + /// + public bool ExcludeEntities { get; set; } = false; + /// /// Get the TableQuery object /// @@ -159,6 +164,13 @@ string GetConditions() TableOperators.And, TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.LessThan, greaterThanPrefix))); } + else if (this.ExcludeEntities) + { + conditions.Add(TableQuery.CombineFilters( + TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.LessThan, "@"), + TableOperators.Or, + TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.GreaterThanOrEqual, "A"))); + } if (this.InstanceId != null) { diff --git a/src/DurableTask.Core/Entities/EntityBackendProperties.cs b/src/DurableTask.Core/Entities/EntityBackendProperties.cs index 1fe9cd5f3..20d8ec9e0 100644 --- a/src/DurableTask.Core/Entities/EntityBackendProperties.cs +++ b/src/DurableTask.Core/Entities/EntityBackendProperties.cs @@ -14,6 +14,7 @@ namespace DurableTask.Core.Entities { using System; + using System.Threading; /// /// Entity processing characteristics that are controlled by the backend provider, i.e. the orchestration service. @@ -37,17 +38,28 @@ public class EntityBackendProperties public int MaxConcurrentTaskEntityWorkItems { get; set; } /// - /// Whether the backend supports implicit deletion, i.e. setting the entity scheduler state to null implicitly deletes the storage record. + /// Gets or sets whether the backend supports implicit deletion. Implicit deletion means that + /// the storage does not retain any data for entities that don't have any state. /// public bool SupportsImplicitEntityDeletion { get; set; } /// - /// Value of maximum durable timer delay. Used for delayed signals. + /// Gets or sets the maximum durable timer delay. Used for delayed signals. /// public TimeSpan MaximumSignalDelayTime { get; set; } /// - /// Computes a cap on the scheduled time of an entity signal, based on the maximum signal delay time + /// Gets or sets whether the backend uses separate work item queues for entities and orchestrators. If true, + /// the frontend must use and + /// + /// to fetch entities and orchestrations. Otherwise, it must use fetch both work items using + /// . + /// + public bool UseSeparateQueueForEntityWorkItems { get; set; } + + /// + /// A utility function to compute a cap on the scheduled time of an entity signal, based on the value of + /// . /// /// The current time. /// The scheduled time. diff --git a/src/DurableTask.Core/Entities/EntityBackendQueries.cs b/src/DurableTask.Core/Entities/EntityBackendQueries.cs new file mode 100644 index 000000000..361667033 --- /dev/null +++ b/src/DurableTask.Core/Entities/EntityBackendQueries.cs @@ -0,0 +1,186 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- +#nullable enable +namespace DurableTask.Core.Entities +{ + using System; + using System.Collections.Generic; + using System.Threading; + using System.Threading.Tasks; + + /// + /// Encapsulates support for entity queries, at the abstraction level of a storage backend. + /// + public abstract class EntityBackendQueries + { + /// + /// Tries to get the entity with ID of . + /// + /// The ID of the entity to get. + /// true to include entity state in the response, false to not. + /// whether to return metadata for a deleted entity (if such data was retained by the backend). + /// The cancellation token to cancel the operation. + /// a response containing metadata describing the entity. + public abstract Task GetEntityAsync( + EntityId id, bool includeState = false, bool includeDeleted = false, CancellationToken cancellation = default); + + /// + /// Queries entity instances based on the conditions specified in . + /// + /// The query filter. + /// The cancellation token to cancel the operation. + /// One page of query results. + public abstract Task QueryEntitiesAsync(EntityQuery query, CancellationToken cancellation); + + /// + /// Cleans entity storage. See for the different forms of cleaning available. + /// + /// The request which describes what to clean. + /// The cancellation token to cancel the operation. + /// A task that completes when the operation is finished. + public abstract Task CleanEntityStorageAsync( + CleanEntityStorageRequest request = default, CancellationToken cancellation = default); + + /// + /// Metadata about an entity, as returned by queries. + /// + public struct EntityMetadata + { + /// + /// Gets or sets the ID for this entity. + /// + public EntityId EntityId { get; set; } + + /// + /// Gets or sets the time the entity was last modified. + /// + public DateTime LastModifiedTime { get; set; } + + /// + /// Gets or sets the serialized state for this entity. Can be null if the query + /// specified to not include the state, or to include deleted entities. + /// + public string? SerializedState { get; set; } + } + + /// + /// A description of an entity query. + /// + /// + /// The default query returns all entities (does not specify any filters). + /// + public struct EntityQuery + { + /// + /// Gets or sets the optional starts-with expression for the entity instance ID. + /// + public string? InstanceIdStartsWith { get; set; } + + /// + /// Gets or sets a value indicating to include only entity instances which were last modified after the provided time. + /// + public DateTime? LastModifiedFrom { get; set; } + + /// + /// Gets or sets a value indicating to include only entity instances which were last modified before the provided time. + /// + public DateTime? LastModifiedTo { get; set; } + + /// + /// Gets or sets a value indicating whether to include state in the query results or not. + /// + public bool IncludeState { get; set; } + + /// + /// Gets or sets a value indicating whether or not to include deleted entities. + /// + /// + /// This setting is relevant only for providers which retain metadata for deleted entities ( is false). + /// + public bool IncludeDeleted { get; set; } + + /// + /// Gets or sets the desired size of each page to return. + /// + /// + /// If no size is specified, the backend may choose an appropriate page size based on its implementation. + /// Note that the size of the returned page may be smaller or larger than the requested page size, and cannot + /// be used to determine whether the end of the query has been reached. + /// + public int? PageSize { get; set; } + + /// + /// Gets or sets the continuation token to resume a previous query. + /// + public string? ContinuationToken { get; set; } + } + + /// + /// A page of results. + /// + public struct EntityQueryResult + { + /// + /// Gets or sets the query results. + /// + public IEnumerable Results { get; set; } + + /// + /// Gets or sets the continuation token to continue this query, if not null. + /// + public string? ContinuationToken { get; set; } + } + + /// + /// Request struct for . + /// + public struct CleanEntityStorageRequest + { + /// + /// Gets or sets a value indicating whether to remove empty entities. + /// + public bool RemoveEmptyEntities { get; set; } + + /// + /// Gets or sets a value indicating whether to release orphaned locks or not. + /// + public bool ReleaseOrphanedLocks { get; set; } + + /// + /// Gets or sets the continuation token to resume a previous . + /// + public string? ContinuationToken { get; set; } + } + + /// + /// Result struct for . + /// + public struct CleanEntityStorageResult + { + /// + /// Gets or sets the number of empty entities removed. + /// + public int EmptyEntitiesRemoved { get; set; } + + /// + /// Gets or sets the number of orphaned locks that were removed. + /// + public int OrphanedLocksReleased { get; set; } + + /// + /// Gets or sets the continuation token to continue the , if not null. + /// + public string? ContinuationToken { get; set; } + } + } +} diff --git a/src/DurableTask.Core/Entities/IEntityOrchestrationService.cs b/src/DurableTask.Core/Entities/IEntityOrchestrationService.cs index 5e8823d72..c8c9b80c8 100644 --- a/src/DurableTask.Core/Entities/IEntityOrchestrationService.cs +++ b/src/DurableTask.Core/Entities/IEntityOrchestrationService.cs @@ -23,18 +23,17 @@ namespace DurableTask.Core.Entities public interface IEntityOrchestrationService : IOrchestrationService { /// - /// The entity orchestration service. + /// Properties of the backend implementation and configuration, as related to the new entity support in DurableTask.Core. /// - /// An object containing properties of the entity backend. - EntityBackendProperties GetEntityBackendProperties(); + /// An object containing properties of the entity backend, or null if the backend does not natively support DurableTask.Core entities. + EntityBackendProperties? EntityBackendProperties { get; } /// - /// Checks whether the backend is configured for separate work-item processing of orchestrations and entities. - /// If this returns true, must use or to - /// pull orchestrations or entities separately. Otherwise, must use . - /// This must be called prior to starting the orchestration service. + /// Support for entity queries. /// - bool ProcessEntitiesSeparately(); + /// An object that can be used to issue entity queries to the orchestration service, or null if the backend does not natively + /// support entity queries. + EntityBackendQueries? EntityBackendQueries { get; } /// /// Specialized variant of that diff --git a/src/DurableTask.Core/Query/OrchestrationQuery.cs b/src/DurableTask.Core/Query/OrchestrationQuery.cs index 2932d11da..0fc4140f6 100644 --- a/src/DurableTask.Core/Query/OrchestrationQuery.cs +++ b/src/DurableTask.Core/Query/OrchestrationQuery.cs @@ -69,5 +69,11 @@ public OrchestrationQuery() { } /// Determines whether the query will include the input of the orchestration. /// public bool FetchInputsAndOutputs { get; set; } = true; + + /// + /// Whether to exclude entities from the query results. This defaults to false for compatibility with older SDKs, + /// but is set to true by the newer SDKs. + /// + public bool ExcludeEntities { get; set; } = false; } } \ No newline at end of file diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index 0e9f0990f..54dd59222 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -56,7 +56,7 @@ internal TaskEntityDispatcher( this.logHelper = logHelper ?? throw new ArgumentNullException(nameof(logHelper)); this.errorPropagationMode = errorPropagationMode; this.entityOrchestrationService = (orchestrationService as IEntityOrchestrationService)!; - this.entityBackendProperties = entityOrchestrationService.GetEntityBackendProperties(); + this.entityBackendProperties = entityOrchestrationService.EntityBackendProperties; this.dispatcher = new WorkItemDispatcher( "TaskEntityDispatcher", diff --git a/src/DurableTask.Core/TaskHubWorker.cs b/src/DurableTask.Core/TaskHubWorker.cs index e7a6e09b5..c5cf0cd56 100644 --- a/src/DurableTask.Core/TaskHubWorker.cs +++ b/src/DurableTask.Core/TaskHubWorker.cs @@ -37,12 +37,11 @@ public sealed class TaskHubWorker : IDisposable readonly INameVersionObjectManager orchestrationManager; readonly INameVersionObjectManager entityManager; - readonly IEntityOrchestrationService entityOrchestrationService; - readonly DispatchMiddlewarePipeline orchestrationDispatchPipeline = new DispatchMiddlewarePipeline(); readonly DispatchMiddlewarePipeline entityDispatchPipeline = new DispatchMiddlewarePipeline(); readonly DispatchMiddlewarePipeline activityDispatchPipeline = new DispatchMiddlewarePipeline(); + readonly bool dispatchEntitiesSeparately; readonly SemaphoreSlim slimLock = new SemaphoreSlim(1, 1); readonly LogHelper logHelper; @@ -52,11 +51,6 @@ public sealed class TaskHubWorker : IDisposable // ReSharper disable once InconsistentNaming (avoid breaking change) public IOrchestrationService orchestrationService { get; } - /// - /// Indicates whether the configured backend supports entities. - /// - public bool SupportsEntities => this.entityOrchestrationService != null; - volatile bool isStarted; TaskActivityDispatcher activityDispatcher; @@ -152,14 +146,7 @@ public TaskHubWorker( this.entityManager = entityObjectManager ?? throw new ArgumentException("entityObjectManager"); this.orchestrationService = orchestrationService ?? throw new ArgumentException("orchestrationService"); this.logHelper = new LogHelper(loggerFactory?.CreateLogger("DurableTask.Core")); - - // If the backend supports a separate work item queue for entities (indicated by it implementing IEntityOrchestrationService), - // we take note of that here, and let the backend know that we are going to pull the work items separately. - if (orchestrationService is IEntityOrchestrationService entityOrchestrationService - && entityOrchestrationService.ProcessEntitiesSeparately()) - { - this.entityOrchestrationService = entityOrchestrationService; - } + this.dispatchEntitiesSeparately = (orchestrationService as IEntityOrchestrationService).EntityBackendProperties?.UseSeparateQueueForEntityWorkItems ?? false; } /// @@ -238,8 +225,7 @@ public async Task StartAsync() this.orchestrationManager, this.orchestrationDispatchPipeline, this.logHelper, - this.ErrorPropagationMode, - this.entityOrchestrationService); + this.ErrorPropagationMode); this.activityDispatcher = new TaskActivityDispatcher( this.orchestrationService, this.activityManager, @@ -247,7 +233,7 @@ public async Task StartAsync() this.logHelper, this.ErrorPropagationMode); - if (this.SupportsEntities) + if (this.dispatchEntitiesSeparately) { this.entityDispatcher = new TaskEntityDispatcher( this.orchestrationService, @@ -261,7 +247,7 @@ public async Task StartAsync() await this.orchestrationDispatcher.StartAsync(); await this.activityDispatcher.StartAsync(); - if (this.SupportsEntities) + if (this.dispatchEntitiesSeparately) { await this.entityDispatcher.StartAsync(); } @@ -303,7 +289,7 @@ public async Task StopAsync(bool isForced) { this.orchestrationDispatcher.StopAsync(isForced), this.activityDispatcher.StopAsync(isForced), - this.SupportsEntities ? this.entityDispatcher.StopAsync(isForced) : Task.CompletedTask, + this.dispatchEntitiesSeparately ? this.entityDispatcher.StopAsync(isForced) : Task.CompletedTask, }; await Task.WhenAll(dispatcherShutdowns); @@ -360,9 +346,9 @@ public TaskHubWorker AddTaskOrchestrations(params ObjectCreator public TaskHubWorker AddTaskEntities(params Type[] taskEntityTypes) { - if (!this.SupportsEntities) + if (!this.dispatchEntitiesSeparately) { - throw new NotSupportedException("The configured backend does not support entities."); + throw new NotSupportedException("The configured backend does not support separate entity dispatch."); } foreach (Type type in taskEntityTypes) @@ -387,9 +373,9 @@ public TaskHubWorker AddTaskEntities(params Type[] taskEntityTypes) /// public TaskHubWorker AddTaskEntities(params ObjectCreator[] taskEntityCreators) { - if (!this.SupportsEntities) + if (!this.dispatchEntitiesSeparately) { - throw new NotSupportedException("The configured backend does not support entities."); + throw new NotSupportedException("The configured backend does not support separate entity dispatch."); } foreach (ObjectCreator creator in taskEntityCreators) diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index d507fd4af..115bba19c 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -51,16 +51,15 @@ internal TaskOrchestrationDispatcher( INameVersionObjectManager objectManager, DispatchMiddlewarePipeline dispatchPipeline, LogHelper logHelper, - ErrorPropagationMode errorPropagationMode, - IEntityOrchestrationService entityOrchestrationService) + ErrorPropagationMode errorPropagationMode) { this.objectManager = objectManager ?? throw new ArgumentNullException(nameof(objectManager)); this.orchestrationService = orchestrationService ?? throw new ArgumentNullException(nameof(orchestrationService)); this.dispatchPipeline = dispatchPipeline ?? throw new ArgumentNullException(nameof(dispatchPipeline)); this.logHelper = logHelper ?? throw new ArgumentNullException(nameof(logHelper)); this.errorPropagationMode = errorPropagationMode; - this.entityOrchestrationService = entityOrchestrationService; - this.entityBackendProperties = this.entityOrchestrationService?.GetEntityBackendProperties(); + this.entityOrchestrationService = orchestrationService as IEntityOrchestrationService; + this.entityBackendProperties = this.entityOrchestrationService?.EntityBackendProperties; this.dispatcher = new WorkItemDispatcher( "TaskOrchestrationDispatcher", @@ -118,11 +117,11 @@ public async Task StopAsync(bool forced) /// A new TaskOrchestrationWorkItem protected Task OnFetchWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken) { - if (this.entityOrchestrationService != null) + if (this.entityBackendProperties?.UseSeparateQueueForEntityWorkItems == true) { // only orchestrations should be served by this dispatcher, so we call // the method which returns work items for orchestrations only. - return this.entityOrchestrationService.LockNextOrchestrationWorkItemAsync(receiveTimeout, cancellationToken); + return this.entityOrchestrationService!.LockNextOrchestrationWorkItemAsync(receiveTimeout, cancellationToken); } else {