diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java index d9a7ff02f839..92e5aa554b2a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java @@ -31,6 +31,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.util.ExecutionContext; +import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowedValue; @@ -333,16 +334,6 @@ public ReadyCheckingSideInputReader createSideInputReader( return sideInputContainer.createReaderForViews(sideInputs); } - /** - * A {@link SideInputReader} that allows callers to check to see if a {@link PCollectionView} has - * had its contents set in a window. - */ - static interface ReadyCheckingSideInputReader extends SideInputReader { - /** - * Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}. - */ - boolean isReady(PCollectionView view, BoundedWindow window); - } /** * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java index f4980ef1546c..d0f29ff3f876 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java @@ -19,10 +19,10 @@ import static com.google.common.base.Preconditions.checkArgument; -import org.apache.beam.runners.direct.InProcessEvaluationContext.ReadyCheckingSideInputReader; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.PCollectionViewWindow; +import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java index d8a78f23d3e8..8f89e707b9bb 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertThat; import static org.mockito.Mockito.doAnswer; -import org.apache.beam.runners.direct.InProcessEvaluationContext.ReadyCheckingSideInputReader; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; @@ -37,6 +36,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.apache.beam.sdk.util.PCollectionViews; +import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java new file mode 100644 index 000000000000..4eeedf65a149 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.PCollectionView; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * A {@link DoFnRunner} that can refuse to process elements that are not ready, instead returning + * them via the {@link #processElementInReadyWindows(WindowedValue)}. + */ +class PushbackSideInputDoFnRunner implements DoFnRunner { + private final DoFnRunner underlying; + private final Collection> views; + private final ReadyCheckingSideInputReader sideInputReader; + + private Set notReadyWindows; + + public static PushbackSideInputDoFnRunner create( + DoFnRunner underlying, + Collection> views, + ReadyCheckingSideInputReader sideInputReader) { + return new PushbackSideInputDoFnRunner<>(underlying, views, sideInputReader); + } + + private PushbackSideInputDoFnRunner( + DoFnRunner underlying, + Collection> views, + ReadyCheckingSideInputReader sideInputReader) { + this.underlying = underlying; + this.views = views; + this.sideInputReader = sideInputReader; + } + + @Override + public void startBundle() { + notReadyWindows = new HashSet<>(); + underlying.startBundle(); + } + + /** + * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} for the provided element + * for each window the element is in that is ready. + * + * @param elem the element to process in all ready windows + * @return each element that could not be processed because it requires a side input window + * that is not ready. + */ + public Iterable> processElementInReadyWindows(WindowedValue elem) { + if (views.isEmpty()) { + processElement(elem); + return Collections.emptyList(); + } + ImmutableList.Builder> pushedBack = ImmutableList.builder(); + for (WindowedValue windowElem : elem.explodeWindows()) { + BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows()); + boolean isReady = !notReadyWindows.contains(mainInputWindow); + for (PCollectionView view : views) { + BoundedWindow sideInputWindow = + view.getWindowingStrategyInternal() + .getWindowFn() + .getSideInputWindow(mainInputWindow); + if (!sideInputReader.isReady(view, sideInputWindow)) { + isReady = false; + break; + } + } + if (isReady) { + processElement(windowElem); + } else { + notReadyWindows.add(mainInputWindow); + pushedBack.add(windowElem); + } + } + return pushedBack.build(); + } + + @Override + public void processElement(WindowedValue elem) { + underlying.processElement(elem); + } + + /** + * Call the underlying {@link DoFnRunner#finishBundle()}. + */ + @Override + public void finishBundle() { + notReadyWindows = null; + underlying.finishBundle(); + } +} + diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReadyCheckingSideInputReader.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReadyCheckingSideInputReader.java new file mode 100644 index 000000000000..cb38a55a0da4 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReadyCheckingSideInputReader.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * A {@link SideInputReader} that allows callers to check to see if a {@link PCollectionView} has + * had its contents set in a window. + */ +public interface ReadyCheckingSideInputReader extends SideInputReader { + /** + * Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}. + */ + boolean isReady(PCollectionView view, BoundedWindow window); +} + diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java new file mode 100644 index 000000000000..ecab6f8eb6c5 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; +import org.apache.beam.sdk.transforms.windowing.WindowFn; + +import java.util.Collection; + +/** + * A {@link WindowFn} for use during tests that returns the input window for calls to + * {@link #getSideInputWindow(BoundedWindow)}. + */ +class IdentitySideInputWindowFn extends NonMergingWindowFn { + @Override + public Collection assignWindows(WindowFn.AssignContext c) + throws Exception { + return (Collection) c.windows(); + } + + @Override + public boolean isCompatible(WindowFn other) { + return true; + } + + @Override + public Coder windowCoder() { + // not used + return (Coder) GlobalWindow.Coder.INSTANCE; + } + + @Override + public BoundedWindow getSideInputWindow(BoundedWindow window) { + return window; + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java new file mode 100644 index 000000000000..88851187bb65 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.when; + +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; + +import com.google.common.collect.ImmutableList; + +import org.hamcrest.Matchers; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import java.util.ArrayList; +import java.util.List; + +/** + * Tests for {@link PushbackSideInputDoFnRunner}. + */ +@RunWith(JUnit4.class) +public class PushbackSideInputDoFnRunnerTest { + @Mock private ReadyCheckingSideInputReader reader; + private TestDoFnRunner underlying; + private PCollectionView singletonView; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + TestPipeline p = TestPipeline.create(); + PCollection created = p.apply(Create.of(1, 2, 3)); + singletonView = + created + .apply(Window.into(new IdentitySideInputWindowFn())) + .apply(Sum.integersGlobally().asSingletonView()); + + underlying = new TestDoFnRunner<>(); + } + + private PushbackSideInputDoFnRunner createRunner( + ImmutableList> views) { + PushbackSideInputDoFnRunner runner = + PushbackSideInputDoFnRunner.create(underlying, views, reader); + runner.startBundle(); + return runner; + } + + @Test + public void startFinishBundleDelegates() { + PushbackSideInputDoFnRunner runner = + createRunner(ImmutableList.>of(singletonView)); + + assertThat(underlying.started, is(true)); + assertThat(underlying.finished, is(false)); + runner.finishBundle(); + assertThat(underlying.finished, is(true)); + } + + @Test + public void processElementSideInputNotReady() { + when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class))) + .thenReturn(false); + + PushbackSideInputDoFnRunner runner = + createRunner(ImmutableList.>of(singletonView)); + + WindowedValue oneWindow = + WindowedValue.of( + 2, + new Instant(-2), + new IntervalWindow(new Instant(-500L), new Instant(0L)), + PaneInfo.ON_TIME_AND_ONLY_FIRING); + Iterable> oneWindowPushback = + runner.processElementInReadyWindows(oneWindow); + assertThat(oneWindowPushback, containsInAnyOrder(oneWindow)); + assertThat(underlying.inputElems, Matchers.>emptyIterable()); + } + + @Test + public void processElementSideInputNotReadyMultipleWindows() { + when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class))) + .thenReturn(false); + + PushbackSideInputDoFnRunner runner = + createRunner(ImmutableList.>of(singletonView)); + + WindowedValue multiWindow = + WindowedValue.of( + 2, + new Instant(-2), + ImmutableList.of( + new IntervalWindow(new Instant(-500L), new Instant(0L)), + new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)), + GlobalWindow.INSTANCE), + PaneInfo.ON_TIME_AND_ONLY_FIRING); + Iterable> multiWindowPushback = + runner.processElementInReadyWindows(multiWindow); + assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows())); + assertThat(underlying.inputElems, Matchers.>emptyIterable()); + } + + @Test + public void processElementSideInputNotReadySomeWindows() { + when(reader.isReady(Mockito.eq(singletonView), Mockito.eq(GlobalWindow.INSTANCE))) + .thenReturn(false); + when( + reader.isReady( + Mockito.eq(singletonView), + org.mockito.AdditionalMatchers.not(Mockito.eq(GlobalWindow.INSTANCE)))) + .thenReturn(true); + + PushbackSideInputDoFnRunner runner = + createRunner(ImmutableList.>of(singletonView)); + + IntervalWindow littleWindow = new IntervalWindow(new Instant(-500L), new Instant(0L)); + IntervalWindow bigWindow = + new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)); + WindowedValue multiWindow = + WindowedValue.of( + 2, + new Instant(-2), + ImmutableList.of(littleWindow, bigWindow, GlobalWindow.INSTANCE), + PaneInfo.NO_FIRING); + Iterable> multiWindowPushback = + runner.processElementInReadyWindows(multiWindow); + assertThat( + multiWindowPushback, + containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L)))); + assertThat(underlying.inputElems, + containsInAnyOrder(WindowedValue.of(2, new Instant(-2), littleWindow, PaneInfo.NO_FIRING), + WindowedValue.of(2, new Instant(-2), bigWindow, PaneInfo.NO_FIRING))); + } + + @Test + public void processElementSideInputReadyAllWindows() { + when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class))) + .thenReturn(true); + + ImmutableList> views = ImmutableList.>of(singletonView); + PushbackSideInputDoFnRunner runner = createRunner(views); + + WindowedValue multiWindow = + WindowedValue.of( + 2, + new Instant(-2), + ImmutableList.of( + new IntervalWindow(new Instant(-500L), new Instant(0L)), + new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)), + GlobalWindow.INSTANCE), + PaneInfo.ON_TIME_AND_ONLY_FIRING); + Iterable> multiWindowPushback = + runner.processElementInReadyWindows(multiWindow); + assertThat(multiWindowPushback, emptyIterable()); + assertThat(underlying.inputElems, + containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray())); + } + + @Test + public void processElementNoSideInputs() { + PushbackSideInputDoFnRunner runner = + createRunner(ImmutableList.>of()); + + WindowedValue multiWindow = + WindowedValue.of( + 2, + new Instant(-2), + ImmutableList.of( + new IntervalWindow(new Instant(-500L), new Instant(0L)), + new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)), + GlobalWindow.INSTANCE), + PaneInfo.ON_TIME_AND_ONLY_FIRING); + Iterable> multiWindowPushback = + runner.processElementInReadyWindows(multiWindow); + assertThat(multiWindowPushback, emptyIterable()); + assertThat(underlying.inputElems, containsInAnyOrder(multiWindow)); + } + + private static class TestDoFnRunner implements DoFnRunner { + List> inputElems; + private boolean started = false; + private boolean finished = false; + + @Override + public void startBundle() { + started = true; + inputElems = new ArrayList<>(); + } + + @Override + public void processElement(WindowedValue elem) { + inputElems.add(elem); + } + + @Override + public void finishBundle() { + finished = true; + } + } +}