From cfb736356107176de512da62360cea899d0db83c Mon Sep 17 00:00:00 2001 From: Jesse Tuglu Date: Mon, 26 May 2025 14:35:18 -0700 Subject: [PATCH] Forward-port from internal --- .../data/input/kafka/KafkaTopicPartition.java | 80 +++-- ...ementalPublishingKafkaIndexTaskRunner.java | 5 +- .../druid/indexing/kafka/KafkaIndexTask.java | 14 +- .../kafka/KafkaIndexTaskIOConfig.java | 17 +- .../indexing/kafka/KafkaRecordSupplier.java | 57 +++- .../indexing/kafka/KafkaSamplerSpec.java | 50 ++- ...KafkaSeekableStreamEndSequenceNumbers.java | 9 +- ...fkaSeekableStreamStartSequenceNumbers.java | 17 +- .../kafka/supervisor/KafkaSupervisor.java | 59 +++- .../supervisor/KafkaSupervisorIOConfig.java | 30 +- .../indexing/kafka/KafkaSamplerSpecTest.java | 18 +- .../KafkaSupervisorIOConfigTest.java | 9 +- .../supervisor/KafkaSupervisorSpecTest.java | 6 +- .../kafka/supervisor/KafkaSupervisorTest.java | 12 +- .../seekablestream/common/PartitionId.java | 29 ++ .../common/RecordSupplierGroup.java | 216 +++++++++++++ .../common/RecordSupplierGroupTest.java | 289 ++++++++++++++++++ 17 files changed, 832 insertions(+), 85 deletions(-) create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/PartitionId.java create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplierGroup.java create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/common/RecordSupplierGroupTest.java diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java index d8b26c75f174..dc0b81e12279 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java +++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java @@ -29,6 +29,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import org.apache.druid.error.DruidException; +import org.apache.druid.indexing.seekablestream.common.PartitionId; import org.apache.kafka.common.TopicPartition; import javax.annotation.Nullable; @@ -47,11 +48,13 @@ KafkaTopicPartition.KafkaTopicPartitionKeySerializer.class) @JsonDeserialize(using = KafkaTopicPartition.KafkaTopicPartitionDeserializer.class, keyUsing = KafkaTopicPartition.KafkaTopicPartitionKeyDeserializer.class) -public class KafkaTopicPartition +public class KafkaTopicPartition implements PartitionId { private final int partition; @Nullable private final String topic; + @Nullable + private final String clusterKey; /** * This flag is used to maintain backward compatibilty with older versions of kafka indexing. If this flag @@ -64,6 +67,16 @@ public class KafkaTopicPartition private final boolean multiTopicPartition; public KafkaTopicPartition(boolean multiTopicPartition, @Nullable String topic, int partition) + { + this(multiTopicPartition, null, topic, partition); + } + + public KafkaTopicPartition( + boolean multiTopicPartition, + @Nullable String clusterKey, + @Nullable String topic, + int partition + ) { this.partition = partition; this.multiTopicPartition = multiTopicPartition; @@ -75,6 +88,7 @@ public KafkaTopicPartition(boolean multiTopicPartition, @Nullable String topic, } else { this.topic = null; } + this.clusterKey = clusterKey; } public int partition() @@ -114,13 +128,13 @@ public boolean equals(Object o) return partition == that.partition && multiTopicPartition == that.multiTopicPartition && Objects.equals( topic, that.topic - ); + ) && Objects.equals(clusterKey, that.clusterKey); } @Override public int hashCode() { - return Objects.hash(partition, multiTopicPartition, topic); + 
return Objects.hash(partition, multiTopicPartition, topic, clusterKey); } @Override @@ -130,9 +144,17 @@ public String toString() "partition=" + partition + ", topic='" + topic + '\'' + ", multiTopicPartition=" + multiTopicPartition + + ", clusterKey='" + clusterKey + '\'' + '}'; } + @Override + @Nullable + public String getCluster() + { + return clusterKey; + } + public static class KafkaTopicPartitionDeserializer extends JsonDeserializer { @Override @@ -155,10 +177,11 @@ public static class KafkaTopicPartitionSerializer extends JsonSerializer resetPartitions = new HashMap<>(); boolean doReset = false; @@ -135,7 +136,7 @@ private void possiblyResetOffsetsOrWait( // seek to the beginning to get the least available offset StreamPartition streamPartition = StreamPartition.of( stream, - new KafkaTopicPartition(isMultiTopic, topicPartition.topic(), topicPartition.partition()) + new KafkaTopicPartition(isMultiTopic, clusterKey, topicPartition.topic(), topicPartition.partition()) ); final Long leastAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition); if (leastAvailableOffset == null) { @@ -159,7 +160,7 @@ private void possiblyResetOffsetsOrWait( if (doReset) { sendResetRequestAndWait(CollectionUtils.mapKeys(resetPartitions, topicPartition -> StreamPartition.of( stream, - new KafkaTopicPartition(isMultiTopic, topicPartition.topic(), topicPartition.partition()) + new KafkaTopicPartition(isMultiTopic, clusterKey, topicPartition.topic(), topicPartition.partition()) )), taskToolbox); } else { log.warn("Retrying in %dms", task.getPollRetryMs()); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index c7f477ebb501..98f77a534640 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -32,6 +32,7 @@ import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; +import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.Resource; @@ -98,19 +99,24 @@ protected SeekableStreamIndexTaskRunner newTaskRecordSupplier(final TaskToolbox toolbox) { ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); KafkaIndexTaskIOConfig kafkaIndexTaskIOConfig = (KafkaIndexTaskIOConfig) super.ioConfig; - final Map props = new HashMap<>(kafkaIndexTaskIOConfig.getConsumerProperties()); - + final String cluster = kafkaIndexTaskIOConfig.getCluster(); + final Map props; + if (cluster != null) { + props = (Map) new HashMap<>(kafkaIndexTaskIOConfig.getConsumerProperties()).get(cluster); + } else { + props = new HashMap<>(kafkaIndexTaskIOConfig.getConsumerProperties()); + } props.put("auto.offset.reset", "none"); final KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(props, configMapper, kafkaIndexTaskIOConfig.getConfigOverrides(), - kafkaIndexTaskIOConfig.isMultiTopic()); + kafkaIndexTaskIOConfig.isMultiTopic(), kafkaIndexTaskIOConfig.getCluster()); if (toolbox.getMonitorScheduler() != null) { toolbox.getMonitorScheduler().addMonitor(recordSupplier.monitor()); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java index 07c0f80fbe83..1b41aedf4e87 100644 --- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java @@ -41,6 +41,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig myEndSequenceNumbers = getEndSequenceNumbers(); for (KafkaTopicPartition partition : myEndSequenceNumbers.getPartitionSequenceNumberMap().keySet()) { @@ -128,7 +131,8 @@ public KafkaIndexTaskIOConfig( inputFormat, configOverrides, KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC, - refreshRejectionPeriodsInMinutes + refreshRejectionPeriodsInMinutes, + null ); } @@ -184,6 +188,13 @@ public boolean isMultiTopic() return multiTopic; } + @JsonProperty + @Nullable + public String getCluster() + { + return cluster; + } + @Override public String toString() { @@ -193,6 +204,8 @@ public String toString() ", startSequenceNumbers=" + getStartSequenceNumbers() + ", endSequenceNumbers=" + getEndSequenceNumbers() + ", consumerProperties=" + consumerProperties + + ", multiTopic=" + multiTopic + + ", cluster=" + cluster + ", pollTimeout=" + pollTimeout + ", useTransaction=" + isUseTransaction() + ", minimumMessageTime=" + getMinimumMessageTime() + diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index ac7683a25ef4..0431f41034a2 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -46,6 +46,7 @@ import org.apache.kafka.common.serialization.Deserializer; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.lang.reflect.InvocationTargetException; import 
java.lang.reflect.Method; import java.lang.reflect.Type; @@ -75,6 +76,23 @@ public class KafkaRecordSupplier implements RecordSupplier consumerProperties, + ObjectMapper sortingMapper, + KafkaConfigOverrides configOverrides, + boolean multiTopic, + final @Nullable String clusterId + ) + { + this(getKafkaConsumer(sortingMapper, consumerProperties, configOverrides), multiTopic, clusterId); + } + + @VisibleForTesting public KafkaRecordSupplier( Map consumerProperties, ObjectMapper sortingMapper, @@ -90,14 +108,25 @@ public KafkaRecordSupplier( KafkaConsumer consumer, boolean multiTopic ) + { + this(consumer, multiTopic, null); + } + + @VisibleForTesting + public KafkaRecordSupplier( + KafkaConsumer consumer, + boolean multiTopic, + final String clusterId + ) { this.consumer = consumer; this.multiTopic = multiTopic; this.monitor = new KafkaConsumerMonitor(consumer); + this.clusterId = clusterId; } @Override - public void assign(Set> streamPartitions) + public synchronized void assign(Set> streamPartitions) { if (streamPartitions.isEmpty()) { wrapExceptions(() -> consumer.assign(Collections.emptyList())); @@ -116,7 +145,7 @@ public void assign(Set> streamPartitions) } @Override - public void seek(StreamPartition partition, Long sequenceNumber) + public synchronized void seek(StreamPartition partition, Long sequenceNumber) { wrapExceptions(() -> consumer.seek( partition.getPartitionId().asTopicPartition(partition.getStream()), @@ -125,7 +154,7 @@ public void seek(StreamPartition partition, Long sequenceNu } @Override - public void seekToEarliest(Set> partitions) + public synchronized void seekToEarliest(Set> partitions) { wrapExceptions(() -> consumer.seekToBeginning(partitions .stream() @@ -134,7 +163,7 @@ public void seekToEarliest(Set> partitions) } @Override - public void seekToLatest(Set> partitions) + public synchronized void seekToLatest(Set> partitions) { wrapExceptions(() -> consumer.seekToEnd(partitions .stream() @@ -143,13 +172,13 @@ public void 
seekToLatest(Set> partitions) } @Override - public Set> getAssignment() + public synchronized Set> getAssignment() { return wrapExceptions(() -> consumer.assignment() .stream() .map(e -> new StreamPartition<>( stream, - new KafkaTopicPartition(multiTopic, e.topic(), + new KafkaTopicPartition(multiTopic, clusterId, e.topic(), e.partition() ) )) @@ -175,7 +204,7 @@ public List partition) + public synchronized Long getLatestSequenceNumber(StreamPartition partition) { Long currPos = getPosition(partition); seekToLatest(Collections.singleton(partition)); @@ -185,7 +214,7 @@ public Long getLatestSequenceNumber(StreamPartition partiti } @Override - public Long getEarliestSequenceNumber(StreamPartition partition) + public synchronized Long getEarliestSequenceNumber(StreamPartition partition) { Long currPos = getPosition(partition); seekToEarliest(Collections.singleton(partition)); @@ -195,7 +224,7 @@ public Long getEarliestSequenceNumber(StreamPartition parti } @Override - public boolean isOffsetAvailable(StreamPartition partition, OrderedSequenceNumber offset) + public synchronized boolean isOffsetAvailable(StreamPartition partition, OrderedSequenceNumber offset) { final Long earliestOffset = getEarliestSequenceNumber(partition); return earliestOffset != null @@ -203,13 +232,13 @@ public boolean isOffsetAvailable(StreamPartition partition, } @Override - public Long getPosition(StreamPartition partition) + public synchronized Long getPosition(StreamPartition partition) { return wrapExceptions(() -> consumer.position(partition.getPartitionId().asTopicPartition(partition.getStream()))); } @Override - public Map getLatestSequenceNumbers(Set> partitions) + public synchronized Map getLatestSequenceNumbers(Set> partitions) { return wrapExceptions(() -> CollectionUtils.mapKeys( consumer.endOffsets( @@ -224,7 +253,7 @@ public Map getLatestSequenceNumbers(Set getPartitionIds(String stream) + public synchronized Set getPartitionIds(String stream) { return wrapExceptions(() -> { List 
allPartitions; @@ -252,7 +281,7 @@ public Set getPartitionIds(String stream) } } return allPartitions.stream() - .map(p -> new KafkaTopicPartition(multiTopic, p.topic(), p.partition())) + .map(p -> new KafkaTopicPartition(multiTopic, clusterId, p.topic(), p.partition())) .collect(Collectors.toSet()); }); } @@ -266,7 +295,7 @@ public Monitor monitor() } @Override - public void close() + public synchronized void close() { if (closed) { return; diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java index e0683fb605bb..b5f591211016 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java @@ -24,11 +24,15 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.kafka.KafkaRecordEntity; +import org.apache.druid.data.input.kafka.KafkaTopicPartition; import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig; import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec; import org.apache.druid.indexing.overlord.sampler.InputSourceSampler; import org.apache.druid.indexing.overlord.sampler.SamplerConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamSamplerSpec; +import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.druid.indexing.seekablestream.common.RecordSupplierGroup; import org.apache.druid.java.util.common.UOE; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.Resource; @@ -37,8 +41,10 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import 
java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; @@ -60,22 +66,50 @@ public KafkaSamplerSpec( } @Override - protected KafkaRecordSupplier createRecordSupplier() + protected RecordSupplier createRecordSupplier() { ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); + KafkaSupervisorIOConfig kafkaSupervisorIOConfig = (KafkaSupervisorIOConfig) ioConfig; try { Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); final Map props = new HashMap<>(((KafkaSupervisorIOConfig) ioConfig).getConsumerProperties()); - props.put("enable.auto.commit", "false"); - props.put("auto.offset.reset", "none"); - props.put("request.timeout.ms", Integer.toString(samplerConfig.getTimeoutMs())); - KafkaSupervisorIOConfig kafkaSupervisorIOConfig = (KafkaSupervisorIOConfig) ioConfig; + if (kafkaSupervisorIOConfig.isMultiCluster()) { + List keyList = new ArrayList<>(); + List> supplierList = new ArrayList<>(); - return new KafkaRecordSupplier(props, objectMapper, kafkaSupervisorIOConfig.getConfigOverrides(), - kafkaSupervisorIOConfig.isMultiTopic() - ); + for (Map.Entry entry : props.entrySet()) { + final String clusterKey = entry.getKey(); + final Map consumerProperty = (Map) entry.getValue(); + + consumerProperty.put("enable.auto.commit", "false"); + consumerProperty.put("auto.offset.reset", "none"); + consumerProperty.put("request.timeout.ms", Integer.toString(samplerConfig.getTimeoutMs())); + + keyList.add(clusterKey); + + final KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( + consumerProperty, + objectMapper, + kafkaSupervisorIOConfig.getConfigOverrides(), + kafkaSupervisorIOConfig.isMultiTopic(), + clusterKey + ); + + supplierList.add(recordSupplier); + } + + return new RecordSupplierGroup<>(keyList, supplierList); + } else { + props.put("enable.auto.commit", "false"); + props.put("auto.offset.reset", "none"); + 
props.put("request.timeout.ms", Integer.toString(samplerConfig.getTimeoutMs())); + + return new KafkaRecordSupplier(props, objectMapper, kafkaSupervisorIOConfig.getConfigOverrides(), + kafkaSupervisorIOConfig.isMultiTopic() + ); + } } finally { Thread.currentThread().setContextClassLoader(currCtxCl); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbers.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbers.java index 5476f88c6cec..d7db93c2daa6 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbers.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbers.java @@ -102,7 +102,7 @@ public SeekableStreamSequenceNumbers plus( } }) .collect(Collectors.toMap( - e -> new KafkaTopicPartition(false, thisTopic, e.getKey().partition()), + e -> new KafkaTopicPartition(false, e.getKey().getCluster(), thisTopic, e.getKey().partition()), Map.Entry::getValue ))); } else { @@ -114,6 +114,7 @@ public SeekableStreamSequenceNumbers plus( getPartitionSequenceNumberMap(), k -> new KafkaTopicPartition( true, + k.getCluster(), k.asTopicPartition(thisTopic).topic(), k.partition() ) @@ -131,7 +132,7 @@ public SeekableStreamSequenceNumbers plus( } }) .collect(Collectors.toMap( - e -> new KafkaTopicPartition(true, e.getKey().asTopicPartition(thatTopic).topic(), e.getKey().partition()), + e -> new KafkaTopicPartition(true, e.getKey().getCluster(), e.getKey().asTopicPartition(thatTopic).topic(), e.getKey().partition()), Map.Entry::getValue ))); } @@ -163,9 +164,9 @@ public SeekableStreamSequenceNumbers minus( String thisTopic = entry.getKey().asTopicPartition(getStream()).topic(); boolean otherContainsThis = otherEnd.getPartitionSequenceNumberMap().containsKey(entry.getKey()); boolean 
otherContainsThisMultiTopic = otherEnd.getPartitionSequenceNumberMap() - .containsKey(new KafkaTopicPartition(true, thisTopic, entry.getKey().partition())); + .containsKey(new KafkaTopicPartition(true, entry.getKey().getCluster(), thisTopic, entry.getKey().partition())); boolean otherContainsThisSingleTopic = (thatTopic.equals(thisTopic) && otherEnd.getPartitionSequenceNumberMap() - .containsKey(new KafkaTopicPartition(false, null, entry.getKey().partition()))); + .containsKey(new KafkaTopicPartition(false, entry.getKey().getCluster(), null, entry.getKey().partition()))); if (!otherContainsThis && !otherContainsThisMultiTopic && !otherContainsThisSingleTopic) { newMap.put(entry.getKey(), entry.getValue()); } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbers.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbers.java index 24a3e08b5d5a..06d01840f196 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbers.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbers.java @@ -106,7 +106,7 @@ public SeekableStreamSequenceNumbers plus( } }) .collect(Collectors.toMap( - e -> new KafkaTopicPartition(false, thisTopic, e.getKey().partition()), + e -> new KafkaTopicPartition(false, e.getKey().getCluster(), thisTopic, e.getKey().partition()), Map.Entry::getValue ))); @@ -115,9 +115,9 @@ public SeekableStreamSequenceNumbers plus( // 2) exclusive in "other" getPartitionSequenceNumberMap().forEach( (partitionId, sequenceOffset) -> { - KafkaTopicPartition multiTopicPartitonIdToSearch = new KafkaTopicPartition(true, thisTopic, partitionId.partition()); + KafkaTopicPartition multiTopicPartitonIdToSearch = new KafkaTopicPartition(true, partitionId.getCluster(), 
thisTopic, partitionId.partition()); if (getExclusivePartitions().contains(partitionId) && !that.getPartitionSequenceNumberMap().containsKey(multiTopicPartitonIdToSearch)) { - newExclusivePartitions.add(new KafkaTopicPartition(false, this.getStream(), partitionId.partition())); + newExclusivePartitions.add(new KafkaTopicPartition(false, partitionId.getCluster(), this.getStream(), partitionId.partition())); } } ); @@ -131,6 +131,7 @@ public SeekableStreamSequenceNumbers plus( getPartitionSequenceNumberMap(), k -> new KafkaTopicPartition( true, + k.getCluster(), k.asTopicPartition(thisTopic).topic(), k.partition() ) @@ -148,7 +149,7 @@ public SeekableStreamSequenceNumbers plus( } }) .collect(Collectors.toMap( - e -> new KafkaTopicPartition(true, e.getKey().asTopicPartition(thatTopic).topic(), e.getKey().partition()), + e -> new KafkaTopicPartition(true, e.getKey().getCluster(), e.getKey().asTopicPartition(thatTopic).topic(), e.getKey().partition()), Map.Entry::getValue ))); @@ -157,10 +158,10 @@ public SeekableStreamSequenceNumbers plus( // 2) exclusive in "other" getPartitionSequenceNumberMap().forEach( (partitionId, sequenceOffset) -> { - KafkaTopicPartition multiTopicPartitonIdToSearch = new KafkaTopicPartition(true, thisTopic, partitionId.partition()); + KafkaTopicPartition multiTopicPartitonIdToSearch = new KafkaTopicPartition(true, partitionId.getCluster(), thisTopic, partitionId.partition()); boolean thatTopicMatchesThisTopicPattern = partitionId.topic().isPresent() ? 
pattern.matcher(partitionId.topic().get()).matches() : pattern.matcher(thatTopic).matches(); if (getExclusivePartitions().contains(partitionId) && (!thatTopicMatchesThisTopicPattern || !that.getPartitionSequenceNumberMap().containsKey(multiTopicPartitonIdToSearch))) { - newExclusivePartitions.add(new KafkaTopicPartition(true, this.getStream(), partitionId.partition())); + newExclusivePartitions.add(new KafkaTopicPartition(true, partitionId.getCluster(), this.getStream(), partitionId.partition())); } } ); @@ -193,9 +194,9 @@ public SeekableStreamSequenceNumbers minus( String thisTopic = entry.getKey().asTopicPartition(getStream()).topic(); boolean otherContainsThis = otherStart.getPartitionSequenceNumberMap().containsKey(entry.getKey()); boolean otherContainsThisMultiTopic = otherStart.getPartitionSequenceNumberMap() - .containsKey(new KafkaTopicPartition(true, thisTopic, entry.getKey().partition())); + .containsKey(new KafkaTopicPartition(true, entry.getKey().getCluster(), thisTopic, entry.getKey().partition())); boolean otherContainsThisSingleTopic = (thatTopic.equals(thisTopic) && otherStart.getPartitionSequenceNumberMap() - .containsKey(new KafkaTopicPartition(false, null, entry.getKey().partition()))); + .containsKey(new KafkaTopicPartition(false, entry.getKey().getCluster(), null, entry.getKey().partition()))); if (!otherContainsThis && !otherContainsThisMultiTopic && !otherContainsThisSingleTopic) { newMap.put(entry.getKey(), entry.getValue()); // A partition is exclusive if it's exclusive in "this" and not in "other"'s partitionSequenceNumberMap diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 743e4edc32ca..4ff76fab310e 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.data.input.kafka.KafkaRecordEntity; import org.apache.druid.data.input.kafka.KafkaTopicPartition; @@ -49,6 +50,7 @@ import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.druid.indexing.seekablestream.common.RecordSupplierGroup; import org.apache.druid.indexing.seekablestream.common.StreamException; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; @@ -95,6 +97,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor partitionToTimeLag; private final KafkaSupervisorSpec spec; + private final Map clusterIndices; public KafkaSupervisor( final TaskStorage taskStorage, @@ -120,12 +123,46 @@ public KafkaSupervisor( this.spec = spec; this.pattern = getIoConfig().isMultiTopic() ? 
Pattern.compile(getIoConfig().getStream()) : null; + + if (getIoConfig().isMultiCluster()) { + int idx = 0; + final Map consumerProperties = getIoConfig().getConsumerProperties(); + final ImmutableMap.Builder clusterIndices = new ImmutableMap.Builder<>(); + final List sortedClusters = consumerProperties.keySet().stream().sorted().collect(Collectors.toList()); + for (final String cluster : sortedClusters) { + clusterIndices.put(cluster, idx++); + } + this.clusterIndices = clusterIndices.build(); + } else { + this.clusterIndices = ImmutableMap.of(); + } } @Override protected RecordSupplier setupRecordSupplier() { + final KafkaSupervisorIOConfig ioConfig = spec.getIoConfig(); + if (!ioConfig.isMultiCluster()) { + final Map props = new HashMap<>(ioConfig.getConsumerProperties()); + final List keyList = new ArrayList<>(); + final List> supplierList = new ArrayList<>(); + for (final Map.Entry entry : props.entrySet()) { + final String clusterKey = entry.getKey(); + final Map consumerProperty = (Map) entry.getValue(); + log.info("cluster=[%s], consumerProperty=[%s]", clusterKey, consumerProperty); + keyList.add(clusterKey); + final KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( + consumerProperty, + sortingMapper, + ioConfig.getConfigOverrides(), + ioConfig.isMultiTopic(), + clusterKey + ); + supplierList.add(recordSupplier); + } + return new RecordSupplierGroup<>(keyList, supplierList); + } return new KafkaRecordSupplier( spec.getIoConfig().getConsumerProperties(), sortingMapper, @@ -138,10 +175,14 @@ protected RecordSupplier setupReco protected int getTaskGroupIdForPartition(KafkaTopicPartition partitionId) { Integer taskCount = spec.getIoConfig().getTaskCount(); + final int tasksPerCluster = getIoConfig().isMultiCluster() ? taskCount / clusterIndices.size() : taskCount; + final int clusterIdx = getIoConfig().isMultiCluster() ? 
clusterIndices.get(partitionId.getCluster()) : 0; + final int clusterOffset = clusterIdx * (tasksPerCluster); if (partitionId.isMultiTopicPartition()) { - return Math.abs(31 * partitionId.topic().hashCode() + partitionId.partition()) % taskCount; + return clusterOffset + (Math.abs(31 * partitionId.topic().hashCode() + partitionId.partition()) + % tasksPerCluster); } else { - return partitionId.partition() % taskCount; + return clusterOffset + (Math.abs(partitionId.partition()) % tasksPerCluster); } } @@ -198,6 +239,7 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( ) { KafkaSupervisorIOConfig kafkaIoConfig = (KafkaSupervisorIOConfig) ioConfig; + final String cluster = startPartitions.keySet().stream().findFirst().get().getCluster(); return new KafkaIndexTaskIOConfig( groupId, baseSequenceName, @@ -213,7 +255,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( ioConfig.getInputFormat(), kafkaIoConfig.getConfigOverrides(), kafkaIoConfig.isMultiTopic(), - ioConfig.getTaskDuration().getStandardMinutes() + ioConfig.getTaskDuration().getStandardMinutes(), + cluster ); } @@ -459,7 +502,8 @@ private Map getTimestampPerPartitionAtCurrentOffset(S try { int maxPolls = 5; while (!remainingPartitions.isEmpty() && maxPolls-- > 0) { - for (OrderedPartitionableRecord record : recordSupplier.poll(getIoConfig().getPollTimeout())) { + for (OrderedPartitionableRecord record : recordSupplier.poll( + getIoConfig().getPollTimeout())) { if (!result.containsKey(record.getPartitionId())) { result.put(record.getPartitionId(), record.getTimestamp()); remainingPartitions.remove(new StreamPartition<>(getIoConfig().getStream(), record.getPartitionId())); @@ -620,6 +664,11 @@ private KafkaTopicPartition getMatchingKafkaTopicPartition( ? pattern.matcher(streamMatchValue).matches() : getIoConfig().getStream().equals(streamMatchValue); - return match ? new KafkaTopicPartition(isMultiTopic(), streamMatchValue, kafkaTopicPartition.partition()) : null; + return match ? 
new KafkaTopicPartition( + isMultiTopic(), + kafkaTopicPartition.getCluster(), + streamMatchValue, + kafkaTopicPartition.partition() + ) : null; } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java index 4eac3163fe34..1d1eaee921d2 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java @@ -54,6 +54,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig private final String topic; private final String topicPattern; private final boolean emitTimeLagMetrics; + private final boolean multiCluster; @JsonCreator public KafkaSupervisorIOConfig( @@ -77,7 +78,8 @@ public KafkaSupervisorIOConfig( @JsonProperty("configOverrides") KafkaConfigOverrides configOverrides, @JsonProperty("idleConfig") IdleConfig idleConfig, @JsonProperty("stopTaskCount") Integer stopTaskCount, - @Nullable @JsonProperty("emitTimeLagMetrics") Boolean emitTimeLagMetrics + @Nullable @JsonProperty("emitTimeLagMetrics") Boolean emitTimeLagMetrics, + @Nullable @JsonProperty("multiCluster") Boolean multiCluster ) { super( @@ -100,15 +102,24 @@ public KafkaSupervisorIOConfig( ); this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties"); - Preconditions.checkNotNull( - consumerProperties.get(BOOTSTRAP_SERVERS_KEY), - StringUtils.format("consumerProperties must contain entry for [%s]", BOOTSTRAP_SERVERS_KEY) - ); this.pollTimeout = pollTimeout != null ? 
pollTimeout : DEFAULT_POLL_TIMEOUT_MILLIS; this.configOverrides = configOverrides; this.topic = topic; this.topicPattern = topicPattern; this.emitTimeLagMetrics = Configs.valueOrDefault(emitTimeLagMetrics, false); + this.multiCluster = Configs.valueOrDefault(multiCluster, false); + + if (!this.multiCluster) { + Preconditions.checkNotNull( + consumerProperties.get(BOOTSTRAP_SERVERS_KEY), + StringUtils.format("consumerProperties must contain entry for [%s]", BOOTSTRAP_SERVERS_KEY) + ); + } else { + Preconditions.checkState( + !consumerProperties.isEmpty(), + "Multi-cluster consumerProperties must contain at least 1 entry!" + ); + } } /** @@ -167,15 +178,22 @@ public boolean isEmitTimeLagMetrics() return emitTimeLagMetrics; } + @JsonProperty + public boolean isMultiCluster() + { + return multiCluster; + } + @Override public String toString() { return "KafkaSupervisorIOConfig{" + "topic='" + getTopic() + '\'' + - "topicPattern='" + getTopicPattern() + '\'' + + ", topicPattern='" + getTopicPattern() + '\'' + ", replicas=" + getReplicas() + ", taskCount=" + getTaskCount() + ", taskDuration=" + getTaskDuration() + + ", multiCluster=" + isMultiCluster() + ", consumerProperties=" + consumerProperties + ", autoScalerConfig=" + getAutoScalerConfig() + ", pollTimeout=" + pollTimeout + diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java index f0d2bacd3598..45e6112c82a4 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java @@ -167,7 +167,8 @@ public void testSample() null, null, null, - false + false, + null ), null, null, @@ -222,7 +223,8 @@ public void testSampleWithTopicPattern() null, null, null, - false + 
false, + null ), null, null, @@ -286,7 +288,8 @@ public void testSampleKafkaInputFormat() null, null, null, - false + false, + null ), null, null, @@ -392,7 +395,8 @@ public void testWithInputRowParser() throws IOException null, null, null, - false + false, + null ), null, null, @@ -578,7 +582,8 @@ public void testInvalidKafkaConfig() null, null, null, - false + false, + null ), null, null, @@ -636,7 +641,8 @@ public void testGetInputSourceResources() null, null, null, - false + false, + null ), null, null, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java index 2da46aaf0e2b..6295d41937e8 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java @@ -340,7 +340,8 @@ public void testAutoScalingConfigSerde() throws JsonProcessingException null, null, null, - false + false, + null ); String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig); KafkaSupervisorIOConfig kafkaSupervisorIOConfig1 = mapper.readValue(ioConfig, KafkaSupervisorIOConfig.class); @@ -375,7 +376,8 @@ public void testAutoScalingConfigSerde() throws JsonProcessingException null, null, null, - false + false, + null ); Assert.assertEquals(5, kafkaSupervisorIOConfig.getTaskCount().intValue()); @@ -427,7 +429,8 @@ public void testIdleConfigSerde() throws JsonProcessingException null, mapper.convertValue(idleConfig, IdleConfig.class), null, - false + false, + null ); String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig); KafkaSupervisorIOConfig kafkaSupervisorIOConfig1 = mapper.readValue(ioConfig, KafkaSupervisorIOConfig.class); diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java index 1f1fa85b475f..75120cdd93bc 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java @@ -701,7 +701,8 @@ public void test_validateSpecUpdateTo() null, null, null, - false + false, + null ), Map.of( "key1", @@ -750,7 +751,8 @@ private KafkaSupervisorSpec getSpec(String topic, String topicPattern) null, null, null, - false + false, + null ), null, null, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 9428de463bf5..f5ccbcfbf7c0 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -326,7 +326,8 @@ public SeekableStreamIndexTaskClient build( null, new IdleConfig(true, 1000L), 1, - false + false, + null ); final KafkaSupervisorTuningConfig tuningConfigOri = new KafkaSupervisorTuningConfig( @@ -5317,7 +5318,8 @@ private TestableKafkaSupervisor getTestableSupervisor( null, idleConfig, null, - true + true, + null ); KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory( @@ -5433,7 +5435,8 @@ private TestableKafkaSupervisor getTestableSupervisorCustomIsTaskCurrent( null, null, null, - false + false, + null ); KafkaIndexTaskClientFactory taskClientFactory = new 
KafkaIndexTaskClientFactory( @@ -5552,7 +5555,8 @@ private KafkaSupervisor createSupervisor( null, null, null, - false + false, + null ); KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/PartitionId.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/PartitionId.java new file mode 100644 index 000000000000..efc4d5dc98e5 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/PartitionId.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream.common; + +/** + * Used to uniquely identify a partition in a Kinesis/Kafka stream. 
+ * Partition IDs should identify which cluster they are reading from, as well as the partition within that cluster. +*/ +public interface PartitionId +{ + String getCluster(); +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplierGroup.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplierGroup.java new file mode 100644 index 000000000000..131f090b1b3b --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplierGroup.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.indexing.seekablestream.common; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.java.util.common.logger.Logger; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +/** + * Appears like a single RecordSupplier, except muxes calls to a map of underlying RecordSuppliers. + * Used in situations where one needs to read from multiple RecordSuppliers at once, e.g multi-cluster ingest. + */ +public class RecordSupplierGroup + implements Closeable, RecordSupplier +{ + private static final Logger log = new Logger(RecordSupplierGroup.class); + final ImmutableMap> suppliers; + + public RecordSupplierGroup( + final List keyList, + final List> supplierList + ) + { + if (keyList.size() != supplierList.size()) { + throw new IllegalArgumentException("Key list and supplier list must have the same size."); + } + + final ImmutableMap.Builder> builder = ImmutableMap.builder(); + for (int i = 0; i < keyList.size(); ++i) { + builder.put(keyList.get(i), supplierList.get(i)); + } + this.suppliers = builder.build(); + } + + @Override + public void assign(Set> streamPartitions) + { + streamPartitions.stream() + .collect(Collectors.groupingBy( + streamPartition -> streamPartition.getPartitionId().getCluster(), + Collectors.toSet() + )) + .forEach((clusterId, partitions) -> this.suppliers.get(clusterId).assign(partitions)); + } + + @Override + public void seek(StreamPartition partition, SequenceOffsetType sequenceNumber) + throws InterruptedException + { + this.suppliers.get(partition.getPartitionId().getCluster()).seek(partition, sequenceNumber); + } + + @Override + public void seekToEarliest(Set> streamPartitions) throws 
InterruptedException + { + streamPartitions.stream() + .collect(Collectors.groupingBy( + streamPartition -> streamPartition.getPartitionId().getCluster(), + Collectors.toSet() + )) + .forEach((clusterId, partitions) -> { + try { + this.suppliers.get(clusterId).seekToEarliest(partitions); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + } + + @Override + public void seekToLatest(Set> streamPartitions) throws InterruptedException + { + streamPartitions.stream() + .collect(Collectors.groupingBy( + streamPartition -> streamPartition.getPartitionId().getCluster(), + Collectors.toSet() + )) + .forEach((clusterId, partitions) -> { + try { + this.suppliers.get(clusterId).seekToLatest(partitions); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + } + + @Override + public Collection> getAssignment() + { + return suppliers.values() + .stream() + .flatMap(supplier -> supplier.getAssignment().stream()) + .collect(Collectors.toList()); + } + + /** + * Polls all record suppliers asynchronously with a specified timeout. + * This is currently only used in sampler specs and not in tasks. + * + * @param timeout the maximum time to wait for all suppliers to poll records, in milliseconds. + * @return a list of records from all suppliers that completed within the timeout. If a supplier fails or times out, + * its results are not included in the returned list. + * @throws RuntimeException if an unexpected exception occurs while waiting for the suppliers to complete. 
+ */ + @Override + public List> poll(long timeout) + { + final List>>> futures = suppliers.values() + .stream() + .map( + supplier -> CompletableFuture.supplyAsync( + () -> supplier.poll( + timeout))) + .collect( + Collectors.toList()); + final CompletableFuture groupFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + + try { + groupFuture.get(timeout, TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) { + log.warn(e, "Record suppliers poll() timed out"); + } + catch (Exception e) { + throw new RuntimeException(e); + } + + return futures.stream().map(future -> { + try { + return future.getNow(null); + } + catch (Exception e) { + log.error(e, "Failure polling from record supplier"); + } + return null; + }).filter(result -> result != null).flatMap(List::stream).collect(Collectors.toList()); + } + + @Override + public @Nullable SequenceOffsetType getLatestSequenceNumber(StreamPartition partition) + { + return suppliers.get(partition.getPartitionId().getCluster()).getLatestSequenceNumber(partition); + } + + @Override + public @Nullable SequenceOffsetType getEarliestSequenceNumber(StreamPartition partition) + { + return suppliers.get(partition.getPartitionId().getCluster()).getEarliestSequenceNumber(partition); + } + + @Override + public boolean isOffsetAvailable( + StreamPartition partition, + OrderedSequenceNumber offset + ) + { + return suppliers.get(partition.getPartitionId().getCluster()).isOffsetAvailable(partition, offset); + } + + @Override + public SequenceOffsetType getPosition(StreamPartition partition) + { + return suppliers.get(partition.getPartitionId().getCluster()).getPosition(partition); + } + + @Override + public Set getPartitionIds(String stream) + { + return suppliers.values() + .stream() + .flatMap(supplier -> supplier.getPartitionIds(stream).stream()) + .collect(Collectors.toSet()); + } + + @Override + public void close() + { + try { + for (final RecordSupplier supplier : suppliers.values()) { + supplier.close(); + 
} + } + catch (Exception e) { + log.error(e, "Failed to close record supplier group"); + } + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/common/RecordSupplierGroupTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/common/RecordSupplierGroupTest.java new file mode 100644 index 000000000000..5ee75ede7f38 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/common/RecordSupplierGroupTest.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.seekablestream.common; + +import org.apache.druid.data.input.impl.ByteEntity; +import org.easymock.EasyMock; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +public class RecordSupplierGroupTest +{ + + private RecordSupplier mockSupplier1; + private RecordSupplier mockSupplier2; + private RecordSupplierGroup recordSupplierGroup; + private List keys; + private List> suppliers; + + private static class TestPartitionId implements PartitionId + { + private final String cluster; + + public TestPartitionId(final String cluster) + { + this.cluster = cluster; + } + + @Override + public String getCluster() + { + return cluster; + } + } + + @Before + public void setUp() + { + mockSupplier1 = EasyMock.createMock(RecordSupplier.class); + mockSupplier2 = EasyMock.createMock(RecordSupplier.class); + keys = Arrays.asList("cluster1", "cluster2"); + suppliers = Arrays.asList(mockSupplier1, mockSupplier2); + recordSupplierGroup = new RecordSupplierGroup<>(keys, suppliers); + } + + @Test + public void testAssign() + { + Set> partitions1 = new HashSet<>(); + Set> partitions2 = new HashSet<>(); + + partitions1.add(new StreamPartition<>("stream-1", new TestPartitionId("cluster1"))); + partitions2.add(new StreamPartition<>("stream-2", new TestPartitionId("cluster2"))); + + mockSupplier1.assign(partitions1); + mockSupplier2.assign(partitions2); + + EasyMock.replay(mockSupplier1, mockSupplier2); + + recordSupplierGroup.assign(partitions1); + recordSupplierGroup.assign(partitions2); + + EasyMock.verify(mockSupplier1, mockSupplier2); + } + + @Test + public void testSeek() throws InterruptedException + { + StreamPartition partition1 = new StreamPartition<>("stream-1", new TestPartitionId("cluster1")); + StreamPartition partition2 = new 
StreamPartition<>("stream-2", new TestPartitionId("cluster2")); + + mockSupplier1.seek(partition1, 100L); + mockSupplier2.seek(partition2, 200L); + + EasyMock.replay(mockSupplier1, mockSupplier2); + + recordSupplierGroup.seek(partition1, 100L); + recordSupplierGroup.seek(partition2, 200L); + + EasyMock.verify(mockSupplier1, mockSupplier2); + } + + @Test + public void testSeekToEarliest() throws InterruptedException + { + Set> partitions1 = new HashSet<>(); + Set> partitions2 = new HashSet<>(); + + partitions1.add(new StreamPartition<>("stream-1", new TestPartitionId("cluster1"))); + partitions2.add(new StreamPartition<>("stream-2", new TestPartitionId("cluster2"))); + + mockSupplier1.seekToEarliest(partitions1); + mockSupplier2.seekToEarliest(partitions2); + + EasyMock.replay(mockSupplier1, mockSupplier2); + + recordSupplierGroup.seekToEarliest(partitions1); + recordSupplierGroup.seekToEarliest(partitions2); + + EasyMock.verify(mockSupplier1, mockSupplier2); + } + + @Test + public void testSeekToLatest() throws InterruptedException + { + Set> partitions1 = new HashSet<>(); + Set> partitions2 = new HashSet<>(); + + partitions1.add(new StreamPartition<>("stream-1", new TestPartitionId("cluster1"))); + partitions2.add(new StreamPartition<>("stream-2", new TestPartitionId("cluster2"))); + + mockSupplier1.seekToLatest(partitions1); + mockSupplier2.seekToLatest(partitions2); + + EasyMock.replay(mockSupplier1, mockSupplier2); + + recordSupplierGroup.seekToLatest(partitions1); + recordSupplierGroup.seekToLatest(partitions2); + + EasyMock.verify(mockSupplier1, mockSupplier2); + } + + @Test + public void testGetAssignment() + { + Collection> expectedAssignments1 = List.of( + new StreamPartition<>("stream-1", new TestPartitionId("cluster1"))); + Collection> expectedAssignments2 = List.of( + new StreamPartition<>("stream-2", new TestPartitionId("cluster2"))); + + EasyMock.expect(mockSupplier1.getAssignment()).andReturn(expectedAssignments1); + 
EasyMock.expect(mockSupplier2.getAssignment()).andReturn(expectedAssignments2); + + EasyMock.replay(mockSupplier1, mockSupplier2); + + Collection> assignments = recordSupplierGroup.getAssignment(); + + assertEquals(2, assignments.size()); + + EasyMock.verify(mockSupplier1, mockSupplier2); + } + + @Test + public void testPoll() + { + List> records1 = Collections.emptyList(); + List> records2 = Collections.emptyList(); + + EasyMock.expect(mockSupplier1.poll(1)).andReturn(records1); + EasyMock.expect(mockSupplier2.poll(1)).andReturn(records2); + + EasyMock.replay(mockSupplier1, mockSupplier2); + + List> records = recordSupplierGroup.poll(1); + + assertEquals(0, records.size()); + + EasyMock.verify(mockSupplier1, mockSupplier2); + } + + @Test + public void testGetLatestSequenceNumber() + { + StreamPartition partition1 = new StreamPartition<>("stream-1", new TestPartitionId("cluster1")); + StreamPartition partition2 = new StreamPartition<>("stream-2", new TestPartitionId("cluster2")); + + EasyMock.expect(mockSupplier1.getLatestSequenceNumber(partition1)).andReturn(100L); + EasyMock.expect(mockSupplier2.getLatestSequenceNumber(partition2)).andReturn(200L); + + EasyMock.replay(mockSupplier1, mockSupplier2); + + assertEquals(Long.valueOf(100), recordSupplierGroup.getLatestSequenceNumber(partition1)); + assertEquals(Long.valueOf(200), recordSupplierGroup.getLatestSequenceNumber(partition2)); + + EasyMock.verify(mockSupplier1, mockSupplier2); + } + + @Test + public void testGetEarliestSequenceNumber() + { + StreamPartition partition1 = new StreamPartition<>("stream-1", new TestPartitionId("cluster1")); + StreamPartition partition2 = new StreamPartition<>("stream-2", new TestPartitionId("cluster2")); + + EasyMock.expect(mockSupplier1.getEarliestSequenceNumber(partition1)).andReturn(10L); + EasyMock.expect(mockSupplier2.getEarliestSequenceNumber(partition2)).andReturn(20L); + + EasyMock.replay(mockSupplier1, mockSupplier2); + + assertEquals(Long.valueOf(10), 
recordSupplierGroup.getEarliestSequenceNumber(partition1)); + assertEquals(Long.valueOf(20), recordSupplierGroup.getEarliestSequenceNumber(partition2)); + + EasyMock.verify(mockSupplier1, mockSupplier2); + } + + @Test + public void testIsOffsetAvailable() + { + StreamPartition partition1 = new StreamPartition<>("stream-1", new TestPartitionId("cluster1")); + StreamPartition partition2 = new StreamPartition<>("stream-2", new TestPartitionId("cluster2")); + + OrderedSequenceNumber offset1 = EasyMock.createMock(OrderedSequenceNumber.class); + OrderedSequenceNumber offset2 = EasyMock.createMock(OrderedSequenceNumber.class); + + EasyMock.expect(mockSupplier1.isOffsetAvailable(partition1, offset1)).andReturn(true); + EasyMock.expect(mockSupplier2.isOffsetAvailable(partition2, offset2)).andReturn(false); + + EasyMock.replay(mockSupplier1, mockSupplier2, offset1, offset2); + + assertEquals(true, recordSupplierGroup.isOffsetAvailable(partition1, offset1)); + assertEquals(false, recordSupplierGroup.isOffsetAvailable(partition2, offset2)); + + EasyMock.verify(mockSupplier1, mockSupplier2, offset1, offset2); + } + + @Test + public void testGetPosition() + { + StreamPartition partition1 = new StreamPartition<>("stream-1", new TestPartitionId("cluster1")); + StreamPartition partition2 = new StreamPartition<>("stream-2", new TestPartitionId("cluster2")); + + EasyMock.expect(mockSupplier1.getPosition(partition1)).andReturn(50L); + EasyMock.expect(mockSupplier2.getPosition(partition2)).andReturn(60L); + + EasyMock.replay(mockSupplier1, mockSupplier2); + + assertEquals(Long.valueOf(50), recordSupplierGroup.getPosition(partition1)); + assertEquals(Long.valueOf(60), recordSupplierGroup.getPosition(partition2)); + + EasyMock.verify(mockSupplier1, mockSupplier2); + } + + @Test + public void testGetPartitionIds() + { + Set partitionIds1 = new HashSet<>(Arrays.asList( + new TestPartitionId("partition1"), new TestPartitionId("partition3"))); + Set partitionIds2 = new 
HashSet<>(Arrays.asList( + new TestPartitionId("partition2"), new TestPartitionId("partition4"))); + + EasyMock.expect(mockSupplier1.getPartitionIds("stream")).andReturn(partitionIds1); + EasyMock.expect(mockSupplier2.getPartitionIds("stream")).andReturn(partitionIds2); + + EasyMock.replay(mockSupplier1, mockSupplier2); + + Set partitionIds = recordSupplierGroup.getPartitionIds("stream"); + + assertEquals(4, partitionIds.size()); + + EasyMock.verify(mockSupplier1, mockSupplier2); + } + + @Test + public void testClose() + { + mockSupplier1.close(); + mockSupplier2.close(); + + EasyMock.replay(mockSupplier1, mockSupplier2); + + recordSupplierGroup.close(); + + EasyMock.verify(mockSupplier1, mockSupplier2); + } +}