From 3fcf148fb97f97722abed6bfbfb29a476e761ae0 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Fri, 6 Nov 2015 11:10:17 -0800 Subject: [PATCH] KAFKA-2764: Change use of Properties in Copycat to Maps. --- .../kafka/common/config/AbstractConfig.java | 19 ++++++- .../kafka/copycat/connector/Connector.java | 12 ++--- .../apache/kafka/copycat/connector/Task.java | 4 +- .../apache/kafka/copycat/sink/SinkTask.java | 3 +- .../kafka/copycat/source/SourceTask.java | 4 +- .../ConnectorReconfigurationTest.java | 11 ++-- .../copycat/file/FileStreamSinkConnector.java | 15 +++--- .../copycat/file/FileStreamSinkTask.java | 5 +- .../file/FileStreamSourceConnector.java | 19 +++---- .../copycat/file/FileStreamSourceTask.java | 6 +-- .../file/FileStreamSinkConnectorTest.java | 17 ++++--- .../file/FileStreamSourceConnectorTest.java | 27 +++++----- .../file/FileStreamSourceTaskTest.java | 12 ++--- .../kafka/copycat/cli/CopycatDistributed.java | 8 +-- .../kafka/copycat/cli/CopycatStandalone.java | 14 ++--- .../apache/kafka/copycat/runtime/Worker.java | 21 +++----- .../kafka/copycat/runtime/WorkerConfig.java | 4 +- .../kafka/copycat/runtime/WorkerSinkTask.java | 21 ++++---- .../copycat/runtime/WorkerSourceTask.java | 8 +-- .../kafka/copycat/runtime/WorkerTask.java | 4 +- .../distributed/DistributedConfig.java | 4 +- .../runtime/standalone/StandaloneConfig.java | 4 +- .../copycat/runtime/WorkerSinkTaskTest.java | 17 +++---- .../copycat/runtime/WorkerSourceTaskTest.java | 16 +++--- .../kafka/copycat/runtime/WorkerTest.java | 51 +++++++++---------- .../distributed/DistributedHerderTest.java | 3 +- 26 files changed, 167 insertions(+), 162 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 327a9edf981c7..07b64c044d745 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -105,9 +105,9 @@ public Set unused() { return keys; } - public Properties unusedProperties() { + public Map unusedConfigs() { Set unusedKeys = this.unused(); - Properties unusedProps = new Properties(); + Map unusedProps = new HashMap<>(); for (String key : unusedKeys) unusedProps.put(key, this.originals.get(key)); return unusedProps; @@ -119,6 +119,21 @@ public Map originals() { return copy; } + /** + * Get all the original settings, ensuring that all values are of type String. + * @return the original settings + * @throw ClassCastException if any of the values are not strings + */ + public Map originalsStrings() { + Map copy = new RecordingMap<>(); + for (Map.Entry entry : originals.entrySet()) { + if (!(entry.getValue() instanceof String)) + throw new ClassCastException("Non-string value found in original settings"); + copy.put(entry.getKey(), (String) entry.getValue()); + } + return copy; + } + /** * Gets all original settings with the given prefix, stripping the prefix before adding it to the output. * diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java index ae141c4416fe5..6972d3d6a4765 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.annotation.InterfaceStability; import java.util.List; -import java.util.Properties; +import java.util.Map; /** *

@@ -69,7 +69,7 @@ public void initialize(ConnectorContext ctx) { * @param taskConfigs existing task configurations, which may be used when generating new task configs to avoid * churn in partition to task assignments */ - public void initialize(ConnectorContext ctx, List taskConfigs) { + public void initialize(ConnectorContext ctx, List> taskConfigs) { context = ctx; // Ignore taskConfigs. May result in more churn of tasks during recovery if updated configs // are very different, but reduces the difficulty of implementing a Connector @@ -81,17 +81,17 @@ public void initialize(ConnectorContext ctx, List taskConfigs) { * * @param props configuration settings */ - public abstract void start(Properties props); + public abstract void start(Map props); /** * Reconfigure this Connector. Most implementations will not override this, using the default - * implementation that calls {@link #stop()} followed by {@link #start(Properties)}. + * implementation that calls {@link #stop()} followed by {@link #start(Map)}. * Implementations only need to override this if they want to handle this process more * efficiently, e.g. without shutting down network connections to the external system. * * @param props new configuration settings */ - public void reconfigure(Properties props) { + public void reconfigure(Map props) { stop(); start(props); } @@ -108,7 +108,7 @@ public void reconfigure(Properties props) { * @param maxTasks maximum number of configurations to generate * @return configurations for Tasks */ - public abstract List taskConfigs(int maxTasks); + public abstract List> taskConfigs(int maxTasks); /** * Stop this connector. diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java index cdaba0845c224..2a8c98ce0f139 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java @@ -19,7 +19,7 @@ import org.apache.kafka.common.annotation.InterfaceStability; -import java.util.Properties; +import java.util.Map; /** *

@@ -40,7 +40,7 @@ public interface Task { * Start the Task * @param props initial configuration */ - void start(Properties props); + void start(Map props); /** * Stop this task. diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java index 7c03cda9f3715..b2d5ff67bcf7b 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java @@ -23,7 +23,6 @@ import java.util.Collection; import java.util.Map; -import java.util.Properties; /** * SinkTask is a Task takes records loaded from Kafka and sends them to another system. In @@ -52,7 +51,7 @@ public void initialize(SinkTaskContext context) { * @param props initial configuration */ @Override - public abstract void start(Properties props); + public abstract void start(Map props); /** * Put the records in the sink. Usually this should send the records to the sink asynchronously diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java index 30cbf16f155c8..841943f4d239c 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java @@ -21,7 +21,7 @@ import org.apache.kafka.copycat.connector.Task; import java.util.List; -import java.util.Properties; +import java.util.Map; /** * SourceTask is a Task that pulls records from another system for storage in Kafka. @@ -43,7 +43,7 @@ public void initialize(SourceTaskContext context) { * @param props initial configuration */ @Override - public abstract void start(Properties props); + public abstract void start(Map props); /** * Poll this SourceTask for new records. This method should block if no data is currently diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java index cbaf86610bf6c..79ddfd7a2a880 100644 --- a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java +++ b/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java @@ -20,8 +20,9 @@ import org.apache.kafka.copycat.errors.CopycatException; import org.junit.Test; +import java.util.Collections; import java.util.List; -import java.util.Properties; +import java.util.Map; import static org.junit.Assert.assertEquals; @@ -30,7 +31,7 @@ public class ConnectorReconfigurationTest { @Test public void testDefaultReconfigure() throws Exception { TestConnector conn = new TestConnector(false); - conn.reconfigure(new Properties()); + conn.reconfigure(Collections.emptyMap()); assertEquals(conn.stopOrder, 0); assertEquals(conn.configureOrder, 1); } @@ -38,7 +39,7 @@ public void testDefaultReconfigure() throws Exception { @Test(expected = CopycatException.class) public void testReconfigureStopException() throws Exception { TestConnector conn = new TestConnector(true); - conn.reconfigure(new Properties()); + conn.reconfigure(Collections.emptyMap()); } private static class TestConnector extends Connector { @@ -52,7 +53,7 @@ public TestConnector(boolean stopException) { } @Override - public void start(Properties props) { + public void start(Map props) { configureOrder = order++; } @@ -62,7 +63,7 @@ public Class taskClass() { } @Override - public List taskConfigs(int count) { + public List> taskConfigs(int count) { return null; } diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java index 6e2b04ddcdc98..763f638b5a36c 100644 --- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java +++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java @@ -21,8 +21,9 @@ import org.apache.kafka.copycat.sink.SinkConnector; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; -import java.util.Properties; +import java.util.Map; /** * Very simple connector that works with the console. This connector supports both source and @@ -34,8 +35,8 @@ public class FileStreamSinkConnector extends SinkConnector { private String filename; @Override - public void start(Properties props) { - filename = props.getProperty(FILE_CONFIG); + public void start(Map props) { + filename = props.get(FILE_CONFIG); } @Override @@ -44,12 +45,12 @@ public Class taskClass() { } @Override - public List taskConfigs(int maxTasks) { - ArrayList configs = new ArrayList<>(); + public List> taskConfigs(int maxTasks) { + ArrayList> configs = new ArrayList<>(); for (int i = 0; i < maxTasks; i++) { - Properties config = new Properties(); + Map config = new HashMap<>(); if (filename != null) - config.setProperty(FILE_CONFIG, filename); + config.put(FILE_CONFIG, filename); configs.add(config); } return configs; diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java index 6dfe4a7af3bf6..5286d2b6f584d 100644 --- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java +++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java @@ -30,7 +30,6 @@ import java.io.PrintStream; import java.util.Collection; import java.util.Map; -import java.util.Properties; /** * FileStreamSinkTask writes records to stdout or a file. @@ -51,8 +50,8 @@ public FileStreamSinkTask(PrintStream outputStream) { } @Override - public void start(Properties props) { - filename = props.getProperty(FileStreamSinkConnector.FILE_CONFIG); + public void start(Map props) { + filename = props.get(FileStreamSinkConnector.FILE_CONFIG); if (filename == null) { outputStream = System.out; } else { diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java index 716322f4b77c7..9784bb1569055 100644 --- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java +++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java @@ -22,8 +22,9 @@ import org.apache.kafka.copycat.source.SourceConnector; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; -import java.util.Properties; +import java.util.Map; /** * Very simple connector that works with the console. This connector supports both source and @@ -37,9 +38,9 @@ public class FileStreamSourceConnector extends SourceConnector { private String topic; @Override - public void start(Properties props) { - filename = props.getProperty(FILE_CONFIG); - topic = props.getProperty(TOPIC_CONFIG); + public void start(Map props) { + filename = props.get(FILE_CONFIG); + topic = props.get(TOPIC_CONFIG); if (topic == null || topic.isEmpty()) throw new CopycatException("FileStreamSourceConnector configuration must include 'topic' setting"); if (topic.contains(",")) @@ -52,13 +53,13 @@ public Class taskClass() { } @Override - public List taskConfigs(int maxTasks) { - ArrayList configs = new ArrayList<>(); + public List> taskConfigs(int maxTasks) { + ArrayList> configs = new ArrayList<>(); // Only one input stream makes sense. - Properties config = new Properties(); + Map config = new HashMap<>(); if (filename != null) - config.setProperty(FILE_CONFIG, filename); - config.setProperty(TOPIC_CONFIG, topic); + config.put(FILE_CONFIG, filename); + config.put(TOPIC_CONFIG, topic); configs.add(config); return configs; } diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java index f2249d0ef8266..70eef5cdd0033 100644 --- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java +++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java @@ -46,15 +46,15 @@ public class FileStreamSourceTask extends SourceTask { private Long streamOffset; @Override - public void start(Properties props) { - filename = props.getProperty(FileStreamSourceConnector.FILE_CONFIG); + public void start(Map props) { + filename = props.get(FileStreamSourceConnector.FILE_CONFIG); if (filename == null || filename.isEmpty()) { stream = System.in; // Tracking offset for stdin doesn't make sense streamOffset = null; reader = new BufferedReader(new InputStreamReader(stream)); } - topic = props.getProperty(FileStreamSourceConnector.TOPIC_CONFIG); + topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG); if (topic == null) throw new CopycatException("FileStreamSourceTask config missing topic setting"); } diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java index ab5fd3b2863e4..b30856fbfef40 100644 --- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java +++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java @@ -25,8 +25,9 @@ import org.powermock.api.easymock.PowerMock; import java.util.Arrays; +import java.util.HashMap; import java.util.List; -import java.util.Properties; +import java.util.Map; import static org.junit.Assert.assertEquals; @@ -42,7 +43,7 @@ public class FileStreamSinkConnectorTest { private FileStreamSinkConnector connector; private ConnectorContext ctx; - private Properties sinkProperties; + private Map sinkProperties; @Before public void setup() { @@ -50,9 +51,9 @@ public void setup() { ctx = PowerMock.createMock(ConnectorContext.class); connector.initialize(ctx); - sinkProperties = new Properties(); - sinkProperties.setProperty(SinkConnector.TOPICS_CONFIG, MULTIPLE_TOPICS); - sinkProperties.setProperty(FileStreamSinkConnector.FILE_CONFIG, FILENAME); + sinkProperties = new HashMap<>(); + sinkProperties.put(SinkConnector.TOPICS_CONFIG, MULTIPLE_TOPICS); + sinkProperties.put(FileStreamSinkConnector.FILE_CONFIG, FILENAME); } @Test @@ -60,14 +61,14 @@ public void testSinkTasks() { PowerMock.replayAll(); connector.start(sinkProperties); - List taskConfigs = connector.taskConfigs(1); + List> taskConfigs = connector.taskConfigs(1); assertEquals(1, taskConfigs.size()); - assertEquals(FILENAME, taskConfigs.get(0).getProperty(FileStreamSinkConnector.FILE_CONFIG)); + assertEquals(FILENAME, taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG)); taskConfigs = connector.taskConfigs(2); assertEquals(2, taskConfigs.size()); for (int i = 0; i < 2; i++) { - assertEquals(FILENAME, taskConfigs.get(0).getProperty(FileStreamSinkConnector.FILE_CONFIG)); + assertEquals(FILENAME, taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG)); } PowerMock.verifyAll(); diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java index 41e15a08332d4..28bfa62f07ed0 100644 --- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java +++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java @@ -23,8 +23,9 @@ import org.junit.Test; import org.powermock.api.easymock.PowerMock; +import java.util.HashMap; import java.util.List; -import java.util.Properties; +import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -37,7 +38,7 @@ public class FileStreamSourceConnectorTest { private FileStreamSourceConnector connector; private ConnectorContext ctx; - private Properties sourceProperties; + private Map sourceProperties; @Before public void setup() { @@ -45,9 +46,9 @@ public void setup() { ctx = PowerMock.createMock(ConnectorContext.class); connector.initialize(ctx); - sourceProperties = new Properties(); - sourceProperties.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, SINGLE_TOPIC); - sourceProperties.setProperty(FileStreamSourceConnector.FILE_CONFIG, FILENAME); + sourceProperties = new HashMap<>(); + sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, SINGLE_TOPIC); + sourceProperties.put(FileStreamSourceConnector.FILE_CONFIG, FILENAME); } @Test @@ -55,20 +56,20 @@ public void testSourceTasks() { PowerMock.replayAll(); connector.start(sourceProperties); - List taskConfigs = connector.taskConfigs(1); + List> taskConfigs = connector.taskConfigs(1); assertEquals(1, taskConfigs.size()); assertEquals(FILENAME, - taskConfigs.get(0).getProperty(FileStreamSourceConnector.FILE_CONFIG)); + taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG)); assertEquals(SINGLE_TOPIC, - taskConfigs.get(0).getProperty(FileStreamSourceConnector.TOPIC_CONFIG)); + taskConfigs.get(0).get(FileStreamSourceConnector.TOPIC_CONFIG)); // Should be able to return fewer than requested # taskConfigs = connector.taskConfigs(2); assertEquals(1, taskConfigs.size()); assertEquals(FILENAME, - taskConfigs.get(0).getProperty(FileStreamSourceConnector.FILE_CONFIG)); + taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG)); assertEquals(SINGLE_TOPIC, - taskConfigs.get(0).getProperty(FileStreamSourceConnector.TOPIC_CONFIG)); + taskConfigs.get(0).get(FileStreamSourceConnector.TOPIC_CONFIG)); PowerMock.verifyAll(); } @@ -79,16 +80,16 @@ public void testSourceTasksStdin() { sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG); connector.start(sourceProperties); - List taskConfigs = connector.taskConfigs(1); + List> taskConfigs = connector.taskConfigs(1); assertEquals(1, taskConfigs.size()); - assertNull(taskConfigs.get(0).getProperty(FileStreamSourceConnector.FILE_CONFIG)); + assertNull(taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG)); PowerMock.verifyAll(); } @Test(expected = CopycatException.class) public void testMultipleSourcesInvalid() { - sourceProperties.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, MULTIPLE_TOPICS); + sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, MULTIPLE_TOPICS); connector.start(sourceProperties); } diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java index 4365defb69238..ddf8e43deaaa3 100644 --- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java +++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java @@ -31,9 +31,9 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; import static org.junit.Assert.assertEquals; @@ -42,7 +42,7 @@ public class FileStreamSourceTaskTest { private static final String TOPIC = "test"; private File tempFile; - private Properties config; + private Map config; private OffsetStorageReader offsetStorageReader; private SourceTaskContext context; private FileStreamSourceTask task; @@ -52,9 +52,9 @@ public class FileStreamSourceTaskTest { @Before public void setup() throws IOException { tempFile = File.createTempFile("file-stream-source-task-test", null); - config = new Properties(); - config.setProperty(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsolutePath()); - config.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC); + config = new HashMap<>(); + config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsolutePath()); + config.put(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC); task = new FileStreamSourceTask(); offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class); context = PowerMock.createMock(SourceTaskContext.class); @@ -135,7 +135,7 @@ public void testMissingTopic() throws InterruptedException { } public void testInvalidFile() throws InterruptedException { - config.setProperty(FileStreamSourceConnector.FILE_CONFIG, "bogusfilename"); + config.put(FileStreamSourceConnector.FILE_CONFIG, "bogusfilename"); task.start(config); // Currently the task retries indefinitely if the file isn't found, but shouldn't return any data. for (int i = 0; i < 100; i++) diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java index ca3f76c96b15b..8dfefaad5d52a 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java @@ -28,7 +28,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Properties; +import java.util.Collections; +import java.util.Map; /** *

@@ -44,15 +45,14 @@ public class CopycatDistributed { private static final Logger log = LoggerFactory.getLogger(CopycatDistributed.class); public static void main(String[] args) throws Exception { - Properties workerProps; - if (args.length < 1) { log.info("Usage: CopycatDistributed worker.properties"); System.exit(1); } String workerPropsFile = args[0]; - workerProps = !workerPropsFile.isEmpty() ? Utils.loadProps(workerPropsFile) : new Properties(); + Map workerProps = !workerPropsFile.isEmpty() ? + Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap(); DistributedConfig config = new DistributedConfig(workerProps); Worker worker = new Worker(config, new KafkaOffsetBackingStore()); diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java index cd4fc96fc75b2..38695522c7dc8 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java @@ -34,7 +34,8 @@ import org.slf4j.LoggerFactory; import java.util.Arrays; -import java.util.Properties; +import java.util.Collections; +import java.util.Map; /** *

@@ -52,8 +53,6 @@ public class CopycatStandalone { private static final Logger log = LoggerFactory.getLogger(CopycatStandalone.class); public static void main(String[] args) throws Exception { - Properties workerProps; - Properties connectorProps; if (args.length < 2) { log.info("Usage: CopycatStandalone worker.properties connector1.properties [connector2.properties ...]"); @@ -61,7 +60,8 @@ public static void main(String[] args) throws Exception { } String workerPropsFile = args[0]; - workerProps = !workerPropsFile.isEmpty() ? Utils.loadProps(workerPropsFile) : new Properties(); + Map workerProps = !workerPropsFile.isEmpty() ? + Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap(); StandaloneConfig config = new StandaloneConfig(workerProps); Worker worker = new Worker(config, new FileOffsetBackingStore()); @@ -72,7 +72,7 @@ public static void main(String[] args) throws Exception { try { for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) { - connectorProps = Utils.loadProps(connectorPropsFile); + Map connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile)); FutureCallback> cb = new FutureCallback<>(new Callback>() { @Override public void onCompletion(Throwable error, Herder.Created info) { @@ -83,8 +83,8 @@ public void onCompletion(Throwable error, Herder.Created info) { } }); herder.putConnectorConfig( - connectorProps.getProperty(ConnectorConfig.NAME_CONFIG), - Utils.propsToStringMap(connectorProps), false, cb); + connectorProps.get(ConnectorConfig.NAME_CONFIG), + connectorProps, false, cb); cb.get(); } } catch (Throwable t) { diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java index de9f533121746..08eab86233309 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java @@ -38,7 +38,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; /** @@ -89,15 +88,12 @@ public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingSt public void start() { log.info("Worker starting"); - Properties unusedConfigs = config.unusedProperties(); - Map producerProps = new HashMap<>(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - for (String propName : unusedConfigs.stringPropertyNames()) { - producerProps.put(propName, unusedConfigs.getProperty(propName)); - } + producerProps.putAll(config.unusedConfigs()); + producer = new KafkaProducer<>(producerProps); offsetBackingStore.start(); @@ -177,10 +173,7 @@ public void addConnector(ConnectorConfig connConfig, ConnectorContext ctx) { final Connector connector = instantiateConnector(connClass); connector.initialize(ctx); try { - Map originals = connConfig.originals(); - Properties props = new Properties(); - props.putAll(originals); - connector.start(props); + connector.start(connConfig.originalsStrings()); } catch (CopycatException e) { throw new CopycatException("Connector threw an exception while starting", e); } @@ -209,8 +202,8 @@ public List> connectorTaskConfigs(String connName, int maxTa List> result = new ArrayList<>(); String taskClassName = connector.taskClass().getName(); - for (Properties taskProps : connector.taskConfigs(maxTasks)) { - Map taskConfig = Utils.propsToStringMap(taskProps); + for (Map taskProps : connector.taskConfigs(maxTasks)) { + Map taskConfig = new HashMap<>(taskProps); // Ensure we don't modify the connector's copy of the config taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName); if (sinkTopics != null) taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, ",")); @@ -280,9 +273,7 @@ public void addTask(ConnectorTaskId id, TaskConfig taskConfig) { // Start the task before adding modifying any state, any exceptions are caught higher up the // call chain and there's no cleanup to do here - Properties props = new Properties(); - props.putAll(taskConfig.originals()); - workerTask.start(props); + workerTask.start(taskConfig.originalsStrings()); if (task instanceof SourceTask) { WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask; sourceTaskOffsetCommitter.schedule(id, workerSourceTask); diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java index 0c6a6f62b3f05..b962d543f81ce 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; -import java.util.Properties; +import java.util.Map; /** * Common base class providing configuration for Copycat workers, whether standalone or distributed. @@ -132,7 +132,7 @@ protected static ConfigDef baseConfigDef() { .define(REST_ADVERTISED_PORT_CONFIG, Type.INT, null, Importance.LOW, REST_ADVERTISED_PORT_DOC); } - public WorkerConfig(ConfigDef definition, Properties props) { + public WorkerConfig(ConfigDef definition, Map props) { super(definition, props); } } diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java index dc5173049c7b2..e9193b874ff99 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java @@ -44,7 +44,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.concurrent.TimeUnit; /** @@ -60,7 +59,7 @@ class WorkerSinkTask implements WorkerTask { private final Converter keyConverter; private final Converter valueConverter; private WorkerSinkTaskThread workThread; - private Properties taskProps; + private Map taskProps; private KafkaConsumer consumer; private WorkerSinkTaskContext context; private boolean started; @@ -78,7 +77,7 @@ public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConf } @Override - public void start(Properties props) { + public void start(Map props) { taskProps = props; consumer = createConsumer(); context = new WorkerSinkTaskContext(consumer); @@ -126,7 +125,7 @@ public void close() { * @returns true if successful, false if joining the consumer group was interrupted */ public boolean joinConsumerGroupAndStart() { - String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG); + String topicsStr = taskProps.get(SinkTask.TOPICS_CONFIG); if (topicsStr == null || topicsStr.isEmpty()) throw new CopycatException("Sink tasks require a list of topics."); String[] topics = topicsStr.split(","); @@ -222,14 +221,14 @@ public WorkerConfig workerConfig() { private KafkaConsumer createConsumer() { // Include any unknown worker configs so consumer configs can be set globally on the worker // and through to the task - Properties props = workerConfig.unusedProperties(); - props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "copycat-" + id.connector()); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + Map props = workerConfig.unusedConfigs(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "copycat-" + id.connector()); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); - props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); KafkaConsumer newConsumer; try { diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java index 1f96c78604104..cdb41b0ea994e 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java @@ -34,7 +34,7 @@ import java.util.IdentityHashMap; import java.util.List; -import java.util.Properties; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -85,7 +85,7 @@ public WorkerSourceTask(ConnectorTaskId id, SourceTask task, } @Override - public void start(Properties props) { + public void start(Map props) { workThread = new WorkerSourceTaskThread("WorkerSourceTask-" + id, props); workThread.start(); } @@ -273,11 +273,11 @@ private void finishSuccessfulFlush() { private class WorkerSourceTaskThread extends ShutdownableThread { - private Properties workerProps; + private Map workerProps; private boolean finishedStart; private boolean startedShutdownBeforeStartCompleted; - public WorkerSourceTaskThread(String name, Properties workerProps) { + public WorkerSourceTaskThread(String name, Map workerProps) { super(name); this.workerProps = workerProps; this.finishedStart = false; diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java index af225bb7e2d05..0759efe5724c1 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java @@ -17,7 +17,7 @@ package org.apache.kafka.copycat.runtime; -import java.util.Properties; +import java.util.Map; /** * Handles processing for an individual task. This interface only provides the basic methods @@ -29,7 +29,7 @@ interface WorkerTask { * Start the Task * @param props initial configuration */ - void start(Properties props); + void start(Map props); /** * Stop this task from processing messages. This method does not block, it only triggers diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedConfig.java index 90d63cfa22b65..a2848b1a3e985 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedConfig.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedConfig.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.copycat.runtime.WorkerConfig; -import java.util.Properties; +import java.util.Map; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; @@ -180,7 +180,7 @@ public class DistributedConfig extends WorkerConfig { WORKER_UNSYNC_BACKOFF_MS_DOC); } - public DistributedConfig(Properties props) { + public DistributedConfig(Map props) { super(CONFIG, props); } diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java index 246d36d3bf2b3..6e547d3d8c028 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.copycat.runtime.WorkerConfig; -import java.util.Properties; +import java.util.Map; public class StandaloneConfig extends WorkerConfig { private static final ConfigDef CONFIG; @@ -29,7 +29,7 @@ public class StandaloneConfig extends WorkerConfig { CONFIG = baseConfigDef(); } - public StandaloneConfig(Properties props) { + public StandaloneConfig(Map props) { super(CONFIG, props); } } diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java index 177f7a668bdf9..79057368a76a8 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java @@ -56,7 +56,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.Properties; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; @@ -86,7 +85,7 @@ public class WorkerSinkTaskTest extends ThreadedTest { private static final TopicPartition TOPIC_PARTITION3 = new TopicPartition(TOPIC, PARTITION3); private static final TopicPartition UNASSIGNED_TOPIC_PARTITION = new TopicPartition(TOPIC, 200); - private static final Properties TASK_PROPS = new Properties(); + private static final Map TASK_PROPS = new HashMap<>(); static { TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC); } @@ -111,13 +110,13 @@ public class WorkerSinkTaskTest extends ThreadedTest { public void setup() { super.setup(); time = new MockTime(); - Properties workerProps = new Properties(); - workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter"); - workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter"); - workerProps.setProperty("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter"); - workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter"); - workerProps.setProperty("internal.key.converter.schemas.enable", "false"); - workerProps.setProperty("internal.value.converter.schemas.enable", "false"); + Map workerProps = new HashMap<>(); + workerProps.put("key.converter", "org.apache.kafka.copycat.json.JsonConverter"); + workerProps.put("value.converter", "org.apache.kafka.copycat.json.JsonConverter"); + workerProps.put("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter"); + workerProps.put("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter"); + workerProps.put("internal.key.converter.schemas.enable", "false"); + workerProps.put("internal.value.converter.schemas.enable", "false"); workerConfig = new StandaloneConfig(workerProps); workerTask = PowerMock.createPartialMock( WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"}, diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java index 452c5cbea0f7d..0fa14bdc9629d 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java @@ -82,7 +82,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { private Capture producerCallbacks; - private static final Properties EMPTY_TASK_PROPS = new Properties(); + private static final Map EMPTY_TASK_PROPS = Collections.emptyMap(); private static final List RECORDS = Arrays.asList( new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD) ); @@ -90,13 +90,13 @@ public class WorkerSourceTaskTest extends ThreadedTest { @Override public void setup() { super.setup(); - Properties workerProps = new Properties(); - workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter"); - workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter"); - workerProps.setProperty("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter"); - workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter"); - workerProps.setProperty("internal.key.converter.schemas.enable", "false"); - workerProps.setProperty("internal.value.converter.schemas.enable", "false"); + Map workerProps = new HashMap<>(); + workerProps.put("key.converter", "org.apache.kafka.copycat.json.JsonConverter"); + workerProps.put("value.converter", "org.apache.kafka.copycat.json.JsonConverter"); + workerProps.put("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter"); + workerProps.put("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter"); + workerProps.put("internal.key.converter.schemas.enable", "false"); + workerProps.put("internal.value.converter.schemas.enable", "false"); config = new StandaloneConfig(workerProps); producerCallbacks = EasyMock.newCapture(); } diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java index 05015a475c600..f99c7118f5f9d 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java @@ -19,7 +19,6 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.copycat.connector.Connector; import org.apache.kafka.copycat.connector.ConnectorContext; import org.apache.kafka.copycat.connector.Task; @@ -46,10 +45,10 @@ import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Properties; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -70,13 +69,13 @@ public class WorkerTest extends ThreadedTest { public void setup() { super.setup(); - Properties workerProps = new Properties(); - workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter"); - workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter"); - workerProps.setProperty("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter"); - workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter"); - workerProps.setProperty("internal.key.converter.schemas.enable", "false"); - workerProps.setProperty("internal.value.converter.schemas.enable", "false"); + Map workerProps = new HashMap<>(); + workerProps.put("key.converter", "org.apache.kafka.copycat.json.JsonConverter"); + workerProps.put("value.converter", "org.apache.kafka.copycat.json.JsonConverter"); + workerProps.put("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter"); + workerProps.put("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter"); + workerProps.put("internal.key.converter.schemas.enable", "false"); + workerProps.put("internal.value.converter.schemas.enable", "false"); config = new StandaloneConfig(workerProps); } @@ -94,7 +93,7 @@ public void testAddRemoveConnector() throws Exception { PowerMock.mockStatic(Worker.class); PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector); - Properties props = new Properties(); + Map props = new HashMap<>(); props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar"); props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); @@ -117,7 +116,7 @@ public void testAddRemoveConnector() throws Exception { worker = new Worker(new MockTime(), config, offsetBackingStore); worker.start(); - ConnectorConfig config = new ConnectorConfig(Utils.propsToStringMap(props)); + ConnectorConfig config = new ConnectorConfig(props); assertEquals(Collections.emptySet(), worker.connectorNames()); worker.addConnector(config, ctx); assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames()); @@ -164,7 +163,7 @@ public void testReconfigureConnectorTasks() throws Exception { PowerMock.mockStatic(Worker.class); PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector); - Properties props = new Properties(); + Map props = new HashMap<>(); props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar"); props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); @@ -177,8 +176,8 @@ public void testReconfigureConnectorTasks() throws Exception { // Reconfigure EasyMock.>expect(connector.taskClass()).andReturn(TestSourceTask.class); - Properties taskProps = new Properties(); - taskProps.setProperty("foo", "bar"); + Map taskProps = new HashMap<>(); + taskProps.put("foo", "bar"); EasyMock.expect(connector.taskConfigs(2)).andReturn(Arrays.asList(taskProps, taskProps)); // Remove @@ -193,7 +192,7 @@ public void testReconfigureConnectorTasks() throws Exception { worker = new Worker(new MockTime(), config, offsetBackingStore); worker.start(); - ConnectorConfig config = new ConnectorConfig(Utils.propsToStringMap(props)); + ConnectorConfig config = new ConnectorConfig(props); assertEquals(Collections.emptySet(), worker.connectorNames()); worker.addConnector(config, ctx); assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames()); @@ -204,10 +203,10 @@ public void testReconfigureConnectorTasks() throws Exception { // expected } List> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, 2, Arrays.asList("foo", "bar")); - Properties expectedTaskProps = new Properties(); - expectedTaskProps.setProperty("foo", "bar"); - expectedTaskProps.setProperty(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); - expectedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, "foo,bar"); + Map expectedTaskProps = new HashMap<>(); + expectedTaskProps.put("foo", "bar"); + expectedTaskProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); + expectedTaskProps.put(SinkTask.TOPICS_CONFIG, "foo,bar"); assertEquals(2, taskConfigs.size()); assertEquals(expectedTaskProps, taskConfigs.get(0)); assertEquals(expectedTaskProps, taskConfigs.get(1)); @@ -246,7 +245,7 @@ public void testAddRemoveTask() throws Exception { EasyMock.anyObject(WorkerConfig.class), EasyMock.anyObject(Time.class)) .andReturn(workerTask); - Properties origProps = new Properties(); + Map origProps = new HashMap<>(); origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); workerTask.start(origProps); EasyMock.expectLastCall(); @@ -266,7 +265,7 @@ public void testAddRemoveTask() throws Exception { worker = new Worker(new MockTime(), config, offsetBackingStore); worker.start(); assertEquals(Collections.emptySet(), worker.taskIds()); - worker.addTask(taskId, new TaskConfig(Utils.propsToStringMap(origProps))); + worker.addTask(taskId, new TaskConfig(origProps)); assertEquals(new HashSet<>(Arrays.asList(taskId)), worker.taskIds()); worker.stopTask(taskId); assertEquals(Collections.emptySet(), worker.taskIds()); @@ -315,7 +314,7 @@ public void testCleanupTasksOnStop() throws Exception { EasyMock.anyObject(WorkerConfig.class), EasyMock.anyObject(Time.class)) .andReturn(workerTask); - Properties origProps = new Properties(); + Map origProps = new HashMap<>(); origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); workerTask.start(origProps); EasyMock.expectLastCall(); @@ -335,7 +334,7 @@ public void testCleanupTasksOnStop() throws Exception { worker = new Worker(new MockTime(), config, offsetBackingStore); worker.start(); - worker.addTask(TASK_ID, new TaskConfig(Utils.propsToStringMap(origProps))); + worker.addTask(TASK_ID, new TaskConfig(origProps)); worker.stop(); PowerMock.verifyAll(); @@ -344,7 +343,7 @@ public void testCleanupTasksOnStop() throws Exception { private static class TestConnector extends Connector { @Override - public void start(Properties props) { + public void start(Map props) { } @@ -354,7 +353,7 @@ public Class taskClass() { } @Override - public List taskConfigs(int maxTasks) { + public List> taskConfigs(int maxTasks) { return null; } @@ -369,7 +368,7 @@ public TestSourceTask() { } @Override - public void start(Properties props) { + public void start(Map props) { } @Override diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java index 78734470ce461..512cb5c1ebc65 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java @@ -55,7 +55,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; @@ -65,7 +64,7 @@ @PrepareForTest(DistributedHerder.class) @PowerMockIgnore("javax.management.*") public class DistributedHerderTest { - private static final Properties HERDER_CONFIG = new Properties(); + private static final Map HERDER_CONFIG = new HashMap<>(); static { HERDER_CONFIG.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic"); HERDER_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");