Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 25 additions & 18 deletions temporal-sdk/src/main/java/io/temporal/client/UpdateOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public static <T> UpdateOptions.Builder<T> newBuilder(Class<T> resultClass) {
return new UpdateOptions.Builder<T>().setResultClass(resultClass);
}

public static UpdateOptions.Builder newBuilder(UpdateOptions options) {
return new UpdateOptions.Builder(options);
public static <T> UpdateOptions.Builder<T> newBuilder(UpdateOptions<T> options) {
return new UpdateOptions.Builder<T>(options);
}

public static UpdateOptions getDefaultInstance() {
Expand All @@ -50,21 +50,21 @@ public static UpdateOptions getDefaultInstance() {
private final String updateName;
private final String updateId;
private final String firstExecutionRunId;
private final WorkflowUpdateStage waitPolicy;
private final WorkflowUpdateStage waitForStage;
private final Class<T> resultClass;
private final Type resultType;

private UpdateOptions(
String updateName,
String updateId,
String firstExecutionRunId,
WorkflowUpdateStage waitPolicy,
WorkflowUpdateStage waitForStage,
Class<T> resultClass,
Type resultType) {
this.updateName = updateName;
this.updateId = updateId;
this.firstExecutionRunId = firstExecutionRunId;
this.waitPolicy = waitPolicy;
this.waitForStage = waitForStage;
this.resultClass = resultClass;
this.resultType = resultType;
}
Expand All @@ -81,8 +81,8 @@ public String getFirstExecutionRunId() {
return firstExecutionRunId;
}

public WorkflowUpdateStage getWaitPolicy() {
return waitPolicy;
public WorkflowUpdateStage getWaitForStage() {
return waitForStage;
}

public Class<T> getResultClass() {
Expand All @@ -105,15 +105,15 @@ public boolean equals(Object o) {
return Objects.equal(updateName, that.updateName)
&& updateId == that.updateId
&& firstExecutionRunId == that.firstExecutionRunId
&& waitPolicy.equals(that.waitPolicy)
&& waitForStage.equals(that.waitForStage)
&& resultClass.equals(that.resultClass)
&& resultType.equals(that.resultType);
}

@Override
public int hashCode() {
return Objects.hashCode(
updateName, updateId, firstExecutionRunId, waitPolicy, resultClass, resultType);
updateName, updateId, firstExecutionRunId, waitForStage, resultClass, resultType);
}

@Override
Expand All @@ -125,8 +125,8 @@ public String toString() {
+ updateId
+ ", firstExecutionRunId="
+ firstExecutionRunId
+ ", waitPolicy="
+ waitPolicy
+ ", waitForStage="
+ waitForStage
+ ", resultClass="
+ resultClass
+ ", resultType='"
Expand All @@ -146,13 +146,19 @@ public void validate() {
if (resultClass == null) {
throw new IllegalStateException("resultClass must not be null");
}
if (waitForStage == null) {
throw new IllegalStateException("waitForStage must not be null");
}
if (waitForStage.equals(WorkflowUpdateStage.ADMITTED)) {
throw new IllegalStateException("waitForStage cannot be ADMITTED");
}
}

public static final class Builder<T> {
private String updateName;
private String updateId;
private String firstExecutionRunId;
private WorkflowUpdateStage waitPolicy;
private WorkflowUpdateStage waitForStage;
private Class<T> resultClass;
private Type resultType;

Expand All @@ -165,7 +171,7 @@ private Builder(UpdateOptions<T> options) {
this.updateName = options.updateName;
this.updateId = options.updateId;
this.firstExecutionRunId = options.firstExecutionRunId;
this.waitPolicy = options.waitPolicy;
this.waitForStage = options.waitForStage;
this.resultClass = options.resultClass;
this.resultType = options.resultType;
}
Expand Down Expand Up @@ -200,16 +206,17 @@ public Builder<T> setFirstExecutionRunId(String firstExecutionRunId) {

/**
* Specifies at what point in the update request life cycles this request should return.
*
* <p>Default value if not set: <b>Accepted</b>
* Required to be set to one of the following values:
*
* <ul>
* <li><b>Accepted</b> Wait for the update to be accepted by the workflow.
* <li><b>Completed</b> Wait for the update to be completed by the workflow.
* </ul>
*
* Admitted is not allowed as a value.
*/
public Builder<T> setWaitPolicy(WorkflowUpdateStage waitPolicy) {
this.waitPolicy = waitPolicy;
public Builder<T> setWaitForStage(WorkflowUpdateStage waitForStage) {
this.waitForStage = waitForStage;
return this;
}

Expand Down Expand Up @@ -239,7 +246,7 @@ public UpdateOptions<T> build() {
updateName,
updateId,
firstExecutionRunId == null ? "" : firstExecutionRunId,
waitPolicy == null ? WorkflowUpdateStage.ACCEPTED : waitPolicy,
waitForStage,
resultClass,
resultType == null ? resultClass : resultType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,20 @@ static <T> WorkflowStub fromTyped(T typed) {

/**
* Asynchronously update a workflow execution by invoking its update handler and returning a
* handle to the update request. Usually a update handler is a method annotated with {@link
* handle to the update request. Usually an update handler is a method annotated with {@link
* io.temporal.workflow.UpdateMethod}.
*
* @param updateName name of the update handler. Usually it is a method name.
* @param waitForStage stage to wait for before returning the update handle. Admitted is not
* allowed as a value.
* @param resultClass class of the update return value
* @param <R> type of the update return value
* @param args update method arguments
* @return update handle that can be used to get the result of the update.
*/
@Experimental
<R> UpdateHandle<R> startUpdate(String updateName, Class<R> resultClass, Object... args);
<R> UpdateHandle<R> startUpdate(
String updateName, WorkflowUpdateStage waitForStage, Class<R> resultClass, Object... args);

/**
* Asynchronously update a workflow execution by invoking its update handler and returning a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public <R> R update(String updateName, Class<R> resultClass, Object... args) {
UpdateOptions<R> options =
UpdateOptions.<R>newBuilder()
.setUpdateName(updateName)
.setWaitPolicy(WorkflowUpdateStage.COMPLETED)
.setWaitForStage(WorkflowUpdateStage.COMPLETED)
.setResultClass(resultClass)
.build();
return startUpdate(options, args).getResultAsync().get();
Expand All @@ -312,11 +312,12 @@ public <R> R update(String updateName, Class<R> resultClass, Object... args) {
}

@Override
public <R> UpdateHandle<R> startUpdate(String updateName, Class<R> resultClass, Object... args) {
public <R> UpdateHandle<R> startUpdate(
String updateName, WorkflowUpdateStage waitForStage, Class<R> resultClass, Object... args) {
UpdateOptions<R> options =
UpdateOptions.<R>newBuilder()
.setUpdateName(updateName)
.setWaitPolicy(WorkflowUpdateStage.ACCEPTED)
.setWaitForStage(waitForStage)
.setResultClass(resultClass)
.setResultType(resultClass)
.build();
Expand All @@ -342,7 +343,7 @@ public <R> UpdateHandle<R> startUpdate(UpdateOptions<R> options, Object... args)
options.getResultType(),
options.getFirstExecutionRunId(),
WaitPolicy.newBuilder()
.setLifecycleStage(options.getWaitPolicy().getProto())
.setLifecycleStage(options.getWaitForStage().getProto())
.build()));

if (result.hasResult()) {
Expand All @@ -360,7 +361,7 @@ public <R> UpdateHandle<R> startUpdate(UpdateOptions<R> options, Object... args)
result.getReference().getWorkflowExecution(),
options.getResultClass(),
options.getResultType());
if (options.getWaitPolicy() == WorkflowUpdateStage.COMPLETED) {
if (options.getWaitForStage() == WorkflowUpdateStage.COMPLETED) {
// Don't return the handle until completed, since that's what's been asked for
handle.waitCompleted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ public void updateNonExistentWorkflowUntyped() {

assertThrows(
WorkflowNotFoundException.class,
() -> workflowStub.startUpdate("update", Void.class, "some-value"));
() ->
workflowStub.startUpdate(
"update", WorkflowUpdateStage.ACCEPTED, Void.class, "some-value"));
}

@Test
Expand Down Expand Up @@ -103,7 +105,9 @@ public void updateCompletedWorkflowUntyped() {

assertThrows(
WorkflowNotFoundException.class,
() -> workflowStub.startUpdate("update", Void.class, "some-value"));
() ->
workflowStub.startUpdate(
"update", WorkflowUpdateStage.ACCEPTED, Void.class, "some-value"));
}

@Test
Expand Down Expand Up @@ -131,6 +135,7 @@ public void updateWorkflowDuplicateId() throws ExecutionException, InterruptedEx
.setUpdateName("update")
.setUpdateId(updateId)
.setFirstExecutionRunId(execution.getRunId())
.setWaitForStage(WorkflowUpdateStage.ACCEPTED)
.build(),
0,
"some-value")
Expand All @@ -146,6 +151,7 @@ public void updateWorkflowDuplicateId() throws ExecutionException, InterruptedEx
.setUpdateName("update")
.setUpdateId(updateId)
.setFirstExecutionRunId(execution.getRunId())
.setWaitForStage(WorkflowUpdateStage.ACCEPTED)
.build(),
"some-other-value")
.getResultAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.temporal.client.UpdateHandle;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowStub;
import io.temporal.client.WorkflowUpdateStage;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.*;
Expand Down Expand Up @@ -62,7 +63,8 @@ public void closeWorkflowWhileUpdateIsRunning() throws ExecutionException, Inter
SDKTestWorkflowRule.waitForOKQuery(workflowStub);
// Send an update that is accepted, but will not complete.
UpdateHandle<String> handle =
workflowStub.startUpdate("update", String.class, 10_000, "some-value");
workflowStub.startUpdate(
"update", WorkflowUpdateStage.ACCEPTED, String.class, 10_000, "some-value");

// Complete workflow, since the update is accepted it will not block completion
workflowStub.update("complete", void.class);
Expand All @@ -81,7 +83,8 @@ public void LongRunningWorkflowUpdateId() throws ExecutionException, Interrupted
workflowStub.start();
SDKTestWorkflowRule.waitForOKQuery(workflowStub);
UpdateHandle<String> handle =
workflowStub.startUpdate("update", String.class, 65_000, "some-value");
workflowStub.startUpdate(
"update", WorkflowUpdateStage.ACCEPTED, String.class, 65_000, "some-value");

assertEquals("some-value", handle.getResultAsync().get());
workflowStub.update("complete", void.class);
Expand All @@ -101,7 +104,8 @@ public void WorkflowUpdateGetResultTimeout() throws ExecutionException, Interrup
SDKTestWorkflowRule.waitForOKQuery(workflowStub);

UpdateHandle<String> handle =
workflowStub.startUpdate("update", String.class, 10_000, "some-value");
workflowStub.startUpdate(
"update", WorkflowUpdateStage.ACCEPTED, String.class, 10_000, "some-value");

CompletableFuture<String> result = handle.getResultAsync(2, TimeUnit.SECONDS);
// Verify get throws the correct exception in around the right amount of time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ public void testUpdateUntyped() throws ExecutionException, InterruptedException
// send an update through the sync path
assertEquals("Execute-Hello", workflowStub.update("update", String.class, 0, "Hello"));
// send an update through the async path
UpdateHandle<String> updateRef = workflowStub.startUpdate("update", String.class, 0, "World");
UpdateHandle<String> updateRef =
workflowStub.startUpdate("update", WorkflowUpdateStage.ACCEPTED, String.class, 0, "World");
assertEquals("Execute-World", updateRef.getResultAsync().get());
// send a bad update that will be rejected through the sync path
assertThrows(
Expand All @@ -133,7 +134,9 @@ public void testUpdateUntyped() throws ExecutionException, InterruptedException
// send a bad update that will be rejected through the sync path
assertThrows(
WorkflowUpdateException.class,
() -> workflowStub.startUpdate("update", String.class, 0, "Bad Update"));
() ->
workflowStub.startUpdate(
"update", WorkflowUpdateStage.ACCEPTED, String.class, 0, "Bad Update"));

workflowStub.update("complete", void.class);

Expand Down Expand Up @@ -163,7 +166,10 @@ public void testUpdateHandleNotReturnedUntilCompleteWhenAsked()
() -> {
UpdateHandle<String> handle =
workflowStub.startUpdate(
UpdateOptions.newBuilder(String.class).setUpdateName("update").build(),
UpdateOptions.newBuilder(String.class)
.setUpdateName("update")
.setWaitForStage(WorkflowUpdateStage.COMPLETED)
.build(),
"Enchi");
updateCompletedLast.set(true);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void testContinueAsNewInAUpdate() {

// Send an update to continue as new, must be async since the update won't complete
WorkflowStub workflowStub = WorkflowStub.fromTyped(workflow);
workflowStub.startUpdate("update", String.class, 0, "");
workflowStub.startUpdate("update", WorkflowUpdateStage.ACCEPTED, String.class, 0, "");

testWorkflowRule.waitForTheEndOfWFT(execution.getWorkflowId());
testWorkflowRule.invalidateWorkflowCache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
package io.temporal.testing;

import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.UpdateHandle;
import io.temporal.client.UpdateOptions;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.client.*;
import io.temporal.common.interceptors.WorkflowClientInterceptorBase;
import io.temporal.serviceclient.TestServiceStubs;
import java.lang.reflect.Type;
Expand Down Expand Up @@ -240,8 +237,8 @@ public <R> R update(String updateName, Class<R> resultClass, Object... args) {

@Override
public <R> UpdateHandle<R> startUpdate(
String updateName, Class<R> resultClass, Object... args) {
return next.startUpdate(updateName, resultClass, args);
String updateName, WorkflowUpdateStage waitForStage, Class<R> resultClass, Object... args) {
return next.startUpdate(updateName, waitForStage, resultClass, args);
}

@Override
Expand Down