From 16778333157a6a6ff7d5700b2c856329304e7858 Mon Sep 17 00:00:00 2001 From: herefree <841043203@qq.com> Date: Tue, 26 Nov 2024 17:37:49 +0800 Subject: [PATCH 1/4] support flink sourceIdleTime metric --- .../metrics/FileStoreSourceReaderMetrics.java | 19 +++++++++++++++ .../flink/source/operator/ReadOperator.java | 2 ++ .../FileStoreSourceReaderMetricsTest.java | 24 +++++++++++++++++++ .../source/operator/OperatorSourceTest.java | 10 ++++++++ 4 files changed, 55 insertions(+) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java index 2e1e94777949..6955bfef1827 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java @@ -27,18 +27,22 @@ public class FileStoreSourceReaderMetrics { private long latestFileCreationTime = UNDEFINED; private long lastSplitUpdateTime = UNDEFINED; + private long idleStartTime = ACTIVE; public static final long UNDEFINED = -1L; + private static final long ACTIVE = Long.MAX_VALUE; public FileStoreSourceReaderMetrics(MetricGroup sourceReaderMetricGroup) { sourceReaderMetricGroup.gauge( MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, this::getFetchTimeLag); + sourceReaderMetricGroup.gauge(MetricNames.SOURCE_IDLE_TIME, this::getIdleTime); } /** Called when consumed snapshot changes. */ public void recordSnapshotUpdate(long fileCreationTime) { this.latestFileCreationTime = fileCreationTime; lastSplitUpdateTime = System.currentTimeMillis(); + idleStartTime = ACTIVE; } @VisibleForTesting @@ -57,4 +61,19 @@ public long getLatestFileCreationTime() { long getLastSplitUpdateTime() { return lastSplitUpdateTime; } + + public void idlingStarted() { + if (!isIdling()) { + idleStartTime = System.currentTimeMillis(); + } + } + + boolean isIdling() { + return idleStartTime != ACTIVE; + } + + @VisibleForTesting + long getIdleTime() { + return isIdling() ? System.currentTimeMillis() - idleStartTime : 0; + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java index 80c85f7cdb35..89f1f8754251 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java @@ -83,6 +83,7 @@ public void open() throws Exception { this.read = readBuilder.newRead().withIOManager(ioManager); this.reuseRow = new FlinkRowData(null); this.reuseRecord = new StreamRecord<>(reuseRow); + this.sourceReaderMetrics.idlingStarted(); } @Override @@ -113,6 +114,7 @@ public void processElement(StreamRecord record) throws Exception { output.collect(reuseRecord); } } + sourceReaderMetrics.idlingStarted(); } @Override diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetricsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetricsTest.java index 2012e7a8956c..2eb34dcc7bd7 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetricsTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetricsTest.java @@ -52,4 +52,28 @@ public void testCurrentFetchLagUpdated() { assertThat(sourceReaderMetrics.getFetchTimeLag()) .isNotEqualTo(FileStoreSourceReaderMetrics.UNDEFINED); } + + @Test + public void testSourceIdleTimeUpdated() throws InterruptedException { + MetricListener metricListener = new MetricListener(); + final FileStoreSourceReaderMetrics sourceReaderMetrics = + new FileStoreSourceReaderMetrics(metricListener.getMetricGroup()); + + assertThat(sourceReaderMetrics.getIdleTime()).isEqualTo(0L); + + // idle start + sourceReaderMetrics.idlingStarted(); + Thread.sleep(10L); + assertThat(sourceReaderMetrics.getIdleTime()).isGreaterThan(9L); + + //non-idle + sourceReaderMetrics.recordSnapshotUpdate(123); + Thread.sleep(10L); + assertThat(sourceReaderMetrics.getIdleTime()).isEqualTo(0L); + + // idle start + sourceReaderMetrics.idlingStarted(); + Thread.sleep(10L); + assertThat(sourceReaderMetrics.getIdleTime()).isGreaterThan(9L); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java index 61a03a29a21b..c5abe5eb32a4 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java @@ -204,6 +204,11 @@ public void testReadOperatorMetricsRegisterAndUpdate() throws Exception { .getValue()) .isEqualTo(-1L); + Thread.sleep(300L); + assertThat((Long)TestingMetricUtils.getGauge( + readerOperatorMetricGroup, "sourceIdleTime") + .getValue()).isGreaterThan(299L); + harness.processElement(new StreamRecord<>(splits.get(0))); assertThat( (Long) @@ -228,6 +233,11 @@ public void testReadOperatorMetricsRegisterAndUpdate() throws Exception { "currentEmitEventTimeLag") .getValue()) .isEqualTo(emitEventTimeLag); + + assertThat((Long)TestingMetricUtils.getGauge( + readerOperatorMetricGroup, "sourceIdleTime") + .getValue()).isGreaterThan(99L).isLessThan(300L); + } private T testReadSplit( From e419ef648fc91e028ce80ee3d60ea6ae35b87328 Mon Sep 17 00:00:00 2001 From: herefree <841043203@qq.com> Date: Tue, 26 Nov 2024 17:42:47 +0800 Subject: [PATCH 2/4] fix --- .../FileStoreSourceReaderMetricsTest.java | 2 +- .../source/operator/OperatorSourceTest.java | 20 ++++++++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetricsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetricsTest.java index 2eb34dcc7bd7..ed43cd0cd237 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetricsTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetricsTest.java @@ -66,7 +66,7 @@ public void testSourceIdleTimeUpdated() throws InterruptedException { Thread.sleep(10L); assertThat(sourceReaderMetrics.getIdleTime()).isGreaterThan(9L); - //non-idle + // non-idle sourceReaderMetrics.recordSnapshotUpdate(123); Thread.sleep(10L); assertThat(sourceReaderMetrics.getIdleTime()).isEqualTo(0L); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java index c5abe5eb32a4..0bce8c8901ea 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java @@ -205,9 +205,12 @@ public void testReadOperatorMetricsRegisterAndUpdate() throws Exception { .isEqualTo(-1L); Thread.sleep(300L); - assertThat((Long)TestingMetricUtils.getGauge( - readerOperatorMetricGroup, "sourceIdleTime") - .getValue()).isGreaterThan(299L); + assertThat( + (Long) + TestingMetricUtils.getGauge( + readerOperatorMetricGroup, "sourceIdleTime") + .getValue()) + .isGreaterThan(299L); harness.processElement(new StreamRecord<>(splits.get(0))); assertThat( @@ -234,10 +237,13 @@ public void testReadOperatorMetricsRegisterAndUpdate() throws Exception { .getValue()) .isEqualTo(emitEventTimeLag); - assertThat((Long)TestingMetricUtils.getGauge( - readerOperatorMetricGroup, "sourceIdleTime") - .getValue()).isGreaterThan(99L).isLessThan(300L); - + assertThat( + (Long) + TestingMetricUtils.getGauge( + readerOperatorMetricGroup, "sourceIdleTime") + .getValue()) + .isGreaterThan(99L) + .isLessThan(300L); } private T testReadSplit( From dc8823a03f49647ae8c6b7de70401a5e7f31f192 Mon Sep 17 00:00:00 2001 From: herefree <841043203@qq.com> Date: Thu, 5 Dec 2024 16:41:46 +0800 Subject: [PATCH 3/4] support idleStartTime in readOperator --- .../metrics/FileStoreSourceReaderMetrics.java | 19 -------------- .../flink/source/operator/ReadOperator.java | 26 ++++++++++++++++--- .../FileStoreSourceReaderMetricsTest.java | 24 ----------------- 3 files changed, 23 insertions(+), 46 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java index 6955bfef1827..2e1e94777949 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java @@ -27,22 +27,18 @@ public class FileStoreSourceReaderMetrics { private long latestFileCreationTime = UNDEFINED; private long lastSplitUpdateTime = UNDEFINED; - private long idleStartTime = ACTIVE; public static final long UNDEFINED = -1L; - private static final long ACTIVE = Long.MAX_VALUE; public FileStoreSourceReaderMetrics(MetricGroup sourceReaderMetricGroup) { sourceReaderMetricGroup.gauge( MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, this::getFetchTimeLag); - sourceReaderMetricGroup.gauge(MetricNames.SOURCE_IDLE_TIME, this::getIdleTime); } /** Called when consumed snapshot changes. */ public void recordSnapshotUpdate(long fileCreationTime) { this.latestFileCreationTime = fileCreationTime; lastSplitUpdateTime = System.currentTimeMillis(); - idleStartTime = ACTIVE; } @VisibleForTesting @@ -61,19 +57,4 @@ public long getLatestFileCreationTime() { long getLastSplitUpdateTime() { return lastSplitUpdateTime; } - - public void idlingStarted() { - if (!isIdling()) { - idleStartTime = System.currentTimeMillis(); - } - } - - boolean isIdling() { - return idleStartTime != ACTIVE; - } - - @VisibleForTesting - long getIdleTime() { - return isIdling() ? System.currentTimeMillis() - idleStartTime : 0; - } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java index 89f1f8754251..7828888867ad 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java @@ -54,9 +54,11 @@ public class ReadOperator extends AbstractStreamOperator private transient IOManager ioManager; private transient FileStoreSourceReaderMetrics sourceReaderMetrics; - // we create our own gauge for currentEmitEventTimeLag, because this operator is not a FLIP-27 + // we create our own gauge for currentEmitEventTimeLag and sourceIdleTime, because this operator + // is not a FLIP-27 // source and Flink can't automatically calculate this metric private transient long emitEventTimeLag = FileStoreSourceReaderMetrics.UNDEFINED; + private transient long idleStartTime = FileStoreSourceReaderMetrics.UNDEFINED; private transient Counter numRecordsIn; public ReadOperator(ReadBuilder readBuilder) { @@ -69,6 +71,7 @@ public void open() throws Exception { this.sourceReaderMetrics = new FileStoreSourceReaderMetrics(getMetricGroup()); getMetricGroup().gauge(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG, () -> emitEventTimeLag); + getMetricGroup().gauge(MetricNames.SOURCE_IDLE_TIME, this::getIdleTime); this.numRecordsIn = InternalSourceReaderMetricGroup.wrap(getMetricGroup()) .getIOMetricGroup() @@ -83,7 +86,7 @@ public void open() throws Exception { this.read = readBuilder.newRead().withIOManager(ioManager); this.reuseRow = new FlinkRowData(null); this.reuseRecord = new StreamRecord<>(reuseRow); - this.sourceReaderMetrics.idlingStarted(); + this.idlingStarted(); } @Override @@ -95,6 +98,8 @@ public void processElement(StreamRecord record) throws Exception { .earliestFileCreationEpochMillis() .orElse(FileStoreSourceReaderMetrics.UNDEFINED); sourceReaderMetrics.recordSnapshotUpdate(eventTime); + // update idleStartTime when reading a new split + idleStartTime = FileStoreSourceReaderMetrics.UNDEFINED; boolean firstRecord = true; try (CloseableIterator iterator = @@ -114,7 +119,8 @@ public void processElement(StreamRecord record) throws Exception { output.collect(reuseRecord); } } - sourceReaderMetrics.idlingStarted(); + // start idle when data sending is completed + this.idlingStarted(); } @Override @@ -124,4 +130,18 @@ public void close() throws Exception { ioManager.close(); } } + + private void idlingStarted() { + if (!isIdling()) { + idleStartTime = System.currentTimeMillis(); + } + } + + private boolean isIdling() { + return idleStartTime != FileStoreSourceReaderMetrics.UNDEFINED; + } + + private long getIdleTime() { + return isIdling() ? System.currentTimeMillis() - idleStartTime : 0; + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetricsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetricsTest.java index ed43cd0cd237..2012e7a8956c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetricsTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetricsTest.java @@ -52,28 +52,4 @@ public void testCurrentFetchLagUpdated() { assertThat(sourceReaderMetrics.getFetchTimeLag()) .isNotEqualTo(FileStoreSourceReaderMetrics.UNDEFINED); } - - @Test - public void testSourceIdleTimeUpdated() throws InterruptedException { - MetricListener metricListener = new MetricListener(); - final FileStoreSourceReaderMetrics sourceReaderMetrics = - new FileStoreSourceReaderMetrics(metricListener.getMetricGroup()); - - assertThat(sourceReaderMetrics.getIdleTime()).isEqualTo(0L); - - // idle start - sourceReaderMetrics.idlingStarted(); - Thread.sleep(10L); - assertThat(sourceReaderMetrics.getIdleTime()).isGreaterThan(9L); - - // non-idle - sourceReaderMetrics.recordSnapshotUpdate(123); - Thread.sleep(10L); - assertThat(sourceReaderMetrics.getIdleTime()).isEqualTo(0L); - - // idle start - sourceReaderMetrics.idlingStarted(); - Thread.sleep(10L); - assertThat(sourceReaderMetrics.getIdleTime()).isGreaterThan(9L); - } } From cf45da4b8ff447938f83d67313afcb4308393996 Mon Sep 17 00:00:00 2001 From: herefree <841043203@qq.com> Date: Fri, 6 Dec 2024 13:41:28 +0800 Subject: [PATCH 4/4] fix --- .../flink/source/metrics/FileStoreSourceReaderMetrics.java | 1 + .../apache/paimon/flink/source/operator/ReadOperator.java | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java index 2e1e94777949..a270e0eceecd 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java @@ -29,6 +29,7 @@ public class FileStoreSourceReaderMetrics { private long lastSplitUpdateTime = UNDEFINED; public static final long UNDEFINED = -1L; + public static final long ACTIVE = Long.MAX_VALUE; public FileStoreSourceReaderMetrics(MetricGroup sourceReaderMetricGroup) { sourceReaderMetricGroup.gauge( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java index 7828888867ad..d884724c6749 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java @@ -58,7 +58,7 @@ public class ReadOperator extends AbstractStreamOperator // is not a FLIP-27 // source and Flink can't automatically calculate this metric private transient long emitEventTimeLag = FileStoreSourceReaderMetrics.UNDEFINED; - private transient long idleStartTime = FileStoreSourceReaderMetrics.UNDEFINED; + private transient long idleStartTime = FileStoreSourceReaderMetrics.ACTIVE; private transient Counter numRecordsIn; public ReadOperator(ReadBuilder readBuilder) { @@ -99,7 +99,7 @@ public void processElement(StreamRecord record) throws Exception { .orElse(FileStoreSourceReaderMetrics.UNDEFINED); sourceReaderMetrics.recordSnapshotUpdate(eventTime); // update idleStartTime when reading a new split - idleStartTime = FileStoreSourceReaderMetrics.UNDEFINED; + idleStartTime = FileStoreSourceReaderMetrics.ACTIVE; boolean firstRecord = true; try (CloseableIterator iterator = @@ -138,7 +138,7 @@ private void idlingStarted() { } private boolean isIdling() { - return idleStartTime != FileStoreSourceReaderMetrics.UNDEFINED; + return idleStartTime != FileStoreSourceReaderMetrics.ACTIVE; } private long getIdleTime() {