Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.kafka.connect.runtime;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.Config;
Expand All @@ -40,6 +42,7 @@
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.Converter;
Expand Down Expand Up @@ -349,9 +352,11 @@ public ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id) {
status.workerId(), status.trace());
}

protected Map<String, ConfigValue> validateBasicConnectorConfig(Connector connector,
ConfigDef configDef,
Map<String, String> config) {
protected Map<String, ConfigValue> validateSinkConnectorConfig(SinkConnector connector, ConfigDef configDef, Map<String, String> config) {
return configDef.validateAll(config);
}

protected Map<String, ConfigValue> validateSourceConnectorConfig(SourceConnector connector, ConfigDef configDef, Map<String, String> config) {
return configDef.validateAll(config);
}

Expand Down Expand Up @@ -417,7 +422,23 @@ public Optional<RestartPlan> buildRestartPlan(RestartRequest request) {
conf == null ? ConnectorType.UNKNOWN : connectorTypeForClass(conf.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))
);
return Optional.of(new RestartPlan(request, stateInfo));
}

protected boolean connectorUsesConsumer(org.apache.kafka.connect.health.ConnectorType connectorType, Map<String, String> connProps) {
return connectorType == org.apache.kafka.connect.health.ConnectorType.SINK;
}

protected boolean connectorUsesAdmin(org.apache.kafka.connect.health.ConnectorType connectorType, Map<String, String> connProps) {
if (connectorType == org.apache.kafka.connect.health.ConnectorType.SOURCE) {
return SourceConnectorConfig.usesTopicCreation(connProps);
} else {
return SinkConnectorConfig.hasDlqTopicConfig(connProps);
}
}

protected boolean connectorUsesProducer(org.apache.kafka.connect.health.ConnectorType connectorType, Map<String, String> connProps) {
return connectorType == org.apache.kafka.connect.health.ConnectorType.SOURCE
|| SinkConnectorConfig.hasDlqTopicConfig(connProps);
}

