Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.connect.runtime.WorkerConfig;

import java.util.Map;
import java.util.Set;

import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;

Expand Down Expand Up @@ -272,6 +273,10 @@ public DistributedConfig(Map<String, String> props) {
super(CONFIG, props);
}

/**
 * Get the names of all configuration properties defined for distributed workers.
 *
 * @return the set of configuration names defined by {@code CONFIG}; never null
 */
public static Set<String> configNames() {
    return CONFIG.names();
}

/**
 * Print the worker configuration documentation as an HTML table on standard output.
 *
 * @param args ignored
 */
public static void main(String[] args) {
    String htmlTable = CONFIG.toHtmlTable();
    System.out.println(htmlTable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -77,7 +78,8 @@ public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig,
ErrorHandlingMetrics errorHandlingMetrics) {
String topic = sinkConfig.dlqTopicName();

try (AdminClient admin = AdminClient.create(workerConfig.originals())) {
Map<String, Object> adminProps = ConnectUtils.retainProducerConfigs(workerConfig.originals());
try (AdminClient admin = AdminClient.create(adminProps)) {
if (!admin.listTopics().names().get().contains(topic)) {
log.error("Topic {} doesn't exist. Will attempt to create topic.", topic);
NewTopic schemaTopicRequest = new NewTopic(topic, DLQ_NUM_DESIRED_PARTITIONS, sinkConfig.dlqTopicReplicationFactor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.connect.runtime.WorkerConfig;

import java.util.Map;
import java.util.Set;

public class StandaloneConfig extends WorkerConfig {
private static final ConfigDef CONFIG;
Expand All @@ -41,4 +42,8 @@ public class StandaloneConfig extends WorkerConfig {
/**
 * Create a standalone worker configuration from the supplied raw properties.
 *
 * @param props the raw configuration properties; may not be null
 */
public StandaloneConfig(Map<String, String> props) {
    super(CONFIG, props);
}

/**
 * Get the names of all configuration properties defined for standalone workers.
 *
 * @return the set of configuration names defined by {@code CONFIG}; never null
 */
public static Set<String> configNames() {
    return CONFIG.names();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
Expand Down Expand Up @@ -411,16 +412,16 @@ public void putTargetState(String connector, TargetState state) {

// package private for testing
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
Map<String, Object> originals = config.originals();
Map<String, Object> producerProps = new HashMap<>(originals);
Map<String, Object> producerProps = ConnectUtils.retainProducerConfigs(config.originals());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
Map<String, Object> consumerProps = new HashMap<>(originals);

Map<String, Object> consumerProps = ConnectUtils.retainConsumerConfigs(config.originals());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

Map<String, Object> adminProps = new HashMap<>(originals);
Map<String, Object> adminProps = ConnectUtils.retainAdminClientConfigs(config.originals());
NewTopic topicDescription = TopicAdmin.defineTopic(topic).
compacted().
partitions(1).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConvertingFutureCallback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
Expand Down Expand Up @@ -67,17 +68,17 @@ public void configure(final WorkerConfig config) {

data = new HashMap<>();

Map<String, Object> originals = config.originals();
Map<String, Object> producerProps = new HashMap<>(originals);
Map<String, Object> producerProps = ConnectUtils.retainProducerConfigs(config.originals());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
producerProps = ConnectUtils.retainProducerConfigs(producerProps);

Map<String, Object> consumerProps = new HashMap<>(originals);
Map<String, Object> consumerProps = ConnectUtils.retainConsumerConfigs(config.originals());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

Map<String, Object> adminProps = new HashMap<>(originals);
Map<String, Object> adminProps = ConnectUtils.retainAdminClientConfigs(config.originals());
NewTopic topicDescription = TopicAdmin.defineTopic(topic).
compacted().
partitions(config.getInt(DistributedConfig.OFFSET_STORAGE_PARTITIONS_CONFIG)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.Table;
Expand Down Expand Up @@ -124,17 +125,16 @@ public void configure(final WorkerConfig config) {
if (this.topic == null || this.topic.trim().length() == 0)
throw new ConfigException("Must specify topic for connector status.");

Map<String, Object> originals = config.originals();
Map<String, Object> producerProps = new HashMap<>(originals);
Map<String, Object> producerProps = ConnectUtils.retainProducerConfigs(config.originals());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class

Map<String, Object> consumerProps = new HashMap<>(originals);
Map<String, Object> consumerProps = ConnectUtils.retainConsumerConfigs(config.originals());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

Map<String, Object> adminProps = new HashMap<>(originals);
Map<String, Object> adminProps = ConnectUtils.retainAdminClientConfigs(config.originals());
NewTopic topicDescription = TopicAdmin.defineTopic(topic).
compacted().
partitions(config.getInt(DistributedConfig.STATUS_STORAGE_PARTITIONS_CONFIG)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,43 @@
package org.apache.kafka.connect.util;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;

public final class ConnectUtils {
private static final Logger log = LoggerFactory.getLogger(ConnectUtils.class);

private static final List<String> WORKER_CONFIG_PREFIXES = Collections.unmodifiableList(Arrays.asList(
"producer.",
"consumer.",
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG,
WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG,
WorkerConfig.REST_EXTENSION_CLASSES_CONFIG,
WorkerConfig.CONFIG_PROVIDERS_CONFIG
));

public static Long checkAndConvertTimestamp(Long timestamp) {
if (timestamp == null || timestamp >= 0)
return timestamp;
Expand Down Expand Up @@ -65,4 +89,176 @@ static String lookupKafkaClusterId(AdminClient adminClient) {
+ "Check worker's broker connection and security properties.", e);
}
}

/**
 * Remove from the supplied map every property known to belong to consumers, the admin client,
 * or the Connect worker, keeping only configuration that may apply to the
 * {@link org.apache.kafka.clients.producer.Producer} and its extension components.
 * Note that the supplied map is modified in place.
 *
 * @param configs the map of configurations to be modified; may not be null
 * @return the supplied {@code configs} parameter, returned for convenience
 */
public static Map<String, Object> retainProducerConfigs(Map<String, Object> configs) {
    return retainConfigs(configs, key -> isProducerConfig(key));
}

/**
 * Remove from the supplied map every property known to belong to producers, the admin client,
 * or the Connect worker, keeping only configuration that may apply to the
 * {@link org.apache.kafka.clients.consumer.Consumer} and its extension components.
 * Note that the supplied map is modified in place.
 *
 * @param configs the map of configurations to be modified; may not be null
 * @return the supplied {@code configs} parameter, returned for convenience
 */
public static Map<String, Object> retainConsumerConfigs(Map<String, Object> configs) {
    return retainConfigs(configs, key -> isConsumerConfig(key));
}

/**
 * Remove from the supplied map every property known to belong to producers, consumers,
 * or the Connect worker, keeping only configuration that may apply to the {@link AdminClient},
 * including metric reporter configuration properties.
 * Note that the supplied map is modified in place.
 *
 * @param configs the map of configurations to be modified; may not be null
 * @return the supplied {@code configs} parameter, returned for convenience
 */
public static Map<String, Object> retainAdminClientConfigs(Map<String, Object> configs) {
    return retainConfigs(configs, key -> isAdminClientConfig(key));
}

/**
 * Modify the supplied map of configurations to retain only those name-value pairs whose name
 * satisfies the supplied predicate; every rejected property is removed and logged at debug level.
 *
 * @param configs the map of configurations to be modified; may not be null
 * @param isValid a function that is used to determine which configuration properties to retain; may not be null
 * @return the supplied {@code configs} parameter, returned for convenience
 */
public static Map<String, Object> retainConfigs(Map<String, Object> configs, Predicate<String> isValid) {
    configs.keySet().removeIf(name -> {
        if (isValid.test(name)) {
            return false;
        }
        log.debug("Not retaining the '{}' config property when passing to a subcomponent", name);
        return true;
    });
    return configs;
}

/**
 * Determine if the configuration with the supplied name is a known Connect {@link WorkerConfig} property,
 * including prefixed properties that the worker routes to various extensions. Note that the
 * {@link org.apache.kafka.connect.rest.ConnectRestExtension REST Extension} uses <em>unprefixed</em>
 * properties, so this method treats those as unknown.
 *
 * @param key the name of the configuration property
 * @return true if the named configuration property is known to be a property used by Connect workers, or false otherwise
 */
protected static boolean isKnownWorkerConfig(String key) {
    if (key == null) {
        return false;
    }
    boolean definedByWorker = DistributedConfig.configNames().contains(key)
            || StandaloneConfig.configNames().contains(key);
    if (definedByWorker) {
        return true;
    }
    // Prefixed properties (e.g. "producer.", converters, etc.) are routed by the worker to
    // subcomponents; REST Extension properties carry no prefix and cannot be recognized here.
    return WORKER_CONFIG_PREFIXES.stream().anyMatch(key::startsWith);
}

/**
 * Determine if the configuration with the supplied name may be a {@link ProducerConfig} property.
 * Producers pass all of their (unprefixed) properties to metric reporters, interceptors,
 * partitioners, and key and value serializers, so any property that is not recognized as a
 * {@link ConsumerConfig}, {@link AdminClientConfig}, or {@link #isKnownWorkerConfig(String) worker}
 * property is considered a valid producer property.
 *
 * @param key the name of the configuration property
 * @return true if the named configuration property is known to be a producer property or known to not be a consumer,
 *         admin client, or worker configuration; or false otherwise
 */
protected static boolean isProducerConfig(String key) {
    if (key == null) {
        return false;
    }
    if (ProducerConfig.configNames().contains(key)) {
        return true;
    }
    // Anything claimed by another client type or by the worker itself is excluded; everything
    // else must be kept because producer extensions accept arbitrary non-prefixed properties.
    boolean claimedElsewhere = ConsumerConfig.configNames().contains(key)
            || AdminClientConfig.configNames().contains(key)
            || isKnownWorkerConfig(key);
    return !claimedElsewhere;
}

/**
 * Determine if the configuration with the supplied name may be a {@link ConsumerConfig} property.
 * Consumers pass all of their (unprefixed) properties to metric reporters, interceptors, and key
 * and value deserializers, so any property that is not recognized as a {@link ProducerConfig},
 * {@link AdminClientConfig}, or {@link #isKnownWorkerConfig(String) worker} property is considered
 * a valid consumer property.
 *
 * @param key the name of the configuration property
 * @return true if the named configuration property is known to be a consumer property or known to not be a producer,
 *         admin client, or worker configuration; or false otherwise
 */
protected static boolean isConsumerConfig(String key) {
    if (key == null) {
        return false;
    }
    if (ConsumerConfig.configNames().contains(key)) {
        return true;
    }
    // Anything claimed by another client type or by the worker itself is excluded; everything
    // else must be kept because consumer extensions accept arbitrary non-prefixed properties.
    boolean claimedElsewhere = ProducerConfig.configNames().contains(key)
            || AdminClientConfig.configNames().contains(key)
            || isKnownWorkerConfig(key);
    return !claimedElsewhere;
}

/**
 * Determine if the configuration with the supplied name may be an {@link AdminClientConfig} property.
 * Admin clients pass all of their (unprefixed) properties to metric reporters, so any property that
 * is not recognized as a {@link ProducerConfig}, {@link ConsumerConfig}, or
 * {@link #isKnownWorkerConfig(String) worker} property is considered a valid admin client property.
 *
 * @param key the name of the configuration property
 * @return true if the named configuration property is known to be an admin client property or known to not be a
 *         producer, consumer, or worker configuration; or false otherwise
 */
protected static boolean isAdminClientConfig(String key) {
    if (key == null) {
        return false;
    }
    if (AdminClientConfig.configNames().contains(key)) {
        return true;
    }
    // Anything claimed by another client type or by the worker itself is excluded; everything
    // else must be kept because metrics reporters accept arbitrary non-prefixed properties.
    boolean claimedElsewhere = ProducerConfig.configNames().contains(key)
            || ConsumerConfig.configNames().contains(key)
            || isKnownWorkerConfig(key);
    return !claimedElsewhere;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ public static NewTopicBuilder defineTopic(String topicName) {
* @param adminConfig the configuration for the {@link AdminClient}
*/
public TopicAdmin(Map<String, Object> adminConfig) {
    // Filter a defensive copy: ConnectUtils.retainAdminClientConfigs removes entries from the
    // map it is given IN PLACE, so passing adminConfig directly would silently strip properties
    // from the caller's map. Filtering prevents the AdminClient from logging warnings about
    // unused (non-admin) configs. java.util.HashMap is fully qualified because this file's
    // import block is not visible here.
    this(adminConfig, AdminClient.create(ConnectUtils.retainAdminClientConfigs(new java.util.HashMap<>(adminConfig))));
}

// visible for testing
Expand Down
Loading