Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,9 @@ public interface OpTopicManager {
* 扩分区
*/
Result<Void> expandTopic(TopicExpansionDTO dto, String operator);

/**
* 清空Topic
*/
Result<Void> truncateTopic(Long clusterPhyId, String topicName, String operator);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicCreateParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicPartitionExpandParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicTruncateParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils;
import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil;
Expand Down Expand Up @@ -156,6 +158,16 @@ public Result<Void> expandTopic(TopicExpansionDTO dto, String operator) {
return rv;
}

/**
 * Truncate (empty) the given topic.
 *
 * @param clusterPhyId physical cluster id
 * @param topicName    name of the topic to truncate
 * @param operator     user performing the operation (passed down for auditing)
 * @return a successful Result when truncation succeeds, otherwise the failed Result from the service layer
 */
@Override
public Result<Void> truncateTopic(Long clusterPhyId, String topicName, String operator) {
    // Delegate to the service layer using the project-wide default truncate offset constant.
    TopicTruncateParam truncateParam = new TopicTruncateParam(clusterPhyId, topicName, KafkaConstant.TOPICK_TRUNCATE_DEFAULT_OFFSET);
    Result<Void> truncateResult = opTopicService.truncateTopic(truncateParam, operator);

    return truncateResult.failed() ? truncateResult : Result.buildSuc();
}

/**************************************************** private method ****************************************************/

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic;

import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Parameter object for truncating a topic's records on a physical cluster.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TopicTruncateParam extends ClusterPhyParam {
    // Name of the topic whose records will be deleted.
    protected String topicName;

    // Offset passed to the delete-records call; records before this offset are removed.
    // NOTE(review): callers pass KafkaConstant.TOPICK_TRUNCATE_DEFAULT_OFFSET (-1) — confirm the
    // intended Kafka semantics for a negative offset against the AdminClient documentation.
    protected long offset;

    public TopicTruncateParam(Long clusterPhyId, String topicName, long offset) {
        super(clusterPhyId);
        this.topicName = topicName;
        this.offset = offset;
    }

    @Override
    public String toString() {
        // Fix: previously printed the wrong class name ("TopicParam") and quoted the numeric offset
        // as if it were a string.
        return "TopicTruncateParam{" +
                "clusterPhyId=" + clusterPhyId +
                ", topicName='" + topicName + '\'' +
                ", offset=" + offset +
                '}';
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class KafkaConstant {

public static final Map<String, ConfigDef.ConfigKey> KAFKA_ALL_CONFIG_DEF_MAP = new ConcurrentHashMap<>();

public static final Integer TOPICK_TRUNCATE_DEFAULT_OFFSET = -1;

static {
try {
KAFKA_ALL_CONFIG_DEF_MAP.putAll(CollectionConverters.asJava(LogConfig$.MODULE$.configKeys()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public enum OperationEnum {

RESTART(11, "重启"),

TRUNCATE(12, "清空"),

;

OperationEnum(int code, String desc) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicCreateParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicPartitionExpandParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicTruncateParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;

Expand All @@ -21,4 +22,9 @@ public interface OpTopicService {
* 扩分区
*/
Result<Void> expandTopic(TopicPartitionExpandParam expandParam, String operator);

/**
* 清空topic消息
*/
Result<Void> truncateTopic(TopicTruncateParam param, String operator);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicCreateParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicPartitionExpandParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicTruncateParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
Expand All @@ -33,6 +34,7 @@
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import scala.Option;
Expand All @@ -57,6 +59,7 @@ public class OpTopicServiceImpl extends BaseKafkaVersionControlService implement
private static final String TOPIC_CREATE = "createTopic";
private static final String TOPIC_DELETE = "deleteTopic";
private static final String TOPIC_EXPAND = "expandTopic";
private static final String TOPIC_TRUNCATE = "truncateTopic";

@Autowired
private TopicService topicService;
Expand Down Expand Up @@ -92,6 +95,8 @@ private void init() {

registerVCHandler(TOPIC_EXPAND, V_0_10_0_0, V_0_11_0_3, "expandTopicByZKClient", this::expandTopicByZKClient);
registerVCHandler(TOPIC_EXPAND, V_0_11_0_3, V_MAX, "expandTopicByKafkaClient", this::expandTopicByKafkaClient);

registerVCHandler(TOPIC_TRUNCATE, V_0_11_0_0, V_MAX, "truncateTopicByKafkaClient", this::truncateTopicByKafkaClient);
}

@Override
Expand Down Expand Up @@ -203,9 +208,58 @@ public Result<Void> expandTopic(TopicPartitionExpandParam expandParam, String op
return rv;
}

/**
 * Truncate a topic: dispatch to the version-matched handler, then record an audit oplog.
 *
 * @param param    truncate parameters (cluster id, topic name, offset)
 * @param operator user performing the operation, recorded in the oplog
 * @return the handler's Result; a failure Result when no handler exists for the cluster's version
 */
@Override
public Result<Void> truncateTopic(TopicTruncateParam param, String operator) {
try {
// Truncate the topic's data via the version-controlled handler registered for TOPIC_TRUNCATE.
Result<Void> rv = (Result<Void>) doVCHandler(param.getClusterPhyId(), TOPIC_TRUNCATE, param);

// NOTE(review): if doVCHandler ever returns null, this method returns null and callers
// (e.g. the manager layer) will call rv.failed() on it — potential NPE; verify doVCHandler's contract.
if (rv == null || rv.failed()) {
return rv;
}

// Record the operation in the audit log; logging failures are intentionally swallowed
// (see saveOplogAndIgnoreException) so they cannot fail the truncate itself.
OplogDTO oplogDTO = new OplogDTO(operator,
OperationEnum.TRUNCATE.getDesc(),
ModuleEnum.KAFKA_TOPIC.getDesc(),
MsgConstant.getTopicBizStr(param.getClusterPhyId(), param.getTopicName()),
String.format("清空Topic:[%s]", param.toString()));
opLogWrapService.saveOplogAndIgnoreException(oplogDTO);
return rv;
} catch (VCHandlerNotExistException e) {
// No truncate handler registered for this cluster's Kafka version.
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
}
}

/**************************************************** private method ****************************************************/

/**
 * Truncate a topic through the Kafka AdminClient: describe the topic to enumerate its
 * partitions, then issue a deleteRecords request for every partition at the configured offset.
 *
 * @param itemParam version-control parameter, expected to be a {@link TopicTruncateParam}
 * @return success when the delete-records call completes, otherwise a KAFKA_OPERATE_FAILED Result
 */
private Result<Void> truncateTopicByKafkaClient(VersionItemParam itemParam) {
    TopicTruncateParam truncateParam = (TopicTruncateParam) itemParam;
    try {
        AdminClient adminClient = kafkaAdminClient.getClient(truncateParam.getClusterPhyId());

        // Fetch the topic's partition metadata so every partition can be truncated.
        DescribeTopicsResult describeResult = adminClient.describeTopics(
                Arrays.asList(truncateParam.getTopicName()),
                new DescribeTopicsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS));
        Map<String, TopicDescription> descriptions = describeResult.all().get();

        // Build the per-partition delete plan: all partitions share the same target offset.
        RecordsToDelete deleteMarker = RecordsToDelete.beforeOffset(truncateParam.getOffset());
        Map<TopicPartition, RecordsToDelete> deletePlan = new HashMap<>();
        for (Map.Entry<String, TopicDescription> entry : descriptions.entrySet()) {
            for (TopicPartitionInfo partitionInfo : entry.getValue().partitions()) {
                deletePlan.put(new TopicPartition(entry.getKey(), partitionInfo.partition()), deleteMarker);
            }
        }

        // Execute the deletion and block until all partitions have been processed.
        DeleteRecordsResult deleteResult = adminClient.deleteRecords(
                deletePlan,
                new DeleteRecordsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS));
        deleteResult.all().get();
    } catch (Exception e) {
        log.error("truncate topic by kafka-client failed,clusterPhyId:{} topicName:{} offset:{}", truncateParam.getClusterPhyId(), truncateParam.getTopicName(), truncateParam.getOffset(), e);

        return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
    }

    return Result.buildSuc();
}

private Result<Void> deleteByKafkaClient(VersionItemParam itemParam) {
TopicParam param = (TopicParam) itemParam;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class FrontEndControlVersionItems extends BaseMetricVersionMetric {
private static final String FE_HA_CREATE_MIRROR_TOPIC = "FEHaCreateMirrorTopic";
private static final String FE_HA_DELETE_MIRROR_TOPIC = "FEHaDeleteMirrorTopic";

private static final String FE_TRUNCATE_TOPIC = "FETruncateTopic";

public FrontEndControlVersionItems(){}

@Override
Expand Down Expand Up @@ -89,6 +91,10 @@ public List<VersionControlItem> init(){
itemList.add(buildItem().minVersion(VersionEnum.V_2_5_0_D_300).maxVersion(VersionEnum.V_2_5_0_D_MAX)
.name(FE_HA_DELETE_MIRROR_TOPIC).desc("HA-取消Topic复制"));

//truncate topic
itemList.add(buildItem().minVersion(VersionEnum.V_0_11_0_0).maxVersion(VersionEnum.V_MAX)
.name(FE_TRUNCATE_TOPIC).desc("清空topic"));

return itemList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ public Result<Void> expandTopics(@Validated @RequestBody TopicExpansionDTO dto)
return opTopicManager.expandTopic(dto, HttpRequestUtil.getOperator());
}

@ApiOperation(value = "Topic数据清空", notes = "")
@PostMapping(value = "topics/truncate-topic")
@ResponseBody
public Result<Void> truncateTopic(@Validated @RequestBody ClusterTopicDTO dto) {
return opTopicManager.truncateTopic(dto.getClusterId(), dto.getTopicName(), HttpRequestUtil.getOperator());
}

@ApiOperation(value = "Topic元信息", notes = "带是否存在信息")
@GetMapping(value = "clusters/{clusterPhyId}/topics/{topicName}/metadata-combine-exist")
@ResponseBody
Expand Down