diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java index d439ba7c6539..3990f0d04fdb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -320,17 +320,30 @@ public String getStepName(AppliedPTransform application) { } /** - * Returns a {@link SideInputReader} capable of reading the provided + * Returns a {@link ReadyCheckingSideInputReader} capable of reading the provided * {@link PCollectionView PCollectionViews}. + * * @param sideInputs the {@link PCollectionView PCollectionViews} the result should be able to - * read - * @return a {@link SideInputReader} that can read all of the provided - * {@link PCollectionView PCollectionViews} + * read + * @return a {@link SideInputReader} that can read all of the provided {@link PCollectionView + * PCollectionViews} */ - public SideInputReader createSideInputReader(final List> sideInputs) { + public ReadyCheckingSideInputReader createSideInputReader( + final List> sideInputs) { return sideInputContainer.createReaderForViews(sideInputs); } + /** + * A {@link SideInputReader} that allows callers to check to see if a {@link PCollectionView} has + * had its contents set in a window. + */ + static interface ReadyCheckingSideInputReader extends SideInputReader { + /** + * Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}. + */ + boolean isReady(PCollectionView view, BoundedWindow window); + } + /** * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent * of all other {@link CounterSet CounterSets} created by this call. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java index 6bf6e8a15505..fda78fc16f5f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkArgument; +import org.apache.beam.sdk.runners.inprocess.InProcessEvaluationContext.ReadyCheckingSideInputReader; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.PCollectionViewWindow; @@ -44,6 +45,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import javax.annotation.Nullable; @@ -88,11 +90,12 @@ private InProcessSideInputContainer(InProcessEvaluationContext context, } /** - * Return a view of this {@link InProcessSideInputContainer} that contains only the views in - * the provided argument. The returned {@link InProcessSideInputContainer} is unmodifiable without + * Return a view of this {@link InProcessSideInputContainer} that contains only the views in the + * provided argument. The returned {@link InProcessSideInputContainer} is unmodifiable without * casting, but will change as this {@link InProcessSideInputContainer} is modified. */ - public SideInputReader createReaderForViews(Collection> newContainedViews) { + public ReadyCheckingSideInputReader createReaderForViews( + Collection> newContainedViews) { if (!containedViews.containsAll(newContainedViews)) { Set> currentlyContained = ImmutableSet.copyOf(containedViews); Set> newRequested = ImmutableSet.copyOf(newContainedViews); @@ -173,41 +176,31 @@ private void updatePCollectionViewWindowValues( } } - private final class SideInputContainerSideInputReader implements SideInputReader { + private final class SideInputContainerSideInputReader implements ReadyCheckingSideInputReader { private final Collection> readerViews; private SideInputContainerSideInputReader(Collection> readerViews) { this.readerViews = ImmutableSet.copyOf(readerViews); } + @Override + public boolean isReady(final PCollectionView view, final BoundedWindow window) { + checkArgument( + readerViews.contains(view), + "Tried to check if view %s was ready in a SideInputReader that does not contain it. " + + "Contained views; %s", + view, + readerViews); + return getViewFuture(view, window).isDone(); + } + @Override @Nullable public T get(final PCollectionView view, final BoundedWindow window) { checkArgument( readerViews.contains(view), "calling get(PCollectionView) with unknown view: " + view); - PCollectionViewWindow windowedView = PCollectionViewWindow.of(view, window); try { - final SettableFuture>> future = - viewByWindows.get(windowedView); - - WindowingStrategy windowingStrategy = view.getWindowingStrategyInternal(); - evaluationContext.scheduleAfterOutputWouldBeProduced( - view, window, windowingStrategy, new Runnable() { - @Override - public void run() { - // The requested window has closed without producing elements, so reflect that in - // the PCollectionView. If set has already been called, will do nothing. - future.set(Collections.>emptyList()); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper("InProcessSideInputContainerEmptyCallback") - .add("view", view) - .add("window", window) - .toString(); - } - }); + final Future>> future = getViewFuture(view, window); // Safe covariant cast @SuppressWarnings("unchecked") Iterable> values = (Iterable>) future.get(); @@ -220,6 +213,23 @@ public String toString() { } } + /** + * Gets the future containing the contents of the provided {@link PCollectionView} in the + * provided {@link BoundedWindow}, setting up a callback to populate the future with empty + * contents if necessary. + */ + private Future>> getViewFuture( + final PCollectionView view, final BoundedWindow window) { + PCollectionViewWindow windowedView = PCollectionViewWindow.of(view, window); + final SettableFuture>> future = + viewByWindows.getUnchecked(windowedView); + + WindowingStrategy windowingStrategy = view.getWindowingStrategyInternal(); + evaluationContext.scheduleAfterOutputWouldBeProduced( + view, window, windowingStrategy, new WriteEmptyViewContents(view, window, future)); + return future; + } + @Override public boolean contains(PCollectionView view) { return readerViews.contains(view); @@ -230,4 +240,32 @@ public boolean isEmpty() { return readerViews.isEmpty(); } } + + private static class WriteEmptyViewContents implements Runnable { + private final PCollectionView view; + private final BoundedWindow window; + private final SettableFuture>> future; + + private WriteEmptyViewContents(PCollectionView view, BoundedWindow window, + SettableFuture>> future) { + this.future = future; + this.view = view; + this.window = window; + } + + @Override + public void run() { + // The requested window has closed without producing elements, so reflect that in + // the PCollectionView. If set has already been called, will do nothing. + future.set(Collections.>emptyList()); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("view", view) + .add("window", window) + .toString(); + } + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java index 99224135490f..03443f8f1b54 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.runners.inprocess.InProcessEvaluationContext.ReadyCheckingSideInputReader; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Mean; @@ -68,6 +69,32 @@ */ @RunWith(JUnit4.class) public class InProcessSideInputContainerTest { + private static final BoundedWindow FIRST_WINDOW = + new BoundedWindow() { + @Override + public Instant maxTimestamp() { + return new Instant(789541L); + } + + @Override + public String toString() { + return "firstWindow"; + } + }; + + private static final BoundedWindow SECOND_WINDOW = + new BoundedWindow() { + @Override + public Instant maxTimestamp() { + return new Instant(14564786L); + } + + @Override + public String toString() { + return "secondWindow"; + } + }; + @Rule public ExpectedException thrown = ExpectedException.none(); @@ -84,30 +111,6 @@ public class InProcessSideInputContainerTest { // Not present in container. private PCollectionView> iterableView; - private BoundedWindow firstWindow = new BoundedWindow() { - @Override - public Instant maxTimestamp() { - return new Instant(789541L); - } - - @Override - public String toString() { - return "firstWindow"; - } - }; - - private BoundedWindow secondWindow = new BoundedWindow() { - @Override - public Instant maxTimestamp() { - return new Instant(14564786L); - } - - @Override - public String toString() { - return "secondWindow"; - } - }; - @Before public void setup() { MockitoAnnotations.initMocks(this); @@ -120,10 +123,7 @@ public void setup() { create.apply("forKeyTypes", WithKeys.of("foo")) .apply("asMapView", View.asMap()); - singletonView = - create.apply("forCombinedTypes", Mean.globally()) - .apply("asDoubleView", View.asSingleton()); - + singletonView = create.apply("forCombinedTypes", Mean.globally().asSingletonView()); iterableView = create.apply("asIterableView", View.asIterable()); container = InProcessSideInputContainer.create( @@ -132,15 +132,18 @@ public void setup() { @Test public void getAfterWriteReturnsPaneInWindow() throws Exception { - WindowedValue> one = WindowedValue.of( - KV.of("one", 1), new Instant(1L), firstWindow, PaneInfo.ON_TIME_AND_ONLY_FIRING); - WindowedValue> two = WindowedValue.of( - KV.of("two", 2), new Instant(20L), firstWindow, PaneInfo.ON_TIME_AND_ONLY_FIRING); + WindowedValue> one = + WindowedValue.of( + KV.of("one", 1), new Instant(1L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING); + WindowedValue> two = + WindowedValue.of( + KV.of("two", 2), new Instant(20L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING); container.write(mapView, ImmutableList.>of(one, two)); Map viewContents = - container.createReaderForViews(ImmutableList.>of(mapView)) - .get(mapView, firstWindow); + container + .createReaderForViews(ImmutableList.>of(mapView)) + .get(mapView, FIRST_WINDOW); assertThat(viewContents, hasEntry("one", 1)); assertThat(viewContents, hasEntry("two", 2)); assertThat(viewContents.size(), is(2)); @@ -148,26 +151,40 @@ public void getAfterWriteReturnsPaneInWindow() throws Exception { @Test public void getReturnsLatestPaneInWindow() throws Exception { - WindowedValue> one = WindowedValue.of(KV.of("one", 1), new Instant(1L), - secondWindow, PaneInfo.createPane(true, false, Timing.EARLY)); - WindowedValue> two = WindowedValue.of(KV.of("two", 2), new Instant(20L), - secondWindow, PaneInfo.createPane(true, false, Timing.EARLY)); + WindowedValue> one = + WindowedValue.of( + KV.of("one", 1), + new Instant(1L), + SECOND_WINDOW, + PaneInfo.createPane(true, false, Timing.EARLY)); + WindowedValue> two = + WindowedValue.of( + KV.of("two", 2), + new Instant(20L), + SECOND_WINDOW, + PaneInfo.createPane(true, false, Timing.EARLY)); container.write(mapView, ImmutableList.>of(one, two)); Map viewContents = - container.createReaderForViews(ImmutableList.>of(mapView)) - .get(mapView, secondWindow); + container + .createReaderForViews(ImmutableList.>of(mapView)) + .get(mapView, SECOND_WINDOW); assertThat(viewContents, hasEntry("one", 1)); assertThat(viewContents, hasEntry("two", 2)); assertThat(viewContents.size(), is(2)); - WindowedValue> three = WindowedValue.of(KV.of("three", 3), - new Instant(300L), secondWindow, PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)); + WindowedValue> three = + WindowedValue.of( + KV.of("three", 3), + new Instant(300L), + SECOND_WINDOW, + PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)); container.write(mapView, ImmutableList.>of(three)); Map overwrittenViewContents = - container.createReaderForViews(ImmutableList.>of(mapView)) - .get(mapView, secondWindow); + container + .createReaderForViews(ImmutableList.>of(mapView)) + .get(mapView, SECOND_WINDOW); assertThat(overwrittenViewContents, hasEntry("three", 3)); assertThat(overwrittenViewContents.size(), is(1)); } @@ -259,66 +276,98 @@ public void getOnReaderForViewNotInReaderFails() { @Test public void writeForMultipleElementsInDifferentWindowsSucceeds() throws Exception { - WindowedValue firstWindowedValue = WindowedValue.of(2.875, - firstWindow.maxTimestamp().minus(200L), firstWindow, PaneInfo.ON_TIME_AND_ONLY_FIRING); + WindowedValue firstWindowedValue = + WindowedValue.of( + 2.875, + FIRST_WINDOW.maxTimestamp().minus(200L), + FIRST_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING); WindowedValue secondWindowedValue = - WindowedValue.of(4.125, secondWindow.maxTimestamp().minus(2_000_000L), secondWindow, + WindowedValue.of( + 4.125, + SECOND_WINDOW.maxTimestamp().minus(2_000_000L), + SECOND_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING); container.write(singletonView, ImmutableList.of(firstWindowedValue, secondWindowedValue)); assertThat( - container.createReaderForViews(ImmutableList.>of(singletonView)) - .get(singletonView, firstWindow), + container + .createReaderForViews(ImmutableList.>of(singletonView)) + .get(singletonView, FIRST_WINDOW), equalTo(2.875)); assertThat( - container.createReaderForViews(ImmutableList.>of(singletonView)) - .get(singletonView, secondWindow), + container + .createReaderForViews(ImmutableList.>of(singletonView)) + .get(singletonView, SECOND_WINDOW), equalTo(4.125)); } @Test public void writeForMultipleIdenticalElementsInSameWindowSucceeds() throws Exception { - WindowedValue firstValue = WindowedValue.of( - 44, firstWindow.maxTimestamp().minus(200L), firstWindow, PaneInfo.ON_TIME_AND_ONLY_FIRING); - WindowedValue secondValue = WindowedValue.of( - 44, firstWindow.maxTimestamp().minus(200L), firstWindow, PaneInfo.ON_TIME_AND_ONLY_FIRING); + WindowedValue firstValue = + WindowedValue.of( + 44, + FIRST_WINDOW.maxTimestamp().minus(200L), + FIRST_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING); + WindowedValue secondValue = + WindowedValue.of( + 44, + FIRST_WINDOW.maxTimestamp().minus(200L), + FIRST_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING); container.write(iterableView, ImmutableList.of(firstValue, secondValue)); assertThat( - container.createReaderForViews(ImmutableList.>of(iterableView)) - .get(iterableView, firstWindow), + container + .createReaderForViews(ImmutableList.>of(iterableView)) + .get(iterableView, FIRST_WINDOW), contains(44, 44)); } @Test public void writeForElementInMultipleWindowsSucceeds() throws Exception { WindowedValue multiWindowedValue = - WindowedValue.of(2.875, firstWindow.maxTimestamp().minus(200L), - ImmutableList.of(firstWindow, secondWindow), PaneInfo.ON_TIME_AND_ONLY_FIRING); + WindowedValue.of( + 2.875, + FIRST_WINDOW.maxTimestamp().minus(200L), + ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW), + PaneInfo.ON_TIME_AND_ONLY_FIRING); container.write(singletonView, ImmutableList.of(multiWindowedValue)); assertThat( - container.createReaderForViews(ImmutableList.>of(singletonView)) - .get(singletonView, firstWindow), + container + .createReaderForViews(ImmutableList.>of(singletonView)) + .get(singletonView, FIRST_WINDOW), equalTo(2.875)); assertThat( - container.createReaderForViews(ImmutableList.>of(singletonView)) - .get(singletonView, secondWindow), + container + .createReaderForViews(ImmutableList.>of(singletonView)) + .get(singletonView, SECOND_WINDOW), equalTo(2.875)); } @Test public void finishDoesNotOverwriteWrittenElements() throws Exception { - WindowedValue> one = WindowedValue.of(KV.of("one", 1), new Instant(1L), - secondWindow, PaneInfo.createPane(true, false, Timing.EARLY)); - WindowedValue> two = WindowedValue.of(KV.of("two", 2), new Instant(20L), - secondWindow, PaneInfo.createPane(true, false, Timing.EARLY)); + WindowedValue> one = + WindowedValue.of( + KV.of("one", 1), + new Instant(1L), + SECOND_WINDOW, + PaneInfo.createPane(true, false, Timing.EARLY)); + WindowedValue> two = + WindowedValue.of( + KV.of("two", 2), + new Instant(20L), + SECOND_WINDOW, + PaneInfo.createPane(true, false, Timing.EARLY)); container.write(mapView, ImmutableList.>of(one, two)); - immediatelyInvokeCallback(mapView, secondWindow); + immediatelyInvokeCallback(mapView, SECOND_WINDOW); Map viewContents = - container.createReaderForViews(ImmutableList.>of(mapView)) - .get(mapView, secondWindow); + container + .createReaderForViews(ImmutableList.>of(mapView)) + .get(mapView, SECOND_WINDOW); assertThat(viewContents, hasEntry("one", 1)); assertThat(viewContents, hasEntry("two", 2)); @@ -327,16 +376,90 @@ public void finishDoesNotOverwriteWrittenElements() throws Exception { @Test public void finishOnPendingViewsSetsEmptyElements() throws Exception { - immediatelyInvokeCallback(mapView, secondWindow); + immediatelyInvokeCallback(mapView, SECOND_WINDOW); Future> mapFuture = getFutureOfView( container.createReaderForViews(ImmutableList.>of(mapView)), mapView, - secondWindow); + SECOND_WINDOW); assertThat(mapFuture.get().isEmpty(), is(true)); } + /** + * Demonstrates that calling isReady on an empty container throws an + * {@link IllegalArgumentException}. + */ + @Test + public void isReadyInEmptyReaderThrows() { + ReadyCheckingSideInputReader reader = + container.createReaderForViews(ImmutableList.>of()); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("does not contain"); + thrown.expectMessage(ImmutableList.of().toString()); + reader.isReady(mapView, GlobalWindow.INSTANCE); + } + + /** + * Demonstrates that calling isReady returns false until elements are written to the + * {@link PCollectionView}, {@link BoundedWindow} pair, at which point it returns true. + */ + @Test + public void isReadyForSomeNotReadyViewsFalseUntilElements() { + container.write( + mapView, + ImmutableList.of( + WindowedValue.of( + KV.of("one", 1), + SECOND_WINDOW.maxTimestamp().minus(100L), + SECOND_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING))); + + ReadyCheckingSideInputReader reader = + container.createReaderForViews(ImmutableList.of(mapView, singletonView)); + assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false)); + assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true)); + + assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false)); + + container.write( + mapView, + ImmutableList.of( + WindowedValue.of( + KV.of("too", 2), + FIRST_WINDOW.maxTimestamp().minus(100L), + FIRST_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING))); + assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true)); + + container.write( + singletonView, + ImmutableList.of( + WindowedValue.of( + 1.25, + SECOND_WINDOW.maxTimestamp().minus(100L), + SECOND_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING))); + assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true)); + assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true)); + + assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(false)); + assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false)); + } + + @Test + public void isReadyForEmptyWindowTrue() { + immediatelyInvokeCallback(mapView, GlobalWindow.INSTANCE); + + ReadyCheckingSideInputReader reader = + container.createReaderForViews(ImmutableList.of(mapView, singletonView)); + assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(true)); + assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false)); + + immediatelyInvokeCallback(singletonView, GlobalWindow.INSTANCE); + assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(true)); + } + /** * When a callAfterWindowCloses with the specified view's producing transform, window, and * windowing strategy is invoked, immediately execute the callback.