From 0039ee694c8ed63e0b9e6357b8a761d8dea9a115 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Thu, 28 Apr 2016 12:22:47 -0700 Subject: [PATCH 1/2] Add CommittedResult Return as the output to InProcessEvaluationContext#handleResult(). This allows a richer return type to improve possible behaviors when a result is returned. --- .../runners/inprocess/CommittedResult.java | 42 +++++++++++ .../runners/inprocess/CompletionCallback.java | 2 +- .../ExecutorServiceParallelExecutor.java | 16 ++-- .../inprocess/InProcessEvaluationContext.java | 4 +- .../runners/inprocess/TransformExecutor.java | 4 +- .../inprocess/CommittedResultTest.java | 73 +++++++++++++++++++ .../InProcessEvaluationContextTest.java | 4 +- .../inprocess/TransformExecutorTest.java | 4 +- 8 files changed, 132 insertions(+), 17 deletions(-) create mode 100644 sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CommittedResult.java create mode 100644 sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/CommittedResultTest.java diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CommittedResult.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CommittedResult.java new file mode 100644 index 0000000000..d900cfac4d --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CommittedResult.java @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.auto.value.AutoValue; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; + +/** + * A {@link InProcessTransformResult} that has been committed. + */ +@AutoValue +abstract class CommittedResult { + /** + * Returns the {@link AppliedPTransform} that produced this result. + */ + public abstract AppliedPTransform getTransform(); + + /** + * Returns the outputs produced by the transform. + */ + public abstract Iterable> getOutputs(); + + public static CommittedResult create( + InProcessTransformResult original, Iterable> outputs) { + return new AutoValue_CommittedResult(original.getTransform(), + outputs); + } +} diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java index d45520c196..6940421b93 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java @@ -24,7 +24,7 @@ interface CompletionCallback { /** * Handle a successful result, returning the committed outputs of the result. */ - Iterable> handleResult( + CommittedResult handleResult( CommittedBundle inputBundle, InProcessTransformResult result); /** diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java index cca19552dc..fd3d1e36a3 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java @@ -213,14 +213,14 @@ public void awaitCompletion() throws Throwable { */ private class DefaultCompletionCallback implements CompletionCallback { @Override - public Iterable> handleResult( + public CommittedResult handleResult( CommittedBundle inputBundle, InProcessTransformResult result) { - Iterable> resultBundles = + CommittedResult committedResult = evaluationContext.handleResult(inputBundle, Collections.emptyList(), result); - for (CommittedBundle outputBundle : resultBundles) { + for (CommittedBundle outputBundle : committedResult.getOutputs()) { allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); } - return resultBundles; + return committedResult; } @Override @@ -243,14 +243,14 @@ private TimerCompletionCallback(Iterable timers) { } @Override - public Iterable> handleResult( + public CommittedResult handleResult( CommittedBundle inputBundle, InProcessTransformResult result) { - Iterable> resultBundles = + CommittedResult committedResult = evaluationContext.handleResult(inputBundle, timers, result); - for (CommittedBundle outputBundle : resultBundles) { + for (CommittedBundle outputBundle : committedResult.getOutputs()) { allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); } - return resultBundles; + return committedResult; } @Override diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java index 2b21cc8584..a27ceb0ffa 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -142,7 +142,7 @@ private InProcessEvaluationContext( * @param result the result of evaluating the input bundle * @return the committed bundles contained within the handled {@code result} */ - public synchronized Iterable> handleResult( + public synchronized CommittedResult handleResult( @Nullable CommittedBundle completedBundle, Iterable completedTimers, InProcessTransformResult result) { @@ -173,7 +173,7 @@ public synchronized Iterable> handleResult( applicationStateInternals.remove(stepAndKey); } } - return committedBundles; + return CommittedResult.create(result, committedBundles); } private Iterable> commitBundles( diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java index 3313134120..7bf0e996a1 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java @@ -157,9 +157,9 @@ private InProcessTransformResult finishBundle( TransformEvaluator evaluator, Collection> enforcements) throws Exception { InProcessTransformResult result = evaluator.finishBundle(); - Iterable> outputs = onComplete.handleResult(inputBundle, result); + CommittedResult outputs = onComplete.handleResult(inputBundle, result); for (ModelEnforcement enforcement : enforcements) { - enforcement.afterFinish(inputBundle, result, outputs); + enforcement.afterFinish(inputBundle, result, outputs.getOutputs()); } return result; } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/CommittedResultTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/CommittedResultTest.java new file mode 100644 index 0000000000..3046dc1119 --- /dev/null +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/CommittedResultTest.java @@ -0,0 +1,73 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import static org.junit.Assert.assertThat; + +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.cloud.dataflow.sdk.values.PBegin; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PDone; +import com.google.common.collect.ImmutableList; + +import org.hamcrest.Matchers; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; + +/** + * Tests for {@link CommittedResult}. + */ +@RunWith(JUnit4.class) +public class CommittedResultTest implements Serializable { + private transient TestPipeline p = TestPipeline.create(); + private transient AppliedPTransform transform = + AppliedPTransform.of("foo", p.begin(), PDone.in(p), new PTransform() { + }); + private transient BundleFactory bundleFactory = InProcessBundleFactory.create(); + + @Test + public void getTransformExtractsFromResult() { + CommittedResult result = + CommittedResult.create(StepTransformResult.withoutHold(transform).build(), + Collections.>emptyList()); + + assertThat(result.getTransform(), Matchers.>equalTo(transform)); + } + + @Test + public void getOutputsEqualInput() { + List> outputs = + ImmutableList.of(bundleFactory.createRootBundle(PCollection.createPrimitiveOutputInternal(p, + WindowingStrategy.globalDefault(), + PCollection.IsBounded.BOUNDED)).commit(Instant.now()), + bundleFactory.createRootBundle(PCollection.createPrimitiveOutputInternal(p, + WindowingStrategy.globalDefault(), + PCollection.IsBounded.UNBOUNDED)).commit(Instant.now())); + CommittedResult result = + CommittedResult.create(StepTransformResult.withoutHold(transform).build(), outputs); + + assertThat(result.getOutputs(), Matchers.containsInAnyOrder(outputs.toArray())); + } +} diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java index d225519a79..7cfb63a84b 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java @@ -457,7 +457,7 @@ public void isDoneWithPartiallyDone() { UncommittedBundle rootBundle = context.createRootBundle(created); rootBundle.add(WindowedValue.valueInGlobalWindow(1)); - Iterable> handleResult = + CommittedResult handleResult = context.handleResult( null, ImmutableList.of(), @@ -466,7 +466,7 @@ public void isDoneWithPartiallyDone() { .build()); @SuppressWarnings("unchecked") CommittedBundle committedBundle = - (CommittedBundle) Iterables.getOnlyElement(handleResult); + (CommittedBundle) Iterables.getOnlyElement(handleResult.getOutputs()); context.handleResult( null, ImmutableList.of(), diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java index de7c60bcce..2dc2628e15 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java @@ -484,11 +484,11 @@ private RegisteringCompletionCallback(CountDownLatch onMethod) { } @Override - public Iterable> handleResult( + public CommittedResult handleResult( CommittedBundle inputBundle, InProcessTransformResult result) { handledResult = result; onMethod.countDown(); - return Collections.emptyList(); + return CommittedResult.create(result, Collections.>emptyList()); } @Override From 844dd7112e173f8beb0b5704ad60cec471f2f3df Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Thu, 28 Apr 2016 13:42:36 -0700 Subject: [PATCH 2/2] Use CommittedResult in InMemoryWatermarkManager This enable unprocessed elements to be handled in the Watermark manager after they are added to the CommittedResult structure. --- .../inprocess/InMemoryWatermarkManager.java | 19 +- .../inprocess/InProcessEvaluationContext.java | 6 +- .../InMemoryWatermarkManagerTest.java | 368 ++++++++++++------ 3 files changed, 252 insertions(+), 141 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java index 918a7f4a1c..1bb55ab9dd 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java @@ -797,18 +797,19 @@ public TransformWatermarks getWatermarks(AppliedPTransform transform) { * . * * @param completed the input that has completed - * @param transform the transform that has completed processing the input - * @param outputs the bundles the transform has output + * @param timerUpdate the timers that were added, removed, and completed as part of producing + * this update + * @param result the result that was produced by processing the input * @param earliestHold the earliest watermark hold in the transform's state. {@code null} if there * is no hold */ public void updateWatermarks( @Nullable CommittedBundle completed, - AppliedPTransform transform, TimerUpdate timerUpdate, - Iterable> outputs, + CommittedResult result, @Nullable Instant earliestHold) { - updatePending(completed, transform, timerUpdate, outputs); + AppliedPTransform transform = result.getTransform(); + updatePending(completed, timerUpdate, result); TransformWatermarks transformWms = transformToWatermarks.get(transform); transformWms.setEventTimeHold(completed == null ? null : completed.getKey(), earliestHold); refreshWatermarks(transform); @@ -843,15 +844,14 @@ private void refreshWatermarks(AppliedPTransform transform) { */ private void updatePending( CommittedBundle input, - AppliedPTransform transform, TimerUpdate timerUpdate, - Iterable> outputs) { - TransformWatermarks completedTransform = transformToWatermarks.get(transform); + CommittedResult result) { + TransformWatermarks completedTransform = transformToWatermarks.get(result.getTransform()); // Newly pending elements must be added before completed elements are removed, as the two // do not share a Mutex within this call and thus can be interleaved with external calls to // refresh. - for (CommittedBundle bundle : outputs) { + for (CommittedBundle bundle : result.getOutputs()) { for (AppliedPTransform consumer : consumers.get(bundle.getPCollection())) { TransformWatermarks watermarks = transformToWatermarks.get(consumer); watermarks.addPending(bundle); @@ -862,7 +862,6 @@ private void updatePending( if (input != null) { completedTransform.removePending(input); } - } /** diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java index a27ceb0ffa..4160d8bd51 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -149,11 +149,11 @@ public synchronized CommittedResult handleResult( Iterable> committedBundles = commitBundles(result.getOutputBundles()); // Update watermarks and timers + CommittedResult committedResult = CommittedResult.create(result, committedBundles); watermarkManager.updateWatermarks( completedBundle, - result.getTransform(), result.getTimerUpdate().withCompletedTimers(completedTimers), - committedBundles, + committedResult, result.getWatermarkHold()); fireAllAvailableCallbacks(); // Update counters @@ -173,7 +173,7 @@ public synchronized CommittedResult handleResult( applicationStateInternals.remove(stepAndKey); } } - return CommittedResult.create(result, committedBundles); + return committedResult; } private Iterable> commitBundles( diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java index 0603a5a386..5707bb0b2b 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java @@ -156,8 +156,11 @@ public void getWatermarkForUntouchedTransform() { @Test public void getWatermarkForUpdatedSourceTransform() { CommittedBundle output = multiWindowedBundle(createdInts, 1); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(output), new Instant(8000L)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.>singleton(output)), + new Instant(8000L)); TransformWatermarks updatedSourceWatermark = manager.getWatermarks(createdInts.getProducingTransformInternal()); @@ -172,8 +175,10 @@ public void getWatermarkForUpdatedSourceTransform() { public void getWatermarkForMultiInputTransform() { CommittedBundle secondPcollectionBundle = multiWindowedBundle(intsToFlatten, -1); - manager.updateWatermarks(null, intsToFlatten.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(secondPcollectionBundle), + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(intsToFlatten.getProducingTransformInternal(), + Collections.>singleton(secondPcollectionBundle)), BoundedWindow.TIMESTAMP_MAX_VALUE); // We didn't do anything for the first source, so we shouldn't have progressed the watermark @@ -202,13 +207,17 @@ public void getWatermarkForMultiInputTransform() { CommittedBundle flattenedBundleSecondCreate = multiWindowedBundle(flattened, -1); // We have finished processing the bundle from the second PCollection, but we haven't consumed // anything from the first PCollection yet; so our watermark shouldn't advance - manager.updateWatermarks(secondPcollectionBundle, flattened.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(flattenedBundleSecondCreate), + manager.updateWatermarks(secondPcollectionBundle, + TimerUpdate.empty(), + result(flattened.getProducingTransformInternal(), + Collections.>singleton(flattenedBundleSecondCreate)), null); TransformWatermarks transformAfterProcessing = manager.getWatermarks(flattened.getProducingTransformInternal()); - manager.updateWatermarks(secondPcollectionBundle, flattened.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(flattenedBundleSecondCreate), + manager.updateWatermarks(secondPcollectionBundle, + TimerUpdate.empty(), + result(flattened.getProducingTransformInternal(), + Collections.>singleton(flattenedBundleSecondCreate)), null); assertThat( transformAfterProcessing.getInputWatermark(), @@ -222,8 +231,10 @@ public void getWatermarkForMultiInputTransform() { timestampedBundle(createdInts, TimestampedValue.of(5, firstCollectionTimestamp)); // the source is done, but elements are still buffered. The source output watermark should be // past the end of the global window - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(firstPcollectionBundle), + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.>singleton(firstPcollectionBundle)), new Instant(Long.MAX_VALUE)); TransformWatermarks firstSourceWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); @@ -250,8 +261,10 @@ public void getWatermarkForMultiInputTransform() { CommittedBundle completedFlattenBundle = bundleFactory.createRootBundle(flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); - manager.updateWatermarks(firstPcollectionBundle, flattened.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(completedFlattenBundle), + manager.updateWatermarks(firstPcollectionBundle, + TimerUpdate.empty(), + result(flattened.getProducingTransformInternal(), + Collections.>singleton(completedFlattenBundle)), null); TransformWatermarks afterConsumingAllInput = manager.getWatermarks(flattened.getProducingTransformInternal()); @@ -272,8 +285,11 @@ public void getWatermarkForMultiConsumedCollection() { CommittedBundle createdBundle = timestampedBundle(createdInts, TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)), TimestampedValue.of(3, new Instant(-1000L))); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createdBundle), new Instant(Long.MAX_VALUE)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.>singleton(createdBundle)), + new Instant(Long.MAX_VALUE)); TransformWatermarks createdAfterProducing = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat( @@ -284,8 +300,11 @@ public void getWatermarkForMultiConsumedCollection() { timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)), TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)), TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L))); - manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(keyBundle), null); + manager.updateWatermarks(createdBundle, + TimerUpdate.empty(), + result(keyed.getProducingTransformInternal(), + Collections.>singleton(keyBundle)), + null); TransformWatermarks keyedWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); assertThat( @@ -300,8 +319,11 @@ public void getWatermarkForMultiConsumedCollection() { CommittedBundle filteredBundle = timestampedBundle(filtered, TimestampedValue.of(2, new Instant(1234L))); - manager.updateWatermarks(createdBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(filteredBundle), null); + manager.updateWatermarks(createdBundle, + TimerUpdate.empty(), + result(filtered.getProducingTransformInternal(), + Collections.>singleton(filteredBundle)), + null); TransformWatermarks filteredProcessedWatermarks = manager.getWatermarks(filtered.getProducingTransformInternal()); assertThat( @@ -319,17 +341,23 @@ public void getWatermarkForMultiConsumedCollection() { @Test public void updateWatermarkWithWatermarkHolds() { CommittedBundle createdBundle = timestampedBundle(createdInts, - TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)), + TimestampedValue.of(1, new Instant(1_000_000L)), + TimestampedValue.of(2, new Instant(1234L)), TimestampedValue.of(3, new Instant(-1000L))); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createdBundle), new Instant(Long.MAX_VALUE)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.>singleton(createdBundle)), + new Instant(Long.MAX_VALUE)); - CommittedBundle> keyBundle = - timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)), - TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)), - TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L))); - manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(keyBundle), + CommittedBundle> keyBundle = timestampedBundle(keyed, + TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)), + TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)), + TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L))); + manager.updateWatermarks(createdBundle, + TimerUpdate.empty(), + result(keyed.getProducingTransformInternal(), + Collections.>singleton(keyBundle)), new Instant(500L)); TransformWatermarks keyedWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); @@ -355,40 +383,54 @@ public void updateWatermarkWithKeyedWatermarkHolds() { .add(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1234L))) .commit(clock.now()); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - ImmutableList.of(firstKeyBundle, secondKeyBundle), BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + ImmutableList.of(firstKeyBundle, secondKeyBundle)), + BoundedWindow.TIMESTAMP_MAX_VALUE); - manager.updateWatermarks(firstKeyBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>emptyList(), new Instant(-1000L)); - manager.updateWatermarks(secondKeyBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>emptyList(), new Instant(1234L)); + manager.updateWatermarks(firstKeyBundle, + TimerUpdate.empty(), + result(filtered.getProducingTransformInternal(), + Collections.>emptyList()), + new Instant(-1000L)); + manager.updateWatermarks(secondKeyBundle, + TimerUpdate.empty(), + result(filtered.getProducingTransformInternal(), + Collections.>emptyList()), + new Instant(1234L)); TransformWatermarks filteredWatermarks = manager.getWatermarks(filtered.getProducingTransformInternal()); - assertThat( - filteredWatermarks.getInputWatermark(), + assertThat(filteredWatermarks.getInputWatermark(), not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE))); assertThat(filteredWatermarks.getOutputWatermark(), not(laterThan(new Instant(-1000L)))); CommittedBundle fauxFirstKeyTimerBundle = bundleFactory.createKeyedBundle(null, "Odd", createdInts).commit(clock.now()); - manager.updateWatermarks(fauxFirstKeyTimerBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>emptyList(), + manager.updateWatermarks(fauxFirstKeyTimerBundle, + TimerUpdate.empty(), + result(filtered.getProducingTransformInternal(), + Collections.>emptyList()), BoundedWindow.TIMESTAMP_MAX_VALUE); assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(1234L))); CommittedBundle fauxSecondKeyTimerBundle = bundleFactory.createKeyedBundle(null, "Even", createdInts).commit(clock.now()); - manager.updateWatermarks(fauxSecondKeyTimerBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>emptyList(), new Instant(5678L)); + manager.updateWatermarks(fauxSecondKeyTimerBundle, + TimerUpdate.empty(), + result(filtered.getProducingTransformInternal(), + Collections.>emptyList()), + new Instant(5678L)); assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(5678L))); - manager.updateWatermarks(fauxSecondKeyTimerBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>emptyList(), + manager.updateWatermarks(fauxSecondKeyTimerBundle, + TimerUpdate.empty(), + result(filtered.getProducingTransformInternal(), + Collections.>emptyList()), BoundedWindow.TIMESTAMP_MAX_VALUE); - assertThat( - filteredWatermarks.getOutputWatermark(), + assertThat(filteredWatermarks.getOutputWatermark(), not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE))); } @@ -400,16 +442,21 @@ public void updateWatermarkWithKeyedWatermarkHolds() { public void updateOutputWatermarkShouldBeMonotonic() { CommittedBundle firstInput = bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(firstInput), new Instant(0L)); + manager.updateWatermarks(null, TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.>singleton(firstInput)), + new Instant(0L)); TransformWatermarks firstWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(firstWatermarks.getOutputWatermark(), equalTo(new Instant(0L))); CommittedBundle secondInput = bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(secondInput), new Instant(-250L)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.>singleton(secondInput)), + new Instant(-250L)); TransformWatermarks secondWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(secondWatermarks.getOutputWatermark(), not(earlierThan(new Instant(0L)))); @@ -422,17 +469,22 @@ public void updateOutputWatermarkShouldBeMonotonic() { @Test public void updateWatermarkWithHoldsShouldBeMonotonic() { CommittedBundle createdBundle = timestampedBundle(createdInts, - TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)), - TimestampedValue.of(3, new Instant(-1000L))); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createdBundle), new Instant(Long.MAX_VALUE)); + TimestampedValue.of(1, new Instant(1_000_000L)), + TimestampedValue.of(2, new Instant(1234L)), + TimestampedValue.of(3, new Instant(-1000L))); manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.>singleton(createdBundle)), + new Instant(Long.MAX_VALUE)); CommittedBundle> keyBundle = timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)), TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)), TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L))); - manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(keyBundle), + manager.updateWatermarks(createdBundle, + TimerUpdate.empty(), + result(keyed.getProducingTransformInternal(), + Collections.>singleton(keyBundle)), new Instant(500L)); TransformWatermarks keyedWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); @@ -459,16 +511,22 @@ public void updateWatermarkWithLateData() { CommittedBundle createdBundle = timestampedBundle(createdInts, TimestampedValue.of(1, sourceWatermark), TimestampedValue.of(2, new Instant(1234L))); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createdBundle), sourceWatermark); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.>singleton(createdBundle)), + sourceWatermark); CommittedBundle> keyBundle = timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), sourceWatermark), TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L))); // Finish processing the on-time data. The watermarks should progress to be equal to the source - manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(keyBundle), null); + manager.updateWatermarks(createdBundle, + TimerUpdate.empty(), + result(keyed.getProducingTransformInternal(), + Collections.>singleton(keyBundle)), + null); TransformWatermarks onTimeWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); assertThat(onTimeWatermarks.getInputWatermark(), equalTo(sourceWatermark)); @@ -478,8 +536,11 @@ public void updateWatermarkWithLateData() { timestampedBundle(createdInts, TimestampedValue.of(3, new Instant(-1000L))); // the late data arrives in a downstream PCollection after its watermark has advanced past it; // we don't advance the watermark past the current watermark until we've consumed the late data - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(lateDataBundle), new Instant(2_000_000L)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.>singleton(lateDataBundle)), + new Instant(2_000_000L)); TransformWatermarks bufferedLateWm = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(bufferedLateWm.getOutputWatermark(), equalTo(new Instant(2_000_000L))); @@ -493,30 +554,31 @@ public void updateWatermarkWithLateData() { CommittedBundle> lateKeyedBundle = timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L))); - manager.updateWatermarks(lateDataBundle, keyed.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(lateKeyedBundle), null); + manager.updateWatermarks(lateDataBundle, + TimerUpdate.empty(), + result(keyed.getProducingTransformInternal(), + Collections.>singleton(lateKeyedBundle)), + null); } public void updateWatermarkWithDifferentWindowedValueInstances() { manager.updateWatermarks( null, - createdInts.getProducingTransformInternal(), TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), Collections.>singleton( bundleFactory .createRootBundle(createdInts) .add(WindowedValue.valueInGlobalWindow(1)) - .commit(Instant.now())), + .commit(Instant.now()))), BoundedWindow.TIMESTAMP_MAX_VALUE); manager.updateWatermarks( - bundleFactory - .createRootBundle(createdInts) + bundleFactory.createRootBundle(createdInts) .add(WindowedValue.valueInGlobalWindow(1)) .commit(Instant.now()), - keyed.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>emptyList(), + result(keyed.getProducingTransformInternal(), Collections.>emptyList()), null); TransformWatermarks onTimeWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); @@ -530,8 +592,10 @@ public void updateWatermarkWithDifferentWindowedValueInstances() { @Test public void getWatermarksAfterOnlyEmptyOutput() { CommittedBundle emptyCreateOutput = multiWindowedBundle(createdInts); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(emptyCreateOutput), + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.>singleton(emptyCreateOutput)), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks updatedSourceWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); @@ -557,12 +621,17 @@ public void getWatermarksAfterOnlyEmptyOutput() { @Test public void getWatermarksAfterHoldAndEmptyOutput() { CommittedBundle firstCreateOutput = multiWindowedBundle(createdInts, 1, 2); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(firstCreateOutput), new Instant(12_000L)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.>singleton(firstCreateOutput)), + new Instant(12_000L)); CommittedBundle firstFilterOutput = multiWindowedBundle(filtered); - manager.updateWatermarks(firstCreateOutput, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(firstFilterOutput), + manager.updateWatermarks(firstCreateOutput, + TimerUpdate.empty(), + result(filtered.getProducingTransformInternal(), + Collections.>singleton(firstFilterOutput)), new Instant(10_000L)); TransformWatermarks firstFilterWatermarks = manager.getWatermarks(filtered.getProducingTransformInternal()); @@ -570,8 +639,10 @@ public void getWatermarksAfterHoldAndEmptyOutput() { assertThat(firstFilterWatermarks.getOutputWatermark(), not(laterThan(new Instant(10_000L)))); CommittedBundle emptyCreateOutput = multiWindowedBundle(createdInts); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(emptyCreateOutput), + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.>singleton(emptyCreateOutput)), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks updatedSourceWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); @@ -610,8 +681,11 @@ public void getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() { CommittedBundle createOutput = bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L)); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.>singleton(createOutput)), + BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks createAfterUpdate = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), equalTo(clock.now())); @@ -636,8 +710,10 @@ public void getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() { CommittedBundle filterOutputBundle = bundleFactory.createRootBundle(intsToFlatten).commit(new Instant(1250L)); - manager.updateWatermarks(createOutput, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(filterOutputBundle), + manager.updateWatermarks(createOutput, + TimerUpdate.empty(), + result(filtered.getProducingTransformInternal(), + Collections.>singleton(filterOutputBundle)), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks filterAfterConsumed = manager.getWatermarks(filtered.getProducingTransformInternal()); @@ -658,8 +734,10 @@ public void getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() { // @Test public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() { CommittedBundle createdBundle = multiWindowedBundle(createdInts, 1, 2, 4, 8); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createdBundle), new Instant(1248L)); + manager.updateWatermarks(null, TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.>singleton(createdBundle)), + new Instant(1248L)); TransformWatermarks filteredWms = manager.getWatermarks(filtered.getProducingTransformInternal()); @@ -675,8 +753,10 @@ public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() { TimerData.of(StateNamespaces.global(), new Instant(4096L), TimeDomain.PROCESSING_TIME); TimerUpdate timers = TimerUpdate.builder("key").setTimer(pastTimer).setTimer(futureTimer).build(); - manager.updateWatermarks(createdBundle, filtered.getProducingTransformInternal(), timers, - Collections.>singleton(filteredBundle), + manager.updateWatermarks(createdBundle, + timers, + result(filtered.getProducingTransformInternal(), + Collections.>singleton(filteredBundle)), BoundedWindow.TIMESTAMP_MAX_VALUE); Instant startTime = clock.now(); clock.set(startTime.plus(250L)); @@ -709,11 +789,11 @@ public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() { bundleFactory.createKeyedBundle(null, "key", filteredTimesTwo) .commit(filteredWms.getSynchronizedProcessingOutputTime()); // Complete the processing time timer - manager.updateWatermarks(filteredTimerBundle, filtered.getProducingTransformInternal(), + manager.updateWatermarks(filteredTimerBundle, TimerUpdate.builder("key") - .withCompletedTimers(Collections.singleton(pastTimer)) - .build(), - Collections.>singleton(filteredTimerResult), + .withCompletedTimers(Collections.singleton(pastTimer)).build(), + result(filtered.getProducingTransformInternal(), + Collections.>singleton(filteredTimerResult)), BoundedWindow.TIMESTAMP_MAX_VALUE); clock.set(startTime.plus(500L)); @@ -723,8 +803,10 @@ public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() { filteredDoubledWms.getSynchronizedProcessingOutputTime(), not(earlierThan(filteredTimerResult.getSynchronizedProcessingOutputWatermark()))); - manager.updateWatermarks(filteredTimerResult, filteredTimesTwo.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>emptyList(), + manager.updateWatermarks(filteredTimerResult, + TimerUpdate.empty(), + result(filteredTimesTwo.getProducingTransformInternal(), + Collections.>emptyList()), BoundedWindow.TIMESTAMP_MAX_VALUE); assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), equalTo(clock.now())); @@ -758,18 +840,23 @@ public void getSynchronizedProcessingTimeOutputTimeIsMonotonic() { CommittedBundle createOutput = bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L)); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.>singleton(createOutput)), + BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks createAfterUpdate = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), not(laterThan(clock.now()))); - assertThat( - createAfterUpdate.getSynchronizedProcessingOutputTime(), not(laterThan(clock.now()))); + assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(), + not(laterThan(clock.now()))); CommittedBundle createSecondOutput = bundleFactory.createRootBundle(createdInts).commit(new Instant(750L)); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createSecondOutput), + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.>singleton(createSecondOutput)), BoundedWindow.TIMESTAMP_MAX_VALUE); assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(), equalTo(clock.now())); @@ -778,16 +865,20 @@ public void getSynchronizedProcessingTimeOutputTimeIsMonotonic() { @Test public void synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers() { CommittedBundle created = multiWindowedBundle(createdInts, 1, 2, 3); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(created), new Instant(40_900L)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.>singleton(created)), + new Instant(40_900L)); CommittedBundle filteredBundle = multiWindowedBundle(filtered, 2, 4); Instant upstreamHold = new Instant(2048L); TimerData upstreamProcessingTimer = TimerData.of(StateNamespaces.global(), upstreamHold, TimeDomain.PROCESSING_TIME); - manager.updateWatermarks(created, filtered.getProducingTransformInternal(), + manager.updateWatermarks(created, TimerUpdate.builder("key").setTimer(upstreamProcessingTimer).build(), - Collections.>singleton(filteredBundle), + result(filtered.getProducingTransformInternal(), + Collections.>singleton(filteredBundle)), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks downstreamWms = @@ -803,11 +894,12 @@ public void synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers( assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(upstreamHold)); CommittedBundle otherCreated = multiWindowedBundle(createdInts, 4, 8, 12); - manager.updateWatermarks(otherCreated, filtered.getProducingTransformInternal(), + manager.updateWatermarks(otherCreated, TimerUpdate.builder("key") - .withCompletedTimers(Collections.singleton(upstreamProcessingTimer)) - .build(), - Collections.>emptyList(), BoundedWindow.TIMESTAMP_MAX_VALUE); + .withCompletedTimers(Collections.singleton(upstreamProcessingTimer)).build(), + result(filtered.getProducingTransformInternal(), + Collections.>emptyList()), + BoundedWindow.TIMESTAMP_MAX_VALUE); assertThat(downstreamWms.getSynchronizedProcessingInputTime(), not(earlierThan(clock.now()))); } @@ -817,9 +909,9 @@ public void synchronizedProcessingInputTimeIsHeldToPendingBundleTimes() { CommittedBundle created = multiWindowedBundle(createdInts, 1, 2, 3); manager.updateWatermarks( null, - createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(created), + result(createdInts.getProducingTransformInternal(), + Collections.>singleton(created)), new Instant(29_919_235L)); Instant upstreamHold = new Instant(2048L); @@ -827,9 +919,9 @@ public void synchronizedProcessingInputTimeIsHeldToPendingBundleTimes() { bundleFactory.createKeyedBundle(created, "key", filtered).commit(upstreamHold); manager.updateWatermarks( created, - filtered.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(filteredBundle), + result(filtered.getProducingTransformInternal(), + Collections.>singleton(filteredBundle)), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks downstreamWms = @@ -849,8 +941,10 @@ public void extractFiredTimersReturnsFiredEventTimeTimers() { // Advance WM of keyed past the first timer, but ahead of the second and third CommittedBundle createdBundle = multiWindowedBundle(filtered); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.singleton(createdBundle), new Instant(1500L)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), Collections.singleton(createdBundle)), + new Instant(1500L)); TimerData earliestTimer = TimerData.of(StateNamespaces.global(), new Instant(1000), TimeDomain.EVENT_TIME); @@ -866,11 +960,10 @@ public void extractFiredTimersReturnsFiredEventTimeTimers() { .setTimer(lastTimer) .build(); - manager.updateWatermarks( - createdBundle, - filtered.getProducingTransformInternal(), + manager.updateWatermarks(createdBundle, update, - Collections.>singleton(multiWindowedBundle(intsToFlatten)), + result(filtered.getProducingTransformInternal(), + Collections.>singleton(multiWindowedBundle(intsToFlatten))), new Instant(1000L)); Map, Map> firstTransformFiredTimers = @@ -883,8 +976,11 @@ public void extractFiredTimersReturnsFiredEventTimeTimers() { FiredTimers firstFired = firstFilteredTimers.get(key); assertThat(firstFired.getTimers(TimeDomain.EVENT_TIME), contains(earliestTimer)); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>emptyList(), new Instant(50_000L)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.>emptyList()), + new Instant(50_000L)); Map, Map> secondTransformFiredTimers = manager.extractFiredTimers(); assertThat( @@ -906,8 +1002,10 @@ public void extractFiredTimersReturnsFiredProcessingTimeTimers() { // Advance WM of keyed past the first timer, but ahead of the second and third CommittedBundle createdBundle = multiWindowedBundle(filtered); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.singleton(createdBundle), new Instant(1500L)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), Collections.singleton(createdBundle)), + new Instant(1500L)); TimerData earliestTimer = TimerData.of(StateNamespaces.global(), new Instant(999L), TimeDomain.PROCESSING_TIME); @@ -925,9 +1023,9 @@ public void extractFiredTimersReturnsFiredProcessingTimeTimers() { manager.updateWatermarks( createdBundle, - filtered.getProducingTransformInternal(), update, - Collections.>singleton(multiWindowedBundle(intsToFlatten)), + result(filtered.getProducingTransformInternal(), + Collections.>singleton(multiWindowedBundle(intsToFlatten))), new Instant(1000L)); Map, Map> firstTransformFiredTimers = @@ -941,8 +1039,11 @@ public void extractFiredTimersReturnsFiredProcessingTimeTimers() { assertThat(firstFired.getTimers(TimeDomain.PROCESSING_TIME), contains(earliestTimer)); clock.set(new Instant(50_000L)); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>emptyList(), new Instant(50_000L)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.>emptyList()), + new Instant(50_000L)); Map, Map> secondTransformFiredTimers = manager.extractFiredTimers(); assertThat( @@ -964,8 +1065,10 @@ public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() { // Advance WM of keyed past the first timer, but ahead of the second and third CommittedBundle createdBundle = multiWindowedBundle(filtered); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.singleton(createdBundle), new Instant(1500L)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), Collections.singleton(createdBundle)), + new Instant(1500L)); TimerData earliestTimer = TimerData.of( StateNamespaces.global(), new Instant(999L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME); @@ -983,9 +1086,9 @@ public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() { manager.updateWatermarks( createdBundle, - filtered.getProducingTransformInternal(), update, - Collections.>singleton(multiWindowedBundle(intsToFlatten)), + result(filtered.getProducingTransformInternal(), + Collections.>singleton(multiWindowedBundle(intsToFlatten))), new Instant(1000L)); Map, Map> firstTransformFiredTimers = @@ -1000,8 +1103,11 @@ public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() { firstFired.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), contains(earliestTimer)); clock.set(new Instant(50_000L)); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>emptyList(), new Instant(50_000L)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.>emptyList()), + new Instant(50_000L)); Map, Map> secondTransformFiredTimers = manager.extractFiredTimers(); assertThat( @@ -1130,7 +1236,6 @@ public boolean matches(Object item) { ReadableInstant instant = (ReadableInstant) item; return instant.isAfter(shouldBeEarlier); } - @Override public void describeTo(Description description) { description.appendText("later than ").appendValue(shouldBeEarlier); @@ -1162,4 +1267,11 @@ private final CommittedBundle multiWindowedBundle(PCollection pc, T... } return bundle.commit(BoundedWindow.TIMESTAMP_MAX_VALUE); } + + private final CommittedResult result( + AppliedPTransform transform, + Iterable> bundles) { + return CommittedResult.create(StepTransformResult.withoutHold(transform) + .build(), bundles); + } }