Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
Expand Down Expand Up @@ -197,28 +198,10 @@ public <KeyT> void onTimer(
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain) {

// The effective timestamp is when derived elements will have their timestamp set, if not
// otherwise specified. If this is an event time timer, then they have the timer's output
// timestamp. Otherwise, they are set to the input timestamp, which is by definition
// non-late.
Instant effectiveTimestamp;
switch (timeDomain) {
case EVENT_TIME:
effectiveTimestamp = outputTimestamp;
break;
case PROCESSING_TIME:
case SYNCHRONIZED_PROCESSING_TIME:
effectiveTimestamp = stepContext.timerInternals().currentInputWatermarkTime();
break;

default:
throw new IllegalArgumentException(String.format("Unknown time domain: %s", timeDomain));
}
Preconditions.checkNotNull(outputTimestamp, "outputTimestamp");

OnTimerArgumentProvider<KeyT> argumentProvider =
new OnTimerArgumentProvider<>(
timerId, key, window, timestamp, effectiveTimestamp, timeDomain);
new OnTimerArgumentProvider<>(timerId, key, window, timestamp, outputTimestamp, timeDomain);
invoker.invokeOnTimer(timerId, timerFamilyId, argumentProvider);
}

Expand Down Expand Up @@ -1217,7 +1200,7 @@ public Timer align(Duration period) {
}

/**
* For event time timers the target time should be prior to window GC time. So it return
* For event time timers the target time should be prior to window GC time. So it returns
* min(time to set, GC Time of window).
*/
private Instant minTargetAndGcTime(Instant target) {
Expand All @@ -1237,13 +1220,12 @@ public Timer withOutputTimestamp(Instant outputTimestamp) {
}

/**
*
*
* <ul>
* Ensures that:
* <li>Users can't set {@code outputTimestamp} for processing time timers.
* <li>Event time timers' {@code outputTimestamp} is set before window expiration.
* </ul>
* Ensures that a timer's {@code outputTimestamp} is set at or after the current input timestamp
* (minus allowed timestamp skew if set) and before the max timestamp of the window (plus
* allowed lateness). <br>
* If the outputTimestamp is not set, it is defaulted to either:
* <ul>
* <li>The firing timestamp for timers in the {@link TimeDomain#EVENT_TIME} domain.
* <li>The current element timestamp for other time domains.
* </ul>
*/
private void setAndVerifyOutputTimestamp() {
if (outputTimestamp != null) {
Expand All @@ -1261,30 +1243,26 @@ private void setAndVerifyOutputTimestamp() {
+ "earlier than the timestamp of the current input or timer (%s) minus the "
+ "allowed skew (%s) and no later than %s. See the "
+ "DoFn#getAllowedTimestampSkew() Javadoc for details on changing the "
+ "allowed skew.details on changing the allowed skew.",
+ "allowed skew.",
outputTimestamp,
elementInputTimestamp,
fn.getAllowedTimestampSkew().getMillis() >= Integer.MAX_VALUE
? fn.getAllowedTimestampSkew()
: PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()),
BoundedWindow.TIMESTAMP_MAX_VALUE));
}
}

// Output timestamp is set to the delivery time if not initialized by an user.
if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
} else if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
// The outputTimestamp was unset and this is a timer in the EVENT_TIME domain. The output
// timestamp will be the firing timestamp.
outputTimestamp = target;
}
// For processing timers
if (outputTimestamp == null) {
// For processing timers output timestamp will be:
// 1) timestamp of input element
// OR
// 2) output timestamp of firing timer.
} else {
// The outputTimestamp was unset and this is a timer in the PROCESSING_TIME
// (or SYNCHRONIZED_PROCESSING_TIME) domain. The output timestamp will be the timestamp of
// the element (or timer) setting this timer.
outputTimestamp = elementInputTimestamp;
}

Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
Instant windowExpiry = LateDataUtils.garbageCollectionTime(window, allowedLateness);
if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
checkArgument(
!outputTimestamp.isAfter(windowExpiry),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1025,13 +1025,26 @@ protected void fireTimer(TimerData timerData) {
checkArgument(namespace instanceof WindowNamespace);
BoundedWindow window = ((WindowNamespace) namespace).getWindow();
timerInternals.onFiredOrDeletedTimer(timerData);
Instant effectiveOutputTimestamp;

if (timerData.getDomain() == TimeDomain.EVENT_TIME) {
effectiveOutputTimestamp = timerData.getOutputTimestamp();
} else {
// Flink does not set a watermark hold for the timer's output timestamp, and prior to
// https://github.com/apache/beam/pull/17262 processing-time timers did not correctly emit
// elements at their output timestamp. In this case we need to continue doing the wrong thing
// and use the output watermark rather than the firing timestamp. Once Flink correctly sets
// a watermark hold for the output timestamp, this should be changed back.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How difficult would it be to get Flink to set a watermark hold for processing-time timers as well? The fact that it doesn't seems like a significant bug.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't be too hard (famous last words?) #17262 (comment)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this is handled in onNewEventTimeTimer, and doesn't look difficult to do for processing-time timers

effectiveOutputTimestamp = timerInternals.currentOutputWatermarkTime();
}

pushbackDoFnRunner.onTimer(
timerData.getTimerId(),
timerData.getTimerFamilyId(),
keyedStateInternals.getKey(),
window,
timerData.getTimestamp(),
timerData.getOutputTimestamp(),
effectiveOutputTimestamp,
timerData.getDomain());
}

Expand Down