diff --git a/durabletask-client/pom.xml b/durabletask-client/pom.xml index 27de194e0a..175cde3e7e 100644 --- a/durabletask-client/pom.xml +++ b/durabletask-client/pom.xml @@ -59,6 +59,10 @@ com.fasterxml.jackson.datatype jackson-datatype-jsr310 + + org.apache.commons + commons-lang3 + io.grpc grpc-testing diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskClient.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskClient.java index 42a98dd556..7f87918038 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskClient.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskClient.java @@ -145,7 +145,7 @@ public void raiseEvent(String instanceId, String eventName) { * Waits for an orchestration to start running and returns an {@link OrchestrationMetadata} object that contains * metadata about the started instance. * - *

A "started" orchestration instance is any instance not in the Pending state.

+ *

A "started" orchestration instance is any instance not in the Pending state.

* *

If an orchestration instance is already running when this method is called, the method will return immediately. *

diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java index eb3be6bb9a..f60d0b9e06 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java @@ -17,11 +17,13 @@ import io.dapr.durabletask.implementation.protobuf.OrchestratorService; import io.dapr.durabletask.implementation.protobuf.OrchestratorService.TaskFailureDetails; import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactories; import io.grpc.Channel; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import org.apache.commons.lang3.StringUtils; import java.time.Duration; import java.util.HashMap; @@ -42,7 +44,8 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private static final Logger logger = Logger.getLogger(DurableTaskGrpcWorker.class.getPackage().getName()); private static final Duration DEFAULT_MAXIMUM_TIMER_INTERVAL = Duration.ofDays(3); - private final HashMap orchestrationFactories = new HashMap<>(); + private final TaskOrchestrationFactories orchestrationFactories; + private final HashMap activityFactories = new HashMap<>(); private final ManagedChannel managedSidecarChannel; @@ -57,7 +60,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private Thread workerThread; DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) { - this.orchestrationFactories.putAll(builder.orchestrationFactories); + this.orchestrationFactories = builder.orchestrationFactories; this.activityFactories.putAll(builder.activityFactories); this.appId = builder.appId; @@ -115,7 +118,9 @@ public void start() { * */ public void close() { - this.workerThread.interrupt(); + if (this.workerThread != null) { + this.workerThread.interrupt(); + } this.isNormalShutdown = true; this.shutDownWorkerPool(); this.closeSideCarChannel(); @@ -161,6 +166,7 @@ public void startAndBlock() { OrchestratorService.WorkItem.RequestCase requestType = workItem.getRequestCase(); if (requestType == OrchestratorService.WorkItem.RequestCase.ORCHESTRATORREQUEST) { OrchestratorService.OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest(); + logger.log(Level.FINEST, String.format("Processing orchestrator request for instance: {0}", orchestratorRequest.getInstanceId())); @@ -171,11 +177,22 @@ public void startAndBlock() { orchestratorRequest.getPastEventsList(), orchestratorRequest.getNewEventsList()); + var versionBuilder = OrchestratorService.OrchestrationVersion.newBuilder(); + + if (StringUtils.isNotEmpty(taskOrchestratorResult.getVersion())) { + versionBuilder.setName(taskOrchestratorResult.getVersion()); + } + + if (taskOrchestratorResult.getPatches() != null) { + versionBuilder.addAllPatches(taskOrchestratorResult.getPatches()); + } + OrchestratorService.OrchestratorResponse response = OrchestratorService.OrchestratorResponse.newBuilder() .setInstanceId(orchestratorRequest.getInstanceId()) .addAllActions(taskOrchestratorResult.getActions()) .setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus())) .setCompletionToken(workItem.getCompletionToken()) + .setVersion(versionBuilder) .build(); try { diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java index 0d3ebf2274..ad60577256 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java @@ -13,6 +13,8 @@ package io.dapr.durabletask; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactories; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactory; import io.grpc.Channel; import java.time.Duration; @@ -24,7 +26,7 @@ * */ public final class DurableTaskGrpcWorkerBuilder { - final HashMap orchestrationFactories = new HashMap<>(); + TaskOrchestrationFactories orchestrationFactories = new TaskOrchestrationFactories(); final HashMap activityFactories = new HashMap<>(); int port; Channel channel; @@ -40,17 +42,7 @@ public final class DurableTaskGrpcWorkerBuilder { * @return this builder object */ public DurableTaskGrpcWorkerBuilder addOrchestration(TaskOrchestrationFactory factory) { - String key = factory.getName(); - if (key == null || key.length() == 0) { - throw new IllegalArgumentException("A non-empty task orchestration name is required."); - } - - if (this.orchestrationFactories.containsKey(key)) { - throw new IllegalArgumentException( - String.format("A task orchestration factory named %s is already registered.", key)); - } - - this.orchestrationFactories.put(key, factory); + this.orchestrationFactories.addOrchestration(factory); return this; } diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationMetadata.java b/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationMetadata.java index a0565ba634..7f9285d034 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationMetadata.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationMetadata.java @@ -226,7 +226,7 @@ public boolean isCustomStatusFetched() { private T readPayloadAs(Class type, String payload) { if (!this.requestedInputsAndOutputs) { throw new IllegalStateException("This method can only be used when instance metadata is fetched with the option " - + "to include input and output data."); + + "to include input and output data."); } // Note that the Java gRPC implementation converts null protobuf strings into empty Java strings diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationRunner.java b/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationRunner.java index 22b2154608..bbb9814a86 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationRunner.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationRunner.java @@ -16,10 +16,11 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.StringValue; import io.dapr.durabletask.implementation.protobuf.OrchestratorService; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactories; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactory; import java.time.Duration; import java.util.Base64; -import java.util.HashMap; import java.util.logging.Logger; /** @@ -134,8 +135,8 @@ public static byte[] loadAndRun(byte[] orchestratorRequestBytes, TaskOrchestrati } // Register the passed orchestration as the default ("*") orchestration - HashMap orchestrationFactories = new HashMap<>(); - orchestrationFactories.put("*", new TaskOrchestrationFactory() { + TaskOrchestrationFactories orchestrationFactories = new TaskOrchestrationFactories(); + orchestrationFactories.addOrchestration(new TaskOrchestrationFactory() { @Override public String getName() { return "*"; @@ -145,6 +146,16 @@ public String getName() { public TaskOrchestration create() { return orchestration; } + + @Override + public String getVersionName() { + return ""; + } + + @Override + public Boolean isLatestVersion() { + return false; + } }); TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor( diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationRuntimeStatus.java b/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationRuntimeStatus.java index 1bdd33ab38..e9530ae815 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationRuntimeStatus.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationRuntimeStatus.java @@ -68,7 +68,12 @@ public enum OrchestrationRuntimeStatus { /** * The orchestration is in a suspended state. */ - SUSPENDED; + SUSPENDED, + + /** + * The orchestration is in a stalled state. + */ + STALLED; static OrchestrationRuntimeStatus fromProtobuf(OrchestratorService.OrchestrationStatus status) { switch (status) { @@ -88,6 +93,8 @@ static OrchestrationRuntimeStatus fromProtobuf(OrchestratorService.Orchestration return PENDING; case ORCHESTRATION_STATUS_SUSPENDED: return SUSPENDED; + case ORCHESTRATION_STATUS_STALLED: + return STALLED; default: throw new IllegalArgumentException(String.format("Unknown status value: %s", status)); } diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/Task.java b/durabletask-client/src/main/java/io/dapr/durabletask/Task.java index a3f3313816..de2f13e871 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/Task.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/Task.java @@ -27,12 +27,14 @@ *
  * Task{@literal <}int{@literal >} activityTask = ctx.callActivity("MyActivity", someInput, int.class);
  * 
+ * *

Orchestrator code uses the {@link #await()} method to block on the completion of the task and retrieve the result. * If the task is not yet complete, the {@code await()} method will throw an {@link OrchestratorBlockedException}, which * pauses the orchestrator's execution so that it can save its progress into durable storage and schedule any * outstanding work. When the task is complete, the orchestrator will run again from the beginning and the next time * the task's {@code await()} method is called, the result will be returned, or a {@link TaskFailedException} will be * thrown if the result of the task was an unhandled exception.

+ * *

Note that orchestrator code must never catch {@code OrchestratorBlockedException} because doing so can cause the * orchestration instance to get permanently stuck.

* diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityContext.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityContext.java index b2043b51ee..7a0d1ed1ee 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityContext.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityContext.java @@ -34,7 +34,6 @@ public interface TaskActivityContext { */ T getInput(Class targetType); - /** * Gets the execution id of the current task activity. * diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskFailedException.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskFailedException.java index 377eecb426..5362e830c7 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskFailedException.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskFailedException.java @@ -16,6 +16,7 @@ /** * Exception that gets thrown when awaiting a {@link Task} for an activity or sub-orchestration that fails with an * unhandled exception. + * *

Detailed information associated with a particular task failure can be retrieved * using the {@link #getErrorDetails()} method.

*/ diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java index cd451a2f9b..f2f5ace8c2 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java @@ -378,6 +378,15 @@ default void continueAsNew(Object input) { */ void continueAsNew(Object input, boolean preserveUnprocessedEvents); + /** + * Check if the given patch name can be applied to the orchestration. + * + * @param patchName The name of the patch to check. + * @return True if the given patch name can be applied to the orchestration, False otherwise. + */ + + boolean isPatched(String patchName); + /** * Create a new Uuid that is safe for replay within an orchestration or operation. * diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java index fc79dc9158..0b540dabc0 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java @@ -19,7 +19,11 @@ import io.dapr.durabletask.implementation.protobuf.OrchestratorService.ScheduleTaskAction.Builder; import io.dapr.durabletask.interruption.ContinueAsNewInterruption; import io.dapr.durabletask.interruption.OrchestratorBlockedException; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactories; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactory; +import io.dapr.durabletask.orchestration.exception.VersionNotRegisteredException; import io.dapr.durabletask.util.UuidGenerator; +import org.apache.commons.lang3.StringUtils; import javax.annotation.Nullable; import java.time.Duration; @@ -47,14 +51,14 @@ final class TaskOrchestrationExecutor { private static final String EMPTY_STRING = ""; - private final HashMap orchestrationFactories; + private final TaskOrchestrationFactories orchestrationFactories; private final DataConverter dataConverter; private final Logger logger; private final Duration maximumTimerInterval; private final String appId; public TaskOrchestrationExecutor( - HashMap orchestrationFactories, + TaskOrchestrationFactories orchestrationFactories, DataConverter dataConverter, Duration maximumTimerInterval, Logger logger, @@ -79,6 +83,9 @@ public TaskOrchestratorResult execute(List pas } completed = true; logger.finest("The orchestrator execution completed normally"); + } catch (VersionNotRegisteredException versionNotRegisteredException) { + logger.warning("The orchestrator version is not registered: " + versionNotRegisteredException.toString()); + context.setVersionNotRegistered(); } catch (OrchestratorBlockedException orchestratorBlockedException) { logger.fine("The orchestrator has yielded and will await for new events."); } catch (ContinueAsNewInterruption continueAsNewInterruption) { @@ -87,7 +94,7 @@ public TaskOrchestratorResult execute(List pas } catch (Exception e) { // The orchestrator threw an unhandled exception - fail it // TODO: What's the right way to log this? - logger.warning("The orchestrator failed with an unhandled exception: " + e.toString()); + logger.warning("The orchestrator failed with an unhandled exception: " + e); context.fail(new FailureDetails(e)); } @@ -97,12 +104,16 @@ public TaskOrchestratorResult execute(List pas context.complete(null); } - return new TaskOrchestratorResult(context.pendingActions.values(), context.getCustomStatus()); + return new TaskOrchestratorResult(context.pendingActions.values(), + context.getCustomStatus(), + context.versionName, + context.encounteredPatches); } private class ContextImplTask implements TaskOrchestrationContext { private String orchestratorName; + private final List encounteredPatches = new ArrayList<>(); private String rawInput; private String instanceId; private Instant currentInstant; @@ -127,6 +138,12 @@ private class ContextImplTask implements TaskOrchestrationContext { private Object continuedAsNewInput; private boolean preserveUnprocessedEvents; private Object customStatus; + private final Map appliedPatches = new HashMap<>(); + private final Map historyPatches = new HashMap<>(); + + private String orchestratorVersionName; + + private String versionName; public ContextImplTask(List pastEvents, List newEvents) { @@ -363,6 +380,34 @@ public Task callActivity( return this.createAppropriateTask(taskFactory, options); } + @Override + public boolean isPatched(String patchName) { + var isPatched = this.checkPatch(patchName); + if (isPatched) { + this.encounteredPatches.add(patchName); + } + + return isPatched; + } + + public boolean checkPatch(String patchName) { + if (this.appliedPatches.containsKey(patchName)) { + return this.appliedPatches.get(patchName); + } + + if (this.historyPatches.containsKey(patchName)) { + this.appliedPatches.put(patchName, true); + return true; + } + + if (this.isReplaying) { + this.appliedPatches.put(patchName, false); + return false; + } + this.appliedPatches.put(patchName, true); + return true; + } + @Override public void continueAsNew(Object input, boolean preserveUnprocessedEvents) { Helpers.throwIfOrchestratorComplete(this.isComplete); @@ -438,7 +483,7 @@ public Task callSubOrchestrator( if (input instanceof TaskOptions) { throw new IllegalArgumentException("TaskOptions cannot be used as an input. " - + "Did you call the wrong method overload?"); + + "Did you call the wrong method overload?"); } String serializedInput = this.dataConverter.serialize(input); @@ -944,6 +989,14 @@ private void processEvent(OrchestratorService.HistoryEvent e) { case ORCHESTRATORSTARTED: Instant instant = DataConverter.getInstantFromTimestamp(e.getTimestamp()); this.setCurrentInstant(instant); + + if (StringUtils.isNotEmpty(e.getOrchestratorStarted().getVersion().getName())) { + this.orchestratorVersionName = e.getOrchestratorStarted().getVersion().getName(); + } + for (var patch : e.getOrchestratorStarted().getVersion().getPatchesList()) { + this.historyPatches.put(patch, true); + } + this.logger.fine(() -> this.instanceId + ": Workflow orchestrator started"); break; case ORCHESTRATORCOMPLETED: @@ -958,18 +1011,27 @@ private void processEvent(OrchestratorService.HistoryEvent e) { this.logger.fine(() -> this.instanceId + ": Workflow execution started"); this.setAppId(e.getRouter().getSourceAppID()); + var versionName = ""; + if (!StringUtils.isEmpty(this.orchestratorVersionName)) { + versionName = this.orchestratorVersionName; + } + // Create and invoke the workflow orchestrator TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories - .get(executionStarted.getName()); + .getOrchestrationFactory(executionStarted.getName(), versionName); + if (factory == null) { // Try getting the default orchestrator - factory = TaskOrchestrationExecutor.this.orchestrationFactories.get("*"); + factory = TaskOrchestrationExecutor.this.orchestrationFactories + .getOrchestrationFactory("*"); } // TODO: Throw if the factory is null (orchestration by that name doesn't exist) if (factory == null) { throw new IllegalStateException("No factory found for orchestrator: " + executionStarted.getName()); } + this.versionName = factory.getVersionName(); + TaskOrchestration orchestrator = factory.create(); orchestrator.run(this); break; @@ -979,6 +1041,9 @@ private void processEvent(OrchestratorService.HistoryEvent e) { case EXECUTIONTERMINATED: this.handleExecutionTerminated(e); break; + case EXECUTIONSTALLED: + this.logger.fine(() -> this.instanceId + ": Workflow execution stalled"); + break; case TASKSCHEDULED: this.handleTaskScheduled(e); break; @@ -1018,6 +1083,22 @@ private void processEvent(OrchestratorService.HistoryEvent e) { } } + public void setVersionNotRegistered() { + this.pendingActions.clear(); + + OrchestratorService.CompleteOrchestrationAction.Builder builder = OrchestratorService.CompleteOrchestrationAction + .newBuilder(); + builder.setOrchestrationStatus(OrchestratorService.OrchestrationStatus.ORCHESTRATION_STATUS_STALLED); + + int id = this.sequenceNumber++; + OrchestratorService.OrchestratorAction action = OrchestratorService.OrchestratorAction.newBuilder() + .setId(id) + .setCompleteOrchestration(builder.build()) + .build(); + this.pendingActions.put(id, action); + + } + private class TaskRecord { private final CompletableTask task; private final String taskName; diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java index 705a41d5c0..c8f4ecaee5 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java @@ -17,6 +17,7 @@ import java.util.Collection; import java.util.Collections; +import java.util.List; final class TaskOrchestratorResult { @@ -24,10 +25,16 @@ final class TaskOrchestratorResult { private final String customStatus; - public TaskOrchestratorResult(Collection actions, String customStatus) { + private final String version; + + private final List patches; + + public TaskOrchestratorResult(Collection actions, + String customStatus, String version, List patches) { this.actions = Collections.unmodifiableCollection(actions); - ; this.customStatus = customStatus; + this.version = version; + this.patches = patches; } public Collection getActions() { @@ -37,4 +44,12 @@ public Collection getActions() { public String getCustomStatus() { return this.customStatus; } + + public String getVersion() { + return version; + } + + public List getPatches() { + return patches; + } } diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactories.java b/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactories.java new file mode 100644 index 0000000000..8ed48aa295 --- /dev/null +++ b/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactories.java @@ -0,0 +1,120 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask.orchestration; + +import io.dapr.durabletask.orchestration.exception.VersionNotRegisteredException; + +import java.util.HashMap; +import java.util.logging.Logger; + +public class TaskOrchestrationFactories { + private static final Logger logger = Logger.getLogger(TaskOrchestrationFactories.class.getPackage().getName()); + + final HashMap orchestrationFactories = new HashMap<>(); + final HashMap> versionedOrchestrationFactories = new HashMap<>(); + final HashMap latestVersionOrchestrationFactories = new HashMap<>(); + + /** + * Adds a new orchestration factory to the registry. + * + * @param factory the factory to add + */ + public void addOrchestration(TaskOrchestrationFactory factory) { + String key = factory.getName(); + if (this.emptyString(key)) { + throw new IllegalArgumentException("A non-empty task orchestration name is required."); + } + + if (this.orchestrationFactories.containsKey(key)) { + throw new IllegalArgumentException( + String.format("A task orchestration factory named %s is already registered.", key)); + } + + if (emptyString(factory.getVersionName())) { + this.orchestrationFactories.put(key, factory); + return; + } + + if (!this.versionedOrchestrationFactories.containsKey(key)) { + this.versionedOrchestrationFactories.put(key, new HashMap<>()); + } + + if (this.versionedOrchestrationFactories.get(key).containsKey(factory.getVersionName())) { + throw new IllegalArgumentException("The version name " + factory.getVersionName() + "for " + + factory.getName() + " is already registered."); + } + + this.versionedOrchestrationFactories.get(key).put(factory.getVersionName(), factory); + + if (factory.isLatestVersion()) { + logger.info("Setting1 latest version for " + key + " to " + factory.getVersionName()); + if (this.latestVersionOrchestrationFactories.containsKey(key)) { + throw new IllegalStateException("Latest version already set for " + key); + } + this.latestVersionOrchestrationFactories.put(key, factory.getVersionName()); + } + + } + + /** + * Gets the orchestration factory for the specified orchestration name. + * + * @param orchestrationName the orchestration name + * @return the orchestration factory + */ + public TaskOrchestrationFactory getOrchestrationFactory(String orchestrationName) { + logger.info("Get orchestration factory for " + orchestrationName); + if (this.orchestrationFactories.containsKey(orchestrationName)) { + return this.orchestrationFactories.get(orchestrationName); + } + + return this.getOrchestrationFactory(orchestrationName, ""); + } + + /** + * Gets the orchestration factory for the specified orchestration name and version. + * + * @param orchestrationName the orchestration name + * @param versionName the version name + * @return the orchestration factory + */ + public TaskOrchestrationFactory getOrchestrationFactory(String orchestrationName, String versionName) { + logger.info("Get orchestration factory for " + orchestrationName + " version " + versionName); + if (this.orchestrationFactories.containsKey(orchestrationName)) { + return this.orchestrationFactories.get(orchestrationName); + } + + if (!this.versionedOrchestrationFactories.containsKey(orchestrationName)) { + logger.warning("No orchestration factory registered for " + orchestrationName); + return null; + } + + if (this.emptyString(versionName)) { + logger.info("No version specified, returning latest version"); + String latestVersion = this.latestVersionOrchestrationFactories.get(orchestrationName); + logger.info("Latest version is " + latestVersion); + return this.versionedOrchestrationFactories.get(orchestrationName).get(latestVersion); + } + + if (this.versionedOrchestrationFactories.get(orchestrationName).containsKey(versionName)) { + return this.versionedOrchestrationFactories.get(orchestrationName).get(versionName); + } + + throw new VersionNotRegisteredException(); + } + + private boolean emptyString(String s) { + return s == null || s.isEmpty(); + } +} diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationFactory.java b/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactory.java similarity index 87% rename from durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationFactory.java rename to durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactory.java index 274813b69f..a5e1b6a3cf 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationFactory.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/TaskOrchestrationFactory.java @@ -11,7 +11,9 @@ limitations under the License. */ -package io.dapr.durabletask; +package io.dapr.durabletask.orchestration; + +import io.dapr.durabletask.TaskOrchestration; /** * Factory interface for producing {@link TaskOrchestration} implementations. @@ -30,4 +32,8 @@ public interface TaskOrchestrationFactory { * @return the created orchestration instance */ TaskOrchestration create(); + + String getVersionName(); + + Boolean isLatestVersion(); } diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/exception/VersionNotRegisteredException.java b/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/exception/VersionNotRegisteredException.java new file mode 100644 index 0000000000..f69ad9ea65 --- /dev/null +++ b/durabletask-client/src/main/java/io/dapr/durabletask/orchestration/exception/VersionNotRegisteredException.java @@ -0,0 +1,17 @@ +/* + * Copyright 2026 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask.orchestration.exception; + +public class VersionNotRegisteredException extends RuntimeException { +} diff --git a/durabletask-client/src/test/java/io/dapr/durabletask/IntegrationTestBase.java b/durabletask-client/src/test/java/io/dapr/durabletask/IntegrationTestBase.java index bbfcde0469..d0a8a8faa6 100644 --- a/durabletask-client/src/test/java/io/dapr/durabletask/IntegrationTestBase.java +++ b/durabletask-client/src/test/java/io/dapr/durabletask/IntegrationTestBase.java @@ -13,6 +13,7 @@ package io.dapr.durabletask; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactory; import org.junit.jupiter.api.AfterEach; import java.time.Duration; @@ -67,6 +68,16 @@ public String getName() { public TaskOrchestration create() { return implementation; } + + @Override + public String getVersionName() { + return ""; + } + + @Override + public Boolean isLatestVersion() { + return false; + } }); return this; } diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/FullVersioningWorkflowsIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/FullVersioningWorkflowsIT.java new file mode 100644 index 0000000000..e20283280a --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/FullVersioningWorkflowsIT.java @@ -0,0 +1,226 @@ +/* + * Copyright 2024 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.testcontainers.workflows.version.full; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.dapr.config.Properties; +import io.dapr.it.spring.data.CustomMySQLContainer; +import io.dapr.it.testcontainers.ContainerConstants; +import io.dapr.it.testcontainers.workflows.TestWorkflowsApplication; +import io.dapr.it.testcontainers.workflows.TestWorkflowsConfiguration; +import io.dapr.testcontainers.Component; +import io.dapr.testcontainers.DaprContainer; +import io.dapr.testcontainers.DaprLogLevel; +import io.dapr.testcontainers.DaprPlacementContainer; +import io.dapr.testcontainers.DaprSchedulerContainer; +import io.dapr.workflows.client.DaprWorkflowClient; +import io.dapr.workflows.client.WorkflowState; +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.containers.wait.strategy.WaitStrategy; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; +import java.util.Map; + +import static io.dapr.it.spring.data.DaprSpringDataConstants.STATE_STORE_NAME; +import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG; +import static io.dapr.testcontainers.DaprContainerConstants.DAPR_PLACEMENT_IMAGE_TAG; +import static io.dapr.testcontainers.DaprContainerConstants.DAPR_SCHEDULER_IMAGE_TAG; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrowsExactly; + +@SpringBootTest( + webEnvironment = WebEnvironment.RANDOM_PORT, + classes = { + TestWorkflowsConfiguration.class, + TestWorkflowsApplication.class + } +) +@Testcontainers +@Tag("testcontainers") +public class FullVersioningWorkflowsIT { + + private static final Network DAPR_NETWORK = Network.newNetwork(); + + private static final WaitStrategy MYSQL_WAIT_STRATEGY = Wait + .forLogMessage(".*port: 3306 MySQL Community Server \\(GPL\\).*", 1) + .withStartupTimeout(Duration.of(60, ChronoUnit.SECONDS)); + + private static final String STATE_STORE_DSN = "mysql:password@tcp(mysql:3306)/"; + private static final Map STATE_STORE_PROPERTIES = createStateStoreProperties(); + + @Container + private static final MySQLContainer MY_SQL_CONTAINER = new CustomMySQLContainer<>("mysql:5.7.34") + .withNetworkAliases("mysql") + .withDatabaseName("dapr_db") + .withUsername("mysql") + .withPassword("password") + .withNetwork(DAPR_NETWORK) + .waitingFor(MYSQL_WAIT_STRATEGY); + + @Container + private final static DaprPlacementContainer sharedPlacementContainer = new DaprPlacementContainer(DAPR_PLACEMENT_IMAGE_TAG) + .withNetwork(DAPR_NETWORK) + .withNetworkAliases("placement") + .withReuse(false); + + @Container + private final static DaprSchedulerContainer sharedSchedulerContainer = new DaprSchedulerContainer(DAPR_SCHEDULER_IMAGE_TAG) + .withNetwork(DAPR_NETWORK) + .withNetworkAliases("scheduler") + .withReuse(false); + + @Container + private static final DaprContainer DAPR_CONTAINER_V1 = new DaprContainer(DAPR_RUNTIME_IMAGE_TAG) + .withAppName("dapr-worker") + .withNetworkAliases("dapr-worker-v1") + .withNetwork(DAPR_NETWORK) + .withComponent(new Component(STATE_STORE_NAME, "state.mysql", "v1", STATE_STORE_PROPERTIES)) + .withPlacementContainer(sharedPlacementContainer) + .withSchedulerContainer(sharedSchedulerContainer) + .withDaprLogLevel(DaprLogLevel.DEBUG) + .withLogConsumer(outputFrame -> System.out.println("daprV1 -> " +outputFrame.getUtf8String())) + .withAppChannelAddress("host.testcontainers.internal") + .dependsOn(MY_SQL_CONTAINER, sharedPlacementContainer, sharedSchedulerContainer); + + private static final DaprContainer DAPR_CONTAINER_V2 = new DaprContainer(DAPR_RUNTIME_IMAGE_TAG) + .withAppName("dapr-worker") + .withNetworkAliases("dapr-worker-v2") + .withNetwork(DAPR_NETWORK) + .withComponent(new Component(STATE_STORE_NAME, "state.mysql", "v1", STATE_STORE_PROPERTIES)) + .withPlacementContainer(sharedPlacementContainer) + .withSchedulerContainer(sharedSchedulerContainer) + .withDaprLogLevel(DaprLogLevel.DEBUG) + .withLogConsumer(outputFrame -> System.out.println("daprV2 -> " + outputFrame.getUtf8String())) + .withAppChannelAddress("host.testcontainers.internal") + .dependsOn(MY_SQL_CONTAINER, sharedPlacementContainer, sharedSchedulerContainer); + + @Container + private final static GenericContainer workerV1 = new GenericContainer<>(ContainerConstants.JDK_17_TEMURIN_JAMMY) + .withCopyFileToContainer(MountableFile.forHostPath("target"), "/app") + .withWorkingDirectory("/app") + .withCommand("java", "-cp", "test-classes:classes:dependency/*:*", + "-Ddapr.app.id=dapr-worker", + "-Ddapr.grpc.endpoint=dapr-worker-v1:50001", + "-Ddapr.http.endpoint=dapr-worker-v1:3500", + "io.dapr.it.testcontainers.workflows.version.full.WorkflowV1Worker") + .withNetwork(DAPR_NETWORK) + .dependsOn(DAPR_CONTAINER_V1) + .waitingFor(Wait.forLogMessage(".*WorkerV1 started.*", 1)) + .withLogConsumer(outputFrame -> System.out.println("WorkerV1: " + outputFrame.getUtf8String())); + +// This container will be started manually + private final static GenericContainer workerV2 = new GenericContainer<>(ContainerConstants.JDK_17_TEMURIN_JAMMY) + .withCopyFileToContainer(MountableFile.forHostPath("target"), "/app") + .withWorkingDirectory("/app") + .withCommand("java", "-cp", "test-classes:classes:dependency/*:*", + "-Ddapr.app.id=dapr-worker", + "-Ddapr.grpc.endpoint=dapr-worker-v2:50001", + "-Ddapr.http.endpoint=dapr-worker-v2:3500", + "io.dapr.it.testcontainers.workflows.version.full.WorkflowV2Worker") + .withNetwork(DAPR_NETWORK) + .dependsOn(DAPR_CONTAINER_V2) + .waitingFor(Wait.forLogMessage(".*WorkerV2 started.*", 1)) + .withLogConsumer(outputFrame -> System.out.println("WorkerV2: " + outputFrame.getUtf8String())); + + + private static Map createStateStoreProperties() { + Map result = new HashMap<>(); + + result.put("keyPrefix", "name"); + result.put("schemaName", "dapr_db"); + result.put("actorStateStore", "true"); + result.put("connectionString", STATE_STORE_DSN); + + return result; + } + + @DynamicPropertySource + static void daprProperties(DynamicPropertyRegistry registry) { + registry.add("dapr.http.endpoint", DAPR_CONTAINER_V1::getHttpEndpoint); + registry.add("dapr.grpc.endpoint", DAPR_CONTAINER_V1::getGrpcEndpoint); + } + + @Test + public void testWorkflows() throws Exception { + DaprWorkflowClient workflowClientV1 = daprWorkflowClient(DAPR_CONTAINER_V1.getHttpEndpoint(), DAPR_CONTAINER_V1.getGrpcEndpoint()); +// Start workflow V1 + String instanceIdV1 = workflowClientV1.scheduleNewWorkflow("VersionWorkflow"); + workflowClientV1.waitForWorkflowStart(instanceIdV1, Duration.ofSeconds(10), false); + + // Stop worker and dapr + workerV1.stop(); + DAPR_CONTAINER_V1.stop(); + + // Start new worker with patched workflow + DAPR_CONTAINER_V2.start(); + workerV2.start(); + Thread.sleep(1000); + DaprWorkflowClient workflowClientV2 = daprWorkflowClient(DAPR_CONTAINER_V2.getHttpEndpoint(), DAPR_CONTAINER_V2.getGrpcEndpoint()); + + // Start workflow V2 + String instanceIdV2 = workflowClientV2.scheduleNewWorkflow("VersionWorkflow"); + workflowClientV2.waitForWorkflowStart(instanceIdV2, Duration.ofSeconds(10), false); + + // Continue workflow V1 + workflowClientV2.raiseEvent(instanceIdV1, "test", null); + + // Wait for workflow to complete + Duration timeout = Duration.ofSeconds(10); + WorkflowState workflowStatusV1 = workflowClientV2.waitForWorkflowCompletion(instanceIdV1, timeout, true); + WorkflowState workflowStatusV2 = workflowClientV2.waitForWorkflowCompletion(instanceIdV2, timeout, true); + + assertNotNull(workflowStatusV1); + assertNotNull(workflowStatusV2); + + String resultV1 = workflowStatusV1.readOutputAs(String.class); + assertEquals("Activity1, Activity2", resultV1); + + String resultV2 = workflowStatusV2.readOutputAs(String.class); + assertEquals("Activity3, Activity4", resultV2); + } + + public DaprWorkflowClient daprWorkflowClient( + String daprHttpEndpoint, + String daprGrpcEndpoint + ){ + Map overrides = Map.of( + "dapr.http.endpoint", daprHttpEndpoint, + "dapr.grpc.endpoint", daprGrpcEndpoint + ); + + return new DaprWorkflowClient(new Properties(overrides)); + } +} + diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/VersionedWorkflows.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/VersionedWorkflows.java new file mode 100644 index 0000000000..fbaa0ec62f --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/VersionedWorkflows.java @@ -0,0 +1,80 @@ +package io.dapr.it.testcontainers.workflows.version.full; + +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +public class VersionedWorkflows { + public static final String ACTIVITY_1 = "Activity1"; + public static final String ACTIVITY_2 = "Activity2"; + public static final String ACTIVITY_3 = "Activity3"; + public static final String ACTIVITY_4 = "Activity4"; + + public static class FullVersionWorkflowV1 implements Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow V1: {}", ctx.getName()); + + String result = ""; + result += ctx.callActivity(VersionedWorkflows.ACTIVITY_1, String.class).await() +", "; + ctx.waitForExternalEvent("test").await(); + result += ctx.callActivity(VersionedWorkflows.ACTIVITY_2, String.class).await(); + + ctx.getLogger().info("Workflow finished with result: {}", result); + ctx.complete(result); + }; + } + } + + public static class FullVersionWorkflowV2 implements Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow V2: {}", ctx.getName()); + + String result = ""; + result += ctx.callActivity(VersionedWorkflows.ACTIVITY_3, String.class).await() +", "; + result += ctx.callActivity(VersionedWorkflows.ACTIVITY_4, String.class).await(); + + ctx.getLogger().info("Workflow finished with result: {}", result); + ctx.complete(result); + }; + } + } + + public static void addWorkflowV1(WorkflowRuntimeBuilder workflowRuntimeBuilder, boolean isLatest) { + workflowRuntimeBuilder.registerWorkflow("VersionWorkflow", + VersionedWorkflows.FullVersionWorkflowV1.class, + "V1", + isLatest); + + + workflowRuntimeBuilder.registerActivity(VersionedWorkflows.ACTIVITY_1, (ctx -> { + System.out.println("Activity1 called."); + return VersionedWorkflows.ACTIVITY_1; + })); + + workflowRuntimeBuilder.registerActivity(VersionedWorkflows.ACTIVITY_2, (ctx -> { + System.out.println("Activity2 called."); + return VersionedWorkflows.ACTIVITY_2; + })); + } + + public static void addWorkflowV2(WorkflowRuntimeBuilder workflowRuntimeBuilder) { + workflowRuntimeBuilder.registerWorkflow("VersionWorkflow", + VersionedWorkflows.FullVersionWorkflowV2.class, + "V2", + true); + + workflowRuntimeBuilder.registerActivity(VersionedWorkflows.ACTIVITY_3, (ctx -> { + System.out.println("Activity3 called."); + return VersionedWorkflows.ACTIVITY_3; + })); + + workflowRuntimeBuilder.registerActivity(VersionedWorkflows.ACTIVITY_4, (ctx -> { + System.out.println("Activity4 called."); + return VersionedWorkflows.ACTIVITY_4; + })); + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/WorkflowV1Worker.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/WorkflowV1Worker.java new file mode 100644 index 0000000000..b98385dce9 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/WorkflowV1Worker.java @@ -0,0 +1,22 @@ +package io.dapr.it.testcontainers.workflows.version.full; + +import io.dapr.it.testcontainers.workflows.multiapp.MultiAppWorkflow; +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +public class WorkflowV1Worker { + public static void main(String[] args) throws Exception { + System.out.println("=== Starting Workflow V1 Runtime ==="); + + // Register the Workflow with the builder + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(); + VersionedWorkflows.addWorkflowV1(builder, true); + + // Build and start the workflow runtime + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("WorkerV1 started"); + System.out.println("Waiting for workflow orchestration requests..."); + runtime.start(); + } + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/WorkflowV2Worker.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/WorkflowV2Worker.java new file mode 100644 index 0000000000..c8324f1d39 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/full/WorkflowV2Worker.java @@ -0,0 +1,22 @@ +package io.dapr.it.testcontainers.workflows.version.full; + +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +public class WorkflowV2Worker { + public static void main(String[] args) throws Exception { + System.out.println("=== Starting Workflow V2 Runtime ==="); + + // Register the Workflow with the builder + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(); + VersionedWorkflows.addWorkflowV1(builder, false); + VersionedWorkflows.addWorkflowV2(builder); + + // Build and start the workflow runtime + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("WorkerV2 started"); + System.out.println("Waiting for workflow orchestration requests..."); + runtime.start(); + } + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/patch/PatchVersioningWorkflowsIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/patch/PatchVersioningWorkflowsIT.java new file mode 100644 index 0000000000..9e13a9db33 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/patch/PatchVersioningWorkflowsIT.java @@ -0,0 +1,226 @@ +/* + * Copyright 2024 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.testcontainers.workflows.version.patch; + +import io.dapr.config.Properties; +import io.dapr.it.spring.data.CustomMySQLContainer; +import io.dapr.it.testcontainers.ContainerConstants; +import io.dapr.it.testcontainers.workflows.TestWorkflowsApplication; +import io.dapr.it.testcontainers.workflows.TestWorkflowsConfiguration; +import io.dapr.testcontainers.Component; +import io.dapr.testcontainers.DaprContainer; +import io.dapr.testcontainers.DaprLogLevel; +import io.dapr.testcontainers.DaprPlacementContainer; +import io.dapr.testcontainers.DaprSchedulerContainer; +import io.dapr.testcontainers.WorkflowDashboardContainer; +import io.dapr.workflows.client.DaprWorkflowClient; +import io.dapr.workflows.client.WorkflowState; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.containers.wait.strategy.WaitStrategy; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static io.dapr.it.spring.data.DaprSpringDataConstants.STATE_STORE_NAME; +import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG; +import static io.dapr.testcontainers.DaprContainerConstants.DAPR_PLACEMENT_IMAGE_TAG; +import static io.dapr.testcontainers.DaprContainerConstants.DAPR_SCHEDULER_IMAGE_TAG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +@SpringBootTest( + webEnvironment = WebEnvironment.RANDOM_PORT, + classes = { + TestWorkflowsConfiguration.class, + TestWorkflowsApplication.class + } +) +@Testcontainers +@Tag("testcontainers") +public class PatchVersioningWorkflowsIT { + + private static final Network DAPR_NETWORK = Network.newNetwork(); + + private static final WaitStrategy MYSQL_WAIT_STRATEGY = Wait + .forLogMessage(".*port: 3306 MySQL Community Server \\(GPL\\).*", 1) + .withStartupTimeout(Duration.of(60, ChronoUnit.SECONDS)); + + private static final String STATE_STORE_DSN = "mysql:password@tcp(mysql:3306)/"; + private static final Map STATE_STORE_PROPERTIES = createStateStoreProperties(); + + @Container + private static final MySQLContainer MY_SQL_CONTAINER = new CustomMySQLContainer<>("mysql:5.7.34") + .withNetworkAliases("mysql") + .withDatabaseName("dapr_db") + .withUsername("mysql") + .withPassword("password") + .withNetwork(DAPR_NETWORK) + .waitingFor(MYSQL_WAIT_STRATEGY); + + @Container + private final static DaprPlacementContainer sharedPlacementContainer = new DaprPlacementContainer(DAPR_PLACEMENT_IMAGE_TAG) + .withNetwork(DAPR_NETWORK) + .withNetworkAliases("placement") + .withReuse(false); + + @Container + private final static DaprSchedulerContainer sharedSchedulerContainer = new DaprSchedulerContainer(DAPR_SCHEDULER_IMAGE_TAG) + .withNetwork(DAPR_NETWORK) + .withNetworkAliases("scheduler") + .withReuse(false); + + @Container + private static final DaprContainer DAPR_CONTAINER_V1 = new DaprContainer(DAPR_RUNTIME_IMAGE_TAG) + .withAppName("dapr-patch-worker") + .withNetworkAliases("dapr-worker-v1") + .withNetwork(DAPR_NETWORK) + .withComponent(new Component(STATE_STORE_NAME, "state.mysql", "v1", STATE_STORE_PROPERTIES)) + .withPlacementContainer(sharedPlacementContainer) + .withSchedulerContainer(sharedSchedulerContainer) + .withDaprLogLevel(DaprLogLevel.DEBUG) + .withLogConsumer(outputFrame -> System.out.println("PatchDaprV1 -> " +outputFrame.getUtf8String())) + .withAppChannelAddress("host.testcontainers.internal") + .dependsOn(MY_SQL_CONTAINER, sharedPlacementContainer, sharedSchedulerContainer); + + private static final DaprContainer DAPR_CONTAINER_V2 = new DaprContainer(DAPR_RUNTIME_IMAGE_TAG) + .withAppName("dapr-patch-worker") + .withNetworkAliases("dapr-worker-v2") + .withNetwork(DAPR_NETWORK) + .withComponent(new Component(STATE_STORE_NAME, "state.mysql", "v1", STATE_STORE_PROPERTIES)) + .withPlacementContainer(sharedPlacementContainer) + .withSchedulerContainer(sharedSchedulerContainer) + .withDaprLogLevel(DaprLogLevel.DEBUG) + .withLogConsumer(outputFrame -> System.out.println("PatchDaprV2 -> " + outputFrame.getUtf8String())) + .withAppChannelAddress("host.testcontainers.internal") + .dependsOn(MY_SQL_CONTAINER, sharedPlacementContainer, sharedSchedulerContainer); + + @Container + private final static GenericContainer workerV1 = new GenericContainer<>(ContainerConstants.JDK_17_TEMURIN_JAMMY) + .withCopyFileToContainer(MountableFile.forHostPath("target"), "/app") + .withWorkingDirectory("/app") + .withCommand("java", "-cp", "test-classes:classes:dependency/*:*", + "-Ddapr.app.id=dapr-worker", + "-Ddapr.grpc.endpoint=dapr-worker-v1:50001", + "-Ddapr.http.endpoint=dapr-worker-v1:3500", + "io.dapr.it.testcontainers.workflows.version.patch.WorkflowV1Worker") + .withNetwork(DAPR_NETWORK) + .dependsOn(DAPR_CONTAINER_V1) + .waitingFor(Wait.forLogMessage(".*Simple Worker started.*", 1)) + .withLogConsumer(outputFrame -> System.out.println("PatchWorkerV1: " + outputFrame.getUtf8String())); + +// This container will be started manually + private final static GenericContainer workerV2 = new GenericContainer<>(ContainerConstants.JDK_17_TEMURIN_JAMMY) + .withCopyFileToContainer(MountableFile.forHostPath("target"), "/app") + .withWorkingDirectory("/app") + .withCommand("java", "-cp", "test-classes:classes:dependency/*:*", + "-Ddapr.app.id=dapr-worker", + "-Ddapr.grpc.endpoint=dapr-worker-v2:50001", + "-Ddapr.http.endpoint=dapr-worker-v2:3500", + "io.dapr.it.testcontainers.workflows.version.patch.WorkflowV2Worker") + .withNetwork(DAPR_NETWORK) + .dependsOn(DAPR_CONTAINER_V2) + .waitingFor(Wait.forLogMessage(".*Patch Worker started.*", 1)) + .withLogConsumer(outputFrame -> System.out.println("PatchWorkerV2: " + outputFrame.getUtf8String())); + + + private static Map createStateStoreProperties() { + Map result = new HashMap<>(); + + result.put("keyPrefix", "name"); + result.put("schemaName", "dapr_db"); + result.put("actorStateStore", "true"); + result.put("connectionString", STATE_STORE_DSN); + + return result; + } + + @DynamicPropertySource + static void daprProperties(DynamicPropertyRegistry registry) { + registry.add("dapr.http.endpoint", DAPR_CONTAINER_V1::getHttpEndpoint); + registry.add("dapr.grpc.endpoint", DAPR_CONTAINER_V1::getGrpcEndpoint); + } + + @Test + public void testWorkflows() throws Exception { + DaprWorkflowClient workflowClientV1 = daprWorkflowClient(DAPR_CONTAINER_V1.getHttpEndpoint(), DAPR_CONTAINER_V1.getGrpcEndpoint()); +// Start workflow V1 + String instanceIdV1 = workflowClientV1.scheduleNewWorkflow(PatchedWorkflows.NAME); + workflowClientV1.waitForWorkflowStart(instanceIdV1, Duration.ofSeconds(10), false); + + // Wait to workflow to complete the first activity + Thread.sleep(2000); + + // Stop worker and dapr + workerV1.stop(); + DAPR_CONTAINER_V1.stop(); + + // Start new worker with patched workflow + DAPR_CONTAINER_V2.start(); + workerV2.start(); + DaprWorkflowClient workflowClientV2 = daprWorkflowClient(DAPR_CONTAINER_V2.getHttpEndpoint(), DAPR_CONTAINER_V2.getGrpcEndpoint()); + + // Start workflow V2 + String instanceIdV2 = workflowClientV2.scheduleNewWorkflow(PatchedWorkflows.NAME); + workflowClientV2.waitForWorkflowStart(instanceIdV2, Duration.ofSeconds(10), false); + + // Continue workflow V1 + workflowClientV2.raiseEvent(instanceIdV1, "test", null); + workflowClientV2.raiseEvent(instanceIdV2, "test", null); + + // Wait for workflow to complete + Duration timeout = Duration.ofSeconds(10); + WorkflowState workflowStatusV1 = workflowClientV2.waitForWorkflowCompletion(instanceIdV1, timeout, true); + WorkflowState workflowStatusV2 = workflowClientV2.waitForWorkflowCompletion(instanceIdV2, timeout, true); + + assertNotNull(workflowStatusV1); + assertNotNull(workflowStatusV2); + + String resultV1 = workflowStatusV1.readOutputAs(String.class); + assertEquals("Activity1, Activity2", resultV1); + + String resultV2 = workflowStatusV2.readOutputAs(String.class); + assertEquals("Activity1, Activity3, Activity4, Activity2", resultV2); + } + + public DaprWorkflowClient daprWorkflowClient( + String daprHttpEndpoint, + String daprGrpcEndpoint + ){ + Map overrides = Map.of( + "dapr.http.endpoint", daprHttpEndpoint, + "dapr.grpc.endpoint", daprGrpcEndpoint + ); + + return new DaprWorkflowClient(new Properties(overrides)); + } + + +} + diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/patch/PatchedWorkflows.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/patch/PatchedWorkflows.java new file mode 100644 index 0000000000..8cc404a0f7 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/patch/PatchedWorkflows.java @@ -0,0 +1,94 @@ +package io.dapr.it.testcontainers.workflows.version.patch; + +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +public class PatchedWorkflows { + public static final String ACTIVITY_1 = "Activity1"; + public static final String ACTIVITY_2 = "Activity2"; + public static final String ACTIVITY_3 = "Activity3"; + public static final String ACTIVITY_4 = "Activity4"; + public static final String NAME = "VersionWorkflow"; + + + public static class SimmpleWorkflow implements Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow: {}", ctx.getName()); + + String result = ""; + result += ctx.callActivity(PatchedWorkflows.ACTIVITY_1, String.class).await() +", "; + ctx.waitForExternalEvent("test").await(); + result += ctx.callActivity(PatchedWorkflows.ACTIVITY_2, String.class).await(); + + ctx.getLogger().info("Workflow finished with result: {}", result); + ctx.complete(result); + }; + } + } + + + public static class PatchSimpleWorkflow implements Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow with patch V2: {}", ctx.getName()); + + String result = ""; + result += ctx.callActivity(PatchedWorkflows.ACTIVITY_1, String.class).await() +", "; + + var isPatched = ctx.isPatched("V2"); + if(isPatched) { + result += ctx.callActivity(PatchedWorkflows.ACTIVITY_3, String.class).await() + ", "; + result += ctx.callActivity(PatchedWorkflows.ACTIVITY_4, String.class).await() + ", "; + } + + ctx.waitForExternalEvent("test").await(); + result += ctx.callActivity(PatchedWorkflows.ACTIVITY_2, String.class).await(); + ctx.getLogger().info("Workflow finished with result: {}", result); + ctx.complete(result); + }; + } + } + + public static void addSimpleWorkflow(WorkflowRuntimeBuilder workflowRuntimeBuilder) { + workflowRuntimeBuilder.registerWorkflow(NAME, SimmpleWorkflow.class); + + workflowRuntimeBuilder.registerActivity(PatchedWorkflows.ACTIVITY_1, (ctx -> { + System.out.println("Activity1 called."); + return PatchedWorkflows.ACTIVITY_1; + })); + + workflowRuntimeBuilder.registerActivity(PatchedWorkflows.ACTIVITY_2, (ctx -> { + System.out.println("Activity2 called."); + return PatchedWorkflows.ACTIVITY_2; + })); + } + + public static void addPatchWorkflow(WorkflowRuntimeBuilder workflowRuntimeBuilder) { + workflowRuntimeBuilder.registerWorkflow(NAME, + PatchSimpleWorkflow.class); + + workflowRuntimeBuilder.registerActivity(PatchedWorkflows.ACTIVITY_1, (ctx -> { + System.out.println("Activity1 called."); + return PatchedWorkflows.ACTIVITY_1; + })); + + workflowRuntimeBuilder.registerActivity(PatchedWorkflows.ACTIVITY_2, (ctx -> { + System.out.println("Activity2 called."); + return PatchedWorkflows.ACTIVITY_2; + })); + + workflowRuntimeBuilder.registerActivity(PatchedWorkflows.ACTIVITY_3, (ctx -> { + System.out.println("Activity3 called."); + return PatchedWorkflows.ACTIVITY_3; + })); + + workflowRuntimeBuilder.registerActivity(PatchedWorkflows.ACTIVITY_4, (ctx -> { + System.out.println("Activity4 called."); + return PatchedWorkflows.ACTIVITY_4; + })); + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/patch/WorkflowV1Worker.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/patch/WorkflowV1Worker.java new file mode 100644 index 0000000000..bb0365aad9 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/patch/WorkflowV1Worker.java @@ -0,0 +1,21 @@ +package io.dapr.it.testcontainers.workflows.version.patch; + +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +public class WorkflowV1Worker { + public static void main(String[] args) throws Exception { + System.out.println("=== Starting Simple Workflow Runtime ==="); + + // Register the Workflow with the builder + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(); + PatchedWorkflows.addSimpleWorkflow(builder); + + // Build and start the workflow runtime + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("Simple Worker started"); + System.out.println("Waiting for workflow orchestration requests..."); + runtime.start(); + } + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/patch/WorkflowV2Worker.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/patch/WorkflowV2Worker.java new file mode 100644 index 0000000000..8a25145191 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/version/patch/WorkflowV2Worker.java @@ -0,0 +1,21 @@ +package io.dapr.it.testcontainers.workflows.version.patch; + +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +public class WorkflowV2Worker { + public static void main(String[] args) throws Exception { + System.out.println("=== Starting Patch Workflow Runtime ==="); + + // Register the Workflow with the builder + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(); + PatchedWorkflows.addPatchWorkflow(builder); + + // Build and start the workflow runtime + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("Patch Worker started"); + System.out.println("Waiting for workflow orchestration requests..."); + runtime.start(); + } + } +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java index 8608e96937..40d83af0ee 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java @@ -20,7 +20,6 @@ import org.slf4j.Logger; import javax.annotation.Nullable; - import java.time.Duration; import java.time.Instant; import java.time.ZonedDateTime; @@ -43,7 +42,6 @@ public interface WorkflowContext { */ Logger getLogger(); - /** * Gets the name of the current workflow. * @@ -533,5 +531,12 @@ default UUID newUuid() { * @param status to be set to the current execution */ void setCustomStatus(Object status); - + + /** + * Checks if the patch has been applied. + * + * @param patchName the patch name to check + * @return true if already applied + */ + boolean isPatched(String patchName); } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java b/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java index 79725c0209..5b55d036e1 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java @@ -86,7 +86,18 @@ private DaprWorkflowClient(DurableTaskClient innerClient, ManagedChannel grpcCha * @return the randomly-generated instance ID for new Workflow instance. */ public String scheduleNewWorkflow(Class clazz) { - return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName()); + return this.scheduleNewWorkflow(clazz.getCanonicalName()); + } + + /** + * Schedules a new workflow using DurableTask client. + * + * @param any Workflow type + * @param name Workflow name + * @return the randomly-generated instance ID for new Workflow instance. + */ + public String scheduleNewWorkflow(String name) { + return this.innerClient.scheduleNewOrchestrationInstance(name); } /** @@ -98,7 +109,19 @@ public String scheduleNewWorkflow(Class clazz) { * @return the randomly-generated instance ID for new Workflow instance. */ public String scheduleNewWorkflow(Class clazz, Object input) { - return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(), input); + return this.scheduleNewWorkflow(clazz.getCanonicalName(), input); + } + + /** + * Schedules a new workflow using DurableTask client. + * + * @param any Workflow type + * @param name Workflow name + * @param input the input to pass to the scheduled orchestration instance. Must be serializable. + * @return the randomly-generated instance ID for new Workflow instance. + */ + public String scheduleNewWorkflow(String name, Object input) { + return this.innerClient.scheduleNewOrchestrationInstance(name, input); } /** @@ -111,7 +134,20 @@ public String scheduleNewWorkflow(Class clazz, Object in * @return the instanceId parameter value. */ public String scheduleNewWorkflow(Class clazz, Object input, String instanceId) { - return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(), input, instanceId); + return this.scheduleNewWorkflow(clazz.getCanonicalName(), input, instanceId); + } + + /** + * Schedules a new workflow using DurableTask client. + * + * @param any Workflow type + * @param name Workflow name + * @param input the input to pass to the scheduled orchestration instance. Must be serializable. + * @param instanceId the unique ID of the orchestration instance to schedule + * @return the instanceId parameter value. + */ + public String scheduleNewWorkflow(String name, Object input, String instanceId) { + return this.innerClient.scheduleNewOrchestrationInstance(name, input, instanceId); } /** @@ -123,9 +159,20 @@ public String scheduleNewWorkflow(Class clazz, Object in * @return the instanceId parameter value. */ public String scheduleNewWorkflow(Class clazz, NewWorkflowOptions options) { - NewOrchestrationInstanceOptions orchestrationInstanceOptions = fromNewWorkflowOptions(options); + return this.scheduleNewWorkflow(clazz.getCanonicalName(), options); + } - return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(), + /** + * Schedules a new workflow with a specified set of options for execution. + * + * @param any Workflow type + * @param name name of the workflow to schedule + * @param options the options for the new workflow, including input, instance ID, etc. + * @return the instanceId parameter value. + */ + public String scheduleNewWorkflow(String name, NewWorkflowOptions options) { + NewOrchestrationInstanceOptions orchestrationInstanceOptions = fromNewWorkflowOptions(options); + return this.innerClient.scheduleNewOrchestrationInstance(name, orchestrationInstanceOptions); } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java index 4ccf73e9b9..164fd3b415 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java @@ -319,4 +319,8 @@ private RetryHandler toRetryHandler(WorkflowTaskRetryHandler workflowTaskRetryHa public void setCustomStatus(Object status) { innerContext.setCustomStatus(status); } + + public boolean isPatched(String patchName) { + return this.innerContext.isPatched(patchName); + } } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java index 73b6cc8168..8ac3789f98 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java @@ -14,7 +14,6 @@ package io.dapr.workflows.runtime; import io.dapr.durabletask.TaskOrchestration; -import io.dapr.durabletask.TaskOrchestrationFactory; import io.dapr.workflows.Workflow; import java.lang.reflect.Constructor; @@ -23,11 +22,12 @@ /** * Wrapper for Durable Task Framework orchestration factory. */ -class WorkflowClassWrapper implements TaskOrchestrationFactory { +class WorkflowClassWrapper extends WorkflowVersionWrapper { private final Constructor workflowConstructor; private final String name; public WorkflowClassWrapper(Class clazz) { + super(); this.name = clazz.getCanonicalName(); try { @@ -39,6 +39,19 @@ public WorkflowClassWrapper(Class clazz) { } } + public WorkflowClassWrapper(String name, Class clazz, String versionName, Boolean isLatestVersion) { + super(versionName, isLatestVersion); + this.name = name; + + try { + this.workflowConstructor = clazz.getDeclaredConstructor(); + } catch (NoSuchMethodException e) { + throw new RuntimeException( + String.format("No constructor found for workflow class '%s'.", this.name), e + ); + } + } + @Override public String getName() { return name; diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java index 77a568a386..4dbd766246 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java @@ -14,13 +14,12 @@ package io.dapr.workflows.runtime; import io.dapr.durabletask.TaskOrchestration; -import io.dapr.durabletask.TaskOrchestrationFactory; import io.dapr.workflows.Workflow; /** * Wrapper for Durable Task Framework orchestration factory. */ -class WorkflowInstanceWrapper implements TaskOrchestrationFactory { +class WorkflowInstanceWrapper extends WorkflowVersionWrapper { private final T workflow; private final String name; @@ -29,6 +28,12 @@ public WorkflowInstanceWrapper(T instance) { this.workflow = instance; } + public WorkflowInstanceWrapper(String name, T instance, String versionName, Boolean isLatestVersion) { + super(versionName, isLatestVersion); + this.name = name; + this.workflow = instance; + } + @Override public String getName() { return name; diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java index 09f831a563..f5d47f3e79 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java @@ -70,6 +70,7 @@ public void start(boolean block) { public void close() { this.shutDownWorkerPool(); this.closeSideCarChannel(); + this.worker.close(); } private void closeSideCarChannel() { diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java index c586387aa9..8b7b2c9def 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java @@ -16,13 +16,14 @@ import io.dapr.config.Properties; import io.dapr.durabletask.DurableTaskGrpcWorkerBuilder; import io.dapr.durabletask.TaskActivityFactory; -import io.dapr.durabletask.TaskOrchestrationFactory; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactory; import io.dapr.utils.NetworkUtils; import io.dapr.workflows.Workflow; import io.dapr.workflows.WorkflowActivity; import io.dapr.workflows.internal.ApiTokenClientInterceptor; import io.grpc.ClientInterceptor; import io.grpc.ManagedChannel; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,11 +116,45 @@ public WorkflowRuntimeBuilder withExecutorService(ExecutorService executorServic * @return the WorkflowRuntimeBuilder */ public WorkflowRuntimeBuilder registerWorkflow(Class clazz) { - this.builder.addOrchestration(new WorkflowClassWrapper<>(clazz)); - this.workflowSet.add(clazz.getCanonicalName()); - this.workflows.add(clazz.getSimpleName()); + return this.registerWorkflow(clazz.getCanonicalName(), clazz, null, null); + } - this.logger.info("Registered Workflow: {}", clazz.getSimpleName()); + /** + * Registers a Workflow object. + * + * @param any Workflow type + * @param name the name of the workflow to register + * @param clazz the class being registered + * @return the WorkflowRuntimeBuilder + */ + public WorkflowRuntimeBuilder registerWorkflow(String name, Class clazz) { + return this.registerWorkflow(name, clazz, null, null); + } + + /** + * Registers a Workflow object. + * + * @param any Workflow type + * @param name the name of the workflow to register + * @param clazz the class being registered + * @param versionName the version name of the workflow + * @param isLatestVersion whether the workflow is the latest version + * @return the WorkflowRuntimeBuilder + */ + public WorkflowRuntimeBuilder registerWorkflow(String name, + Class clazz, + String versionName, + Boolean isLatestVersion) { + this.builder.addOrchestration(new WorkflowClassWrapper<>(name, clazz, versionName, isLatestVersion)); + this.workflowSet.add(name); + this.workflows.add(name); + + if (StringUtils.isEmpty(versionName)) { + this.logger.info("Registered Workflow: {}", clazz.getSimpleName()); + } else { + this.logger.info("Registered Workflow Version: {} {} - isLatest {}", + clazz.getSimpleName(), versionName, isLatestVersion); + } return this; } @@ -133,12 +168,34 @@ public WorkflowRuntimeBuilder registerWorkflow(Class cla */ public WorkflowRuntimeBuilder registerWorkflow(T instance) { Class clazz = (Class) instance.getClass(); + this.registerWorkflow(clazz.getCanonicalName(), instance, null, null); + return this; + } - this.builder.addOrchestration(new WorkflowInstanceWrapper<>(instance)); - this.workflowSet.add(clazz.getCanonicalName()); - this.workflows.add(clazz.getSimpleName()); - - this.logger.info("Registered Workflow: {}", clazz.getSimpleName()); + /** + * Registers a Workflow object. + * + * @param any Workflow type + * @param name the name of the workflow to register + * @param instance the workflow instance being registered + * @param versionName the version name of the workflow + * @param isLatestVersion whether the workflow is the latest version + * @return the WorkflowRuntimeBuilder + */ + public WorkflowRuntimeBuilder registerWorkflow(String name, + T instance, + String versionName, + Boolean isLatestVersion) { + this.builder.addOrchestration(new WorkflowInstanceWrapper<>(name, instance, versionName, isLatestVersion)); + this.workflowSet.add(name); + this.workflows.add(name); + + if (StringUtils.isEmpty(versionName)) { + this.logger.info("Registered Workflow {}: {}", name, instance.getClass()); + } else { + this.logger.info("Registered Workflow Version {}: {} {} - isLatest {}", + name, instance.getClass().getSimpleName(), versionName, isLatestVersion); + } return this; } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowVersionWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowVersionWrapper.java new file mode 100644 index 0000000000..4683ebc4dd --- /dev/null +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowVersionWrapper.java @@ -0,0 +1,42 @@ +/* + * Copyright 2026 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.runtime; + +import io.dapr.durabletask.orchestration.TaskOrchestrationFactory; + +public abstract class WorkflowVersionWrapper implements TaskOrchestrationFactory { + private final String versionName; + private final Boolean isLatestVersion; + + public WorkflowVersionWrapper() { + this.versionName = ""; + this.isLatestVersion = false; + } + + public WorkflowVersionWrapper(String versionName, Boolean isLatestVersion) { + this.versionName = versionName; + this.isLatestVersion = isLatestVersion; + } + + @Override + public String getVersionName() { + return versionName; + } + + @Override + public Boolean isLatestVersion() { + return isLatestVersion; + } + +} diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java index b6ca38ecbc..22e364f244 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java @@ -140,6 +140,11 @@ public void continueAsNew(Object input, boolean preserveUnprocessedEvents) { public void setCustomStatus(Object status) { } + + @Override + public boolean isPatched(String patchName) { + return false; + } }; context = new DefaultWorkflowContext(mockInnerContext); contextWithClass = new DefaultWorkflowContext(mockInnerContext, testWorkflowContext.getClass()); @@ -419,6 +424,13 @@ public void setCustomStatusWorkflow() { } + @Test + public void testIsPatched() { + context.isPatched("patch"); + verify(mockInnerContext, times(1)).isPatched("patch"); + + } + @Test public void newUuidTest() { context.newUuid(); diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowClassWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowClassWrapperTest.java index fd76cadaf4..8458739a79 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowClassWrapperTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowClassWrapperTest.java @@ -18,8 +18,10 @@ import io.dapr.workflows.WorkflowContext; import io.dapr.workflows.WorkflowStub; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrowsExactly; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -33,6 +35,22 @@ public WorkflowStub create() { } } + public static abstract class TestErrorWorkflow implements Workflow { + public TestErrorWorkflow(String s){} + @Override + public WorkflowStub create() { + return WorkflowContext::getInstanceId; + } + } + + public static abstract class TestPrivateWorkflow implements Workflow { + private TestPrivateWorkflow(){} + @Override + public WorkflowStub create() { + return WorkflowContext::getInstanceId; + } + } + @Test public void getName() { WorkflowClassWrapper wrapper = new WorkflowClassWrapper<>(TestWorkflow.class); @@ -53,4 +71,24 @@ public void createWithClass() { verify(mockContext, times(1)).getInstanceId(); } + @Test + public void createWithClassAndVersion() { + TaskOrchestrationContext mockContext = mock(TaskOrchestrationContext.class); + WorkflowClassWrapper wrapper = new WorkflowClassWrapper<>("TestWorkflow", TestWorkflow.class, "v1",false); + when(mockContext.getInstanceId()).thenReturn("uuid"); + wrapper.create().run(mockContext); + verify(mockContext, times(1)).getInstanceId(); + } + + @Test + public void createErrorClassAndVersion() { + assertThrowsExactly(RuntimeException.class, () -> new WorkflowClassWrapper<>(TestErrorWorkflow.class)); + assertThrowsExactly(RuntimeException.class, () -> new WorkflowClassWrapper<>("TestErrorWorkflow", TestErrorWorkflow.class, "v1",false)); + + WorkflowClassWrapper wrapper = new WorkflowClassWrapper<>("TestPrivateWorkflow", TestPrivateWorkflow.class, "v2",false); + TaskOrchestrationContext mockContext = mock(TaskOrchestrationContext.class); + assertThrowsExactly(RuntimeException.class, () -> wrapper.create().run(mockContext)); + + } + } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java index 50a21f0b38..4bb5182cf0 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java @@ -15,7 +15,7 @@ import io.dapr.durabletask.TaskActivity; import io.dapr.durabletask.TaskActivityFactory; import io.dapr.durabletask.TaskOrchestration; -import io.dapr.durabletask.TaskOrchestrationFactory; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactory; import io.dapr.workflows.Workflow; import io.dapr.workflows.WorkflowActivity; import io.dapr.workflows.WorkflowActivityContext; @@ -51,11 +51,24 @@ public void registerValidWorkflowClass() { assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow(TestWorkflow.class)); } + @Test + public void registerValidVersionWorkflowClass() { + assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow("TestWorkflow", TestWorkflow.class,"testWorkflowV1", false)); + assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow("TestWorkflow", TestWorkflow.class,"testWorkflowV2", true)); + } + @Test public void registerValidWorkflowInstance() { assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow(new TestWorkflow())); } + @Test + public void registerValidVersionWorkflowInstance() { + assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow("testWorkflowV1", new TestWorkflow(),"testWorkflowV1", false)); + assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow("testWorkflowV2",new TestWorkflow(),"testWorkflowV2", true)); + } + + @Test public void registerValidWorkflowActivityClass() { assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerActivity(TestActivity.class)); @@ -66,6 +79,8 @@ public void registerValidWorkflowActivityInstance() { assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerActivity(new TestActivity())); } + + @Test public void registerValidTaskActivityFactory() { class A implements WorkflowActivity{ @@ -115,6 +130,16 @@ public TaskOrchestration create() { W w = new W(); return ctx -> w.run(new DefaultWorkflowContext(ctx, w.getClass())); } + + @Override + public String getVersionName() { + return ""; + } + + @Override + public Boolean isLatestVersion() { + return null; + } }); }); @@ -126,6 +151,7 @@ public void buildTest() { try { WorkflowRuntime runtime = new WorkflowRuntimeBuilder().build(); System.out.println("WorkflowRuntime created"); + runtime.close(); } catch (Exception e) { throw new RuntimeException(e); } @@ -140,16 +166,18 @@ public void loggingOutputTest() { Logger testLogger = mock(Logger.class); - assertDoesNotThrow(() -> new WorkflowRuntimeBuilder(testLogger).registerWorkflow(TestWorkflow.class)); - assertDoesNotThrow(() -> new WorkflowRuntimeBuilder(testLogger).registerActivity(TestActivity.class)); +var runtimeBuilder = new WorkflowRuntimeBuilder(testLogger); + assertDoesNotThrow(() -> runtimeBuilder.registerWorkflow(TestWorkflow.class)); + assertDoesNotThrow(() -> runtimeBuilder.registerActivity(TestActivity.class)); - WorkflowRuntimeBuilder workflowRuntimeBuilder = new WorkflowRuntimeBuilder(); + var runtime = runtimeBuilder.build(); - WorkflowRuntime runtime = workflowRuntimeBuilder.build(); verify(testLogger, times(1)) .info(eq("Registered Workflow: {}"), eq("TestWorkflow")); verify(testLogger, times(1)) .info(eq("Registered Activity: {}"), eq("TestActivity")); + + runtime.close(); } } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowVersionWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowVersionWrapperTest.java new file mode 100644 index 0000000000..31cebd5efc --- /dev/null +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowVersionWrapperTest.java @@ -0,0 +1,40 @@ +/* + * Copyright 2026 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.runtime; + +import io.dapr.durabletask.TaskOrchestration; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +class WorkflowVersionWrapperTest { + + @Test + void getVersionProperties() { + var versionWrapper = new WorkflowVersionWrapper("A",true) { + @Override + public String getName() { + return "demo"; + } + + @Override + public TaskOrchestration create() { + return null; + } + }; + + assertEquals("A",versionWrapper.getVersionName()); + assertEquals(true, versionWrapper.isLatestVersion()); + } +} \ No newline at end of file