Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,21 @@
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.SettableFuture;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.Nullable;

Expand All @@ -51,36 +50,26 @@
* available and writing to a {@link PCollectionView}.
*/
class InProcessSideInputContainer {
private final InProcessEvaluationContext evaluationContext;
private final Collection<PCollectionView<?>> containedViews;
private final LoadingCache<PCollectionViewWindow<?>,
SettableFuture<Iterable<? extends WindowedValue<?>>>> viewByWindows;
private final LoadingCache<
PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
viewByWindows;

/**
* Create a new {@link InProcessSideInputContainer} with the provided views and the provided
* context.
*/
public static InProcessSideInputContainer create(
InProcessEvaluationContext context, Collection<PCollectionView<?>> containedViews) {
CacheLoader<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>>
loader = new CacheLoader<PCollectionViewWindow<?>,
SettableFuture<Iterable<? extends WindowedValue<?>>>>() {
@Override
public SettableFuture<Iterable<? extends WindowedValue<?>>> load(
PCollectionViewWindow<?> view) {
return SettableFuture.create();
}
};
LoadingCache<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>>
viewByWindows = CacheBuilder.newBuilder().build(loader);
return new InProcessSideInputContainer(context, containedViews, viewByWindows);
final InProcessEvaluationContext context, Collection<PCollectionView<?>> containedViews) {
LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
viewByWindows = CacheBuilder.newBuilder().build(new CallbackSchedulingLoader(context));
return new InProcessSideInputContainer(containedViews, viewByWindows);
}

private InProcessSideInputContainer(InProcessEvaluationContext context,
private InProcessSideInputContainer(
Collection<PCollectionView<?>> containedViews,
LoadingCache<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>>
viewByWindows) {
this.evaluationContext = context;
LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
viewByWindows) {
this.containedViews = ImmutableSet.copyOf(containedViews);
this.viewByWindows = viewByWindows;
}
Expand Down Expand Up @@ -146,37 +135,87 @@ private Map<BoundedWindow, Collection<WindowedValue<?>>> indexValuesByWindow(
private void updatePCollectionViewWindowValues(
PCollectionView<?> view, BoundedWindow window, Collection<WindowedValue<?>> windowValues) {
PCollectionViewWindow<?> windowedView = PCollectionViewWindow.of(view, window);
SettableFuture<Iterable<? extends WindowedValue<?>>> future = null;
try {
future = viewByWindows.get(windowedView);
if (future.isDone()) {
Iterator<? extends WindowedValue<?>> existingValues = future.get().iterator();
PaneInfo newPane = windowValues.iterator().next().getPane();
// The current value may have no elements, if no elements were produced for the window,
// but we are recieving late data.
if (!existingValues.hasNext()
|| newPane.getIndex() > existingValues.next().getPane().getIndex()) {
viewByWindows.invalidate(windowedView);
viewByWindows.get(windowedView).set(windowValues);
}
} else {
future.set(windowValues);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (future != null && !future.isDone()) {
future.set(Collections.<WindowedValue<?>>emptyList());
}
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
AtomicReference<Iterable<? extends WindowedValue<?>>> contents =
viewByWindows.getUnchecked(windowedView);
if (contents.compareAndSet(null, windowValues)) {
// the value had never been set, so we set it and are done.
return;
}
PaneInfo newPane = windowValues.iterator().next().getPane();

Iterable<? extends WindowedValue<?>> existingValues;
long existingPane;
do {
existingValues = contents.get();
existingPane =
Iterables.isEmpty(existingValues)
? -1L
: existingValues.iterator().next().getPane().getIndex();
} while (newPane.getIndex() > existingPane
&& !contents.compareAndSet(existingValues, windowValues));
}

private static class CallbackSchedulingLoader extends
CacheLoader<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>> {
private final InProcessEvaluationContext context;

public CallbackSchedulingLoader(
InProcessEvaluationContext context) {
this.context = context;
}

@Override
public AtomicReference<Iterable<? extends WindowedValue<?>>>
load(PCollectionViewWindow<?> view) {

AtomicReference<Iterable<? extends WindowedValue<?>>> contents = new AtomicReference<>();
WindowingStrategy<?, ?> windowingStrategy = view.getView().getWindowingStrategyInternal();

context.scheduleAfterOutputWouldBeProduced(view.getView(),
view.getWindow(),
windowingStrategy,
new WriteEmptyViewContents(view.getView(), view.getWindow(), contents));
return contents;
}
}

private static class WriteEmptyViewContents implements Runnable {
private final PCollectionView<?> view;
private final BoundedWindow window;
private final AtomicReference<Iterable<? extends WindowedValue<?>>> contents;

private WriteEmptyViewContents(PCollectionView<?> view, BoundedWindow window,
AtomicReference<Iterable<? extends WindowedValue<?>>> contents) {
this.contents = contents;
this.view = view;
this.window = window;
}

@Override
public void run() {
// The requested window has closed without producing elements, so reflect that in
// the PCollectionView. If set has already been called, will do nothing.
contents.compareAndSet(null, Collections.<WindowedValue<?>>emptyList());
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("view", view)
.add("window", window)
.toString();
}
}

private final class SideInputContainerSideInputReader implements ReadyCheckingSideInputReader {
private final Collection<PCollectionView<?>> readerViews;
private final LoadingCache<
PCollectionViewWindow<?>, Optional<? extends Iterable<? extends WindowedValue<?>>>>
viewContents;

private SideInputContainerSideInputReader(Collection<PCollectionView<?>> readerViews) {
this.readerViews = ImmutableSet.copyOf(readerViews);
this.viewContents = CacheBuilder.newBuilder().build(new CurrentViewContentsLoader());
}

@Override
Expand All @@ -187,43 +226,25 @@ public boolean isReady(final PCollectionView<?> view, final BoundedWindow window
+ "Contained views; %s",
view,
readerViews);
return getViewFuture(view, window).isDone();
return viewContents.getUnchecked(PCollectionViewWindow.of(view, window)).isPresent();
}

@Override
@Nullable
public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
checkArgument(readerViews.contains(view),
"call to get(PCollectionView) with unknown view: %s",
view);
checkArgument(
readerViews.contains(view), "calling get(PCollectionView) with unknown view: " + view);
try {
final Future<Iterable<? extends WindowedValue<?>>> future = getViewFuture(view, window);
// Safe covariant cast
@SuppressWarnings("unchecked")
Iterable<WindowedValue<?>> values = (Iterable<WindowedValue<?>>) future.get();
return view.fromIterableInternal(values);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}

/**
* Gets the future containing the contents of the provided {@link PCollectionView} in the
* provided {@link BoundedWindow}, setting up a callback to populate the future with empty
* contents if necessary.
*/
private <T> Future<Iterable<? extends WindowedValue<?>>> getViewFuture(
final PCollectionView<T> view, final BoundedWindow window) {
PCollectionViewWindow<T> windowedView = PCollectionViewWindow.of(view, window);
final SettableFuture<Iterable<? extends WindowedValue<?>>> future =
viewByWindows.getUnchecked(windowedView);

WindowingStrategy<?, ?> windowingStrategy = view.getWindowingStrategyInternal();
evaluationContext.scheduleAfterOutputWouldBeProduced(
view, window, windowingStrategy, new WriteEmptyViewContents(view, window, future));
return future;
isReady(view, window),
"calling get() on PCollectionView %s that is not ready in window %s",
view,
window);
// Safe covariant cast
@SuppressWarnings("unchecked") Iterable<WindowedValue<?>> values =
(Iterable<WindowedValue<?>>) viewContents.getUnchecked(PCollectionViewWindow.of(view,
window)).get();
return view.fromIterableInternal(values);
}

@Override
Expand All @@ -237,31 +258,17 @@ public boolean isEmpty() {
}
}

private static class WriteEmptyViewContents implements Runnable {
private final PCollectionView<?> view;
private final BoundedWindow window;
private final SettableFuture<Iterable<? extends WindowedValue<?>>> future;

private WriteEmptyViewContents(PCollectionView<?> view, BoundedWindow window,
SettableFuture<Iterable<? extends WindowedValue<?>>> future) {
this.future = future;
this.view = view;
this.window = window;
}

@Override
public void run() {
// The requested window has closed without producing elements, so reflect that in
// the PCollectionView. If set has already been called, will do nothing.
future.set(Collections.<WindowedValue<?>>emptyList());
}
/**
* A {@link CacheLoader} that loads the current contents of a {@link PCollectionViewWindow} into
* an optional.
*/
private class CurrentViewContentsLoader extends CacheLoader<
PCollectionViewWindow<?>, Optional<? extends Iterable<? extends WindowedValue<?>>>> {

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("view", view)
.add("window", window)
.toString();
public Optional<? extends Iterable<? extends WindowedValue<?>>>
load(PCollectionViewWindow<?> key) {
return Optional.fromNullable(viewByWindows.getUnchecked(key).get());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ public void writeToViewWriterThenReadReads() {
WindowedValue.of(
4444, new Instant(8677L), second, PaneInfo.createPane(false, true, Timing.LATE, 1, 1));
viewWriter.add(Collections.singleton(overrittenSecondValue));
assertThat(reader.get(view, second), containsInAnyOrder(2));
// The cached value is served in the earlier reader
reader = context.createSideInputReader(ImmutableList.<PCollectionView<?>>of(view));
assertThat(reader.get(view, second), containsInAnyOrder(4444));
}

Expand Down
Loading