Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class FileStoreSourceReaderMetrics {
private long lastSplitUpdateTime = UNDEFINED;

public static final long UNDEFINED = -1L;
public static final long ACTIVE = Long.MAX_VALUE;

public FileStoreSourceReaderMetrics(MetricGroup sourceReaderMetricGroup) {
sourceReaderMetricGroup.gauge(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
private transient IOManager ioManager;

private transient FileStoreSourceReaderMetrics sourceReaderMetrics;
// we create our own gauge for currentEmitEventTimeLag, because this operator is not a FLIP-27
// we create our own gauge for currentEmitEventTimeLag and sourceIdleTime, because this operator
// is not a FLIP-27
// source and Flink can't automatically calculate this metric
private transient long emitEventTimeLag = FileStoreSourceReaderMetrics.UNDEFINED;
private transient long idleStartTime = FileStoreSourceReaderMetrics.ACTIVE;
private transient Counter numRecordsIn;

public ReadOperator(ReadBuilder readBuilder) {
Expand All @@ -69,6 +71,7 @@ public void open() throws Exception {

this.sourceReaderMetrics = new FileStoreSourceReaderMetrics(getMetricGroup());
getMetricGroup().gauge(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG, () -> emitEventTimeLag);
getMetricGroup().gauge(MetricNames.SOURCE_IDLE_TIME, this::getIdleTime);
this.numRecordsIn =
InternalSourceReaderMetricGroup.wrap(getMetricGroup())
.getIOMetricGroup()
Expand All @@ -83,6 +86,7 @@ public void open() throws Exception {
this.read = readBuilder.newRead().withIOManager(ioManager);
this.reuseRow = new FlinkRowData(null);
this.reuseRecord = new StreamRecord<>(reuseRow);
this.idlingStarted();
}

@Override
Expand All @@ -94,6 +98,8 @@ public void processElement(StreamRecord<Split> record) throws Exception {
.earliestFileCreationEpochMillis()
.orElse(FileStoreSourceReaderMetrics.UNDEFINED);
sourceReaderMetrics.recordSnapshotUpdate(eventTime);
// update idleStartTime when reading a new split
idleStartTime = FileStoreSourceReaderMetrics.ACTIVE;

boolean firstRecord = true;
try (CloseableIterator<InternalRow> iterator =
Expand All @@ -113,6 +119,8 @@ public void processElement(StreamRecord<Split> record) throws Exception {
output.collect(reuseRecord);
}
}
// start idle when data sending is completed
this.idlingStarted();
}

@Override
Expand All @@ -122,4 +130,18 @@ public void close() throws Exception {
ioManager.close();
}
}

private void idlingStarted() {
if (!isIdling()) {
idleStartTime = System.currentTimeMillis();
}
}

private boolean isIdling() {
return idleStartTime != FileStoreSourceReaderMetrics.ACTIVE;
}

private long getIdleTime() {
return isIdling() ? System.currentTimeMillis() - idleStartTime : 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,14 @@ public void testReadOperatorMetricsRegisterAndUpdate() throws Exception {
.getValue())
.isEqualTo(-1L);

Thread.sleep(300L);
assertThat(
(Long)
TestingMetricUtils.getGauge(
readerOperatorMetricGroup, "sourceIdleTime")
.getValue())
.isGreaterThan(299L);

harness.processElement(new StreamRecord<>(splits.get(0)));
assertThat(
(Long)
Expand All @@ -228,6 +236,14 @@ public void testReadOperatorMetricsRegisterAndUpdate() throws Exception {
"currentEmitEventTimeLag")
.getValue())
.isEqualTo(emitEventTimeLag);

assertThat(
(Long)
TestingMetricUtils.getGauge(
readerOperatorMetricGroup, "sourceIdleTime")
.getValue())
.isGreaterThan(99L)
.isLessThan(300L);
}

private <T> T testReadSplit(
Expand Down
Loading