From 81085d28e86c18fdfcb05464ad2270b514a5e9aa Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Thu, 18 Apr 2019 11:51:48 -0400 Subject: [PATCH 1/2] Adding comments to CallbackExecutor.submit --- .../pubsub/v1/SequentialExecutorService.java | 41 ++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java index bbebd8db4de2..34be63f1673d 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java @@ -115,6 +115,7 @@ private static class AutoExecutor extends SequentialExecutor { super(executor); } + @Override protected void execute(final String key, final Deque tasks) { executor.execute( new Runnable() { @@ -146,8 +147,40 @@ private static class CallbackExecutor extends SequentialExecutor { super(executor); } + /** + * This method does the following in a chain: + * + *
    + *
  1. Creates an `ApiFuture` that can be used for tracking progress. + *
  2. Creates a `CancellableRunnable` out of the `Callable` + *
  3. Adds the `CancellableRunnable` to the task queue + *
  4. Once the task is ready to be run, it will execute the `Callable` + *
  5. When the callable completes one of two things happens: + *
      + *
    1. On success: + *
        + *
      1. Complete the `ApiFuture` by setting the return value. + *
      2. Call the next task. + *
      + *
    2. On Failure: + *
        + *
      1. Fail the `ApiFuture` by setting the exception. + *
      2. Cancel all tasks in the queue. + *
      + *
    + *
+ * + * @param key The key for the task queue + * @param callable The thing to run + * @param The Type of + * @return + */ ApiFuture submit(final String key, final Callable callable) { + // Step 1: create a future for the user final SettableApiFuture future = SettableApiFuture.create(); + + // Step 2: create the CancellableRunnable + // Step 3: add the task to queue via `execute` execute( key, new CancellableRunnable() { @@ -155,20 +188,25 @@ ApiFuture submit(final String key, final Callable callable) { @Override public void run() { + // the task was cancelled if (cancelled) { return; } + try { - ApiFuture callResult = callable.call(); + // Step 4: call the `Callable` + ApiFuture callResult = callable.call(); ApiFutures.addCallback( callResult, new ApiFutureCallback() { + // Step 5.1: on success @Override public void onSuccess(T msg) { future.set(msg); resume(key); } + // Step 5.2: on failure @Override public void onFailure(Throwable e) { future.setException(e); @@ -193,6 +231,7 @@ public void cancel(Throwable e) { return future; } + @Override protected void execute(final String key, final Deque tasks) { executor.execute( new Runnable() { From e41f4036cf52c34d4b8457b81d50f8e6d5a94015 Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Thu, 18 Apr 2019 12:18:18 -0400 Subject: [PATCH 2/2] Fixing the merge --- .../com/google/cloud/pubsub/v1/SequentialExecutorService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java index 5b2d729ec301..06eca64b2b00 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java @@ -195,7 +195,7 @@ public void run() { try { // Step 4: call the `Callable` - ApiFuture callResult = callable.call(); + ApiFuture callResult = callable.call(); ApiFutures.addCallback( callResult, new ApiFutureCallback() {