Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ public EntityTrackingStoreQueries(
public async override Task<EntityMetadata?> GetEntityAsync(
EntityId id,
bool includeState = false,
bool includeDeleted = false,
bool includeStateless = false,
CancellationToken cancellation = default(CancellationToken))
{
await this.ensureTaskHub();
OrchestrationState? state = (await this.trackingStore.GetStateAsync(id.ToString(), allExecutions: false, fetchInput: includeState)).FirstOrDefault();
return await this.GetEntityMetadataAsync(state, includeDeleted, includeState);
return await this.GetEntityMetadataAsync(state, includeStateless, includeState);
}

public async override Task<EntityQueryResult> QueryEntitiesAsync(EntityQuery filter, CancellationToken cancellation)
Expand Down Expand Up @@ -102,7 +102,7 @@ async ValueTask<List<EntityMetadata>> ConvertResultsAsync(IEnumerable<Orchestrat
entityResult = new List<EntityMetadata>();
foreach (OrchestrationState entry in states)
{
EntityMetadata? entityMetadata = await this.GetEntityMetadataAsync(entry, filter.IncludeDeleted, filter.IncludeState);
EntityMetadata? entityMetadata = await this.GetEntityMetadataAsync(entry, filter.IncludeStateless, filter.IncludeState);
if (entityMetadata.HasValue)
{
entityResult.Add(entityMetadata.Value);
Expand Down Expand Up @@ -149,7 +149,7 @@ async ValueTask<List<EntityMetadata>> ConvertResultsAsync(IEnumerable<Orchestrat

if (request.RemoveEmptyEntities)
{
bool isEmptyEntity = !status.EntityExists && status.LockedBy == null && status.QueueSize == 0;
bool isEmptyEntity = !status.EntityExists && status.LockedBy == null && status.BacklogQueueSize == 0;
bool safeToRemoveWithoutBreakingMessageSorterLogic =
(now - state.LastUpdatedTime > this.properties.EntityMessageReorderWindow);
if (isEmptyEntity && safeToRemoveWithoutBreakingMessageSorterLogic)
Expand Down Expand Up @@ -196,7 +196,7 @@ bool OrchestrationIsRunning(OrchestrationStatus? status)
};
}

async ValueTask<EntityMetadata?> GetEntityMetadataAsync(OrchestrationState? state, bool includeDeleted, bool includeState)
async ValueTask<EntityMetadata?> GetEntityMetadataAsync(OrchestrationState? state, bool includeStateless, bool includeState)
{
if (state == null)
{
Expand All @@ -205,7 +205,7 @@ bool OrchestrationIsRunning(OrchestrationStatus? status)

if (!includeState)
{
if (!includeDeleted)
if (!includeStateless)
{
// it is possible that this entity was logically deleted even though its orchestration was not purged yet.
// we can check this efficiently (i.e. without deserializing anything) by looking at just the custom status
Expand All @@ -215,10 +215,14 @@ bool OrchestrationIsRunning(OrchestrationStatus? status)
}
}

EntityStatus? status = ClientEntityHelpers.GetEntityStatus(state.Status);

return new EntityMetadata()
{
EntityId = EntityId.FromString(state.OrchestrationInstance.InstanceId),
LastModifiedTime = state.CreatedTime,
BacklogQueueSize = status?.BacklogQueueSize ?? 0,
LockedBy = status?.LockedBy,
SerializedState = null, // we were instructed to not include the state
};
}
Expand All @@ -239,16 +243,20 @@ bool OrchestrationIsRunning(OrchestrationStatus? status)
string? serializedEntityState = ClientEntityHelpers.GetEntityState(serializedSchedulerState);

// return the result to the user
if (!includeDeleted && serializedEntityState == null)
if (!includeStateless && serializedEntityState == null)
{
return null;
}
else
{
EntityStatus? status = ClientEntityHelpers.GetEntityStatus(state.Status);

return new EntityMetadata()
{
EntityId = EntityId.FromString(state.OrchestrationInstance.InstanceId),
LastModifiedTime = state.CreatedTime,
BacklogQueueSize = status?.BacklogQueueSize ?? 0,
LockedBy = status?.LockedBy,
SerializedState = serializedEntityState,
};
}
Expand Down
24 changes: 18 additions & 6 deletions src/DurableTask.Core/Entities/EntityBackendQueries.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ public abstract class EntityBackendQueries
/// </summary>
/// <param name="id">The ID of the entity to get.</param>
/// <param name="includeState"><c>true</c> to include entity state in the response, <c>false</c> to not.</param>
/// <param name="includeDeleted">whether to return metadata for a deleted entity (if such data was retained by the backend).</param>
/// <param name="includeStateless">whether to include metadata for entities without user-defined state.</param>
/// <param name="cancellation">The cancellation token to cancel the operation.</param>
/// <returns>a response containing metadata describing the entity.</returns>
public abstract Task<EntityMetadata?> GetEntityAsync(
EntityId id, bool includeState = false, bool includeDeleted = false, CancellationToken cancellation = default);
EntityId id, bool includeState = false, bool includeStateless = false, CancellationToken cancellation = default);

/// <summary>
/// Queries entity instances based on the conditions specified in <paramref name="query"/>.
Expand Down Expand Up @@ -66,6 +66,16 @@ public struct EntityMetadata
/// </summary>
public DateTime LastModifiedTime { get; set; }

/// <summary>
/// Gets the size of the backlog queue, if there is a backlog, and if that metric is supported by the backend.
/// </summary>
public int BacklogQueueSize { get; set; }

/// <summary>
/// Gets the instance id of the orchestration that has locked this entity, or null if the entity is not locked.
/// </summary>
public string? LockedBy { get; set; }

/// <summary>
/// Gets or sets the serialized state for this entity. Can be null if the query
/// specified to not include the state, or to include deleted entities.
Expand Down Expand Up @@ -102,12 +112,14 @@ public struct EntityQuery
public bool IncludeState { get; set; }

/// <summary>
/// Gets or sets a value indicating whether or not to include deleted entities.
/// Gets a value indicating whether to include metadata about entities that have no user-defined state.
/// </summary>
/// <remarks>
/// This setting is relevant only for providers which retain metadata for deleted entities (<see cref="EntityBackendProperties.SupportsImplicitEntityDeletion"/> is false).
/// <remarks> Stateless entities occur when the storage provider is tracking metadata about an entity for synchronization purposes
/// even though the entity does not "logically" exist, in the sense that it has no application-defined state.
/// Stateless entities are usually transient. For example, they may be in the process of being created or deleted, or
/// they may have been locked by a critical section.
/// </remarks>
public bool IncludeDeleted { get; set; }
public bool IncludeStateless { get; set; }

/// <summary>
/// Gets or sets the desired size of each page to return.
Expand Down
4 changes: 2 additions & 2 deletions src/DurableTask.Core/Entities/StateFormat/EntityStatus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static bool TestEntityExists(string serializedJson)
}

/// <summary>
/// Whether this entity exists or not.
/// Whether this entity currently has a user-defined state or not.
/// </summary>
[DataMember(Name = EntityExistsProperyName, EmitDefaultValue = false)]
public bool EntityExists { get; set; }
Expand All @@ -47,7 +47,7 @@ public static bool TestEntityExists(string serializedJson)
/// The size of the queue, i.e. the number of operations that are waiting for the current operation to complete.
/// </summary>
[DataMember(Name = "queueSize", EmitDefaultValue = false)]
public int QueueSize { get; set; }
public int BacklogQueueSize { get; set; }
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept the JSON member name the same as before so this remains compatible with existing storage layouts, allowing us to migrate non-isolated DF apps at some point.


/// <summary>
/// The instance id of the orchestration that currently holds the lock of this entity.
Expand Down
2 changes: 1 addition & 1 deletion src/DurableTask.Core/TaskEntityDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ protected async Task<bool> OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
var entityStatus = new EntityStatus()
{
EntityExists = schedulerState.EntityExists,
QueueSize = schedulerState.Queue?.Count ?? 0,
BacklogQueueSize = schedulerState.Queue?.Count ?? 0,
LockedBy = schedulerState.LockedBy,
};
var serializedEntityStatus = JsonConvert.SerializeObject(entityStatus, Serializer.InternalSerializerSettings);
Expand Down