Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.beam.sdk.values.PCollectionView;

import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
Expand Down Expand Up @@ -211,9 +212,13 @@ public String toString() {

private final class SideInputContainerSideInputReader implements ReadyCheckingSideInputReader {
private final Collection<PCollectionView<?>> readerViews;
private final LoadingCache<
PCollectionViewWindow<?>, Optional<? extends Iterable<? extends WindowedValue<?>>>>
viewContents;

private SideInputContainerSideInputReader(Collection<PCollectionView<?>> readerViews) {
this.readerViews = ImmutableSet.copyOf(readerViews);
this.viewContents = CacheBuilder.newBuilder().build(new CurrentViewContentsLoader());
}

@Override
Expand All @@ -224,22 +229,24 @@ public boolean isReady(final PCollectionView<?> view, final BoundedWindow window
+ "Contained views; %s",
view,
readerViews);
return viewByWindows.getUnchecked(PCollectionViewWindow.of(view, window)).get() != null;
return viewContents.getUnchecked(PCollectionViewWindow.of(view, window)).isPresent();
}

@Override
@Nullable
public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
checkArgument(readerViews.contains(view),
"calling get(PCollectionView) with unknown view: " + view);
checkArgument(isReady(view, window),
"calling get(PCollectionView) with view %s that is not ready in window %s",
"call to get(PCollectionView) with unknown view: %s",
view);
checkArgument(
isReady(view, window),
"calling get() on PCollectionView %s that is not ready in window %s",
view,
window);
// Safe covariant cast
@SuppressWarnings("unchecked") Iterable<WindowedValue<?>> values =
(Iterable<WindowedValue<?>>) viewByWindows
.getUnchecked(PCollectionViewWindow.of(view, window)).get();
(Iterable<WindowedValue<?>>) viewContents.getUnchecked(PCollectionViewWindow.of(view,
window)).get();
return view.fromIterableInternal(values);
}

Expand All @@ -254,4 +261,17 @@ public boolean isEmpty() {
}
}

/**
* A {@link CacheLoader} that loads the current contents of a {@link PCollectionViewWindow} into
* an optional.
*/
private class CurrentViewContentsLoader extends CacheLoader<
PCollectionViewWindow<?>, Optional<? extends Iterable<? extends WindowedValue<?>>>> {

@Override
public Optional<? extends Iterable<? extends WindowedValue<?>>>
load(PCollectionViewWindow<?> key) {
return Optional.fromNullable(viewByWindows.getUnchecked(key).get());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ public void writeToViewWriterThenReadReads() {
WindowedValue.of(
4444, new Instant(8677L), second, PaneInfo.createPane(false, true, Timing.LATE, 1, 1));
viewWriter.add(Collections.singleton(overrittenSecondValue));
assertThat(reader.get(view, second), containsInAnyOrder(2));
// The cached value is served in the earlier reader
reader = context.createSideInputReader(ImmutableList.<PCollectionView<?>>of(view));
assertThat(reader.get(view, second), containsInAnyOrder(4444));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,8 @@ public void isReadyForSomeNotReadyViewsFalseUntilElements() {
FIRST_WINDOW.maxTimestamp().minus(100L),
FIRST_WINDOW,
PaneInfo.ON_TIME_AND_ONLY_FIRING)));
assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true));
// Cached value is false
assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false));

container.write(
singletonView,
Expand All @@ -410,10 +411,15 @@ public void isReadyForSomeNotReadyViewsFalseUntilElements() {
SECOND_WINDOW,
PaneInfo.ON_TIME_AND_ONLY_FIRING)));
assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true));
assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false));

assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(false));
assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false));

reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView));
assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true));
assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true));
}

@Test
Expand All @@ -431,6 +437,11 @@ public void isReadyForEmptyWindowTrue() throws Exception {
if (!onComplete.await(1500L, TimeUnit.MILLISECONDS)) {
fail("Callback to set empty values did not complete!");
}
// The cached value was false, so it continues to be true
assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false));

// A new reader for the same container gets a fresh look
reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView));
assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(true));
}

Expand Down