Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.View;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.util.AppliedCombineFn;
Expand All @@ -79,6 +80,7 @@
import com.google.cloud.dataflow.sdk.values.TypedPValue;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -548,6 +550,7 @@ public void addStep(PTransform<?, ?> transform, String type) {
currentStep.setKind(type);
steps.add(currentStep);
addInput(PropertyNames.USER_NAME, getFullName(transform));
addDisplayData(PropertyNames.DISPLAY_DATA, DisplayData.from(transform));
}

@Override
Expand Down Expand Up @@ -725,6 +728,15 @@ private void addOutput(String name, PValue value, Coder<?> valueCoder) {
outputInfoList.add(outputInfo);
}

private void addDisplayData(String name, DisplayData displayData) {
List<Map<String, Object>> serializedItems = Lists.newArrayList();
for (DisplayData.Item item : displayData.items()) {
serializedItems.add(MAPPER.convertValue(item, Map.class));
}

addList(getProperties(), name, serializedItems);
}

@Override
public OutputReference asOutputReference(PValue value) {
AppliedPTransform<?, ?, ?> transform =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.cloud.dataflow.sdk.values.PValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -994,7 +995,7 @@ private TransformWatermarks(
* Returns the input watermark of the {@link AppliedPTransform}.
*/
public Instant getInputWatermark() {
return inputWatermark.get();
return Preconditions.checkNotNull(inputWatermark.get());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public Instant currentSynchronizedProcessingTime() {
}

@Override
@Nullable
public Instant currentInputWatermarkTime() {
return watermarks.getInputWatermark();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonInclude;

import org.apache.avro.reflect.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
Expand Down Expand Up @@ -214,10 +217,12 @@ private Item(
this.label = label;
}

@JsonGetter("namespace")
public String getNamespace() {
return ns;
}

@JsonGetter("key")
public String getKey() {
return key;
}
Expand All @@ -226,13 +231,15 @@ public String getKey() {
* Retrieve the {@link DisplayData.Type} of display metadata. All metadata conforms to a
* predefined set of allowed types.
*/
@JsonGetter("type")
public Type getType() {
return type;
}

/**
* Retrieve the value of the metadata item.
*/
@JsonGetter("value")
public String getValue() {
return value;
}
Expand All @@ -244,6 +251,8 @@ public String getValue() {
* <p>Some display data types will not provide a short value, in which case the return value
* will be null.
*/
@JsonGetter("shortValue")
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
public String getShortValue() {
return shortValue;
Expand All @@ -255,6 +264,8 @@ public String getShortValue() {
*
* <p>If no label was specified, this will return {@code null}.
*/
@JsonGetter("label")
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
public String getLabel() {
return label;
Expand All @@ -266,8 +277,10 @@ public String getLabel() {
*
* <p>If no URL was specified, this will return {@code null}.
*/
@JsonGetter("linkUrl")
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
public String getUrl() {
public String getLinkUrl() {
return url;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
import com.google.cloud.dataflow.sdk.transforms.CombineFnBase.GlobalCombineFn;
import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.CombineFnWithContext;
import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.Context;
import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import com.google.cloud.dataflow.sdk.util.state.StateContext;
Expand All @@ -46,6 +49,60 @@ public class CombineFnUtil {
return new NonSerializableBoundedKeyedCombineFn<>(combineFn, context);
}

/**
* Return a {@link CombineFnWithContext} from the given {@link GlobalCombineFn}.
*/
public static <InputT, AccumT, OutputT>
CombineFnWithContext<InputT, AccumT, OutputT> toFnWithContext(
GlobalCombineFn<InputT, AccumT, OutputT> globalCombineFn) {
if (globalCombineFn instanceof CombineFnWithContext) {
@SuppressWarnings("unchecked")
CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext =
(CombineFnWithContext<InputT, AccumT, OutputT>) globalCombineFn;
return combineFnWithContext;
} else {
@SuppressWarnings("unchecked")
final CombineFn<InputT, AccumT, OutputT> combineFn =
(CombineFn<InputT, AccumT, OutputT>) globalCombineFn;
return new CombineFnWithContext<InputT, AccumT, OutputT>() {
@Override
public AccumT createAccumulator(Context c) {
return combineFn.createAccumulator();
}
@Override
public AccumT addInput(AccumT accumulator, InputT input, Context c) {
return combineFn.addInput(accumulator, input);
}
@Override
public AccumT mergeAccumulators(Iterable<AccumT> accumulators, Context c) {
return combineFn.mergeAccumulators(accumulators);
}
@Override
public OutputT extractOutput(AccumT accumulator, Context c) {
return combineFn.extractOutput(accumulator);
}
@Override
public AccumT compact(AccumT accumulator, Context c) {
return combineFn.compact(accumulator);
}
@Override
public OutputT defaultValue() {
return combineFn.defaultValue();
}
@Override
public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
throws CannotProvideCoderException {
return combineFn.getAccumulatorCoder(registry, inputCoder);
}
@Override
public Coder<OutputT> getDefaultOutputCoder(
CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException {
return combineFn.getDefaultOutputCoder(registry, inputCoder);
}
};
}
}

private static class NonSerializableBoundedKeyedCombineFn<K, InputT, AccumT, OutputT>
extends KeyedCombineFn<K, InputT, AccumT, OutputT> {
private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ public boolean apply(WindowedValue<InputT> input) {
/** Is {@code window} expired w.r.t. the garbage collection watermark? */
private boolean canDropDueToExpiredWindow(BoundedWindow window) {
Instant inputWM = timerInternals.currentInputWatermarkTime();
return inputWM != null
&& window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM);
return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private <W> PaneInfo describePane(
boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY;

// True is the input watermark hasn't passed the window's max timestamp.
boolean isEarlyForInput = inputWM == null || !inputWM.isAfter(windowMaxTimestamp);
boolean isEarlyForInput = !inputWM.isAfter(windowMaxTimestamp);

Timing timing;
if (isLateForOutput || !onlyEarlyPanesSoFar) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,5 @@ public class PropertyNames {
public static final String VALIDATE_SINK = "validate_sink";
public static final String VALIDATE_SOURCE = "validate_source";
public static final String VALUE = "value";
public static final String DISPLAY_DATA = "display_data";
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ public Instant currentSynchronizedProcessingTime() {
}

@Override
@Nullable
public Instant currentEventTime() {
return timerInternals.currentInputWatermarkTime();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,11 @@ private Collection<W> processElement(WindowedValue<InputT> value) throws Excepti
directContext.timestamp(),
directContext.timers(),
directContext.state());

// At this point, if triggerRunner.shouldFire before the processValue then
// triggerRunner.shouldFire after the processValue. In other words adding values
// cannot take a trigger state from firing to non-firing.
// (We don't actually assert this since it is too slow.)
}

return windows;
Expand Down Expand Up @@ -532,6 +537,10 @@ public void onTimer(TimerData timer) throws Exception {
"ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window);
}

// If this is an end-of-window timer then, we need to set a GC timer
boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
&& timer.getTimestamp().equals(window.maxTimestamp());

// If this is a garbage collection timer then we should trigger and garbage collect the window.
Instant cleanupTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
boolean isGarbageCollection =
Expand All @@ -548,7 +557,7 @@ public void onTimer(TimerData timer) throws Exception {
// We need to call onTrigger to emit the final pane if required.
// The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
// and the watermark has passed the end of the window.
onTrigger(directContext, renamedContext, true/* isFinished */);
onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow);
}

// Cleanup flavor B: Clear all the remaining state for this window since we'll never
Expand All @@ -564,10 +573,12 @@ public void onTimer(TimerData timer) throws Exception {
emitIfAppropriate(directContext, renamedContext);
}

// If this is an end-of-window timer then, we need to set a GC timer
boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
&& timer.getTimestamp().equals(window.maxTimestamp());
if (isEndOfWindow) {
// If the window strategy trigger includes a watermark trigger then at this point
// there should be no data holds, either because we'd already cleared them on an
// earlier onTrigger, or because we just cleared them on the above emitIfAppropriate.
// We could assert this but it is very expensive.

// Since we are processing an on-time firing we should schedule the garbage collection
// timer. (If getAllowedLateness is zero then the timer event will be considered a
// cleanup event and handled by the above).
Expand Down Expand Up @@ -666,7 +677,7 @@ private void emitIfAppropriate(ReduceFn<K, InputT, OutputT, W>.Context directCon
// Run onTrigger to produce the actual pane contents.
// As a side effect it will clear all element holds, but not necessarily any
// end-of-window or garbage collection holds.
onTrigger(directContext, renamedContext, isFinished);
onTrigger(directContext, renamedContext, isFinished, false /*isEndOfWindow*/);

// Now that we've triggered, the pane is empty.
nonEmptyPanes.clearPane(renamedContext.state());
Expand Down Expand Up @@ -713,10 +724,12 @@ private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing
private void onTrigger(
final ReduceFn<K, InputT, OutputT, W>.Context directContext,
ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
boolean isFinished)
boolean isFinished, boolean isEndOfWindow)
throws Exception {
Instant inputWM = timerInternals.currentInputWatermarkTime();

// Prefetch necessary states
ReadableState<Instant> outputTimestampFuture =
ReadableState<WatermarkHold.OldAndNewHolds> outputTimestampFuture =
watermarkHold.extractAndRelease(renamedContext, isFinished).readLater();
ReadableState<PaneInfo> paneFuture =
paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater();
Expand All @@ -729,7 +742,41 @@ private void onTrigger(
// Calculate the pane info.
final PaneInfo pane = paneFuture.read();
// Extract the window hold, and as a side effect clear it.
final Instant outputTimestamp = outputTimestampFuture.read();

WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read();
final Instant outputTimestamp = pair.oldHold;
@Nullable Instant newHold = pair.newHold;

if (newHold != null) {
// We can't be finished yet.
Preconditions.checkState(
!isFinished, "new hold at %s but finished %s", newHold, directContext.window());
// The hold cannot be behind the input watermark.
Preconditions.checkState(
!newHold.isBefore(inputWM), "new hold %s is before input watermark %s", newHold, inputWM);
if (newHold.isAfter(directContext.window().maxTimestamp())) {
// The hold must be for garbage collection, which can't have happened yet.
Preconditions.checkState(
newHold.isEqual(
directContext.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness())),
"new hold %s should be at garbage collection for window %s plus %s",
newHold,
directContext.window(),
windowingStrategy.getAllowedLateness());
} else {
// The hold must be for the end-of-window, which can't have happened yet.
Preconditions.checkState(
newHold.isEqual(directContext.window().maxTimestamp()),
"new hold %s should be at end of window %s",
newHold,
directContext.window());
Preconditions.checkState(
!isEndOfWindow,
"new hold at %s for %s but this is the watermark trigger",
newHold,
directContext.window());
}
}

// Only emit a pane if it has data or empty panes are observable.
if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) {
Expand Down Expand Up @@ -778,7 +825,7 @@ private Instant scheduleEndOfWindowOrGarbageCollectionTimer(
Instant endOfWindow = directContext.window().maxTimestamp();
Instant fireTime;
String which;
if (inputWM != null && endOfWindow.isBefore(inputWM)) {
if (endOfWindow.isBefore(inputWM)) {
fireTime = endOfWindow.plus(windowingStrategy.getAllowedLateness());
which = "garbage collection";
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,11 @@ public interface TimerInternals {

/**
* Return the current, local input watermark timestamp for this computation
* in the {@link TimeDomain#EVENT_TIME} time domain. Return {@code null} if unknown.
* in the {@link TimeDomain#EVENT_TIME} time domain.
*
* <p>This value:
* <ol>
* <li>Is never {@literal null}, but may be {@link BoundedWindow#TIMESTAMP_MIN_VALUE}.
* <li>Is monotonically increasing.
* <li>May differ between workers due to network and other delays.
* <li>Will never be ahead of the global input watermark for this computation. But it
Expand All @@ -95,7 +96,6 @@ public interface TimerInternals {
* it is possible for an element to be considered locally on-time even though it is
* globally late.
*/
@Nullable
Instant currentInputWatermarkTime();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public interface Timers {
@Nullable
public abstract Instant currentSynchronizedProcessingTime();

/** Returns the current event time or {@code null} if unknown. */
@Nullable
/** Returns the current event time. */
public abstract Instant currentEventTime();
}
Loading