Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.seekablestream.common.PartitionId;
import org.apache.kafka.common.TopicPartition;

import javax.annotation.Nullable;
Expand All @@ -47,11 +48,13 @@
KafkaTopicPartition.KafkaTopicPartitionKeySerializer.class)
@JsonDeserialize(using = KafkaTopicPartition.KafkaTopicPartitionDeserializer.class, keyUsing =
KafkaTopicPartition.KafkaTopicPartitionKeyDeserializer.class)
public class KafkaTopicPartition
public class KafkaTopicPartition implements PartitionId
{
private final int partition;
@Nullable
private final String topic;
@Nullable
private final String clusterKey;

/**
* This flag is used to maintain backward compatibilty with older versions of kafka indexing. If this flag
Expand All @@ -64,6 +67,16 @@ public class KafkaTopicPartition
private final boolean multiTopicPartition;

/**
 * Backwards-compatible constructor for callers that do not supply a cluster key.
 * Delegates to the four-argument constructor with a null clusterKey.
 */
public KafkaTopicPartition(boolean multiTopicPartition, @Nullable String topic, int partition)
{
this(multiTopicPartition, null, topic, partition);
}

public KafkaTopicPartition(
boolean multiTopicPartition,
@Nullable String clusterKey,
@Nullable String topic,
int partition
)
{
this.partition = partition;
this.multiTopicPartition = multiTopicPartition;
Expand All @@ -75,6 +88,7 @@ public KafkaTopicPartition(boolean multiTopicPartition, @Nullable String topic,
} else {
this.topic = null;
}
this.clusterKey = clusterKey;
}

public int partition()
Expand Down Expand Up @@ -114,13 +128,13 @@ public boolean equals(Object o)
return partition == that.partition && multiTopicPartition == that.multiTopicPartition && Objects.equals(
topic,
that.topic
);
) && Objects.equals(clusterKey, that.clusterKey);
}

@Override
public int hashCode()
{
  // clusterKey participates in the hash so hashCode() stays consistent with
  // equals(), which also compares clusterKey.
  return Objects.hash(partition, multiTopicPartition, topic, clusterKey);
}

@Override
Expand All @@ -130,9 +144,17 @@ public String toString()
"partition=" + partition +
", topic='" + topic + '\'' +
", multiTopicPartition=" + multiTopicPartition +
", clusterKey='" + clusterKey + '\'' +
'}';
}

/**
 * Returns the cluster key identifying which Kafka cluster this partition
 * belongs to, or null when no cluster key was assigned.
 */
@Override
@Nullable
public String getCluster()
{
return clusterKey;
}

