diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java
index cd970528b..a67025f62 100644
--- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java
+++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java
@@ -45,6 +45,7 @@
 import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService;
 import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
 import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems;
+import com.xiaojukeji.know.streaming.km.core.utils.ApiCallThreadPoolService;
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.consumer.*;
@@ -89,6 +90,9 @@ public class TopicStateManagerImpl implements TopicStateManager {
     @Autowired
     private GroupManager groupManager;
 
+    @Autowired
+    private ApiCallThreadPoolService apiCallThreadPoolService;
+
     @Override
     public TopicBrokerAllVO getTopicBrokerAll(Long clusterPhyId, String topicName, String searchBrokerHost) throws NotExistException {
         Topic topic = topicService.getTopic(clusterPhyId, topicName);
@@ -303,25 +307,38 @@ public Result<List<TopicPartitionVO>> getTopicPartitions(Long clusterPhyId, Stri
             return Result.buildSuc();
         }
 
-        Result<List<PartitionMetrics>> metricsResult = partitionMetricService.collectPartitionsMetricsFromKafka(clusterPhyId, topicName, metricsNames);
-        if (metricsResult.failed()) {
-            // 仅打印错误日志,但是不直接返回错误
-            log.error(
-                    "method=getTopicPartitions||clusterPhyId={}||topicName={}||result={}||msg=get metrics from es failed",
-                    clusterPhyId, topicName, metricsResult
-            );
+        List<PartitionMetrics> partitionMetricsList = new ArrayList<>();
+        for (String metricName : metricsNames) {
+            apiCallThreadPoolService.runnableTask("task", 3000, () -> {
+                Result<List<PartitionMetrics>> metricsResult = partitionMetricService.collectPartitionsMetricsFromKafka(clusterPhyId, topicName, Arrays.asList(metricName));
+                if (metricsResult.failed()) {
+                    // 仅打印错误日志,但是不直接返回错误
+                    log.error(
+                            "method=getTopicPartitions||clusterPhyId={}||topicName={}||result={}||msg=get metrics from es failed",
+                            clusterPhyId, topicName, metricsResult
+                    );
+                }
+                if (metricsResult.hasData()) {
+                    partitionMetricsList.addAll(metricsResult.getData());
+                }
+                return null;
+            });
         }
+        apiCallThreadPoolService.waitResult(400);
+
+        List<PartitionMetrics> metricsList = combinePartitionMetrics(partitionMetricsList);
+
         // 转map
         Map<Integer, PartitionMetrics> metricsMap = new HashMap<>();
-        if (metricsResult.hasData()) {
-            for (PartitionMetrics metrics: metricsResult.getData()) {
+        if (!metricsList.isEmpty()) {
+            for (PartitionMetrics metrics : metricsList) {
                 metricsMap.put(metrics.getPartitionId(), metrics);
             }
         }
 
         List<TopicPartitionVO> voList = new ArrayList<>();
-        for (Partition partition: partitionList) {
+        for (Partition partition : partitionList) {
             voList.add(TopicVOConverter.convert2TopicPartitionVO(partition, metricsMap.get(partition.getPartitionId())));
         }
 
         return Result.buildSuc(voList);
@@ -450,4 +467,17 @@ private Properties generateClientProperties(ClusterPhy clusterPhy, Integer maxPo
         props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Math.max(2, Math.min(5, maxPollRecords)));
         return props;
     }
-}
+
+    private List<PartitionMetrics> combinePartitionMetrics(List<PartitionMetrics> partitionMetricsList) {
+        Map<String, PartitionMetrics> metricsMap = new HashMap<>();
+        for (PartitionMetrics partitionMetrics : partitionMetricsList) {
+            PartitionMetrics metrics = metricsMap.get(partitionMetrics.getClusterPhyId() + "@" + partitionMetrics.getPartitionId());
+            if (metrics == null) {
+                metricsMap.put(partitionMetrics.getClusterPhyId() + "@" + partitionMetrics.getPartitionId(), partitionMetrics);
+            } else {
+                metrics.putMetric(partitionMetrics.getMetrics());
+            }
+        }
+        return metricsMap.values().stream().collect(Collectors.toList());
+    }
+}
\ No newline at end of file
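
For context on what the new path does: each metric name is fanned out as its own collection task, the caller blocks on `waitResult(400)`, and the per-metric results are merged by `clusterPhyId + "@" + partitionId` in `combinePartitionMetrics`. The sketch below is a minimal, self-contained illustration of that fan-out/merge pattern; it uses plain `java.util.concurrent` in place of `ApiCallThreadPoolService` (whose API is not shown in this diff) and a hypothetical `MetricSample` standing in for `PartitionMetrics`. Because the shared result list is appended to from several worker threads, the sketch wraps it in `Collections.synchronizedList`.

```java
// Illustrative sketch only: ExecutorService stands in for ApiCallThreadPoolService,
// and MetricSample is a hypothetical stand-in for PartitionMetrics.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FanOutMergeSketch {
    /** Hypothetical per-partition metrics holder, keyed later by clusterPhyId + "@" + partitionId. */
    static class MetricSample {
        final long clusterPhyId;
        final int partitionId;
        final Map<String, Float> metrics = new HashMap<>();

        MetricSample(long clusterPhyId, int partitionId) {
            this.clusterPhyId = clusterPhyId;
            this.partitionId = partitionId;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> metricNames = Arrays.asList("LogSize", "Messages");
        ExecutorService pool = Executors.newFixedThreadPool(4);

        // Appended to from several worker threads, so the list must be thread-safe.
        List<MetricSample> collected = Collections.synchronizedList(new ArrayList<>());

        // Fan out: one collection task per metric name (mirrors the per-metric runnableTask calls).
        for (String metricName : metricNames) {
            pool.submit(() -> {
                MetricSample sample = new MetricSample(1L, 0); // pretend this came from Kafka
                sample.metrics.put(metricName, 1.0f);
                collected.add(sample);
            });
        }

        // Bounded wait for the whole batch (mirrors waitResult), then merge per partition
        // the way combinePartitionMetrics does: keep the first sample for a key and fold
        // later samples' metric entries into it.
        pool.shutdown();
        pool.awaitTermination(400, TimeUnit.MILLISECONDS);

        Map<String, MetricSample> merged = new HashMap<>();
        for (MetricSample sample : collected) {
            merged.merge(sample.clusterPhyId + "@" + sample.partitionId, sample, (first, later) -> {
                first.metrics.putAll(later.metrics);
                return first;
            });
        }
        List<MetricSample> result = new ArrayList<>(merged.values());
        System.out.println(result.size() + " merged partition sample(s)");
    }
}
```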