Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 18 additions & 17 deletions src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -281,36 +281,32 @@ public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew

#region IEntityOrchestrationService

EntityBackendProperties IEntityOrchestrationService.GetEntityBackendProperties()
EntityBackendProperties IEntityOrchestrationService.EntityBackendProperties
=> new EntityBackendProperties()
{
EntityMessageReorderWindow = TimeSpan.FromMinutes(this.settings.EntityMessageReorderWindowInMinutes),
MaxEntityOperationBatchSize = this.settings.MaxEntityOperationBatchSize,
MaxConcurrentTaskEntityWorkItems = this.settings.MaxConcurrentTaskEntityWorkItems,
SupportsImplicitEntityDeletion = false, // not supported by this backend
MaximumSignalDelayTime = TimeSpan.FromDays(6),
UseSeparateQueueForEntityWorkItems = this.settings.UseSeparateQueueForEntityWorkItems,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this "use separate queue" feature?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means the backend maintains two separate queues for orchestration work items and entity work items. Among other things, this makes it possible to specify separate concurrency limits for the two.

};

bool IEntityOrchestrationService.ProcessEntitiesSeparately()
{
if (this.settings.UseSeparateQueueForEntityWorkItems)
{
this.orchestrationSessionManager.ProcessEntitiesSeparately = true;
return true;
}
else
{
return false;
}
}
EntityBackendQueries IEntityOrchestrationService.EntityBackendQueries
=> new EntityTrackingStoreQueries(
this.messageManager,
this.trackingStore,
this.EnsureTaskHubAsync,
((IEntityOrchestrationService)this).EntityBackendProperties,
this.SendTaskOrchestrationMessageAsync);

Task<TaskOrchestrationWorkItem> IEntityOrchestrationService.LockNextOrchestrationWorkItemAsync(
TimeSpan receiveTimeout,
CancellationToken cancellationToken)
{
if (!orchestrationSessionManager.ProcessEntitiesSeparately)
if (!this.settings.UseSeparateQueueForEntityWorkItems)
{
throw new InvalidOperationException("backend was not configured for separate entity processing");
throw new InvalidOperationException("Internal configuration is inconsistent. Backend is using single queue for orchestration/entity dispatch, but frontend is pulling from individual queues.");
}
return this.LockNextTaskOrchestrationWorkItemAsync(false, cancellationToken);
}
Expand All @@ -319,9 +315,9 @@ Task<TaskOrchestrationWorkItem> IEntityOrchestrationService.LockNextEntityWorkIt
TimeSpan receiveTimeout,
CancellationToken cancellationToken)
{
if (!orchestrationSessionManager.ProcessEntitiesSeparately)
if (!this.settings.UseSeparateQueueForEntityWorkItems)
{
throw new InvalidOperationException("backend was not configured for separate entity processing");
throw new InvalidOperationException("Internal configuration is inconsistent. Backend is using single queue for orchestration/entity dispatch, but frontend is pulling from individual queues.");
}
return this.LockNextTaskOrchestrationWorkItemAsync(entitiesOnly: true, cancellationToken);
}
Expand Down Expand Up @@ -680,6 +676,10 @@ public Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(
TimeSpan receiveTimeout,
CancellationToken cancellationToken)
{
if (this.settings.UseSeparateQueueForEntityWorkItems)
{
throw new InvalidOperationException("Internal configuration is inconsistent. Backend is using separate queues for orchestration/entity dispatch, but frontend is pulling from single queue.");
}
return LockNextTaskOrchestrationWorkItemAsync(entitiesOnly: false, cancellationToken);
}

Expand Down Expand Up @@ -2103,6 +2103,7 @@ private static OrchestrationInstanceStatusQueryCondition ToAzureStorageCondition
TaskHubNames = condition.TaskHubNames,
InstanceIdPrefix = condition.InstanceIdPrefix,
FetchInput = condition.FetchInputsAndOutputs,
ExcludeEntities = condition.ExcludeEntities,
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ internal LogHelper Logger
/// <summary>
/// Whether to use separate work item queues for entities and orchestrators.
/// This defaults to false, to avoid issues when using this provider from code that does not support separate dispatch.
/// Consumers that support separate dispatch should explicitly set this to true.
/// Consumers that require separate dispatch (such as the new out-of-proc v2 SDKs) must set this to true.
/// </summary>
public bool UseSeparateQueueForEntityWorkItems { get; set; } = false;
}
Expand Down
258 changes: 258 additions & 0 deletions src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------using System;
#nullable enable
namespace DurableTask.AzureStorage
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.Serialization.Json;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.AzureStorage.Tracking;
using DurableTask.Core;
using DurableTask.Core.Entities;

