From 6079c550ff6354dddda010c61c7db851b97e1da3 Mon Sep 17 00:00:00 2001 From: tuyarer Date: Sat, 18 Feb 2023 20:56:51 -0800 Subject: [PATCH 1/2] Set MetricContext for checkpointing --- .../flink/metrics/ReaderInvocationUtil.java | 15 +++++++++++++++ .../streaming/io/UnboundedSourceWrapper.java | 5 ++++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java index 736a2dd9da59..04033e35ccd5 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java @@ -22,6 +22,7 @@ import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; @@ -69,4 +70,18 @@ public boolean invokeAdvance(ReaderT reader) throws IOException { return reader.advance(); } } + + public UnboundedSource.CheckpointMark invokeCheckpointMark(UnboundedSource.UnboundedReader reader) + throws IOException { + if (enableMetrics) { + try (Closeable ignored = + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) { + UnboundedSource.CheckpointMark result = reader.getCheckpointMark(); + container.updateMetrics(stepName); + return result; + } + } else { + return reader.getCheckpointMark(); + } + } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 92d0652e11f8..c8994505a009 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -391,6 +391,9 @@ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throw return; } + ReaderInvocationUtil> readerInvoker = + new ReaderInvocationUtil<>(stepName, serializedOptions.get(), metricContainer); + stateForCheckpoint.clear(); long checkpointId = functionSnapshotContext.getCheckpointId(); @@ -405,7 +408,7 @@ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throw UnboundedSource.UnboundedReader reader = localReaders.get(i); @SuppressWarnings("unchecked") - CheckpointMarkT mark = (CheckpointMarkT) reader.getCheckpointMark(); + CheckpointMarkT mark = (CheckpointMarkT) readerInvoker.invokeCheckpointMark(reader); checkpointMarks.add(mark); KV, CheckpointMarkT> kv = KV.of(source, mark); stateForCheckpoint.add(kv); From fd383fae1adc545b6b6a22b274902cda956fec49 Mon Sep 17 00:00:00 2001 From: tuyarer Date: Sat, 18 Feb 2023 22:17:32 -0800 Subject: [PATCH 2/2] Spotless Fix --- .../beam/runners/flink/metrics/ReaderInvocationUtil.java | 4 ++-- .../wrappers/streaming/io/UnboundedSourceWrapper.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java index 04033e35ccd5..1b52354bf01a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java @@ -71,8 +71,8 @@ public boolean invokeAdvance(ReaderT reader) throws IOException { } } - public UnboundedSource.CheckpointMark invokeCheckpointMark(UnboundedSource.UnboundedReader reader) - throws IOException { + public UnboundedSource.CheckpointMark invokeCheckpointMark( + UnboundedSource.UnboundedReader reader) throws IOException { if (enableMetrics) { try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index c8994505a009..cd187b4da02d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -392,7 +392,7 @@ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throw } ReaderInvocationUtil> readerInvoker = - new ReaderInvocationUtil<>(stepName, serializedOptions.get(), metricContainer); + new ReaderInvocationUtil<>(stepName, serializedOptions.get(), metricContainer); stateForCheckpoint.clear();