diff --git a/src/main/java/com/chickling/kmonitor/controller/JMXMetricController.java b/src/main/java/com/chickling/kmonitor/controller/JMXMetricController.java index 8793ea0..b4f2813 100644 --- a/src/main/java/com/chickling/kmonitor/controller/JMXMetricController.java +++ b/src/main/java/com/chickling/kmonitor/controller/JMXMetricController.java @@ -16,11 +16,11 @@ import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; -import com.chickling.kmonitor.core.jmx.FormatedMeterMetric; -import com.chickling.kmonitor.core.jmx.JMXExecutor; -import com.chickling.kmonitor.core.jmx.KafkaJMX; -import com.chickling.kmonitor.core.jmx.KafkaMetrics; -import com.chickling.kmonitor.core.jmx.MeterMetric; +import com.chickling.kmonitor.jmx.FormatedMeterMetric; +import com.chickling.kmonitor.jmx.JMXExecutor; +import com.chickling.kmonitor.jmx.KafkaJMX; +import com.chickling.kmonitor.jmx.KafkaMetrics; +import com.chickling.kmonitor.jmx.MeterMetric; import com.chickling.kmonitor.utils.ZKUtils; /** @@ -33,204 +33,201 @@ @RestController @RequestMapping("/metrics") public class JMXMetricController { - private static Logger LOG = LoggerFactory.getLogger(JMXMetricController.class); - - @RequestMapping(value = "/brokerTopicMetrics/brokers", method = RequestMethod.GET) - public String getBrokerTopicMetricsForBrokers() { - Map result = new HashMap(); - try { - List jmxHosts = ZKUtils.getKafkaJMXHostsFromZookeeper(); - for (String jmxHost : jmxHosts) { - String[] jmxArr = jmxHost.split(":"); - if ("-1".equals(jmxArr[2])) { - continue; - } - KafkaJMX kafkaJMX = new KafkaJMX(); - kafkaJMX.doWithConnection(jmxArr[1], Integer.parseInt(jmxArr[2]), Optional.of(""), Optional.of(""), - false, new JMXExecutor() { - - @Override - public void doWithConnection(MBeanServerConnection mBeanServerConnection) { - KafkaMetrics metrics = new KafkaMetrics(); - if (result.containsKey("BytesInPerSec")) { - result.put("BytesInPerSec", merge(result.get("BytesInPerSec"), - metrics.getBytesInPerSec(mBeanServerConnection, Optional.empty()))); - } else { - result.put("BytesInPerSec", - metrics.getBytesInPerSec(mBeanServerConnection, Optional.empty())); - } - - if (result.containsKey("BytesOutPerSec")) { - result.put("BytesOutPerSec", merge(result.get("BytesOutPerSec"), - metrics.getBytesOutPerSec(mBeanServerConnection, Optional.empty()))); - } else { - result.put("BytesOutPerSec", - metrics.getBytesOutPerSec(mBeanServerConnection, Optional.empty())); - } - - if (result.containsKey("BytesRejectedPerSec")) { - result.put("BytesRejectedPerSec", merge(result.get("BytesRejectedPerSec"), - metrics.getBytesRejectedPerSec(mBeanServerConnection, Optional.empty()))); - } else { - result.put("BytesRejectedPerSec", - metrics.getBytesRejectedPerSec(mBeanServerConnection, Optional.empty())); - } - - if (result.containsKey("FailedFetchRequestsPerSec")) { - result.put("FailedFetchRequestsPerSec", - merge(result.get("FailedFetchRequestsPerSec"), - metrics.getFailedFetchRequestsPerSec(mBeanServerConnection, - Optional.empty()))); - } else { - result.put("FailedFetchRequestsPerSec", metrics - .getFailedFetchRequestsPerSec(mBeanServerConnection, Optional.empty())); - } - if (result.containsKey("FailedProduceRequestsPerSec")) { - result.put("FailedProduceRequestsPerSec", - merge(result.get("FailedProduceRequestsPerSec"), - metrics.getFailedProduceRequestsPerSec(mBeanServerConnection, - Optional.empty()))); - } else { - result.put("FailedProduceRequestsPerSec", metrics - .getFailedProduceRequestsPerSec(mBeanServerConnection, Optional.empty())); - } - } - - }); - } - } catch (Exception e) { - LOG.error("Get jmxHosts error!" + e.getMessage()); - - } - JSONObject response = new JSONObject(); - Set keys = result.keySet(); - for (String key : keys) { - response.put(key, new JSONObject(new FormatedMeterMetric(result.get(key)))); - } - return response.toString(); - } - - protected MeterMetric merge(MeterMetric old, MeterMetric newOne) { - return new MeterMetric(old.getCount() + newOne.getCount(), old.getMeanRate() + newOne.getMeanRate(), - old.getOneMinuteRate() + newOne.getOneMinuteRate(), - old.getFiveMinuteRate() + newOne.getFiveMinuteRate(), - old.getFifteenMinuteRate() + newOne.getFifteenMinuteRate()); - } - - @RequestMapping(value = "/brokerTopicMetrics/broker/{bid}", method = RequestMethod.GET) - public String getBrokerTopicMetricsForBroker(@PathVariable String bid) { - JSONObject response = new JSONObject(); - try { - List jmxHosts = ZKUtils.getKafkaJMXHostsFromZookeeper(); - for (String jmxHost : jmxHosts) { - String[] jmxArr = jmxHost.split(":"); - if (bid.equals(jmxArr[0])) { - if ("-1".equals(jmxArr[2])) { - return response.toString(); - } - KafkaJMX kafkaJMX = new KafkaJMX(); - kafkaJMX.doWithConnection(jmxArr[1], Integer.parseInt(jmxArr[2]), Optional.of(""), Optional.of(""), - false, new JMXExecutor() { - - @Override - public void doWithConnection(MBeanServerConnection mBeanServerConnection) { - KafkaMetrics metrics = new KafkaMetrics(); - response.put("BytesInPerSec", new JSONObject(new FormatedMeterMetric( - metrics.getBytesInPerSec(mBeanServerConnection, Optional.empty())))); - response.put("BytesOutPerSec", new JSONObject(new FormatedMeterMetric( - metrics.getBytesOutPerSec(mBeanServerConnection, Optional.empty())))); - response.put("BytesRejectedPerSec", new JSONObject(new FormatedMeterMetric( - metrics.getBytesRejectedPerSec(mBeanServerConnection, Optional.empty())))); - response.put("FailedFetchRequestsPerSec", - new JSONObject(new FormatedMeterMetric(metrics.getFailedFetchRequestsPerSec( - mBeanServerConnection, Optional.empty())))); - response.put("FailedProduceRequestsPerSec", - new JSONObject( - new FormatedMeterMetric(metrics.getFailedProduceRequestsPerSec( - mBeanServerConnection, Optional.empty())))); - } - }); - } - } - } catch (Exception e) { - LOG.error("Get jmxHosts error!" + e.getMessage()); - - } - return response.toString(); - } - - @RequestMapping(value = "/brokerTopicMetrics/topic/{topic}", method = RequestMethod.GET) - public String getBrokerTopicMetrics(@PathVariable String topic) { - Map result = new HashMap(); - try { - List jmxHosts = ZKUtils.getKafkaJMXHostsFromZookeeper(); - for (String jmxHost : jmxHosts) { - String[] jmxArr = jmxHost.split(":"); - if ("-1".equals(jmxArr[2])) { - continue; - } - KafkaJMX kafkaJMX = new KafkaJMX(); - kafkaJMX.doWithConnection(jmxArr[1], Integer.parseInt(jmxArr[2]), Optional.of(""), Optional.of(""), - false, new JMXExecutor() { - - @Override - public void doWithConnection(MBeanServerConnection mBeanServerConnection) { - KafkaMetrics metrics = new KafkaMetrics(); - if (result.containsKey("BytesInPerSec")) { - result.put("BytesInPerSec", merge(result.get("BytesInPerSec"), - metrics.getBytesInPerSec(mBeanServerConnection, Optional.of(topic)))); - } else { - result.put("BytesInPerSec", - metrics.getBytesInPerSec(mBeanServerConnection, Optional.of(topic))); - } - - if (result.containsKey("BytesOutPerSec")) { - result.put("BytesOutPerSec", merge(result.get("BytesOutPerSec"), - metrics.getBytesOutPerSec(mBeanServerConnection, Optional.of(topic)))); - } else { - result.put("BytesOutPerSec", - metrics.getBytesOutPerSec(mBeanServerConnection, Optional.of(topic))); - } - - if (result.containsKey("BytesRejectedPerSec")) { - result.put("BytesRejectedPerSec", merge(result.get("BytesRejectedPerSec"), - metrics.getBytesRejectedPerSec(mBeanServerConnection, Optional.of(topic)))); - } else { - result.put("BytesRejectedPerSec", - metrics.getBytesRejectedPerSec(mBeanServerConnection, Optional.of(topic))); - } - - if (result.containsKey("FailedFetchRequestsPerSec")) { - result.put("FailedFetchRequestsPerSec", - merge(result.get("FailedFetchRequestsPerSec"), - metrics.getFailedFetchRequestsPerSec(mBeanServerConnection, - Optional.empty()))); - } else { - result.put("FailedFetchRequestsPerSec", metrics - .getFailedFetchRequestsPerSec(mBeanServerConnection, Optional.of(topic))); - } - if (result.containsKey("FailedProduceRequestsPerSec")) { - result.put("FailedProduceRequestsPerSec", - merge(result.get("FailedProduceRequestsPerSec"), - metrics.getFailedProduceRequestsPerSec(mBeanServerConnection, - Optional.empty()))); - } else { - result.put("FailedProduceRequestsPerSec", metrics - .getFailedProduceRequestsPerSec(mBeanServerConnection, Optional.of(topic))); - } - } - - }); - } - } catch (Exception e) { - LOG.error("Get jmxHosts error!" + e.getMessage()); - - } - JSONObject response = new JSONObject(); - Set keys = result.keySet(); - for (String key : keys) { - response.put(key, new JSONObject(new FormatedMeterMetric(result.get(key)))); - } - return response.toString(); - } + private static Logger LOG = LoggerFactory.getLogger(JMXMetricController.class); + + @RequestMapping(value = "/brokerTopicMetrics/brokers", method = RequestMethod.GET) + public String getBrokerTopicMetricsForBrokers() { + Map result = new HashMap(); + try { + List jmxHosts = ZKUtils.getKafkaJMXHostsFromZookeeper(); + for (String jmxHost : jmxHosts) { + String[] jmxArr = jmxHost.split(":"); + if ("-1".equals(jmxArr[2])) { + continue; + } + KafkaJMX kafkaJMX = new KafkaJMX(); + kafkaJMX.doWithConnection(jmxArr[1], Integer.parseInt(jmxArr[2]), Optional.of(""), Optional.of(""), false, new JMXExecutor() { + + @Override + public void doWithConnection(MBeanServerConnection mBeanServerConnection) { + KafkaMetrics metrics = new KafkaMetrics(); + if (result.containsKey("MessagesInPerSec")) { + result.put("MessagesInPerSec", + merge(result.get("MessagesInPerSec"), metrics.getMessagesInPerSec(mBeanServerConnection, Optional.empty()))); + } else { + result.put("MessagesInPerSec", metrics.getMessagesInPerSec(mBeanServerConnection, Optional.empty())); + } + + if (result.containsKey("BytesInPerSec")) { + result.put("BytesInPerSec", + merge(result.get("BytesInPerSec"), metrics.getBytesInPerSec(mBeanServerConnection, Optional.empty()))); + } else { + result.put("BytesInPerSec", metrics.getBytesInPerSec(mBeanServerConnection, Optional.empty())); + } + + if (result.containsKey("BytesOutPerSec")) { + result.put("BytesOutPerSec", + merge(result.get("BytesOutPerSec"), metrics.getBytesOutPerSec(mBeanServerConnection, Optional.empty()))); + } else { + result.put("BytesOutPerSec", metrics.getBytesOutPerSec(mBeanServerConnection, Optional.empty())); + } + + if (result.containsKey("BytesRejectedPerSec")) { + result.put("BytesRejectedPerSec", + merge(result.get("BytesRejectedPerSec"), metrics.getBytesRejectedPerSec(mBeanServerConnection, Optional.empty()))); + } else { + result.put("BytesRejectedPerSec", metrics.getBytesRejectedPerSec(mBeanServerConnection, Optional.empty())); + } + + if (result.containsKey("FailedFetchRequestsPerSec")) { + result.put("FailedFetchRequestsPerSec", merge(result.get("FailedFetchRequestsPerSec"), + metrics.getFailedFetchRequestsPerSec(mBeanServerConnection, Optional.empty()))); + } else { + result.put("FailedFetchRequestsPerSec", metrics.getFailedFetchRequestsPerSec(mBeanServerConnection, Optional.empty())); + } + + if (result.containsKey("FailedProduceRequestsPerSec")) { + result.put("FailedProduceRequestsPerSec", merge(result.get("FailedProduceRequestsPerSec"), + metrics.getFailedProduceRequestsPerSec(mBeanServerConnection, Optional.empty()))); + } else { + result.put("FailedProduceRequestsPerSec", metrics.getFailedProduceRequestsPerSec(mBeanServerConnection, Optional.empty())); + } + } + }); + } + } catch (Exception e) { + LOG.error("Get jmxHosts error!" + e.getMessage()); + } + JSONObject response = new JSONObject(); + Set keys = result.keySet(); + for (String key : keys) { + if ("MessagesInPerSec".equals(key)) { + response.put(key, new JSONObject(new FormatedMeterMetric(result.get(key), 0))); + } else { + response.put(key, new JSONObject(new FormatedMeterMetric(result.get(key)))); + } + } + return response.toString(); + } + + protected MeterMetric merge(MeterMetric old, MeterMetric newOne) { + return new MeterMetric(old.getCount() + newOne.getCount(), old.getMeanRate() + newOne.getMeanRate(), + old.getOneMinuteRate() + newOne.getOneMinuteRate(), old.getFiveMinuteRate() + newOne.getFiveMinuteRate(), + old.getFifteenMinuteRate() + newOne.getFifteenMinuteRate()); + } + + @RequestMapping(value = "/brokerTopicMetrics/broker/{bid}", method = RequestMethod.GET) + public String getBrokerTopicMetricsForBroker(@PathVariable String bid) { + JSONObject response = new JSONObject(); + try { + List jmxHosts = ZKUtils.getKafkaJMXHostsFromZookeeper(); + for (String jmxHost : jmxHosts) { + String[] jmxArr = jmxHost.split(":"); + if (bid.equals(jmxArr[0])) { + if ("-1".equals(jmxArr[2])) { + return response.toString(); + } + KafkaJMX kafkaJMX = new KafkaJMX(); + kafkaJMX.doWithConnection(jmxArr[1], Integer.parseInt(jmxArr[2]), Optional.of(""), Optional.of(""), false, new JMXExecutor() { + + @Override + public void doWithConnection(MBeanServerConnection mBeanServerConnection) { + KafkaMetrics metrics = new KafkaMetrics(); + response.put("MessagesInPerSec", + new JSONObject(new FormatedMeterMetric(metrics.getMessagesInPerSec(mBeanServerConnection, Optional.empty()), 0))); + response.put("BytesInPerSec", + new JSONObject(new FormatedMeterMetric(metrics.getBytesInPerSec(mBeanServerConnection, Optional.empty())))); + response.put("BytesOutPerSec", + new JSONObject(new FormatedMeterMetric(metrics.getBytesOutPerSec(mBeanServerConnection, Optional.empty())))); + response.put("BytesRejectedPerSec", + new JSONObject(new FormatedMeterMetric(metrics.getBytesRejectedPerSec(mBeanServerConnection, Optional.empty())))); + response.put("FailedFetchRequestsPerSec", + new JSONObject(new FormatedMeterMetric(metrics.getFailedFetchRequestsPerSec(mBeanServerConnection, Optional.empty())))); + response.put("FailedProduceRequestsPerSec", + new JSONObject(new FormatedMeterMetric(metrics.getFailedProduceRequestsPerSec(mBeanServerConnection, Optional.empty())))); + } + }); + } + } + } catch (Exception e) { + LOG.error("Get jmxHosts error!" + e.getMessage()); + + } + return response.toString(); + } + + @RequestMapping(value = "/brokerTopicMetrics/topic/{topic}", method = RequestMethod.GET) + public String getBrokerTopicMetrics(@PathVariable String topic) { + Map result = new HashMap(); + try { + List jmxHosts = ZKUtils.getKafkaJMXHostsFromZookeeper(); + for (String jmxHost : jmxHosts) { + String[] jmxArr = jmxHost.split(":"); + if ("-1".equals(jmxArr[2])) { + continue; + } + KafkaJMX kafkaJMX = new KafkaJMX(); + kafkaJMX.doWithConnection(jmxArr[1], Integer.parseInt(jmxArr[2]), Optional.of(""), Optional.of(""), false, new JMXExecutor() { + + @Override + public void doWithConnection(MBeanServerConnection mBeanServerConnection) { + KafkaMetrics metrics = new KafkaMetrics(); + if (result.containsKey("MessagesInPerSec")) { + result.put("MessagesInPerSec", + merge(result.get("MessagesInPerSec"), metrics.getMessagesInPerSec(mBeanServerConnection, Optional.of(topic)))); + } else { + result.put("MessagesInPerSec", metrics.getMessagesInPerSec(mBeanServerConnection, Optional.of(topic))); + } + if (result.containsKey("BytesInPerSec")) { + result.put("BytesInPerSec", + merge(result.get("BytesInPerSec"), metrics.getBytesInPerSec(mBeanServerConnection, Optional.of(topic)))); + } else { + result.put("BytesInPerSec", metrics.getBytesInPerSec(mBeanServerConnection, Optional.of(topic))); + } + + if (result.containsKey("BytesOutPerSec")) { + result.put("BytesOutPerSec", + merge(result.get("BytesOutPerSec"), metrics.getBytesOutPerSec(mBeanServerConnection, Optional.of(topic)))); + } else { + result.put("BytesOutPerSec", metrics.getBytesOutPerSec(mBeanServerConnection, Optional.of(topic))); + } + + if (result.containsKey("BytesRejectedPerSec")) { + result.put("BytesRejectedPerSec", + merge(result.get("BytesRejectedPerSec"), metrics.getBytesRejectedPerSec(mBeanServerConnection, Optional.of(topic)))); + } else { + result.put("BytesRejectedPerSec", metrics.getBytesRejectedPerSec(mBeanServerConnection, Optional.of(topic))); + } + + if (result.containsKey("FailedFetchRequestsPerSec")) { + result.put("FailedFetchRequestsPerSec", merge(result.get("FailedFetchRequestsPerSec"), + metrics.getFailedFetchRequestsPerSec(mBeanServerConnection, Optional.empty()))); + } else { + result.put("FailedFetchRequestsPerSec", metrics.getFailedFetchRequestsPerSec(mBeanServerConnection, Optional.of(topic))); + } + if (result.containsKey("FailedProduceRequestsPerSec")) { + result.put("FailedProduceRequestsPerSec", merge(result.get("FailedProduceRequestsPerSec"), + metrics.getFailedProduceRequestsPerSec(mBeanServerConnection, Optional.empty()))); + } else { + result.put("FailedProduceRequestsPerSec", metrics.getFailedProduceRequestsPerSec(mBeanServerConnection, Optional.of(topic))); + } + } + + }); + } + } catch (Exception e) { + LOG.error("Get jmxHosts error!" + e.getMessage()); + + } + JSONObject response = new JSONObject(); + Set keys = result.keySet(); + for (String key : keys) { + if ("MessagesInPerSec".equals(key)) { + response.put(key, new JSONObject(new FormatedMeterMetric(result.get(key), 0))); + } else { + response.put(key, new JSONObject(new FormatedMeterMetric(result.get(key)))); + } + } + return response.toString(); + } } diff --git a/src/main/java/com/chickling/kmonitor/core/jmx/FormatedMeterMetric.java b/src/main/java/com/chickling/kmonitor/core/jmx/FormatedMeterMetric.java deleted file mode 100644 index 2f064d6..0000000 --- a/src/main/java/com/chickling/kmonitor/core/jmx/FormatedMeterMetric.java +++ /dev/null @@ -1,78 +0,0 @@ -package com.chickling.kmonitor.core.jmx; - -import com.chickling.kmonitor.utils.MetricUtils; - -/** - * @author Hulva Luva.H from ECBD - * @date 2017年7月20日 - * @description - * - */ -public class FormatedMeterMetric { - private Long count; - private String meanRate; - private String oneMinuteRate; - private String fiveMinuteRate; - private String fifteenMinuteRate; - - public FormatedMeterMetric() { - super(); - } - - public FormatedMeterMetric(MeterMetric metric) { - this(metric.getCount(), MetricUtils.sizeFormat(metric.getMeanRate()), - MetricUtils.sizeFormat(metric.getOneMinuteRate()), MetricUtils.sizeFormat(metric.getFiveMinuteRate()), - MetricUtils.sizeFormat(metric.getFifteenMinuteRate())); - } - - public FormatedMeterMetric(Long count, String meanRate, String oneMinuteRate, String fiveMinuteRate, - String fifteenMinuteRate) { - super(); - this.count = count; - this.meanRate = meanRate; - this.oneMinuteRate = oneMinuteRate; - this.fiveMinuteRate = fiveMinuteRate; - this.fifteenMinuteRate = fifteenMinuteRate; - } - - public Long getCount() { - return count; - } - - public void setCount(Long count) { - this.count = count; - } - - public String getFifteenMinuteRate() { - return fifteenMinuteRate; - } - - public void setFifteenMinuteRate(String fifteenMinuteRate) { - this.fifteenMinuteRate = fifteenMinuteRate; - } - - public String getFiveMinuteRate() { - return fiveMinuteRate; - } - - public void setFiveMinuteRate(String fiveMinuteRate) { - this.fiveMinuteRate = fiveMinuteRate; - } - - public String getOneMinuteRate() { - return oneMinuteRate; - } - - public void setOneMinuteRate(String oneMinuteRate) { - this.oneMinuteRate = oneMinuteRate; - } - - public String getMeanRate() { - return meanRate; - } - - public void setMeanRate(String meanRate) { - this.meanRate = meanRate; - } - -} diff --git a/src/main/java/com/chickling/kmonitor/core/jmx/KafkaMetrics.java b/src/main/java/com/chickling/kmonitor/core/jmx/KafkaMetrics.java deleted file mode 100644 index 6eb8115..0000000 --- a/src/main/java/com/chickling/kmonitor/core/jmx/KafkaMetrics.java +++ /dev/null @@ -1,147 +0,0 @@ -package com.chickling.kmonitor.core.jmx; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; - -import javax.management.Attribute; -import javax.management.AttributeList; -import javax.management.MBeanServerConnection; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.chickling.kmonitor.core.jmx.metric.OSMetric; - -/** - * @author Hulva Luva.H - * @since 2017-07-11 - * - */ -public class KafkaMetrics { - private static Logger LOG = LoggerFactory.getLogger(KafkaMetrics.class); - - // kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower} - - public static final Map objectNames = new HashMap(); - public static final String replicaFetcherManagerMinFetchRate = "replicaFetcherManagerMinFetchRate"; - public static final String replicaFetcherManagerMaxLag = "replicaFetcherManagerMaxLag "; - public static final String kafkaControllerActiveControllerCount = "kafkaControllerActiveControllerCount "; - public static final String kafkaControllerOfflinePartitionsCount = "kafkaControllerOfflinePartitionsCount "; - public static final String logFlushStats = "logFlushStats "; - public static final String operatingSystemObjectName = "operatingSystemObjectName "; - public static final String logSegmentObjectName = "logSegmentObjectName "; - public static final String directoryObjectName = "directoryObjectName "; - - static { - try { - objectNames.put(replicaFetcherManagerMinFetchRate, - new ObjectName("kafka.server:type=ReplicaFetcherManager,name=MinFetchRate,clientId=Replica")); - objectNames.put(replicaFetcherManagerMaxLag, - new ObjectName("kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica")); - objectNames.put(kafkaControllerActiveControllerCount, - new ObjectName("kafka.controller:type=KafkaController,name=ActiveControllerCount")); - objectNames.put(kafkaControllerOfflinePartitionsCount, - new ObjectName("kafka.controller:type=KafkaController,name=OfflinePartitionsCount")); - objectNames.put(logFlushStats, new ObjectName("kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs")); - objectNames.put(operatingSystemObjectName, new ObjectName("java.lang:type=OperatingSystem")); - objectNames.put(logSegmentObjectName, new ObjectName("kafka.log:type=Log,name=*-LogSegments")); - objectNames.put(directoryObjectName, new ObjectName("kafka.log:type=Log,name=*-Directory")); - } catch (MalformedObjectNameException e) { - LOG.error(e.getMessage()); - } - } - - public MeterMetric getBytesInPerSec(MBeanServerConnection mbsc, Optional topicName) { - return getBrokerTopicMetrics(mbsc, "BytesInPerSec", topicName); - } - - public MeterMetric getBytesOutPerSec(MBeanServerConnection mbsc, Optional topicName) { - return getBrokerTopicMetrics(mbsc, "BytesOutPerSec", topicName); - } - - public MeterMetric getBytesRejectedPerSec(MBeanServerConnection mbsc, Optional topicName) { - return getBrokerTopicMetrics(mbsc, "BytesRejectedPerSec", topicName); - } - - public MeterMetric getFailedFetchRequestsPerSec(MBeanServerConnection mbsc, Optional topicName) { - return getBrokerTopicMetrics(mbsc, "FailedFetchRequestsPerSec", topicName); - } - - public MeterMetric getFailedProduceRequestsPerSec(MBeanServerConnection mbsc, Optional topicName) { - return getBrokerTopicMetrics(mbsc, "FailedProduceRequestsPerSec", topicName); - } - - public MeterMetric getMessagesInPerSec(MBeanServerConnection mbsc, Optional topicName) { - return getBrokerTopicMetrics(mbsc, "MessagesInPerSec", topicName); - } - - private MeterMetric getBrokerTopicMetrics(MBeanServerConnection mbsc, String metricName, - Optional topicName) { - return getMeterMetric(mbsc, getObjectName(metricName, topicName)); - } - - private MeterMetric getMeterMetric(MBeanServerConnection mbsc, ObjectName objectName) { - String[] attributes = { "Count", "MeanRate", "OneMinuteRate", "FiveMinuteRate", "FifteenMinuteRate" }; - AttributeList attributeList = null; - try { - attributeList = mbsc.getAttributes(objectName, attributes); - } catch (Exception e) { - LOG.warn("getMeterMetric failed! " + e.getMessage()); - return new MeterMetric(0L, 0D, 0D, 0D, 0D); - } - return new MeterMetric(getLongValue(attributeList, attributes[0]), getDoubleValue(attributeList, attributes[1]), - getDoubleValue(attributeList, attributes[2]), getDoubleValue(attributeList, attributes[3]), - getDoubleValue(attributeList, attributes[4])); - } - - private ObjectName getObjectName(String metricName, Optional topicName) { - ObjectName objectName = null; - try { - if (topicName.isPresent()) { - objectName = new ObjectName( - "kafka.server:type=BrokerTopicMetrics,name=" + metricName + ",topic=" + topicName.get()); - } else { - objectName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=" + metricName); - } - } catch (MalformedObjectNameException e) { - LOG.error("Get ObjectName error! " + e.getMessage()); - } - return objectName; - } - - public OSMetric getOSMetric(MBeanServerConnection mbsc) { - String[] attributes = { "ProcessCpuLoad", "SystemCpuLoad" }; - AttributeList attributeList = null; - try { - attributeList = mbsc.getAttributes(objectNames.get(operatingSystemObjectName), attributes); - } catch (Exception e) { - LOG.warn("getOSMetric failed! " + e.getMessage()); - return new OSMetric(0D, 0D); - } - return new OSMetric(getDoubleValue(attributeList, attributes[0]), getDoubleValue(attributeList, attributes[0])); - } - - private Double getDoubleValue(AttributeList attributes, String name) { - List _attributes = attributes.asList(); - for (Attribute attr : _attributes) { - if (attr.getName().equalsIgnoreCase(name)) { - return (Double) attr.getValue(); - } - } - return 0D; - } - - private Long getLongValue(AttributeList attributes, String name) { - List _attributes = attributes.asList(); - for (Attribute attr : _attributes) { - if (attr.getName().equalsIgnoreCase(name)) { - return (Long) attr.getValue(); - } - } - return 0L; - } -} diff --git a/src/main/java/com/chickling/kmonitor/core/jmx/metric/OSMetric.java b/src/main/java/com/chickling/kmonitor/core/jmx/metric/OSMetric.java deleted file mode 100644 index 9c6ee4d..0000000 --- a/src/main/java/com/chickling/kmonitor/core/jmx/metric/OSMetric.java +++ /dev/null @@ -1,54 +0,0 @@ -package com.chickling.kmonitor.core.jmx.metric; - -import com.chickling.kmonitor.utils.MetricUtils; - -/** - * @author Hulva Luva.H - * @since 2017-07-11 - * - */ -public class OSMetric { - - private Double processCpuLoad; - private Double systemCpuLoad; - - public OSMetric() { - super(); - } - - public OSMetric(Double processCpuLoad, Double systemCpuLoad) { - super(); - this.processCpuLoad = processCpuLoad; - this.systemCpuLoad = systemCpuLoad; - } - - public Double getProcessCpuLoad() { - return processCpuLoad; - } - - public void setProcessCpuLoad(Double processCpuLoad) { - this.processCpuLoad = processCpuLoad; - } - - public Double getSystemCpuLoad() { - return systemCpuLoad; - } - - public void setSystemCpuLoad(Double systemCpuLoad) { - this.systemCpuLoad = systemCpuLoad; - } - - public String formatedProcessCpuLoad() { - return MetricUtils.rateFormat(getProcessCpuLoad(), 0); - } - - public String formatedSystemCpuLoad() { - return MetricUtils.rateFormat(getSystemCpuLoad(), 0); - } - - @Override - public String toString() { - return "OSMetric [processCpuLoad=" + processCpuLoad + ", systemCpuLoad=" + systemCpuLoad + "]"; - } - -} diff --git a/src/main/java/com/chickling/kmonitor/email/EmailSender.java b/src/main/java/com/chickling/kmonitor/email/EmailSender.java index 2ce38cf..b076380 100644 --- a/src/main/java/com/chickling/kmonitor/email/EmailSender.java +++ b/src/main/java/com/chickling/kmonitor/email/EmailSender.java @@ -19,56 +19,56 @@ * */ public class EmailSender { - private static Logger LOG = LoggerFactory.getLogger(EmailSender.class); + private static Logger LOG = LoggerFactory.getLogger(EmailSender.class); - private static AppConfig config; + private static AppConfig config; - public static void setConfig(AppConfig _config) { - config = _config; - } + public static void setConfig(AppConfig _config) { + config = _config; + } - public static void sendEmail(String message, String sendTo, String group_topic) { - Properties properties = System.getProperties(); + public static void sendEmail(String message, String sendTo, String group_topic) { + Properties properties = System.getProperties(); - if (config.getSmtpAuth()) { - properties.setProperty("mail.user", config.getSmtpUser()); - properties.setProperty("mail.password", config.getSmtpPasswd()); - } - properties.setProperty("mail.smtp.host", config.getSmtpServer()); + if (config.getSmtpAuth()) { + properties.setProperty("mail.user", config.getSmtpUser()); + properties.setProperty("mail.password", config.getSmtpPasswd()); + } + properties.setProperty("mail.smtp.host", config.getSmtpServer()); - Session session = Session.getDefaultInstance(properties); + Session session = Session.getDefaultInstance(properties); - MimeMessage mimeMessage = new MimeMessage(session); + MimeMessage mimeMessage = new MimeMessage(session); - try { - String[] sendToArr = sendTo.split(";"); - mimeMessage.setFrom(new InternetAddress(config.getMailSender())); - if (sendToArr.length > 1) { - String cc = ""; - for (int i = 1; i < sendToArr.length; i++) { - cc += i == sendToArr.length - 1 ? sendToArr[i] : sendToArr[i] + ","; - } - mimeMessage.addRecipients(Message.RecipientType.CC, InternetAddress.parse(cc)); - } - mimeMessage.addRecipient(Message.RecipientType.TO, new InternetAddress(sendToArr[0])); - - String[] group_topicArr = group_topic.split("_"); - String subject = config.getMailSubject(); - if(subject.contains("{group}")) { - subject.replace("{group}", group_topicArr[0]); - } - if(subject.contains("{topic}")) { - subject.replace("{topic}", group_topicArr[1]); - } - mimeMessage.setSubject(config.getMailSubject()); - mimeMessage.setSentDate(new Date()); - mimeMessage.setContent(message, "text/html"); + try { + String[] sendToArr = sendTo.split(";"); + mimeMessage.setFrom(new InternetAddress(config.getMailSender())); + if (sendToArr.length > 1) { + String cc = ""; + for (int i = 1; i < sendToArr.length; i++) { + cc += i == sendToArr.length - 1 ? sendToArr[i] : sendToArr[i] + ","; + } + mimeMessage.addRecipients(Message.RecipientType.CC, InternetAddress.parse(cc)); + } + mimeMessage.addRecipient(Message.RecipientType.TO, new InternetAddress(sendToArr[0])); - Transport.send(mimeMessage); - } catch ( + String[] group_topicArr = group_topic.split("_"); + String subject = config.getMailSubject(); + if (subject.contains("{group}")) { + subject = subject.replace("{group}", group_topicArr[0]); + } + if (subject.contains("{topic}")) { + subject = subject.replace("{topic}", group_topicArr[1]); + } + mimeMessage.setSubject(subject); + mimeMessage.setSentDate(new Date()); + mimeMessage.setContent(message, "text/html"); - Exception e) { - LOG.error("sendEmail faild!", e); - } - } + Transport.send(mimeMessage); + } catch ( + + Exception e) { + LOG.error("sendEmail faild!", e); + } + } } diff --git a/src/main/java/com/chickling/kmonitor/jmx/FormatedMeterMetric.java b/src/main/java/com/chickling/kmonitor/jmx/FormatedMeterMetric.java new file mode 100644 index 0000000..4cf349e --- /dev/null +++ b/src/main/java/com/chickling/kmonitor/jmx/FormatedMeterMetric.java @@ -0,0 +1,82 @@ +package com.chickling.kmonitor.jmx; + +import com.chickling.kmonitor.utils.MetricUtils; + +/** + * @author Hulva Luva.H from ECBD + * @date 2017年7月20日 + * @description + * + */ +public class FormatedMeterMetric { + private Long count; + private String meanRate; + private String oneMinuteRate; + private String fiveMinuteRate; + private String fifteenMinuteRate; + + public FormatedMeterMetric() { + super(); + } + + public FormatedMeterMetric(MeterMetric metric) { + this(metric.getCount(), MetricUtils.sizeFormat(metric.getMeanRate()), MetricUtils.sizeFormat(metric.getOneMinuteRate()), + MetricUtils.sizeFormat(metric.getFiveMinuteRate()), MetricUtils.sizeFormat(metric.getFifteenMinuteRate())); + } + + public FormatedMeterMetric(MeterMetric metric, int interation) { + this(metric.getCount(), MetricUtils.rateFormat(metric.getMeanRate(), interation), + MetricUtils.rateFormat(metric.getOneMinuteRate(), interation), MetricUtils.rateFormat(metric.getFiveMinuteRate(), interation), + MetricUtils.rateFormat(metric.getFifteenMinuteRate(), interation)); + } + + public FormatedMeterMetric(Long count, String meanRate, String oneMinuteRate, String fiveMinuteRate, String fifteenMinuteRate) { + super(); + this.count = count; + this.meanRate = meanRate; + this.oneMinuteRate = oneMinuteRate; + this.fiveMinuteRate = fiveMinuteRate; + this.fifteenMinuteRate = fifteenMinuteRate; + } + + public Long getCount() { + return count; + } + + public void setCount(Long count) { + this.count = count; + } + + public String getFifteenMinuteRate() { + return fifteenMinuteRate; + } + + public void setFifteenMinuteRate(String fifteenMinuteRate) { + this.fifteenMinuteRate = fifteenMinuteRate; + } + + public String getFiveMinuteRate() { + return fiveMinuteRate; + } + + public void setFiveMinuteRate(String fiveMinuteRate) { + this.fiveMinuteRate = fiveMinuteRate; + } + + public String getOneMinuteRate() { + return oneMinuteRate; + } + + public void setOneMinuteRate(String oneMinuteRate) { + this.oneMinuteRate = oneMinuteRate; + } + + public String getMeanRate() { + return meanRate; + } + + public void setMeanRate(String meanRate) { + this.meanRate = meanRate; + } + +} diff --git a/src/main/java/com/chickling/kmonitor/core/jmx/JMXExecutor.java b/src/main/java/com/chickling/kmonitor/jmx/JMXExecutor.java similarity index 84% rename from src/main/java/com/chickling/kmonitor/core/jmx/JMXExecutor.java rename to src/main/java/com/chickling/kmonitor/jmx/JMXExecutor.java index 4b5e2bd..e338a40 100644 --- a/src/main/java/com/chickling/kmonitor/core/jmx/JMXExecutor.java +++ b/src/main/java/com/chickling/kmonitor/jmx/JMXExecutor.java @@ -1,4 +1,4 @@ -package com.chickling.kmonitor.core.jmx; +package com.chickling.kmonitor.jmx; import javax.management.MBeanServerConnection; diff --git a/src/main/java/com/chickling/kmonitor/core/jmx/KafkaJMX.java b/src/main/java/com/chickling/kmonitor/jmx/KafkaJMX.java similarity index 95% rename from src/main/java/com/chickling/kmonitor/core/jmx/KafkaJMX.java rename to src/main/java/com/chickling/kmonitor/jmx/KafkaJMX.java index a4d2575..56f25b5 100644 --- a/src/main/java/com/chickling/kmonitor/core/jmx/KafkaJMX.java +++ b/src/main/java/com/chickling/kmonitor/jmx/KafkaJMX.java @@ -1,4 +1,4 @@ -package com.chickling.kmonitor.core.jmx; +package com.chickling.kmonitor.jmx; import java.io.IOException; import java.util.HashMap; @@ -44,7 +44,7 @@ public void doWithConnection(String jmxHost, int jmxPort, Optional jmxUs JMXServiceURL url = new JMXServiceURL(urlStr); // authenticate Map env = new HashMap(); - String[] credentials = { jmxUser.get(), jmxPasswd.get() }; + String[] credentials = { jmxUser.orElse(""), jmxPasswd.orElse("") }; env.put(JMXConnector.CREDENTIALS, credentials); if (jmxSSL) { // com.sun.management.jmxremote.registry.ssl=true diff --git a/src/main/java/com/chickling/kmonitor/jmx/KafkaMetrics.java b/src/main/java/com/chickling/kmonitor/jmx/KafkaMetrics.java new file mode 100644 index 0000000..4d6b90d --- /dev/null +++ b/src/main/java/com/chickling/kmonitor/jmx/KafkaMetrics.java @@ -0,0 +1,228 @@ +package com.chickling.kmonitor.jmx; + +import java.util.List; +import java.util.Optional; + +import javax.management.Attribute; +import javax.management.AttributeList; +import javax.management.MBeanServerConnection; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Hulva Luva.H + * @since 2017-07-11 + * + */ +public class KafkaMetrics { + private static Logger LOG = LoggerFactory.getLogger(KafkaMetrics.class); + + // kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower} + + // java.lang + public static final String OPRATING_SYSTEM = "java.lang:type=OperatingSystem"; + public static final String RUNTIME = "java.lang:type=Runtime"; + + // kafka.cluster + public static final String IN_SYNC_REPLICAS_COUNT = "kafka.cluster:type=Partition,name=InSyncReplicasCount,topic={},partition={}"; + public static final String REPLICAS_COUNT = "kafka.cluster:type=Partition,name=ReplicasCount,topic={},partition={}"; + public static final String UNDER_REPLICATED = "kafka.cluster:type=Partition,name=UnderReplicated,topic={},partition={}"; + + // kafka.controller + public static final String LEADER_ELECTION_RATE_AND_TIME_MS = "kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs"; + public static final String UNCLEAN_LEADER_ELECTIONS_PER_SEC = "kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec"; + public static final String ACTIVE_CONTROLLER_COUNT = "kafka.controller:type=KafkaController,name=ActiveControllerCount"; + public static final String OFFLINE_PARTITIONS_COUNT = "kafka.controller:type=KafkaController,name=OfflinePartitionsCount"; + public static final String PREFERED_REPLICA_IMBALANCE = "kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount"; + + // kafka.log + public static final String LOG_END_OFFSET = "kafka.log:type=Log,name=LogEndOffset,topic={},partition={}"; + public static final String LOG_START_OFFSET = "kafka.log:type=Log,name=LogStartOffset,topic={},partition={}"; + public static final String LOG_SEGMENTS_NUM = "kafka.log:type=Log,name=NumLogSegments,topic={},partition={}"; + public static final String LOG_SIZE = "kafka.log:type=Log,name=Size,topic={},partition={}"; + public static final String CLEANER_RECOPY_PERCENT = "kafka.log:type=LogCleaner,name=cleaner-recopy-percent"; + public static final String MAX_BUFFER_UTILIZATION_PERCENT = "kafka.log:type=LogCleaner,name=max-buffer-utilization-percent"; + public static final String MAX_CLEAN_TIME_SECS = "kafka.log:type=LogCleaner,name=max-clean-time-secs"; + public static final String MAX_DIRTY_PERCENT = "kafka.log:type=LogCleanerManager,name=max-dirty-percent"; + public static final String TIME_SINCE_LAST_RUN_CLEANER_MS = "kafka.log:type=LogCleanerManager,name=time-since-last-run-ms"; + + // kafka.network + public static final String PROCESSOR_IDLEPERCENT = "kafka.network:type=Processor,name=IdlePercent,networkProcessor={}"; + public static final String REQUEST_QUEUE_SIZE = "kafka.network:type=RequestChannel,name=RequestQueueSize"; + public static final String RESPONSE_QUEUE_SIZE = "kafka.network:type=RequestChannel,name=ResponseQueueSize"; + public static final String RESPONSE_QUEUE_SIZE_OF_PROCESSOR = "kafka.network:type=RequestChannel,name=ResponseQueueSize,processor={}"; + + public static final String REQUEST_METRICS = "kafka.network:type=RequestMetrics,name={},request= {}"; + public static final String REQUEST_METRICS_NAMES[] = {"LocalTimeMs", "RemoteTimeMs", "RequestQueueTimeMs", "RequestQueueTimeMs", + "RequestsPerSec", "ResponseQueueTimeMs", "ResponseSendTimeMs", "ThrottleTimeMs", "TotalTimeMs"}; + public static final String REQUEST_METRICS_REQUESTS[] = + {"ApiVersions", "ControlledShutdown", "CreateTopics", "DeleteTopics", "DescribeGroups", "Fetch", "FetchConsumer", "FetchFollower", + "GroupCoordinator", "Heartbeat", "JoinGroup", "LeaderAndIsr", "LeaveGroup", "ListGroups", "Metadata", "OffsetCommit", + "OffsetFetch", "Offsets", "Produce", "SaslHandshake", "StopReplica", "SyncGroup", "UpdateMetadata"}; + + public static final String NETWORK_PROCESSOR_AVG_IDLE_PERCENT = "kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent"; + + // kafka.server + public static final String BROKER_BYTES_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec"; + public static final String TOPIC_BYTES_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic={}"; + public static final String BROKER_BYTES_OUT_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec"; + public static final String TOPIC_BYTES_OUT_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic={}"; + public static final String BROKER_BYTES_REJECTERD_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec"; + public static final String TOPIC_BYTES_REJECTERD_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec,topic={}"; + public static final String BROKER_FAILED_FETCH_REQUESTS_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec"; + public static final String TOPIC_FAILED_FETCH_REQUESTS_PER_SEC = + "kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec,topic={}"; + public static final String BROKER_FAILED_PRODUCE_REQUESTS_PER_SEC = + "kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec"; + public static final String TOPIC_FAILED_PRODUCE_REQUESTS_PER_SEC = + "kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec,topic={}"; + public static final String BROKER_MESSAGES_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec"; + public static final String TOPIC_MESSAGES_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic={}"; + public static final String TOTAL_FETCH_REQUESTS_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec"; + public static final String TOTAL_PRODUCE_REQUESTS_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec"; + + public static final String CONSUMER_EXPIRES_PER_SEC = "kafka.server:type=DelayedFetchMetrics,name=ExpiresPerSec,fetcherType=consumer"; + public static final String FOLLOWER_EXPIRES_PER_SEC = "kafka.server:type=DelayedFetchMetrics,name=ExpiresPerSec,fetcherType=follower"; + + public static final String DELAYED_OPERATIONS_FETCH_NUM = + "kafka.server:type=DelayedOperationPurgatory,name=NumDelayedOperations,delayedOperation=Fetch"; + public static final String DELAYED_OPERATIONS_HEARTBEAT_NUM = + "kafka.server:type=DelayedOperationPurgatory,name=NumDelayedOperations,delayedOperation=Heartbeat"; + public static final String DELAYED_OPERATIONS_PRODUCE_NUM = + "kafka.server:type=DelayedOperationPurgatory,name=NumDelayedOperations,delayedOperation=Produce"; + public static final String DELAYED_OPERATIONS_REBALANCE_NUM = + "kafka.server:type=DelayedOperationPurgatory,name=NumDelayedOperations,delayedOperation=Rebalance"; + public static final String DELAYED_OPERATIONS_TOPIC_NUM = + "kafka.server:type=DelayedOperationPurgatory,name=NumDelayedOperations,delayedOperation=topic"; + + public static final String DELAYED_OPERATIONS_FETCH_PURGATORY_SIZE = + "kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch"; + public static final String DELAYED_OPERATIONS_HEARTBEAT_PURGATORY_SIZE = + "kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Heartbeat"; + public static final String DELAYED_OPERATIONS_PRODUCE_PURGATORY_SIZE = + "kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce"; + public static final String DELAYED_OPERATIONS_REBALANCE_PURGATORY_SIZE = + "kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Rebalance"; + public static final String DELAYED_OPERATIONS_TOPIC_PURGATORY_SIZE = + "kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=topic"; + + public static final String FETCH_DELAY_QUEUE_SIZE = "kafka.server:type=Fetch"; + public static final String PRODUCE_DELAY_QUEUE_SIZE = "kafka.server:type=Produce"; + + public static final String CONSUMER_LAG = "kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId={},topic={},partition={}"; + + public static final String FETCHER_STATS_BYTES_PER_SEC = + "kafka.server:type=FetcherStats,name=BytesPerSec,clientId={},brokerHost={},brokerPort={}"; + public static final String FETCHER_STATS_REQUESTS_PER_SEC = + "kafka.server:type=FetcherStats,name=RequestsPerSec,clientId={},brokerHost={},brokerPort={}"; + + public static final String REQUEST_HANDLER_AVG_IDLE_PERCENT = + "kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent"; + + public static final String BROKER_STATE = "kafka.server:type=KafkaServer,name=BrokerState"; + public static final String CLUSTER_ID = "kafka.server:type=KafkaServer,name=ClusterId"; + + public static final String LEADER_REPLICATION = "kafka.server:type=LeaderReplication"; + + public static final String REPLICA_MAX_LAG = "kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica"; + public static final String REPLICA_MIN_FETCH_RATE = "kafka.server:type=ReplicaFetcherManager,name=MinFetchRate,clientId=Replica"; + public static final String REPLICA_ISR_EXPANDS_PER_SEC = "kafka.server:type=ReplicaManager,name=IsrExpandsPerSec"; + public static final String REPLICA_ISR_SHRINKS_PER_SEC = "kafka.server:type=ReplicaManager,name=IsrShrinksPerSec"; + public static final String REPLICA_LEADER_COUNT = "kafka.server:type=ReplicaManager,name=LeaderCount"; + public static final String REPLICA_PARTITION_COUNT = "kafka.server:type=ReplicaManager,name=PartitionCount"; + public static final String REPLICA_UNDERREPLICATED_PARTITIONS = "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions"; + + // kafka.server:type=SessionExpireListener + public static final String ZK_AUTHFAILURES_PER_SEC = "kafka.server:type=SessionExpireListener,name=ZooKeeperAuthFailuresPerSec"; + public static final String ZK_DISCONNECTS_PER_SEC = "kafka.server:type=SessionExpireListener,name=ZooKeeperDisconnectsPerSec"; + public static final String ZK_EXPIRES_PER_SEC = "kafka.server:type=SessionExpireListener,name=ZooKeeperExpiresPerSec"; + public static final String ZK_RO_CONNECTS_PER_SEC = "kafka.server:type=SessionExpireListener,name=ZooKeeperReadOnlyConnectsPerSec"; + public static final String ZK_SASL_AUTHS_PER_SEC = "kafka.server:type=SessionExpireListener,name=ZooKeeperSaslAuthenticationsPerSec"; + public static final String ZK_SYNC_CONNECTIONS_PER_SEC = "kafka.server:type=SessionExpireListener,name=ZooKeeperSyncConnectsPerSec"; + + // app-info + public static final String APP_INFO = "kafka.server:type=app-info,id=0"; + + public static final String REPLICA_FETCHER_METRICS = "kafka.server:type=replica-fetcher-metrics,broker-id={},fetcher-id={}"; + public static final String SOCKET_SERVER_METRICS = "kafka.server:type=socket-server-metrics,networkProcessor={}"; + + + public MeterMetric getBytesInPerSec(MBeanServerConnection mbsc, Optional topicName) { + return getBrokerTopicMetrics(mbsc, "BytesInPerSec", topicName); + } + + public MeterMetric getBytesOutPerSec(MBeanServerConnection mbsc, Optional topicName) { + return getBrokerTopicMetrics(mbsc, "BytesOutPerSec", topicName); + } + + public MeterMetric getBytesRejectedPerSec(MBeanServerConnection mbsc, Optional topicName) { + return getBrokerTopicMetrics(mbsc, "BytesRejectedPerSec", topicName); + } + + public MeterMetric getFailedFetchRequestsPerSec(MBeanServerConnection mbsc, Optional topicName) { + return getBrokerTopicMetrics(mbsc, "FailedFetchRequestsPerSec", topicName); + } + + public MeterMetric getFailedProduceRequestsPerSec(MBeanServerConnection mbsc, Optional topicName) { + return getBrokerTopicMetrics(mbsc, "FailedProduceRequestsPerSec", topicName); + } + + public MeterMetric getMessagesInPerSec(MBeanServerConnection mbsc, Optional topicName) { + return getBrokerTopicMetrics(mbsc, "MessagesInPerSec", topicName); + } + + private MeterMetric getBrokerTopicMetrics(MBeanServerConnection mbsc, String metricName, Optional topicName) { + return getMeterMetric(mbsc, getObjectName(metricName, topicName)); + } + + private MeterMetric getMeterMetric(MBeanServerConnection mbsc, ObjectName objectName) { + String[] attributes = {"Count", "MeanRate", "OneMinuteRate", "FiveMinuteRate", "FifteenMinuteRate"}; + AttributeList attributeList = null; + try { + attributeList = mbsc.getAttributes(objectName, attributes); + } catch (Exception e) { + LOG.warn("getMeterMetric failed! " + e.getMessage()); + return new MeterMetric(0L, 0D, 0D, 0D, 0D); + } + return new MeterMetric(getLongValue(attributeList, attributes[0]), getDoubleValue(attributeList, attributes[1]), + getDoubleValue(attributeList, attributes[2]), getDoubleValue(attributeList, attributes[3]), + getDoubleValue(attributeList, attributes[4])); + } + + private ObjectName getObjectName(String metricName, Optional topicName) { + ObjectName objectName = null; + try { + if (topicName.isPresent()) { + objectName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=" + metricName + ",topic=" + topicName.get()); + } else { + objectName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=" + metricName); + } + } catch (MalformedObjectNameException e) { + LOG.error("Get ObjectName error! " + e.getMessage()); + } + return objectName; + } + + private Double getDoubleValue(AttributeList attributes, String name) { + List _attributes = attributes.asList(); + for (Attribute attr : _attributes) { + if (attr.getName().equalsIgnoreCase(name)) { + return (Double) attr.getValue(); + } + } + return 0D; + } + + private Long getLongValue(AttributeList attributes, String name) { + List _attributes = attributes.asList(); + for (Attribute attr : _attributes) { + if (attr.getName().equalsIgnoreCase(name)) { + return (Long) attr.getValue(); + } + } + return 0L; + } +} diff --git a/src/main/java/com/chickling/kmonitor/core/jmx/MeterMetric.java b/src/main/java/com/chickling/kmonitor/jmx/MeterMetric.java similarity index 97% rename from src/main/java/com/chickling/kmonitor/core/jmx/MeterMetric.java rename to src/main/java/com/chickling/kmonitor/jmx/MeterMetric.java index fa6cc80..1d8a194 100644 --- a/src/main/java/com/chickling/kmonitor/core/jmx/MeterMetric.java +++ b/src/main/java/com/chickling/kmonitor/jmx/MeterMetric.java @@ -1,4 +1,4 @@ -package com.chickling.kmonitor.core.jmx; +package com.chickling.kmonitor.jmx; /** * @author Hulva Luva.H diff --git a/src/main/java/com/chickling/kmonitor/jmx/MetricDataCollector.java b/src/main/java/com/chickling/kmonitor/jmx/MetricDataCollector.java new file mode 100644 index 0000000..6d39f74 --- /dev/null +++ b/src/main/java/com/chickling/kmonitor/jmx/MetricDataCollector.java @@ -0,0 +1,10 @@ +package com.chickling.kmonitor.jmx; + +/** + * @author Hulva Luva.H + * @since 2017-07-23 + * + */ +public class MetricDataCollector { + +} diff --git a/src/main/java/com/chickling/kmonitor/jmx/ObjectNameHolder.java b/src/main/java/com/chickling/kmonitor/jmx/ObjectNameHolder.java new file mode 100644 index 0000000..452266b --- /dev/null +++ b/src/main/java/com/chickling/kmonitor/jmx/ObjectNameHolder.java @@ -0,0 +1,103 @@ +package com.chickling.kmonitor.jmx; + +import java.util.Map; + +/** + * @author Hulva Luva.H + * @since 2017-7-22 + * + */ +public class ObjectNameHolder { + private String metric; + private String type; + private String name; + /** + * + * kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=test -> extra<"topic=test", JSONObject.NULL> + * kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=test,partition=p1 -> extra<"topic=test", <"partition=p1", JSONObject.NULL>> + * + */ + private Map extra; + + public ObjectNameHolder() { + super(); + } + + public ObjectNameHolder(String metric, String type, String name, Map extra) { + super(); + this.metric = metric; + this.type = type; + this.name = name; + this.extra = extra; + } + + public String getMetric() { + return metric; + } + + public void setMetric(String metric) { + this.metric = metric; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Map getExtra() { + return extra; + } + + public void setExtra(Map extra) { + this.extra = extra; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((metric == null) ? 0 : metric.hashCode()); + result = prime * result + ((name == null) ? 0 : name.hashCode()); + result = prime * result + ((type == null) ? 0 : type.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + ObjectNameHolder other = (ObjectNameHolder) obj; + if (metric == null) { + if (other.metric != null) + return false; + } else if (!metric.equals(other.metric)) + return false; + if (name == null) { + if (other.name != null) + return false; + } else if (!name.equals(other.name)) + return false; + if (type == null) { + if (other.type != null) + return false; + } else if (!type.equals(other.type)) + return false; + return true; + } + +} diff --git a/src/main/java/com/chickling/kmonitor/jmx/ObjectNameManager.java b/src/main/java/com/chickling/kmonitor/jmx/ObjectNameManager.java new file mode 100644 index 0000000..6aa9cb7 --- /dev/null +++ b/src/main/java/com/chickling/kmonitor/jmx/ObjectNameManager.java @@ -0,0 +1,162 @@ +package com.chickling.kmonitor.jmx; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import javax.management.MBeanServerConnection; +import javax.management.ObjectInstance; + +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.chickling.kmonitor.utils.ZKUtils; + +/** + * + * @author Hulva Luva.H + * @since 2017-07-23 + * + */ +public class ObjectNameManager { + private static Logger LOG = LoggerFactory.getLogger(ObjectNameManager.class); + + private static boolean excludeInternalTopic = true; // like __consumer_offsets + + private static Map objectNames = new HashMap(); + // private static + + private static KafkaJMX kafkaJMX = null; + + private static ObjectNameManager objectNameManager = null; + + public static ObjectNameManager getInstance() { + return objectNameManager == null ? new ObjectNameManager() : objectNameManager; + } + + private ObjectNameManager() { + initObjectNames(); + } + + public Map getObjectNames() { + return objectNames; + } + + public void refreshObjectNames() { + initObjectNames(); + } + + private void initObjectNames() { + try { + if (kafkaJMX == null) { + kafkaJMX = new KafkaJMX(); + } + List jmxHosts = ZKUtils.getKafkaJMXHostsFromZookeeper(); + for (String jmxHost : jmxHosts) { + String[] jmxArr = jmxHost.split(":"); + if ("-1".equals(jmxArr[2])) { + continue; + } + kafkaJMX.doWithConnection(jmxArr[1], Integer.parseInt(jmxArr[2]), Optional.of(""), Optional.of(""), false, new JMXExecutor() { + + @SuppressWarnings("unchecked") + @Override + public void doWithConnection(MBeanServerConnection mbsc) { + try { + Set beans = mbsc.queryMBeans(null, null); + + beans.forEach(bean -> { + ObjectNameHolder objectNameHolder = new ObjectNameHolder(); + String objectName = bean.getObjectName().toString(); + if (excludeInternalTopic && !objectName.contains("__consumer_offsets")) { + String[] metric_other = objectName.split(":"); + objectNameHolder.setMetric(metric_other[0]); + String[] type_name_other = metric_other[1].split(","); + String firstLevelK = null; + for (int i = 0; i < type_name_other.length; i++) { + String[] tempArr = type_name_other[i].split("="); + if ("type".equalsIgnoreCase(tempArr[0])) { + objectNameHolder.setType(tempArr[1]); + } else if ("name".equalsIgnoreCase(tempArr[0])) { + objectNameHolder.setName(tempArr[1]); + } else { + String key = objectNameHolder.getName() == null ? metric_other[0] + objectNameHolder.getType() + : metric_other[0] + objectNameHolder.getType() + objectNameHolder.getName(); + + if (objectNames.containsKey(key)) { + ObjectNameHolder objectNameHolderOld = objectNames.get(key); + Map extras = objectNameHolderOld.getExtra(); + if (extras == null) + extras = new HashMap(); + + if (extras.containsKey(firstLevelK)) { + String secondLevelK = tempArr[0] + "=" + tempArr[1]; + Map firstLevelInExtras = null; + if (extras.get(firstLevelK).equals(JSONObject.NULL)) { + firstLevelInExtras = new HashMap(); + firstLevelInExtras.put(secondLevelK, JSONObject.NULL); + extras.put(firstLevelK, firstLevelInExtras); + } + firstLevelInExtras = (Map) extras.get(firstLevelK); + + // has next level? + if (i == type_name_other.length - 1) + continue; + i++; + tempArr = type_name_other[i].split("="); + String thirdLevelK = tempArr[0] + "=" + tempArr[1]; + Map secondLevelInExtras = null; + if (firstLevelInExtras.get(secondLevelK).equals(JSONObject.NULL)) { + secondLevelInExtras = new HashMap(); + secondLevelInExtras.put(thirdLevelK, JSONObject.NULL); + firstLevelInExtras.put(secondLevelK, secondLevelInExtras); + } + secondLevelInExtras = (Map) firstLevelInExtras.get(secondLevelK); + + // has next level? + if (i == type_name_other.length - 1) + continue; + i++; + tempArr = type_name_other[i].split("="); + String fourthLevelK = tempArr[0] + "=" + tempArr[1]; + Map thirdLevelInExtras = null; + if (firstLevelInExtras.get(thirdLevelK).equals(JSONObject.NULL)) { + thirdLevelInExtras = new HashMap(); + thirdLevelInExtras.put(fourthLevelK, JSONObject.NULL); + secondLevelInExtras.put(thirdLevelK, thirdLevelInExtras); + } + // thirdLevelInExtras = (Map) secondLevelInExtras.get(thirdLevelK); + // TODO so far as i know, there's no more than two level + if (i < type_name_other.length - 1) + LOG.warn("Ops~ There's do have a objectName over thrid level! objectName -> " + objectName); + } else { + // TODO that Metric has same metric, type and name with an exits one + firstLevelK = tempArr[0] + "=" + tempArr[1]; + extras.put(firstLevelK, JSONObject.NULL); + } + } else { + firstLevelK = tempArr[0] + "=" + tempArr[1]; + Map extras = new HashMap(); + extras.put(firstLevelK, JSONObject.NULL); + objectNameHolder.setExtra(extras); + objectNames.put(key, objectNameHolder); + } + } + } + } + }); + } catch (Exception e) { + LOG.warn("Ops~ Get objectNames - " + e.getMessage()); + } + } + }); + } + } catch (Exception e) { + LOG.error("initObjectNames went wrong! " + e.getMessage()); + } + } + +} diff --git a/src/main/java/com/chickling/kmonitor/jmx/metrics/MetricType1.java b/src/main/java/com/chickling/kmonitor/jmx/metrics/MetricType1.java new file mode 100644 index 0000000..d449a78 --- /dev/null +++ b/src/main/java/com/chickling/kmonitor/jmx/metrics/MetricType1.java @@ -0,0 +1,102 @@ +package com.chickling.kmonitor.jmx.metrics; + +/** + * + * @author Hulva Luva.H + * @since 2017-07-26 + * + */ +public class MetricType1 { + private long count; + private String eventType; + private double meanRate; + private double oneMinuteRate; + private double fiveMinuteRate; + private double fifteenMinuteRate; + private String rateUnit; + + public long getCount() { + return count; + } + + public void setCount(long count) { + this.count = count; + } + + public String getEventType() { + return eventType; + } + + public void setEventType(String eventType) { + this.eventType = eventType; + } + + public double getMeanRate() { + return meanRate; + } + + public void setMeanRate(double meanRate) { + this.meanRate = meanRate; + } + + public double getOneMinuteRate() { + return oneMinuteRate; + } + + public void setOneMinuteRate(double oneMinuteRate) { + this.oneMinuteRate = oneMinuteRate; + } + + public double getFiveMinuteRate() { + return fiveMinuteRate; + } + + public void setFiveMinuteRate(double fiveMinuteRate) { + this.fiveMinuteRate = fiveMinuteRate; + } + + public double getFifteenMinuteRate() { + return fifteenMinuteRate; + } + + public void setFifteenMinuteRate(double fifteenMinuteRate) { + this.fifteenMinuteRate = fifteenMinuteRate; + } + + public String getRateUnit() { + return rateUnit; + } + + public void setRateUnit(String rateUnit) { + this.rateUnit = rateUnit; + } + + @Override + public String toString() { + switch (this.eventType) { + case "bytes": + + break; + case "requests": + + break; + case "percent": + + break; + case "messages": + + break; + case "expands": // IsrExpandsPerSec + + break; + case "shrinks": // IsrShrinksPerSec + + break; + default: + + break; + } + return "MetricType1 [count=" + count + ", eventType=" + eventType + ", meanRate=" + meanRate + ", oneMinuteRate=" + oneMinuteRate + + ", fiveMinuteRate=" + fiveMinuteRate + ", fifteenMinuteRate=" + fifteenMinuteRate + ", rateUnit=" + rateUnit + "]"; + } +} diff --git a/src/main/java/com/chickling/kmonitor/jmx/metrics/MetricType2.java b/src/main/java/com/chickling/kmonitor/jmx/metrics/MetricType2.java new file mode 100644 index 0000000..b79511c --- /dev/null +++ b/src/main/java/com/chickling/kmonitor/jmx/metrics/MetricType2.java @@ -0,0 +1,37 @@ +package com.chickling.kmonitor.jmx.metrics; + +/** + * @author Hulva Luva.H + * @since 2017-07-26 + * + */ +public class MetricType2 { + private String name; // like queue-size Value byte-rate count + private T value; + private String unit; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public T getValue() { + return value; + } + + public void setValue(T value) { + this.value = value; + } + + public String getUnit() { + return unit; + } + + public void setUnit(String unit) { + this.unit = unit; + } + +} diff --git a/src/main/java/com/chickling/kmonitor/jmx/metrics/MetricType3.java b/src/main/java/com/chickling/kmonitor/jmx/metrics/MetricType3.java new file mode 100644 index 0000000..28bb06a --- /dev/null +++ b/src/main/java/com/chickling/kmonitor/jmx/metrics/MetricType3.java @@ -0,0 +1,139 @@ +package com.chickling.kmonitor.jmx.metrics; + +/** + * @author Hulva Luva.H + * @since 2017-07-26 + * + * kafka.server:type=replica-fetcher-metrics + * + * kafka.server:type=socket-server-metrics + */ +public class MetricType3 { + private double connectionCloseRate; + private double connectionCount; + private double connectionCreationRate; + private double incomingByteRate; + private double ioRatio; + private double idTimeNSavg; + private double ioWaitTimeNSavg; + private double networkIOrate; + private double outgoingByteRate; + private double requestRate; + private double requestSizeAvg; + private double requestSizeMax; + private double responseRate; + private double selectRate; + + public double getConnectionCloseRate() { + return connectionCloseRate; + } + + public void setConnectionCloseRate(double connectionCloseRate) { + this.connectionCloseRate = connectionCloseRate; + } + + public double getConnectionCount() { + return connectionCount; + } + + public void setConnectionCount(double connectionCount) { + this.connectionCount = connectionCount; + } + + public double getConnectionCreationRate() { + return connectionCreationRate; + } + + public void setConnectionCreationRate(double connectionCreationRate) { + this.connectionCreationRate = connectionCreationRate; + } + + public double getIncomingByteRate() { + return incomingByteRate; + } + + public void setIncomingByteRate(double incomingByteRate) { + this.incomingByteRate = incomingByteRate; + } + + public double getIoRatio() { + return ioRatio; + } + + public void setIoRatio(double ioRatio) { + this.ioRatio = ioRatio; + } + + public double getIdTimeNSavg() { + return idTimeNSavg; + } + + public void setIdTimeNSavg(double idTimeNSavg) { + this.idTimeNSavg = idTimeNSavg; + } + + public double getIoWaitTimeNSavg() { + return ioWaitTimeNSavg; + } + + public void setIoWaitTimeNSavg(double ioWaitTimeNSavg) { + this.ioWaitTimeNSavg = ioWaitTimeNSavg; + } + + public double getNetworkIOrate() { + return networkIOrate; + } + + public void setNetworkIOrate(double networkIOrate) { + this.networkIOrate = networkIOrate; + } + + public double getOutgoingByteRate() { + return outgoingByteRate; + } + + public void setOutgoingByteRate(double outgoingByteRate) { + this.outgoingByteRate = outgoingByteRate; + } + + public double getRequestRate() { + return requestRate; + } + + public void setRequestRate(double requestRate) { + this.requestRate = requestRate; + } + + public double getRequestSizeAvg() { + return requestSizeAvg; + } + + public void setRequestSizeAvg(double requestSizeAvg) { + this.requestSizeAvg = requestSizeAvg; + } + + public double getRequestSizeMax() { + return requestSizeMax; + } + + public void setRequestSizeMax(double requestSizeMax) { + this.requestSizeMax = requestSizeMax; + } + + public double getResponseRate() { + return responseRate; + } + + public void setResponseRate(double responseRate) { + this.responseRate = responseRate; + } + + public double getSelectRate() { + return selectRate; + } + + public void setSelectRate(double selectRate) { + this.selectRate = selectRate; + } + +} diff --git a/src/main/java/com/chickling/kmonitor/jmx/metrics/MetricType4.java b/src/main/java/com/chickling/kmonitor/jmx/metrics/MetricType4.java new file mode 100644 index 0000000..8d2acc2 --- /dev/null +++ b/src/main/java/com/chickling/kmonitor/jmx/metrics/MetricType4.java @@ -0,0 +1,110 @@ +package com.chickling.kmonitor.jmx.metrics; + +/** + * @author Hulva Luva.H from ECBD + * @since 2017-07-26 + * + * RequestMetrics + */ +public class MetricType4 { + private long count; + private double max; + private double mean; + private double min; + private double stdDev; // 标准差 + private double p50thPercentile; + private double p75thPercentile; + private double p95thPercentile; + private double p98thPercentile; + private double p99thPercentile; + private double p999thPercentile; + + public long getCount() { + return count; + } + + public void setCount(long count) { + this.count = count; + } + + public double getMax() { + return max; + } + + public void setMax(double max) { + this.max = max; + } + + public double getMean() { + return mean; + } + + public void setMean(double mean) { + this.mean = mean; + } + + public double getMin() { + return min; + } + + public void setMin(double min) { + this.min = min; + } + + public double getStdDev() { + return stdDev; + } + + public void setStdDev(double stdDev) { + this.stdDev = stdDev; + } + + public double getP50thPercentile() { + return p50thPercentile; + } + + public void setP50thPercentile(double p50thPercentile) { + this.p50thPercentile = p50thPercentile; + } + + public double getP75thPercentile() { + return p75thPercentile; + } + + public void setP75thPercentile(double p75thPercentile) { + this.p75thPercentile = p75thPercentile; + } + + public double getP95thPercentile() { + return p95thPercentile; + } + + public void setP95thPercentile(double p95thPercentile) { + this.p95thPercentile = p95thPercentile; + } + + public double getP98thPercentile() { + return p98thPercentile; + } + + public void setP98thPercentile(double p98thPercentile) { + this.p98thPercentile = p98thPercentile; + } + + public double getP99thPercentile() { + return p99thPercentile; + } + + public void setP99thPercentile(double p99thPercentile) { + this.p99thPercentile = p99thPercentile; + } + + public double getP999thPercentile() { + return p999thPercentile; + } + + public void setP999thPercentile(double p999thPercentile) { + this.p999thPercentile = p999thPercentile; + } + +} diff --git a/src/main/java/com/chickling/kmonitor/jmx/metrics/OperatingSystem.java b/src/main/java/com/chickling/kmonitor/jmx/metrics/OperatingSystem.java new file mode 100644 index 0000000..12cb57f --- /dev/null +++ b/src/main/java/com/chickling/kmonitor/jmx/metrics/OperatingSystem.java @@ -0,0 +1,160 @@ +package com.chickling.kmonitor.jmx.metrics; + +/** + * + * @author Hulva Luva.H + * @since 2017-07-26 + * + */ +public class OperatingSystem { + private String arch; + private String name; + private String version; + + private int availableProcessors; + + private int maxFileDescriptorCount; // TODO ? + private int openFileDescriptorCount; + + private double processCpuLoad; + private long processCpuTime; + private double systemCpuLoad; + private double systemLoadAverage; + + private long totalPhysicalMemorySize; + private long freePhysicalMemorySize; + private long committedVirtualMemorySize; + private long totalSwapSpaceSize; + private long freeSwapSpaceSize; + + public String getArch() { + return arch; + } + + public void setArch(String arch) { + this.arch = arch; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + + public int getAvailableProcessors() { + return availableProcessors; + } + + public void setAvailableProcessors(int availableProcessors) { + this.availableProcessors = availableProcessors; + } + + public int getMaxFileDescriptorCount() { + return maxFileDescriptorCount; + } + + public void setMaxFileDescriptorCount(int maxFileDescriptorCount) { + this.maxFileDescriptorCount = maxFileDescriptorCount; + } + + public int getOpenFileDescriptorCount() { + return openFileDescriptorCount; + } + + public void setOpenFileDescriptorCount(int openFileDescriptorCount) { + this.openFileDescriptorCount = openFileDescriptorCount; + } + + public double getProcessCpuLoad() { + return processCpuLoad; + } + + public void setProcessCpuLoad(double processCpuLoad) { + this.processCpuLoad = processCpuLoad; + } + + public long getProcessCpuTime() { + return processCpuTime; + } + + public void setProcessCpuTime(long processCpuTime) { + this.processCpuTime = processCpuTime; + } + + public double getSystemCpuLoad() { + return systemCpuLoad; + } + + public void setSystemCpuLoad(double systemCpuLoad) { + this.systemCpuLoad = systemCpuLoad; + } + + public double getSystemLoadAverage() { + return systemLoadAverage; + } + + public void setSystemLoadAverage(double systemLoadAverage) { + this.systemLoadAverage = systemLoadAverage; + } + + public long getTotalPhysicalMemorySize() { + return totalPhysicalMemorySize; + } + + public void setTotalPhysicalMemorySize(long totalPhysicalMemorySize) { + this.totalPhysicalMemorySize = totalPhysicalMemorySize; + } + + public long getFreePhysicalMemorySize() { + return freePhysicalMemorySize; + } + + public void setFreePhysicalMemorySize(long freePhysicalMemorySize) { + this.freePhysicalMemorySize = freePhysicalMemorySize; + } + + public long getCommittedVirtualMemorySize() { + return committedVirtualMemorySize; + } + + public void setCommittedVirtualMemorySize(long committedVirtualMemorySize) { + this.committedVirtualMemorySize = committedVirtualMemorySize; + } + + public long getTotalSwapSpaceSize() { + return totalSwapSpaceSize; + } + + public void setTotalSwapSpaceSize(long totalSwapSpaceSize) { + this.totalSwapSpaceSize = totalSwapSpaceSize; + } + + public long getFreeSwapSpaceSize() { + return freeSwapSpaceSize; + } + + public void setFreeSwapSpaceSize(long freeSwapSpaceSize) { + this.freeSwapSpaceSize = freeSwapSpaceSize; + } + + @Override + public String toString() { + return "OperatingSystem [arch=" + arch + ", name=" + name + ", version=" + version + ", availableProcessors=" + availableProcessors + + ", maxFileDescriptorCount=" + maxFileDescriptorCount + ", openFileDescriptorCount=" + openFileDescriptorCount + + ", processCpuLoad=" + processCpuLoad + ", processCpuTime=" + processCpuTime + ", systemCpuLoad=" + systemCpuLoad + + ", systemLoadAverage=" + systemLoadAverage + ", totalPhysicalMemorySize=" + totalPhysicalMemorySize + ", freePhysicalMemorySize=" + + freePhysicalMemorySize + ", committedVirtualMemorySize=" + committedVirtualMemorySize + ", totalSwapSpaceSize=" + + totalSwapSpaceSize + ", freeSwapSpaceSize=" + freeSwapSpaceSize + "]"; + } + +} diff --git a/src/main/java/com/chickling/kmonitor/utils/elasticsearch/Ielasticsearch.java b/src/main/java/com/chickling/kmonitor/utils/elasticsearch/Ielasticsearch.java index 28a8f14..d4dc6e9 100644 --- a/src/main/java/com/chickling/kmonitor/utils/elasticsearch/Ielasticsearch.java +++ b/src/main/java/com/chickling/kmonitor/utils/elasticsearch/Ielasticsearch.java @@ -8,21 +8,20 @@ import com.chickling.kmonitor.model.OffsetPoints; /** - * @author Hulva Luva.H from ECBD - * @date 2017年7月19日 - * @description + * @author Hulva Luva.H + * @since 2017-7-19 * */ public interface Ielasticsearch { - void bulkIndex(JSONObject data, String docType, String indexPrefix); + void bulkIndex(JSONObject data, String docType, String indexPrefix); - List offsetHistory(String indexPrefix, String docType, String group, String topic); + List offsetHistory(String indexPrefix, String docType, String group, String topic); - List scrollsSearcher(OffsetHistoryQueryParams params, String docType, String indexPrefix); + List scrollsSearcher(OffsetHistoryQueryParams params, String docType, String indexPrefix); - boolean check(); + boolean check(); - void close(); + void close(); } diff --git a/src/main/java/com/chickling/kmonitor/utils/elasticsearch/javaapi/ElasticsearchJavaUtil.java b/src/main/java/com/chickling/kmonitor/utils/elasticsearch/javaapi/ElasticsearchJavaUtil.java index 303781f..1f33558 100644 --- a/src/main/java/com/chickling/kmonitor/utils/elasticsearch/javaapi/ElasticsearchJavaUtil.java +++ b/src/main/java/com/chickling/kmonitor/utils/elasticsearch/javaapi/ElasticsearchJavaUtil.java @@ -55,7 +55,7 @@ public ElasticsearchJavaUtil(String stringHosts) { } public boolean check() { - return client.connectedNodes().isEmpty(); + return !client.connectedNodes().isEmpty(); } private void initClient(String stringHosts) { diff --git a/src/main/java/com/chickling/kmonitor/utils/elasticsearch/restapi/ElasticsearchRESTUtil.java b/src/main/java/com/chickling/kmonitor/utils/elasticsearch/restapi/ElasticsearchRESTUtil.java index 3acd741..383cde3 100644 --- a/src/main/java/com/chickling/kmonitor/utils/elasticsearch/restapi/ElasticsearchRESTUtil.java +++ b/src/main/java/com/chickling/kmonitor/utils/elasticsearch/restapi/ElasticsearchRESTUtil.java @@ -35,342 +35,330 @@ * */ public class ElasticsearchRESTUtil implements Ielasticsearch { - private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchRESTUtil.class); - private static String RERST_HOST; - private static RestTemplate REST; - private static HttpComponentsClientHttpRequestFactory httpRequestFactory; - private static HttpHeaders headers; - - static { - httpRequestFactory = new HttpComponentsClientHttpRequestFactory(); - httpRequestFactory.setConnectTimeout(300000); // 5min - httpRequestFactory.setConnectionRequestTimeout(300000); - httpRequestFactory.setReadTimeout(300000); - REST = new RestTemplate(); - REST.setRequestFactory(httpRequestFactory); - headers = new HttpHeaders(); - headers.add("Content-Type", "application/json"); - headers.add("Accept", "*/*"); - } - - public ElasticsearchRESTUtil(String restHost) { - RERST_HOST = restHost; - } - - /** - * GET _cluster/health - * - * - * { - "cluster_name" : "testcluster", - "status" : "yellow", - "timed_out" : false, - "number_of_nodes" : 1, - "number_of_data_nodes" : 1, - "active_primary_shards" : 5, - "active_shards" : 5, - "relocating_shards" : 0, - "initializing_shards" : 0, - "unassigned_shards" : 5, - "delayed_unassigned_shards": 0, - "number_of_pending_tasks" : 0, - "number_of_in_flight_fetch": 0, - "task_max_waiting_in_queue_millis": 0, - "active_shards_percent_as_number": 50.0 - } - * - * - * @return - */ - public boolean check() { - - return true; - } - - public void bulkIndex(JSONObject data, String docType, String indexPrefix) { - StringBuilder bulkData = new StringBuilder(); - SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd"); - Date now = new Date(); - String indexSufix = sFormat.format(now); - - boolean hasData = false; - Iterator keys = data.keys(); - while (keys.hasNext()) { - hasData = true; - bulkData.append( - "{\"index\": {\"_index\":\"" + indexPrefix + indexSufix + "\",\"_type\":\"" + docType + "\"}}") - .append("\n"); - bulkData.append(data.getJSONObject(keys.next()).toString()).append("\n"); - } - if (!hasData) { - return; - } - ResponseEntity response = REST.exchange("http://" + RERST_HOST + "/_bulk", HttpMethod.POST, - new HttpEntity(bulkData.toString(), headers), String.class); - // TODO Do something with response? - response.getBody(); - } - - public List scrollsSearcher(OffsetHistoryQueryParams params, String docType, String indexPrefix) { - ExecutorService pool = Executors.newFixedThreadPool(SystemManager.DEFAULT_THREAD_POOL_SIZE, - new WorkerThreadFactory("OffsetHistoryQuery-RESTAPI")); - - List result = new ArrayList(); - - SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); - try { - String indexNameSearch = indexPrefix + "*"; - - String rangeFrom = getRangeFrom(params); - - List concernedTimestamp = dateHistogram(sFormat.parse(rangeFrom).getTime(), - Long.parseLong(params.getRangeto()), params.getInterval()); - - List>> futureList = new ArrayList>>(); - - ResponseEntity response = REST - .exchange("http://" + RERST_HOST + "/" + indexNameSearch + "/" + docType + "/_search?scroll=1m", - HttpMethod.POST, - new HttpEntity( - ScrollSearchTemplate.getScrollSearchBody(params.getTopic(), params.getGroup(), - rangeFrom, sFormat.format(new Date(Long.parseLong(params.getRangeto())))), - headers), - String.class); - - JSONObject searchResult = null; - while (true) { - searchResult = new JSONObject(response.getBody()); - final JSONArray searchHits = searchResult.getJSONObject("hits").getJSONArray("hits"); - if (searchHits.length() == 0) { - break; - } - try { - Future> future = null; - future = pool.submit(new GenerateOffsetHistoryDataset(searchHits, concernedTimestamp)); - futureList.add(future); - } catch (Exception e) { - LOG.warn("Ops...GenerateOffsetHistoryDataset went wrong! " + e.getMessage()); - } - - response = REST.exchange("http://" + RERST_HOST + "/_search/scroll", HttpMethod.POST, - new HttpEntity( - ScrollSearchTemplate.getScrollNextBody(searchResult.getString("_scroll_id")), headers), - String.class); - } - for (Future> future : futureList) { - try { - result.addAll(future.get()); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - LOG.error("Interrupted when get GenerateOffsetHistoryDataset in future...", e); - } catch (ExecutionException e) { - // TODO Auto-generated catch block - LOG.error("QAQ when get GenerateOffsetHistoryDataset in future...", e); - } - } - pool.shutdown(); - } catch ( - - Exception e) { - // TODO - LOG.error("Damn...", e); - } - return result; - } - - private List dateHistogram(long from, long to, String interval) { - switch (interval) { - case "1m": - return dateHistogram(from, to, 1 * 60 * 1000); - case "10m": - return dateHistogram(from, to, 10 * 60 * 1000); - case "30m": - return dateHistogram(from, to, 30 * 60 * 1000); - case "1h": - return dateHistogram(from, to, 1 * 60 * 60 * 1000); - case "1d": - return dateHistogram(from, to, 24 * 60 * 60 * 1000); - default: - return dateHistogram(from, to, 1 * 60 * 1000); - } - } - - private List dateHistogram(long from, long to, long interval) { - SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); - List histogramedTimestamp = new ArrayList(); - for (long i = from; i < to + interval; i = i + interval) { - histogramedTimestamp.add(sFormat.format(new Date(i))); - } - return histogramedTimestamp; - } - - private String getRangeFrom(OffsetHistoryQueryParams params) throws ParseException { - SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); - Calendar cal = Calendar.getInstance(); - cal.setTime(new Date(Long.parseLong(params.getRangeto()))); - switch (params.getRange()) { - case "1h": - cal.add(Calendar.HOUR, -1); - break; - case "8h": - cal.add(Calendar.HOUR, -8); - break; - case "16h": - cal.add(Calendar.HOUR, -16); - break; - case "1d": - cal.add(Calendar.DATE, -1); - break; - case "2d": - cal.add(Calendar.DATE, -2); - break; - case "1w": - cal.add(Calendar.WEEK_OF_MONTH, -1); - break; - default: - cal.add(Calendar.HOUR, -1); - break; - } - return sFormat.format(cal.getTime()); - } - - // private int calculatePageSize(LogsizeRequestParams params, String - // rangeFrom) throws ParseException { - // SimpleDateFormat sFormat = new - // SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); - // switch (params.getInterval()) { - // case "5m": - // return (int) ((Long.parseLong(params.getRangeTo()) - - // sFormat.parse(rangeFrom).getTime()) / (5 * 60 * 1000)); - // case "30m": - // return (int) ((Long.parseLong(params.getRangeTo()) - - // sFormat.parse(rangeFrom).getTime()) - // / (30 * 60 * 1000)); - // case "1h": - // return (int) ((Long.parseLong(params.getRangeTo()) - - // sFormat.parse(rangeFrom).getTime()) - // / (60 * 60 * 1000)); - // case "1d": - // return (int) ((Long.parseLong(params.getRangeTo()) - - // sFormat.parse(rangeFrom).getTime()) - // / (24 * 60 * 60 * 1000)); - // default: - // return (int) ((Long.parseLong(params.getRangeTo()) - - // sFormat.parse(rangeFrom).getTime()) - // / (60 * 60 * 1000)); - // } - // } - - public List offsetHistory(String indexPrefix, String docType, String group, String topic) { - ExecutorService pool = Executors.newFixedThreadPool(SystemManager.DEFAULT_THREAD_POOL_SIZE, - new WorkerThreadFactory("OffsetHistoryQuery-RESTAPI")); - - List result = new ArrayList(); - SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); - try { - String indexNameSearch = indexPrefix + "*"; - - Date now = new Date(); - Calendar cal = Calendar.getInstance(); - cal.setTime(now); - cal.set(Calendar.MILLISECOND, 0); - cal.set(Calendar.SECOND, 0); - cal.add(Calendar.HOUR, -1); - String rangeFrom = sFormat.format(cal.getTime()); - List>> futureList = new ArrayList>>(); - - ResponseEntity response = REST.exchange( - - "http://" + RERST_HOST + "/" + indexNameSearch + "/" + docType + "/_search?scroll=1m", - HttpMethod.POST, - new HttpEntity( - ScrollSearchTemplate.getScrollSearchBody(topic, group, rangeFrom, sFormat.format(now)), - headers), - String.class); - - JSONObject searchResult = null; - while (true) { - searchResult = new JSONObject(response.getBody()); - final JSONArray searchHits = searchResult.getJSONObject("hits").getJSONArray("hits"); - if (searchHits.length() == 0) { - break; - } - try { - Future> future = null; - future = pool.submit(new GenerateOffsetHistoryDataset(searchHits)); - futureList.add(future); - } catch (Exception e) { - LOG.warn("Ops...GenerateOffsetHistoryDataset went wrong! " + e.getMessage()); - } - - response = REST.exchange("http://" + RERST_HOST + "/_search/scroll", HttpMethod.POST, - new HttpEntity( - ScrollSearchTemplate.getScrollNextBody(searchResult.getString("_scroll_id")), headers), - String.class); - } - for (Future> future : futureList) { - try { - result.addAll(future.get()); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - LOG.error("Interrupted when get GenerateOffsetHistoryDataset in future...", e); - } catch (ExecutionException e) { - // TODO Auto-generated catch block - LOG.error("QAQ when get GenerateOffsetHistoryDataset in future...", e); - } - } - pool.shutdown(); - } catch (Exception e) { - pool.shutdown(); - // TODO - LOG.error("Damn...", e); - } - - return result; - } - - class GenerateOffsetHistoryDataset implements Callable> { - - private JSONArray searchHits; - List concernedTimestamp = null; - - public GenerateOffsetHistoryDataset(JSONArray searchHits) { - this.searchHits = searchHits; - } - - public GenerateOffsetHistoryDataset(JSONArray searchHitPart, List concernedTimestamp) { - this.searchHits = searchHitPart; - this.concernedTimestamp = concernedTimestamp; - } - - @Override - public List call() { - JSONObject source = null; - List datasets = new ArrayList(); - try { - for (int i = 0; i < searchHits.length(); i++) { - source = searchHits.getJSONObject(i).getJSONObject("_source"); - if (concernedTimestamp != null) { - if (!concernedTimestamp.contains(source.getString("date"))) { - continue; - } - } - Long offset = source.getLong("offset"); - Long logsize = source.getLong("logSize"); - - datasets.add(new OffsetPoints(source.getLong("timestamp"), source.getInt("partition"), - source.getString("owner"), offset, logsize)); - } - } catch (Exception e) { - LOG.error("GenerateOffsetHistoryDataset error! " + e.getMessage()); - } - return datasets; - } - - } - - @Override - public void close() { - - } + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchRESTUtil.class); + private static String RERST_HOST; + private static RestTemplate REST; + private static HttpComponentsClientHttpRequestFactory httpRequestFactory; + private static HttpHeaders headers; + + static { + httpRequestFactory = new HttpComponentsClientHttpRequestFactory(); + httpRequestFactory.setConnectTimeout(300000); // 5min + httpRequestFactory.setConnectionRequestTimeout(300000); + httpRequestFactory.setReadTimeout(300000); + REST = new RestTemplate(); + REST.setRequestFactory(httpRequestFactory); + headers = new HttpHeaders(); + headers.add("Content-Type", "application/json"); + headers.add("Accept", "*/*"); + } + + public ElasticsearchRESTUtil(String restHost) { + RERST_HOST = restHost; + } + + /** + * GET _cluster/health + * + * + * { + "cluster_name" : "testcluster", + "status" : "yellow", + "timed_out" : false, + "number_of_nodes" : 1, + "number_of_data_nodes" : 1, + "active_primary_shards" : 5, + "active_shards" : 5, + "relocating_shards" : 0, + "initializing_shards" : 0, + "unassigned_shards" : 5, + "delayed_unassigned_shards": 0, + "number_of_pending_tasks" : 0, + "number_of_in_flight_fetch": 0, + "task_max_waiting_in_queue_millis": 0, + "active_shards_percent_as_number": 50.0 + } + * + * + * @return + */ + public boolean check() { + + return true; + } + + public void bulkIndex(JSONObject data, String docType, String indexPrefix) { + StringBuilder bulkData = new StringBuilder(); + SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd"); + Date now = new Date(); + String indexSufix = sFormat.format(now); + + boolean hasData = false; + Iterator keys = data.keys(); + while (keys.hasNext()) { + hasData = true; + bulkData.append("{\"index\": {\"_index\":\"" + indexPrefix + indexSufix + "\",\"_type\":\"" + docType + "\"}}").append("\n"); + bulkData.append(data.getJSONObject(keys.next()).toString()).append("\n"); + } + if (!hasData) { + return; + } + ResponseEntity response = REST.exchange("http://" + RERST_HOST + "/_bulk", HttpMethod.POST, + new HttpEntity(bulkData.toString(), headers), String.class); + // TODO Do something with response? + response.getBody(); + } + + public List scrollsSearcher(OffsetHistoryQueryParams params, String docType, String indexPrefix) { + ExecutorService pool = + Executors.newFixedThreadPool(SystemManager.DEFAULT_THREAD_POOL_SIZE, new WorkerThreadFactory("OffsetHistoryQuery-RESTAPI")); + + List result = new ArrayList(); + + SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + try { + String indexNameSearch = indexPrefix + "*"; + + String rangeFrom = getRangeFrom(params); + + List concernedTimestamp = + dateHistogram(sFormat.parse(rangeFrom).getTime(), Long.parseLong(params.getRangeto()), params.getInterval()); + + List>> futureList = new ArrayList>>(); + + ResponseEntity response = REST.exchange("http://" + RERST_HOST + "/" + indexNameSearch + "/" + docType + "/_search?scroll=1m", + HttpMethod.POST, new HttpEntity(ScrollSearchTemplate.getScrollSearchBody(params.getTopic(), params.getGroup(), rangeFrom, + sFormat.format(new Date(Long.parseLong(params.getRangeto())))), headers), + String.class); + + JSONObject searchResult = null; + while (true) { + searchResult = new JSONObject(response.getBody()); + final JSONArray searchHits = searchResult.getJSONObject("hits").getJSONArray("hits"); + if (searchHits.length() == 0) { + break; + } + try { + Future> future = null; + future = pool.submit(new GenerateOffsetHistoryDataset(searchHits, concernedTimestamp)); + futureList.add(future); + } catch (Exception e) { + LOG.warn("Ops...GenerateOffsetHistoryDataset went wrong! " + e.getMessage()); + } + + response = REST.exchange("http://" + RERST_HOST + "/_search/scroll", HttpMethod.POST, + new HttpEntity(ScrollSearchTemplate.getScrollNextBody(searchResult.getString("_scroll_id")), headers), String.class); + } + for (Future> future : futureList) { + try { + result.addAll(future.get()); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + LOG.error("Interrupted when get GenerateOffsetHistoryDataset in future...", e); + } catch (ExecutionException e) { + // TODO Auto-generated catch block + LOG.error("QAQ when get GenerateOffsetHistoryDataset in future...", e); + } + } + pool.shutdown(); + } catch ( + + Exception e) { + // TODO + LOG.error("Damn...", e); + } + return result; + } + + private List dateHistogram(long from, long to, String interval) { + switch (interval) { + case "1m": + return dateHistogram(from, to, 1 * 60 * 1000); + case "10m": + return dateHistogram(from, to, 10 * 60 * 1000); + case "30m": + return dateHistogram(from, to, 30 * 60 * 1000); + case "1h": + return dateHistogram(from, to, 1 * 60 * 60 * 1000); + case "1d": + return dateHistogram(from, to, 24 * 60 * 60 * 1000); + default: + return dateHistogram(from, to, 1 * 60 * 1000); + } + } + + private List dateHistogram(long from, long to, long interval) { + SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + List histogramedTimestamp = new ArrayList(); + for (long i = from; i < to + interval; i = i + interval) { + histogramedTimestamp.add(sFormat.format(new Date(i))); + } + return histogramedTimestamp; + } + + private String getRangeFrom(OffsetHistoryQueryParams params) throws ParseException { + SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + Calendar cal = Calendar.getInstance(); + cal.setTime(new Date(Long.parseLong(params.getRangeto()))); + switch (params.getRange()) { + case "1h": + cal.add(Calendar.HOUR, -1); + break; + case "8h": + cal.add(Calendar.HOUR, -8); + break; + case "16h": + cal.add(Calendar.HOUR, -16); + break; + case "1d": + cal.add(Calendar.DATE, -1); + break; + case "2d": + cal.add(Calendar.DATE, -2); + break; + case "1w": + cal.add(Calendar.WEEK_OF_MONTH, -1); + break; + default: + cal.add(Calendar.HOUR, -1); + break; + } + return sFormat.format(cal.getTime()); + } + + // private int calculatePageSize(LogsizeRequestParams params, String + // rangeFrom) throws ParseException { + // SimpleDateFormat sFormat = new + // SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + // switch (params.getInterval()) { + // case "5m": + // return (int) ((Long.parseLong(params.getRangeTo()) - + // sFormat.parse(rangeFrom).getTime()) / (5 * 60 * 1000)); + // case "30m": + // return (int) ((Long.parseLong(params.getRangeTo()) - + // sFormat.parse(rangeFrom).getTime()) + // / (30 * 60 * 1000)); + // case "1h": + // return (int) ((Long.parseLong(params.getRangeTo()) - + // sFormat.parse(rangeFrom).getTime()) + // / (60 * 60 * 1000)); + // case "1d": + // return (int) ((Long.parseLong(params.getRangeTo()) - + // sFormat.parse(rangeFrom).getTime()) + // / (24 * 60 * 60 * 1000)); + // default: + // return (int) ((Long.parseLong(params.getRangeTo()) - + // sFormat.parse(rangeFrom).getTime()) + // / (60 * 60 * 1000)); + // } + // } + + public List offsetHistory(String indexPrefix, String docType, String group, String topic) { + ExecutorService pool = + Executors.newFixedThreadPool(SystemManager.DEFAULT_THREAD_POOL_SIZE, new WorkerThreadFactory("OffsetHistoryQuery-RESTAPI")); + + List result = new ArrayList(); + SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + try { + String indexNameSearch = indexPrefix + "*"; + + Date now = new Date(); + Calendar cal = Calendar.getInstance(); + cal.setTime(now); + cal.set(Calendar.MILLISECOND, 0); + cal.set(Calendar.SECOND, 0); + cal.add(Calendar.HOUR, -1); + String rangeFrom = sFormat.format(cal.getTime()); + List>> futureList = new ArrayList>>(); + + ResponseEntity response = REST.exchange( + + "http://" + RERST_HOST + "/" + indexNameSearch + "/" + docType + "/_search?scroll=1m", HttpMethod.POST, + new HttpEntity(ScrollSearchTemplate.getScrollSearchBody(topic, group, rangeFrom, sFormat.format(now)), headers), + String.class); + + JSONObject searchResult = null; + while (true) { + searchResult = new JSONObject(response.getBody()); + final JSONArray searchHits = searchResult.getJSONObject("hits").getJSONArray("hits"); + if (searchHits.length() == 0) { + break; + } + try { + Future> future = null; + future = pool.submit(new GenerateOffsetHistoryDataset(searchHits)); + futureList.add(future); + } catch (Exception e) { + LOG.warn("Ops...GenerateOffsetHistoryDataset went wrong! " + e.getMessage()); + } + + response = REST.exchange("http://" + RERST_HOST + "/_search/scroll", HttpMethod.POST, + new HttpEntity(ScrollSearchTemplate.getScrollNextBody(searchResult.getString("_scroll_id")), headers), String.class); + } + for (Future> future : futureList) { + try { + result.addAll(future.get()); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + LOG.error("Interrupted when get GenerateOffsetHistoryDataset in future...", e); + } catch (ExecutionException e) { + // TODO Auto-generated catch block + LOG.error("QAQ when get GenerateOffsetHistoryDataset in future...", e); + } + } + pool.shutdown(); + } catch (Exception e) { + pool.shutdown(); + // TODO + LOG.error("Damn...", e); + } + + return result; + } + + class GenerateOffsetHistoryDataset implements Callable> { + + private JSONArray searchHits; + List concernedTimestamp = null; + + public GenerateOffsetHistoryDataset(JSONArray searchHits) { + this.searchHits = searchHits; + } + + public GenerateOffsetHistoryDataset(JSONArray searchHitPart, List concernedTimestamp) { + this.searchHits = searchHitPart; + this.concernedTimestamp = concernedTimestamp; + } + + @Override + public List call() { + JSONObject source = null; + List datasets = new ArrayList(); + try { + for (int i = 0; i < searchHits.length(); i++) { + source = searchHits.getJSONObject(i).getJSONObject("_source"); + if (concernedTimestamp != null) { + if (!concernedTimestamp.contains(source.getString("date"))) { + continue; + } + } + Long offset = source.getLong("offset"); + Long logsize = source.getLong("logSize"); + + datasets + .add(new OffsetPoints(source.getLong("timestamp"), source.getInt("partition"), source.getString("owner"), offset, logsize)); + } + } catch (Exception e) { + LOG.error("GenerateOffsetHistoryDataset error! " + e.getMessage()); + } + return datasets; + } + + } + + @Override + public void close() { + + } + } diff --git a/src/main/resources/static/index.html b/src/main/resources/static/index.html index 1bb1c0f..0b4e4b3 100644 --- a/src/main/resources/static/index.html +++ b/src/main/resources/static/index.html @@ -26,6 +26,7 @@ + @@ -109,6 +110,9 @@ src="//cdnjs.cloudflare.com/ajax/libs/lodash.js/2.4.1/lodash.min.js"> + + + diff --git a/src/main/resources/static/scripts/controllers.js b/src/main/resources/static/scripts/controllers.js index 6a36c58..3e30dc8 100644 --- a/src/main/resources/static/scripts/controllers.js +++ b/src/main/resources/static/scripts/controllers.js @@ -169,8 +169,8 @@ angular.module('offsetapp.controllers', [ "offsetapp.services" ]) 'show.bs.modal', function(event) { var button = $(event.relatedTarget); - var topic = $routeParams.group; - var group = $routeParams.topic; + var group = $routeParams.group; + var topic = $routeParams.topic; var modal = $(this); modal.find('.modal-body #topic').val(topic); modal.find('.modal-body #group').val(group); @@ -356,7 +356,7 @@ angular.module('offsetapp.controllers', [ "offsetapp.services" ]) offsetinfo.newAlert('alerting/task', sendData, function(d) { $scope.tasks = d; - $('#taskModal').modal('hide'); + $('#taskDetailModal').modal('hide'); swal({ title : "Task updated!", type : "success", @@ -560,6 +560,14 @@ angular.module('offsetapp.controllers', [ "offsetapp.services" ]) } $scope.brokerTopicMetrics = d; }); + +// Morris.Donut({ +// element: 'donut-physicalMemory', +// data: [ +// {label: "FreePhysicalMemorySize", value: 12}, +// {label: "UsedPhysicalMemorySize", value: 88} +// ] +// }); // var options = { // axisY : { // type : Chartist.AutoScaleAxis, diff --git a/src/main/resources/static/views/broker.html b/src/main/resources/static/views/broker.html index a8f3ab0..4d7c387 100644 --- a/src/main/resources/static/views/broker.html +++ b/src/main/resources/static/views/broker.html @@ -4,71 +4,153 @@

-
-
-

Metrics

+
+
+
+

Metrics

+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
RateMean1 min5 min15 min
Messages in /sec{{brokerTopicMetrics.MessagesInPerSec.meanRate}}{{brokerTopicMetrics.MessagesInPerSec.oneMinuteRate}}{{brokerTopicMetrics.MessagesInPerSec.fiveMinuteRate}}{{brokerTopicMetrics.MessagesInPerSec.fifteenMinuteRate}}
Bytes in /sec{{brokerTopicMetrics.BytesInPerSec.meanRate}}{{brokerTopicMetrics.BytesInPerSec.oneMinuteRate}}{{brokerTopicMetrics.BytesInPerSec.fiveMinuteRate}}{{brokerTopicMetrics.BytesInPerSec.fifteenMinuteRate}}
Bytes out /sec{{brokerTopicMetrics.BytesOutPerSec.meanRate}}{{brokerTopicMetrics.BytesOutPerSec.oneMinuteRate}}{{brokerTopicMetrics.BytesOutPerSec.fiveMinuteRate}}{{brokerTopicMetrics.BytesOutPerSec.fifteenMinuteRate}}
Bytes rejected /sec{{brokerTopicMetrics.BytesRejectedPerSec.meanRate}}{{brokerTopicMetrics.BytesRejectedPerSec.oneMinuteRate}}{{brokerTopicMetrics.BytesRejectedPerSec.fiveMinuteRate}}{{brokerTopicMetrics.BytesRejectedPerSec.fifteenMinuteRate}}
Failed fetch request /sec{{brokerTopicMetrics.FailedFetchRequestsPerSec.meanRate}}{{brokerTopicMetrics.FailedFetchRequestsPerSec.oneMinuteRate}}{{brokerTopicMetrics.FailedFetchRequestsPerSec.fiveMinuteRate}}{{brokerTopicMetrics.FailedFetchRequestsPerSec.fifteenMinuteRate}}
Failed produce request /sec{{brokerTopicMetrics.FailedProduceRequestsPerSec.meanRate}}{{brokerTopicMetrics.FailedProduceRequestsPerSec.oneMinuteRate}}{{brokerTopicMetrics.FailedProduceRequestsPerSec.fiveMinuteRate}}{{brokerTopicMetrics.FailedProduceRequestsPerSec.fifteenMinuteRate}}
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
RateMean1 min5 min15 min
Bytes in{{brokerTopicMetrics.BytesInPerSec.meanRate}}{{brokerTopicMetrics.BytesInPerSec.oneMinuteRate}}{{brokerTopicMetrics.BytesInPerSec.fiveMinuteRate}}{{brokerTopicMetrics.BytesInPerSec.fifteenMinuteRate}}
Bytes out{{brokerTopicMetrics.BytesOutPerSec.meanRate}}{{brokerTopicMetrics.BytesOutPerSec.oneMinuteRate}}{{brokerTopicMetrics.BytesOutPerSec.fiveMinuteRate}}{{brokerTopicMetrics.BytesOutPerSec.fifteenMinuteRate}}
Bytes rejected{{brokerTopicMetrics.BytesRejectedPerSec.meanRate}}{{brokerTopicMetrics.BytesRejectedPerSec.oneMinuteRate}}{{brokerTopicMetrics.BytesRejectedPerSec.fiveMinuteRate}}{{brokerTopicMetrics.BytesRejectedPerSec.fifteenMinuteRate}}
Failed fetch request{{brokerTopicMetrics.FailedFetchRequestsPerSec.meanRate}}{{brokerTopicMetrics.FailedFetchRequestsPerSec.oneMinuteRate}}{{brokerTopicMetrics.FailedFetchRequestsPerSec.fiveMinuteRate}}{{brokerTopicMetrics.FailedFetchRequestsPerSec.fifteenMinuteRate}}
Failed produce request{{brokerTopicMetrics.FailedProduceRequestsPerSec.meanRate}}{{brokerTopicMetrics.FailedProduceRequestsPerSec.oneMinuteRate}}{{brokerTopicMetrics.FailedProduceRequestsPerSec.fiveMinuteRate}}{{brokerTopicMetrics.FailedProduceRequestsPerSec.fifteenMinuteRate}}
+ +
+ - - Bytes in - {{brokerTopicMetrics.BytesInPerSec.meanRate}} - {{brokerTopicMetrics.BytesInPerSec.oneMinuteRate}} - {{brokerTopicMetrics.BytesInPerSec.fiveMinuteRate}} - {{brokerTopicMetrics.BytesInPerSec.fifteenMinuteRate}} - - - Bytes out - {{brokerTopicMetrics.BytesOutPerSec.meanRate}} - {{brokerTopicMetrics.BytesOutPerSec.oneMinuteRate}} - {{brokerTopicMetrics.BytesOutPerSec.fiveMinuteRate}} - {{brokerTopicMetrics.BytesOutPerSec.fifteenMinuteRate}} - - - Bytes rejected - {{brokerTopicMetrics.BytesRejectedPerSec.meanRate}} - {{brokerTopicMetrics.BytesRejectedPerSec.oneMinuteRate}} - {{brokerTopicMetrics.BytesRejectedPerSec.fiveMinuteRate}} - {{brokerTopicMetrics.BytesRejectedPerSec.fifteenMinuteRate}} - - - Failed fetch request - {{brokerTopicMetrics.FailedFetchRequestsPerSec.meanRate}} - {{brokerTopicMetrics.FailedFetchRequestsPerSec.oneMinuteRate}} - {{brokerTopicMetrics.FailedFetchRequestsPerSec.fiveMinuteRate}} - {{brokerTopicMetrics.FailedFetchRequestsPerSec.fifteenMinuteRate}} - - - Failed produce request - {{brokerTopicMetrics.FailedProduceRequestsPerSec.meanRate}} - {{brokerTopicMetrics.FailedProduceRequestsPerSec.oneMinuteRate}} - {{brokerTopicMetrics.FailedProduceRequestsPerSec.fiveMinuteRate}} - {{brokerTopicMetrics.FailedProduceRequestsPerSec.fifteenMinuteRate}} - - - -
+ diff --git a/src/main/resources/static/views/topic-consumers.html b/src/main/resources/static/views/topic-consumers.html index 00a26ec..aae634c 100644 --- a/src/main/resources/static/views/topic-consumers.html +++ b/src/main/resources/static/views/topic-consumers.html @@ -19,43 +19,43 @@

Metrics

- + {{brokerTopicMetrics.MessagesInPerSec.meanRate}} + {{brokerTopicMetrics.MessagesInPerSec.oneMinuteRate}} + {{brokerTopicMetrics.MessagesInPerSec.fiveMinuteRate}} + {{brokerTopicMetrics.MessagesInPerSec.fifteenMinuteRate}} + - Bytes in + Bytes in /sec {{brokerTopicMetrics.BytesInPerSec.meanRate}} {{brokerTopicMetrics.BytesInPerSec.oneMinuteRate}} {{brokerTopicMetrics.BytesInPerSec.fiveMinuteRate}} {{brokerTopicMetrics.BytesInPerSec.fifteenMinuteRate}} - Bytes out + Bytes out /sec {{brokerTopicMetrics.BytesOutPerSec.meanRate}} {{brokerTopicMetrics.BytesOutPerSec.oneMinuteRate}} {{brokerTopicMetrics.BytesOutPerSec.fiveMinuteRate}} {{brokerTopicMetrics.BytesOutPerSec.fifteenMinuteRate}} - Bytes rejected + Bytes rejected /sec {{brokerTopicMetrics.BytesRejectedPerSec.meanRate}} {{brokerTopicMetrics.BytesRejectedPerSec.oneMinuteRate}} {{brokerTopicMetrics.BytesRejectedPerSec.fiveMinuteRate}} {{brokerTopicMetrics.BytesRejectedPerSec.fifteenMinuteRate}} - Failed fetch request + Failed fetch request /sec {{brokerTopicMetrics.FailedFetchRequestsPerSec.meanRate}} {{brokerTopicMetrics.FailedFetchRequestsPerSec.oneMinuteRate}} {{brokerTopicMetrics.FailedFetchRequestsPerSec.fiveMinuteRate}} {{brokerTopicMetrics.FailedFetchRequestsPerSec.fifteenMinuteRate}} - Failed produce request + Failed produce request /sec {{brokerTopicMetrics.FailedProduceRequestsPerSec.meanRate}} {{brokerTopicMetrics.FailedProduceRequestsPerSec.oneMinuteRate}} {{brokerTopicMetrics.FailedProduceRequestsPerSec.fiveMinuteRate}} @@ -66,66 +66,65 @@

Metrics

Loading ...
-
-

Active Consumers

+
+

Active Consumers

- - - - - - - - - - - - - - -
name
{{c.name}}
Unable to find Active Consumers
+ + + + + + + + + + + + + + +
name
{{c.name}}
Unable to find Active Consumers
-

Active Consumers Offsets

-
-
-
-
Unable to - find Active Consumers
+

Active Consumers Offsets

+
+
+
+
Unable to find + Active Consumers
-
+
-

InActive Consumers

+

InActive Consumers

- - - - - - - - - - - - - - -
name
{{c.name}}
Unable to find Inactive Consumers
+ + + + + + + + + + + + + + +
name
{{c.name}}
Unable to find Inactive Consumers
-

Inactive Consumers Offsets

-
-
-
-
Unable to - find Inactive Consumers
+

Inactive Consumers Offsets

+
+
+
+
Unable to find + Inactive Consumers
-
+
-
\ No newline at end of file +
\ No newline at end of file diff --git a/src/main/resources/static/views/topic-detail.html b/src/main/resources/static/views/topic-detail.html index 61d278f..ca60c0e 100644 --- a/src/main/resources/static/views/topic-detail.html +++ b/src/main/resources/static/views/topic-detail.html @@ -19,43 +19,43 @@

Metrics

- + {{brokerTopicMetrics.MessagesInPerSec.meanRate}} + {{brokerTopicMetrics.MessagesInPerSec.oneMinuteRate}} + {{brokerTopicMetrics.MessagesInPerSec.fiveMinuteRate}} + {{brokerTopicMetrics.MessagesInPerSec.fifteenMinuteRate}} + - Bytes in + Bytes in /sec {{brokerTopicMetrics.BytesInPerSec.meanRate}} {{brokerTopicMetrics.BytesInPerSec.oneMinuteRate}} {{brokerTopicMetrics.BytesInPerSec.fiveMinuteRate}} {{brokerTopicMetrics.BytesInPerSec.fifteenMinuteRate}} - Bytes out + Bytes out /sec {{brokerTopicMetrics.BytesOutPerSec.meanRate}} {{brokerTopicMetrics.BytesOutPerSec.oneMinuteRate}} {{brokerTopicMetrics.BytesOutPerSec.fiveMinuteRate}} {{brokerTopicMetrics.BytesOutPerSec.fifteenMinuteRate}} - Bytes rejected + Bytes rejected /sec {{brokerTopicMetrics.BytesRejectedPerSec.meanRate}} {{brokerTopicMetrics.BytesRejectedPerSec.oneMinuteRate}} {{brokerTopicMetrics.BytesRejectedPerSec.fiveMinuteRate}} {{brokerTopicMetrics.BytesRejectedPerSec.fifteenMinuteRate}} - Failed fetch request + Failed fetch request /sec {{brokerTopicMetrics.FailedFetchRequestsPerSec.meanRate}} {{brokerTopicMetrics.FailedFetchRequestsPerSec.oneMinuteRate}} {{brokerTopicMetrics.FailedFetchRequestsPerSec.fiveMinuteRate}} {{brokerTopicMetrics.FailedFetchRequestsPerSec.fifteenMinuteRate}} - Failed produce request + Failed produce request /sec {{brokerTopicMetrics.FailedProduceRequestsPerSec.meanRate}} {{brokerTopicMetrics.FailedProduceRequestsPerSec.oneMinuteRate}} {{brokerTopicMetrics.FailedProduceRequestsPerSec.fiveMinuteRate}} @@ -66,33 +66,32 @@

Metrics

Loading ...
-
-

Active Consumers

- - - - - - - - - - - - -
name
- {{c.name}}{{c.name}}
+
+

Active Consumers

+ + + + + + + + + + + + +
name
+ {{c.name}}{{c.name}}
- More details... -
+ More details... +
-
\ No newline at end of file +
\ No newline at end of file diff --git a/src/test/java/com/chickling/kmonitor/test/CollectionTest.java b/src/test/java/com/chickling/kmonitor/test/CollectionTest.java new file mode 100644 index 0000000..4081ae9 --- /dev/null +++ b/src/test/java/com/chickling/kmonitor/test/CollectionTest.java @@ -0,0 +1,38 @@ +package com.chickling.kmonitor.test; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.chickling.kmonitor.jmx.ObjectNameHolder; + +/** + * @author Hulva Luva.H from ECBD + * @date 2017年7月22日 + * @description + * + */ +public class CollectionTest { + + /** + * @param args + */ + public static void main(String[] args) { + Set objectNames = new HashSet(); + ObjectNameHolder onh = new ObjectNameHolder("metric", "type", "name", new HashMap()); + objectNames.add(onh); + System.out.println( + objectNames.contains(new ObjectNameHolder("metric", "type", "name", new HashMap()))); + Map ops = new HashMap(); + ops.put("Hello", "World"); + ObjectNameHolder onh1 = new ObjectNameHolder("metric", "type", "name", ops); + System.out.println(objectNames.contains(onh1)); + System.out.println(onh1.equals(onh)); + ObjectNameHolder onh2 = new ObjectNameHolder("metric", "type", "name", null); + System.out.println(onh2.equals(onh)); + ObjectNameHolder onh3 = new ObjectNameHolder("metric", "type", null, null); + System.out.println(onh2.equals(onh3)); + } + +} diff --git a/src/test/java/com/chickling/kmonitor/test/EsSearchTest.java b/src/test/java/com/chickling/kmonitor/test/EsSearchTest.java index 979dd2f..d8f6d7d 100644 --- a/src/test/java/com/chickling/kmonitor/test/EsSearchTest.java +++ b/src/test/java/com/chickling/kmonitor/test/EsSearchTest.java @@ -1,9 +1,11 @@ package com.chickling.kmonitor.test; -import java.util.List; +import java.text.SimpleDateFormat; +import java.util.Date; -import com.chickling.kmonitor.model.OffsetPoints; -import com.chickling.kmonitor.utils.elasticsearch.javaapi.ElasticsearchJavaUtil; +import org.json.JSONObject; + +import com.chickling.kmonitor.utils.elasticsearch.restapi.ElasticsearchRESTUtil; /** * @@ -12,11 +14,25 @@ */ public class EsSearchTest { - public static void main(String[] args) { - ElasticsearchJavaUtil es = new ElasticsearchJavaUtil("10.16.238.82:9300,10.16.238.83:9300,10.16.238.84:9300"); - List result = es.offsetHistory("logx_healthcheck_test", "kafkaoffset", "testkafka", "EC2_Test"); - - System.out.println(result); - } + public static void main(String[] args) { + // ElasticsearchJavaUtil es = new + // ElasticsearchJavaUtil("10.16.238.82:9300,10.16.238.83:9300,10.16.238.84:9300"); + // List result = es.offsetHistory("logx_healthcheck_test", "kafkaoffset", "testkafka", + // "EC2_Test"); + // + // System.out.println(result); + ElasticsearchRESTUtil esUtil = new ElasticsearchRESTUtil("10.16.238.92:9200"); + JSONObject root = new JSONObject(); + JSONObject doc = null; + SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + for (int i = 0; i < 1000; i++) { + doc = new JSONObject(); + doc.put("hell", Math.random() * 1000); + doc.put("world", Math.random() * 1000); + doc.put("date", sFormat.format(new Date())); + root.put(i + "", doc); + } + esUtil.bulkIndex(root, "test", "test"); + } } diff --git a/src/test/java/com/chickling/kmonitor/test/JMXTest.java b/src/test/java/com/chickling/kmonitor/test/JMXTest.java index 6fc42be..4b704b9 100644 --- a/src/test/java/com/chickling/kmonitor/test/JMXTest.java +++ b/src/test/java/com/chickling/kmonitor/test/JMXTest.java @@ -3,7 +3,10 @@ import java.io.BufferedWriter; import java.io.FileWriter; import java.io.PrintWriter; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; @@ -16,9 +19,13 @@ import org.json.JSONArray; import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import com.chickling.kmonitor.core.jmx.JMXExecutor; -import com.chickling.kmonitor.core.jmx.KafkaJMX; +import com.chickling.kmonitor.jmx.JMXExecutor; +import com.chickling.kmonitor.jmx.KafkaJMX; +import com.chickling.kmonitor.jmx.ObjectNameHolder; +import com.chickling.kmonitor.utils.ZKUtils; /** * @author Hulva Luva.H @@ -26,106 +33,204 @@ * */ public class JMXTest { - - private static boolean excludeInternalTopic = true; // like __consumer_offsets - - public static void main(String[] args) { - KafkaJMX kafkaJMX = new KafkaJMX(); - objectName_Metrics(kafkaJMX); - objectNames(kafkaJMX); - } - - private static void objectNames(KafkaJMX kafkaJMX) { - kafkaJMX.doWithConnection("10.16.238.94", 8888, Optional.of(""), Optional.of(""), false, new JMXExecutor() { - - @Override - public void doWithConnection(MBeanServerConnection mbsc) { - // KafkaMetrics kafkaMetrics = new KafkaMetrics(); - try (FileWriter fw = new FileWriter("objectNames.json", true); - BufferedWriter bw = new BufferedWriter(fw); - PrintWriter out = new PrintWriter(bw)) { - Set beans = mbsc.queryMBeans(null, null); - JSONArray objectName = new JSONArray(); - for (ObjectInstance bean : beans) { - if (excludeInternalTopic && bean.getObjectName().toString().contains("__consumer_offsets")) { - continue; - } - System.out.println("ObjectName: " + bean.getObjectName()); - objectName.put(bean.getObjectName().toString()); - MBeanInfo mbeanInfo = mbsc.getMBeanInfo(bean.getObjectName()); - System.out.println("\tMBeanInfo: " + mbeanInfo); - MBeanAttributeInfo[] attributes = mbeanInfo.getAttributes(); - String[] attributeArr = new String[attributes.length]; - for (int i = 0; i < attributes.length; i++) { - attributeArr[i] = attributes[i].getName(); - } - AttributeList attributeList = mbsc.getAttributes(bean.getObjectName(), attributeArr); - List attributeList1 = attributeList.asList(); - - for (Attribute attr : attributeList1) { - System.out.println("\t\tName: " + attr.getName() + " Value: " + attr.getValue()); - } - } - out.println(objectName.toString()); - } catch (Exception e) { - - } - } - }); - - } - - private static void objectName_Metrics(KafkaJMX kafkaJMX) { - kafkaJMX.doWithConnection("10.16.238.94", 8888, Optional.of(""), Optional.of(""), false, new JMXExecutor() { - - @Override - public void doWithConnection(MBeanServerConnection mbsc) { - // KafkaMetrics kafkaMetrics = new KafkaMetrics(); - try (FileWriter fw = new FileWriter("metrics.json", true); - BufferedWriter bw = new BufferedWriter(fw); - PrintWriter out = new PrintWriter(bw)) { - Set beans = mbsc.queryMBeans(null, null); - - JSONArray objectNameMetrics = new JSONArray(); - JSONObject objectName = null; - for (ObjectInstance bean : beans) { - objectName = new JSONObject(); - String objectNameStr = bean.getObjectName().toString(); - if (excludeInternalTopic && objectNameStr.contains("__consumer_offsets")) { - continue; - } - System.out.println("ObjectName: " + objectNameStr); - String[] metric_other = objectNameStr.split(":"); - objectName.put("metric", metric_other[0]); - String[] type_name_other = metric_other[1].split(","); - String[] temp; - for (int i = 0; i < type_name_other.length; i++) { - temp = type_name_other[i].split("="); - objectName.put(temp[0], temp[1]); - } - objectName.put("objectName", bean.getObjectName().toString()); - MBeanInfo mbeanInfo = mbsc.getMBeanInfo(bean.getObjectName()); - System.out.println("\tMBeanInfo: " + mbeanInfo); - MBeanAttributeInfo[] attributes = mbeanInfo.getAttributes(); - String[] attributeArr = new String[attributes.length]; - for (int i = 0; i < attributes.length; i++) { - attributeArr[i] = attributes[i].getName(); - } - AttributeList attributeList = mbsc.getAttributes(bean.getObjectName(), attributeArr); - List attributeList1 = attributeList.asList(); - - for (Attribute attr : attributeList1) { - objectName.put(attr.getName(), attr.getValue()); - System.out.println("\t\tName: " + attr.getName() + " Value: " + attr.getValue()); - } - objectNameMetrics.put(objectName); - } - out.println(objectNameMetrics.toString()); - } catch (Exception e) { - - } - } - }); - } + private static Logger LOG = LoggerFactory.getLogger(JMXTest.class); + + private static boolean excludeInternalTopic = true; // like __consumer_offsets + private static Map objectNames = new HashMap(); + + public static void main(String[] args) { + + KafkaJMX kafkaJMX = new KafkaJMX(); + // objectName_Metrics(kafkaJMX); + // objectNames(kafkaJMX); + + ZKUtils.init("10.16.238.101:8181,10.16.238.102:8181,10.16.238.103:8181", 30000, 30000); + initObjectNames(kafkaJMX); + } + + private static void initObjectNames(KafkaJMX kafkaJMX) { + try { + if (kafkaJMX == null) { + kafkaJMX = new KafkaJMX(); + } + List jmxHosts = ZKUtils.getKafkaJMXHostsFromZookeeper(); + for (String jmxHost : jmxHosts) { + String[] jmxArr = jmxHost.split(":"); + if ("-1".equals(jmxArr[2])) { + continue; + } + kafkaJMX.doWithConnection(jmxArr[1], Integer.parseInt(jmxArr[2]), Optional.of(""), Optional.of(""), false, new JMXExecutor() { + + @Override + public void doWithConnection(MBeanServerConnection mbsc) { + try { + Set beans = mbsc.queryMBeans(null, null); + ObjectNameHolder objectNameHolder = null; + ObjectNameHolder objectNameHolderOld = null; + for (ObjectInstance bean : beans) { + objectNameHolder = new ObjectNameHolder(); + String objectName = bean.getObjectName().toString(); + if (excludeInternalTopic && objectName.contains("__consumer_offsets")) { + continue; + } + String[] metric_other = objectName.split(":"); + objectNameHolder.setMetric(metric_other[0]); + String[] type_name_other = metric_other[1].split(","); + Map temp = new HashMap(); + String currentSort = ""; + for (int i = 0; i < type_name_other.length; i++) { + String[] tempArr = type_name_other[i].split("="); + if ("type".equalsIgnoreCase(tempArr[0])) { + objectNameHolder.setType(tempArr[1]); + } else if ("name".equalsIgnoreCase(tempArr[0])) { + objectNameHolder.setName(tempArr[1]); + } else { + currentSort += i == type_name_other.length - 1 ? tempArr[0] : tempArr[0] + ","; + temp.put(tempArr[0], tempArr[1]); + } + } + String key = objectNameHolder.getName() == null ? metric_other[0] + objectNameHolder.getType() + objectNameHolder.getName() + : metric_other[0] + objectNameHolder.getType(); + + if (objectNames.containsKey(key)) { + objectNameHolderOld = objectNames.get(key); + final Map extras = objectNameHolderOld.getExtra(); + + if (extras.isEmpty()) { + continue; + } + String sort = (String) extras.get("sort"); + if (sort.equals(currentSort)) { + temp.forEach((k, v) -> { + @SuppressWarnings("unchecked") + Set somthing = (Set) extras.get(k); + somthing.add(v); + }); + } else { + extras.put("sort", currentSort); + temp.forEach((k, v) -> { + Set somthing = new HashSet(); + somthing.add(v); + extras.put(k, somthing); + }); + } + } else { + Map extras = new HashMap(); + extras.put("sort", currentSort); + temp.forEach((k, v) -> { + Set somthing = new HashSet(); + somthing.add(v); + extras.put(k, somthing); + }); + objectNameHolder.setExtra(extras); + objectNames.put(key, objectNameHolder); + } + } + } catch (Exception e) { + LOG.error("Ops~", e); + } + } + }); + objectNames.forEach((k, v) -> { + LOG.info(k + " -> " + v); + }); + } + } catch (Exception e) { + + } + } + + public static void objectNames(KafkaJMX kafkaJMX) { + kafkaJMX.doWithConnection("10.16.238.94", 8888, Optional.of(""), Optional.of(""), false, new JMXExecutor() { + + @Override + public void doWithConnection(MBeanServerConnection mbsc) { + // KafkaMetrics kafkaMetrics = new KafkaMetrics(); + try (FileWriter fw = new FileWriter("objectNames.json", true); + BufferedWriter bw = new BufferedWriter(fw); + PrintWriter out = new PrintWriter(bw)) { + Set beans = mbsc.queryMBeans(null, null); + JSONArray objectName = new JSONArray(); + for (ObjectInstance bean : beans) { + if (excludeInternalTopic && bean.getObjectName().toString().contains("__consumer_offsets")) { + continue; + } + System.out.println("ObjectName: " + bean.getObjectName()); + objectName.put(bean.getObjectName().toString()); + MBeanInfo mbeanInfo = mbsc.getMBeanInfo(bean.getObjectName()); + System.out.println("\tMBeanInfo: " + mbeanInfo); + MBeanAttributeInfo[] attributes = mbeanInfo.getAttributes(); + String[] attributeArr = new String[attributes.length]; + for (int i = 0; i < attributes.length; i++) { + attributeArr[i] = attributes[i].getName(); + } + AttributeList attributeList = mbsc.getAttributes(bean.getObjectName(), attributeArr); + List attributeList1 = attributeList.asList(); + + for (Attribute attr : attributeList1) { + System.out.println("\t\tName: " + attr.getName() + " Value: " + attr.getValue()); + } + } + out.println(objectName.toString()); + } catch (Exception e) { + + } + } + }); + + } + + public static void objectName_Metrics(KafkaJMX kafkaJMX) { + kafkaJMX.doWithConnection("10.16.238.94", 8888, Optional.of(""), Optional.of(""), false, new JMXExecutor() { + + @Override + public void doWithConnection(MBeanServerConnection mbsc) { + // KafkaMetrics kafkaMetrics = new KafkaMetrics(); + try (FileWriter fw = new FileWriter("metrics.json", true); + BufferedWriter bw = new BufferedWriter(fw); + PrintWriter out = new PrintWriter(bw)) { + Set beans = mbsc.queryMBeans(null, null); + + JSONArray objectNameMetrics = new JSONArray(); + JSONObject objectName = null; + for (ObjectInstance bean : beans) { + objectName = new JSONObject(); + String objectNameStr = bean.getObjectName().toString(); + if (excludeInternalTopic && objectNameStr.contains("__consumer_offsets")) { + continue; + } + System.out.println("ObjectName: " + objectNameStr); + String[] metric_other = objectNameStr.split(":"); + objectName.put("metric", metric_other[0]); + String[] type_name_other = metric_other[1].split(","); + String[] temp; + for (int i = 0; i < type_name_other.length; i++) { + temp = type_name_other[i].split("="); + objectName.put(temp[0], temp[1]); + } + objectName.put("objectName", bean.getObjectName().toString()); + MBeanInfo mbeanInfo = mbsc.getMBeanInfo(bean.getObjectName()); + System.out.println("\tMBeanInfo: " + mbeanInfo); + MBeanAttributeInfo[] attributes = mbeanInfo.getAttributes(); + String[] attributeArr = new String[attributes.length]; + for (int i = 0; i < attributes.length; i++) { + attributeArr[i] = attributes[i].getName(); + } + AttributeList attributeList = mbsc.getAttributes(bean.getObjectName(), attributeArr); + List attributeList1 = attributeList.asList(); + + for (Attribute attr : attributeList1) { + objectName.put(attr.getName(), attr.getValue()); + System.out.println("\t\tName: " + attr.getName() + " Value: " + attr.getValue()); + } + objectNameMetrics.put(objectName); + } + out.println(objectNameMetrics.toString()); + } catch (Exception e) { + + } + } + }); + } } diff --git a/src/test/java/com/chickling/kmonitor/test/JSONTest.java b/src/test/java/com/chickling/kmonitor/test/JSONTest.java new file mode 100644 index 0000000..1c6a849 --- /dev/null +++ b/src/test/java/com/chickling/kmonitor/test/JSONTest.java @@ -0,0 +1,40 @@ +package com.chickling.kmonitor.test; + +import java.util.HashMap; +import java.util.Map; + +import org.json.JSONArray; +import org.json.JSONObject; + +/** + * @author Hulva Luva.H + * @since 2017-07-24 + * + */ +public class JSONTest { + + public static void main(String[] args) { +// Map root = new HashMap(); +// root.put("firstLevel", JSONObject.NULL); +// if (root.get("firstLevel").equals(JSONObject.NULL)) { +// Map firstLevel = new HashMap(); +// firstLevel.put("secondLevel", JSONObject.NULL); +// root.put("firstLevel", firstLevel); +// } +// JSONObject json = new JSONObject(root); +// System.out.println(json); + + JSONObject json = new JSONObject(); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(1); + jsonArr.put(3); + json.put("columns", jsonArr); + JSONArray jsonArr1 = new JSONArray(); + + jsonArr1.put(jsonArr); + jsonArr1.put(json); + + System.out.println(jsonArr1); + } + +} diff --git a/src/test/java/com/chickling/kmonitor/test/ObjectNameManagerTest.java b/src/test/java/com/chickling/kmonitor/test/ObjectNameManagerTest.java new file mode 100644 index 0000000..b3e934b --- /dev/null +++ b/src/test/java/com/chickling/kmonitor/test/ObjectNameManagerTest.java @@ -0,0 +1,27 @@ +package com.chickling.kmonitor.test; + +import java.util.Map; + +import org.json.JSONObject; + +import com.chickling.kmonitor.jmx.ObjectNameHolder; +import com.chickling.kmonitor.jmx.ObjectNameManager; +import com.chickling.kmonitor.utils.ZKUtils; + +/** + * + * @author Hulva Luva.H + * @since 2017-07-25 + * + */ +public class ObjectNameManagerTest { + + public static void main(String[] args) { + ZKUtils.init("10.16.238.101:8181,10.16.238.102:8181,10.16.238.103:8181", 30000, 30000); + ObjectNameManager onm = ObjectNameManager.getInstance(); + Map objectNames = onm.getObjectNames(); + System.out.println(objectNames); + System.out.println(new JSONObject(objectNames)); + } + +} diff --git a/src/test/java/com/chickling/kmonitor/test/SaveJMXMetricsToES.java b/src/test/java/com/chickling/kmonitor/test/SaveJMXMetricsToES.java new file mode 100644 index 0000000..2361bd1 --- /dev/null +++ b/src/test/java/com/chickling/kmonitor/test/SaveJMXMetricsToES.java @@ -0,0 +1,130 @@ +package com.chickling.kmonitor.test; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import javax.management.Attribute; +import javax.management.AttributeList; +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanInfo; +import javax.management.MBeanServerConnection; +import javax.management.ObjectInstance; + +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.chickling.kmonitor.jmx.JMXExecutor; +import com.chickling.kmonitor.jmx.KafkaJMX; +import com.chickling.kmonitor.utils.ZKUtils; +import com.chickling.kmonitor.utils.elasticsearch.javaapi.ElasticsearchJavaUtil; + +import scala.App; + +/** + * @author Hulva Luva.H from ECBD + * @date 2017年7月25日 + * @description + * + */ +public class SaveJMXMetricsToES { + private static Logger LOG = LoggerFactory.getLogger(App.class); + private static boolean excludeInternalTopic = true; // like __consumer_offsets + private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2); + + private static final String indexPrefix = "logx_monitor_kafka-"; + private static final String docType = "kafkajmx"; + private static final SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd"); + + public static void main(String[] args) { + ZKUtils.init("10.16.238.101:8181,10.16.238.102:8181,10.16.238.103:8181", 30000, 30000); + // ElasticsearchJavaUtil es = new + // ElasticsearchJavaUtil("10.16.238.82:9300,10.16.238.83:9300,10.16.238.84:9300"); + ElasticsearchJavaUtil es = new ElasticsearchJavaUtil("10.16.232.120:9300"); + KafkaJMX kafkaJMX = new KafkaJMX(); + + scheduler.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + try { + + List jmxHosts = ZKUtils.getKafkaJMXHostsFromZookeeper(); + for (String jmxHost : jmxHosts) { + String[] jmxArr = jmxHost.split(":"); + if ("-1".equals(jmxArr[2])) { + continue; + } + kafkaJMX.doWithConnection(jmxArr[1], Integer.parseInt(jmxArr[2]), Optional.of(""), Optional.of(""), false, new JMXExecutor() { + + @Override + public void doWithConnection(MBeanServerConnection mbsc) { + JSONObject objectName = null; + try { + Date now = new Date(); + SimpleDateFormat sFormat1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + String indexSufix = sFormat.format(now); + Set beans = mbsc.queryMBeans(null, null); + + for (ObjectInstance bean : beans) { + objectName = new JSONObject(); + objectName.put("timestamp", now.getTime()); + objectName.put("formatedDate", sFormat1.format(now)); + String objectNameStr = bean.getObjectName().toString(); + if (excludeInternalTopic && objectNameStr.contains("__consumer_offsets")) { + continue; + } + if (objectNameStr.contains("java.lang:type=Runtime")) { + continue; + } + String[] metric_other = objectNameStr.split(":"); + objectName.put("metric", metric_other[0]); + String[] type_name_other = metric_other[1].split(","); + String[] temp; + for (int i = 0; i < type_name_other.length; i++) { + temp = type_name_other[i].split("="); + objectName.put(temp[0], temp[1]); + } + objectName.put("objectName", bean.getObjectName().toString()); + MBeanInfo mbeanInfo = mbsc.getMBeanInfo(bean.getObjectName()); + MBeanAttributeInfo[] attributes = mbeanInfo.getAttributes(); + String[] attributeArr = new String[attributes.length]; + for (int i = 0; i < attributes.length; i++) { + attributeArr[i] = attributes[i].getName(); + } + AttributeList attributeList = mbsc.getAttributes(bean.getObjectName(), attributeArr); + List attributeList1 = attributeList.asList(); + + for (Attribute attr : attributeList1) { + if (attr.getValue() == null) { + objectName.put(attr.getName(), "NA"); + } else if (attr.getValue().equals(Double.NEGATIVE_INFINITY)) { + objectName.put(attr.getName(), attr.getValue() + ""); + } else if ("kafka.server:type=KafkaServer,name=ClusterId".equalsIgnoreCase(bean.getObjectName().toString())) { + objectName.put("ClusterId" + attr.getName(), attr.getValue() + ""); + } else { + objectName.put(attr.getName(), attr.getValue()); + } + } + // es.indexDoc(objectName, indexPrefix + indexSufix, docType); + } + } catch (Exception e) { + LOG.error("Ops~" + objectName, e); + } + } + }); + } + // es.bulkIndex().awaitClose(10 * 6000, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.warn("Ops..." + e.getMessage()); + } + } + }, 0, 1 * 60 * 1000, TimeUnit.MILLISECONDS); + } +}