From 6d04c6ae23cd170cffe36b7ade628ea8aa699e49 Mon Sep 17 00:00:00 2001 From: "Hulva Luva.H" Date: Tue, 18 Jul 2017 13:33:11 +0800 Subject: [PATCH 1/6] fix potential endless loop --- .../chickling/kmonitor/config/AppConfig.java | 9 +++++++++ .../core/db/ElasticsearchOffsetDB.java | 15 +++++++++++---- .../kmonitor/utils/ElasticsearchUtil.java | 18 ++++++------------ .../resources/static/scripts/controllers.js | 1 + src/main/resources/static/views/setting.html | 9 +++++---- .../chickling/kmonitor/test/EsSearchTest.java | 3 +-- 6 files changed, 33 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/chickling/kmonitor/config/AppConfig.java b/src/main/java/com/chickling/kmonitor/config/AppConfig.java index 322e6a5..a042bf0 100644 --- a/src/main/java/com/chickling/kmonitor/config/AppConfig.java +++ b/src/main/java/com/chickling/kmonitor/config/AppConfig.java @@ -7,6 +7,7 @@ public class AppConfig { private String esHosts; private String esIndex; + private String docTypeForOffset; private Integer dataCollectFrequency = 1; @@ -61,6 +62,14 @@ public void setEsIndex(String esIndex) { this.esIndex = esIndex; } + public String getDocTypeForOffset() { + return docTypeForOffset; + } + + public void setDocTypeForOffset(String docTypeForOffset) { + this.docTypeForOffset = docTypeForOffset; + } + public Integer getDataCollectFrequency() { return dataCollectFrequency; } diff --git a/src/main/java/com/chickling/kmonitor/core/db/ElasticsearchOffsetDB.java b/src/main/java/com/chickling/kmonitor/core/db/ElasticsearchOffsetDB.java index 536e8e3..82dc85b 100644 --- a/src/main/java/com/chickling/kmonitor/core/db/ElasticsearchOffsetDB.java +++ b/src/main/java/com/chickling/kmonitor/core/db/ElasticsearchOffsetDB.java @@ -22,12 +22,19 @@ public class ElasticsearchOffsetDB implements OffsetDB { private ElasticsearchUtil esUtil; + private String indexPrefix; + private String docType; private static final SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); public ElasticsearchOffsetDB(AppConfig config) { esUtil = new ElasticsearchUtil(config.getEsHosts()); - esUtil.setIndexAndType(config.getEsIndex()); + setIndexAndType(config.getEsIndex(), config.getDocTypeForOffset()); + } + + public void setIndexAndType(String index, String docType) { + this.indexPrefix = index + "-"; + this.docType = docType; } /* @@ -58,7 +65,7 @@ public void batchInsert(List offsetInfoList) { for (int i = 0; i < offsetInfoList.size(); i++) { data.put(i + "", generateRecord(cal.getTimeInMillis(), offsetInfoList.get(i))); } - esUtil.bulkIndex(data, "kafkOffsetInfo"); + esUtil.bulkIndex(data, docType, indexPrefix); } private JSONObject generateRecord(long timestamp, OffsetInfo offsetInfo) { @@ -84,14 +91,14 @@ private JSONObject generateRecord(long timestamp, OffsetInfo offsetInfo) { */ @Override public OffsetHistory offsetHistory(String group, String topic) { - List offsetPointsList = esUtil.offsetHistory("kafkOffsetInfo", group, topic); + List offsetPointsList = esUtil.offsetHistory(indexPrefix, docType, group, topic); CommonUtils.sortByTimestampThenPartition(offsetPointsList); return new OffsetHistory(group, topic, offsetPointsList); } @Override public OffsetHistory offsetHistory(OffsetHistoryQueryParams params) { - List offsetPointsList = esUtil.scrollsSearcher(params, "kafkOffsetInfo"); + List offsetPointsList = esUtil.scrollsSearcher(params, docType, indexPrefix); CommonUtils.sortByTimestampThenPartition(offsetPointsList); return new OffsetHistory(params.getGroup(), params.getTopic(), offsetPointsList); } diff --git a/src/main/java/com/chickling/kmonitor/utils/ElasticsearchUtil.java b/src/main/java/com/chickling/kmonitor/utils/ElasticsearchUtil.java index 04155c2..358cff7 100644 --- a/src/main/java/com/chickling/kmonitor/utils/ElasticsearchUtil.java +++ b/src/main/java/com/chickling/kmonitor/utils/ElasticsearchUtil.java @@ -47,8 +47,6 @@ public class ElasticsearchUtil { private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchUtil.class); static TransportClient client = null; - private String indexPrefix; - public ElasticsearchUtil(String stringHosts) { initClient(stringHosts); } @@ -71,11 +69,7 @@ private void initClient(String stringHosts) { } } - public void setIndexAndType(String index) { - this.indexPrefix = index + "-"; - } - - public void bulkIndex(JSONObject data, String docType) { + public void bulkIndex(JSONObject data, String docType, String indexPrefix) { BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { @@ -107,7 +101,7 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) } } - public List scrollsSearcher(OffsetHistoryQueryParams params, String docType) { + public List scrollsSearcher(OffsetHistoryQueryParams params, String docType, String indexPrefix) { int parallism = Runtime.getRuntime().availableProcessors(); ExecutorService pool = Executors.newFixedThreadPool(parallism); @@ -131,11 +125,11 @@ public List scrollsSearcher(OffsetHistoryQueryParams params, Strin List>> futureList = new ArrayList>>(); while (true) { - if (response.getHits().getHits().length == 0) { + final SearchHit[] searchHits = response.getHits().getHits(); + if (searchHits.length == 0) { break; } try { - SearchHit[] searchHits = response.getHits().getHits(); int step = searchHits.length / parallism; Future> future = null; for (int i = 0; i < searchHits.length; i = i + step) { @@ -253,7 +247,7 @@ private String getRangeFrom(OffsetHistoryQueryParams params) throws ParseExcepti // } // } - public List offsetHistory(String docType, String group, String topic) { + public List offsetHistory(String indexPrefix, String docType, String group, String topic) { int parallism = Runtime.getRuntime().availableProcessors(); ExecutorService pool = Executors.newFixedThreadPool(parallism); @@ -288,7 +282,7 @@ public List offsetHistory(String docType, String group, String top int step = searchHits.length / parallism; Future> future = null; for (int i = 0; i < searchHits.length; i = i + step) { - int to = i + step < searchHits.length ? i + step : searchHits.length; + int to = i + step < searchHits.length ? i + step : searchHits.length; SearchHit[] searchHitPart = Arrays.copyOfRange(searchHits, i, to); future = pool.submit(new GenerateOffsetHistoryDataset(searchHitPart)); futureList.add(future); diff --git a/src/main/resources/static/scripts/controllers.js b/src/main/resources/static/scripts/controllers.js index 91a59ab..e7f222b 100644 --- a/src/main/resources/static/scripts/controllers.js +++ b/src/main/resources/static/scripts/controllers.js @@ -219,6 +219,7 @@ angular.module('offsetapp.controllers', [ "offsetapp.services" ]) excludeByLastSeen: 2592000, esHosts: "", esIndex: "", + docTypeForOffset: "kafkaoffsetinfo", isAlertEnabled: false, smtpServer: "", smtpAuth: false, diff --git a/src/main/resources/static/views/setting.html b/src/main/resources/static/views/setting.html index be688ca..6e682f8 100644 --- a/src/main/resources/static/views/setting.html +++ b/src/main/resources/static/views/setting.html @@ -44,11 +44,12 @@

Elasticsearch

ng-model="settingForm.esIndex" placeholder="Like logx_healthcheck_kafka, whatever"> - + class="form-control" id="inputESDocType" name="docTypeForOffset" + ng-model="settingForm.docTypeForOffset" + placeholder="Leave it blank, it'll set with [kafkaoffsetinfo]"> +

Alerting

diff --git a/src/test/java/com/chickling/kmonitor/test/EsSearchTest.java b/src/test/java/com/chickling/kmonitor/test/EsSearchTest.java index 403c7a6..26e26da 100644 --- a/src/test/java/com/chickling/kmonitor/test/EsSearchTest.java +++ b/src/test/java/com/chickling/kmonitor/test/EsSearchTest.java @@ -14,8 +14,7 @@ public class EsSearchTest { public static void main(String[] args) { ElasticsearchUtil es = new ElasticsearchUtil("10.16.238.82:9300,10.16.238.83:9300,10.16.238.84:9300"); - es.setIndexAndType("logx_healthcheck_test"); - List result = es.offsetHistory("kafkaoffset", "testkafka", "EC2_Test"); + List result = es.offsetHistory("logx_healthcheck_test", "kafkaoffset", "testkafka", "EC2_Test"); System.out.println(result); } From b95cd390946d29ccdf34ea83b48fd773afe369c0 Mon Sep 17 00:00:00 2001 From: "Hulva Luva.H" Date: Tue, 18 Jul 2017 17:53:54 +0800 Subject: [PATCH 2/6] in progress --- .../chickling/kmonitor/alert/TaskManager.java | 17 - .../kmonitor/controller/AlertController.java | 10 +- src/main/resources/static/scripts/alerting.js | 253 --------------- src/main/resources/static/scripts/app.js | 18 +- .../resources/static/scripts/controllers.js | 294 ++++++++++++++++-- src/main/resources/static/style.css | 188 +++++++++++ src/main/resources/static/views/topic.html | 12 +- 7 files changed, 475 insertions(+), 317 deletions(-) diff --git a/src/main/java/com/chickling/kmonitor/alert/TaskManager.java b/src/main/java/com/chickling/kmonitor/alert/TaskManager.java index 186c69d..dd8ea56 100644 --- a/src/main/java/com/chickling/kmonitor/alert/TaskManager.java +++ b/src/main/java/com/chickling/kmonitor/alert/TaskManager.java @@ -53,24 +53,7 @@ public static boolean exits(TaskContent taskContent) { return false; } - public static void refreshTask(TaskContent taskContent) { - String taskFilePath = taskFolder + "/" + taskContent.getGroup() + "-" + taskContent.getTopic() + ".task"; - try { - PrintWriter writer = new PrintWriter(taskFilePath); - writer.println(new Gson().toJson(taskContent)); - writer.close(); - Map task = new HashMap(); - task.put(taskContent.getTopic(), taskContent); - tasks.put(taskContent.getGroup(), task); - } catch (Exception e) { - logger.error("refreshTask to file failed!", e); - } - } - public static void addTask(TaskContent taskContent) throws JSONException { - if (exits(taskContent)) { - return; - } Map task = null; if (tasks.containsKey(taskContent.getGroup())) { task = tasks.get(taskContent.getGroup()); diff --git a/src/main/java/com/chickling/kmonitor/controller/AlertController.java b/src/main/java/com/chickling/kmonitor/controller/AlertController.java index e8a6942..04e57c3 100644 --- a/src/main/java/com/chickling/kmonitor/controller/AlertController.java +++ b/src/main/java/com/chickling/kmonitor/controller/AlertController.java @@ -36,15 +36,9 @@ public String isAlertEnabled() { return response.toString(); } - @RequestMapping(value = "/new", method = RequestMethod.POST) - public Set post(@RequestBody TaskContent taskContent) { - TaskManager.saveTaskToFileAndAddToTasks(taskContent); - return TaskManager.getTasks(); - } - - @RequestMapping(value = { "/update" }, method = RequestMethod.POST) + @RequestMapping(value = { "/task" }, method = RequestMethod.POST) public Set put(@RequestBody TaskContent taskContent) { - TaskManager.refreshTask(taskContent); + TaskManager.saveTaskToFileAndAddToTasks(taskContent); return TaskManager.getTasks(); } diff --git a/src/main/resources/static/scripts/alerting.js b/src/main/resources/static/scripts/alerting.js index f51a820..4169920 100644 --- a/src/main/resources/static/scripts/alerting.js +++ b/src/main/resources/static/scripts/alerting.js @@ -1,73 +1,3 @@ -function newAlertInOffsetHistory() { - $('#taskModal') - .on( - 'show.bs.modal', - function(event) { - var button = $(event.relatedTarget); - var topic = button.data('topic'); - var group = button.data('group'); - var modal = $(this); - $("#inputTopicName").empty().append( - '

' + topic + '

'); - $('#create-consumer').empty().append( - '

' + group + '

'); - document.getElementById("inputThreshold").style.borderColor = ""; - document.getElementById("inputEmail").style.borderColor = ""; - modal.find('.modal-title').text('New Task'); - modal.find('.modal-body #topic').val(topic); - modal.find('.modal-body #group').val(group); - }); - - $("#submitTask") - .click( - function() { - var frm = $('#taskForm'); - var go = true; - var inputThreshold = $("#inputThreshold").val(); - if (!inputThreshold || inputThreshold === "") { - document.getElementById("inputThreshold").style.borderColor = "red"; - go = false; - } - var inputDiapause = $("#inputDiapause").val(); - if (!inputDiapause || inputDiapause === "") { - document.getElementById("inputDiapause").style.borderColor = "red"; - go = false; - } - var inputEmail = $("#inputEmail").val(); - if (!inputEmail || inputEmail === "") { - document.getElementById("inputEmail").style.borderColor = "red"; - go = false; - } - if (!go) { - go = true; - return; - } - - var sendData = formArrToObject(frm.serializeArray()); - $.ajax({ - beforeSend : function(xhrObj) { - xhrObj.setRequestHeader("Content-Type", - "application/json"); - xhrObj.setRequestHeader("Accept", - "application/json"); - }, - type : frm.attr('method'), - url : frm.attr('action'), - dataType : "json", - data : JSON.stringify(sendData), - success : function(response) { - $('#taskModal').modal('hide'); - swal({ - title : "Task created!", - type : "success", - timer : 1000, - showConfirmButton : false - }); - } - }); - }); -} - function updateTaskGroup(groupcb) { $('.chosen-select-group').trigger("chosen:updated").chosen({ width : "95%", @@ -111,189 +41,6 @@ function generateGroupSelect(topic, groups) { }); } -function newAlert(reloadcb) { - $('#taskModal') - .on( - 'show.bs.modal', - function(event) { - var button = $(event.relatedTarget) // Button that - var modal = $(this); - $('.chosen-select-topic') - .trigger("chosen:updated") - .chosen({ - width : "100%", - no_results_text : 'Oops, no such Topic!' - }) - .on( - 'change', - function(evt, params) { - var choosedTopic = $( - '#inputTopicName_chosen .chosen-single span') - .text(); - $.ajax({ - url : "activeconsumers/" - + choosedTopic, - type : "GET", - success : function(groups) { - generateGroupSelect( - choosedTopic, - groups); - } - }); - $('#create-consumer') - .empty() - .append( - '
'); - }); - $('#create-consumer') - .empty() - .append( - ''); - var select = $('#taskForm .chosen-select-group'); - select.find('option').remove().end().append( - ''); - $('.chosen-select-group').chosen({ - width : "100%", - no_results_text : 'Oops, no such Group!' - }) - }); - - $("#submitTask") - .click( - function() { - var frm = $('#taskForm'); - var go = true; - var inputThreshold = $("#inputThreshold").val(); - if (!inputThreshold || inputThreshold === "") { - document.getElementById("inputThreshold").style.borderColor = "red"; - go = false; - } - var inputDiapause = $("#inputDiapause").val(); - if (!inputDiapause || inputDiapause === "") { - document.getElementById("inputDiapause").style.borderColor = "red"; - go = false; - } - var inputEmail = $("#inputEmail").val(); - if (!inputEmail || inputEmail === "") { - document.getElementById("inputEmail").style.borderColor = "red"; - go = false; - } - if (!go) { - go = true; - return; - } - - var sendData = formArrToObject(frm.serializeArray()); - var choosedTopic = $( - '#inputTopicName_chosen .chosen-single span') - .text(); - sendData.topic = choosedTopic; - $.ajax({ - beforeSend : function(xhrObj) { - xhrObj.setRequestHeader("Content-Type", - "application/json"); - xhrObj.setRequestHeader("Accept", - "application/json"); - }, - type : frm.attr('method'), - url : frm.attr('action'), - dataType : "json", - data : JSON.stringify(sendData), - success : function(response) { - $('#taskModal').modal('hide'); - swal({ - title : "Task created!", - type : "success", - timer : 1000, - showConfirmButton : false - }); - window.location.reload(); - // reloadcb(response) - } - }); - }); -} - -function alertTaskDetail(cb) { - $('#taskDetailModal') - .on( - 'show.bs.modal', - function(event) { - var button = $(event.relatedTarget); - var group = button.data('group'); - var topic = button.data('topic'); - var threshold = button.data('threshold'); - var diapause = button.data('diapause'); - var mailTo = button.data('mailto'); - document.getElementById("taskDetail-inputThreshold").style.borderColor = ""; - document.getElementById("taskDetail-inputDiapause").style.borderColor = ""; - document.getElementById("taskDetail-inputEmail").style.borderColor = ""; - var modal = $(this); - modal.find('.modal-body #taskDetail-inputTopic').val( - topic); - modal.find('.modal-body #taskDetail-inputConsumer') - .val(group); - $('#taskDetail-inputTopic').prop('readonly', true); - $('#taskDetail-inputConsumer').prop('readonly', true); - // Message Lag Threshold - modal.find('.modal-body #taskDetail-inputThreshold') - .val(threshold); - // diapause - modal.find('.modal-body #taskDetail-inputDiapause') - .val(diapause); - // Mail to - modal.find('.modal-body #taskDetail-inputEmail').val( - mailTo); - }); - - $("#updateTask") - .click( - function() { - var frm = $('#taskDetailForm'); - var inputThreshold = $("#taskDetail-inputThreshold") - .val(); - if (!inputThreshold || inputThreshold === "") { - document - .getElementById("taskDetail-inputThreshold").style.borderColor = "red"; - return; - } - var inputDiapause = $("#taskDetail-inputDiapause") - .val(); - if (!inputDiapause || inputDiapause === "") { - document.getElementById("taskDetail-inputDiapause").style.borderColor = "red"; - return; - } - var inputEmail = $("#taskDetail-inputEmail").val(); - if (!inputEmail || inputEmail === "") { - document.getElementById("taskDetail-inputEmail").style.borderColor = "red"; - return; - } - - var sendData = formArrToObject(frm.serializeArray()); - $.ajax({ - beforeSend : function(xhrObj) { - xhrObj.setRequestHeader("Content-Type", - "application/json"); - xhrObj.setRequestHeader("Accept", - "application/json"); - }, - type : 'POST', - url : frm.attr('action'), - data : JSON.stringify(sendData), - success : function(response) { - $('#taskDetailModal').modal('hide'); - swal({ - title : "Task updated!", - type : "success", - timer : 1000, - showConfirmButton : false - }); - cb(response); - } - }); - }); -} - function formArrToObject(formArray) { var obj = {}; $.each(formArray, function(i, pair) { diff --git a/src/main/resources/static/scripts/app.js b/src/main/resources/static/scripts/app.js index 33d6300..b037471 100644 --- a/src/main/resources/static/scripts/app.js +++ b/src/main/resources/static/scripts/app.js @@ -146,19 +146,19 @@ angular.module("offsetapp.services", [ "ngResource" ]) topic : topic }, processConsumer(cb)); }, - onShowNewAlertModal : function(isInOffsetHistory, cb) { - if(isInOffsetHistory){ - cb(newAlertInOffsetHistory()); - }else{ - newAlert(cb); - } + newAlert : function(_url, requestBody, cb) { + $http({ + method: 'POST', + url: _url, + headers: {'Content-Type': 'application/json'}, + data: requestBody + }).success(function (response) { + cb(response); + }); }, listTasks : function(cb) { return $http.get("./alerting/tasks"); }, - onShowAlertTaskDetailModal : function(cb) { - return alertTaskDetail(cb); - }, deleteTask: function(task, cb) { return $http.delete("./alerting/delete/" + task.group + "-" + task.topic) .then( diff --git a/src/main/resources/static/scripts/controllers.js b/src/main/resources/static/scripts/controllers.js index e7f222b..8b44151 100644 --- a/src/main/resources/static/scripts/controllers.js +++ b/src/main/resources/static/scripts/controllers.js @@ -127,7 +127,92 @@ angular.module('offsetapp.controllers', [ "offsetapp.services" ]) } }); - offsetinfo.onShowNewAlertModal(true, function(d) {}); + $scope.task = { + group : "", + topic : "", + diapause : "", + threshold : "", + mailTo : "" + } + + $('#taskModal') + .on( + 'show.bs.modal', + function(event) { + var button = $(event.relatedTarget); + var topic = button.data('topic'); + var group = button.data('group'); + var modal = $(this); + var isTaskExists = false; + offsetinfo.listTasks().success(function(d) { + d.forEach(function(task) { + if (task.group === group && task.topic === topic) { + isTaskExists = true; + $scope.task = task; + modal.find('.modal-title').text('Task already exists!'); + modal.find('.modal-body #topic').val(topic); + modal.find('.modal-body #group').val(group); + modal.find('.modal-body #threshold').val(task.threshold); + $('#submitTask').prop('disabled', true); + document.getElementById('submitTask').className += " disabled"; + $scope.$watch('task', function() { + console.log($scope.task); + }); + } + }); + }); + + if(isTaskExists){ + return; + } + $("#inputTopicName").empty().append( + '

' + topic + '

'); + $('#create-consumer').empty().append( + '

' + group + '

'); + document.getElementById("inputThreshold").style.borderColor = ""; + document.getElementById("inputEmail").style.borderColor = ""; + modal.find('.modal-title').text('New Task'); + modal.find('.modal-body #topic').val(topic); + modal.find('.modal-body #group').val(group); + }); + + $("#submitTask") + .click( + function() { + var frm = $('#taskForm'); + var go = true; + var inputThreshold = $("#inputThreshold").val(); + if (!inputThreshold || inputThreshold === "") { + document.getElementById("inputThreshold").style.borderColor = "red"; + go = false; + } + var inputDiapause = $("#inputDiapause").val(); + if (!inputDiapause || inputDiapause === "") { + document.getElementById("inputDiapause").style.borderColor = "red"; + go = false; + } + var inputEmail = $("#inputEmail").val(); + if (!inputEmail || inputEmail === "") { + document.getElementById("inputEmail").style.borderColor = "red"; + go = false; + } + if (!go) { + go = true; + return; + } + + var sendData = formArrToObject(frm.serializeArray()); + offsetinfo.newAlert('alerting/task', sendData, function(d) { + $scope.tasks = d; + $('#taskModal').modal('hide'); + swal({ + title : "Task created!", + type : "success", + timer : 1000, + showConfirmButton : false + }); + }); + }); $scope.offsetHistoryByDateRange = function() { $scope.loading = true; offsetinfo.queryOffsetHistoryWithOptions(JSON.stringify($scope.rangeform), function(d) { @@ -173,11 +258,75 @@ angular.module('offsetapp.controllers', [ "offsetapp.services" ]) $scope.tasks = d; $scope.loading = false; }); - offsetinfo.onShowAlertTaskDetailModal(function(d) { - $scope.tasks = d.data; - $scope.loading = false; - location.reload(); - }); + + $('#taskDetailModal') + .on( + 'show.bs.modal', + function(event) { + var button = $(event.relatedTarget); + var group = button.data('group'); + var topic = button.data('topic'); + var threshold = button.data('threshold'); + var diapause = button.data('diapause'); + var mailTo = button.data('mailto'); + document.getElementById("taskDetail-inputThreshold").style.borderColor = ""; + document.getElementById("taskDetail-inputDiapause").style.borderColor = ""; + document.getElementById("taskDetail-inputEmail").style.borderColor = ""; + var modal = $(this); + modal.find('.modal-body #taskDetail-inputTopic').val( + topic); + modal.find('.modal-body #taskDetail-inputConsumer') + .val(group); + $('#taskDetail-inputTopic').prop('readonly', true); + $('#taskDetail-inputConsumer').prop('readonly', true); + // Message Lag Threshold + modal.find('.modal-body #taskDetail-inputThreshold') + .val(threshold); + // diapause + modal.find('.modal-body #taskDetail-inputDiapause') + .val(diapause); + // Mail to + modal.find('.modal-body #taskDetail-inputEmail').val( + mailTo); + }); + + $("#updateTask") + .click( + function() { + var frm = $('#taskDetailForm'); + var inputThreshold = $("#taskDetail-inputThreshold") + .val(); + if (!inputThreshold || inputThreshold === "") { + document + .getElementById("taskDetail-inputThreshold").style.borderColor = "red"; + return; + } + var inputDiapause = $("#taskDetail-inputDiapause") + .val(); + if (!inputDiapause || inputDiapause === "") { + document.getElementById("taskDetail-inputDiapause").style.borderColor = "red"; + return; + } + var inputEmail = $("#taskDetail-inputEmail").val(); + if (!inputEmail || inputEmail === "") { + document.getElementById("taskDetail-inputEmail").style.borderColor = "red"; + return; + } + + var sendData = formArrToObject(frm.serializeArray()); + + offsetinfo.newAlert('alerting/task', sendData, function(d) { + $scope.tasks = d; + $('#taskModal').modal('hide'); + swal({ + title : "Task updated!", + type : "success", + timer : 1000, + showConfirmButton : false + }); + }); + }); + $scope.taskform = { group: "", topic: "", @@ -201,14 +350,99 @@ angular.module('offsetapp.controllers', [ "offsetapp.services" ]) }); } - offsetinfo.onShowNewAlertModal(false, function(d) { - // TODO reload task list here when new task add? - }); + $('#taskModal') + .on( + 'show.bs.modal', + function(event) { + var button = $(event.relatedTarget) // Button that + var modal = $(this); + $('.chosen-select-topic') + .trigger("chosen:updated") + .chosen({ + width : "100%", + no_results_text : 'Oops, no such Topic!' + }) + .on( + 'change', + function(evt, params) { + var choosedTopic = $( + '#inputTopicName_chosen .chosen-single span') + .text(); + $.ajax({ + url : "activeconsumers/" + + choosedTopic, + type : "GET", + success : function(groups) { + generateGroupSelect( + choosedTopic, + groups); + } + }); + $('#create-consumer') + .empty() + .append( + '
'); + }); + $('#create-consumer') + .empty() + .append( + ''); + var select = $('#taskForm .chosen-select-group'); + select.find('option').remove().end().append( + ''); + $('.chosen-select-group').chosen({ + width : "100%", + no_results_text : 'Oops, no such Group!' + }) + }); }else{ $scope.alertEnabled = false; $scope.loading = false; $('#newTask').prop('disabled', true); } + + $("#submitTask") + .click( + function() { + var frm = $('#taskForm'); + var go = true; + var inputThreshold = $("#inputThreshold").val(); + if (!inputThreshold || inputThreshold === "") { + document.getElementById("inputThreshold").style.borderColor = "red"; + go = false; + } + var inputDiapause = $("#inputDiapause").val(); + if (!inputDiapause || inputDiapause === "") { + document.getElementById("inputDiapause").style.borderColor = "red"; + go = false; + } + var inputEmail = $("#inputEmail").val(); + if (!inputEmail || inputEmail === "") { + document.getElementById("inputEmail").style.borderColor = "red"; + go = false; + } + if (!go) { + go = true; + return; + } + + var sendData = formArrToObject(frm.serializeArray()); + var choosedTopic = $( + '#inputTopicName_chosen .chosen-single span') + .text(); + sendData.topic = choosedTopic; + + offsetinfo.newAlert('alerting/task', sendData, function(d) { + $scope.tasks = d; + $('#taskModal').modal('hide'); + swal({ + title : "Task created!", + type : "success", + timer : 1000, + showConfirmButton : false + }); + }); + }); }); } ]) .controller("SettingCtrl", [ "$scope", "offsetinfo", @@ -245,23 +479,33 @@ angular.module('offsetapp.controllers', [ "offsetapp.services" ]) $scope.settingForm.mailSubject = ""; } - offsetinfo.postSetting($scope.settingForm, function(d) { - if(d.isSystemReady!=undefined && d.isSystemReady){ - swal({ - title : "Setting updated! Kmonitor is ready to use!", - type : "success", - timer : 1000, - showConfirmButton : false - }); - }else{ - swal({ - title : "Something went wrong!", - text: d.message, - type : "error", - showConfirmButton : true + swal({ + title: "Submit setting", + text: "Make sure you have input everything right then click OK", + type: "info", + showCancelButton: false, + closeOnConfirm: false, + showLoaderOnConfirm: true + }, + function(){ + offsetinfo.postSetting($scope.settingForm, function(d) { + if(d.isSystemReady!=undefined && d.isSystemReady){ + swal({ + title : "Setting updated! Kmonitor is ready to use!", + type : "success", + timer : 1500, + showConfirmButton : false + }); + }else{ + swal({ + title : "Something went wrong!", + text: d.message, + type : "error", + showConfirmButton : true + }); + } }); - } - }); + }); } } ]) /*.controller("BrokerCtrl", [ "$scope", "$routeParams", "offsetinfo", diff --git a/src/main/resources/static/style.css b/src/main/resources/static/style.css index d117198..807109c 100644 --- a/src/main/resources/static/style.css +++ b/src/main/resources/static/style.css @@ -424,4 +424,192 @@ footer p { 100% { -moz-transform: translate3d(10px, 10px, 0); } +} + +#escapingBallG{ + position:relative; + width:96px; + height:33px; + margin:auto; +} + +.escapingBallG{ + background-color:rgb(230,154,230); + position:absolute; + top:0; + left:0; + width:33px; + height:33px; + border-radius:16px; + -o-border-radius:16px; + -ms-border-radius:16px; + -webkit-border-radius:16px; + -moz-border-radius:16px; + animation-name:bounce_escapingBallG; + -o-animation-name:bounce_escapingBallG; + -ms-animation-name:bounce_escapingBallG; + -webkit-animation-name:bounce_escapingBallG; + -moz-animation-name:bounce_escapingBallG; + animation-duration:1.5s; + -o-animation-duration:1.5s; + -ms-animation-duration:1.5s; + -webkit-animation-duration:1.5s; + -moz-animation-duration:1.5s; + animation-iteration-count:infinite; + -o-animation-iteration-count:infinite; + -ms-animation-iteration-count:infinite; + -webkit-animation-iteration-count:infinite; + -moz-animation-iteration-count:infinite; + animation-timing-function:linear; + -o-animation-timing-function:linear; + -ms-animation-timing-function:linear; + -webkit-animation-timing-function:linear; + -moz-animation-timing-function:linear; + animation-delay:0s; + -o-animation-delay:0s; + -ms-animation-delay:0s; + -webkit-animation-delay:0s; + -moz-animation-delay:0s; + transform:scale(0.5, 1); + -o-transform:scale(0.5, 1); + -ms-transform:scale(0.5, 1); + -webkit-transform:scale(0.5, 1); + -moz-transform:scale(0.5, 1); +} + + + +@keyframes bounce_escapingBallG{ + 0%{ + left:0px; + transform:scale(0.5, 1); + } + + 25%{ + left:31px; + transform:scale(1, 0.5); + } + + 50%{ + left:79px; + transform:scale(0.5, 1); + } + + 75%{ + left:31px; + transform:scale(1, 0.5); + } + + 100%{ + left:0px; + transform:scale(0.5, 1); + } +} + +@-o-keyframes bounce_escapingBallG{ + 0%{ + left:0px; + -o-transform:scale(0.5, 1); + } + + 25%{ + left:31px; + -o-transform:scale(1, 0.5); + } + + 50%{ + left:79px; + -o-transform:scale(0.5, 1); + } + + 75%{ + left:31px; + -o-transform:scale(1, 0.5); + } + + 100%{ + left:0px; + -o-transform:scale(0.5, 1); + } +} + +@-ms-keyframes bounce_escapingBallG{ + 0%{ + left:0px; + -ms-transform:scale(0.5, 1); + } + + 25%{ + left:31px; + -ms-transform:scale(1, 0.5); + } + + 50%{ + left:79px; + -ms-transform:scale(0.5, 1); + } + + 75%{ + left:31px; + -ms-transform:scale(1, 0.5); + } + + 100%{ + left:0px; + -ms-transform:scale(0.5, 1); + } +} + +@-webkit-keyframes bounce_escapingBallG{ + 0%{ + left:0px; + -webkit-transform:scale(0.5, 1); + } + + 25%{ + left:31px; + -webkit-transform:scale(1, 0.5); + } + + 50%{ + left:79px; + -webkit-transform:scale(0.5, 1); + } + + 75%{ + left:31px; + -webkit-transform:scale(1, 0.5); + } + + 100%{ + left:0px; + -webkit-transform:scale(0.5, 1); + } +} + +@-moz-keyframes bounce_escapingBallG{ + 0%{ + left:0px; + -moz-transform:scale(0.5, 1); + } + + 25%{ + left:31px; + -moz-transform:scale(1, 0.5); + } + + 50%{ + left:79px; + -moz-transform:scale(0.5, 1); + } + + 75%{ + left:31px; + -moz-transform:scale(1, 0.5); + } + + 100%{ + left:0px; + -moz-transform:scale(0.5, 1); + } } \ No newline at end of file diff --git a/src/main/resources/static/views/topic.html b/src/main/resources/static/views/topic.html index eef7d37..d957cb8 100644 --- a/src/main/resources/static/views/topic.html +++ b/src/main/resources/static/views/topic.html @@ -34,13 +34,15 @@
- +
- +
@@ -56,7 +58,7 @@ send Diapause
@@ -64,7 +66,7 @@
From 38b6d5b5c2a43d49afc6fb2015ac553a645d6d6d Mon Sep 17 00:00:00 2001 From: "Hulva Luva.H" Date: Tue, 18 Jul 2017 20:00:05 +0800 Subject: [PATCH 3/6] regular commit --- .../com/chickling/kmonitor/initialize/SystemManager.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/chickling/kmonitor/initialize/SystemManager.java b/src/main/java/com/chickling/kmonitor/initialize/SystemManager.java index 03a58ed..bebb9e9 100644 --- a/src/main/java/com/chickling/kmonitor/initialize/SystemManager.java +++ b/src/main/java/com/chickling/kmonitor/initialize/SystemManager.java @@ -19,6 +19,7 @@ import com.chickling.kmonitor.alert.TaskContent; import com.chickling.kmonitor.alert.TaskHandler; import com.chickling.kmonitor.alert.TaskManager; +import com.chickling.kmonitor.alert.WorkerThreadFactory; import com.chickling.kmonitor.config.AppConfig; import com.chickling.kmonitor.core.OffsetGetter; import com.chickling.kmonitor.core.ZKOffsetGetter; @@ -45,7 +46,7 @@ public class SystemManager { private static final int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors(); private static final ExecutorService kafkaInfoCollectAndSavePool = Executors - .newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE); + .newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE, new WorkerThreadFactory("KafkaInfo Collector")); public static BlockingQueue offsetInfoCacheQueue; @@ -106,7 +107,7 @@ private static void initSystem() { if (scheduler != null) scheduler.shutdownNow(); - scheduler = Executors.newScheduledThreadPool(2); + scheduler = Executors.newScheduledThreadPool(2, new WorkerThreadFactory("FixedRateSchedule")); if (config.getIsAlertEnabled()) { initAlert(config); @@ -149,7 +150,7 @@ private static void initAlert(AppConfig config) { if (worker != null) { worker.shutdownNow(); } - worker = Executors.newFixedThreadPool(corePoolSize); + worker = Executors.newFixedThreadPool(corePoolSize, new WorkerThreadFactory("AlertTaskChecker")); for (int i = 0; i < corePoolSize; i++) { worker.submit(new TaskHandler()); From 1e69807753c3db18a412a023e8b7b8bb2d58455c Mon Sep 17 00:00:00 2001 From: "Hulva Luva.H" Date: Wed, 19 Jul 2017 09:50:38 +0800 Subject: [PATCH 4/6] =?UTF-8?q?alert=20ui=20=E5=BE=AE=E8=B0=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kmonitor/utils/ElasticsearchUtil.java | 14 ++- .../resources/static/scripts/controllers.js | 94 +++++++++++-------- src/main/resources/static/views/topic.html | 29 +++--- 3 files changed, 86 insertions(+), 51 deletions(-) diff --git a/src/main/java/com/chickling/kmonitor/utils/ElasticsearchUtil.java b/src/main/java/com/chickling/kmonitor/utils/ElasticsearchUtil.java index 358cff7..11dccfb 100644 --- a/src/main/java/com/chickling/kmonitor/utils/ElasticsearchUtil.java +++ b/src/main/java/com/chickling/kmonitor/utils/ElasticsearchUtil.java @@ -130,7 +130,12 @@ public List scrollsSearcher(OffsetHistoryQueryParams params, Strin break; } try { - int step = searchHits.length / parallism; + int step = 1; + if (searchHits.length < parallism) { + step = 1; + } else { + step = searchHits.length / parallism; + } Future> future = null; for (int i = 0; i < searchHits.length; i = i + step) { int to = i + step < searchHits.length ? i + step : searchHits.length; @@ -279,7 +284,12 @@ public List offsetHistory(String indexPrefix, String docType, Stri break; } try { - int step = searchHits.length / parallism; + int step = 1; + if (searchHits.length < parallism) { + step = 1; + } else { + step = searchHits.length / parallism; + } Future> future = null; for (int i = 0; i < searchHits.length; i = i + step) { int to = i + step < searchHits.length ? i + step : searchHits.length; diff --git a/src/main/resources/static/scripts/controllers.js b/src/main/resources/static/scripts/controllers.js index 8b44151..604776a 100644 --- a/src/main/resources/static/scripts/controllers.js +++ b/src/main/resources/static/scripts/controllers.js @@ -128,52 +128,43 @@ angular.module('offsetapp.controllers', [ "offsetapp.services" ]) }); $scope.task = { - group : "", - topic : "", + group : $routeParams.group, + topic : $routeParams.topic, diapause : "", threshold : "", mailTo : "" } + var isTaskExists = false; + offsetinfo.listTasks().success(function(d) { + d.forEach(function(t) { + if (t.group === $routeParams.group && t.topic === $routeParams.topic) { + isTaskExists = true; + $scope.task = t; + } + }); + }); + $('#taskModal') .on( 'show.bs.modal', function(event) { var button = $(event.relatedTarget); - var topic = button.data('topic'); - var group = button.data('group'); + var topic = $routeParams.group; + var group = $routeParams.topic; var modal = $(this); - var isTaskExists = false; - offsetinfo.listTasks().success(function(d) { - d.forEach(function(task) { - if (task.group === group && task.topic === topic) { - isTaskExists = true; - $scope.task = task; - modal.find('.modal-title').text('Task already exists!'); - modal.find('.modal-body #topic').val(topic); - modal.find('.modal-body #group').val(group); - modal.find('.modal-body #threshold').val(task.threshold); - $('#submitTask').prop('disabled', true); - document.getElementById('submitTask').className += " disabled"; - $scope.$watch('task', function() { - console.log($scope.task); - }); - } - }); - }); - + modal.find('.modal-body #topic').val(topic); + modal.find('.modal-body #group').val(group); if(isTaskExists){ + modal.find('.modal-title').text('Task already exists!'); + modal.find('.modal-body #inputThreshold').val($scope.task.threshold); + modal.find('.modal-body #inputDiapause').val($scope.task.diapause); + modal.find('.modal-body #inputEmail').val($scope.task.mailTo); return; } - $("#inputTopicName").empty().append( - '

' + topic + '

'); - $('#create-consumer').empty().append( - '

' + group + '

'); document.getElementById("inputThreshold").style.borderColor = ""; document.getElementById("inputEmail").style.borderColor = ""; modal.find('.modal-title').text('New Task'); - modal.find('.modal-body #topic').val(topic); - modal.find('.modal-body #group').val(group); }); $("#submitTask") @@ -202,16 +193,45 @@ angular.module('offsetapp.controllers', [ "offsetapp.services" ]) } var sendData = formArrToObject(frm.serializeArray()); - offsetinfo.newAlert('alerting/task', sendData, function(d) { - $scope.tasks = d; - $('#taskModal').modal('hide'); + if(inputThreshold === $scope.task.threshold+'' && inputDiapause === $scope.task.diapause+'' && inputEmail === $scope.task.mailTo){ swal({ - title : "Task created!", - type : "success", - timer : 1000, - showConfirmButton : false + title: "Are you sure?", + text: "The task is already exists and you have do no change!", + type: "warning", + showCancelButton: true, + confirmButtonColor: "#DD6B55", + confirmButtonText: "Yes, still submit!", + cancelButtonText: "No, cancel", + closeOnConfirm: false, + closeOnCancel: true + }, + function(isConfirm){ + if (!isConfirm) { + return; + } + offsetinfo.newAlert('alerting/task', sendData, function(d) { + $scope.tasks = d; + $('#taskModal').modal('hide'); + swal({ + title : "Task created!", + type : "success", + timer : 1000, + showConfirmButton : false + }); + }); + }); + }else{ + offsetinfo.newAlert('alerting/task', sendData, function(d) { + $scope.tasks = d; + $('#taskModal').modal('hide'); + swal({ + title : "Task created!", + type : "success", + timer : 1000, + showConfirmButton : false + }); }); - }); + } }); $scope.offsetHistoryByDateRange = function() { $scope.loading = true; diff --git a/src/main/resources/static/views/topic.html b/src/main/resources/static/views/topic.html index d957cb8..64585aa 100644 --- a/src/main/resources/static/views/topic.html +++ b/src/main/resources/static/views/topic.html @@ -31,18 +31,23 @@
- -
- + +
+ +

{{task.topic}}

+
+
-
- + Group +
+ +

{{task.group}}

+
+
@@ -58,7 +63,7 @@ send Diapause
@@ -66,7 +71,7 @@
From ccb3dd1d840a074ddf50333b213939cb5642e8b4 Mon Sep 17 00:00:00 2001 From: "Hulva Luva.H" Date: Wed, 19 Jul 2017 18:07:20 +0800 Subject: [PATCH 5/6] add Elasticsearch RESTAPI implement and some JMX stuff; in progress --- .../chickling/kmonitor/config/AppConfig.java | 9 + .../controller/JMXMetricController.java | 33 ++ .../core/db/ElasticsearchOffsetDB.java | 13 +- .../kmonitor/core/jmx/JMXExecutor.java | 14 + .../chickling/kmonitor/core/jmx/KafkaJMX.java | 68 ++++ .../kmonitor/core/jmx/KafkaMetrics.java | 147 +++++++ .../kmonitor/core/jmx/MeterMetric.java | 76 ++++ .../kmonitor/core/jmx/metric/OSMetric.java | 54 +++ .../kmonitor/initialize/SystemManager.java | 4 +- .../chickling/kmonitor/utils/CommonUtils.java | 16 + .../utils/elasticsearch/Ielasticsearch.java | 28 ++ .../javaapi/ElasticsearchJavaUtil.java} | 27 +- .../restapi/ElasticsearchRESTUtil.java | 376 ++++++++++++++++++ .../restapi/ScrollSearchTemplate.java | 20 + src/main/resources/static/scripts/app.js | 8 +- .../resources/static/scripts/cluster-viz.js | 8 +- .../resources/static/scripts/controllers.js | 39 +- src/main/resources/static/views/broker.html | 73 ++++ .../resources/static/views/cluster-viz.html | 62 +++ src/main/resources/static/views/setting.html | 8 +- .../static/views/topic-consumers.html | 149 +++++-- .../resources/static/views/topic-detail.html | 101 ++++- .../chickling/kmonitor/test/EsSearchTest.java | 4 +- 23 files changed, 1233 insertions(+), 104 deletions(-) create mode 100644 src/main/java/com/chickling/kmonitor/controller/JMXMetricController.java create mode 100644 src/main/java/com/chickling/kmonitor/core/jmx/JMXExecutor.java create mode 100644 src/main/java/com/chickling/kmonitor/core/jmx/KafkaJMX.java create mode 100644 src/main/java/com/chickling/kmonitor/core/jmx/KafkaMetrics.java create mode 100644 src/main/java/com/chickling/kmonitor/core/jmx/MeterMetric.java create mode 100644 src/main/java/com/chickling/kmonitor/core/jmx/metric/OSMetric.java create mode 100644 src/main/java/com/chickling/kmonitor/utils/elasticsearch/Ielasticsearch.java rename src/main/java/com/chickling/kmonitor/utils/{ElasticsearchUtil.java => elasticsearch/javaapi/ElasticsearchJavaUtil.java} (92%) create mode 100644 src/main/java/com/chickling/kmonitor/utils/elasticsearch/restapi/ElasticsearchRESTUtil.java create mode 100644 src/main/java/com/chickling/kmonitor/utils/elasticsearch/restapi/ScrollSearchTemplate.java create mode 100644 src/main/resources/static/views/broker.html diff --git a/src/main/java/com/chickling/kmonitor/config/AppConfig.java b/src/main/java/com/chickling/kmonitor/config/AppConfig.java index a042bf0..d10323c 100644 --- a/src/main/java/com/chickling/kmonitor/config/AppConfig.java +++ b/src/main/java/com/chickling/kmonitor/config/AppConfig.java @@ -5,6 +5,7 @@ * */ public class AppConfig { + private String apiType; private String esHosts; private String esIndex; private String docTypeForOffset; @@ -30,6 +31,14 @@ public class AppConfig { private Long excludeByLastSeen = 2592000L; + public String getApiType() { + return apiType; + } + + public void setApiType(String apiType) { + this.apiType = apiType; + } + public String getZkHosts() { return zkHosts; } diff --git a/src/main/java/com/chickling/kmonitor/controller/JMXMetricController.java b/src/main/java/com/chickling/kmonitor/controller/JMXMetricController.java new file mode 100644 index 0000000..3171c64 --- /dev/null +++ b/src/main/java/com/chickling/kmonitor/controller/JMXMetricController.java @@ -0,0 +1,33 @@ +package com.chickling.kmonitor.controller; + +import java.util.Collections; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; + +import com.chickling.kmonitor.initialize.SystemManager; + +/** + * + * @author Hulva Luva.H + * @since 2017-07-12 + * + */ + +@RestController +@RequestMapping("/metrics") +public class JMXMetricController { + private static Logger LOG = LoggerFactory.getLogger(JMXMetricController.class); + + @RequestMapping(value = "/brokerTopicMetrics/broker", method = RequestMethod.GET) + public List getBrokerTopicMetrics(@PathVariable String broker, @PathVariable String topic) { + List groups = SystemManager.og.getGroups(); + Collections.sort(groups); + return groups; + } +} diff --git a/src/main/java/com/chickling/kmonitor/core/db/ElasticsearchOffsetDB.java b/src/main/java/com/chickling/kmonitor/core/db/ElasticsearchOffsetDB.java index 82dc85b..2d0f1fd 100644 --- a/src/main/java/com/chickling/kmonitor/core/db/ElasticsearchOffsetDB.java +++ b/src/main/java/com/chickling/kmonitor/core/db/ElasticsearchOffsetDB.java @@ -13,7 +13,9 @@ import com.chickling.kmonitor.model.OffsetInfo; import com.chickling.kmonitor.model.OffsetPoints; import com.chickling.kmonitor.utils.CommonUtils; -import com.chickling.kmonitor.utils.ElasticsearchUtil; +import com.chickling.kmonitor.utils.elasticsearch.Ielasticsearch; +import com.chickling.kmonitor.utils.elasticsearch.javaapi.ElasticsearchJavaUtil; +import com.chickling.kmonitor.utils.elasticsearch.restapi.ElasticsearchRESTUtil; /** * @author Hulva Luva.H @@ -21,14 +23,19 @@ */ public class ElasticsearchOffsetDB implements OffsetDB { - private ElasticsearchUtil esUtil; + private Ielasticsearch esUtil; private String indexPrefix; private String docType; private static final SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); public ElasticsearchOffsetDB(AppConfig config) { - esUtil = new ElasticsearchUtil(config.getEsHosts()); + if (config.getApiType().equalsIgnoreCase("Java API")) { + esUtil = new ElasticsearchJavaUtil(config.getEsHosts()); + } else { + esUtil = new ElasticsearchRESTUtil(config.getEsHosts()); + } + setIndexAndType(config.getEsIndex(), config.getDocTypeForOffset()); } diff --git a/src/main/java/com/chickling/kmonitor/core/jmx/JMXExecutor.java b/src/main/java/com/chickling/kmonitor/core/jmx/JMXExecutor.java new file mode 100644 index 0000000..4b5e2bd --- /dev/null +++ b/src/main/java/com/chickling/kmonitor/core/jmx/JMXExecutor.java @@ -0,0 +1,14 @@ +package com.chickling.kmonitor.core.jmx; + +import javax.management.MBeanServerConnection; + +/** + * @author Hulva Luva.H + * @since 2017-07-11 + * + */ +public interface JMXExecutor { + + public void doWithConnection(MBeanServerConnection mBeanServerConnection); + +} diff --git a/src/main/java/com/chickling/kmonitor/core/jmx/KafkaJMX.java b/src/main/java/com/chickling/kmonitor/core/jmx/KafkaJMX.java new file mode 100644 index 0000000..a4d2575 --- /dev/null +++ b/src/main/java/com/chickling/kmonitor/core/jmx/KafkaJMX.java @@ -0,0 +1,68 @@ +package com.chickling.kmonitor.core.jmx; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; +import javax.rmi.ssl.SslRMIClientSocketFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * @author Hulva Luva.H + * @since 2017-07-11 + * + */ +public class KafkaJMX { + private static Logger LOG = LoggerFactory.getLogger(KafkaJMX.class); + + private Map defaultJmxConnectorProperties = new HashMap(); + + public KafkaJMX() { + initDefaultJMXConnectorProperties(); + } + + private void initDefaultJMXConnectorProperties() { + defaultJmxConnectorProperties.put("jmx.remote.x.request.waiting.timeout", "3000"); + defaultJmxConnectorProperties.put("jmx.remote.x.notification.fetch.timeout", "3000"); + defaultJmxConnectorProperties.put("sun.rmi.transport.connectionTimeout", "3000"); + defaultJmxConnectorProperties.put("sun.rmi.transport.tcp.handshakeTimeout", "3000"); + defaultJmxConnectorProperties.put("sun.rmi.transport.tcp.responseTimeout", "3000"); + } + + public void doWithConnection(String jmxHost, int jmxPort, Optional jmxUser, Optional jmxPasswd, + boolean jmxSSL, JMXExecutor excutor) { + String urlStr = "service:jmx:rmi:///jndi/rmi://" + jmxHost + ":" + jmxPort + "/jmxrmi"; + JMXConnector jmxc = null; + try { + JMXServiceURL url = new JMXServiceURL(urlStr); + // authenticate + Map env = new HashMap(); + String[] credentials = { jmxUser.get(), jmxPasswd.get() }; + env.put(JMXConnector.CREDENTIALS, credentials); + + if (jmxSSL) { // com.sun.management.jmxremote.registry.ssl=true + env.put("com.sun.jndi.rmi.factory.socket", new SslRMIClientSocketFactory()); + } + jmxc = JMXConnectorFactory.connect(url, env); + excutor.doWithConnection(jmxc.getMBeanServerConnection()); + } catch (Exception e) { + LOG.error("KafkaJMX doWithConnection error! " + e.getMessage()); + } finally { + try { + if (jmxc != null) { + jmxc.close(); + } + } catch (IOException e) { + LOG.error("Close JMXConnector error! " + e.getMessage()); + } + } + } + +} diff --git a/src/main/java/com/chickling/kmonitor/core/jmx/KafkaMetrics.java b/src/main/java/com/chickling/kmonitor/core/jmx/KafkaMetrics.java new file mode 100644 index 0000000..ebb0dcb --- /dev/null +++ b/src/main/java/com/chickling/kmonitor/core/jmx/KafkaMetrics.java @@ -0,0 +1,147 @@ +package com.chickling.kmonitor.core.jmx; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import javax.management.Attribute; +import javax.management.AttributeList; +import javax.management.MBeanServerConnection; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.chickling.kmonitor.core.jmx.metric.OSMetric; + +/** + * @author Hulva Luva.H + * @since 2017-07-11 + * + */ +public class KafkaMetrics { + private static Logger LOG = LoggerFactory.getLogger(KafkaMetrics.class); + + // kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower} + + public static final Map objectNames = new HashMap(); + public static final String replicaFetcherManagerMinFetchRate = "replicaFetcherManagerMinFetchRate"; + public static final String replicaFetcherManagerMaxLag = "replicaFetcherManagerMaxLag "; + public static final String kafkaControllerActiveControllerCount = "kafkaControllerActiveControllerCount "; + public static final String kafkaControllerOfflinePartitionsCount = "kafkaControllerOfflinePartitionsCount "; + public static final String logFlushStats = "logFlushStats "; + public static final String operatingSystemObjectName = "operatingSystemObjectName "; + public static final String logSegmentObjectName = "logSegmentObjectName "; + public static final String directoryObjectName = "directoryObjectName "; + + static { + try { + objectNames.put(replicaFetcherManagerMinFetchRate, + new ObjectName("kafka.server:type=ReplicaFetcherManager,name=MinFetchRate,clientId=Replica")); + objectNames.put(replicaFetcherManagerMaxLag, + new ObjectName("kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica")); + objectNames.put(kafkaControllerActiveControllerCount, + new ObjectName("kafka.controller:type=KafkaController,name=ActiveControllerCount")); + objectNames.put(kafkaControllerOfflinePartitionsCount, + new ObjectName("kafka.controller:type=KafkaController,name=OfflinePartitionsCount")); + objectNames.put(logFlushStats, new ObjectName("kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs")); + objectNames.put(operatingSystemObjectName, new ObjectName("java.lang:type=OperatingSystem")); + objectNames.put(logSegmentObjectName, new ObjectName("kafka.log:type=Log,name=*-LogSegments")); + objectNames.put(directoryObjectName, new ObjectName("kafka.log:type=Log,name=*-Directory")); + } catch (MalformedObjectNameException e) { + LOG.error(e.getMessage()); + } + } + + public MeterMetric getBytesInPerSec(MBeanServerConnection mbsc, Optional topicName) { + return getBrokerTopicMetrics(mbsc, "BytesInPerSec", topicName); + } + + public MeterMetric getBytesOutPerSec(MBeanServerConnection mbsc, Optional topicName) { + return getBrokerTopicMetrics(mbsc, "BytesOutPerSec", topicName); + } + + public MeterMetric getBytesRejectedPerSec(MBeanServerConnection mbsc, Optional topicName) { + return getBrokerTopicMetrics(mbsc, "BytesRejectedPerSec", topicName); + } + + public MeterMetric getFailedFetchRequestsPerSec(MBeanServerConnection mbsc, Optional topicName) { + return getBrokerTopicMetrics(mbsc, "FailedFetchRequestsPerSec", topicName); + } + + public MeterMetric getFailedProduceRequestsPerSec(MBeanServerConnection mbsc, Optional topicName) { + return getBrokerTopicMetrics(mbsc, "FailedProduceRequestsPerSec", topicName); + } + + public MeterMetric getMessagesInPerSec(MBeanServerConnection mbsc, Optional topicName) { + return getBrokerTopicMetrics(mbsc, "MessagesInPerSec", topicName); + } + + public MeterMetric getBrokerTopicMetrics(MBeanServerConnection mbsc, String metricName, + Optional topicName) { + return getMeterMetric(mbsc, getObjectName(metricName, topicName)); + } + + private MeterMetric getMeterMetric(MBeanServerConnection mbsc, ObjectName objectName) { + String[] attributes = { "Count", "FifteenMinuteRate", "FiveMinuteRate", "OneMinuteRate", "MeanRate" }; + AttributeList attributeList = null; + try { + attributeList = mbsc.getAttributes(objectName, attributes); + } catch (Exception e) { + LOG.warn("getMeterMetric failed! " + e.getMessage()); + return new MeterMetric(0L, 0D, 0D, 0D, 0D); + } + return new MeterMetric(getLongValue(attributeList, attributes[0]), getDoubleValue(attributeList, attributes[1]), + getDoubleValue(attributeList, attributes[2]), getDoubleValue(attributeList, attributes[3]), + getDoubleValue(attributeList, attributes[4])); + } + + private ObjectName getObjectName(String metricName, Optional topicName) { + ObjectName objectName = null; + try { + if (topicName.isPresent()) { + objectName = new ObjectName( + "kafka.server:type=BrokerTopicMetrics,name=" + metricName + ",topic=" + topicName.get()); + } else { + objectName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=" + metricName); + } + } catch (MalformedObjectNameException e) { + LOG.error("Get ObjectName error! " + e.getMessage()); + } + return objectName; + } + + public OSMetric getOSMetric(MBeanServerConnection mbsc) { + String[] attributes = { "ProcessCpuLoad", "SystemCpuLoad" }; + AttributeList attributeList = null; + try { + attributeList = mbsc.getAttributes(objectNames.get(operatingSystemObjectName), attributes); + } catch (Exception e) { + LOG.warn("getOSMetric failed! " + e.getMessage()); + return new OSMetric(0D, 0D); + } + return new OSMetric(getDoubleValue(attributeList, attributes[0]), getDoubleValue(attributeList, attributes[0])); + } + + private Double getDoubleValue(AttributeList attributes, String name) { + List _attributes = attributes.asList(); + for (Attribute attr : _attributes) { + if (attr.getName().equalsIgnoreCase(name)) { + return (Double) attr.getValue(); + } + } + return 0D; + } + + private Long getLongValue(AttributeList attributes, String name) { + List _attributes = attributes.asList(); + for (Attribute attr : _attributes) { + if (attr.getName().equalsIgnoreCase(name)) { + return (Long) attr.getValue(); + } + } + return 0L; + } +} diff --git a/src/main/java/com/chickling/kmonitor/core/jmx/MeterMetric.java b/src/main/java/com/chickling/kmonitor/core/jmx/MeterMetric.java new file mode 100644 index 0000000..54428d1 --- /dev/null +++ b/src/main/java/com/chickling/kmonitor/core/jmx/MeterMetric.java @@ -0,0 +1,76 @@ +package com.chickling.kmonitor.core.jmx; + +/** + * @author Hulva Luva.H + * @since 2017-07-11 + * + */ +public class MeterMetric { + + private Long count; + private Double fifteenMinuteRate; + private Double fiveMinuteRate; + private Double oneMinuteRate; + private Double meanRate; + + public MeterMetric() { + super(); + } + + public MeterMetric(Long count, Double fifteenMinuteRate, Double fiveMinuteRate, Double oneMinuteRate, + Double meanRate) { + super(); + this.count = count; + this.fifteenMinuteRate = fifteenMinuteRate; + this.fiveMinuteRate = fiveMinuteRate; + this.oneMinuteRate = oneMinuteRate; + this.meanRate = meanRate; + } + + public Long getCount() { + return count; + } + + public void setCount(Long count) { + this.count = count; + } + + public Double getFifteenMinuteRate() { + return fifteenMinuteRate; + } + + public void setFifteenMinuteRate(Double fifteenMinuteRate) { + this.fifteenMinuteRate = fifteenMinuteRate; + } + + public Double getFiveMinuteRate() { + return fiveMinuteRate; + } + + public void setFiveMinuteRate(Double fiveMinuteRate) { + this.fiveMinuteRate = fiveMinuteRate; + } + + public Double getOneMinuteRate() { + return oneMinuteRate; + } + + public void setOneMinuteRate(Double oneMinuteRate) { + this.oneMinuteRate = oneMinuteRate; + } + + public Double getMeanRate() { + return meanRate; + } + + public void setMeanRate(Double meanRate) { + this.meanRate = meanRate; + } + + @Override + public String toString() { + return "MeterMrtric [count=" + count + ", fifteenMinuteRate=" + fifteenMinuteRate + ", fiveMinuteRate=" + + fiveMinuteRate + ", oneMinuteRate=" + oneMinuteRate + ", meanRate=" + meanRate + "]"; + } + +} diff --git a/src/main/java/com/chickling/kmonitor/core/jmx/metric/OSMetric.java b/src/main/java/com/chickling/kmonitor/core/jmx/metric/OSMetric.java new file mode 100644 index 0000000..9c6ee4d --- /dev/null +++ b/src/main/java/com/chickling/kmonitor/core/jmx/metric/OSMetric.java @@ -0,0 +1,54 @@ +package com.chickling.kmonitor.core.jmx.metric; + +import com.chickling.kmonitor.utils.MetricUtils; + +/** + * @author Hulva Luva.H + * @since 2017-07-11 + * + */ +public class OSMetric { + + private Double processCpuLoad; + private Double systemCpuLoad; + + public OSMetric() { + super(); + } + + public OSMetric(Double processCpuLoad, Double systemCpuLoad) { + super(); + this.processCpuLoad = processCpuLoad; + this.systemCpuLoad = systemCpuLoad; + } + + public Double getProcessCpuLoad() { + return processCpuLoad; + } + + public void setProcessCpuLoad(Double processCpuLoad) { + this.processCpuLoad = processCpuLoad; + } + + public Double getSystemCpuLoad() { + return systemCpuLoad; + } + + public void setSystemCpuLoad(Double systemCpuLoad) { + this.systemCpuLoad = systemCpuLoad; + } + + public String formatedProcessCpuLoad() { + return MetricUtils.rateFormat(getProcessCpuLoad(), 0); + } + + public String formatedSystemCpuLoad() { + return MetricUtils.rateFormat(getSystemCpuLoad(), 0); + } + + @Override + public String toString() { + return "OSMetric [processCpuLoad=" + processCpuLoad + ", systemCpuLoad=" + systemCpuLoad + "]"; + } + +} diff --git a/src/main/java/com/chickling/kmonitor/initialize/SystemManager.java b/src/main/java/com/chickling/kmonitor/initialize/SystemManager.java index bebb9e9..8383d97 100644 --- a/src/main/java/com/chickling/kmonitor/initialize/SystemManager.java +++ b/src/main/java/com/chickling/kmonitor/initialize/SystemManager.java @@ -43,7 +43,7 @@ public class SystemManager { private static ExecutorService worker; - private static final int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors(); + public static final int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors(); private static final ExecutorService kafkaInfoCollectAndSavePool = Executors .newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE, new WorkerThreadFactory("KafkaInfo Collector")); @@ -97,7 +97,7 @@ private static void initSystem() { if (db != null) db.close(); db = new ElasticsearchOffsetDB(config); - if (db.check()) { + if (!db.check()) { throw new RuntimeException("No elasticsearch node avialable!"); } if (og != null) diff --git a/src/main/java/com/chickling/kmonitor/utils/CommonUtils.java b/src/main/java/com/chickling/kmonitor/utils/CommonUtils.java index f2fabbe..26a9a21 100644 --- a/src/main/java/com/chickling/kmonitor/utils/CommonUtils.java +++ b/src/main/java/com/chickling/kmonitor/utils/CommonUtils.java @@ -1,6 +1,8 @@ package com.chickling.kmonitor.utils; import java.io.File; +import java.net.URI; +import java.net.URL; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Collections; @@ -50,4 +52,18 @@ public static String loadFileContent(String filePath) { } return contents; } + + public static URI getURI(String urlStr) { + URL url = null; + URI uri = null; + try { + url = new URL(urlStr); + uri = new URI(url.getProtocol(), url.getHost(), url.getPath(), url.getQuery(), null); + } catch (Exception e) { + LOG.error("URL error!"); + // TODO + return null; + } + return uri; + } } diff --git a/src/main/java/com/chickling/kmonitor/utils/elasticsearch/Ielasticsearch.java b/src/main/java/com/chickling/kmonitor/utils/elasticsearch/Ielasticsearch.java new file mode 100644 index 0000000..28a8f14 --- /dev/null +++ b/src/main/java/com/chickling/kmonitor/utils/elasticsearch/Ielasticsearch.java @@ -0,0 +1,28 @@ +package com.chickling.kmonitor.utils.elasticsearch; + +import java.util.List; + +import org.json.JSONObject; + +import com.chickling.kmonitor.model.OffsetHistoryQueryParams; +import com.chickling.kmonitor.model.OffsetPoints; + +/** + * @author Hulva Luva.H from ECBD + * @date 2017年7月19日 + * @description + * + */ +public interface Ielasticsearch { + + void bulkIndex(JSONObject data, String docType, String indexPrefix); + + List offsetHistory(String indexPrefix, String docType, String group, String topic); + + List scrollsSearcher(OffsetHistoryQueryParams params, String docType, String indexPrefix); + + boolean check(); + + void close(); + +} diff --git a/src/main/java/com/chickling/kmonitor/utils/ElasticsearchUtil.java b/src/main/java/com/chickling/kmonitor/utils/elasticsearch/javaapi/ElasticsearchJavaUtil.java similarity index 92% rename from src/main/java/com/chickling/kmonitor/utils/ElasticsearchUtil.java rename to src/main/java/com/chickling/kmonitor/utils/elasticsearch/javaapi/ElasticsearchJavaUtil.java index 11dccfb..303781f 100644 --- a/src/main/java/com/chickling/kmonitor/utils/ElasticsearchUtil.java +++ b/src/main/java/com/chickling/kmonitor/utils/elasticsearch/javaapi/ElasticsearchJavaUtil.java @@ -1,4 +1,4 @@ -package com.chickling.kmonitor.utils; +package com.chickling.kmonitor.utils.elasticsearch.javaapi; import java.net.InetSocketAddress; import java.text.ParseException; @@ -36,18 +36,21 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.chickling.kmonitor.alert.WorkerThreadFactory; +import com.chickling.kmonitor.initialize.SystemManager; import com.chickling.kmonitor.model.OffsetHistoryQueryParams; import com.chickling.kmonitor.model.OffsetPoints; +import com.chickling.kmonitor.utils.elasticsearch.Ielasticsearch; /** * @author Hulva Luva.H * */ -public class ElasticsearchUtil { - private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchUtil.class); +public class ElasticsearchJavaUtil implements Ielasticsearch{ + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchJavaUtil.class); static TransportClient client = null; - public ElasticsearchUtil(String stringHosts) { + public ElasticsearchJavaUtil(String stringHosts) { initClient(stringHosts); } @@ -102,8 +105,8 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) } public List scrollsSearcher(OffsetHistoryQueryParams params, String docType, String indexPrefix) { - int parallism = Runtime.getRuntime().availableProcessors(); - ExecutorService pool = Executors.newFixedThreadPool(parallism); + ExecutorService pool = Executors.newFixedThreadPool(SystemManager.DEFAULT_THREAD_POOL_SIZE, + new WorkerThreadFactory("OffsetHistoryQuery-JavaAPI")); List result = new ArrayList(); @@ -131,10 +134,10 @@ public List scrollsSearcher(OffsetHistoryQueryParams params, Strin } try { int step = 1; - if (searchHits.length < parallism) { + if (searchHits.length < SystemManager.DEFAULT_THREAD_POOL_SIZE) { step = 1; } else { - step = searchHits.length / parallism; + step = searchHits.length / SystemManager.DEFAULT_THREAD_POOL_SIZE; } Future> future = null; for (int i = 0; i < searchHits.length; i = i + step) { @@ -253,8 +256,8 @@ private String getRangeFrom(OffsetHistoryQueryParams params) throws ParseExcepti // } public List offsetHistory(String indexPrefix, String docType, String group, String topic) { - int parallism = Runtime.getRuntime().availableProcessors(); - ExecutorService pool = Executors.newFixedThreadPool(parallism); + ExecutorService pool = Executors.newFixedThreadPool(SystemManager.DEFAULT_THREAD_POOL_SIZE, + new WorkerThreadFactory("OffsetHistoryQuery-JavaAPI")); List result = new ArrayList(); SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); @@ -285,10 +288,10 @@ public List offsetHistory(String indexPrefix, String docType, Stri } try { int step = 1; - if (searchHits.length < parallism) { + if (searchHits.length < SystemManager.DEFAULT_THREAD_POOL_SIZE) { step = 1; } else { - step = searchHits.length / parallism; + step = searchHits.length / SystemManager.DEFAULT_THREAD_POOL_SIZE; } Future> future = null; for (int i = 0; i < searchHits.length; i = i + step) { diff --git a/src/main/java/com/chickling/kmonitor/utils/elasticsearch/restapi/ElasticsearchRESTUtil.java b/src/main/java/com/chickling/kmonitor/utils/elasticsearch/restapi/ElasticsearchRESTUtil.java new file mode 100644 index 0000000..3acd741 --- /dev/null +++ b/src/main/java/com/chickling/kmonitor/utils/elasticsearch/restapi/ElasticsearchRESTUtil.java @@ -0,0 +1,376 @@ +package com.chickling.kmonitor.utils.elasticsearch.restapi; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.json.JSONArray; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.ResponseEntity; +import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; +import org.springframework.web.client.RestTemplate; + +import com.chickling.kmonitor.alert.WorkerThreadFactory; +import com.chickling.kmonitor.initialize.SystemManager; +import com.chickling.kmonitor.model.OffsetHistoryQueryParams; +import com.chickling.kmonitor.model.OffsetPoints; +import com.chickling.kmonitor.utils.elasticsearch.Ielasticsearch; + +/** + * @author Hulva Luva.H + * + */ +public class ElasticsearchRESTUtil implements Ielasticsearch { + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchRESTUtil.class); + private static String RERST_HOST; + private static RestTemplate REST; + private static HttpComponentsClientHttpRequestFactory httpRequestFactory; + private static HttpHeaders headers; + + static { + httpRequestFactory = new HttpComponentsClientHttpRequestFactory(); + httpRequestFactory.setConnectTimeout(300000); // 5min + httpRequestFactory.setConnectionRequestTimeout(300000); + httpRequestFactory.setReadTimeout(300000); + REST = new RestTemplate(); + REST.setRequestFactory(httpRequestFactory); + headers = new HttpHeaders(); + headers.add("Content-Type", "application/json"); + headers.add("Accept", "*/*"); + } + + public ElasticsearchRESTUtil(String restHost) { + RERST_HOST = restHost; + } + + /** + * GET _cluster/health + * + * + * { + "cluster_name" : "testcluster", + "status" : "yellow", + "timed_out" : false, + "number_of_nodes" : 1, + "number_of_data_nodes" : 1, + "active_primary_shards" : 5, + "active_shards" : 5, + "relocating_shards" : 0, + "initializing_shards" : 0, + "unassigned_shards" : 5, + "delayed_unassigned_shards": 0, + "number_of_pending_tasks" : 0, + "number_of_in_flight_fetch": 0, + "task_max_waiting_in_queue_millis": 0, + "active_shards_percent_as_number": 50.0 + } + * + * + * @return + */ + public boolean check() { + + return true; + } + + public void bulkIndex(JSONObject data, String docType, String indexPrefix) { + StringBuilder bulkData = new StringBuilder(); + SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd"); + Date now = new Date(); + String indexSufix = sFormat.format(now); + + boolean hasData = false; + Iterator keys = data.keys(); + while (keys.hasNext()) { + hasData = true; + bulkData.append( + "{\"index\": {\"_index\":\"" + indexPrefix + indexSufix + "\",\"_type\":\"" + docType + "\"}}") + .append("\n"); + bulkData.append(data.getJSONObject(keys.next()).toString()).append("\n"); + } + if (!hasData) { + return; + } + ResponseEntity response = REST.exchange("http://" + RERST_HOST + "/_bulk", HttpMethod.POST, + new HttpEntity(bulkData.toString(), headers), String.class); + // TODO Do something with response? + response.getBody(); + } + + public List scrollsSearcher(OffsetHistoryQueryParams params, String docType, String indexPrefix) { + ExecutorService pool = Executors.newFixedThreadPool(SystemManager.DEFAULT_THREAD_POOL_SIZE, + new WorkerThreadFactory("OffsetHistoryQuery-RESTAPI")); + + List result = new ArrayList(); + + SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + try { + String indexNameSearch = indexPrefix + "*"; + + String rangeFrom = getRangeFrom(params); + + List concernedTimestamp = dateHistogram(sFormat.parse(rangeFrom).getTime(), + Long.parseLong(params.getRangeto()), params.getInterval()); + + List>> futureList = new ArrayList>>(); + + ResponseEntity response = REST + .exchange("http://" + RERST_HOST + "/" + indexNameSearch + "/" + docType + "/_search?scroll=1m", + HttpMethod.POST, + new HttpEntity( + ScrollSearchTemplate.getScrollSearchBody(params.getTopic(), params.getGroup(), + rangeFrom, sFormat.format(new Date(Long.parseLong(params.getRangeto())))), + headers), + String.class); + + JSONObject searchResult = null; + while (true) { + searchResult = new JSONObject(response.getBody()); + final JSONArray searchHits = searchResult.getJSONObject("hits").getJSONArray("hits"); + if (searchHits.length() == 0) { + break; + } + try { + Future> future = null; + future = pool.submit(new GenerateOffsetHistoryDataset(searchHits, concernedTimestamp)); + futureList.add(future); + } catch (Exception e) { + LOG.warn("Ops...GenerateOffsetHistoryDataset went wrong! " + e.getMessage()); + } + + response = REST.exchange("http://" + RERST_HOST + "/_search/scroll", HttpMethod.POST, + new HttpEntity( + ScrollSearchTemplate.getScrollNextBody(searchResult.getString("_scroll_id")), headers), + String.class); + } + for (Future> future : futureList) { + try { + result.addAll(future.get()); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + LOG.error("Interrupted when get GenerateOffsetHistoryDataset in future...", e); + } catch (ExecutionException e) { + // TODO Auto-generated catch block + LOG.error("QAQ when get GenerateOffsetHistoryDataset in future...", e); + } + } + pool.shutdown(); + } catch ( + + Exception e) { + // TODO + LOG.error("Damn...", e); + } + return result; + } + + private List dateHistogram(long from, long to, String interval) { + switch (interval) { + case "1m": + return dateHistogram(from, to, 1 * 60 * 1000); + case "10m": + return dateHistogram(from, to, 10 * 60 * 1000); + case "30m": + return dateHistogram(from, to, 30 * 60 * 1000); + case "1h": + return dateHistogram(from, to, 1 * 60 * 60 * 1000); + case "1d": + return dateHistogram(from, to, 24 * 60 * 60 * 1000); + default: + return dateHistogram(from, to, 1 * 60 * 1000); + } + } + + private List dateHistogram(long from, long to, long interval) { + SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + List histogramedTimestamp = new ArrayList(); + for (long i = from; i < to + interval; i = i + interval) { + histogramedTimestamp.add(sFormat.format(new Date(i))); + } + return histogramedTimestamp; + } + + private String getRangeFrom(OffsetHistoryQueryParams params) throws ParseException { + SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + Calendar cal = Calendar.getInstance(); + cal.setTime(new Date(Long.parseLong(params.getRangeto()))); + switch (params.getRange()) { + case "1h": + cal.add(Calendar.HOUR, -1); + break; + case "8h": + cal.add(Calendar.HOUR, -8); + break; + case "16h": + cal.add(Calendar.HOUR, -16); + break; + case "1d": + cal.add(Calendar.DATE, -1); + break; + case "2d": + cal.add(Calendar.DATE, -2); + break; + case "1w": + cal.add(Calendar.WEEK_OF_MONTH, -1); + break; + default: + cal.add(Calendar.HOUR, -1); + break; + } + return sFormat.format(cal.getTime()); + } + + // private int calculatePageSize(LogsizeRequestParams params, String + // rangeFrom) throws ParseException { + // SimpleDateFormat sFormat = new + // SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + // switch (params.getInterval()) { + // case "5m": + // return (int) ((Long.parseLong(params.getRangeTo()) - + // sFormat.parse(rangeFrom).getTime()) / (5 * 60 * 1000)); + // case "30m": + // return (int) ((Long.parseLong(params.getRangeTo()) - + // sFormat.parse(rangeFrom).getTime()) + // / (30 * 60 * 1000)); + // case "1h": + // return (int) ((Long.parseLong(params.getRangeTo()) - + // sFormat.parse(rangeFrom).getTime()) + // / (60 * 60 * 1000)); + // case "1d": + // return (int) ((Long.parseLong(params.getRangeTo()) - + // sFormat.parse(rangeFrom).getTime()) + // / (24 * 60 * 60 * 1000)); + // default: + // return (int) ((Long.parseLong(params.getRangeTo()) - + // sFormat.parse(rangeFrom).getTime()) + // / (60 * 60 * 1000)); + // } + // } + + public List offsetHistory(String indexPrefix, String docType, String group, String topic) { + ExecutorService pool = Executors.newFixedThreadPool(SystemManager.DEFAULT_THREAD_POOL_SIZE, + new WorkerThreadFactory("OffsetHistoryQuery-RESTAPI")); + + List result = new ArrayList(); + SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + try { + String indexNameSearch = indexPrefix + "*"; + + Date now = new Date(); + Calendar cal = Calendar.getInstance(); + cal.setTime(now); + cal.set(Calendar.MILLISECOND, 0); + cal.set(Calendar.SECOND, 0); + cal.add(Calendar.HOUR, -1); + String rangeFrom = sFormat.format(cal.getTime()); + List>> futureList = new ArrayList>>(); + + ResponseEntity response = REST.exchange( + + "http://" + RERST_HOST + "/" + indexNameSearch + "/" + docType + "/_search?scroll=1m", + HttpMethod.POST, + new HttpEntity( + ScrollSearchTemplate.getScrollSearchBody(topic, group, rangeFrom, sFormat.format(now)), + headers), + String.class); + + JSONObject searchResult = null; + while (true) { + searchResult = new JSONObject(response.getBody()); + final JSONArray searchHits = searchResult.getJSONObject("hits").getJSONArray("hits"); + if (searchHits.length() == 0) { + break; + } + try { + Future> future = null; + future = pool.submit(new GenerateOffsetHistoryDataset(searchHits)); + futureList.add(future); + } catch (Exception e) { + LOG.warn("Ops...GenerateOffsetHistoryDataset went wrong! " + e.getMessage()); + } + + response = REST.exchange("http://" + RERST_HOST + "/_search/scroll", HttpMethod.POST, + new HttpEntity( + ScrollSearchTemplate.getScrollNextBody(searchResult.getString("_scroll_id")), headers), + String.class); + } + for (Future> future : futureList) { + try { + result.addAll(future.get()); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + LOG.error("Interrupted when get GenerateOffsetHistoryDataset in future...", e); + } catch (ExecutionException e) { + // TODO Auto-generated catch block + LOG.error("QAQ when get GenerateOffsetHistoryDataset in future...", e); + } + } + pool.shutdown(); + } catch (Exception e) { + pool.shutdown(); + // TODO + LOG.error("Damn...", e); + } + + return result; + } + + class GenerateOffsetHistoryDataset implements Callable> { + + private JSONArray searchHits; + List concernedTimestamp = null; + + public GenerateOffsetHistoryDataset(JSONArray searchHits) { + this.searchHits = searchHits; + } + + public GenerateOffsetHistoryDataset(JSONArray searchHitPart, List concernedTimestamp) { + this.searchHits = searchHitPart; + this.concernedTimestamp = concernedTimestamp; + } + + @Override + public List call() { + JSONObject source = null; + List datasets = new ArrayList(); + try { + for (int i = 0; i < searchHits.length(); i++) { + source = searchHits.getJSONObject(i).getJSONObject("_source"); + if (concernedTimestamp != null) { + if (!concernedTimestamp.contains(source.getString("date"))) { + continue; + } + } + Long offset = source.getLong("offset"); + Long logsize = source.getLong("logSize"); + + datasets.add(new OffsetPoints(source.getLong("timestamp"), source.getInt("partition"), + source.getString("owner"), offset, logsize)); + } + } catch (Exception e) { + LOG.error("GenerateOffsetHistoryDataset error! " + e.getMessage()); + } + return datasets; + } + + } + + @Override + public void close() { + + } +} diff --git a/src/main/java/com/chickling/kmonitor/utils/elasticsearch/restapi/ScrollSearchTemplate.java b/src/main/java/com/chickling/kmonitor/utils/elasticsearch/restapi/ScrollSearchTemplate.java new file mode 100644 index 0000000..748da81 --- /dev/null +++ b/src/main/java/com/chickling/kmonitor/utils/elasticsearch/restapi/ScrollSearchTemplate.java @@ -0,0 +1,20 @@ +package com.chickling.kmonitor.utils.elasticsearch.restapi; + +/** + * @author Hulva Luva.H from ECBD + * @date 2017年7月19日 + * @description + * + */ +public class ScrollSearchTemplate { + + public static String getScrollSearchBody(String topic, String group, String from, String to) { + return "{\"size\":1000,\"sort\":[{\"timestamp\":{\"order\":\"asc\"}},{\"partition\":{\"order\":\"asc\"}}],\"query\":{\"bool\":{\"must\":[{\"match\":{\"topic\":\"" + + topic + "\"}},{\"match\":{\"group\":\"" + group + "\"}}],\"filter\":[{\"range\":{\"date\":{\"gte\":\"" + + from + "\",\"lte\":\"" + to + "\",\"format\":\"yyyy-MM-dd'T'HH:mm:ss.SSSZ\"}}}]}}}"; + } + + public static String getScrollNextBody(String scrollId) { + return "{\"scroll\":\"1m\",\"scroll_id\":\"" + scrollId + "\"}"; + } +} diff --git a/src/main/resources/static/scripts/app.js b/src/main/resources/static/scripts/app.js index b037471..164a6ec 100644 --- a/src/main/resources/static/scripts/app.js +++ b/src/main/resources/static/scripts/app.js @@ -46,10 +46,10 @@ var app = angular.module('offsetapp', templateUrl : "views/setting.html", controller : "SettingCtrl" }) - /* - * .when("/broker/:endpoint", { templateUrl : "views/broker.html", - * controller : "BrokerCtrl" }) - */; + .when("/broker/:endpoint", { + templateUrl : "views/broker.html", + controller : "BrokerCtrl" + }); ; }).factory('isSystemReadyInterceptor', ["$location", function($location) { var isSystemReadyInterceptor = { diff --git a/src/main/resources/static/scripts/cluster-viz.js b/src/main/resources/static/scripts/cluster-viz.js index 7e6fbbc..9bbab3d 100644 --- a/src/main/resources/static/scripts/cluster-viz.js +++ b/src/main/resources/static/scripts/cluster-viz.js @@ -174,10 +174,10 @@ function load_lag_page(d) { && parent != "KafkaCluster") { window.location.replace("./#/group/" + name + "/" + parent); } -// if (parent != undefined && parent != "ActiveTopics" -// && parent == "KafkaCluster") { -// window.location.replace("./#/broker/" + name); -// } + if (parent != undefined && parent != "ActiveTopics" + && parent == "KafkaCluster") { + window.location.replace("./#/broker/" + name); + } } } diff --git a/src/main/resources/static/scripts/controllers.js b/src/main/resources/static/scripts/controllers.js index 604776a..b7b8e6a 100644 --- a/src/main/resources/static/scripts/controllers.js +++ b/src/main/resources/static/scripts/controllers.js @@ -471,6 +471,7 @@ angular.module('offsetapp.controllers', [ "offsetapp.services" ]) zkHosts: "", dataCollectFrequency: 1, excludeByLastSeen: 2592000, + apiType: "", esHosts: "", esIndex: "", docTypeForOffset: "kafkaoffsetinfo", @@ -482,6 +483,8 @@ angular.module('offsetapp.controllers', [ "offsetapp.services" ]) mailSender: "", mailSubject: "" } + + $scope.apiTypes = ["Java API", "REST API"]; offsetinfo.getSetting().success(function(d) { if(d.isSystemReady!=undefined && !d.isSystemReady){ $scope.settingForm = settingFormModal; @@ -528,24 +531,24 @@ angular.module('offsetapp.controllers', [ "offsetapp.services" ]) }); } } ]) - /*.controller("BrokerCtrl", [ "$scope", "$routeParams", "offsetinfo", + .controller("BrokerCtrl", [ "$scope", "$routeParams", "offsetinfo", function($scope, $routeParams, offsetinfo) { $scope.loading = true; $scope.brokerEndpoint = $routeParams.endpoint; - var options = { - axisY : { - type : Chartist.AutoScaleAxis, - low : 4318293, - high : 4319246, - onlyInteger : true - } - }; - var data = { - labels : [ '02:39:34', '02:40:04', '02:40:34', '02:41:04', - '02:41:34', '02:42:04', '02:42:34', '02:43:04', '02:43:34', - '02:44:04' ], - series : [ [ 4318294, 4318393, 4318488, 4318603, 4318695, 4318808, - 4318922, 4319032, 4319146, 4319245 ] ] - }; - new Chartist.Line('.ct-chart', data, options); - } ])*/; \ No newline at end of file +// var options = { +// axisY : { +// type : Chartist.AutoScaleAxis, +// low : 4318293, +// high : 4319246, +// onlyInteger : true +// } +// }; +// var data = { +// labels : [ '02:39:34', '02:40:04', '02:40:34', '02:41:04', +// '02:41:34', '02:42:04', '02:42:34', '02:43:04', '02:43:34', +// '02:44:04' ], +// series : [ [ 4318294, 4318393, 4318488, 4318603, 4318695, 4318808, +// 4318922, 4319032, 4319146, 4319245 ] ] +// }; +// new Chartist.Line('.ct-chart', data, options); + } ]); \ No newline at end of file diff --git a/src/main/resources/static/views/broker.html b/src/main/resources/static/views/broker.html new file mode 100644 index 0000000..4ae27d4 --- /dev/null +++ b/src/main/resources/static/views/broker.html @@ -0,0 +1,73 @@ + + +
+
+

Metrics

+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
RateMean1 min5 min15 min
Messages in /sec0.000.000.000.00
Bytes in /sec1.640.000.000.00
Bytes out /sec1.640.000.000.00
Bytes rejected /sec0.000.000.000.00
Failed fetch request /sec0.000.000.000.00
Failed produce request /sec0.000.000.000.00
+
+ + \ No newline at end of file diff --git a/src/main/resources/static/views/cluster-viz.html b/src/main/resources/static/views/cluster-viz.html index e8cca32..cd9289d 100644 --- a/src/main/resources/static/views/cluster-viz.html +++ b/src/main/resources/static/views/cluster-viz.html @@ -4,3 +4,65 @@

Kafka Cluster Visualization

Loading ...
+ +
+
+

Metrics

+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
RateMean1 min5 min15 min
Messages in /sec0.000.000.000.00
Bytes in /sec1.640.000.000.00
Bytes out /sec1.640.000.000.00
Bytes rejected /sec0.000.000.000.00
Failed fetch request /sec0.000.000.000.00
Failed produce request /sec0.000.000.000.00
+
diff --git a/src/main/resources/static/views/setting.html b/src/main/resources/static/views/setting.html index 6e682f8..79e7ee6 100644 --- a/src/main/resources/static/views/setting.html +++ b/src/main/resources/static/views/setting.html @@ -32,11 +32,17 @@

Zookeeper

Elasticsearch

+
+ +
+ placeholder="JAVA API- esHost1:9300,esHost2:9300,esHostN:9300 REST API- esHost:9200">
-
Loading ...
- -
-

Active Consumers

- +
+
+

Metrics

+
- + + + + + - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
nameRateMean1 min5 min15 min
{{c.name}}
Unable to find Active Consumers
Messages in /sec0.000.000.000.00
Bytes in /sec1.640.000.000.00
Bytes out /sec1.640.000.000.00
Bytes rejected /sec0.000.000.000.00
Failed fetch request /sec0.000.000.000.00
Failed produce request /sec0.000.000.000.00
+
-

Active Consumers Offsets

-
-
-
-
Unable to find - Active Consumers
+
Loading ...
-
+
+

Active Consumers

-

InActive Consumers

+ + + + + + + + + + + + + + +
name
{{c.name}}
Unable to find Active Consumers
- - - - - - - - - - - - - - -
name
{{c.name}}
Unable to find Inactive Consumers
+

Active Consumers Offsets

+
+
+
+
Unable to + find Active Consumers
-

Inactive Consumers Offsets

-
-
-
-
Unable to find - Inactive Consumers
+
-
+

InActive Consumers

-
+ + + + + + + + + + + + + + +
name
{{c.name}}
Unable to find Inactive Consumers
+ +

Inactive Consumers Offsets

+
+
+
+
Unable to + find Inactive Consumers
+ +
+ +
\ No newline at end of file diff --git a/src/main/resources/static/views/topic-detail.html b/src/main/resources/static/views/topic-detail.html index 2b5540d..5f8aa0f 100644 --- a/src/main/resources/static/views/topic-detail.html +++ b/src/main/resources/static/views/topic-detail.html @@ -4,28 +4,99 @@

-
Loading ...
- -
-

Active Consumers

+
+
+

Metrics

+
- + + + + + - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
nameRateMean1 min5 min15 min
- {{c.name}}{{c.name}}
Messages in /sec0.000.000.000.00
Bytes in /sec1.640.000.000.00
Bytes out /sec1.640.000.000.00
Bytes rejected /sec0.000.000.000.00
Failed fetch request /sec0.000.000.000.00
Failed produce request /sec0.000.000.000.00
+
+ +
Loading ...
+ +
+

Active Consumers

+ + + + + + + + + + + + +
name
+ {{c.name}}{{c.name}}
- More details... -
+ More details... +
-
\ No newline at end of file +
\ No newline at end of file diff --git a/src/test/java/com/chickling/kmonitor/test/EsSearchTest.java b/src/test/java/com/chickling/kmonitor/test/EsSearchTest.java index 26e26da..979dd2f 100644 --- a/src/test/java/com/chickling/kmonitor/test/EsSearchTest.java +++ b/src/test/java/com/chickling/kmonitor/test/EsSearchTest.java @@ -3,7 +3,7 @@ import java.util.List; import com.chickling.kmonitor.model.OffsetPoints; -import com.chickling.kmonitor.utils.ElasticsearchUtil; +import com.chickling.kmonitor.utils.elasticsearch.javaapi.ElasticsearchJavaUtil; /** * @@ -13,7 +13,7 @@ public class EsSearchTest { public static void main(String[] args) { - ElasticsearchUtil es = new ElasticsearchUtil("10.16.238.82:9300,10.16.238.83:9300,10.16.238.84:9300"); + ElasticsearchJavaUtil es = new ElasticsearchJavaUtil("10.16.238.82:9300,10.16.238.83:9300,10.16.238.84:9300"); List result = es.offsetHistory("logx_healthcheck_test", "kafkaoffset", "testkafka", "EC2_Test"); System.out.println(result); From a4a6b43afa028ecaded58f7be6c03d36f73a1b38 Mon Sep 17 00:00:00 2001 From: "Hulva Luva.H" Date: Thu, 20 Jul 2017 17:30:58 +0800 Subject: [PATCH 6/6] add JMX metric - BrokerTopicMetrics --- .../kmonitor/alert/DaemonThreadFactory.java | 25 ++ .../kmonitor/alert/KafkaNodeListener.java | 49 ++++ .../kmonitor/controller/AlertController.java | 6 +- .../controller/JMXMetricController.java | 217 +++++++++++++++++- .../chickling/kmonitor/core/OffsetGetter.java | 2 +- .../core/jmx/FormatedMeterMetric.java | 78 +++++++ .../kmonitor/core/jmx/KafkaMetrics.java | 4 +- .../kmonitor/core/jmx/MeterMetric.java | 4 +- .../kmonitor/initialize/SystemManager.java | 2 +- .../chickling/kmonitor/utils/MetricUtils.java | 8 +- .../com/chickling/kmonitor/utils/ZKUtils.java | 3 +- src/main/resources/static/scripts/app.js | 9 + .../resources/static/scripts/controllers.js | 51 ++-- src/main/resources/static/views/broker.html | 62 ++--- .../resources/static/views/cluster-viz.html | 61 ++--- src/main/resources/static/views/setting.html | 2 +- .../static/views/topic-consumers.html | 62 ++--- .../resources/static/views/topic-detail.html | 72 +++--- .../com/chickling/kmonitor/test/JMXTest.java | 131 +++++++++++ 19 files changed, 692 insertions(+), 156 deletions(-) create mode 100644 src/main/java/com/chickling/kmonitor/alert/DaemonThreadFactory.java create mode 100644 src/main/java/com/chickling/kmonitor/alert/KafkaNodeListener.java create mode 100644 src/main/java/com/chickling/kmonitor/core/jmx/FormatedMeterMetric.java create mode 100644 src/test/java/com/chickling/kmonitor/test/JMXTest.java diff --git a/src/main/java/com/chickling/kmonitor/alert/DaemonThreadFactory.java b/src/main/java/com/chickling/kmonitor/alert/DaemonThreadFactory.java new file mode 100644 index 0000000..2867534 --- /dev/null +++ b/src/main/java/com/chickling/kmonitor/alert/DaemonThreadFactory.java @@ -0,0 +1,25 @@ +package com.chickling.kmonitor.alert; + +import java.util.concurrent.ThreadFactory; + +/** + * @author Hulva Luva.H + * + */ +public class DaemonThreadFactory implements ThreadFactory { + private String prefix = ""; + + public DaemonThreadFactory(String prefix) { + this.prefix = prefix; + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "DaemonThread-" + prefix); + t.setDaemon(true); + if (t.getPriority() != Thread.NORM_PRIORITY) + t.setPriority(Thread.NORM_PRIORITY); + return t; + } + +} diff --git a/src/main/java/com/chickling/kmonitor/alert/KafkaNodeListener.java b/src/main/java/com/chickling/kmonitor/alert/KafkaNodeListener.java new file mode 100644 index 0000000..621b214 --- /dev/null +++ b/src/main/java/com/chickling/kmonitor/alert/KafkaNodeListener.java @@ -0,0 +1,49 @@ +package com.chickling.kmonitor.alert; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.I0Itec.zkclient.IZkChildListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.chickling.kmonitor.utils.ZKUtils; + +import kafka.utils.ZkUtils; + +/** + * @author Hulva Luva.H + * + * Listening Kafka node on ZK + * + */ +public class KafkaNodeListener { + private static Logger LOG = LoggerFactory.getLogger(KafkaNodeListener.class); + private static ExecutorService exec = null; + + + public KafkaNodeListener() { + exec = Executors.newCachedThreadPool(new DaemonThreadFactory("KafkaNodeOffLineListener")); + } + + public void startListener() { + LOG.info("Starting Kafka ZK node listener..."); + exec.execute(new Runnable() { + + @Override + public void run() { + ZKUtils.getZKClient().subscribeChildChanges(ZkUtils.BrokerIdsPath(), new IZkChildListener() { + + @Override + public void handleChildChange(String parentPath, List currentChilds) throws Exception { + + } + + }); + } + + }); + } + +} diff --git a/src/main/java/com/chickling/kmonitor/controller/AlertController.java b/src/main/java/com/chickling/kmonitor/controller/AlertController.java index 04e57c3..5032703 100644 --- a/src/main/java/com/chickling/kmonitor/controller/AlertController.java +++ b/src/main/java/com/chickling/kmonitor/controller/AlertController.java @@ -1,5 +1,6 @@ package com.chickling.kmonitor.controller; +import java.util.HashSet; import java.util.Set; import org.json.JSONObject; @@ -26,7 +27,10 @@ public class AlertController { @RequestMapping(value = "/tasks", method = RequestMethod.GET) public Set get() { - return TaskManager.getTasks(); + if(SystemManager.getConfig().getIsAlertEnabled()) { + return TaskManager.getTasks(); + } + return new HashSet(); } @RequestMapping(value = "/isAlertEnabled", method = RequestMethod.GET) diff --git a/src/main/java/com/chickling/kmonitor/controller/JMXMetricController.java b/src/main/java/com/chickling/kmonitor/controller/JMXMetricController.java index 3171c64..8793ea0 100644 --- a/src/main/java/com/chickling/kmonitor/controller/JMXMetricController.java +++ b/src/main/java/com/chickling/kmonitor/controller/JMXMetricController.java @@ -1,8 +1,14 @@ package com.chickling.kmonitor.controller; -import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import javax.management.MBeanServerConnection; + +import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.PathVariable; @@ -10,7 +16,12 @@ import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; -import com.chickling.kmonitor.initialize.SystemManager; +import com.chickling.kmonitor.core.jmx.FormatedMeterMetric; +import com.chickling.kmonitor.core.jmx.JMXExecutor; +import com.chickling.kmonitor.core.jmx.KafkaJMX; +import com.chickling.kmonitor.core.jmx.KafkaMetrics; +import com.chickling.kmonitor.core.jmx.MeterMetric; +import com.chickling.kmonitor.utils.ZKUtils; /** * @@ -24,10 +35,202 @@ public class JMXMetricController { private static Logger LOG = LoggerFactory.getLogger(JMXMetricController.class); - @RequestMapping(value = "/brokerTopicMetrics/broker", method = RequestMethod.GET) - public List getBrokerTopicMetrics(@PathVariable String broker, @PathVariable String topic) { - List groups = SystemManager.og.getGroups(); - Collections.sort(groups); - return groups; + @RequestMapping(value = "/brokerTopicMetrics/brokers", method = RequestMethod.GET) + public String getBrokerTopicMetricsForBrokers() { + Map result = new HashMap(); + try { + List jmxHosts = ZKUtils.getKafkaJMXHostsFromZookeeper(); + for (String jmxHost : jmxHosts) { + String[] jmxArr = jmxHost.split(":"); + if ("-1".equals(jmxArr[2])) { + continue; + } + KafkaJMX kafkaJMX = new KafkaJMX(); + kafkaJMX.doWithConnection(jmxArr[1], Integer.parseInt(jmxArr[2]), Optional.of(""), Optional.of(""), + false, new JMXExecutor() { + + @Override + public void doWithConnection(MBeanServerConnection mBeanServerConnection) { + KafkaMetrics metrics = new KafkaMetrics(); + if (result.containsKey("BytesInPerSec")) { + result.put("BytesInPerSec", merge(result.get("BytesInPerSec"), + metrics.getBytesInPerSec(mBeanServerConnection, Optional.empty()))); + } else { + result.put("BytesInPerSec", + metrics.getBytesInPerSec(mBeanServerConnection, Optional.empty())); + } + + if (result.containsKey("BytesOutPerSec")) { + result.put("BytesOutPerSec", merge(result.get("BytesOutPerSec"), + metrics.getBytesOutPerSec(mBeanServerConnection, Optional.empty()))); + } else { + result.put("BytesOutPerSec", + metrics.getBytesOutPerSec(mBeanServerConnection, Optional.empty())); + } + + if (result.containsKey("BytesRejectedPerSec")) { + result.put("BytesRejectedPerSec", merge(result.get("BytesRejectedPerSec"), + metrics.getBytesRejectedPerSec(mBeanServerConnection, Optional.empty()))); + } else { + result.put("BytesRejectedPerSec", + metrics.getBytesRejectedPerSec(mBeanServerConnection, Optional.empty())); + } + + if (result.containsKey("FailedFetchRequestsPerSec")) { + result.put("FailedFetchRequestsPerSec", + merge(result.get("FailedFetchRequestsPerSec"), + metrics.getFailedFetchRequestsPerSec(mBeanServerConnection, + Optional.empty()))); + } else { + result.put("FailedFetchRequestsPerSec", metrics + .getFailedFetchRequestsPerSec(mBeanServerConnection, Optional.empty())); + } + if (result.containsKey("FailedProduceRequestsPerSec")) { + result.put("FailedProduceRequestsPerSec", + merge(result.get("FailedProduceRequestsPerSec"), + metrics.getFailedProduceRequestsPerSec(mBeanServerConnection, + Optional.empty()))); + } else { + result.put("FailedProduceRequestsPerSec", metrics + .getFailedProduceRequestsPerSec(mBeanServerConnection, Optional.empty())); + } + } + + }); + } + } catch (Exception e) { + LOG.error("Get jmxHosts error!" + e.getMessage()); + + } + JSONObject response = new JSONObject(); + Set keys = result.keySet(); + for (String key : keys) { + response.put(key, new JSONObject(new FormatedMeterMetric(result.get(key)))); + } + return response.toString(); + } + + protected MeterMetric merge(MeterMetric old, MeterMetric newOne) { + return new MeterMetric(old.getCount() + newOne.getCount(), old.getMeanRate() + newOne.getMeanRate(), + old.getOneMinuteRate() + newOne.getOneMinuteRate(), + old.getFiveMinuteRate() + newOne.getFiveMinuteRate(), + old.getFifteenMinuteRate() + newOne.getFifteenMinuteRate()); + } + + @RequestMapping(value = "/brokerTopicMetrics/broker/{bid}", method = RequestMethod.GET) + public String getBrokerTopicMetricsForBroker(@PathVariable String bid) { + JSONObject response = new JSONObject(); + try { + List jmxHosts = ZKUtils.getKafkaJMXHostsFromZookeeper(); + for (String jmxHost : jmxHosts) { + String[] jmxArr = jmxHost.split(":"); + if (bid.equals(jmxArr[0])) { + if ("-1".equals(jmxArr[2])) { + return response.toString(); + } + KafkaJMX kafkaJMX = new KafkaJMX(); + kafkaJMX.doWithConnection(jmxArr[1], Integer.parseInt(jmxArr[2]), Optional.of(""), Optional.of(""), + false, new JMXExecutor() { + + @Override + public void doWithConnection(MBeanServerConnection mBeanServerConnection) { + KafkaMetrics metrics = new KafkaMetrics(); + response.put("BytesInPerSec", new JSONObject(new FormatedMeterMetric( + metrics.getBytesInPerSec(mBeanServerConnection, Optional.empty())))); + response.put("BytesOutPerSec", new JSONObject(new FormatedMeterMetric( + metrics.getBytesOutPerSec(mBeanServerConnection, Optional.empty())))); + response.put("BytesRejectedPerSec", new JSONObject(new FormatedMeterMetric( + metrics.getBytesRejectedPerSec(mBeanServerConnection, Optional.empty())))); + response.put("FailedFetchRequestsPerSec", + new JSONObject(new FormatedMeterMetric(metrics.getFailedFetchRequestsPerSec( + mBeanServerConnection, Optional.empty())))); + response.put("FailedProduceRequestsPerSec", + new JSONObject( + new FormatedMeterMetric(metrics.getFailedProduceRequestsPerSec( + mBeanServerConnection, Optional.empty())))); + } + }); + } + } + } catch (Exception e) { + LOG.error("Get jmxHosts error!" + e.getMessage()); + + } + return response.toString(); + } + + @RequestMapping(value = "/brokerTopicMetrics/topic/{topic}", method = RequestMethod.GET) + public String getBrokerTopicMetrics(@PathVariable String topic) { + Map result = new HashMap(); + try { + List jmxHosts = ZKUtils.getKafkaJMXHostsFromZookeeper(); + for (String jmxHost : jmxHosts) { + String[] jmxArr = jmxHost.split(":"); + if ("-1".equals(jmxArr[2])) { + continue; + } + KafkaJMX kafkaJMX = new KafkaJMX(); + kafkaJMX.doWithConnection(jmxArr[1], Integer.parseInt(jmxArr[2]), Optional.of(""), Optional.of(""), + false, new JMXExecutor() { + + @Override + public void doWithConnection(MBeanServerConnection mBeanServerConnection) { + KafkaMetrics metrics = new KafkaMetrics(); + if (result.containsKey("BytesInPerSec")) { + result.put("BytesInPerSec", merge(result.get("BytesInPerSec"), + metrics.getBytesInPerSec(mBeanServerConnection, Optional.of(topic)))); + } else { + result.put("BytesInPerSec", + metrics.getBytesInPerSec(mBeanServerConnection, Optional.of(topic))); + } + + if (result.containsKey("BytesOutPerSec")) { + result.put("BytesOutPerSec", merge(result.get("BytesOutPerSec"), + metrics.getBytesOutPerSec(mBeanServerConnection, Optional.of(topic)))); + } else { + result.put("BytesOutPerSec", + metrics.getBytesOutPerSec(mBeanServerConnection, Optional.of(topic))); + } + + if (result.containsKey("BytesRejectedPerSec")) { + result.put("BytesRejectedPerSec", merge(result.get("BytesRejectedPerSec"), + metrics.getBytesRejectedPerSec(mBeanServerConnection, Optional.of(topic)))); + } else { + result.put("BytesRejectedPerSec", + metrics.getBytesRejectedPerSec(mBeanServerConnection, Optional.of(topic))); + } + + if (result.containsKey("FailedFetchRequestsPerSec")) { + result.put("FailedFetchRequestsPerSec", + merge(result.get("FailedFetchRequestsPerSec"), + metrics.getFailedFetchRequestsPerSec(mBeanServerConnection, + Optional.empty()))); + } else { + result.put("FailedFetchRequestsPerSec", metrics + .getFailedFetchRequestsPerSec(mBeanServerConnection, Optional.of(topic))); + } + if (result.containsKey("FailedProduceRequestsPerSec")) { + result.put("FailedProduceRequestsPerSec", + merge(result.get("FailedProduceRequestsPerSec"), + metrics.getFailedProduceRequestsPerSec(mBeanServerConnection, + Optional.empty()))); + } else { + result.put("FailedProduceRequestsPerSec", metrics + .getFailedProduceRequestsPerSec(mBeanServerConnection, Optional.of(topic))); + } + } + + }); + } + } catch (Exception e) { + LOG.error("Get jmxHosts error!" + e.getMessage()); + + } + JSONObject response = new JSONObject(); + Set keys = result.keySet(); + for (String key : keys) { + response.put(key, new JSONObject(new FormatedMeterMetric(result.get(key)))); + } + return response.toString(); } } diff --git a/src/main/java/com/chickling/kmonitor/core/OffsetGetter.java b/src/main/java/com/chickling/kmonitor/core/OffsetGetter.java index 158469f..c4bb53d 100644 --- a/src/main/java/com/chickling/kmonitor/core/OffsetGetter.java +++ b/src/main/java/com/chickling/kmonitor/core/OffsetGetter.java @@ -178,7 +178,7 @@ public Node getClusterViz() { List brokers = JavaConversions.seqAsJavaList(ZKUtils.getZKUtilsFromKafka().getAllBrokersInCluster()); brokers.forEach(broker -> { List endPoints = JavaConversions.seqAsJavaList(broker.endPoints().seq()); - childNodes.add(new Node(endPoints.get(0).host() + ":" + endPoints.get(0).port(), null)); + childNodes.add(new Node(broker.id() + ":" + endPoints.get(0).host() + ":" + endPoints.get(0).port(), null)); }); rootNode.setChildren(childNodes); return rootNode; diff --git a/src/main/java/com/chickling/kmonitor/core/jmx/FormatedMeterMetric.java b/src/main/java/com/chickling/kmonitor/core/jmx/FormatedMeterMetric.java new file mode 100644 index 0000000..2f064d6 --- /dev/null +++ b/src/main/java/com/chickling/kmonitor/core/jmx/FormatedMeterMetric.java @@ -0,0 +1,78 @@ +package com.chickling.kmonitor.core.jmx; + +import com.chickling.kmonitor.utils.MetricUtils; + +/** + * @author Hulva Luva.H from ECBD + * @date 2017年7月20日 + * @description + * + */ +public class FormatedMeterMetric { + private Long count; + private String meanRate; + private String oneMinuteRate; + private String fiveMinuteRate; + private String fifteenMinuteRate; + + public FormatedMeterMetric() { + super(); + } + + public FormatedMeterMetric(MeterMetric metric) { + this(metric.getCount(), MetricUtils.sizeFormat(metric.getMeanRate()), + MetricUtils.sizeFormat(metric.getOneMinuteRate()), MetricUtils.sizeFormat(metric.getFiveMinuteRate()), + MetricUtils.sizeFormat(metric.getFifteenMinuteRate())); + } + + public FormatedMeterMetric(Long count, String meanRate, String oneMinuteRate, String fiveMinuteRate, + String fifteenMinuteRate) { + super(); + this.count = count; + this.meanRate = meanRate; + this.oneMinuteRate = oneMinuteRate; + this.fiveMinuteRate = fiveMinuteRate; + this.fifteenMinuteRate = fifteenMinuteRate; + } + + public Long getCount() { + return count; + } + + public void setCount(Long count) { + this.count = count; + } + + public String getFifteenMinuteRate() { + return fifteenMinuteRate; + } + + public void setFifteenMinuteRate(String fifteenMinuteRate) { + this.fifteenMinuteRate = fifteenMinuteRate; + } + + public String getFiveMinuteRate() { + return fiveMinuteRate; + } + + public void setFiveMinuteRate(String fiveMinuteRate) { + this.fiveMinuteRate = fiveMinuteRate; + } + + public String getOneMinuteRate() { + return oneMinuteRate; + } + + public void setOneMinuteRate(String oneMinuteRate) { + this.oneMinuteRate = oneMinuteRate; + } + + public String getMeanRate() { + return meanRate; + } + + public void setMeanRate(String meanRate) { + this.meanRate = meanRate; + } + +} diff --git a/src/main/java/com/chickling/kmonitor/core/jmx/KafkaMetrics.java b/src/main/java/com/chickling/kmonitor/core/jmx/KafkaMetrics.java index ebb0dcb..6eb8115 100644 --- a/src/main/java/com/chickling/kmonitor/core/jmx/KafkaMetrics.java +++ b/src/main/java/com/chickling/kmonitor/core/jmx/KafkaMetrics.java @@ -79,13 +79,13 @@ public MeterMetric getMessagesInPerSec(MBeanServerConnection mbsc, Optional topicName) { return getMeterMetric(mbsc, getObjectName(metricName, topicName)); } private MeterMetric getMeterMetric(MBeanServerConnection mbsc, ObjectName objectName) { - String[] attributes = { "Count", "FifteenMinuteRate", "FiveMinuteRate", "OneMinuteRate", "MeanRate" }; + String[] attributes = { "Count", "MeanRate", "OneMinuteRate", "FiveMinuteRate", "FifteenMinuteRate" }; AttributeList attributeList = null; try { attributeList = mbsc.getAttributes(objectName, attributes); diff --git a/src/main/java/com/chickling/kmonitor/core/jmx/MeterMetric.java b/src/main/java/com/chickling/kmonitor/core/jmx/MeterMetric.java index 54428d1..fa6cc80 100644 --- a/src/main/java/com/chickling/kmonitor/core/jmx/MeterMetric.java +++ b/src/main/java/com/chickling/kmonitor/core/jmx/MeterMetric.java @@ -17,8 +17,8 @@ public MeterMetric() { super(); } - public MeterMetric(Long count, Double fifteenMinuteRate, Double fiveMinuteRate, Double oneMinuteRate, - Double meanRate) { + public MeterMetric(Long count, Double meanRate, Double oneMinuteRate, Double fiveMinuteRate, + Double fifteenMinuteRate) { super(); this.count = count; this.fifteenMinuteRate = fifteenMinuteRate; diff --git a/src/main/java/com/chickling/kmonitor/initialize/SystemManager.java b/src/main/java/com/chickling/kmonitor/initialize/SystemManager.java index 8383d97..98e53b7 100644 --- a/src/main/java/com/chickling/kmonitor/initialize/SystemManager.java +++ b/src/main/java/com/chickling/kmonitor/initialize/SystemManager.java @@ -104,7 +104,7 @@ private static void initSystem() { og.close(); og = new ZKOffsetGetter(config); // TODO how cheack og is avialable? - + if (scheduler != null) scheduler.shutdownNow(); scheduler = Executors.newScheduledThreadPool(2, new WorkerThreadFactory("FixedRateSchedule")); diff --git a/src/main/java/com/chickling/kmonitor/utils/MetricUtils.java b/src/main/java/com/chickling/kmonitor/utils/MetricUtils.java index 4ee7053..0d19450 100644 --- a/src/main/java/com/chickling/kmonitor/utils/MetricUtils.java +++ b/src/main/java/com/chickling/kmonitor/utils/MetricUtils.java @@ -8,7 +8,7 @@ * */ public class MetricUtils { - private static char[] UNIT = { 'k', 'm', 'b', 't' }; + private static char[] UNIT = { 'k', 'm', 'b' }; public static String rateFormat(Double rate, int interation) { if (rate < 100) { @@ -28,10 +28,10 @@ public static String rateFormat(Double rate, int interation) { } } - public static String sizeFormat(Long bytes) { - int unit = 1000; + public static String sizeFormat(Double bytes) { + int unit = 1024; if (bytes < unit) { - return bytes + " B"; + return new BigDecimal(bytes).setScale(2, BigDecimal.ROUND_HALF_UP).toString() + " B"; } else { int exp = new Double((Math.log(bytes) / Math.log(unit))).intValue(); char pre = "kMGTPE".charAt(exp - 1); diff --git a/src/main/java/com/chickling/kmonitor/utils/ZKUtils.java b/src/main/java/com/chickling/kmonitor/utils/ZKUtils.java index 6466a21..f6b65bf 100644 --- a/src/main/java/com/chickling/kmonitor/utils/ZKUtils.java +++ b/src/main/java/com/chickling/kmonitor/utils/ZKUtils.java @@ -175,7 +175,8 @@ public static List getKafkaJMXHostsFromZookeeper() throws Exception { JSONObject jsonObj = new JSONObject(brokerInfo); if (jsonObj.has("host")) { if (jsonObj.has("jmx_port")) { - kafkaHosts.add(jsonObj.get("host").toString() + ":" + jsonObj.get("jmx_port").toString()); + kafkaHosts.add( + id + ":" + jsonObj.get("host").toString() + ":" + jsonObj.get("jmx_port").toString()); } } } catch (Exception e) { diff --git a/src/main/resources/static/scripts/app.js b/src/main/resources/static/scripts/app.js index 164a6ec..1e90762 100644 --- a/src/main/resources/static/scripts/app.js +++ b/src/main/resources/static/scripts/app.js @@ -202,6 +202,15 @@ angular.module("offsetapp.services", [ "ngResource" ]) }, getSetting: function() { return $http.get("./setting"); + }, + brokerTopicMetricsForBrokers: function() { + return $http.get("./metrics/brokerTopicMetrics/brokers"); + }, + brokerTopicMetricsForBroker: function(bid) { + return $http.get("./metrics/brokerTopicMetrics/broker/" + bid); + }, + brokerTopicMetricsForTopic: function(topic) { + return $http.get("./metrics/brokerTopicMetrics/topic/" + topic); } }; } ]); \ No newline at end of file diff --git a/src/main/resources/static/scripts/controllers.js b/src/main/resources/static/scripts/controllers.js index b7b8e6a..6a36c58 100644 --- a/src/main/resources/static/scripts/controllers.js +++ b/src/main/resources/static/scripts/controllers.js @@ -33,8 +33,14 @@ angular.module('offsetapp.controllers', [ "offsetapp.services" ]) $scope.loading = false; }); $scope.loading = true; - + $scope.topic = $routeParams.topic; + offsetinfo.brokerTopicMetricsForTopic($scope.topic).success(function(d) { + if(d.BytesInPerSec){ + $scope.jmxEnabled = true; + } + $scope.brokerTopicMetrics = d; + }); } ]) .controller("TopicConsumersCtrl", [ "$scope", "$interval", "$routeParams", "offsetinfo", function($scope, $interval, $routeParams, offsetinfo) { @@ -46,11 +52,23 @@ angular.module('offsetapp.controllers', [ "offsetapp.services" ]) $scope.topic = $routeParams.topic; $scope.groupBy = 'topic'; + offsetinfo.brokerTopicMetricsForTopic($scope.topic).success(function(d) { + if(d.BytesInPerSec){ + $scope.jmxEnabled = true; + } + $scope.brokerTopicMetrics = d; + }); } ]) .controller("ClusterVizCtrl", [ "$scope", "$interval", "$routeParams", "offsetinfo", function($scope, $interval, $routeParams, offsetinfo) { $scope.loading = true; offsetinfo.loadClusterViz($routeParams.group, function(d) {}); + offsetinfo.brokerTopicMetricsForBrokers().success(function(d) { + if(d.BytesInPerSec){ + $scope.jmxEnabled = true; + } + $scope.brokerTopicMetrics = d; + }); } ]) .controller("ActiveTopicsVizCtrl", [ "$scope", "$interval", "$routeParams", "offsetinfo", function($scope, $interval, $routeParams, offsetinfo) { @@ -121,11 +139,6 @@ angular.module('offsetapp.controllers', [ "offsetapp.services" ]) }); }); - offsetinfo.isAlertEnabled().success(function(d) { - if(!d.isAlertEnabled){ - $('#newTask').prop('disabled', true); - } - }); $scope.task = { group : $routeParams.group, @@ -136,13 +149,19 @@ angular.module('offsetapp.controllers', [ "offsetapp.services" ]) } var isTaskExists = false; - offsetinfo.listTasks().success(function(d) { - d.forEach(function(t) { - if (t.group === $routeParams.group && t.topic === $routeParams.topic) { - isTaskExists = true; - $scope.task = t; - } - }); + offsetinfo.isAlertEnabled().success(function(d) { + if(!d.isAlertEnabled){ + $('#newTask').prop('disabled', true); + }else{ + offsetinfo.listTasks().success(function(d) { + d.forEach(function(t) { + if (t.group === $routeParams.group && t.topic === $routeParams.topic) { + isTaskExists = true; + $scope.task = t; + } + }); + }); + } }); $('#taskModal') @@ -535,6 +554,12 @@ angular.module('offsetapp.controllers', [ "offsetapp.services" ]) function($scope, $routeParams, offsetinfo) { $scope.loading = true; $scope.brokerEndpoint = $routeParams.endpoint; + offsetinfo.brokerTopicMetricsForBroker($scope.brokerEndpoint.split(":", 1)[0]).success(function(d) { + if(d.BytesInPerSec){ + $scope.jmxEnabled = true; + } + $scope.brokerTopicMetrics = d; + }); // var options = { // axisY : { // type : Chartist.AutoScaleAxis, diff --git a/src/main/resources/static/views/broker.html b/src/main/resources/static/views/broker.html index 4ae27d4..a8f3ab0 100644 --- a/src/main/resources/static/views/broker.html +++ b/src/main/resources/static/views/broker.html @@ -4,7 +4,7 @@

-
+

Metrics

@@ -19,52 +19,58 @@

Metrics

- + + Bytes in + {{brokerTopicMetrics.BytesInPerSec.meanRate}} + {{brokerTopicMetrics.BytesInPerSec.oneMinuteRate}} + {{brokerTopicMetrics.BytesInPerSec.fiveMinuteRate}} + {{brokerTopicMetrics.BytesInPerSec.fifteenMinuteRate}} - Bytes in /sec - 1.64 - 0.00 - 0.00 - 0.00 - - Bytes out /sec - 1.64 - 0.00 - 0.00 - 0.00 + Bytes out + {{brokerTopicMetrics.BytesOutPerSec.meanRate}} + {{brokerTopicMetrics.BytesOutPerSec.oneMinuteRate}} + {{brokerTopicMetrics.BytesOutPerSec.fiveMinuteRate}} + {{brokerTopicMetrics.BytesOutPerSec.fifteenMinuteRate}} - Bytes rejected /sec - 0.00 - 0.00 - 0.00 - 0.00 + Bytes rejected + {{brokerTopicMetrics.BytesRejectedPerSec.meanRate}} + {{brokerTopicMetrics.BytesRejectedPerSec.oneMinuteRate}} + {{brokerTopicMetrics.BytesRejectedPerSec.fiveMinuteRate}} + {{brokerTopicMetrics.BytesRejectedPerSec.fifteenMinuteRate}} - Failed fetch request /sec - 0.00 - 0.00 - 0.00 - 0.00 + Failed fetch request + {{brokerTopicMetrics.FailedFetchRequestsPerSec.meanRate}} + {{brokerTopicMetrics.FailedFetchRequestsPerSec.oneMinuteRate}} + {{brokerTopicMetrics.FailedFetchRequestsPerSec.fiveMinuteRate}} + {{brokerTopicMetrics.FailedFetchRequestsPerSec.fifteenMinuteRate}} - Failed produce request /sec - 0.00 - 0.00 - 0.00 - 0.00 + Failed produce request + {{brokerTopicMetrics.FailedProduceRequestsPerSec.meanRate}} + {{brokerTopicMetrics.FailedProduceRequestsPerSec.oneMinuteRate}} + {{brokerTopicMetrics.FailedProduceRequestsPerSec.fiveMinuteRate}} + {{brokerTopicMetrics.FailedProduceRequestsPerSec.fifteenMinuteRate}}
+ + + Bytes in + {{brokerTopicMetrics.BytesInPerSec.meanRate}} + {{brokerTopicMetrics.BytesInPerSec.oneMinuteRate}} + {{brokerTopicMetrics.BytesInPerSec.fiveMinuteRate}} + {{brokerTopicMetrics.BytesInPerSec.fifteenMinuteRate}} - - Bytes in /sec - 1.64 - 0.00 - 0.00 - 0.00 + Bytes out + {{brokerTopicMetrics.BytesOutPerSec.meanRate}} + {{brokerTopicMetrics.BytesOutPerSec.oneMinuteRate}} + {{brokerTopicMetrics.BytesOutPerSec.fiveMinuteRate}} + {{brokerTopicMetrics.BytesOutPerSec.fifteenMinuteRate}} - Bytes out /sec - 1.64 - 0.00 - 0.00 - 0.00 + Bytes rejected + {{brokerTopicMetrics.BytesRejectedPerSec.meanRate}} + {{brokerTopicMetrics.BytesRejectedPerSec.oneMinuteRate}} + {{brokerTopicMetrics.BytesRejectedPerSec.fiveMinuteRate}} + {{brokerTopicMetrics.BytesRejectedPerSec.fifteenMinuteRate}} - Bytes rejected /sec - 0.00 - 0.00 - 0.00 - 0.00 + Failed fetch request + {{brokerTopicMetrics.FailedFetchRequestsPerSec.meanRate}} + {{brokerTopicMetrics.FailedFetchRequestsPerSec.oneMinuteRate}} + {{brokerTopicMetrics.FailedFetchRequestsPerSec.fiveMinuteRate}} + {{brokerTopicMetrics.FailedFetchRequestsPerSec.fifteenMinuteRate}} - Failed fetch request /sec - 0.00 - 0.00 - 0.00 - 0.00 - - Failed produce request /sec - 0.00 - 0.00 - 0.00 - 0.00 + Failed produce request + {{brokerTopicMetrics.FailedProduceRequestsPerSec.meanRate}} + {{brokerTopicMetrics.FailedProduceRequestsPerSec.oneMinuteRate}} + {{brokerTopicMetrics.FailedProduceRequestsPerSec.fiveMinuteRate}} + {{brokerTopicMetrics.FailedProduceRequestsPerSec.fifteenMinuteRate}}
+ + diff --git a/src/main/resources/static/views/setting.html b/src/main/resources/static/views/setting.html index 79e7ee6..02fb1b7 100644 --- a/src/main/resources/static/views/setting.html +++ b/src/main/resources/static/views/setting.html @@ -34,7 +34,7 @@

Zookeeper

Elasticsearch

diff --git a/src/main/resources/static/views/topic-consumers.html b/src/main/resources/static/views/topic-consumers.html index a913da3..00a26ec 100644 --- a/src/main/resources/static/views/topic-consumers.html +++ b/src/main/resources/static/views/topic-consumers.html @@ -19,54 +19,58 @@

Metrics

- + - Bytes in /sec - 1.64 - 0.00 - 0.00 - 0.00 + Bytes in + {{brokerTopicMetrics.BytesInPerSec.meanRate}} + {{brokerTopicMetrics.BytesInPerSec.oneMinuteRate}} + {{brokerTopicMetrics.BytesInPerSec.fiveMinuteRate}} + {{brokerTopicMetrics.BytesInPerSec.fifteenMinuteRate}} - - - Bytes out /sec - 1.64 - 0.00 - 0.00 - 0.00 + Bytes out + {{brokerTopicMetrics.BytesOutPerSec.meanRate}} + {{brokerTopicMetrics.BytesOutPerSec.oneMinuteRate}} + {{brokerTopicMetrics.BytesOutPerSec.fiveMinuteRate}} + {{brokerTopicMetrics.BytesOutPerSec.fifteenMinuteRate}} - Bytes rejected /sec - 0.00 - 0.00 - 0.00 - 0.00 + Bytes rejected + {{brokerTopicMetrics.BytesRejectedPerSec.meanRate}} + {{brokerTopicMetrics.BytesRejectedPerSec.oneMinuteRate}} + {{brokerTopicMetrics.BytesRejectedPerSec.fiveMinuteRate}} + {{brokerTopicMetrics.BytesRejectedPerSec.fifteenMinuteRate}} - Failed fetch request /sec - 0.00 - 0.00 - 0.00 - 0.00 + Failed fetch request + {{brokerTopicMetrics.FailedFetchRequestsPerSec.meanRate}} + {{brokerTopicMetrics.FailedFetchRequestsPerSec.oneMinuteRate}} + {{brokerTopicMetrics.FailedFetchRequestsPerSec.fiveMinuteRate}} + {{brokerTopicMetrics.FailedFetchRequestsPerSec.fifteenMinuteRate}} - Failed produce request /sec - 0.00 - 0.00 - 0.00 - 0.00 + Failed produce request + {{brokerTopicMetrics.FailedProduceRequestsPerSec.meanRate}} + {{brokerTopicMetrics.FailedProduceRequestsPerSec.oneMinuteRate}} + {{brokerTopicMetrics.FailedProduceRequestsPerSec.fiveMinuteRate}} + {{brokerTopicMetrics.FailedProduceRequestsPerSec.fifteenMinuteRate}} + +
Loading ...
diff --git a/src/main/resources/static/views/topic-detail.html b/src/main/resources/static/views/topic-detail.html index 5f8aa0f..61d278f 100644 --- a/src/main/resources/static/views/topic-detail.html +++ b/src/main/resources/static/views/topic-detail.html @@ -19,62 +19,58 @@

Metrics

- + - + Bytes in + {{brokerTopicMetrics.BytesInPerSec.meanRate}} + {{brokerTopicMetrics.BytesInPerSec.oneMinuteRate}} + {{brokerTopicMetrics.BytesInPerSec.fiveMinuteRate}} + {{brokerTopicMetrics.BytesInPerSec.fifteenMinuteRate}} - Bytes out /sec - 1.64 - 0.00 - 0.00 - 0.00 - - + Bytes out + {{brokerTopicMetrics.BytesOutPerSec.meanRate}} + {{brokerTopicMetrics.BytesOutPerSec.oneMinuteRate}} + {{brokerTopicMetrics.BytesOutPerSec.fiveMinuteRate}} + {{brokerTopicMetrics.BytesOutPerSec.fifteenMinuteRate}} - Bytes rejected /sec - 0.00 - 0.00 - 0.00 - 0.00 - - + Bytes rejected + {{brokerTopicMetrics.BytesRejectedPerSec.meanRate}} + {{brokerTopicMetrics.BytesRejectedPerSec.oneMinuteRate}} + {{brokerTopicMetrics.BytesRejectedPerSec.fiveMinuteRate}} + {{brokerTopicMetrics.BytesRejectedPerSec.fifteenMinuteRate}} - Failed fetch request /sec - 0.00 - 0.00 - 0.00 - 0.00 - - + Failed fetch request + {{brokerTopicMetrics.FailedFetchRequestsPerSec.meanRate}} + {{brokerTopicMetrics.FailedFetchRequestsPerSec.oneMinuteRate}} + {{brokerTopicMetrics.FailedFetchRequestsPerSec.fiveMinuteRate}} + {{brokerTopicMetrics.FailedFetchRequestsPerSec.fifteenMinuteRate}} - Failed produce request /sec - 0.00 - 0.00 - 0.00 - 0.00 - - + Failed produce request + {{brokerTopicMetrics.FailedProduceRequestsPerSec.meanRate}} + {{brokerTopicMetrics.FailedProduceRequestsPerSec.oneMinuteRate}} + {{brokerTopicMetrics.FailedProduceRequestsPerSec.fiveMinuteRate}} + {{brokerTopicMetrics.FailedProduceRequestsPerSec.fifteenMinuteRate}} +
+ +
Loading ...
diff --git a/src/test/java/com/chickling/kmonitor/test/JMXTest.java b/src/test/java/com/chickling/kmonitor/test/JMXTest.java new file mode 100644 index 0000000..6fc42be --- /dev/null +++ b/src/test/java/com/chickling/kmonitor/test/JMXTest.java @@ -0,0 +1,131 @@ +package com.chickling.kmonitor.test; + +import java.io.BufferedWriter; +import java.io.FileWriter; +import java.io.PrintWriter; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import javax.management.Attribute; +import javax.management.AttributeList; +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanInfo; +import javax.management.MBeanServerConnection; +import javax.management.ObjectInstance; + +import org.json.JSONArray; +import org.json.JSONObject; + +import com.chickling.kmonitor.core.jmx.JMXExecutor; +import com.chickling.kmonitor.core.jmx.KafkaJMX; + +/** + * @author Hulva Luva.H + * @since 2017-07-11 + * + */ +public class JMXTest { + + private static boolean excludeInternalTopic = true; // like __consumer_offsets + + public static void main(String[] args) { + KafkaJMX kafkaJMX = new KafkaJMX(); + objectName_Metrics(kafkaJMX); + objectNames(kafkaJMX); + } + + private static void objectNames(KafkaJMX kafkaJMX) { + kafkaJMX.doWithConnection("10.16.238.94", 8888, Optional.of(""), Optional.of(""), false, new JMXExecutor() { + + @Override + public void doWithConnection(MBeanServerConnection mbsc) { + // KafkaMetrics kafkaMetrics = new KafkaMetrics(); + try (FileWriter fw = new FileWriter("objectNames.json", true); + BufferedWriter bw = new BufferedWriter(fw); + PrintWriter out = new PrintWriter(bw)) { + Set beans = mbsc.queryMBeans(null, null); + JSONArray objectName = new JSONArray(); + for (ObjectInstance bean : beans) { + if (excludeInternalTopic && bean.getObjectName().toString().contains("__consumer_offsets")) { + continue; + } + System.out.println("ObjectName: " + bean.getObjectName()); + objectName.put(bean.getObjectName().toString()); + MBeanInfo mbeanInfo = mbsc.getMBeanInfo(bean.getObjectName()); + System.out.println("\tMBeanInfo: " + mbeanInfo); + MBeanAttributeInfo[] attributes = mbeanInfo.getAttributes(); + String[] attributeArr = new String[attributes.length]; + for (int i = 0; i < attributes.length; i++) { + attributeArr[i] = attributes[i].getName(); + } + AttributeList attributeList = mbsc.getAttributes(bean.getObjectName(), attributeArr); + List attributeList1 = attributeList.asList(); + + for (Attribute attr : attributeList1) { + System.out.println("\t\tName: " + attr.getName() + " Value: " + attr.getValue()); + } + } + out.println(objectName.toString()); + } catch (Exception e) { + + } + } + }); + + } + + private static void objectName_Metrics(KafkaJMX kafkaJMX) { + kafkaJMX.doWithConnection("10.16.238.94", 8888, Optional.of(""), Optional.of(""), false, new JMXExecutor() { + + @Override + public void doWithConnection(MBeanServerConnection mbsc) { + // KafkaMetrics kafkaMetrics = new KafkaMetrics(); + try (FileWriter fw = new FileWriter("metrics.json", true); + BufferedWriter bw = new BufferedWriter(fw); + PrintWriter out = new PrintWriter(bw)) { + Set beans = mbsc.queryMBeans(null, null); + + JSONArray objectNameMetrics = new JSONArray(); + JSONObject objectName = null; + for (ObjectInstance bean : beans) { + objectName = new JSONObject(); + String objectNameStr = bean.getObjectName().toString(); + if (excludeInternalTopic && objectNameStr.contains("__consumer_offsets")) { + continue; + } + System.out.println("ObjectName: " + objectNameStr); + String[] metric_other = objectNameStr.split(":"); + objectName.put("metric", metric_other[0]); + String[] type_name_other = metric_other[1].split(","); + String[] temp; + for (int i = 0; i < type_name_other.length; i++) { + temp = type_name_other[i].split("="); + objectName.put(temp[0], temp[1]); + } + objectName.put("objectName", bean.getObjectName().toString()); + MBeanInfo mbeanInfo = mbsc.getMBeanInfo(bean.getObjectName()); + System.out.println("\tMBeanInfo: " + mbeanInfo); + MBeanAttributeInfo[] attributes = mbeanInfo.getAttributes(); + String[] attributeArr = new String[attributes.length]; + for (int i = 0; i < attributes.length; i++) { + attributeArr[i] = attributes[i].getName(); + } + AttributeList attributeList = mbsc.getAttributes(bean.getObjectName(), attributeArr); + List attributeList1 = attributeList.asList(); + + for (Attribute attr : attributeList1) { + objectName.put(attr.getName(), attr.getValue()); + System.out.println("\t\tName: " + attr.getName() + " Value: " + attr.getValue()); + } + objectNameMetrics.put(objectName); + } + out.println(objectNameMetrics.toString()); + } catch (Exception e) { + + } + } + }); + } + +}