Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
@Autowired
private PluginService pluginService;

@Autowired
private ApiCallFutureWaitUtil apiCallFutureWaitUtil;

@Override
public Result<Void> createMirrorMaker(MirrorMakerCreateDTO dto, String operator) {
// 检查基本参数
Expand Down Expand Up @@ -296,16 +299,17 @@ public PaginationResult<ClusterMirrorMakerOverviewVO> getClusterMirrorMakersOver

List<ClusterMirrorMakerOverviewVO> mirrorMakerOverviewVOList = this.convert2ClusterMirrorMakerOverviewVO(mirrorMakerList, connectClusterList, latestMetricsResult.getData());

PaginationResult<ClusterMirrorMakerOverviewVO> voPaginationResult = this.pagingMirrorMakerInLocal(mirrorMakerOverviewVOList, dto);
List<ClusterMirrorMakerOverviewVO> mirrorMakerVOList = this.completeClusterInfo(mirrorMakerOverviewVOList);

PaginationResult<ClusterMirrorMakerOverviewVO> voPaginationResult = this.pagingMirrorMakerInLocal(mirrorMakerVOList, dto);

if (voPaginationResult.failed()) {
LOGGER.error("method=ClusterMirrorMakerOverviewVO||clusterPhyId={}||result={}||errMsg=pagination in local failed", clusterPhyId, voPaginationResult);

return PaginationResult.buildFailure(voPaginationResult, dto);
}

//这里再补充源集群和目的集群信息,减少网络请求。
this.completeClusterInfo(voPaginationResult.getData().getBizData());



// 查询历史指标
Expand Down Expand Up @@ -596,14 +600,30 @@ public static List<ClusterMirrorMakerOverviewVO> supplyData2ClusterMirrorMakerOv
return voList;
}

private void completeClusterInfo(List<ClusterMirrorMakerOverviewVO> mirrorMakerVOList) {
private List<ClusterMirrorMakerOverviewVO> completeClusterInfo(List<ClusterMirrorMakerOverviewVO> mirrorMakerVOList) {

Map<String, KSConnectorInfo> connectorInfoMap = new HashMap<>();

for (ClusterMirrorMakerOverviewVO mirrorMakerVO : mirrorMakerVOList) {
Result<KSConnectorInfo> connectorInfoRet = connectorService.getConnectorInfoFromCluster(mirrorMakerVO.getConnectClusterId(), mirrorMakerVO.getConnectorName());
if (!connectorInfoRet.hasData()) {
apiCallFutureWaitUtil.runnableTask(String.format("method=completeClusterInfo||connectClusterId=%d||connectorName=%s||getMirrorMakerInfo", mirrorMakerVO.getConnectClusterId(), mirrorMakerVO.getConnectorName()),
3000
, () -> {
Result<KSConnectorInfo> connectorInfoRet = connectorService.getConnectorInfoFromCluster(mirrorMakerVO.getConnectClusterId(), mirrorMakerVO.getConnectorName());
if (connectorInfoRet.hasData()) {
connectorInfoMap.put(mirrorMakerVO.getConnectClusterId() + mirrorMakerVO.getConnectorName(), connectorInfoRet.getData());
}
return connectorInfoRet.getData();
});
}

apiCallFutureWaitUtil.waitResult(1000);

List<ClusterMirrorMakerOverviewVO> newMirrorMakerVOList = new ArrayList<>();
for (ClusterMirrorMakerOverviewVO mirrorMakerVO : mirrorMakerVOList) {
KSConnectorInfo connectorInfo = connectorInfoMap.get(mirrorMakerVO.getConnectClusterId() + mirrorMakerVO.getConnectorName());
if (connectorInfo == null) {
continue;
}
KSConnectorInfo connectorInfo = connectorInfoRet.getData();

String sourceClusterAlias = connectorInfo.getConfig().get(MIRROR_MAKER_SOURCE_CLUSTER_ALIAS_FIELD_NAME);
String targetClusterAlias = connectorInfo.getConfig().get(MIRROR_MAKER_TARGET_CLUSTER_ALIAS_FIELD_NAME);
Expand All @@ -627,6 +647,10 @@ private void completeClusterInfo(List<ClusterMirrorMakerOverviewVO> mirrorMakerV
}
}

newMirrorMakerVOList.add(mirrorMakerVO);

}

return newMirrorMakerVOList;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.xiaojukeji.know.streaming.km.common.utils;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.concurrent.Callable;

/**
* @author wyb
* @date 2023/2/22
*/
@Service
public class ApiCallFutureWaitUtil {

@Value(value = "${thread-pool.api.future-util.core-size:2}")
private Integer corePoolSize;

@Value(value = "${thread-pool.api.future-util.max-size:8}")
private Integer maxPoolSize;

@Value(value = "${thread-pool.api.future-util.queue-size:500}")
private Integer queueSize;


private static FutureWaitUtil<Object> apiFutureUtil;

@PostConstruct
private void init() {
apiFutureUtil = FutureWaitUtil.init(
"apiFutureThreadPool",
corePoolSize,
maxPoolSize,
queueSize
);
}

public static void runnableTask(String taskName, Integer timeoutUnisMs, Callable<Object> callable) {
apiFutureUtil.runnableTask(taskName, timeoutUnisMs, callable);
}

public static void waitResult(Integer stepWaitTimeUnitMs) {
apiFutureUtil.waitResult(stepWaitTimeUnitMs);
}
}