From 12a84d4c0559bc262e3feba6632f9156100606bc Mon Sep 17 00:00:00 2001 From: tuyarer Date: Sat, 16 Dec 2023 14:07:41 -0800 Subject: [PATCH 1/2] Enable Backlog Metrics on Flink Metrics --- .../flink/metrics/ReaderInvocationUtil.java | 14 ++++++++++++++ .../streaming/io/UnboundedSourceWrapper.java | 6 ++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java index 60b84e63263f..c94cb7b78ae0 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java @@ -69,4 +69,18 @@ public boolean invokeAdvance(ReaderT reader) throws IOException { return reader.advance(); } } + + public UnboundedSource.CheckpointMark invokeCheckpointMark( + UnboundedSource.UnboundedReader reader) throws IOException { + if (enableMetrics) { + try (Closeable ignored = + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) { + UnboundedSource.CheckpointMark result = reader.getCheckpointMark(); + container.updateMetrics(stepName); + return result; + } + } else { + return reader.getCheckpointMark(); + } + } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 961d31a75370..6ee80e473acc 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -142,6 +142,8 @@ public class UnboundedSourceWrapper> readerInvoker; @SuppressWarnings("unchecked") public UnboundedSourceWrapper( @@ -229,7 +231,7 @@ public void run(SourceContext>> ctx) th context = ctx; - ReaderInvocationUtil> readerInvoker = + readerInvoker = new ReaderInvocationUtil<>(stepName, serializedOptions.get(), metricContainer); setNextWatermarkTimer(this.runtimeContext); @@ -408,7 +410,7 @@ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throw UnboundedSource.UnboundedReader reader = localReaders.get(i); @SuppressWarnings("unchecked") - CheckpointMarkT mark = (CheckpointMarkT) reader.getCheckpointMark(); + CheckpointMarkT mark = (CheckpointMarkT) readerInvoker.invokeCheckpointMark(reader); checkpointMarks.add(mark); KV, CheckpointMarkT> kv = KV.of(source, mark); stateForCheckpoint.add(kv); From 792e667aba3ceb1e2e213773c9353f67fe0562d2 Mon Sep 17 00:00:00 2001 From: tuyarer Date: Sat, 16 Dec 2023 16:27:05 -0800 Subject: [PATCH 2/2] Missing Imports --- .../apache/beam/runners/flink/metrics/ReaderInvocationUtil.java | 1 + .../wrappers/streaming/io/UnboundedSourceWrapper.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java index c94cb7b78ae0..75c9636830e2 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java @@ -22,6 +22,7 @@ import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 6ee80e473acc..39b0482ad1cd 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -143,7 +143,7 @@ public class UnboundedSourceWrapper> readerInvoker; + private transient ReaderInvocationUtil> readerInvoker; @SuppressWarnings("unchecked") public UnboundedSourceWrapper(