Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -317,17 +317,30 @@ public String getStepName(AppliedPTransform<?, ?, ?> application) {
}

/**
* Returns a {@link SideInputReader} capable of reading the provided
* Returns a {@link ReadyCheckingSideInputReader} capable of reading the provided
* {@link PCollectionView PCollectionViews}.
*
* @param sideInputs the {@link PCollectionView PCollectionViews} the result should be able to
* read
* @return a {@link SideInputReader} that can read all of the provided
* {@link PCollectionView PCollectionViews}
* read
* @return a {@link SideInputReader} that can read all of the provided {@link PCollectionView
* PCollectionViews}
*/
public SideInputReader createSideInputReader(final List<PCollectionView<?>> sideInputs) {
public ReadyCheckingSideInputReader createSideInputReader(
final List<PCollectionView<?>> sideInputs) {
return sideInputContainer.createReaderForViews(sideInputs);
}

/**
* A {@link SideInputReader} that allows callers to check to see if a {@link PCollectionView} has
* had its contents set in a window.
*/
interface ReadyCheckingSideInputReader extends SideInputReader {
/**
* Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}.
*/
boolean isReady(PCollectionView<?> view, BoundedWindow window);
}

/**
* Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent
* of all other {@link CounterSet CounterSets} created by this call.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static com.google.common.base.Preconditions.checkArgument;

import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessEvaluationContext.ReadyCheckingSideInputReader;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.util.PCollectionViewWindow;
Expand All @@ -41,6 +42,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -85,11 +87,12 @@ private InProcessSideInputContainer(InProcessEvaluationContext context,
}

/**
* Return a view of this {@link InProcessSideInputContainer} that contains only the views in
* the provided argument. The returned {@link InProcessSideInputContainer} is unmodifiable without
* Return a view of this {@link InProcessSideInputContainer} that contains only the views in the
* provided argument. The returned {@link InProcessSideInputContainer} is unmodifiable without
* casting, but will change as this {@link InProcessSideInputContainer} is modified.
*/
public SideInputReader createReaderForViews(Collection<PCollectionView<?>> newContainedViews) {
public ReadyCheckingSideInputReader createReaderForViews(
Collection<PCollectionView<?>> newContainedViews) {
if (!containedViews.containsAll(newContainedViews)) {
Set<PCollectionView<?>> currentlyContained = ImmutableSet.copyOf(containedViews);
Set<PCollectionView<?>> newRequested = ImmutableSet.copyOf(newContainedViews);
Expand Down Expand Up @@ -170,41 +173,31 @@ private void updatePCollectionViewWindowValues(
}
}

private final class SideInputContainerSideInputReader implements SideInputReader {
private final class SideInputContainerSideInputReader implements ReadyCheckingSideInputReader {
private final Collection<PCollectionView<?>> readerViews;

private SideInputContainerSideInputReader(Collection<PCollectionView<?>> readerViews) {
this.readerViews = ImmutableSet.copyOf(readerViews);
}

@Override
public boolean isReady(final PCollectionView<?> view, final BoundedWindow window) {
checkArgument(
readerViews.contains(view),
"Tried to check if view %s was ready in a SideInputReader that does not contain it. "
+ "Contained views; %s",
view,
readerViews);
return getViewFuture(view, window).isDone();
}

@Override
@Nullable
public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
checkArgument(
readerViews.contains(view), "calling get(PCollectionView) with unknown view: " + view);
PCollectionViewWindow<T> windowedView = PCollectionViewWindow.of(view, window);
try {
final SettableFuture<Iterable<? extends WindowedValue<?>>> future =
viewByWindows.get(windowedView);

WindowingStrategy<?, ?> windowingStrategy = view.getWindowingStrategyInternal();
evaluationContext.scheduleAfterOutputWouldBeProduced(
view, window, windowingStrategy, new Runnable() {
@Override
public void run() {
// The requested window has closed without producing elements, so reflect that in
// the PCollectionView. If set has already been called, will do nothing.
future.set(Collections.<WindowedValue<?>>emptyList());
}

@Override
public String toString() {
return MoreObjects.toStringHelper("InProcessSideInputContainerEmptyCallback")
.add("view", view)
.add("window", window)
.toString();
}
});
final Future<Iterable<? extends WindowedValue<?>>> future = getViewFuture(view, window);
// Safe covariant cast
@SuppressWarnings("unchecked")
Iterable<WindowedValue<?>> values = (Iterable<WindowedValue<?>>) future.get();
Expand All @@ -217,6 +210,23 @@ public String toString() {
}
}

/**
* Gets the future containing the contents of the provided {@link PCollectionView} in the
* provided {@link BoundedWindow}, setting up a callback to populate the future with empty
* contents if necessary.
*/
private <T> Future<Iterable<? extends WindowedValue<?>>> getViewFuture(
final PCollectionView<T> view, final BoundedWindow window) {
PCollectionViewWindow<T> windowedView = PCollectionViewWindow.of(view, window);
final SettableFuture<Iterable<? extends WindowedValue<?>>> future =
viewByWindows.getUnchecked(windowedView);

WindowingStrategy<?, ?> windowingStrategy = view.getWindowingStrategyInternal();
evaluationContext.scheduleAfterOutputWouldBeProduced(
view, window, windowingStrategy, new WriteEmptyViewContents(view, window, future));
return future;
}

@Override
public <T> boolean contains(PCollectionView<T> view) {
return readerViews.contains(view);
Expand All @@ -227,4 +237,32 @@ public boolean isEmpty() {
return readerViews.isEmpty();
}
}

private static class WriteEmptyViewContents implements Runnable {
private final PCollectionView<?> view;
private final BoundedWindow window;
private final SettableFuture<Iterable<? extends WindowedValue<?>>> future;

private WriteEmptyViewContents(PCollectionView<?> view, BoundedWindow window,
SettableFuture<Iterable<? extends WindowedValue<?>>> future) {
this.future = future;
this.view = view;
this.window = window;
}

@Override
public void run() {
// The requested window has closed without producing elements, so reflect that in
// the PCollectionView. If set has already been called, will do nothing.
future.set(Collections.<WindowedValue<?>>emptyList());
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("view", view)
.add("window", window)
.toString();
}
}
}
Loading