Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions km-biz/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
<artifactId>km-core</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.xiaojukeji.kafka</groupId>
<artifactId>km-rebalance</artifactId>
<version>${project.parent.version}</version>
</dependency>

<!-- spring -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhyBaseVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhyDashboardVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.converter.ClusterVOConverter;
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthStateEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
Expand All @@ -24,6 +25,10 @@
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterMetricService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.ClusterMetricVersionItems;
import com.xiaojukeji.know.streaming.km.rebalance.algorithm.model.Resource;
import com.xiaojukeji.know.streaming.km.rebalance.common.BalanceMetricConstant;
import com.xiaojukeji.know.streaming.km.rebalance.common.bean.entity.ClusterBalanceItemState;
import com.xiaojukeji.know.streaming.km.rebalance.core.service.ClusterBalanceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

Expand All @@ -40,6 +45,9 @@ public class MultiClusterPhyManagerImpl implements MultiClusterPhyManager {
@Autowired
private ClusterMetricService clusterMetricService;

@Autowired
private ClusterBalanceService clusterBalanceService;

@Override
public ClusterPhysState getClusterPhysState() {
List<ClusterPhy> clusterPhyList = clusterPhyService.listAllClusters();
Expand Down Expand Up @@ -153,6 +161,11 @@ private PaginationResult<ClusterMetrics> getAndPagingClusterWithLatestMetricsFro
ClusterMetrics clusterMetrics = clusterMetricService.getLatestMetricsFromCache(vo.getId());
clusterMetrics.getMetrics().putIfAbsent(ClusterMetricVersionItems.CLUSTER_METRIC_HEALTH_STATE, (float) HealthStateEnum.UNKNOWN.getDimension());

Result<ClusterMetrics> balanceMetricsResult = this.getClusterLoadReBalanceInfo(vo.getId());
if (balanceMetricsResult.hasData()) {
clusterMetrics.putMetric(balanceMetricsResult.getData().getMetrics());
}

metricsList.add(clusterMetrics);
}

Expand All @@ -174,4 +187,21 @@ private MetricsClusterPhyDTO buildMetricsClusterPhyDTO(List<Long> clusterIdList,
dto.setClusterPhyIds(clusterIdList);
return dto;
}

private Result<ClusterMetrics> getClusterLoadReBalanceInfo(Long clusterPhyId) {
Result<ClusterBalanceItemState> stateResult = clusterBalanceService.getItemStateFromCacheFirst(clusterPhyId);
if (stateResult.failed()) {
return Result.buildFromIgnoreData(stateResult);
}

ClusterBalanceItemState state = stateResult.getData();

ClusterMetrics metric = ClusterMetrics.initWithMetrics(clusterPhyId, BalanceMetricConstant.CLUSTER_METRIC_LOAD_RE_BALANCE_ENABLE, state.getEnable()? Constant.YES: Constant.NO);
metric.putMetric(BalanceMetricConstant.CLUSTER_METRIC_LOAD_RE_BALANCE_CPU, state.getResItemState(Resource.CPU).floatValue());
metric.putMetric(BalanceMetricConstant.CLUSTER_METRIC_LOAD_RE_BALANCE_NW_IN, state.getResItemState(Resource.NW_IN).floatValue());
metric.putMetric(BalanceMetricConstant.CLUSTER_METRIC_LOAD_RE_BALANCE_NW_OUT, state.getResItemState(Resource.NW_OUT).floatValue());
metric.putMetric(BalanceMetricConstant.CLUSTER_METRIC_LOAD_RE_BALANCE_DISK, state.getResItemState(Resource.DISK).floatValue());

return Result.buildSuc(metric);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicExpansionDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.config.KafkaTopicConfigParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicCreateParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicPartitionExpandParam;
Expand All @@ -17,17 +18,17 @@
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils;
import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.common.utils.*;
import com.xiaojukeji.know.streaming.km.common.utils.kafka.KafkaReplicaAssignUtil;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.topic.OpTopicService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import kafka.admin.AdminUtils;
import kafka.admin.BrokerMetadata;
import org.apache.kafka.common.config.TopicConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
Expand Down Expand Up @@ -61,6 +62,9 @@ public class OpTopicManagerImpl implements OpTopicManager {
@Autowired
private PartitionService partitionService;

@Autowired
private TopicConfigService topicConfigService;

@Override
public Result<Void> createTopic(TopicCreateDTO dto, String operator) {
log.info("method=createTopic||param={}||operator={}.", dto, operator);
Expand Down Expand Up @@ -160,17 +164,72 @@ public Result<Void> expandTopic(TopicExpansionDTO dto, String operator) {

@Override
public Result<Void> truncateTopic(Long clusterPhyId, String topicName, String operator) {
// 增加delete配置
Result<Tuple<Boolean, String>> rt = this.addDeleteConfigIfNotExist(clusterPhyId, topicName, operator);
if (rt.failed()) {
log.error("method=truncateTopic||clusterPhyId={}||topicName={}||operator={}||result={}||msg=get config from kafka failed", clusterPhyId, topicName, operator, rt);
return Result.buildFromIgnoreData(rt);
}

// 清空Topic
Result<Void> rv = opTopicService.truncateTopic(new TopicTruncateParam(clusterPhyId, topicName, KafkaConstant.TOPICK_TRUNCATE_DEFAULT_OFFSET), operator);
if (rv.failed()) {
return rv;
log.error("method=truncateTopic||clusterPhyId={}||topicName={}||originConfig={}||operator={}||result={}||msg=truncate topic failed", clusterPhyId, topicName, rt.getData().v2(), operator, rv);
// config被修改了,则错误提示需要提醒一下,否则直接返回错误
return rt.getData().v1() ? Result.buildFailure(rv.getCode(), rv.getMessage() + "\t\n" + String.format("Topic的CleanupPolicy已被修改,需要手动恢复为%s", rt.getData().v2())) : rv;
}

// 恢复compact配置
rv = this.recoverConfigIfChanged(clusterPhyId, topicName, rt.getData().v1(), rt.getData().v2(), operator);
if (rv.failed()) {
log.error("method=truncateTopic||clusterPhyId={}||topicName={}||originConfig={}||operator={}||result={}||msg=truncate topic success but recover config failed", clusterPhyId, topicName, rt.getData().v2(), operator, rv);
// config被修改了,则错误提示需要提醒一下,否则直接返回错误
return Result.buildFailure(rv.getCode(), String.format("Topic清空操作已成功,但是恢复CleanupPolicy配置失败,需要手动恢复为%s。", rt.getData().v2()) + "\t\n" + rv.getMessage());
}

return Result.buildSuc();
}

/**************************************************** private method ****************************************************/

private Result<Tuple<Boolean, String>> addDeleteConfigIfNotExist(Long clusterPhyId, String topicName, String operator) {
// 获取Topic配置
Result<Map<String, String>> configMapResult = topicConfigService.getTopicConfigFromKafka(clusterPhyId, topicName);
if (configMapResult.failed()) {
return Result.buildFromIgnoreData(configMapResult);
}

String cleanupPolicyValue = configMapResult.getData().getOrDefault(TopicConfig.CLEANUP_POLICY_CONFIG, "");
List<String> cleanupPolicyValueList = CommonUtils.string2StrList(cleanupPolicyValue);
if (cleanupPolicyValueList.size() == 1 && cleanupPolicyValueList.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
// 不需要修改
return Result.buildSuc(new Tuple<>(Boolean.FALSE, cleanupPolicyValue));
}

Map<String, String> changedConfigMap = new HashMap<>(1);
changedConfigMap.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);

Result<Void> rv = topicConfigService.modifyTopicConfig(new KafkaTopicConfigParam(clusterPhyId, topicName, changedConfigMap), operator);
if (rv.failed()) {
// 修改失败
return Result.buildFromIgnoreData(rv);
}

return Result.buildSuc(new Tuple<>(Boolean.TRUE, cleanupPolicyValue));
}

private Result<Void> recoverConfigIfChanged(Long clusterPhyId, String topicName, Boolean changed, String originValue, String operator) {
if (!changed) {
// 没有修改,直接返回
return Result.buildSuc();
}

// 恢复配置
Map<String, String> changedConfigMap = new HashMap<>(1);
changedConfigMap.put(TopicConfig.CLEANUP_POLICY_CONFIG, originValue);

return topicConfigService.modifyTopicConfig(new KafkaTopicConfigParam(clusterPhyId, topicName, changedConfigMap), operator);
}

private Seq<BrokerMetadata> buildBrokerMetadataSeq(Long clusterPhyId, final List<Integer> selectedBrokerIdList) {
// 选取Broker列表
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.xiaojukeji.know.streaming.km.common.annotations.enterprise.EnterpriseLoadReBalance;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;

import java.util.List;
Expand Down Expand Up @@ -40,6 +41,9 @@ public enum ModuleEnum {

JOB_KAFKA_REPLICA_REASSIGN(110, "Job-KafkaReplica迁移"),

@EnterpriseLoadReBalance
JOB_CLUSTER_BALANCE(111, "Job-ClusterBalance"),

;

ModuleEnum(int code, String desc) {
Expand Down
2 changes: 1 addition & 1 deletion km-console/.env
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
BUSINESS_VERSION='false'
BUSINESS_VERSION='true'
PUBLIC_PATH=''
12 changes: 3 additions & 9 deletions km-console/packages/config-manager-fe/src/app.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,15 @@ export const { Provider, Consumer } = React.createContext('zh');

const defaultLanguage = 'zh';

const AppContent = (props: {
getLicenseInfo?: (cbk: (msg: string) => void) => void | undefined;
licenseEventBus?: Record<string, any> | undefined;
}) => {
const { getLicenseInfo, licenseEventBus } = props;

const AppContent = (props: any) => {
return (
<div className="config-system">
<DProLayout.Sider prefixCls={'dcd-two-columns'} width={200} theme={'light'} systemKey={systemKey} menuConf={leftMenus} />
<DProLayout.Content>
<RouteGuard
routeList={pageRoutes}
beforeEach={() => {
getLicenseInfo?.((msg) => licenseEventBus?.emit('licenseError', msg));
// getLicenseInfo?.((msg) => licenseEventBus?.emit('licenseError', msg));
return Promise.resolve(true);
}}
noMatch={() => <Redirect to="/404" />}
Expand All @@ -55,7 +50,6 @@ const AppContent = (props: {
};

const App = (props: any) => {
const { getLicenseInfo, licenseEventBus } = props;
const intlMessages = _.get(localeMap[defaultLanguage], 'intlMessages', intlZhCN);
const locale = _.get(localeMap[defaultLanguage], 'intl', 'zh-CN');
const antdLocale = _.get(localeMap[defaultLanguage], 'dantd', dantdZhCN);
Expand All @@ -65,7 +59,7 @@ const App = (props: any) => {
<AppContainer intlProvider={{ locale, messages: intlMessages }} antdProvider={{ locale: antdLocale }}>
<Router basename={systemKey}>
<Switch>
<AppContent getLicenseInfo={getLicenseInfo} licenseEventBus={licenseEventBus} />
<AppContent />
</Switch>
</Router>
</AppContainer>
Expand Down
42 changes: 1 addition & 41 deletions km-console/packages/layout-clusters-fe/src/app.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -73,44 +73,6 @@ const logout = () => {
localStorage.removeItem('userInfo');
};

const LicenseLimitModal = () => {
const [visible, setVisible] = useState<boolean>(false);
const [msg, setMsg] = useState<string>('');

useLayoutEffect(() => {
licenseEventBus.on('licenseError', (desc: string) => {
!visible && setVisible(true);
setMsg(desc);
});
return () => {
licenseEventBus.removeAll('licenseError');
};
}, []);

return (
<Modal
visible={visible}
centered={true}
width={400}
zIndex={10001}
title={
<>
<IconFont type="icon-yichang" style={{ marginRight: 10, fontSize: 18 }} />
许可证限制
</>
}
footer={null}
onCancel={() => setVisible(false)}
>
<div style={{ margin: '0 28px', lineHeight: '24px' }}>
<div>
{msg},<a>前往帮助文档</a>
</div>
</div>
</Modal>
);
};

const AppContent = (props: { setlanguage: (language: string) => void }) => {
const { pathname } = useLocation();
const history = useHistory();
Expand Down Expand Up @@ -186,7 +148,7 @@ const AppContent = (props: { setlanguage: (language: string) => void }) => {
}}
onMount={(customProps: any) => {
judgePage404();
registerApps(systemsConfig, { ...customProps, getLicenseInfo, licenseEventBus }, () => {
registerApps(systemsConfig, { ...customProps }, () => {
// postMessage();
});
}}
Expand All @@ -207,7 +169,6 @@ const AppContent = (props: { setlanguage: (language: string) => void }) => {
}}
/>
</Switch>
<LicenseLimitModal />
</>
</DProLayout.Container>
);
Expand Down Expand Up @@ -241,7 +202,6 @@ export default function App(): JSX.Element {
<BrowserRouter basename="">
<Switch>
<Route path="/login" component={Login} />
<Route path="/no-license" exact component={NoLicense} />
<Route render={() => <AppContent setlanguage={setlanguage} />} />
</Switch>
</BrowserRouter>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ interface PropsType {
};
onChange: (options: KsHeaderOptions) => void;
openMetricFilter: () => void;
setScreenType?: any;
}

interface ScopeData {
Expand All @@ -56,12 +57,29 @@ const GRID_SIZE_OPTIONS = [
},
];

// connect 筛选逻辑补充
const CONNECT_OPTIONS = [
{
label: '全部',
value: 'all',
},
{
label: 'Cluster',
value: 'Connect',
},
{
label: 'Connector',
value: 'Connector',
},
];

const MetricOperateBar = ({
nodeSelect = {},
hideNodeScope = false,
hideGridSelect = false,
onChange: onChangeCallback,
openMetricFilter,
setScreenType,
}: PropsType): JSX.Element => {
const [gridNum, setGridNum] = useState<number>(GRID_SIZE_OPTIONS[1].value);
const [rangeTime, setRangeTime] = useState<[number, number]>(() => {
Expand Down Expand Up @@ -139,6 +157,17 @@ const MetricOperateBar = ({
<DRangeTime timeChange={timeChange} rangeTimeArr={rangeTime} />
</div>
<div className="header-right">
{/* connect 单独逻辑 */}
{setScreenType && (
<Select
style={{ width: 120, marginRight: 10 }}
defaultValue="all"
options={CONNECT_OPTIONS}
onChange={(e) => {
setScreenType(e);
}}
/>
)}
{/* 节点范围 */}
{!hideNodeScope && (
<NodeSelect name={nodeSelect.name || ''} onChange={nodeScopeChange}>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ const ChartList = (props: ChartListProps) => {
const { metricName, metricType, metricUnit, metricLines, showLegend } = data;

return (
<div key={metricName} className="dashboard-drag-item-box">
<div key={metricName + metricType} className="dashboard-drag-item-box">
<div className="dashboard-drag-item-box-title">
<Tooltip
placement="topLeft"
Expand Down
Loading