From 49689fced52e29d7efd386202265d0a105fab276 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 10 May 2016 13:22:20 -0700 Subject: [PATCH 1/2] Cache read SideInput Contents in the InProcessSideInputContainer This ensures that while processing a bundle all elements see the same contents for any SideInput Window. --- .../direct/InProcessSideInputContainer.java | 33 +++++++++++++++---- .../InProcessEvaluationContextTest.java | 3 ++ .../InProcessSideInputContainerTest.java | 15 +++++++-- 3 files changed, 42 insertions(+), 9 deletions(-) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java index 1ef8f1390608..96a9ad23ff04 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.values.PCollectionView; import com.google.common.base.MoreObjects; +import com.google.common.base.Optional; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -211,9 +212,13 @@ public String toString() { private final class SideInputContainerSideInputReader implements ReadyCheckingSideInputReader { private final Collection> readerViews; + private final LoadingCache< + PCollectionViewWindow, Optional>>> + viewContents; private SideInputContainerSideInputReader(Collection> readerViews) { this.readerViews = ImmutableSet.copyOf(readerViews); + this.viewContents = CacheBuilder.newBuilder().build(new CurrentViewContentsLoader()); } @Override @@ -224,22 +229,23 @@ public boolean isReady(final PCollectionView view, final BoundedWindow window + "Contained views; %s", view, readerViews); - return viewByWindows.getUnchecked(PCollectionViewWindow.of(view, window)).get() != null; + return viewContents.getUnchecked(PCollectionViewWindow.of(view, window)).isPresent(); } @Override @Nullable public T get(final PCollectionView view, final BoundedWindow window) { - checkArgument(readerViews.contains(view), - "calling get(PCollectionView) with unknown view: " + view); - checkArgument(isReady(view, window), - "calling get(PCollectionView) with view %s that is not ready in window %s", + checkArgument( + readerViews.contains(view), "calling get(PCollectionView) with unknown view: " + view); + checkArgument( + isReady(view, window), + "calling get() on a PCollectionView %s that is not ready in window %s", view, window); // Safe covariant cast @SuppressWarnings("unchecked") Iterable> values = - (Iterable>) viewByWindows - .getUnchecked(PCollectionViewWindow.of(view, window)).get(); + (Iterable>) viewContents.getUnchecked(PCollectionViewWindow.of(view, + window)).get(); return view.fromIterableInternal(values); } @@ -254,4 +260,17 @@ public boolean isEmpty() { } } + /** + * A {@link CacheLoader} that loads the current contents of a {@link PCollectionViewWindow} into + * an optional. + */ + private class CurrentViewContentsLoader extends CacheLoader< + PCollectionViewWindow, Optional>>> { + + @Override + public Optional>> + load(PCollectionViewWindow key) { + return Optional.fromNullable(viewByWindows.getUnchecked(key).get()); + } + } } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java index b73e41a2093b..10b87214eb30 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java @@ -150,6 +150,9 @@ public void writeToViewWriterThenReadReads() { WindowedValue.of( 4444, new Instant(8677L), second, PaneInfo.createPane(false, true, Timing.LATE, 1, 1)); viewWriter.add(Collections.singleton(overrittenSecondValue)); + assertThat(reader.get(view, second), containsInAnyOrder(2)); + // The cached value is served in the earlier reader + reader = context.createSideInputReader(ImmutableList.>of(view)); assertThat(reader.get(view, second), containsInAnyOrder(4444)); } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java index 2f376ddac747..746c0f856e01 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java @@ -399,7 +399,8 @@ public void isReadyForSomeNotReadyViewsFalseUntilElements() { FIRST_WINDOW.maxTimestamp().minus(100L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING))); - assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true)); + // Cached value is false + assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false)); container.write( singletonView, @@ -410,10 +411,15 @@ public void isReadyForSomeNotReadyViewsFalseUntilElements() { SECOND_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING))); assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true)); - assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true)); + assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false)); assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(false)); assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false)); + + reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView)); + assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true)); + assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true)); + assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true)); } @Test @@ -431,6 +437,11 @@ public void isReadyForEmptyWindowTrue() throws Exception { if (!onComplete.await(1500L, TimeUnit.MILLISECONDS)) { fail("Callback to set empty values did not complete!"); } + // The cached value was false, so it continues to be true + assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false)); + + // A new reader for the same container gets a fresh look + reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView)); assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(true)); } From 59a12902e37c7ee9cc9621a690536202d73b347c Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 9 May 2016 15:47:27 -0700 Subject: [PATCH 2/2] Minor checkArgument style fix --- .../beam/runners/direct/InProcessSideInputContainer.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java index 96a9ad23ff04..f53f59065566 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java @@ -235,11 +235,12 @@ public boolean isReady(final PCollectionView view, final BoundedWindow window @Override @Nullable public T get(final PCollectionView view, final BoundedWindow window) { - checkArgument( - readerViews.contains(view), "calling get(PCollectionView) with unknown view: " + view); + checkArgument(readerViews.contains(view), + "call to get(PCollectionView) with unknown view: %s", + view); checkArgument( isReady(view, window), - "calling get() on a PCollectionView %s that is not ready in window %s", + "calling get() on PCollectionView %s that is not ready in window %s", view, window); // Safe covariant cast