diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 83d81bb347da0..f0315b6eb7f39 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -30,6 +30,7 @@ import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo; @@ -62,6 +63,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; /** * Abstract Herder implementation which handles connector/task lifecycle tracking. Extensions @@ -260,6 +262,25 @@ public ConnectorStateInfo connectorStatus(String connName) { conf == null ? ConnectorType.UNKNOWN : connectorTypeForClass(conf.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))); } + @Override + public ActiveTopicsInfo connectorActiveTopics(String connName) { + Collection topics = statusBackingStore.getAllTopics(connName).stream() + .map(TopicStatus::topic) + .collect(Collectors.toList()); + return new ActiveTopicsInfo(connName, topics); + } + + @Override + public void resetConnectorActiveTopics(String connName) { + statusBackingStore.getAllTopics(connName).stream() + .forEach(status -> statusBackingStore.deleteTopic(status.connector(), status.topic())); + } + + @Override + public StatusBackingStore statusBackingStore() { + return statusBackingStore; + } + @Override public ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id) { TaskStatus status = statusBackingStore.get(id); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java index 7915682790317..6cb1a475b88f1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java @@ -18,10 +18,12 @@ import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.InternalRequestSignature; +import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.entities.TaskInfo; +import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectorTaskId; @@ -144,6 +146,28 @@ public interface Herder { */ ConnectorStateInfo connectorStatus(String connName); + /** + * Lookup the set of topics currently used by a connector. + * + * @param connName name of the connector + * @return the set of active topics + */ + ActiveTopicsInfo connectorActiveTopics(String connName); + + /** + * Request to asynchronously reset the active topics for the named connector. + * + * @param connName name of the connector + */ + void resetConnectorActiveTopics(String connName); + + /** + * Return a reference to the status backing store used by this herder. + * + * @return the status backing store used by this herder + */ + StatusBackingStore statusBackingStore(); + /** * Lookup the status of the a task. * @param id id of the task @@ -200,7 +224,6 @@ public interface Herder { */ Plugins plugins(); - /** * Get the cluster ID of the Kafka cluster backing this Connect cluster. * @return the cluster ID of the Kafka cluster backing this connect cluster diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicStatus.java new file mode 100644 index 0000000000000..16dcd80b43f84 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicStatus.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.connect.util.ConnectorTaskId; + +import java.util.Objects; + +/** + * Represents the metadata that is stored as the value of the record that is stored in the + * {@link org.apache.kafka.connect.storage.StatusBackingStore#put(TopicStatus)}, + */ +public class TopicStatus { + private final String topic; + private final String connector; + private final int task; + private final long discoverTimestamp; + + public TopicStatus(String topic, ConnectorTaskId task, long discoverTimestamp) { + this(topic, task.connector(), task.task(), discoverTimestamp); + } + + public TopicStatus(String topic, String connector, int task, long discoverTimestamp) { + this.topic = Objects.requireNonNull(topic); + this.connector = Objects.requireNonNull(connector); + this.task = task; + this.discoverTimestamp = discoverTimestamp; + } + + /** + * Get the name of the topic. + * + * @return the topic name; never null + */ + public String topic() { + return topic; + } + + /** + * Get the name of the connector. + * + * @return the connector name; never null + */ + public String connector() { + return connector; + } + + /** + * Get the ID of the task that stored the topic status. + * + * @return the task ID + */ + public int task() { + return task; + } + + /** + * Get a timestamp that represents when this topic was discovered as being actively used by + * this connector. + * + * @return the discovery timestamp + */ + public long discoverTimestamp() { + return discoverTimestamp; + } + + @Override + public String toString() { + return "TopicStatus{" + + "topic='" + topic + '\'' + + ", connector='" + connector + '\'' + + ", task=" + task + + ", discoverTimestamp=" + discoverTimestamp + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TopicStatus)) { + return false; + } + TopicStatus that = (TopicStatus) o; + return task == that.task && + discoverTimestamp == that.discoverTimestamp && + topic.equals(that.topic) && + connector.equals(that.connector); + } + + @Override + public int hashCode() { + return Objects.hash(topic, connector, task, discoverTimestamp); + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index e496d416ef11f..39390f23bda82 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -523,7 +523,7 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState, // Note we pass the configState as it performs dynamic transformations under the covers return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, configState, metrics, loader, - time, retryWithToleranceOperator); + time, retryWithToleranceOperator, herder.statusBackingStore()); } else if (task instanceof SinkTask) { TransformationChain transformationChain = new TransformationChain<>(connConfig.transformations(), retryWithToleranceOperator); log.info("Initializing: {}", transformationChain); @@ -535,7 +535,7 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState, return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter, valueConverter, headerConverter, transformationChain, consumer, loader, time, - retryWithToleranceOperator); + retryWithToleranceOperator, herder.statusBackingStore()); } else { log.error("Tasks must be a subclass of either SourceTask or SinkTask", task); throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask"); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index 17d1d5fe0c89d..347e250cefbf8 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -234,6 +234,16 @@ public class WorkerConfig extends AbstractConfig { public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG; public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG; + public static final String TOPIC_TRACKING_ENABLE_CONFIG = "topic.tracking.enable"; + protected static final String TOPIC_TRACKING_ENABLE_DOC = "Enable tracking the set of active " + + "topics per connector during runtime."; + protected static final boolean TOPIC_TRACKING_ENABLE_DEFAULT = true; + + public static final String TOPIC_TRACKING_ALLOW_RESET_CONFIG = "topic.tracking.allow.reset"; + protected static final String TOPIC_TRACKING_ALLOW_RESET_DOC = "If set to true, it allows " + + "user requests to reset the set of active topics per connector."; + protected static final boolean TOPIC_TRACKING_ALLOW_RESET_DEFAULT = true; + /** * Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to * bootstrap their own ConfigDef. @@ -310,7 +320,11 @@ protected static ConfigDef baseConfigDef() { .define(ADMIN_LISTENERS_CONFIG, Type.LIST, null, new AdminListenersValidator(), Importance.LOW, ADMIN_LISTENERS_DOC) .define(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, Type.STRING, CONNECTOR_CLIENT_POLICY_CLASS_DEFAULT, - Importance.MEDIUM, CONNECTOR_CLIENT_POLICY_CLASS_DOC); + Importance.MEDIUM, CONNECTOR_CLIENT_POLICY_CLASS_DOC) + .define(TOPIC_TRACKING_ENABLE_CONFIG, Type.BOOLEAN, TOPIC_TRACKING_ENABLE_DEFAULT, + Importance.LOW, TOPIC_TRACKING_ENABLE_DOC) + .define(TOPIC_TRACKING_ALLOW_RESET_CONFIG, Type.BOOLEAN, TOPIC_TRACKING_ALLOW_RESET_DEFAULT, + Importance.LOW, TOPIC_TRACKING_ALLOW_RESET_DOC); } private void logInternalConverterDeprecationWarnings(Map props) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 70b8a9bb8e58b..d1d1eecc530a1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -45,6 +45,7 @@ import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; +import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.slf4j.Logger; @@ -71,7 +72,6 @@ class WorkerSinkTask extends WorkerTask { private final SinkTask task; private final ClusterConfigState configState; private Map taskConfig; - private final Time time; private final Converter keyConverter; private final Converter valueConverter; private final HeaderConverter headerConverter; @@ -105,8 +105,10 @@ public WorkerSinkTask(ConnectorTaskId id, KafkaConsumer consumer, ClassLoader loader, Time time, - RetryWithToleranceOperator retryWithToleranceOperator) { - super(id, statusListener, initialState, loader, connectMetrics, retryWithToleranceOperator); + RetryWithToleranceOperator retryWithToleranceOperator, + StatusBackingStore statusBackingStore) { + super(id, statusListener, initialState, loader, connectMetrics, + retryWithToleranceOperator, time, statusBackingStore); this.workerConfig = workerConfig; this.task = task; @@ -115,7 +117,6 @@ public WorkerSinkTask(ConnectorTaskId id, this.valueConverter = valueConverter; this.headerConverter = headerConverter; this.transformationChain = transformationChain; - this.time = time; this.messageBatch = new ArrayList<>(); this.currentOffsets = new HashMap<>(); this.origOffsets = new HashMap<>(); @@ -504,6 +505,7 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord producer; private final CloseableOffsetStorageReader offsetReader; private final OffsetStorageWriter offsetWriter; - private final Time time; private final SourceTaskMetricsGroup sourceTaskMetricsGroup; private final AtomicReference producerSendException; @@ -112,9 +112,11 @@ public WorkerSourceTask(ConnectorTaskId id, ConnectMetrics connectMetrics, ClassLoader loader, Time time, - RetryWithToleranceOperator retryWithToleranceOperator) { + RetryWithToleranceOperator retryWithToleranceOperator, + StatusBackingStore statusBackingStore) { - super(id, statusListener, initialState, loader, connectMetrics, retryWithToleranceOperator); + super(id, statusListener, initialState, loader, connectMetrics, + retryWithToleranceOperator, time, statusBackingStore); this.workerConfig = workerConfig; this.task = task; @@ -126,7 +128,6 @@ public WorkerSourceTask(ConnectorTaskId id, this.producer = producer; this.offsetReader = offsetReader; this.offsetWriter = offsetWriter; - this.time = time; this.toSend = null; this.lastSendFailed = false; @@ -355,6 +356,7 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) { recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()); commitTaskRecord(preTransformRecord, recordMetadata); + recordActiveTopic(producerRecord.topic()); } } }); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index 28d2a8ff96c30..c40dc90012e33 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -30,6 +30,7 @@ import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.LoggingContext; import org.slf4j.Logger; @@ -57,6 +58,8 @@ abstract class WorkerTask implements Runnable { protected final ConnectorTaskId id; private final TaskStatus.Listener statusListener; protected final ClassLoader loader; + protected final StatusBackingStore statusBackingStore; + protected final Time time; private final CountDownLatch shutdownLatch = new CountDownLatch(1); private final TaskMetricsGroup taskMetricsGroup; private volatile TargetState targetState; @@ -70,7 +73,9 @@ public WorkerTask(ConnectorTaskId id, TargetState initialState, ClassLoader loader, ConnectMetrics connectMetrics, - RetryWithToleranceOperator retryWithToleranceOperator) { + RetryWithToleranceOperator retryWithToleranceOperator, + Time time, + StatusBackingStore statusBackingStore) { this.id = id; this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, statusListener); this.statusListener = taskMetricsGroup; @@ -80,6 +85,8 @@ public WorkerTask(ConnectorTaskId id, this.cancelled = false; this.taskMetricsGroup.recordState(this.targetState); this.retryWithToleranceOperator = retryWithToleranceOperator; + this.time = time; + this.statusBackingStore = statusBackingStore; } public ConnectorTaskId id() { @@ -279,6 +286,20 @@ public void transitionTo(TargetState state) { } } + /** + * Include this topic to the set of active topics for the connector that this worker task + * is running. This information is persisted in the status backing store used by this worker. + * + * @param topic the topic to mark as active for this connector + */ + protected void recordActiveTopic(String topic) { + if (statusBackingStore.getTopic(id.connector(), topic) != null) { + // The topic is already recorded as active. No further action is required. + return; + } + statusBackingStore.put(new TopicStatus(topic, id, time.milliseconds())); + } + /** * Record that offsets have been committed. * diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ActiveTopicsInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ActiveTopicsInfo.java new file mode 100644 index 0000000000000..b43c5aa88da84 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ActiveTopicsInfo.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.rest.entities; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Collection; + +public class ActiveTopicsInfo { + private final String connector; + private final Collection topics; + + @JsonCreator + public ActiveTopicsInfo(String connector, @JsonProperty("topics") Collection topics) { + this.connector = connector; + this.topics = topics; + } + + public String connector() { + return connector; + } + + @JsonProperty + public Collection topics() { + return topics; + } + +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index 3b41fb7c143a2..6d30eabd2ee46 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -29,6 +29,7 @@ import org.apache.kafka.connect.runtime.distributed.RequestTargetException; import org.apache.kafka.connect.runtime.rest.InternalRequestSignature; import org.apache.kafka.connect.runtime.rest.RestClient; +import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest; @@ -57,6 +58,7 @@ import javax.ws.rs.core.UriInfo; import java.net.URI; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -64,6 +66,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ALLOW_RESET_CONFIG; +import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG; + @Path("/connectors") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) @@ -173,6 +178,35 @@ public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") Strin return herder.connectorStatus(connector); } + @GET + @Path("/{connector}/topics") + public Response getConnectorActiveTopics(final @PathParam("connector") String connector) throws Throwable { + if (!config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)) { + return Response.status(Response.Status.FORBIDDEN) + .entity("Topic tracking is disabled.") + .build(); + } + ActiveTopicsInfo info = herder.connectorActiveTopics(connector); + return Response.ok(Collections.singletonMap(info.connector(), info)).build(); + } + + @PUT + @Path("/{connector}/topics/reset") + public Response resetConnectorActiveTopics(final @PathParam("connector") String connector, final @Context HttpHeaders headers) throws Throwable { + if (!config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)) { + return Response.status(Response.Status.FORBIDDEN) + .entity("Topic tracking is disabled.") + .build(); + } + if (!config.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)) { + return Response.status(Response.Status.FORBIDDEN) + .entity("Topic tracking reset is disabled.") + .build(); + } + herder.resetConnectorActiveTopics(connector); + return Response.accepted().build(); + } + @PUT @Path("/{connector}/config") public Response putConnectorConfig(final @PathParam("connector") String connector, diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java index fee10108dd8e6..6b1535081a304 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java @@ -36,6 +36,7 @@ import org.apache.kafka.connect.runtime.AbstractStatus; import org.apache.kafka.connect.runtime.ConnectorStatus; import org.apache.kafka.connect.runtime.TaskStatus; +import org.apache.kafka.connect.runtime.TopicStatus; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.util.Callback; @@ -49,11 +50,15 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * StatusBackingStore implementation which uses a compacted topic for storage @@ -82,12 +87,20 @@ public class KafkaStatusBackingStore implements StatusBackingStore { private static final String TASK_STATUS_PREFIX = "status-task-"; private static final String CONNECTOR_STATUS_PREFIX = "status-connector-"; + private static final String TOPIC_STATUS_PREFIX = "status-topic-"; + private static final String TOPIC_STATUS_SEPARATOR = ":connector-"; public static final String STATE_KEY_NAME = "state"; public static final String TRACE_KEY_NAME = "trace"; public static final String WORKER_ID_KEY_NAME = "worker_id"; public static final String GENERATION_KEY_NAME = "generation"; + public static final String TOPIC_STATE_KEY = "topic"; + public static final String TOPIC_NAME_KEY = "name"; + public static final String TOPIC_CONNECTOR_KEY = "connector"; + public static final String TOPIC_TASK_KEY = "task"; + public static final String TOPIC_DISCOVER_TIMESTAMP_KEY = "discoverTimestamp"; + private static final Schema STATUS_SCHEMA_V0 = SchemaBuilder.struct() .field(STATE_KEY_NAME, Schema.STRING_SCHEMA) .field(TRACE_KEY_NAME, SchemaBuilder.string().optional().build()) @@ -95,12 +108,26 @@ public class KafkaStatusBackingStore implements StatusBackingStore { .field(GENERATION_KEY_NAME, Schema.INT32_SCHEMA) .build(); + private static final Schema TOPIC_STATUS_VALUE_SCHEMA_V0 = SchemaBuilder.struct() + .field(TOPIC_NAME_KEY, Schema.STRING_SCHEMA) + .field(TOPIC_CONNECTOR_KEY, Schema.STRING_SCHEMA) + .field(TOPIC_TASK_KEY, Schema.INT32_SCHEMA) + .field(TOPIC_DISCOVER_TIMESTAMP_KEY, Schema.INT64_SCHEMA) + .build(); + + private static final Schema TOPIC_STATUS_SCHEMA_V0 = SchemaBuilder.map( + Schema.STRING_SCHEMA, + TOPIC_STATUS_VALUE_SCHEMA_V0 + ).build(); + private final Time time; private final Converter converter; private final Table> tasks; private final Map> connectors; + //visible for testing + protected final ConcurrentMap> topics; - private String topic; + private String statusTopic; private KafkaBasedLog kafkaLog; private int generation; @@ -109,19 +136,20 @@ public KafkaStatusBackingStore(Time time, Converter converter) { this.converter = converter; this.tasks = new Table<>(); this.connectors = new HashMap<>(); + this.topics = new ConcurrentHashMap<>(); } // visible for testing - KafkaStatusBackingStore(Time time, Converter converter, String topic, KafkaBasedLog kafkaLog) { + KafkaStatusBackingStore(Time time, Converter converter, String statusTopic, KafkaBasedLog kafkaLog) { this(time, converter); this.kafkaLog = kafkaLog; - this.topic = topic; + this.statusTopic = statusTopic; } @Override public void configure(final WorkerConfig config) { - this.topic = config.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG); - if (this.topic == null || this.topic.trim().length() == 0) + this.statusTopic = config.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG); + if (this.statusTopic == null || this.statusTopic.trim().length() == 0) throw new ConfigException("Must specify topic for connector status."); Map originals = config.originals(); @@ -135,7 +163,7 @@ public void configure(final WorkerConfig config) { consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); Map adminProps = new HashMap<>(originals); - NewTopic topicDescription = TopicAdmin.defineTopic(topic). + NewTopic topicDescription = TopicAdmin.defineTopic(statusTopic). compacted(). partitions(config.getInt(DistributedConfig.STATUS_STORAGE_PARTITIONS_CONFIG)). replicationFactor(config.getShort(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG)). @@ -147,7 +175,7 @@ public void onCompletion(Throwable error, ConsumerRecord record) read(record); } }; - this.kafkaLog = createKafkaBasedLog(topic, producerProps, consumerProps, readCallback, topicDescription, adminProps); + this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminProps); } private KafkaBasedLog createKafkaBasedLog(String topic, Map producerProps, @@ -199,6 +227,11 @@ public void putSafe(final TaskStatus status) { sendTaskStatus(status, true); } + @Override + public void put(final TopicStatus status) { + sendTopicStatus(status.connector(), status.topic(), status); + } + @Override public void flush() { kafkaLog.flush(); @@ -218,6 +251,25 @@ private void sendTaskStatus(final TaskStatus status, boolean safeWrite) { send(key, status, entry, safeWrite); } + private void sendTopicStatus(final String connector, final String topic, final TopicStatus status) { + String key = TOPIC_STATUS_PREFIX + topic + TOPIC_STATUS_SEPARATOR + connector; + + final byte[] value = serializeTopicStatus(status); + + kafkaLog.send(key, value, new org.apache.kafka.clients.producer.Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception == null) return; + // TODO: retry more gracefully and not forever + if (exception instanceof RetriableException) { + kafkaLog.send(key, value, this); + } else { + log.error("Failed to write status update", exception); + } + } + }); + } + private void send(final String key, final V status, final CacheEntry entry, @@ -287,6 +339,14 @@ private synchronized void remove(ConnectorTaskId id) { removed.delete(); } + private void removeTopic(String topic, String connector) { + ConcurrentMap activeTopics = topics.get(connector); + if (activeTopics == null) { + return; + } + activeTopics.remove(topic); + } + @Override public synchronized TaskStatus get(ConnectorTaskId id) { CacheEntry entry = tasks.get(id.connector(), id.task()); @@ -310,6 +370,25 @@ public synchronized Collection getAll(String connector) { return res; } + @Override + public TopicStatus getTopic(String connector, String topic) { + ConcurrentMap activeTopics = topics.get(Objects.requireNonNull(connector)); + return activeTopics != null ? activeTopics.get(Objects.requireNonNull(topic)) : null; + } + + @Override + public Collection getAllTopics(String connector) { + ConcurrentMap activeTopics = topics.get(Objects.requireNonNull(connector)); + return activeTopics != null + ? Collections.unmodifiableCollection(Objects.requireNonNull(activeTopics.values())) + : Collections.emptySet(); + } + + @Override + public void deleteTopic(String connector, String topic) { + sendTopicStatus(Objects.requireNonNull(connector), Objects.requireNonNull(topic), null); + } + @Override public synchronized Set connectors() { return new HashSet<>(connectors.keySet()); @@ -317,7 +396,7 @@ public synchronized Set connectors() { private ConnectorStatus parseConnectorStatus(String connector, byte[] data) { try { - SchemaAndValue schemaAndValue = converter.toConnectData(topic, data); + SchemaAndValue schemaAndValue = converter.toConnectData(statusTopic, data); if (!(schemaAndValue.value() instanceof Map)) { log.error("Invalid connector status type {}", schemaAndValue.value().getClass()); return null; @@ -338,7 +417,7 @@ private ConnectorStatus parseConnectorStatus(String connector, byte[] data) { private TaskStatus parseTaskStatus(ConnectorTaskId taskId, byte[] data) { try { - SchemaAndValue schemaAndValue = converter.toConnectData(topic, data); + SchemaAndValue schemaAndValue = converter.toConnectData(statusTopic, data); if (!(schemaAndValue.value() instanceof Map)) { log.error("Invalid task status type {}", schemaAndValue.value().getClass()); return null; @@ -356,6 +435,31 @@ private TaskStatus parseTaskStatus(ConnectorTaskId taskId, byte[] data) { } } + private TopicStatus parseTopicStatus(byte[] data) { + try { + SchemaAndValue schemaAndValue = converter.toConnectData(statusTopic, data); + if (!(schemaAndValue.value() instanceof Map)) { + log.error("Invalid topic status value {}", schemaAndValue.value()); + return null; + } + @SuppressWarnings("unchecked") + Object innerValue = ((Map) schemaAndValue.value()).get(TOPIC_STATE_KEY); + if (!(innerValue instanceof Map)) { + log.error("Invalid topic status value {} for field {}", innerValue, TOPIC_STATE_KEY); + return null; + } + @SuppressWarnings("unchecked") + Map topicStatusMetadata = (Map) innerValue; + return new TopicStatus((String) topicStatusMetadata.get(TOPIC_NAME_KEY), + (String) topicStatusMetadata.get(TOPIC_CONNECTOR_KEY), + ((Long) topicStatusMetadata.get(TOPIC_TASK_KEY)).intValue(), + (long) topicStatusMetadata.get(TOPIC_DISCOVER_TIMESTAMP_KEY)); + } catch (Exception e) { + log.error("Failed to deserialize topic status", e); + return null; + } + } + private byte[] serialize(AbstractStatus status) { Struct struct = new Struct(STATUS_SCHEMA_V0); struct.put(STATE_KEY_NAME, status.state().name()); @@ -363,7 +467,24 @@ private byte[] serialize(AbstractStatus status) { struct.put(TRACE_KEY_NAME, status.trace()); struct.put(WORKER_ID_KEY_NAME, status.workerId()); struct.put(GENERATION_KEY_NAME, status.generation()); - return converter.fromConnectData(topic, STATUS_SCHEMA_V0, struct); + return converter.fromConnectData(statusTopic, STATUS_SCHEMA_V0, struct); + } + + //visible for testing + protected byte[] serializeTopicStatus(TopicStatus status) { + if (status == null) { + // This should send a tombstone record that will represent delete + return null; + } + Struct struct = new Struct(TOPIC_STATUS_VALUE_SCHEMA_V0); + struct.put(TOPIC_NAME_KEY, status.topic()); + struct.put(TOPIC_CONNECTOR_KEY, status.connector()); + struct.put(TOPIC_TASK_KEY, status.task()); + struct.put(TOPIC_DISCOVER_TIMESTAMP_KEY, status.discoverTimestamp()); + return converter.fromConnectData( + statusTopic, + TOPIC_STATUS_SCHEMA_V0, + Collections.singletonMap(TOPIC_STATE_KEY, struct)); } private String parseConnectorStatusKey(String key) { @@ -434,6 +555,50 @@ private void readTaskStatus(String key, byte[] value) { } } + private void readTopicStatus(String key, byte[] value) { + int delimiterPos = key.indexOf(':'); + int beginPos = TOPIC_STATUS_PREFIX.length(); + if (beginPos > delimiterPos) { + log.warn("Discarding record with invalid topic status key {}", key); + return; + } + + String topic = key.substring(beginPos, delimiterPos); + if (topic.isEmpty()) { + log.warn("Discarding record with invalid topic status key containing empty topic {}", key); + return; + } + + beginPos = delimiterPos + TOPIC_STATUS_SEPARATOR.length(); + int endPos = key.length(); + if (beginPos > endPos) { + log.warn("Discarding record with invalid topic status key {}", key); + return; + } + + String connector = key.substring(beginPos); + if (connector.isEmpty()) { + log.warn("Discarding record with invalid topic status key containing empty connector {}", key); + return; + } + + if (value == null) { + log.trace("Removing status for topic {} and connector {}", topic, connector); + removeTopic(topic, connector); + return; + } + + TopicStatus status = parseTopicStatus(value); + if (status == null) { + log.warn("Failed to parse topic status with key {}", key); + return; + } + + log.trace("Received topic status update {}", status); + topics.computeIfAbsent(connector, k -> new ConcurrentHashMap<>()) + .put(topic, status); + } + // visible for testing void read(ConsumerRecord record) { String key = record.key(); @@ -441,6 +606,8 @@ void read(ConsumerRecord record) { readConnectorStatus(key, record.value()); } else if (key.startsWith(TASK_STATUS_PREFIX)) { readTaskStatus(key, record.value()); + } else if (key.startsWith(TOPIC_STATUS_PREFIX)) { + readTopicStatus(key, record.value()); } else { log.warn("Discarding record with invalid key {}", key); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java index 39d098dd1c59a..fbd7048ed26f3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java @@ -18,23 +18,30 @@ import org.apache.kafka.connect.runtime.ConnectorStatus; import org.apache.kafka.connect.runtime.TaskStatus; +import org.apache.kafka.connect.runtime.TopicStatus; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.Table; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; public class MemoryStatusBackingStore implements StatusBackingStore { private final Table tasks; private final Map connectors; + private final ConcurrentMap> topics; public MemoryStatusBackingStore() { this.tasks = new Table<>(); this.connectors = new HashMap<>(); + this.topics = new ConcurrentHashMap<>(); } @Override @@ -78,6 +85,12 @@ public synchronized void putSafe(TaskStatus status) { put(status); } + @Override + public void put(final TopicStatus status) { + topics.computeIfAbsent(status.connector(), k -> new ConcurrentHashMap<>()) + .put(status.topic(), status); + } + @Override public synchronized TaskStatus get(ConnectorTaskId id) { return tasks.get(id.connector(), id.task()); @@ -93,6 +106,28 @@ public synchronized Collection getAll(String connector) { return new HashSet<>(tasks.row(connector).values()); } + @Override + public TopicStatus getTopic(String connector, String topic) { + ConcurrentMap activeTopics = topics.get(Objects.requireNonNull(connector)); + return activeTopics != null ? activeTopics.get(Objects.requireNonNull(topic)) : null; + } + + @Override + public Collection getAllTopics(String connector) { + ConcurrentMap activeTopics = topics.get(Objects.requireNonNull(connector)); + return activeTopics != null + ? Collections.unmodifiableCollection(activeTopics.values()) + : Collections.emptySet(); + } + + @Override + public void deleteTopic(String connector, String topic) { + ConcurrentMap activeTopics = topics.get(Objects.requireNonNull(connector)); + if (activeTopics != null) { + activeTopics.remove(Objects.requireNonNull(topic)); + } + } + @Override public synchronized Set connectors() { return new HashSet<>(connectors.keySet()); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java index 63294a47525c5..0250932df14af 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java @@ -18,6 +18,7 @@ import org.apache.kafka.connect.runtime.ConnectorStatus; import org.apache.kafka.connect.runtime.TaskStatus; +import org.apache.kafka.connect.runtime.TopicStatus; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.util.ConnectorTaskId; @@ -66,6 +67,12 @@ public interface StatusBackingStore { */ void putSafe(TaskStatus status); + /** + * Set the state of a connector's topic to the given value. + * @param status the status of the topic used by a connector + */ + void put(TopicStatus status); + /** * Get the current state of the task. * @param id the id of the task @@ -87,6 +94,28 @@ public interface StatusBackingStore { */ Collection getAll(String connector); + /** + * Get the status of a connector's topic if the connector is actively using this topic + * @param connector the connector name; never null + * @param topic the topic name; never null + * @return the state or null if there is none + */ + TopicStatus getTopic(String connector, String topic); + + /** + * Get the states of all topics that a connector is using. + * @param connector the connector name; never null + * @return a collection of topic states or an empty collection if there is none + */ + Collection getAllTopics(String connector); + + /** + * Delete this topic from the connector's set of active topics + * @param connector the connector name; never null + * @param topic the topic name; never null + */ + void deleteTopic(String connector, String topic); + /** * Get all cached connectors. * @return the set of connector names diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java index 428b3e4f022f5..d4a34bb19818d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java @@ -48,11 +48,13 @@ import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.storage.OffsetStorageReaderImpl; import org.apache.kafka.connect.storage.OffsetStorageWriter; +import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.util.SimpleConfig; import org.apache.kafka.connect.util.ConnectorTaskId; import org.easymock.Capture; import org.easymock.EasyMock; +import org.easymock.IExpectationSetters; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -135,6 +137,7 @@ public class ErrorHandlingTaskTest { @SuppressWarnings("unused") @Mock private TaskStatus.Listener statusListener; + @Mock private StatusBackingStore statusBackingStore; private ErrorHandlingMetrics errorHandlingMetrics; @@ -175,6 +178,7 @@ public void testErrorHandlingInSinkTasks() throws Exception { createSinkTask(initialState, retryWithToleranceOperator); expectInitializeTask(); + expectTaskGetTopic(true); // valid json ConsumerRecord record1 = new ConsumerRecord<>(TOPIC, PARTITION1, FIRST_OFFSET, null, "{\"a\": 10}".getBytes()); @@ -361,6 +365,29 @@ private void expectInitializeTask() throws Exception { PowerMock.expectLastCall(); } + private void expectTaskGetTopic(boolean anyTimes) { + final Capture connectorCapture = EasyMock.newCapture(); + final Capture topicCapture = EasyMock.newCapture(); + IExpectationSetters expect = EasyMock.expect(statusBackingStore.getTopic( + EasyMock.capture(connectorCapture), + EasyMock.capture(topicCapture))); + if (anyTimes) { + expect.andStubAnswer(() -> new TopicStatus( + topicCapture.getValue(), + new ConnectorTaskId(connectorCapture.getValue(), 0), + Time.SYSTEM.milliseconds())); + } else { + expect.andAnswer(() -> new TopicStatus( + topicCapture.getValue(), + new ConnectorTaskId(connectorCapture.getValue(), 0), + Time.SYSTEM.milliseconds())); + } + if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) { + assertEquals("job", connectorCapture.getValue()); + assertEquals(TOPIC, topicCapture.getValue()); + } + } + private void createSinkTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) { JsonConverter converter = new JsonConverter(); Map oo = workerConfig.originalsWithPrefix("value.converter."); @@ -373,7 +400,8 @@ private void createSinkTask(TargetState initialState, RetryWithToleranceOperator workerSinkTask = new WorkerSinkTask( taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, converter, converter, - headerConverter, sinkTransforms, consumer, pluginLoader, time, retryWithToleranceOperator); + headerConverter, sinkTransforms, consumer, pluginLoader, time, + retryWithToleranceOperator, statusBackingStore); } private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) { @@ -402,7 +430,8 @@ private void createSourceTask(TargetState initialState, RetryWithToleranceOperat WorkerSourceTask.class, new String[]{"commitOffsets", "isStopping"}, taskId, sourceTask, statusListener, initialState, converter, converter, headerConverter, sourceTransforms, producer, offsetReader, offsetWriter, workerConfig, - ClusterConfigState.EMPTY, metrics, pluginLoader, time, retryWithToleranceOperator); + ClusterConfigState.EMPTY, metrics, pluginLoader, time, retryWithToleranceOperator, + statusBackingStore); } private ConsumerRecords records(ConsumerRecord record) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 8c93528b76521..285cbbe50b5a8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.RetriableException; @@ -47,12 +48,14 @@ import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; +import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.ConnectorTaskId; import org.easymock.Capture; import org.easymock.CaptureType; import org.easymock.EasyMock; import org.easymock.IAnswer; +import org.easymock.IExpectationSetters; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -142,6 +145,8 @@ public class WorkerSinkTaskTest { @Mock private TaskStatus.Listener statusListener; @Mock + private StatusBackingStore statusBackingStore; + @Mock private KafkaConsumer consumer; private Capture rebalanceListener = EasyMock.newCapture(); private Capture topicsRegex = EasyMock.newCapture(); @@ -176,7 +181,7 @@ private void createTask(TargetState initialState, Converter keyConverter, Conver taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverter, valueConverter, headerConverter, transformationChain, consumer, pluginLoader, time, - RetryWithToleranceOperatorTest.NOOP_OPERATOR); + RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore); } @After @@ -189,6 +194,7 @@ public void testStartPaused() throws Exception { createTask(TargetState.PAUSED); expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); Set partitions = new HashSet<>(asList(TOPIC_PARTITION, TOPIC_PARTITION2)); @@ -217,6 +223,7 @@ public void testPause() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); expectConsumerPoll(1); @@ -311,6 +318,7 @@ public void testPollRedelivery() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); // If a retriable exception is thrown, we should redeliver the same batch, pausing the consumer in the meantime @@ -387,6 +395,7 @@ public void testErrorInRebalancePartitionRevocation() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); expectRebalanceRevocationError(exception); @@ -412,6 +421,7 @@ public void testErrorInRebalancePartitionAssignment() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); expectRebalanceAssignmentError(exception); @@ -435,7 +445,7 @@ public void testWakeupInCommitSyncCausesRetry() throws Exception { createTask(initialState); expectInitializeTask(); - + expectTaskGetTopic(true); expectPollInitialAssignment(); expectConsumerPoll(1); @@ -529,7 +539,7 @@ public void testRequestCommit() throws Exception { createTask(initialState); expectInitializeTask(); - + expectTaskGetTopic(true); expectPollInitialAssignment(); expectConsumerPoll(1); @@ -635,6 +645,7 @@ public void testPreCommit() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); // iter 1 expectPollInitialAssignment(); @@ -702,6 +713,7 @@ public void testIgnoredCommit() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); // iter 1 expectPollInitialAssignment(); @@ -752,6 +764,7 @@ public void testLongRunningCommitWithoutTimeout() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); // iter 1 expectPollInitialAssignment(); @@ -851,6 +864,7 @@ public void testCommitWithOutOfOrderCallback() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); // iter 1 expectPollInitialAssignment(); @@ -1071,6 +1085,7 @@ public void testDeliveryWithMutatingTransform() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); @@ -1125,6 +1140,7 @@ public void testMissingTimestampPropagation() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); expectConsumerPoll(1, RecordBatch.NO_TIMESTAMP, TimestampType.CREATE_TIME); expectConversionAndTransformation(1); @@ -1157,6 +1173,7 @@ public void testTimestampPropagation() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); expectConsumerPoll(1, timestamp, timestampType); expectConversionAndTransformation(1); @@ -1282,6 +1299,7 @@ public void testHeaders() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); expectConsumerPoll(1, headers); @@ -1307,6 +1325,7 @@ public void testHeadersWithCustomConverter() throws Exception { createTask(initialState, stringConverter, testConverter, stringConverter); expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); String keyA = "a"; @@ -1520,6 +1539,28 @@ public SinkRecord answer() { }).times(numMessages); } + private void expectTaskGetTopic(boolean anyTimes) { + final Capture connectorCapture = EasyMock.newCapture(); + final Capture topicCapture = EasyMock.newCapture(); + IExpectationSetters expect = EasyMock.expect(statusBackingStore.getTopic( + EasyMock.capture(connectorCapture), + EasyMock.capture(topicCapture))); + if (anyTimes) { + expect.andStubAnswer(() -> new TopicStatus( + topicCapture.getValue(), + new ConnectorTaskId(connectorCapture.getValue(), 0), + Time.SYSTEM.milliseconds())); + } else { + expect.andAnswer(() -> new TopicStatus( + topicCapture.getValue(), + new ConnectorTaskId(connectorCapture.getValue(), 0), + Time.SYSTEM.milliseconds())); + } + if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) { + assertEquals("job", connectorCapture.getValue()); + assertEquals(TOPIC, topicCapture.getValue()); + } + } private void assertSinkMetricValue(String name, double expected) { MetricGroup sinkTaskGroup = workerTask.sinkTaskMetricsGroup().metricGroup(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index ab20dce2d26bd..be2584a9235b7 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -38,6 +38,7 @@ import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; +import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.connect.util.ThreadedTest; @@ -119,6 +120,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { @Mock private KafkaConsumer consumer; private Capture rebalanceListener = EasyMock.newCapture(); @Mock private TaskStatus.Listener statusListener; + @Mock private StatusBackingStore statusBackingStore; private long recordsReturned; @@ -142,7 +144,7 @@ public void setup() { taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverter, valueConverter, headerConverter, new TransformationChain<>(Collections.emptyList(), RetryWithToleranceOperatorTest.NOOP_OPERATOR), - consumer, pluginLoader, time, RetryWithToleranceOperatorTest.NOOP_OPERATOR); + consumer, pluginLoader, time, RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore); recordsReturned = 0; } @@ -155,6 +157,7 @@ public void tearDown() { @Test public void testPollsInBackground() throws Exception { expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); Capture> capturedRecords = expectPolls(1L); @@ -195,6 +198,7 @@ public void testPollsInBackground() throws Exception { @Test public void testCommit() throws Exception { expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); // Make each poll() take the offset commit interval @@ -228,6 +232,7 @@ public void testCommit() throws Exception { @Test public void testCommitFailure() throws Exception { expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); Capture> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); @@ -267,6 +272,7 @@ public void testCommitSuccessFollowedByFailure() throws Exception { // Validate that we rewind to the correct offsets if a task's preCommit() method throws an exception expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); Capture> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); expectOffsetCommit(1L, null, null, 0, true); @@ -305,6 +311,7 @@ public void testCommitSuccessFollowedByFailure() throws Exception { @Test public void testCommitConsumerFailure() throws Exception { expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); Capture> capturedRecords @@ -336,6 +343,7 @@ public void testCommitConsumerFailure() throws Exception { @Test public void testCommitTimeout() throws Exception { expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); // Cut down amount of time to pass in each poll so we trigger exactly 1 offset commit @@ -373,6 +381,7 @@ public void testAssignmentPauseResume() throws Exception { // Just validate that the calls are passed through to the consumer, and that where appropriate errors are // converted expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); expectOnePoll().andAnswer(new IAnswer() { @@ -441,6 +450,7 @@ public Object answer() throws Throwable { @Test public void testRewind() throws Exception { expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); final long startOffset = 40L; @@ -484,6 +494,7 @@ public Object answer() throws Throwable { @Test public void testRewindOnRebalanceDuringPoll() throws Exception { expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); expectRebalanceDuringPoll().andAnswer(new IAnswer() { @@ -695,6 +706,29 @@ public Object answer() throws Throwable { return capturedCallback; } + private void expectTaskGetTopic(boolean anyTimes) { + final Capture connectorCapture = EasyMock.newCapture(); + final Capture topicCapture = EasyMock.newCapture(); + IExpectationSetters expect = EasyMock.expect(statusBackingStore.getTopic( + EasyMock.capture(connectorCapture), + EasyMock.capture(topicCapture))); + if (anyTimes) { + expect.andStubAnswer(() -> new TopicStatus( + topicCapture.getValue(), + new ConnectorTaskId(connectorCapture.getValue(), 0), + Time.SYSTEM.milliseconds())); + } else { + expect.andAnswer(() -> new TopicStatus( + topicCapture.getValue(), + new ConnectorTaskId(connectorCapture.getValue(), 0), + Time.SYSTEM.milliseconds())); + } + if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) { + assertEquals("job", connectorCapture.getValue()); + assertEquals(TOPIC, topicCapture.getValue()); + } + } + private RecordHeaders emptyHeaders() { return new RecordHeaders(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index ea3716d207111..b6c3c8d688143 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -45,6 +45,7 @@ import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.storage.OffsetStorageWriter; +import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectorTaskId; @@ -120,6 +121,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { private WorkerSourceTask workerTask; @Mock private Future sendFuture; @MockStrict private TaskStatus.Listener statusListener; + @Mock private StatusBackingStore statusBackingStore; private Capture producerCallbacks; @@ -166,7 +168,7 @@ private void createWorkerTask(TargetState initialState) { private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) { workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM, - RetryWithToleranceOperatorTest.NOOP_OPERATOR); + RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore); } @Test @@ -962,6 +964,7 @@ public Future answer() throws Throwable { if (sendSuccess) { // 3. As a result of a successful producer send callback, we'll notify the source task of the record commit expectTaskCommitRecordWithOffset(anyTimes, commitSuccess); + expectTaskGetTopic(anyTimes); } return sent; @@ -1021,6 +1024,29 @@ private void expectTaskCommitRecordWithOffset(boolean anyTimes, boolean succeed) } } + private void expectTaskGetTopic(boolean anyTimes) { + final Capture connectorCapture = EasyMock.newCapture(); + final Capture topicCapture = EasyMock.newCapture(); + IExpectationSetters expect = EasyMock.expect(statusBackingStore.getTopic( + EasyMock.capture(connectorCapture), + EasyMock.capture(topicCapture))); + if (anyTimes) { + expect.andStubAnswer(() -> new TopicStatus( + topicCapture.getValue(), + new ConnectorTaskId(connectorCapture.getValue(), 0), + Time.SYSTEM.milliseconds())); + } else { + expect.andAnswer(() -> new TopicStatus( + topicCapture.getValue(), + new ConnectorTaskId(connectorCapture.getValue(), 0), + Time.SYSTEM.milliseconds())); + } + if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) { + assertEquals("job", connectorCapture.getValue()); + assertEquals(TOPIC, topicCapture.getValue()); + } + } + private boolean awaitLatch(CountDownLatch latch) { try { return latch.await(5000, TimeUnit.MILLISECONDS); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java index 33349f4c2f5b7..44c45d5ace06f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java @@ -16,11 +16,13 @@ */ package org.apache.kafka.connect.runtime; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.WorkerTask.TaskMetricsGroup; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest; import org.apache.kafka.connect.sink.SinkTask; +import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.common.utils.MockTime; import org.easymock.EasyMock; @@ -59,6 +61,8 @@ public class WorkerTaskTest { @Mock private TaskStatus.Listener statusListener; @Mock private ClassLoader loader; RetryWithToleranceOperator retryWithToleranceOperator; + @Mock + StatusBackingStore statusBackingStore; @Before public void setup() { @@ -82,9 +86,12 @@ public void standardStartup() { TargetState.class, ClassLoader.class, ConnectMetrics.class, - RetryWithToleranceOperator.class + RetryWithToleranceOperator.class, + Time.class, + StatusBackingStore.class ) - .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, retryWithToleranceOperator) + .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, + retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) .addMockedMethod("initialize") .addMockedMethod("execute") .addMockedMethod("close") @@ -129,9 +136,12 @@ public void stopBeforeStarting() { TargetState.class, ClassLoader.class, ConnectMetrics.class, - RetryWithToleranceOperator.class + RetryWithToleranceOperator.class, + Time.class, + StatusBackingStore.class ) - .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, retryWithToleranceOperator) + .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, + retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) .addMockedMethod("initialize") .addMockedMethod("execute") .addMockedMethod("close") @@ -169,9 +179,12 @@ public void cancelBeforeStopping() throws Exception { TargetState.class, ClassLoader.class, ConnectMetrics.class, - RetryWithToleranceOperator.class + RetryWithToleranceOperator.class, + Time.class, + StatusBackingStore.class ) - .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, retryWithToleranceOperator) + .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, + retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) .addMockedMethod("initialize") .addMockedMethod("execute") .addMockedMethod("close") diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 7021503adbed6..16a0a765ac9fa 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -55,6 +55,7 @@ import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.kafka.connect.storage.OffsetStorageReader; import org.apache.kafka.connect.storage.OffsetStorageWriter; +import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.ThreadedTest; import org.easymock.Capture; @@ -121,6 +122,7 @@ public class WorkerTest extends ThreadedTest { private ConnectorStatus.Listener connectorStatusListener; @Mock private Herder herder; + @Mock private StatusBackingStore statusBackingStore; @Mock private Connector connector; @Mock private ConnectorContext ctx; @Mock private TestSourceTask task; @@ -213,6 +215,7 @@ public void testStartAndStopConnector() { PowerMock.replayAll(); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); + worker.herder = herder; worker.start(); assertEquals(Collections.emptySet(), worker.connectorNames()); @@ -264,6 +267,7 @@ public void testStartConnectorFailure() { PowerMock.replayAll(); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); + worker.herder = herder; worker.start(); assertStatistics(worker, 0, 0); @@ -324,6 +328,7 @@ public void testAddConnectorByAlias() { PowerMock.replayAll(); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); + worker.herder = herder; worker.start(); assertStatistics(worker, 0, 0); @@ -388,6 +393,7 @@ public void testAddConnectorByShortAlias() { PowerMock.replayAll(); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); + worker.herder = herder; worker.start(); assertStatistics(worker, 0, 0); @@ -414,6 +420,7 @@ public void testStopInvalidConnector() { PowerMock.replayAll(); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); + worker.herder = herder; worker.start(); worker.stopConnector(CONNECTOR_ID); @@ -471,6 +478,7 @@ public void testReconfigureConnectorTasks() { PowerMock.replayAll(); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); + worker.herder = herder; worker.start(); assertStatistics(worker, 0, 0); @@ -534,7 +542,8 @@ public void testAddRemoveTask() throws Exception { anyObject(ConnectMetrics.class), anyObject(ClassLoader.class), anyObject(Time.class), - anyObject(RetryWithToleranceOperator.class)) + anyObject(RetryWithToleranceOperator.class), + anyObject(StatusBackingStore.class)) .andReturn(workerTask); Map origProps = new HashMap<>(); origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); @@ -583,6 +592,7 @@ public void testAddRemoveTask() throws Exception { worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService, noneConnectorClientConfigOverridePolicy); + worker.herder = herder; worker.start(); assertStatistics(worker, 0, 0); assertStartupStatistics(worker, 0, 0, 0, 0); @@ -627,7 +637,8 @@ public void testTaskStatusMetricsStatuses() throws Exception { anyObject(ConnectMetrics.class), anyObject(ClassLoader.class), anyObject(Time.class), - anyObject(RetryWithToleranceOperator.class)).andReturn(workerTask); + anyObject(RetryWithToleranceOperator.class), + anyObject(StatusBackingStore.class)).andReturn(workerTask); Map origProps = new HashMap<>(); origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); @@ -759,6 +770,7 @@ public void testConnectorStatusMetricsGroup_taskStatusCounter() { config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); + worker.herder = herder; Worker.ConnectorStatusMetricsGroup metricGroup = new Worker.ConnectorStatusMetricsGroup( worker.metrics(), tasks, herder @@ -799,6 +811,7 @@ public void testStartTaskFailure() { PowerMock.replayAll(); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); + worker.herder = herder; worker.start(); assertStatistics(worker, 0, 0); assertStartupStatistics(worker, 0, 0, 0, 0); @@ -838,7 +851,8 @@ public void testCleanupTasksOnStop() throws Exception { anyObject(ConnectMetrics.class), EasyMock.eq(pluginLoader), anyObject(Time.class), - anyObject(RetryWithToleranceOperator.class)) + anyObject(RetryWithToleranceOperator.class), + anyObject(StatusBackingStore.class)) .andReturn(workerTask); Map origProps = new HashMap<>(); origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); @@ -893,6 +907,7 @@ public void testCleanupTasksOnStop() throws Exception { worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService, noneConnectorClientConfigOverridePolicy); + worker.herder = herder; worker.start(); assertStatistics(worker, 0, 0); worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED); @@ -932,7 +947,8 @@ public void testConverterOverrides() throws Exception { anyObject(ConnectMetrics.class), EasyMock.eq(pluginLoader), anyObject(Time.class), - anyObject(RetryWithToleranceOperator.class)) + anyObject(RetryWithToleranceOperator.class), + anyObject(StatusBackingStore.class)) .andReturn(workerTask); Map origProps = new HashMap<>(); origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); @@ -986,6 +1002,7 @@ public void testConverterOverrides() throws Exception { worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService, noneConnectorClientConfigOverridePolicy); + worker.herder = herder; worker.start(); assertStatistics(worker, 0, 0); assertEquals(Collections.emptySet(), worker.taskIds()); @@ -1225,6 +1242,8 @@ private void expectStartStorage() { EasyMock.expectLastCall(); offsetBackingStore.start(); EasyMock.expectLastCall(); + EasyMock.expect(herder.statusBackingStore()) + .andReturn(statusBackingStore).anyTimes(); } private void expectStopStorage() { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java new file mode 100644 index 0000000000000..6bb4a1db702b1 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.runtime.TopicStatus; +import org.apache.kafka.connect.util.ConnectorTaskId; +import org.easymock.EasyMockSupport; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.kafka.connect.json.JsonConverterConfig.SCHEMAS_ENABLE_CONFIG; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class KafkaStatusBackingStoreFormatTest extends EasyMockSupport { + + private static final String STATUS_TOPIC = "status-topic"; + + private JsonConverter converter; + private KafkaStatusBackingStore store; + + @Before + public void setup() { + converter = new JsonConverter(); + converter.configure(Collections.singletonMap(SCHEMAS_ENABLE_CONFIG, false), false); + store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, null); + } + + @Test + public void readTopicStatus() { + TopicStatus topicStatus = new TopicStatus("foo", new ConnectorTaskId("bar", 0), Time.SYSTEM.milliseconds()); + byte[] value = store.serializeTopicStatus(topicStatus); + ConsumerRecord statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, "status-topic-foo:connector-bar", value); + store.read(statusRecord); + assertTrue(store.topics.containsKey("bar")); + assertTrue(store.topics.get("bar").containsKey("foo")); + assertEquals(topicStatus, store.topics.get("bar").get("foo")); + } + + @Test + public void deleteTopicStatus() { + TopicStatus topicStatus = new TopicStatus("foo", new ConnectorTaskId("bar", 0), Time.SYSTEM.milliseconds()); + store.topics.computeIfAbsent("bar", k -> new ConcurrentHashMap<>()).put("foo", topicStatus); + assertTrue(store.topics.containsKey("bar")); + assertTrue(store.topics.get("bar").containsKey("foo")); + assertEquals(topicStatus, store.topics.get("bar").get("foo")); + ConsumerRecord statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, "status-topic-foo:connector-bar", null); + store.read(statusRecord); + assertTrue(store.topics.containsKey("bar")); + assertFalse(store.topics.get("bar").containsKey("foo")); + assertEquals(Collections.emptyMap(), store.topics.get("bar")); + } +}