public static class KafkaTopicPartitionDeserializer extends JsonDeserializer<KafkaTopicPartition>
{
@Override
Expand All @@ -155,10 +177,11 @@ public static class KafkaTopicPartitionSerializer extends JsonSerializer<KafkaTo
public void serialize(KafkaTopicPartition value, JsonGenerator gen, SerializerProvider serializers)
throws IOException
{
if (null != value.topic && value.multiTopicPartition) {
gen.writeString(value.topic + ":" + value.partition);
final String clusterPrefix = value.clusterKey != null ? value.clusterKey + "|" : "";
if (value.topic != null && value.multiTopicPartition) {
gen.writeString(clusterPrefix + value.topic + ":" + value.partition);
} else {
gen.writeString(String.valueOf(value.partition));
gen.writeString(clusterPrefix + value.partition);
}
}

Expand All @@ -175,10 +198,11 @@ public static class KafkaTopicPartitionKeySerializer extends JsonSerializer<Kafk
public void serialize(KafkaTopicPartition value, JsonGenerator gen, SerializerProvider serializers)
throws IOException
{
if (null != value.topic && value.multiTopicPartition) {
gen.writeFieldName(value.topic + ":" + value.partition);
final String clusterPrefix = value.clusterKey != null ? value.clusterKey + "|" : "";
if (value.topic != null && value.multiTopicPartition) {
gen.writeFieldName(clusterPrefix + value.topic + ":" + value.partition);
} else {
gen.writeFieldName(String.valueOf(value.partition));
gen.writeFieldName(clusterPrefix + value.partition);
}
}

Expand All @@ -200,15 +224,37 @@ public KafkaTopicPartition deserializeKey(String key, DeserializationContext ctx

public static KafkaTopicPartition fromString(String str)
{
int index = str.lastIndexOf(':');
if (index < 0) {
return new KafkaTopicPartition(false, null, Integer.parseInt(str));
final String[] parts = str.split("\\|");
final String clusterKey;
final String[] topicAndPartition;
if (parts.length == 1) { // cluster key doesn't exist
clusterKey = null;
topicAndPartition = parts[0].split(":");
} else if (parts.length == 2) {
clusterKey = parts[0];
topicAndPartition = parts[1].split(":");
} else {
throw new IllegalArgumentException("Invalid partition id format: " + str);
}

final String topic;
final int partition;

if (topicAndPartition.length == 1) {
if (topicAndPartition[0].isEmpty()) {
throw new IllegalArgumentException("Partition is required: " + str);
}
topic = null;
partition = Integer.parseInt(topicAndPartition[0]);
} else if (topicAndPartition.length == 2) {
if (topicAndPartition[0].isEmpty() || topicAndPartition[1].isEmpty()) {
throw new IllegalArgumentException("Topic and partition are required: " + str);
}
topic = topicAndPartition[0];
partition = Integer.parseInt(topicAndPartition[1]);
} else {
return new KafkaTopicPartition(
true,
str.substring(0, index),
Integer.parseInt(str.substring(index + 1))
);
throw new IllegalArgumentException("Invalid format: " + str);
}
return new KafkaTopicPartition(topic != null, clusterKey, topic, partition);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ private void possiblyResetOffsetsOrWait(
) throws InterruptedException, IOException
{
final String stream = task.getIOConfig().getStartSequenceNumbers().getStream();
final String clusterKey = task.getIOConfig().getCluster();
final boolean isMultiTopic = task.getIOConfig().isMultiTopic();
final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
boolean doReset = false;
Expand All @@ -135,7 +136,7 @@ private void possiblyResetOffsetsOrWait(
// seek to the beginning to get the least available offset
StreamPartition<KafkaTopicPartition> streamPartition = StreamPartition.of(
stream,
new KafkaTopicPartition(isMultiTopic, topicPartition.topic(), topicPartition.partition())
new KafkaTopicPartition(isMultiTopic, clusterKey, topicPartition.topic(), topicPartition.partition())
);
final Long leastAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
if (leastAvailableOffset == null) {
Expand All @@ -159,7 +160,7 @@ private void possiblyResetOffsetsOrWait(
if (doReset) {
sendResetRequestAndWait(CollectionUtils.mapKeys(resetPartitions, topicPartition -> StreamPartition.of(
stream,
new KafkaTopicPartition(isMultiTopic, topicPartition.topic(), topicPartition.partition())
new KafkaTopicPartition(isMultiTopic, clusterKey, topicPartition.topic(), topicPartition.partition())
)), taskToolbox);
} else {
log.warn("Retrying in %dms", task.getPollRetryMs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
Expand Down Expand Up @@ -98,19 +99,24 @@ protected SeekableStreamIndexTaskRunner<KafkaTopicPartition, Long, KafkaRecordEn
}

@Override
protected KafkaRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox)
protected RecordSupplier<KafkaTopicPartition, Long, KafkaRecordEntity> newTaskRecordSupplier(final TaskToolbox toolbox)
{
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
KafkaIndexTaskIOConfig kafkaIndexTaskIOConfig = (KafkaIndexTaskIOConfig) super.ioConfig;
final Map<String, Object> props = new HashMap<>(kafkaIndexTaskIOConfig.getConsumerProperties());

final String cluster = kafkaIndexTaskIOConfig.getCluster();
final Map<String, Object> props;
if (cluster != null) {
props = (Map<String, Object>) new HashMap<>(kafkaIndexTaskIOConfig.getConsumerProperties()).get(cluster);
} else {
props = new HashMap<>(kafkaIndexTaskIOConfig.getConsumerProperties());
}
props.put("auto.offset.reset", "none");

final KafkaRecordSupplier recordSupplier =
new KafkaRecordSupplier(props, configMapper, kafkaIndexTaskIOConfig.getConfigOverrides(),
kafkaIndexTaskIOConfig.isMultiTopic());
kafkaIndexTaskIOConfig.isMultiTopic(), kafkaIndexTaskIOConfig.getCluster());

if (toolbox.getMonitorScheduler() != null) {
toolbox.getMonitorScheduler().addMonitor(recordSupplier.monitor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Kafk
private final KafkaConfigOverrides configOverrides;

private final boolean multiTopic;
private final String cluster;

@JsonCreator
public KafkaIndexTaskIOConfig(
Expand All @@ -64,7 +65,8 @@ public KafkaIndexTaskIOConfig(
@JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
@JsonProperty("configOverrides") @Nullable KafkaConfigOverrides configOverrides,
@JsonProperty("multiTopic") @Nullable Boolean multiTopic,
@JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes
@JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes,
@JsonProperty("cluster") @Nullable String cluster
)
{
super(
Expand All @@ -85,6 +87,7 @@ public KafkaIndexTaskIOConfig(
this.pollTimeout = pollTimeout != null ? pollTimeout : KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS;
this.configOverrides = configOverrides;
this.multiTopic = multiTopic != null ? multiTopic : KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC;
this.cluster = cluster;

final SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long> myEndSequenceNumbers = getEndSequenceNumbers();
for (KafkaTopicPartition partition : myEndSequenceNumbers.getPartitionSequenceNumberMap().keySet()) {
Expand Down Expand Up @@ -128,7 +131,8 @@ public KafkaIndexTaskIOConfig(
inputFormat,
configOverrides,
KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC,
refreshRejectionPeriodsInMinutes
refreshRejectionPeriodsInMinutes,
null
);
}

Expand Down Expand Up @@ -184,6 +188,13 @@ public boolean isMultiTopic()
return multiTopic;
}

/**
 * Returns the Kafka cluster key this task ingests from, or null when the
 * config predates multi-cluster support or no cluster was specified.
 */
@JsonProperty
@Nullable
public String getCluster()
{
return cluster;
}

@Override
public String toString()
{
Expand All @@ -193,6 +204,8 @@ public String toString()
", startSequenceNumbers=" + getStartSequenceNumbers() +
", endSequenceNumbers=" + getEndSequenceNumbers() +
", consumerProperties=" + consumerProperties +
", multiTopic=" + multiTopic +
", cluster=" + cluster +
", pollTimeout=" + pollTimeout +
", useTransaction=" + isUseTransaction() +
", minimumMessageTime=" + getMinimumMessageTime() +
Expand Down
Loading
Loading