@@ -49,9 +49,9 @@ public Result<Void> updateConnectorConfig(Long connectClusterId, String connecto

@Override
public Result<Void> createConnector(ConnectorCreateDTO dto, String operator) {
dto.getConfigs().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());
dto.getSuitableConfig().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());

Result<KSConnectorInfo> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), operator);
Result<KSConnectorInfo> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
if (createResult.failed()) {
return Result.buildFromIgnoreData(createResult);
}
@@ -67,9 +67,9 @@ public Result<Void> createConnector(ConnectorCreateDTO dto, String operator) {

@Override
public Result<Void> createConnector(ConnectorCreateDTO dto, String heartbeatName, String checkpointName, String operator) {
dto.getConfigs().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());
dto.getSuitableConfig().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());

Result<KSConnectorInfo> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), operator);
Result<KSConnectorInfo> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
if (createResult.failed()) {
return Result.buildFromIgnoreData(createResult);
}
@@ -132,17 +132,17 @@ public Result<Void> createMirrorMaker(MirrorMakerCreateDTO dto, String operator)
} else if (checkpointResult.failed() && checkpointResult.failed()) {
return Result.buildFromRSAndMsg(
ResultStatus.KAFKA_CONNECTOR_OPERATE_FAILED,
String.format("创建 checkpoint & heartbeat 失败.\n失败信息分别为:%s\n\n%s", checkpointResult.getMessage(), heartbeatResult.getMessage())
String.format("创建 checkpoint & heartbeat 失败.%n失败信息分别为:%s%n%n%s", checkpointResult.getMessage(), heartbeatResult.getMessage())
);
} else if (checkpointResult.failed()) {
return Result.buildFromRSAndMsg(
ResultStatus.KAFKA_CONNECTOR_OPERATE_FAILED,
String.format("创建 checkpoint 失败.\n失败信息分别为:%s", checkpointResult.getMessage())
String.format("创建 checkpoint 失败.%n失败信息分别为:%s", checkpointResult.getMessage())
);
} else{
return Result.buildFromRSAndMsg(
ResultStatus.KAFKA_CONNECTOR_OPERATE_FAILED,
String.format("创建 heartbeat 失败.\n失败信息分别为:%s", heartbeatResult.getMessage())
String.format("创建 heartbeat 失败.%n失败信息分别为:%s", heartbeatResult.getMessage())
);
}
}
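Note: the String.format changes in this hunk replace the literal \n escape with the %n conversion, so the error messages use the platform line separator instead of a hard-coded line feed. Below is a minimal standalone sketch of the difference; the class name and the sample "timeout" message are illustrative only and are not part of this change.

```java
public class LineSeparatorSketch {
    public static void main(String[] args) {
        // \n is always a bare LF, regardless of the operating system.
        String withEscape = String.format("创建 checkpoint 失败.\n失败信息分别为:%s", "timeout");

        // %n is resolved by the formatter to System.lineSeparator():
        // "\r\n" on Windows, "\n" on Linux and macOS.
        String withToken = String.format("创建 checkpoint 失败.%n失败信息分别为:%s", "timeout");

        System.out.println(withEscape.contains("\n"));                  // true on every platform
        System.out.println(withToken.contains(System.lineSeparator())); // true on every platform
    }
}
```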
@@ -194,7 +194,7 @@ public Result<Void> modifyMirrorMakerConfig(MirrorMakerCreateDTO dto, String ope
return rv;
}

return connectorService.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), operator);
return connectorService.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
}

@Override
@@ -426,7 +426,7 @@ public Result<List<Properties>> getMM2Configs(Long connectClusterId, String conn
public Result<List<ConnectConfigInfosVO>> validateConnectors(MirrorMakerCreateDTO dto) {
List<ConnectConfigInfosVO> voList = new ArrayList<>();

Result<ConnectConfigInfos> infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getConfigs());
Result<ConnectConfigInfos> infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getSuitableConfig());
if (infoResult.failed()) {
return Result.buildFromIgnoreData(infoResult);
}
@@ -480,11 +480,11 @@ public Result<Void> checkCreateMirrorMakerParamAndUnifyData(MirrorMakerCreateDTO
return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(connectCluster.getKafkaClusterPhyId()));
}

if (!dto.getConfigs().containsKey(CONNECTOR_CLASS_FILED_NAME)) {
if (!dto.getSuitableConfig().containsKey(CONNECTOR_CLASS_FILED_NAME)) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "SourceConnector缺少connector.class");
}

if (!MIRROR_MAKER_SOURCE_CONNECTOR_TYPE.equals(dto.getConfigs().getProperty(CONNECTOR_CLASS_FILED_NAME))) {
if (!MIRROR_MAKER_SOURCE_CONNECTOR_TYPE.equals(dto.getSuitableConfig().getProperty(CONNECTOR_CLASS_FILED_NAME))) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "SourceConnector的connector.class类型错误");
}

@@ -589,9 +589,7 @@ public static List<ClusterMirrorMakerOverviewVO> supplyData2ClusterMirrorMakerOv
}
}

voList.forEach(elem -> {
elem.setMetricLines(metricLineMap.get(elem.getConnectClusterId() + "#" + elem.getConnectorName()));
});
voList.forEach(elem -> elem.setMetricLines(metricLineMap.get(elem.getConnectClusterId() + "#" + elem.getConnectorName())));

return voList;
}
@@ -1,28 +1,36 @@
package com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.ClusterConnectorDTO;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.validation.constraints.NotNull;
import java.util.Properties;

/**
* @author zengqiao
* @date 2022-10-17
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@NoArgsConstructor
@ApiModel(description = "创建Connector")
public class ConnectorCreateDTO extends ClusterConnectorDTO {
@NotNull(message = "configs不允许为空")
@ApiModelProperty(value = "配置", example = "")
@Deprecated
@ApiModelProperty(value = "配置, 优先使用config字段,3.5.0版本将删除该字段", example = "")
protected Properties configs;

public ConnectorCreateDTO(Long connectClusterId, String connectorName, Properties configs) {
@ApiModelProperty(value = "配置", example = "")
protected Properties config;

public ConnectorCreateDTO(Long connectClusterId, String connectorName, Properties config) {
super(connectClusterId, connectorName);
this.configs = configs;
this.config = config;
}

public Properties getSuitableConfig() {
return config != null? config: configs;
}
}
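Note: ConnectorCreateDTO now carries both the deprecated configs field and the new config field, and getSuitableConfig() picks whichever the client supplied, preferring config. Below is a minimal standalone sketch of that fallback rule; the class, method, and sample property value are illustrative and are not part of the project.

```java
import java.util.Properties;

// Illustrative sketch of the fallback encoded by getSuitableConfig():
// prefer the new "config" payload, fall back to the deprecated "configs".
public class SuitableConfigSketch {
    static Properties suitable(Properties config, Properties configs) {
        return config != null ? config : configs;
    }

    public static void main(String[] args) {
        Properties legacy = new Properties();
        legacy.setProperty("connector.class", "org.apache.kafka.connect.mirror.MirrorSourceConnector");

        // Old-style request: only "configs" is populated, so the fallback is used.
        System.out.println(suitable(null, legacy).getProperty("connector.class"));

        Properties current = new Properties();
        current.setProperty("connector.class", "org.apache.kafka.connect.mirror.MirrorSourceConnector");

        // New-style request: "config" wins even when "configs" is also present.
        System.out.println(suitable(current, legacy) == current); // true
    }
}
```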
@@ -40,7 +40,7 @@ public void unifyData(Long sourceKafkaClusterId, String sourceBootstrapServers,
targetKafkaProps = new Properties();
}

this.unifyData(this.configs, sourceKafkaClusterId, sourceBootstrapServers, sourceKafkaProps, targetKafkaClusterId, targetBootstrapServers, targetKafkaProps);
this.unifyData(this.getSuitableConfig(), sourceKafkaClusterId, sourceBootstrapServers, sourceKafkaProps, targetKafkaClusterId, targetBootstrapServers, targetKafkaProps);

if (heartbeatConnectorConfigs != null) {
this.unifyData(this.heartbeatConnectorConfigs, sourceKafkaClusterId, sourceBootstrapServers, sourceKafkaProps, targetKafkaClusterId, targetBootstrapServers, targetKafkaProps);
@@ -13,7 +13,6 @@
@Data
@ApiModel(description = "集群MM2状态信息")
public class MirrorMakerBaseStateVO extends BaseVO {

@ApiModelProperty(value = "worker数", example = "1")
private Integer workerCount;

@@ -14,6 +14,7 @@
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectActionEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService;
import io.swagger.annotations.Api;
@@ -44,6 +45,10 @@ public class KafkaConnectorController {
@PostMapping(value = "connectors")
@ResponseBody
public Result<Void> createConnector(@Validated @RequestBody ConnectorCreateDTO dto) {
if (ValidateUtils.isNull(dto.getSuitableConfig())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "config字段不能为空");
}

return connectorManager.createConnector(dto, HttpRequestUtil.getOperator());
}

@@ -73,14 +78,27 @@ public Result<Void> operateConnectors(@Validated @RequestBody ConnectorActionDTO
@PutMapping(value ="connectors-config")
@ResponseBody
public Result<Void> modifyConnectors(@Validated @RequestBody ConnectorCreateDTO dto) {
return connectorManager.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), HttpRequestUtil.getOperator());
if (ValidateUtils.isNull(dto.getSuitableConfig())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "config字段不能为空");
}

return connectorManager.updateConnectorConfig(
dto.getConnectClusterId(),
dto.getConnectorName(),
dto.getSuitableConfig(),
HttpRequestUtil.getOperator()
);
}

@ApiOperation(value = "校验Connector配置", notes = "")
@PutMapping(value ="connectors-config/validate")
@ResponseBody
public Result<ConnectConfigInfosVO> validateConnectors(@Validated @RequestBody ConnectorCreateDTO dto) {
Result<ConnectConfigInfos> infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getConfigs());
if (ValidateUtils.isNull(dto.getSuitableConfig())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "config字段不能为空");
}

Result<ConnectConfigInfos> infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getSuitableConfig());
if (infoResult.failed()) {
return Result.buildFromIgnoreData(infoResult);
}
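Note: each controller endpoint above now rejects a request in which neither config nor configs is present, since the deprecated configs field no longer carries @NotNull. Below is a simplified standalone sketch of that guard; the class and method names are illustrative, and the real endpoints return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, ...) instead of a boolean.

```java
import java.util.Properties;

// Illustrative guard: a request is usable only if at least one of the two
// config payloads was supplied; getSuitableConfig() returning null means both
// were missing and the endpoint answers with PARAM_ILLEGAL.
public class ConfigGuardSketch {
    static boolean hasUsableConfig(Properties config, Properties configs) {
        Properties suitable = (config != null) ? config : configs;
        return suitable != null;
    }

    public static void main(String[] args) {
        System.out.println(hasUsableConfig(null, null));              // false -> reject as PARAM_ILLEGAL
        System.out.println(hasUsableConfig(new Properties(), null));  // true  -> proceed to the manager call
    }
}
```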