ConfigInfos validateConnectorConfig(Map<String, String> connectorProps, boolean doLog) {
Expand All @@ -431,29 +452,28 @@ ConfigInfos validateConnectorConfig(Map<String, String> connectorProps, boolean
Connector connector = getConnector(connType);
org.apache.kafka.connect.health.ConnectorType connectorType;
ClassLoader savedLoader = plugins().compareAndSwapLoaders(connector);
ConfigDef enrichedConfigDef;
Map<String, ConfigValue> validatedConnectorConfig;
try {
ConfigDef baseConfigDef;
if (connector instanceof SourceConnector) {
baseConfigDef = SourceConnectorConfig.configDef();
connectorType = org.apache.kafka.connect.health.ConnectorType.SOURCE;
enrichedConfigDef = ConnectorConfig.enrich(plugins(), SourceConnectorConfig.configDef(), connectorProps, false);
validatedConnectorConfig = validateSourceConnectorConfig((SourceConnector) connector, enrichedConfigDef, connectorProps);
} else {
baseConfigDef = SinkConnectorConfig.configDef();
SinkConnectorConfig.validate(connectorProps);
connectorType = org.apache.kafka.connect.health.ConnectorType.SINK;
enrichedConfigDef = ConnectorConfig.enrich(plugins(), SinkConnectorConfig.configDef(), connectorProps, false);
validatedConnectorConfig = validateSinkConnectorConfig((SinkConnector) connector, enrichedConfigDef, connectorProps);
}
ConfigDef enrichedConfigDef = ConnectorConfig.enrich(plugins(), baseConfigDef, connectorProps, false);
Map<String, ConfigValue> validatedConnectorConfig = validateBasicConnectorConfig(
connector,
enrichedConfigDef,
connectorProps
);

connectorProps.entrySet().stream()
.filter(e -> e.getValue() == null)
.map(Map.Entry::getKey)
.forEach(prop ->
validatedConnectorConfig.computeIfAbsent(prop, ConfigValue::new)
.addErrorMessage("Null value can not be supplied as the configuration value.")
);

List<ConfigValue> configValues = new ArrayList<>(validatedConnectorConfig.values());
Map<String, ConfigKey> configKeys = new LinkedHashMap<>(enrichedConfigDef.configKeys());
Set<String> allGroups = new LinkedHashSet<>(enrichedConfigDef.groups());
Expand Down Expand Up @@ -487,40 +507,41 @@ ConfigInfos validateConnectorConfig(Map<String, String> connectorProps, boolean
ConfigInfos producerConfigInfos = null;
ConfigInfos consumerConfigInfos = null;
ConfigInfos adminConfigInfos = null;
if (connectorType.equals(org.apache.kafka.connect.health.ConnectorType.SOURCE)) {
producerConfigInfos = validateClientOverrides(connName,
ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX,
connectorConfig,
ProducerConfig.configDef(),
connector.getClass(),
connectorType,
ConnectorClientConfigRequest.ClientType.PRODUCER,
connectorClientConfigOverridePolicy);
return mergeConfigInfos(connType, configInfos, producerConfigInfos);
} else {
consumerConfigInfos = validateClientOverrides(connName,
ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX,
connectorConfig,
ProducerConfig.configDef(),
connector.getClass(),
connectorType,
ConnectorClientConfigRequest.ClientType.CONSUMER,
connectorClientConfigOverridePolicy);
// check if topic for dead letter queue exists
String topic = connectorProps.get(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG);
if (topic != null && !topic.isEmpty()) {
adminConfigInfos = validateClientOverrides(connName,
ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX,
connectorConfig,
ProducerConfig.configDef(),
connector.getClass(),
connectorType,
ConnectorClientConfigRequest.ClientType.ADMIN,
connectorClientConfigOverridePolicy);
}

if (connectorUsesProducer(connectorType, connectorProps)) {
producerConfigInfos = validateClientOverrides(
connName,
ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX,
connectorConfig,
ProducerConfig.configDef(),
connector.getClass(),
connectorType,
ConnectorClientConfigRequest.ClientType.PRODUCER,
connectorClientConfigOverridePolicy);
}
return mergeConfigInfos(connType, configInfos, consumerConfigInfos, adminConfigInfos);
if (connectorUsesAdmin(connectorType, connectorProps)) {
adminConfigInfos = validateClientOverrides(
connName,
ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX,
connectorConfig,
AdminClientConfig.configDef(),
connector.getClass(),
connectorType,
ConnectorClientConfigRequest.ClientType.ADMIN,
connectorClientConfigOverridePolicy);
}
if (connectorUsesConsumer(connectorType, connectorProps)) {
consumerConfigInfos = validateClientOverrides(
connName,
ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX,
connectorConfig,
ConsumerConfig.configDef(),
connector.getClass(),
connectorType,
ConnectorClientConfigRequest.ClientType.CONSUMER,
connectorClientConfigOverridePolicy);
}
return mergeConfigInfos(connType, configInfos, producerConfigInfos, consumerConfigInfos, adminConfigInfos);
} finally {
Plugins.compareAndSwapLoaders(savedLoader);
}
Expand Down Expand Up @@ -665,14 +686,23 @@ protected Connector getConnector(String connType) {
return tempConnectors.computeIfAbsent(connType, k -> plugins().newConnector(k));
}

/*
/**
* Retrieves ConnectorType for the corresponding connector class
* @param connClass class of the connector
*/
public ConnectorType connectorTypeForClass(String connClass) {
return ConnectorType.from(getConnector(connClass).getClass());
}

/**
* Retrieves ConnectorType for the class specified in the connector config
* @param connConfig the connector config; may not be null
* @return the {@link ConnectorType} of the connector
*/
public ConnectorType connectorTypeForConfig(Map<String, String> connConfig) {
return connectorTypeForClass(connConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
}

/**
* Checks a given {@link ConfigInfos} for validation error messages and adds an exception
* to the given {@link Callback} if any were found.
Expand Down
Loading