Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.chickling.kmonitor.initialize.SystemManager;
import com.chickling.kmonitor.jmx.FormatedMeterMetric;
import com.chickling.kmonitor.jmx.JMXExecutor;
import com.chickling.kmonitor.jmx.KafkaJMX;
import com.chickling.kmonitor.jmx.KafkaMetrics;
import com.chickling.kmonitor.jmx.MeterMetric;
import com.chickling.kmonitor.utils.ZKUtils;
import com.chickling.kmonitor.utils.elasticsearch.restapi.ElasticsearchRESTUtil;

/**
*
Expand Down Expand Up @@ -107,6 +110,7 @@ public void doWithConnection(MBeanServerConnection mBeanServerConnection) {
response.put(key, new JSONObject(new FormatedMeterMetric(result.get(key))));
}
}
response.put("esUrl", SystemManager.getConfig().getEsHosts().split(":")[0] + ":9200/" + SystemManager.getConfig().getEsIndex() + "-*");
return response.toString();
}

Expand Down Expand Up @@ -151,7 +155,6 @@ public void doWithConnection(MBeanServerConnection mBeanServerConnection) {
}
} catch (Exception e) {
LOG.error("Get jmxHosts error!" + e.getMessage());

}
return response.toString();
}
Expand Down Expand Up @@ -230,4 +233,9 @@ public void doWithConnection(MBeanServerConnection mBeanServerConnection) {
}
return response.toString();
}

@RequestMapping(value = "/metricviz", method = RequestMethod.POST)
public String getMetricVizData(@RequestBody String metric) {
return ElasticsearchRESTUtil.metricVizDataSearch(metric);
}
}
200 changes: 102 additions & 98 deletions src/main/java/com/chickling/kmonitor/core/db/ElasticsearchOffsetDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,103 +21,107 @@
* @author Hulva Luva.H
*
*/
public class ElasticsearchOffsetDB implements OffsetDB {

private Ielasticsearch esUtil;
private String indexPrefix;
private String docType;

private static final SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");

public ElasticsearchOffsetDB(AppConfig config) {
if (config.getApiType().equalsIgnoreCase("Java API")) {
esUtil = new ElasticsearchJavaUtil(config.getEsHosts());
} else {
esUtil = new ElasticsearchRESTUtil(config.getEsHosts());
}

setIndexAndType(config.getEsIndex(), config.getDocTypeForOffset());
}

public void setIndexAndType(String index, String docType) {
this.indexPrefix = index + "-";
this.docType = docType;
}

/*
* (non-Javadoc)
*
* @see com.chickling.kmonitor.core.db.OffsetDb#insert(long,
* com.chickling.kmonitor.model.OffsetInfo)
*/
@Override
public void insert(long timestamp, OffsetInfo offsetInfo) {
// TODO Auto-generated method stub
}

/*
* (non-Javadoc)
*
* @see com.chickling.kmonitor.core.db.OffsetDb#batchInsert(java. util.List)
*/
@Override
public void batchInsert(List<OffsetInfo> offsetInfoList) {
long now = System.currentTimeMillis();
Calendar cal = Calendar.getInstance();
cal.setTimeInMillis(now);
cal.set(Calendar.MILLISECOND, 0);
cal.set(Calendar.SECOND, 0);

JSONObject data = new JSONObject();
for (int i = 0; i < offsetInfoList.size(); i++) {
data.put(i + "", generateRecord(cal.getTimeInMillis(), offsetInfoList.get(i)));
}
esUtil.bulkIndex(data, docType, indexPrefix);
}

private JSONObject generateRecord(long timestamp, OffsetInfo offsetInfo) {
JSONObject data = new JSONObject();
data.put("group", offsetInfo.getGroup());
data.put("topic", offsetInfo.getTopic());
data.put("partition", offsetInfo.getPartition());
data.put("offset", offsetInfo.getOffset());
data.put("logSize", offsetInfo.getLogSize());
data.put("owner", offsetInfo.getOwner()/* owner.getOrElse("NA") */);
data.put("date", sFormat.format(new Date(timestamp)));
data.put("timestamp", timestamp);
data.put("creation", offsetInfo.getCreation());
data.put("modified", offsetInfo.getModified());
data.put("lag", offsetInfo.getLag());
return data;
}

/*
* (non-Javadoc)
*
* @see com.chickling.kmonitor.core.db.OffsetDb#offsetHistory()
*/
@Override
public OffsetHistory offsetHistory(String group, String topic) {
List<OffsetPoints> offsetPointsList = esUtil.offsetHistory(indexPrefix, docType, group, topic);
CommonUtils.sortByTimestampThenPartition(offsetPointsList);
return new OffsetHistory(group, topic, offsetPointsList);
}

@Override
public OffsetHistory offsetHistory(OffsetHistoryQueryParams params) {
List<OffsetPoints> offsetPointsList = esUtil.scrollsSearcher(params, docType, indexPrefix);
CommonUtils.sortByTimestampThenPartition(offsetPointsList);
return new OffsetHistory(params.getGroup(), params.getTopic(), offsetPointsList);
}

@Override
public boolean check() {
return esUtil.check();
}

@Override
public void close() {
esUtil.close();
}
public class ElasticsearchOffsetDB implements OffsetDB<Ielasticsearch> {

private Ielasticsearch esUtil;
private String indexPrefix;
private String docType;

private static final SimpleDateFormat sFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");

public ElasticsearchOffsetDB(AppConfig config) {
if (config.getApiType().equalsIgnoreCase("Java API")) {
esUtil = new ElasticsearchJavaUtil(config.getEsHosts());
} else {
esUtil = new ElasticsearchRESTUtil(config.getEsHosts());
}
setIndexAndType(config.getEsIndex(), config.getDocTypeForOffset());
}

public void setIndexAndType(String index, String docType) {
this.indexPrefix = index + "-";
this.docType = docType;
}

/*
* (non-Javadoc)
*
* @see com.chickling.kmonitor.core.db.OffsetDb#insert(long,
* com.chickling.kmonitor.model.OffsetInfo)
*/
@Override
public void insert(long timestamp, OffsetInfo offsetInfo) {
// TODO Auto-generated method stub
}

/*
* (non-Javadoc)
*
* @see com.chickling.kmonitor.core.db.OffsetDb#batchInsert(java. util.List)
*/
@Override
public void batchInsert(List<OffsetInfo> offsetInfoList) {
long now = System.currentTimeMillis();
Calendar cal = Calendar.getInstance();
cal.setTimeInMillis(now);
cal.set(Calendar.MILLISECOND, 0);
cal.set(Calendar.SECOND, 0);

JSONObject data = new JSONObject();
for (int i = 0; i < offsetInfoList.size(); i++) {
data.put(i + "", generateRecord(cal.getTimeInMillis(), offsetInfoList.get(i)));
}
esUtil.bulkIndex(data, docType, indexPrefix);
}

private JSONObject generateRecord(long timestamp, OffsetInfo offsetInfo) {
JSONObject data = new JSONObject();
data.put("group", offsetInfo.getGroup());
data.put("topic", offsetInfo.getTopic());
data.put("partition", offsetInfo.getPartition());
data.put("offset", offsetInfo.getOffset());
data.put("logSize", offsetInfo.getLogSize());
data.put("owner", offsetInfo.getOwner()/* owner.getOrElse("NA") */);
data.put("date", sFormat.format(new Date(timestamp)));
data.put("timestamp", timestamp);
data.put("creation", offsetInfo.getCreation());
data.put("modified", offsetInfo.getModified());
data.put("lag", offsetInfo.getLag());
return data;
}

/*
* (non-Javadoc)
*
* @see com.chickling.kmonitor.core.db.OffsetDb#offsetHistory()
*/
@Override
public OffsetHistory offsetHistory(String group, String topic) {
List<OffsetPoints> offsetPointsList = esUtil.offsetHistory(indexPrefix, docType, group, topic);
CommonUtils.sortByTimestampThenPartition(offsetPointsList);
return new OffsetHistory(group, topic, offsetPointsList);
}

@Override
public OffsetHistory offsetHistory(OffsetHistoryQueryParams params) {
List<OffsetPoints> offsetPointsList = esUtil.scrollsSearcher(params, docType, indexPrefix);
CommonUtils.sortByTimestampThenPartition(offsetPointsList);
return new OffsetHistory(params.getGroup(), params.getTopic(), offsetPointsList);
}

@Override
public boolean check() {
return esUtil.check();
}

@Override
public void close() {
esUtil.close();
}

@Override
public Ielasticsearch getDB() {
return esUtil;
}

}
20 changes: 11 additions & 9 deletions src/main/java/com/chickling/kmonitor/core/db/OffsetDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@
* @author Hulva Luva.H
*
*/
public interface OffsetDB {
void insert(long timestamp, OffsetInfo offsetInfo);
public interface OffsetDB<T> {
T getDB();

void batchInsert(List<OffsetInfo> offsetInfoList);
void insert(long timestamp, OffsetInfo offsetInfo);

OffsetHistory offsetHistory(String group, String topic);
void batchInsert(List<OffsetInfo> offsetInfoList);

OffsetHistory offsetHistory(OffsetHistoryQueryParams params);

void close();

boolean check();
OffsetHistory offsetHistory(String group, String topic);

OffsetHistory offsetHistory(OffsetHistoryQueryParams params);

void close();

boolean check();
}
Loading