From 3d91e504132261806e0d9470a7c0549b5fee32cd Mon Sep 17 00:00:00 2001 From: Kai Wang Date: Fri, 9 Jan 2026 15:05:20 +0800 Subject: [PATCH 1/2] Ensure metadata session state visibility and improve Unstable observability for ServiceUnitStateChannelImpl --- .../channel/ServiceUnitStateChannelImpl.java | 20 +++++++++-- .../channel/ServiceUnitStateChannelTest.java | 36 +++++++++++++++++++ 2 files changed, 53 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 50e325cecfa65..0e7f15e64dbe4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -127,8 +127,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private ServiceUnitStateTableView tableview; private ScheduledFuture monitorTask; - private SessionEvent lastMetadataSessionEvent = SessionReestablished; - private long lastMetadataSessionEventTimestamp = 0; + private volatile SessionEvent lastMetadataSessionEvent = SessionReestablished; + private volatile long lastMetadataSessionEventTimestamp = 0; private long inFlightStateWaitingTimeInMillis; private long ownershipMonitorDelayTimeInSecs; @@ -1707,7 +1707,9 @@ protected void monitorOwnerships(List brokers) { var metadataState = getMetadataState(); if (metadataState != Stable) { - log.warn("metadata state:{} is not Stable. Skipping ownership monitor.", metadataState); + log.warn("metadata state:{} is not Stable. Skipping ownership monitor. lastMetadataSessionEvent:{}," + + " lastMetadataSessionEventTimestamp:{}", + metadataState, lastMetadataSessionEvent, lastMetadataSessionEventTimestamp); return; } @@ -1976,9 +1978,21 @@ public List getMetrics() { var metric = Metrics.create(dimensions); + var metadataState = getMetadataState(); + long now = System.currentTimeMillis(); + long lastSessionEventAgeSeconds = + lastMetadataSessionEventTimestamp > 0 + ? MILLISECONDS.toSeconds(now - lastMetadataSessionEventTimestamp) + : -1; + metric.put("brk_sunit_state_chn_orphan_su_cleanup_ops_total", totalOrphanServiceUnitCleanupCnt); metric.put("brk_sunit_state_chn_su_tombstone_cleanup_ops_total", totalServiceUnitTombstoneCleanupCnt); metric.put("brk_sunit_state_chn_owned_su_total", getTotalOwnedServiceUnitCnt()); + metric.put("brk_sunit_state_chn_metadata_state", metadataState.ordinal()); + metric.put("brk_sunit_state_chn_last_metadata_session_event_is_reestablished", + lastMetadataSessionEvent == SessionReestablished ? 1 : 0); + metric.put("brk_sunit_state_chn_last_metadata_session_event_timestamp_ms", lastMetadataSessionEventTimestamp); + metric.put("brk_sunit_state_chn_last_metadata_session_event_age_seconds", lastSessionEventAgeSeconds); metrics.add(metric); return metrics; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index bbde38bfbeceb..fb1af36cc37c8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -96,6 +96,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.MetadataStoreTableView; @@ -731,6 +732,41 @@ public void handleMetadataSessionEventTest() throws IllegalAccessException { } + + @Test + public void metadataStateMetricsTest() throws IllegalAccessException { + ServiceUnitStateChannelImpl channel1 = (ServiceUnitStateChannelImpl) this.channel1; + + long now = System.currentTimeMillis(); + FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEvent", SessionReestablished, true); + FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEventTimestamp", + now - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000) - 1, true); + assertEquals(0, getMetadataStateMetric(channel1.getMetrics())); + + FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEvent", SessionReestablished, true); + FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEventTimestamp", now, true); + assertEquals(1, getMetadataStateMetric(channel1.getMetrics())); + + FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEvent", SessionLost, true); + FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEventTimestamp", now, true); + assertEquals(2, getMetadataStateMetric(channel1.getMetrics())); + } + + private static int getMetadataStateMetric(List metrics) { + for (Metrics metric : metrics) { + Object value = metric.getMetrics().get("brk_sunit_state_chn_metadata_state"); + if (value == null) { + continue; + } + if (!(value instanceof Number)) { + fail("brk_sunit_state_chn_metadata_state is not numeric: " + value); + } + return ((Number) value).intValue(); + } + fail("Missing brk_sunit_state_chn_metadata_state metric"); + return -1; + } + @Test(priority = 8) public void handleBrokerCreationEventTest() throws IllegalAccessException { var cleanupJobs = getCleanupJobs(channel1); From f5f6960abe12460254d95f695596774ebf600118 Mon Sep 17 00:00:00 2001 From: Kai Wang Date: Fri, 9 Jan 2026 17:55:32 +0800 Subject: [PATCH 2/2] Enhance metrics validation in ServiceUnitStateChannelTest for metadata session events --- .../channel/ServiceUnitStateChannelTest.java | 63 +++++++++++++++---- 1 file changed, 52 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index fb1af36cc37c8..5baf91f2d5d24 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -738,33 +738,74 @@ public void metadataStateMetricsTest() throws IllegalAccessException { ServiceUnitStateChannelImpl channel1 = (ServiceUnitStateChannelImpl) this.channel1; long now = System.currentTimeMillis(); + long oldTimestamp = now - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000) - 1; FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEvent", SessionReestablished, true); - FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEventTimestamp", - now - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000) - 1, true); - assertEquals(0, getMetadataStateMetric(channel1.getMetrics())); + FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEventTimestamp", oldTimestamp, true); + long beforeMetricsCall = System.currentTimeMillis(); + var metrics = channel1.getMetrics(); + long afterMetricsCall = System.currentTimeMillis(); + assertEquals(0, getMetric(metrics, "brk_sunit_state_chn_metadata_state").intValue()); + assertEquals(1, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_is_reestablished") + .intValue()); + assertEquals(oldTimestamp, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_timestamp_ms") + .longValue()); + long ageSeconds = getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_age_seconds") + .longValue(); + long minAgeSeconds = TimeUnit.MILLISECONDS.toSeconds(beforeMetricsCall - oldTimestamp); + long maxAgeSeconds = TimeUnit.MILLISECONDS.toSeconds(afterMetricsCall - oldTimestamp); + assertTrue(ageSeconds >= minAgeSeconds && ageSeconds <= maxAgeSeconds, + "Unexpected age seconds: " + ageSeconds + ", expected within [" + minAgeSeconds + ", " + + maxAgeSeconds + "]"); FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEvent", SessionReestablished, true); FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEventTimestamp", now, true); - assertEquals(1, getMetadataStateMetric(channel1.getMetrics())); + metrics = channel1.getMetrics(); + assertEquals(1, getMetric(metrics, "brk_sunit_state_chn_metadata_state").intValue()); + assertEquals(1, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_is_reestablished") + .intValue()); + assertEquals(now, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_timestamp_ms") + .longValue()); + ageSeconds = getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_age_seconds") + .longValue(); + assertTrue(ageSeconds >= 0 && ageSeconds <= 1, "Unexpected age seconds: " + ageSeconds); FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEvent", SessionLost, true); FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEventTimestamp", now, true); - assertEquals(2, getMetadataStateMetric(channel1.getMetrics())); + metrics = channel1.getMetrics(); + assertEquals(2, getMetric(metrics, "brk_sunit_state_chn_metadata_state").intValue()); + assertEquals(0, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_is_reestablished") + .intValue()); + assertEquals(now, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_timestamp_ms") + .longValue()); + ageSeconds = getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_age_seconds") + .longValue(); + assertTrue(ageSeconds >= 0 && ageSeconds <= 1, "Unexpected age seconds: " + ageSeconds); + + FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEvent", SessionReestablished, true); + FieldUtils.writeDeclaredField(channel1, "lastMetadataSessionEventTimestamp", 0L, true); + metrics = channel1.getMetrics(); + assertEquals(0, getMetric(metrics, "brk_sunit_state_chn_metadata_state").intValue()); + assertEquals(1, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_is_reestablished") + .intValue()); + assertEquals(0L, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_timestamp_ms") + .longValue()); + assertEquals(-1L, getMetric(metrics, "brk_sunit_state_chn_last_metadata_session_event_age_seconds") + .longValue()); } - private static int getMetadataStateMetric(List metrics) { + private static Number getMetric(List metrics, String metricName) { for (Metrics metric : metrics) { - Object value = metric.getMetrics().get("brk_sunit_state_chn_metadata_state"); + Object value = metric.getMetrics().get(metricName); if (value == null) { continue; } if (!(value instanceof Number)) { - fail("brk_sunit_state_chn_metadata_state is not numeric: " + value); + fail(metricName + " is not numeric: " + value); } - return ((Number) value).intValue(); + return (Number) value; } - fail("Missing brk_sunit_state_chn_metadata_state metric"); - return -1; + fail("Missing " + metricName + " metric"); + return -1L; } @Test(priority = 8)