diff --git a/.github/workflows/features.yml b/.github/workflows/features.yml index c734c6ac4e..4e27ad44b1 100644 --- a/.github/workflows/features.yml +++ b/.github/workflows/features.yml @@ -3,8 +3,9 @@ on: [push, pull_request] jobs: features-test: - uses: temporalio/features/.github/workflows/java.yaml@main + uses: temporalio/features/.github/workflows/java.yaml@java-breaking-update with: java-repo-path: ${{github.event.pull_request.head.repo.full_name}} version: ${{github.event.pull_request.head.ref}} version-is-repo-ref: true + features-repo-ref: java-breaking-update \ No newline at end of file diff --git a/temporal-sdk/src/main/java/io/temporal/client/LazyUpdateHandleImpl.java b/temporal-sdk/src/main/java/io/temporal/client/LazyUpdateHandleImpl.java index 671bf034a5..5143bece02 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/LazyUpdateHandleImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/client/LazyUpdateHandleImpl.java @@ -42,6 +42,7 @@ final class LazyUpdateHandleImpl implements UpdateHandle { private final WorkflowExecution execution; private final Class resultClass; private final Type resultType; + private WorkflowClientCallsInterceptor.PollWorkflowUpdateOutput waitCompletedPollCall; LazyUpdateHandleImpl( WorkflowClientCallsInterceptor workflowClientInvoker, @@ -72,12 +73,23 @@ public String getId() { @Override public CompletableFuture getResultAsync(long timeout, TimeUnit unit) { - WorkflowClientCallsInterceptor.PollWorkflowUpdateOutput output = - workflowClientInvoker.pollWorkflowUpdate( - new WorkflowClientCallsInterceptor.PollWorkflowUpdateInput<>( - execution, updateName, id, resultClass, resultType, timeout, unit)); - return output + WorkflowClientCallsInterceptor.PollWorkflowUpdateOutput pollCall = null; + boolean setFromWaitCompleted = false; + + // If waitCompleted was called, use the result from that call. + synchronized (this) { + if (waitCompletedPollCall != null) { + pollCall = waitCompletedPollCall; + waitCompletedPollCall = null; + } + } + + if (!setFromWaitCompleted) { + pollCall = pollUntilComplete(timeout, unit); + } + + return pollCall .getResult() .exceptionally( failure -> { @@ -109,4 +121,17 @@ public CompletableFuture getResultAsync(long timeout, TimeUnit unit) { public CompletableFuture getResultAsync() { return this.getResultAsync(Long.MAX_VALUE, TimeUnit.MILLISECONDS); } + + // Can be called immediately after initialization to wait for the update to be completed, but + // still have the result be returned by getResultAsync. + void waitCompleted() { + waitCompletedPollCall = pollUntilComplete(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + } + + private WorkflowClientCallsInterceptor.PollWorkflowUpdateOutput pollUntilComplete( + long timeout, TimeUnit unit) { + return workflowClientInvoker.pollWorkflowUpdate( + new WorkflowClientCallsInterceptor.PollWorkflowUpdateInput<>( + execution, updateName, id, resultClass, resultType, timeout, unit)); + } } diff --git a/temporal-sdk/src/main/java/io/temporal/client/UpdateOptions.java b/temporal-sdk/src/main/java/io/temporal/client/UpdateOptions.java index febfc2d5b2..56ebf3ede6 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/UpdateOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/client/UpdateOptions.java @@ -50,7 +50,7 @@ public static UpdateOptions getDefaultInstance() { private final String updateName; private final String updateId; private final String firstExecutionRunId; - private final UpdateWaitPolicy waitPolicy; + private final WorkflowUpdateStage waitPolicy; private final Class resultClass; private final Type resultType; @@ -58,7 +58,7 @@ private UpdateOptions( String updateName, String updateId, String firstExecutionRunId, - UpdateWaitPolicy waitPolicy, + WorkflowUpdateStage waitPolicy, Class resultClass, Type resultType) { this.updateName = updateName; @@ -81,7 +81,7 @@ public String getFirstExecutionRunId() { return firstExecutionRunId; } - public UpdateWaitPolicy getWaitPolicy() { + public WorkflowUpdateStage getWaitPolicy() { return waitPolicy; } @@ -152,7 +152,7 @@ public static final class Builder { private String updateName; private String updateId; private String firstExecutionRunId; - private UpdateWaitPolicy waitPolicy; + private WorkflowUpdateStage waitPolicy; private Class resultClass; private Type resultType; @@ -208,7 +208,7 @@ public Builder setFirstExecutionRunId(String firstExecutionRunId) { *
  • Completed Wait for the update to be completed by the workflow. * */ - public Builder setWaitPolicy(UpdateWaitPolicy waitPolicy) { + public Builder setWaitPolicy(WorkflowUpdateStage waitPolicy) { this.waitPolicy = waitPolicy; return this; } @@ -239,7 +239,7 @@ public UpdateOptions build() { updateName, updateId, firstExecutionRunId == null ? "" : firstExecutionRunId, - waitPolicy == null ? UpdateWaitPolicy.ACCEPTED : waitPolicy, + waitPolicy == null ? WorkflowUpdateStage.ACCEPTED : waitPolicy, resultClass, resultType == null ? resultClass : resultType); } diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java index 8d37a81e4c..a929826bb5 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java @@ -107,7 +107,8 @@ static WorkflowStub fromTyped(T typed) { /** * Asynchronously update a workflow execution by invoking its update handler and returning a - * handle to the update request. + * handle to the update request. If {@link WorkflowUpdateStage#COMPLETED} is specified, in the + * options, the handle will not be returned until the update is completed. * * @param options options that will be used to configure and start a new update request. * @param args update method arguments diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java index 2a406cf597..40724f2a45 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java @@ -297,7 +297,7 @@ public R update(String updateName, Class resultClass, Object... args) { UpdateOptions options = UpdateOptions.newBuilder() .setUpdateName(updateName) - .setWaitPolicy(UpdateWaitPolicy.COMPLETED) + .setWaitPolicy(WorkflowUpdateStage.COMPLETED) .setResultClass(resultClass) .build(); return startUpdate(options, args).getResultAsync().get(); @@ -316,7 +316,7 @@ public UpdateHandle startUpdate(String updateName, Class resultClass, UpdateOptions options = UpdateOptions.newBuilder() .setUpdateName(updateName) - .setWaitPolicy(UpdateWaitPolicy.ACCEPTED) + .setWaitPolicy(WorkflowUpdateStage.ACCEPTED) .setResultClass(resultClass) .setResultType(resultClass) .build(); @@ -351,14 +351,20 @@ public UpdateHandle startUpdate(UpdateOptions options, Object... args) result.getReference().getWorkflowExecution(), result.getResult()); } else { - return new LazyUpdateHandleImpl<>( - workflowClientInvoker, - workflowType.orElse(null), - options.getUpdateName(), - result.getReference().getUpdateId(), - result.getReference().getWorkflowExecution(), - options.getResultClass(), - options.getResultType()); + LazyUpdateHandleImpl handle = + new LazyUpdateHandleImpl<>( + workflowClientInvoker, + workflowType.orElse(null), + options.getUpdateName(), + result.getReference().getUpdateId(), + result.getReference().getWorkflowExecution(), + options.getResultClass(), + options.getResultType()); + if (options.getWaitPolicy() == WorkflowUpdateStage.COMPLETED) { + // Don't return the handle until completed, since that's what's been asked for + handle.waitCompleted(); + } + return handle; } } catch (Exception e) { Throwable throwable = throwAsWorkflowFailureException(e, targetExecution); diff --git a/temporal-sdk/src/main/java/io/temporal/client/UpdateWaitPolicy.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowUpdateStage.java similarity index 64% rename from temporal-sdk/src/main/java/io/temporal/client/UpdateWaitPolicy.java rename to temporal-sdk/src/main/java/io/temporal/client/WorkflowUpdateStage.java index 883c313e1c..e463aac3e3 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/UpdateWaitPolicy.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowUpdateStage.java @@ -22,8 +22,20 @@ import io.temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage; -public enum UpdateWaitPolicy { - /** Update request waits for the update to be accepted by the workflow */ +public enum WorkflowUpdateStage { + /** + * Update request waits for the update to be until the update request has been admitted by the + * server - it may be the case that due to a considerations like load or resource limits that an + * update is made to wait before the server will indicate that it has been received and will be + * processed. This value does not wait for any sort of acknowledgement from a worker. + */ + ADMITTED( + UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ADMITTED), + + /** + * Update request waits for the update to be accepted (and validated, if there is a validator) by + * the workflow + */ ACCEPTED( UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED), @@ -33,7 +45,7 @@ public enum UpdateWaitPolicy { private final UpdateWorkflowExecutionLifecycleStage policy; - UpdateWaitPolicy(UpdateWorkflowExecutionLifecycleStage policy) { + WorkflowUpdateStage(UpdateWorkflowExecutionLifecycleStage policy) { this.policy = policy; } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java index e55778d6a7..8a337f0249 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java @@ -333,9 +333,18 @@ public StartUpdateOutput startUpdate(StartUpdateInput input) { .setFirstExecutionRunId(input.getFirstExecutionRunId()) .setRequest(request) .build(); - Deadline pollTimeoutDeadline = Deadline.after(POLL_UPDATE_TIMEOUT_S, TimeUnit.SECONDS); - UpdateWorkflowExecutionResponse result = - genericClient.update(updateRequest, pollTimeoutDeadline); + + // Re-attempt the update until it is at least accepted, or passes the lifecycle stage specified + // by the user. + UpdateWorkflowExecutionResponse result; + do { + Deadline pollTimeoutDeadline = Deadline.after(POLL_UPDATE_TIMEOUT_S, TimeUnit.SECONDS); + result = genericClient.update(updateRequest, pollTimeoutDeadline); + } while (result.getStage().getNumber() < input.getWaitPolicy().getLifecycleStage().getNumber() + && result.getStage().getNumber() + < UpdateWorkflowExecutionLifecycleStage + .UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED + .getNumber()); if (result.hasOutcome()) { switch (result.getOutcome().getValueCase()) { @@ -399,20 +408,18 @@ public PollWorkflowUpdateOutput pollWorkflowUpdate(PollWorkflowUpdateInpu Deadline pollTimeoutDeadline = Deadline.after(input.getTimeout(), input.getTimeoutUnit()); pollWorkflowUpdateHelper(future, pollUpdateRequest, pollTimeoutDeadline); - return new PollWorkflowUpdateOutput( + return new PollWorkflowUpdateOutput<>( future.thenApply( (result) -> { if (result.hasOutcome()) { switch (result.getOutcome().getValueCase()) { case SUCCESS: Optional updateResult = Optional.of(result.getOutcome().getSuccess()); - R resultValue = - convertResultPayloads( - updateResult, - input.getResultClass(), - input.getResultType(), - dataConverterWithWorkflowContext); - return resultValue; + return convertResultPayloads( + updateResult, + input.getResultClass(), + input.getResultType(), + dataConverterWithWorkflowContext); case FAILURE: throw new WorkflowUpdateException( input.getWorkflowExecution(), @@ -434,31 +441,26 @@ private void pollWorkflowUpdateHelper( CompletableFuture resultCF, PollWorkflowExecutionUpdateRequest request, Deadline deadline) { - - Deadline pollTimeoutDeadline = - Deadline.after(POLL_UPDATE_TIMEOUT_S, TimeUnit.SECONDS).minimum(deadline); genericClient - .pollUpdateAsync(request, pollTimeoutDeadline) + .pollUpdateAsync(request, deadline) .whenComplete( (r, e) -> { + if (e == null && !r.hasOutcome()) { + pollWorkflowUpdateHelper(resultCF, request, deadline); + return; + } if ((e instanceof StatusRuntimeException && ((StatusRuntimeException) e).getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED) - || pollTimeoutDeadline.isExpired() - || (e == null && !r.hasOutcome())) { - // if the request has timed out, stop retrying - if (!deadline.isExpired()) { - pollWorkflowUpdateHelper(resultCF, request, deadline); - } else { - resultCF.completeExceptionally( - new TimeoutException( - "WorkflowId=" - + request.getUpdateRef().getWorkflowExecution().getWorkflowId() - + ", runId=" - + request.getUpdateRef().getWorkflowExecution().getRunId() - + ", updateId=" - + request.getUpdateRef().getUpdateId())); - } + || deadline.isExpired()) { + resultCF.completeExceptionally( + new TimeoutException( + "WorkflowId=" + + request.getUpdateRef().getWorkflowExecution().getWorkflowId() + + ", runId=" + + request.getUpdateRef().getWorkflowExecution().getRunId() + + ", updateId=" + + request.getUpdateRef().getUpdateId())); } else if (e != null) { resultCF.completeExceptionally(e); } else { diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/shared/TestWorkflows.java b/temporal-sdk/src/test/java/io/temporal/workflow/shared/TestWorkflows.java index 3d9688c984..fca82aacc3 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/shared/TestWorkflows.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/shared/TestWorkflows.java @@ -177,16 +177,6 @@ public interface QueryableWorkflow { void mySignal(String value); } - @WorkflowInterface - public interface SimpleWorkflowWithUpdate { - - @WorkflowMethod - String execute(); - - @UpdateMethod - String update(String value); - } - @WorkflowInterface public interface WorkflowWithUpdate { diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java index 979909975c..9d424d37bc 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java @@ -40,6 +40,9 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; @@ -53,7 +56,7 @@ public class UpdateTest { public SDKTestWorkflowRule testWorkflowRule = SDKTestWorkflowRule.newBuilder() .setWorkerOptions(WorkerOptions.newBuilder().build()) - .setWorkflowTypes(TestUpdateWorkflowImpl.class) + .setWorkflowTypes(TestUpdateWorkflowImpl.class, TestWaitingUpdate.class) .setActivityImplementations(new ActivityImpl()) .build(); @@ -107,7 +110,7 @@ public void testUpdateUntyped() throws ExecutionException, InterruptedException workflowType, SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue())); - WorkflowExecution execution = workflowStub.start(); + workflowStub.start(); SDKTestWorkflowRule.waitForOKQuery(workflowStub); assertEquals("initial", workflowStub.query("getState", String.class)); @@ -115,7 +118,7 @@ public void testUpdateUntyped() throws ExecutionException, InterruptedException // send an update through the sync path assertEquals("Execute-Hello", workflowStub.update("update", String.class, 0, "Hello")); // send an update through the async path - UpdateHandle updateRef = workflowStub.startUpdate("update", String.class, 0, "World"); + UpdateHandle updateRef = workflowStub.startUpdate("update", String.class, 0, "World"); assertEquals("Execute-World", updateRef.getResultAsync().get()); // send a bad update that will be rejected through the sync path assertThrows( @@ -137,6 +140,48 @@ public void testUpdateUntyped() throws ExecutionException, InterruptedException assertEquals("Execute-Hello Execute-World", workflowStub.getResult(String.class)); } + @Test + public void testUpdateHandleNotReturnedUntilCompleteWhenAsked() + throws ExecutionException, InterruptedException { + WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + String workflowType = TestWorkflows.WorkflowWithUpdateAndSignal.class.getSimpleName(); + WorkflowStub workflowStub = + workflowClient.newUntypedWorkflowStub( + workflowType, + SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue())); + + workflowStub.start(); + + SDKTestWorkflowRule.waitForOKQuery(workflowStub); + assertEquals("initial", workflowStub.query("getState", String.class)); + + // Start the update but verify it does not return the handle until the update is complete + AtomicBoolean updateCompletedLast = new AtomicBoolean(false); + Future asyncUpdate = + Executors.newSingleThreadExecutor() + .submit( + () -> { + UpdateHandle handle = + workflowStub.startUpdate( + UpdateOptions.newBuilder(String.class).setUpdateName("update").build(), + "Enchi"); + updateCompletedLast.set(true); + try { + assertEquals("Enchi", handle.getResultAsync().get()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + workflowStub.signal("signal", "whatever"); + updateCompletedLast.set(false); + + asyncUpdate.get(); + assertTrue(updateCompletedLast.get()); + workflowStub.update("complete", void.class); + workflowStub.getResult(List.class); + } + public static class TestUpdateWorkflowImpl implements WorkflowWithUpdate { String state = "initial"; List updates = new ArrayList<>(); @@ -203,4 +248,45 @@ public String execute(String input) { return Activity.getExecutionContext().getInfo().getActivityType() + "-" + input; } } + + public static class TestWaitingUpdate implements TestWorkflows.WorkflowWithUpdateAndSignal { + String state = "initial"; + List updates = new ArrayList<>(); + CompletablePromise signalled = Workflow.newPromise(); + CompletablePromise promise = Workflow.newPromise(); + private final TestActivities.TestActivity1 activity = + Workflow.newActivityStub( + TestActivities.TestActivity1.class, + ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofHours(1)).build()); + + @Override + public List execute() { + promise.get(); + return updates; + } + + @Override + public String getState() { + return state; + } + + @Override + public void signal(String value) { + signalled.complete(null); + } + + @Override + public String update(String value) { + Workflow.await(() -> signalled.isCompleted()); + return value; + } + + @Override + public void validator(String value) {} + + @Override + public void complete() { + promise.complete(null); + } + } } diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java index f23bcd0ada..5e1a2565a6 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java @@ -66,12 +66,7 @@ import io.temporal.api.common.v1.Payloads; import io.temporal.api.common.v1.RetryPolicy; import io.temporal.api.common.v1.WorkflowExecution; -import io.temporal.api.enums.v1.CancelExternalWorkflowExecutionFailedCause; -import io.temporal.api.enums.v1.EventType; -import io.temporal.api.enums.v1.RetryState; -import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause; -import io.temporal.api.enums.v1.StartChildWorkflowExecutionFailedCause; -import io.temporal.api.enums.v1.TimeoutType; +import io.temporal.api.enums.v1.*; import io.temporal.api.errordetails.v1.QueryFailedFailure; import io.temporal.api.failure.v1.ApplicationFailureInfo; import io.temporal.api.failure.v1.Failure; @@ -1810,6 +1805,9 @@ private static void acceptUpdate( UpdateRef.newBuilder() .setWorkflowExecution(ctx.getExecution()) .setUpdateId(data.id)) + .setStage( + UpdateWorkflowExecutionLifecycleStage + .UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED) .build(); data.acceptance.complete(response); @@ -1852,6 +1850,9 @@ private static void completeUpdate( .setWorkflowExecution(ctx.getExecution()) .setUpdateId(data.id)) .setOutcome(response.getOutcome()) + .setStage( + UpdateWorkflowExecutionLifecycleStage + .UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED) .build(); data.complete.complete(updateResponse); diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index a3a88b3eac..8bba37b0ae 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -1625,6 +1625,9 @@ private void processRejectionMessage( .setUpdateId(rejection.getRejectedRequest().getMeta().getUpdateId()) .setWorkflowExecution(ctx.getExecution())) .setOutcome(Outcome.newBuilder().setFailure(rejection.getFailure()).build()) + .setStage( + UpdateWorkflowExecutionLifecycleStage + .UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED) .build(); u.getAcceptance().complete(response); } @@ -2157,7 +2160,13 @@ public PollWorkflowExecutionUpdateResponse pollUpdateWorkflowExecution( .setOutcome(completionResponse.getOutcome()) .build(); } catch (TimeoutException e) { - return PollWorkflowExecutionUpdateResponse.getDefaultInstance(); + PollWorkflowExecutionUpdateResponse resp = + PollWorkflowExecutionUpdateResponse.getDefaultInstance(); + return resp.toBuilder() + .setStage( + UpdateWorkflowExecutionLifecycleStage + .UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ADMITTED) + .build(); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof StatusRuntimeException) {