Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add tests for this change?

Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@

/**
* The binding value type for the {@literal @}DurableClientInput parameter.
* <p>
* This class implements {@link AutoCloseable} to ensure proper cleanup of underlying gRPC resources.
* When used with the Azure Functions Java worker, the worker should call {@link #close()} after the
* function invocation completes to release network resources and prevent channel leak warnings.
* </p>
*/
public class DurableClientContext {
public class DurableClientContext implements AutoCloseable {
// These fields are populated via GSON deserialization by the Functions Java worker.
private String rpcBaseUrl;
private String taskHubName;
Expand All @@ -44,7 +49,12 @@ public String getTaskHubName() {
*
* @return the Durable Task client object associated with the current function invocation.
*/
public DurableTaskClient getClient() {
public synchronized DurableTaskClient getClient() {
// Return existing client if already initialized
if (this.client != null) {
return this.client;
}

if (this.rpcBaseUrl == null || this.rpcBaseUrl.length() == 0) {
throw new IllegalStateException("The client context wasn't populated with an RPC base URL!");
}
Expand Down Expand Up @@ -78,12 +88,10 @@ public HttpResponseMessage waitForCompletionOrCreateCheckStatusResponse(
HttpRequestMessage<?> request,
String instanceId,
Duration timeout) {
if (this.client == null) {
this.client = getClient();
}
DurableTaskClient client = getClient();
OrchestrationMetadata orchestration;
try {
orchestration = this.client.waitForInstanceCompletion(instanceId, timeout, true);
orchestration = client.waitForInstanceCompletion(instanceId, timeout, true);
return request.createResponseBuilder(HttpStatus.ACCEPTED)
.header("Content-Type", "application/json")
.body(orchestration.getSerializedOutput())
Expand Down Expand Up @@ -148,4 +156,20 @@ private String getInstanceStatusURL(HttpRequestMessage<?> request, String instan

return baseUrl + "/runtime/webhooks/durabletask/instances/" + encodedInstanceId;
}

/**
* Closes the underlying durable task client and releases any associated network resources.
* <p>
* This method should be called when the function invocation is complete to prevent gRPC channel leaks.
* If no client has been created yet (i.e., {@link #getClient()} was never called), this method does nothing.
* This method is idempotent and can be called multiple times safely.
* </p>
*/
@Override
public synchronized void close() {
if (this.client != null) {
this.client.close();
this.client = null;
}
}
}
2 changes: 1 addition & 1 deletion internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH
Original file line number Diff line number Diff line change
@@ -1 +1 @@
fbe5bb20835678099fc51a44993ed9b045dee5a6
026329c53fe6363985655857b9ca848ec7238bd2
Loading
Loading