From e015d2d082849289ca456532a2c59da685a7b341 Mon Sep 17 00:00:00 2001 From: technoboy Date: Fri, 8 Oct 2021 21:42:53 +0800 Subject: [PATCH 1/3] Add MQTTMetricsProvider. --- .../pulsar/handlers/mqtt/Constants.java | 2 +- .../handlers/mqtt/MQTTChannelInitializer.java | 22 ++-- .../handlers/mqtt/MQTTProtocolHandler.java | 13 +- .../pulsar/handlers/mqtt/MQTTService.java | 46 +++++++ .../handlers/mqtt/proxy/MQTTProxyService.java | 8 +- .../DefaultProtocolMethodProcessorImpl.java | 7 +- .../mqtt/support/MQTTMetricsProvider.java | 113 ++++++++++++++++++ .../handlers/mqtt/utils/NettyUtils.java | 18 +++ .../mqtt/support/MQTTMetricsProviderTest.java | 62 ++++++++++ 9 files changed, 267 insertions(+), 24 deletions(-) create mode 100644 mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTService.java create mode 100644 mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTMetricsProvider.java create mode 100644 mqtt-impl/src/test/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTMetricsProviderTest.java diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java index 54bc33f9e..aee694efc 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java @@ -19,7 +19,7 @@ public final class Constants { public static final String ATTR_CLIENT_ID = "ClientID"; - + public static final String ATTR_CLIENT_ADDR = "ClientAddr"; public static final String AUTH_BASIC = "basic"; public static final String AUTH_TOKEN = "token"; diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTChannelInitializer.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTChannelInitializer.java index 54f53d740..7b246bb32 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTChannelInitializer.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTChannelInitializer.java @@ -22,6 +22,7 @@ import io.netty.handler.ssl.SslHandler; import io.netty.handler.timeout.IdleStateHandler; import io.streamnative.pulsar.handlers.mqtt.support.DefaultProtocolMethodProcessorImpl; +import io.streamnative.pulsar.handlers.mqtt.support.MQTTMetricsProvider; import io.streamnative.pulsar.handlers.mqtt.support.psk.PSKConfiguration; import io.streamnative.pulsar.handlers.mqtt.support.psk.PSKUtils; import java.util.Map; @@ -43,6 +44,7 @@ public class MQTTChannelInitializer extends ChannelInitializer { private final MQTTServerConfiguration mqttConfig; private final Map authProviders; + private final MQTTMetricsProvider metricsProvider; private final boolean enableTls; private final boolean enableTlsPsk; private final boolean tlsEnabledWithKeyStore; @@ -50,21 +52,17 @@ public class MQTTChannelInitializer extends ChannelInitializer { private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder; private PSKConfiguration pskConfiguration; - public MQTTChannelInitializer(PulsarService pulsarService, - MQTTServerConfiguration mqttConfig, - Map authProviders, + public MQTTChannelInitializer(MQTTService mqttService, boolean enableTls) { - this(pulsarService, mqttConfig, authProviders, enableTls, false); + this(mqttService, enableTls, false); } - public MQTTChannelInitializer(PulsarService pulsarService, - MQTTServerConfiguration mqttConfig, - Map authProviders, - boolean enableTls, boolean enableTlsPsk) { + public MQTTChannelInitializer(MQTTService mqttService, boolean enableTls, boolean enableTlsPsk) { super(); - this.pulsarService = pulsarService; - this.mqttConfig = mqttConfig; - this.authProviders = authProviders; + this.pulsarService = mqttService.getPulsarService(); + this.mqttConfig = mqttService.getServerConfiguration(); + this.authProviders = mqttService.getAuthProviders(); + this.metricsProvider = mqttService.getMetricsProvider(); this.enableTls = enableTls; this.enableTlsPsk = enableTlsPsk; this.tlsEnabledWithKeyStore = mqttConfig.isTlsEnabledWithKeyStore(); @@ -123,6 +121,6 @@ public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("encoder", MqttEncoder.INSTANCE); ch.pipeline().addLast("handler", new MQTTInboundHandler(new DefaultProtocolMethodProcessorImpl(pulsarService, mqttConfig, - authProviders))); + authProviders, metricsProvider))); } } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTProtocolHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTProtocolHandler.java index 774643ecd..2a4563138 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTProtocolHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTProtocolHandler.java @@ -36,7 +36,6 @@ import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.protocol.ProtocolHandler; import org.apache.pulsar.broker.service.BrokerService; - /** * MQTT Protocol Handler load and run by Pulsar Service. */ @@ -56,6 +55,8 @@ public class MQTTProtocolHandler implements ProtocolHandler { private MQTTProxyService proxyService; + private MQTTService mqttService; + @Override public String protocolName() { return PROTOCOL_NAME; @@ -94,7 +95,7 @@ public void start(BrokerService brokerService) { this.authProviders = AuthUtils.configureAuthProviders(brokerService.getAuthenticationService(), mqttConfig.getMqttAuthenticationMethods()); } - + mqttService = new MQTTService(brokerService.pulsar(), mqttConfig, authProviders); if (mqttConfig.isMqttProxyEnable()) { MQTTProxyConfiguration proxyConfig = new MQTTProxyConfiguration(); proxyConfig.setDefaultTenant(mqttConfig.getDefaultTenant()); @@ -137,7 +138,7 @@ public void start(BrokerService brokerService) { proxyConfig.setTlsKeyStorePassword(mqttConfig.getTlsTrustStorePassword()); proxyConfig.setTlsKeyFilePath(mqttConfig.getTlsKeyFilePath()); log.info("proxyConfig broker service URL: {}", proxyConfig.getBrokerServiceURL()); - proxyService = new MQTTProxyService(proxyConfig, brokerService.getPulsar(), authProviders); + proxyService = new MQTTProxyService(proxyConfig, mqttService); try { proxyService.start(); log.info("Start MQTT proxy service at port: {}", proxyConfig.getMqttProxyPort()); @@ -170,17 +171,17 @@ public Map> newChannelIniti if (listener.startsWith(PLAINTEXT_PREFIX)) { builder.put( new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)), - new MQTTChannelInitializer(brokerService.pulsar(), mqttConfig, authProviders, false)); + new MQTTChannelInitializer(mqttService, false)); } else if (listener.startsWith(SSL_PREFIX) && mqttConfig.isTlsEnabled()) { builder.put( new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)), - new MQTTChannelInitializer(brokerService.pulsar(), mqttConfig, authProviders, true)); + new MQTTChannelInitializer(mqttService, true)); } else if (listener.startsWith(SSL_PSK_PREFIX) && mqttConfig.isTlsPskEnabled()) { builder.put( new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)), - new MQTTChannelInitializer(brokerService.pulsar(), mqttConfig, authProviders, false, true)); + new MQTTChannelInitializer(mqttService, false, true)); } else { log.error("MQTT listener {} not supported. supports {}, {} or {}", diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTService.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTService.java new file mode 100644 index 000000000..3452724c9 --- /dev/null +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTService.java @@ -0,0 +1,46 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt; + +import io.streamnative.pulsar.handlers.mqtt.support.MQTTMetricsProvider; +import java.util.Map; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.authentication.AuthenticationProvider; + +/** + * Main class for mqtt service. + */ +@Slf4j +public class MQTTService { + + @Getter + private MQTTServerConfiguration serverConfiguration; + @Getter + private PulsarService pulsarService; + @Getter + private Map authProviders; + + @Getter + private final MQTTMetricsProvider metricsProvider; + + public MQTTService(PulsarService pulsarService, MQTTServerConfiguration serverConfiguration, + Map authProviders) { + this.serverConfiguration = serverConfiguration; + this.pulsarService = pulsarService; + this.authProviders = authProviders; + this.metricsProvider = new MQTTMetricsProvider(serverConfiguration); + } +} diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java index da3cec16f..8df78ba9d 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java @@ -19,6 +19,7 @@ import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.DefaultThreadFactory; +import io.streamnative.pulsar.handlers.mqtt.MQTTService; import java.io.Closeable; import java.util.Map; import lombok.Getter; @@ -53,13 +54,12 @@ public class MQTTProxyService implements Closeable { private Map authProviders; public MQTTProxyService( - MQTTProxyConfiguration proxyConfig, PulsarService pulsarService, - Map authProviders) { + MQTTProxyConfiguration proxyConfig, MQTTService mqttService) { configValid(proxyConfig); this.proxyConfig = proxyConfig; - this.pulsarService = pulsarService; - this.authProviders = authProviders; + this.pulsarService = mqttService.getPulsarService(); + this.authProviders = mqttService.getAuthProviders(); acceptorGroup = EventLoopUtil.newEventLoopGroup(proxyConfig.getMqttProxyNumAcceptorThreads(), false, acceptorThreadFactory); workerGroup = EventLoopUtil.newEventLoopGroup(proxyConfig.getMqttProxyNumIOThreads(), diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/DefaultProtocolMethodProcessorImpl.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/DefaultProtocolMethodProcessorImpl.java index cc9b2da03..906d1eecf 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/DefaultProtocolMethodProcessorImpl.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/DefaultProtocolMethodProcessorImpl.java @@ -77,15 +77,17 @@ public class DefaultProtocolMethodProcessorImpl implements ProtocolMethodProcess private final PacketIdGenerator packetIdGenerator; private final OutstandingPacketContainer outstandingPacketContainer; private final Map authProviders; + private final MQTTMetricsProvider metricsProvider; public DefaultProtocolMethodProcessorImpl (PulsarService pulsarService, MQTTServerConfiguration configuration, - Map authProviders) { + Map authProviders, MQTTMetricsProvider metricsProvider) { this.pulsarService = pulsarService; this.configuration = configuration; this.qosPublishHandlers = new QosPublishHandlersImpl(pulsarService, configuration); this.packetIdGenerator = PacketIdGenerator.newNonZeroGenerator(); this.outstandingPacketContainer = new OutstandingPacketContainerImpl(); this.authProviders = authProviders; + this.metricsProvider = metricsProvider; } @Override @@ -179,6 +181,7 @@ public void processConnect(Channel channel, MqttConnectMessage msg) { } final boolean success = descriptor.assignState(SENDACK, ESTABLISHED); + metricsProvider.addClient(NettyUtils.getAndAttachAddress(channel)); if (log.isDebugEnabled()) { log.debug("The CONNECT message has been processed. CId={}, username={} success={}", @@ -251,6 +254,7 @@ public void processDisconnect(Channel channel, MqttMessage msg) { if (log.isDebugEnabled()) { log.debug("[Disconnect] [{}] ", clientID); } + metricsProvider.removeClient(NettyUtils.retrieveAddress(channel)); final ConnectionDescriptor existingDescriptor = ConnectionDescriptorStore.getInstance().getConnection(clientID); if (existingDescriptor == null) { // another client with same ID removed the descriptor, we must exit @@ -295,6 +299,7 @@ public void processConnectionLost(Channel channel) { } String clientId = NettyUtils.retrieveClientId(channel); if (StringUtils.isNotEmpty(clientId)) { + metricsProvider.removeClient(NettyUtils.retrieveAddress(channel)); ConnectionDescriptor oldConnDescriptor = new ConnectionDescriptor(clientId, channel, true); ConnectionDescriptorStore.getInstance().removeConnection(oldConnDescriptor); removeSubscriptions(null, clientId); diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTMetricsProvider.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTMetricsProvider.java new file mode 100644 index 000000000..9a3423dac --- /dev/null +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTMetricsProvider.java @@ -0,0 +1,113 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.support; + +import com.google.common.collect.Lists; +import io.streamnative.pulsar.handlers.mqtt.MQTTServerConfiguration; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import lombok.Getter; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider; +import org.apache.pulsar.common.stats.Metrics; +import org.apache.pulsar.common.util.SimpleTextOutputStream; + +/** + * MQTT metrics provider. + */ +public class MQTTMetricsProvider implements PrometheusRawMetricsProvider { + + private final AtomicLong onlineClientsCount = new AtomicLong(); + + private Set onlineClients = ConcurrentHashMap.newKeySet(); + + @Getter + private final MQTTServerConfiguration serverConfiguration; + + private List metrics; + + public MQTTMetricsProvider(MQTTServerConfiguration config) { + this.serverConfiguration = config; + this.metrics = Lists.newArrayList(); + } + + @Override + public void generate(SimpleTextOutputStream stream) { + String cluster = serverConfiguration.getClusterName(); + Collection metrics = getMetrics(); + Set names = new HashSet<>(); + for (Metrics item : metrics) { + for (Map.Entry entry : item.getMetrics().entrySet()) { + String name = entry.getKey(); + if (!names.contains(name)) { + stream.write("# TYPE ").write(entry.getKey()).write(' ') + .write("counter").write('\n'); + names.add(name); + } + stream.write(name) + .write("{cluster=\"").write(cluster).write('"'); + + for (Map.Entry metric : item.getDimensions().entrySet()) { + if (metric.getKey().isEmpty() || "cluster".equals(metric.getKey())) { + continue; + } + stream.write(", ").write(metric.getKey()).write("=\"").write(metric.getValue()).write('"'); + } + stream.write("} ").write(String.valueOf(entry.getValue())) + .write(' ').write(System.currentTimeMillis()).write("\n"); + } + } + } + + private Collection getMetrics() { + Map dimensionMap = new HashMap<>(); + Metrics m = Metrics.create(dimensionMap); + m.put("mop_online_clients_count", getAndResetOnlineClientsCount()); + + metrics.clear(); + metrics.add(m); + return metrics; + } + + public long getAndResetOnlineClientsCount() { + return onlineClientsCount.getAndSet(0); + } + + public long getOnlineClientsCount() { + return onlineClientsCount.get(); + } + + public Set getOnlineClients() { + return onlineClients; + } + + public void addClient(String address) { + if (onlineClients.add(address)) { + onlineClientsCount.incrementAndGet(); + } + } + + public void removeClient(String address) { + if (StringUtils.isNotEmpty(address) && onlineClients.remove(address)) { + onlineClientsCount.decrementAndGet(); + } + } + +} diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/NettyUtils.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/NettyUtils.java index b6f982242..fc4506902 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/NettyUtils.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/NettyUtils.java @@ -13,6 +13,7 @@ */ package io.streamnative.pulsar.handlers.mqtt.utils; +import static io.streamnative.pulsar.handlers.mqtt.Constants.ATTR_CLIENT_ADDR; import static io.streamnative.pulsar.handlers.mqtt.Constants.ATTR_CLIENT_ID; import static io.streamnative.pulsar.handlers.mqtt.Constants.ATTR_CONNECT_MSG; import io.netty.channel.Channel; @@ -20,6 +21,7 @@ import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.AttributeKey; +import java.net.InetSocketAddress; import java.util.Optional; /** @@ -32,6 +34,7 @@ public final class NettyUtils { private static final AttributeKey ATTR_KEY_CLIENT_ID = AttributeKey.valueOf(ATTR_CLIENT_ID); private static final AttributeKey ATTR_KEY_USERNAME = AttributeKey.valueOf(ATTR_USERNAME); private static final AttributeKey ATTR_KEY_CONNECT_MSG = AttributeKey.valueOf(ATTR_CONNECT_MSG); + private static final AttributeKey ATTR_KEY_CLIENT_ADDR = AttributeKey.valueOf(ATTR_CLIENT_ADDR); public static void attachClientID(Channel channel, String clientId) { channel.attr(NettyUtils.ATTR_KEY_CLIENT_ID).set(clientId); @@ -66,6 +69,21 @@ public static void addIdleStateHandler(Channel channel, int idleTime) { pipeline.addFirst("idleStateHandler", new IdleStateHandler(idleTime, 0, 0)); } + public static String getAndAttachAddress(Channel channel) { + String address = getAddress(channel); + channel.attr(NettyUtils.ATTR_KEY_CLIENT_ADDR).set(address); + return address; + } + + public static String getAddress(Channel channel) { + InetSocketAddress address = (InetSocketAddress) channel.remoteAddress(); + return address.getHostName() + ":" + address.getPort(); + } + + public static String retrieveAddress(Channel channel) { + return (String) channel.attr(NettyUtils.ATTR_KEY_CLIENT_ADDR).get(); + } + private NettyUtils() { } } diff --git a/mqtt-impl/src/test/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTMetricsProviderTest.java b/mqtt-impl/src/test/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTMetricsProviderTest.java new file mode 100644 index 000000000..d4f1374dc --- /dev/null +++ b/mqtt-impl/src/test/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTMetricsProviderTest.java @@ -0,0 +1,62 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.support; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.streamnative.pulsar.handlers.mqtt.MQTTServerConfiguration; +import org.apache.pulsar.common.util.SimpleTextOutputStream; +import org.junit.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +public class MQTTMetricsProviderTest { + + private MQTTServerConfiguration serverConfiguration; + private SimpleTextOutputStream outputStream; + private ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer(); + + @BeforeMethod + public void setUp() { + outputStream = new SimpleTextOutputStream(buf); + serverConfiguration = mock(MQTTServerConfiguration.class); + doReturn("mop-cluster").when(serverConfiguration).getClusterName(); + } + + @Test + public void testGenerate() { + MQTTMetricsProvider provider = new MQTTMetricsProvider(serverConfiguration); + provider.addClient("192.168.1.0:11022"); + provider.generate(outputStream); + String result = new String(buf.array(), buf.arrayOffset(), buf.readableBytes()); + buf.release(); + Assert.assertTrue(result.contains("mop_online_clients_count")); + } + + @Test + public void testClientsData() { + MQTTMetricsProvider provider = new MQTTMetricsProvider(serverConfiguration); + String client1 = "192.168.1.0:11022"; + provider.addClient(client1); + Assert.assertTrue(provider.getOnlineClients().contains(client1)); + Assert.assertEquals(provider.getOnlineClientsCount(), 1); + provider.addClient(client1); + Assert.assertEquals(provider.getOnlineClientsCount(), 1); + provider.removeClient(client1); + Assert.assertTrue(provider.getOnlineClients().isEmpty()); + Assert.assertEquals(provider.getOnlineClientsCount(), 0); + } +} From 27ca3263d4940b42f3feeb86335be294c910d1a2 Mon Sep 17 00:00:00 2001 From: technoboy Date: Fri, 8 Oct 2021 21:56:33 +0800 Subject: [PATCH 2/3] fix checkstyle --- .../pulsar/handlers/mqtt/support/MQTTMetricsProviderTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mqtt-impl/src/test/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTMetricsProviderTest.java b/mqtt-impl/src/test/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTMetricsProviderTest.java index d4f1374dc..9f59bddc1 100644 --- a/mqtt-impl/src/test/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTMetricsProviderTest.java +++ b/mqtt-impl/src/test/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTMetricsProviderTest.java @@ -13,6 +13,8 @@ */ package io.streamnative.pulsar.handlers.mqtt.support; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.streamnative.pulsar.handlers.mqtt.MQTTServerConfiguration; @@ -20,8 +22,6 @@ import org.junit.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; public class MQTTMetricsProviderTest { From 95e67c72f0682245e63aba3ae1b433d5db151f1e Mon Sep 17 00:00:00 2001 From: technoboy Date: Sat, 9 Oct 2021 09:33:44 +0800 Subject: [PATCH 3/3] Add addPrometheusRawMetricsProvider. --- .../java/io/streamnative/pulsar/handlers/mqtt/MQTTService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTService.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTService.java index 3452724c9..20fe55848 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTService.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTService.java @@ -42,5 +42,6 @@ public MQTTService(PulsarService pulsarService, MQTTServerConfiguration serverCo this.pulsarService = pulsarService; this.authProviders = authProviders; this.metricsProvider = new MQTTMetricsProvider(serverConfiguration); + this.pulsarService.addPrometheusRawMetricsProvider(metricsProvider); } }