diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java index 2e1e94777949..a270e0eceecd 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java @@ -29,6 +29,7 @@ public class FileStoreSourceReaderMetrics { private long lastSplitUpdateTime = UNDEFINED; public static final long UNDEFINED = -1L; + public static final long ACTIVE = Long.MAX_VALUE; public FileStoreSourceReaderMetrics(MetricGroup sourceReaderMetricGroup) { sourceReaderMetricGroup.gauge( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java index 80c85f7cdb35..d884724c6749 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java @@ -54,9 +54,11 @@ public class ReadOperator extends AbstractStreamOperator private transient IOManager ioManager; private transient FileStoreSourceReaderMetrics sourceReaderMetrics; - // we create our own gauge for currentEmitEventTimeLag, because this operator is not a FLIP-27 + // we create our own gauge for currentEmitEventTimeLag and sourceIdleTime, because this operator + // is not a FLIP-27 // source and Flink can't automatically calculate this metric private transient long emitEventTimeLag = FileStoreSourceReaderMetrics.UNDEFINED; + private transient long idleStartTime = FileStoreSourceReaderMetrics.ACTIVE; private transient Counter numRecordsIn; public ReadOperator(ReadBuilder readBuilder) { @@ -69,6 +71,7 @@ public void open() throws Exception { this.sourceReaderMetrics = new FileStoreSourceReaderMetrics(getMetricGroup()); getMetricGroup().gauge(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG, () -> emitEventTimeLag); + getMetricGroup().gauge(MetricNames.SOURCE_IDLE_TIME, this::getIdleTime); this.numRecordsIn = InternalSourceReaderMetricGroup.wrap(getMetricGroup()) .getIOMetricGroup() @@ -83,6 +86,7 @@ public void open() throws Exception { this.read = readBuilder.newRead().withIOManager(ioManager); this.reuseRow = new FlinkRowData(null); this.reuseRecord = new StreamRecord<>(reuseRow); + this.idlingStarted(); } @Override @@ -94,6 +98,8 @@ public void processElement(StreamRecord record) throws Exception { .earliestFileCreationEpochMillis() .orElse(FileStoreSourceReaderMetrics.UNDEFINED); sourceReaderMetrics.recordSnapshotUpdate(eventTime); + // update idleStartTime when reading a new split + idleStartTime = FileStoreSourceReaderMetrics.ACTIVE; boolean firstRecord = true; try (CloseableIterator iterator = @@ -113,6 +119,8 @@ public void processElement(StreamRecord record) throws Exception { output.collect(reuseRecord); } } + // start idle when data sending is completed + this.idlingStarted(); } @Override @@ -122,4 +130,18 @@ public void close() throws Exception { ioManager.close(); } } + + private void idlingStarted() { + if (!isIdling()) { + idleStartTime = System.currentTimeMillis(); + } + } + + private boolean isIdling() { + return idleStartTime != FileStoreSourceReaderMetrics.ACTIVE; + } + + private long getIdleTime() { + return isIdling() ? System.currentTimeMillis() - idleStartTime : 0; + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java index 61a03a29a21b..0bce8c8901ea 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java @@ -204,6 +204,14 @@ public void testReadOperatorMetricsRegisterAndUpdate() throws Exception { .getValue()) .isEqualTo(-1L); + Thread.sleep(300L); + assertThat( + (Long) + TestingMetricUtils.getGauge( + readerOperatorMetricGroup, "sourceIdleTime") + .getValue()) + .isGreaterThan(299L); + harness.processElement(new StreamRecord<>(splits.get(0))); assertThat( (Long) @@ -228,6 +236,14 @@ public void testReadOperatorMetricsRegisterAndUpdate() throws Exception { "currentEmitEventTimeLag") .getValue()) .isEqualTo(emitEventTimeLag); + + assertThat( + (Long) + TestingMetricUtils.getGauge( + readerOperatorMetricGroup, "sourceIdleTime") + .getValue()) + .isGreaterThan(99L) + .isLessThan(300L); } private T testReadSplit(