diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java index e6fd49f750..d0199e079a 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java @@ -26,22 +26,21 @@ import com.google.cloud.dataflow.sdk.util.WindowingStrategy; import com.google.cloud.dataflow.sdk.values.PCollectionView; import com.google.common.base.MoreObjects; +import com.google.common.base.Optional; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; @@ -51,36 +50,26 @@ * available and writing to a {@link PCollectionView}. */ class InProcessSideInputContainer { - private final InProcessEvaluationContext evaluationContext; private final Collection> containedViews; - private final LoadingCache, - SettableFuture>>> viewByWindows; + private final LoadingCache< + PCollectionViewWindow, AtomicReference>>> + viewByWindows; /** * Create a new {@link InProcessSideInputContainer} with the provided views and the provided * context. 
*/ public static InProcessSideInputContainer create( - InProcessEvaluationContext context, Collection> containedViews) { - CacheLoader, SettableFuture>>> - loader = new CacheLoader, - SettableFuture>>>() { - @Override - public SettableFuture>> load( - PCollectionViewWindow view) { - return SettableFuture.create(); - } - }; - LoadingCache, SettableFuture>>> - viewByWindows = CacheBuilder.newBuilder().build(loader); - return new InProcessSideInputContainer(context, containedViews, viewByWindows); + final InProcessEvaluationContext context, Collection> containedViews) { + LoadingCache, AtomicReference>>> + viewByWindows = CacheBuilder.newBuilder().build(new CallbackSchedulingLoader(context)); + return new InProcessSideInputContainer(containedViews, viewByWindows); } - private InProcessSideInputContainer(InProcessEvaluationContext context, + private InProcessSideInputContainer( Collection> containedViews, - LoadingCache, SettableFuture>>> - viewByWindows) { - this.evaluationContext = context; + LoadingCache, AtomicReference>>> + viewByWindows) { this.containedViews = ImmutableSet.copyOf(containedViews); this.viewByWindows = viewByWindows; } @@ -146,37 +135,87 @@ private Map>> indexValuesByWindow( private void updatePCollectionViewWindowValues( PCollectionView view, BoundedWindow window, Collection> windowValues) { PCollectionViewWindow windowedView = PCollectionViewWindow.of(view, window); - SettableFuture>> future = null; - try { - future = viewByWindows.get(windowedView); - if (future.isDone()) { - Iterator> existingValues = future.get().iterator(); - PaneInfo newPane = windowValues.iterator().next().getPane(); - // The current value may have no elements, if no elements were produced for the window, - // but we are recieving late data. 
- if (!existingValues.hasNext() - || newPane.getIndex() > existingValues.next().getPane().getIndex()) { - viewByWindows.invalidate(windowedView); - viewByWindows.get(windowedView).set(windowValues); - } - } else { - future.set(windowValues); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - if (future != null && !future.isDone()) { - future.set(Collections.>emptyList()); - } - } catch (ExecutionException e) { - throw new RuntimeException(e.getCause()); + AtomicReference>> contents = + viewByWindows.getUnchecked(windowedView); + if (contents.compareAndSet(null, windowValues)) { + // the value had never been set, so we set it and are done. + return; + } + PaneInfo newPane = windowValues.iterator().next().getPane(); + + Iterable> existingValues; + long existingPane; + do { + existingValues = contents.get(); + existingPane = + Iterables.isEmpty(existingValues) + ? -1L + : existingValues.iterator().next().getPane().getIndex(); + } while (newPane.getIndex() > existingPane + && !contents.compareAndSet(existingValues, windowValues)); + } + + private static class CallbackSchedulingLoader extends + CacheLoader, AtomicReference>>> { + private final InProcessEvaluationContext context; + + public CallbackSchedulingLoader( + InProcessEvaluationContext context) { + this.context = context; + } + + @Override + public AtomicReference>> + load(PCollectionViewWindow view) { + + AtomicReference>> contents = new AtomicReference<>(); + WindowingStrategy windowingStrategy = view.getView().getWindowingStrategyInternal(); + + context.scheduleAfterOutputWouldBeProduced(view.getView(), + view.getWindow(), + windowingStrategy, + new WriteEmptyViewContents(view.getView(), view.getWindow(), contents)); + return contents; + } + } + + private static class WriteEmptyViewContents implements Runnable { + private final PCollectionView view; + private final BoundedWindow window; + private final AtomicReference>> contents; + + private WriteEmptyViewContents(PCollectionView 
view, BoundedWindow window, + AtomicReference>> contents) { + this.contents = contents; + this.view = view; + this.window = window; + } + + @Override + public void run() { + // The requested window has closed without producing elements, so reflect that in + // the PCollectionView. If set has already been called, will do nothing. + contents.compareAndSet(null, Collections.>emptyList()); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("view", view) + .add("window", window) + .toString(); } } private final class SideInputContainerSideInputReader implements ReadyCheckingSideInputReader { private final Collection> readerViews; + private final LoadingCache< + PCollectionViewWindow, Optional>>> + viewContents; private SideInputContainerSideInputReader(Collection> readerViews) { this.readerViews = ImmutableSet.copyOf(readerViews); + this.viewContents = CacheBuilder.newBuilder().build(new CurrentViewContentsLoader()); } @Override @@ -187,43 +226,25 @@ public boolean isReady(final PCollectionView view, final BoundedWindow window + "Contained views; %s", view, readerViews); - return getViewFuture(view, window).isDone(); + return viewContents.getUnchecked(PCollectionViewWindow.of(view, window)).isPresent(); } @Override @Nullable public T get(final PCollectionView view, final BoundedWindow window) { + checkArgument(readerViews.contains(view), + "call to get(PCollectionView) with unknown view: %s", + view); checkArgument( - readerViews.contains(view), "calling get(PCollectionView) with unknown view: " + view); - try { - final Future>> future = getViewFuture(view, window); - // Safe covariant cast - @SuppressWarnings("unchecked") - Iterable> values = (Iterable>) future.get(); - return view.fromIterableInternal(values); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return null; - } catch (ExecutionException e) { - throw new RuntimeException(e); - } - } - - /** - * Gets the future containing the contents 
of the provided {@link PCollectionView} in the - * provided {@link BoundedWindow}, setting up a callback to populate the future with empty - * contents if necessary. - */ - private Future>> getViewFuture( - final PCollectionView view, final BoundedWindow window) { - PCollectionViewWindow windowedView = PCollectionViewWindow.of(view, window); - final SettableFuture>> future = - viewByWindows.getUnchecked(windowedView); - - WindowingStrategy windowingStrategy = view.getWindowingStrategyInternal(); - evaluationContext.scheduleAfterOutputWouldBeProduced( - view, window, windowingStrategy, new WriteEmptyViewContents(view, window, future)); - return future; + isReady(view, window), + "calling get() on PCollectionView %s that is not ready in window %s", + view, + window); + // Safe covariant cast + @SuppressWarnings("unchecked") Iterable> values = + (Iterable>) viewContents.getUnchecked(PCollectionViewWindow.of(view, + window)).get(); + return view.fromIterableInternal(values); } @Override @@ -237,31 +258,17 @@ public boolean isEmpty() { } } - private static class WriteEmptyViewContents implements Runnable { - private final PCollectionView view; - private final BoundedWindow window; - private final SettableFuture>> future; - - private WriteEmptyViewContents(PCollectionView view, BoundedWindow window, - SettableFuture>> future) { - this.future = future; - this.view = view; - this.window = window; - } - - @Override - public void run() { - // The requested window has closed without producing elements, so reflect that in - // the PCollectionView. If set has already been called, will do nothing. - future.set(Collections.>emptyList()); - } + /** + * A {@link CacheLoader} that loads the current contents of a {@link PCollectionViewWindow} into + * an optional. 
+ */ + private class CurrentViewContentsLoader extends CacheLoader< + PCollectionViewWindow, Optional>>> { @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("view", view) - .add("window", window) - .toString(); + public Optional>> + load(PCollectionViewWindow key) { + return Optional.fromNullable(viewByWindows.getUnchecked(key).get()); } } } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java index 7cfb63a84b..086f7642f6 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java @@ -147,6 +147,9 @@ public void writeToViewWriterThenReadReads() { WindowedValue.of( 4444, new Instant(8677L), second, PaneInfo.createPane(false, true, Timing.LATE, 1, 1)); viewWriter.add(Collections.singleton(overrittenSecondValue)); + assertThat(reader.get(view, second), containsInAnyOrder(2)); + // The cached value is served in the earlier reader + reader = context.createSideInputReader(ImmutableList.>of(view)); assertThat(reader.get(view, second), containsInAnyOrder(4444)); } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java index 9073a4715e..a46747622a 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java @@ -20,6 +20,7 @@ import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; 
import static org.mockito.Mockito.doAnswer; import com.google.cloud.dataflow.sdk.coders.KvCoder; @@ -58,8 +59,10 @@ import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; /** * Tests for {@link InProcessSideInputContainer}. @@ -191,46 +194,12 @@ public void getReturnsLatestPaneInWindow() throws Exception { * there is data in the pane. */ @Test - public void getBlocksUntilPaneAvailable() throws Exception { - BoundedWindow window = - new BoundedWindow() { - @Override - public Instant maxTimestamp() { - return new Instant(1024L); - } - }; - Future singletonFuture = - getFutureOfView( - container.createReaderForViews(ImmutableList.>of(singletonView)), - singletonView, - window); - - WindowedValue singletonValue = - WindowedValue.of(4.75, new Instant(475L), window, PaneInfo.ON_TIME_AND_ONLY_FIRING); - - assertThat(singletonFuture.isDone(), is(false)); - container.write(singletonView, ImmutableList.>of(singletonValue)); - assertThat(singletonFuture.get(), equalTo(4.75)); - } - - @Test - public void withPCollectionViewsWithPutInOriginalReturnsContents() throws Exception { - BoundedWindow window = new BoundedWindow() { - @Override - public Instant maxTimestamp() { - return new Instant(1024L); - } - }; - SideInputReader newReader = - container.createReaderForViews(ImmutableList.>of(singletonView)); - Future singletonFuture = getFutureOfView(newReader, singletonView, window); - - WindowedValue singletonValue = - WindowedValue.of(24.125, new Instant(475L), window, PaneInfo.ON_TIME_AND_ONLY_FIRING); + public void getNotReadyThrows() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("not ready"); - assertThat(singletonFuture.isDone(), is(false)); - container.write(singletonView, ImmutableList.>of(singletonValue)); - assertThat(singletonFuture.get(), equalTo(24.125)); + 
container.createReaderForViews(ImmutableList.>of(mapView)) + .get(mapView, GlobalWindow.INSTANCE); } @Test @@ -427,7 +396,8 @@ public void isReadyForSomeNotReadyViewsFalseUntilElements() { FIRST_WINDOW.maxTimestamp().minus(100L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING))); - assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true)); + // Cached value is false + assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false)); container.write( singletonView, @@ -438,22 +408,37 @@ public void isReadyForSomeNotReadyViewsFalseUntilElements() { SECOND_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING))); assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true)); - assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true)); + assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false)); assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(false)); assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false)); + + reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView)); + assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true)); + assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true)); + assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true)); } @Test - public void isReadyForEmptyWindowTrue() { + public void isReadyForEmptyWindowTrue() throws Exception { + CountDownLatch onComplete = new CountDownLatch(1); immediatelyInvokeCallback(mapView, GlobalWindow.INSTANCE); + CountDownLatch latch = invokeLatchedCallback(singletonView, GlobalWindow.INSTANCE, onComplete); ReadyCheckingSideInputReader reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView)); assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(true)); assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false)); - immediatelyInvokeCallback(singletonView, GlobalWindow.INSTANCE); + latch.countDown(); + if (!onComplete.await(1500L, TimeUnit.MILLISECONDS)) { + fail("Callback to set empty 
values did not complete!"); + } + // The cached value was false, so it continues to be false + assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false)); + + // A new reader for the same container gets a fresh look + reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView)); assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(true)); } @@ -480,6 +465,45 @@ public Void answer(InvocationOnMock invocation) throws Throwable { Mockito.any(Runnable.class)); } + /** + * When a callAfterWindowCloses with the specified view's producing transform, window, and + * windowing strategy is invoked, start a thread that will invoke the callback after the returned + * {@link CountDownLatch} is counted down once. + */ + private CountDownLatch invokeLatchedCallback( + PCollectionView view, BoundedWindow window, final CountDownLatch onComplete) { + final CountDownLatch runLatch = new CountDownLatch(1); + doAnswer( + new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + Object callback = invocation.getArguments()[3]; + final Runnable callbackRunnable = (Runnable) callback; + Executors.newSingleThreadExecutor().submit(new Runnable() { + public void run() { + try { + if (!runLatch.await(1500L, TimeUnit.MILLISECONDS)) { + fail("Run latch didn't count down within timeout"); + } + callbackRunnable.run(); + onComplete.countDown(); + } catch (InterruptedException e) { + fail("Unexpectedly interrupted while waiting for latch to be counted down"); + } + } + }); + return null; + } + }) + .when(context) + .scheduleAfterOutputWouldBeProduced( + Mockito.eq(view), + Mockito.eq(window), + Mockito.eq(view.getWindowingStrategyInternal()), + Mockito.any(Runnable.class)); + return runLatch; + } + private Future getFutureOfView(final SideInputReader myReader, final PCollectionView view, final BoundedWindow window) { Callable callable = new Callable() {