diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
index 8d5cf2ed388f..46a3caa8a354 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
@@ -21,6 +21,7 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.Configs;
 import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputEntityReader;
@@ -43,6 +44,8 @@ public class KafkaInputFormat implements InputFormat
   private static final String DEFAULT_HEADER_COLUMN_PREFIX = "kafka.header.";
   private static final String DEFAULT_TIMESTAMP_COLUMN_NAME = "kafka.timestamp";
   private static final String DEFAULT_TOPIC_COLUMN_NAME = "kafka.topic";
+  private static final String DEFAULT_PARTITION_COLUMN_NAME = "kafka.partition";
+  private static final String DEFAULT_OFFSET_COLUMN_NAME = "kafka.offset";
   private static final String DEFAULT_KEY_COLUMN_NAME = "kafka.key";
   public static final String DEFAULT_AUTO_TIMESTAMP_STRING = "__kif_auto_timestamp";
 
@@ -58,6 +61,8 @@ public class KafkaInputFormat implements InputFormat
   private final String keyColumnName;
   private final String timestampColumnName;
   private final String topicColumnName;
+  private final String partitionColumnName;
+  private final String offsetColumnName;
 
   public KafkaInputFormat(
       @JsonProperty("headerFormat") @Nullable KafkaHeaderFormat headerFormat,
@@ -66,7 +71,9 @@ public KafkaInputFormat(
       @JsonProperty("headerColumnPrefix") @Nullable String headerColumnPrefix,
       @JsonProperty("keyColumnName") @Nullable String keyColumnName,
       @JsonProperty("timestampColumnName") @Nullable String timestampColumnName,
-      @JsonProperty("topicColumnName") @Nullable String topicColumnName
+      @JsonProperty("topicColumnName") @Nullable String topicColumnName,
+      @JsonProperty("partitionColumnName") @Nullable String partitionColumnName,
+      @JsonProperty("offsetColumnName") @Nullable String offsetColumnName
   )
   {
     this.headerFormat = headerFormat;
@@ -76,6 +83,8 @@ public KafkaInputFormat(
     this.keyColumnName = keyColumnName != null ? keyColumnName : DEFAULT_KEY_COLUMN_NAME;
     this.timestampColumnName = timestampColumnName != null ? timestampColumnName : DEFAULT_TIMESTAMP_COLUMN_NAME;
     this.topicColumnName = topicColumnName != null ? topicColumnName : DEFAULT_TOPIC_COLUMN_NAME;
+    this.partitionColumnName = Configs.valueOrDefault(partitionColumnName, DEFAULT_PARTITION_COLUMN_NAME);
+    this.offsetColumnName = Configs.valueOrDefault(offsetColumnName, DEFAULT_OFFSET_COLUMN_NAME);
   }
 
   @Override
@@ -128,7 +137,9 @@ record ->
         ),
         keyColumnName,
         timestampColumnName,
-        topicColumnName
+        topicColumnName,
+        partitionColumnName,
+        offsetColumnName
     );
   }
 
@@ -180,6 +191,20 @@ public String getTopicColumnName()
     return topicColumnName;
   }
 
+  @Nullable
+  @JsonProperty
+  public String getPartitionColumnName()
+  {
+    return partitionColumnName;
+  }
+
+  @Nullable
+  @JsonProperty
+  public String getOffsetColumnName()
+  {
+    return offsetColumnName;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -196,14 +221,24 @@ public boolean equals(Object o)
            && Objects.equals(headerColumnPrefix, that.headerColumnPrefix)
            && Objects.equals(keyColumnName, that.keyColumnName)
            && Objects.equals(timestampColumnName, that.timestampColumnName)
-           && Objects.equals(topicColumnName, that.topicColumnName);
+           && Objects.equals(topicColumnName, that.topicColumnName)
+           && Objects.equals(partitionColumnName, that.partitionColumnName)
+           && Objects.equals(offsetColumnName, that.offsetColumnName);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(headerFormat, valueFormat, keyFormat,
-        headerColumnPrefix, keyColumnName, timestampColumnName, topicColumnName
+    return Objects.hash(
+        headerFormat,
+        valueFormat,
+        keyFormat,
+        headerColumnPrefix,
+        keyColumnName,
+        timestampColumnName,
+        topicColumnName,
+        partitionColumnName,
+        offsetColumnName
     );
   }
 }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
index b0964dfe5467..06a4dea5c3ed 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
@@ -56,6 +56,8 @@ public class KafkaInputReader implements InputEntityReader
   private final String keyColumnName;
   private final String timestampColumnName;
   private final String topicColumnName;
+  private final String partitionColumnName;
+  private final String offsetColumnName;
 
   /**
    * @param inputRowSchema Actual schema from the ingestion spec
@@ -74,7 +76,9 @@ public KafkaInputReader(
       InputEntityReader valueParser,
       String keyColumnName,
       String timestampColumnName,
-      String topicColumnName
+      String topicColumnName,
+      String partitionColumnName,
+      String offsetColumnName
   )
   {
     this.inputRowSchema = inputRowSchema;
@@ -85,6 +89,8 @@ public KafkaInputReader(
     this.keyColumnName = keyColumnName;
     this.timestampColumnName = timestampColumnName;
     this.topicColumnName = topicColumnName;
+    this.partitionColumnName = partitionColumnName;
+    this.offsetColumnName = offsetColumnName;
   }
 
   @Override
@@ -131,6 +137,8 @@ private Map<String, Object> extractHeader(KafkaRecordEntity record)
     // Add kafka record topic to the mergelist, only if the key doesn't already exist
     mergedHeaderMap.putIfAbsent(topicColumnName, record.getRecord().topic());
+    mergedHeaderMap.putIfAbsent(partitionColumnName, record.getRecord().partition());
+    mergedHeaderMap.putIfAbsent(offsetColumnName, record.getRecord().offset());
 
     return mergedHeaderMap;
   }
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
index edff7464d073..2f5cd0f06998 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
@@ -144,7 +144,9 @@ public void setUp()
         "kafka.newheader.",
         "kafka.newkey.key",
         "kafka.newts.timestamp",
-        "kafka.newtopic.topic"
+        "kafka.newtopic.topic",
+        "kafka.new.partition",
+        "kafka.new.offset"
     );
   }
 
@@ -183,7 +185,9 @@ public void testSerde() throws JsonProcessingException
         "kafka.newheader.",
         "kafka.newkey.key",
         "kafka.newts.timestamp",
-        "kafka.newtopic.topic"
+        "kafka.newtopic.topic",
+        "kafka.new.partition",
+        "kafka.new.offset"
     );
 
     Assert.assertEquals(format, kif);
@@ -419,7 +423,8 @@ public void testWithOutKeyAndHeaderSpecs() throws IOException
             false,
             false
         ),
-        "kafka.newheader.", "kafka.newkey.", "kafka.newts.", "kafka.newtopic."
+        "kafka.newheader.", "kafka.newkey.", "kafka.newts.", "kafka.newtopic.",
+        "kafka.new.partition", "kafka.new.offset"
     );
 
     final InputEntityReader reader = localFormat.createReader(
@@ -648,6 +653,7 @@ public void testWithSchemaDiscovery() throws IOException
             "foo",
             "kafka.newts.timestamp",
             "kafka.newkey.key",
+            "kafka.new.partition",
             "root_baz",
             "o",
             "bar",
@@ -656,6 +662,7 @@ public void testWithSchemaDiscovery() throws IOException
             "jq_omg",
             "jq_omg2",
             "baz",
+            "kafka.new.offset",
             "root_baz2",
             "kafka.newheader.encoding",
             "path_omg2"
@@ -714,7 +721,9 @@ public void testKeyInCsvFormat() throws IOException
         "kafka.newheader.",
         "kafka.newkey.key",
         "kafka.newts.timestamp",
-        "kafka.newtopic.topic"
+        "kafka.newtopic.topic",
+        "kafka.new.partition",
+        "kafka.new.offset"
     );
 
     Headers headers = new RecordHeaders(SAMPLE_HEADERS);
@@ -812,7 +821,9 @@ public void testValueInCsvFormat() throws IOException
         "kafka.newheader.",
         "kafka.newkey.key",
         "kafka.newts.timestamp",
-        "kafka.newtopic.topic"
+        "kafka.newtopic.topic",
+        "kafka.new.partition",
+        "kafka.new.offset"
     );
 
     Headers headers = new RecordHeaders(SAMPLE_HEADERS);
@@ -913,12 +924,14 @@ public void testWithPartialDeclarationSchemaDiscovery() throws IOException
             "foo",
             "kafka.newts.timestamp",
             "kafka.newkey.key",
+            "kafka.new.partition",
             "root_baz",
             "o",
             "path_omg",
             "jq_omg",
             "jq_omg2",
             "baz",
+            "kafka.new.offset",
             "root_baz2",
             "kafka.newheader.encoding",
             "path_omg2"
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 9954de88c26f..0020b452ac8f 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -172,7 +172,7 @@ public byte[] value()
       new KafkaStringHeaderFormat(null),
       INPUT_FORMAT,
       INPUT_FORMAT,
-      "kafka.testheader.", "kafka.key", "kafka.timestamp", "kafka.topic"
+      "kafka.testheader.", "kafka.key", "kafka.timestamp", "kafka.topic", "kafka.partition", "kafka.offset"
   );
 
   private static TestingCluster zkServer;
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java
index 1e0b12eb2a05..a649e07915a6 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java
@@ -61,6 +61,8 @@ public KafkaIOConfigBuilder withKafkaInputFormat(InputFormat valueFormat)
         null,
         null,
         null,
+        null,
+        null,
         null
     );
     return this;
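Note (not part of the patch): a minimal construction sketch of how the two new column names surface to callers. Passing null for the column-name arguments falls back to the defaults introduced above, so the reader emits "kafka.partition" and "kafka.offset" alongside the existing "kafka.topic" and "kafka.timestamp" columns. The JsonInputFormat/JSONPathSpec wiring below mirrors the existing test setup and is illustrative, not something added by this diff.

    import com.google.common.collect.ImmutableList;
    import org.apache.druid.data.input.impl.JsonInputFormat;
    import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
    import org.apache.druid.data.input.kafkainput.KafkaStringHeaderFormat;
    import org.apache.druid.java.util.common.parsers.JSONPathSpec;

    // Nulls fall back to the defaults: kafka.header., kafka.key, kafka.timestamp,
    // kafka.topic, kafka.partition, kafka.offset.
    KafkaInputFormat format = new KafkaInputFormat(
        new KafkaStringHeaderFormat(null),   // headerFormat (default UTF-8 charset)
        null,                                // keyFormat (optional)
        new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), null, null, false, false), // valueFormat
        null,                                // headerColumnPrefix  -> "kafka.header."
        null,                                // keyColumnName       -> "kafka.key"
        null,                                // timestampColumnName -> "kafka.timestamp"
        null,                                // topicColumnName     -> "kafka.topic"
        null,                                // partitionColumnName -> "kafka.partition"
        null                                 // offsetColumnName    -> "kafka.offset"
    );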