diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java index 7d7351f2f..7df467db2 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java @@ -86,6 +86,9 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager { @Autowired private PluginService pluginService; + @Autowired + private ApiCallFutureWaitUtil apiCallFutureWaitUtil; + @Override public Result createMirrorMaker(MirrorMakerCreateDTO dto, String operator) { // 检查基本参数 @@ -296,7 +299,9 @@ public PaginationResult getClusterMirrorMakersOver List mirrorMakerOverviewVOList = this.convert2ClusterMirrorMakerOverviewVO(mirrorMakerList, connectClusterList, latestMetricsResult.getData()); - PaginationResult voPaginationResult = this.pagingMirrorMakerInLocal(mirrorMakerOverviewVOList, dto); + List mirrorMakerVOList = this.completeClusterInfo(mirrorMakerOverviewVOList); + + PaginationResult voPaginationResult = this.pagingMirrorMakerInLocal(mirrorMakerVOList, dto); if (voPaginationResult.failed()) { LOGGER.error("method=ClusterMirrorMakerOverviewVO||clusterPhyId={}||result={}||errMsg=pagination in local failed", clusterPhyId, voPaginationResult); @@ -304,8 +309,7 @@ public PaginationResult getClusterMirrorMakersOver return PaginationResult.buildFailure(voPaginationResult, dto); } - //这里再补充源集群和目的集群信息,减少网络请求。 - this.completeClusterInfo(voPaginationResult.getData().getBizData()); + // 查询历史指标 @@ -596,14 +600,30 @@ public static List supplyData2ClusterMirrorMakerOv return voList; } - private void completeClusterInfo(List mirrorMakerVOList) { + private List completeClusterInfo(List mirrorMakerVOList) { + + Map connectorInfoMap = new HashMap<>(); for (ClusterMirrorMakerOverviewVO mirrorMakerVO : mirrorMakerVOList) { - Result connectorInfoRet = connectorService.getConnectorInfoFromCluster(mirrorMakerVO.getConnectClusterId(), mirrorMakerVO.getConnectorName()); - if (!connectorInfoRet.hasData()) { + apiCallFutureWaitUtil.runnableTask(String.format("method=completeClusterInfo||connectClusterId=%d||connectorName=%s||getMirrorMakerInfo", mirrorMakerVO.getConnectClusterId(), mirrorMakerVO.getConnectorName()), + 3000 + , () -> { + Result connectorInfoRet = connectorService.getConnectorInfoFromCluster(mirrorMakerVO.getConnectClusterId(), mirrorMakerVO.getConnectorName()); + if (connectorInfoRet.hasData()) { + connectorInfoMap.put(mirrorMakerVO.getConnectClusterId() + mirrorMakerVO.getConnectorName(), connectorInfoRet.getData()); + } + return connectorInfoRet.getData(); + }); + } + + apiCallFutureWaitUtil.waitResult(1000); + + List newMirrorMakerVOList = new ArrayList<>(); + for (ClusterMirrorMakerOverviewVO mirrorMakerVO : mirrorMakerVOList) { + KSConnectorInfo connectorInfo = connectorInfoMap.get(mirrorMakerVO.getConnectClusterId() + mirrorMakerVO.getConnectorName()); + if (connectorInfo == null) { continue; } - KSConnectorInfo connectorInfo = connectorInfoRet.getData(); String sourceClusterAlias = connectorInfo.getConfig().get(MIRROR_MAKER_SOURCE_CLUSTER_ALIAS_FIELD_NAME); String targetClusterAlias = connectorInfo.getConfig().get(MIRROR_MAKER_TARGET_CLUSTER_ALIAS_FIELD_NAME); @@ -627,6 +647,10 @@ private void completeClusterInfo(List mirrorMakerV } } + newMirrorMakerVOList.add(mirrorMakerVO); + } + + return newMirrorMakerVOList; } } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/ApiCallFutureWaitUtil.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/ApiCallFutureWaitUtil.java new file mode 100644 index 000000000..b70412907 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/ApiCallFutureWaitUtil.java @@ -0,0 +1,45 @@ +package com.xiaojukeji.know.streaming.km.common.utils; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.concurrent.Callable; + +/** + * @author wyb + * @date 2023/2/22 + */ +@Service +public class ApiCallFutureWaitUtil { + + @Value(value = "${thread-pool.api.future-util.core-size:2}") + private Integer corePoolSize; + + @Value(value = "${thread-pool.api.future-util.max-size:8}") + private Integer maxPoolSize; + + @Value(value = "${thread-pool.api.future-util.queue-size:500}") + private Integer queueSize; + + + private static FutureWaitUtil apiFutureUtil; + + @PostConstruct + private void init() { + apiFutureUtil = FutureWaitUtil.init( + "apiFutureThreadPool", + corePoolSize, + maxPoolSize, + queueSize + ); + } + + public static void runnableTask(String taskName, Integer timeoutUnisMs, Callable callable) { + apiFutureUtil.runnableTask(taskName, timeoutUnisMs, callable); + } + + public static void waitResult(Integer stepWaitTimeUnitMs) { + apiFutureUtil.waitResult(stepWaitTimeUnitMs); + } +} \ No newline at end of file