Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/**
* An immutable snapshot of the configuration state of connectors and tasks in a Kafka Connect cluster.
Expand Down Expand Up @@ -116,15 +117,15 @@ public Map<String, String> taskConfig(ConnectorTaskId task) {
/**
* Get all task configs for a connector.
* @param connector name of the connector
* @return a map from the task id to its configuration
* @return a list of task configurations
*/
public Map<ConnectorTaskId, Map<String, String>> allTaskConfigs(String connector) {
Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
public List<Map<String, String>> allTaskConfigs(String connector) {
Map<Integer, Map<String, String>> taskConfigs = new TreeMap<>();
for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : this.taskConfigs.entrySet()) {
if (taskConfigEntry.getKey().connector().equals(connector))
taskConfigs.put(taskConfigEntry.getKey(), taskConfigEntry.getValue());
taskConfigs.put(taskConfigEntry.getKey().task(), taskConfigEntry.getValue());
}
return taskConfigs;
return new LinkedList<>(taskConfigs.values());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason to use a linked list?

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -543,7 +542,7 @@ public Void call() throws Exception {
else if (!configState.contains(connName))
callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
else {
configBackingStore.putTaskConfigs(connName, taskConfigListAsMap(connName, configs));
configBackingStore.putTaskConfigs(connName, configs);
callback.onCompletion(null, null);
}
return null;
Expand Down Expand Up @@ -853,7 +852,7 @@ private void reconfigureConnector(final String connName, final Callback<Void> cb
}
if (changed) {
if (isLeader()) {
configBackingStore.putTaskConfigs(connName, taskConfigListAsMap(connName, taskProps));
configBackingStore.putTaskConfigs(connName, taskProps);
cb.onCompletion(null, null);
} else {
// We cannot forward the request on the same thread because this reconfiguration can happen as a result of connector
Expand Down Expand Up @@ -1064,14 +1063,4 @@ public void onRevoked(String leader, Collection<String> connectors, Collection<C
}
}

private static Map<ConnectorTaskId, Map<String, String>> taskConfigListAsMap(String connName, List<Map<String, String>> configs) {
int index = 0;
Map<ConnectorTaskId, Map<String, String>> result = new HashMap<>();
for (Map<String, String> taskConfigMap : configs) {
ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
result.put(taskId, taskConfigMap);
index++;
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -250,20 +249,13 @@ private String startConnector(Map<String, String> connectorProps) {
return connName;
}

private Map<ConnectorTaskId, Map<String, String>> recomputeTaskConfigs(String connName) {
private List<Map<String, String>> recomputeTaskConfigs(String connName) {
Map<String, String> config = configState.connectorConfig(connName);
ConnectorConfig connConfig = new ConnectorConfig(config);

List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(connName,
return worker.connectorTaskConfigs(connName,
connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
connConfig.getList(ConnectorConfig.TOPICS_CONFIG));

int i = 0;
Map<ConnectorTaskId, Map<String, String>> taskConfigMap = new HashMap<>();
for (Map<String, String> taskConfig : taskConfigs)
taskConfigMap.put(new ConnectorTaskId(connName, i++), taskConfig);

return taskConfigMap;
}

private void createConnectorTasks(String connName, TargetState initialState) {
Expand Down Expand Up @@ -296,8 +288,8 @@ private void updateConnectorTasks(String connName) {
return;
}

Map<ConnectorTaskId, Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);
Map<ConnectorTaskId, Map<String, String>> oldTaskConfigs = configState.allTaskConfigs(connName);
List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);
List<Map<String, String>> oldTaskConfigs = configState.allTaskConfigs(connName);

if (!newTaskConfigs.equals(oldTaskConfigs)) {
removeConnectorTasks(connName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.connect.util.ConnectorTaskId;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -64,9 +65,9 @@ public interface ConfigBackingStore {
/**
* Update the task configurations for a connector.
* @param connector name of the connector
* @param configs the new task configs
* @param configs the new task configs for the connector
*/
void putTaskConfigs(String connector, Map<ConnectorTaskId, Map<String, String>> configs);
void putTaskConfigs(String connector, List<Map<String, String>> configs);

/**
* Remove the task configs associated with a connector.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ public static String COMMIT_TASKS_KEY(String connectorName) {
// Connector and task configs: name or id -> config map
private Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
private Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();

// Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
// is in an inconsistent state and we cannot safely use them until they have been refreshed.
private Set<String> inconsistent = new HashSet<>();
Expand Down Expand Up @@ -339,12 +340,13 @@ private void updateConnectorConfig(String connector, byte[] serializedConfig) {
* Write these task configurations and associated commit messages, unless an inconsistency is found that indicates
* that we would be leaving one of the referenced connectors with an inconsistent state.
*
* @param configs map containing task configurations
* @param connector the connector to write task configuration
* @param configs list of task configurations for the connector
* @throws ConnectException if the task configurations do not resolve inconsistencies found in the existing root
* and task configurations.
*/
@Override
public void putTaskConfigs(String connector, Map<ConnectorTaskId, Map<String, String>> configs) {
public void putTaskConfigs(String connector, List<Map<String, String>> configs) {
// Make sure we're at the end of the log. We should be the only writer, but we want to make sure we don't have
// any outstanding lagging data to consume.
try {
Expand All @@ -354,46 +356,33 @@ public void putTaskConfigs(String connector, Map<ConnectorTaskId, Map<String, St
throw new ConnectException("Error writing root configuration to Kafka", e);
}

// In theory, there is only a single writer and we shouldn't need this lock since the background thread should
// not invoke any callbacks that would conflict, but in practice this guards against inconsistencies due to
// the root config being updated.
Map<String, Integer> newTaskCounts = new HashMap<>();
synchronized (lock) {
// Validate tasks in this assignment. Any task configuration updates should include updates for *all* tasks
// in the connector -- we should have all task IDs 0 - N-1 within a connector if any task is included here
Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(configs);
for (Map.Entry<String, Set<Integer>> taskConfigSetEntry : updatedConfigIdsByConnector.entrySet()) {
if (!completeTaskIdSet(taskConfigSetEntry.getValue(), taskConfigSetEntry.getValue().size())) {
log.error("Submitted task configuration contain invalid range of task IDs, ignoring this submission");
throw new ConnectException("Error writing task configurations: found some connectors with invalid connectors");
}
newTaskCounts.put(taskConfigSetEntry.getKey(), taskConfigSetEntry.getValue().size());
}
}
int taskCount = configs.size();

// Start sending all the individual updates
for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : configs.entrySet()) {
int index = 0;
for (Map<String, String> taskConfig: configs) {
Struct connectConfig = new Struct(TASK_CONFIGURATION_V0);
connectConfig.put("properties", taskConfigEntry.getValue());
connectConfig.put("properties", taskConfig);
byte[] serializedConfig = converter.fromConnectData(topic, TASK_CONFIGURATION_V0, connectConfig);
log.debug("Writing configuration for task " + taskConfigEntry.getKey() + " configuration: " + taskConfigEntry.getValue());
configLog.send(TASK_KEY(taskConfigEntry.getKey()), serializedConfig);
log.debug("Writing configuration for task " + index + " configuration: " + taskConfig);
ConnectorTaskId connectorTaskId = new ConnectorTaskId(connector, index);
configLog.send(TASK_KEY(connectorTaskId), serializedConfig);
index++;
}

// Finally, send the commit to update the number of tasks and apply the new configs, then wait until we read to
// the end of the log
try {
// Read to end to ensure all the task configs have been written
configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);

// Write all the commit messages
for (Map.Entry<String, Integer> taskCountEntry : newTaskCounts.entrySet()) {
Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
connectConfig.put("tasks", taskCountEntry.getValue());
byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_TASKS_COMMIT_V0, connectConfig);
log.debug("Writing commit for connector " + taskCountEntry.getKey() + " with " + taskCountEntry.getValue() + " tasks.");
configLog.send(COMMIT_TASKS_KEY(taskCountEntry.getKey()), serializedConfig);
if (taskCount > 0) {
configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}
// Write the commit message
Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
connectConfig.put("tasks", taskCount);
byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_TASKS_COMMIT_V0, connectConfig);
log.debug("Writing commit for connector " + connector + " with " + taskCount + " tasks.");
configLog.send(COMMIT_TASKS_KEY(connector), serializedConfig);

// Read to end to ensure all the commit messages have been written
configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -426,6 +415,7 @@ private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<Stri
return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
}

@SuppressWarnings("unchecked")
private class ConsumeCallback implements Callback<ConsumerRecord<String, byte[]>> {
@Override
public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
Expand Down Expand Up @@ -562,20 +552,13 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record)
log.error("Ignoring connector tasks configuration commit for connector " + connectorName + " because it is in the wrong format: " + value.value());
return;
}

Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);

int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));

// Validate the configs we're supposed to update to ensure we're getting a complete configuration
// update of all tasks that are expected based on the number of tasks in the commit message.
Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(deferred);
Set<Integer> taskIdSet = updatedConfigIdsByConnector.get(connectorName);
if (taskIdSet == null) {
//TODO: Figure out why this happens (KAFKA-3321)
log.error("Received a commit message for connector " + connectorName + " but there is no matching configuration for tasks in this connector. This should never happen.");
return;
}
Set<Integer> taskIdSet = taskIds(connectorName, deferred);
if (!completeTaskIdSet(taskIdSet, newTaskCount)) {
// Given the logic for writing commit messages, we should only hit this condition due to compacted
// historical data, in which case we would not have applied any updates yet and there will be no
Expand Down Expand Up @@ -622,19 +605,18 @@ private ConnectorTaskId parseTaskId(String key) {
}

/**
* Given task configurations, get a set of integer task IDs organized by connector name.
* Given task configurations, get a set of integer task IDs for the connector.
*/
private Map<String, Set<Integer>> taskIdsByConnector(Map<ConnectorTaskId, Map<String, String>> configs) {
Map<String, Set<Integer>> connectorTaskIds = new HashMap<>();
if (configs == null)
return connectorTaskIds;
for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : configs.entrySet()) {
ConnectorTaskId taskId = taskConfigEntry.getKey();
if (!connectorTaskIds.containsKey(taskId.connector()))
connectorTaskIds.put(taskId.connector(), new TreeSet<Integer>());
connectorTaskIds.get(taskId.connector()).add(taskId.task());
private Set<Integer> taskIds(String connector, Map<ConnectorTaskId, Map<String, String>> configs) {
Set<Integer> tasks = new TreeSet<>();
if (configs == null) {
return tasks;
}
for (ConnectorTaskId taskId : configs.keySet()) {
assert taskId.connector().equals(connector);
tasks.add(taskId.task());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method had me worried at first because it doesn't seem to do anything to guarantee there's only one connector's tasks in the map. We might be able to switch the internal representation here to also be a list, although the way we collect them makes it natural to maintain as a map and then convert to the list (especially given a compacted topic).

Maybe a good solution is an assertion here that the connector name matches as expected? I think we're sort of covered by the unit tests that test that compacted topics work even if random sets of task configs get cleaned up, but right now I'm not certain we test the possibility that we have two connectors with tasks that have been compacted and are in an inconsistent state.

}
return connectorTaskIds;
return tasks;
}

private boolean completeTaskIdSet(Set<Integer> idSet, int expectedSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;

public class MemoryConfigBackingStore implements ConfigBackingStore {
Expand Down Expand Up @@ -108,15 +110,16 @@ public synchronized void removeTaskConfigs(String connector) {
}

@Override
public synchronized void putTaskConfigs(String connector, Map<ConnectorTaskId, Map<String, String>> configs) {
public synchronized void putTaskConfigs(String connector, List<Map<String, String>> configs) {
ConnectorState state = connectors.get(connector);
if (state == null)
throw new IllegalArgumentException("Cannot put tasks for non-existing connector");

state.taskConfigs = configs;
Map<ConnectorTaskId, Map<String, String>> taskConfigsMap = taskConfigListAsMap(connector, configs);
state.taskConfigs = taskConfigsMap;

if (updateListener != null)
updateListener.onTaskConfigUpdate(configs.keySet());
updateListener.onTaskConfigUpdate(taskConfigsMap.keySet());
}

@Override
Expand Down Expand Up @@ -151,4 +154,13 @@ public ConnectorState(Map<String, String> connConfig) {
this.taskConfigs = new HashMap<>();
}
}

private static Map<ConnectorTaskId, Map<String, String>> taskConfigListAsMap(String connector, List<Map<String, String>> configs) {
int index = 0;
Map<ConnectorTaskId, Map<String, String>> result = new TreeMap<>();
for (Map<String, String> taskConfigMap: configs) {
result.put(new ConnectorTaskId(connector, index++), taskConfigMap);
}
return result;
}
}
Loading