From 49dfb9543b1737bd7d16c0a59afe55caf02a25d0 Mon Sep 17 00:00:00 2001 From: Konstantine Karantasis Date: Tue, 28 Jan 2020 22:20:13 -0500 Subject: [PATCH 1/7] KAFKA-9422: Track the set of topics a connector is using (KIP-558) --- .../kafka/connect/runtime/AbstractHerder.java | 22 +++ .../apache/kafka/connect/runtime/Herder.java | 21 +- .../kafka/connect/runtime/TopicStatus.java | 57 ++++++ .../apache/kafka/connect/runtime/Worker.java | 4 +- .../kafka/connect/runtime/WorkerConfig.java | 16 +- .../kafka/connect/runtime/WorkerSinkTask.java | 10 +- .../connect/runtime/WorkerSourceTask.java | 10 +- .../kafka/connect/runtime/WorkerTask.java | 26 ++- .../rest/entities/ActiveTopicsInfo.java | 43 ++++ .../rest/resources/ConnectorsResource.java | 14 ++ .../storage/KafkaStatusBackingStore.java | 186 +++++++++++++++++- .../storage/MemoryStatusBackingStore.java | 36 ++++ .../connect/storage/StatusBackingStore.java | 29 +++ .../runtime/ErrorHandlingTaskTest.java | 33 +++- .../connect/runtime/WorkerSinkTaskTest.java | 47 ++++- .../runtime/WorkerSinkTaskThreadedTest.java | 36 +++- .../connect/runtime/WorkerSourceTaskTest.java | 28 ++- .../kafka/connect/runtime/WorkerTaskTest.java | 25 ++- .../kafka/connect/runtime/WorkerTest.java | 27 ++- .../storage/KafkaStatusBackingStoreTest.java | 58 ++++++ 20 files changed, 688 insertions(+), 40 deletions(-) create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicStatus.java create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ActiveTopicsInfo.java diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 83d81bb347da0..9a5bdf6886c20 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -30,6 +30,7 @@ import 
org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo; @@ -49,6 +50,7 @@ import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -62,6 +64,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; /** * Abstract Herder implementation which handles connector/task lifecycle tracking. Extensions @@ -260,6 +263,25 @@ public ConnectorStateInfo connectorStatus(String connName) { conf == null ? 
ConnectorType.UNKNOWN : connectorTypeForClass(conf.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))); } + @Override + public Map connectorActiveTopics(String connName) { + Collection topics = statusBackingStore.getAllTopics(connName).stream() + .map(TopicStatus::topic) + .collect(Collectors.toList()); + return Collections.singletonMap(connName, new ActiveTopicsInfo(connName, topics)); + } + + @Override + public void resetConnectorActiveTopics(String connName) { + statusBackingStore.getAllTopics(connName).stream() + .forEach(status -> statusBackingStore.deleteTopic(status.connector(), status.topic())); + } + + @Override + public StatusBackingStore statusBackingStore() { + return statusBackingStore; + } + @Override public ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id) { TaskStatus status = statusBackingStore.get(id); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java index 7915682790317..b344f4d366419 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java @@ -18,10 +18,12 @@ import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.InternalRequestSignature; +import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.entities.TaskInfo; +import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectorTaskId; @@ -144,6 +146,24 @@ public interface Herder { */ ConnectorStateInfo connectorStatus(String connName); + /** + * Lookup the set of 
topics currently used by a connector. + * @param connName name of the connector + */ + Map connectorActiveTopics(String connName); + + /** + * Lookup the set of topics currently used by a connector. + * @param connName name of the connector + */ + void resetConnectorActiveTopics(String connName); + + /** + * Return a reference to the status backing store used by this herder. + * @return the status backing store used by this herder + */ + StatusBackingStore statusBackingStore(); + /** * Lookup the status of the a task. * @param id id of the task @@ -200,7 +220,6 @@ public interface Herder { */ Plugins plugins(); - /** * Get the cluster ID of the Kafka cluster backing this Connect cluster. * @return the cluster ID of the Kafka cluster backing this connect cluster diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicStatus.java new file mode 100644 index 0000000000000..5f85d36d824c4 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicStatus.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.connect.util.ConnectorTaskId; + +public class TopicStatus { + private final String topic; + private final String connector; + private final int task; + private final long discoverTimestamp; + + public TopicStatus(String topic, ConnectorTaskId task, long discoverTimestamp) { + //TODO: check non-null + this.topic = topic; + this.connector = task.connector(); + this.task = task.task(); + this.discoverTimestamp = discoverTimestamp; + } + + public TopicStatus(String topic, String connector, int task, long discoverTimestamp) { + this.topic = topic; + this.connector = connector; + this.task = task; + this.discoverTimestamp = discoverTimestamp; + } + + public String topic() { + return topic; + } + + public String connector() { + return connector; + } + + public int task() { + return task; + } + + public long discoverTimestamp() { + return discoverTimestamp; + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index e496d416ef11f..39390f23bda82 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -523,7 +523,7 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState, // Note we pass the configState as it performs dynamic transformations under the covers return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, configState, metrics, loader, - time, retryWithToleranceOperator); + time, retryWithToleranceOperator, herder.statusBackingStore()); } else if (task instanceof SinkTask) { TransformationChain transformationChain = new TransformationChain<>(connConfig.transformations(), retryWithToleranceOperator); 
log.info("Initializing: {}", transformationChain); @@ -535,7 +535,7 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState, return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter, valueConverter, headerConverter, transformationChain, consumer, loader, time, - retryWithToleranceOperator); + retryWithToleranceOperator, herder.statusBackingStore()); } else { log.error("Tasks must be a subclass of either SourceTask or SinkTask", task); throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask"); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index 17d1d5fe0c89d..347e250cefbf8 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -234,6 +234,16 @@ public class WorkerConfig extends AbstractConfig { public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG; public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG; + public static final String TOPIC_TRACKING_ENABLE_CONFIG = "topic.tracking.enable"; + protected static final String TOPIC_TRACKING_ENABLE_DOC = "Enable tracking the set of active " + + "topics per connector during runtime."; + protected static final boolean TOPIC_TRACKING_ENABLE_DEFAULT = true; + + public static final String TOPIC_TRACKING_ALLOW_RESET_CONFIG = "topic.tracking.allow.reset"; + protected static final String TOPIC_TRACKING_ALLOW_RESET_DOC = "If set to true, it allows " + + "user requests to reset the set of active topics per connector."; + protected static final boolean TOPIC_TRACKING_ALLOW_RESET_DEFAULT = true; + /** * Get a basic ConfigDef for a WorkerConfig. 
This includes all the common settings. Subclasses can use this to * bootstrap their own ConfigDef. @@ -310,7 +320,11 @@ protected static ConfigDef baseConfigDef() { .define(ADMIN_LISTENERS_CONFIG, Type.LIST, null, new AdminListenersValidator(), Importance.LOW, ADMIN_LISTENERS_DOC) .define(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, Type.STRING, CONNECTOR_CLIENT_POLICY_CLASS_DEFAULT, - Importance.MEDIUM, CONNECTOR_CLIENT_POLICY_CLASS_DOC); + Importance.MEDIUM, CONNECTOR_CLIENT_POLICY_CLASS_DOC) + .define(TOPIC_TRACKING_ENABLE_CONFIG, Type.BOOLEAN, TOPIC_TRACKING_ENABLE_DEFAULT, + Importance.LOW, TOPIC_TRACKING_ENABLE_DOC) + .define(TOPIC_TRACKING_ALLOW_RESET_CONFIG, Type.BOOLEAN, TOPIC_TRACKING_ALLOW_RESET_DEFAULT, + Importance.LOW, TOPIC_TRACKING_ALLOW_RESET_DOC); } private void logInternalConverterDeprecationWarnings(Map props) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 70b8a9bb8e58b..259676141aa70 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -45,6 +45,7 @@ import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; +import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.slf4j.Logger; @@ -71,7 +72,6 @@ class WorkerSinkTask extends WorkerTask { private final SinkTask task; private final ClusterConfigState configState; private Map taskConfig; - private final Time time; private final Converter keyConverter; private final Converter valueConverter; private final HeaderConverter headerConverter; @@ -105,8 +105,10 @@ public WorkerSinkTask(ConnectorTaskId id, KafkaConsumer consumer, 
ClassLoader loader, Time time, - RetryWithToleranceOperator retryWithToleranceOperator) { - super(id, statusListener, initialState, loader, connectMetrics, retryWithToleranceOperator); + RetryWithToleranceOperator retryWithToleranceOperator, + StatusBackingStore statusBackingStore) { + super(id, statusListener, initialState, loader, connectMetrics, + retryWithToleranceOperator, time, statusBackingStore); this.workerConfig = workerConfig; this.task = task; @@ -115,7 +117,6 @@ public WorkerSinkTask(ConnectorTaskId id, this.valueConverter = valueConverter; this.headerConverter = headerConverter; this.transformationChain = transformationChain; - this.time = time; this.messageBatch = new ArrayList<>(); this.currentOffsets = new HashMap<>(); this.origOffsets = new HashMap<>(); @@ -504,6 +505,7 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord producer; private final CloseableOffsetStorageReader offsetReader; private final OffsetStorageWriter offsetWriter; - private final Time time; private final SourceTaskMetricsGroup sourceTaskMetricsGroup; private final AtomicReference producerSendException; @@ -112,9 +112,11 @@ public WorkerSourceTask(ConnectorTaskId id, ConnectMetrics connectMetrics, ClassLoader loader, Time time, - RetryWithToleranceOperator retryWithToleranceOperator) { + RetryWithToleranceOperator retryWithToleranceOperator, + StatusBackingStore statusBackingStore) { - super(id, statusListener, initialState, loader, connectMetrics, retryWithToleranceOperator); + super(id, statusListener, initialState, loader, connectMetrics, + retryWithToleranceOperator, time, statusBackingStore); this.workerConfig = workerConfig; this.task = task; @@ -126,7 +128,6 @@ public WorkerSourceTask(ConnectorTaskId id, this.producer = producer; this.offsetReader = offsetReader; this.offsetWriter = offsetWriter; - this.time = time; this.toSend = null; this.lastSendFailed = false; @@ -355,6 +356,7 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) { 
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()); commitTaskRecord(preTransformRecord, recordMetadata); + recordActiveTopic(producerRecord.topic()); } } }); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index 28d2a8ff96c30..d67962fbda15f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.runtime; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.MetricNameTemplate; import org.apache.kafka.common.metrics.Measurable; @@ -30,6 +31,9 @@ import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.storage.KafkaStatusBackingStore; +import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.LoggingContext; import org.slf4j.Logger; @@ -57,6 +61,8 @@ abstract class WorkerTask implements Runnable { protected final ConnectorTaskId id; private final TaskStatus.Listener statusListener; protected final ClassLoader loader; + protected final StatusBackingStore statusBackingStore; + protected final Time time; private final CountDownLatch shutdownLatch = new CountDownLatch(1); private final TaskMetricsGroup taskMetricsGroup; private volatile TargetState targetState; @@ -70,7 +76,9 @@ public WorkerTask(ConnectorTaskId id, TargetState initialState, ClassLoader loader, ConnectMetrics connectMetrics, - RetryWithToleranceOperator retryWithToleranceOperator) { + 
RetryWithToleranceOperator retryWithToleranceOperator, + Time time, + StatusBackingStore statusBackingStore) { this.id = id; this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, statusListener); this.statusListener = taskMetricsGroup; @@ -80,6 +88,8 @@ public WorkerTask(ConnectorTaskId id, this.cancelled = false; this.taskMetricsGroup.recordState(this.targetState); this.retryWithToleranceOperator = retryWithToleranceOperator; + this.time = time; + this.statusBackingStore = statusBackingStore; } public ConnectorTaskId id() { @@ -279,6 +289,20 @@ public void transitionTo(TargetState state) { } } + /** + * Include this topic to the set of active topics for the connector that this worker task + * is running. This information is persisted in the status backing store used by this worker. + * + * @param topic the topic to mark as active for this connector + */ + protected void recordActiveTopic(String topic) { + if (statusBackingStore.getTopic(id.connector(), topic) != null) { + // The topic is already recorded as active. No further action is required. + return; + } + statusBackingStore.put(new TopicStatus(topic, id, time.milliseconds())); + } + /** * Record that offsets have been committed. * diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ActiveTopicsInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ActiveTopicsInfo.java new file mode 100644 index 0000000000000..b43c5aa88da84 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ActiveTopicsInfo.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.rest.entities; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Collection; + +public class ActiveTopicsInfo { + private final String connector; + private final Collection topics; + + @JsonCreator + public ActiveTopicsInfo(String connector, @JsonProperty("topics") Collection topics) { + this.connector = connector; + this.topics = topics; + } + + public String connector() { + return connector; + } + + @JsonProperty + public Collection topics() { + return topics; + } + +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index 3b41fb7c143a2..5630ac7a8d4a2 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -29,6 +29,7 @@ import org.apache.kafka.connect.runtime.distributed.RequestTargetException; import org.apache.kafka.connect.runtime.rest.InternalRequestSignature; import org.apache.kafka.connect.runtime.rest.RestClient; +import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo; import 
org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest; @@ -173,6 +174,19 @@ public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") Strin return herder.connectorStatus(connector); } + @GET + @Path("/{connector}/topics") + public Map getConnectorActiveTopics(final @PathParam("connector") String connector) throws Throwable { + return herder.connectorActiveTopics(connector); + } + + @PUT + @Path("/{connector}/topics/reset") + public Response resetConnectorActiveTopics(final @PathParam("connector") String connector, final @Context HttpHeaders headers) throws Throwable { + herder.resetConnectorActiveTopics(connector); + return Response.accepted().build(); + } + @PUT @Path("/{connector}/config") public Response putConnectorConfig(final @PathParam("connector") String connector, diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java index fee10108dd8e6..459f716bbe67c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java @@ -36,6 +36,7 @@ import org.apache.kafka.connect.runtime.AbstractStatus; import org.apache.kafka.connect.runtime.ConnectorStatus; import org.apache.kafka.connect.runtime.TaskStatus; +import org.apache.kafka.connect.runtime.TopicStatus; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.util.Callback; @@ -49,11 +50,14 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import 
java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * StatusBackingStore implementation which uses a compacted topic for storage @@ -82,12 +86,20 @@ public class KafkaStatusBackingStore implements StatusBackingStore { private static final String TASK_STATUS_PREFIX = "status-task-"; private static final String CONNECTOR_STATUS_PREFIX = "status-connector-"; + private static final String TOPIC_STATUS_PREFIX = "status-topic-"; + private static final String TOPIC_STATUS_SEPARATOR = ":connector-"; public static final String STATE_KEY_NAME = "state"; public static final String TRACE_KEY_NAME = "trace"; public static final String WORKER_ID_KEY_NAME = "worker_id"; public static final String GENERATION_KEY_NAME = "generation"; + public static final String TOPIC_STATE_KEY = "topic"; + public static final String TOPIC_NAME_KEY = "name"; + public static final String TOPIC_CONNECTOR_KEY = "connector"; + public static final String TOPIC_TASK_KEY = "task"; + public static final String TOPIC_DISCOVER_TIMESTAMP_KEY = "discoverTimestamp"; + private static final Schema STATUS_SCHEMA_V0 = SchemaBuilder.struct() .field(STATE_KEY_NAME, Schema.STRING_SCHEMA) .field(TRACE_KEY_NAME, SchemaBuilder.string().optional().build()) @@ -95,12 +107,25 @@ public class KafkaStatusBackingStore implements StatusBackingStore { .field(GENERATION_KEY_NAME, Schema.INT32_SCHEMA) .build(); + private static final Schema TOPIC_STATUS_VALUE_SCHEMA_V0 = SchemaBuilder.struct() + .field(TOPIC_NAME_KEY, Schema.STRING_SCHEMA) + .field(TOPIC_CONNECTOR_KEY, Schema.STRING_SCHEMA) + .field(TOPIC_TASK_KEY, Schema.INT32_SCHEMA) + .field(TOPIC_DISCOVER_TIMESTAMP_KEY, Schema.INT64_SCHEMA) + .build(); + + private static final Schema TOPIC_STATUS_SCHEMA_V0 = SchemaBuilder.map( + Schema.STRING_SCHEMA, + TOPIC_STATUS_VALUE_SCHEMA_V0 + ).build(); + private final Time time; private final Converter converter; private final Table> 
tasks; private final Map> connectors; + private final ConcurrentMap> topics; - private String topic; + private String statusTopic; private KafkaBasedLog kafkaLog; private int generation; @@ -109,19 +134,20 @@ public KafkaStatusBackingStore(Time time, Converter converter) { this.converter = converter; this.tasks = new Table<>(); this.connectors = new HashMap<>(); + this.topics = new ConcurrentHashMap<>(); } // visible for testing - KafkaStatusBackingStore(Time time, Converter converter, String topic, KafkaBasedLog kafkaLog) { + KafkaStatusBackingStore(Time time, Converter converter, String statusTopic, KafkaBasedLog kafkaLog) { this(time, converter); this.kafkaLog = kafkaLog; - this.topic = topic; + this.statusTopic = statusTopic; } @Override public void configure(final WorkerConfig config) { - this.topic = config.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG); - if (this.topic == null || this.topic.trim().length() == 0) + this.statusTopic = config.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG); + if (this.statusTopic == null || this.statusTopic.trim().length() == 0) throw new ConfigException("Must specify topic for connector status."); Map originals = config.originals(); @@ -135,7 +161,7 @@ public void configure(final WorkerConfig config) { consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); Map adminProps = new HashMap<>(originals); - NewTopic topicDescription = TopicAdmin.defineTopic(topic). + NewTopic topicDescription = TopicAdmin.defineTopic(statusTopic). compacted(). partitions(config.getInt(DistributedConfig.STATUS_STORAGE_PARTITIONS_CONFIG)). replicationFactor(config.getShort(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG)). 
@@ -147,7 +173,7 @@ public void onCompletion(Throwable error, ConsumerRecord record) read(record); } }; - this.kafkaLog = createKafkaBasedLog(topic, producerProps, consumerProps, readCallback, topicDescription, adminProps); + this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminProps); } private KafkaBasedLog createKafkaBasedLog(String topic, Map producerProps, @@ -199,6 +225,11 @@ public void putSafe(final TaskStatus status) { sendTaskStatus(status, true); } + @Override + public void put(final TopicStatus status) { + sendTopicStatus(status.connector(), status.topic(), status); + } + @Override public void flush() { kafkaLog.flush(); @@ -218,6 +249,25 @@ private void sendTaskStatus(final TaskStatus status, boolean safeWrite) { send(key, status, entry, safeWrite); } + private void sendTopicStatus(final String connector, final String topic, final TopicStatus status) { + String key = TOPIC_STATUS_PREFIX + topic + TOPIC_STATUS_SEPARATOR + connector; + + final byte[] value = serializeTopicStatus(status); + + kafkaLog.send(key, value, new org.apache.kafka.clients.producer.Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception == null) return; + // TODO: retry more gracefully and not forever + if (exception instanceof RetriableException) { + kafkaLog.send(key, value, this); + } else { + log.error("Failed to write status update", exception); + } + } + }); + } + private void send(final String key, final V status, final CacheEntry entry, @@ -287,6 +337,14 @@ private synchronized void remove(ConnectorTaskId id) { removed.delete(); } + private void removeTopic(String topic, String connector) { + ConcurrentMap activeTopics = topics.get(connector); + if (activeTopics == null) { + return; + } + activeTopics.remove(topic); + } + @Override public synchronized TaskStatus get(ConnectorTaskId id) { CacheEntry entry = tasks.get(id.connector(), id.task()); @@ -310,6 
+368,27 @@ public synchronized Collection getAll(String connector) { return res; } + @Override + public TopicStatus getTopic(String connector, String topic) { + ConcurrentMap activeTopics = topics.get(connector); + return activeTopics != null + ? activeTopics.get(topic) + : null; + } + + @Override + public Collection getAllTopics(String connector) { + ConcurrentMap activeTopics = topics.get(connector); + return activeTopics != null + ? Collections.unmodifiableCollection(activeTopics.values()) + : Collections.emptySet(); + } + + @Override + public void deleteTopic(String connector, String topic) { + sendTopicStatus(connector, topic, null); + } + @Override public synchronized Set connectors() { return new HashSet<>(connectors.keySet()); @@ -317,7 +396,7 @@ public synchronized Set connectors() { private ConnectorStatus parseConnectorStatus(String connector, byte[] data) { try { - SchemaAndValue schemaAndValue = converter.toConnectData(topic, data); + SchemaAndValue schemaAndValue = converter.toConnectData(statusTopic, data); if (!(schemaAndValue.value() instanceof Map)) { log.error("Invalid connector status type {}", schemaAndValue.value().getClass()); return null; @@ -338,7 +417,7 @@ private ConnectorStatus parseConnectorStatus(String connector, byte[] data) { private TaskStatus parseTaskStatus(ConnectorTaskId taskId, byte[] data) { try { - SchemaAndValue schemaAndValue = converter.toConnectData(topic, data); + SchemaAndValue schemaAndValue = converter.toConnectData(statusTopic, data); if (!(schemaAndValue.value() instanceof Map)) { log.error("Invalid task status type {}", schemaAndValue.value().getClass()); return null; @@ -356,6 +435,30 @@ private TaskStatus parseTaskStatus(ConnectorTaskId taskId, byte[] data) { } } + private TopicStatus parseTopicStatus(byte[] data) { + try { + SchemaAndValue schemaAndValue = converter.toConnectData(statusTopic, data); + if (!(schemaAndValue.value() instanceof Map)) { + log.error("Invalid topic status value {}", 
schemaAndValue.value()); + return null; + } + @SuppressWarnings("unchecked") + Object innerValue = ((Map) schemaAndValue.value()).get(TOPIC_STATE_KEY); + if (!(innerValue instanceof Map)) { + log.error("Invalid topic status value {} for field {}", innerValue, TOPIC_STATE_KEY); + return null; + } + Map topicStatusMetadata = (Map) innerValue; + return new TopicStatus((String) topicStatusMetadata.get(TOPIC_NAME_KEY), + (String) topicStatusMetadata.get(TOPIC_CONNECTOR_KEY), + ((Long) topicStatusMetadata.get(TOPIC_TASK_KEY)).intValue(), + (long) topicStatusMetadata.get(TOPIC_DISCOVER_TIMESTAMP_KEY)); + } catch (Exception e) { + log.error("Failed to deserialize topic status", e); + return null; + } + } + private byte[] serialize(AbstractStatus status) { Struct struct = new Struct(STATUS_SCHEMA_V0); struct.put(STATE_KEY_NAME, status.state().name()); @@ -363,7 +466,24 @@ private byte[] serialize(AbstractStatus status) { struct.put(TRACE_KEY_NAME, status.trace()); struct.put(WORKER_ID_KEY_NAME, status.workerId()); struct.put(GENERATION_KEY_NAME, status.generation()); - return converter.fromConnectData(topic, STATUS_SCHEMA_V0, struct); + return converter.fromConnectData(statusTopic, STATUS_SCHEMA_V0, struct); + } + + //visible for testing + protected byte[] serializeTopicStatus(TopicStatus status) { + if (status == null) { + // This should send a tombstone record that will represent delete + return null; + } + Struct struct = new Struct(TOPIC_STATUS_VALUE_SCHEMA_V0); + struct.put(TOPIC_NAME_KEY, status.topic()); + struct.put(TOPIC_CONNECTOR_KEY, status.connector()); + struct.put(TOPIC_TASK_KEY, status.task()); + struct.put(TOPIC_DISCOVER_TIMESTAMP_KEY, status.discoverTimestamp()); + return converter.fromConnectData( + statusTopic, + TOPIC_STATUS_SCHEMA_V0, + Collections.singletonMap(TOPIC_STATE_KEY, struct)); } private String parseConnectorStatusKey(String key) { @@ -434,6 +554,50 @@ private void readTaskStatus(String key, byte[] value) { } } + private void 
readTopicStatus(String key, byte[] value) { + int delimiterPos = key.indexOf(':'); + int beginPos = TOPIC_STATUS_PREFIX.length(); + if (beginPos > delimiterPos) { + log.warn("Discarding record with invalid topic status key {}", key); + return; + } + + String topic = key.substring(beginPos, delimiterPos); + if (topic.isEmpty()) { + log.warn("Discarding record with invalid topic status key containing empty topic {}", key); + return; + } + + beginPos = delimiterPos + TOPIC_STATUS_SEPARATOR.length(); + int endPos = key.length(); + if (beginPos > endPos) { + log.warn("Discarding record with invalid topic status key {}", key); + return; + } + + String connector = key.substring(beginPos); + if (connector.isEmpty()) { + log.warn("Discarding record with invalid topic status key containing empty connector {}", key); + return; + } + + if (value == null) { + log.trace("Removing status for topic {} and connector {}", topic, connector); + removeTopic(topic, connector); + return; + } + + TopicStatus status = parseTopicStatus(value); + if (status == null) { + log.warn("Failed to parse topic status with key {}", key); + return; + } + + log.trace("Received topic status update {}", status); + topics.computeIfAbsent(connector, k -> new ConcurrentHashMap<>()) + .put(topic, status); + } + // visible for testing void read(ConsumerRecord record) { String key = record.key(); @@ -441,6 +605,8 @@ void read(ConsumerRecord record) { readConnectorStatus(key, record.value()); } else if (key.startsWith(TASK_STATUS_PREFIX)) { readTaskStatus(key, record.value()); + } else if (key.startsWith(TOPIC_STATUS_PREFIX)) { + readTopicStatus(key, record.value()); } else { log.warn("Discarding record with invalid key {}", key); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java index 39d098dd1c59a..0d55a232ff3fd 100644 --- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java @@ -18,23 +18,29 @@ import org.apache.kafka.connect.runtime.ConnectorStatus; import org.apache.kafka.connect.runtime.TaskStatus; +import org.apache.kafka.connect.runtime.TopicStatus; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.Table; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; public class MemoryStatusBackingStore implements StatusBackingStore { private final Table tasks; private final Map connectors; + private final ConcurrentMap> topics; public MemoryStatusBackingStore() { this.tasks = new Table<>(); this.connectors = new HashMap<>(); + this.topics = new ConcurrentHashMap<>(); } @Override @@ -78,6 +84,12 @@ public synchronized void putSafe(TaskStatus status) { put(status); } + @Override + public void put(final TopicStatus status) { + topics.computeIfAbsent(status.connector(), k -> new ConcurrentHashMap<>()) + .put(status.topic(), status); + } + @Override public synchronized TaskStatus get(ConnectorTaskId id) { return tasks.get(id.connector(), id.task()); @@ -93,6 +105,30 @@ public synchronized Collection getAll(String connector) { return new HashSet<>(tasks.row(connector).values()); } + @Override + public TopicStatus getTopic(String connector, String topic) { + ConcurrentMap activeTopics = topics.get(connector); + return activeTopics != null + ? activeTopics.get(topic) + : null; + } + + @Override + public Collection getAllTopics(String connector) { + ConcurrentMap activeTopics = topics.get(connector); + return activeTopics != null + ? 
Collections.unmodifiableCollection(activeTopics.values()) + : Collections.emptySet(); + } + + @Override + public void deleteTopic(String connector, String topic) { + ConcurrentMap activeTopics = topics.get(connector); + if (activeTopics != null) { + activeTopics.remove(topic); + } + } + @Override public synchronized Set connectors() { return new HashSet<>(connectors.keySet()); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java index 63294a47525c5..1b9fb91b99d7a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java @@ -18,6 +18,7 @@ import org.apache.kafka.connect.runtime.ConnectorStatus; import org.apache.kafka.connect.runtime.TaskStatus; +import org.apache.kafka.connect.runtime.TopicStatus; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.util.ConnectorTaskId; @@ -66,6 +67,12 @@ public interface StatusBackingStore { */ void putSafe(TaskStatus status); + /** + * Set the state of a connector's topic to the given value. + * @param status the status of the topic used by a connector + */ + void put(TopicStatus status); + /** * Get the current state of the task. * @param id the id of the task @@ -87,6 +94,28 @@ public interface StatusBackingStore { */ Collection getAll(String connector); + /** + * Get the status of a connector's topic if the connector is actively using this topic + * @param connector the connector name + * @param topic the topic name + * @return the state or null if there is none + */ + TopicStatus getTopic(String connector, String topic); + + /** + * Get the states of all topics that a connector is using. 
+ * @param connector the connector name + * @return a collection of topic states + */ + Collection getAllTopics(String connector); + + /** + * Delete this topic from the connector's set of active topics + * @param connector the connector name + * @param topic the topic name + */ + void deleteTopic(String connector, String topic); + /** * Get all cached connectors. * @return the set of connector names diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java index 428b3e4f022f5..5f9e3b9728008 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java @@ -48,11 +48,13 @@ import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.storage.OffsetStorageReaderImpl; import org.apache.kafka.connect.storage.OffsetStorageWriter; +import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.util.SimpleConfig; import org.apache.kafka.connect.util.ConnectorTaskId; import org.easymock.Capture; import org.easymock.EasyMock; +import org.easymock.IExpectationSetters; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -135,6 +137,7 @@ public class ErrorHandlingTaskTest { @SuppressWarnings("unused") @Mock private TaskStatus.Listener statusListener; + @Mock private StatusBackingStore statusBackingStore; private ErrorHandlingMetrics errorHandlingMetrics; @@ -175,6 +178,7 @@ public void testErrorHandlingInSinkTasks() throws Exception { createSinkTask(initialState, retryWithToleranceOperator); expectInitializeTask(); + expectTaskGetTopic(true); // valid json ConsumerRecord record1 = new ConsumerRecord<>(TOPIC, PARTITION1, FIRST_OFFSET, null, "{\"a\": 
10}".getBytes()); @@ -361,6 +365,29 @@ private void expectInitializeTask() throws Exception { PowerMock.expectLastCall(); } + private void expectTaskGetTopic(boolean anyTimes) { + final Capture connectorCapture = EasyMock.newCapture(); + final Capture topicCapture = EasyMock.newCapture(); + IExpectationSetters expect = EasyMock.expect(statusBackingStore.getTopic( + EasyMock.capture(connectorCapture), + EasyMock.capture(topicCapture))); + if (anyTimes) { + expect.andStubAnswer(() -> new TopicStatus( + topicCapture.getValue(), + new ConnectorTaskId(connectorCapture.getValue(), 0), + Time.SYSTEM.milliseconds())); + } else { + expect.andAnswer(() -> new TopicStatus( + topicCapture.getValue(), + new ConnectorTaskId(connectorCapture.getValue(), 0), + Time.SYSTEM.milliseconds())); + } + if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) { + assertEquals("job", connectorCapture.getValue()); + assertEquals(TOPIC, topicCapture.getValue()); + } + } + private void createSinkTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) { JsonConverter converter = new JsonConverter(); Map oo = workerConfig.originalsWithPrefix("value.converter."); @@ -373,7 +400,8 @@ private void createSinkTask(TargetState initialState, RetryWithToleranceOperator workerSinkTask = new WorkerSinkTask( taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, converter, converter, - headerConverter, sinkTransforms, consumer, pluginLoader, time, retryWithToleranceOperator); + headerConverter, sinkTransforms, consumer, pluginLoader, time, + retryWithToleranceOperator, statusBackingStore); } private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) { @@ -402,7 +430,8 @@ private void createSourceTask(TargetState initialState, RetryWithToleranceOperat WorkerSourceTask.class, new String[]{"commitOffsets", "isStopping"}, taskId, sourceTask, statusListener, initialState, 
converter, converter, headerConverter, sourceTransforms, producer, offsetReader, offsetWriter, workerConfig, - ClusterConfigState.EMPTY, metrics, pluginLoader, time, retryWithToleranceOperator); + ClusterConfigState.EMPTY, metrics, pluginLoader, time, retryWithToleranceOperator, + statusBackingStore); } private ConsumerRecords records(ConsumerRecord record) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 8c93528b76521..285cbbe50b5a8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.RetriableException; @@ -47,12 +48,14 @@ import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; +import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.ConnectorTaskId; import org.easymock.Capture; import org.easymock.CaptureType; import org.easymock.EasyMock; import org.easymock.IAnswer; +import org.easymock.IExpectationSetters; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -142,6 +145,8 @@ public class WorkerSinkTaskTest { @Mock private TaskStatus.Listener statusListener; @Mock + private StatusBackingStore statusBackingStore; + @Mock private KafkaConsumer consumer; private Capture rebalanceListener = EasyMock.newCapture(); private 
Capture topicsRegex = EasyMock.newCapture(); @@ -176,7 +181,7 @@ private void createTask(TargetState initialState, Converter keyConverter, Conver taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverter, valueConverter, headerConverter, transformationChain, consumer, pluginLoader, time, - RetryWithToleranceOperatorTest.NOOP_OPERATOR); + RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore); } @After @@ -189,6 +194,7 @@ public void testStartPaused() throws Exception { createTask(TargetState.PAUSED); expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); Set partitions = new HashSet<>(asList(TOPIC_PARTITION, TOPIC_PARTITION2)); @@ -217,6 +223,7 @@ public void testPause() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); expectConsumerPoll(1); @@ -311,6 +318,7 @@ public void testPollRedelivery() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); // If a retriable exception is thrown, we should redeliver the same batch, pausing the consumer in the meantime @@ -387,6 +395,7 @@ public void testErrorInRebalancePartitionRevocation() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); expectRebalanceRevocationError(exception); @@ -412,6 +421,7 @@ public void testErrorInRebalancePartitionAssignment() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); expectRebalanceAssignmentError(exception); @@ -435,7 +445,7 @@ public void testWakeupInCommitSyncCausesRetry() throws Exception { createTask(initialState); expectInitializeTask(); - + expectTaskGetTopic(true); expectPollInitialAssignment(); expectConsumerPoll(1); @@ -529,7 +539,7 @@ public void testRequestCommit() throws Exception { 
createTask(initialState); expectInitializeTask(); - + expectTaskGetTopic(true); expectPollInitialAssignment(); expectConsumerPoll(1); @@ -635,6 +645,7 @@ public void testPreCommit() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); // iter 1 expectPollInitialAssignment(); @@ -702,6 +713,7 @@ public void testIgnoredCommit() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); // iter 1 expectPollInitialAssignment(); @@ -752,6 +764,7 @@ public void testLongRunningCommitWithoutTimeout() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); // iter 1 expectPollInitialAssignment(); @@ -851,6 +864,7 @@ public void testCommitWithOutOfOrderCallback() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); // iter 1 expectPollInitialAssignment(); @@ -1071,6 +1085,7 @@ public void testDeliveryWithMutatingTransform() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); @@ -1125,6 +1140,7 @@ public void testMissingTimestampPropagation() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); expectConsumerPoll(1, RecordBatch.NO_TIMESTAMP, TimestampType.CREATE_TIME); expectConversionAndTransformation(1); @@ -1157,6 +1173,7 @@ public void testTimestampPropagation() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); expectConsumerPoll(1, timestamp, timestampType); expectConversionAndTransformation(1); @@ -1282,6 +1299,7 @@ public void testHeaders() throws Exception { createTask(initialState); expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); expectConsumerPoll(1, headers); @@ -1307,6 +1325,7 @@ public void testHeadersWithCustomConverter() throws Exception 
{ createTask(initialState, stringConverter, testConverter, stringConverter); expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); String keyA = "a"; @@ -1520,6 +1539,28 @@ public SinkRecord answer() { }).times(numMessages); } + private void expectTaskGetTopic(boolean anyTimes) { + final Capture connectorCapture = EasyMock.newCapture(); + final Capture topicCapture = EasyMock.newCapture(); + IExpectationSetters expect = EasyMock.expect(statusBackingStore.getTopic( + EasyMock.capture(connectorCapture), + EasyMock.capture(topicCapture))); + if (anyTimes) { + expect.andStubAnswer(() -> new TopicStatus( + topicCapture.getValue(), + new ConnectorTaskId(connectorCapture.getValue(), 0), + Time.SYSTEM.milliseconds())); + } else { + expect.andAnswer(() -> new TopicStatus( + topicCapture.getValue(), + new ConnectorTaskId(connectorCapture.getValue(), 0), + Time.SYSTEM.milliseconds())); + } + if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) { + assertEquals("job", connectorCapture.getValue()); + assertEquals(TOPIC, topicCapture.getValue()); + } + } private void assertSinkMetricValue(String name, double expected) { MetricGroup sinkTaskGroup = workerTask.sinkTaskMetricsGroup().metricGroup(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index ab20dce2d26bd..be2584a9235b7 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -38,6 +38,7 @@ import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; +import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.ConnectorTaskId; import 
org.apache.kafka.common.utils.MockTime; import org.apache.kafka.connect.util.ThreadedTest; @@ -119,6 +120,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { @Mock private KafkaConsumer consumer; private Capture rebalanceListener = EasyMock.newCapture(); @Mock private TaskStatus.Listener statusListener; + @Mock private StatusBackingStore statusBackingStore; private long recordsReturned; @@ -142,7 +144,7 @@ public void setup() { taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverter, valueConverter, headerConverter, new TransformationChain<>(Collections.emptyList(), RetryWithToleranceOperatorTest.NOOP_OPERATOR), - consumer, pluginLoader, time, RetryWithToleranceOperatorTest.NOOP_OPERATOR); + consumer, pluginLoader, time, RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore); recordsReturned = 0; } @@ -155,6 +157,7 @@ public void tearDown() { @Test public void testPollsInBackground() throws Exception { expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); Capture> capturedRecords = expectPolls(1L); @@ -195,6 +198,7 @@ public void testPollsInBackground() throws Exception { @Test public void testCommit() throws Exception { expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); // Make each poll() take the offset commit interval @@ -228,6 +232,7 @@ public void testCommit() throws Exception { @Test public void testCommitFailure() throws Exception { expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); Capture> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); @@ -267,6 +272,7 @@ public void testCommitSuccessFollowedByFailure() throws Exception { // Validate that we rewind to the correct offsets if a task's preCommit() method throws an exception expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); Capture> capturedRecords = 
expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); expectOffsetCommit(1L, null, null, 0, true); @@ -305,6 +311,7 @@ public void testCommitSuccessFollowedByFailure() throws Exception { @Test public void testCommitConsumerFailure() throws Exception { expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); Capture> capturedRecords @@ -336,6 +343,7 @@ public void testCommitConsumerFailure() throws Exception { @Test public void testCommitTimeout() throws Exception { expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); // Cut down amount of time to pass in each poll so we trigger exactly 1 offset commit @@ -373,6 +381,7 @@ public void testAssignmentPauseResume() throws Exception { // Just validate that the calls are passed through to the consumer, and that where appropriate errors are // converted expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); expectOnePoll().andAnswer(new IAnswer() { @@ -441,6 +450,7 @@ public Object answer() throws Throwable { @Test public void testRewind() throws Exception { expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); final long startOffset = 40L; @@ -484,6 +494,7 @@ public Object answer() throws Throwable { @Test public void testRewindOnRebalanceDuringPoll() throws Exception { expectInitializeTask(); + expectTaskGetTopic(true); expectPollInitialAssignment(); expectRebalanceDuringPoll().andAnswer(new IAnswer() { @@ -695,6 +706,29 @@ public Object answer() throws Throwable { return capturedCallback; } + private void expectTaskGetTopic(boolean anyTimes) { + final Capture connectorCapture = EasyMock.newCapture(); + final Capture topicCapture = EasyMock.newCapture(); + IExpectationSetters expect = EasyMock.expect(statusBackingStore.getTopic( + EasyMock.capture(connectorCapture), + EasyMock.capture(topicCapture))); + if (anyTimes) { + expect.andStubAnswer(() -> new TopicStatus( + topicCapture.getValue(), + new 
ConnectorTaskId(connectorCapture.getValue(), 0), + Time.SYSTEM.milliseconds())); + } else { + expect.andAnswer(() -> new TopicStatus( + topicCapture.getValue(), + new ConnectorTaskId(connectorCapture.getValue(), 0), + Time.SYSTEM.milliseconds())); + } + if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) { + assertEquals("job", connectorCapture.getValue()); + assertEquals(TOPIC, topicCapture.getValue()); + } + } + private RecordHeaders emptyHeaders() { return new RecordHeaders(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index ea3716d207111..b6c3c8d688143 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -45,6 +45,7 @@ import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.storage.OffsetStorageWriter; +import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectorTaskId; @@ -120,6 +121,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { private WorkerSourceTask workerTask; @Mock private Future sendFuture; @MockStrict private TaskStatus.Listener statusListener; + @Mock private StatusBackingStore statusBackingStore; private Capture producerCallbacks; @@ -166,7 +168,7 @@ private void createWorkerTask(TargetState initialState) { private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) { workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain, 
producer, offsetReader, offsetWriter, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM, - RetryWithToleranceOperatorTest.NOOP_OPERATOR); + RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore); } @Test @@ -962,6 +964,7 @@ public Future answer() throws Throwable { if (sendSuccess) { // 3. As a result of a successful producer send callback, we'll notify the source task of the record commit expectTaskCommitRecordWithOffset(anyTimes, commitSuccess); + expectTaskGetTopic(anyTimes); } return sent; @@ -1021,6 +1024,29 @@ private void expectTaskCommitRecordWithOffset(boolean anyTimes, boolean succeed) } } + private void expectTaskGetTopic(boolean anyTimes) { + final Capture connectorCapture = EasyMock.newCapture(); + final Capture topicCapture = EasyMock.newCapture(); + IExpectationSetters expect = EasyMock.expect(statusBackingStore.getTopic( + EasyMock.capture(connectorCapture), + EasyMock.capture(topicCapture))); + if (anyTimes) { + expect.andStubAnswer(() -> new TopicStatus( + topicCapture.getValue(), + new ConnectorTaskId(connectorCapture.getValue(), 0), + Time.SYSTEM.milliseconds())); + } else { + expect.andAnswer(() -> new TopicStatus( + topicCapture.getValue(), + new ConnectorTaskId(connectorCapture.getValue(), 0), + Time.SYSTEM.milliseconds())); + } + if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) { + assertEquals("job", connectorCapture.getValue()); + assertEquals(TOPIC, topicCapture.getValue()); + } + } + private boolean awaitLatch(CountDownLatch latch) { try { return latch.await(5000, TimeUnit.MILLISECONDS); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java index 33349f4c2f5b7..44c45d5ace06f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java @@ 
-16,11 +16,13 @@ */ package org.apache.kafka.connect.runtime; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.WorkerTask.TaskMetricsGroup; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest; import org.apache.kafka.connect.sink.SinkTask; +import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.common.utils.MockTime; import org.easymock.EasyMock; @@ -59,6 +61,8 @@ public class WorkerTaskTest { @Mock private TaskStatus.Listener statusListener; @Mock private ClassLoader loader; RetryWithToleranceOperator retryWithToleranceOperator; + @Mock + StatusBackingStore statusBackingStore; @Before public void setup() { @@ -82,9 +86,12 @@ public void standardStartup() { TargetState.class, ClassLoader.class, ConnectMetrics.class, - RetryWithToleranceOperator.class + RetryWithToleranceOperator.class, + Time.class, + StatusBackingStore.class ) - .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, retryWithToleranceOperator) + .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, + retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) .addMockedMethod("initialize") .addMockedMethod("execute") .addMockedMethod("close") @@ -129,9 +136,12 @@ public void stopBeforeStarting() { TargetState.class, ClassLoader.class, ConnectMetrics.class, - RetryWithToleranceOperator.class + RetryWithToleranceOperator.class, + Time.class, + StatusBackingStore.class ) - .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, retryWithToleranceOperator) + .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, + retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) .addMockedMethod("initialize") .addMockedMethod("execute") .addMockedMethod("close") @@ 
-169,9 +179,12 @@ public void cancelBeforeStopping() throws Exception { TargetState.class, ClassLoader.class, ConnectMetrics.class, - RetryWithToleranceOperator.class + RetryWithToleranceOperator.class, + Time.class, + StatusBackingStore.class ) - .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, retryWithToleranceOperator) + .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, + retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) .addMockedMethod("initialize") .addMockedMethod("execute") .addMockedMethod("close") diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 7021503adbed6..16a0a765ac9fa 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -55,6 +55,7 @@ import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.kafka.connect.storage.OffsetStorageReader; import org.apache.kafka.connect.storage.OffsetStorageWriter; +import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.ThreadedTest; import org.easymock.Capture; @@ -121,6 +122,7 @@ public class WorkerTest extends ThreadedTest { private ConnectorStatus.Listener connectorStatusListener; @Mock private Herder herder; + @Mock private StatusBackingStore statusBackingStore; @Mock private Connector connector; @Mock private ConnectorContext ctx; @Mock private TestSourceTask task; @@ -213,6 +215,7 @@ public void testStartAndStopConnector() { PowerMock.replayAll(); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); + worker.herder = herder; worker.start(); assertEquals(Collections.emptySet(), worker.connectorNames()); @@ -264,6 +267,7 
@@ public void testStartConnectorFailure() { PowerMock.replayAll(); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); + worker.herder = herder; worker.start(); assertStatistics(worker, 0, 0); @@ -324,6 +328,7 @@ public void testAddConnectorByAlias() { PowerMock.replayAll(); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); + worker.herder = herder; worker.start(); assertStatistics(worker, 0, 0); @@ -388,6 +393,7 @@ public void testAddConnectorByShortAlias() { PowerMock.replayAll(); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); + worker.herder = herder; worker.start(); assertStatistics(worker, 0, 0); @@ -414,6 +420,7 @@ public void testStopInvalidConnector() { PowerMock.replayAll(); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); + worker.herder = herder; worker.start(); worker.stopConnector(CONNECTOR_ID); @@ -471,6 +478,7 @@ public void testReconfigureConnectorTasks() { PowerMock.replayAll(); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); + worker.herder = herder; worker.start(); assertStatistics(worker, 0, 0); @@ -534,7 +542,8 @@ public void testAddRemoveTask() throws Exception { anyObject(ConnectMetrics.class), anyObject(ClassLoader.class), anyObject(Time.class), - anyObject(RetryWithToleranceOperator.class)) + anyObject(RetryWithToleranceOperator.class), + anyObject(StatusBackingStore.class)) .andReturn(workerTask); Map origProps = new HashMap<>(); origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); @@ -583,6 +592,7 @@ public void testAddRemoveTask() throws Exception { worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, 
executorService, noneConnectorClientConfigOverridePolicy); + worker.herder = herder; worker.start(); assertStatistics(worker, 0, 0); assertStartupStatistics(worker, 0, 0, 0, 0); @@ -627,7 +637,8 @@ public void testTaskStatusMetricsStatuses() throws Exception { anyObject(ConnectMetrics.class), anyObject(ClassLoader.class), anyObject(Time.class), - anyObject(RetryWithToleranceOperator.class)).andReturn(workerTask); + anyObject(RetryWithToleranceOperator.class), + anyObject(StatusBackingStore.class)).andReturn(workerTask); Map origProps = new HashMap<>(); origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); @@ -759,6 +770,7 @@ public void testConnectorStatusMetricsGroup_taskStatusCounter() { config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); + worker.herder = herder; Worker.ConnectorStatusMetricsGroup metricGroup = new Worker.ConnectorStatusMetricsGroup( worker.metrics(), tasks, herder @@ -799,6 +811,7 @@ public void testStartTaskFailure() { PowerMock.replayAll(); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); + worker.herder = herder; worker.start(); assertStatistics(worker, 0, 0); assertStartupStatistics(worker, 0, 0, 0, 0); @@ -838,7 +851,8 @@ public void testCleanupTasksOnStop() throws Exception { anyObject(ConnectMetrics.class), EasyMock.eq(pluginLoader), anyObject(Time.class), - anyObject(RetryWithToleranceOperator.class)) + anyObject(RetryWithToleranceOperator.class), + anyObject(StatusBackingStore.class)) .andReturn(workerTask); Map origProps = new HashMap<>(); origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); @@ -893,6 +907,7 @@ public void testCleanupTasksOnStop() throws Exception { worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService, noneConnectorClientConfigOverridePolicy); + worker.herder = herder; worker.start(); assertStatistics(worker, 0, 0); 
worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED); @@ -932,7 +947,8 @@ public void testConverterOverrides() throws Exception { anyObject(ConnectMetrics.class), EasyMock.eq(pluginLoader), anyObject(Time.class), - anyObject(RetryWithToleranceOperator.class)) + anyObject(RetryWithToleranceOperator.class), + anyObject(StatusBackingStore.class)) .andReturn(workerTask); Map origProps = new HashMap<>(); origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); @@ -986,6 +1002,7 @@ public void testConverterOverrides() throws Exception { worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService, noneConnectorClientConfigOverridePolicy); + worker.herder = herder; worker.start(); assertStatistics(worker, 0, 0); assertEquals(Collections.emptySet(), worker.taskIds()); @@ -1225,6 +1242,8 @@ private void expectStartStorage() { EasyMock.expectLastCall(); offsetBackingStore.start(); EasyMock.expectLastCall(); + EasyMock.expect(herder.statusBackingStore()) + .andReturn(statusBackingStore).anyTimes(); } private void expectStopStorage() { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java index e2f5a406c195e..b60680a36fc89 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.runtime.ConnectorStatus; import org.apache.kafka.connect.runtime.TaskStatus; import 
org.apache.kafka.connect.util.ConnectorTaskId; @@ -394,6 +395,63 @@ public void readTaskState() { verifyAll(); } + @Test + public void putTopicState() { + KafkaBasedLog kafkaBasedLog = mock(KafkaBasedLog.class); + Converter converter = mock(JsonConverter.class); + KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog); + + byte[] value = new byte[0]; + expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class))) + .andStubReturn(value); + + final Capture callbackCapture = newCapture(); + kafkaBasedLog.send(eq("status-connector-conn"), eq(value), capture(callbackCapture)); + expectLastCall() + .andAnswer(new IAnswer() { + @Override + public Void answer() throws Throwable { + callbackCapture.getValue().onCompletion(null, null); + return null; + } + }); + replayAll(); + + ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0); + store.put(status); + + // state is not visible until read back from the log + assertEquals(null, store.get(CONNECTOR)); + + verifyAll(); + } + + @Test + public void readTopicState() { + byte[] value = new byte[0]; + + KafkaBasedLog kafkaBasedLog = mock(KafkaBasedLog.class); + Converter converter = mock(Converter.class); + KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog); + + Map statusMap = new HashMap<>(); + statusMap.put("worker_id", WORKER_ID); + statusMap.put("state", "RUNNING"); + statusMap.put("generation", 0L); + + expect(converter.toConnectData(STATUS_TOPIC, value)) + .andReturn(new SchemaAndValue(null, statusMap)); + + replayAll(); + + store.read(consumerRecord(0, "status-connector-conn", value)); + + ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0); + assertEquals(status, store.get(CONNECTOR)); + + verifyAll(); + } + private static ConsumerRecord consumerRecord(long 
offset, String key, byte[] value) { return new ConsumerRecord<>(STATUS_TOPIC, 0, offset, System.currentTimeMillis(), TimestampType.CREATE_TIME, 0L, 0, 0, key, value); From ac9e125100080f82e3868c2241e7b93621416375 Mon Sep 17 00:00:00 2001 From: Konstantine Karantasis Date: Tue, 28 Jan 2020 22:38:37 -0500 Subject: [PATCH 2/7] KAFKA-9422: Respond with error on topic/reset requests when reset is disabled --- .../connect/runtime/rest/resources/ConnectorsResource.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index 5630ac7a8d4a2..1cadba368a082 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -65,6 +65,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ALLOW_RESET_CONFIG; + @Path("/connectors") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) @@ -183,6 +185,11 @@ public Map getConnectorActiveTopics(final @PathParam(" @PUT @Path("/{connector}/topics/reset") public Response resetConnectorActiveTopics(final @PathParam("connector") String connector, final @Context HttpHeaders headers) throws Throwable { + if (!config.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)) { + return Response.status(Response.Status.FORBIDDEN) + .entity("Topic tracking reset is disabled.") + .build(); + } herder.resetConnectorActiveTopics(connector); return Response.accepted().build(); } From 228f634ccacaeae08da4bad741f3999c5387db16 Mon Sep 17 00:00:00 2001 From: Konstantine Karantasis Date: Wed, 29 Jan 2020 14:56:50 -0500 Subject: [PATCH 3/7] KAFKA-9422: Fix formatting --- 
.../java/org/apache/kafka/connect/runtime/WorkerSinkTask.java | 4 ++-- .../apache/kafka/connect/storage/KafkaStatusBackingStore.java | 4 +--- .../kafka/connect/storage/MemoryStatusBackingStore.java | 4 +--- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 259676141aa70..d1d1eecc530a1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -107,8 +107,8 @@ public WorkerSinkTask(ConnectorTaskId id, Time time, RetryWithToleranceOperator retryWithToleranceOperator, StatusBackingStore statusBackingStore) { - super(id, statusListener, initialState, loader, connectMetrics, - retryWithToleranceOperator, time, statusBackingStore); + super(id, statusListener, initialState, loader, connectMetrics, + retryWithToleranceOperator, time, statusBackingStore); this.workerConfig = workerConfig; this.task = task; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java index 459f716bbe67c..550d081a40a73 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java @@ -371,9 +371,7 @@ public synchronized Collection getAll(String connector) { @Override public TopicStatus getTopic(String connector, String topic) { ConcurrentMap activeTopics = topics.get(connector); - return activeTopics != null - ? activeTopics.get(topic) - : null; + return activeTopics != null ? 
activeTopics.get(topic) : null; } @Override diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java index 0d55a232ff3fd..ee6b5089dabe3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java @@ -108,9 +108,7 @@ public synchronized Collection getAll(String connector) { @Override public TopicStatus getTopic(String connector, String topic) { ConcurrentMap activeTopics = topics.get(connector); - return activeTopics != null - ? activeTopics.get(topic) - : null; + return activeTopics != null ? activeTopics.get(topic) : null; } @Override From ec119b9ad4fbe6e30eabe165d5ba7818b3dcf652 Mon Sep 17 00:00:00 2001 From: Konstantine Karantasis Date: Wed, 29 Jan 2020 16:20:04 -0500 Subject: [PATCH 4/7] KAFKA-9422: Fix unchecked cast warning and unused imports --- .../java/org/apache/kafka/connect/runtime/AbstractHerder.java | 1 - .../main/java/org/apache/kafka/connect/runtime/WorkerTask.java | 3 --- .../apache/kafka/connect/storage/KafkaStatusBackingStore.java | 1 + 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 9a5bdf6886c20..16c734c41aa85 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -50,7 +50,6 @@ import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index d67962fbda15f..c40dc90012e33 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.connect.runtime; -import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.MetricNameTemplate; import org.apache.kafka.common.metrics.Measurable; @@ -31,8 +30,6 @@ import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.isolation.Plugins; -import org.apache.kafka.connect.source.SourceRecord; -import org.apache.kafka.connect.storage.KafkaStatusBackingStore; import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.LoggingContext; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java index 550d081a40a73..5161a94331d87 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java @@ -446,6 +446,7 @@ private TopicStatus parseTopicStatus(byte[] data) { log.error("Invalid topic status value {} for field {}", innerValue, TOPIC_STATE_KEY); return null; } + @SuppressWarnings("unchecked") Map topicStatusMetadata = (Map) innerValue; return new TopicStatus((String) topicStatusMetadata.get(TOPIC_NAME_KEY), (String) topicStatusMetadata.get(TOPIC_CONNECTOR_KEY), From 
7c36f7b90d7f29af5dd6daf298b4261006726ac0 Mon Sep 17 00:00:00 2001 From: Konstantine Karantasis Date: Wed, 29 Jan 2020 19:25:30 -0500 Subject: [PATCH 5/7] KAFKA-9422: Address comments --- .../kafka/connect/runtime/AbstractHerder.java | 4 +- .../apache/kafka/connect/runtime/Herder.java | 8 ++- .../kafka/connect/runtime/TopicStatus.java | 67 +++++++++++++++++-- .../rest/resources/ConnectorsResource.java | 4 +- .../storage/KafkaStatusBackingStore.java | 11 +-- .../storage/MemoryStatusBackingStore.java | 11 +-- .../connect/storage/StatusBackingStore.java | 12 ++-- .../runtime/ErrorHandlingTaskTest.java | 2 +- .../storage/KafkaStatusBackingStoreTest.java | 58 ---------------- 9 files changed, 90 insertions(+), 87 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 16c734c41aa85..f0315b6eb7f39 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -263,11 +263,11 @@ public ConnectorStateInfo connectorStatus(String connName) { } @Override - public Map connectorActiveTopics(String connName) { + public ActiveTopicsInfo connectorActiveTopics(String connName) { Collection topics = statusBackingStore.getAllTopics(connName).stream() .map(TopicStatus::topic) .collect(Collectors.toList()); - return Collections.singletonMap(connName, new ActiveTopicsInfo(connName, topics)); + return new ActiveTopicsInfo(connName, topics); } @Override diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java index b344f4d366419..6cb1a475b88f1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java @@ -148,18 
+148,22 @@ public interface Herder { /** * Lookup the set of topics currently used by a connector. + * * @param connName name of the connector + * @return the set of active topics */ - Map connectorActiveTopics(String connName); + ActiveTopicsInfo connectorActiveTopics(String connName); /** - * Lookup the set of topics currently used by a connector. + * Request to asynchronously reset the active topics for the named connector. + * * @param connName name of the connector */ void resetConnectorActiveTopics(String connName); /** * Return a reference to the status backing store used by this herder. + * * @return the status backing store used by this herder */ StatusBackingStore statusBackingStore(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicStatus.java index 5f85d36d824c4..16dcd80b43f84 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicStatus.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicStatus.java @@ -18,6 +18,12 @@ import org.apache.kafka.connect.util.ConnectorTaskId; +import java.util.Objects; + +/** + * Represents the metadata that is stored as the value of the record that is stored in the + * {@link org.apache.kafka.connect.storage.StatusBackingStore#put(TopicStatus)}, + */ public class TopicStatus { private final String topic; private final String connector; @@ -25,33 +31,80 @@ public class TopicStatus { private final long discoverTimestamp; public TopicStatus(String topic, ConnectorTaskId task, long discoverTimestamp) { - //TODO: check non-null - this.topic = topic; - this.connector = task.connector(); - this.task = task.task(); - this.discoverTimestamp = discoverTimestamp; + this(topic, task.connector(), task.task(), discoverTimestamp); } public TopicStatus(String topic, String connector, int task, long discoverTimestamp) { - this.topic = topic; - this.connector = connector; + 
this.topic = Objects.requireNonNull(topic); + this.connector = Objects.requireNonNull(connector); this.task = task; this.discoverTimestamp = discoverTimestamp; } + /** + * Get the name of the topic. + * + * @return the topic name; never null + */ public String topic() { return topic; } + /** + * Get the name of the connector. + * + * @return the connector name; never null + */ public String connector() { return connector; } + /** + * Get the ID of the task that stored the topic status. + * + * @return the task ID + */ public int task() { return task; } + /** + * Get a timestamp that represents when this topic was discovered as being actively used by + * this connector. + * + * @return the discovery timestamp + */ public long discoverTimestamp() { return discoverTimestamp; } + + @Override + public String toString() { + return "TopicStatus{" + + "topic='" + topic + '\'' + + ", connector='" + connector + '\'' + + ", task=" + task + + ", discoverTimestamp=" + discoverTimestamp + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TopicStatus)) { + return false; + } + TopicStatus that = (TopicStatus) o; + return task == that.task && + discoverTimestamp == that.discoverTimestamp && + topic.equals(that.topic) && + connector.equals(that.connector); + } + + @Override + public int hashCode() { + return Objects.hash(topic, connector, task, discoverTimestamp); + } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index 1cadba368a082..f8ac50fe03aa8 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -58,6 +58,7 @@ import javax.ws.rs.core.UriInfo; import java.net.URI; 
+import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -179,7 +180,8 @@ public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") Strin @GET @Path("/{connector}/topics") public Map getConnectorActiveTopics(final @PathParam("connector") String connector) throws Throwable { - return herder.connectorActiveTopics(connector); + ActiveTopicsInfo info = herder.connectorActiveTopics(connector); + return Collections.singletonMap(info.connector(), info); } @PUT diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java index 5161a94331d87..1393fac1f433a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java @@ -55,6 +55,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -370,21 +371,21 @@ public synchronized Collection getAll(String connector) { @Override public TopicStatus getTopic(String connector, String topic) { - ConcurrentMap activeTopics = topics.get(connector); - return activeTopics != null ? activeTopics.get(topic) : null; + ConcurrentMap activeTopics = topics.get(Objects.requireNonNull(connector)); + return activeTopics != null ? activeTopics.get(Objects.requireNonNull(topic)) : null; } @Override public Collection getAllTopics(String connector) { - ConcurrentMap activeTopics = topics.get(connector); + ConcurrentMap activeTopics = topics.get(Objects.requireNonNull(connector)); return activeTopics != null - ? Collections.unmodifiableCollection(activeTopics.values()) + ? 
Collections.unmodifiableCollection(Objects.requireNonNull(activeTopics.values())) : Collections.emptySet(); } @Override public void deleteTopic(String connector, String topic) { - sendTopicStatus(connector, topic, null); + sendTopicStatus(Objects.requireNonNull(connector), Objects.requireNonNull(topic), null); } @Override diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java index ee6b5089dabe3..fbd7048ed26f3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -107,13 +108,13 @@ public synchronized Collection getAll(String connector) { @Override public TopicStatus getTopic(String connector, String topic) { - ConcurrentMap activeTopics = topics.get(connector); - return activeTopics != null ? activeTopics.get(topic) : null; + ConcurrentMap activeTopics = topics.get(Objects.requireNonNull(connector)); + return activeTopics != null ? activeTopics.get(Objects.requireNonNull(topic)) : null; } @Override public Collection getAllTopics(String connector) { - ConcurrentMap activeTopics = topics.get(connector); + ConcurrentMap activeTopics = topics.get(Objects.requireNonNull(connector)); return activeTopics != null ? 
Collections.unmodifiableCollection(activeTopics.values()) : Collections.emptySet(); @@ -121,9 +122,9 @@ public Collection getAllTopics(String connector) { @Override public void deleteTopic(String connector, String topic) { - ConcurrentMap activeTopics = topics.get(connector); + ConcurrentMap activeTopics = topics.get(Objects.requireNonNull(connector)); if (activeTopics != null) { - activeTopics.remove(topic); + activeTopics.remove(Objects.requireNonNull(topic)); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java index 1b9fb91b99d7a..0250932df14af 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java @@ -96,23 +96,23 @@ public interface StatusBackingStore { /** * Get the status of a connector's topic if the connector is actively using this topic - * @param connector the connector name - * @param topic the topic name + * @param connector the connector name; never null + * @param topic the topic name; never null * @return the state or null if there is none */ TopicStatus getTopic(String connector, String topic); /** * Get the states of all topics that a connector is using. 
- * @param connector the connector name - * @return a collection of topic states + * @param connector the connector name; never null + * @return a collection of topic states or an empty collection if there is none */ Collection getAllTopics(String connector); /** * Delete this topic from the connector's set of active topics - * @param connector the connector name - * @param topic the topic name + * @param connector the connector name; never null + * @param topic the topic name; never null */ void deleteTopic(String connector, String topic); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java index 5f9e3b9728008..d4a34bb19818d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java @@ -377,7 +377,7 @@ private void expectTaskGetTopic(boolean anyTimes) { new ConnectorTaskId(connectorCapture.getValue(), 0), Time.SYSTEM.milliseconds())); } else { - expect.andStubAnswer(() -> new TopicStatus( + expect.andAnswer(() -> new TopicStatus( topicCapture.getValue(), new ConnectorTaskId(connectorCapture.getValue(), 0), Time.SYSTEM.milliseconds())); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java index b60680a36fc89..e2f5a406c195e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java @@ -25,7 +25,6 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.Struct; -import 
org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.runtime.ConnectorStatus; import org.apache.kafka.connect.runtime.TaskStatus; import org.apache.kafka.connect.util.ConnectorTaskId; @@ -395,63 +394,6 @@ public void readTaskState() { verifyAll(); } - @Test - public void putTopicState() { - KafkaBasedLog kafkaBasedLog = mock(KafkaBasedLog.class); - Converter converter = mock(JsonConverter.class); - KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog); - - byte[] value = new byte[0]; - expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class))) - .andStubReturn(value); - - final Capture callbackCapture = newCapture(); - kafkaBasedLog.send(eq("status-connector-conn"), eq(value), capture(callbackCapture)); - expectLastCall() - .andAnswer(new IAnswer() { - @Override - public Void answer() throws Throwable { - callbackCapture.getValue().onCompletion(null, null); - return null; - } - }); - replayAll(); - - ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0); - store.put(status); - - // state is not visible until read back from the log - assertEquals(null, store.get(CONNECTOR)); - - verifyAll(); - } - - @Test - public void readTopicState() { - byte[] value = new byte[0]; - - KafkaBasedLog kafkaBasedLog = mock(KafkaBasedLog.class); - Converter converter = mock(Converter.class); - KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog); - - Map statusMap = new HashMap<>(); - statusMap.put("worker_id", WORKER_ID); - statusMap.put("state", "RUNNING"); - statusMap.put("generation", 0L); - - expect(converter.toConnectData(STATUS_TOPIC, value)) - .andReturn(new SchemaAndValue(null, statusMap)); - - replayAll(); - - store.read(consumerRecord(0, "status-connector-conn", value)); - - ConnectorStatus status = new ConnectorStatus(CONNECTOR, 
ConnectorStatus.State.RUNNING, WORKER_ID, 0); - assertEquals(status, store.get(CONNECTOR)); - - verifyAll(); - } - private static ConsumerRecord consumerRecord(long offset, String key, byte[] value) { return new ConsumerRecord<>(STATUS_TOPIC, 0, offset, System.currentTimeMillis(), TimestampType.CREATE_TIME, 0L, 0, 0, key, value); From 9529294e04330496e14eed402dedbd4adf7a2421 Mon Sep 17 00:00:00 2001 From: Konstantine Karantasis Date: Wed, 29 Jan 2020 19:39:17 -0500 Subject: [PATCH 6/7] KAFKA-9422: Add a test on topic status records --- .../storage/KafkaStatusBackingStore.java | 3 +- .../KafkaStatusBackingStoreFormatTest.java | 75 +++++++++++++++++++ 2 files changed, 77 insertions(+), 1 deletion(-) create mode 100644 connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java index 1393fac1f433a..6b1535081a304 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java @@ -124,7 +124,8 @@ public class KafkaStatusBackingStore implements StatusBackingStore { private final Converter converter; private final Table> tasks; private final Map> connectors; - private final ConcurrentMap> topics; + //visible for testing + protected final ConcurrentMap> topics; private String statusTopic; private KafkaBasedLog kafkaLog; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java new file mode 100644 index 0000000000000..6bb4a1db702b1 --- /dev/null +++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.runtime.TopicStatus; +import org.apache.kafka.connect.util.ConnectorTaskId; +import org.easymock.EasyMockSupport; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.kafka.connect.json.JsonConverterConfig.SCHEMAS_ENABLE_CONFIG; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class KafkaStatusBackingStoreFormatTest extends EasyMockSupport { + + private static final String STATUS_TOPIC = "status-topic"; + + private JsonConverter converter; + private KafkaStatusBackingStore store; + + @Before + public void setup() { + converter = new JsonConverter(); + 
converter.configure(Collections.singletonMap(SCHEMAS_ENABLE_CONFIG, false), false); + store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, null); + } + + @Test + public void readTopicStatus() { + TopicStatus topicStatus = new TopicStatus("foo", new ConnectorTaskId("bar", 0), Time.SYSTEM.milliseconds()); + byte[] value = store.serializeTopicStatus(topicStatus); + ConsumerRecord statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, "status-topic-foo:connector-bar", value); + store.read(statusRecord); + assertTrue(store.topics.containsKey("bar")); + assertTrue(store.topics.get("bar").containsKey("foo")); + assertEquals(topicStatus, store.topics.get("bar").get("foo")); + } + + @Test + public void deleteTopicStatus() { + TopicStatus topicStatus = new TopicStatus("foo", new ConnectorTaskId("bar", 0), Time.SYSTEM.milliseconds()); + store.topics.computeIfAbsent("bar", k -> new ConcurrentHashMap<>()).put("foo", topicStatus); + assertTrue(store.topics.containsKey("bar")); + assertTrue(store.topics.get("bar").containsKey("foo")); + assertEquals(topicStatus, store.topics.get("bar").get("foo")); + ConsumerRecord statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, "status-topic-foo:connector-bar", null); + store.read(statusRecord); + assertTrue(store.topics.containsKey("bar")); + assertFalse(store.topics.get("bar").containsKey("foo")); + assertEquals(Collections.emptyMap(), store.topics.get("bar")); + } +} From 7328b172193c7c52d38fe24553e50c846e18319e Mon Sep 17 00:00:00 2001 From: Konstantine Karantasis Date: Wed, 29 Jan 2020 20:18:24 -0500 Subject: [PATCH 7/7] KAFKA-9422: Return error on topics endpoint when topic tracking is disabled --- .../rest/resources/ConnectorsResource.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index f8ac50fe03aa8..6d30eabd2ee46 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -67,6 +67,7 @@ import java.util.concurrent.TimeoutException; import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ALLOW_RESET_CONFIG; +import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG; @Path("/connectors") @Produces(MediaType.APPLICATION_JSON) @@ -179,14 +180,24 @@ public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") Strin @GET @Path("/{connector}/topics") - public Map getConnectorActiveTopics(final @PathParam("connector") String connector) throws Throwable { + public Response getConnectorActiveTopics(final @PathParam("connector") String connector) throws Throwable { + if (!config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)) { + return Response.status(Response.Status.FORBIDDEN) + .entity("Topic tracking is disabled.") + .build(); + } ActiveTopicsInfo info = herder.connectorActiveTopics(connector); - return Collections.singletonMap(info.connector(), info); + return Response.ok(Collections.singletonMap(info.connector(), info)).build(); } @PUT @Path("/{connector}/topics/reset") public Response resetConnectorActiveTopics(final @PathParam("connector") String connector, final @Context HttpHeaders headers) throws Throwable { + if (!config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)) { + return Response.status(Response.Status.FORBIDDEN) + .entity("Topic tracking is disabled.") + .build(); + } if (!config.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)) { return Response.status(Response.Status.FORBIDDEN) .entity("Topic tracking reset is disabled.")