Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.xiaojukeji.know.streaming.km.biz.group;

import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterGroupSummaryDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetDeleteDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetResetDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
Expand Down Expand Up @@ -39,5 +40,7 @@ PaginationResult<GroupTopicConsumedDetailVO> pagingGroupTopicConsumedMetrics(Lon

Result<Void> resetGroupOffsets(GroupOffsetResetDTO dto, String operator) throws Exception;

Result<Void> deleteGroupOffsets(GroupOffsetDeleteDTO dto, String operator) throws Exception;

List<GroupTopicOverviewVO> getGroupTopicOverviewVOList(Long clusterPhyId, List<GroupMemberPO> groupMemberPOList);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.biz.group.GroupManager;
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterGroupSummaryDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetDeleteDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetResetDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
Expand All @@ -17,6 +18,9 @@
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSMemberDescription;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.GroupMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupTopicParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupTopicPartitionParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
Expand All @@ -32,6 +36,7 @@
import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum;
import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
Expand All @@ -42,6 +47,7 @@
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.group.GroupMetricService;
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
import com.xiaojukeji.know.streaming.km.core.service.group.OpGroupService;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.GroupMetricVersionItems;
Expand All @@ -58,14 +64,17 @@

@Component
public class GroupManagerImpl implements GroupManager {
private static final ILog log = LogFactory.getLog(GroupManagerImpl.class);
private static final ILog LOGGER = LogFactory.getLog(GroupManagerImpl.class);

@Autowired
private TopicService topicService;

@Autowired
private GroupService groupService;

@Autowired
private OpGroupService opGroupService;

@Autowired
private PartitionService partitionService;

Expand Down Expand Up @@ -246,6 +255,52 @@ public Result<Void> resetGroupOffsets(GroupOffsetResetDTO dto, String operator)
return groupService.resetGroupOffsets(dto.getClusterId(), dto.getGroupName(), offsetMapResult.getData(), operator);
}

@Override
public Result<Void> deleteGroupOffsets(GroupOffsetDeleteDTO dto, String operator) throws Exception {
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(dto.getClusterPhyId());
if (clusterPhy == null) {
return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(dto.getClusterPhyId()));
}


// 按照group纬度进行删除
if (ValidateUtils.isBlank(dto.getGroupName())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "groupName不允许为空");
}
if (DeleteGroupTypeEnum.GROUP.getCode().equals(dto.getDeleteType())) {
return opGroupService.deleteGroupOffset(
new DeleteGroupParam(dto.getClusterPhyId(), dto.getGroupName(), DeleteGroupTypeEnum.GROUP),
operator
);
}


// 按照topic纬度进行删除
if (ValidateUtils.isBlank(dto.getTopicName())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "topicName不允许为空");
}
if (DeleteGroupTypeEnum.GROUP_TOPIC.getCode().equals(dto.getDeleteType())) {
return opGroupService.deleteGroupOffset(
new DeleteGroupTopicParam(dto.getClusterPhyId(), dto.getGroupName(), DeleteGroupTypeEnum.GROUP, dto.getTopicName()),
operator
);
}


// 按照partition纬度进行删除
if (ValidateUtils.isNullOrLessThanZero(dto.getPartitionId())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "partitionId不允许为空或小于0");
}
if (DeleteGroupTypeEnum.GROUP_TOPIC_PARTITION.getCode().equals(dto.getDeleteType())) {
return opGroupService.deleteGroupOffset(
new DeleteGroupTopicPartitionParam(dto.getClusterPhyId(), dto.getGroupName(), DeleteGroupTypeEnum.GROUP, dto.getTopicName(), dto.getPartitionId()),
operator
);
}

return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "deleteType类型错误");
}

