diff --git a/km-biz/pom.xml b/km-biz/pom.xml index a6db4220c..da4192c83 100644 --- a/km-biz/pom.xml +++ b/km-biz/pom.xml @@ -29,6 +29,11 @@ km-core ${project.parent.version} + + com.xiaojukeji.kafka + km-rebalance + ${project.parent.version} + diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/MultiClusterPhyManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/MultiClusterPhyManagerImpl.java index 7e379c996..984b21523 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/MultiClusterPhyManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/MultiClusterPhyManagerImpl.java @@ -15,6 +15,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhyBaseVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhyDashboardVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.converter.ClusterVOConverter; import com.xiaojukeji.know.streaming.km.common.enums.health.HealthStateEnum; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; @@ -24,6 +25,10 @@ import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterMetricService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.ClusterMetricVersionItems; +import com.xiaojukeji.know.streaming.km.rebalance.algorithm.model.Resource; +import com.xiaojukeji.know.streaming.km.rebalance.common.BalanceMetricConstant; +import com.xiaojukeji.know.streaming.km.rebalance.common.bean.entity.ClusterBalanceItemState; +import com.xiaojukeji.know.streaming.km.rebalance.core.service.ClusterBalanceService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -40,6 +45,9 @@ public class MultiClusterPhyManagerImpl implements MultiClusterPhyManager { @Autowired private ClusterMetricService clusterMetricService; + @Autowired + private ClusterBalanceService clusterBalanceService; + @Override public ClusterPhysState getClusterPhysState() { List clusterPhyList = clusterPhyService.listAllClusters(); @@ -153,6 +161,11 @@ private PaginationResult getAndPagingClusterWithLatestMetricsFro ClusterMetrics clusterMetrics = clusterMetricService.getLatestMetricsFromCache(vo.getId()); clusterMetrics.getMetrics().putIfAbsent(ClusterMetricVersionItems.CLUSTER_METRIC_HEALTH_STATE, (float) HealthStateEnum.UNKNOWN.getDimension()); + Result balanceMetricsResult = this.getClusterLoadReBalanceInfo(vo.getId()); + if (balanceMetricsResult.hasData()) { + clusterMetrics.putMetric(balanceMetricsResult.getData().getMetrics()); + } + metricsList.add(clusterMetrics); } @@ -174,4 +187,21 @@ private MetricsClusterPhyDTO buildMetricsClusterPhyDTO(List clusterIdList, dto.setClusterPhyIds(clusterIdList); return dto; } + + private Result getClusterLoadReBalanceInfo(Long clusterPhyId) { + Result stateResult = clusterBalanceService.getItemStateFromCacheFirst(clusterPhyId); + if (stateResult.failed()) { + return Result.buildFromIgnoreData(stateResult); + } + + ClusterBalanceItemState state = stateResult.getData(); + + ClusterMetrics metric = ClusterMetrics.initWithMetrics(clusterPhyId, BalanceMetricConstant.CLUSTER_METRIC_LOAD_RE_BALANCE_ENABLE, state.getEnable()? Constant.YES: Constant.NO); + metric.putMetric(BalanceMetricConstant.CLUSTER_METRIC_LOAD_RE_BALANCE_CPU, state.getResItemState(Resource.CPU).floatValue()); + metric.putMetric(BalanceMetricConstant.CLUSTER_METRIC_LOAD_RE_BALANCE_NW_IN, state.getResItemState(Resource.NW_IN).floatValue()); + metric.putMetric(BalanceMetricConstant.CLUSTER_METRIC_LOAD_RE_BALANCE_NW_OUT, state.getResItemState(Resource.NW_OUT).floatValue()); + metric.putMetric(BalanceMetricConstant.CLUSTER_METRIC_LOAD_RE_BALANCE_DISK, state.getResItemState(Resource.DISK).floatValue()); + + return Result.buildSuc(metric); + } } diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/OpTopicManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/OpTopicManagerImpl.java index 22d204ea5..424594472 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/OpTopicManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/OpTopicManagerImpl.java @@ -7,6 +7,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicExpansionDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.config.KafkaTopicConfigParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicCreateParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicPartitionExpandParam; @@ -17,17 +18,17 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; -import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils; -import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil; -import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; +import com.xiaojukeji.know.streaming.km.common.utils.*; import com.xiaojukeji.know.streaming.km.common.utils.kafka.KafkaReplicaAssignUtil; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.topic.OpTopicService; +import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import kafka.admin.AdminUtils; import kafka.admin.BrokerMetadata; +import org.apache.kafka.common.config.TopicConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -61,6 +62,9 @@ public class OpTopicManagerImpl implements OpTopicManager { @Autowired private PartitionService partitionService; + @Autowired + private TopicConfigService topicConfigService; + @Override public Result createTopic(TopicCreateDTO dto, String operator) { log.info("method=createTopic||param={}||operator={}.", dto, operator); @@ -160,10 +164,27 @@ public Result expandTopic(TopicExpansionDTO dto, String operator) { @Override public Result truncateTopic(Long clusterPhyId, String topicName, String operator) { + // 增加delete配置 + Result> rt = this.addDeleteConfigIfNotExist(clusterPhyId, topicName, operator); + if (rt.failed()) { + log.error("method=truncateTopic||clusterPhyId={}||topicName={}||operator={}||result={}||msg=get config from kafka failed", clusterPhyId, topicName, operator, rt); + return Result.buildFromIgnoreData(rt); + } + // 清空Topic Result rv = opTopicService.truncateTopic(new TopicTruncateParam(clusterPhyId, topicName, KafkaConstant.TOPICK_TRUNCATE_DEFAULT_OFFSET), operator); if (rv.failed()) { - return rv; + log.error("method=truncateTopic||clusterPhyId={}||topicName={}||originConfig={}||operator={}||result={}||msg=truncate topic failed", clusterPhyId, topicName, rt.getData().v2(), operator, rv); + // config被修改了,则错误提示需要提醒一下,否则直接返回错误 + return rt.getData().v1() ? Result.buildFailure(rv.getCode(), rv.getMessage() + "\t\n" + String.format("Topic的CleanupPolicy已被修改,需要手动恢复为%s", rt.getData().v2())) : rv; + } + + // 恢复compact配置 + rv = this.recoverConfigIfChanged(clusterPhyId, topicName, rt.getData().v1(), rt.getData().v2(), operator); + if (rv.failed()) { + log.error("method=truncateTopic||clusterPhyId={}||topicName={}||originConfig={}||operator={}||result={}||msg=truncate topic success but recover config failed", clusterPhyId, topicName, rt.getData().v2(), operator, rv); + // config被修改了,则错误提示需要提醒一下,否则直接返回错误 + return Result.buildFailure(rv.getCode(), String.format("Topic清空操作已成功,但是恢复CleanupPolicy配置失败,需要手动恢复为%s。", rt.getData().v2()) + "\t\n" + rv.getMessage()); } return Result.buildSuc(); @@ -171,6 +192,44 @@ public Result truncateTopic(Long clusterPhyId, String topicName, String op /**************************************************** private method ****************************************************/ + private Result> addDeleteConfigIfNotExist(Long clusterPhyId, String topicName, String operator) { + // 获取Topic配置 + Result> configMapResult = topicConfigService.getTopicConfigFromKafka(clusterPhyId, topicName); + if (configMapResult.failed()) { + return Result.buildFromIgnoreData(configMapResult); + } + + String cleanupPolicyValue = configMapResult.getData().getOrDefault(TopicConfig.CLEANUP_POLICY_CONFIG, ""); + List cleanupPolicyValueList = CommonUtils.string2StrList(cleanupPolicyValue); + if (cleanupPolicyValueList.size() == 1 && cleanupPolicyValueList.contains(TopicConfig.CLEANUP_POLICY_DELETE)) { + // 不需要修改 + return Result.buildSuc(new Tuple<>(Boolean.FALSE, cleanupPolicyValue)); + } + + Map changedConfigMap = new HashMap<>(1); + changedConfigMap.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE); + + Result rv = topicConfigService.modifyTopicConfig(new KafkaTopicConfigParam(clusterPhyId, topicName, changedConfigMap), operator); + if (rv.failed()) { + // 修改失败 + return Result.buildFromIgnoreData(rv); + } + + return Result.buildSuc(new Tuple<>(Boolean.TRUE, cleanupPolicyValue)); + } + + private Result recoverConfigIfChanged(Long clusterPhyId, String topicName, Boolean changed, String originValue, String operator) { + if (!changed) { + // 没有修改,直接返回 + return Result.buildSuc(); + } + + // 恢复配置 + Map changedConfigMap = new HashMap<>(1); + changedConfigMap.put(TopicConfig.CLEANUP_POLICY_CONFIG, originValue); + + return topicConfigService.modifyTopicConfig(new KafkaTopicConfigParam(clusterPhyId, topicName, changedConfigMap), operator); + } private Seq buildBrokerMetadataSeq(Long clusterPhyId, final List selectedBrokerIdList) { // 选取Broker列表 diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/operaterecord/ModuleEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/operaterecord/ModuleEnum.java index e7fe82c4b..cc338cd21 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/operaterecord/ModuleEnum.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/operaterecord/ModuleEnum.java @@ -2,6 +2,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.xiaojukeji.know.streaming.km.common.annotations.enterprise.EnterpriseLoadReBalance; import com.xiaojukeji.know.streaming.km.common.constant.Constant; import java.util.List; @@ -40,6 +41,9 @@ public enum ModuleEnum { JOB_KAFKA_REPLICA_REASSIGN(110, "Job-KafkaReplica迁移"), + @EnterpriseLoadReBalance + JOB_CLUSTER_BALANCE(111, "Job-ClusterBalance"), + ; ModuleEnum(int code, String desc) { diff --git a/km-console/.env b/km-console/.env index a892cff77..15a0e9193 100644 --- a/km-console/.env +++ b/km-console/.env @@ -1,2 +1,2 @@ -BUSINESS_VERSION='false' +BUSINESS_VERSION='true' PUBLIC_PATH='' diff --git a/km-console/packages/config-manager-fe/src/app.tsx b/km-console/packages/config-manager-fe/src/app.tsx index 2f29654d5..c653b8146 100644 --- a/km-console/packages/config-manager-fe/src/app.tsx +++ b/km-console/packages/config-manager-fe/src/app.tsx @@ -31,12 +31,7 @@ export const { Provider, Consumer } = React.createContext('zh'); const defaultLanguage = 'zh'; -const AppContent = (props: { - getLicenseInfo?: (cbk: (msg: string) => void) => void | undefined; - licenseEventBus?: Record | undefined; -}) => { - const { getLicenseInfo, licenseEventBus } = props; - +const AppContent = (props: any) => { return (
@@ -44,7 +39,7 @@ const AppContent = (props: { { - getLicenseInfo?.((msg) => licenseEventBus?.emit('licenseError', msg)); + // getLicenseInfo?.((msg) => licenseEventBus?.emit('licenseError', msg)); return Promise.resolve(true); }} noMatch={() => } @@ -55,7 +50,6 @@ const AppContent = (props: { }; const App = (props: any) => { - const { getLicenseInfo, licenseEventBus } = props; const intlMessages = _.get(localeMap[defaultLanguage], 'intlMessages', intlZhCN); const locale = _.get(localeMap[defaultLanguage], 'intl', 'zh-CN'); const antdLocale = _.get(localeMap[defaultLanguage], 'dantd', dantdZhCN); @@ -65,7 +59,7 @@ const App = (props: any) => { - + diff --git a/km-console/packages/layout-clusters-fe/src/app.tsx b/km-console/packages/layout-clusters-fe/src/app.tsx index 4df691ebd..5d4d82c18 100755 --- a/km-console/packages/layout-clusters-fe/src/app.tsx +++ b/km-console/packages/layout-clusters-fe/src/app.tsx @@ -73,44 +73,6 @@ const logout = () => { localStorage.removeItem('userInfo'); }; -const LicenseLimitModal = () => { - const [visible, setVisible] = useState(false); - const [msg, setMsg] = useState(''); - - useLayoutEffect(() => { - licenseEventBus.on('licenseError', (desc: string) => { - !visible && setVisible(true); - setMsg(desc); - }); - return () => { - licenseEventBus.removeAll('licenseError'); - }; - }, []); - - return ( - - - 许可证限制 - - } - footer={null} - onCancel={() => setVisible(false)} - > -
-
- {msg},前往帮助文档 -
-
-
- ); -}; - const AppContent = (props: { setlanguage: (language: string) => void }) => { const { pathname } = useLocation(); const history = useHistory(); @@ -186,7 +148,7 @@ const AppContent = (props: { setlanguage: (language: string) => void }) => { }} onMount={(customProps: any) => { judgePage404(); - registerApps(systemsConfig, { ...customProps, getLicenseInfo, licenseEventBus }, () => { + registerApps(systemsConfig, { ...customProps }, () => { // postMessage(); }); }} @@ -207,7 +169,6 @@ const AppContent = (props: { setlanguage: (language: string) => void }) => { }} /> - ); @@ -241,7 +202,6 @@ export default function App(): JSX.Element { - } /> diff --git a/km-console/packages/layout-clusters-fe/src/components/ChartOperateBar/index.tsx b/km-console/packages/layout-clusters-fe/src/components/ChartOperateBar/index.tsx index bfff9b4b5..56837b505 100644 --- a/km-console/packages/layout-clusters-fe/src/components/ChartOperateBar/index.tsx +++ b/km-console/packages/layout-clusters-fe/src/components/ChartOperateBar/index.tsx @@ -33,6 +33,7 @@ interface PropsType { }; onChange: (options: KsHeaderOptions) => void; openMetricFilter: () => void; + setScreenType?: any; } interface ScopeData { @@ -56,12 +57,29 @@ const GRID_SIZE_OPTIONS = [ }, ]; +// connect 筛选逻辑补充 +const CONNECT_OPTIONS = [ + { + label: '全部', + value: 'all', + }, + { + label: 'Cluster', + value: 'Connect', + }, + { + label: 'Connector', + value: 'Connector', + }, +]; + const MetricOperateBar = ({ nodeSelect = {}, hideNodeScope = false, hideGridSelect = false, onChange: onChangeCallback, openMetricFilter, + setScreenType, }: PropsType): JSX.Element => { const [gridNum, setGridNum] = useState(GRID_SIZE_OPTIONS[1].value); const [rangeTime, setRangeTime] = useState<[number, number]>(() => { @@ -139,6 +157,17 @@ const MetricOperateBar = ({
+ {/* connect 单独逻辑 */} + {setScreenType && ( +