From c3905c8ebc1248f1282215f11c2caf485f1b70d1 Mon Sep 17 00:00:00 2001 From: sebastianburckhardt Date: Wed, 6 Sep 2023 14:42:18 -0700 Subject: [PATCH 1/7] revise entity backend properties and implement entity backend queries. --- .../AzureStorageOrchestrationService.cs | 30 +-- ...zureStorageOrchestrationServiceSettings.cs | 7 + .../EntityTrackingStoreQueries.cs | 253 ++++++++++++++++++ .../OrchestrationSessionManager.cs | 11 +- .../Tracking/AzureTableTrackingStore.cs | 2 +- .../Entities/EntityBackendProperties.cs | 10 + .../Entities/EntityBackendQueries.cs | 173 ++++++++++++ .../Entities/IEntityOrchestrationService.cs | 12 +- src/DurableTask.Core/TaskEntityDispatcher.cs | 2 +- src/DurableTask.Core/TaskHubWorker.cs | 14 +- .../TaskOrchestrationDispatcher.cs | 11 +- 11 files changed, 474 insertions(+), 51 deletions(-) create mode 100644 src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs create mode 100644 src/DurableTask.Core/Entities/EntityBackendQueries.cs diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index e2f7be331..75ee60438 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -281,7 +281,7 @@ public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew #region IEntityOrchestrationService - EntityBackendProperties IEntityOrchestrationService.GetEntityBackendProperties() + EntityBackendProperties IEntityOrchestrationService.EntityBackendProperties => new EntityBackendProperties() { EntityMessageReorderWindow = TimeSpan.FromMinutes(this.settings.EntityMessageReorderWindowInMinutes), @@ -289,28 +289,26 @@ EntityBackendProperties IEntityOrchestrationService.GetEntityBackendProperties() MaxConcurrentTaskEntityWorkItems = this.settings.MaxConcurrentTaskEntityWorkItems, SupportsImplicitEntityDeletion = false, // not supported by this backend MaximumSignalDelayTime = TimeSpan.FromDays(6), + UseSeparateQueriesForEntities = this.settings.UseSeparateQueriesForEntities, // TODO remove entities from orchestration queries if this is true + UseSeparateQueueForEntityWorkItems = this.settings.UseSeparateQueueForEntityWorkItems, }; - bool IEntityOrchestrationService.ProcessEntitiesSeparately() - { - if (this.settings.UseSeparateQueueForEntityWorkItems) - { - this.orchestrationSessionManager.ProcessEntitiesSeparately = true; - return true; - } - else - { - return false; - } - } + EntityBackendQueries IEntityOrchestrationService.EntityBackendQueries + => new EntityTrackingStoreQueries( + this.messageManager, + (this.trackingStore as AzureTableTrackingStore) + ?? throw new NotSupportedException("entity queries not supported for custom tracking stores"), + this.EnsureTaskHubAsync, + ((IEntityOrchestrationService)this).EntityBackendProperties, + this.SendTaskOrchestrationMessageAsync); Task IEntityOrchestrationService.LockNextOrchestrationWorkItemAsync( TimeSpan receiveTimeout, CancellationToken cancellationToken) { - if (!orchestrationSessionManager.ProcessEntitiesSeparately) + if (this.settings.UseSeparateQueueForEntityWorkItems) { - throw new InvalidOperationException("backend was not configured for separate entity processing"); + throw new InvalidOperationException("backend was configured for separate orchestation/entity processing, must use specialized methods to get work items"); } return this.LockNextTaskOrchestrationWorkItemAsync(false, cancellationToken); } @@ -319,7 +317,7 @@ Task IEntityOrchestrationService.LockNextEntityWorkIt TimeSpan receiveTimeout, CancellationToken cancellationToken) { - if (!orchestrationSessionManager.ProcessEntitiesSeparately) + if (!this.settings.UseSeparateQueueForEntityWorkItems) { throw new InvalidOperationException("backend was not configured for separate entity processing"); } diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs index 819480bde..1740d3a32 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs @@ -318,5 +318,12 @@ internal LogHelper Logger /// Consumers that support separate dispatch should explicitly set this to true. /// public bool UseSeparateQueueForEntityWorkItems { get; set; } = false; + + /// + /// Whether to use separate queries for entities and orchestrators. + /// This defaults to false, to avoid issues when using this provider from code that does not support separate queries. + /// Consumers that expect separate queries should explicitly set this to true. + /// + public bool UseSeparateQueriesForEntities { get; set; } = false; } } diff --git a/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs b/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs new file mode 100644 index 000000000..7127a3e65 --- /dev/null +++ b/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs @@ -0,0 +1,253 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ----------------------------------------------------------------------------------using System; +#nullable enable +namespace DurableTask.AzureStorage +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Linq; + using System.Runtime.Serialization.Json; + using System.Text; + using System.Threading; + using System.Threading.Tasks; + using DurableTask.AzureStorage.Tracking; + using DurableTask.Core; + using DurableTask.Core.Entities; + + class EntityTrackingStoreQueries : EntityBackendQueries + { + readonly MessageManager messageManager; + readonly AzureTableTrackingStore trackingStore; + readonly Func ensureTaskHub; + readonly EntityBackendProperties properties; + readonly Func sendEvent; + + public EntityTrackingStoreQueries( + MessageManager messageManager, + AzureTableTrackingStore trackingStore, + Func ensureTaskHub, + EntityBackendProperties properties, + Func sendEvent) + { + this.messageManager = messageManager; + this.trackingStore = trackingStore; + this.ensureTaskHub = ensureTaskHub; + this.properties = properties; + this.sendEvent = sendEvent; + } + + public override async Task GetEntityAsync( + EntityId id, + bool includeState = false, + bool includeDeleted = false, + CancellationToken cancellation = default(CancellationToken)) + { + await this.ensureTaskHub(); + OrchestrationState? state = (await this.trackingStore.FetchInstanceStatusInternalAsync(id.ToString(), includeState))?.State; + return await GetEntityMetadataAsync(state, includeDeleted, includeState, cancellation); + } + + public override async Task QueryEntitiesAsync(EntityQuery filter, CancellationToken cancellation) + { + var condition = new OrchestrationInstanceStatusQueryCondition() + { + InstanceId = null, + InstanceIdPrefix = filter.InstanceIdStartsWith, + CreatedTimeFrom = filter.LastModifiedFrom ?? default(DateTime), + CreatedTimeTo = filter.LastModifiedTo ?? default(DateTime), + FetchInput = filter.IncludeState, + FetchOutput = false, + }; + + await this.ensureTaskHub(); + + List entityResult; + string? continuationToken = filter.ContinuationToken; + Stopwatch stopwatch = new Stopwatch(); + stopwatch.Start(); + + do + { + var result = await this.trackingStore.GetStateAsync(condition, filter.PageSize ?? 100, continuationToken, cancellation); + entityResult = await ConvertResultsAsync(result.OrchestrationState); + continuationToken = result.ContinuationToken; + } + while ( // continue query right away if the page is completely empty, but never in excess of 100ms + continuationToken != null + && entityResult.Count == 0 + && stopwatch.ElapsedMilliseconds <= 100); + + return new EntityQueryResult() + { + Results = entityResult, + ContinuationToken = continuationToken, + }; + + async ValueTask> ConvertResultsAsync(IEnumerable states) + { + entityResult = new List(); + foreach (OrchestrationState entry in states) + { + EntityMetadata? entityMetadata = await this.GetEntityMetadataAsync(entry, filter.IncludeDeleted, filter.IncludeState, cancellation); + if (entityMetadata.HasValue) + { + entityResult.Add(entityMetadata.Value); + } + } + return entityResult; + } + } + + public async override Task CleanEntityStorageAsync(CleanEntityStorageRequest request = default(CleanEntityStorageRequest), CancellationToken cancellation = default(CancellationToken)) + { + DateTime now = DateTime.UtcNow; + string? continuationToken = null; + int emptyEntitiesRemoved = 0; + int orphanedLocksReleased = 0; + + var condition = new OrchestrationInstanceStatusQueryCondition() + { + InstanceIdPrefix = "@", + FetchInput = false, + FetchOutput = false, + }; + + await this.ensureTaskHub(); + + // list all entities (without fetching the input) and for each one that requires action, + // perform that action. Waits for all actions to finish after each page. + do + { + var page = await this.trackingStore.GetStateAsync(condition, 100, continuationToken, cancellation); + + List tasks = new List(); + foreach (var state in page.OrchestrationState) + { + EntityStatus? status = ClientEntityHelpers.GetEntityStatus(state.Status); + if (status != null) + { + if (request.ReleaseOrphanedLocks && status.LockedBy != null) + { + tasks.Add(CheckForOrphanedLockAndFixIt(state, status.LockedBy)); + } + + if (request.RemoveEmptyEntities) + { + bool isEmptyEntity = !status.EntityExists && status.LockedBy == null && status.QueueSize == 0; + bool safeToRemoveWithoutBreakingMessageSorterLogic = + (now - state.LastUpdatedTime > properties.EntityMessageReorderWindow); + if (isEmptyEntity && safeToRemoveWithoutBreakingMessageSorterLogic) + { + tasks.Add(DeleteIdleOrchestrationEntity(state)); + } + } + } + } + + async Task DeleteIdleOrchestrationEntity(OrchestrationState state) + { + var result = await this.trackingStore.PurgeInstanceHistoryAsync(state.OrchestrationInstance.InstanceId); + Interlocked.Add(ref emptyEntitiesRemoved, result.InstancesDeleted); + } + + async Task CheckForOrphanedLockAndFixIt(OrchestrationState state, string lockOwner) + { + OrchestrationState? ownerState + = (await this.trackingStore.GetStateAsync(lockOwner, allExecutions: false, fetchInput: false)).FirstOrDefault(); + + bool OrchestrationIsRunning(OrchestrationStatus? status) + => status != null && (status == OrchestrationStatus.Running || status == OrchestrationStatus.Suspended); + + if (! OrchestrationIsRunning(ownerState?.OrchestrationStatus)) + { + // the owner is not a running orchestration. Send a lock release. + var targetInstance = new OrchestrationInstance() { InstanceId = lockOwner }; + EntityMessageEvent eventToSend = ClientEntityHelpers.EmitUnlockForOrphanedLock(targetInstance, lockOwner); + await this.sendEvent(eventToSend.AsTaskMessage()); + Interlocked.Increment(ref orphanedLocksReleased); + } + } + + await Task.WhenAll(tasks); + } + while (continuationToken != null); + + return new CleanEntityStorageResult() + { + EmptyEntitiesRemoved = emptyEntitiesRemoved, + OrphanedLocksReleased = orphanedLocksReleased, + }; + } + + async ValueTask GetEntityMetadataAsync(OrchestrationState? state, bool includeDeleted, bool includeState, CancellationToken cancellation) + { + if (state == null) + { + return null; + } + + if (!includeState) + { + if (!includeDeleted) + { + // it is possible that this entity was logically deleted even though its orchestration was not purged yet. + // we can check this efficiently (i.e. without deserializing anything) by looking at just the custom status + if (!EntityStatus.TestEntityExists(state.Status)) + { + return null; + } + } + + return new EntityMetadata() + { + EntityId = EntityId.FromString(state.OrchestrationInstance.InstanceId), + LastModifiedTime = state.CreatedTime, + SerializedState = null, // we were instructed to not include the state + }; + } + else + { + // first, retrieve the entity scheduler state (= input of the orchestration state), possibly from blob storage. + string serializedSchedulerState; + if (MessageManager.TryGetLargeMessageReference(state.Input, out Uri blobUrl)) + { + serializedSchedulerState = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobUrl); + } + else + { + serializedSchedulerState = state.Input; + } + + // next, extract the entity state from the scheduler state + string? serializedEntityState = ClientEntityHelpers.GetEntityState(serializedSchedulerState); + + // return the result to the user + if (!includeDeleted && serializedEntityState == null) + { + return null; + } + else + { + return new EntityMetadata() + { + EntityId = EntityId.FromString(state.OrchestrationInstance.InstanceId), + LastModifiedTime = state.CreatedTime, + SerializedState = serializedEntityState, + }; + } + } + } + + } +} diff --git a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs index 4c0486573..2e2b7282d 100644 --- a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs +++ b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs @@ -58,14 +58,6 @@ public OrchestrationSessionManager( internal IEnumerable Queues => this.ownedControlQueues.Values; - /// - /// Recent versions of DurableTask.Core can be configured to use a separate pipeline for processing entity work items, - /// while older versions use a single pipeline for both orchestration and entity work items. To support both scenarios, - /// this property can be modified prior to starting the orchestration service. If set to true, the work items that are ready for - /// processing are stored in and , respectively. - /// - internal bool ProcessEntitiesSeparately { get; set; } - public void AddQueue(string partitionId, ControlQueue controlQueue, CancellationToken cancellationToken) { if (this.ownedControlQueues.TryAdd(partitionId, controlQueue)) @@ -529,7 +521,8 @@ async Task ScheduleOrchestrationStatePrefetch( batch.TrackingStoreContext = history.TrackingStoreContext; } - if (this.ProcessEntitiesSeparately && DurableTask.Core.Common.Entities.IsEntityInstance(batch.OrchestrationInstanceId)) + if (this.settings.UseSeparateQueueForEntityWorkItems + && DurableTask.Core.Common.Entities.IsEntityInstance(batch.OrchestrationInstanceId)) { this.entitiesReadyForProcessingQueue.Enqueue(node); } diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index c6b287263..abeaf7c86 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -430,7 +430,7 @@ public override async Task> GetStateAsync(string insta } /// - async Task FetchInstanceStatusInternalAsync(string instanceId, bool fetchInput) + internal async Task FetchInstanceStatusInternalAsync(string instanceId, bool fetchInput) { if (instanceId == null) { diff --git a/src/DurableTask.Core/Entities/EntityBackendProperties.cs b/src/DurableTask.Core/Entities/EntityBackendProperties.cs index 1fe9cd5f3..e53c8e230 100644 --- a/src/DurableTask.Core/Entities/EntityBackendProperties.cs +++ b/src/DurableTask.Core/Entities/EntityBackendProperties.cs @@ -46,6 +46,16 @@ public class EntityBackendProperties /// public TimeSpan MaximumSignalDelayTime { get; set; } + /// + /// Whether the backend uses separate work item queues for entities and orchestrators. + /// + public bool UseSeparateQueueForEntityWorkItems { get; set; } + + /// + /// Whether the backend uses separate queries for entities and orchestrators. + /// + public bool UseSeparateQueriesForEntities { get; set; } + /// /// Computes a cap on the scheduled time of an entity signal, based on the maximum signal delay time /// diff --git a/src/DurableTask.Core/Entities/EntityBackendQueries.cs b/src/DurableTask.Core/Entities/EntityBackendQueries.cs new file mode 100644 index 000000000..73abf04ea --- /dev/null +++ b/src/DurableTask.Core/Entities/EntityBackendQueries.cs @@ -0,0 +1,173 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- +#nullable enable +namespace DurableTask.Core.Entities +{ + using System; + using System.Collections.Generic; + using System.Threading; + using System.Threading.Tasks; + + /// + /// Entity query support, as provided at the abstraction level of an + /// + public abstract class EntityBackendQueries + { + /// + /// Tries to get the entity with ID of . + /// + /// The ID of the entity to get. + /// true to include entity state in the response, false to not. + /// whether to return metadata for a deleted entity (if such data was retained by the backend). + /// The cancellation token to cancel the operation. + /// a response containing metadata describing the entity. + public abstract Task GetEntityAsync( + EntityId id, bool includeState = false, bool includeDeleted = false, CancellationToken cancellation = default); + + /// + /// Queries entity instances based on the conditions specified in . + /// + /// The query filter. + /// The cancellation token to cancel the operation. + /// One page of query results. + public abstract Task QueryEntitiesAsync(EntityQuery query, CancellationToken cancellation); + + /// + /// Cleans entity storage. See for the different forms of cleaning available. + /// + /// The request which describes what to clean. + /// The cancellation token to cancel the operation. + /// A task that completes when the operation is finished. + public abstract Task CleanEntityStorageAsync( + CleanEntityStorageRequest request = default, CancellationToken cancellation = default); + + /// + /// Metadata about an entity, as returned by queries. + /// + public struct EntityMetadata + { + /// + /// Gets or sets the ID for this entity. + /// + public EntityId EntityId { get; set; } + + /// + /// Gets or sets the time the entity was last modified. + /// + public DateTime LastModifiedTime { get; set; } + + /// + /// Gets or sets the serialized state for this entity. + /// + public string? SerializedState { get; set; } + } + + /// + /// A description of an entity query. + /// + /// + /// The default query returns all entities (does not specify any filters). + /// + public struct EntityQuery + { + /// + /// Gets or sets the optional starts-with expression for the entity instance ID. + /// + public string? InstanceIdStartsWith { get; set; } + + /// + /// Gets or sets entity instances which were last modified after the provided time. + /// + public DateTime? LastModifiedFrom { get; set; } + + /// + /// Gets or sets entity instances which were last modified before the provided time. + /// + public DateTime? LastModifiedTo { get; set; } + + /// + /// Gets or sets a value indicating whether to include state in the query results or not. + /// + public bool IncludeState { get; set; } + + /// + /// Whether or not to include deleted entities. Newer implementations should never set this to true, + /// it is provided only for legacy compatibility. + /// + public bool IncludeDeleted { get; set; } + + /// + /// Gets or sets the desired size of each page to return. + /// + /// + /// If no size is specified, the backend chooses an appropriate page size based on its implementation. + /// Note that the size of the returned page may be smaller or larger than the requested page size, and cannot + /// be used to determine whether the end of the query has been reached. + /// + public int? PageSize { get; set; } + + /// + /// Gets or sets the continuation token to resume a previous query. + /// + public string? ContinuationToken { get; set; } + } + + /// + /// A page of results. + /// + public struct EntityQueryResult + { + /// + /// Gets or sets the list of query results. + /// + public List Results { get; set; } + + /// + /// Gets or sets the continuation token to continue this query, if not null. + /// + public string? ContinuationToken { get; set; } + } + + /// + /// Request struct for . + /// + public struct CleanEntityStorageRequest + { + /// + /// Gets a value indicating whether to remove empty entities. + /// + public bool RemoveEmptyEntities { get; set; } + + /// + /// Gets a value indicating whether to release orphaned locks or not. + /// + public bool ReleaseOrphanedLocks { get; set; } + } + + /// + /// Result struct for . + /// + public struct CleanEntityStorageResult + { + /// + /// Gets the number of empty entities removed. + /// + public int EmptyEntitiesRemoved { get; set; } + + /// + /// Gets the number of orphaned locks that were removed. + /// + public int OrphanedLocksReleased { get; set; } + } + } +} diff --git a/src/DurableTask.Core/Entities/IEntityOrchestrationService.cs b/src/DurableTask.Core/Entities/IEntityOrchestrationService.cs index 5e8823d72..79d9f7b86 100644 --- a/src/DurableTask.Core/Entities/IEntityOrchestrationService.cs +++ b/src/DurableTask.Core/Entities/IEntityOrchestrationService.cs @@ -23,18 +23,16 @@ namespace DurableTask.Core.Entities public interface IEntityOrchestrationService : IOrchestrationService { /// - /// The entity orchestration service. + /// Properties of the backend. /// /// An object containing properties of the entity backend. - EntityBackendProperties GetEntityBackendProperties(); + EntityBackendProperties EntityBackendProperties { get; } /// - /// Checks whether the backend is configured for separate work-item processing of orchestrations and entities. - /// If this returns true, must use or to - /// pull orchestrations or entities separately. Otherwise, must use . - /// This must be called prior to starting the orchestration service. + /// Support for entity queries. /// - bool ProcessEntitiesSeparately(); + /// An object that can be used to issue entity queries to the orchestration service. + EntityBackendQueries EntityBackendQueries { get; } /// /// Specialized variant of that diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index 0e9f0990f..54dd59222 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -56,7 +56,7 @@ internal TaskEntityDispatcher( this.logHelper = logHelper ?? throw new ArgumentNullException(nameof(logHelper)); this.errorPropagationMode = errorPropagationMode; this.entityOrchestrationService = (orchestrationService as IEntityOrchestrationService)!; - this.entityBackendProperties = entityOrchestrationService.GetEntityBackendProperties(); + this.entityBackendProperties = entityOrchestrationService.EntityBackendProperties; this.dispatcher = new WorkItemDispatcher( "TaskEntityDispatcher", diff --git a/src/DurableTask.Core/TaskHubWorker.cs b/src/DurableTask.Core/TaskHubWorker.cs index e7a6e09b5..3fa5a112c 100644 --- a/src/DurableTask.Core/TaskHubWorker.cs +++ b/src/DurableTask.Core/TaskHubWorker.cs @@ -37,7 +37,7 @@ public sealed class TaskHubWorker : IDisposable readonly INameVersionObjectManager orchestrationManager; readonly INameVersionObjectManager entityManager; - readonly IEntityOrchestrationService entityOrchestrationService; + readonly IEntityOrchestrationService entityOrchestrationService; // non-null if backend uses separate dispatch for entities readonly DispatchMiddlewarePipeline orchestrationDispatchPipeline = new DispatchMiddlewarePipeline(); readonly DispatchMiddlewarePipeline entityDispatchPipeline = new DispatchMiddlewarePipeline(); @@ -152,14 +152,7 @@ public TaskHubWorker( this.entityManager = entityObjectManager ?? throw new ArgumentException("entityObjectManager"); this.orchestrationService = orchestrationService ?? throw new ArgumentException("orchestrationService"); this.logHelper = new LogHelper(loggerFactory?.CreateLogger("DurableTask.Core")); - - // If the backend supports a separate work item queue for entities (indicated by it implementing IEntityOrchestrationService), - // we take note of that here, and let the backend know that we are going to pull the work items separately. - if (orchestrationService is IEntityOrchestrationService entityOrchestrationService - && entityOrchestrationService.ProcessEntitiesSeparately()) - { - this.entityOrchestrationService = entityOrchestrationService; - } + this.entityOrchestrationService = entityOrchestrationService as IEntityOrchestrationService; } /// @@ -238,8 +231,7 @@ public async Task StartAsync() this.orchestrationManager, this.orchestrationDispatchPipeline, this.logHelper, - this.ErrorPropagationMode, - this.entityOrchestrationService); + this.ErrorPropagationMode); this.activityDispatcher = new TaskActivityDispatcher( this.orchestrationService, this.activityManager, diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index d507fd4af..735503382 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -51,16 +51,15 @@ internal TaskOrchestrationDispatcher( INameVersionObjectManager objectManager, DispatchMiddlewarePipeline dispatchPipeline, LogHelper logHelper, - ErrorPropagationMode errorPropagationMode, - IEntityOrchestrationService entityOrchestrationService) + ErrorPropagationMode errorPropagationMode) { this.objectManager = objectManager ?? throw new ArgumentNullException(nameof(objectManager)); this.orchestrationService = orchestrationService ?? throw new ArgumentNullException(nameof(orchestrationService)); this.dispatchPipeline = dispatchPipeline ?? throw new ArgumentNullException(nameof(dispatchPipeline)); this.logHelper = logHelper ?? throw new ArgumentNullException(nameof(logHelper)); this.errorPropagationMode = errorPropagationMode; - this.entityOrchestrationService = entityOrchestrationService; - this.entityBackendProperties = this.entityOrchestrationService?.GetEntityBackendProperties(); + this.entityOrchestrationService = entityOrchestrationService as IEntityOrchestrationService; + this.entityBackendProperties = this.entityOrchestrationService?.EntityBackendProperties; this.dispatcher = new WorkItemDispatcher( "TaskOrchestrationDispatcher", @@ -118,11 +117,11 @@ public async Task StopAsync(bool forced) /// A new TaskOrchestrationWorkItem protected Task OnFetchWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken) { - if (this.entityOrchestrationService != null) + if (this.entityBackendProperties?.UseSeparateQueueForEntityWorkItems == true) { // only orchestrations should be served by this dispatcher, so we call // the method which returns work items for orchestrations only. - return this.entityOrchestrationService.LockNextOrchestrationWorkItemAsync(receiveTimeout, cancellationToken); + return this.entityOrchestrationService!.LockNextOrchestrationWorkItemAsync(receiveTimeout, cancellationToken); } else { From add83fa07c6e26d6b5ec4af25521e404b853da7f Mon Sep 17 00:00:00 2001 From: sebastianburckhardt Date: Fri, 8 Sep 2023 13:38:47 -0700 Subject: [PATCH 2/7] Minor revisions to querries and properties, and improved comments. --- .../AzureStorageOrchestrationService.cs | 2 +- .../EntityTrackingStoreQueries.cs | 33 +++++++++-------- .../Entities/EntityBackendProperties.cs | 13 +++---- .../Entities/EntityBackendQueries.cs | 36 ++++++++++++------- 4 files changed, 50 insertions(+), 34 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 75ee60438..dc22736fc 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -289,7 +289,7 @@ EntityBackendProperties IEntityOrchestrationService.EntityBackendProperties MaxConcurrentTaskEntityWorkItems = this.settings.MaxConcurrentTaskEntityWorkItems, SupportsImplicitEntityDeletion = false, // not supported by this backend MaximumSignalDelayTime = TimeSpan.FromDays(6), - UseSeparateQueriesForEntities = this.settings.UseSeparateQueriesForEntities, // TODO remove entities from orchestration queries if this is true + ExludeEntitiesFromOrchestrationQueries = this.settings.UseSeparateQueriesForEntities, // TODO remove entities from orchestration queries if this is true UseSeparateQueueForEntityWorkItems = this.settings.UseSeparateQueueForEntityWorkItems, }; diff --git a/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs b/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs index 7127a3e65..26f65c511 100644 --- a/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs +++ b/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs @@ -33,6 +33,8 @@ class EntityTrackingStoreQueries : EntityBackendQueries readonly EntityBackendProperties properties; readonly Func sendEvent; + static TimeSpan timeLimitForCleanEntityStorageLoop = TimeSpan.FromSeconds(5); + public EntityTrackingStoreQueries( MessageManager messageManager, AzureTableTrackingStore trackingStore, @@ -47,7 +49,7 @@ public EntityTrackingStoreQueries( this.sendEvent = sendEvent; } - public override async Task GetEntityAsync( + public async override Task GetEntityAsync( EntityId id, bool includeState = false, bool includeDeleted = false, @@ -55,10 +57,10 @@ public EntityTrackingStoreQueries( { await this.ensureTaskHub(); OrchestrationState? state = (await this.trackingStore.FetchInstanceStatusInternalAsync(id.ToString(), includeState))?.State; - return await GetEntityMetadataAsync(state, includeDeleted, includeState, cancellation); + return await this.GetEntityMetadataAsync(state, includeDeleted, includeState); } - public override async Task QueryEntitiesAsync(EntityQuery filter, CancellationToken cancellation) + public async override Task QueryEntitiesAsync(EntityQuery filter, CancellationToken cancellation) { var condition = new OrchestrationInstanceStatusQueryCondition() { @@ -74,12 +76,12 @@ public override async Task QueryEntitiesAsync(EntityQuery fil List entityResult; string? continuationToken = filter.ContinuationToken; - Stopwatch stopwatch = new Stopwatch(); + var stopwatch = new Stopwatch(); stopwatch.Start(); do { - var result = await this.trackingStore.GetStateAsync(condition, filter.PageSize ?? 100, continuationToken, cancellation); + DurableStatusQueryResult result = await this.trackingStore.GetStateAsync(condition, filter.PageSize ?? 100, continuationToken, cancellation); entityResult = await ConvertResultsAsync(result.OrchestrationState); continuationToken = result.ContinuationToken; } @@ -99,7 +101,7 @@ async ValueTask> ConvertResultsAsync(IEnumerable(); foreach (OrchestrationState entry in states) { - EntityMetadata? entityMetadata = await this.GetEntityMetadataAsync(entry, filter.IncludeDeleted, filter.IncludeState, cancellation); + EntityMetadata? entityMetadata = await this.GetEntityMetadataAsync(entry, filter.IncludeDeleted, filter.IncludeState); if (entityMetadata.HasValue) { entityResult.Add(entityMetadata.Value); @@ -112,9 +114,10 @@ async ValueTask> ConvertResultsAsync(IEnumerable CleanEntityStorageAsync(CleanEntityStorageRequest request = default(CleanEntityStorageRequest), CancellationToken cancellation = default(CancellationToken)) { DateTime now = DateTime.UtcNow; - string? continuationToken = null; + string? continuationToken = request.ContinuationToken; int emptyEntitiesRemoved = 0; int orphanedLocksReleased = 0; + var stopwatch = Stopwatch.StartNew(); var condition = new OrchestrationInstanceStatusQueryCondition() { @@ -129,10 +132,10 @@ async ValueTask> ConvertResultsAsync(IEnumerable tasks = new List(); - foreach (var state in page.OrchestrationState) + var tasks = new List(); + foreach (OrchestrationState state in page.OrchestrationState) { EntityStatus? status = ClientEntityHelpers.GetEntityStatus(state.Status); if (status != null) @@ -146,7 +149,7 @@ async ValueTask> ConvertResultsAsync(IEnumerable properties.EntityMessageReorderWindow); + (now - state.LastUpdatedTime > this.properties.EntityMessageReorderWindow); if (isEmptyEntity && safeToRemoveWithoutBreakingMessageSorterLogic) { tasks.Add(DeleteIdleOrchestrationEntity(state)); @@ -157,7 +160,7 @@ async ValueTask> ConvertResultsAsync(IEnumerable GetEntityMetadataAsync(OrchestrationState? state, bool includeDeleted, bool includeState, CancellationToken cancellation) + async ValueTask GetEntityMetadataAsync(OrchestrationState? state, bool includeDeleted, bool includeState) { if (state == null) { @@ -248,6 +252,5 @@ bool OrchestrationIsRunning(OrchestrationStatus? status) } } } - } } diff --git a/src/DurableTask.Core/Entities/EntityBackendProperties.cs b/src/DurableTask.Core/Entities/EntityBackendProperties.cs index e53c8e230..41f0f35eb 100644 --- a/src/DurableTask.Core/Entities/EntityBackendProperties.cs +++ b/src/DurableTask.Core/Entities/EntityBackendProperties.cs @@ -37,27 +37,28 @@ public class EntityBackendProperties public int MaxConcurrentTaskEntityWorkItems { get; set; } /// - /// Whether the backend supports implicit deletion, i.e. setting the entity scheduler state to null implicitly deletes the storage record. + /// Gets or sets whether the backend supports implicit deletion. Implicit deletion means that + /// the storage does not retain any data for entities that don't have any state. /// public bool SupportsImplicitEntityDeletion { get; set; } /// - /// Value of maximum durable timer delay. Used for delayed signals. + /// Gets or sets the maximum durable timer delay. Used for delayed signals. /// public TimeSpan MaximumSignalDelayTime { get; set; } /// - /// Whether the backend uses separate work item queues for entities and orchestrators. + /// Gets or sets whether the backend uses separate work item queues for entities and orchestrators. /// public bool UseSeparateQueueForEntityWorkItems { get; set; } /// - /// Whether the backend uses separate queries for entities and orchestrators. + /// Gets or sets whether entities should be excluded from orchestration queries. /// - public bool UseSeparateQueriesForEntities { get; set; } + public bool ExludeEntitiesFromOrchestrationQueries { get; set; } /// - /// Computes a cap on the scheduled time of an entity signal, based on the maximum signal delay time + /// A utility function to compute a cap on the scheduled time of an entity signal, based on the maximum signal delay time /// /// The current time. /// The scheduled time. diff --git a/src/DurableTask.Core/Entities/EntityBackendQueries.cs b/src/DurableTask.Core/Entities/EntityBackendQueries.cs index 73abf04ea..b203df9de 100644 --- a/src/DurableTask.Core/Entities/EntityBackendQueries.cs +++ b/src/DurableTask.Core/Entities/EntityBackendQueries.cs @@ -19,7 +19,7 @@ namespace DurableTask.Core.Entities using System.Threading.Tasks; /// - /// Entity query support, as provided at the abstraction level of an + /// Encapsulates support for entity queries, at the abstraction level of a storage backend. /// public abstract class EntityBackendQueries { @@ -86,12 +86,12 @@ public struct EntityQuery public string? InstanceIdStartsWith { get; set; } /// - /// Gets or sets entity instances which were last modified after the provided time. + /// Gets or sets a value indicating to include only entity instances which were last modified after the provided time. /// public DateTime? LastModifiedFrom { get; set; } /// - /// Gets or sets entity instances which were last modified before the provided time. + /// Gets or sets a value indicating to include only entity instances which were last modified before the provided time. /// public DateTime? LastModifiedTo { get; set; } @@ -101,16 +101,18 @@ public struct EntityQuery public bool IncludeState { get; set; } /// - /// Whether or not to include deleted entities. Newer implementations should never set this to true, - /// it is provided only for legacy compatibility. + /// Gets or sets a value indicating whether or not to include deleted entities. /// + /// + /// This setting is relevant only for providers which retain metadata for deleted entities ( is false). + /// public bool IncludeDeleted { get; set; } /// /// Gets or sets the desired size of each page to return. /// /// - /// If no size is specified, the backend chooses an appropriate page size based on its implementation. + /// If no size is specified, the backend may choose an appropriate page size based on its implementation. /// Note that the size of the returned page may be smaller or larger than the requested page size, and cannot /// be used to determine whether the end of the query has been reached. /// @@ -128,9 +130,9 @@ public struct EntityQuery public struct EntityQueryResult { /// - /// Gets or sets the list of query results. + /// Gets or sets the query results. /// - public List Results { get; set; } + public IEnumerable Results { get; set; } /// /// Gets or sets the continuation token to continue this query, if not null. @@ -144,14 +146,19 @@ public struct EntityQueryResult public struct CleanEntityStorageRequest { /// - /// Gets a value indicating whether to remove empty entities. + /// Gets or sets a value indicating whether to remove empty entities. /// public bool RemoveEmptyEntities { get; set; } /// - /// Gets a value indicating whether to release orphaned locks or not. + /// Gets or sets a value indicating whether to release orphaned locks or not. /// public bool ReleaseOrphanedLocks { get; set; } + + /// + /// Gets or sets the continuation token to resume a previous . + /// + public string? ContinuationToken { get; set; } } /// @@ -160,14 +167,19 @@ public struct CleanEntityStorageRequest public struct CleanEntityStorageResult { /// - /// Gets the number of empty entities removed. + /// Gets or sets the number of empty entities removed. /// public int EmptyEntitiesRemoved { get; set; } /// - /// Gets the number of orphaned locks that were removed. + /// Gets or sets the number of orphaned locks that were removed. /// public int OrphanedLocksReleased { get; set; } + + /// + /// Gets or sets the continuation token to continue the , if not null. + /// + public string? ContinuationToken { get; set; } } } } From 04da90a90b2c20b72de7d04edd07a7436944042e Mon Sep 17 00:00:00 2001 From: sebastianburckhardt Date: Wed, 13 Sep 2023 14:36:38 -0700 Subject: [PATCH 3/7] fix validation of which LockNext methods are being called. --- .../AzureStorageOrchestrationService.cs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index dc22736fc..35f97d590 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -306,9 +306,9 @@ Task IEntityOrchestrationService.LockNextOrchestratio TimeSpan receiveTimeout, CancellationToken cancellationToken) { - if (this.settings.UseSeparateQueueForEntityWorkItems) + if (!this.settings.UseSeparateQueueForEntityWorkItems) { - throw new InvalidOperationException("backend was configured for separate orchestation/entity processing, must use specialized methods to get work items"); + throw new InvalidOperationException("backend was not configured for separate entity processing"); } return this.LockNextTaskOrchestrationWorkItemAsync(false, cancellationToken); } @@ -678,6 +678,10 @@ public Task LockNextTaskOrchestrationWorkItemAsync( TimeSpan receiveTimeout, CancellationToken cancellationToken) { + if (this.settings.UseSeparateQueueForEntityWorkItems) + { + throw new InvalidOperationException("backend was configured for separate orchestration/entity processing, must use specialized methods to get work items"); + } return LockNextTaskOrchestrationWorkItemAsync(entitiesOnly: false, cancellationToken); } From dc32f6f746d094e2357eb5a4a80c8c11e16c0f07 Mon Sep 17 00:00:00 2001 From: sebastianburckhardt Date: Wed, 13 Sep 2023 14:37:16 -0700 Subject: [PATCH 4/7] improve comments --- .../Entities/EntityBackendProperties.cs | 10 ++++++++-- src/DurableTask.Core/Entities/EntityBackendQueries.cs | 3 ++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/DurableTask.Core/Entities/EntityBackendProperties.cs b/src/DurableTask.Core/Entities/EntityBackendProperties.cs index 41f0f35eb..5fddb1d4f 100644 --- a/src/DurableTask.Core/Entities/EntityBackendProperties.cs +++ b/src/DurableTask.Core/Entities/EntityBackendProperties.cs @@ -14,6 +14,7 @@ namespace DurableTask.Core.Entities { using System; + using System.Threading; /// /// Entity processing characteristics that are controlled by the backend provider, i.e. the orchestration service. @@ -48,7 +49,11 @@ public class EntityBackendProperties public TimeSpan MaximumSignalDelayTime { get; set; } /// - /// Gets or sets whether the backend uses separate work item queues for entities and orchestrators. + /// Gets or sets whether the backend uses separate work item queues for entities and orchestrators. If true, + /// the frontend must use and + /// + /// to fetch entities and orchestrations. Otherwise, it must use fetch both work items using + /// . /// public bool UseSeparateQueueForEntityWorkItems { get; set; } @@ -58,7 +63,8 @@ public class EntityBackendProperties public bool ExludeEntitiesFromOrchestrationQueries { get; set; } /// - /// A utility function to compute a cap on the scheduled time of an entity signal, based on the maximum signal delay time + /// A utility function to compute a cap on the scheduled time of an entity signal, based on the value of + /// . /// /// The current time. /// The scheduled time. diff --git a/src/DurableTask.Core/Entities/EntityBackendQueries.cs b/src/DurableTask.Core/Entities/EntityBackendQueries.cs index b203df9de..361667033 100644 --- a/src/DurableTask.Core/Entities/EntityBackendQueries.cs +++ b/src/DurableTask.Core/Entities/EntityBackendQueries.cs @@ -67,7 +67,8 @@ public struct EntityMetadata public DateTime LastModifiedTime { get; set; } /// - /// Gets or sets the serialized state for this entity. + /// Gets or sets the serialized state for this entity. Can be null if the query + /// specified to not include the state, or to include deleted entities. /// public string? SerializedState { get; set; } } From a0672ebc2e89992bb7575208a3af0923991124c4 Mon Sep 17 00:00:00 2001 From: sebastianburckhardt Date: Wed, 13 Sep 2023 14:37:41 -0700 Subject: [PATCH 5/7] fix useage of IEntityOrchestrationService. --- .../Entities/IEntityOrchestrationService.cs | 9 +++---- src/DurableTask.Core/TaskHubWorker.cs | 24 +++++++------------ .../TaskOrchestrationDispatcher.cs | 2 +- 3 files changed, 15 insertions(+), 20 deletions(-) diff --git a/src/DurableTask.Core/Entities/IEntityOrchestrationService.cs b/src/DurableTask.Core/Entities/IEntityOrchestrationService.cs index 79d9f7b86..4f68e39e1 100644 --- a/src/DurableTask.Core/Entities/IEntityOrchestrationService.cs +++ b/src/DurableTask.Core/Entities/IEntityOrchestrationService.cs @@ -25,14 +25,15 @@ public interface IEntityOrchestrationService : IOrchestrationService /// /// Properties of the backend. /// - /// An object containing properties of the entity backend. - EntityBackendProperties EntityBackendProperties { get; } + /// An object containing properties of the entity backend, or null if the backend does not natively support entities. + EntityBackendProperties? EntityBackendProperties { get; } /// /// Support for entity queries. /// - /// An object that can be used to issue entity queries to the orchestration service. - EntityBackendQueries EntityBackendQueries { get; } + /// An object that can be used to issue entity queries to the orchestration service, or null if the backend does not natively + /// support entity queries. + EntityBackendQueries? EntityBackendQueries { get; } /// /// Specialized variant of that diff --git a/src/DurableTask.Core/TaskHubWorker.cs b/src/DurableTask.Core/TaskHubWorker.cs index 3fa5a112c..c5cf0cd56 100644 --- a/src/DurableTask.Core/TaskHubWorker.cs +++ b/src/DurableTask.Core/TaskHubWorker.cs @@ -37,12 +37,11 @@ public sealed class TaskHubWorker : IDisposable readonly INameVersionObjectManager orchestrationManager; readonly INameVersionObjectManager entityManager; - readonly IEntityOrchestrationService entityOrchestrationService; // non-null if backend uses separate dispatch for entities - readonly DispatchMiddlewarePipeline orchestrationDispatchPipeline = new DispatchMiddlewarePipeline(); readonly DispatchMiddlewarePipeline entityDispatchPipeline = new DispatchMiddlewarePipeline(); readonly DispatchMiddlewarePipeline activityDispatchPipeline = new DispatchMiddlewarePipeline(); + readonly bool dispatchEntitiesSeparately; readonly SemaphoreSlim slimLock = new SemaphoreSlim(1, 1); readonly LogHelper logHelper; @@ -52,11 +51,6 @@ public sealed class TaskHubWorker : IDisposable // ReSharper disable once InconsistentNaming (avoid breaking change) public IOrchestrationService orchestrationService { get; } - /// - /// Indicates whether the configured backend supports entities. - /// - public bool SupportsEntities => this.entityOrchestrationService != null; - volatile bool isStarted; TaskActivityDispatcher activityDispatcher; @@ -152,7 +146,7 @@ public TaskHubWorker( this.entityManager = entityObjectManager ?? throw new ArgumentException("entityObjectManager"); this.orchestrationService = orchestrationService ?? throw new ArgumentException("orchestrationService"); this.logHelper = new LogHelper(loggerFactory?.CreateLogger("DurableTask.Core")); - this.entityOrchestrationService = entityOrchestrationService as IEntityOrchestrationService; + this.dispatchEntitiesSeparately = (orchestrationService as IEntityOrchestrationService).EntityBackendProperties?.UseSeparateQueueForEntityWorkItems ?? false; } /// @@ -239,7 +233,7 @@ public async Task StartAsync() this.logHelper, this.ErrorPropagationMode); - if (this.SupportsEntities) + if (this.dispatchEntitiesSeparately) { this.entityDispatcher = new TaskEntityDispatcher( this.orchestrationService, @@ -253,7 +247,7 @@ public async Task StartAsync() await this.orchestrationDispatcher.StartAsync(); await this.activityDispatcher.StartAsync(); - if (this.SupportsEntities) + if (this.dispatchEntitiesSeparately) { await this.entityDispatcher.StartAsync(); } @@ -295,7 +289,7 @@ public async Task StopAsync(bool isForced) { this.orchestrationDispatcher.StopAsync(isForced), this.activityDispatcher.StopAsync(isForced), - this.SupportsEntities ? this.entityDispatcher.StopAsync(isForced) : Task.CompletedTask, + this.dispatchEntitiesSeparately ? this.entityDispatcher.StopAsync(isForced) : Task.CompletedTask, }; await Task.WhenAll(dispatcherShutdowns); @@ -352,9 +346,9 @@ public TaskHubWorker AddTaskOrchestrations(params ObjectCreator public TaskHubWorker AddTaskEntities(params Type[] taskEntityTypes) { - if (!this.SupportsEntities) + if (!this.dispatchEntitiesSeparately) { - throw new NotSupportedException("The configured backend does not support entities."); + throw new NotSupportedException("The configured backend does not support separate entity dispatch."); } foreach (Type type in taskEntityTypes) @@ -379,9 +373,9 @@ public TaskHubWorker AddTaskEntities(params Type[] taskEntityTypes) /// public TaskHubWorker AddTaskEntities(params ObjectCreator[] taskEntityCreators) { - if (!this.SupportsEntities) + if (!this.dispatchEntitiesSeparately) { - throw new NotSupportedException("The configured backend does not support entities."); + throw new NotSupportedException("The configured backend does not support separate entity dispatch."); } foreach (ObjectCreator creator in taskEntityCreators) diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index 735503382..115bba19c 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -58,7 +58,7 @@ internal TaskOrchestrationDispatcher( this.dispatchPipeline = dispatchPipeline ?? throw new ArgumentNullException(nameof(dispatchPipeline)); this.logHelper = logHelper ?? throw new ArgumentNullException(nameof(logHelper)); this.errorPropagationMode = errorPropagationMode; - this.entityOrchestrationService = entityOrchestrationService as IEntityOrchestrationService; + this.entityOrchestrationService = orchestrationService as IEntityOrchestrationService; this.entityBackendProperties = this.entityOrchestrationService?.EntityBackendProperties; this.dispatcher = new WorkItemDispatcher( From b113ae0e5e492286bec43060a714c5292f308019 Mon Sep 17 00:00:00 2001 From: sebastianburckhardt Date: Wed, 13 Sep 2023 16:26:08 -0700 Subject: [PATCH 6/7] revise how to exclude entity results from queries. --- .../AzureStorageOrchestrationService.cs | 2 +- .../AzureStorageOrchestrationServiceSettings.cs | 7 ------- .../EntityTrackingStoreQueries.cs | 2 ++ .../OrchestrationInstanceStatusQueryCondition.cs | 12 ++++++++++++ .../Entities/EntityBackendProperties.cs | 5 ----- src/DurableTask.Core/Query/OrchestrationQuery.cs | 6 ++++++ 6 files changed, 21 insertions(+), 13 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 35f97d590..9355b63b7 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -289,7 +289,6 @@ EntityBackendProperties IEntityOrchestrationService.EntityBackendProperties MaxConcurrentTaskEntityWorkItems = this.settings.MaxConcurrentTaskEntityWorkItems, SupportsImplicitEntityDeletion = false, // not supported by this backend MaximumSignalDelayTime = TimeSpan.FromDays(6), - ExludeEntitiesFromOrchestrationQueries = this.settings.UseSeparateQueriesForEntities, // TODO remove entities from orchestration queries if this is true UseSeparateQueueForEntityWorkItems = this.settings.UseSeparateQueueForEntityWorkItems, }; @@ -2105,6 +2104,7 @@ private static OrchestrationInstanceStatusQueryCondition ToAzureStorageCondition TaskHubNames = condition.TaskHubNames, InstanceIdPrefix = condition.InstanceIdPrefix, FetchInput = condition.FetchInputsAndOutputs, + ExcludeEntities = condition.ExcludeEntities, }; } diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs index 1740d3a32..819480bde 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs @@ -318,12 +318,5 @@ internal LogHelper Logger /// Consumers that support separate dispatch should explicitly set this to true. /// public bool UseSeparateQueueForEntityWorkItems { get; set; } = false; - - /// - /// Whether to use separate queries for entities and orchestrators. - /// This defaults to false, to avoid issues when using this provider from code that does not support separate queries. - /// Consumers that expect separate queries should explicitly set this to true. - /// - public bool UseSeparateQueriesForEntities { get; set; } = false; } } diff --git a/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs b/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs index 26f65c511..a4638a9d8 100644 --- a/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs +++ b/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs @@ -70,6 +70,7 @@ public async override Task QueryEntitiesAsync(EntityQuery fil CreatedTimeTo = filter.LastModifiedTo ?? default(DateTime), FetchInput = filter.IncludeState, FetchOutput = false, + ExcludeEntities = false, }; await this.ensureTaskHub(); @@ -124,6 +125,7 @@ async ValueTask> ConvertResultsAsync(IEnumerable public bool FetchOutput { get; set; } = true; + /// + /// Whether to exclude entities from the results. + /// + public bool ExcludeEntities { get; set; } = false; + /// /// Get the TableQuery object /// @@ -159,6 +164,13 @@ string GetConditions() TableOperators.And, TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.LessThan, greaterThanPrefix))); } + else if (this.ExcludeEntities) + { + conditions.Add(TableQuery.CombineFilters( + TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.LessThan, "@"), + TableOperators.Or, + TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.GreaterThanOrEqual, "A"))); + } if (this.InstanceId != null) { diff --git a/src/DurableTask.Core/Entities/EntityBackendProperties.cs b/src/DurableTask.Core/Entities/EntityBackendProperties.cs index 5fddb1d4f..20d8ec9e0 100644 --- a/src/DurableTask.Core/Entities/EntityBackendProperties.cs +++ b/src/DurableTask.Core/Entities/EntityBackendProperties.cs @@ -57,11 +57,6 @@ public class EntityBackendProperties /// public bool UseSeparateQueueForEntityWorkItems { get; set; } - /// - /// Gets or sets whether entities should be excluded from orchestration queries. - /// - public bool ExludeEntitiesFromOrchestrationQueries { get; set; } - /// /// A utility function to compute a cap on the scheduled time of an entity signal, based on the value of /// . diff --git a/src/DurableTask.Core/Query/OrchestrationQuery.cs b/src/DurableTask.Core/Query/OrchestrationQuery.cs index 2932d11da..0fc4140f6 100644 --- a/src/DurableTask.Core/Query/OrchestrationQuery.cs +++ b/src/DurableTask.Core/Query/OrchestrationQuery.cs @@ -69,5 +69,11 @@ public OrchestrationQuery() { } /// Determines whether the query will include the input of the orchestration. /// public bool FetchInputsAndOutputs { get; set; } = true; + + /// + /// Whether to exclude entities from the query results. This defaults to false for compatibility with older SDKs, + /// but is set to true by the newer SDKs. + /// + public bool ExcludeEntities { get; set; } = false; } } \ No newline at end of file From c43b8d9c1d282ccf05e6a8b74275e87be8504f46 Mon Sep 17 00:00:00 2001 From: sebastianburckhardt Date: Tue, 19 Sep 2023 10:44:21 -0700 Subject: [PATCH 7/7] address PR feedback --- .../AzureStorageOrchestrationService.cs | 9 ++++----- .../AzureStorageOrchestrationServiceSettings.cs | 2 +- .../EntityTrackingStoreQueries.cs | 6 +++--- .../Entities/IEntityOrchestrationService.cs | 4 ++-- 4 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 9355b63b7..9f5d15047 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -295,8 +295,7 @@ EntityBackendProperties IEntityOrchestrationService.EntityBackendProperties EntityBackendQueries IEntityOrchestrationService.EntityBackendQueries => new EntityTrackingStoreQueries( this.messageManager, - (this.trackingStore as AzureTableTrackingStore) - ?? throw new NotSupportedException("entity queries not supported for custom tracking stores"), + this.trackingStore, this.EnsureTaskHubAsync, ((IEntityOrchestrationService)this).EntityBackendProperties, this.SendTaskOrchestrationMessageAsync); @@ -307,7 +306,7 @@ Task IEntityOrchestrationService.LockNextOrchestratio { if (!this.settings.UseSeparateQueueForEntityWorkItems) { - throw new InvalidOperationException("backend was not configured for separate entity processing"); + throw new InvalidOperationException("Internal configuration is inconsistent. Backend is using single queue for orchestration/entity dispatch, but frontend is pulling from individual queues."); } return this.LockNextTaskOrchestrationWorkItemAsync(false, cancellationToken); } @@ -318,7 +317,7 @@ Task IEntityOrchestrationService.LockNextEntityWorkIt { if (!this.settings.UseSeparateQueueForEntityWorkItems) { - throw new InvalidOperationException("backend was not configured for separate entity processing"); + throw new InvalidOperationException("Internal configuration is inconsistent. Backend is using single queue for orchestration/entity dispatch, but frontend is pulling from individual queues."); } return this.LockNextTaskOrchestrationWorkItemAsync(entitiesOnly: true, cancellationToken); } @@ -679,7 +678,7 @@ public Task LockNextTaskOrchestrationWorkItemAsync( { if (this.settings.UseSeparateQueueForEntityWorkItems) { - throw new InvalidOperationException("backend was configured for separate orchestration/entity processing, must use specialized methods to get work items"); + throw new InvalidOperationException("Internal configuration is inconsistent. Backend is using separate queues for orchestration/entity dispatch, but frontend is pulling from single queue."); } return LockNextTaskOrchestrationWorkItemAsync(entitiesOnly: false, cancellationToken); } diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs index 819480bde..609a8b35d 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs @@ -315,7 +315,7 @@ internal LogHelper Logger /// /// Whether to use separate work item queues for entities and orchestrators. /// This defaults to false, to avoid issues when using this provider from code that does not support separate dispatch. - /// Consumers that support separate dispatch should explicitly set this to true. + /// Consumers that require separate dispatch (such as the new out-of-proc v2 SDKs) must set this to true. /// public bool UseSeparateQueueForEntityWorkItems { get; set; } = false; } diff --git a/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs b/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs index a4638a9d8..e6a146833 100644 --- a/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs +++ b/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs @@ -28,7 +28,7 @@ namespace DurableTask.AzureStorage class EntityTrackingStoreQueries : EntityBackendQueries { readonly MessageManager messageManager; - readonly AzureTableTrackingStore trackingStore; + readonly ITrackingStore trackingStore; readonly Func ensureTaskHub; readonly EntityBackendProperties properties; readonly Func sendEvent; @@ -37,7 +37,7 @@ class EntityTrackingStoreQueries : EntityBackendQueries public EntityTrackingStoreQueries( MessageManager messageManager, - AzureTableTrackingStore trackingStore, + ITrackingStore trackingStore, Func ensureTaskHub, EntityBackendProperties properties, Func sendEvent) @@ -56,7 +56,7 @@ public EntityTrackingStoreQueries( CancellationToken cancellation = default(CancellationToken)) { await this.ensureTaskHub(); - OrchestrationState? state = (await this.trackingStore.FetchInstanceStatusInternalAsync(id.ToString(), includeState))?.State; + OrchestrationState? state = (await this.trackingStore.GetStateAsync(id.ToString(), allExecutions: false, fetchInput: includeState)).FirstOrDefault(); return await this.GetEntityMetadataAsync(state, includeDeleted, includeState); } diff --git a/src/DurableTask.Core/Entities/IEntityOrchestrationService.cs b/src/DurableTask.Core/Entities/IEntityOrchestrationService.cs index 4f68e39e1..c8c9b80c8 100644 --- a/src/DurableTask.Core/Entities/IEntityOrchestrationService.cs +++ b/src/DurableTask.Core/Entities/IEntityOrchestrationService.cs @@ -23,9 +23,9 @@ namespace DurableTask.Core.Entities public interface IEntityOrchestrationService : IOrchestrationService { /// - /// Properties of the backend. + /// Properties of the backend implementation and configuration, as related to the new entity support in DurableTask.Core. /// - /// An object containing properties of the entity backend, or null if the backend does not natively support entities. + /// An object containing properties of the entity backend, or null if the backend does not natively support DurableTask.Core entities. EntityBackendProperties? EntityBackendProperties { get; } ///