diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 9a8ec278e8ee..a73dd521f864 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -58,6 +58,7 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; @@ -197,28 +198,10 @@ public void onTimer( Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) { - - // The effective timestamp is when derived elements will have their timestamp set, if not - // otherwise specified. If this is an event time timer, then they have the timer's output - // timestamp. Otherwise, they are set to the input timestamp, which is by definition - // non-late. 
- Instant effectiveTimestamp; - switch (timeDomain) { - case EVENT_TIME: - effectiveTimestamp = outputTimestamp; - break; - case PROCESSING_TIME: - case SYNCHRONIZED_PROCESSING_TIME: - effectiveTimestamp = stepContext.timerInternals().currentInputWatermarkTime(); - break; - - default: - throw new IllegalArgumentException(String.format("Unknown time domain: %s", timeDomain)); - } + Preconditions.checkNotNull(outputTimestamp, "outputTimestamp"); OnTimerArgumentProvider argumentProvider = - new OnTimerArgumentProvider<>( - timerId, key, window, timestamp, effectiveTimestamp, timeDomain); + new OnTimerArgumentProvider<>(timerId, key, window, timestamp, outputTimestamp, timeDomain); invoker.invokeOnTimer(timerId, timerFamilyId, argumentProvider); } @@ -1217,7 +1200,7 @@ public Timer align(Duration period) { } /** - * For event time timers the target time should be prior to window GC time. So it return + * For event time timers the target time should be prior to window GC time. So it returns * min(time to set, GC Time of window). */ private Instant minTargetAndGcTime(Instant target) { @@ -1237,13 +1220,12 @@ public Timer withOutputTimestamp(Instant outputTimestamp) { } /** - * - * - * + * Ensures that a timer's {@code outputTimestamp} is set at or after the current input timestamp + * (minus allowed timestamp skew if set) and before the max timestamp of the window (plus + * allowed lateness).
+ * If the outputTimestamp is not set, it is defaulted to either: + *
  • The firing timestamp for timers in the {@link TimeDomain#EVENT_TIME} domain. + *
  • The current element timestamp for other time domains. */ private void setAndVerifyOutputTimestamp() { if (outputTimestamp != null) { @@ -1261,7 +1243,7 @@ private void setAndVerifyOutputTimestamp() { + "earlier than the timestamp of the current input or timer (%s) minus the " + "allowed skew (%s) and no later than %s. See the " + "DoFn#getAllowedTimestampSkew() Javadoc for details on changing the " - + "allowed skew.details on changing the allowed skew.", + + "allowed skew.", outputTimestamp, elementInputTimestamp, fn.getAllowedTimestampSkew().getMillis() >= Integer.MAX_VALUE @@ -1269,22 +1251,18 @@ private void setAndVerifyOutputTimestamp() { : PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()), BoundedWindow.TIMESTAMP_MAX_VALUE)); } - } - - // Output timestamp is set to the delivery time if not initialized by an user. - if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { + } else if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { + // The outputTimestamp was unset and this is a timer in the EVENT_TIME domain. The output + // timestamp will be the firing timestamp. outputTimestamp = target; - } - // For processing timers - if (outputTimestamp == null) { - // For processing timers output timestamp will be: - // 1) timestamp of input element - // OR - // 2) output timestamp of firing timer. + } else { + // The outputTimestamp was unset and this is a timer in the PROCESSING_TIME + // (or SYNCHRONIZED_PROCESSING_TIME) domain. The output timestamp will be the timestamp of + // the element (or timer) setting this timer. 
outputTimestamp = elementInputTimestamp; } - Instant windowExpiry = window.maxTimestamp().plus(allowedLateness); + Instant windowExpiry = LateDataUtils.garbageCollectionTime(window, allowedLateness); if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { checkArgument( !outputTimestamp.isAfter(windowExpiry), diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index a8559983de84..bc82b7be42b7 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -1025,13 +1025,26 @@ protected void fireTimer(TimerData timerData) { checkArgument(namespace instanceof WindowNamespace); BoundedWindow window = ((WindowNamespace) namespace).getWindow(); timerInternals.onFiredOrDeletedTimer(timerData); + Instant effectiveOutputTimestamp; + + if (timerData.getDomain() == TimeDomain.EVENT_TIME) { + effectiveOutputTimestamp = timerData.getOutputTimestamp(); + } else { + // Flink does not set a watermark hold for the timer's output timestamp, and prior to + // https://github.com/apache/beam/pull/17262 processing time timers did not correctly emit + // elements at their output timestamp. In this case we need to continue doing the wrong thing + // and using the output watermark rather than the firing timestamp. Once Flink correctly sets + // a watermark hold for the output timestamp, this should be changed back. + effectiveOutputTimestamp = timerInternals.currentOutputWatermarkTime(); + } + pushbackDoFnRunner.onTimer( timerData.getTimerId(), timerData.getTimerFamilyId(), keyedStateInternals.getKey(), window, timerData.getTimestamp(), - timerData.getOutputTimestamp(), + effectiveOutputTimestamp, timerData.getDomain()); }