class EntityTrackingStoreQueries : EntityBackendQueries
{
readonly MessageManager messageManager;
readonly ITrackingStore trackingStore;
readonly Func<Task> ensureTaskHub;
readonly EntityBackendProperties properties;
readonly Func<TaskMessage, Task> sendEvent;

static TimeSpan timeLimitForCleanEntityStorageLoop = TimeSpan.FromSeconds(5);

public EntityTrackingStoreQueries(
MessageManager messageManager,
ITrackingStore trackingStore,
Func<Task> ensureTaskHub,
EntityBackendProperties properties,
Func<TaskMessage, Task> sendEvent)
{
this.messageManager = messageManager;
this.trackingStore = trackingStore;
this.ensureTaskHub = ensureTaskHub;
this.properties = properties;
this.sendEvent = sendEvent;
}

public async override Task<EntityMetadata?> GetEntityAsync(
EntityId id,
bool includeState = false,
bool includeDeleted = false,
CancellationToken cancellation = default(CancellationToken))
{
await this.ensureTaskHub();
OrchestrationState? state = (await this.trackingStore.GetStateAsync(id.ToString(), allExecutions: false, fetchInput: includeState)).FirstOrDefault();
return await this.GetEntityMetadataAsync(state, includeDeleted, includeState);
}

public async override Task<EntityQueryResult> QueryEntitiesAsync(EntityQuery filter, CancellationToken cancellation)
{
var condition = new OrchestrationInstanceStatusQueryCondition()
{
InstanceId = null,
InstanceIdPrefix = filter.InstanceIdStartsWith,
CreatedTimeFrom = filter.LastModifiedFrom ?? default(DateTime),
CreatedTimeTo = filter.LastModifiedTo ?? default(DateTime),
FetchInput = filter.IncludeState,
FetchOutput = false,
ExcludeEntities = false,
};

await this.ensureTaskHub();

List<EntityMetadata> entityResult;
string? continuationToken = filter.ContinuationToken;
var stopwatch = new Stopwatch();
stopwatch.Start();

do
{
DurableStatusQueryResult result = await this.trackingStore.GetStateAsync(condition, filter.PageSize ?? 100, continuationToken, cancellation);
entityResult = await ConvertResultsAsync(result.OrchestrationState);
continuationToken = result.ContinuationToken;
}
while ( // continue query right away if the page is completely empty, but never in excess of 100ms
continuationToken != null
&& entityResult.Count == 0
&& stopwatch.ElapsedMilliseconds <= 100);
Comment on lines +89 to +92
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have we always had this 100ms limit?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this was already there (but it was in the DF extension since that's where we implemented this query).


return new EntityQueryResult()
{
Results = entityResult,
ContinuationToken = continuationToken,
};

async ValueTask<List<EntityMetadata>> ConvertResultsAsync(IEnumerable<OrchestrationState> states)
{
entityResult = new List<EntityMetadata>();
foreach (OrchestrationState entry in states)
{
EntityMetadata? entityMetadata = await this.GetEntityMetadataAsync(entry, filter.IncludeDeleted, filter.IncludeState);
if (entityMetadata.HasValue)
{
entityResult.Add(entityMetadata.Value);
}
}
return entityResult;
}
}

public async override Task<CleanEntityStorageResult> CleanEntityStorageAsync(CleanEntityStorageRequest request = default(CleanEntityStorageRequest), CancellationToken cancellation = default(CancellationToken))
{
DateTime now = DateTime.UtcNow;
string? continuationToken = request.ContinuationToken;
int emptyEntitiesRemoved = 0;
int orphanedLocksReleased = 0;
var stopwatch = Stopwatch.StartNew();

var condition = new OrchestrationInstanceStatusQueryCondition()
{
InstanceIdPrefix = "@",
FetchInput = false,
FetchOutput = false,
ExcludeEntities = false,
};

await this.ensureTaskHub();

// list all entities (without fetching the input) and for each one that requires action,
// perform that action. Waits for all actions to finish after each page.
do
{
DurableStatusQueryResult page = await this.trackingStore.GetStateAsync(condition, 100, continuationToken, cancellation);

var tasks = new List<Task>();
foreach (OrchestrationState state in page.OrchestrationState)
{
EntityStatus? status = ClientEntityHelpers.GetEntityStatus(state.Status);
if (status != null)
{
if (request.ReleaseOrphanedLocks && status.LockedBy != null)
{
tasks.Add(CheckForOrphanedLockAndFixIt(state, status.LockedBy));
}

if (request.RemoveEmptyEntities)
{
bool isEmptyEntity = !status.EntityExists && status.LockedBy == null && status.QueueSize == 0;
bool safeToRemoveWithoutBreakingMessageSorterLogic =
(now - state.LastUpdatedTime > this.properties.EntityMessageReorderWindow);
if (isEmptyEntity && safeToRemoveWithoutBreakingMessageSorterLogic)
{
tasks.Add(DeleteIdleOrchestrationEntity(state));
}
}
}
}

async Task DeleteIdleOrchestrationEntity(OrchestrationState state)
{
PurgeHistoryResult result = await this.trackingStore.PurgeInstanceHistoryAsync(state.OrchestrationInstance.InstanceId);
Interlocked.Add(ref emptyEntitiesRemoved, result.InstancesDeleted);
}

async Task CheckForOrphanedLockAndFixIt(OrchestrationState state, string lockOwner)
{
OrchestrationState? ownerState
= (await this.trackingStore.GetStateAsync(lockOwner, allExecutions: false, fetchInput: false)).FirstOrDefault();

bool OrchestrationIsRunning(OrchestrationStatus? status)
=> status != null && (status == OrchestrationStatus.Running || status == OrchestrationStatus.Suspended);

if (! OrchestrationIsRunning(ownerState?.OrchestrationStatus))
{
// the owner is not a running orchestration. Send a lock release.
var targetInstance = new OrchestrationInstance() { InstanceId = lockOwner };
EntityMessageEvent eventToSend = ClientEntityHelpers.EmitUnlockForOrphanedLock(targetInstance, lockOwner);
await this.sendEvent(eventToSend.AsTaskMessage());
Interlocked.Increment(ref orphanedLocksReleased);
}
}

await Task.WhenAll(tasks);
}
while (continuationToken != null & stopwatch.Elapsed <= timeLimitForCleanEntityStorageLoop);

return new CleanEntityStorageResult()
{
EmptyEntitiesRemoved = emptyEntitiesRemoved,
OrphanedLocksReleased = orphanedLocksReleased,
ContinuationToken = continuationToken,
};
}

async ValueTask<EntityMetadata?> GetEntityMetadataAsync(OrchestrationState? state, bool includeDeleted, bool includeState)
{
if (state == null)
{
return null;
}

if (!includeState)
{
if (!includeDeleted)
{
// it is possible that this entity was logically deleted even though its orchestration was not purged yet.
// we can check this efficiently (i.e. without deserializing anything) by looking at just the custom status
if (!EntityStatus.TestEntityExists(state.Status))
{
return null;
}
}

return new EntityMetadata()
{
EntityId = EntityId.FromString(state.OrchestrationInstance.InstanceId),
LastModifiedTime = state.CreatedTime,
SerializedState = null, // we were instructed to not include the state
};
}
else
{
// first, retrieve the entity scheduler state (= input of the orchestration state), possibly from blob storage.
string serializedSchedulerState;
if (MessageManager.TryGetLargeMessageReference(state.Input, out Uri blobUrl))
{
serializedSchedulerState = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobUrl);
}
else
{
serializedSchedulerState = state.Input;
}

// next, extract the entity state from the scheduler state
string? serializedEntityState = ClientEntityHelpers.GetEntityState(serializedSchedulerState);

// return the result to the user
if (!includeDeleted && serializedEntityState == null)
{
return null;
}
else
{
return new EntityMetadata()
{
EntityId = EntityId.FromString(state.OrchestrationInstance.InstanceId),
LastModifiedTime = state.CreatedTime,
SerializedState = serializedEntityState,
};
}
}
}
}
}
11 changes: 2 additions & 9 deletions src/DurableTask.AzureStorage/OrchestrationSessionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,6 @@ public OrchestrationSessionManager(

internal IEnumerable<ControlQueue> Queues => this.ownedControlQueues.Values;

/// <summary>
/// Recent versions of DurableTask.Core can be configured to use a separate pipeline for processing entity work items,
/// while older versions use a single pipeline for both orchestration and entity work items. To support both scenarios,
/// this property can be modified prior to starting the orchestration service. If set to true, the work items that are ready for
/// processing are stored in <see cref="entitiesReadyForProcessingQueue"/> and <see cref="orchestrationsReadyForProcessingQueue"/>, respectively.
/// </summary>
internal bool ProcessEntitiesSeparately { get; set; }

public void AddQueue(string partitionId, ControlQueue controlQueue, CancellationToken cancellationToken)
{
if (this.ownedControlQueues.TryAdd(partitionId, controlQueue))
Expand Down Expand Up @@ -529,7 +521,8 @@ async Task ScheduleOrchestrationStatePrefetch(
batch.TrackingStoreContext = history.TrackingStoreContext;
}

if (this.ProcessEntitiesSeparately && DurableTask.Core.Common.Entities.IsEntityInstance(batch.OrchestrationInstanceId))
if (this.settings.UseSeparateQueueForEntityWorkItems
&& DurableTask.Core.Common.Entities.IsEntityInstance(batch.OrchestrationInstanceId))
{
this.entitiesReadyForProcessingQueue.Enqueue(node);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ public override async Task<IList<OrchestrationState>> GetStateAsync(string insta
}

/// <inheritdoc />
async Task<InstanceStatus?> FetchInstanceStatusInternalAsync(string instanceId, bool fetchInput)
internal async Task<InstanceStatus?> FetchInstanceStatusInternalAsync(string instanceId, bool fetchInput)
{
if (instanceId == null)
{
Expand Down
Loading