Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
Expand Down Expand Up @@ -62,6 +63,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
* Abstract Herder implementation which handles connector/task lifecycle tracking. Extensions
Expand Down Expand Up @@ -260,6 +262,25 @@ public ConnectorStateInfo connectorStatus(String connName) {
conf == null ? ConnectorType.UNKNOWN : connectorTypeForClass(conf.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)));
}

/**
 * Gather the names of the topics that the status backing store reports for the given
 * connector and return them wrapped in an {@link ActiveTopicsInfo}.
 *
 * @param connName name of the connector
 * @return the connector name together with the names of its recorded topics
 */
@Override
public ActiveTopicsInfo connectorActiveTopics(String connName) {
    // Reduce every recorded TopicStatus to just its topic name.
    Collection<String> activeTopicNames = statusBackingStore.getAllTopics(connName)
            .stream()
            .map(TopicStatus::topic)
            .collect(Collectors.toList());
    return new ActiveTopicsInfo(connName, activeTopicNames);
}

/**
 * Ask the status backing store to delete every topic status it currently reports for the
 * given connector.
 *
 * @param connName name of the connector
 */
@Override
public void resetConnectorActiveTopics(String connName) {
    statusBackingStore.getAllTopics(connName)
            .stream()
            .forEach(tracked -> {
                // Deletion is keyed by the connector and topic carried in the status itself.
                statusBackingStore.deleteTopic(tracked.connector(), tracked.topic());
            });
}

/**
 * Expose the status backing store held by this herder.
 *
 * @return the status backing store used by this herder
 */
@Override
public StatusBackingStore statusBackingStore() {
    return this.statusBackingStore;
}

