diff --git a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/DurableClientContext.java b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/DurableClientContext.java
index a952db68..2790bccd 100644
--- a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/DurableClientContext.java
+++ b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/DurableClientContext.java
@@ -22,8 +22,13 @@
/**
* The binding value type for the {@literal @}DurableClientInput parameter.
+ *
+ * This class implements {@link AutoCloseable} to ensure proper cleanup of underlying gRPC resources.
+ * When used with the Azure Functions Java worker, the worker should call {@link #close()} after the
+ * function invocation completes to release network resources and prevent channel leak warnings.
+ *
*/
-public class DurableClientContext {
+public class DurableClientContext implements AutoCloseable {
// These fields are populated via GSON deserialization by the Functions Java worker.
private String rpcBaseUrl;
private String taskHubName;
@@ -44,7 +49,12 @@ public String getTaskHubName() {
*
* @return the Durable Task client object associated with the current function invocation.
*/
- public DurableTaskClient getClient() {
+ public synchronized DurableTaskClient getClient() {
+ // Return existing client if already initialized
+ if (this.client != null) {
+ return this.client;
+ }
+
if (this.rpcBaseUrl == null || this.rpcBaseUrl.length() == 0) {
throw new IllegalStateException("The client context wasn't populated with an RPC base URL!");
}
@@ -78,12 +88,10 @@ public HttpResponseMessage waitForCompletionOrCreateCheckStatusResponse(
HttpRequestMessage> request,
String instanceId,
Duration timeout) {
- if (this.client == null) {
- this.client = getClient();
- }
+ DurableTaskClient client = getClient();
OrchestrationMetadata orchestration;
try {
- orchestration = this.client.waitForInstanceCompletion(instanceId, timeout, true);
+ orchestration = client.waitForInstanceCompletion(instanceId, timeout, true);
return request.createResponseBuilder(HttpStatus.ACCEPTED)
.header("Content-Type", "application/json")
.body(orchestration.getSerializedOutput())
@@ -148,4 +156,20 @@ private String getInstanceStatusURL(HttpRequestMessage> request, String instan
return baseUrl + "/runtime/webhooks/durabletask/instances/" + encodedInstanceId;
}
+
+ /**
+ * Closes the underlying durable task client and releases any associated network resources.
+ *
+ * This method should be called when the function invocation is complete to prevent gRPC channel leaks.
+ * If no client has been created yet (i.e., {@link #getClient()} was never called), this method does nothing.
+ * This method is idempotent and can be called multiple times safely.
+ *
+ */
+ @Override
+ public synchronized void close() {
+ if (this.client != null) {
+ this.client.close();
+ this.client = null;
+ }
+ }
}
diff --git a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH
index 3d3f9e98..fdb90d6a 100644
--- a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH
+++ b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH
@@ -1 +1 @@
-fbe5bb20835678099fc51a44993ed9b045dee5a6
\ No newline at end of file
+026329c53fe6363985655857b9ca848ec7238bd2
\ No newline at end of file
diff --git a/internal/durabletask-protobuf/protos/orchestrator_service.proto b/internal/durabletask-protobuf/protos/orchestrator_service.proto
index 88928c3b..8ef46a4a 100644
--- a/internal/durabletask-protobuf/protos/orchestrator_service.proto
+++ b/internal/durabletask-protobuf/protos/orchestrator_service.proto
@@ -41,6 +41,7 @@ message TaskFailureDetails {
google.protobuf.StringValue stackTrace = 3;
TaskFailureDetails innerFailure = 4;
bool isNonRetriable = 5;
+ map properties = 6;
}
enum OrchestrationStatus {
@@ -95,6 +96,7 @@ message TaskScheduledEvent {
google.protobuf.StringValue version = 2;
google.protobuf.StringValue input = 3;
TraceContext parentTraceContext = 4;
+ map tags = 5;
}
message TaskCompletedEvent {
@@ -113,6 +115,7 @@ message SubOrchestrationInstanceCreatedEvent {
google.protobuf.StringValue version = 3;
google.protobuf.StringValue input = 4;
TraceContext parentTraceContext = 5;
+ map tags = 6;
}
message SubOrchestrationInstanceCompletedEvent {
@@ -192,7 +195,7 @@ message EntityOperationCalledEvent {
}
message EntityLockRequestedEvent {
- string criticalSectionId = 1;
+ string criticalSectionId = 1;
repeated string lockSet = 2;
int32 position = 3;
google.protobuf.StringValue parentInstanceId = 4; // used only within messages, null in histories
@@ -217,7 +220,19 @@ message EntityUnlockSentEvent {
message EntityLockGrantedEvent {
string criticalSectionId = 1;
}
-
+
+message ExecutionRewoundEvent {
+ google.protobuf.StringValue reason = 1;
+ google.protobuf.StringValue parentExecutionId = 2; // used only for rewinding suborchestrations, null otherwise
+ google.protobuf.StringValue instanceId = 3; // used only for rewinding suborchestrations, null otherwise
+ TraceContext parentTraceContext = 4; // used only for rewinding suborchestrations, null otherwise
+ google.protobuf.StringValue name = 5; // used by DTS backend only
+ google.protobuf.StringValue version = 6; // used by DTS backend only
+ google.protobuf.StringValue input = 7; // used by DTS backend only
+ ParentInstanceInfo parentInstance = 8; // used by DTS backend only
+ map tags = 9; // used by DTS backend only
+}
+
message HistoryEvent {
int32 eventId = 1;
google.protobuf.Timestamp timestamp = 2;
@@ -244,11 +259,12 @@ message HistoryEvent {
ExecutionResumedEvent executionResumed = 22;
EntityOperationSignaledEvent entityOperationSignaled = 23;
EntityOperationCalledEvent entityOperationCalled = 24;
- EntityOperationCompletedEvent entityOperationCompleted = 25;
- EntityOperationFailedEvent entityOperationFailed = 26;
+ EntityOperationCompletedEvent entityOperationCompleted = 25;
+ EntityOperationFailedEvent entityOperationFailed = 26;
EntityLockRequestedEvent entityLockRequested = 27;
EntityLockGrantedEvent entityLockGranted = 28;
EntityUnlockSentEvent entityUnlockSent = 29;
+ ExecutionRewoundEvent executionRewound = 30;
}
}
@@ -256,6 +272,8 @@ message ScheduleTaskAction {
string name = 1;
google.protobuf.StringValue version = 2;
google.protobuf.StringValue input = 3;
+ map tags = 4;
+ TraceContext parentTraceContext = 5;
}
message CreateSubOrchestrationAction {
@@ -263,6 +281,8 @@ message CreateSubOrchestrationAction {
string name = 2;
google.protobuf.StringValue version = 3;
google.protobuf.StringValue input = 4;
+ TraceContext parentTraceContext = 5;
+ map tags = 6;
}
message CreateTimerAction {
@@ -282,6 +302,7 @@ message CompleteOrchestrationAction {
google.protobuf.StringValue newVersion = 4;
repeated HistoryEvent carryoverEvents = 5;
TaskFailureDetails failureDetails = 6;
+ map tags = 7;
}
message TerminateOrchestrationAction {
@@ -312,6 +333,11 @@ message OrchestratorAction {
}
}
+message OrchestrationTraceContext {
+ google.protobuf.StringValue spanID = 1;
+ google.protobuf.Timestamp spanStartTime = 2;
+}
+
message OrchestratorRequest {
string instanceId = 1;
google.protobuf.StringValue executionId = 2;
@@ -320,6 +346,8 @@ message OrchestratorRequest {
OrchestratorEntityParameters entityParameters = 5;
bool requiresHistoryStreaming = 6;
map properties = 7;
+
+ OrchestrationTraceContext orchestrationTraceContext = 8;
}
message OrchestratorResponse {
@@ -331,6 +359,17 @@ message OrchestratorResponse {
// The number of work item events that were processed by the orchestrator.
// This field is optional. If not set, the service should assume that the orchestrator processed all events.
google.protobuf.Int32Value numEventsProcessed = 5;
+ OrchestrationTraceContext orchestrationTraceContext = 6;
+
+ // Whether or not a history is required to complete the original OrchestratorRequest and none was provided.
+ bool requiresHistory = 7;
+
+ // True if this is a partial (chunked) completion. The backend must keep the work item open until the final chunk (isPartial=false).
+ bool isPartial = 8;
+
+ // Zero-based position of the current chunk within a chunked completion sequence.
+ // This field is omitted for non-chunked completions.
+ google.protobuf.Int32Value chunkIndex = 9;
}
message CreateInstanceRequest {
@@ -343,6 +382,7 @@ message CreateInstanceRequest {
google.protobuf.StringValue executionId = 7;
map tags = 8;
TraceContext parentTraceContext = 9;
+ google.protobuf.Timestamp requestTime = 10;
}
message OrchestrationIdReusePolicy {
@@ -449,12 +489,28 @@ message QueryInstancesResponse {
google.protobuf.StringValue continuationToken = 2;
}
+message ListInstanceIdsRequest {
+ repeated OrchestrationStatus runtimeStatus = 1;
+ google.protobuf.Timestamp completedTimeFrom = 2;
+ google.protobuf.Timestamp completedTimeTo = 3;
+ int32 pageSize = 4;
+ google.protobuf.StringValue lastInstanceKey = 5;
+}
+
+message ListInstanceIdsResponse {
+ repeated string instanceIds = 1;
+ google.protobuf.StringValue lastInstanceKey = 2;
+}
+
message PurgeInstancesRequest {
oneof request {
string instanceId = 1;
PurgeInstanceFilter purgeInstanceFilter = 2;
+ InstanceBatch instanceBatch = 4;
}
bool recursive = 3;
+ // used in the case when an instanceId is specified to determine if the purge request is for an orchestration (as opposed to an entity)
+ bool isOrchestration = 5;
}
message PurgeInstanceFilter {
@@ -468,6 +524,15 @@ message PurgeInstancesResponse {
google.protobuf.BoolValue isComplete = 2;
}
+message RestartInstanceRequest {
+ string instanceId = 1;
+ bool restartWithNewInstanceId = 2;
+}
+
+message RestartInstanceResponse {
+ string instanceId = 1;
+}
+
message CreateTaskHubRequest {
bool recreateIfExists = 1;
}
@@ -490,10 +555,12 @@ message SignalEntityRequest {
google.protobuf.StringValue input = 3;
string requestId = 4;
google.protobuf.Timestamp scheduledTime = 5;
+ TraceContext parentTraceContext = 6;
+ google.protobuf.Timestamp requestTime = 7;
}
message SignalEntityResponse {
- // no payload
+ // no payload
}
message GetEntityRequest {
@@ -553,6 +620,7 @@ message EntityBatchRequest {
string instanceId = 1;
google.protobuf.StringValue entityState = 2;
repeated OperationRequest operations = 3;
+ map properties = 4;
}
message EntityBatchResult {
@@ -562,6 +630,8 @@ message EntityBatchResult {
TaskFailureDetails failureDetails = 4;
string completionToken = 5;
repeated OperationInfo operationInfos = 6; // used only with DTS
+ // Whether or not an entity state is required to complete the original EntityBatchRequest and none was provided.
+ bool requiresState = 7;
}
message EntityRequest {
@@ -575,6 +645,7 @@ message OperationRequest {
string operation = 1;
string requestId = 2;
google.protobuf.StringValue input = 3;
+ TraceContext traceContext = 4;
}
message OperationResult {
@@ -591,10 +662,14 @@ message OperationInfo {
message OperationResultSuccess {
google.protobuf.StringValue result = 1;
+ google.protobuf.Timestamp startTimeUtc = 2;
+ google.protobuf.Timestamp endTimeUtc = 3;
}
message OperationResultFailure {
TaskFailureDetails failureDetails = 1;
+ google.protobuf.Timestamp startTimeUtc = 2;
+ google.protobuf.Timestamp endTimeUtc = 3;
}
message OperationAction {
@@ -610,6 +685,8 @@ message SendSignalAction {
string name = 2;
google.protobuf.StringValue input = 3;
google.protobuf.Timestamp scheduledTime = 4;
+ google.protobuf.Timestamp requestTime = 5;
+ TraceContext parentTraceContext = 6;
}
message StartNewOrchestrationAction {
@@ -618,6 +695,8 @@ message StartNewOrchestrationAction {
google.protobuf.StringValue version = 3;
google.protobuf.StringValue input = 4;
google.protobuf.Timestamp scheduledTime = 5;
+ google.protobuf.Timestamp requestTime = 6;
+ TraceContext parentTraceContext = 7;
}
message AbandonActivityTaskRequest {
@@ -644,6 +723,17 @@ message AbandonEntityTaskResponse {
// Empty.
}
+message SkipGracefulOrchestrationTerminationsRequest {
+ InstanceBatch instanceBatch = 1;
+ google.protobuf.StringValue reason = 2;
+}
+
+message SkipGracefulOrchestrationTerminationsResponse {
+ // Those instances which could not be terminated because they had locked entities at the time of this termination call,
+ // are already in a terminal state (completed, failed, terminated, etc.), are not orchestrations, or do not exist (i.e. have been purged)
+ repeated string unterminatedInstanceIds = 1;
+}
+
service TaskHubSidecarService {
// Sends a hello request to the sidecar service.
rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty);
@@ -657,18 +747,21 @@ service TaskHubSidecarService {
// Rewinds an orchestration instance to last known good state and replays from there.
rpc RewindInstance(RewindInstanceRequest) returns (RewindInstanceResponse);
+ // Restarts an orchestration instance.
+ rpc RestartInstance(RestartInstanceRequest) returns (RestartInstanceResponse);
+
// Waits for an orchestration instance to reach a running or completion state.
rpc WaitForInstanceStart(GetInstanceRequest) returns (GetInstanceResponse);
-
+
// Waits for an orchestration instance to reach a completion state (completed, failed, terminated, etc.).
rpc WaitForInstanceCompletion(GetInstanceRequest) returns (GetInstanceResponse);
// Raises an event to a running orchestration instance.
rpc RaiseEvent(RaiseEventRequest) returns (RaiseEventResponse);
-
+
// Terminates a running orchestration instance.
rpc TerminateInstance(TerminateRequest) returns (TerminateResponse);
-
+
// Suspends a running orchestration instance.
rpc SuspendInstance(SuspendRequest) returns (SuspendResponse);
@@ -678,6 +771,9 @@ service TaskHubSidecarService {
// rpc DeleteInstance(DeleteInstanceRequest) returns (DeleteInstanceResponse);
rpc QueryInstances(QueryInstancesRequest) returns (QueryInstancesResponse);
+
+ rpc ListInstanceIds(ListInstanceIdsRequest) returns (ListInstanceIdsResponse);
+
rpc PurgeInstances(PurgeInstancesRequest) returns (PurgeInstancesResponse);
rpc GetWorkItems(GetWorkItemsRequest) returns (stream WorkItem);
@@ -714,6 +810,10 @@ service TaskHubSidecarService {
// Abandon an entity work item
rpc AbandonTaskEntityWorkItem(AbandonEntityTaskRequest) returns (AbandonEntityTaskResponse);
+
+ // "Skip" graceful termination of orchestrations by immediately changing their status in storage to "terminated".
+ // Note that a maximum of 500 orchestrations can be terminated at a time using this method.
+ rpc SkipGracefulOrchestrationTerminations(SkipGracefulOrchestrationTerminationsRequest) returns (SkipGracefulOrchestrationTerminationsResponse);
}
message GetWorkItemsRequest {
@@ -732,6 +832,16 @@ enum WorkerCapability {
// When set, the service may return work items without any history events as an optimization.
// It is strongly recommended that all SDKs support this capability.
WORKER_CAPABILITY_HISTORY_STREAMING = 1;
+
+ // Indicates that the worker supports scheduled tasks.
+ // The service may send schedule-triggered orchestration work items,
+ // and the worker must handle them, including the scheduledTime field.
+ WORKER_CAPABILITY_SCHEDULED_TASKS = 2;
+
+ // Signals that the worker can handle large payloads stored externally (e.g., Blob Storage).
+ // Work items may contain URI references instead of inline data, and the worker must fetch them.
+ // This avoids message size limits and reduces network overhead.
+ WORKER_CAPABILITY_LARGE_PAYLOADS = 3;
}
message WorkItem {
@@ -750,7 +860,7 @@ message CompleteTaskResponse {
}
message HealthPing {
- // No payload
+ // No payload
}
message StreamInstanceHistoryRequest {
@@ -764,3 +874,8 @@ message StreamInstanceHistoryRequest {
message HistoryChunk {
repeated HistoryEvent events = 1;
}
+
+message InstanceBatch {
+ // A maximum of 500 instance IDs can be provided in this list.
+ repeated string instanceIds = 1;
+}