Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ public Set<String> unused() {
return keys;
}

public Properties unusedProperties() {
public Map<String, Object> unusedConfigs() {
Set<String> unusedKeys = this.unused();
Properties unusedProps = new Properties();
Map<String, Object> unusedProps = new HashMap<>();
for (String key : unusedKeys)
unusedProps.put(key, this.originals.get(key));
return unusedProps;
Expand All @@ -119,6 +119,21 @@ public Map<String, Object> originals() {
return copy;
}

/**
* Get all the original settings, ensuring that all values are of type String.
* @return the original settings
* @throw ClassCastException if any of the values are not strings
*/
public Map<String, String> originalsStrings() {
Map<String, String> copy = new RecordingMap<>();
for (Map.Entry<String, ?> entry : originals.entrySet()) {
if (!(entry.getValue() instanceof String))
throw new ClassCastException("Non-string value found in original settings");
copy.put(entry.getKey(), (String) entry.getValue());
}
return copy;
}

/**
* Gets all original settings with the given prefix, stripping the prefix before adding it to the output.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.List;
import java.util.Properties;
import java.util.Map;

/**
* <p>
Expand Down Expand Up @@ -69,7 +69,7 @@ public void initialize(ConnectorContext ctx) {
* @param taskConfigs existing task configurations, which may be used when generating new task configs to avoid
* churn in partition to task assignments
*/
public void initialize(ConnectorContext ctx, List<Properties> taskConfigs) {
public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs) {
context = ctx;
// Ignore taskConfigs. May result in more churn of tasks during recovery if updated configs
// are very different, but reduces the difficulty of implementing a Connector
Expand All @@ -81,17 +81,17 @@ public void initialize(ConnectorContext ctx, List<Properties> taskConfigs) {
*
* @param props configuration settings
*/
public abstract void start(Properties props);
public abstract void start(Map<String, String> props);

/**
* Reconfigure this Connector. Most implementations will not override this, using the default
* implementation that calls {@link #stop()} followed by {@link #start(Properties)}.
* implementation that calls {@link #stop()} followed by {@link #start(Map)}.
* Implementations only need to override this if they want to handle this process more
* efficiently, e.g. without shutting down network connections to the external system.
*
* @param props new configuration settings
*/
public void reconfigure(Properties props) {
public void reconfigure(Map<String, String> props) {
stop();
start(props);
}
Expand All @@ -108,7 +108,7 @@ public void reconfigure(Properties props) {
* @param maxTasks maximum number of configurations to generate
* @return configurations for Tasks
*/
public abstract List<Properties> taskConfigs(int maxTasks);
public abstract List<Map<String, String>> taskConfigs(int maxTasks);

/**
* Stop this connector.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Properties;
import java.util.Map;

/**
* <p>
Expand All @@ -40,7 +40,7 @@ public interface Task {
* Start the Task
* @param props initial configuration
*/
void start(Properties props);
void start(Map<String, String> props);

/**
* Stop this task.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import java.util.Collection;
import java.util.Map;
import java.util.Properties;

/**
* SinkTask is a Task takes records loaded from Kafka and sends them to another system. In
Expand Down Expand Up @@ -52,7 +51,7 @@ public void initialize(SinkTaskContext context) {
* @param props initial configuration
*/
@Override
public abstract void start(Properties props);
public abstract void start(Map<String, String> props);

/**
* Put the records in the sink. Usually this should send the records to the sink asynchronously
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.kafka.copycat.connector.Task;

import java.util.List;
import java.util.Properties;
import java.util.Map;

/**
* SourceTask is a Task that pulls records from another system for storage in Kafka.
Expand All @@ -43,7 +43,7 @@ public void initialize(SourceTaskContext context) {
* @param props initial configuration
*/
@Override
public abstract void start(Properties props);
public abstract void start(Map<String, String> props);

/**
* Poll this SourceTask for new records. This method should block if no data is currently
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
import org.apache.kafka.copycat.errors.CopycatException;
import org.junit.Test;

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Map;

import static org.junit.Assert.assertEquals;

Expand All @@ -30,15 +31,15 @@ public class ConnectorReconfigurationTest {
@Test
public void testDefaultReconfigure() throws Exception {
TestConnector conn = new TestConnector(false);
conn.reconfigure(new Properties());
conn.reconfigure(Collections.<String, String>emptyMap());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice :)

assertEquals(conn.stopOrder, 0);
assertEquals(conn.configureOrder, 1);
}

@Test(expected = CopycatException.class)
public void testReconfigureStopException() throws Exception {
TestConnector conn = new TestConnector(true);
conn.reconfigure(new Properties());
conn.reconfigure(Collections.<String, String>emptyMap());
}

private static class TestConnector extends Connector {
Expand All @@ -52,7 +53,7 @@ public TestConnector(boolean stopException) {
}

@Override
public void start(Properties props) {
public void start(Map<String, String> props) {
configureOrder = order++;
}

Expand All @@ -62,7 +63,7 @@ public Class<? extends Task> taskClass() {
}

@Override
public List<Properties> taskConfigs(int count) {
public List<Map<String, String>> taskConfigs(int count) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
import org.apache.kafka.copycat.sink.SinkConnector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Map;

/**
* Very simple connector that works with the console. This connector supports both source and
Expand All @@ -34,8 +35,8 @@ public class FileStreamSinkConnector extends SinkConnector {
private String filename;

@Override
public void start(Properties props) {
filename = props.getProperty(FILE_CONFIG);
public void start(Map<String, String> props) {
filename = props.get(FILE_CONFIG);
}

@Override
Expand All @@ -44,12 +45,12 @@ public Class<? extends Task> taskClass() {
}

@Override
public List<Properties> taskConfigs(int maxTasks) {
ArrayList<Properties> configs = new ArrayList<>();
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
Properties config = new Properties();
Map<String, String> config = new HashMap<>();
if (filename != null)
config.setProperty(FILE_CONFIG, filename);
config.put(FILE_CONFIG, filename);
configs.add(config);
}
return configs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.io.PrintStream;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;

/**
* FileStreamSinkTask writes records to stdout or a file.
Expand All @@ -51,8 +50,8 @@ public FileStreamSinkTask(PrintStream outputStream) {
}

@Override
public void start(Properties props) {
filename = props.getProperty(FileStreamSinkConnector.FILE_CONFIG);
public void start(Map<String, String> props) {
filename = props.get(FileStreamSinkConnector.FILE_CONFIG);
if (filename == null) {
outputStream = System.out;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
import org.apache.kafka.copycat.source.SourceConnector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Map;

/**
* Very simple connector that works with the console. This connector supports both source and
Expand All @@ -37,9 +38,9 @@ public class FileStreamSourceConnector extends SourceConnector {
private String topic;

@Override
public void start(Properties props) {
filename = props.getProperty(FILE_CONFIG);
topic = props.getProperty(TOPIC_CONFIG);
public void start(Map<String, String> props) {
filename = props.get(FILE_CONFIG);
topic = props.get(TOPIC_CONFIG);
if (topic == null || topic.isEmpty())
throw new CopycatException("FileStreamSourceConnector configuration must include 'topic' setting");
if (topic.contains(","))
Expand All @@ -52,13 +53,13 @@ public Class<? extends Task> taskClass() {
}

@Override
public List<Properties> taskConfigs(int maxTasks) {
ArrayList<Properties> configs = new ArrayList<>();
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<>();
// Only one input stream makes sense.
Properties config = new Properties();
Map<String, String> config = new HashMap<>();
if (filename != null)
config.setProperty(FILE_CONFIG, filename);
config.setProperty(TOPIC_CONFIG, topic);
config.put(FILE_CONFIG, filename);
config.put(TOPIC_CONFIG, topic);
configs.add(config);
return configs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ public class FileStreamSourceTask extends SourceTask {
private Long streamOffset;

@Override
public void start(Properties props) {
filename = props.getProperty(FileStreamSourceConnector.FILE_CONFIG);
public void start(Map<String, String> props) {
filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
if (filename == null || filename.isEmpty()) {
stream = System.in;
// Tracking offset for stdin doesn't make sense
streamOffset = null;
reader = new BufferedReader(new InputStreamReader(stream));
}
topic = props.getProperty(FileStreamSourceConnector.TOPIC_CONFIG);
topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
if (topic == null)
throw new CopycatException("FileStreamSourceTask config missing topic setting");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
import org.powermock.api.easymock.PowerMock;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Map;

import static org.junit.Assert.assertEquals;

Expand All @@ -42,32 +43,32 @@ public class FileStreamSinkConnectorTest {

private FileStreamSinkConnector connector;
private ConnectorContext ctx;
private Properties sinkProperties;
private Map<String, String> sinkProperties;

@Before
public void setup() {
connector = new FileStreamSinkConnector();
ctx = PowerMock.createMock(ConnectorContext.class);
connector.initialize(ctx);

sinkProperties = new Properties();
sinkProperties.setProperty(SinkConnector.TOPICS_CONFIG, MULTIPLE_TOPICS);
sinkProperties.setProperty(FileStreamSinkConnector.FILE_CONFIG, FILENAME);
sinkProperties = new HashMap<>();
sinkProperties.put(SinkConnector.TOPICS_CONFIG, MULTIPLE_TOPICS);
sinkProperties.put(FileStreamSinkConnector.FILE_CONFIG, FILENAME);
}

@Test
public void testSinkTasks() {
PowerMock.replayAll();

connector.start(sinkProperties);
List<Properties> taskConfigs = connector.taskConfigs(1);
List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
assertEquals(1, taskConfigs.size());
assertEquals(FILENAME, taskConfigs.get(0).getProperty(FileStreamSinkConnector.FILE_CONFIG));
assertEquals(FILENAME, taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG));

taskConfigs = connector.taskConfigs(2);
assertEquals(2, taskConfigs.size());
for (int i = 0; i < 2; i++) {
assertEquals(FILENAME, taskConfigs.get(0).getProperty(FileStreamSinkConnector.FILE_CONFIG));
assertEquals(FILENAME, taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG));
}

PowerMock.verifyAll();
Expand Down
Loading