diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
index e8dd076cd1193..5992d50f27dc7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
@@ -16,6 +16,7 @@
package org.apache.bookkeeper.mledger;
import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
public interface ManagedLedgerMXBean {
@@ -95,5 +96,9 @@ public interface ManagedLedgerMXBean {
double getLedgerSwitchLatencyAverageUsec();
+ StatsBuckets getInternalAddEntryLatencyBuckets();
+
+ StatsBuckets getInternalEntrySizeBuckets();
+
PendingBookieOpsStats getPendingBookieOpsStats();
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
index 5defcb198cbe7..64800634be00e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
@@ -236,6 +236,16 @@ public long[] getLedgerSwitchLatencyBuckets() {
return ledgerSwitchLatencyStatsUsec.getBuckets();
}
+ @Override
+ public StatsBuckets getInternalAddEntryLatencyBuckets() {
+ return addEntryLatencyStatsUsec;
+ }
+
+ @Override
+ public StatsBuckets getInternalEntrySizeBuckets() {
+ return entryStats;
+ }
+
@Override
public double getLedgerSwitchLatencyAverageUsec() {
return ledgerSwitchLatencyStatsUsec.getAvg();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/StatsBuckets.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/StatsBuckets.java
index 2a566ad9cd868..4ad4ee7d698f6 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/StatsBuckets.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/StatsBuckets.java
@@ -71,6 +71,17 @@ public void refresh() {
this.count = count;
}
+ public void reset() {
+ sum = 0;
+ sumCounter.reset();
+ count = 0;
+
+ for (int i = 0; i < buckets.length; i++) {
+ buckets[i].reset();
+ values[i] = 0;
+ }
+ }
+
public long[] getBuckets() {
return values;
}
@@ -87,6 +98,16 @@ public double getAvg() {
return sum / (double) count;
}
+ public void addAll(StatsBuckets other) {
+ checkArgument(boundaries.length == other.boundaries.length);
+
+ for (int i = 0; i < buckets.length; i++) {
+ buckets[i].add(other.values[i]);
+ }
+
+ sumCounter.add(other.count);
+ }
+
private boolean isSorted(long[] array) {
long previous = Long.MIN_VALUE;
diff --git a/pom.xml b/pom.xml
index 15bef8d5dcf4c..103ba3c9c0e77 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,7 +102,7 @@ flexible messaging model and an intuitive client API.
0.9.5
9.3.11.v20160721
1.1.8
-
+ 0.0.21
@@ -411,6 +411,18 @@ flexible messaging model and an intuitive client API.
0.9.0
+
+ io.prometheus
+ simpleclient
+ ${prometheus.version}
+
+
+
+ io.prometheus
+ simpleclient_hotspot
+ ${prometheus.version}
+
+
org.apache.spark
spark-streaming_2.10
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index f08e712e6d9da..db890b558c045 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -181,6 +181,16 @@
log4j
test
+
+
+ io.prometheus
+ simpleclient
+
+
+
+ io.prometheus
+ simpleclient_hotspot
+
io.swagger
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java
index 8db4905299eeb..a48a30e057fd4 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java
@@ -28,15 +28,8 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
-import com.yahoo.pulsar.utils.PulsarBrokerVersionStringUtils;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
@@ -57,6 +50,7 @@
import com.yahoo.pulsar.broker.service.BrokerService;
import com.yahoo.pulsar.broker.service.Topic;
import com.yahoo.pulsar.broker.stats.MetricsGenerator;
+import com.yahoo.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import com.yahoo.pulsar.broker.web.WebService;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
import com.yahoo.pulsar.client.util.FutureUtil;
@@ -64,6 +58,7 @@
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.policies.data.ClusterData;
+import com.yahoo.pulsar.utils.PulsarBrokerVersionStringUtils;
import com.yahoo.pulsar.websocket.WebSocketConsumerServlet;
import com.yahoo.pulsar.websocket.WebSocketProducerServlet;
import com.yahoo.pulsar.websocket.WebSocketService;
@@ -173,7 +168,7 @@ public void close() throws PulsarServerException {
this.bkClientFactory.close();
this.bkClientFactory = null;
}
-
+
if (this.leaderElectionService != null) {
this.leaderElectionService.stop();
this.leaderElectionService = null;
@@ -267,6 +262,8 @@ public void start() throws PulsarServerException {
this.webService.addRestResources("/admin", "com.yahoo.pulsar.broker.admin", true);
this.webService.addRestResources("/lookup", "com.yahoo.pulsar.broker.lookup", true);
+ this.webService.addServlet("/metrics", new ServletHolder(new PrometheusMetricsServlet(this)), false);
+
if (config.isWebSocketServiceEnabled()) {
// Use local broker address to avoid different IP address when using a VIP for service discovery
this.webSocketService = new WebSocketService(
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/BrokerStats.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/BrokerStats.java
index 819c92b49b6a6..1760ade5f4f4f 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/BrokerStats.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/BrokerStats.java
@@ -28,21 +28,21 @@
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
-import com.yahoo.pulsar.broker.loadbalance.LoadManager;
import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.yahoo.pulsar.broker.loadbalance.LoadManager;
import com.yahoo.pulsar.broker.loadbalance.ResourceUnit;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import com.yahoo.pulsar.broker.stats.AllocatorStatsGenerator;
import com.yahoo.pulsar.broker.stats.BookieClientStatsGenerator;
import com.yahoo.pulsar.broker.stats.MBeanStatsGenerator;
-import com.yahoo.pulsar.common.stats.Metrics;
import com.yahoo.pulsar.broker.web.RestException;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.stats.AllocatorStats;
+import com.yahoo.pulsar.common.stats.Metrics;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/LoadManager.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/LoadManager.java
index 0c4c435232907..de9dfd8761af8 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/LoadManager.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/LoadManager.java
@@ -15,6 +15,7 @@
*/
package com.yahoo.pulsar.broker.loadbalance;
+import java.io.IOException;
import java.util.List;
import org.slf4j.Logger;
@@ -28,8 +29,9 @@
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
-import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
+import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import com.yahoo.pulsar.common.stats.Metrics;
+import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
/**
* LoadManager runs though set of load reports collected from different brokers and generates a recommendation of
@@ -59,10 +61,10 @@ public interface LoadManager {
* Generate the load report
*/
LoadReport generateLoadReport() throws Exception;
-
+
/**
- * Returns {@link Deserializer} to deserialize load report
- *
+ * Returns {@link Deserializer} to deserialize load report
+ *
* @return
*/
Deserializer extends ServiceLookupData> getLoadReportDeserializer();
@@ -109,7 +111,7 @@ public interface LoadManager {
/**
* Initialize this LoadManager.
- *
+ *
* @param pulsar
* The service to initialize this with.
*/
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index b40d42034af5f..17c1eda8f8f8f 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -15,6 +15,7 @@
*/
package com.yahoo.pulsar.broker.loadbalance.impl;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
@@ -26,8 +27,9 @@
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
-import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
+import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import com.yahoo.pulsar.common.stats.Metrics;
+import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
/**
* Wrapper class allowing classes of instance ModularLoadManager to be compatible with the interface LoadManager.
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 201dfca560ec7..48fc6b842e15c 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -19,7 +19,6 @@
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import java.io.IOException;
-import java.net.URL;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
@@ -48,8 +47,6 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
@@ -62,7 +59,6 @@
import com.yahoo.pulsar.broker.loadbalance.LoadManager;
import com.yahoo.pulsar.broker.loadbalance.PlacementStrategy;
import com.yahoo.pulsar.broker.loadbalance.ResourceUnit;
-import com.yahoo.pulsar.client.admin.PulsarAdmin;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.ResourceQuota;
@@ -1062,7 +1058,7 @@ private long getAverageJvmHeapUsageMBytes() {
}
}
- private SystemResourceUsage getSystemResourceUsage() throws IOException {
+ public SystemResourceUsage getSystemResourceUsage() throws IOException {
SystemResourceUsage systemResourceUsage = LoadManagerShared.getSystemResourceUsage(brokerHostUsage);
long memoryUsageInMBytes = getAverageJvmHeapUsageMBytes();
systemResourceUsage.memory.usage = (double) memoryUsageInMBytes;
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java
index 87167b42c40e5..d2fc96aacb8e6 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java
@@ -425,8 +425,10 @@ public PulsarClient getReplicationClient(String cluster) {
configuration.setAuthentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
}
- String clusterUrl = configuration.isUseTls() ? (isNotBlank(data.getBrokerServiceUrlTls())
- ? data.getBrokerServiceUrlTls() : data.getServiceUrlTls()) : null;
+ String clusterUrl = configuration.isUseTls()
+ ? (isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls()
+ : data.getServiceUrlTls())
+ : null;
clusterUrl = (isNotBlank(clusterUrl)) ? clusterUrl
: (isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl() : data.getServiceUrl());
return new PulsarClientImpl(clusterUrl, configuration, this.workerGroup);
@@ -1083,4 +1085,8 @@ private void createPendingLoadTopic() {
}
}
-}
\ No newline at end of file
+
+ public ConcurrentOpenHashMap>> getMultiLayerTopicMap() {
+ return multiLayerTopicsMap;
+ }
+}
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
new file mode 100644
index 0000000000000..89259e6c7d198
--- /dev/null
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -0,0 +1,65 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.pulsar.broker.stats.prometheus;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
+
+public class AggregatedNamespaceStats {
+ public int topicsCount;
+ public int subscriptionsCount;
+ public int producersCount;
+ public int consumersCount;
+ public double rateIn;
+ public double rateOut;
+ public double throughputIn;
+ public double throughputOut;
+
+ public long storageSize;
+ public long msgBacklog;
+
+ public StatsBuckets storageWriteLatencyBuckets = new StatsBuckets(
+ ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC);
+ public StatsBuckets entrySizeBuckets = new StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_SIZE_BUCKETS_BYTES);
+
+ public double storageWriteRate;
+ public double storageReadRate;
+
+ public Map replicationStats = new HashMap<>();
+
+ public void reset() {
+ topicsCount = 0;
+ subscriptionsCount = 0;
+ producersCount = 0;
+ consumersCount = 0;
+ rateIn = 0;
+ rateOut = 0;
+ throughputIn = 0;
+ throughputOut = 0;
+
+ storageSize = 0;
+ msgBacklog = 0;
+ storageWriteRate = 0;
+ storageReadRate = 0;
+
+ replicationStats.clear();
+ storageWriteLatencyBuckets.reset();
+ entrySizeBuckets.reset();
+ }
+}
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java
new file mode 100644
index 0000000000000..ce88b8325f3ff
--- /dev/null
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java
@@ -0,0 +1,33 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.pulsar.broker.stats.prometheus;
+
+public class AggregatedReplicationStats {
+ public double msgRateIn;
+
+ /** Total throughput received from the remote cluster. bytes/s */
+ public double msgThroughputIn;
+
+ /** Total rate of messages delivered to the replication-subscriber. msg/s */
+ public double msgRateOut;
+
+ /** Total throughput delivered to the replication-subscriber. bytes/s */
+ public double msgThroughputOut;
+
+ /** Number of messages pending to be replicated to remote cluster */
+ public long replicationBacklog;
+
+}
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
new file mode 100644
index 0000000000000..313eb7ea2bfc2
--- /dev/null
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -0,0 +1,185 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.pulsar.broker.stats.prometheus;
+
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
+
+import com.yahoo.pulsar.broker.PulsarService;
+import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
+import com.yahoo.pulsar.common.policies.data.ReplicatorStats;
+import com.yahoo.pulsar.utils.SimpleTextOutputStream;
+
+import io.netty.util.concurrent.FastThreadLocal;
+
+public class NamespaceStatsAggregator {
+
+ private static FastThreadLocal localNamespaceStats = new FastThreadLocal() {
+ @Override
+ protected AggregatedNamespaceStats initialValue() throws Exception {
+ return new AggregatedNamespaceStats();
+ }
+ };
+
+ public static void generate(PulsarService pulsar, SimpleTextOutputStream stream) {
+ String cluster = pulsar.getConfiguration().getClusterName();
+ AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
+
+ pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
+ namespaceStats.reset();
+
+ bundlesMap.forEach((bundle, topicsMap) -> {
+ topicsMap.forEach((name, topic) -> {
+ updateNamespaceStats(namespaceStats, topic);
+ });
+ });
+
+ printNamespaceStats(stream, cluster, namespace, namespaceStats);
+ });
+ }
+
+ private static void updateNamespaceStats(AggregatedNamespaceStats stats, PersistentTopic topic) {
+ // Managed Ledger stats
+ ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) topic.getManagedLedger().getStats();
+
+ stats.storageSize += mlStats.getStoredMessagesSize();
+ stats.storageWriteLatencyBuckets.addAll(mlStats.getInternalAddEntryLatencyBuckets());
+ stats.entrySizeBuckets.addAll(mlStats.getInternalEntrySizeBuckets());
+
+ stats.storageWriteRate = mlStats.getAddEntryMessagesRate();
+ stats.storageReadRate = mlStats.getReadEntriesRate();
+ stats.topicsCount++;
+
+ topic.getProducers().forEach(producer -> {
+ if (producer.isRemote()) {
+ AggregatedReplicationStats replStats = stats.replicationStats
+ .computeIfAbsent(producer.getRemoteCluster(), k -> new AggregatedReplicationStats());
+
+ replStats.msgRateIn += producer.getStats().msgRateIn;
+ replStats.msgThroughputIn += producer.getStats().msgThroughputIn;
+ } else {
+ // Local producer
+ stats.producersCount++;
+ stats.rateIn += producer.getStats().msgRateIn;
+ stats.throughputIn += producer.getStats().msgThroughputIn;
+ }
+ });
+
+ topic.getSubscriptions().forEach((name, subscription) -> {
+ stats.subscriptionsCount++;
+ stats.msgBacklog += subscription.getNumberOfEntriesInBacklog();
+
+ subscription.getConsumers().forEach(consumer -> {
+ stats.consumersCount++;
+ stats.rateOut += consumer.getStats().msgRateOut;
+ stats.throughputOut += consumer.getStats().msgThroughputOut;
+ });
+ });
+
+ topic.getReplicators().forEach((cluster, replicator) -> {
+ AggregatedReplicationStats aggReplStats = stats.replicationStats.computeIfAbsent(cluster,
+ k -> new AggregatedReplicationStats());
+
+ ReplicatorStats replStats = replicator.getStats();
+ aggReplStats.msgRateOut += replStats.msgRateOut;
+ aggReplStats.msgThroughputOut += replStats.msgThroughputOut;
+ aggReplStats.replicationBacklog += replStats.replicationBacklog;
+ });
+ }
+
+ private static void printNamespaceStats(SimpleTextOutputStream stream, String cluster, String namespace,
+ AggregatedNamespaceStats stats) {
+ metric(stream, cluster, namespace, "pulsar_topics_count", stats.topicsCount);
+ metric(stream, cluster, namespace, "pulsar_subscriptions_count", stats.subscriptionsCount);
+ metric(stream, cluster, namespace, "pulsar_producers_count", stats.producersCount);
+ metric(stream, cluster, namespace, "pulsar_consumers_count", stats.consumersCount);
+
+ metric(stream, cluster, namespace, "pulsar_rate_in", stats.rateIn);
+ metric(stream, cluster, namespace, "pulsar_rate_out", stats.rateOut);
+ metric(stream, cluster, namespace, "pulsar_throughput_in", stats.throughputIn);
+ metric(stream, cluster, namespace, "pulsar_throughput_out", stats.throughputOut);
+
+ metric(stream, cluster, namespace, "pulsar_storage_size", stats.storageSize);
+ metric(stream, cluster, namespace, "pulsar_storage_write_rate", stats.storageWriteRate);
+ metric(stream, cluster, namespace, "pulsar_storage_read_rate", stats.storageReadRate);
+
+ metricWithRemoteCluster(stream, cluster, namespace, "pulsar_msg_backlog", "local", stats.msgBacklog);
+
+ stats.storageWriteLatencyBuckets.refresh();
+ long[] latencyBuckets = stats.storageWriteLatencyBuckets.getBuckets();
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_1", latencyBuckets[1]);
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_5", latencyBuckets[2]);
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_10", latencyBuckets[3]);
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_20", latencyBuckets[4]);
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_50", latencyBuckets[5]);
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_100", latencyBuckets[6]);
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_200", latencyBuckets[7]);
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_overflow", latencyBuckets[9]);
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_count",
+ stats.storageWriteLatencyBuckets.getCount());
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_sum",
+ stats.storageWriteLatencyBuckets.getSum());
+
+ stats.entrySizeBuckets.refresh();
+ long[] entrySizeBuckets = stats.entrySizeBuckets.getBuckets();
+ metric(stream, cluster, namespace, "pulsar_entry_size_le_128", entrySizeBuckets[0]);
+ metric(stream, cluster, namespace, "pulsar_entry_size_le_512", entrySizeBuckets[1]);
+ metric(stream, cluster, namespace, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2]);
+ metric(stream, cluster, namespace, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3]);
+ metric(stream, cluster, namespace, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4]);
+ metric(stream, cluster, namespace, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5]);
+ metric(stream, cluster, namespace, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6]);
+ metric(stream, cluster, namespace, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7]);
+ metric(stream, cluster, namespace, "pulsar_entry_size_le_overflow", entrySizeBuckets[8]);
+ metric(stream, cluster, namespace, "pulsar_entry_size_count", stats.entrySizeBuckets.getCount());
+ metric(stream, cluster, namespace, "pulsar_entry_size_sum", stats.entrySizeBuckets.getSum());
+
+ if (!stats.replicationStats.isEmpty()) {
+ stats.replicationStats.forEach((remoteCluster, replStats) -> {
+ metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_in", remoteCluster,
+ replStats.msgRateIn);
+ metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_out", remoteCluster,
+ replStats.msgRateOut);
+ metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_throughput_in", remoteCluster,
+ replStats.msgThroughputIn);
+ metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_throughput_out", remoteCluster,
+ replStats.msgThroughputOut);
+ metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_backlog", remoteCluster,
+ replStats.replicationBacklog);
+ });
+ }
+ }
+
+ private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
+ long value) {
+ stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace).write("\"} ");
+ stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+ }
+
+ private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
+ double value) {
+ stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace).write("\"} ");
+ stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+ }
+
+ private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
+ String name, String remoteCluster, double value) {
+ stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace);
+ stream.write("\", remote_cluster=\"").write(remoteCluster).write("\"} ");
+ stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+ }
+}
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
new file mode 100644
index 0000000000000..688756a5d3b4a
--- /dev/null
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -0,0 +1,101 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.pulsar.broker.stats.prometheus;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Enumeration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.yahoo.pulsar.broker.PulsarService;
+import com.yahoo.pulsar.broker.stats.metrics.JvmMetrics;
+import com.yahoo.pulsar.utils.SimpleTextOutputStream;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.prometheus.client.Collector;
+import io.prometheus.client.Collector.MetricFamilySamples;
+import io.prometheus.client.Collector.MetricFamilySamples.Sample;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Gauge.Child;
+import io.prometheus.client.hotspot.DefaultExports;
+
+/**
+ * Generate metrics aggregated at the namespace level and formats them out in a text format suitable to be consumed by
+ * Prometheus. Format specification can be found at {@link https://prometheus.io/docs/instrumenting/exposition_formats/}
+ */
+public class PrometheusMetricsGenerator {
+
+ static {
+ DefaultExports.initialize();
+
+ Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new Child() {
+ @Override
+ public double get() {
+ return JvmMetrics.getJvmDirectMemoryUsed();
+ }
+ }).register(CollectorRegistry.defaultRegistry);
+
+ Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Child() {
+ @SuppressWarnings("restriction")
+ @Override
+ public double get() {
+ return sun.misc.VM.maxDirectMemory();
+ }
+ }).register(CollectorRegistry.defaultRegistry);
+ }
+
+ public static void generate(PulsarService pulsar, OutputStream out) throws IOException {
+ ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
+ try {
+ SimpleTextOutputStream stream = new SimpleTextOutputStream(buf);
+
+ generateSystemMetrics(stream, pulsar.getConfiguration().getClusterName());
+
+ NamespaceStatsAggregator.generate(pulsar, stream);
+
+ out.write(buf.array(), buf.arrayOffset(), buf.readableBytes());
+ } finally {
+ buf.release();
+ }
+ }
+
+ private static void generateSystemMetrics(SimpleTextOutputStream stream, String cluster) {
+ Enumeration metricFamilySamples = CollectorRegistry.defaultRegistry.metricFamilySamples();
+ while (metricFamilySamples.hasMoreElements()) {
+ MetricFamilySamples metricFamily = metricFamilySamples.nextElement();
+
+ for (int i = 0; i < metricFamily.samples.size(); i++) {
+ Sample sample = metricFamily.samples.get(i);
+ stream.write(sample.name);
+ stream.write("{cluster=\"").write(cluster).write("\",");
+ for (int j = 0; j < sample.labelNames.size(); j++) {
+ stream.write(sample.labelNames.get(j));
+ stream.write("=\"");
+ stream.write(sample.labelValues.get(j));
+ stream.write("\",");
+ }
+
+ stream.write("} ");
+ stream.write(Collector.doubleToGoString(sample.value));
+ stream.write('\n');
+ }
+ }
+ }
+}
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
new file mode 100644
index 0000000000000..b14a07837a7bd
--- /dev/null
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
@@ -0,0 +1,83 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.pulsar.broker.stats.prometheus;
+
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.eclipse.jetty.http.HttpStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.yahoo.pulsar.broker.PulsarService;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+public class PrometheusMetricsServlet extends HttpServlet {
+
+ private static final long serialVersionUID = 1L;
+
+ private final PulsarService pulsar;
+ private ExecutorService executor = null;
+
+ public PrometheusMetricsServlet(PulsarService pulsar) {
+ this.pulsar = pulsar;
+ }
+
+ @Override
+ public void init() throws ServletException {
+ executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("prometheus-stats"));
+ }
+
+ @Override
+ protected void doGet(HttpServletRequest request, HttpServletResponse response)
+ throws ServletException, IOException {
+ AsyncContext context = request.startAsync();
+
+ executor.execute(safeRun(() -> {
+ HttpServletResponse res = (HttpServletResponse) context.getResponse();
+ try {
+ res.setStatus(HttpStatus.OK_200);
+ res.setContentType("text/plain");
+ PrometheusMetricsGenerator.generate(pulsar, res.getOutputStream());
+ context.complete();
+
+ } catch (IOException e) {
+ log.error("Failed to generate prometheus stats", e);
+ res.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
+ context.complete();
+ }
+ }));
+ }
+
+ @Override
+ public void destroy() {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(PrometheusMetricsServlet.class);
+}
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/utils/SimpleTextOutputStream.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/utils/SimpleTextOutputStream.java
new file mode 100644
index 0000000000000..37959bcce5cca
--- /dev/null
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/utils/SimpleTextOutputStream.java
@@ -0,0 +1,132 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.pulsar.utils;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * Format strings and numbers into a ByteBuf without any memory allocation.
+ *
+ */
+public class SimpleTextOutputStream {
+ private final ByteBuf buffer;
+ private final static char[] hexChars = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e',
+ 'f' };
+
+ public SimpleTextOutputStream(ByteBuf buffer) {
+ this.buffer = buffer;
+ }
+
+ public SimpleTextOutputStream write(byte[] a) {
+ buffer.writeBytes(a, 0, a.length);
+ return this;
+ }
+
+ public SimpleTextOutputStream write(byte[] a, int offset, int len) {
+ buffer.writeBytes(a, offset, len);
+ return this;
+ }
+
+ public SimpleTextOutputStream write(char c) {
+ buffer.writeByte((byte) c);
+ return this;
+ }
+
+ public SimpleTextOutputStream write(String s) {
+ if (s == null) {
+ return this;
+ }
+ int len = s.length();
+ for (int i = 0; i < len; i++) {
+ buffer.writeByte((byte) s.charAt(i));
+ }
+
+ return this;
+ }
+
+ public SimpleTextOutputStream write(Number n) {
+ if (n instanceof Integer) {
+ return write(n.intValue());
+ } else if (n instanceof Long) {
+ return write(n.longValue());
+ } else if (n instanceof Double) {
+ return write(n.doubleValue());
+ } else {
+ return write(n.toString());
+ }
+ }
+
+ public SimpleTextOutputStream writeEncoded(String s) {
+ if (s == null) {
+ return this;
+ }
+
+ int len = s.length();
+ for (int i = 0; i < len; i++) {
+
+ char c = s.charAt(i);
+ if (c < 32 || c > 126) { // below 32 and above 126 in ascii table
+ buffer.writeByte((byte) '\\');
+ buffer.writeByte((byte) 'u');
+ buffer.writeByte(hexChars[(c & 0xf000) >> 12]);
+ buffer.writeByte(hexChars[(c & 0x0f00) >> 8]);
+ buffer.writeByte(hexChars[(c & 0x00f0) >> 4]);
+ buffer.writeByte(hexChars[c & 0x000f]);
+ continue;
+ }
+
+ if (c == '\\' || c == '"') {
+ buffer.writeByte((byte) '\\');
+ }
+ buffer.writeByte((byte) c);
+ }
+ return this;
+ }
+
+ SimpleTextOutputStream write(boolean b) {
+ write(b ? "true" : "false");
+ return this;
+ }
+
+ SimpleTextOutputStream write(long n) {
+ NumberFormat.format(this.buffer, n);
+ return this;
+ }
+
+ SimpleTextOutputStream write(double d) {
+ long i = (long) d;
+ write(i);
+
+ long r = Math.abs((long) (1000 * (d - i)));
+ write('.');
+ if (r == 0) {
+ write('0');
+ return this;
+ }
+
+ if (r < 100) {
+ write('0');
+ }
+
+ if (r < 10) {
+ write('0');
+ }
+
+ write(r);
+ return this;
+ }
+
+}
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/utils/StatsOutputStream.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/utils/StatsOutputStream.java
index 5971bc0d9a74a..eb0d574223c20 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/utils/StatsOutputStream.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/utils/StatsOutputStream.java
@@ -19,182 +19,109 @@
import io.netty.buffer.ByteBuf;
-public class StatsOutputStream {
- private final ByteBuf buffer;
+public class StatsOutputStream extends SimpleTextOutputStream {
private final Stack separators = new Stack<>();
- private final static char[] hexChars = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e',
- 'f' };
public StatsOutputStream(ByteBuf buffer) {
- this.buffer = buffer;
+ super(buffer);
}
public StatsOutputStream startObject() {
checkSeparator();
separators.push(Boolean.FALSE);
- return writeString("{");
+ write('{');
+ return this;
}
public StatsOutputStream startObject(String key) {
checkSeparator();
- writeChar('"').writeEncodedString(key).writeString("\":{");
+ write('"').writeEncoded(key).write("\":{");
separators.push(Boolean.FALSE);
return this;
}
public StatsOutputStream endObject() {
separators.pop();
- return writeString("}");
+ write('}');
+ return this;
}
public StatsOutputStream startList() {
checkSeparator();
separators.push(Boolean.FALSE);
- return writeString("[");
+ write('[');
+ return this;
}
public StatsOutputStream startList(String key) {
checkSeparator();
- writeChar('"').writeEncodedString(key).writeString("\":[");
+ write('"').writeEncoded(key).write("\":[");
separators.push(Boolean.FALSE);
return this;
}
public StatsOutputStream endList() {
separators.pop();
- return writeString("]");
+ write(']');
+ return this;
}
public StatsOutputStream writePair(String name, boolean value) {
checkSeparator();
- return writeChar('"').writeEncodedString(name).writeString("\":").writeBoolean(value);
+ write('"').writeEncoded(name).write("\":").write(value);
+ return this;
}
public StatsOutputStream writePair(String name, long n) {
checkSeparator();
- return writeChar('"').writeEncodedString(name).writeString("\":").writeLong(n);
+ write('"').writeEncoded(name).write("\":").write(n);
+ return this;
}
public StatsOutputStream writePair(String name, double d) {
checkSeparator();
- return writeChar('"').writeEncodedString(name).writeString("\":").writeDouble(d);
+ write('"').writeEncoded(name).write("\":").write(d);
+ return this;
}
public StatsOutputStream writePair(String name, String s) {
checkSeparator();
- return writeChar('"').writeEncodedString(name).writeString("\":\"").writeEncodedString(s).writeChar('"');
+ write('"').writeEncoded(name).write("\":\"").writeEncoded(s).write('"');
+ return this;
}
- public StatsOutputStream write(boolean value) {
+ public StatsOutputStream writeItem(boolean value) {
checkSeparator();
- return writeBoolean(value);
+ super.write(value);
+ return this;
}
- public StatsOutputStream write(long n) {
+ public StatsOutputStream writeItem(long n) {
checkSeparator();
- return writeLong(n);
+ super.write(n);
+ return this;
}
- public StatsOutputStream write(double d) {
+ public StatsOutputStream writeItem(double d) {
checkSeparator();
- return writeDouble(d);
+ super.write(d);
+ return this;
}
- StatsOutputStream write(String s) {
+ StatsOutputStream writeItem(String s) {
checkSeparator();
- return writeChar('"').writeEncodedString(s).writeChar('"');
+
+ write('"').writeEncoded(s).write('"');
+ return this;
}
private void checkSeparator() {
if (separators.isEmpty()) {
return;
} else if (separators.peek() == Boolean.TRUE) {
- writeString(",");
+ write(",");
} else {
separators.set(separators.size() - 1, Boolean.TRUE);
}
}
-
- StatsOutputStream writeBytes(byte[] a, int offset, int len) {
- buffer.writeBytes(a, offset, len);
- return this;
- }
-
- StatsOutputStream writeChar(char c) {
- buffer.writeByte((byte) c);
- return this;
- }
-
- StatsOutputStream writeString(String s) {
- if (s == null) {
- return this;
- }
- int len = s.length();
- for (int i = 0; i < len; i++) {
- buffer.writeByte((byte) s.charAt(i));
- }
-
- return this;
- }
-
- StatsOutputStream writeEncodedString(String s) {
- if (s == null) {
- return this;
- }
-
- int len = s.length();
- for (int i = 0; i < len; i++) {
-
- char c = s.charAt(i);
- if (c < 32 || c > 126) { // below 32 and above 126 in ascii table
- buffer.writeByte((byte) '\\');
- buffer.writeByte((byte) 'u');
- buffer.writeByte(hexChars[(c & 0xf000) >> 12]);
- buffer.writeByte(hexChars[(c & 0x0f00) >> 8]);
- buffer.writeByte(hexChars[(c & 0x00f0) >> 4]);
- buffer.writeByte(hexChars[c & 0x000f]);
- continue;
- }
-
- if (c == '\\' || c == '"') {
- buffer.writeByte((byte) '\\');
- }
- buffer.writeByte((byte) c);
- }
- return this;
- }
-
- StatsOutputStream writeBoolean(boolean b) {
- writeString(b ? "true" : "false");
- return this;
- }
-
- StatsOutputStream writeLong(long n) {
- NumberFormat.format(this.buffer, n);
- return this;
- }
-
- StatsOutputStream writeDouble(double d) {
- long i = (long) d;
- writeLong(i);
-
- long r = Math.abs((long) (1000 * (d - i)));
- writeString(".");
- if (r == 0) {
- writeString("0");
- return this;
- }
-
- if (r < 100) {
- writeString("0");
- }
-
- if (r < 10) {
- writeString("0");
- }
-
- writeLong(r);
- return this;
- }
-
}
diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/utils/SimpleTextOutputStreamTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/utils/SimpleTextOutputStreamTest.java
new file mode 100644
index 0000000000000..0150f0519e3d6
--- /dev/null
+++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/utils/SimpleTextOutputStreamTest.java
@@ -0,0 +1,116 @@
+/**
+ * Copyright 2016 Yahoo Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.pulsar.utils;
+
+import static org.testng.Assert.assertEquals;
+
+import java.nio.charset.Charset;
+
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+public class SimpleTextOutputStreamTest {
+
+ private ByteBuf buf;
+ private SimpleTextOutputStream stream;
+
+ @BeforeMethod
+ public void reset() {
+ buf = Unpooled.buffer(4096);
+ stream = new StatsOutputStream(buf);
+ }
+
+ @Test
+ public void testBooleanFormat() {
+ stream.write(false);
+ assertEquals(str(), "false");
+
+ stream.write(true);
+ assertEquals(str(), "true");
+ }
+
+ @Test
+ public void testLongFormat() {
+ stream.write(0);
+ assertEquals(str(), "0");
+
+ stream.write(-1);
+ assertEquals(str(), "-1");
+
+ stream.write(123456789);
+ assertEquals(str(), "123456789");
+
+ stream.write(-123456789);
+ assertEquals(str(), "-123456789");
+
+ long i = 2 * (long) Integer.MAX_VALUE;
+
+ stream.write(i);
+ assertEquals(str(), Long.toString(i));
+
+ stream.write(Long.MAX_VALUE);
+ assertEquals(str(), Long.toString(Long.MAX_VALUE));
+
+ stream.write(Long.MIN_VALUE);
+ assertEquals(str(), Long.toString(Long.MIN_VALUE));
+
+ // Testing trailing 0s
+ stream.write(100);
+ assertEquals(str(), "100");
+
+ stream.write(-1000);
+ assertEquals(str(), "-1000");
+ }
+
+ @Test
+ public void testDoubleFormat() {
+ stream.write(0.0);
+ assertEquals(str(), "0.0");
+
+ stream.write(1.0);
+ assertEquals(str(), "1.0");
+
+ stream.write(1.123456789);
+ assertEquals(str(), "1.123");
+
+ stream.write(123456.123456789);
+ assertEquals(str(), "123456.123");
+
+ stream.write(-123456.123456789);
+ assertEquals(str(), "-123456.123");
+
+ stream.write(-123456.003456789);
+ assertEquals(str(), "-123456.003");
+
+ stream.write(-123456.100456789);
+ assertEquals(str(), "-123456.100");
+ }
+
+ @Test
+ public void testString() {
+ stream.writeEncoded("�\b`~�ýý8ýH\\abcd\"");
+ assertEquals(str(), "\\ufffd\\u0008`~\\ufffd\\u00fd\\u00fd8\\u00fdH\\\\abcd\\\"");
+ }
+
+ public String str() {
+ String s = buf.toString(Charset.forName("utf-8"));
+ reset();
+ return s;
+ }
+}
diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/utils/StatsOutputStreamTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/utils/StatsOutputStreamTest.java
index 959d343ed4e49..44a2f047832f2 100644
--- a/pulsar-broker/src/test/java/com/yahoo/pulsar/utils/StatsOutputStreamTest.java
+++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/utils/StatsOutputStreamTest.java
@@ -38,72 +38,6 @@ public void reset() {
stream = new StatsOutputStream(buf);
}
- @Test
- public void testBooleanFormat() {
- stream.writeBoolean(false);
- assertEquals(str(), "false");
-
- stream.writeBoolean(true);
- assertEquals(str(), "true");
- }
-
- @Test
- public void testLongFormat() {
- stream.writeLong(0);
- assertEquals(str(), "0");
-
- stream.writeLong(-1);
- assertEquals(str(), "-1");
-
- stream.writeLong(123456789);
- assertEquals(str(), "123456789");
-
- stream.writeLong(-123456789);
- assertEquals(str(), "-123456789");
-
- long i = 2 * (long) Integer.MAX_VALUE;
-
- stream.writeLong(i);
- assertEquals(str(), Long.toString(i));
-
- stream.writeLong(Long.MAX_VALUE);
- assertEquals(str(), Long.toString(Long.MAX_VALUE));
-
- stream.writeLong(Long.MIN_VALUE);
- assertEquals(str(), Long.toString(Long.MIN_VALUE));
-
- // Testing trailing 0s
- stream.writeLong(100);
- assertEquals(str(), "100");
-
- stream.writeLong(-1000);
- assertEquals(str(), "-1000");
- }
-
- @Test
- public void testDoubleFormat() {
- stream.writeDouble(0.0);
- assertEquals(str(), "0.0");
-
- stream.writeDouble(1.0);
- assertEquals(str(), "1.0");
-
- stream.writeDouble(1.123456789);
- assertEquals(str(), "1.123");
-
- stream.writeDouble(123456.123456789);
- assertEquals(str(), "123456.123");
-
- stream.writeDouble(-123456.123456789);
- assertEquals(str(), "-123456.123");
-
- stream.writeDouble(-123456.003456789);
- assertEquals(str(), "-123456.003");
-
- stream.writeDouble(-123456.100456789);
- assertEquals(str(), "-123456.100");
- }
-
@Test
public void testPairs() {
stream.writePair("my-count", 1);
@@ -126,22 +60,22 @@ public void testLists() {
assertEquals(str(), "[]");
stream.startList();
- stream.write(1);
+ stream.writeItem(1);
stream.endList();
assertEquals(str(), "[1]");
stream.startList();
- stream.write(1).write(2);
+ stream.writeItem(1).writeItem(2);
stream.endList();
assertEquals(str(), "[1,2]");
stream.startList();
- stream.write(1).write(2).write(3);
+ stream.writeItem(1).writeItem(2).writeItem(3);
stream.endList();
assertEquals(str(), "[1,2,3]");
stream.startList();
- stream.write(1).write(2).write(3).write(false).write(1.0).write("xyz");
+ stream.writeItem(1).writeItem(2).writeItem(3).writeItem(false).writeItem(1.0).writeItem("xyz");
stream.endList();
assertEquals(str(), "[1,2,3,false,1.0,\"xyz\"]");
}
@@ -153,7 +87,7 @@ public void testNamedLists() {
assertEquals(str(), "\"abc\":[]");
stream.startList("abc");
- stream.write(1);
+ stream.writeItem(1);
stream.endList();
assertEquals(str(), "\"abc\":[1]");
}
@@ -209,13 +143,6 @@ public void testNestedObjects() {
assertEquals(str(), "[{\"a\":1},{\"b\":2}]");
}
- @Test
- public void testString() {
- stream.writeEncodedString("�\b`~�ýý8ýH\\abcd\"");
- assertEquals(str(), "\\ufffd\\u0008`~\\ufffd\\u00fd\\u00fd8\\u00fdH\\\\abcd\\\"");
-
- }
-
@SuppressWarnings("unchecked")
@Test
public void testCopyOnWriteArrayList() {