From 991e01c07e542c7aee77b606549ba5132124cc32 Mon Sep 17 00:00:00 2001 From: Julian Jaffe Date: Mon, 13 Apr 2020 10:36:06 -0700 Subject: [PATCH 1/2] Allow bootstrap.servers to be provided for Kafka ingestion. --- .../org/apache/druid/indexing/kafka/KafkaRecordSupplier.java | 3 ++- .../indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index 3ffe7fa83ec0..e2a420df6d38 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -194,7 +194,8 @@ public static void addConsumerPropertiesFromConfig( String propertyKey = entry.getKey(); if (propertyKey.equals(KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY) || propertyKey.equals(KafkaSupervisorIOConfig.KEY_STORE_PASSWORD_KEY) - || propertyKey.equals(KafkaSupervisorIOConfig.KEY_PASSWORD_KEY)) { + || propertyKey.equals(KafkaSupervisorIOConfig.KEY_PASSWORD_KEY) + || propertyKey.equals(KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY)) { PasswordProvider configPasswordProvider = configMapper.convertValue( entry.getValue(), PasswordProvider.class diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java index 50866890b0f1..b5a0488c739f 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java @@ -171,7 +171,7 @@ public void testSerdeForConsumerPropertiesWithPasswords() throws Exception String jsonStr = "{\n" + " \"type\": \"kafka\",\n" + " \"topic\": \"my-topic\",\n" - + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\",\n" + + " \"consumerProperties\": {\"bootstrap.servers\":{\"type\": \"default\", \"password\": \"localhost:9092\"},\n" + " \"ssl.truststore.password\":{\"type\": \"default\", \"password\": \"mytruststorepassword\"},\n" + " \"ssl.keystore.password\":{\"type\": \"default\", \"password\": \"mykeystorepassword\"},\n" + " \"ssl.key.password\":\"mykeypassword\"}\n" From b8417dc6593e64fac21a440875a997ec0872fa18 Mon Sep 17 00:00:00 2001 From: Julian Jaffe Date: Mon, 13 Apr 2020 10:47:06 -0700 Subject: [PATCH 2/2] Update documentation to reflect that bootstrap.servers can be provided. --- docs/development/extensions-core/kafka-ingestion.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md index 12d937cee24a..0d38aea9d922 100644 --- a/docs/development/extensions-core/kafka-ingestion.md +++ b/docs/development/extensions-core/kafka-ingestion.md @@ -194,7 +194,7 @@ For Concise bitmaps: |-----|----|-----------|--------| |`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes| |`inputFormat`|Object|[`inputFormat`](../../ingestion/data-formats.md#input-format) to specify how to parse input data. See [the below section](#specifying-data-format) for details about specifying the input format.|yes| -|`consumerProperties`|Map|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `:,:,...`. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes| +|`consumerProperties`|Map|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `:,:,...`, which may be either provided as a [Password Provider](../../operations/password-provider.md) or as a String. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes| |`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds|no (default == 100)| |`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against process failure.|no (default == 1)| |`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See 'Capacity Planning' below for more details. The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)|