From 7b950b54113c3b928647d5c5929838b4869ae41d Mon Sep 17 00:00:00 2001 From: Steve Niemitz Date: Mon, 4 Apr 2022 09:39:46 -0400 Subject: [PATCH 1/5] Use the supplied output timestamp for processing time timers rather than the input watermark --- .../org/apache/beam/runners/core/SimpleDoFnRunner.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 9a8ec278e8ee..cf187b9aaf67 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -209,7 +209,12 @@ public void onTimer( break; case PROCESSING_TIME: case SYNCHRONIZED_PROCESSING_TIME: - effectiveTimestamp = stepContext.timerInternals().currentInputWatermarkTime(); + Instant outputWatermark = stepContext.timerInternals().currentOutputWatermarkTime(); + Instant inputWatermark = stepContext.timerInternals().currentInputWatermarkTime(); + effectiveTimestamp = + outputTimestamp != null + ? outputTimestamp + : outputWatermark != null ? 
outputWatermark : inputWatermark; break; default: From 252eb1c50d5121b342484362adab26224238560b Mon Sep 17 00:00:00 2001 From: Steve Niemitz Date: Mon, 4 Apr 2022 11:53:36 -0400 Subject: [PATCH 2/5] fixed flink test --- .../wrappers/streaming/DoFnOperatorTest.java | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java index acfede826e79..0f0beebc4661 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java @@ -305,7 +305,10 @@ public void processElement( eventTimerWithOutputTimestamp .withOutputTimestamp(timerOutputTimestamp) .set(timerTimestamp); - processingTimer.offset(Duration.millis(timerTimestamp.getMillis())).setRelative(); + processingTimer + .withOutputTimestamp(timerOutputTimestamp) + .offset(Duration.millis(timerTimestamp.getMillis())) + .setRelative(); } @OnTimer(eventTimerId) @@ -327,10 +330,8 @@ public void onEventTime2(OnTimerContext context) { @OnTimer(processingTimerId) public void onProcessingTime(OnTimerContext context) { assertEquals( - // Timestamps in processing timer context are defined to be the input watermark - // See SimpleDoFnRunner#onTimer - "Timer timestamp must match current input watermark", - timerTimestamp.plus(Duration.millis(1)), + "Timer timestamp must match set timestamp.", + timerOutputTimestamp, context.timestamp()); context.outputWithTimestamp(processingTimeMessage, context.timestamp()); } @@ -426,12 +427,7 @@ public void onProcessingTime(OnTimerContext context) { stripStreamRecordFromWindowedValue(testHarness.getOutput()), contains( WindowedValue.of( - // Timestamps in processing timer context 
are defined to be the input watermark - // See SimpleDoFnRunner#onTimer - processingTimeMessage, - timerTimestamp.plus(Duration.millis(1)), - window1, - PaneInfo.NO_FIRING))); + processingTimeMessage, timerOutputTimestamp, window1, PaneInfo.NO_FIRING))); testHarness.close(); } From 277f005eff20676f3211d5d86459bc98348de75d Mon Sep 17 00:00:00 2001 From: Steve Niemitz Date: Wed, 6 Apr 2022 08:56:40 -0400 Subject: [PATCH 3/5] make flink use the old behavior until it correctly sets a watermark hold at the output timestamp for processing time timers --- .../wrappers/streaming/DoFnOperator.java | 15 ++++++++++++++- .../wrappers/streaming/DoFnOperatorTest.java | 18 +++++++++++------- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index a8559983de84..bc82b7be42b7 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -1025,13 +1025,26 @@ protected void fireTimer(TimerData timerData) { checkArgument(namespace instanceof WindowNamespace); BoundedWindow window = ((WindowNamespace) namespace).getWindow(); timerInternals.onFiredOrDeletedTimer(timerData); + Instant effectiveOutputTimestamp; + + if (timerData.getDomain() == TimeDomain.EVENT_TIME) { + effectiveOutputTimestamp = timerData.getOutputTimestamp(); + } else { + // Flink does not set a watermark hold for the timer's output timestamp, and previous to + // https://github.com/apache/beam/pull/17262 processing time timers did not correctly emit + // elements at their output timestamp. In this case we need to continue doing the wrong thing + // and using the output watermark rather than the firing timestamp. 
Once flink correctly sets + // a watermark hold for the output timestamp, this should be changed back. + effectiveOutputTimestamp = timerInternals.currentOutputWatermarkTime(); + } + pushbackDoFnRunner.onTimer( timerData.getTimerId(), timerData.getTimerFamilyId(), keyedStateInternals.getKey(), window, timerData.getTimestamp(), - timerData.getOutputTimestamp(), + effectiveOutputTimestamp, timerData.getDomain()); } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java index 0f0beebc4661..acfede826e79 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java @@ -305,10 +305,7 @@ public void processElement( eventTimerWithOutputTimestamp .withOutputTimestamp(timerOutputTimestamp) .set(timerTimestamp); - processingTimer - .withOutputTimestamp(timerOutputTimestamp) - .offset(Duration.millis(timerTimestamp.getMillis())) - .setRelative(); + processingTimer.offset(Duration.millis(timerTimestamp.getMillis())).setRelative(); } @OnTimer(eventTimerId) @@ -330,8 +327,10 @@ public void onEventTime2(OnTimerContext context) { @OnTimer(processingTimerId) public void onProcessingTime(OnTimerContext context) { assertEquals( - "Timer timestamp must match set timestamp.", - timerOutputTimestamp, + // Timestamps in processing timer context are defined to be the input watermark + // See SimpleDoFnRunner#onTimer + "Timer timestamp must match current input watermark", + timerTimestamp.plus(Duration.millis(1)), context.timestamp()); context.outputWithTimestamp(processingTimeMessage, context.timestamp()); } @@ -427,7 +426,12 @@ public void onProcessingTime(OnTimerContext context) { 
stripStreamRecordFromWindowedValue(testHarness.getOutput()), contains( WindowedValue.of( - processingTimeMessage, timerOutputTimestamp, window1, PaneInfo.NO_FIRING))); + // Timestamps in processing timer context are defined to be the input watermark + // See SimpleDoFnRunner#onTimer + processingTimeMessage, + timerTimestamp.plus(Duration.millis(1)), + window1, + PaneInfo.NO_FIRING))); testHarness.close(); } From f968ad988faae6453fe2b78bc5957d3add9e9eb2 Mon Sep 17 00:00:00 2001 From: Steve Niemitz Date: Wed, 6 Apr 2022 09:19:47 -0400 Subject: [PATCH 4/5] clean up setAndVerifyOutputTimestamp --- .../beam/runners/core/SimpleDoFnRunner.java | 37 ++++++++----------- 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index cf187b9aaf67..17f002db8a94 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -1222,7 +1222,7 @@ public Timer align(Duration period) { } /** - * For event time timers the target time should be prior to window GC time. So it return + * For event time timers the target time should be prior to window GC time. So it returns * min(time to set, GC Time of window). */ private Instant minTargetAndGcTime(Instant target) { @@ -1242,13 +1242,12 @@ public Timer withOutputTimestamp(Instant outputTimestamp) { } /** - * - * - * <ul>
- * Ensures that:
- * <li>Users can't set {@code outputTimestamp} for processing time timers.
- * <li>Event time timers' {@code outputTimestamp} is set before window expiration.
- * </ul>
+ * Ensures that a timer's {@code outputTimestamp} is set at or after the current input timestamp + * (minus allowed timestamp skew if set) and before the max timestamp of the window (plus + * allowed lateness). <br>
+ * If the outputTimestamp is not set, it is defaulted to either:
+ * <li>The firing timestamp for timers in the {@link TimeDomain#EVENT_TIME}
+ * <li>The current element timestamp for other time domains. */ private void setAndVerifyOutputTimestamp() { if (outputTimestamp != null) { @@ -1266,7 +1265,7 @@ private void setAndVerifyOutputTimestamp() { + "earlier than the timestamp of the current input or timer (%s) minus the " + "allowed skew (%s) and no later than %s. See the " + "DoFn#getAllowedTimestampSkew() Javadoc for details on changing the " - + "allowed skew.details on changing the allowed skew.", + + "allowed skew.", outputTimestamp, elementInputTimestamp, fn.getAllowedTimestampSkew().getMillis() >= Integer.MAX_VALUE @@ -1274,22 +1273,18 @@ private void setAndVerifyOutputTimestamp() { : PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()), BoundedWindow.TIMESTAMP_MAX_VALUE)); } - } - - // Output timestamp is set to the delivery time if not initialized by an user. - if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { + } else if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { + // The outputTimestamp was unset and this is a timer in the EVENT_TIME domain. The output + // timestamp will be the firing timestamp. outputTimestamp = target; - } - // For processing timers - if (outputTimestamp == null) { - // For processing timers output timestamp will be: - // 1) timestamp of input element - // OR - // 2) output timestamp of firing timer. + } else { + // The outputTimestamp was unset and this is a timer in the PROCESSING_TIME + // (or SYNCHRONIZED_PROCESSING_TIME) domain. The output timestamp will be the timestamp of + // the element (or timer) setting this timer.
outputTimestamp = elementInputTimestamp; } - Instant windowExpiry = window.maxTimestamp().plus(allowedLateness); + Instant windowExpiry = LateDataUtils.garbageCollectionTime(window, allowedLateness); if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { checkArgument( !outputTimestamp.isAfter(windowExpiry), From 3ba3fe5760537f91eb8e4837bd1d87ec62fc2eb3 Mon Sep 17 00:00:00 2001 From: Steve Niemitz Date: Wed, 6 Apr 2022 12:41:50 -0400 Subject: [PATCH 5/5] simplify output timestamp in timer invocation --- .../beam/runners/core/SimpleDoFnRunner.java | 28 ++----------------- 1 file changed, 3 insertions(+), 25 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 17f002db8a94..a73dd521f864 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -58,6 +58,7 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; @@ -197,33 +198,10 @@ public void onTimer( Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) { - - // The effective timestamp is when derived elements will have their timestamp set, if not - // otherwise specified. If this is an event time timer, then they have the timer's output - // timestamp. Otherwise, they are set to the input timestamp, which is by definition - // non-late. 
- Instant effectiveTimestamp; - switch (timeDomain) { - case EVENT_TIME: - effectiveTimestamp = outputTimestamp; - break; - case PROCESSING_TIME: - case SYNCHRONIZED_PROCESSING_TIME: - Instant outputWatermark = stepContext.timerInternals().currentOutputWatermarkTime(); - Instant inputWatermark = stepContext.timerInternals().currentInputWatermarkTime(); - effectiveTimestamp = - outputTimestamp != null - ? outputTimestamp - : outputWatermark != null ? outputWatermark : inputWatermark; - break; - - default: - throw new IllegalArgumentException(String.format("Unknown time domain: %s", timeDomain)); - } + Preconditions.checkNotNull(outputTimestamp, "outputTimestamp"); OnTimerArgumentProvider argumentProvider = - new OnTimerArgumentProvider<>( - timerId, key, window, timestamp, effectiveTimestamp, timeDomain); + new OnTimerArgumentProvider<>(timerId, key, window, timestamp, outputTimestamp, timeDomain); invoker.invokeOnTimer(timerId, timerFamilyId, argumentProvider); }