From 088686d647e8e9a55f6f850aca5e21a959342548 Mon Sep 17 00:00:00 2001
From: Randall Hauch
Date: Fri, 2 Nov 2018 18:44:11 -0500
Subject: [PATCH] KAFKA-7509: Changed to avoid passing inapplicable properties
 to producer, consumer, and admin client
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The producer, consumer, and admin client log any properties that are supplied
but unused by that client. Previously, Connect passed many of its worker
configuration properties into the producer, consumer, and admin client used
for internal topics, resulting in many log warnings about unused config
properties. With this change, Connect attempts to filter out the worker's
configuration properties that are not applicable to the producer, consumer,
or admin client used for _internal_ topics. (Connect already includes only
producer and consumer properties when creating those clients for connectors,
since those properties are prefixed in the worker config.)

For the most part this is straightforward: some top-level worker-specific
properties can be removed, and most extension-specific properties use
Connect-specific prefixes. Unfortunately, the REST extension is the only type
of Connect extension that uses unprefixed properties from the worker config,
so it is not possible to remove those from the properties passed to the
producer, consumer, and admin clients. Hopefully, REST extensions are not yet
prevalent, so most users should not see any warnings about unused properties
in the producer, consumer, and admin client.

Removing the Connect worker configs is one step. The other is to remove from
the producer any properties that are specific to the consumer and admin
client. Likewise, for the consumer we have to remove any properties that are
specific to the producer and admin client, and for the admin client we remove
any properties that are specific to the producer and consumer.

Note that any property that is unknown (e.g., properties for REST extensions,
interceptors, metric reporters, serdes, partitioners, etc.) must still be
passed to the producer, consumer, and admin client. All of these, except for
the REST extension properties, should indeed be used by the producer and
consumer. But since the admin client only supports metric reporters, any
properties for interceptors, serdes, partitioners, and REST extensions will
also be logged as unused. This is about the best we can do at this point.

All of this filtering logic was added to the `ConnectUtils` class, allowing
the logic to be easily unit tested. All changes are limited to Kafka Connect
and will work with all client and Connect extensions (unknown properties are
still passed through to the clients).
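To illustrate the approach (a minimal sketch only, not code from this patch;
the `ConfigFilter` class and `retain` method are hypothetical stand-ins for
the `ConnectUtils.retainConfigs` logic added below):

    import java.util.Map;
    import java.util.function.Predicate;

    final class ConfigFilter {
        // Keep only the entries whose keys satisfy the per-client predicate,
        // mutating and returning the supplied map. The predicates added in
        // this patch treat unknown keys as applicable, since they may belong
        // to extensions such as interceptors or metric reporters.
        static Map<String, Object> retain(Map<String, Object> configs, Predicate<String> isApplicable) {
            configs.entrySet().removeIf(entry -> !isApplicable.test(entry.getKey()));
            return configs;
        }
    }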
---
 .../distributed/DistributedConfig.java        |   5 +
 .../errors/DeadLetterQueueReporter.java       |   4 +-
 .../runtime/standalone/StandaloneConfig.java  |   5 +
 .../storage/KafkaConfigBackingStore.java      |   9 +-
 .../storage/KafkaOffsetBackingStore.java      |   8 +-
 .../storage/KafkaStatusBackingStore.java      |   8 +-
 .../kafka/connect/util/ConnectUtils.java      | 196 ++++++++++++++++++
 .../apache/kafka/connect/util/TopicAdmin.java |   3 +-
 .../kafka/connect/util/ConnectUtilsTest.java  | 192 +++++++++++++++++
 9 files changed, 416 insertions(+), 14 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index dc9017beeda75..db6573fb455b7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -21,6 +21,7 @@ import org.apache.kafka.connect.runtime.WorkerConfig;
 
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 
@@ -272,6 +273,10 @@ public DistributedConfig(Map<String, String> props) {
         super(CONFIG, props);
     }
 
+    public static Set<String> configNames() {
+        return CONFIG.names();
+    }
+
     public static void main(String[] args) {
         System.out.println(CONFIG.toHtmlTable());
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index 231226997833d..143fcd8afb438 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -27,6 +27,7 @@ import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,7 +78,8 @@ public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig,
                                                          ErrorHandlingMetrics errorHandlingMetrics) {
         String topic = sinkConfig.dlqTopicName();
 
-        try (AdminClient admin = AdminClient.create(workerConfig.originals())) {
+        Map<String, Object> adminProps = ConnectUtils.retainAdminClientConfigs(workerConfig.originals());
+        try (AdminClient admin = AdminClient.create(adminProps)) {
             if (!admin.listTopics().names().get().contains(topic)) {
                 log.error("Topic {} doesn't exist. Will attempt to create topic.", topic);
                 NewTopic schemaTopicRequest = new NewTopic(topic, DLQ_NUM_DESIRED_PARTITIONS, sinkConfig.dlqTopicReplicationFactor());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java
index f950edf53542c..313976bde6958 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java
@@ -20,6 +20,7 @@ import org.apache.kafka.connect.runtime.WorkerConfig;
 
 import java.util.Map;
+import java.util.Set;
 
 public class StandaloneConfig extends WorkerConfig {
     private static final ConfigDef CONFIG;
@@ -41,4 +42,8 @@ public class StandaloneConfig extends WorkerConfig {
     public StandaloneConfig(Map<String, String> props) {
         super(CONFIG, props);
     }
+
+    public static Set<String> configNames() {
+        return CONFIG.names();
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index e7ee632638d2e..8fc2b8714605d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -39,6 +39,7 @@ import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.TopicAdmin;
@@ -411,16 +412,16 @@ public void putTargetState(String connector, TargetState state) {
 
     // package private for testing
     KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
-        Map<String, Object> originals = config.originals();
-        Map<String, Object> producerProps = new HashMap<>(originals);
+        Map<String, Object> producerProps = ConnectUtils.retainProducerConfigs(config.originals());
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
-        Map<String, Object> consumerProps = new HashMap<>(originals);
+
+        Map<String, Object> consumerProps = ConnectUtils.retainConsumerConfigs(config.originals());
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
 
-        Map<String, Object> adminProps = new HashMap<>(originals);
+        Map<String, Object> adminProps = ConnectUtils.retainAdminClientConfigs(config.originals());
         NewTopic topicDescription = TopicAdmin.defineTopic(topic).
                 compacted().
                 partitions(1).
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 195c498edb76f..87d5da7147af0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -28,6 +28,7 @@ import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConvertingFutureCallback;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.TopicAdmin;
@@ -67,17 +68,16 @@ public void configure(final WorkerConfig config) {
         data = new HashMap<>();
 
-        Map<String, Object> originals = config.originals();
-        Map<String, Object> producerProps = new HashMap<>(originals);
+        Map<String, Object> producerProps = ConnectUtils.retainProducerConfigs(config.originals());
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
 
-        Map<String, Object> consumerProps = new HashMap<>(originals);
+        Map<String, Object> consumerProps = ConnectUtils.retainConsumerConfigs(config.originals());
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
 
-        Map<String, Object> adminProps = new HashMap<>(originals);
+        Map<String, Object> adminProps = ConnectUtils.retainAdminClientConfigs(config.originals());
         NewTopic topicDescription = TopicAdmin.defineTopic(topic).
                 compacted().
                 partitions(config.getInt(DistributedConfig.OFFSET_STORAGE_PARTITIONS_CONFIG)).
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index 6710808f9a97a..85705bef336ec 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -39,6 +39,7 @@ import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.Table;
@@ -124,17 +125,16 @@ public void configure(final WorkerConfig config) {
         if (this.topic == null || this.topic.trim().length() == 0)
             throw new ConfigException("Must specify topic for connector status.");
 
-        Map<String, Object> originals = config.originals();
-        Map<String, Object> producerProps = new HashMap<>(originals);
+        Map<String, Object> producerProps = ConnectUtils.retainProducerConfigs(config.originals());
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class
 
-        Map<String, Object> consumerProps = new HashMap<>(originals);
+        Map<String, Object> consumerProps = ConnectUtils.retainConsumerConfigs(config.originals());
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
 
-        Map<String, Object> adminProps = new HashMap<>(originals);
+        Map<String, Object> adminProps = ConnectUtils.retainAdminClientConfigs(config.originals());
         NewTopic topicDescription = TopicAdmin.defineTopic(topic).
                 compacted().
                 partitions(config.getInt(DistributedConfig.STATUS_STORAGE_PARTITIONS_CONFIG)).
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
index 9f30236fdee47..fc7eace77172e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
@@ -17,19 +17,43 @@ package org.apache.kafka.connect.util;
 
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Predicate;
 
 public final class ConnectUtils {
     private static final Logger log = LoggerFactory.getLogger(ConnectUtils.class);
 
+    private static final List<String> WORKER_CONFIG_PREFIXES = Collections.unmodifiableList(Arrays.asList(
+            "producer.",
+            "consumer.",
+            WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
+            WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
+            WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
+            WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG,
+            WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG,
+            WorkerConfig.REST_EXTENSION_CLASSES_CONFIG,
+            WorkerConfig.CONFIG_PROVIDERS_CONFIG
+    ));
+
     public static Long checkAndConvertTimestamp(Long timestamp) {
         if (timestamp == null || timestamp >= 0)
             return timestamp;
@@ -65,4 +89,176 @@ static String lookupKafkaClusterId(AdminClient adminClient) {
                     + "Check worker's broker connection and security properties.", e);
         }
     }
+
+    /**
+     * Modify the supplied map of configurations to retain only the configurations that may apply to the
+     * {@link org.apache.kafka.clients.producer.Producer}, including extension components.
+     * This will remove all properties that are known to be for consumers, the admin client, and Connect workers.
+     *
+     * @param configs the map of configurations to be modified; may not be null
+     * @return the supplied {@code configs} parameter, returned for convenience
+     */
+    public static Map<String, Object> retainProducerConfigs(Map<String, Object> configs) {
+        return retainConfigs(configs, ConnectUtils::isProducerConfig);
+    }
+
+    /**
+     * Modify the supplied map of configurations to retain only the configurations that may apply to the
+     * {@link org.apache.kafka.clients.consumer.Consumer}, including extension components.
+     * This will remove all properties that are known to be for producers, the admin client, and Connect workers.
+     *
+     * @param configs the map of configurations to be modified; may not be null
+     * @return the supplied {@code configs} parameter, returned for convenience
+     */
+    public static Map<String, Object> retainConsumerConfigs(Map<String, Object> configs) {
+        return retainConfigs(configs, ConnectUtils::isConsumerConfig);
+    }
+
+    /**
+     * Modify the supplied map of configurations to retain only the configurations that may apply to the {@link AdminClient},
+     * including metric reporter configuration properties.
+     * This will remove all properties that are known to be for producers, consumers, and Connect workers.
+     *
+     * @param configs the map of configurations to be modified; may not be null
+     * @return the supplied {@code configs} parameter, returned for convenience
+     */
+    public static Map<String, Object> retainAdminClientConfigs(Map<String, Object> configs) {
+        return retainConfigs(configs, ConnectUtils::isAdminClientConfig);
+    }
+
+    /**
+     * Modify the supplied map of configurations to retain only those configuration name-value pairs that satisfy the supplied predicate.
+     *
+     * @param configs the map of configurations to be modified; may not be null
+     * @param isValid a function that is used to determine which configuration properties to retain; may not be null
+     * @return the supplied {@code configs} parameter, returned for convenience
+     */
+    public static Map<String, Object> retainConfigs(Map<String, Object> configs, Predicate<String> isValid) {
+        Iterator<Map.Entry<String, Object>> entryIter = configs.entrySet().iterator();
+        while (entryIter.hasNext()) {
+            Map.Entry<String, Object> entry = entryIter.next();
+            if (!isValid.test(entry.getKey())) {
+                log.debug("Not retaining the '{}' config property when passing to a subcomponent", entry.getKey());
+                entryIter.remove();
+            }
+        }
+        return configs;
+    }
+
+    /**
+     * Determine if the configuration with the supplied name is a known Connect {@link WorkerConfig} property, including properties
+     * that are prefixed and passed to various extensions. Note that the {@link org.apache.kafka.connect.rest.ConnectRestExtension
+     * REST Extension} uses unprefixed properties, so this method treats those as unknown.
+     *
+     * @param key the name of the configuration property
+     * @return true if the named configuration property is known to be a property used by Connect workers, or false otherwise
+     */
+    protected static boolean isKnownWorkerConfig(String key) {
+        if (key == null) {
+            return false;
+        }
+        if (DistributedConfig.configNames().contains(key) || StandaloneConfig.configNames().contains(key)) {
+            return true;
+        }
+        // Include any property that begins with a known prefix
+        for (String prefix : WORKER_CONFIG_PREFIXES) {
+            if (key.startsWith(prefix)) {
+                return true;
+            }
+        }
+        // REST Extension properties are not prefixed, so we don't know what they are
+        return false;
+    }
+
+    /**
+     * Determine if the configuration with the supplied name is a known {@link ProducerConfig} property. Producers use metric reporters,
+     * interceptors, partitioners, and key and value serializers, to which the producer passes all of its (unprefixed) properties.
+     * Therefore, this method will consider all property names unknown to the {@link ProducerConfig}, {@link ConsumerConfig},
+     * {@link AdminClientConfig}, and {@link #isKnownWorkerConfig(String) known worker configs} to be valid producer properties.
+     *
+     * @param key the name of the configuration property
+     * @return true if the named configuration property is known to be a producer property or known to not be a consumer, admin client,
+     *         or worker configuration; or false otherwise
+     */
+    protected static boolean isProducerConfig(String key) {
+        if (key == null) {
+            return false;
+        }
+        if (ProducerConfig.configNames().contains(key)) {
+            return true;
+        }
+        if (ConsumerConfig.configNames().contains(key)) {
+            return false;
+        }
+        if (AdminClientConfig.configNames().contains(key)) {
+            return false;
+        }
+        if (isKnownWorkerConfig(key)) {
+            return false;
+        }
+        // Producers can use metrics reporters, interceptors, and other extensions that take non-prefixed properties,
+        // so we have to include all non-prefixed properties
+        return true;
+    }
+
+    /**
+     * Determine if the configuration with the supplied name is a known {@link ConsumerConfig} property. Consumers use metric reporters,
+     * interceptors, and key and value deserializers, to which the consumer passes all of its (unprefixed) properties.
+     * Therefore, this method will consider all property names unknown to the {@link ProducerConfig}, {@link ConsumerConfig},
+     * {@link AdminClientConfig}, and {@link #isKnownWorkerConfig(String) known worker configs} to be valid consumer properties.
+     *
+     * @param key the name of the configuration property
+     * @return true if the named configuration property is known to be a consumer property or known to not be a producer, admin client,
+     *         or worker configuration; or false otherwise
+     */
+    protected static boolean isConsumerConfig(String key) {
+        if (key == null) {
+            return false;
+        }
+        if (ConsumerConfig.configNames().contains(key)) {
+            return true;
+        }
+        if (ProducerConfig.configNames().contains(key)) {
+            return false;
+        }
+        if (AdminClientConfig.configNames().contains(key)) {
+            return false;
+        }
+        if (isKnownWorkerConfig(key)) {
+            return false;
+        }
+        // Consumers can use metrics reporters, interceptors, and other extensions that take non-prefixed properties,
+        // so we have to include all non-prefixed properties
+        return true;
+    }
+
+    /**
+     * Determine if the configuration with the supplied name is a known {@link AdminClientConfig} property. Admin clients use metric
+     * reporters to which the admin client passes all of its (unprefixed) properties.
+     * Therefore, this method will consider all property names unknown to the {@link AdminClientConfig}, {@link ConsumerConfig},
+     * {@link ProducerConfig}, and {@link #isKnownWorkerConfig(String) known worker configs} to be valid admin client properties.
+     *
+     * @param key the name of the configuration property
+     * @return true if the named configuration property is known to be an admin client property or known to not be a producer, consumer,
+     *         or worker configuration; or false otherwise
+     */
+    protected static boolean isAdminClientConfig(String key) {
+        if (key == null) {
+            return false;
+        }
+        if (AdminClientConfig.configNames().contains(key)) {
+            return true;
+        }
+        if (ProducerConfig.configNames().contains(key)) {
+            return false;
+        }
+        if (ConsumerConfig.configNames().contains(key)) {
+            return false;
+        }
+        if (isKnownWorkerConfig(key)) {
+            return false;
+        }
+        // AdminClient can use metrics reporters that take non-prefixed properties, so we have to include all non-prefixed properties
+        return true;
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index ad21561baf259..00097147bf140 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -165,7 +165,8 @@ public static NewTopicBuilder defineTopic(String topicName) {
      * @param adminConfig the configuration for the {@link AdminClient}
      */
     public TopicAdmin(Map<String, Object> adminConfig) {
-        this(adminConfig, AdminClient.create(adminConfig));
+        // Prevent logging unused config warnings
+        this(adminConfig, AdminClient.create(ConnectUtils.retainAdminClientConfigs(adminConfig)));
     }
 
     // visible for testing
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
index 6be3525380b61..1a0dbd83505d2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
@@ -16,16 +16,28 @@
  */
 package org.apache.kafka.connect.util;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.MockConnectMetrics.MockMetricsReporter;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.storage.StringConverter;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class ConnectUtilsTest {
 
@@ -60,4 +72,184 @@ public void testLookupKafkaClusterIdTimeout() {
         ConnectUtils.lookupKafkaClusterId(adminClient);
     }
 
+    @Test
+    public void shouldIncludeKnownWorkerConfigs() {
+        for (String configName : DistributedConfig.configNames()) {
+            assertTrue("Should allow " + configName, ConnectUtils.isKnownWorkerConfig(configName));
+        }
+        for (String configName : StandaloneConfig.configNames()) {
+            assertTrue("Should allow " + configName, ConnectUtils.isKnownWorkerConfig(configName));
+        }
+
+        assertTrue(ConnectUtils.isKnownWorkerConfig(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG + ".something"));
+        assertTrue(ConnectUtils.isKnownWorkerConfig(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG + ".something"));
+        assertTrue(ConnectUtils.isKnownWorkerConfig(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG + ".something"));
+        assertTrue(ConnectUtils.isKnownWorkerConfig(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG + ".something"));
+        assertTrue(ConnectUtils.isKnownWorkerConfig(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG + ".something"));
+        assertTrue(ConnectUtils.isKnownWorkerConfig(WorkerConfig.CONFIG_PROVIDERS_CONFIG + ".something"));
+        assertTrue(ConnectUtils.isKnownWorkerConfig(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG + ".something"));
+
+        for (String configName : ProducerConfig.configNames()) {
+            assertTrue("Should allow " + configName, ConnectUtils.isKnownWorkerConfig("producer." + configName));
+        }
+        for (String configName : ConsumerConfig.configNames()) {
+            assertTrue("Should allow " + configName, ConnectUtils.isKnownWorkerConfig("consumer." + configName));
+        }
+
+        // We don't consider extra configs to be known, even though the REST extensions use them
+        assertFalse(ConnectUtils.isKnownWorkerConfig("some.arbitrary.config"));
+    }
+
+    @Test
+    public void shouldIncludeProducerConfigsAndUnknownConfigs() {
+        for (String configName : ProducerConfig.configNames()) {
+            assertTrue("Should allow " + configName, ConnectUtils.isProducerConfig(configName));
+        }
+        assertTrue(ConnectUtils.isProducerConfig("custom.config"));
+
+        // Exclude Connect worker configs
+        for (String configName : DistributedConfig.configNames()) {
+            if (ProducerConfig.configNames().contains(configName)) {
+                continue;
+            }
+            assertFalse("Should not allow " + configName, ConnectUtils.isProducerConfig(configName));
+        }
+        for (String configName : StandaloneConfig.configNames()) {
+            if (ProducerConfig.configNames().contains(configName)) {
+                continue;
+            }
+            assertFalse("Should not allow " + configName, ConnectUtils.isProducerConfig(configName));
+        }
+        assertFalse(ConnectUtils.isProducerConfig(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG + ".something"));
+        assertFalse(ConnectUtils.isProducerConfig(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG + ".something"));
+        assertFalse(ConnectUtils.isProducerConfig(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG + ".something"));
+        assertFalse(ConnectUtils.isProducerConfig(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG + ".something"));
+        assertFalse(ConnectUtils.isProducerConfig(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG + ".something"));
+        assertFalse(ConnectUtils.isProducerConfig(WorkerConfig.CONFIG_PROVIDERS_CONFIG + ".something"));
+        assertFalse(ConnectUtils.isProducerConfig(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG + ".something"));
+
+        // Exclude Consumer-specific configs
+        assertFalse(ConnectUtils.isProducerConfig(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+    }
+
+    @Test
+    public void shouldIncludeConsumerConfigsAndUnknownConfigs() {
+        for (String configName : ConsumerConfig.configNames()) {
+            assertTrue("Should allow " + configName, ConnectUtils.isConsumerConfig(configName));
+        }
+        assertTrue(ConnectUtils.isConsumerConfig("custom.config"));
+
+        // Exclude Connect worker configs
+        for (String configName : DistributedConfig.configNames()) {
+            if (ConsumerConfig.configNames().contains(configName)) {
+                continue;
+            }
+            assertFalse("Should not allow " + configName, ConnectUtils.isConsumerConfig(configName));
+        }
+        for (String configName : StandaloneConfig.configNames()) {
+            if (ConsumerConfig.configNames().contains(configName)) {
+                continue;
+            }
+            assertFalse("Should not allow " + configName, ConnectUtils.isConsumerConfig(configName));
+        }
+        assertFalse(ConnectUtils.isConsumerConfig(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG + ".something"));
+        assertFalse(ConnectUtils.isConsumerConfig(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG + ".something"));
+        assertFalse(ConnectUtils.isConsumerConfig(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG + ".something"));
+        assertFalse(ConnectUtils.isConsumerConfig(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG + ".something"));
+        assertFalse(ConnectUtils.isConsumerConfig(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG + ".something"));
+        assertFalse(ConnectUtils.isConsumerConfig(WorkerConfig.CONFIG_PROVIDERS_CONFIG + ".something"));
+        assertFalse(ConnectUtils.isConsumerConfig(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG + ".something"));
+
+        // Exclude Producer-specific configs
+        assertFalse(ConnectUtils.isConsumerConfig(ProducerConfig.ACKS_CONFIG));
+    }
+
+    @Test
+    public void shouldIncludeAdminClientConfigs() {
+        for (String configName : AdminClientConfig.configNames()) {
+            assertTrue(ConnectUtils.isAdminClientConfig(configName));
+        }
+
+        // Exclude Connect worker configs
+        for (String configName : DistributedConfig.configNames()) {
+            if (AdminClientConfig.configNames().contains(configName)) {
+                continue;
+            }
+            assertFalse("Should not allow " + configName, ConnectUtils.isAdminClientConfig(configName));
+        }
+        for (String configName : StandaloneConfig.configNames()) {
+            if (AdminClientConfig.configNames().contains(configName)) {
+                continue;
+            }
+            assertFalse("Should not allow " + configName, ConnectUtils.isAdminClientConfig(configName));
+        }
+        assertFalse(ConnectUtils.isAdminClientConfig(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG + ".something"));
+        assertFalse(ConnectUtils.isAdminClientConfig(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG + ".something"));
+        assertFalse(ConnectUtils.isAdminClientConfig(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG + ".something"));
+        assertFalse(ConnectUtils.isAdminClientConfig(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG + ".something"));
+        assertFalse(ConnectUtils.isAdminClientConfig(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG + ".something"));
+        assertFalse(ConnectUtils.isAdminClientConfig(WorkerConfig.CONFIG_PROVIDERS_CONFIG + ".something"));
+        assertFalse(ConnectUtils.isAdminClientConfig(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG + ".something"));
+
+        // Exclude Producer-specific configs
+        for (String configName : ProducerConfig.configNames()) {
+            if (AdminClientConfig.configNames().contains(configName)) {
+                continue;
+            }
+            assertFalse("Should not allow " + configName, ConnectUtils.isAdminClientConfig(configName));
+        }
+
+        // Exclude Consumer-specific configs
+        for (String configName : ConsumerConfig.configNames()) {
+            if (AdminClientConfig.configNames().contains(configName)) {
+                continue;
+            }
+            assertFalse("Should not allow " + configName, ConnectUtils.isAdminClientConfig(configName));
+        }
+    }
+
+    @Test
+    public void removeNonAdminClientConfigurations() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "bootstrap1");
+        configs.put(AdminClientConfig.CLIENT_ID_CONFIG, "clientId");
+        configs.put(AdminClientConfig.RETRIES_CONFIG, "1");
+        configs.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "100");
+        configs.put(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        configs.put(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG + ".custom", "customValue");
+        configs.put(ProducerConfig.ACKS_CONFIG, "all");
+        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        configs.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "my-config-topic");
+        configs.put(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        configs.put(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG + ".custom", "foo");
+        configs.put("producer." + ProducerConfig.ACKS_CONFIG, "all");
+        configs.put("consumer." + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+        configs.put("some.other.property", "value");
+        configs.put("other.property", "value2");
+        assertEquals(15, configs.size());
+        Map<String, Object> filtered = ConnectUtils.retainAdminClientConfigs(new HashMap<>(configs));
+        assertEquals(configs.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                     filtered.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
+        assertEquals(configs.get(AdminClientConfig.CLIENT_ID_CONFIG),
+                     filtered.get(AdminClientConfig.CLIENT_ID_CONFIG));
+        assertEquals(configs.get(AdminClientConfig.RETRIES_CONFIG),
+                     filtered.get(AdminClientConfig.RETRIES_CONFIG));
+        assertEquals(configs.get(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG),
+                     filtered.get(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG));
+        assertEquals(configs.get(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG),
+                     filtered.get(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG));
+        assertEquals(configs.get(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG + ".custom"),
+                     filtered.get(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG + ".custom"));
+        assertEquals(configs.get("some.other.property"),
+                     filtered.get("some.other.property"));
+        assertEquals(configs.get("other.property"),
+                     filtered.get("other.property"));
+        assertFalse(filtered.containsKey(ProducerConfig.ACKS_CONFIG));
+        assertFalse(filtered.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+        assertFalse(filtered.containsKey(DistributedConfig.CONFIG_TOPIC_CONFIG));
+        assertFalse(filtered.containsKey(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG));
+        assertFalse(filtered.containsKey(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG + ".custom"));
+        assertEquals(configs.size() - 7, filtered.size());
+        assertTrue(configs.keySet().containsAll(filtered.keySet()));
+    }
+
 }
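
For reference, a minimal usage sketch (the property values below are
hypothetical; the `retain*` methods are those added to `ConnectUtils` above,
and each of them mutates the map it is given, so a copy is passed per client):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.connect.util.ConnectUtils;

    final class FilterSketch {
        static void demo() {
            // As returned by WorkerConfig.originals()
            Map<String, Object> workerProps = new HashMap<>();
            workerProps.put("bootstrap.servers", "localhost:9092");     // known to all three clients: retained everywhere
            workerProps.put("config.storage.topic", "connect-configs"); // worker-only: removed for every client
            workerProps.put("acks", "all");                             // producer-only: retained only for the producer
            workerProps.put("my.custom.property", "some-value");        // unknown: passed through to all three clients

            Map<String, Object> producerProps = ConnectUtils.retainProducerConfigs(new HashMap<>(workerProps));
            Map<String, Object> consumerProps = ConnectUtils.retainConsumerConfigs(new HashMap<>(workerProps));
            Map<String, Object> adminProps = ConnectUtils.retainAdminClientConfigs(new HashMap<>(workerProps));
        }
    }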