From 3e8df24a2ced82be7ebe26837f96f651acc1ac06 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 2 May 2016 10:03:43 -0700 Subject: [PATCH 1/3] Move ReadyCheckingSideInputReader to util This SideInputReader allows callers to check for a side input being available before attempting to read the contents --- .../direct/InProcessEvaluationContext.java | 11 +----- .../direct/InProcessSideInputContainer.java | 2 +- .../InProcessSideInputContainerTest.java | 2 +- .../util/ReadyCheckingSideInputReader.java | 34 +++++++++++++++++++ 4 files changed, 37 insertions(+), 12 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReadyCheckingSideInputReader.java diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java index d9a7ff02f839..92e5aa554b2a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java @@ -31,6 +31,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.util.ExecutionContext; +import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowedValue; @@ -333,16 +334,6 @@ public ReadyCheckingSideInputReader createSideInputReader( return sideInputContainer.createReaderForViews(sideInputs); } - /** - * A {@link SideInputReader} that allows callers to check to see if a {@link PCollectionView} has - * had its contents set in a window. - */ - static interface ReadyCheckingSideInputReader extends SideInputReader { - /** - * Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}. - */ - boolean isReady(PCollectionView view, BoundedWindow window); - } /** * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java index f4980ef1546c..d0f29ff3f876 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java @@ -19,10 +19,10 @@ import static com.google.common.base.Preconditions.checkArgument; -import org.apache.beam.runners.direct.InProcessEvaluationContext.ReadyCheckingSideInputReader; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.PCollectionViewWindow; +import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java index d8a78f23d3e8..8f89e707b9bb 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertThat; import static org.mockito.Mockito.doAnswer; -import org.apache.beam.runners.direct.InProcessEvaluationContext.ReadyCheckingSideInputReader; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; @@ -37,6 +36,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.apache.beam.sdk.util.PCollectionViews; +import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReadyCheckingSideInputReader.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReadyCheckingSideInputReader.java new file mode 100644 index 000000000000..cb38a55a0da4 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReadyCheckingSideInputReader.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * A {@link SideInputReader} that allows callers to check to see if a {@link PCollectionView} has + * had its contents set in a window. + */ +public interface ReadyCheckingSideInputReader extends SideInputReader { + /** + * Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}. + */ + boolean isReady(PCollectionView view, BoundedWindow window); +} + From 8bddf3286a2ce23d29508a2bc7fd3576db7959c3 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 2 May 2016 10:04:20 -0700 Subject: [PATCH 2/3] Add PushbackSideInputDoFnRunner This DoFnRunner wraps a DoFnRunner and provides an additional method to process an element in all the windows where all side inputs are ready, returning any elements that it could not process. --- .../sdk/util/PushbackSideInputDoFnRunner.java | 115 ++++++++++ .../sdk/util/IdentitySideInputWindowFn.java | 50 +++++ .../util/PushbackSideInputDoFnRunnerTest.java | 198 ++++++++++++++++++ 3 files changed, 363 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java new file mode 100644 index 000000000000..4eeedf65a149 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.PCollectionView; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * A {@link DoFnRunner} that can refuse to process elements that are not ready, instead returning + * them via the {@link #processElementInReadyWindows(WindowedValue)}. + */ +class PushbackSideInputDoFnRunner implements DoFnRunner { + private final DoFnRunner underlying; + private final Collection> views; + private final ReadyCheckingSideInputReader sideInputReader; + + private Set notReadyWindows; + + public static PushbackSideInputDoFnRunner create( + DoFnRunner underlying, + Collection> views, + ReadyCheckingSideInputReader sideInputReader) { + return new PushbackSideInputDoFnRunner<>(underlying, views, sideInputReader); + } + + private PushbackSideInputDoFnRunner( + DoFnRunner underlying, + Collection> views, + ReadyCheckingSideInputReader sideInputReader) { + this.underlying = underlying; + this.views = views; + this.sideInputReader = sideInputReader; + } + + @Override + public void startBundle() { + notReadyWindows = new HashSet<>(); + underlying.startBundle(); + } + + /** + * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} for the provided element + * for each window the element is in that is ready. + * + * @param elem the element to process in all ready windows + * @return each element that could not be processed because it requires a side input window + * that is not ready. + */ + public Iterable> processElementInReadyWindows(WindowedValue elem) { + if (views.isEmpty()) { + processElement(elem); + return Collections.emptyList(); + } + ImmutableList.Builder> pushedBack = ImmutableList.builder(); + for (WindowedValue windowElem : elem.explodeWindows()) { + BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows()); + boolean isReady = !notReadyWindows.contains(mainInputWindow); + for (PCollectionView view : views) { + BoundedWindow sideInputWindow = + view.getWindowingStrategyInternal() + .getWindowFn() + .getSideInputWindow(mainInputWindow); + if (!sideInputReader.isReady(view, sideInputWindow)) { + isReady = false; + break; + } + } + if (isReady) { + processElement(windowElem); + } else { + notReadyWindows.add(mainInputWindow); + pushedBack.add(windowElem); + } + } + return pushedBack.build(); + } + + @Override + public void processElement(WindowedValue elem) { + underlying.processElement(elem); + } + + /** + * Call the underlying {@link DoFnRunner#finishBundle()}. + */ + @Override + public void finishBundle() { + notReadyWindows = null; + underlying.finishBundle(); + } +} + diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java new file mode 100644 index 000000000000..205a6ba47b9c --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; +import org.apache.beam.sdk.transforms.windowing.WindowFn; + +import java.util.Collection; + +class IdentitySideInputWindowFn extends NonMergingWindowFn { + @Override + public Collection assignWindows(WindowFn.AssignContext c) + throws Exception { + return (Collection) c.windows(); + } + + @Override + public boolean isCompatible(WindowFn other) { + return true; + } + + @Override + public Coder windowCoder() { + // not used + return (Coder) GlobalWindow.Coder.INSTANCE; + } + + @Override + public BoundedWindow getSideInputWindow(BoundedWindow window) { + return window; + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java new file mode 100644 index 000000000000..bfc32e0abf36 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; + +import com.google.common.collect.ImmutableList; + +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link PushbackSideInputDoFnRunner}. + */ +@RunWith(JUnit4.class) +public class PushbackSideInputDoFnRunnerTest { + @Mock private ReadyCheckingSideInputReader reader; + @Mock private DoFnRunner underlying; + private PCollectionView singletonView; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + TestPipeline p = TestPipeline.create(); + PCollection created = p.apply(Create.of(1, 2, 3)); + singletonView = + created + .apply(Window.into(new IdentitySideInputWindowFn())) + .apply(Sum.integersGlobally().asSingletonView()); + } + + @Test + public void processElementSideInputNotReady() { + when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class))) + .thenReturn(false); + + PushbackSideInputDoFnRunner runner = + PushbackSideInputDoFnRunner.create( + underlying, ImmutableList.>of(singletonView), reader); + runner.startBundle(); + + WindowedValue oneWindow = + WindowedValue.of( + 2, + new Instant(-2), + new IntervalWindow(new Instant(-500L), new Instant(0L)), + PaneInfo.ON_TIME_AND_ONLY_FIRING); + Iterable> oneWindowPushback = + runner.processElementInReadyWindows(oneWindow); + assertThat(oneWindowPushback, containsInAnyOrder(oneWindow)); + verify(underlying, never()).processElement(Mockito.any(WindowedValue.class)); + } + + @Test + public void processElementSideInputNotReadyMultipleWindows() { + when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class))) + .thenReturn(false); + + PushbackSideInputDoFnRunner runner = + PushbackSideInputDoFnRunner.create( + underlying, ImmutableList.>of(singletonView), reader); + + runner.startBundle(); + WindowedValue multiWindow = + WindowedValue.of( + 2, + new Instant(-2), + ImmutableList.of( + new IntervalWindow(new Instant(-500L), new Instant(0L)), + new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)), + GlobalWindow.INSTANCE), + PaneInfo.ON_TIME_AND_ONLY_FIRING); + Iterable> multiWindowPushback = + runner.processElementInReadyWindows(multiWindow); + assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows())); + verify(underlying, never()).processElement(Mockito.any(WindowedValue.class)); + } + + @Test + public void processElementSideInputNotReadySomeWindows() { + when(reader.isReady(Mockito.eq(singletonView), Mockito.eq(GlobalWindow.INSTANCE))) + .thenReturn(false); + when( + reader.isReady( + Mockito.eq(singletonView), + org.mockito.AdditionalMatchers.not(Mockito.eq(GlobalWindow.INSTANCE)))) + .thenReturn(true); + + PushbackSideInputDoFnRunner runner = + PushbackSideInputDoFnRunner.create( + underlying, ImmutableList.>of(singletonView), reader); + runner.startBundle(); + + WindowedValue multiWindow = + WindowedValue.of( + 2, + new Instant(-2), + ImmutableList.of( + new IntervalWindow(new Instant(-500L), new Instant(0L)), + new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)), + GlobalWindow.INSTANCE), + PaneInfo.NO_FIRING); + Iterable> multiWindowPushback = + runner.processElementInReadyWindows(multiWindow); + assertThat( + multiWindowPushback, + containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L)))); + verify(underlying, times(2)).processElement(Mockito.any(WindowedValue.class)); + } + + @Test + public void processElementSideInputReadyAllWindows() { + when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class))) + .thenReturn(true); + + PushbackSideInputDoFnRunner runner = + PushbackSideInputDoFnRunner.create( + underlying, ImmutableList.>of(singletonView), reader); + runner.startBundle(); + + WindowedValue multiWindow = + WindowedValue.of( + 2, + new Instant(-2), + ImmutableList.of( + new IntervalWindow(new Instant(-500L), new Instant(0L)), + new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)), + GlobalWindow.INSTANCE), + PaneInfo.ON_TIME_AND_ONLY_FIRING); + Iterable> multiWindowPushback = + runner.processElementInReadyWindows(multiWindow); + assertThat(multiWindowPushback, emptyIterable()); + for (WindowedValue explodedValue : multiWindow.explodeWindows()) { + verify(underlying).processElement(explodedValue); + } + } + + @Test + public void processElementNoSideInputs() { + PushbackSideInputDoFnRunner runner = PushbackSideInputDoFnRunner.create( + underlying, + ImmutableList.>of(), + reader); + + WindowedValue multiWindow = + WindowedValue.of( + 2, + new Instant(-2), + ImmutableList.of( + new IntervalWindow(new Instant(-500L), new Instant(0L)), + new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)), + GlobalWindow.INSTANCE), + PaneInfo.ON_TIME_AND_ONLY_FIRING); + Iterable> multiWindowPushback = + runner.processElementInReadyWindows(multiWindow); + assertThat(multiWindowPushback, emptyIterable()); + verify(underlying).processElement(multiWindow); + } +} From 2f57a211a5e2bab18c96bf62f3fc45568225560a Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 2 May 2016 13:11:10 -0700 Subject: [PATCH 3/3] fixup! Add PushbackSideInputDoFnRunner --- .../sdk/util/IdentitySideInputWindowFn.java | 4 + .../util/PushbackSideInputDoFnRunnerTest.java | 100 ++++++++++++------ 2 files changed, 72 insertions(+), 32 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java index 205a6ba47b9c..ecab6f8eb6c5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java @@ -25,6 +25,10 @@ import java.util.Collection; +/** + * A {@link WindowFn} for use during tests that returns the input window for calls to + * {@link #getSideInputWindow(BoundedWindow)}. + */ class IdentitySideInputWindowFn extends NonMergingWindowFn { @Override public Collection assignWindows(WindowFn.AssignContext c) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java index bfc32e0abf36..88851187bb65 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java @@ -20,10 +20,8 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import org.apache.beam.sdk.testing.TestPipeline; @@ -39,6 +37,7 @@ import com.google.common.collect.ImmutableList; +import org.hamcrest.Matchers; import org.joda.time.Instant; import org.junit.Before; import org.junit.Test; @@ -48,13 +47,16 @@ import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import java.util.ArrayList; +import java.util.List; + /** * Tests for {@link PushbackSideInputDoFnRunner}. */ @RunWith(JUnit4.class) public class PushbackSideInputDoFnRunnerTest { @Mock private ReadyCheckingSideInputReader reader; - @Mock private DoFnRunner underlying; + private TestDoFnRunner underlying; private PCollectionView singletonView; @Before @@ -66,6 +68,27 @@ public void setup() { created .apply(Window.into(new IdentitySideInputWindowFn())) .apply(Sum.integersGlobally().asSingletonView()); + + underlying = new TestDoFnRunner<>(); + } + + private PushbackSideInputDoFnRunner createRunner( + ImmutableList> views) { + PushbackSideInputDoFnRunner runner = + PushbackSideInputDoFnRunner.create(underlying, views, reader); + runner.startBundle(); + return runner; + } + + @Test + public void startFinishBundleDelegates() { + PushbackSideInputDoFnRunner runner = + createRunner(ImmutableList.>of(singletonView)); + + assertThat(underlying.started, is(true)); + assertThat(underlying.finished, is(false)); + runner.finishBundle(); + assertThat(underlying.finished, is(true)); } @Test @@ -74,9 +97,7 @@ public void processElementSideInputNotReady() { .thenReturn(false); PushbackSideInputDoFnRunner runner = - PushbackSideInputDoFnRunner.create( - underlying, ImmutableList.>of(singletonView), reader); - runner.startBundle(); + createRunner(ImmutableList.>of(singletonView)); WindowedValue oneWindow = WindowedValue.of( @@ -87,7 +108,7 @@ public void processElementSideInputNotReady() { Iterable> oneWindowPushback = runner.processElementInReadyWindows(oneWindow); assertThat(oneWindowPushback, containsInAnyOrder(oneWindow)); - verify(underlying, never()).processElement(Mockito.any(WindowedValue.class)); + assertThat(underlying.inputElems, Matchers.>emptyIterable()); } @Test @@ -96,10 +117,8 @@ public void processElementSideInputNotReadyMultipleWindows() { .thenReturn(false); PushbackSideInputDoFnRunner runner = - PushbackSideInputDoFnRunner.create( - underlying, ImmutableList.>of(singletonView), reader); + createRunner(ImmutableList.>of(singletonView)); - runner.startBundle(); WindowedValue multiWindow = WindowedValue.of( 2, @@ -112,7 +131,7 @@ public void processElementSideInputNotReadyMultipleWindows() { Iterable> multiWindowPushback = runner.processElementInReadyWindows(multiWindow); assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows())); - verify(underlying, never()).processElement(Mockito.any(WindowedValue.class)); + assertThat(underlying.inputElems, Matchers.>emptyIterable()); } @Test @@ -126,25 +145,25 @@ public void processElementSideInputNotReadySomeWindows() { .thenReturn(true); PushbackSideInputDoFnRunner runner = - PushbackSideInputDoFnRunner.create( - underlying, ImmutableList.>of(singletonView), reader); - runner.startBundle(); + createRunner(ImmutableList.>of(singletonView)); + IntervalWindow littleWindow = new IntervalWindow(new Instant(-500L), new Instant(0L)); + IntervalWindow bigWindow = + new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)); WindowedValue multiWindow = WindowedValue.of( 2, new Instant(-2), - ImmutableList.of( - new IntervalWindow(new Instant(-500L), new Instant(0L)), - new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)), - GlobalWindow.INSTANCE), + ImmutableList.of(littleWindow, bigWindow, GlobalWindow.INSTANCE), PaneInfo.NO_FIRING); Iterable> multiWindowPushback = runner.processElementInReadyWindows(multiWindow); assertThat( multiWindowPushback, containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L)))); - verify(underlying, times(2)).processElement(Mockito.any(WindowedValue.class)); + assertThat(underlying.inputElems, + containsInAnyOrder(WindowedValue.of(2, new Instant(-2), littleWindow, PaneInfo.NO_FIRING), + WindowedValue.of(2, new Instant(-2), bigWindow, PaneInfo.NO_FIRING))); } @Test @@ -152,10 +171,8 @@ public void processElementSideInputReadyAllWindows() { when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class))) .thenReturn(true); - PushbackSideInputDoFnRunner runner = - PushbackSideInputDoFnRunner.create( - underlying, ImmutableList.>of(singletonView), reader); - runner.startBundle(); + ImmutableList> views = ImmutableList.>of(singletonView); + PushbackSideInputDoFnRunner runner = createRunner(views); WindowedValue multiWindow = WindowedValue.of( @@ -169,17 +186,14 @@ public void processElementSideInputReadyAllWindows() { Iterable> multiWindowPushback = runner.processElementInReadyWindows(multiWindow); assertThat(multiWindowPushback, emptyIterable()); - for (WindowedValue explodedValue : multiWindow.explodeWindows()) { - verify(underlying).processElement(explodedValue); - } + assertThat(underlying.inputElems, + containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray())); } @Test public void processElementNoSideInputs() { - PushbackSideInputDoFnRunner runner = PushbackSideInputDoFnRunner.create( - underlying, - ImmutableList.>of(), - reader); + PushbackSideInputDoFnRunner runner = + createRunner(ImmutableList.>of()); WindowedValue multiWindow = WindowedValue.of( @@ -193,6 +207,28 @@ public void processElementNoSideInputs() { Iterable> multiWindowPushback = runner.processElementInReadyWindows(multiWindow); assertThat(multiWindowPushback, emptyIterable()); - verify(underlying).processElement(multiWindow); + assertThat(underlying.inputElems, containsInAnyOrder(multiWindow)); + } + + private static class TestDoFnRunner implements DoFnRunner { + List> inputElems; + private boolean started = false; + private boolean finished = false; + + @Override + public void startBundle() { + started = true; + inputElems = new ArrayList<>(); + } + + @Override + public void processElement(WindowedValue elem) { + inputElems.add(elem); + } + + @Override + public void finishBundle() { + finished = true; + } } }