diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md index 0dd31677af9e..e272bc1a1d53 100644 --- a/docs/development/extensions-core/kafka-ingestion.md +++ b/docs/development/extensions-core/kafka-ingestion.md @@ -40,6 +40,8 @@ This service is provided in the `druid-kafka-indexing-service` core Apache Druid > In addition, users could set `isolation.level` `read_uncommitted` in `consumerProperties`, if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka. > Make sure offsets are sequential, since there is no offset gap check in Druid anymore. +> If your Kafka cluster enables consumer-group-based ACLs, you can set `group.id` in `consumerProperties` to override the default auto-generated group ID. + ## Tutorial This page contains reference documentation for Apache Kafka-based ingestion. diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java index 365be19cc03c..9ba3123582d2 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java @@ -19,9 +19,6 @@ package org.apache.druid.indexing.kafka; -import org.apache.druid.common.utils.IdUtils; -import org.apache.druid.java.util.common.StringUtils; - import java.util.HashMap; import java.util.Map; @@ -35,7 +32,6 @@ public static Map getConsumerProperties() { final Map props = new HashMap<>(); props.put("metadata.max.age.ms", "10000"); - props.put("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId())); props.put("auto.offset.reset", "none"); props.put("enable.auto.commit", "false"); return props; diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index bd98510027d5..623379aae55e 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -30,15 +30,9 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; import org.apache.druid.segment.indexing.DataSchema; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; -import java.util.ArrayList; import java.util.HashMap; import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.stream.Collectors; public class KafkaIndexTask extends SeekableStreamIndexTask { @@ -84,44 +78,6 @@ long getPollRetryMs() return pollRetryMs; } - @Deprecated - KafkaConsumer newConsumer() - { - ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); - try { - Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); - - final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); - final Properties props = new Properties(); - KafkaRecordSupplier.addConsumerPropertiesFromConfig( - props, - configMapper, - ioConfig.getConsumerProperties() - ); - props.putIfAbsent("isolation.level", "read_committed"); - props.putAll(consumerConfigs); - - return new KafkaConsumer<>(props); - } - finally { - Thread.currentThread().setContextClassLoader(currCtxCl); - } - } - - @Deprecated - static void assignPartitions( - final KafkaConsumer consumer, - final String topic, - final Set partitions - ) - { - consumer.assign( - new ArrayList<>( - partitions.stream().map(n -> new TopicPartition(topic, 
n)).collect(Collectors.toList()) - ) - ); - } - @Override protected SeekableStreamIndexTaskRunner createTaskRunner() { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index 1397cddf22dc..fe32ffe5ffb6 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import org.apache.druid.common.utils.IdUtils; import org.apache.druid.data.input.kafka.KafkaRecordEntity; import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; @@ -29,6 +30,7 @@ import org.apache.druid.indexing.seekablestream.common.StreamException; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.metadata.DynamicConfigProvider; import org.apache.druid.metadata.PasswordProvider; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -255,6 +257,7 @@ private static KafkaConsumer getKafkaConsumer(ObjectMapper sorti final Properties props = new Properties(); addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties); props.putIfAbsent("isolation.level", "read_committed"); + props.putIfAbsent("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId())); props.putAll(consumerConfigs); ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); diff --git a/website/.spelling 
b/website/.spelling index 315e5ffc9aaa..25b6d65b8063 100644 --- a/website/.spelling +++ b/website/.spelling @@ -22,6 +22,7 @@ 500MiB 64-bit ACL +ACLs APIs AvroStorage AWS