From b80bdff6007992c80f781564f3dbc87e43fde19d Mon Sep 17 00:00:00 2001 From: Han Date: Thu, 22 Apr 2021 16:30:10 +0800 Subject: [PATCH 1/4] allow user to set group.id for Kafka ingestion task --- .../org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java | 4 ---- .../java/org/apache/druid/indexing/kafka/KafkaIndexTask.java | 3 +++ .../org/apache/druid/indexing/kafka/KafkaRecordSupplier.java | 3 +++ 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java index 365be19cc03c..9ba3123582d2 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java @@ -19,9 +19,6 @@ package org.apache.druid.indexing.kafka; -import org.apache.druid.common.utils.IdUtils; -import org.apache.druid.java.util.common.StringUtils; - import java.util.HashMap; import java.util.Map; @@ -35,7 +32,6 @@ public static Map getConsumerProperties() { final Map props = new HashMap<>(); props.put("metadata.max.age.ms", "10000"); - props.put("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId())); props.put("auto.offset.reset", "none"); props.put("enable.auto.commit", "false"); return props; diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index bd98510027d5..1307c70ba543 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -25,10 
+25,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.druid.common.utils.IdUtils; import org.apache.druid.data.input.kafka.KafkaRecordEntity; import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.indexing.DataSchema; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; @@ -99,6 +101,7 @@ KafkaConsumer newConsumer() ioConfig.getConsumerProperties() ); props.putIfAbsent("isolation.level", "read_committed"); + props.putIfAbsent("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId())); props.putAll(consumerConfigs); return new KafkaConsumer<>(props); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index 1397cddf22dc..fe32ffe5ffb6 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import org.apache.druid.common.utils.IdUtils; import org.apache.druid.data.input.kafka.KafkaRecordEntity; import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; @@ -29,6 +30,7 @@ import 
org.apache.druid.indexing.seekablestream.common.StreamException; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.metadata.DynamicConfigProvider; import org.apache.druid.metadata.PasswordProvider; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -255,6 +257,7 @@ private static KafkaConsumer getKafkaConsumer(ObjectMapper sorti final Properties props = new Properties(); addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties); props.putIfAbsent("isolation.level", "read_committed"); + props.putIfAbsent("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId())); props.putAll(consumerConfigs); ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); From 89dea4c49104768bff9b5e71621862fe67624ff2 Mon Sep 17 00:00:00 2001 From: Han Date: Wed, 5 May 2021 17:03:24 +0800 Subject: [PATCH 2/4] fix test coverage by removing deprecated code and add doc --- .../extensions-core/kafka-ingestion.md | 2 + .../druid/indexing/kafka/KafkaIndexTask.java | 47 ------------------- 2 files changed, 2 insertions(+), 47 deletions(-) diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md index 0dd31677af9e..fdc6932cc9fa 100644 --- a/docs/development/extensions-core/kafka-ingestion.md +++ b/docs/development/extensions-core/kafka-ingestion.md @@ -40,6 +40,8 @@ This service is provided in the `druid-kafka-indexing-service` core Apache Druid > In addition, users could set `isolation.level` `read_uncommitted` in `consumerProperties`, if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka. > Make sure offsets are sequential, since there is no offset gap check in Druid anymore. 
+> If your Kafka cluster enables ACLs, you can set `group.id` in `consumerProperties` to override the defualt auto generated group id. + ## Tutorial This page contains reference documentation for Apache Kafka-based ingestion. diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index 1307c70ba543..623379aae55e 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -25,22 +25,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.apache.druid.common.utils.IdUtils; import org.apache.druid.data.input.kafka.KafkaRecordEntity; import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.indexing.DataSchema; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; -import java.util.ArrayList; import java.util.HashMap; import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.stream.Collectors; public class KafkaIndexTask extends SeekableStreamIndexTask { @@ -86,45 +78,6 @@ long getPollRetryMs() return pollRetryMs; } - @Deprecated - KafkaConsumer newConsumer() - { - ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); - try { - Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); - - final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); - 
final Properties props = new Properties(); - KafkaRecordSupplier.addConsumerPropertiesFromConfig( - props, - configMapper, - ioConfig.getConsumerProperties() - ); - props.putIfAbsent("isolation.level", "read_committed"); - props.putIfAbsent("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId())); - props.putAll(consumerConfigs); - - return new KafkaConsumer<>(props); - } - finally { - Thread.currentThread().setContextClassLoader(currCtxCl); - } - } - - @Deprecated - static void assignPartitions( - final KafkaConsumer consumer, - final String topic, - final Set partitions - ) - { - consumer.assign( - new ArrayList<>( - partitions.stream().map(n -> new TopicPartition(topic, n)).collect(Collectors.toList()) - ) - ); - } - @Override protected SeekableStreamIndexTaskRunner createTaskRunner() { From 32702747752160d54c8feda4c0eb581d1715c54b Mon Sep 17 00:00:00 2001 From: Han Date: Wed, 5 May 2021 18:10:44 +0800 Subject: [PATCH 3/4] fix typo --- docs/development/extensions-core/kafka-ingestion.md | 2 +- website/.spelling | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md index fdc6932cc9fa..c41cabe18dc2 100644 --- a/docs/development/extensions-core/kafka-ingestion.md +++ b/docs/development/extensions-core/kafka-ingestion.md @@ -40,7 +40,7 @@ This service is provided in the `druid-kafka-indexing-service` core Apache Druid > In addition, users could set `isolation.level` `read_uncommitted` in `consumerProperties`, if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka. > Make sure offsets are sequential, since there is no offset gap check in Druid anymore. -> If your Kafka cluster enables ACLs, you can set `group.id` in `consumerProperties` to override the defualt auto generated group id. 
+> If your Kafka cluster enables ACLs, you can set `group.id` in `consumerProperties` to override the default auto generated group id. ## Tutorial diff --git a/website/.spelling b/website/.spelling index 315e5ffc9aaa..25b6d65b8063 100644 --- a/website/.spelling +++ b/website/.spelling @@ -22,6 +22,7 @@ 500MiB 64-bit ACL +ACLs APIs AvroStorage AWS From 5102f4e097fcfff3ce144142d3ecf5feb29984b9 Mon Sep 17 00:00:00 2001 From: Yuanli Han <44718283+yuanlihan@users.noreply.github.com> Date: Fri, 7 May 2021 10:37:32 +0800 Subject: [PATCH 4/4] Update docs/development/extensions-core/kafka-ingestion.md Co-authored-by: frank chen --- docs/development/extensions-core/kafka-ingestion.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md index c41cabe18dc2..e272bc1a1d53 100644 --- a/docs/development/extensions-core/kafka-ingestion.md +++ b/docs/development/extensions-core/kafka-ingestion.md @@ -40,7 +40,7 @@ This service is provided in the `druid-kafka-indexing-service` core Apache Druid > In addition, users could set `isolation.level` `read_uncommitted` in `consumerProperties`, if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka. > Make sure offsets are sequential, since there is no offset gap check in Druid anymore. -> If your Kafka cluster enables ACLs, you can set `group.id` in `consumerProperties` to override the default auto generated group id. +> If your Kafka cluster enables consumer-group-based ACLs, you can set `group.id` in `consumerProperties` to override the default auto-generated group ID. ## Tutorial