Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ const ConsumeClientTest = () => {
...configInfo,
needFilterKeyValue: changeValue === 1 || changeValue === 2,
needFilterSize: changeValue === 3 || changeValue === 4 || changeValue === 5,
needFilterKey: changeValue === 6,
needFilterValue: changeValue === 7,
});
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@ export const cardList = [

export const filterList = [
{
label: 'none',
label: 'None',
value: 0,
},
{
label: 'contains',
label: 'Contains',
value: 1,
},
{
label: 'does not contains',
label: 'Does Not Contains',
value: 2,
},
{
label: 'equals',
label: 'Equals',
value: 3,
},
{
Expand All @@ -39,6 +39,14 @@ export const filterList = [
label: 'Under Size',
value: 5,
},
{
label: 'Key Contains',
value: 6,
},
{
label: 'Value Contains',
value: 7,
}
];

export const untilList = [
Expand Down Expand Up @@ -324,10 +332,10 @@ export const getFormConfig = (topicMetaData: any, info = {} as any, partitionLis
key: 'filterKey',
label: 'Key',
type: FormItemType.input,
invisible: !info?.needFilterKeyValue,
invisible: !info?.needFilterKeyValue && !info?.needFilterKey,
rules: [
{
required: info?.needFilterKeyValue,
required: info?.needFilterKeyValue || info?.needFilterKey,
message: '请输入Key',
},
],
Expand All @@ -336,10 +344,10 @@ export const getFormConfig = (topicMetaData: any, info = {} as any, partitionLis
key: 'filterValue',
label: 'Value',
type: FormItemType.input,
invisible: !info?.needFilterKeyValue,
invisible: !info?.needFilterKeyValue && !info?.needFilterValue,
rules: [
{
required: info?.needFilterKeyValue,
required: info?.needFilterKeyValue || info?.needFilterValue,
message: '请输入Value',
},
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,18 @@ private Result<Void> checkStartFromAndFilterLegal(KafkaConsumerStartFromDTO star
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "包含的方式过滤,必须有过滤的key或value");
}

// key包含过滤
if (KafkaConsumerFilterEnum.KEY_CONTAINS.getCode().equals(filter.getFilterType())
&& ValidateUtils.isBlank(filter.getFilterCompareKey())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "key包含的方式过滤,必须有过滤的key");
}

// value包含过滤
if (KafkaConsumerFilterEnum.VALUE_CONTAINS.getCode().equals(filter.getFilterType())
&& ValidateUtils.isBlank(filter.getFilterCompareValue())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "value包含的方式过滤,必须有过滤的value");
}

// 不包含过滤
if (KafkaConsumerFilterEnum.NOT_CONTAINS.getCode().equals(filter.getFilterType())
&& ValidateUtils.isBlank(filter.getFilterCompareKey()) && ValidateUtils.isBlank(filter.getFilterCompareValue())) {
Expand Down Expand Up @@ -550,6 +562,18 @@ private boolean checkMatchFilter(ConsumerRecord consumerRecord, KafkaConsumerFil
return true;
}

// key包含过滤
if (KafkaConsumerFilterEnum.KEY_CONTAINS.getCode().equals(filter.getFilterType())
&& (!ValidateUtils.isBlank(filter.getFilterCompareKey()) && consumerRecord.key() != null && consumerRecord.key().toString().contains(filter.getFilterCompareKey()))) {
return true;
}

// value包含过滤
if (KafkaConsumerFilterEnum.VALUE_CONTAINS.getCode().equals(filter.getFilterType())
&& (!ValidateUtils.isBlank(filter.getFilterCompareValue()) && consumerRecord.value() != null && consumerRecord.value().toString().contains(filter.getFilterCompareValue()))) {
return true;
}

// 不包含过滤
if (KafkaConsumerFilterEnum.NOT_CONTAINS.getCode().equals(filter.getFilterType())
&& (!ValidateUtils.isBlank(filter.getFilterCompareKey()) && (consumerRecord.key() == null || !consumerRecord.key().toString().contains(filter.getFilterCompareKey())))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class KafkaConsumerFilterDTO extends BaseDTO {
/**
* @see KafkaConsumerFilterEnum
*/
@Range(min = 0, max = 5, message = "filterType最大和最小值必须在[0, 5]之间")
@Range(min = 0, max = 7, message = "filterType最大和最小值必须在[0, 7]之间")
@ApiModelProperty(value = "开始消费位置的类型", example = "2")
private Integer filterType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ public enum KafkaConsumerFilterEnum {

UNDER_SIZE(5, "size小于"),

KEY_CONTAINS(6, "key包含"),

VALUE_CONTAINS(7, "value包含"),

;

private final Integer code;
Expand Down