From ab26b1773f6295bcbfb4ae255e61e4d2b2cf906d Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 18 Apr 2016 16:16:48 -0700 Subject: [PATCH 1/6] Allow InProcess Evaluators to check Side Input completion This checks to ensure that the PCollectionView in the SideInputWindow for the provided window either has elements available or is empty. Schedule a future to ensure that the SideInputWindows are appropriately filled with an empty iterable after retreiving the element. --- .../inprocess/InProcessEvaluationContext.java | 25 +- .../InProcessSideInputContainer.java | 89 ++++-- .../InProcessSideInputContainerTest.java | 279 +++++++++++++----- 3 files changed, 288 insertions(+), 105 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java index d439ba7c6539..51a47547e617 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -320,17 +320,30 @@ public String getStepName(AppliedPTransform application) { } /** - * Returns a {@link SideInputReader} capable of reading the provided - * {@link PCollectionView PCollectionViews}. + * Returns a {@link SideInputReader} capable of reading the provided {@link PCollectionView + * PCollectionViews}. + * * @param sideInputs the {@link PCollectionView PCollectionViews} the result should be able to - * read - * @return a {@link SideInputReader} that can read all of the provided - * {@link PCollectionView PCollectionViews} + * read + * @return a {@link SideInputReader} that can read all of the provided {@link PCollectionView + * PCollectionViews} */ - public SideInputReader createSideInputReader(final List> sideInputs) { + public ReadyCheckingSideInputReader createSideInputReader( + final List> sideInputs) { return sideInputContainer.createReaderForViews(sideInputs); } + /** + * A {@link SideInputReader} that allows callers to check to see if the {@link PCollectionView} + * has contents in the specified window. + */ + static interface ReadyCheckingSideInputReader extends SideInputReader { + /** + * Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}. + */ + boolean allViewsReadyInWindow(BoundedWindow window); + } + /** * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent * of all other {@link CounterSet CounterSets} created by this call. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java index 6bf6e8a15505..add28d7e3668 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkArgument; +import org.apache.beam.sdk.runners.inprocess.InProcessEvaluationContext.ReadyCheckingSideInputReader; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.PCollectionViewWindow; @@ -44,6 +45,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import javax.annotation.Nullable; @@ -88,11 +90,12 @@ private InProcessSideInputContainer(InProcessEvaluationContext context, } /** - * Return a view of this {@link InProcessSideInputContainer} that contains only the views in - * the provided argument. The returned {@link InProcessSideInputContainer} is unmodifiable without + * Return a view of this {@link InProcessSideInputContainer} that contains only the views in the + * provided argument. The returned {@link InProcessSideInputContainer} is unmodifiable without * casting, but will change as this {@link InProcessSideInputContainer} is modified. */ - public SideInputReader createReaderForViews(Collection> newContainedViews) { + public ReadyCheckingSideInputReader createReaderForViews( + Collection> newContainedViews) { if (!containedViews.containsAll(newContainedViews)) { Set> currentlyContained = ImmutableSet.copyOf(containedViews); Set> newRequested = ImmutableSet.copyOf(newContainedViews); @@ -173,41 +176,43 @@ private void updatePCollectionViewWindowValues( } } - private final class SideInputContainerSideInputReader implements SideInputReader { + private final class SideInputContainerSideInputReader implements ReadyCheckingSideInputReader { private final Collection> readerViews; private SideInputContainerSideInputReader(Collection> readerViews) { this.readerViews = ImmutableSet.copyOf(readerViews); } + @Override + public boolean allViewsReadyInWindow(final BoundedWindow elementWindow) { + for (PCollectionView view : readerViews) { + try { + BoundedWindow viewWindow = + view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(elementWindow); + Future>> viewContents = + getViewFuture(view, viewWindow); + if (!viewContents.isDone()) { + return false; + } + } catch (ExecutionException e) { + throw new RuntimeException( + String.format( + "Exception while checking to see if PCollectionView %s is ready in window %s", + view, + elementWindow), + e); + } + } + return true; + } + @Override @Nullable public T get(final PCollectionView view, final BoundedWindow window) { checkArgument( readerViews.contains(view), "calling get(PCollectionView) with unknown view: " + view); - PCollectionViewWindow windowedView = PCollectionViewWindow.of(view, window); try { - final SettableFuture>> future = - viewByWindows.get(windowedView); - - WindowingStrategy windowingStrategy = view.getWindowingStrategyInternal(); - evaluationContext.scheduleAfterOutputWouldBeProduced( - view, window, windowingStrategy, new Runnable() { - @Override - public void run() { - // The requested window has closed without producing elements, so reflect that in - // the PCollectionView. If set has already been called, will do nothing. - future.set(Collections.>emptyList()); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper("InProcessSideInputContainerEmptyCallback") - .add("view", view) - .add("window", window) - .toString(); - } - }); + final Future>> future = getViewFuture(view, window); // Safe covariant cast @SuppressWarnings("unchecked") Iterable> values = (Iterable>) future.get(); @@ -220,6 +225,38 @@ public String toString() { } } + /** + * Gets the future containing the contents of the provided {@link PCollectionView} in the + * provided {@link BoundedWindow}, setting up a callback to populate the future with empty + * contents if necessary. + */ + private Future>> getViewFuture( + final PCollectionView view, final BoundedWindow window) throws ExecutionException { + PCollectionViewWindow windowedView = PCollectionViewWindow.of(view, window); + final SettableFuture>> future = + viewByWindows.get(windowedView); + + WindowingStrategy windowingStrategy = view.getWindowingStrategyInternal(); + evaluationContext.scheduleAfterOutputWouldBeProduced( + view, window, windowingStrategy, new Runnable() { + @Override + public void run() { + // The requested window has closed without producing elements, so reflect that in + // the PCollectionView. If set has already been called, will do nothing. + future.set(Collections.>emptyList()); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper("InProcessSideInputContainerEmptyCallback") + .add("view", view) + .add("window", window) + .toString(); + } + }); + return future; + } + @Override public boolean contains(PCollectionView view) { return readerViews.contains(view); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java index 99224135490f..315d09132a50 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java @@ -24,8 +24,10 @@ import static org.junit.Assert.assertThat; import static org.mockito.Mockito.doAnswer; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.runners.inprocess.InProcessEvaluationContext.ReadyCheckingSideInputReader; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Mean; @@ -33,8 +35,12 @@ import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.PCollectionViews; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; @@ -58,6 +64,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.util.Collection; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.Executors; @@ -68,6 +75,32 @@ */ @RunWith(JUnit4.class) public class InProcessSideInputContainerTest { + private static final BoundedWindow FIRST_WINDOW = + new BoundedWindow() { + @Override + public Instant maxTimestamp() { + return new Instant(789541L); + } + + @Override + public String toString() { + return "firstWindow"; + } + }; + + private static final BoundedWindow SECOND_WINDOW = + new BoundedWindow() { + @Override + public Instant maxTimestamp() { + return new Instant(14564786L); + } + + @Override + public String toString() { + return "secondWindow"; + } + }; + @Rule public ExpectedException thrown = ExpectedException.none(); @@ -84,46 +117,21 @@ public class InProcessSideInputContainerTest { // Not present in container. private PCollectionView> iterableView; - private BoundedWindow firstWindow = new BoundedWindow() { - @Override - public Instant maxTimestamp() { - return new Instant(789541L); - } - - @Override - public String toString() { - return "firstWindow"; - } - }; - - private BoundedWindow secondWindow = new BoundedWindow() { - @Override - public Instant maxTimestamp() { - return new Instant(14564786L); - } - - @Override - public String toString() { - return "secondWindow"; - } - }; - @Before public void setup() { MockitoAnnotations.initMocks(this); pipeline = TestPipeline.create(); PCollection create = - pipeline.apply("forBaseCollection", Create.of(1, 2, 3, 4)); + pipeline + .apply("forBaseCollection", Create.of(1, 2, 3, 4)) + .apply(Window.into(new SideInputContainerTestWindowFn())); mapView = create.apply("forKeyTypes", WithKeys.of("foo")) .apply("asMapView", View.asMap()); - singletonView = - create.apply("forCombinedTypes", Mean.globally()) - .apply("asDoubleView", View.asSingleton()); - + singletonView = create.apply("forCombinedTypes", Mean.globally().asSingletonView()); iterableView = create.apply("asIterableView", View.asIterable()); container = InProcessSideInputContainer.create( @@ -132,15 +140,18 @@ public void setup() { @Test public void getAfterWriteReturnsPaneInWindow() throws Exception { - WindowedValue> one = WindowedValue.of( - KV.of("one", 1), new Instant(1L), firstWindow, PaneInfo.ON_TIME_AND_ONLY_FIRING); - WindowedValue> two = WindowedValue.of( - KV.of("two", 2), new Instant(20L), firstWindow, PaneInfo.ON_TIME_AND_ONLY_FIRING); + WindowedValue> one = + WindowedValue.of( + KV.of("one", 1), new Instant(1L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING); + WindowedValue> two = + WindowedValue.of( + KV.of("two", 2), new Instant(20L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING); container.write(mapView, ImmutableList.>of(one, two)); Map viewContents = - container.createReaderForViews(ImmutableList.>of(mapView)) - .get(mapView, firstWindow); + container + .createReaderForViews(ImmutableList.>of(mapView)) + .get(mapView, FIRST_WINDOW); assertThat(viewContents, hasEntry("one", 1)); assertThat(viewContents, hasEntry("two", 2)); assertThat(viewContents.size(), is(2)); @@ -148,26 +159,40 @@ public void getAfterWriteReturnsPaneInWindow() throws Exception { @Test public void getReturnsLatestPaneInWindow() throws Exception { - WindowedValue> one = WindowedValue.of(KV.of("one", 1), new Instant(1L), - secondWindow, PaneInfo.createPane(true, false, Timing.EARLY)); - WindowedValue> two = WindowedValue.of(KV.of("two", 2), new Instant(20L), - secondWindow, PaneInfo.createPane(true, false, Timing.EARLY)); + WindowedValue> one = + WindowedValue.of( + KV.of("one", 1), + new Instant(1L), + SECOND_WINDOW, + PaneInfo.createPane(true, false, Timing.EARLY)); + WindowedValue> two = + WindowedValue.of( + KV.of("two", 2), + new Instant(20L), + SECOND_WINDOW, + PaneInfo.createPane(true, false, Timing.EARLY)); container.write(mapView, ImmutableList.>of(one, two)); Map viewContents = - container.createReaderForViews(ImmutableList.>of(mapView)) - .get(mapView, secondWindow); + container + .createReaderForViews(ImmutableList.>of(mapView)) + .get(mapView, SECOND_WINDOW); assertThat(viewContents, hasEntry("one", 1)); assertThat(viewContents, hasEntry("two", 2)); assertThat(viewContents.size(), is(2)); - WindowedValue> three = WindowedValue.of(KV.of("three", 3), - new Instant(300L), secondWindow, PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)); + WindowedValue> three = + WindowedValue.of( + KV.of("three", 3), + new Instant(300L), + SECOND_WINDOW, + PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)); container.write(mapView, ImmutableList.>of(three)); Map overwrittenViewContents = - container.createReaderForViews(ImmutableList.>of(mapView)) - .get(mapView, secondWindow); + container + .createReaderForViews(ImmutableList.>of(mapView)) + .get(mapView, SECOND_WINDOW); assertThat(overwrittenViewContents, hasEntry("three", 3)); assertThat(overwrittenViewContents.size(), is(1)); } @@ -259,66 +284,98 @@ public void getOnReaderForViewNotInReaderFails() { @Test public void writeForMultipleElementsInDifferentWindowsSucceeds() throws Exception { - WindowedValue firstWindowedValue = WindowedValue.of(2.875, - firstWindow.maxTimestamp().minus(200L), firstWindow, PaneInfo.ON_TIME_AND_ONLY_FIRING); + WindowedValue firstWindowedValue = + WindowedValue.of( + 2.875, + FIRST_WINDOW.maxTimestamp().minus(200L), + FIRST_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING); WindowedValue secondWindowedValue = - WindowedValue.of(4.125, secondWindow.maxTimestamp().minus(2_000_000L), secondWindow, + WindowedValue.of( + 4.125, + SECOND_WINDOW.maxTimestamp().minus(2_000_000L), + SECOND_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING); container.write(singletonView, ImmutableList.of(firstWindowedValue, secondWindowedValue)); assertThat( - container.createReaderForViews(ImmutableList.>of(singletonView)) - .get(singletonView, firstWindow), + container + .createReaderForViews(ImmutableList.>of(singletonView)) + .get(singletonView, FIRST_WINDOW), equalTo(2.875)); assertThat( - container.createReaderForViews(ImmutableList.>of(singletonView)) - .get(singletonView, secondWindow), + container + .createReaderForViews(ImmutableList.>of(singletonView)) + .get(singletonView, SECOND_WINDOW), equalTo(4.125)); } @Test public void writeForMultipleIdenticalElementsInSameWindowSucceeds() throws Exception { - WindowedValue firstValue = WindowedValue.of( - 44, firstWindow.maxTimestamp().minus(200L), firstWindow, PaneInfo.ON_TIME_AND_ONLY_FIRING); - WindowedValue secondValue = WindowedValue.of( - 44, firstWindow.maxTimestamp().minus(200L), firstWindow, PaneInfo.ON_TIME_AND_ONLY_FIRING); + WindowedValue firstValue = + WindowedValue.of( + 44, + FIRST_WINDOW.maxTimestamp().minus(200L), + FIRST_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING); + WindowedValue secondValue = + WindowedValue.of( + 44, + FIRST_WINDOW.maxTimestamp().minus(200L), + FIRST_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING); container.write(iterableView, ImmutableList.of(firstValue, secondValue)); assertThat( - container.createReaderForViews(ImmutableList.>of(iterableView)) - .get(iterableView, firstWindow), + container + .createReaderForViews(ImmutableList.>of(iterableView)) + .get(iterableView, FIRST_WINDOW), contains(44, 44)); } @Test public void writeForElementInMultipleWindowsSucceeds() throws Exception { WindowedValue multiWindowedValue = - WindowedValue.of(2.875, firstWindow.maxTimestamp().minus(200L), - ImmutableList.of(firstWindow, secondWindow), PaneInfo.ON_TIME_AND_ONLY_FIRING); + WindowedValue.of( + 2.875, + FIRST_WINDOW.maxTimestamp().minus(200L), + ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW), + PaneInfo.ON_TIME_AND_ONLY_FIRING); container.write(singletonView, ImmutableList.of(multiWindowedValue)); assertThat( - container.createReaderForViews(ImmutableList.>of(singletonView)) - .get(singletonView, firstWindow), + container + .createReaderForViews(ImmutableList.>of(singletonView)) + .get(singletonView, FIRST_WINDOW), equalTo(2.875)); assertThat( - container.createReaderForViews(ImmutableList.>of(singletonView)) - .get(singletonView, secondWindow), + container + .createReaderForViews(ImmutableList.>of(singletonView)) + .get(singletonView, SECOND_WINDOW), equalTo(2.875)); } @Test public void finishDoesNotOverwriteWrittenElements() throws Exception { - WindowedValue> one = WindowedValue.of(KV.of("one", 1), new Instant(1L), - secondWindow, PaneInfo.createPane(true, false, Timing.EARLY)); - WindowedValue> two = WindowedValue.of(KV.of("two", 2), new Instant(20L), - secondWindow, PaneInfo.createPane(true, false, Timing.EARLY)); + WindowedValue> one = + WindowedValue.of( + KV.of("one", 1), + new Instant(1L), + SECOND_WINDOW, + PaneInfo.createPane(true, false, Timing.EARLY)); + WindowedValue> two = + WindowedValue.of( + KV.of("two", 2), + new Instant(20L), + SECOND_WINDOW, + PaneInfo.createPane(true, false, Timing.EARLY)); container.write(mapView, ImmutableList.>of(one, two)); - immediatelyInvokeCallback(mapView, secondWindow); + immediatelyInvokeCallback(mapView, SECOND_WINDOW); Map viewContents = - container.createReaderForViews(ImmutableList.>of(mapView)) - .get(mapView, secondWindow); + container + .createReaderForViews(ImmutableList.>of(mapView)) + .get(mapView, SECOND_WINDOW); assertThat(viewContents, hasEntry("one", 1)); assertThat(viewContents, hasEntry("two", 2)); @@ -327,16 +384,68 @@ public void finishDoesNotOverwriteWrittenElements() throws Exception { @Test public void finishOnPendingViewsSetsEmptyElements() throws Exception { - immediatelyInvokeCallback(mapView, secondWindow); + immediatelyInvokeCallback(mapView, SECOND_WINDOW); Future> mapFuture = getFutureOfView( container.createReaderForViews(ImmutableList.>of(mapView)), mapView, - secondWindow); + SECOND_WINDOW); assertThat(mapFuture.get().isEmpty(), is(true)); } + @Test + public void allViewsReadyInWindowEmptyReaderTrue() { + ReadyCheckingSideInputReader reader = + container.createReaderForViews(ImmutableList.>of()); + + assertThat(reader.allViewsReadyInWindow(GlobalWindow.INSTANCE), is(true)); + assertThat(reader.allViewsReadyInWindow(FIRST_WINDOW), is(true)); + assertThat(reader.allViewsReadyInWindow(SECOND_WINDOW), is(true)); + } + + @Test + public void allViewsReadyInWindowForSomeNotReadyViewsFalseUntilElements() { + BoundedWindow sideInputWindow = + mapView.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(FIRST_WINDOW); + container.write( + mapView, + ImmutableList.of( + WindowedValue.of( + KV.of("one", 1), + FIRST_WINDOW.maxTimestamp().minus(100L), + sideInputWindow, + PaneInfo.ON_TIME_AND_ONLY_FIRING))); + + ReadyCheckingSideInputReader reader = + container.createReaderForViews(ImmutableList.of(mapView, singletonView)); + assertThat(reader.allViewsReadyInWindow(FIRST_WINDOW), is(false)); + + container.write( + singletonView, + ImmutableList.of( + WindowedValue.of( + 1.25, + FIRST_WINDOW.maxTimestamp().minus(100L), + SECOND_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING))); + assertThat(reader.allViewsReadyInWindow(FIRST_WINDOW), is(true)); + + assertThat(reader.allViewsReadyInWindow(SECOND_WINDOW), is(false)); + } + + @Test + public void allViewsReadyInWindowForEmptyWindowTrue() { + immediatelyInvokeCallback(mapView, GlobalWindow.INSTANCE); + + ReadyCheckingSideInputReader reader = + container.createReaderForViews(ImmutableList.of(mapView, singletonView)); + assertThat(reader.allViewsReadyInWindow(SECOND_WINDOW), is(false)); + + immediatelyInvokeCallback(singletonView, GlobalWindow.INSTANCE); + assertThat(reader.allViewsReadyInWindow(SECOND_WINDOW), is(true)); + } + /** * When a callAfterWindowCloses with the specified view's producing transform, window, and * windowing strategy is invoked, immediately execute the callback. @@ -370,4 +479,28 @@ public ValueT call() throws Exception { }; return Executors.newSingleThreadExecutor().submit(callable); } + + private static class SideInputContainerTestWindowFn + extends NonMergingWindowFn { + @Override + public boolean isCompatible(WindowFn other) { + return false; + } + + @Override + public Coder windowCoder() { + return (Coder) IntervalWindow.getCoder(); + } + + @Override + public BoundedWindow getSideInputWindow(BoundedWindow window) { + return window.equals(FIRST_WINDOW) ? SECOND_WINDOW : GlobalWindow.INSTANCE; + } + + @Override + public Collection assignWindows(WindowFn.AssignContext c) + throws Exception { + return null; + } + } } From c118b5af34190a2c69452bf454b779141ac3d591 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Thu, 21 Apr 2016 09:36:40 -0700 Subject: [PATCH 2/6] fixup! Allow InProcess Evaluators to check Side Input completion --- .../inprocess/InProcessEvaluationContext.java | 2 +- .../InProcessSideInputContainer.java | 36 ++++++----- .../InProcessSideInputContainerTest.java | 62 +++++-------------- 3 files changed, 36 insertions(+), 64 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java index 51a47547e617..61a9dd87cd41 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -341,7 +341,7 @@ static interface ReadyCheckingSideInputReader extends SideInputReader { /** * Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}. */ - boolean allViewsReadyInWindow(BoundedWindow window); + boolean isReady(PCollectionView view, BoundedWindow window); } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java index add28d7e3668..e92bf32fedb5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java @@ -184,24 +184,26 @@ private SideInputContainerSideInputReader(Collection> readerV } @Override - public boolean allViewsReadyInWindow(final BoundedWindow elementWindow) { - for (PCollectionView view : readerViews) { - try { - BoundedWindow viewWindow = - view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(elementWindow); - Future>> viewContents = - getViewFuture(view, viewWindow); - if (!viewContents.isDone()) { - return false; - } - } catch (ExecutionException e) { - throw new RuntimeException( - String.format( - "Exception while checking to see if PCollectionView %s is ready in window %s", - view, - elementWindow), - e); + public boolean isReady(final PCollectionView view, final BoundedWindow window) { + checkArgument( + readerViews.contains(view), + "Tried to check if view %s was ready in a SideInputReader that does not contain it. " + + "Contained views; %s", + view, + readerViews); + try { + Future>> viewContents = + getViewFuture(view, window); + if (!viewContents.isDone()) { + return false; } + } catch (ExecutionException e) { + throw new RuntimeException( + String.format( + "Exception while checking to see if PCollectionView %s is ready in window %s", + view, + window), + e); } return true; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java index 315d09132a50..4a522c8d9ea8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertThat; import static org.mockito.Mockito.doAnswer; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.runners.inprocess.InProcessEvaluationContext.ReadyCheckingSideInputReader; @@ -35,12 +34,8 @@ import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.PCollectionViews; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; @@ -64,7 +59,6 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import java.util.Collection; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.Executors; @@ -123,9 +117,7 @@ public void setup() { pipeline = TestPipeline.create(); PCollection create = - pipeline - .apply("forBaseCollection", Create.of(1, 2, 3, 4)) - .apply(Window.into(new SideInputContainerTestWindowFn())); + pipeline.apply("forBaseCollection", Create.of(1, 2, 3, 4)); mapView = create.apply("forKeyTypes", WithKeys.of("foo")) @@ -395,31 +387,30 @@ public void finishOnPendingViewsSetsEmptyElements() throws Exception { } @Test - public void allViewsReadyInWindowEmptyReaderTrue() { + public void isReadyInEmptyReaderThrows() { ReadyCheckingSideInputReader reader = container.createReaderForViews(ImmutableList.>of()); - - assertThat(reader.allViewsReadyInWindow(GlobalWindow.INSTANCE), is(true)); - assertThat(reader.allViewsReadyInWindow(FIRST_WINDOW), is(true)); - assertThat(reader.allViewsReadyInWindow(SECOND_WINDOW), is(true)); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("does not contain"); + thrown.expectMessage(ImmutableList.of().toString()); + reader.isReady(mapView, GlobalWindow.INSTANCE); } @Test public void allViewsReadyInWindowForSomeNotReadyViewsFalseUntilElements() { - BoundedWindow sideInputWindow = - mapView.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(FIRST_WINDOW); container.write( mapView, ImmutableList.of( WindowedValue.of( KV.of("one", 1), FIRST_WINDOW.maxTimestamp().minus(100L), - sideInputWindow, + SECOND_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING))); ReadyCheckingSideInputReader reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView)); - assertThat(reader.allViewsReadyInWindow(FIRST_WINDOW), is(false)); + assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true)); + assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false)); container.write( singletonView, @@ -429,9 +420,11 @@ public void allViewsReadyInWindowForSomeNotReadyViewsFalseUntilElements() { FIRST_WINDOW.maxTimestamp().minus(100L), SECOND_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING))); - assertThat(reader.allViewsReadyInWindow(FIRST_WINDOW), is(true)); + assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true)); + assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true)); - assertThat(reader.allViewsReadyInWindow(SECOND_WINDOW), is(false)); + assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(false)); + assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false)); } @Test @@ -440,10 +433,11 @@ public void allViewsReadyInWindowForEmptyWindowTrue() { ReadyCheckingSideInputReader reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView)); - assertThat(reader.allViewsReadyInWindow(SECOND_WINDOW), is(false)); + assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(true)); + assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false)); immediatelyInvokeCallback(singletonView, GlobalWindow.INSTANCE); - assertThat(reader.allViewsReadyInWindow(SECOND_WINDOW), is(true)); + assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(true)); } /** @@ -479,28 +473,4 @@ public ValueT call() throws Exception { }; return Executors.newSingleThreadExecutor().submit(callable); } - - private static class SideInputContainerTestWindowFn - extends NonMergingWindowFn { - @Override - public boolean isCompatible(WindowFn other) { - return false; - } - - @Override - public Coder windowCoder() { - return (Coder) IntervalWindow.getCoder(); - } - - @Override - public BoundedWindow getSideInputWindow(BoundedWindow window) { - return window.equals(FIRST_WINDOW) ? SECOND_WINDOW : GlobalWindow.INSTANCE; - } - - @Override - public Collection assignWindows(WindowFn.AssignContext c) - throws Exception { - return null; - } - } } From 96bf32b51db6cf8b25485e5b3534de54c8394b82 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Thu, 21 Apr 2016 12:11:36 -0700 Subject: [PATCH 3/6] fixup! Allow InProcess Evaluators to check Side Input completion --- .../runners/inprocess/InProcessSideInputContainer.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java index e92bf32fedb5..aee3f3918fc8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java @@ -192,11 +192,8 @@ public boolean isReady(final PCollectionView view, final BoundedWindow window view, readerViews); try { - Future>> viewContents = - getViewFuture(view, window); - if (!viewContents.isDone()) { - return false; - } + Future>> viewContents = getViewFuture(view, window); + return viewContents.isDone(); } catch (ExecutionException e) { throw new RuntimeException( String.format( @@ -205,7 +202,6 @@ public boolean isReady(final PCollectionView view, final BoundedWindow window window), e); } - return true; } @Override From efa5702735503c799f8f24a9a3bd21425c48c642 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Thu, 21 Apr 2016 13:13:21 -0700 Subject: [PATCH 4/6] fixup! Allow InProcess Evaluators to check Side Input completion --- .../inprocess/InProcessEvaluationContext.java | 8 ++++---- .../InProcessSideInputContainer.java | 17 ++++------------ .../InProcessSideInputContainerTest.java | 20 +++++++++++++++---- 3 files changed, 24 insertions(+), 21 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java index 61a9dd87cd41..3990f0d04fdb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -320,8 +320,8 @@ public String getStepName(AppliedPTransform application) { } /** - * Returns a {@link SideInputReader} capable of reading the provided {@link PCollectionView - * PCollectionViews}. + * Returns a {@link ReadyCheckingSideInputReader} capable of reading the provided + * {@link PCollectionView PCollectionViews}. * * @param sideInputs the {@link PCollectionView PCollectionViews} the result should be able to * read @@ -334,8 +334,8 @@ public ReadyCheckingSideInputReader createSideInputReader( } /** - * A {@link SideInputReader} that allows callers to check to see if the {@link PCollectionView} - * has contents in the specified window. + * A {@link SideInputReader} that allows callers to check to see if a {@link PCollectionView} has + * had its contents set in a window. */ static interface ReadyCheckingSideInputReader extends SideInputReader { /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java index aee3f3918fc8..d6cf59341726 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java @@ -191,17 +191,8 @@ public boolean isReady(final PCollectionView view, final BoundedWindow window + "Contained views; %s", view, readerViews); - try { - Future>> viewContents = getViewFuture(view, window); - return viewContents.isDone(); - } catch (ExecutionException e) { - throw new RuntimeException( - String.format( - "Exception while checking to see if PCollectionView %s is ready in window %s", - view, - window), - e); - } + Future>> viewContents = getViewFuture(view, window); + return viewContents.isDone(); } @Override @@ -229,10 +220,10 @@ public T get(final PCollectionView view, final BoundedWindow window) { * contents if necessary. */ private Future>> getViewFuture( - final PCollectionView view, final BoundedWindow window) throws ExecutionException { + final PCollectionView view, final BoundedWindow window) { PCollectionViewWindow windowedView = PCollectionViewWindow.of(view, window); final SettableFuture>> future = - viewByWindows.get(windowedView); + viewByWindows.getUnchecked(windowedView); WindowingStrategy windowingStrategy = view.getWindowingStrategyInternal(); evaluationContext.scheduleAfterOutputWouldBeProduced( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java index 4a522c8d9ea8..60a4e00e1ce8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java @@ -397,27 +397,39 @@ public void isReadyInEmptyReaderThrows() { } @Test - public void allViewsReadyInWindowForSomeNotReadyViewsFalseUntilElements() { + public void isReadyForSomeNotReadyViewsFalseUntilElements() { container.write( mapView, ImmutableList.of( WindowedValue.of( KV.of("one", 1), - FIRST_WINDOW.maxTimestamp().minus(100L), + SECOND_WINDOW.maxTimestamp().minus(100L), SECOND_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING))); ReadyCheckingSideInputReader reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView)); + assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false)); assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true)); + assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false)); + container.write( + mapView, + ImmutableList.of( + WindowedValue.of( + KV.of("too", 2), + FIRST_WINDOW.maxTimestamp().minus(100L), + FIRST_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING))); + assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true)); + container.write( singletonView, ImmutableList.of( WindowedValue.of( 1.25, - FIRST_WINDOW.maxTimestamp().minus(100L), + SECOND_WINDOW.maxTimestamp().minus(100L), SECOND_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING))); assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true)); @@ -428,7 +440,7 @@ public void allViewsReadyInWindowForSomeNotReadyViewsFalseUntilElements() { } @Test - public void allViewsReadyInWindowForEmptyWindowTrue() { + public void isReadyForEmptyWindowTrue() { immediatelyInvokeCallback(mapView, GlobalWindow.INSTANCE); ReadyCheckingSideInputReader reader = From 365798b36ebe25478ca31d139a675cd5ed8b7119 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Thu, 21 Apr 2016 13:51:38 -0700 Subject: [PATCH 5/6] fixup! Allow InProcess Evaluators to check Side Input completion --- .../sdk/runners/inprocess/InProcessSideInputContainer.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java index d6cf59341726..e37c1d8a4f48 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java @@ -191,8 +191,7 @@ public boolean isReady(final PCollectionView view, final BoundedWindow window + "Contained views; %s", view, readerViews); - Future>> viewContents = getViewFuture(view, window); - return viewContents.isDone(); + return getViewFuture(view, window).isDone(); } @Override From c579b0bf4fff21c129799203d2a480e82990d55d Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 26 Apr 2016 16:57:15 -0700 Subject: [PATCH 6/6] fixup! Allow InProcess Evaluators to check Side Input completion --- .../InProcessSideInputContainer.java | 45 ++++++++++++------- .../InProcessSideInputContainerTest.java | 8 ++++ 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java index e37c1d8a4f48..fda78fc16f5f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java @@ -226,22 +226,7 @@ private Future>> getViewFuture( WindowingStrategy windowingStrategy = view.getWindowingStrategyInternal(); evaluationContext.scheduleAfterOutputWouldBeProduced( - view, window, windowingStrategy, new Runnable() { - @Override - public void run() { - // The requested window has closed without producing elements, so reflect that in - // the PCollectionView. If set has already been called, will do nothing. - future.set(Collections.>emptyList()); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper("InProcessSideInputContainerEmptyCallback") - .add("view", view) - .add("window", window) - .toString(); - } - }); + view, window, windowingStrategy, new WriteEmptyViewContents(view, window, future)); return future; } @@ -255,4 +240,32 @@ public boolean isEmpty() { return readerViews.isEmpty(); } } + + private static class WriteEmptyViewContents implements Runnable { + private final PCollectionView view; + private final BoundedWindow window; + private final SettableFuture>> future; + + private WriteEmptyViewContents(PCollectionView view, BoundedWindow window, + SettableFuture>> future) { + this.future = future; + this.view = view; + this.window = window; + } + + @Override + public void run() { + // The requested window has closed without producing elements, so reflect that in + // the PCollectionView. If set has already been called, will do nothing. + future.set(Collections.>emptyList()); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("view", view) + .add("window", window) + .toString(); + } + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java index 60a4e00e1ce8..03443f8f1b54 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java @@ -386,6 +386,10 @@ public void finishOnPendingViewsSetsEmptyElements() throws Exception { assertThat(mapFuture.get().isEmpty(), is(true)); } + /** + * Demonstrates that calling isReady on an empty container throws an + * {@link IllegalArgumentException}. + */ @Test public void isReadyInEmptyReaderThrows() { ReadyCheckingSideInputReader reader = @@ -396,6 +400,10 @@ public void isReadyInEmptyReaderThrows() { reader.isReady(mapView, GlobalWindow.INSTANCE); } + /** + * Demonstrates that calling isReady returns false until elements are written to the + * {@link PCollectionView}, {@link BoundedWindow} pair, at which point it returns true. + */ @Test public void isReadyForSomeNotReadyViewsFalseUntilElements() { container.write(