diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 50e325cecfa65..0e7f15e64dbe4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -127,8 +127,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private ServiceUnitStateTableView tableview; private ScheduledFuture monitorTask; - private SessionEvent lastMetadataSessionEvent = SessionReestablished; - private long lastMetadataSessionEventTimestamp = 0; + private volatile SessionEvent lastMetadataSessionEvent = SessionReestablished; + private volatile long lastMetadataSessionEventTimestamp = 0; private long inFlightStateWaitingTimeInMillis; private long ownershipMonitorDelayTimeInSecs; @@ -1707,7 +1707,9 @@ protected void monitorOwnerships(List brokers) { var metadataState = getMetadataState(); if (metadataState != Stable) { - log.warn("metadata state:{} is not Stable. Skipping ownership monitor.", metadataState); + log.warn("metadata state:{} is not Stable. Skipping ownership monitor. lastMetadataSessionEvent:{}," + + " lastMetadataSessionEventTimestamp:{}", + metadataState, lastMetadataSessionEvent, lastMetadataSessionEventTimestamp); return; } @@ -1976,9 +1978,21 @@ public List getMetrics() { var metric = Metrics.create(dimensions); + var metadataState = getMetadataState(); + long now = System.currentTimeMillis(); + long lastSessionEventAgeSeconds = + lastMetadataSessionEventTimestamp > 0 + ? MILLISECONDS.toSeconds(now - lastMetadataSessionEventTimestamp) + : -1; + metric.put("brk_sunit_state_chn_orphan_su_cleanup_ops_total", totalOrphanServiceUnitCleanupCnt); metric.put("brk_sunit_state_chn_su_tombstone_cleanup_ops_total", totalServiceUnitTombstoneCleanupCnt); metric.put("brk_sunit_state_chn_owned_su_total", getTotalOwnedServiceUnitCnt()); + metric.put("brk_sunit_state_chn_metadata_state", metadataState.ordinal()); + metric.put("brk_sunit_state_chn_last_metadata_session_event_is_reestablished", + lastMetadataSessionEvent == SessionReestablished ? 1 : 0); + metric.put("brk_sunit_state_chn_last_metadata_session_event_timestamp_ms", lastMetadataSessionEventTimestamp); + metric.put("brk_sunit_state_chn_last_metadata_session_event_age_seconds", lastSessionEventAgeSeconds); metrics.add(metric); return metrics; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index bbde38bfbeceb..5baf91f2d5d24 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -96,6 +96,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.MetadataStoreTableView; @@ -731,6 +732,82 @@ public void handleMetadataSessionEventTest() throws IllegalAccessException { } + + @Test + public void metadataStateMetricsTest() throws IllegalAccessException { + ServiceUnitStateChannelImpl channel1 = (ServiceUnitStateChannelImpl) this.channel1; + + long now = System.currentTimeMillis(); + long oldTimestamp = now - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000) - 1; + FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEvent", SessionReestablished, true); + FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEventTimestamp", oldTimestamp, true); + long beforeMetricsCall = System.currentTimeMillis(); + var metrics = channel1.getMetrics(); + long afterMetricsCall = System.currentTimeMillis(); + assertEquals(0, getMetric(metrics, "brk_sunit_state_chn_metadata_state").intValue()); + assertEquals(1, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_is_reestablished") + .intValue()); + assertEquals(oldTimestamp, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_timestamp_ms") + .longValue()); + long ageSeconds = getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_age_seconds") + .longValue(); + long minAgeSeconds = TimeUnit.MILLISECONDS.toSeconds(beforeMetricsCall - oldTimestamp); + long maxAgeSeconds = TimeUnit.MILLISECONDS.toSeconds(afterMetricsCall - oldTimestamp); + assertTrue(ageSeconds >= minAgeSeconds && ageSeconds <= maxAgeSeconds, + "Unexpected age seconds: " + ageSeconds + ", expected within [" + minAgeSeconds + ", " + + maxAgeSeconds + "]"); + + FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEvent", SessionReestablished, true); + FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEventTimestamp", now, true); + metrics = channel1.getMetrics(); + assertEquals(1, getMetric(metrics, "brk_sunit_state_chn_metadata_state").intValue()); + assertEquals(1, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_is_reestablished") + .intValue()); + assertEquals(now, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_timestamp_ms") + .longValue()); + ageSeconds = getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_age_seconds") + .longValue(); + assertTrue(ageSeconds >= 0 && ageSeconds <= 1, "Unexpected age seconds: " + ageSeconds); + + FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEvent", SessionLost, true); + FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEventTimestamp", now, true); + metrics = channel1.getMetrics(); + assertEquals(2, getMetric(metrics, "brk_sunit_state_chn_metadata_state").intValue()); + assertEquals(0, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_is_reestablished") + .intValue()); + assertEquals(now, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_timestamp_ms") + .longValue()); + ageSeconds = getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_age_seconds") + .longValue(); + assertTrue(ageSeconds >= 0 && ageSeconds <= 1, "Unexpected age seconds: " + ageSeconds); + + FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEvent", SessionReestablished, true); + FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEventTimestamp", 0L, true); + metrics = channel1.getMetrics(); + assertEquals(0, getMetric(metrics, "brk_sunit_state_chn_metadata_state").intValue()); + assertEquals(1, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_is_reestablished") + .intValue()); + assertEquals(0L, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_timestamp_ms") + .longValue()); + assertEquals(-1L, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_age_seconds") + .longValue()); + } + + private static Number getMetric(List metrics, String metricName) { + for (Metrics metric : metrics) { + Object value = metric.getMetrics().get(metricName); + if (value == null) { + continue; + } + if (!(value instanceof Number)) { + fail(metricName + " is not numeric: " + value); + } + return (Number) value; + } + fail("Missing " + metricName + " metric"); + return -1L; + } + @Test(priority = 8) public void handleBrokerCreationEventTest() throws IllegalAccessException { var cleanupJobs = getCleanupJobs(channel1);