==== KafkaInputFormat.java ====

@@ -21,6 +21,7 @@
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.Configs;
 import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputEntityReader;
@@ -43,6 +44,8 @@ public class KafkaInputFormat implements InputFormat
   private static final String DEFAULT_HEADER_COLUMN_PREFIX = "kafka.header.";
   private static final String DEFAULT_TIMESTAMP_COLUMN_NAME = "kafka.timestamp";
   private static final String DEFAULT_TOPIC_COLUMN_NAME = "kafka.topic";
+  private static final String DEFAULT_PARTITION_COLUMN_NAME = "kafka.partition";
+  private static final String DEFAULT_OFFSET_COLUMN_NAME = "kafka.offset";
   private static final String DEFAULT_KEY_COLUMN_NAME = "kafka.key";
   public static final String DEFAULT_AUTO_TIMESTAMP_STRING = "__kif_auto_timestamp";
 
@@ -58,6 +61,8 @@ public class KafkaInputFormat implements InputFormat
   private final String keyColumnName;
   private final String timestampColumnName;
   private final String topicColumnName;
+  private final String partitionColumnName;
+  private final String offsetColumnName;
 
   public KafkaInputFormat(
       @JsonProperty("headerFormat") @Nullable KafkaHeaderFormat headerFormat,
@@ -66,7 +71,9 @@ public KafkaInputFormat(
       @JsonProperty("headerColumnPrefix") @Nullable String headerColumnPrefix,
       @JsonProperty("keyColumnName") @Nullable String keyColumnName,
       @JsonProperty("timestampColumnName") @Nullable String timestampColumnName,
-      @JsonProperty("topicColumnName") @Nullable String topicColumnName
+      @JsonProperty("topicColumnName") @Nullable String topicColumnName,
+      @JsonProperty("partitionColumnName") @Nullable String partitionColumnName,
+      @JsonProperty("offsetColumnName") @Nullable String offsetColumnName
   )
   {
     this.headerFormat = headerFormat;
@@ -76,6 +83,8 @@
     this.keyColumnName = keyColumnName != null ? keyColumnName : DEFAULT_KEY_COLUMN_NAME;
     this.timestampColumnName = timestampColumnName != null ? timestampColumnName : DEFAULT_TIMESTAMP_COLUMN_NAME;
     this.topicColumnName = topicColumnName != null ? topicColumnName : DEFAULT_TOPIC_COLUMN_NAME;
+    this.partitionColumnName = Configs.valueOrDefault(partitionColumnName, DEFAULT_PARTITION_COLUMN_NAME);
+    this.offsetColumnName = Configs.valueOrDefault(offsetColumnName, DEFAULT_OFFSET_COLUMN_NAME);
   }
 
   @Override
@@ -128,7 +137,9 @@ record ->
         ),
         keyColumnName,
         timestampColumnName,
-        topicColumnName
+        topicColumnName,
+        partitionColumnName,
+        offsetColumnName
     );
   }
 
@@ -180,6 +191,20 @@ public String getTopicColumnName()
     return topicColumnName;
   }
 
+  @Nullable
+  @JsonProperty
+  public String getPartitionColumnName()
+  {
+    return partitionColumnName;
+  }
+
+  @Nullable
+  @JsonProperty
+  public String getOffsetColumnName()
+  {
+    return offsetColumnName;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -196,14 +221,24 @@ public boolean equals(Object o)
            && Objects.equals(headerColumnPrefix, that.headerColumnPrefix)
            && Objects.equals(keyColumnName, that.keyColumnName)
            && Objects.equals(timestampColumnName, that.timestampColumnName)
-           && Objects.equals(topicColumnName, that.topicColumnName);
+           && Objects.equals(topicColumnName, that.topicColumnName)
+           && Objects.equals(partitionColumnName, that.partitionColumnName)
+           && Objects.equals(offsetColumnName, that.offsetColumnName);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(headerFormat, valueFormat, keyFormat,
-        headerColumnPrefix, keyColumnName, timestampColumnName, topicColumnName
+    return Objects.hash(
+        headerFormat,
+        valueFormat,
+        keyFormat,
+        headerColumnPrefix,
+        keyColumnName,
+        timestampColumnName,
+        topicColumnName,
+        partitionColumnName,
+        offsetColumnName
     );
   }
 }
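Side note (not part of the diff): the two new properties follow the same null-means-default convention as the existing ones, so omitting partitionColumnName and offsetColumnName from a spec yields "kafka.partition" and "kafka.offset". A minimal sketch of that behavior follows; the JsonInputFormat stand-in, its five-argument constructor, and the package paths are assumptions about the surrounding Druid codebase, not something this diff shows.

import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
import org.apache.druid.data.input.kafkainput.KafkaStringHeaderFormat;

public class KafkaInputFormatDefaultsSketch
{
  public static void main(String[] args)
  {
    // Stand-in key/value format (assumes the 5-arg JsonInputFormat constructor).
    InputFormat json = new JsonInputFormat(null, null, null, null, null);

    KafkaInputFormat format = new KafkaInputFormat(
        new KafkaStringHeaderFormat(null), // headerFormat (null encoding -> UTF-8)
        json,                              // keyFormat
        json,                              // valueFormat
        null,                              // headerColumnPrefix  -> "kafka.header."
        null,                              // keyColumnName       -> "kafka.key"
        null,                              // timestampColumnName -> "kafka.timestamp"
        null,                              // topicColumnName     -> "kafka.topic"
        null,                              // partitionColumnName -> "kafka.partition"
        null                               // offsetColumnName    -> "kafka.offset"
    );

    System.out.println(format.getPartitionColumnName()); // kafka.partition
    System.out.println(format.getOffsetColumnName());    // kafka.offset
  }
}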
==== KafkaInputReader.java ====

@@ -56,6 +56,8 @@ public class KafkaInputReader implements InputEntityReader
   private final String keyColumnName;
   private final String timestampColumnName;
   private final String topicColumnName;
+  private final String partitionColumnName;
+  private final String offsetColumnName;
 
   /**
    * @param inputRowSchema Actual schema from the ingestion spec
@@ -74,7 +76,9 @@ public KafkaInputReader(
       InputEntityReader valueParser,
       String keyColumnName,
       String timestampColumnName,
-      String topicColumnName
+      String topicColumnName,
+      String partitionColumnName,
+      String offsetColumnName
   )
   {
     this.inputRowSchema = inputRowSchema;
@@ -85,6 +89,8 @@
     this.keyColumnName = keyColumnName;
     this.timestampColumnName = timestampColumnName;
     this.topicColumnName = topicColumnName;
+    this.partitionColumnName = partitionColumnName;
+    this.offsetColumnName = offsetColumnName;
   }
 
   @Override
@@ -131,6 +137,8 @@ private Map<String, Object> extractHeader(KafkaRecordEntity record)
 
     // Add kafka record topic to the mergelist, only if the key doesn't already exist
     mergedHeaderMap.putIfAbsent(topicColumnName, record.getRecord().topic());
+    mergedHeaderMap.putIfAbsent(partitionColumnName, record.getRecord().partition());
+    mergedHeaderMap.putIfAbsent(offsetColumnName, record.getRecord().offset());
 
     return mergedHeaderMap;
   }
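For orientation, here is a sketch (mine, not code from the PR) of what the record-metadata part of extractHeader() contributes for a record on topic "wiki", partition 3, offset 42, assuming the default column names. Kafka's ConsumerRecord.partition() returns an int and offset() a long, and putIfAbsent means an actual Kafka header that already claimed the column name wins over the record metadata.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ExtractHeaderSketch
{
  public static void main(String[] args)
  {
    // A record on topic "wiki", partition 3, offset 42, with no key or value.
    ConsumerRecord<byte[], byte[]> record =
        new ConsumerRecord<>("wiki", 3, 42L, null, null);

    // Mirrors the putIfAbsent calls above, using the default column names.
    Map<String, Object> mergedHeaderMap = new HashMap<>();
    mergedHeaderMap.putIfAbsent("kafka.topic", record.topic());         // "wiki"
    mergedHeaderMap.putIfAbsent("kafka.partition", record.partition()); // 3 (int)
    mergedHeaderMap.putIfAbsent("kafka.offset", record.offset());       // 42 (long)

    System.out.println(mergedHeaderMap);
  }
}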
==== KafkaInputFormat unit tests ====

@@ -144,7 +144,9 @@ public void setUp()
         "kafka.newheader.",
         "kafka.newkey.key",
         "kafka.newts.timestamp",
-        "kafka.newtopic.topic"
+        "kafka.newtopic.topic",
+        "kafka.new.partition",
+        "kafka.new.offset"
     );
   }
 
@@ -183,7 +185,9 @@ public void testSerde() throws JsonProcessingException
         "kafka.newheader.",
         "kafka.newkey.key",
         "kafka.newts.timestamp",
-        "kafka.newtopic.topic"
+        "kafka.newtopic.topic",
+        "kafka.new.partition",
+        "kafka.new.offset"
     );
     Assert.assertEquals(format, kif);
 
@@ -419,7 +423,8 @@ public void testWithOutKeyAndHeaderSpecs() throws IOException
             false,
             false
         ),
-        "kafka.newheader.", "kafka.newkey.", "kafka.newts.", "kafka.newtopic."
+        "kafka.newheader.", "kafka.newkey.", "kafka.newts.", "kafka.newtopic.",
+        "kafka.new.partition", "kafka.new.offset"
     );
 
     final InputEntityReader reader = localFormat.createReader(
@@ -648,6 +653,7 @@ public void testWithSchemaDiscovery() throws IOException
         "foo",
         "kafka.newts.timestamp",
         "kafka.newkey.key",
+        "kafka.new.partition",
         "root_baz",
         "o",
         "bar",
@@ -656,6 +662,7 @@
         "jq_omg",
         "jq_omg2",
         "baz",
+        "kafka.new.offset",
         "root_baz2",
         "kafka.newheader.encoding",
         "path_omg2"
@@ -714,7 +721,9 @@ public void testKeyInCsvFormat() throws IOException
         "kafka.newheader.",
         "kafka.newkey.key",
         "kafka.newts.timestamp",
-        "kafka.newtopic.topic"
+        "kafka.newtopic.topic",
+        "kafka.new.partition",
+        "kafka.new.offset"
     );
 
     Headers headers = new RecordHeaders(SAMPLE_HEADERS);
@@ -812,7 +821,9 @@ public void testValueInCsvFormat() throws IOException
         "kafka.newheader.",
         "kafka.newkey.key",
         "kafka.newts.timestamp",
-        "kafka.newtopic.topic"
+        "kafka.newtopic.topic",
+        "kafka.new.partition",
+        "kafka.new.offset"
     );
 
     Headers headers = new RecordHeaders(SAMPLE_HEADERS);
@@ -913,12 +924,14 @@ public void testWithPartialDeclarationSchemaDiscovery() throws IOException
         "foo",
         "kafka.newts.timestamp",
         "kafka.newkey.key",
+        "kafka.new.partition",
        "root_baz",
         "o",
         "path_omg",
         "jq_omg",
         "jq_omg2",
         "baz",
+        "kafka.new.offset",
         "root_baz2",
         "kafka.newheader.encoding",
         "path_omg2"
==== Kafka indexing test fixture (KAFKA_INPUT_FORMAT constant) ====

@@ -172,7 +172,7 @@ public byte[] value()
       new KafkaStringHeaderFormat(null),
       INPUT_FORMAT,
       INPUT_FORMAT,
-      "kafka.testheader.", "kafka.key", "kafka.timestamp", "kafka.topic"
+      "kafka.testheader.", "kafka.key", "kafka.timestamp", "kafka.topic", "kafka.partition", "kafka.offset"
   );
 
   private static TestingCluster zkServer;
==== KafkaIOConfigBuilder test helper ====

@@ -61,6 +61,8 @@ public KafkaIOConfigBuilder withKafkaInputFormat(InputFormat valueFormat)
         null,
         null,
         null,
+        null,
+        null,
         null
     );
     return this;