From 8c242e7c8e8b472699f11082cea55112337648d6 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Wed, 20 Apr 2016 17:51:22 -0700 Subject: [PATCH 1/9] Handle zero task --- .../storage/KafkaConfigBackingStore.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index 8d2028852ecf8..770f74302d683 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -339,6 +339,7 @@ private void updateConnectorConfig(String connector, byte[] serializedConfig) { * Write these task configurations and associated commit messages, unless an inconsistency is found that indicates * that we would be leaving one of the referenced connectors with an inconsistent state. * + * @param connector the connector to write task configuration * @param configs map containing task configurations * @throws ConnectException if the task configurations do not resolve inconsistencies found in the existing root * and task configurations. @@ -357,17 +358,18 @@ public void putTaskConfigs(String connector, Map newTaskCounts = new HashMap<>(); + int newTaskCount = 0; synchronized (lock) { // Validate tasks in this assignment. Any task configuration updates should include updates for *all* tasks // in the connector -- we should have all task IDs 0 - N-1 within a connector if any task is included here Map> updatedConfigIdsByConnector = taskIdsByConnector(configs); - for (Map.Entry> taskConfigSetEntry : updatedConfigIdsByConnector.entrySet()) { - if (!completeTaskIdSet(taskConfigSetEntry.getValue(), taskConfigSetEntry.getValue().size())) { + if (updatedConfigIdsByConnector.containsKey(connector)) { + Set taskConfigSet = updatedConfigIdsByConnector.get(connector); + if (!completeTaskIdSet(taskConfigSet, taskConfigSet.size())) { log.error("Submitted task configuration contain invalid range of task IDs, ignoring this submission"); throw new ConnectException("Error writing task configurations: found some connectors with invalid connectors"); } - newTaskCounts.put(taskConfigSetEntry.getKey(), taskConfigSetEntry.getValue().size()); + newTaskCount += taskConfigSet.size(); } } @@ -386,14 +388,12 @@ public void putTaskConfigs(String connector, Map taskCountEntry : newTaskCounts.entrySet()) { - Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0); - connectConfig.put("tasks", taskCountEntry.getValue()); - byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_TASKS_COMMIT_V0, connectConfig); - log.debug("Writing commit for connector " + taskCountEntry.getKey() + " with " + taskCountEntry.getValue() + " tasks."); - configLog.send(COMMIT_TASKS_KEY(taskCountEntry.getKey()), serializedConfig); - } + // Write the commit message + Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0); + connectConfig.put("tasks", newTaskCount); + byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_TASKS_COMMIT_V0, connectConfig); + log.debug("Writing commit for connector " + connector + " with " + newTaskCount + " tasks."); + configLog.send(COMMIT_TASKS_KEY(connector), serializedConfig); // Read to end to ensure all the commit messages have been written configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS); From 2f054116f464b74807e464cbb0e5296400de2d13 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Thu, 21 Apr 2016 14:21:26 -0700 Subject: [PATCH 2/9] Add unit test --- .../storage/KafkaConfigBackingStore.java | 41 ++++++++------- .../storage/KafkaConfigBackingStoreTest.java | 51 +++++++++++++++++++ 2 files changed, 71 insertions(+), 21 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index 770f74302d683..94d869321fdf0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -362,15 +362,12 @@ public void putTaskConfigs(String connector, Map> updatedConfigIdsByConnector = taskIdsByConnector(configs); - if (updatedConfigIdsByConnector.containsKey(connector)) { - Set taskConfigSet = updatedConfigIdsByConnector.get(connector); - if (!completeTaskIdSet(taskConfigSet, taskConfigSet.size())) { - log.error("Submitted task configuration contain invalid range of task IDs, ignoring this submission"); - throw new ConnectException("Error writing task configurations: found some connectors with invalid connectors"); - } - newTaskCount += taskConfigSet.size(); + Set taskConfigSet = taskIds(connector, configs); + if (!completeTaskIdSet(taskConfigSet, taskConfigSet.size())) { + log.error("Submitted task configuration contain invalid range of task IDs, ignoring this submission"); + throw new ConnectException("Error writing task configurations: found some connectors with invalid connectors"); } + newTaskCount += taskConfigSet.size(); } // Start sending all the individual updates @@ -386,8 +383,9 @@ public void putTaskConfigs(String connector, Map 0) { + configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } // Write the commit message Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0); connectConfig.put("tasks", newTaskCount); @@ -426,6 +424,7 @@ private KafkaBasedLog createKafkaBasedLog(String topic, Map(topic, producerProps, consumerProps, consumedCallback, new SystemTime()); } + @SuppressWarnings("unchecked") private class ConsumeCallback implements Callback> { @Override public void onCompletion(Throwable error, ConsumerRecord record) { @@ -569,8 +568,7 @@ public void onCompletion(Throwable error, ConsumerRecord record) // Validate the configs we're supposed to update to ensure we're getting a complete configuration // update of all tasks that are expected based on the number of tasks in the commit message. - Map> updatedConfigIdsByConnector = taskIdsByConnector(deferred); - Set taskIdSet = updatedConfigIdsByConnector.get(connectorName); + Set taskIdSet = taskIds(connectorName, deferred); if (taskIdSet == null) { //TODO: Figure out why this happens (KAFKA-3321) log.error("Received a commit message for connector " + connectorName + " but there is no matching configuration for tasks in this connector. This should never happen."); @@ -622,19 +620,20 @@ private ConnectorTaskId parseTaskId(String key) { } /** - * Given task configurations, get a set of integer task IDs organized by connector name. + * Given task configurations, get a set of integer task IDs for the connector. */ - private Map> taskIdsByConnector(Map> configs) { - Map> connectorTaskIds = new HashMap<>(); - if (configs == null) - return connectorTaskIds; + private Set taskIds(String connector, Map> configs) { + Set tasks = new TreeSet<>(); + if (configs == null) { + return tasks; + } for (Map.Entry> taskConfigEntry : configs.entrySet()) { ConnectorTaskId taskId = taskConfigEntry.getKey(); - if (!connectorTaskIds.containsKey(taskId.connector())) - connectorTaskIds.put(taskId.connector(), new TreeSet()); - connectorTaskIds.get(taskId.connector()).add(taskId.task()); + if (taskId.connector().equals(connector)) { + tasks.add(taskId.task()); + } } - return connectorTaskIds; + return tasks; } private boolean completeTaskIdSet(Set idSet, int expectedSize) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java index eaad34bd72269..705c60c28371f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java @@ -295,6 +295,57 @@ public void testPutTaskConfigs() throws Exception { PowerMock.verifyAll(); } + @Test + public void testPutTaskConfigsZeroTasks() throws Exception { + expectConfigure(); + expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP); + + // Task configs should read to end, write to the log, read to end, write root. + expectReadToEnd(new LinkedHashMap()); + expectConvertWriteRead( + COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0), + "tasks", 0); // We have 0 tasks + // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks + configUpdateListener.onTaskConfigUpdate(Collections.emptyList()); + EasyMock.expectLastCall(); + + // Records to be read by consumer as it reads to the end of the log + LinkedHashMap serializedConfigs = new LinkedHashMap<>(); + serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)); + expectReadToEnd(serializedConfigs); + + expectStop(); + + PowerMock.replayAll(); + + configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG); + configStorage.start(); + + // Bootstrap as if we had already added the connector, but no tasks had been added yet + whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.EMPTY_LIST); + + // Null before writing + ClusterConfigState configState = configStorage.snapshot(); + assertEquals(-1, configState.offset()); + + // Writing task task configs should block until all the writes have been performed and the root record update + // has completed + Map> taskConfigs = new HashMap<>(); + configStorage.putTaskConfigs("connector1", taskConfigs); + + // Validate root config by listing all connectors and tasks + configState = configStorage.snapshot(); + assertEquals(1, configState.offset()); + String connectorName = CONNECTOR_IDS.get(0); + assertEquals(Arrays.asList(connectorName), new ArrayList<>(configState.connectors())); + assertEquals(Collections.emptyList(), configState.tasks(connectorName)); + assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors()); + + configStorage.stop(); + + PowerMock.verifyAll(); + } + @Test public void testRestore() throws Exception { // Restoring data should notify only of the latest values after loading is complete. This also validates From 402e120c5d6ca5f6ea05ebd2fd71369d2abfa7bc Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Wed, 27 Apr 2016 22:58:57 -0700 Subject: [PATCH 3/9] WIP --- .../distributed/ClusterConfigState.java | 6 ++-- .../distributed/DistributedHerder.java | 15 +++++----- .../runtime/standalone/StandaloneHerder.java | 16 +++++----- .../connect/storage/ConfigBackingStore.java | 5 ++-- .../storage/KafkaConfigBackingStore.java | 29 +++++++------------ .../storage/MemoryConfigBackingStore.java | 5 +++- 6 files changed, 37 insertions(+), 39 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java index c5c217e6f4731..303b2bf302e06 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java @@ -118,11 +118,11 @@ public Map taskConfig(ConnectorTaskId task) { * @param connector name of the connector * @return a map from the task id to its configuration */ - public Map> allTaskConfigs(String connector) { - Map> taskConfigs = new HashMap<>(); + public Map> allTaskConfigs(String connector) { + Map> taskConfigs = new HashMap<>(); for (Map.Entry> taskConfigEntry : this.taskConfigs.entrySet()) { if (taskConfigEntry.getKey().connector().equals(connector)) - taskConfigs.put(taskConfigEntry.getKey(), taskConfigEntry.getValue()); + taskConfigs.put(taskConfigEntry.getKey().task(), taskConfigEntry.getValue()); } return taskConfigs; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index cbef1867744b4..b24017a0228db 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -44,13 +44,13 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.PriorityQueue; import java.util.Queue; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -543,7 +543,7 @@ public Void call() throws Exception { else if (!configState.contains(connName)) callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); else { - configBackingStore.putTaskConfigs(connName, taskConfigListAsMap(connName, configs)); + configBackingStore.putTaskConfigs(connName, taskConfigListAsMap(configs)); callback.onCompletion(null, null); } return null; @@ -853,7 +853,7 @@ private void reconfigureConnector(final String connName, final Callback cb } if (changed) { if (isLeader()) { - configBackingStore.putTaskConfigs(connName, taskConfigListAsMap(connName, taskProps)); + configBackingStore.putTaskConfigs(connName, taskConfigListAsMap(taskProps)); cb.onCompletion(null, null); } else { // We cannot forward the request on the same thread because this reconfiguration can happen as a result of connector @@ -1064,12 +1064,11 @@ public void onRevoked(String leader, Collection connectors, Collection> taskConfigListAsMap(String connName, List> configs) { + private static Map> taskConfigListAsMap(List> configs) { int index = 0; - Map> result = new HashMap<>(); - for (Map taskConfigMap : configs) { - ConnectorTaskId taskId = new ConnectorTaskId(connName, index); - result.put(taskId, taskConfigMap); + Map> result = new TreeMap<>(); + for (Map taskConfigMap: configs) { + result.put(index, taskConfigMap); index++; } return result; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java index ad02e99587c12..05a2a7a87a785 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java @@ -250,7 +250,7 @@ private String startConnector(Map connectorProps) { return connName; } - private Map> recomputeTaskConfigs(String connName) { + private Map> recomputeTaskConfigs(String connName) { Map config = configState.connectorConfig(connName); ConnectorConfig connConfig = new ConnectorConfig(config); @@ -258,10 +258,12 @@ private Map> recomputeTaskConfigs(String co connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), connConfig.getList(ConnectorConfig.TOPICS_CONFIG)); - int i = 0; - Map> taskConfigMap = new HashMap<>(); - for (Map taskConfig : taskConfigs) - taskConfigMap.put(new ConnectorTaskId(connName, i++), taskConfig); + int index = 0; + Map> taskConfigMap = new HashMap<>(); + for (Map taskConfig : taskConfigs) { + taskConfigMap.put(index, taskConfig); + index++; + } return taskConfigMap; } @@ -296,8 +298,8 @@ private void updateConnectorTasks(String connName) { return; } - Map> newTaskConfigs = recomputeTaskConfigs(connName); - Map> oldTaskConfigs = configState.allTaskConfigs(connName); + Map> newTaskConfigs = recomputeTaskConfigs(connName); + Map> oldTaskConfigs = configState.allTaskConfigs(connName); if (!newTaskConfigs.equals(oldTaskConfigs)) { removeConnectorTasks(connName); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java index 5244842a2c984..77fc43b680c19 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.util.ConnectorTaskId; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -64,9 +65,9 @@ public interface ConfigBackingStore { /** * Update the task configurations for a connector. * @param connector name of the connector - * @param configs the new task configs + * @param configs the new task configs for the connector */ - void putTaskConfigs(String connector, Map> configs); + void putTaskConfigs(String connector, List> configs); /** * Remove the task configs associated with a connector. diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index 94d869321fdf0..19690802e1bdb 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -205,6 +205,7 @@ public static String COMMIT_TASKS_KEY(String connectorName) { // Connector and task configs: name or id -> config map private Map> connectorConfigs = new HashMap<>(); private Map> taskConfigs = new HashMap<>(); + // Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data // is in an inconsistent state and we cannot safely use them until they have been refreshed. private Set inconsistent = new HashSet<>(); @@ -340,12 +341,12 @@ private void updateConnectorConfig(String connector, byte[] serializedConfig) { * that we would be leaving one of the referenced connectors with an inconsistent state. * * @param connector the connector to write task configuration - * @param configs map containing task configurations + * @param configs map containing task configurations for the connector * @throws ConnectException if the task configurations do not resolve inconsistencies found in the existing root * and task configurations. */ @Override - public void putTaskConfigs(String connector, Map> configs) { + public void putTaskConfigs(String connector, List> configs) { // Make sure we're at the end of the log. We should be the only writer, but we want to make sure we don't have // any outstanding lagging data to consume. try { @@ -354,29 +355,21 @@ public void putTaskConfigs(String connector, Map taskConfigSet = taskIds(connector, configs); - if (!completeTaskIdSet(taskConfigSet, taskConfigSet.size())) { - log.error("Submitted task configuration contain invalid range of task IDs, ignoring this submission"); - throw new ConnectException("Error writing task configurations: found some connectors with invalid connectors"); - } - newTaskCount += taskConfigSet.size(); - } + int newTaskCount = configs.size(); // Start sending all the individual updates - for (Map.Entry> taskConfigEntry : configs.entrySet()) { + int index = 0; + for (Map taskConfig: configs) { Struct connectConfig = new Struct(TASK_CONFIGURATION_V0); - connectConfig.put("properties", taskConfigEntry.getValue()); + connectConfig.put("properties", taskConfig); byte[] serializedConfig = converter.fromConnectData(topic, TASK_CONFIGURATION_V0, connectConfig); - log.debug("Writing configuration for task " + taskConfigEntry.getKey() + " configuration: " + taskConfigEntry.getValue()); - configLog.send(TASK_KEY(taskConfigEntry.getKey()), serializedConfig); + log.debug("Writing configuration for task " + index + " configuration: " + taskConfig); + ConnectorTaskId connectorTaskId = new ConnectorTaskId(connector, index); + configLog.send(TASK_KEY(connectorTaskId), serializedConfig); + index++; } // Finally, send the commit to update the number of tasks and apply the new configs, then wait until we read to diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java index ec5f2e6c47522..7e827dc945bd9 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -108,7 +109,7 @@ public synchronized void removeTaskConfigs(String connector) { } @Override - public synchronized void putTaskConfigs(String connector, Map> configs) { + public synchronized void putTaskConfigs(String connector, List> configs) { ConnectorState state = connectors.get(connector); if (state == null) throw new IllegalArgumentException("Cannot put tasks for non-existing connector"); @@ -151,4 +152,6 @@ public ConnectorState(Map connConfig) { this.taskConfigs = new HashMap<>(); } } + + private static } From 5b72ffa57fb99c3aa7b61300cb91ad656bba0278 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Thu, 28 Apr 2016 15:25:11 -0700 Subject: [PATCH 4/9] Change interface of putTaskConfigs --- .../distributed/ClusterConfigState.java | 11 ++++++----- .../runtime/distributed/DistributedHerder.java | 4 ++-- .../runtime/standalone/StandaloneHerder.java | 18 ++++-------------- .../storage/KafkaConfigBackingStore.java | 12 +++++------- .../storage/MemoryConfigBackingStore.java | 15 ++++++++++++--- 5 files changed, 29 insertions(+), 31 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java index 303b2bf302e06..ea5ba82d4ba08 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java @@ -22,10 +22,11 @@ import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; /** * An immutable snapshot of the configuration state of connectors and tasks in a Kafka Connect cluster. @@ -116,15 +117,15 @@ public Map taskConfig(ConnectorTaskId task) { /** * Get all task configs for a connector. * @param connector name of the connector - * @return a map from the task id to its configuration + * @return a list of task configurations */ - public Map> allTaskConfigs(String connector) { - Map> taskConfigs = new HashMap<>(); + public List> allTaskConfigs(String connector) { + Map> taskConfigs = new TreeMap<>(); for (Map.Entry> taskConfigEntry : this.taskConfigs.entrySet()) { if (taskConfigEntry.getKey().connector().equals(connector)) taskConfigs.put(taskConfigEntry.getKey().task(), taskConfigEntry.getValue()); } - return taskConfigs; + return new LinkedList<>(taskConfigs.values()); } /** diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index b24017a0228db..bb8606dae903c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -543,7 +543,7 @@ public Void call() throws Exception { else if (!configState.contains(connName)) callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); else { - configBackingStore.putTaskConfigs(connName, taskConfigListAsMap(configs)); + configBackingStore.putTaskConfigs(connName, configs); callback.onCompletion(null, null); } return null; @@ -853,7 +853,7 @@ private void reconfigureConnector(final String connName, final Callback cb } if (changed) { if (isLeader()) { - configBackingStore.putTaskConfigs(connName, taskConfigListAsMap(taskProps)); + configBackingStore.putTaskConfigs(connName, taskProps); cb.onCompletion(null, null); } else { // We cannot forward the request on the same thread because this reconfiguration can happen as a result of connector diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java index 05a2a7a87a785..2316baefb3654 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java @@ -40,7 +40,6 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -250,22 +249,13 @@ private String startConnector(Map connectorProps) { return connName; } - private Map> recomputeTaskConfigs(String connName) { + private List> recomputeTaskConfigs(String connName) { Map config = configState.connectorConfig(connName); ConnectorConfig connConfig = new ConnectorConfig(config); - List> taskConfigs = worker.connectorTaskConfigs(connName, + return worker.connectorTaskConfigs(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), connConfig.getList(ConnectorConfig.TOPICS_CONFIG)); - - int index = 0; - Map> taskConfigMap = new HashMap<>(); - for (Map taskConfig : taskConfigs) { - taskConfigMap.put(index, taskConfig); - index++; - } - - return taskConfigMap; } private void createConnectorTasks(String connName, TargetState initialState) { @@ -298,8 +288,8 @@ private void updateConnectorTasks(String connName) { return; } - Map> newTaskConfigs = recomputeTaskConfigs(connName); - Map> oldTaskConfigs = configState.allTaskConfigs(connName); + List> newTaskConfigs = recomputeTaskConfigs(connName); + List> oldTaskConfigs = configState.allTaskConfigs(connName); if (!newTaskConfigs.equals(oldTaskConfigs)) { removeConnectorTasks(connName); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index 19690802e1bdb..8eacc82367009 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -355,10 +355,8 @@ public void putTaskConfigs(String connector, List> configs) log.error("Failed to write root configuration to Kafka: ", e); throw new ConnectException("Error writing root configuration to Kafka", e); } - // In theory, there is only a single writer and we shouldn't need this lock since the background thread should - // not invoke any callbacks that would conflict, but in practice this guards against inconsistencies due to - // the root config being updated. - int newTaskCount = configs.size(); + + int taskCount = configs.size(); // Start sending all the individual updates int index = 0; @@ -376,14 +374,14 @@ public void putTaskConfigs(String connector, List> configs) // the end of the log try { // Read to end to ensure all the task configs have been written - if (newTaskCount > 0) { + if (taskCount > 0) { configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS); } // Write the commit message Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0); - connectConfig.put("tasks", newTaskCount); + connectConfig.put("tasks", taskCount); byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_TASKS_COMMIT_V0, connectConfig); - log.debug("Writing commit for connector " + connector + " with " + newTaskCount + " tasks."); + log.debug("Writing commit for connector " + connector + " with " + taskCount + " tasks."); configLog.send(COMMIT_TASKS_KEY(connector), serializedConfig); // Read to end to ensure all the commit messages have been written diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java index 7e827dc945bd9..212022dd0fc7a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.TimeUnit; public class MemoryConfigBackingStore implements ConfigBackingStore { @@ -114,10 +115,11 @@ public synchronized void putTaskConfigs(String connector, List> taskConfigsMap = taskConfigListAsMap(connector, configs); + state.taskConfigs = taskConfigsMap; if (updateListener != null) - updateListener.onTaskConfigUpdate(configs.keySet()); + updateListener.onTaskConfigUpdate(taskConfigsMap.keySet()); } @Override @@ -153,5 +155,12 @@ public ConnectorState(Map connConfig) { } } - private static + private static Map> taskConfigListAsMap(String connector, List> configs) { + int index = 0; + Map> result = new TreeMap<>(); + for (Map taskConfigMap: configs) { + result.put(new ConnectorTaskId(connector, index++), taskConfigMap); + } + return result; + } } From 942a081eaa0a7670a39837b889ccea8253a62739 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Thu, 28 Apr 2016 15:25:30 -0700 Subject: [PATCH 5/9] Fix test failures --- .../storage/KafkaConfigBackingStoreTest.java | 26 ++++--------------- 1 file changed, 5 insertions(+), 21 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java index 705c60c28371f..1eb755d3c28bb 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java @@ -21,7 +21,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; @@ -57,7 +56,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; @RunWith(PowerMockRunner.class) @PrepareForTest(KafkaConfigBackingStore.class) @@ -275,9 +273,7 @@ public void testPutTaskConfigs() throws Exception { // Writing task task configs should block until all the writes have been performed and the root record update // has completed - Map> taskConfigs = new HashMap<>(); - taskConfigs.put(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0)); - taskConfigs.put(TASK_IDS.get(1), SAMPLE_CONFIGS.get(1)); + List> taskConfigs = Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1)); configStorage.putTaskConfigs("connector1", taskConfigs); // Validate root config by listing all connectors and tasks @@ -330,7 +326,7 @@ public void testPutTaskConfigsZeroTasks() throws Exception { // Writing task task configs should block until all the writes have been performed and the root record update // has completed - Map> taskConfigs = new HashMap<>(); + List> taskConfigs = Collections.emptyList(); configStorage.putTaskConfigs("connector1", taskConfigs); // Validate root config by listing all connectors and tasks @@ -403,8 +399,8 @@ public void testRestore() throws Exception { @Test public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exception { // Test a case where a failure and compaction has left us in an inconsistent state when reading the log. - // We start out by loading an initial configuration where we started to write a task update and failed before - // writing an the commit, and then compaction cleaned up the earlier record. + // We start out by loading an initial configuration where we started to write a task update, and then + // compaction cleaned up the earlier record. expectConfigure(); List> existingRecords = Arrays.asList( @@ -422,9 +418,6 @@ public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exceptio logOffset = 6; expectStart(existingRecords, deserialized); - // One failed attempt to write new task configs - expectReadToEnd(new LinkedHashMap()); - // Successful attempt to write new task config expectReadToEnd(new LinkedHashMap()); expectConvertWriteRead( @@ -443,7 +436,6 @@ public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exceptio serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2)); expectReadToEnd(serializedConfigs); - expectStop(); PowerMock.replayAll(); @@ -461,17 +453,9 @@ public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exceptio assertNull(configState.taskConfig(TASK_IDS.get(1))); assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), configState.inconsistentConnectors()); - // First try sending an invalid set of configs (can't possibly represent a valid config set for the tasks) - try { - configStorage.putTaskConfigs("connector1", Collections.singletonMap(TASK_IDS.get(1), SAMPLE_CONFIGS.get(2))); - fail("Should have failed due to incomplete task set."); - } catch (KafkaException e) { - // expected - } - // Next, issue a write that has everything that is needed and it should be accepted. Note that in this case // we are going to shrink the number of tasks to 1 - configStorage.putTaskConfigs("connector1", Collections.singletonMap(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0))); + configStorage.putTaskConfigs("connector1", Collections.singletonList(SAMPLE_CONFIGS.get(0))); // Validate updated config configState = configStorage.snapshot(); // This is only two more ahead of the last one because multiple calls fail, and so their configs are not written From 7b7fbbbb9ddd61bb1e3ecd660274a9c38c578f14 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Thu, 28 Apr 2016 16:16:15 -0700 Subject: [PATCH 6/9] Simplify taskIds --- .../connect/storage/KafkaConfigBackingStore.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index 8eacc82367009..87f6ad352eae0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -559,7 +559,7 @@ public void onCompletion(Throwable error, ConsumerRecord record) // Validate the configs we're supposed to update to ensure we're getting a complete configuration // update of all tasks that are expected based on the number of tasks in the commit message. - Set taskIdSet = taskIds(connectorName, deferred); + Set taskIdSet = taskIds(deferred); if (taskIdSet == null) { //TODO: Figure out why this happens (KAFKA-3321) log.error("Received a commit message for connector " + connectorName + " but there is no matching configuration for tasks in this connector. This should never happen."); @@ -613,16 +613,13 @@ private ConnectorTaskId parseTaskId(String key) { /** * Given task configurations, get a set of integer task IDs for the connector. */ - private Set taskIds(String connector, Map> configs) { + private Set taskIds(Map> configs) { Set tasks = new TreeSet<>(); if (configs == null) { return tasks; } - for (Map.Entry> taskConfigEntry : configs.entrySet()) { - ConnectorTaskId taskId = taskConfigEntry.getKey(); - if (taskId.connector().equals(connector)) { - tasks.add(taskId.task()); - } + for (ConnectorTaskId taskId : configs.keySet()) { + tasks.add(taskId.task()); } return tasks; } From 04833b0a7dd3e9e439852beb09870d58c7d2bad2 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Thu, 28 Apr 2016 16:56:08 -0700 Subject: [PATCH 7/9] Fix Java doc --- .../apache/kafka/connect/storage/KafkaConfigBackingStore.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index 87f6ad352eae0..37f11612cf0fe 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -341,7 +341,7 @@ private void updateConnectorConfig(String connector, byte[] serializedConfig) { * that we would be leaving one of the referenced connectors with an inconsistent state. * * @param connector the connector to write task configuration - * @param configs map containing task configurations for the connector + * @param configs list of task configurations for the connector * @throws ConnectException if the task configurations do not resolve inconsistencies found in the existing root * and task configurations. */ From 0417e78eaf489783bba6ec8aa652a951e960ebd4 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Fri, 29 Apr 2016 12:06:13 -0700 Subject: [PATCH 8/9] Remove null check for task set and add aseert to guard against task configs from other connectors --- .../connect/runtime/distributed/DistributedHerder.java | 10 ---------- .../kafka/connect/storage/KafkaConfigBackingStore.java | 10 +++------- 2 files changed, 3 insertions(+), 17 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index bb8606dae903c..037eba742816f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -50,7 +50,6 @@ import java.util.PriorityQueue; import java.util.Queue; import java.util.Set; -import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -1064,13 +1063,4 @@ public void onRevoked(String leader, Collection connectors, Collection> taskConfigListAsMap(List> configs) { - int index = 0; - Map> result = new TreeMap<>(); - for (Map taskConfigMap: configs) { - result.put(index, taskConfigMap); - index++; - } - return result; - } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index 37f11612cf0fe..e36886b5bbccb 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -559,12 +559,7 @@ public void onCompletion(Throwable error, ConsumerRecord record) // Validate the configs we're supposed to update to ensure we're getting a complete configuration // update of all tasks that are expected based on the number of tasks in the commit message. - Set taskIdSet = taskIds(deferred); - if (taskIdSet == null) { - //TODO: Figure out why this happens (KAFKA-3321) - log.error("Received a commit message for connector " + connectorName + " but there is no matching configuration for tasks in this connector. This should never happen."); - return; - } + Set taskIdSet = taskIds(connectorName, deferred); if (!completeTaskIdSet(taskIdSet, newTaskCount)) { // Given the logic for writing commit messages, we should only hit this condition due to compacted // historical data, in which case we would not have applied any updates yet and there will be no @@ -613,12 +608,13 @@ private ConnectorTaskId parseTaskId(String key) { /** * Given task configurations, get a set of integer task IDs for the connector. */ - private Set taskIds(Map> configs) { + private Set taskIds(String connector, Map> configs) { Set tasks = new TreeSet<>(); if (configs == null) { return tasks; } for (ConnectorTaskId taskId : configs.keySet()) { + assert taskId.connector().equals(connector); tasks.add(taskId.task()); } return tasks; From f07e684edf274b0980175c6c71820b4fff8296ee Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Fri, 29 Apr 2016 12:56:24 -0700 Subject: [PATCH 9/9] Add unit test for restore --- .../storage/KafkaConfigBackingStore.java | 1 - .../storage/KafkaConfigBackingStoreTest.java | 56 +++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index e36886b5bbccb..9412e42629dc8 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -552,7 +552,6 @@ public void onCompletion(Throwable error, ConsumerRecord record) log.error("Ignoring connector tasks configuration commit for connector " + connectorName + " because it is in the wrong format: " + value.value()); return; } - Map> deferred = deferredTaskUpdates.get(connectorName); int newTaskCount = intValue(((Map) value.value()).get("tasks")); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java index 1eb755d3c28bb..617177e1a4f16 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java @@ -111,6 +111,9 @@ public class KafkaConfigBackingStoreTest { private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2); + private static final Struct TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR + = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 0); + // The exact format doesn't matter here since both conversions are mocked private static final List CONFIGS_SERIALIZED = Arrays.asList( "config-bytes-1".getBytes(), "config-bytes-2".getBytes(), "config-bytes-3".getBytes(), @@ -396,6 +399,59 @@ public void testRestore() throws Exception { PowerMock.verifyAll(); } + @Test + public void testRestoreZeroTasks() throws Exception { + // Restoring data should notify only of the latest values after loading is complete. This also validates + // that inconsistent state is ignored. + expectConfigure(); + // Overwrite each type at least once to ensure we see the latest data after loading + List> existingRecords = Arrays.asList( + new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)), + new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)), + new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), + new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)), + new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)), + // Connector after root update should make it through, task update shouldn't + new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)), + new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)), + new ConsumerRecord<>(TOPIC, 0, 7, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(7))); + LinkedHashMap deserialized = new LinkedHashMap(); + deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_CONFIG_STRUCTS.get(1)); + deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); + deserialized.put(CONFIGS_SERIALIZED.get(5), CONNECTOR_CONFIG_STRUCTS.get(2)); + deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1)); + deserialized.put(CONFIGS_SERIALIZED.get(7), TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR); + logOffset = 8; + expectStart(existingRecords, deserialized); + + // Shouldn't see any callbacks since this is during startup + + expectStop(); + + PowerMock.replayAll(); + + configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG); + configStorage.start(); + + // Should see a single connector and its config should be the last one seen anywhere in the log + ClusterConfigState configState = configStorage.snapshot(); + assertEquals(8, configState.offset()); // Should always be next to be read, even if uncommitted + assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); + // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2] + assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0))); + // Should see 0 tasks for that connector. + assertEquals(Collections.emptyList(), configState.tasks(CONNECTOR_IDS.get(0))); + // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0] + assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors()); + + configStorage.stop(); + + PowerMock.verifyAll(); + } + @Test public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exception { // Test a case where a failure and compaction has left us in an inconsistent state when reading the log.