From 80583b0c541efaed2e281237961efa92b2b1b573 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 19 Nov 2025 18:18:03 +0530 Subject: [PATCH 1/3] Add capability to ingest record offsets using KafkaInputFormat --- .../input/kafkainput/KafkaInputFormat.java | 31 ++++++++++++++++--- .../input/kafkainput/KafkaInputReader.java | 7 ++++- .../kafkainput/KafkaInputFormatTest.java | 14 ++++++--- .../indexing/kafka/KafkaIndexTaskTest.java | 2 +- .../supervisor/KafkaIOConfigBuilder.java | 1 + 5 files changed, 43 insertions(+), 12 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java index 8d5cf2ed388f..c63236405063 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import org.apache.druid.common.config.Configs; import org.apache.druid.data.input.ColumnsFilter; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; @@ -43,6 +44,7 @@ public class KafkaInputFormat implements InputFormat private static final String DEFAULT_HEADER_COLUMN_PREFIX = "kafka.header."; private static final String DEFAULT_TIMESTAMP_COLUMN_NAME = "kafka.timestamp"; private static final String DEFAULT_TOPIC_COLUMN_NAME = "kafka.topic"; + private static final String DEFAULT_OFFSET_COLUMN_NAME = "kafka.offset"; private static final String DEFAULT_KEY_COLUMN_NAME = "kafka.key"; public static final String DEFAULT_AUTO_TIMESTAMP_STRING = "__kif_auto_timestamp"; @@ -58,6 +60,7 @@ public class KafkaInputFormat implements InputFormat private final String keyColumnName; private final String timestampColumnName; private final String topicColumnName; + private final String offsetColumnName; public KafkaInputFormat( @JsonProperty("headerFormat") @Nullable KafkaHeaderFormat headerFormat, @@ -66,7 +69,8 @@ public KafkaInputFormat( @JsonProperty("headerColumnPrefix") @Nullable String headerColumnPrefix, @JsonProperty("keyColumnName") @Nullable String keyColumnName, @JsonProperty("timestampColumnName") @Nullable String timestampColumnName, - @JsonProperty("topicColumnName") @Nullable String topicColumnName + @JsonProperty("topicColumnName") @Nullable String topicColumnName, + @JsonProperty("offsetColumnName") @Nullable String offsetColumnName ) { this.headerFormat = headerFormat; @@ -76,6 +80,7 @@ public KafkaInputFormat( this.keyColumnName = keyColumnName != null ? keyColumnName : DEFAULT_KEY_COLUMN_NAME; this.timestampColumnName = timestampColumnName != null ? timestampColumnName : DEFAULT_TIMESTAMP_COLUMN_NAME; this.topicColumnName = topicColumnName != null ? topicColumnName : DEFAULT_TOPIC_COLUMN_NAME; + this.offsetColumnName = Configs.valueOrDefault(offsetColumnName, DEFAULT_OFFSET_COLUMN_NAME); } @Override @@ -128,7 +133,8 @@ record -> ), keyColumnName, timestampColumnName, - topicColumnName + topicColumnName, + offsetColumnName ); } @@ -180,6 +186,13 @@ public String getTopicColumnName() return topicColumnName; } + @Nullable + @JsonProperty + public String getOffsetColumnName() + { + return offsetColumnName; + } + @Override public boolean equals(Object o) { @@ -196,14 +209,22 @@ public boolean equals(Object o) && Objects.equals(headerColumnPrefix, that.headerColumnPrefix) && Objects.equals(keyColumnName, that.keyColumnName) && Objects.equals(timestampColumnName, that.timestampColumnName) - && Objects.equals(topicColumnName, that.topicColumnName); + && Objects.equals(topicColumnName, that.topicColumnName) + && Objects.equals(offsetColumnName, that.offsetColumnName); } @Override public int hashCode() { - return Objects.hash(headerFormat, valueFormat, keyFormat, - headerColumnPrefix, keyColumnName, timestampColumnName, topicColumnName + return Objects.hash( + headerFormat, + valueFormat, + keyFormat, + headerColumnPrefix, + keyColumnName, + timestampColumnName, + topicColumnName, + offsetColumnName ); } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java index b0964dfe5467..471554b41fcc 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java @@ -56,6 +56,7 @@ public class KafkaInputReader implements InputEntityReader private final String keyColumnName; private final String timestampColumnName; private final String topicColumnName; + private final String offsetColumnName; /** * @param inputRowSchema Actual schema from the ingestion spec @@ -74,7 +75,8 @@ public KafkaInputReader( InputEntityReader valueParser, String keyColumnName, String timestampColumnName, - String topicColumnName + String topicColumnName, + String offsetColumnName ) { this.inputRowSchema = inputRowSchema; @@ -85,6 +87,7 @@ public KafkaInputReader( this.keyColumnName = keyColumnName; this.timestampColumnName = timestampColumnName; this.topicColumnName = topicColumnName; + this.offsetColumnName = offsetColumnName; } @Override @@ -132,6 +135,8 @@ private Map extractHeader(KafkaRecordEntity record) // Add kafka record topic to the mergelist, only if the key doesn't already exist mergedHeaderMap.putIfAbsent(topicColumnName, record.getRecord().topic()); + mergedHeaderMap.putIfAbsent(offsetColumnName, record.getRecord().offset()); + return mergedHeaderMap; } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java index edff7464d073..aaf000c7f3a6 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java @@ -144,7 +144,8 @@ public void setUp() "kafka.newheader.", "kafka.newkey.key", "kafka.newts.timestamp", - "kafka.newtopic.topic" + "kafka.newtopic.topic", + "kafka.new.offset" ); } @@ -183,7 +184,8 @@ public void testSerde() throws JsonProcessingException "kafka.newheader.", "kafka.newkey.key", "kafka.newts.timestamp", - "kafka.newtopic.topic" + "kafka.newtopic.topic", + "kafka.new.offset" ); Assert.assertEquals(format, kif); @@ -419,7 +421,7 @@ public void testWithOutKeyAndHeaderSpecs() throws IOException false, false ), - "kafka.newheader.", "kafka.newkey.", "kafka.newts.", "kafka.newtopic." + "kafka.newheader.", "kafka.newkey.", "kafka.newts.", "kafka.newtopic.", "kafka.new.offset" ); final InputEntityReader reader = localFormat.createReader( @@ -714,7 +716,8 @@ public void testKeyInCsvFormat() throws IOException "kafka.newheader.", "kafka.newkey.key", "kafka.newts.timestamp", - "kafka.newtopic.topic" + "kafka.newtopic.topic", + "kafka.new.offset" ); Headers headers = new RecordHeaders(SAMPLE_HEADERS); @@ -812,7 +815,8 @@ public void testValueInCsvFormat() throws IOException "kafka.newheader.", "kafka.newkey.key", "kafka.newts.timestamp", - "kafka.newtopic.topic" + "kafka.newtopic.topic", + "kafka.new.offset" ); Headers headers = new RecordHeaders(SAMPLE_HEADERS); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 9954de88c26f..738eb59092e9 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -172,7 +172,7 @@ public byte[] value() new KafkaStringHeaderFormat(null), INPUT_FORMAT, INPUT_FORMAT, - "kafka.testheader.", "kafka.key", "kafka.timestamp", "kafka.topic" + "kafka.testheader.", "kafka.key", "kafka.timestamp", "kafka.topic", "kafka.offset" ); private static TestingCluster zkServer; diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java index 1e0b12eb2a05..0e4beed6cc58 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java @@ -61,6 +61,7 @@ public KafkaIOConfigBuilder withKafkaInputFormat(InputFormat valueFormat) null, null, null, + null, null ); return this; From 6a6f8f0e6f7d8d165b5d9007777eba0c9f07912e Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 19 Nov 2025 19:23:07 +0530 Subject: [PATCH 2/3] Fix unit test --- .../druid/data/input/kafkainput/KafkaInputFormatTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java index aaf000c7f3a6..c06d8cb2bfaa 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java @@ -658,6 +658,7 @@ public void testWithSchemaDiscovery() throws IOException "jq_omg", "jq_omg2", "baz", + "kafka.new.offset", "root_baz2", "kafka.newheader.encoding", "path_omg2" @@ -923,6 +924,7 @@ public void testWithPartialDeclarationSchemaDiscovery() throws IOException "jq_omg", "jq_omg2", "baz", + "kafka.new.offset", "root_baz2", "kafka.newheader.encoding", "path_omg2" From dfba0c106c17fd221354a9a197d061d544e65b7c Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 19 Nov 2025 20:24:03 +0530 Subject: [PATCH 3/3] Add partitionColumnName --- .../data/input/kafkainput/KafkaInputFormat.java | 14 ++++++++++++++ .../data/input/kafkainput/KafkaInputReader.java | 5 ++++- .../input/kafkainput/KafkaInputFormatTest.java | 9 ++++++++- .../druid/indexing/kafka/KafkaIndexTaskTest.java | 2 +- .../kafka/supervisor/KafkaIOConfigBuilder.java | 1 + 5 files changed, 28 insertions(+), 3 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java index c63236405063..46a3caa8a354 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java @@ -44,6 +44,7 @@ public class KafkaInputFormat implements InputFormat private static final String DEFAULT_HEADER_COLUMN_PREFIX = "kafka.header."; private static final String DEFAULT_TIMESTAMP_COLUMN_NAME = "kafka.timestamp"; private static final String DEFAULT_TOPIC_COLUMN_NAME = "kafka.topic"; + private static final String DEFAULT_PARTITION_COLUMN_NAME = "kafka.partition"; private static final String DEFAULT_OFFSET_COLUMN_NAME = "kafka.offset"; private static final String DEFAULT_KEY_COLUMN_NAME = "kafka.key"; public static final String DEFAULT_AUTO_TIMESTAMP_STRING = "__kif_auto_timestamp"; @@ -60,6 +61,7 @@ public class KafkaInputFormat implements InputFormat private final String keyColumnName; private final String timestampColumnName; private final String topicColumnName; + private final String partitionColumnName; private final String offsetColumnName; public KafkaInputFormat( @@ -70,6 +72,7 @@ public KafkaInputFormat( @JsonProperty("keyColumnName") @Nullable String keyColumnName, @JsonProperty("timestampColumnName") @Nullable String timestampColumnName, @JsonProperty("topicColumnName") @Nullable String topicColumnName, + @JsonProperty("partitionColumnName") @Nullable String partitionColumnName, @JsonProperty("offsetColumnName") @Nullable String offsetColumnName ) { @@ -80,6 +83,7 @@ public KafkaInputFormat( this.keyColumnName = keyColumnName != null ? keyColumnName : DEFAULT_KEY_COLUMN_NAME; this.timestampColumnName = timestampColumnName != null ? timestampColumnName : DEFAULT_TIMESTAMP_COLUMN_NAME; this.topicColumnName = topicColumnName != null ? topicColumnName : DEFAULT_TOPIC_COLUMN_NAME; + this.partitionColumnName = Configs.valueOrDefault(partitionColumnName, DEFAULT_PARTITION_COLUMN_NAME); this.offsetColumnName = Configs.valueOrDefault(offsetColumnName, DEFAULT_OFFSET_COLUMN_NAME); } @@ -134,6 +138,7 @@ record -> keyColumnName, timestampColumnName, topicColumnName, + partitionColumnName, offsetColumnName ); } @@ -186,6 +191,13 @@ public String getTopicColumnName() return topicColumnName; } + @Nullable + @JsonProperty + public String getPartitionColumnName() + { + return partitionColumnName; + } + @Nullable @JsonProperty public String getOffsetColumnName() @@ -210,6 +222,7 @@ public boolean equals(Object o) && Objects.equals(keyColumnName, that.keyColumnName) && Objects.equals(timestampColumnName, that.timestampColumnName) && Objects.equals(topicColumnName, that.topicColumnName) + && Objects.equals(partitionColumnName, that.partitionColumnName) && Objects.equals(offsetColumnName, that.offsetColumnName); } @@ -224,6 +237,7 @@ public int hashCode() keyColumnName, timestampColumnName, topicColumnName, + partitionColumnName, offsetColumnName ); } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java index 471554b41fcc..06a4dea5c3ed 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java @@ -56,6 +56,7 @@ public class KafkaInputReader implements InputEntityReader private final String keyColumnName; private final String timestampColumnName; private final String topicColumnName; + private final String partitionColumnName; private final String offsetColumnName; /** @@ -76,6 +77,7 @@ public KafkaInputReader( String keyColumnName, String timestampColumnName, String topicColumnName, + String partitionColumnName, String offsetColumnName ) { @@ -87,6 +89,7 @@ public KafkaInputReader( this.keyColumnName = keyColumnName; this.timestampColumnName = timestampColumnName; this.topicColumnName = topicColumnName; + this.partitionColumnName = partitionColumnName; this.offsetColumnName = offsetColumnName; } @@ -134,7 +137,7 @@ private Map extractHeader(KafkaRecordEntity record) // Add kafka record topic to the mergelist, only if the key doesn't already exist mergedHeaderMap.putIfAbsent(topicColumnName, record.getRecord().topic()); - + mergedHeaderMap.putIfAbsent(partitionColumnName, record.getRecord().partition()); mergedHeaderMap.putIfAbsent(offsetColumnName, record.getRecord().offset()); return mergedHeaderMap; diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java index c06d8cb2bfaa..2f5cd0f06998 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java @@ -145,6 +145,7 @@ public void setUp() "kafka.newkey.key", "kafka.newts.timestamp", "kafka.newtopic.topic", + "kafka.new.partition", "kafka.new.offset" ); } @@ -185,6 +186,7 @@ public void testSerde() throws JsonProcessingException "kafka.newkey.key", "kafka.newts.timestamp", "kafka.newtopic.topic", + "kafka.new.partition", "kafka.new.offset" ); Assert.assertEquals(format, kif); @@ -421,7 +423,8 @@ public void testWithOutKeyAndHeaderSpecs() throws IOException false, false ), - "kafka.newheader.", "kafka.newkey.", "kafka.newts.", "kafka.newtopic.", "kafka.new.offset" + "kafka.newheader.", "kafka.newkey.", "kafka.newts.", "kafka.newtopic.", + "kafka.new.partition", "kafka.new.offset" ); final InputEntityReader reader = localFormat.createReader( @@ -650,6 +653,7 @@ public void testWithSchemaDiscovery() throws IOException "foo", "kafka.newts.timestamp", "kafka.newkey.key", + "kafka.new.partition", "root_baz", "o", "bar", @@ -718,6 +722,7 @@ public void testKeyInCsvFormat() throws IOException "kafka.newkey.key", "kafka.newts.timestamp", "kafka.newtopic.topic", + "kafka.new.partition", "kafka.new.offset" ); @@ -817,6 +822,7 @@ public void testValueInCsvFormat() throws IOException "kafka.newkey.key", "kafka.newts.timestamp", "kafka.newtopic.topic", + "kafka.new.partition", "kafka.new.offset" ); @@ -918,6 +924,7 @@ public void testWithPartialDeclarationSchemaDiscovery() throws IOException "foo", "kafka.newts.timestamp", "kafka.newkey.key", + "kafka.new.partition", "root_baz", "o", "path_omg", diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 738eb59092e9..0020b452ac8f 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -172,7 +172,7 @@ public byte[] value() new KafkaStringHeaderFormat(null), INPUT_FORMAT, INPUT_FORMAT, - "kafka.testheader.", "kafka.key", "kafka.timestamp", "kafka.topic", "kafka.offset" + "kafka.testheader.", "kafka.key", "kafka.timestamp", "kafka.topic", "kafka.partition", "kafka.offset" ); private static TestingCluster zkServer; diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java index 0e4beed6cc58..a649e07915a6 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java @@ -62,6 +62,7 @@ public KafkaIOConfigBuilder withKafkaInputFormat(InputFormat valueFormat) null, null, null, + null, null ); return this;