From 0842256390d21f1c8678fea3f4d10d547592a05f Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Thu, 20 Aug 2020 09:10:29 -0700 Subject: [PATCH 1/4] introduce DynamicConfigProvider interface and make kafka consumer props extensible --- .../druid/metadata/DynamicConfigProvider.java | 39 +++++++++++++++ .../MapBasedDynamicConfigProvider.java | 47 +++++++++++++++++++ .../MapBasedDynamicConfigProviderTest.java | 44 +++++++++++++++++ .../indexing/kafka/KafkaRecordSupplier.java | 34 ++++++++++---- .../supervisor/KafkaSupervisorIOConfig.java | 1 + .../kafka/KafkaRecordSupplierTest.java | 31 ++++++++++++ 6 files changed, 186 insertions(+), 10 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/metadata/DynamicConfigProvider.java create mode 100644 core/src/main/java/org/apache/druid/metadata/MapBasedDynamicConfigProvider.java create mode 100644 core/src/test/java/org/apache/druid/metadata/MapBasedDynamicConfigProviderTest.java diff --git a/core/src/main/java/org/apache/druid/metadata/DynamicConfigProvider.java b/core/src/main/java/org/apache/druid/metadata/DynamicConfigProvider.java new file mode 100644 index 000000000000..89aa67fcfe78 --- /dev/null +++ b/core/src/main/java/org/apache/druid/metadata/DynamicConfigProvider.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.guice.annotations.ExtensionPoint; + +import java.util.Map; + +/** + * This is used to get [secure] configuration in various places in an extensible way. + */ +@ExtensionPoint +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = MapBasedDynamicConfigProvider.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "map", value = MapBasedDynamicConfigProvider.class), +}) +public interface DynamicConfigProvider +{ + Map getConfig(); +} diff --git a/core/src/main/java/org/apache/druid/metadata/MapBasedDynamicConfigProvider.java b/core/src/main/java/org/apache/druid/metadata/MapBasedDynamicConfigProvider.java new file mode 100644 index 000000000000..b69774b1ba29 --- /dev/null +++ b/core/src/main/java/org/apache/druid/metadata/MapBasedDynamicConfigProvider.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; + +import java.util.Map; + +public class MapBasedDynamicConfigProvider implements DynamicConfigProvider +{ + private final ImmutableMap config; + + @JsonCreator + public MapBasedDynamicConfigProvider( + @JsonProperty("config") Map config + ) + { + this.config = ImmutableMap.copyOf(config); + } + + + @Override + @JsonProperty + public Map getConfig() + { + return config; + } +} diff --git a/core/src/test/java/org/apache/druid/metadata/MapBasedDynamicConfigProviderTest.java b/core/src/test/java/org/apache/druid/metadata/MapBasedDynamicConfigProviderTest.java new file mode 100644 index 000000000000..53a3447dee50 --- /dev/null +++ b/core/src/test/java/org/apache/druid/metadata/MapBasedDynamicConfigProviderTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.Test; + +public class MapBasedDynamicConfigProviderTest +{ + @Test + public void testSerde() throws Exception + { + DynamicConfigProvider original = new MapBasedDynamicConfigProvider(ImmutableMap.of("k", "v")); + + ObjectMapper jsonMapper = new ObjectMapper(); + + MapBasedDynamicConfigProvider recreated = (MapBasedDynamicConfigProvider) jsonMapper.readValue( + jsonMapper.writeValueAsString(original), + DynamicConfigProvider.class + ); + + Assert.assertEquals(1, recreated.getConfig().size()); + Assert.assertEquals("v", recreated.getConfig().get("k")); + } +} diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index 3ffe7fa83ec0..45deee5e3c1e 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -28,6 +28,7 @@ import org.apache.druid.indexing.seekablestream.common.StreamException; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.metadata.DynamicConfigProvider; import org.apache.druid.metadata.PasswordProvider; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -192,16 +193,29 @@ public static void addConsumerPropertiesFromConfig( // Extract passwords before SSL connection to Kafka for (Map.Entry entry : consumerProperties.entrySet()) { String propertyKey = entry.getKey(); - if (propertyKey.equals(KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY) - || propertyKey.equals(KafkaSupervisorIOConfig.KEY_STORE_PASSWORD_KEY) - || propertyKey.equals(KafkaSupervisorIOConfig.KEY_PASSWORD_KEY)) { - PasswordProvider configPasswordProvider = configMapper.convertValue( - entry.getValue(), - PasswordProvider.class - ); - properties.setProperty(propertyKey, configPasswordProvider.getPassword()); - } else { - properties.setProperty(propertyKey, String.valueOf(entry.getValue())); + + if (!KafkaSupervisorIOConfig.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY.equals(propertyKey)) { + if (propertyKey.equals(KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY) + || propertyKey.equals(KafkaSupervisorIOConfig.KEY_STORE_PASSWORD_KEY) + || propertyKey.equals(KafkaSupervisorIOConfig.KEY_PASSWORD_KEY)) { + PasswordProvider configPasswordProvider = configMapper.convertValue( + entry.getValue(), + PasswordProvider.class + ); + properties.setProperty(propertyKey, configPasswordProvider.getPassword()); + } else { + properties.setProperty(propertyKey, String.valueOf(entry.getValue())); + } + } + } + + Object dynamicConfigProviderJson = consumerProperties.get(KafkaSupervisorIOConfig.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY); + if (dynamicConfigProviderJson != null) { + DynamicConfigProvider dynamicConfigProvider = configMapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class); + Map dynamicConfig = dynamicConfigProvider.getConfig(); + + for (Map.Entry e : dynamicConfig.entrySet()) { + properties.setProperty(e.getKey(), e.getValue()); } } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java index a1360b5c63f3..62c1e790657b 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java @@ -32,6 +32,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig { + public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider"; public static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers"; public static final String TRUST_STORE_PASSWORD_KEY = "ssl.truststore.password"; public static final String KEY_STORE_PASSWORD_KEY = "ssl.keystore.password"; diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java index d15803352321..491b04d15a52 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java @@ -24,10 +24,13 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.curator.test.TestingCluster; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig; import org.apache.druid.indexing.kafka.test.TestBroker; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.metadata.DynamicConfigProvider; +import org.apache.druid.metadata.MapBasedDynamicConfigProvider; import org.apache.druid.segment.TestHelper; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -43,6 +46,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -582,6 +586,33 @@ public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShoul Assert.assertEquals(new Long(0), recordSupplier.getEarliestSequenceNumber(streamPartition)); } + @Test + public void testAddConsumerPropertiesFromConfig() throws Exception + { + DynamicConfigProvider dynamicConfigProvider = new MapBasedDynamicConfigProvider( + ImmutableMap.of("kafka.prop.2", "value.2", KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY, "pwd2") + ); + + Properties properties = new Properties(); + + Map consumerProperties = ImmutableMap.of( + KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY, "pwd1", + "kafka.prop.1", "value.1", + "druid.dynamic.config.provider", OBJECT_MAPPER.convertValue(dynamicConfigProvider, Map.class) + ); + + KafkaRecordSupplier.addConsumerPropertiesFromConfig( + properties, + OBJECT_MAPPER, + consumerProperties + ); + + Assert.assertEquals(3, properties.size()); + Assert.assertEquals("value.1", properties.getProperty("kafka.prop.1")); + Assert.assertEquals("value.2", properties.getProperty("kafka.prop.2")); + Assert.assertEquals("pwd2", properties.getProperty(KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY)); + } + private void insertData() throws ExecutionException, InterruptedException { try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { From bc575c338f85a088214888f81b5e63d915243cd9 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Fri, 21 Aug 2020 23:14:47 -0700 Subject: [PATCH 2/4] fix intellij inspection error --- .../apache/druid/indexing/kafka/KafkaRecordSupplierTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java index 491b04d15a52..2d16dc1a8179 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java @@ -587,7 +587,7 @@ public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShoul } @Test - public void testAddConsumerPropertiesFromConfig() throws Exception + public void testAddConsumerPropertiesFromConfig() { DynamicConfigProvider dynamicConfigProvider = new MapBasedDynamicConfigProvider( ImmutableMap.of("kafka.prop.2", "value.2", KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY, "pwd2") From 24fa183c413830a05c9b692526291f0a82a1e2e0 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 26 Aug 2020 09:56:32 -0700 Subject: [PATCH 3/4] make DynamicConfigProvider generic Change-Id: I2e3e89f8617b6fe7fc96859deca4011f609dc5a3 --- .../org/apache/druid/metadata/DynamicConfigProvider.java | 8 ++++---- ...gProvider.java => MapStringDynamicConfigProvider.java} | 4 ++-- ...rTest.java => MapStringDynamicConfigProviderTest.java} | 6 +++--- .../apache/druid/indexing/kafka/KafkaRecordSupplier.java | 1 + .../druid/indexing/kafka/KafkaRecordSupplierTest.java | 4 ++-- 5 files changed, 12 insertions(+), 11 deletions(-) rename core/src/main/java/org/apache/druid/metadata/{MapBasedDynamicConfigProvider.java => MapStringDynamicConfigProvider.java} (91%) rename core/src/test/java/org/apache/druid/metadata/{MapBasedDynamicConfigProviderTest.java => MapStringDynamicConfigProviderTest.java} (83%) diff --git a/core/src/main/java/org/apache/druid/metadata/DynamicConfigProvider.java b/core/src/main/java/org/apache/druid/metadata/DynamicConfigProvider.java index 89aa67fcfe78..52f032a0a195 100644 --- a/core/src/main/java/org/apache/druid/metadata/DynamicConfigProvider.java +++ b/core/src/main/java/org/apache/druid/metadata/DynamicConfigProvider.java @@ -29,11 +29,11 @@ * This is used to get [secure] configuration in various places in an extensible way. */ @ExtensionPoint -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = MapBasedDynamicConfigProvider.class) +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = MapStringDynamicConfigProvider.class) @JsonSubTypes(value = { - @JsonSubTypes.Type(name = "map", value = MapBasedDynamicConfigProvider.class), + @JsonSubTypes.Type(name = "mapString", value = MapStringDynamicConfigProvider.class), }) -public interface DynamicConfigProvider +public interface DynamicConfigProvider { - Map getConfig(); + Map getConfig(); } diff --git a/core/src/main/java/org/apache/druid/metadata/MapBasedDynamicConfigProvider.java b/core/src/main/java/org/apache/druid/metadata/MapStringDynamicConfigProvider.java similarity index 91% rename from core/src/main/java/org/apache/druid/metadata/MapBasedDynamicConfigProvider.java rename to core/src/main/java/org/apache/druid/metadata/MapStringDynamicConfigProvider.java index b69774b1ba29..1ef5a15e8a97 100644 --- a/core/src/main/java/org/apache/druid/metadata/MapBasedDynamicConfigProvider.java +++ b/core/src/main/java/org/apache/druid/metadata/MapStringDynamicConfigProvider.java @@ -25,12 +25,12 @@ import java.util.Map; -public class MapBasedDynamicConfigProvider implements DynamicConfigProvider +public class MapStringDynamicConfigProvider implements DynamicConfigProvider { private final ImmutableMap config; @JsonCreator - public MapBasedDynamicConfigProvider( + public MapStringDynamicConfigProvider( @JsonProperty("config") Map config ) { diff --git a/core/src/test/java/org/apache/druid/metadata/MapBasedDynamicConfigProviderTest.java b/core/src/test/java/org/apache/druid/metadata/MapStringDynamicConfigProviderTest.java similarity index 83% rename from core/src/test/java/org/apache/druid/metadata/MapBasedDynamicConfigProviderTest.java rename to core/src/test/java/org/apache/druid/metadata/MapStringDynamicConfigProviderTest.java index 53a3447dee50..cdf46e12b36f 100644 --- a/core/src/test/java/org/apache/druid/metadata/MapBasedDynamicConfigProviderTest.java +++ b/core/src/test/java/org/apache/druid/metadata/MapStringDynamicConfigProviderTest.java @@ -24,16 +24,16 @@ import org.junit.Assert; import org.junit.Test; -public class MapBasedDynamicConfigProviderTest +public class MapStringDynamicConfigProviderTest { @Test public void testSerde() throws Exception { - DynamicConfigProvider original = new MapBasedDynamicConfigProvider(ImmutableMap.of("k", "v")); + DynamicConfigProvider original = new MapStringDynamicConfigProvider(ImmutableMap.of("k", "v")); ObjectMapper jsonMapper = new ObjectMapper(); - MapBasedDynamicConfigProvider recreated = (MapBasedDynamicConfigProvider) jsonMapper.readValue( + MapStringDynamicConfigProvider recreated = (MapStringDynamicConfigProvider) jsonMapper.readValue( jsonMapper.writeValueAsString(original), DynamicConfigProvider.class ); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index 45deee5e3c1e..ad870f3ef000 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -209,6 +209,7 @@ public static void addConsumerPropertiesFromConfig( } } + // Additional DynamicConfigProvider based extensible support for all consumer properties Object dynamicConfigProviderJson = consumerProperties.get(KafkaSupervisorIOConfig.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY); if (dynamicConfigProviderJson != null) { DynamicConfigProvider dynamicConfigProvider = configMapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java index 2d16dc1a8179..ca152210f57a 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java @@ -30,7 +30,7 @@ import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.metadata.DynamicConfigProvider; -import org.apache.druid.metadata.MapBasedDynamicConfigProvider; +import org.apache.druid.metadata.MapStringDynamicConfigProvider; import org.apache.druid.segment.TestHelper; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -589,7 +589,7 @@ public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShoul @Test public void testAddConsumerPropertiesFromConfig() { - DynamicConfigProvider dynamicConfigProvider = new MapBasedDynamicConfigProvider( + DynamicConfigProvider dynamicConfigProvider = new MapStringDynamicConfigProvider( ImmutableMap.of("kafka.prop.2", "value.2", KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY, "pwd2") ); From 1cebe8bb51924a4066066ab2b6fb155588c1c4f9 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 2 Dec 2020 15:23:52 -0800 Subject: [PATCH 4/4] deprecate PasswordProvider --- .../java/org/apache/druid/metadata/PasswordProvider.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/src/main/java/org/apache/druid/metadata/PasswordProvider.java b/core/src/main/java/org/apache/druid/metadata/PasswordProvider.java index cd3a01c429f0..ab7ec7c22872 100644 --- a/core/src/main/java/org/apache/druid/metadata/PasswordProvider.java +++ b/core/src/main/java/org/apache/druid/metadata/PasswordProvider.java @@ -26,7 +26,13 @@ /** * Implement this for different ways to (optionally securely) access secrets. + * + * Any further use case of extensible configuration/secrets must use {@link DynamicConfigProvider} interface. Users + * may still implement this interface for existing use cases till https://github.com/apache/druid/issues/9351 is + * resolved. + * */ +@Deprecated @ExtensionPoint @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = DefaultPasswordProvider.class) @JsonSubTypes(value = {