diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
index dd73d67f0c..fcd8182b8e 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
@@ -372,9 +372,8 @@ private boolean fireTimers() throws Exception {
             KeyedWorkItems.timersWorkItem(keyTimers.getKey(), delivery);
         @SuppressWarnings({"unchecked", "rawtypes"})
         CommittedBundle bundle =
-            evaluationContext
-                .createKeyedBundle(
-                    null, keyTimers.getKey(), (PCollection) transform.getInput())
+            InProcessBundle.>keyed(
+                (PCollection) transform.getInput(), keyTimers.getKey())
                 .add(WindowedValue.valueInEmptyWindows(work))
                 .commit(Instant.now());
         scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery));
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
index b2aa45ef62..4aeb0d3b2a 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
@@ -75,7 +75,6 @@ class InProcessEvaluationContext {
   /** The options that were used to create this {@link Pipeline}. */
   private final InProcessPipelineOptions options;
-  private final BundleFactory bundleFactory;

   /** The current processing time and event time watermarks and timers. */
   private final InMemoryWatermarkManager watermarkManager;
@@ -92,24 +91,21 @@ class InProcessEvaluationContext {
   public static InProcessEvaluationContext create(
       InProcessPipelineOptions options,
-      BundleFactory bundleFactory,
       Collection> rootTransforms,
       Map>> valueToConsumers,
       Map, String> stepNames,
       Collection> views) {
     return new InProcessEvaluationContext(
-        options, bundleFactory, rootTransforms, valueToConsumers, stepNames, views);
+        options, rootTransforms, valueToConsumers, stepNames, views);
   }

   private InProcessEvaluationContext(
       InProcessPipelineOptions options,
-      BundleFactory bundleFactory,
       Collection> rootTransforms,
       Map>> valueToConsumers,
       Map, String> stepNames,
       Collection> views) {
     this.options = checkNotNull(options);
-    this.bundleFactory = checkNotNull(bundleFactory);
     checkNotNull(rootTransforms);
     checkNotNull(valueToConsumers);
     checkNotNull(stepNames);
@@ -209,7 +205,7 @@ private void fireAvailableCallbacks(AppliedPTransform producingTransfor
    * Create a {@link UncommittedBundle} for use by a source.
    */
   public UncommittedBundle createRootBundle(PCollection output) {
-    return bundleFactory.createRootBundle(output);
+    return InProcessBundle.unkeyed(output);
   }

   /**
@@ -217,7 +213,9 @@ public UncommittedBundle createRootBundle(PCollection output) {
    * PCollection}.
    */
   public UncommittedBundle createBundle(CommittedBundle input, PCollection output) {
-    return bundleFactory.createBundle(input, output);
+    return input.isKeyed()
+        ? InProcessBundle.keyed(output, input.getKey())
+        : InProcessBundle.unkeyed(output);
   }

   /**
@@ -226,7 +224,7 @@ public UncommittedBundle createBundle(CommittedBundle input, PCollecti
    */
   public UncommittedBundle createKeyedBundle(
       CommittedBundle input, Object key, PCollection output) {
-    return bundleFactory.createKeyedBundle(input, key, output);
+    return InProcessBundle.keyed(output, key);
   }

   /**
@@ -355,9 +353,7 @@ public CounterSet getCounters() {
    * for each time they are set.
    */
   public Map, Map> extractFiredTimers() {
-    Map, Map> fired =
-        watermarkManager.extractFiredTimers();
-    return fired;
+    return watermarkManager.extractFiredTimers();
   }

   /**
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
index 9b6db8f730..82d5e170f6 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
@@ -230,7 +230,6 @@ public InProcessPipelineResult run(Pipeline pipeline) {
     InProcessEvaluationContext context =
         InProcessEvaluationContext.create(
             getPipelineOptions(),
-            createBundleFactory(getPipelineOptions()),
             consumerTrackingVisitor.getRootTransforms(),
             consumerTrackingVisitor.getValueToConsumers(),
             consumerTrackingVisitor.getStepNames(),
@@ -270,10 +269,6 @@ public InProcessPipelineResult run(Pipeline pipeline) {
     return Collections.emptyMap();
   }

-  private BundleFactory createBundleFactory(InProcessPipelineOptions pipelineOptions) {
-    return InProcessBundleFactory.create();
-  }
-
   /**
    * The result of running a {@link Pipeline} with the {@link InProcessPipelineRunner}.
   *
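Taken together, the changes above drop the BundleFactory indirection: the evaluation context and the timer-firing path now call the static InProcessBundle factories directly. The following is a minimal sketch of the resulting bundle lifecycle, using only calls that appear in this diff; it is not code from the patch. The pipeline, the output PCollection, and the key "myKey" are hypothetical placeholders, and the sketch is assumed to live in the com.google.cloud.dataflow.sdk.runners.inprocess package, where InProcessBundle and the nested bundle interfaces of InProcessPipelineRunner are visible.

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.PCollection;

import org.joda.time.Instant;

class BundleCreationSketch {
  static void sketch() {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<String> output = p.apply(Create.of("a", "b"));

    // What createRootBundle(output) now returns: an unkeyed, uncommitted bundle.
    UncommittedBundle<String> root = InProcessBundle.unkeyed(output);

    // What createKeyedBundle(input, key, output) now returns: a bundle keyed by the given key.
    UncommittedBundle<String> keyed = InProcessBundle.keyed(output, "myKey");

    // Elements are added and the bundle is sealed with a commit timestamp, mirroring the
    // timer-delivery path in ExecutorServiceParallelExecutor above.
    CommittedBundle<String> committed =
        root.add(WindowedValue.valueInGlobalWindow("a")).commit(Instant.now());
  }
}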
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/IntervalBoundedExponentialBackOff.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/IntervalBoundedExponentialBackOff.java
index 5e73ad8fe8..3521584ae0 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/IntervalBoundedExponentialBackOff.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/IntervalBoundedExponentialBackOff.java
@@ -53,12 +53,6 @@ public class IntervalBoundedExponentialBackOff implements BackOff {
   private final long initialIntervalMillis;
   private int currentAttempt;

-  // BEAM-168: https://issues.apache.org/jira/browse/BEAM-168
-  @Deprecated
-  public IntervalBoundedExponentialBackOff(int maximumIntervalMillis, long initialIntervalMillis) {
-    this((long) maximumIntervalMillis, initialIntervalMillis);
-  }
-
   public IntervalBoundedExponentialBackOff(long maximumIntervalMillis, long initialIntervalMillis) {
     Preconditions.checkArgument(
         maximumIntervalMillis > 0, "Maximum interval must be greater than zero.");
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index 106b356cb8..1a70af5317 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -444,8 +444,19 @@ private Collection processElement(WindowedValue value) throws Excepti
       ReduceFn.Context directContext = contextFactory.base(window, StateStyle.DIRECT);
-      W active = activeWindows.mergeResultWindow(window);
-      Preconditions.checkState(active != null, "Window %s has no mergeResultWindow", window);
+      if (triggerRunner.isClosed(directContext.state())) {
+        // This window has already been closed.
+        droppedDueToClosedWindow.addValue(1L);
+        WindowTracing.debug(
+            "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
+                + "since window is no longer active at inputWatermark:{}; outputWatermark:{}",
+            value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
+            timerInternals.currentOutputWatermarkTime());
+        continue;
+      }
+
+      W active = activeWindows.representative(window);
+      Preconditions.checkState(active != null, "Window %s has no representative", window);
       windows.add(active);
     }
@@ -456,24 +467,10 @@ private Collection processElement(WindowedValue value) throws Excepti
       triggerRunner.prefetchForValue(window, directContext.state());
     }

-    // Process the element for each (mergeResultWindow, not closed) window it belongs to.
-    List triggerableWindows = new ArrayList<>(windows.size());
+    // Process the element for each (representative, not closed) window it belongs to.
     for (W window : windows) {
       ReduceFn.ProcessValueContext directContext = contextFactory.forValue(
           window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
-      if (triggerRunner.isClosed(directContext.state())) {
-        // This window has already been closed.
-        droppedDueToClosedWindow.addValue(1L);
-        WindowTracing.debug(
-            "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
-                + "since window is no longer active at inputWatermark:{}; outputWatermark:{}",
-            value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
-            timerInternals.currentOutputWatermarkTime());
-        continue;
-      }
-
-      triggerableWindows.add(window);
-      activeWindows.ensureWindowIsActive(window);
       ReduceFn.ProcessValueContext renamedContext = contextFactory.forValue(
           window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
index fef108dff3..83cfc9b4f9 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
@@ -69,6 +69,7 @@
 import com.google.cloud.dataflow.sdk.values.TupleTag;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;

 import org.hamcrest.Matchers;
@@ -817,10 +818,10 @@ public void processElement(ProcessContext c) throws Exception {
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
         builder
-            .add("foo", "bar")
-            .add("foo2", DataflowPipelineTranslatorTest.class)
-            .withLabel("Test Class")
-            .withLinkUrl("http://www.google.com");
+            .add("foo", "bar")
+            .add("foo2", 123)
+            .withLabel("Test Value")
+            .withLinkUrl("http://www.google.com");
       }
     };
@@ -836,10 +837,12 @@ public void populateDisplayData(DisplayData.Builder builder) {
       }
     };

+    ParDo.Bound parDo1 = ParDo.of(fn1);
+    ParDo.Bound parDo2 = ParDo.of(fn2);
     pipeline
         .apply(Create.of(1, 2, 3))
-        .apply(ParDo.of(fn1))
-        .apply(ParDo.of(fn2));
+        .apply(parDo1)
+        .apply(parDo2);

     Job job = translator.translate(
         pipeline, pipeline.getRunner(), Collections.emptyList()).getJob();
@@ -856,34 +859,33 @@ public void populateDisplayData(DisplayData.Builder builder) {
     Collection> fn2displayData = (Collection>) parDo2Properties.get("display_data");

-    ImmutableList expectedFn1DisplayData = ImmutableList.of(
-        ImmutableMap.builder()
-            .put("namespace", fn1.getClass().getName())
-            .put("key", "foo")
-            .put("type", "STRING")
-            .put("value", "bar")
-            .build(),
-        ImmutableMap.builder()
-            .put("namespace", fn1.getClass().getName())
-            .put("key", "foo2")
-            .put("type", "JAVA_CLASS")
-            .put("value", DataflowPipelineTranslatorTest.class.getName())
-            .put("shortValue", DataflowPipelineTranslatorTest.class.getSimpleName())
-            .put("label", "Test Class")
-            .put("linkUrl", "http://www.google.com")
-            .build()
+    ImmutableSet> expectedFn1DisplayData = ImmutableSet.of(
+        ImmutableMap.builder()
+            .put("key", "foo")
+            .put("type", "STRING")
+            .put("value", "bar")
+            .put("namespace", fn1.getClass().getName())
+            .build(),
+        ImmutableMap.builder()
+            .put("key", "foo2")
+            .put("type", "INTEGER")
+            .put("value", "123")
+            .put("namespace", fn1.getClass().getName())
+            .put("label", "Test Value")
+            .put("linkUrl", "http://www.google.com")
+            .build()
     );

-    ImmutableList expectedFn2DisplayData = ImmutableList.of(
-        ImmutableMap.builder()
-            .put("namespace", fn2.getClass().getName())
-            .put("key", "foo3")
-            .put("type", "STRING")
-            .put("value", "barge")
-            .build()
+    ImmutableSet> expectedFn2DisplayData = ImmutableSet.of(
+        ImmutableMap.builder()
+            .put("key", "foo3")
+            .put("type", "STRING")
+            .put("value", "barge")
+            .put("namespace", fn2.getClass().getName())
+            .build()
     );

-    assertEquals(expectedFn1DisplayData, fn1displayData);
-    assertEquals(expectedFn2DisplayData, fn2displayData);
+    assertEquals(expectedFn1DisplayData, ImmutableSet.copyOf(fn1displayData));
+    assertEquals(expectedFn2DisplayData, ImmutableSet.copyOf(fn2displayData));
   }
 }
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
index 5de5a59b25..0e0125ef43 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
@@ -45,7 +45,6 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;

 import java.io.IOException;
 import java.util.Arrays;
@@ -61,7 +60,6 @@ public class BoundedReadEvaluatorFactoryTest {
   private PCollection longs;
   private TransformEvaluatorFactory factory;
   @Mock private InProcessEvaluationContext context;
-  private BundleFactory bundleFactory;

   @Before
   public void setup() {
@@ -170,7 +168,7 @@ public void boundedSourceEvaluatorClosesReader() throws Exception {
     PCollection pcollection = p.apply(Read.from(source));
     AppliedPTransform sourceTransform = pcollection.getProducingTransformInternal();

-    UncommittedBundle output = bundleFactory.createRootBundle(pcollection);
+    UncommittedBundle output = InProcessBundle.unkeyed(longs);
     when(context.createRootBundle(pcollection)).thenReturn(output);

     TransformEvaluator evaluator = factory.forApplication(sourceTransform, null, context);
@@ -188,7 +186,7 @@ public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception {
     PCollection pcollection = p.apply(Read.from(source));
     AppliedPTransform sourceTransform = pcollection.getProducingTransformInternal();

-    UncommittedBundle output = bundleFactory.createRootBundle(pcollection);
+    UncommittedBundle output = InProcessBundle.unkeyed(longs);
     when(context.createRootBundle(pcollection)).thenReturn(output);

     TransformEvaluator evaluator = factory.forApplication(sourceTransform, null, context);
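A side note on the DataflowPipelineTranslatorTest changes above: the assertions now compare display data as ImmutableSets, presumably because the order of display_data entries in the translated job properties is not guaranteed, and each entry is a flat map of strings. Below is a small sketch of the shape of one such entry, reusing the keys the test checks (key, type, value, namespace, label, linkUrl); the String value type of the map and the namespace class name are assumptions for illustration, not taken from the patch.

import com.google.common.collect.ImmutableMap;

import java.util.Map;

class DisplayDataEntrySketch {
  // One "display_data" entry as the test above expects it: the integer passed to
  // add("foo2", 123) surfaces with type INTEGER and its value serialized as the
  // string "123"; label and linkUrl ride along as optional keys.
  static Map<String, String> sampleEntry() {
    return ImmutableMap.<String, String>builder()
        .put("key", "foo2")
        .put("type", "INTEGER")
        .put("value", "123")
        .put("namespace", "com.example.MyDoFn") // hypothetical DoFn class name
        .put("label", "Test Value")
        .put("linkUrl", "http://www.google.com")
        .build();
  }
}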
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
index dcc9775bd8..73c103ac90 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
@@ -33,6 +33,8 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;

 import java.io.IOException;
 import java.io.InputStream;
@@ -42,6 +44,7 @@
 /**
  * Tests for {@link EncodabilityEnforcementFactory}.
  */
+@RunWith(JUnit4.class)
 public class EncodabilityEnforcementFactoryTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
   private EncodabilityEnforcementFactory factory = EncodabilityEnforcementFactory.create();
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
index 6268b4b771..05e38babc6 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
@@ -110,7 +110,6 @@ public void setup() {
     context =
         InProcessEvaluationContext.create(
             runner.getPipelineOptions(),
-            InProcessBundleFactory.create(),
             rootTransforms,
             valueToConsumers,
             cVis.getStepNames(),
@@ -157,9 +156,7 @@ public void getExecutionContextSameStepSameKeyState() {
     stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1);

     context.handleResult(
-        InProcessBundleFactory.create()
-            .createKeyedBundle(null, "foo", created)
-            .commit(Instant.now()),
+        InProcessBundle.keyed(created, "foo").commit(Instant.now()),
         ImmutableList.of(),
         StepTransformResult.withoutHold(created.getProducingTransformInternal())
             .withState(stepContext.commitState())
@@ -251,7 +248,7 @@ public void handleResultMergesCounters() {
             .withCounters(againCounters)
             .build();
     context.handleResult(
-        context.createRootBundle(created).commit(Instant.now()),
+        InProcessBundle.unkeyed(created).commit(Instant.now()),
         ImmutableList.of(),
         secondResult);
     assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(12L));
@@ -278,7 +275,7 @@ public void handleResultStoresState() {
             .build();

     context.handleResult(
-        context.createKeyedBundle(null, myKey, created).commit(Instant.now()),
+        InProcessBundle.keyed(created, myKey).commit(Instant.now()),
         ImmutableList.of(),
         stateResult);
@@ -360,7 +357,7 @@ public void extractFiredTimersExtractsTimers() {
     // haven't added any timers, must be empty
     assertThat(context.extractFiredTimers().entrySet(), emptyIterable());
     context.handleResult(
-        context.createKeyedBundle(null, key, created).commit(Instant.now()),
+        InProcessBundle.keyed(created, key).commit(Instant.now()),
         ImmutableList.of(),
         timerResult);
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java
index b14b34bb24..7741263c77 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java
@@ -73,7 +73,6 @@ public class TransformExecutorTest {
   private RegisteringCompletionCallback completionCallback;
   private TransformExecutorService transformEvaluationState;
-  private BundleFactory bundleFactory;
   @Mock private InProcessEvaluationContext evaluationContext;
   @Mock private TransformEvaluatorRegistry registry;
   private Map, Boolean> scheduled;
@@ -82,8 +81,6 @@ public class TransformExecutorTest {
   public void setup() {
     MockitoAnnotations.initMocks(this);

-    bundleFactory = InProcessBundleFactory.create();
-
     scheduled = new HashMap<>();
     transformEvaluationState =
         TransformExecutorServices.parallel(MoreExecutors.newDirectExecutorService(), scheduled);
@@ -158,7 +155,7 @@ public InProcessTransformResult finishBundle()
       throws Exception {
     WindowedValue spam = WindowedValue.valueInGlobalWindow("spam");
     WindowedValue third = WindowedValue.valueInGlobalWindow("third");
     CommittedBundle inputBundle =
-        bundleFactory.createRootBundle(created).add(foo).add(spam).add(third).commit(Instant.now());
+        InProcessBundle.unkeyed(created).add(foo).add(spam).add(third).commit(Instant.now());
     when(
         registry.forApplication(
             downstream.getProducingTransformInternal(), inputBundle, evaluationContext))
@@ -204,7 +201,7 @@ public InProcessTransformResult finishBundle() throws Exception {
     WindowedValue foo = WindowedValue.valueInGlobalWindow("foo");
     CommittedBundle inputBundle =
-        bundleFactory.createRootBundle(created).add(foo).commit(Instant.now());
+        InProcessBundle.unkeyed(created).add(foo).commit(Instant.now());
     when(
         registry.forApplication(
             downstream.getProducingTransformInternal(), inputBundle, evaluationContext))
@@ -242,8 +239,7 @@ public InProcessTransformResult finishBundle() throws Exception {
       }
     };

-    CommittedBundle inputBundle =
-        bundleFactory.createRootBundle(created).commit(Instant.now());
+    CommittedBundle inputBundle = InProcessBundle.unkeyed(created).commit(Instant.now());
     when(
         registry.forApplication(
             downstream.getProducingTransformInternal(), inputBundle, evaluationContext))
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
index ab0b939597..464e781c0f 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
@@ -141,7 +141,6 @@ public void boundedSourceEvaluatorClosesReader() throws Exception {
     PCollection pcollection = p.apply(Read.from(source));
     AppliedPTransform sourceTransform = pcollection.getProducingTransformInternal();

-    UncommittedBundle output = bundleFactory.createRootBundle(pcollection);
     when(context.createRootBundle(pcollection)).thenReturn(output);

     TransformEvaluator evaluator = factory.forApplication(sourceTransform, null, context);
@@ -159,7 +158,7 @@ public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception {
     PCollection pcollection = p.apply(Read.from(source));
     AppliedPTransform sourceTransform = pcollection.getProducingTransformInternal();

-    UncommittedBundle output = bundleFactory.createRootBundle(pcollection);
     when(context.createRootBundle(pcollection)).thenReturn(output);

     TransformEvaluator evaluator = factory.forApplication(sourceTransform, null, context);
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java
index dfc8c38f52..e8128c7788 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java
@@ -553,6 +553,7 @@ public void populateDisplayData(Builder builder) {
         });
   }

+  @Test
   public void testAcceptsNullOptionalValues() {
     DisplayData.from(
         new HasDisplayData() {
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
index 73b35b66f5..646accd252 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
@@ -723,41 +723,9 @@ public void testMergingWithCloseBeforeGC() throws Exception {
         equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
   }

-  /**
-   * Ensure a closed trigger has its state recorded in the merge result window.
-   */
-  @Test
-  public void testMergingWithCloseTrigger() throws Exception {
-    ReduceFnTester, IntervalWindow> tester =
-        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
-            AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
-            ClosingBehavior.FIRE_IF_NON_EMPTY);
-
-    // Create a new merged session window.
-    tester.injectElements(TimestampedValue.of(1, new Instant(1)),
-        TimestampedValue.of(2, new Instant(2)));
-
-    // Force the trigger to be closed for the merged window.
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
-    triggerShouldFinish(mockTrigger);
-    tester.advanceInputWatermark(new Instant(13));
-
-    // Trigger is now closed.
-    assertTrue(tester.isMarkedFinished(new IntervalWindow(new Instant(1), new Instant(12))));
-
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
-
-    // Revisit the same session window.
-    tester.injectElements(TimestampedValue.of(1, new Instant(1)),
-        TimestampedValue.of(2, new Instant(2)));
-
-    // Trigger is still closed.
-    assertTrue(tester.isMarkedFinished(new IntervalWindow(new Instant(1), new Instant(12))));
-  }
-
   /**
    * If a later event tries to reuse an earlier session window which has been closed, we
-   * should reject that element and not fail due to the window no longer being active.
+   * should reject that element and not fail due to the window no longer having a representative.
    */
   @Test
   public void testMergingWithReusedWindow() throws Exception {
@@ -777,8 +745,6 @@ public void testMergingWithReusedWindow() throws Exception {
     // Another element in the same session window.
     // Should be discarded with 'window closed'.
     tester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 11), gc at 21.
-    // And nothing should be left in the active window state.
-    assertTrue(tester.hasNoActiveWindows());

     // Now the garbage collection timer will fire, finding the trigger already closed.
     tester.advanceInputWatermark(new Instant(100));
@@ -795,91 +761,6 @@ public void testMergingWithReusedWindow() throws Exception {
         equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
   }

-  /**
-   * When a merged window's trigger is closed we record that state using the merged window rather
-   * than the original windows.
-   */
-  @Test
-  public void testMergingWithClosedRepresentative() throws Exception {
-    ReduceFnTester, IntervalWindow> tester =
-        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
-            AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
-            ClosingBehavior.FIRE_IF_NON_EMPTY);
-
-    // 2 elements into merged session window.
-    // Close the trigger, but the garbage collection timer is still pending.
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
-    triggerShouldFinish(mockTrigger);
-    tester.injectElements(TimestampedValue.of(1, new Instant(1)), // in [1, 11), gc at 21.
-        TimestampedValue.of(8, new Instant(8))); // in [8, 18), gc at 28.
-
-    // More elements into the same merged session window.
-    // It has not yet been gced.
-    // Should be discarded with 'window closed'.
-    tester.injectElements(TimestampedValue.of(1, new Instant(1)), // in [1, 11), gc at 21.
-        TimestampedValue.of(2, new Instant(2)), // in [2, 12), gc at 22.
-        TimestampedValue.of(8, new Instant(8))); // in [8, 18), gc at 28.
-
-    // Now the garbage collection timer will fire, finding the trigger already closed.
-    tester.advanceInputWatermark(new Instant(100));
-
-    List>> output = tester.extractOutput();
-
-    assertThat(output.size(), equalTo(1));
-    assertThat(output.get(0),
-        isSingleWindowedValue(containsInAnyOrder(1, 8),
-            1, // timestamp
-            1, // window start
-            18)); // window end
-    assertThat(
-        output.get(0).getPane(),
-        equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0)));
-  }
-
-  /**
-   * If an element for a closed session window ends up being merged into other still-open
-   * session windows, the resulting session window is not 'poisoned'.
-   */
-  @Test
-  public void testMergingWithClosedDoesNotPoison() throws Exception {
-    ReduceFnTester, IntervalWindow> tester =
-        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
-            AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
-            ClosingBehavior.FIRE_IF_NON_EMPTY);
-
-    // 1 element, force its trigger to close.
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
-    triggerShouldFinish(mockTrigger);
-    tester.injectElements(TimestampedValue.of(2, new Instant(2)));
-
-    // 3 elements, one already closed.
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
-    tester.injectElements(TimestampedValue.of(1, new Instant(1)),
-        TimestampedValue.of(2, new Instant(2)),
-        TimestampedValue.of(3, new Instant(3)));
-
-    tester.advanceInputWatermark(new Instant(100));
-
-    List>> output = tester.extractOutput();
-    assertThat(output.size(), equalTo(2));
-    assertThat(output.get(0),
-        isSingleWindowedValue(containsInAnyOrder(2),
-            2, // timestamp
-            2, // window start
-            12)); // window end
-    assertThat(
-        output.get(0).getPane(),
-        equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0)));
-    assertThat(output.get(1),
-        isSingleWindowedValue(containsInAnyOrder(1, 2, 3),
-            1, // timestamp
-            1, // window start
-            13)); // window end
-    assertThat(
-        output.get(1).getPane(),
-        equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
-  }
-
   /**
    * Tests that when data is assigned to multiple windows but some of those windows have
    * had their triggers finish, then the data is dropped and counted accurately.
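To recap the ReduceFnRunner change that the deleted tests above exercised: the closed-window check now happens while each element window is resolved to its representative, so an element whose windows are all closed is dropped and counted before any per-window processing, and the separate triggerableWindows bookkeeping goes away. Below is a minimal sketch of that control flow only, not the SDK code itself; WindowState, isClosed, representativeOf, and the plain long counter are hypothetical stand-ins for the trigger runner, the active window set, and the droppedDueToClosedWindow aggregator used by the real runner.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

class ClosedWindowDropSketch {
  /** Hypothetical stand-in for the trigger state and active window set consulted by ReduceFnRunner. */
  interface WindowState<W> {
    boolean isClosed(W window);   // models triggerRunner.isClosed(state)
    W representativeOf(W window); // models activeWindows.representative(window)
  }

  long droppedDueToClosedWindow = 0; // models the aggregator of the same name

  /**
   * Resolves each window of an element to its representative, dropping (and counting) windows
   * whose triggers have already closed; only the surviving representatives are processed further.
   */
  <W> List<W> resolveWindows(Collection<W> elementWindows, WindowState<W> state) {
    List<W> windows = new ArrayList<>();
    for (W window : elementWindows) {
      if (state.isClosed(window)) {
        droppedDueToClosedWindow++;
        continue;
      }
      W active = state.representativeOf(window);
      if (active == null) {
        throw new IllegalStateException("Window " + window + " has no representative");
      }
      windows.add(active);
    }
    return windows;
  }
}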