Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfi
// are very different, but reduces the difficulty of implementing a Connector
}

/**
* Returns the context object used to interact with the Kafka Connect runtime.
*
* @return the context for this Connector.
*/
Comment thread
rhauch marked this conversation as resolved.
Outdated
protected ConnectorContext context() {
return context;
}

/**
* Start this Connector. This method will only be called on a clean Connector, i.e. it has
* either just been instantiated and initialized or {@link #stop()} has been invoked.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,9 @@ public abstract class SinkConnector extends Connector {
*/
public static final String TOPICS_CONFIG = "topics";

@Override
protected SinkConnectorContext context() {
Comment thread
rhauch marked this conversation as resolved.
Outdated
return (SinkConnectorContext) context;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.sink;

import org.apache.kafka.connect.connector.ConnectorContext;

/**
* A context to allow a {@link SinkConnector} to interact with the Kafka Connect runtime.
*/
public interface SinkConnectorContext extends ConnectorContext {
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,8 @@
*/
public abstract class SourceConnector extends Connector {

@Override
protected SourceConnectorContext context() {
return (SourceConnectorContext) context;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.source;

import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;

/**
* A context to allow a {@link SourceConnector} to interact with the Kafka Connect runtime.
*/
public interface SourceConnectorContext extends ConnectorContext {

/**
* Returns the {@link OffsetStorageReader} for this SourceConnectorContext.
* @return the OffsetStorageReader for this connector.
*/
Comment thread
rhauch marked this conversation as resolved.
Outdated
OffsetStorageReader offsetStorageReader();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.connector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.junit.Before;
import org.junit.Test;

public abstract class ConnectorTest {

protected ConnectorContext context;
protected Connector connector;
protected AssertableConnector assertableConnector;

@Before
public void beforeEach() {
connector = createConnector();
context = createContext();
assertableConnector = (AssertableConnector) connector;
}

@Test
public void shouldInitializeContext() {
connector.initialize(context);
assertableConnector.assertInitialized();
assertableConnector.assertContext(context);
assertableConnector.assertTaskConfigs(null);
}

@Test
public void shouldInitializeContextWithTaskConfigs() {
List<Map<String, String>> taskConfigs = new ArrayList<>();
connector.initialize(context, taskConfigs);
assertableConnector.assertInitialized();
assertableConnector.assertContext(context);
assertableConnector.assertTaskConfigs(taskConfigs);
}

@Test
public void shouldStopAndStartWhenReconfigure() {
Map<String, String> props = new HashMap<>();
connector.initialize(context);
assertableConnector.assertContext(context);
assertableConnector.assertStarted(false);
assertableConnector.assertStopped(false);
connector.reconfigure(props);
assertableConnector.assertStarted(true);
assertableConnector.assertStopped(true);
assertableConnector.assertProperties(props);
}

protected abstract ConnectorContext createContext();

protected abstract Connector createConnector();

public interface AssertableConnector {

void assertContext(ConnectorContext expected);

void assertInitialized();

void assertTaskConfigs(List<Map<String, String>> expectedTaskConfigs);

void assertStarted(boolean expected);

void assertStopped(boolean expected);

void assertProperties(Map<String, String> expected);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.sink;

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.ConnectorTest;
import org.apache.kafka.connect.connector.Task;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

public class SinkConnectorTest extends ConnectorTest {

@Override
protected TestSinkConnectorContext createContext() {
return new TestSinkConnectorContext();
}

@Override
protected TestSinkConnector createConnector() {
return new TestSinkConnector();
}

private static class TestSinkConnectorContext implements SinkConnectorContext {

@Override
public void requestTaskReconfiguration() {
// Unexpected in these tests
throw new UnsupportedOperationException();
}

@Override
public void raiseError(Exception e) {
// Unexpected in these tests
throw new UnsupportedOperationException();
}
}

protected static class TestSinkConnector extends SinkConnector implements ConnectorTest.AssertableConnector {

public static final String VERSION = "an entirely different version";

private boolean initialized;
private List<Map<String, String>> taskConfigs;
private Map<String, String> props;
private boolean started;
private boolean stopped;

@Override
public String version() {
return VERSION;
}

@Override
public void initialize(ConnectorContext ctx) {
super.initialize(ctx);
initialized = true;
this.taskConfigs = null;
}

@Override
public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs) {
super.initialize(ctx, taskConfigs);
initialized = true;
this.taskConfigs = taskConfigs;
}

@Override
public void start(Map<String, String> props) {
this.props = props;
started = true;
}

@Override
public Class<? extends Task> taskClass() {
return null;
}

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return null;
}

@Override
public void stop() {
stopped = true;
}

@Override
public ConfigDef config() {
return new ConfigDef()
.define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs")
.define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs");
}

@Override
public void assertContext(ConnectorContext expected) {
assertSame(expected, context);
assertSame(expected, context());
}

@Override
public void assertInitialized() {
assertTrue(initialized);
}

@Override
public void assertTaskConfigs(List<Map<String, String>> expectedTaskConfigs) {
assertSame(expectedTaskConfigs, taskConfigs);
}

@Override
public void assertStarted(boolean expected) {
assertEquals(expected, started);
}

@Override
public void assertStopped(boolean expected) {
assertEquals(expected, stopped);
}

@Override
public void assertProperties(Map<String, String> expected) {
assertSame(expected, props);
}
}
}
Loading