diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java index 1ef8f1390608..f53f59065566 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.values.PCollectionView; import com.google.common.base.MoreObjects; +import com.google.common.base.Optional; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -211,9 +212,13 @@ public String toString() { private final class SideInputContainerSideInputReader implements ReadyCheckingSideInputReader { private final Collection> readerViews; + private final LoadingCache< + PCollectionViewWindow, Optional>>> + viewContents; private SideInputContainerSideInputReader(Collection> readerViews) { this.readerViews = ImmutableSet.copyOf(readerViews); + this.viewContents = CacheBuilder.newBuilder().build(new CurrentViewContentsLoader()); } @Override @@ -224,22 +229,24 @@ public boolean isReady(final PCollectionView view, final BoundedWindow window + "Contained views; %s", view, readerViews); - return viewByWindows.getUnchecked(PCollectionViewWindow.of(view, window)).get() != null; + return viewContents.getUnchecked(PCollectionViewWindow.of(view, window)).isPresent(); } @Override @Nullable public T get(final PCollectionView view, final BoundedWindow window) { checkArgument(readerViews.contains(view), - "calling get(PCollectionView) with unknown view: " + view); - checkArgument(isReady(view, window), - "calling get(PCollectionView) with view %s that is not ready in window %s", + "call to get(PCollectionView) with unknown view: %s", + view); + checkArgument( + isReady(view, window), + "calling get() on PCollectionView %s that is not ready in window %s", view, window); // Safe covariant cast @SuppressWarnings("unchecked") Iterable> values = - (Iterable>) viewByWindows - .getUnchecked(PCollectionViewWindow.of(view, window)).get(); + (Iterable>) viewContents.getUnchecked(PCollectionViewWindow.of(view, + window)).get(); return view.fromIterableInternal(values); } @@ -254,4 +261,17 @@ public boolean isEmpty() { } } + /** + * A {@link CacheLoader} that loads the current contents of a {@link PCollectionViewWindow} into + * an optional. + */ + private class CurrentViewContentsLoader extends CacheLoader< + PCollectionViewWindow, Optional>>> { + + @Override + public Optional>> + load(PCollectionViewWindow key) { + return Optional.fromNullable(viewByWindows.getUnchecked(key).get()); + } + } } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java index b73e41a2093b..10b87214eb30 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java @@ -150,6 +150,9 @@ public void writeToViewWriterThenReadReads() { WindowedValue.of( 4444, new Instant(8677L), second, PaneInfo.createPane(false, true, Timing.LATE, 1, 1)); viewWriter.add(Collections.singleton(overrittenSecondValue)); + assertThat(reader.get(view, second), containsInAnyOrder(2)); + // The cached value is served in the earlier reader + reader = context.createSideInputReader(ImmutableList.>of(view)); assertThat(reader.get(view, second), containsInAnyOrder(4444)); } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java index 2f376ddac747..746c0f856e01 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java @@ -399,7 +399,8 @@ public void isReadyForSomeNotReadyViewsFalseUntilElements() { FIRST_WINDOW.maxTimestamp().minus(100L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING))); - assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true)); + // Cached value is false + assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false)); container.write( singletonView, @@ -410,10 +411,15 @@ public void isReadyForSomeNotReadyViewsFalseUntilElements() { SECOND_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING))); assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true)); - assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true)); + assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false)); assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(false)); assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false)); + + reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView)); + assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true)); + assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true)); + assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true)); } @Test @@ -431,6 +437,11 @@ public void isReadyForEmptyWindowTrue() throws Exception { if (!onComplete.await(1500L, TimeUnit.MILLISECONDS)) { fail("Callback to set empty values did not complete!"); } + // The cached value was false, so it continues to be true + assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false)); + + // A new reader for the same container gets a fresh look + reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView)); assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(true)); }