@Override
public List<GroupTopicOverviewVO> getGroupTopicOverviewVOList(Long clusterPhyId, List<GroupMemberPO> groupMemberPOList) {
// 获取指标
Expand All @@ -257,7 +312,7 @@ public List<GroupTopicOverviewVO> getGroupTopicOverviewVOList(Long clusterPhyId,
);
if (metricsListResult.failed()) {
// 如果查询失败,则输出错误信息,但是依旧进行已有数据的返回
log.error("method=completeMetricData||clusterPhyId={}||result={}||errMsg=search es failed", clusterPhyId, metricsListResult);
LOGGER.error("method=completeMetricData||clusterPhyId={}||result={}||errMsg=search es failed", clusterPhyId, metricsListResult);
}
return this.convert2GroupTopicOverviewVOList(groupMemberPOList, metricsListResult.getData());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.xiaojukeji.know.streaming.km.common.bean.dto.group;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.xiaojukeji.know.streaming.km.common.bean.dto.BaseDTO;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import javax.validation.constraints.Min;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;

/**
* 删除offset
* @author zengqiao
* @date 19/4/8
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class GroupOffsetDeleteDTO extends BaseDTO {
@Min(value = 0, message = "clusterPhyId不允许为null或者小于0")
@ApiModelProperty(value = "集群ID", example = "6")
private Long clusterPhyId;

@NotBlank(message = "groupName不允许为空")
@ApiModelProperty(value = "消费组名称", example = "g-know-streaming")
private String groupName;

@ApiModelProperty(value = "Topic名称,按照Topic纬度进行删除时需要传", example = "know-streaming")
protected String topicName;

@ApiModelProperty(value = "分区ID,按照分区纬度进行删除时需要传")
private Integer partitionId;

/**
* @see com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum
*/
@NotNull(message = "deleteType不允许为空")
@ApiModelProperty(value = "删除类型", example = "0:group纬度,1:Topic纬度,2:Partition纬度")
private Integer deleteType;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.group;

import com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class DeleteGroupParam extends GroupParam {
protected DeleteGroupTypeEnum deleteGroupTypeEnum;

public DeleteGroupParam(Long clusterPhyId, String groupName, DeleteGroupTypeEnum deleteGroupTypeEnum) {
super(clusterPhyId, groupName);
this.deleteGroupTypeEnum = deleteGroupTypeEnum;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.group;

import com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class DeleteGroupTopicParam extends DeleteGroupParam {
protected String topicName;

public DeleteGroupTopicParam(Long clusterPhyId, String groupName, DeleteGroupTypeEnum deleteGroupTypeEnum, String topicName) {
super(clusterPhyId, groupName, deleteGroupTypeEnum);
this.topicName = topicName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.group;

import com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class DeleteGroupTopicPartitionParam extends DeleteGroupTopicParam {
protected Integer partitionId;

public DeleteGroupTopicPartitionParam(Long clusterPhyId, String groupName, DeleteGroupTypeEnum deleteGroupTypeEnum, String topicName, Integer partitionId) {
super(clusterPhyId, groupName, deleteGroupTypeEnum, topicName);
this.partitionId = partitionId;
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.group;

import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class GroupParam extends ClusterPhyParam {
protected String groupName;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.xiaojukeji.know.streaming.km.common.enums.group;

import lombok.Getter;


/**
* @author wyb
* @date 2022/10/11
*/
@Getter
public enum DeleteGroupTypeEnum {
UNKNOWN(-1, "Unknown"),

GROUP(0, "Group纬度"),

GROUP_TOPIC(1, "GroupTopic纬度"),

GROUP_TOPIC_PARTITION(2, "GroupTopicPartition纬度");

private final Integer code;

private final String msg;

DeleteGroupTypeEnum(Integer code, String msg) {
this.code = code;
this.msg = msg;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public enum VersionItemTypeEnum {

SERVICE_OP_REASSIGNMENT(330, "service_reassign_operation"),

SERVICE_OP_GROUP(340, "service_group_operation"),

SERVICE_OP_CONNECT_CLUSTER(400, "service_connect_cluster_operation"),
SERVICE_OP_CONNECT_CONNECTOR(401, "service_connect_connector_operation"),
SERVICE_OP_CONNECT_PLUGIN(402, "service_connect_plugin_operation"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.xiaojukeji.know.streaming.km.core.service.group;

import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;

public interface OpGroupService {
/**
* 删除Topic
*/
Result<Void> deleteGroupOffset(DeleteGroupParam param, String operator);
}
Loading