Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.springframework.stereotype.Service;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -597,7 +598,7 @@ public static List<ClusterMirrorMakerOverviewVO> supplyData2ClusterMirrorMakerOv

private List<ClusterMirrorMakerOverviewVO> completeClusterInfo(List<ClusterMirrorMakerOverviewVO> mirrorMakerVOList) {

Map<String, KSConnectorInfo> connectorInfoMap = new HashMap<>();
Map<String, KSConnectorInfo> connectorInfoMap = new ConcurrentHashMap<>();

for (ClusterMirrorMakerOverviewVO mirrorMakerVO : mirrorMakerVOList) {
ApiCallThreadPoolService.runnableTask(String.format("method=completeClusterInfo||connectClusterId=%d||connectorName=%s||getMirrorMakerInfo", mirrorMakerVO.getConnectClusterId(), mirrorMakerVO.getConnectorName()),
Expand All @@ -607,12 +608,10 @@ private List<ClusterMirrorMakerOverviewVO> completeClusterInfo(List<ClusterMirro
if (connectorInfoRet.hasData()) {
connectorInfoMap.put(mirrorMakerVO.getConnectClusterId() + mirrorMakerVO.getConnectorName(), connectorInfoRet.getData());
}

return connectorInfoRet.getData();
});
}

ApiCallThreadPoolService.waitResult(1000);
ApiCallThreadPoolService.waitResult();

List<ClusterMirrorMakerOverviewVO> newMirrorMakerVOList = new ArrayList<>();
for (ClusterMirrorMakerOverviewVO mirrorMakerVO : mirrorMakerVOList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems;
import com.xiaojukeji.know.streaming.km.core.utils.ApiCallThreadPoolService;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
Expand All @@ -60,7 +61,7 @@

@Component
public class TopicStateManagerImpl implements TopicStateManager {
private static final ILog log = LogFactory.getLog(TopicStateManagerImpl.class);
private static final ILog LOGGER = LogFactory.getLog(TopicStateManagerImpl.class);

@Autowired
private TopicService topicService;
Expand Down Expand Up @@ -232,26 +233,37 @@ public Result<TopicStateVO> getTopicState(Long clusterPhyId, String topicName) {

@Override
public Result<List<TopicPartitionVO>> getTopicPartitions(Long clusterPhyId, String topicName, List<String> metricsNames) {
long startTime = System.currentTimeMillis();

List<Partition> partitionList = partitionService.listPartitionByTopic(clusterPhyId, topicName);
if (ValidateUtils.isEmptyList(partitionList)) {
return Result.buildSuc();
}

Result<List<PartitionMetrics>> metricsResult = partitionMetricService.collectPartitionsMetricsFromKafka(clusterPhyId, topicName, metricsNames);
if (metricsResult.failed()) {
// 仅打印错误日志,但是不直接返回错误
log.error(
"method=getTopicPartitions||clusterPhyId={}||topicName={}||result={}||msg=get metrics from es failed",
clusterPhyId, topicName, metricsResult
);
}

// 转map
Map<Integer, PartitionMetrics> metricsMap = new HashMap<>();
if (metricsResult.hasData()) {
for (PartitionMetrics metrics: metricsResult.getData()) {
metricsMap.put(metrics.getPartitionId(), metrics);
}
ApiCallThreadPoolService.runnableTask(
String.format("clusterPhyId=%d||topicName=%s||method=getTopicPartitions", clusterPhyId, topicName),
ksConfigUtils.getApiCallLeftTimeUnitMs(System.currentTimeMillis() - startTime),
() -> {
Result<List<PartitionMetrics>> metricsResult = partitionMetricService.collectPartitionsMetricsFromKafka(clusterPhyId, topicName, metricsNames);
if (metricsResult.failed()) {
// 仅打印错误日志,但是不直接返回错误
LOGGER.error(
"method=getTopicPartitions||clusterPhyId={}||topicName={}||result={}||msg=get metrics from kafka failed",
clusterPhyId, topicName, metricsResult
);
}

for (PartitionMetrics metrics: metricsResult.getData()) {
metricsMap.put(metrics.getPartitionId(), metrics);
}
}
);
boolean finished = ApiCallThreadPoolService.waitResultAndReturnFinished(1);

if (!finished && metricsMap.isEmpty()) {
// 未完成 -> 打印日志
LOGGER.error("method=getTopicPartitions||clusterPhyId={}||topicName={}||msg=get metrics from kafka failed", clusterPhyId, topicName);
}

List<TopicPartitionVO> voList = new ArrayList<>();
Expand Down Expand Up @@ -423,7 +435,7 @@ private List<TopicRecordVO> getTopicMessages(ClusterPhy clusterPhy,

return voList;
} catch (Exception e) {
log.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhy.getId(), topicName, dto, e);
LOGGER.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhy.getId(), topicName, dto, e);

throw new AdminOperateException(e.getMessage(), e, ResultStatus.KAFKA_OPERATE_FAILED);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.concurrent.Callable;

/**
Expand All @@ -21,7 +22,7 @@ public class ApiCallThreadPoolService {
@Value(value = "${thread-pool.api.queue-size:500}")
private Integer queueSize;

private static FutureWaitUtil<Object> apiFutureUtil;
private static FutureWaitUtil<Boolean> apiFutureUtil;

@PostConstruct
private void init() {
Expand All @@ -33,20 +34,21 @@ private void init() {
);
}

public static void runnableTask(String taskName, Integer timeoutUnisMs, Callable<Object> callable) {
public static void runnableTask(String taskName, Integer timeoutUnisMs, Callable<Boolean> callable) {
apiFutureUtil.runnableTask(taskName, timeoutUnisMs, callable);
}

public static void runnableTask(String taskName, Integer timeoutUnisMs, Runnable runnable) {
apiFutureUtil.runnableTask(taskName, timeoutUnisMs, runnable);
}

@Deprecated
public static void waitResult(Integer stepWaitTimeUnitMs) {
apiFutureUtil.waitResult(stepWaitTimeUnitMs);
}

public static void waitResult() {
apiFutureUtil.waitResult(0);
}

public static boolean waitResultAndReturnFinished(int taskNum) {
List<Boolean> resultList = apiFutureUtil.waitResult(0);

return resultList != null && resultList.size() == taskNum;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public Result<TopicBrokerAllVO> getTopicBrokers(@PathVariable Long clusterPhyId,
@GetMapping(value = "clusters/{clusterPhyId}/topics/{topicName}/brokers-partitions-summary")
@ResponseBody
public Result<TopicBrokersPartitionsSummaryVO> getTopicBrokersPartitionsSummary(@PathVariable Long clusterPhyId,
@PathVariable String topicName) throws Exception {
@PathVariable String topicName) {
return topicStateManager.getTopicBrokersPartitionsSummary(clusterPhyId, topicName);
}

Expand All @@ -83,7 +83,7 @@ public Result<TopicBrokersPartitionsSummaryVO> getTopicBrokersPartitionsSummary(
@ResponseBody
public Result<List<TopicPartitionVO>> getTopicPartitions(@PathVariable Long clusterPhyId,
@PathVariable String topicName,
@RequestBody List<String> metricsNames) throws Exception {
@RequestBody List<String> metricsNames) {
return topicStateManager.getTopicPartitions(clusterPhyId, topicName, metricsNames);
}

Expand Down