diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml index 54910aba53..ca78b3402e 100644 --- a/.github/workflows/validate.yml +++ b/.github/workflows/validate.yml @@ -162,3 +162,7 @@ jobs: working-directory: ./examples run: | mm.py ./src/main/java/io/dapr/examples/querystate/README.md + - name: Validate workflows example + working-directory: ./examples + run: | + mm.py ./src/main/java/io/dapr/examples/workflows/README.md diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000000..fedee965e5 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,541 @@ +# Dapr Workflow Sample + +In this example, we'll use Dapr to test workflow features. + +Visit [the Workflow documentation landing page](https://docs.dapr.io/developing-applications/building-blocks/workflow) for more information. + +This example contains the follow classes: + +* DemoWorkflow: An example of a Dapr Workflow. +* DemoWorkflowClient: This application will start workflows using Dapr. +* DemoWorkflowWorker: An application that registers a workflow to the Dapr workflow runtime engine. It also executes the workflow instance. + +## Pre-requisites + +* [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/). +* Java JDK 11 (or greater): + * [Microsoft JDK 11](https://docs.microsoft.com/en-us/java/openjdk/download#openjdk-11) + * [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11) + * [OpenJDK 11](https://jdk.java.net/11/) +* [Apache Maven](https://maven.apache.org/install.html) version 3.x. + +### Checking out the code + +Clone this repository: + +```sh +git clone https://github.com/dapr/java-sdk.git +cd java-sdk +``` + +Then build the Maven project: + +```sh +# make sure you are in the `java-sdk` directory. +mvn install +``` + +Get into the `examples` directory. +```sh +cd examples +``` + +### Initialize Dapr + +Run `dapr init` to initialize Dapr in Self-Hosted Mode if it's not already initialized. + +## Patterns + +Those examples contain the following workflow patterns: +1. [Chaining Pattern](#chaining-pattern) +2. [Fan-out/Fan-in Pattern](#fan-outfan-in-pattern) +3. [Continue As New Pattern](#continue-as-new-pattern) +4. [External Event Pattern](#external-event-pattern) +5. [Sub-workflow Pattern](#sub-workflow-pattern) + +### Chaining Pattern +In the chaining pattern, a sequence of activities executes in a specific order. +In this pattern, the output of one activity is applied to the input of another activity. +The chaining pattern is useful when you need to execute a sequence of activities in a specific order. + +The first Java class is `DemoChainWorker`. Its job is to register an implementation of `DemoChainWorkflow` in Dapr's workflow runtime engine. In the `DemoChainWorker.java` file, you will find the `DemoChainWorker` class and the `main` method. See the code snippet below: +```java +public class DemoChainWorker { + /** + * The main method of this app. + * + * @param args The port the app will listen on. + * @throws Exception An Exception. + */ + public static void main(String[] args) throws Exception { + // Register the Workflow with the builder. + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoChainWorkflow.class); + builder.registerActivity(ToUpperCaseActivity.class); + + // Build and then start the workflow runtime pulling and executing tasks + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("Start workflow runtime"); + runtime.start(); + } + + System.exit(0); + } +} +``` + +The second Java class you want to look at is `DemoChainWorkflow`, it defines the workflow. In this example it chains the activites in order. See the code snippet below: +```java +public class DemoChainWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + + String result = ""; + result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "Tokyo", String.class).await() + ", "; + result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "London", String.class).await() + ", "; + result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "Seattle", String.class).await(); + + ctx.getLogger().info("Workflow finished with result: " + result); + ctx.complete(result); + }; + } +} +``` + +The next Java class you want to look at is `ToUpperCaseActivity`, it defines the logics for a single acitvity, in this case, it converts a string to upper case. See the code snippet below: +```java +public class ToUpperCaseActivity implements WorkflowActivity { + + @Override + public String run(WorkflowActivityContext ctx) { + Logger logger = LoggerFactory.getLogger(ToUpperCaseActivity.class); + logger.info("Starting Chaining Activity: " + ctx.getName()); + + var message = ctx.getInput(String.class); + var newMessage = message.toUpperCase(); + + logger.info("Message Received from input: " + message); + logger.info("Sending message to output: " + newMessage); + + logger.info("Activity returned: " + newMessage); + logger.info("Activity finished"); + + return newMessage; + } +} +``` + +Execute the following script in order to run DemoChainWorker: +```sh +dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chain.DemoChainWorker +``` + +Once running, the logs will start displaying the different steps: First, you can see workflow is starting: +```text +== APP == Start workflow runtime +== APP == Nov 07, 2023 11:03:07 AM com.microsoft.durabletask.DurableTaskGrpcWorker startAndBlock +== APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001. +``` + +Then, execute the following script in order to run DemoChainClient: +```sh +java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chain.DemoChainClient +``` + + + + +Now you can see the worker logs showing the acitvity is invoked in sequnce and the status of each activity: +```text +== APP == 2023-11-07 11:03:14,178 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.chain.DemoChainWorkflow +== APP == 2023-11-07 11:03:14,229 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity +== APP == 2023-11-07 11:03:14,235 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Message Received from input: Tokyo +== APP == 2023-11-07 11:03:14,235 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Sending message to output: TOKYO +== APP == 2023-11-07 11:03:14,266 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity +== APP == 2023-11-07 11:03:14,267 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Message Received from input: London +== APP == 2023-11-07 11:03:14,267 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Sending message to output: LONDON +== APP == 2023-11-07 11:03:14,282 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity +== APP == 2023-11-07 11:03:14,282 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Message Received from input: Seattle +== APP == 2023-11-07 11:03:14,283 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Sending message to output: SEATTLE +== APP == 2023-11-07 11:03:14,298 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Workflow finished with result: TOKYO, LONDON, SEATTLE +``` +and the client logs showing the workflow is started and finished with expected result: +```text +Started a new chaining model workflow with instance ID: 6e4fe69b-689b-4998-b095-d6b52c7d6328 +workflow instance with ID: 6e4fe69b-689b-4998-b095-d6b52c7d6328 completed with result: TOKYO, LONDON, SEATTLE +``` + +### Fan-out/Fan-in Pattern + +In the fan out/fan in pattern, you execute multiple activities in parallel and then wait for all activities to finish. Often, some aggregation work is done on the results that are returned from the activities. + +The `DemoFanInOutWorkflow` class defines the workflow. In this example it executes the activities in parallel and then sums the results. See the code snippet below: +```java +public class DemoFanInOutWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + + + // The input is a list of objects that need to be operated on. + // In this example, inputs are expected to be strings. + List inputs = ctx.getInput(List.class); + + // Fan-out to multiple concurrent activity invocations, each of which does a word count. + List> tasks = inputs.stream() + .map(input -> ctx.callActivity(CountWordsActivity.class.getName(), input.toString(), Integer.class)) + .collect(Collectors.toList()); + + // Fan-in to get the total word count from all the individual activity results. + List allWordCountResults = ctx.allOf(tasks).await(); + int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum(); + + ctx.getLogger().info("Workflow finished with result: " + totalWordCount); + // Save the final result as the orchestration output. + ctx.complete(totalWordCount); + }; + } +} +``` + +The `CountWordsActivity` class defines the logics for a single acitvity, in this case, it counts the words in a string. See the code snippet below: +```java +public class CountWordsActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + Logger logger = LoggerFactory.getLogger(ToUpperCaseActivity.class); + logger.info("Starting Activity: " + ctx.getName()); + + String input = ctx.getInput(String.class); + StringTokenizer tokenizer = new StringTokenizer(input); + int result = tokenizer.countTokens(); + + logger.info("Activity returned: " + result); + logger.info("Activity finished"); + + return result; + } +} +``` + + +Execute the following script in order to run DemoFanInOutWorker: +```sh +dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.faninout.DemoFanInOutWorker +``` + +Execute the following script in order to run DemoFanInOutClient: + +```sh +java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.faninout.DemoFanInOutClient +``` + + +Now you can see the logs from worker: +```text +== APP == 2023-11-07 14:52:03,075 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.faninout.DemoFanInOutWorkflow +== APP == 2023-11-07 14:52:03,144 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity +== APP == 2023-11-07 14:52:03,147 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 2 +== APP == 2023-11-07 14:52:03,148 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished +== APP == 2023-11-07 14:52:03,152 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity +== APP == 2023-11-07 14:52:03,152 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 9 +== APP == 2023-11-07 14:52:03,152 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished +== APP == 2023-11-07 14:52:03,167 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity +== APP == 2023-11-07 14:52:03,167 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 21 +== APP == 2023-11-07 14:52:03,167 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished +== APP == 2023-11-07 14:52:03,170 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity +== APP == 2023-11-07 14:52:03,170 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 17 +== APP == 2023-11-07 14:52:03,170 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished +== APP == 2023-11-07 14:52:03,173 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity +== APP == 2023-11-07 14:52:03,173 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 11 +== APP == 2023-11-07 14:52:03,174 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished +== APP == 2023-11-07 14:52:03,212 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Workflow finished with result: 60 +``` + +and the client: +```text +Started a new fan out/fan in model model workflow with instance ID: 092c1928-b5dd-4576-9468-300bf6aed986 +workflow instance with ID: 092c1928-b5dd-4576-9468-300bf6aed986 completed with result: 60 +``` + +### Continue As New Pattern +`ContinueAsNew` API allows you to restart the workflow with a new input. + +The `DemoContinueAsNewWorkflow` class defines the workflow. It simulates periodic cleanup work that happen every 10 seconds, after previous cleanup has finished. See the code snippet below: +```java +public class DemoContinueAsNewWorkflow extends Workflow { + /* + Compared with a CRON schedule, this periodic workflow example will never overlap. + For example, a CRON schedule that executes a cleanup every hour will execute it at 1:00, 2:00, 3:00 etc. + and could potentially run into overlap issues if the cleanup takes longer than an hour. + In this example, however, if the cleanup takes 30 minutes, and we create a timer for 1 hour between cleanups, + then it will be scheduled at 1:00, 2:30, 4:00, etc. and there is no chance of overlap. + */ + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + + ctx.getLogger().info("call CleanUpActivity to do the clean up"); + ctx.callActivity(CleanUpActivity.class.getName()).await(); + ctx.getLogger().info("CleanUpActivity finished"); + + ctx.getLogger().info("wait 10 seconds for next clean up"); + ctx.createTimer(Duration.ofSeconds(10)).await(); + + // continue the workflow. + ctx.continueAsNew(null); + }; + } +} +``` + +The `CleanUpActivity` class defines the logics for a single acitvity, in this case, it simulates a clean up work. See the code snippet below: +```java +public class CleanUpActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + Logger logger = LoggerFactory.getLogger(CleanUpActivity.class); + logger.info("Starting Activity: " + ctx.getName()); + + logger.info("start clean up work, it may take few seconds to finish..."); + + //Sleeping for 5 seconds to simulate long running operation + try { + TimeUnit.SECONDS.sleep(5); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return "clean up finish."; + } +} +``` + +Once you start the workflow and client using the following commands: +```sh +dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorker +``` +```sh +java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.continueasnew.DemoContinueAsNewClient +```` + +You will see the logs from worker showing the `CleanUpActivity` is invoked every 10 seconds after previous one is finished: +```text +== APP == 2023-11-07 14:44:42,004 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow +== APP == 2023-11-07 14:44:42,004 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up +== APP == 2023-11-07 14:44:42,009 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - Starting Activity: io.dapr.examples.workflows.continueasnew.CleanUpActivity +== APP == 2023-11-07 14:44:42,009 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - start clean up work, it may take few seconds to finish... +== APP == 2023-11-07 14:44:47,026 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow +== APP == 2023-11-07 14:44:47,026 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up +== APP == 2023-11-07 14:44:47,030 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - CleanUpActivity finished +== APP == 2023-11-07 14:44:47,030 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - wait 10 seconds for next clean up +== APP == 2023-11-07 14:44:47,033 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - Starting Activity: io.dapr.examples.workflows.continueasnew.CleanUpActivity +== APP == 2023-11-07 14:44:47,033 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - start clean up work, it may take few seconds to finish... +== APP == 2023-11-07 14:44:52,053 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - CleanUpActivity finished +== APP == 2023-11-07 14:44:52,053 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - wait 10 seconds for next clean up +== APP == 2023-11-07 14:44:57,006 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow +== APP == 2023-11-07 14:44:57,006 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up +== APP == 2023-11-07 14:44:57,012 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - Starting Activity: io.dapr.examples.workflows.continueasnew.CleanUpActivity +== APP == 2023-11-07 14:44:57,012 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - start clean up work, it may take few seconds to finish... +== APP == 2023-11-07 14:45:02,017 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow +== APP == 2023-11-07 14:45:02,020 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up +== APP == 2023-11-07 14:45:02,021 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - CleanUpActivity finished +== APP == 2023-11-07 14:45:02,021 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - wait 10 seconds for next clean up +... +``` + +and the client: +```text +Started a new continue-as-new model workflow with instance ID: c853fb93-f0e7-4ad7-ad41-385732386f94 +``` +It will continue to run until you stop the worker. + +### External Event Pattern +In the external event pattern, a workflow is started by an external event. The workflow can then wait for other external events to occur before completing. + +The `DemoExternalEventWorkflow` class defines the workflow. It waits for an external event `Approval` to run the corresponding activity. See the code snippet below: +```java +public class DemoExternalEventWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + + Boolean approved = ctx.waitForExternalEvent("Approval", boolean.class).await(); + if (approved) { + ctx.getLogger().info("approval granted - do the approved action"); + ctx.callActivity(ApproveActivity.class.getName()).await(); + ctx.getLogger().info("approval-activity finished"); + } else { + ctx.getLogger().info("approval denied - send a notification"); + ctx.callActivity(DenyActivity.class.getName()).await(); + ctx.getLogger().info("denied-activity finished"); + } + }; + } +} +``` + +In the `DemoExternalEventClient` class we send out Approval event to tell our workflow to run the approved activity. +```java +client.raiseEvent(instanceId, "Approval", true); +``` + +Start the workflow and client using the following commands: + +ex +```sh +dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventWorker +``` + +```sh +java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventClient +``` + +The worker logs: +```text +== APP == 2023-11-07 16:01:23,279 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.externalevent.DemoExternalEventWorkflow +== APP == 2023-11-07 16:01:23,279 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Waiting for approval... +== APP == 2023-11-07 16:01:23,324 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - approval granted - do the approved action +== APP == 2023-11-07 16:01:23,348 {HH:mm:ss.SSS} [main] INFO i.d.e.w.e.ApproveActivity - Starting Activity: io.dapr.examples.workflows.externalevent.ApproveActivity +== APP == 2023-11-07 16:01:23,348 {HH:mm:ss.SSS} [main] INFO i.d.e.w.e.ApproveActivity - Running approval activity... +== APP == 2023-11-07 16:01:28,410 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - approval-activity finished +``` + +The client log: +```text +Started a new external-event model workflow with instance ID: 23410d96-1afe-4698-9fcd-c01c1e0db255 +workflow instance with ID: 23410d96-1afe-4698-9fcd-c01c1e0db255 completed. +``` + +### Sub-workflow Pattern +The sub-workflow pattern allows you to call a workflow from another workflow. + +The `DemoWorkflow` class defines the workflow. It calls a sub-workflow `DemoSubWorkflow` to do the work. See the code snippet below: +```java +public class DemoWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + + var subWorkflowInput = "Hello Dapr Workflow!"; + ctx.getLogger().info("calling subworkflow with input: " + subWorkflowInput); + + var subWorkflowOutput = + ctx.callSubWorkflow(DemoSubWorkflow.class.getName(), subWorkflowInput, String.class).await(); + + ctx.getLogger().info("subworkflow finished with: " + subWorkflowOutput); + }; + } +} +``` + +The `DemoSubWorkflow` class defines the sub-workflow. It call the activity to do the work and returns the result. See the code snippet below: +```java +public class DemoSubWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting SubWorkflow: " + ctx.getName()); + + var subWorkflowInput = ctx.getInput(String.class); + ctx.getLogger().info("SubWorkflow received input: " + subWorkflowInput); + + ctx.getLogger().info("SubWorkflow is calling Activity: " + ReverseActivity.class.getName()); + String result = ctx.callActivity(ReverseActivity.class.getName(), subWorkflowInput, String.class).await(); + + ctx.getLogger().info("SubWorkflow finished with: " + result); + ctx.complete(result); + }; + } +} +``` + +The `ReverseActivity` class defines the logics for a single acitvity, in this case, it reverses a string. See the code snippet below: +```java +public class ReverseActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + Logger logger = LoggerFactory.getLogger(ReverseActivity.class); + logger.info("Starting Activity: " + ctx.getName()); + + var message = ctx.getInput(String.class); + var newMessage = new StringBuilder(message).reverse().toString(); + + logger.info("Message Received from input: " + message); + logger.info("Sending message to output: " + newMessage); + + logger.info("Activity returned: " + newMessage); + logger.info("Activity finished"); + + return newMessage; + } +} +``` + +Start the workflow and client using the following commands: + +ex +```sh +dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkflowWorker +``` + +```sh +java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkerflowClient +``` + +The log from worker: +```text +== APP == 2023-11-07 20:08:52,521 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.subworkflow.DemoWorkflow +== APP == 2023-11-07 20:08:52,523 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - calling subworkflow with input: Hello Dapr Workflow! +== APP == 2023-11-07 20:08:52,561 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting SubWorkflow: io.dapr.examples.workflows.subworkflow.DemoSubWorkflow +== APP == 2023-11-07 20:08:52,566 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - SubWorkflow received input: Hello Dapr Workflow! +== APP == 2023-11-07 20:08:52,566 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - SubWorkflow is calling Activity: io.dapr.examples.workflows.subworkflow.ReverseActivity +== APP == 2023-11-07 20:08:52,576 {HH:mm:ss.SSS} [main] INFO i.d.e.w.subworkflow.ReverseActivity - Starting Activity: io.dapr.examples.workflows.subworkflow.ReverseActivity +== APP == 2023-11-07 20:08:52,577 {HH:mm:ss.SSS} [main] INFO i.d.e.w.subworkflow.ReverseActivity - Message Received from input: Hello Dapr Workflow! +== APP == 2023-11-07 20:08:52,577 {HH:mm:ss.SSS} [main] INFO i.d.e.w.subworkflow.ReverseActivity - Sending message to output: !wolfkroW rpaD olleH +== APP == 2023-11-07 20:08:52,596 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - SubWorkflow finished with: !wolfkroW rpaD olleH +== APP == 2023-11-07 20:08:52,611 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - subworkflow finished with: !wolfkroW rpaD olleH +``` + +The log from client: +```text +Started a new sub-workflow model workflow with instance ID: c2fb9c83-435b-4b55-bdf1-833b39366cfb +workflow instance with ID: c2fb9c83-435b-4b55-bdf1-833b39366cfb completed with result: !wolfkroW rpaD olleH +``` \ No newline at end of file diff --git a/examples/src/main/java/io/dapr/examples/workflows/DemoActivityInput.java b/examples/src/main/java/io/dapr/examples/workflows/DemoActivityInput.java deleted file mode 100644 index 5b0675b9f0..0000000000 --- a/examples/src/main/java/io/dapr/examples/workflows/DemoActivityInput.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2023 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ - -package io.dapr.examples.workflows; - -public class DemoActivityInput { - - private String message; - - public DemoActivityInput() { - } - - public DemoActivityInput(String message) { - this.message = message; - } - - public String getMessage() { - return message; - } - - public void setMessage(String message) { - this.message = message; - } -} diff --git a/examples/src/main/java/io/dapr/examples/workflows/DemoActivityOutput.java b/examples/src/main/java/io/dapr/examples/workflows/DemoActivityOutput.java deleted file mode 100644 index 8d060419c3..0000000000 --- a/examples/src/main/java/io/dapr/examples/workflows/DemoActivityOutput.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2023 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ - -package io.dapr.examples.workflows; - -public class DemoActivityOutput { - - private String originalMessage; - private String newMessage; - - public DemoActivityOutput() { - } - - public DemoActivityOutput(String originalMessage, String newMessage) { - this.originalMessage = originalMessage; - this.newMessage = newMessage; - } - - public String getOriginalMessage() { - return originalMessage; - } - - public void setOriginalMessage(String originalMessage) { - this.originalMessage = originalMessage; - } - - public String getNewMessage() { - return newMessage; - } - - public void setNewMessage(String newMessage) { - this.newMessage = newMessage; - } -} diff --git a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java deleted file mode 100644 index 3552feb5f1..0000000000 --- a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright 2023 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ - -package io.dapr.examples.workflows; - -import com.microsoft.durabletask.CompositeTaskFailedException; -import com.microsoft.durabletask.Task; -import com.microsoft.durabletask.TaskCanceledException; -import io.dapr.workflows.Workflow; -import io.dapr.workflows.WorkflowStub; - -import java.time.Duration; -import java.util.Arrays; -import java.util.List; - -/** - * Implementation of the DemoWorkflow for the server side. - */ -public class DemoWorkflow extends Workflow { - @Override - public WorkflowStub create() { - return ctx -> { - ctx.getLogger().info("Starting Workflow: " + ctx.getName()); - ctx.getLogger().info("Instance ID: " + ctx.getInstanceId()); - ctx.getLogger().info("Current Orchestration Time: " + ctx.getCurrentInstant()); - ctx.getLogger().info("Waiting for event: 'TimedOutEvent'..."); - try { - ctx.waitForExternalEvent("TimedOutEvent", Duration.ofSeconds(10)).await(); - } catch (TaskCanceledException e) { - ctx.getLogger().warn("Timed out"); - ctx.getLogger().warn(e.getMessage()); - } - - ctx.getLogger().info("Waiting for event: 'TestEvent'..."); - try { - ctx.waitForExternalEvent("TestEvent", Duration.ofSeconds(10)).await(); - ctx.getLogger().info("Received TestEvent"); - } catch (TaskCanceledException e) { - ctx.getLogger().warn("Timed out"); - ctx.getLogger().warn(e.getMessage()); - } - - ctx.getLogger().info("Parallel Execution - Waiting for all tasks to finish..."); - try { - Task t1 = ctx.waitForExternalEvent("event1", Duration.ofSeconds(5), String.class); - Task t2 = ctx.waitForExternalEvent("event2", Duration.ofSeconds(5), String.class); - Task t3 = ctx.waitForExternalEvent("event3", Duration.ofSeconds(5), String.class); - - List results = ctx.allOf(Arrays.asList(t1, t2, t3)).await(); - results.forEach(t -> ctx.getLogger().info("finished task: " + t)); - ctx.getLogger().info("All tasks finished!"); - - } catch (CompositeTaskFailedException e) { - ctx.getLogger().warn(e.getMessage()); - List exceptions = e.getExceptions(); - exceptions.forEach(ex -> ctx.getLogger().warn(ex.getMessage())); - } - - ctx.getLogger().info("Parallel Execution - Waiting for any task to finish..."); - try { - Task e1 = ctx.waitForExternalEvent("e1", Duration.ofSeconds(5), String.class); - Task e2 = ctx.waitForExternalEvent("e2", Duration.ofSeconds(5), String.class); - Task e3 = ctx.waitForExternalEvent("e3", Duration.ofSeconds(5), String.class); - Task timeoutTask = ctx.createTimer(Duration.ofSeconds(1)); - - Task winner = ctx.anyOf(Arrays.asList(e1, e2, e3, timeoutTask)).await(); - if (winner == timeoutTask) { - ctx.getLogger().info("All tasks timed out!"); - } else { - ctx.getLogger().info("One of the tasks finished!"); - } - } catch (TaskCanceledException e) { - ctx.getLogger().warn("Timed out"); - ctx.getLogger().warn(e.getMessage()); - } - - ctx.getLogger().info("Calling Activity..."); - var input = new DemoActivityInput("Hello Activity!"); - var output = ctx.callActivity(DemoWorkflowActivity.class.getName(), input, DemoActivityOutput.class).await(); - - ctx.getLogger().info("Activity returned: " + output); - ctx.getLogger().info("Activity returned: " + output.getNewMessage()); - ctx.getLogger().info("Activity returned: " + output.getOriginalMessage()); - - - boolean shouldComplete = true; - ctx.getLogger().info("Waiting for event: 'RestartEvent'..."); - try { - ctx.waitForExternalEvent("RestartEvent", Duration.ofSeconds(10)).await(); - ctx.getLogger().info("Received RestartEvent"); - ctx.getLogger().info("Restarting Workflow by calling continueAsNew..."); - ctx.continueAsNew("TestInputRestart", false); - shouldComplete = false; - } catch (TaskCanceledException e) { - ctx.getLogger().warn("Restart Timed out"); - ctx.getLogger().warn(e.getMessage()); - } - - if (shouldComplete) { - ctx.getLogger().info("Child-Workflow> Calling ChildWorkflow..."); - var childWorkflowInput = "Hello ChildWorkflow!"; - var childWorkflowOutput = - ctx.callSubWorkflow(DemoSubWorkflow.class.getName(), childWorkflowInput, String.class).await(); - - ctx.getLogger().info("Child-Workflow> returned: " + childWorkflowOutput); - - ctx.getLogger().info("Workflow finished"); - ctx.complete("finished"); - - return; - } - - ctx.getLogger().info("Workflow restarted"); - }; - } -} diff --git a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java b/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java deleted file mode 100644 index 3aae026698..0000000000 --- a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright 2023 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ - -package io.dapr.examples.workflows; - -import io.dapr.workflows.client.DaprWorkflowClient; -import io.dapr.workflows.client.WorkflowInstanceStatus; - -import java.time.Duration; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * For setup instructions, see the README. - */ -public class DemoWorkflowClient { - - /** - * The main method. - * - * @param args Input arguments (unused). - * @throws InterruptedException If program has been interrupted. - */ - public static void main(String[] args) throws InterruptedException { - DaprWorkflowClient client = new DaprWorkflowClient(); - - try (client) { - String separatorStr = "*******"; - System.out.println(separatorStr); - String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class, "input data"); - System.out.printf("Started new workflow instance with random ID: %s%n", instanceId); - - System.out.println(separatorStr); - System.out.println("**GetInstanceMetadata:Running Workflow**"); - WorkflowInstanceStatus workflowMetadata = client.getInstanceState(instanceId, true); - System.out.printf("Result: %s%n", workflowMetadata); - - System.out.println(separatorStr); - System.out.println("**WaitForInstanceStart**"); - try { - WorkflowInstanceStatus waitForInstanceStartResult = - client.waitForInstanceStart(instanceId, Duration.ofSeconds(60), true); - System.out.printf("Result: %s%n", waitForInstanceStartResult); - } catch (TimeoutException ex) { - System.out.printf("waitForInstanceStart has an exception:%s%n", ex); - } - - System.out.println(separatorStr); - System.out.println("**SendExternalMessage**"); - client.raiseEvent(instanceId, "TestEvent", "TestEventPayload"); - - System.out.println(separatorStr); - System.out.println("** Registering parallel Events to be captured by allOf(t1,t2,t3) **"); - client.raiseEvent(instanceId, "event1", "TestEvent 1 Payload"); - client.raiseEvent(instanceId, "event2", "TestEvent 2 Payload"); - client.raiseEvent(instanceId, "event3", "TestEvent 3 Payload"); - System.out.printf("Events raised for workflow with instanceId: %s\n", instanceId); - - System.out.println(separatorStr); - System.out.println("** Registering Event to be captured by anyOf(t1,t2,t3) **"); - client.raiseEvent(instanceId, "e2", "event 2 Payload"); - System.out.printf("Event raised for workflow with instanceId: %s\n", instanceId); - - - System.out.println(separatorStr); - System.out.println("**WaitForInstanceCompletion**"); - try { - WorkflowInstanceStatus waitForInstanceCompletionResult = - client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(60), true); - System.out.printf("Result: %s%n", waitForInstanceCompletionResult); - } catch (TimeoutException ex) { - System.out.printf("waitForInstanceCompletion has an exception:%s%n", ex); - } - - System.out.println(separatorStr); - System.out.println("**purgeInstance**"); - boolean purgeResult = client.purgeInstance(instanceId); - System.out.printf("purgeResult: %s%n", purgeResult); - - System.out.println(separatorStr); - System.out.println("**raiseEvent**"); - - String eventInstanceId = client.scheduleNewWorkflow(DemoWorkflow.class); - System.out.printf("Started new workflow instance with random ID: %s%n", eventInstanceId); - client.raiseEvent(eventInstanceId, "TestException", null); - System.out.printf("Event raised for workflow with instanceId: %s\n", eventInstanceId); - - System.out.println(separatorStr); - String instanceToTerminateId = "terminateMe"; - client.scheduleNewWorkflow(DemoWorkflow.class, null, instanceToTerminateId); - System.out.printf("Started new workflow instance with specified ID: %s%n", instanceToTerminateId); - - TimeUnit.SECONDS.sleep(5); - System.out.println("Terminate this workflow instance manually before the timeout is reached"); - client.terminateWorkflow(instanceToTerminateId, null); - System.out.println(separatorStr); - - String restartingInstanceId = "restarting"; - client.scheduleNewWorkflow(DemoWorkflow.class, null, restartingInstanceId); - System.out.printf("Started new workflow instance with ID: %s%n", restartingInstanceId); - System.out.println("Sleeping 30 seconds to restart the workflow"); - TimeUnit.SECONDS.sleep(30); - - System.out.println("**SendExternalMessage: RestartEvent**"); - client.raiseEvent(restartingInstanceId, "RestartEvent", "RestartEventPayload"); - - System.out.println("Sleeping 30 seconds to terminate the eternal workflow"); - TimeUnit.SECONDS.sleep(30); - client.terminateWorkflow(restartingInstanceId, null); - } - - System.out.println("Exiting DemoWorkflowClient."); - System.exit(0); - } -} diff --git a/examples/src/main/java/io/dapr/examples/workflows/README.md b/examples/src/main/java/io/dapr/examples/workflows/README.md index d24d3f74a9..0dac908e3b 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/README.md +++ b/examples/src/main/java/io/dapr/examples/workflows/README.md @@ -44,38 +44,519 @@ cd examples Run `dapr init` to initialize Dapr in Self-Hosted Mode if it's not already initialized. -### Running the demo Workflow worker +## Patterns -The first Java class to consider is `DemoWorkflowWorker`. It's job is to register an implementation of `DemoWorkflow` in Dapr's workflow runtime engine. In the `DemoWorkflowWorker.java` file, you will find the `DemoWorkflowWorker` class and the `main` method. See the code snippet below: +Those examples contain the following workflow patterns: +1. [Chaining Pattern](#chaining-pattern) +2. [Fan-out/Fan-in Pattern](#fan-outfan-in-pattern) +3. [Continue As New Pattern](#continue-as-new-pattern) +4. [External Event Pattern](#external-event-pattern) +5. [Sub-workflow Pattern](#sub-workflow-pattern) -```java -public class DemoWorkflowWorker { +### Chaining Pattern +In the chaining pattern, a sequence of activities executes in a specific order. +In this pattern, the output of one activity is applied to the input of another activity. +The chaining pattern is useful when you need to execute a sequence of activities in a specific order. +The first Java class is `DemoChainWorker`. Its job is to register an implementation of `DemoChainWorkflow` in Dapr's workflow runtime engine. In the `DemoChainWorker.java` file, you will find the `DemoChainWorker` class and the `main` method. See the code snippet below: +```java +public class DemoChainWorker { + /** + * The main method of this app. + * + * @param args The port the app will listen on. + * @throws Exception An Exception. + */ public static void main(String[] args) throws Exception { - // Register the Workflow with the runtime. - WorkflowRuntime.getInstance().registerWorkflow(DemoWorkflow.class); - System.out.println("Start workflow runtime"); - WorkflowRuntime.getInstance().startAndBlock(); + // Register the Workflow with the builder. + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoChainWorkflow.class); + builder.registerActivity(ToUpperCaseActivity.class); + + // Build and then start the workflow runtime pulling and executing tasks + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("Start workflow runtime"); + runtime.start(); + } + System.exit(0); } } ``` -This application uses `WorkflowRuntime.getInstance().registerWorkflow()` in order to register `DemoWorkflow` as a Workflow in the Dapr Workflow runtime. +The second Java class you want to look at is `DemoChainWorkflow`, it defines the workflow. In this example it chains the activites in order. See the code snippet below: +```java +public class DemoChainWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + + String result = ""; + result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "Tokyo", String.class).await() + ", "; + result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "London", String.class).await() + ", "; + result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "Seattle", String.class).await(); + + ctx.getLogger().info("Workflow finished with result: " + result); + ctx.complete(result); + }; + } +} +``` + +The next Java class you want to look at is `ToUpperCaseActivity`, it defines the logics for a single acitvity, in this case, it converts a string to upper case. See the code snippet below: +```java +public class ToUpperCaseActivity implements WorkflowActivity { + + @Override + public String run(WorkflowActivityContext ctx) { + Logger logger = LoggerFactory.getLogger(ToUpperCaseActivity.class); + logger.info("Starting Chaining Activity: " + ctx.getName()); + + var message = ctx.getInput(String.class); + var newMessage = message.toUpperCase(); + + logger.info("Message Received from input: " + message); + logger.info("Sending message to output: " + newMessage); + + logger.info("Activity returned: " + newMessage); + logger.info("Activity finished"); + + return newMessage; + } +} +``` + +Execute the following script in order to run DemoChainWorker: +```sh +dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chain.DemoChainWorker +``` +Once running, the logs will start displaying the different steps: First, you can see workflow is starting: +```text +== APP == Start workflow runtime +== APP == Nov 07, 2023 11:03:07 AM com.microsoft.durabletask.DurableTaskGrpcWorker startAndBlock +== APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001. +``` + +Then, execute the following script in order to run DemoChainClient: +```sh +java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chain.DemoChainClient +``` + + +Now you can see the worker logs showing the acitvity is invoked in sequnce and the status of each activity: +```text +== APP == 2023-11-07 11:03:14,178 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.chain.DemoChainWorkflow +== APP == 2023-11-07 11:03:14,229 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity +== APP == 2023-11-07 11:03:14,235 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Message Received from input: Tokyo +== APP == 2023-11-07 11:03:14,235 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Sending message to output: TOKYO +== APP == 2023-11-07 11:03:14,266 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity +== APP == 2023-11-07 11:03:14,267 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Message Received from input: London +== APP == 2023-11-07 11:03:14,267 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Sending message to output: LONDON +== APP == 2023-11-07 11:03:14,282 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity +== APP == 2023-11-07 11:03:14,282 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Message Received from input: Seattle +== APP == 2023-11-07 11:03:14,283 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Sending message to output: SEATTLE +== APP == 2023-11-07 11:03:14,298 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Workflow finished with result: TOKYO, LONDON, SEATTLE +``` +and the client logs showing the workflow is started and finished with expected result: +```text +Started a new chaining model workflow with instance ID: 6e4fe69b-689b-4998-b095-d6b52c7d6328 +workflow instance with ID: 6e4fe69b-689b-4998-b095-d6b52c7d6328 completed with result: TOKYO, LONDON, SEATTLE +``` + +### Fan-out/Fan-in Pattern + +In the fan out/fan in pattern, you execute multiple activities in parallel and then wait for all activities to finish. Often, some aggregation work is done on the results that are returned from the activities. + +The `DemoFanInOutWorkflow` class defines the workflow. In this example it executes the activities in parallel and then sums the results. See the code snippet below: +```java +public class DemoFanInOutWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + + + // The input is a list of objects that need to be operated on. + // In this example, inputs are expected to be strings. + List inputs = ctx.getInput(List.class); + + // Fan-out to multiple concurrent activity invocations, each of which does a word count. + List> tasks = inputs.stream() + .map(input -> ctx.callActivity(CountWordsActivity.class.getName(), input.toString(), Integer.class)) + .collect(Collectors.toList()); + + // Fan-in to get the total word count from all the individual activity results. + List allWordCountResults = ctx.allOf(tasks).await(); + int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum(); + + ctx.getLogger().info("Workflow finished with result: " + totalWordCount); + // Save the final result as the orchestration output. + ctx.complete(totalWordCount); + }; + } +} +``` + +The `CountWordsActivity` class defines the logics for a single acitvity, in this case, it counts the words in a string. See the code snippet below: +```java +public class CountWordsActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + Logger logger = LoggerFactory.getLogger(ToUpperCaseActivity.class); + logger.info("Starting Activity: " + ctx.getName()); + + String input = ctx.getInput(String.class); + StringTokenizer tokenizer = new StringTokenizer(input); + int result = tokenizer.countTokens(); + + logger.info("Activity returned: " + result); + logger.info("Activity finished"); + + return result; + } +} +``` + + +Execute the following script in order to run DemoFanInOutWorker: +```sh +dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.faninout.DemoFanInOutWorker +``` +Execute the following script in order to run DemoFanInOutClient: +```sh +java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.faninout.DemoFanInOutClient +``` + + +Now you can see the logs from worker: +```text +== APP == 2023-11-07 14:52:03,075 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.faninout.DemoFanInOutWorkflow +== APP == 2023-11-07 14:52:03,144 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity +== APP == 2023-11-07 14:52:03,147 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 2 +== APP == 2023-11-07 14:52:03,148 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished +== APP == 2023-11-07 14:52:03,152 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity +== APP == 2023-11-07 14:52:03,152 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 9 +== APP == 2023-11-07 14:52:03,152 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished +== APP == 2023-11-07 14:52:03,167 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity +== APP == 2023-11-07 14:52:03,167 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 21 +== APP == 2023-11-07 14:52:03,167 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished +== APP == 2023-11-07 14:52:03,170 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity +== APP == 2023-11-07 14:52:03,170 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 17 +== APP == 2023-11-07 14:52:03,170 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished +== APP == 2023-11-07 14:52:03,173 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity +== APP == 2023-11-07 14:52:03,173 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 11 +== APP == 2023-11-07 14:52:03,174 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished +== APP == 2023-11-07 14:52:03,212 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Workflow finished with result: 60 +``` + +and the client: +```text +Started a new fan out/fan in model model workflow with instance ID: 092c1928-b5dd-4576-9468-300bf6aed986 +workflow instance with ID: 092c1928-b5dd-4576-9468-300bf6aed986 completed with result: 60 +``` + +### Continue As New Pattern +`ContinueAsNew` API allows you to restart the workflow with a new input. + +The `DemoContinueAsNewWorkflow` class defines the workflow. It simulates periodic cleanup work that happen every 10 seconds, after previous cleanup has finished. See the code snippet below: +```java +public class DemoContinueAsNewWorkflow extends Workflow { + /* + Compared with a CRON schedule, this periodic workflow example will never overlap. + For example, a CRON schedule that executes a cleanup every hour will execute it at 1:00, 2:00, 3:00 etc. + and could potentially run into overlap issues if the cleanup takes longer than an hour. + In this example, however, if the cleanup takes 30 minutes, and we create a timer for 1 hour between cleanups, + then it will be scheduled at 1:00, 2:30, 4:00, etc. and there is no chance of overlap. + */ + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + + ctx.getLogger().info("call CleanUpActivity to do the clean up"); + ctx.callActivity(CleanUpActivity.class.getName()).await(); + ctx.getLogger().info("CleanUpActivity finished"); + + ctx.getLogger().info("wait 10 seconds for next clean up"); + ctx.createTimer(Duration.ofSeconds(10)).await(); + + // continue the workflow. + ctx.continueAsNew(null); + }; + } +} +``` + +The `CleanUpActivity` class defines the logics for a single acitvity, in this case, it simulates a clean up work. See the code snippet below: +```java +public class CleanUpActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + Logger logger = LoggerFactory.getLogger(CleanUpActivity.class); + logger.info("Starting Activity: " + ctx.getName()); -The `WorkflowRuntime.getInstance().start()` method will build and start the engine within the Dapr workflow runtime. + logger.info("start clean up work, it may take few seconds to finish..."); + + //Sleeping for 5 seconds to simulate long running operation + try { + TimeUnit.SECONDS.sleep(5); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return "clean up finish."; + } +} +``` -Now, execute the following script in order to run DemoWorkflowWorker: +Once you start the workflow and client using the following commands: ```sh -dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.DemoWorkflowWorker +dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorker +``` +```sh +java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.continueasnew.DemoContinueAsNewClient +```` + +You will see the logs from worker showing the `CleanUpActivity` is invoked every 10 seconds after previous one is finished: +```text +== APP == 2023-11-07 14:44:42,004 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow +== APP == 2023-11-07 14:44:42,004 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up +== APP == 2023-11-07 14:44:42,009 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - Starting Activity: io.dapr.examples.workflows.continueasnew.CleanUpActivity +== APP == 2023-11-07 14:44:42,009 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - start clean up work, it may take few seconds to finish... +== APP == 2023-11-07 14:44:47,026 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow +== APP == 2023-11-07 14:44:47,026 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up +== APP == 2023-11-07 14:44:47,030 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - CleanUpActivity finished +== APP == 2023-11-07 14:44:47,030 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - wait 10 seconds for next clean up +== APP == 2023-11-07 14:44:47,033 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - Starting Activity: io.dapr.examples.workflows.continueasnew.CleanUpActivity +== APP == 2023-11-07 14:44:47,033 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - start clean up work, it may take few seconds to finish... +== APP == 2023-11-07 14:44:52,053 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - CleanUpActivity finished +== APP == 2023-11-07 14:44:52,053 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - wait 10 seconds for next clean up +== APP == 2023-11-07 14:44:57,006 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow +== APP == 2023-11-07 14:44:57,006 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up +== APP == 2023-11-07 14:44:57,012 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - Starting Activity: io.dapr.examples.workflows.continueasnew.CleanUpActivity +== APP == 2023-11-07 14:44:57,012 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - start clean up work, it may take few seconds to finish... +== APP == 2023-11-07 14:45:02,017 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow +== APP == 2023-11-07 14:45:02,020 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up +== APP == 2023-11-07 14:45:02,021 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - CleanUpActivity finished +== APP == 2023-11-07 14:45:02,021 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - wait 10 seconds for next clean up +... ``` -### Running the Workflow client +and the client: +```text +Started a new continue-as-new model workflow with instance ID: c853fb93-f0e7-4ad7-ad41-385732386f94 +``` +It will continue to run until you stop the worker. -The `DemoWorkflowClient` starts instances of workflows that have been registered with Dapr. +### External Event Pattern +In the external event pattern, a workflow is started by an external event. The workflow can then wait for other external events to occur before completing. -With the DemoWorkflowWorker running, use the follow command to start the workflow with the DemoWorkflowClient: +The `DemoExternalEventWorkflow` class defines the workflow. It waits for an external event `Approval` to run the corresponding activity. See the code snippet below: +```java +public class DemoExternalEventWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + + Boolean approved = ctx.waitForExternalEvent("Approval", boolean.class).await(); + if (approved) { + ctx.getLogger().info("approval granted - do the approved action"); + ctx.callActivity(ApproveActivity.class.getName()).await(); + ctx.getLogger().info("approval-activity finished"); + } else { + ctx.getLogger().info("approval denied - send a notification"); + ctx.callActivity(DenyActivity.class.getName()).await(); + ctx.getLogger().info("denied-activity finished"); + } + }; + } +} +``` + +In the `DemoExternalEventClient` class we send out Approval event to tell our workflow to run the approved activity. +```java +client.raiseEvent(instanceId, "Approval", true); +``` + +Start the workflow and client using the following commands: + + +```sh +dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventWorker +``` ```sh -java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.DemoWorkflowClient +java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventClient ``` + + +The worker logs: +```text +== APP == 2023-11-07 16:01:23,279 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.externalevent.DemoExternalEventWorkflow +== APP == 2023-11-07 16:01:23,279 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Waiting for approval... +== APP == 2023-11-07 16:01:23,324 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - approval granted - do the approved action +== APP == 2023-11-07 16:01:23,348 {HH:mm:ss.SSS} [main] INFO i.d.e.w.e.ApproveActivity - Starting Activity: io.dapr.examples.workflows.externalevent.ApproveActivity +== APP == 2023-11-07 16:01:23,348 {HH:mm:ss.SSS} [main] INFO i.d.e.w.e.ApproveActivity - Running approval activity... +== APP == 2023-11-07 16:01:28,410 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - approval-activity finished +``` + +The client log: +```text +Started a new external-event model workflow with instance ID: 23410d96-1afe-4698-9fcd-c01c1e0db255 +workflow instance with ID: 23410d96-1afe-4698-9fcd-c01c1e0db255 completed. +``` + +### Sub-workflow Pattern +The sub-workflow pattern allows you to call a workflow from another workflow. + +The `DemoWorkflow` class defines the workflow. It calls a sub-workflow `DemoSubWorkflow` to do the work. See the code snippet below: +```java +public class DemoWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + + var subWorkflowInput = "Hello Dapr Workflow!"; + ctx.getLogger().info("calling subworkflow with input: " + subWorkflowInput); + + var subWorkflowOutput = + ctx.callSubWorkflow(DemoSubWorkflow.class.getName(), subWorkflowInput, String.class).await(); + + ctx.getLogger().info("subworkflow finished with: " + subWorkflowOutput); + }; + } +} +``` + +The `DemoSubWorkflow` class defines the sub-workflow. It call the activity to do the work and returns the result. See the code snippet below: +```java +public class DemoSubWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting SubWorkflow: " + ctx.getName()); + + var subWorkflowInput = ctx.getInput(String.class); + ctx.getLogger().info("SubWorkflow received input: " + subWorkflowInput); + + ctx.getLogger().info("SubWorkflow is calling Activity: " + ReverseActivity.class.getName()); + String result = ctx.callActivity(ReverseActivity.class.getName(), subWorkflowInput, String.class).await(); + + ctx.getLogger().info("SubWorkflow finished with: " + result); + ctx.complete(result); + }; + } +} +``` + +The `ReverseActivity` class defines the logics for a single acitvity, in this case, it reverses a string. See the code snippet below: +```java +public class ReverseActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + Logger logger = LoggerFactory.getLogger(ReverseActivity.class); + logger.info("Starting Activity: " + ctx.getName()); + + var message = ctx.getInput(String.class); + var newMessage = new StringBuilder(message).reverse().toString(); + + logger.info("Message Received from input: " + message); + logger.info("Sending message to output: " + newMessage); + + logger.info("Activity returned: " + newMessage); + logger.info("Activity finished"); + + return newMessage; + } +} +``` + +Start the workflow and client using the following commands: + + +```sh +dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkflowWorker +``` + +```sh +java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkerflowClient +``` + + +The log from worker: +```text +== APP == 2023-11-07 20:08:52,521 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.subworkflow.DemoWorkflow +== APP == 2023-11-07 20:08:52,523 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - calling subworkflow with input: Hello Dapr Workflow! +== APP == 2023-11-07 20:08:52,561 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting SubWorkflow: io.dapr.examples.workflows.subworkflow.DemoSubWorkflow +== APP == 2023-11-07 20:08:52,566 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - SubWorkflow received input: Hello Dapr Workflow! +== APP == 2023-11-07 20:08:52,566 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - SubWorkflow is calling Activity: io.dapr.examples.workflows.subworkflow.ReverseActivity +== APP == 2023-11-07 20:08:52,576 {HH:mm:ss.SSS} [main] INFO i.d.e.w.subworkflow.ReverseActivity - Starting Activity: io.dapr.examples.workflows.subworkflow.ReverseActivity +== APP == 2023-11-07 20:08:52,577 {HH:mm:ss.SSS} [main] INFO i.d.e.w.subworkflow.ReverseActivity - Message Received from input: Hello Dapr Workflow! +== APP == 2023-11-07 20:08:52,577 {HH:mm:ss.SSS} [main] INFO i.d.e.w.subworkflow.ReverseActivity - Sending message to output: !wolfkroW rpaD olleH +== APP == 2023-11-07 20:08:52,596 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - SubWorkflow finished with: !wolfkroW rpaD olleH +== APP == 2023-11-07 20:08:52,611 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - subworkflow finished with: !wolfkroW rpaD olleH +``` + +The log from client: +```text +Started a new sub-workflow model workflow with instance ID: c2fb9c83-435b-4b55-bdf1-833b39366cfb +workflow instance with ID: c2fb9c83-435b-4b55-bdf1-833b39366cfb completed with result: !wolfkroW rpaD olleH +``` \ No newline at end of file diff --git a/examples/src/main/java/io/dapr/examples/workflows/chain/DemoChainClient.java b/examples/src/main/java/io/dapr/examples/workflows/chain/DemoChainClient.java new file mode 100644 index 0000000000..810a365c34 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/chain/DemoChainClient.java @@ -0,0 +1,44 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.chain; + +import io.dapr.workflows.client.DaprWorkflowClient; +import io.dapr.workflows.client.WorkflowInstanceStatus; + +import java.time.Duration; +import java.util.concurrent.TimeoutException; + +public class DemoChainClient { + /** + * The main method to start the client. + * + * @param args Input arguments (unused). + * @throws InterruptedException If program has been interrupted. + */ + public static void main(String[] args) { + try (DaprWorkflowClient client = new DaprWorkflowClient()) { + String instanceId = client.scheduleNewWorkflow(DemoChainWorkflow.class); + System.out.printf("Started a new chaining model workflow with instance ID: %s%n", instanceId); + WorkflowInstanceStatus workflowInstanceStatus = + client.waitForInstanceCompletion(instanceId, null, true); + + String result = workflowInstanceStatus.readOutputAs(String.class); + System.out.printf("workflow instance with ID: %s completed with result: %s%n", instanceId, result); + + + } catch (TimeoutException | InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowWorker.java b/examples/src/main/java/io/dapr/examples/workflows/chain/DemoChainWorker.java similarity index 81% rename from examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowWorker.java rename to examples/src/main/java/io/dapr/examples/workflows/chain/DemoChainWorker.java index 21bd01052e..61b1572bda 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowWorker.java +++ b/examples/src/main/java/io/dapr/examples/workflows/chain/DemoChainWorker.java @@ -1,43 +1,37 @@ -/* - * Copyright 2023 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ - -package io.dapr.examples.workflows; - -import io.dapr.workflows.runtime.WorkflowRuntime; -import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; - -/** - * For setup instructions, see the README. - */ -public class DemoWorkflowWorker { - - /** - * The main method of this app. - * - * @param args The port the app will listen on. - * @throws Exception An Exception. - */ - public static void main(String[] args) throws Exception { - // Register the Workflow with the builder. - WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoWorkflow.class); - builder.registerActivity(DemoWorkflowActivity.class); - - // Build and then start the workflow runtime pulling and executing tasks - try (WorkflowRuntime runtime = builder.build()) { - System.out.println("Start workflow runtime"); - runtime.start(); - } - - System.exit(0); - } -} +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.chain; + +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +public class DemoChainWorker { + /** + * The main method of this app. + * + * @param args The port the app will listen on. + * @throws Exception An Exception. + */ + public static void main(String[] args) throws Exception { + // Register the Workflow with the builder. + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoChainWorkflow.class); + builder.registerActivity(ToUpperCaseActivity.class); + + // Build and then start the workflow runtime pulling and executing tasks + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("Start workflow runtime"); + runtime.start(); + } + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/chain/DemoChainWorkflow.java b/examples/src/main/java/io/dapr/examples/workflows/chain/DemoChainWorkflow.java new file mode 100644 index 0000000000..8bf8b7bef1 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/chain/DemoChainWorkflow.java @@ -0,0 +1,34 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.chain; + +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; + +public class DemoChainWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + + String result = ""; + result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "Tokyo", String.class).await() + ", "; + result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "London", String.class).await() + ", "; + result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "Seattle", String.class).await(); + + ctx.getLogger().info("Workflow finished with result: " + result); + ctx.complete(result); + }; + } +} \ No newline at end of file diff --git a/examples/src/main/java/io/dapr/examples/workflows/chain/ToUpperCaseActivity.java b/examples/src/main/java/io/dapr/examples/workflows/chain/ToUpperCaseActivity.java new file mode 100644 index 0000000000..f5313df4b4 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/chain/ToUpperCaseActivity.java @@ -0,0 +1,37 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.chain; + +import io.dapr.workflows.runtime.WorkflowActivity; +import io.dapr.workflows.runtime.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ToUpperCaseActivity implements WorkflowActivity { + + @Override + public Object run(WorkflowActivityContext ctx) { + Logger logger = LoggerFactory.getLogger(ToUpperCaseActivity.class); + logger.info("Starting Activity: " + ctx.getName()); + + var message = ctx.getInput(String.class); + var newMessage = message.toUpperCase(); + + logger.info("Message Received from input: " + message); + logger.info("Sending message to output: " + newMessage); + + return newMessage; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/continueasnew/CleanUpActivity.java b/examples/src/main/java/io/dapr/examples/workflows/continueasnew/CleanUpActivity.java new file mode 100644 index 0000000000..d2eef0dd86 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/continueasnew/CleanUpActivity.java @@ -0,0 +1,39 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.continueasnew; + +import io.dapr.workflows.runtime.WorkflowActivity; +import io.dapr.workflows.runtime.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +public class CleanUpActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + Logger logger = LoggerFactory.getLogger(CleanUpActivity.class); + logger.info("Starting Activity: " + ctx.getName()); + + logger.info("start clean up work, it may take few seconds to finish..."); + + //Sleeping for 5 seconds to simulate long running operation + try { + TimeUnit.SECONDS.sleep(5); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return "clean up finish."; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/continueasnew/DemoContinueAsNewClient.java b/examples/src/main/java/io/dapr/examples/workflows/continueasnew/DemoContinueAsNewClient.java new file mode 100644 index 0000000000..dbafb2ebb7 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/continueasnew/DemoContinueAsNewClient.java @@ -0,0 +1,38 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.continueasnew; + +import io.dapr.workflows.client.DaprWorkflowClient; +import java.util.concurrent.TimeoutException; + +public class DemoContinueAsNewClient { + /** + * The main method to start the client. + * + * @param args Input arguments (unused). + * @throws InterruptedException If program has been interrupted. + */ + public static void main(String[] args) { + try (DaprWorkflowClient client = new DaprWorkflowClient()) { + String instanceId = client.scheduleNewWorkflow(DemoContinueAsNewWorkflow.class); + System.out.printf("Started a new continue-as-new model workflow with instance ID: %s%n", instanceId); + + client.waitForInstanceCompletion(instanceId, null, true); + System.out.printf("workflow instance with ID: %s completed.", instanceId); + + } catch (TimeoutException | InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/continueasnew/DemoContinueAsNewWorker.java b/examples/src/main/java/io/dapr/examples/workflows/continueasnew/DemoContinueAsNewWorker.java new file mode 100644 index 0000000000..0ca050e874 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/continueasnew/DemoContinueAsNewWorker.java @@ -0,0 +1,37 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.continueasnew; + +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +public class DemoContinueAsNewWorker { + /** + * The main method of this app. + * + * @param args The port the app will listen on. + * @throws Exception An Exception. + */ + public static void main(String[] args) throws Exception { + // Register the Workflow with the builder. + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoContinueAsNewWorkflow.class); + builder.registerActivity(CleanUpActivity.class); + + // Build and then start the workflow runtime pulling and executing tasks + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("Start workflow runtime"); + runtime.start(); + } + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/continueasnew/DemoContinueAsNewWorkflow.java b/examples/src/main/java/io/dapr/examples/workflows/continueasnew/DemoContinueAsNewWorkflow.java new file mode 100644 index 0000000000..b3176e4c35 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/continueasnew/DemoContinueAsNewWorkflow.java @@ -0,0 +1,45 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.continueasnew; + +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; + +import java.time.Duration; + +public class DemoContinueAsNewWorkflow extends Workflow { + /* + Compared with a CRON schedule, this periodic workflow example will never overlap. + For example, a CRON schedule that executes a cleanup every hour will execute it at 1:00, 2:00, 3:00 etc. + and could potentially run into overlap issues if the cleanup takes longer than an hour. + In this example, however, if the cleanup takes 30 minutes, and we create a timer for 1 hour between cleanups, + then it will be scheduled at 1:00, 2:30, 4:00, etc. and there is no chance of overlap. + */ + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + + ctx.getLogger().info("call CleanUpActivity to do the clean up"); + ctx.callActivity(CleanUpActivity.class.getName()).await(); + ctx.getLogger().info("CleanUpActivity finished"); + + ctx.getLogger().info("wait 10 seconds for next clean up"); + ctx.createTimer(Duration.ofSeconds(10)).await(); + + // continue the workflow. + ctx.continueAsNew(null); + }; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/externalevent/ApproveActivity.java b/examples/src/main/java/io/dapr/examples/workflows/externalevent/ApproveActivity.java new file mode 100644 index 0000000000..63b921857a --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/externalevent/ApproveActivity.java @@ -0,0 +1,39 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.externalevent; + +import io.dapr.workflows.runtime.WorkflowActivity; +import io.dapr.workflows.runtime.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +public class ApproveActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + Logger logger = LoggerFactory.getLogger(ApproveActivity.class); + logger.info("Starting Activity: " + ctx.getName()); + + logger.info("Running approval activity..."); + //Sleeping for 5 seconds to simulate long running operation + try { + TimeUnit.SECONDS.sleep(5); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + return null; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/externalevent/DemoExternalEventClient.java b/examples/src/main/java/io/dapr/examples/workflows/externalevent/DemoExternalEventClient.java new file mode 100644 index 0000000000..d7178cad70 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/externalevent/DemoExternalEventClient.java @@ -0,0 +1,41 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.externalevent; + +import io.dapr.workflows.client.DaprWorkflowClient; +import java.util.concurrent.TimeoutException; + +public class DemoExternalEventClient { + /** + * The main method to start the client. + * + * @param args Input arguments (unused). + * @throws InterruptedException If program has been interrupted. + */ + public static void main(String[] args) { + try (DaprWorkflowClient client = new DaprWorkflowClient()) { + String instanceId = client.scheduleNewWorkflow(DemoExternalEventWorkflow.class); + System.out.printf("Started a new external-event workflow with instance ID: %s%n", instanceId); + + client.raiseEvent(instanceId, "Approval", true); + //client.raiseEvent(instanceId, "Approval", false); + + client.waitForInstanceCompletion(instanceId, null, true); + System.out.printf("workflow instance with ID: %s completed.", instanceId); + + } catch (TimeoutException | InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/externalevent/DemoExternalEventWorker.java b/examples/src/main/java/io/dapr/examples/workflows/externalevent/DemoExternalEventWorker.java new file mode 100644 index 0000000000..aaa1e7c819 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/externalevent/DemoExternalEventWorker.java @@ -0,0 +1,38 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.externalevent; + +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +public class DemoExternalEventWorker { + /** + * The main method of this app. + * + * @param args The port the app will listen on. + * @throws Exception An Exception. + */ + public static void main(String[] args) throws Exception { + // Register the Workflow with the builder. + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoExternalEventWorkflow.class); + builder.registerActivity(ApproveActivity.class); + builder.registerActivity(DenyActivity.class); + + // Build and then start the workflow runtime pulling and executing tasks + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("Start workflow runtime"); + runtime.start(); + } + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/externalevent/DemoExternalEventWorkflow.java b/examples/src/main/java/io/dapr/examples/workflows/externalevent/DemoExternalEventWorkflow.java new file mode 100644 index 0000000000..96684150bd --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/externalevent/DemoExternalEventWorkflow.java @@ -0,0 +1,38 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.externalevent; + +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; + +public class DemoExternalEventWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + + ctx.getLogger().info("Waiting for approval..."); + Boolean approved = ctx.waitForExternalEvent("Approval", boolean.class).await(); + if (approved) { + ctx.getLogger().info("approval granted - do the approved action"); + ctx.callActivity(ApproveActivity.class.getName()).await(); + ctx.getLogger().info("approval-activity finished"); + } else { + ctx.getLogger().info("approval denied - send a notification"); + ctx.callActivity(DenyActivity.class.getName()).await(); + ctx.getLogger().info("denied-activity finished"); + } + }; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowActivity.java b/examples/src/main/java/io/dapr/examples/workflows/externalevent/DenyActivity.java similarity index 53% rename from examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowActivity.java rename to examples/src/main/java/io/dapr/examples/workflows/externalevent/DenyActivity.java index 9e82f2a4dc..d50d7212ab 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowActivity.java +++ b/examples/src/main/java/io/dapr/examples/workflows/externalevent/DenyActivity.java @@ -11,9 +11,8 @@ limitations under the License. */ -package io.dapr.examples.workflows; +package io.dapr.examples.workflows.externalevent; -import com.fasterxml.jackson.annotation.JsonAutoDetect; import io.dapr.workflows.runtime.WorkflowActivity; import io.dapr.workflows.runtime.WorkflowActivityContext; import org.slf4j.Logger; @@ -21,33 +20,20 @@ import java.util.concurrent.TimeUnit; -@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY) -public class DemoWorkflowActivity implements WorkflowActivity { - +public class DenyActivity implements WorkflowActivity { @Override - public DemoActivityOutput run(WorkflowActivityContext ctx) { - Logger logger = LoggerFactory.getLogger(DemoWorkflowActivity.class); + public Object run(WorkflowActivityContext ctx) { + Logger logger = LoggerFactory.getLogger(DenyActivity.class); logger.info("Starting Activity: " + ctx.getName()); - var message = ctx.getInput(DemoActivityInput.class).getMessage(); - var newMessage = message + " World!, from Activity"; - logger.info("Message Received from input: " + message); - logger.info("Sending message to output: " + newMessage); - - logger.info("Sleeping for 5 seconds to simulate long running operation..."); - + logger.info("Running denied activity..."); + //Sleeping for 5 seconds to simulate long running operation try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { throw new RuntimeException(e); } - - logger.info("Activity finished"); - - var output = new DemoActivityOutput(message, newMessage); - logger.info("Activity returned: " + output); - - return output; + return null; } } diff --git a/examples/src/main/java/io/dapr/examples/workflows/faninout/CountWordsActivity.java b/examples/src/main/java/io/dapr/examples/workflows/faninout/CountWordsActivity.java new file mode 100644 index 0000000000..e6e98dee50 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/faninout/CountWordsActivity.java @@ -0,0 +1,38 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.faninout; + +import io.dapr.workflows.runtime.WorkflowActivity; +import io.dapr.workflows.runtime.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.StringTokenizer; + +public class CountWordsActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + Logger logger = LoggerFactory.getLogger(CountWordsActivity.class); + logger.info("Starting Activity: " + ctx.getName()); + + String input = ctx.getInput(String.class); + StringTokenizer tokenizer = new StringTokenizer(input); + int result = tokenizer.countTokens(); + + logger.info("Activity returned: " + result); + logger.info("Activity finished"); + + return result; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutClient.java b/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutClient.java new file mode 100644 index 0000000000..23a76b788c --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutClient.java @@ -0,0 +1,58 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.faninout; + +import io.dapr.workflows.client.DaprWorkflowClient; +import io.dapr.workflows.client.WorkflowInstanceStatus; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeoutException; + +public class DemoFanInOutClient { + /** + * The main method to start the client. + * + * @param args Input arguments (unused). + * @throws InterruptedException If program has been interrupted. + */ + public static void main(String[] args) throws InterruptedException { + try (DaprWorkflowClient client = new DaprWorkflowClient()) { + // The input is an arbitrary list of strings. + List listOfStrings = Arrays.asList( + "Hello, world!", + "The quick brown fox jumps over the lazy dog.", + "If a tree falls in the forest and there is no one there to hear it, does it make a sound?", + "The greatest glory in living lies not in never falling, but in rising every time we fall.", + "Always remember that you are absolutely unique. Just like everyone else."); + + // Schedule an orchestration which will reliably count the number of words in all the given sentences. + String instanceId = client.scheduleNewWorkflow( + DemoFanInOutWorkflow.class, + listOfStrings); + System.out.printf("Started a new fan out/fan in model workflow with instance ID: %s%n", instanceId); + + // Block until the orchestration completes. Then print the final status, which includes the output. + WorkflowInstanceStatus workflowInstanceStatus = client.waitForInstanceCompletion( + instanceId, + Duration.ofSeconds(30), + true); + System.out.printf("workflow instance with ID: %s completed with result: %s%n", instanceId, + workflowInstanceStatus.readOutputAs(int.class)); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorker.java b/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorker.java new file mode 100644 index 0000000000..d5c6d14e7a --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorker.java @@ -0,0 +1,37 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.faninout; + +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +public class DemoFanInOutWorker { + /** + * The main method of this app. + * + * @param args The port the app will listen on. + * @throws Exception An Exception. + */ + public static void main(String[] args) throws Exception { + // Register the Workflow with the builder. + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoFanInOutWorkflow.class); + builder.registerActivity(CountWordsActivity.class); + + // Build and then start the workflow runtime pulling and executing tasks + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("Start workflow runtime"); + runtime.start(); + } + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorkflow.java b/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorkflow.java new file mode 100644 index 0000000000..62312f938d --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorkflow.java @@ -0,0 +1,49 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.faninout; + +import com.microsoft.durabletask.Task; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; + +import java.util.List; +import java.util.stream.Collectors; + +public class DemoFanInOutWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + + + // The input is a list of objects that need to be operated on. + // In this example, inputs are expected to be strings. + List inputs = ctx.getInput(List.class); + + // Fan-out to multiple concurrent activity invocations, each of which does a word count. + List> tasks = inputs.stream() + .map(input -> ctx.callActivity(CountWordsActivity.class.getName(), input.toString(), Integer.class)) + .collect(Collectors.toList()); + + // Fan-in to get the total word count from all the individual activity results. + List allWordCountResults = ctx.allOf(tasks).await(); + int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum(); + + ctx.getLogger().info("Workflow finished with result: " + totalWordCount); + // Save the final result as the orchestration output. + ctx.complete(totalWordCount); + }; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/subworkflow/DemoSubWorkerflowClient.java b/examples/src/main/java/io/dapr/examples/workflows/subworkflow/DemoSubWorkerflowClient.java new file mode 100644 index 0000000000..7202694acb --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/subworkflow/DemoSubWorkerflowClient.java @@ -0,0 +1,42 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.subworkflow; + +import io.dapr.workflows.client.DaprWorkflowClient; +import io.dapr.workflows.client.WorkflowInstanceStatus; + +import java.util.concurrent.TimeoutException; + +public class DemoSubWorkerflowClient { + /** + * The main method to start the client. + * + * @param args Input arguments (unused). + * @throws InterruptedException If program has been interrupted. + */ + public static void main(String[] args) { + try (DaprWorkflowClient client = new DaprWorkflowClient()) { + String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class); + System.out.printf("Started a new sub-workflow model workflow with instance ID: %s%n", instanceId); + WorkflowInstanceStatus workflowInstanceStatus = + client.waitForInstanceCompletion(instanceId, null, true); + + String result = workflowInstanceStatus.readOutputAs(String.class); + System.out.printf("workflow instance with ID: %s completed with result: %s%n", instanceId, result); + + } catch (TimeoutException | InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/DemoSubWorkflow.java b/examples/src/main/java/io/dapr/examples/workflows/subworkflow/DemoSubWorkflow.java similarity index 58% rename from examples/src/main/java/io/dapr/examples/workflows/DemoSubWorkflow.java rename to examples/src/main/java/io/dapr/examples/workflows/subworkflow/DemoSubWorkflow.java index 0967443825..72c25575da 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/DemoSubWorkflow.java +++ b/examples/src/main/java/io/dapr/examples/workflows/subworkflow/DemoSubWorkflow.java @@ -11,29 +11,25 @@ limitations under the License. */ -package io.dapr.examples.workflows; +package io.dapr.examples.workflows.subworkflow; import io.dapr.workflows.Workflow; import io.dapr.workflows.WorkflowStub; -/** - * Implementation of the DemoWorkflow for the server side. - */ public class DemoSubWorkflow extends Workflow { @Override public WorkflowStub create() { return ctx -> { + ctx.getLogger().info("Starting SubWorkflow: " + ctx.getName()); - var logger = ctx.getLogger(); - logger.info("Child-Workflow> Started: " + ctx.getName()); - logger.info("Child-Workflow> Instance ID: " + ctx.getInstanceId()); - logger.info("Child-Workflow> Current Time: " + ctx.getCurrentInstant()); + var subWorkflowInput = ctx.getInput(String.class); + ctx.getLogger().info("SubWorkflow received input: " + subWorkflowInput); - var input = ctx.getInput(String.class); - logger.info("Child-Workflow> Input: " + input); + ctx.getLogger().info("SubWorkflow is calling Activity: " + ReverseActivity.class.getName()); + String result = ctx.callActivity(ReverseActivity.class.getName(), subWorkflowInput, String.class).await(); - logger.info("Child-Workflow> Completed"); - ctx.complete("result: " + input); + ctx.getLogger().info("SubWorkflow finished with: " + result); + ctx.complete(result); }; } } diff --git a/examples/src/main/java/io/dapr/examples/workflows/subworkflow/DemoSubWorkflowWorker.java b/examples/src/main/java/io/dapr/examples/workflows/subworkflow/DemoSubWorkflowWorker.java new file mode 100644 index 0000000000..026a786396 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/subworkflow/DemoSubWorkflowWorker.java @@ -0,0 +1,39 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.subworkflow; + +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +public class DemoSubWorkflowWorker { + /** + * The main method of this app. + * + * @param args The port the app will listen on. + * @throws Exception An Exception. + */ + public static void main(String[] args) throws Exception { + // Register the Workflow with the builder. + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder() + .registerWorkflow(DemoWorkflow.class) + .registerWorkflow(DemoSubWorkflow.class); + builder.registerActivity(ReverseActivity.class); + + // Build and then start the workflow runtime pulling and executing tasks + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("Start workflow runtime"); + runtime.start(); + } + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/subworkflow/DemoWorkflow.java b/examples/src/main/java/io/dapr/examples/workflows/subworkflow/DemoWorkflow.java new file mode 100644 index 0000000000..0d846006ac --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/subworkflow/DemoWorkflow.java @@ -0,0 +1,35 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.subworkflow; + +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; + +public class DemoWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + + var subWorkflowInput = "Hello Dapr Workflow!"; + ctx.getLogger().info("calling subworkflow with input: " + subWorkflowInput); + + var subWorkflowOutput = + ctx.callSubWorkflow(DemoSubWorkflow.class.getName(), subWorkflowInput, String.class).await(); + + ctx.getLogger().info("subworkflow finished with: " + subWorkflowOutput); + ctx.complete(subWorkflowOutput); + }; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/subworkflow/ReverseActivity.java b/examples/src/main/java/io/dapr/examples/workflows/subworkflow/ReverseActivity.java new file mode 100644 index 0000000000..00d34382cd --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/subworkflow/ReverseActivity.java @@ -0,0 +1,36 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.subworkflow; + +import io.dapr.examples.workflows.chain.ToUpperCaseActivity; +import io.dapr.workflows.runtime.WorkflowActivity; +import io.dapr.workflows.runtime.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReverseActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + Logger logger = LoggerFactory.getLogger(ReverseActivity.class); + logger.info("Starting Activity: " + ctx.getName()); + + var message = ctx.getInput(String.class); + var newMessage = new StringBuilder(message).reverse().toString(); + + logger.info("Message Received from input: " + message); + logger.info("Sending message to output: " + newMessage); + + return newMessage; + } +}