Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.apache.bookkeeper.mledger;

import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats;
import org.apache.bookkeeper.mledger.util.StatsBuckets;

public interface ManagedLedgerMXBean {

Expand Down Expand Up @@ -95,5 +96,9 @@ public interface ManagedLedgerMXBean {

double getLedgerSwitchLatencyAverageUsec();

StatsBuckets getInternalAddEntryLatencyBuckets();

StatsBuckets getInternalEntrySizeBuckets();

PendingBookieOpsStats getPendingBookieOpsStats();
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,16 @@ public long[] getLedgerSwitchLatencyBuckets() {
return ledgerSwitchLatencyStatsUsec.getBuckets();
}

@Override
public StatsBuckets getInternalAddEntryLatencyBuckets() {
return addEntryLatencyStatsUsec;
}

@Override
public StatsBuckets getInternalEntrySizeBuckets() {
return entryStats;
}

@Override
public double getLedgerSwitchLatencyAverageUsec() {
return ledgerSwitchLatencyStatsUsec.getAvg();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,17 @@ public void refresh() {
this.count = count;
}

public void reset() {
sum = 0;
sumCounter.reset();
count = 0;

for (int i = 0; i < buckets.length; i++) {
buckets[i].reset();
values[i] = 0;
}
}

public long[] getBuckets() {
return values;
}
Expand All @@ -87,6 +98,16 @@ public double getAvg() {
return sum / (double) count;
}

public void addAll(StatsBuckets other) {
checkArgument(boundaries.length == other.boundaries.length);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also check isSorted(other.boundaries)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the other is also a StatsBuckets the sorted check would have been already performed in its own constructor


for (int i = 0; i < buckets.length; i++) {
buckets[i].add(other.values[i]);
}

sumCounter.add(other.count);
}

private boolean isSorted(long[] array) {
long previous = Long.MIN_VALUE;

Expand Down
14 changes: 13 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ flexible messaging model and an intuitive client API.</description>
<storm.version>0.9.5</storm.version>
<jetty.version>9.3.11.v20160721</jetty.version>
<athenz.version>1.1.8</athenz.version>

<prometheus.version>0.0.21</prometheus.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -411,6 +411,18 @@ flexible messaging model and an intuitive client API.</description>
<version>0.9.0</version>
</dependency>

<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
<version>${prometheus.version}</version>
</dependency>

<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_hotspot</artifactId>
<version>${prometheus.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
Expand Down
10 changes: 10 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,16 @@
<groupId>log4j</groupId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
</dependency>

<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_hotspot</artifactId>
</dependency>

<dependency>
<groupId>io.swagger</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,8 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

import com.yahoo.pulsar.utils.PulsarBrokerVersionStringUtils;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
Expand All @@ -57,13 +50,15 @@
import com.yahoo.pulsar.broker.service.BrokerService;
import com.yahoo.pulsar.broker.service.Topic;
import com.yahoo.pulsar.broker.stats.MetricsGenerator;
import com.yahoo.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import com.yahoo.pulsar.broker.web.WebService;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
import com.yahoo.pulsar.client.util.FutureUtil;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.policies.data.ClusterData;
import com.yahoo.pulsar.utils.PulsarBrokerVersionStringUtils;
import com.yahoo.pulsar.websocket.WebSocketConsumerServlet;
import com.yahoo.pulsar.websocket.WebSocketProducerServlet;
import com.yahoo.pulsar.websocket.WebSocketService;
Expand Down Expand Up @@ -173,7 +168,7 @@ public void close() throws PulsarServerException {
this.bkClientFactory.close();
this.bkClientFactory = null;
}

if (this.leaderElectionService != null) {
this.leaderElectionService.stop();
this.leaderElectionService = null;
Expand Down Expand Up @@ -267,6 +262,8 @@ public void start() throws PulsarServerException {
this.webService.addRestResources("/admin", "com.yahoo.pulsar.broker.admin", true);
this.webService.addRestResources("/lookup", "com.yahoo.pulsar.broker.lookup", true);

this.webService.addServlet("/metrics", new ServletHolder(new PrometheusMetricsServlet(this)), false);

if (config.isWebSocketServiceEnabled()) {
// Use local broker address to avoid different IP address when using a VIP for service discovery
this.webSocketService = new WebSocketService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,21 @@
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;

import com.yahoo.pulsar.broker.loadbalance.LoadManager;
import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.yahoo.pulsar.broker.loadbalance.LoadManager;
import com.yahoo.pulsar.broker.loadbalance.ResourceUnit;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import com.yahoo.pulsar.broker.stats.AllocatorStatsGenerator;
import com.yahoo.pulsar.broker.stats.BookieClientStatsGenerator;
import com.yahoo.pulsar.broker.stats.MBeanStatsGenerator;
import com.yahoo.pulsar.common.stats.Metrics;
import com.yahoo.pulsar.broker.web.RestException;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.stats.AllocatorStats;
import com.yahoo.pulsar.common.stats.Metrics;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.yahoo.pulsar.broker.loadbalance;

import java.io.IOException;
import java.util.List;

import org.slf4j.Logger;
Expand All @@ -28,8 +29,9 @@
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import com.yahoo.pulsar.common.stats.Metrics;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;

/**
* LoadManager runs though set of load reports collected from different brokers and generates a recommendation of
Expand Down Expand Up @@ -59,10 +61,10 @@ public interface LoadManager {
* Generate the load report
*/
LoadReport generateLoadReport() throws Exception;

/**
* Returns {@link Deserializer} to deserialize load report
*
* Returns {@link Deserializer} to deserialize load report
*
* @return
*/
Deserializer<? extends ServiceLookupData> getLoadReportDeserializer();
Expand Down Expand Up @@ -109,7 +111,7 @@ public interface LoadManager {

/**
* Initialize this LoadManager.
*
*
* @param pulsar
* The service to initialize this with.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.yahoo.pulsar.broker.loadbalance.impl;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

Expand All @@ -26,8 +27,9 @@
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import com.yahoo.pulsar.common.stats.Metrics;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;

/**
* Wrapper class allowing classes of instance ModularLoadManager to be compatible with the interface LoadManager.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static org.apache.commons.lang3.StringUtils.isNotEmpty;

import java.io.IOException;
import java.net.URL;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -48,8 +47,6 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
Expand All @@ -62,7 +59,6 @@
import com.yahoo.pulsar.broker.loadbalance.LoadManager;
import com.yahoo.pulsar.broker.loadbalance.PlacementStrategy;
import com.yahoo.pulsar.broker.loadbalance.ResourceUnit;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.ResourceQuota;
Expand Down Expand Up @@ -1062,7 +1058,7 @@ private long getAverageJvmHeapUsageMBytes() {
}
}

private SystemResourceUsage getSystemResourceUsage() throws IOException {
public SystemResourceUsage getSystemResourceUsage() throws IOException {
SystemResourceUsage systemResourceUsage = LoadManagerShared.getSystemResourceUsage(brokerHostUsage);
long memoryUsageInMBytes = getAverageJvmHeapUsageMBytes();
systemResourceUsage.memory.usage = (double) memoryUsageInMBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,10 @@ public PulsarClient getReplicationClient(String cluster) {
configuration.setAuthentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
}
String clusterUrl = configuration.isUseTls() ? (isNotBlank(data.getBrokerServiceUrlTls())
? data.getBrokerServiceUrlTls() : data.getServiceUrlTls()) : null;
String clusterUrl = configuration.isUseTls()
? (isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls()
: data.getServiceUrlTls())
: null;
clusterUrl = (isNotBlank(clusterUrl)) ? clusterUrl
: (isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl() : data.getServiceUrl());
return new PulsarClientImpl(clusterUrl, configuration, this.workerGroup);
Expand Down Expand Up @@ -1083,4 +1085,8 @@ private void createPendingLoadTopic() {
}

}
}

public ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, PersistentTopic>>> getMultiLayerTopicMap() {
return multiLayerTopicsMap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.stats.prometheus;

import java.util.HashMap;
import java.util.Map;

import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
import org.apache.bookkeeper.mledger.util.StatsBuckets;

public class AggregatedNamespaceStats {
public int topicsCount;
public int subscriptionsCount;
public int producersCount;
public int consumersCount;
public double rateIn;
public double rateOut;
public double throughputIn;
public double throughputOut;

public long storageSize;
public long msgBacklog;

public StatsBuckets storageWriteLatencyBuckets = new StatsBuckets(
ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC);
public StatsBuckets entrySizeBuckets = new StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_SIZE_BUCKETS_BYTES);

public double storageWriteRate;
public double storageReadRate;

public Map<String, AggregatedReplicationStats> replicationStats = new HashMap<>();

public void reset() {
topicsCount = 0;
subscriptionsCount = 0;
producersCount = 0;
consumersCount = 0;
rateIn = 0;
rateOut = 0;
throughputIn = 0;
throughputOut = 0;

storageSize = 0;
msgBacklog = 0;
storageWriteRate = 0;
storageReadRate = 0;

replicationStats.clear();
storageWriteLatencyBuckets.reset();
entrySizeBuckets.reset();
}
}
Loading