diff --git a/src/main/java/com/chickling/kmonitor/controller/JMXMetricController.java b/src/main/java/com/chickling/kmonitor/controller/JMXMetricController.java index b4f2813..5dc65a7 100644 --- a/src/main/java/com/chickling/kmonitor/controller/JMXMetricController.java +++ b/src/main/java/com/chickling/kmonitor/controller/JMXMetricController.java @@ -12,16 +12,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; +import com.chickling.kmonitor.initialize.SystemManager; import com.chickling.kmonitor.jmx.FormatedMeterMetric; import com.chickling.kmonitor.jmx.JMXExecutor; import com.chickling.kmonitor.jmx.KafkaJMX; import com.chickling.kmonitor.jmx.KafkaMetrics; import com.chickling.kmonitor.jmx.MeterMetric; import com.chickling.kmonitor.utils.ZKUtils; +import com.chickling.kmonitor.utils.elasticsearch.restapi.ElasticsearchRESTUtil; /** * @@ -107,6 +110,7 @@ public void doWithConnection(MBeanServerConnection mBeanServerConnection) { response.put(key, new JSONObject(new FormatedMeterMetric(result.get(key)))); } } + response.put("esUrl", SystemManager.getConfig().getEsHosts().split(":")[0] + ":9200/" + SystemManager.getConfig().getEsIndex() + "-*"); return response.toString(); } @@ -151,7 +155,6 @@ public void doWithConnection(MBeanServerConnection mBeanServerConnection) { } } catch (Exception e) { LOG.error("Get jmxHosts error!" + e.getMessage()); - } return response.toString(); } @@ -230,4 +233,9 @@ public void doWithConnection(MBeanServerConnection mBeanServerConnection) { } return response.toString(); } + + @RequestMapping(value = "/metricviz", method = RequestMethod.POST) + public String getMetricVizData(@RequestBody String metric) { + return ElasticsearchRESTUtil.metricVizDataSearch(metric); + } } diff --git a/src/main/java/com/chickling/kmonitor/core/db/ElasticsearchOffsetDB.java b/src/main/java/com/chickling/kmonitor/core/db/ElasticsearchOffsetDB.java index 2d0f1fd..e16fa7a 100644 --- a/src/main/java/com/chickling/kmonitor/core/db/ElasticsearchOffsetDB.java +++ b/src/main/java/com/chickling/kmonitor/core/db/ElasticsearchOffsetDB.java @@ -21,103 +21,107 @@ * @author Hulva Luva.H * */ -public class ElasticsearchOffsetDB implements OffsetDB { - - private Ielasticsearch esUtil; - private String indexPrefix; - private String docType; - - private static final SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); - - public ElasticsearchOffsetDB(AppConfig config) { - if (config.getApiType().equalsIgnoreCase("Java API")) { - esUtil = new ElasticsearchJavaUtil(config.getEsHosts()); - } else { - esUtil = new ElasticsearchRESTUtil(config.getEsHosts()); - } - - setIndexAndType(config.getEsIndex(), config.getDocTypeForOffset()); - } - - public void setIndexAndType(String index, String docType) { - this.indexPrefix = index + "-"; - this.docType = docType; - } - - /* - * (non-Javadoc) - * - * @see com.chickling.kmonitor.core.db.OffsetDb#insert(long, - * com.chickling.kmonitor.model.OffsetInfo) - */ - @Override - public void insert(long timestamp, OffsetInfo offsetInfo) { - // TODO Auto-generated method stub - } - - /* - * (non-Javadoc) - * - * @see com.chickling.kmonitor.core.db.OffsetDb#batchInsert(java. util.List) - */ - @Override - public void batchInsert(List offsetInfoList) { - long now = System.currentTimeMillis(); - Calendar cal = Calendar.getInstance(); - cal.setTimeInMillis(now); - cal.set(Calendar.MILLISECOND, 0); - cal.set(Calendar.SECOND, 0); - - JSONObject data = new JSONObject(); - for (int i = 0; i < offsetInfoList.size(); i++) { - data.put(i + "", generateRecord(cal.getTimeInMillis(), offsetInfoList.get(i))); - } - esUtil.bulkIndex(data, docType, indexPrefix); - } - - private JSONObject generateRecord(long timestamp, OffsetInfo offsetInfo) { - JSONObject data = new JSONObject(); - data.put("group", offsetInfo.getGroup()); - data.put("topic", offsetInfo.getTopic()); - data.put("partition", offsetInfo.getPartition()); - data.put("offset", offsetInfo.getOffset()); - data.put("logSize", offsetInfo.getLogSize()); - data.put("owner", offsetInfo.getOwner()/* owner.getOrElse("NA") */); - data.put("date", sFormat.format(new Date(timestamp))); - data.put("timestamp", timestamp); - data.put("creation", offsetInfo.getCreation()); - data.put("modified", offsetInfo.getModified()); - data.put("lag", offsetInfo.getLag()); - return data; - } - - /* - * (non-Javadoc) - * - * @see com.chickling.kmonitor.core.db.OffsetDb#offsetHistory() - */ - @Override - public OffsetHistory offsetHistory(String group, String topic) { - List offsetPointsList = esUtil.offsetHistory(indexPrefix, docType, group, topic); - CommonUtils.sortByTimestampThenPartition(offsetPointsList); - return new OffsetHistory(group, topic, offsetPointsList); - } - - @Override - public OffsetHistory offsetHistory(OffsetHistoryQueryParams params) { - List offsetPointsList = esUtil.scrollsSearcher(params, docType, indexPrefix); - CommonUtils.sortByTimestampThenPartition(offsetPointsList); - return new OffsetHistory(params.getGroup(), params.getTopic(), offsetPointsList); - } - - @Override - public boolean check() { - return esUtil.check(); - } - - @Override - public void close() { - esUtil.close(); - } +public class ElasticsearchOffsetDB implements OffsetDB { + + private Ielasticsearch esUtil; + private String indexPrefix; + private String docType; + + private static final SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + + public ElasticsearchOffsetDB(AppConfig config) { + if (config.getApiType().equalsIgnoreCase("Java API")) { + esUtil = new ElasticsearchJavaUtil(config.getEsHosts()); + } else { + esUtil = new ElasticsearchRESTUtil(config.getEsHosts()); + } + setIndexAndType(config.getEsIndex(), config.getDocTypeForOffset()); + } + + public void setIndexAndType(String index, String docType) { + this.indexPrefix = index + "-"; + this.docType = docType; + } + + /* + * (non-Javadoc) + * + * @see com.chickling.kmonitor.core.db.OffsetDb#insert(long, + * com.chickling.kmonitor.model.OffsetInfo) + */ + @Override + public void insert(long timestamp, OffsetInfo offsetInfo) { + // TODO Auto-generated method stub + } + + /* + * (non-Javadoc) + * + * @see com.chickling.kmonitor.core.db.OffsetDb#batchInsert(java. util.List) + */ + @Override + public void batchInsert(List offsetInfoList) { + long now = System.currentTimeMillis(); + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(now); + cal.set(Calendar.MILLISECOND, 0); + cal.set(Calendar.SECOND, 0); + + JSONObject data = new JSONObject(); + for (int i = 0; i < offsetInfoList.size(); i++) { + data.put(i + "", generateRecord(cal.getTimeInMillis(), offsetInfoList.get(i))); + } + esUtil.bulkIndex(data, docType, indexPrefix); + } + + private JSONObject generateRecord(long timestamp, OffsetInfo offsetInfo) { + JSONObject data = new JSONObject(); + data.put("group", offsetInfo.getGroup()); + data.put("topic", offsetInfo.getTopic()); + data.put("partition", offsetInfo.getPartition()); + data.put("offset", offsetInfo.getOffset()); + data.put("logSize", offsetInfo.getLogSize()); + data.put("owner", offsetInfo.getOwner()/* owner.getOrElse("NA") */); + data.put("date", sFormat.format(new Date(timestamp))); + data.put("timestamp", timestamp); + data.put("creation", offsetInfo.getCreation()); + data.put("modified", offsetInfo.getModified()); + data.put("lag", offsetInfo.getLag()); + return data; + } + + /* + * (non-Javadoc) + * + * @see com.chickling.kmonitor.core.db.OffsetDb#offsetHistory() + */ + @Override + public OffsetHistory offsetHistory(String group, String topic) { + List offsetPointsList = esUtil.offsetHistory(indexPrefix, docType, group, topic); + CommonUtils.sortByTimestampThenPartition(offsetPointsList); + return new OffsetHistory(group, topic, offsetPointsList); + } + + @Override + public OffsetHistory offsetHistory(OffsetHistoryQueryParams params) { + List offsetPointsList = esUtil.scrollsSearcher(params, docType, indexPrefix); + CommonUtils.sortByTimestampThenPartition(offsetPointsList); + return new OffsetHistory(params.getGroup(), params.getTopic(), offsetPointsList); + } + + @Override + public boolean check() { + return esUtil.check(); + } + + @Override + public void close() { + esUtil.close(); + } + + @Override + public Ielasticsearch getDB() { + return esUtil; + } } diff --git a/src/main/java/com/chickling/kmonitor/core/db/OffsetDB.java b/src/main/java/com/chickling/kmonitor/core/db/OffsetDB.java index 1be1a91..40d0ffb 100644 --- a/src/main/java/com/chickling/kmonitor/core/db/OffsetDB.java +++ b/src/main/java/com/chickling/kmonitor/core/db/OffsetDB.java @@ -11,16 +11,18 @@ * @author Hulva Luva.H * */ -public interface OffsetDB { - void insert(long timestamp, OffsetInfo offsetInfo); +public interface OffsetDB { + T getDB(); - void batchInsert(List offsetInfoList); + void insert(long timestamp, OffsetInfo offsetInfo); - OffsetHistory offsetHistory(String group, String topic); + void batchInsert(List offsetInfoList); - OffsetHistory offsetHistory(OffsetHistoryQueryParams params); - - void close(); - - boolean check(); + OffsetHistory offsetHistory(String group, String topic); + + OffsetHistory offsetHistory(OffsetHistoryQueryParams params); + + void close(); + + boolean check(); } diff --git a/src/main/java/com/chickling/kmonitor/initialize/SystemManager.java b/src/main/java/com/chickling/kmonitor/initialize/SystemManager.java index 98e53b7..43b10a1 100644 --- a/src/main/java/com/chickling/kmonitor/initialize/SystemManager.java +++ b/src/main/java/com/chickling/kmonitor/initialize/SystemManager.java @@ -3,8 +3,11 @@ import java.io.File; import java.io.IOException; import java.io.PrintWriter; +import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Date; import java.util.List; +import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -13,6 +16,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import javax.management.MBeanServerConnection; + +import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,8 +32,14 @@ import com.chickling.kmonitor.core.db.ElasticsearchOffsetDB; import com.chickling.kmonitor.core.db.OffsetDB; import com.chickling.kmonitor.email.EmailSender; +import com.chickling.kmonitor.jmx.FormatedMeterMetric; +import com.chickling.kmonitor.jmx.JMXExecutor; +import com.chickling.kmonitor.jmx.KafkaJMX; +import com.chickling.kmonitor.jmx.KafkaMetrics; import com.chickling.kmonitor.model.KafkaInfo; import com.chickling.kmonitor.utils.CommonUtils; +import com.chickling.kmonitor.utils.ZKUtils; +import com.chickling.kmonitor.utils.elasticsearch.Ielasticsearch; import com.google.gson.Gson; /** @@ -36,137 +48,192 @@ * */ public class SystemManager { - private static Logger LOG = LoggerFactory.getLogger(SystemManager.class); - - // TODO schedule pool size? any other schedule? - private static ScheduledExecutorService scheduler = null; - - private static ExecutorService worker; - - public static final int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors(); - - private static final ExecutorService kafkaInfoCollectAndSavePool = Executors - .newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE, new WorkerThreadFactory("KafkaInfo Collector")); - - public static BlockingQueue offsetInfoCacheQueue; - - public static OffsetDB db = null; - - public static OffsetGetter og = null; - - public static AtomicBoolean IS_SYSTEM_READY = new AtomicBoolean(false); - - public static List excludePath = new ArrayList(); - - private static AppConfig config; - - static { - excludePath.add("/"); - excludePath.add("/views/setting.html"); - excludePath.add("/setting"); - excludePath.add("/favicon.svg"); - excludePath.add("/style.css"); - excludePath.add("/index.html"); - } - - public static AppConfig getConfig() { - return config == null ? new AppConfig() : config; - } - - public static synchronized void setConfig(AppConfig _config) throws Exception { - config = _config; - initSystem(); - // TODO - IS_SYSTEM_READY.set(true); - ; - saveToFile(); - } - - private static void saveToFile() { - try { - PrintWriter writer = new PrintWriter("system.json", "UTF-8"); - writer.println(new Gson().toJson(config)); - writer.close(); - } catch (IOException e) { - LOG.error("Save system config to file failed!", e); - } - } - - private static void initSystem() { - try { - if (db != null) - db.close(); - db = new ElasticsearchOffsetDB(config); - if (!db.check()) { - throw new RuntimeException("No elasticsearch node avialable!"); - } - if (og != null) - og.close(); - og = new ZKOffsetGetter(config); - // TODO how cheack og is avialable? - - if (scheduler != null) - scheduler.shutdownNow(); - scheduler = Executors.newScheduledThreadPool(2, new WorkerThreadFactory("FixedRateSchedule")); - - if (config.getIsAlertEnabled()) { - initAlert(config); - } - - scheduler.scheduleAtFixedRate(new Runnable() { - - @Override - public void run() { - try { - List groups = og.getGroups(); - groups.forEach(group -> { - kafkaInfoCollectAndSavePool.submit(new GenerateKafkaInfoTask(group)); - }); - } catch (Exception e) { - LOG.warn("Ops..." + e.getMessage()); - } - } - }, 0, config.getDataCollectFrequency() * 60 * 1000, TimeUnit.MILLISECONDS); - - } catch (Exception e) { - // TODO - IS_SYSTEM_READY.set(false); - throw new RuntimeException("Init system failed! " + e.getMessage()); - } - } - - private static void initAlert(AppConfig config) { - EmailSender.setConfig(config); - TaskManager.init(config); - if (offsetInfoCacheQueue != null) { - offsetInfoCacheQueue.clear(); - } else { - offsetInfoCacheQueue = new LinkedBlockingQueue(config.getOffsetInfoCacheQueue()); - } - if (worker != null) - worker.shutdownNow(); - int corePoolSize = config.getOffsetInfoHandler() != null ? config.getOffsetInfoHandler() - : DEFAULT_THREAD_POOL_SIZE; - if (worker != null) { - worker.shutdownNow(); - } - worker = Executors.newFixedThreadPool(corePoolSize, new WorkerThreadFactory("AlertTaskChecker")); - - for (int i = 0; i < corePoolSize; i++) { - worker.submit(new TaskHandler()); - } - - File dir = new File(config.getTaskFolder()); - if (!dir.exists()) { - dir.mkdir(); - } - // Scan task folder load tasks to memory - File[] listOfFiles = dir.listFiles(); - for (File file : listOfFiles) { - if (file.isFile() && (file.getName().substring(file.getName().lastIndexOf('.') + 1).equals("task"))) { - String strTaskContent = CommonUtils.loadFileContent(file.getAbsolutePath()); - TaskManager.addTask(new Gson().fromJson(strTaskContent, TaskContent.class)); - } - } - } + private static Logger LOG = LoggerFactory.getLogger(SystemManager.class); + + // TODO schedule pool size? any other schedule? + private static ScheduledExecutorService scheduler = null; + + private static ExecutorService worker; + + public static final int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors(); + + private static final ExecutorService kafkaInfoCollectAndSavePool = + Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE, new WorkerThreadFactory("KafkaInfo Collector")); + + public static final String JMX_METRIC_ES_DOC_TYPE = "jmxMetrics"; + + public static BlockingQueue offsetInfoCacheQueue; + + public static OffsetDB db = null; + + public static OffsetGetter og = null; + + public static AtomicBoolean IS_SYSTEM_READY = new AtomicBoolean(false); + + public static List excludePath = new ArrayList(); + + private static AppConfig config; + + static { + excludePath.add("/"); + excludePath.add("/views/setting.html"); + excludePath.add("/setting"); + excludePath.add("/favicon.svg"); + excludePath.add("/style.css"); + excludePath.add("/index.html"); + } + + public static AppConfig getConfig() { + return config == null ? new AppConfig() : config; + } + + public static synchronized void setConfig(AppConfig _config) throws Exception { + config = _config; + initSystem(); + // TODO + IS_SYSTEM_READY.set(true);; + saveToFile(); + } + + private static void saveToFile() { + try { + PrintWriter writer = new PrintWriter("system.json", "UTF-8"); + writer.println(new Gson().toJson(config)); + writer.close(); + } catch (IOException e) { + LOG.error("Save system config to file failed!", e); + } + } + + private static void initSystem() { + try { + if (db != null) + db.close(); + db = new ElasticsearchOffsetDB(config); + if (!db.check()) { + throw new RuntimeException("No elasticsearch node avialable!"); + } + if (og != null) + og.close(); + og = new ZKOffsetGetter(config); + // TODO how cheack og is avialable? + + if (scheduler != null) + scheduler.shutdownNow(); + scheduler = Executors.newScheduledThreadPool(2, new WorkerThreadFactory("FixedRateSchedule")); + + if (config.getIsAlertEnabled()) { + initAlert(config); + } + + // Offset info data + scheduler.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + try { + List groups = og.getGroups(); + groups.forEach(group -> { + kafkaInfoCollectAndSavePool.submit(new GenerateKafkaInfoTask(group)); + }); + } catch (Exception e) { + LOG.warn("Ops..." + e.getMessage()); + } + } + }, 0, config.getDataCollectFrequency() * 60 * 1000, TimeUnit.MILLISECONDS); + + + // JMX metrics data + scheduler.scheduleAtFixedRate(new Runnable() { + private final SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + + @Override + public void run() { + Date now = new Date(); + JSONObject data = new JSONObject(); + try { + List jmxHosts = ZKUtils.getKafkaJMXHostsFromZookeeper(); + for (String jmxHost : jmxHosts) { + String[] jmxArr = jmxHost.split(":"); + if ("-1".equals(jmxArr[2])) { + LOG.warn("JMX disabled in " + jmxHost); + continue; + } + KafkaJMX kafkaJMX = new KafkaJMX(); + kafkaJMX.doWithConnection(jmxArr[1], Integer.parseInt(jmxArr[2]), Optional.of(""), Optional.of(""), false, new JMXExecutor() { + + @Override + public void doWithConnection(MBeanServerConnection mBeanServerConnection) { + KafkaMetrics metrics = new KafkaMetrics(); + JSONObject metric = null; + metric = new JSONObject(new FormatedMeterMetric(metrics.getMessagesInPerSec(mBeanServerConnection, Optional.empty()), 0)); + metric.put("broker", jmxArr[1]); + metric.put("date", sFormat.format(now)); + metric.put("timestamp", now.getTime()); + metric.put("metric", "MessagesInPerSec"); + data.put("MessagesInPerSec" + jmxArr[0], metric); + + metric = new JSONObject(new FormatedMeterMetric(metrics.getBytesInPerSec(mBeanServerConnection, Optional.empty()))); + metric.put("broker", jmxArr[1]); + metric.put("date", sFormat.format(now)); + metric.put("timestamp", now.getTime()); + metric.put("metric", "BytesInPerSec"); + data.put("BytesInPerSec" + jmxArr[0], metric); + + metric = new JSONObject(new FormatedMeterMetric(metrics.getBytesOutPerSec(mBeanServerConnection, Optional.empty()))); + metric.put("broker", jmxArr[1]); + metric.put("date", sFormat.format(now)); + metric.put("timestamp", now.getTime()); + metric.put("metric", "BytesOutPerSec"); + data.put("BytesOutPerSec" + jmxArr[0], metric); + } + }); + } + db.getDB().bulkIndex(data, JMX_METRIC_ES_DOC_TYPE, config.getEsIndex() + "-"); + } catch (Exception e) { + LOG.warn("Ops..." + e.getMessage()); + } + } + }, 0, config.getDataCollectFrequency() * 60 * 1000, TimeUnit.MILLISECONDS); + + } catch (Exception e) { + // TODO + IS_SYSTEM_READY.set(false); + throw new RuntimeException("Init system failed! " + e.getMessage()); + } + } + + private static void initAlert(AppConfig config) { + EmailSender.setConfig(config); + TaskManager.init(config); + if (offsetInfoCacheQueue != null) { + offsetInfoCacheQueue.clear(); + } else { + offsetInfoCacheQueue = new LinkedBlockingQueue(config.getOffsetInfoCacheQueue()); + } + if (worker != null) + worker.shutdownNow(); + int corePoolSize = config.getOffsetInfoHandler() != null ? config.getOffsetInfoHandler() : DEFAULT_THREAD_POOL_SIZE; + if (worker != null) { + worker.shutdownNow(); + } + worker = Executors.newFixedThreadPool(corePoolSize, new WorkerThreadFactory("AlertTaskChecker")); + + for (int i = 0; i < corePoolSize; i++) { + worker.submit(new TaskHandler()); + } + + File dir = new File(config.getTaskFolder()); + if (!dir.exists()) { + dir.mkdir(); + } + // Scan task folder load tasks to memory + File[] listOfFiles = dir.listFiles(); + for (File file : listOfFiles) { + if (file.isFile() && (file.getName().substring(file.getName().lastIndexOf('.') + 1).equals("task"))) { + String strTaskContent = CommonUtils.loadFileContent(file.getAbsolutePath()); + TaskManager.addTask(new Gson().fromJson(strTaskContent, TaskContent.class)); + } + } + } } diff --git a/src/main/java/com/chickling/kmonitor/utils/MetricUtils.java b/src/main/java/com/chickling/kmonitor/utils/MetricUtils.java index 0d19450..e772445 100644 --- a/src/main/java/com/chickling/kmonitor/utils/MetricUtils.java +++ b/src/main/java/com/chickling/kmonitor/utils/MetricUtils.java @@ -34,7 +34,7 @@ public static String sizeFormat(Double bytes) { return new BigDecimal(bytes).setScale(2, BigDecimal.ROUND_HALF_UP).toString() + " B"; } else { int exp = new Double((Math.log(bytes) / Math.log(unit))).intValue(); - char pre = "kMGTPE".charAt(exp - 1); + char pre = "KMGTPE".charAt(exp - 1); return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre); } } diff --git a/src/main/java/com/chickling/kmonitor/utils/elasticsearch/restapi/ElasticsearchRESTUtil.java b/src/main/java/com/chickling/kmonitor/utils/elasticsearch/restapi/ElasticsearchRESTUtil.java index 383cde3..1905bb3 100644 --- a/src/main/java/com/chickling/kmonitor/utils/elasticsearch/restapi/ElasticsearchRESTUtil.java +++ b/src/main/java/com/chickling/kmonitor/utils/elasticsearch/restapi/ElasticsearchRESTUtil.java @@ -361,4 +361,21 @@ public void close() { } + public static String metricVizDataSearch(String metric) { + String searchResult = ""; + try { + String indexNameSearch = SystemManager.getConfig().getEsIndex() + "-*"; + ResponseEntity response = REST.exchange( + "http://" + SystemManager.getConfig().getEsHosts().split(":")[0] + ":9200/" + indexNameSearch + "/" + + SystemManager.JMX_METRIC_ES_DOC_TYPE + "/_search", + HttpMethod.POST, new HttpEntity(ScrollSearchTemplate.getMetricVizSearchBody(metric), headers), String.class); + + searchResult = response.getBody(); + } catch (Exception e) { + // TODO + LOG.error("Damn...", e); + } + return searchResult; + } + } diff --git a/src/main/java/com/chickling/kmonitor/utils/elasticsearch/restapi/ScrollSearchTemplate.java b/src/main/java/com/chickling/kmonitor/utils/elasticsearch/restapi/ScrollSearchTemplate.java index 748da81..189a115 100644 --- a/src/main/java/com/chickling/kmonitor/utils/elasticsearch/restapi/ScrollSearchTemplate.java +++ b/src/main/java/com/chickling/kmonitor/utils/elasticsearch/restapi/ScrollSearchTemplate.java @@ -1,20 +1,46 @@ package com.chickling.kmonitor.utils.elasticsearch.restapi; +import java.util.Date; + /** - * @author Hulva Luva.H from ECBD - * @date 2017年7月19日 - * @description + * @author Hulva Luva.H + * @since 2017-7-19 * */ public class ScrollSearchTemplate { - public static String getScrollSearchBody(String topic, String group, String from, String to) { - return "{\"size\":1000,\"sort\":[{\"timestamp\":{\"order\":\"asc\"}},{\"partition\":{\"order\":\"asc\"}}],\"query\":{\"bool\":{\"must\":[{\"match\":{\"topic\":\"" - + topic + "\"}},{\"match\":{\"group\":\"" + group + "\"}}],\"filter\":[{\"range\":{\"date\":{\"gte\":\"" - + from + "\",\"lte\":\"" + to + "\",\"format\":\"yyyy-MM-dd'T'HH:mm:ss.SSSZ\"}}}]}}}"; - } + public static String getScrollSearchBody(String topic, String group, String from, String to) { + return "{\"size\":1000," + + + "\"sort\":[{\"timestamp\":{\"order\":\"asc\"}},{\"partition\":{\"order\":\"asc\"}}]," + + + "\"query\":{\"bool\":{" + + + "\"must\":[{\"match\":{\"topic\":\"" + topic + "\"}},{\"match\":{\"group\":\"" + group + "\"}}]," + + + "\"filter\":[{\"range\":{\"date\":{\"gte\":\"" + from + "\",\"lte\":\"" + to + + "\",\"format\":\"yyyy-MM-dd'T'HH:mm:ss.SSSZ\"}}}]}}}"; + } + + public static String getScrollNextBody(String scrollId) { + return "{\"scroll\":\"1m\",\"scroll_id\":\"" + scrollId + "\"}"; + } + + public static String getMetricVizSearchBody(String metric) { + return "{\"size\": 2000," + + + "\"sort\": [{\"timestamp\": {\"order\": \"desc\"}},\"broker\"]," + + + "\"query\": {\"bool\": {" + + + "\"must\": [{\"match\": { \"metric\": \"" + metric + "\" }}]," + + + "\"filter\": [{" + + + "\"range\": {\"timestamp\": {" + + + "\"gte\": " + (new Date().getTime() - 480 * 60000) + "," - public static String getScrollNextBody(String scrollId) { - return "{\"scroll\":\"1m\",\"scroll_id\":\"" + scrollId + "\"}"; - } + + "\"lte\": " + new Date().getTime() + "}}}]}}}"; + } } diff --git a/src/main/resources/static/index.html b/src/main/resources/static/index.html index 0b4e4b3..f4b3c4e 100644 --- a/src/main/resources/static/index.html +++ b/src/main/resources/static/index.html @@ -26,7 +26,8 @@ - + @@ -110,9 +111,11 @@ src="//cdnjs.cloudflare.com/ajax/libs/lodash.js/2.4.1/lodash.min.js"> - - - + + + diff --git a/src/main/resources/static/scripts/app.js b/src/main/resources/static/scripts/app.js index 1e90762..97508cf 100644 --- a/src/main/resources/static/scripts/app.js +++ b/src/main/resources/static/scripts/app.js @@ -111,7 +111,7 @@ angular.module("offsetapp.services", [ "ngResource" ]) }).value(); return groups; } - + return { getGroup : function(group, cb) { return $resource("./group/:group").get({ @@ -131,6 +131,16 @@ angular.module("offsetapp.services", [ "ngResource" ]) loadClusterViz : function(group, cb) { cb(loadViz("#dataviz-container", "./clusterlist")) }, + loadMetricVizChart: function(metric, title) { + $http({ + method: 'POST', + url: './metrics/metricviz', + headers: {'Content-Type': 'application/json'}, + data: metric + }).success(function (response) { + intervalHighchart(response, title); + }); + }, loadTopicConsumerViz : function(group, cb) { cb(loadViz("#dataviz-container", "./activetopics")) }, diff --git a/src/main/resources/static/scripts/cluster-viz.js b/src/main/resources/static/scripts/cluster-viz.js index 9bbab3d..84b4beb 100644 --- a/src/main/resources/static/scripts/cluster-viz.js +++ b/src/main/resources/static/scripts/cluster-viz.js @@ -16,6 +16,12 @@ var diagonal = d3.svg.diagonal().projection(function(d) { var svg; +Highcharts.setOptions({ + global: { + useUTC: false + } +}); + function loadViz(load_to_id, data_path) { svg = d3.select(load_to_id).append("svg").attr("width", width + margin.right + margin.left).attr("height", @@ -43,6 +49,82 @@ function loadViz(load_to_id, data_path) { d3.select(self.frameElement).style("height", "800px"); } +function intervalHighchart(result, title) { + try{ + var seriesOptions = []; + var hits = result.hits.hits; + let brokerHitsMap = new Map(); + $.each(hits, function (i, hit){ + var source = hit._source; + var brokerHits = brokerHitsMap.get(source.broker); + if(brokerHits == undefined){ + brokerHitsMap.set(source.broker, [[source.timestamp, source.count]]); + }else{ + brokerHits.push([source.timestamp, source.count]); + } + }); + let i=0 + for (let [broker, hits] of brokerHitsMap) { + hits.sort(function(a, b) { + return a[0] - b[0]; + }); + seriesOptions[i] = { + name: broker, + data: hits + }; + i++; + } + createChart(seriesOptions, title); + }catch (err) { + console.log("Your browser version is too too low ~"); + } +} + +/** + * Create the chart when all data is loaded + * + * @returns {undefined} + */ +function createChart(_seriesOptions, title) { + + Highcharts.stockChart('metrics', { + title: { + text: title + }, + + rangeSelector: { + selected: 4, + enabled: false + }, + + yAxis: { + labels: { + formatter: function () { + return this.value + '%'; + } + }, + plotLines: [{ + value: 0, + width: 2, + color: 'silver' + }] + }, + + plotOptions: { + series: { + compare: 'percent', + showInNavigator: true + } + }, + + tooltip: { + pointFormat: '{series.name}: {point.y} ({point.change}%)
' + }, + + series: _seriesOptions + }); +} + function update(source) { // Compute the new tree layout. diff --git a/src/main/resources/static/scripts/controllers.js b/src/main/resources/static/scripts/controllers.js index 3e30dc8..37c900d 100644 --- a/src/main/resources/static/scripts/controllers.js +++ b/src/main/resources/static/scripts/controllers.js @@ -62,13 +62,32 @@ angular.module('offsetapp.controllers', [ "offsetapp.services" ]) .controller("ClusterVizCtrl", [ "$scope", "$interval", "$routeParams", "offsetinfo", function($scope, $interval, $routeParams, offsetinfo) { $scope.loading = true; - offsetinfo.loadClusterViz($routeParams.group, function(d) {}); + + $scope.metrics = [ + {code: "MessagesInPerSec", value: "MessagesIn"}, + {code: "BytesInPerSec", value: "BytesIn"}, + {code: "BytesOutPerSec", value: "BytesOut"} + ]; + $scope.currentMetric = "MessagesInPerSec"; + let esUrl; offsetinfo.brokerTopicMetricsForBrokers().success(function(d) { if(d.BytesInPerSec){ $scope.jmxEnabled = true; + esUrl = d.esUrl; } $scope.brokerTopicMetrics = d; + + offsetinfo.loadMetricVizChart($scope.currentMetric, "Messages"); }); + offsetinfo.loadClusterViz($routeParams.group, function(d) {}); + + $scope.onMetricChanged = function() { + if ($scope.currentMetric == "MessagesInPerSec") { + offsetinfo.loadMetricVizChart($scope.currentMetric, "Messages"); + } else { + offsetinfo.loadMetricVizChart($scope.currentMetric, "Bytes"); + } + } } ]) .controller("ActiveTopicsVizCtrl", [ "$scope", "$interval", "$routeParams", "offsetinfo", function($scope, $interval, $routeParams, offsetinfo) { diff --git a/src/main/resources/static/style.css b/src/main/resources/static/style.css index 807109c..c1057ad 100644 --- a/src/main/resources/static/style.css +++ b/src/main/resources/static/style.css @@ -40,6 +40,7 @@ body { } .footer { + padding-top: 16px; position: absolute; bottom: 0; width: 100%; diff --git a/src/main/resources/static/views/cluster-viz.html b/src/main/resources/static/views/cluster-viz.html index 1f02a28..b79a4eb 100644 --- a/src/main/resources/static/views/cluster-viz.html +++ b/src/main/resources/static/views/cluster-viz.html @@ -66,7 +66,7 @@

Metrics

- +
+
+

Choose one to show its change in 8 hours

+
+
+
+ +
+
+
+
+ +