@Override
public ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id) {
TaskStatus status = statusBackingStore.get(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;

Expand Down Expand Up @@ -144,6 +146,28 @@ public interface Herder {
*/
ConnectorStateInfo connectorStatus(String connName);

/**
* Look up the set of topics currently used by a connector.
*
* @param connName name of the connector
* @return the active topics of the named connector, wrapped in an {@link ActiveTopicsInfo}
*/
ActiveTopicsInfo connectorActiveTopics(String connName);

/**
* Request an asynchronous reset of the set of active topics recorded for the named connector.
*
* @param connName name of the connector
*/
void resetConnectorActiveTopics(String connName);

/**
* Return a reference to the status backing store used by this herder.
*
* @return the status backing store used by this herder
*/
StatusBackingStore statusBackingStore();

/**
* Look up the status of a task.
* @param id id of the task
Expand Down Expand Up @@ -200,7 +224,6 @@ public interface Herder {
*/
Plugins plugins();


/**
* Get the cluster ID of the Kafka cluster backing this Connect cluster.
* @return the cluster ID of the Kafka cluster backing this connect cluster
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime;

import org.apache.kafka.connect.util.ConnectorTaskId;

import java.util.Objects;

/**
* Represents the metadata that is stored as the value of the record that is stored in the
* {@link org.apache.kafka.connect.storage.StatusBackingStore#put(TopicStatus)},
*/
public class TopicStatus {
private final String topic;
private final String connector;
private final int task;
private final long discoverTimestamp;

public TopicStatus(String topic, ConnectorTaskId task, long discoverTimestamp) {
this(topic, task.connector(), task.task(), discoverTimestamp);
}

public TopicStatus(String topic, String connector, int task, long discoverTimestamp) {
this.topic = Objects.requireNonNull(topic);
this.connector = Objects.requireNonNull(connector);
this.task = task;
this.discoverTimestamp = discoverTimestamp;
}

/**
* Get the name of the topic.
*
* @return the topic name; never null
*/
public String topic() {
Comment thread
rhauch marked this conversation as resolved.
return topic;
}

/**
* Get the name of the connector.
*
* @return the connector name; never null
*/
public String connector() {
return connector;
}

/**
* Get the ID of the task that stored the topic status.
*
* @return the task ID
*/
public int task() {
return task;
}

/**
* Get a timestamp that represents when this topic was discovered as being actively used by
* this connector.
*
* @return the discovery timestamp
*/
public long discoverTimestamp() {
return discoverTimestamp;
}
Comment thread
rhauch marked this conversation as resolved.

@Override
public String toString() {
return "TopicStatus{" +
"topic='" + topic + '\'' +
", connector='" + connector + '\'' +
", task=" + task +
", discoverTimestamp=" + discoverTimestamp +
'}';
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof TopicStatus)) {
return false;
}
TopicStatus that = (TopicStatus) o;
return task == that.task &&
discoverTimestamp == that.discoverTimestamp &&
topic.equals(that.topic) &&
connector.equals(that.connector);
}

@Override
public int hashCode() {
return Objects.hash(topic, connector, task, discoverTimestamp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState,
// Note we pass the configState as it performs dynamic transformations under the covers
return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, configState, metrics, loader,
time, retryWithToleranceOperator);
time, retryWithToleranceOperator, herder.statusBackingStore());
} else if (task instanceof SinkTask) {
TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
log.info("Initializing: {}", transformationChain);
Expand All @@ -535,7 +535,7 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState,

return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
valueConverter, headerConverter, transformationChain, consumer, loader, time,
retryWithToleranceOperator);
retryWithToleranceOperator, herder.statusBackingStore());
} else {
log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,16 @@ public class WorkerConfig extends AbstractConfig {
public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;

// Worker property that switches tracking of each connector's active topics on or off.
public static final String TOPIC_TRACKING_ENABLE_CONFIG = "topic.tracking.enable";
protected static final String TOPIC_TRACKING_ENABLE_DOC = "Enable tracking the set of active "
+ "topics per connector during runtime.";
// Topic tracking is enabled unless the property is explicitly set to false.
protected static final boolean TOPIC_TRACKING_ENABLE_DEFAULT = true;

// Worker property that controls whether user requests may reset a connector's active topics.
public static final String TOPIC_TRACKING_ALLOW_RESET_CONFIG = "topic.tracking.allow.reset";
protected static final String TOPIC_TRACKING_ALLOW_RESET_DOC = "If set to true, it allows "
+ "user requests to reset the set of active topics per connector.";
// Resets are allowed unless the property is explicitly set to false.
protected static final boolean TOPIC_TRACKING_ALLOW_RESET_DEFAULT = true;

/**
* Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to
* bootstrap their own ConfigDef.
Expand Down Expand Up @@ -310,7 +320,11 @@ protected static ConfigDef baseConfigDef() {
.define(ADMIN_LISTENERS_CONFIG, Type.LIST, null,
new AdminListenersValidator(), Importance.LOW, ADMIN_LISTENERS_DOC)
.define(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, Type.STRING, CONNECTOR_CLIENT_POLICY_CLASS_DEFAULT,
Importance.MEDIUM, CONNECTOR_CLIENT_POLICY_CLASS_DOC);
Importance.MEDIUM, CONNECTOR_CLIENT_POLICY_CLASS_DOC)
.define(TOPIC_TRACKING_ENABLE_CONFIG, Type.BOOLEAN, TOPIC_TRACKING_ENABLE_DEFAULT,
Importance.LOW, TOPIC_TRACKING_ENABLE_DOC)
.define(TOPIC_TRACKING_ALLOW_RESET_CONFIG, Type.BOOLEAN, TOPIC_TRACKING_ALLOW_RESET_DEFAULT,
Importance.LOW, TOPIC_TRACKING_ALLOW_RESET_DOC);
}

private void logInternalConverterDeprecationWarnings(Map<String, String> props) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
Expand All @@ -71,7 +72,6 @@ class WorkerSinkTask extends WorkerTask {
private final SinkTask task;
private final ClusterConfigState configState;
private Map<String, String> taskConfig;
private final Time time;
private final Converter keyConverter;
private final Converter valueConverter;
private final HeaderConverter headerConverter;
Expand Down Expand Up @@ -105,8 +105,10 @@ public WorkerSinkTask(ConnectorTaskId id,
KafkaConsumer<byte[], byte[]> consumer,
ClassLoader loader,
Time time,
RetryWithToleranceOperator retryWithToleranceOperator) {
super(id, statusListener, initialState, loader, connectMetrics, retryWithToleranceOperator);
RetryWithToleranceOperator retryWithToleranceOperator,
StatusBackingStore statusBackingStore) {
super(id, statusListener, initialState, loader, connectMetrics,
retryWithToleranceOperator, time, statusBackingStore);

this.workerConfig = workerConfig;
this.task = task;
Expand All @@ -115,7 +117,6 @@ public WorkerSinkTask(ConnectorTaskId id,
this.valueConverter = valueConverter;
this.headerConverter = headerConverter;
this.transformationChain = transformationChain;
this.time = time;
this.messageBatch = new ArrayList<>();
this.currentOffsets = new HashMap<>();
this.origOffsets = new HashMap<>();
Expand Down Expand Up @@ -504,6 +505,7 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[]
headers);
log.trace("{} Applying transformations to record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}",
this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value());
recordActiveTopic(origRecord.topic());
return transformationChain.apply(origRecord);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
Expand Down Expand Up @@ -77,7 +78,6 @@ class WorkerSourceTask extends WorkerTask {
private KafkaProducer<byte[], byte[]> producer;
private final CloseableOffsetStorageReader offsetReader;
private final OffsetStorageWriter offsetWriter;
private final Time time;
private final SourceTaskMetricsGroup sourceTaskMetricsGroup;
private final AtomicReference<Exception> producerSendException;

Expand Down Expand Up @@ -112,9 +112,11 @@ public WorkerSourceTask(ConnectorTaskId id,
ConnectMetrics connectMetrics,
ClassLoader loader,
Time time,
RetryWithToleranceOperator retryWithToleranceOperator) {
RetryWithToleranceOperator retryWithToleranceOperator,
StatusBackingStore statusBackingStore) {

super(id, statusListener, initialState, loader, connectMetrics, retryWithToleranceOperator);
super(id, statusListener, initialState, loader, connectMetrics,
retryWithToleranceOperator, time, statusBackingStore);

this.workerConfig = workerConfig;
this.task = task;
Expand All @@ -126,7 +128,6 @@ public WorkerSourceTask(ConnectorTaskId id,
this.producer = producer;
this.offsetReader = offsetReader;
this.offsetWriter = offsetWriter;
this.time = time;

this.toSend = null;
this.lastSendFailed = false;
Expand Down Expand Up @@ -355,6 +356,7 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
recordMetadata.topic(), recordMetadata.partition(),
recordMetadata.offset());
commitTaskRecord(preTransformRecord, recordMetadata);
recordActiveTopic(producerRecord.topic());
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.LoggingContext;
import org.slf4j.Logger;
Expand Down Expand Up @@ -57,6 +58,8 @@ abstract class WorkerTask implements Runnable {
protected final ConnectorTaskId id;
private final TaskStatus.Listener statusListener;
protected final ClassLoader loader;
protected final StatusBackingStore statusBackingStore;
protected final Time time;
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
private final TaskMetricsGroup taskMetricsGroup;
private volatile TargetState targetState;
Expand All @@ -70,7 +73,9 @@ public WorkerTask(ConnectorTaskId id,
TargetState initialState,
ClassLoader loader,
ConnectMetrics connectMetrics,
RetryWithToleranceOperator retryWithToleranceOperator) {
RetryWithToleranceOperator retryWithToleranceOperator,
Time time,
StatusBackingStore statusBackingStore) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we really want to pass in the StatusBackingStore interface, or whether we should define a new interface? It'd be nice to constrain future developers to only use the methods related to active topics. It's a bit more complicated, but better captures the current intent to just allow the worker tasks to check whether a topic is considered active for a connector and if not to record that a topic has been used.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main issue is that the new methods seem to belong to the StatusBackingStore interface. Factoring them out into a different one (let's say TopicTracker) would separate concerns here, but would also add a level of indirection that might be more complex to follow. wdyt?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neither approach is terribly clean. I think the current approach certainly works, but as you point out if we add something like TopicTracker as a superinterface of StatusBackingStore then we'd be inconsistent with the pre-existing status methods.

Let's keep things simple.

this.id = id;
this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, statusListener);
this.statusListener = taskMetricsGroup;
Expand All @@ -80,6 +85,8 @@ public WorkerTask(ConnectorTaskId id,
this.cancelled = false;
this.taskMetricsGroup.recordState(this.targetState);
this.retryWithToleranceOperator = retryWithToleranceOperator;
this.time = time;
this.statusBackingStore = statusBackingStore;
}

public ConnectorTaskId id() {
Expand Down Expand Up @@ -279,6 +286,20 @@ public void transitionTo(TargetState state) {
}
}

/**
 * Add the given topic to the set of active topics of the connector that this worker task
 * belongs to. A new {@link TopicStatus} is written to the status backing store only when
 * the store does not already hold one for this connector and topic.
 *
 * @param topic the topic to mark as active for this connector
 */
protected void recordActiveTopic(String topic) {
    boolean alreadyRecorded = statusBackingStore.getTopic(id.connector(), topic) != null;
    if (!alreadyRecorded) {
        // First sighting of this topic for the connector: store it stamped with the current time.
        statusBackingStore.put(new TopicStatus(topic, id, time.milliseconds()));
    }
}

/**
* Record that offsets have been committed.
*
Expand Down
Loading