Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.connect;

import com.xiaojukeji.know.streaming.km.common.bean.entity.EntityIdInterface;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import lombok.Data;

import java.io.Serializable;
Expand Down Expand Up @@ -54,6 +55,22 @@ public class ConnectCluster implements Serializable, Comparable<ConnectCluster>,
*/
private String clusterUrl;

public String getSuitableRequestUrl() {
// 优先使用用户填写的url
String suitableRequestUrl = this.clusterUrl;
if (ValidateUtils.isBlank(suitableRequestUrl)) {
// 用户如果没有填写,则使用元信息中的url
suitableRequestUrl = this.memberLeaderUrl;
}

//url去斜杠
if (suitableRequestUrl.length() > 0 && suitableRequestUrl.charAt(suitableRequestUrl.length() - 1) == '/') {
return suitableRequestUrl.substring(0, suitableRequestUrl.length() - 1);
}

return suitableRequestUrl;
}

@Override
public int compareTo(ConnectCluster connectCluster) {
return this.id.compareTo(connectCluster.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class ConnectClusterPO extends BasePO {
private Integer state;

/**
* 集群地址
* 用户填写的集群地址
*/
private String clusterUrl;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,6 @@ public class ConnectClusterServiceImpl implements ConnectClusterService {

@Override
public Long replaceAndReturnIdInDB(ConnectClusterMetadata metadata) {
//url去斜杠
String clusterUrl = metadata.getMemberLeaderUrl();
if (clusterUrl.charAt(clusterUrl.length() - 1) == '/') {
clusterUrl = clusterUrl.substring(0, clusterUrl.length() - 1);
}

ConnectClusterPO oldPO = this.getPOFromDB(metadata.getKafkaClusterPhyId(), metadata.getGroupName());
if (oldPO == null) {
oldPO = new ConnectClusterPO();
Expand All @@ -54,7 +48,7 @@ public Long replaceAndReturnIdInDB(ConnectClusterMetadata metadata) {
oldPO.setName(metadata.getGroupName());
oldPO.setState(metadata.getState().getCode());
oldPO.setMemberLeaderUrl(metadata.getMemberLeaderUrl());
oldPO.setClusterUrl(clusterUrl);
oldPO.setClusterUrl("");
oldPO.setVersion(KafkaConstant.DEFAULT_CONNECT_VERSION);
connectClusterDAO.insert(oldPO);

Expand All @@ -69,11 +63,11 @@ public Long replaceAndReturnIdInDB(ConnectClusterMetadata metadata) {
if (ValidateUtils.isBlank(oldPO.getVersion())) {
oldPO.setVersion(KafkaConstant.DEFAULT_CONNECT_VERSION);
}
if (!ValidateUtils.isBlank(clusterUrl)) {
oldPO.setClusterUrl(clusterUrl);
if (ValidateUtils.isNull(oldPO.getClusterUrl())) {
oldPO.setClusterUrl("");
}
connectClusterDAO.updateById(oldPO);

connectClusterDAO.updateById(oldPO);
return oldPO.getId();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public Result<KSConnectorInfo> createConnector(Long connectClusterId, String con
props.put("config", configs);

ConnectorInfo connectorInfo = restTool.postObjectWithJsonContent(
connectCluster.getClusterUrl() + CREATE_CONNECTOR_URI,
connectCluster.getSuitableRequestUrl() + CREATE_CONNECTOR_URI,
props,
ConnectorInfo.class
);
Expand Down Expand Up @@ -127,7 +127,7 @@ public Result<List<String>> listConnectorsFromCluster(Long connectClusterId) {
}

List<String> nameList = restTool.getArrayObjectWithJsonContent(
connectCluster.getClusterUrl() + LIST_CONNECTORS_URI,
connectCluster.getSuitableRequestUrl() + LIST_CONNECTORS_URI,
new HashMap<>(),
String.class
);
Expand Down Expand Up @@ -224,7 +224,7 @@ public Result<Void> resumeConnector(Long connectClusterId, String connectorName,
}

restTool.putJsonForObject(
connectCluster.getClusterUrl() + String.format(RESUME_CONNECTOR_URI, connectorName),
connectCluster.getSuitableRequestUrl() + String.format(RESUME_CONNECTOR_URI, connectorName),
new HashMap<>(),
String.class
);
Expand Down Expand Up @@ -259,7 +259,7 @@ public Result<Void> restartConnector(Long connectClusterId, String connectorName
}

restTool.postObjectWithJsonContent(
connectCluster.getClusterUrl() + String.format(RESTART_CONNECTOR_URI, connectorName),
connectCluster.getSuitableRequestUrl() + String.format(RESTART_CONNECTOR_URI, connectorName),
new HashMap<>(),
String.class
);
Expand Down Expand Up @@ -294,7 +294,7 @@ public Result<Void> stopConnector(Long connectClusterId, String connectorName, S
}

restTool.putJsonForObject(
connectCluster.getClusterUrl() + String.format(PAUSE_CONNECTOR_URI, connectorName),
connectCluster.getSuitableRequestUrl() + String.format(PAUSE_CONNECTOR_URI, connectorName),
new HashMap<>(),
String.class
);
Expand Down Expand Up @@ -329,7 +329,7 @@ public Result<Void> deleteConnector(Long connectClusterId, String connectorName,
}

restTool.deleteWithParamsAndHeader(
connectCluster.getClusterUrl() + String.format(DELETE_CONNECTOR_URI, connectorName),
connectCluster.getSuitableRequestUrl() + String.format(DELETE_CONNECTOR_URI, connectorName),
new HashMap<>(),
new HashMap<>(),
String.class
Expand Down Expand Up @@ -365,7 +365,7 @@ public Result<Void> updateConnectorConfig(Long connectClusterId, String connecto
}

ConnectorInfo connectorInfo = restTool.putJsonForObject(
connectCluster.getClusterUrl() + String.format(UPDATE_CONNECTOR_CONFIG_URI, connectorName),
connectCluster.getSuitableRequestUrl() + String.format(UPDATE_CONNECTOR_CONFIG_URI, connectorName),
configs,
org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo.class
);
Expand Down Expand Up @@ -532,7 +532,7 @@ private int deleteConnectorInDB(Long connectClusterId, String connectorName) {
private Result<KSConnectorInfo> getConnectorInfoFromCluster(ConnectCluster connectCluster, String connectorName) {
try {
ConnectorInfo connectorInfo = restTool.getForObject(
connectCluster.getClusterUrl() + GET_CONNECTOR_INFO_PREFIX_URI + "/" + connectorName,
connectCluster.getSuitableRequestUrl() + GET_CONNECTOR_INFO_PREFIX_URI + "/" + connectorName,
new HashMap<>(),
ConnectorInfo.class
);
Expand All @@ -558,7 +558,7 @@ private Result<KSConnectorInfo> getConnectorInfoFromCluster(ConnectCluster conne
private Result<List<String>> getConnectorTopicsFromCluster(ConnectCluster connectCluster, String connectorName) {
try {
Properties properties = restTool.getForObject(
connectCluster.getClusterUrl() + String.format(GET_CONNECTOR_TOPICS_URI, connectorName),
connectCluster.getSuitableRequestUrl() + String.format(GET_CONNECTOR_TOPICS_URI, connectorName),
new HashMap<>(),
Properties.class
);
Expand All @@ -578,7 +578,7 @@ private Result<List<String>> getConnectorTopicsFromCluster(ConnectCluster connec
private Result<KSConnectorStateInfo> getConnectorStateInfoFromCluster(ConnectCluster connectCluster, String connectorName) {
try {
KSConnectorStateInfo connectorStateInfo = restTool.getForObject(
connectCluster.getClusterUrl() + String.format(GET_CONNECTOR_STATUS_URI, connectorName),
connectCluster.getSuitableRequestUrl() + String.format(GET_CONNECTOR_STATUS_URI, connectorName),
new HashMap<>(),
KSConnectorStateInfo.class
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public Result<ConnectConfigInfos> validateConfig(Long connectClusterId, Properti

// 通过参数检查接口,获取插件配置
ConfigInfos configInfos = restTool.putJsonForObject(
connectCluster.getClusterUrl() + String.format(GET_PLUGIN_CONFIG_DESC_URI, props.getProperty(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME)),
connectCluster.getSuitableRequestUrl() + String.format(GET_PLUGIN_CONFIG_DESC_URI, props.getProperty(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME)),
props,
ConfigInfos.class
);
Expand Down Expand Up @@ -94,7 +94,7 @@ public Result<List<ConnectPluginBasic>> listPluginsFromCluster(Long connectClust

// 通过参数检查接口,获取插件配置
List<ConnectPluginBasic> pluginList = restTool.getArrayObjectWithJsonContent(
connectCluster.getClusterUrl() + GET_ALL_PLUGINS_URI,
connectCluster.getSuitableRequestUrl() + GET_ALL_PLUGINS_URI,
new HashMap<>(),
ConnectPluginBasic.class
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public Result<Void> actionTask(TaskActionDTO dto) {
return Result.buildFailure(ResultStatus.NOT_EXIST);
}

String url = String.format(RESTART_TASK_URI, connectCluster.getClusterUrl(), dto.getConnectorName(), dto.getTaskId());
String url = String.format(RESTART_TASK_URI, connectCluster.getSuitableRequestUrl(), dto.getConnectorName(), dto.getTaskId());
try {
restTool.postObjectWithJsonContent(url, null, String.class);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.jmx.JmxEnum;
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService;
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
import com.xiaojukeji.know.streaming.km.persistence.connect.cache.LoadedConnectClusterCache;
Expand All @@ -47,9 +46,6 @@ public class SyncConnectClusterAndWorkerTask extends AbstractAsyncMetadataDispat
@Autowired
private WorkerService workerService;

@Autowired
private WorkerConnectorService workerConnectorService;

@Autowired
private ConnectClusterService connectClusterService;

Expand All @@ -60,7 +56,6 @@ public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnit
//获取connect集群
List<Group> groupList = groupService.listClusterGroups(clusterPhy.getId()).stream().filter(elem->elem.getType()==GroupTypeEnum.CONNECT_CLUSTER).collect(Collectors.toList());
for (Group group: groupList) {

try {
KSGroupDescription ksGroupDescription = groupService.getGroupDescriptionFromKafka(clusterPhy, group.getName());
if (!ksGroupDescription.protocolType().equals(CONNECT_CLUSTER_PROTOCOL_TYPE)) {
Expand Down