From 066c688579a02edbc9684dce67b3d5e63817fb24 Mon Sep 17 00:00:00 2001 From: WangYaobo <1164642317@qq.com> Date: Fri, 3 Mar 2023 12:33:40 +0800 Subject: [PATCH] =?UTF-8?q?[Bugfix]topicpartition=E4=BF=A1=E6=81=AF?= =?UTF-8?q?=E8=AF=B7=E6=B1=82=E6=94=B9=E4=B8=BA=E5=B9=B6=E5=8F=91(#791)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../biz/topic/impl/TopicStateManagerImpl.java | 52 +++++++++++++++---- 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java index cd970528b..a67025f62 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java @@ -45,6 +45,7 @@ import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems; +import com.xiaojukeji.know.streaming.km.core.utils.ApiCallThreadPoolService; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.*; @@ -89,6 +90,9 @@ public class TopicStateManagerImpl implements TopicStateManager { @Autowired private GroupManager groupManager; + @Autowired + private ApiCallThreadPoolService apiCallThreadPoolService; + @Override public TopicBrokerAllVO getTopicBrokerAll(Long clusterPhyId, String topicName, String searchBrokerHost) throws NotExistException { Topic topic = topicService.getTopic(clusterPhyId, topicName); @@ -303,25 +307,38 @@ public Result> getTopicPartitions(Long clusterPhyId, Stri return Result.buildSuc(); } - Result> metricsResult = partitionMetricService.collectPartitionsMetricsFromKafka(clusterPhyId, topicName, metricsNames); - if (metricsResult.failed()) { - // 仅打印错误日志,但是不直接返回错误 - log.error( - "method=getTopicPartitions||clusterPhyId={}||topicName={}||result={}||msg=get metrics from es failed", - clusterPhyId, topicName, metricsResult - ); + List partitionMetricsList = new ArrayList<>(); + for (String metricName : metricsNames) { + apiCallThreadPoolService.runnableTask("task", 3000, () -> { + Result> metricsResult = partitionMetricService.collectPartitionsMetricsFromKafka(clusterPhyId, topicName, Arrays.asList(metricName)); + if (metricsResult.failed()) { + // 仅打印错误日志,但是不直接返回错误 + log.error( + "method=getTopicPartitions||clusterPhyId={}||topicName={}||result={}||msg=get metrics from es failed", + clusterPhyId, topicName, metricsResult + ); + } + if (metricsResult.hasData()) { + partitionMetricsList.addAll(metricsResult.getData()); + } + return null; + }); } + apiCallThreadPoolService.waitResult(400); + + List metricsList = combinePartitionMetrics(partitionMetricsList); + // 转map Map metricsMap = new HashMap<>(); - if (metricsResult.hasData()) { - for (PartitionMetrics metrics: metricsResult.getData()) { + if (!metricsList.isEmpty()) { + for (PartitionMetrics metrics : metricsList) { metricsMap.put(metrics.getPartitionId(), metrics); } } List voList = new ArrayList<>(); - for (Partition partition: partitionList) { + for (Partition partition : partitionList) { voList.add(TopicVOConverter.convert2TopicPartitionVO(partition, metricsMap.get(partition.getPartitionId()))); } return Result.buildSuc(voList); @@ -450,4 +467,17 @@ private Properties generateClientProperties(ClusterPhy clusterPhy, Integer maxPo props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Math.max(2, Math.min(5, maxPollRecords))); return props; } -} + + private List combinePartitionMetrics(List partitionMetricsList) { + Map metricsMap = new HashMap<>(); + for (PartitionMetrics partitionMetrics : partitionMetricsList) { + PartitionMetrics metrics = metricsMap.get(partitionMetrics.getClusterPhyId() + "@" + partitionMetrics.getPartitionId()); + if (metrics == null) { + metricsMap.put(partitionMetrics.getClusterPhyId() + "@" + partitionMetrics.getPartitionId(), partitionMetrics); + } else { + metrics.putMetric(partitionMetrics.getMetrics()); + } + } + return metricsMap.values().stream().collect(Collectors.toList()); + } +} \ No newline at end of file