diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md
index 66cd410b1eb78d..56ad146528fe34 100644
--- a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md
+++ b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md
@@ -156,8 +156,7 @@
     Example:
         "kafka_partitions" = "0,1,2,3",
-        "kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
-
+        "kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
 
 4. property
 
     Specifies custom Kafka parameters.
@@ -170,7 +169,7 @@
         "property.client.id" = "12345",
         "property.ssl.ca.location" = "FILE:ca.pem"
 
-    When using SSL to connect to Kafka, the following parameters must be specified:
+    1. When using SSL to connect to Kafka, the following parameters must be specified:
 
         "property.security.protocol" = "ssl",
         "property.ssl.ca.location" = "FILE:ca.pem",
@@ -189,6 +188,14 @@
     These specify the client's public key, the client's private key, and the password of the private key, respectively.
 
+
+    2. Specify the default starting offset of Kafka partitions
+    If kafka_partitions/kafka_offsets is not specified, all partitions are consumed by default; in that case, kafka_default_offsets can be used to specify the starting offset. The default is OFFSET_END, i.e. subscribing from the end.
+    Possible values:
+    1) OFFSET_BEGINNING: subscribe from the earliest position that has data.
+    2) OFFSET_END: subscribe from the end.
+    Example:
+    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
 
 7. Load data format example

diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index 37f38a58e06515..f3ffc56cf89391 100644
--- a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -94,6 +94,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     // optional
     public static final String KAFKA_PARTITIONS_PROPERTY = "kafka_partitions";
     public static final String KAFKA_OFFSETS_PROPERTY = "kafka_offsets";
+    public static final String KAFKA_DEFAULT_OFFSETS = "kafka_default_offsets";
 
     private static final String NAME_TYPE = "ROUTINE LOAD NAME";
     private static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
@@ -136,6 +137,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     private String kafkaTopic;
     // pair<partition id, offset>
     private List<Pair<Integer, Long>> kafkaPartitionOffsets = Lists.newArrayList();
+
+    // custom kafka property map<key, value>
     private Map<String, String> customKafkaProperties = Maps.newHashMap();
@@ -212,6 +215,7 @@ public Map<String, String> getCustomKafkaProperties() {
         return customKafkaProperties;
     }
 
+
     @Override
     public void analyze(Analyzer analyzer) throws UserException {
         super.analyze(analyzer);
@@ -401,7 +405,6 @@ private void checkKafkaProperties() throws AnalysisException {
                 }
             }
         }
-
         // check custom kafka property
         for (Map.Entry<String, String> dataSourceProperty : dataSourceProperties.entrySet()) {
            if (dataSourceProperty.getKey().startsWith("property.")) {
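Reviewer note: the flow these hunks establish — "property."-prefixed keys are collected into customKafkaProperties with the prefix stripped, and kafka_default_offsets is later pulled out of the converted map rather than forwarded to the Kafka client — can be illustrated with a small standalone sketch. This is illustrative code only, not the patch's own: the class name and the hard-coded inputs are invented, while the prefix and key mirror the constants above.

    import java.util.HashMap;
    import java.util.Map;

    public class DefaultOffsetPropertySketch {
        static final String PROPERTY_PREFIX = "property.";
        static final String KAFKA_DEFAULT_OFFSETS = "kafka_default_offsets";

        public static void main(String[] args) {
            Map<String, String> dataSourceProperties = new HashMap<>();
            dataSourceProperties.put("property.client.id", "12345");
            dataSourceProperties.put("property.kafka_default_offsets", "OFFSET_BEGINNING");

            // collect custom kafka properties, stripping the "property." prefix
            // (mirrors checkKafkaProperties above)
            Map<String, String> customKafkaProperties = new HashMap<>();
            for (Map.Entry<String, String> e : dataSourceProperties.entrySet()) {
                if (e.getKey().startsWith(PROPERTY_PREFIX)) {
                    customKafkaProperties.put(e.getKey().substring(PROPERTY_PREFIX.length()), e.getValue());
                }
            }

            // the default offset is consumed by the job itself, not passed to Kafka
            // (mirrors convertCustomProperties in the next file)
            String kafkaDefaultOffset = "";
            if (customKafkaProperties.containsKey(KAFKA_DEFAULT_OFFSETS)) {
                kafkaDefaultOffset = customKafkaProperties.remove(KAFKA_DEFAULT_OFFSETS);
            }
            System.out.println(kafkaDefaultOffset);     // OFFSET_BEGINNING
            System.out.println(customKafkaProperties);  // {client.id=12345}
        }
    }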
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 4ab576e6acccc5..b60a21c8459792 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -56,6 +56,8 @@
 import java.util.Map;
 import java.util.UUID;
 
+import static org.apache.doris.analysis.CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS;
+
 /**
  * KafkaRoutineLoadJob is a kind of RoutineLoadJob which fetch data from kafka.
  * The progress which is super class property is seems like "{"partition1": offset1, "partition2": offset2}"
@@ -71,6 +73,8 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     private List<Integer> customKafkaPartitions = Lists.newArrayList();
     // current kafka partitions is the actually partition which will be fetched
     private List<Integer> currentKafkaPartitions = Lists.newArrayList();
+    // optional, the default offset to use when a new partition is added or no offset is set
+    private String kafkaDefaultOffSet = "";
     // kafka properties ,property prefix will be mapped to kafka custom parameters, which can be extended in the future
     private Map<String, String> customProperties = Maps.newHashMap();
     private Map<String, String> convertedCustomProperties = Maps.newHashMap();
@@ -128,6 +132,9 @@ private void convertCustomProperties() throws DdlException {
                 convertedCustomProperties.put(entry.getKey(), entry.getValue());
             }
         }
+        if (convertedCustomProperties.containsKey(KAFKA_DEFAULT_OFFSETS)) {
+            kafkaDefaultOffSet = convertedCustomProperties.remove(KAFKA_DEFAULT_OFFSETS);
+        }
     }
 
     @Override
@@ -368,11 +375,21 @@ private void updateNewPartitionProgress() {
         for (Integer kafkaPartition : currentKafkaPartitions) {
             if (!((KafkaProgress) progress).containsPartition(kafkaPartition)) {
                 // if offset is not assigned, start from OFFSET_END
-                ((KafkaProgress) progress).addPartitionOffset(Pair.create(kafkaPartition, KafkaProgress.OFFSET_END_VAL));
+                long beginOffSet = KafkaProgress.OFFSET_END_VAL;
+                if (!kafkaDefaultOffSet.isEmpty()) {
+                    if (kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
+                        beginOffSet = KafkaProgress.OFFSET_BEGINNING_VAL;
+                    } else if (kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
+                        beginOffSet = KafkaProgress.OFFSET_END_VAL;
+                    } else {
+                        beginOffSet = KafkaProgress.OFFSET_END_VAL;
+                    }
+                }
+                ((KafkaProgress) progress).addPartitionOffset(Pair.create(kafkaPartition, beginOffSet));
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
                             .add("kafka_partition_id", kafkaPartition)
-                            .add("begin_offset", KafkaProgress.OFFSET_END)
+                            .add("begin_offset", beginOffSet)
                             .add("msg", "The new partition has been added in job"));
                 }
             }
@@ -402,7 +419,6 @@ private void setCustomKafkaPartitions(List<Pair<Integer, Long>> kafkaPartitionOf
     private void setCustomKafkaProperties(Map<String, String> kafkaProperties) {
         this.customProperties = kafkaProperties;
     }
-
     @Override
     protected String dataSourcePropertiesJsonToString() {
         Map<String, String> dataSourceProperties = Maps.newHashMap();
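Reviewer note: the begin-offset resolution added in updateNewPartitionProgress reduces to a small pure function, sketched below. The sentinel values are assumptions based on librdkafka's conventions (RD_KAFKA_OFFSET_BEGINNING = -2, RD_KAFKA_OFFSET_END = -1), which KafkaProgress.OFFSET_BEGINNING_VAL/OFFSET_END_VAL appear to follow; the class and method names are invented for illustration.

    public class DefaultOffsetResolutionSketch {
        static final long OFFSET_BEGINNING_VAL = -2; // assumed, librdkafka-style sentinel
        static final long OFFSET_END_VAL = -1;       // assumed, librdkafka-style sentinel

        // unset or unrecognized values fall back to OFFSET_END, preserving the
        // pre-patch behavior of subscribing new partitions from their end
        static long resolveBeginOffset(String kafkaDefaultOffset) {
            if (kafkaDefaultOffset != null && kafkaDefaultOffset.equalsIgnoreCase("OFFSET_BEGINNING")) {
                return OFFSET_BEGINNING_VAL;
            }
            return OFFSET_END_VAL;
        }

        public static void main(String[] args) {
            System.out.println(resolveBeginOffset("OFFSET_BEGINNING")); // -2
            System.out.println(resolveBeginOffset("OFFSET_END"));       // -1
            System.out.println(resolveBeginOffset(""));                 // -1 (fallback)
        }
    }

Note that in the patch itself an unrecognized value (e.g. a typo like "OFFSET_BEGINING") silently falls back to OFFSET_END; validating kafka_default_offsets in CreateRoutineLoadStmt.analyze and failing fast might be worth considering.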