Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {

private ServiceUnitStateTableView tableview;
private ScheduledFuture<?> monitorTask;
private SessionEvent lastMetadataSessionEvent = SessionReestablished;
private long lastMetadataSessionEventTimestamp = 0;
private volatile SessionEvent lastMetadataSessionEvent = SessionReestablished;
private volatile long lastMetadataSessionEventTimestamp = 0;
private long inFlightStateWaitingTimeInMillis;

private long ownershipMonitorDelayTimeInSecs;
Expand Down Expand Up @@ -1707,7 +1707,9 @@ protected void monitorOwnerships(List<String> brokers) {

var metadataState = getMetadataState();
if (metadataState != Stable) {
log.warn("metadata state:{} is not Stable. Skipping ownership monitor.", metadataState);
log.warn("metadata state:{} is not Stable. Skipping ownership monitor. lastMetadataSessionEvent:{},"
+ " lastMetadataSessionEventTimestamp:{}",
metadataState, lastMetadataSessionEvent, lastMetadataSessionEventTimestamp);
return;
}

Expand Down Expand Up @@ -1976,9 +1978,21 @@ public List<Metrics> getMetrics() {

var metric = Metrics.create(dimensions);

var metadataState = getMetadataState();
long now = System.currentTimeMillis();
long lastSessionEventAgeSeconds =
lastMetadataSessionEventTimestamp > 0
? MILLISECONDS.toSeconds(now - lastMetadataSessionEventTimestamp)
: -1;

metric.put("brk_sunit_state_chn_orphan_su_cleanup_ops_total", totalOrphanServiceUnitCleanupCnt);
metric.put("brk_sunit_state_chn_su_tombstone_cleanup_ops_total", totalServiceUnitTombstoneCleanupCnt);
metric.put("brk_sunit_state_chn_owned_su_total", getTotalOwnedServiceUnitCnt());
metric.put("brk_sunit_state_chn_metadata_state", metadataState.ordinal());
metric.put("brk_sunit_state_chn_last_metadata_session_event_is_reestablished",
lastMetadataSessionEvent == SessionReestablished ? 1 : 0);
metric.put("brk_sunit_state_chn_last_metadata_session_event_timestamp_ms", lastMetadataSessionEventTimestamp);
metric.put("brk_sunit_state_chn_last_metadata_session_event_age_seconds", lastSessionEventAgeSeconds);
metrics.add(metric);

return metrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreTableView;
Expand Down Expand Up @@ -731,6 +732,82 @@ public void handleMetadataSessionEventTest() throws IllegalAccessException {

}


@Test
public void metadataStateMetricsTest() throws IllegalAccessException {
ServiceUnitStateChannelImpl channel1 = (ServiceUnitStateChannelImpl) this.channel1;

long now = System.currentTimeMillis();
long oldTimestamp = now - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000) - 1;
FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEvent", SessionReestablished, true);
FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEventTimestamp", oldTimestamp, true);
long beforeMetricsCall = System.currentTimeMillis();
var metrics = channel1.getMetrics();
long afterMetricsCall = System.currentTimeMillis();
assertEquals(0, getMetric(metrics, "brk_sunit_state_chn_metadata_state").intValue());
assertEquals(1, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_is_reestablished")
.intValue());
assertEquals(oldTimestamp, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_timestamp_ms")
.longValue());
long ageSeconds = getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_age_seconds")
.longValue();
long minAgeSeconds = TimeUnit.MILLISECONDS.toSeconds(beforeMetricsCall - oldTimestamp);
long maxAgeSeconds = TimeUnit.MILLISECONDS.toSeconds(afterMetricsCall - oldTimestamp);
assertTrue(ageSeconds >= minAgeSeconds && ageSeconds <= maxAgeSeconds,
"Unexpected age seconds: " + ageSeconds + ", expected within [" + minAgeSeconds + ", "
+ maxAgeSeconds + "]");

FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEvent", SessionReestablished, true);
FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEventTimestamp", now, true);
metrics = channel1.getMetrics();
assertEquals(1, getMetric(metrics, "brk_sunit_state_chn_metadata_state").intValue());
assertEquals(1, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_is_reestablished")
.intValue());
assertEquals(now, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_timestamp_ms")
.longValue());
ageSeconds = getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_age_seconds")
.longValue();
assertTrue(ageSeconds >= 0 && ageSeconds <= 1, "Unexpected age seconds: " + ageSeconds);

FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEvent", SessionLost, true);
FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEventTimestamp", now, true);
metrics = channel1.getMetrics();
assertEquals(2, getMetric(metrics, "brk_sunit_state_chn_metadata_state").intValue());
assertEquals(0, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_is_reestablished")
.intValue());
assertEquals(now, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_timestamp_ms")
.longValue());
ageSeconds = getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_age_seconds")
.longValue();
assertTrue(ageSeconds >= 0 && ageSeconds <= 1, "Unexpected age seconds: " + ageSeconds);

FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEvent", SessionReestablished, true);
FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEventTimestamp", 0L, true);
metrics = channel1.getMetrics();
assertEquals(0, getMetric(metrics, "brk_sunit_state_chn_metadata_state").intValue());
assertEquals(1, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_is_reestablished")
.intValue());
assertEquals(0L, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_timestamp_ms")
.longValue());
assertEquals(-1L, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_age_seconds")
.longValue());
}

private static Number getMetric(List<Metrics> metrics, String metricName) {
for (Metrics metric : metrics) {
Object value = metric.getMetrics().get(metricName);
if (value == null) {
continue;
}
if (!(value instanceof Number)) {
fail(metricName + " is not numeric: " + value);
}
return (Number) value;
}
fail("Missing " + metricName + " metric");
return -1L;
}

@Test(priority = 8)
public void handleBrokerCreationEventTest() throws IllegalAccessException {
var cleanupJobs = getCleanupJobs(channel1);
Expand Down
Loading