transformation : transformations) {
transformation.close();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 98e8e78e2358b..d0a04e8a9c55a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -60,6 +60,8 @@
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.LoggingContext;
import org.apache.kafka.connect.util.SinkUtils;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.apache.kafka.connect.util.TopicCreationGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,7 +78,6 @@
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
-
/**
*
* Worker runs a (dynamic) set of tasks in a set of threads, doing the work of actually moving
@@ -249,25 +250,18 @@ public boolean startConnector(
final WorkerConnector workerConnector;
ClassLoader savedLoader = plugins.currentThreadLoader();
try {
- final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
- final String connClass = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
- log.info("Creating connector {} of type {}", connName, connClass);
- final Connector connector = plugins.newConnector(connClass);
-
+ // By the time we arrive here, CONNECTOR_CLASS_CONFIG has been validated already
+ // Getting this value from the unparsed map will allow us to instantiate the
+ // right config (source or sink)
+ final String connClassProp = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+ log.info("Creating connector {} of type {}", connName, connClassProp);
+ final Connector connector = plugins.newConnector(connClassProp);
final OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(
- offsetBackingStore,
- connName,
- internalKeyConverter,
- internalValueConverter
- );
- workerConnector = new WorkerConnector(
- connName,
- connector,
- ctx,
- metrics,
- statusListener,
- offsetReader
- );
+ offsetBackingStore, connName, internalKeyConverter, internalValueConverter);
+ workerConnector = new WorkerConnector(connName, connector, ctx, metrics, statusListener, offsetReader);
+ final ConnectorConfig connConfig = workerConnector.isSinkConnector()
+ ? new SinkConnectorConfig(plugins, connProps)
+ : new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable());
log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
savedLoader = plugins.compareAndSwapLoaders(connector);
workerConnector.initialize(connConfig);
@@ -526,21 +520,34 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState,
// Decide which type of worker task we need based on the type of task.
if (task instanceof SourceTask) {
- retryWithToleranceOperator.reporters(sourceTaskReporters(id, connConfig, errorHandlingMetrics));
- TransformationChain transformationChain = new TransformationChain<>(connConfig.transformations(), retryWithToleranceOperator);
+ SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
+ connConfig.originalsStrings(), config.topicCreationEnable());
+ retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
+ TransformationChain transformationChain = new TransformationChain<>(sourceConfig.transformations(), retryWithToleranceOperator);
log.info("Initializing: {}", transformationChain);
CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
internalKeyConverter, internalValueConverter);
OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
internalKeyConverter, internalValueConverter);
- Map producerProps = producerConfigs(id, "connector-producer-" + id, config, connConfig, connectorClass,
+ Map producerProps = producerConfigs(id, "connector-producer-" + id, config, sourceConfig, connectorClass,
connectorClientConfigOverridePolicy);
KafkaProducer producer = new KafkaProducer<>(producerProps);
+ TopicAdmin admin;
+ Map topicCreationGroups;
+ if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) {
+ Map adminProps = adminConfigs(id, "connector-adminclient-" + id, config,
+ sourceConfig, connectorClass, connectorClientConfigOverridePolicy);
+ admin = new TopicAdmin(adminProps);
+ topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig);
+ } else {
+ admin = null;
+ topicCreationGroups = null;
+ }
// Note we pass the configState as it performs dynamic transformations under the covers
return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
- headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, configState, metrics, loader,
- time, retryWithToleranceOperator, herder.statusBackingStore());
+ headerConverter, transformationChain, producer, admin, topicCreationGroups,
+ offsetReader, offsetWriter, config, configState, metrics, loader, time, retryWithToleranceOperator, herder.statusBackingStore());
} else if (task instanceof SinkTask) {
TransformationChain transformationChain = new TransformationChain<>(connConfig.transformations(), retryWithToleranceOperator);
log.info("Initializing: {}", transformationChain);
@@ -620,6 +627,7 @@ static Map consumerConfigs(ConnectorTaskId id,
}
static Map adminConfigs(ConnectorTaskId id,
+ String defaultClientId,
WorkerConfig config,
ConnectorConfig connConfig,
Class extends Connector> connectorClass,
@@ -637,6 +645,7 @@ static Map adminConfigs(ConnectorTaskId id,
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
+ adminProps.put(AdminClientConfig.CLIENT_ID_CONFIG, defaultClientId);
adminProps.putAll(nonPrefixedWorkerConfigs);
// Admin client-specific overrides in the worker config
@@ -693,7 +702,7 @@ private List sinkTaskReporters(ConnectorTaskId id, SinkConnectorC
if (topic != null && !topic.isEmpty()) {
Map producerProps = producerConfigs(id, "connector-dlq-producer-" + id, config, connConfig, connectorClass,
connectorClientConfigOverridePolicy);
- Map adminProps = adminConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy);
+ Map adminProps = adminConfigs(id, "connector-dlq-adminclient-", config, connConfig, connectorClass, connectorClientConfigOverridePolicy);
DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(adminProps, id, connConfig, producerProps, errorHandlingMetrics);
reporters.add(reporter);
}
@@ -817,6 +826,16 @@ public String workerId() {
return workerId;
}
+ /**
+ * Returns whether this worker is configured to allow source connectors to create the topics
+ * that they use with custom configurations, if these topics don't already exist.
+ *
+ * @return true if topic creation by source connectors is allowed; false otherwise
+ */
+ public boolean isTopicCreationEnabled() {
+ return config.topicCreationEnable();
+ }
+
/**
* Get the {@link ConnectMetrics} that uses Kafka Metrics and manages the JMX reporter.
* @return the Connect-specific metrics; never null
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 7a4f04ec7cd98..02349265b1576 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -44,6 +44,7 @@
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_PREFIX;
/**
* Common base class providing configuration for Kafka Connect workers, whether standalone or distributed.
@@ -250,9 +251,17 @@ public class WorkerConfig extends AbstractConfig {
+ "user requests to reset the set of active topics per connector.";
protected static final boolean TOPIC_TRACKING_ALLOW_RESET_DEFAULT = true;
+ public static final String TOPIC_CREATION_ENABLE_CONFIG = "topic.creation.enable";
+ protected static final String TOPIC_CREATION_ENABLE_DOC = "Whether to allow "
+ + "automatic creation of topics used by source connectors, when source connectors "
+ + "are configured with `" + TOPIC_CREATION_PREFIX + "` properties. Each task will use an "
+ + "admin client to create its topics and will not depend on the Kafka brokers "
+ + "to create topics automatically.";
+ protected static final boolean TOPIC_CREATION_ENABLE_DEFAULT = true;
+
public static final String RESPONSE_HTTP_HEADERS_CONFIG = "response.http.headers.config";
- public static final String RESPONSE_HTTP_HEADERS_DOC = "Rules for REST API HTTP response headers";
- public static final String RESPONSE_HTTP_HEADERS_DEFAULT = "";
+ protected static final String RESPONSE_HTTP_HEADERS_DOC = "Rules for REST API HTTP response headers";
+ protected static final String RESPONSE_HTTP_HEADERS_DEFAULT = "";
/**
* Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to
@@ -335,6 +344,8 @@ protected static ConfigDef baseConfigDef() {
Importance.LOW, TOPIC_TRACKING_ENABLE_DOC)
.define(TOPIC_TRACKING_ALLOW_RESET_CONFIG, Type.BOOLEAN, TOPIC_TRACKING_ALLOW_RESET_DEFAULT,
Importance.LOW, TOPIC_TRACKING_ALLOW_RESET_DOC)
+ .define(TOPIC_CREATION_ENABLE_CONFIG, Type.BOOLEAN, TOPIC_CREATION_ENABLE_DEFAULT, Importance.LOW,
+ TOPIC_CREATION_ENABLE_DOC)
.define(RESPONSE_HTTP_HEADERS_CONFIG, Type.STRING, RESPONSE_HTTP_HEADERS_DEFAULT,
new ResponseHttpHeadersValidator(), Importance.LOW, RESPONSE_HTTP_HEADERS_DOC);
}
@@ -395,6 +406,10 @@ public Integer getRebalanceTimeout() {
return null;
}
+ public boolean topicCreationEnable() {
+ return getBoolean(TOPIC_CREATION_ENABLE_CONFIG);
+ }
+
@Override
protected Map postProcessParsedConfig(final Map parsedValues) {
return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 3a8c8d484021c..3df94511d9c70 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -165,18 +165,8 @@ protected void close() {
} catch (Throwable t) {
log.warn("Could not stop task", t);
}
- if (consumer != null) {
- try {
- consumer.close();
- } catch (Throwable t) {
- log.warn("Could not close consumer", t);
- }
- }
- try {
- transformationChain.close();
- } catch (Throwable t) {
- log.warn("Could not close transformation chain", t);
- }
+ Utils.closeQuietly(consumer, "consumer");
+ Utils.closeQuietly(transformationChain, "transformation chain");
Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index dc7474766a034..e44d93a718bf0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -16,7 +16,8 @@
*/
package org.apache.kafka.connect.runtime;
-import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
@@ -47,6 +48,9 @@
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.apache.kafka.connect.util.TopicCreation;
+import org.apache.kafka.connect.util.TopicCreationGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,12 +82,14 @@ class WorkerSourceTask extends WorkerTask {
private final Converter valueConverter;
private final HeaderConverter headerConverter;
private final TransformationChain transformationChain;
- private KafkaProducer producer;
+ private final KafkaProducer producer;
+ private final TopicAdmin admin;
private final CloseableOffsetStorageReader offsetReader;
private final OffsetStorageWriter offsetWriter;
private final SourceTaskMetricsGroup sourceTaskMetricsGroup;
private final AtomicReference producerSendException;
private final boolean isTopicTrackingEnabled;
+ private final TopicCreation topicCreation;
private List toSend;
private boolean lastSendFailed; // Whether the last send failed *synchronously*, i.e. never made it into the producer's RecordAccumulator
@@ -109,6 +115,8 @@ public WorkerSourceTask(ConnectorTaskId id,
HeaderConverter headerConverter,
TransformationChain transformationChain,
KafkaProducer producer,
+ TopicAdmin admin,
+ Map topicGroups,
CloseableOffsetStorageReader offsetReader,
OffsetStorageWriter offsetWriter,
WorkerConfig workerConfig,
@@ -130,6 +138,7 @@ public WorkerSourceTask(ConnectorTaskId id,
this.headerConverter = headerConverter;
this.transformationChain = transformationChain;
this.producer = producer;
+ this.admin = admin;
this.offsetReader = offsetReader;
this.offsetWriter = offsetWriter;
@@ -142,6 +151,7 @@ public WorkerSourceTask(ConnectorTaskId id,
this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
this.producerSendException = new AtomicReference<>();
this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+ this.topicCreation = TopicCreation.newTopicCreation(workerConfig, topicGroups);
}
@Override
@@ -166,11 +176,14 @@ protected void close() {
log.warn("Could not close producer", t);
}
}
- try {
- transformationChain.close();
- } catch (Throwable t) {
- log.warn("Could not close transformation chain", t);
+ if (admin != null) {
+ try {
+ admin.close(Duration.ofSeconds(30));
+ } catch (Throwable t) {
+ log.warn("Failed to close admin client on time", t);
+ }
}
+ Utils.closeQuietly(transformationChain, "transformation chain");
Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
}
@@ -344,37 +357,41 @@ private boolean sendRecords() {
}
}
try {
+ maybeCreateTopic(record.topic());
final String topic = producerRecord.topic();
producer.send(
- producerRecord,
- new Callback() {
- @Override
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- if (e != null) {
- log.error("{} failed to send record to {}:", WorkerSourceTask.this, topic, e);
- log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
- producerSendException.compareAndSet(null, e);
- } else {
- recordSent(producerRecord);
- counter.completeRecord();
- log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
- WorkerSourceTask.this,
- recordMetadata.topic(), recordMetadata.partition(),
- recordMetadata.offset());
- commitTaskRecord(preTransformRecord, recordMetadata);
- if (isTopicTrackingEnabled) {
- recordActiveTopic(producerRecord.topic());
- }
- }
+ producerRecord,
+ (recordMetadata, e) -> {
+ if (e != null) {
+ log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
+ log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
+ producerSendException.compareAndSet(null, e);
+ } else {
+ recordSent(producerRecord);
+ counter.completeRecord();
+ log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
+ WorkerSourceTask.this,
+ recordMetadata.topic(), recordMetadata.partition(),
+ recordMetadata.offset());
+ commitTaskRecord(preTransformRecord, recordMetadata);
+ if (isTopicTrackingEnabled) {
+ recordActiveTopic(producerRecord.topic());
}
- });
+ }
+ });
lastSendFailed = false;
- } catch (org.apache.kafka.common.errors.RetriableException e) {
- log.warn("{} Failed to send {}, backing off before retrying:", this, producerRecord, e);
+ } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
+ log.warn("{} Failed to send record to topic '{}' and partition '{}'. Backing off before retrying: ",
+ this, producerRecord.topic(), producerRecord.partition(), e);
toSend = toSend.subList(processed, toSend.size());
lastSendFailed = true;
counter.retryRemaining();
return false;
+ } catch (ConnectException e) {
+ log.warn("{} Failed to send record to topic '{}' and partition '{}' due to an unrecoverable exception: ",
+ this, producerRecord.topic(), producerRecord.partition(), e);
+ log.warn("{} Failed to send {} with unrecoverable exception: ", this, producerRecord, e);
+ throw e;
} catch (KafkaException e) {
throw new ConnectException("Unrecoverable exception trying to send", e);
}
@@ -384,6 +401,37 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
return true;
}
+ // Due to transformations that may change the destination topic of a record (such as
+ // RegexRouter), topic creation cannot be batched for multiple topics
+ private void maybeCreateTopic(String topic) {
+ if (!topicCreation.isTopicCreationRequired(topic)) {
+ return;
+ }
+ log.info("The task will send records to topic '{}' for the first time. Checking "
+ + "whether topic exists", topic);
+ Map existing = admin.describeTopics(topic);
+ if (!existing.isEmpty()) {
+ log.info("Topic '{}' already exists.", topic);
+ topicCreation.addTopic(topic);
+ return;
+ }
+
+ log.info("Creating topic '{}'", topic);
+ TopicCreationGroup topicGroup = topicCreation.findFirstGroup(topic);
+ log.debug("Topic '{}' matched topic creation group: {}", topic, topicGroup);
+ NewTopic newTopic = topicGroup.newTopic(topic);
+
+ if (admin.createTopic(newTopic)) {
+ topicCreation.addTopic(topic);
+ log.info("Created topic '{}' using creation group {}", newTopic, topicGroup);
+ } else {
+ log.warn("Request to create new topic '{}' failed", topic);
+ throw new ConnectException("Task failed to create new topic " + topic + ". Ensure "
+ + "that the task is authorized to create topics or that the topic exists and "
+ + "restart the task");
+ }
+ }
+
private RecordHeaders convertHeaderFor(SourceRecord record) {
Headers headers = record.headers();
RecordHeaders result = new RecordHeaders();
@@ -479,14 +527,11 @@ public boolean commitOffsets() {
}
// Now we can actually flush the offsets to user storage.
- Future flushFuture = offsetWriter.doFlush(new org.apache.kafka.connect.util.Callback() {
- @Override
- public void onCompletion(Throwable error, Void result) {
- if (error != null) {
- log.error("{} Failed to flush offsets to storage: ", WorkerSourceTask.this, error);
- } else {
- log.trace("{} Finished flushing offsets to storage", WorkerSourceTask.this);
- }
+ Future flushFuture = offsetWriter.doFlush((error, result) -> {
+ if (error != null) {
+ log.error("{} Failed to flush offsets to storage: ", WorkerSourceTask.this, error);
+ } else {
+ log.trace("{} Finished flushing offsets to storage", WorkerSourceTask.this);
}
});
// Very rare case: offsets were unserializable and we finished immediately, unable to store
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index c7fc1cff128a6..9af6291e3df83 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -18,13 +18,10 @@
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigDef.Validator;
-import org.apache.kafka.common.config.ConfigDef.LambdaValidator;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +37,8 @@
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_VALIDATOR;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_VALIDATOR;
public class DistributedConfig extends WorkerConfig {
@@ -193,15 +192,6 @@ public class DistributedConfig extends WorkerConfig {
public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_DOC = "A list of permitted algorithms for verifying internal requests";
public static final List INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT = Collections.singletonList(INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT);
- private static final Validator REPLICATION_FACTOR_VALIDATOR = LambdaValidator.with(
- (name, value) -> validateReplicationFactor(name, (short) value),
- () -> "Positive number, or -1 to use the broker's default"
- );
- private static final Validator PARTITIONS_VALIDATOR = LambdaValidator.with(
- (name, value) -> validatePartitions(name, (int) value),
- () -> "Positive number, or -1 to use the broker's default"
- );
-
@SuppressWarnings("unchecked")
private static final ConfigDef CONFIG = baseConfigDef()
.define(GROUP_ID_CONFIG,
@@ -490,18 +480,4 @@ private static void validateKeyAlgorithm(String configName, String algorithm) {
throw new ConfigException(configName, algorithm, e.getMessage());
}
}
-
- private static void validatePartitions(String configName, int factor) {
- if (factor != TopicAdmin.NO_PARTITIONS && factor < 1) {
- throw new ConfigException(configName, factor,
- "Number of partitions must be positive, or -1 to use the broker's default");
- }
- }
-
- private static void validateReplicationFactor(String configName, short factor) {
- if (factor != TopicAdmin.NO_REPLICATION_FACTOR && factor < 1) {
- throw new ConfigException(configName, factor,
- "Replication factor must be positive, or -1 to use the broker's default");
- }
- }
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 349aa71e54e0c..851b75ff51f2a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1289,7 +1289,7 @@ private void reconfigureConnector(final String connName, final Callback cb
if (worker.isSinkConnector(connName)) {
connConfig = new SinkConnectorConfig(plugins(), configs);
} else {
- connConfig = new SourceConnectorConfig(plugins(), configs);
+ connConfig = new SourceConnectorConfig(plugins(), configs, worker.isTopicCreationEnabled());
}
final List