diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java index 79caeebf21041..6d54aabf9f587 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java @@ -79,6 +79,15 @@ public void initialize(ConnectorContext ctx, List> taskConfi // are very different, but reduces the difficulty of implementing a Connector } + /** + * Returns the context object used to interact with the Kafka Connect runtime. + * + * @return the context for this Connector. + */ + protected ConnectorContext context() { + return context; + } + /** * Start this Connector. This method will only be called on a clean Connector, i.e. it has * either just been instantiated and initialized or {@link #stop()} has been invoked. diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java index 5c5886163e23d..9627571482bc4 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java @@ -34,4 +34,9 @@ public abstract class SinkConnector extends Connector { */ public static final String TOPICS_CONFIG = "topics"; + @Override + protected SinkConnectorContext context() { + return (SinkConnectorContext) context; + } + } diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnectorContext.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnectorContext.java new file mode 100644 index 0000000000000..5e2b07a9fbb99 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnectorContext.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.sink; + +import org.apache.kafka.connect.connector.ConnectorContext; + +/** + * A context to allow a {@link SinkConnector} to interact with the Kafka Connect runtime. + */ +public interface SinkConnectorContext extends ConnectorContext { +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java index 0ca3b335fc289..6e9694024d334 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java @@ -24,4 +24,8 @@ */ public abstract class SourceConnector extends Connector { + @Override + protected SourceConnectorContext context() { + return (SourceConnectorContext) context; + } } diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnectorContext.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnectorContext.java new file mode 100644 index 0000000000000..417fbddd08d46 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnectorContext.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.source; + +import org.apache.kafka.connect.connector.ConnectorContext; +import org.apache.kafka.connect.storage.OffsetStorageReader; + +/** + * A context to allow a {@link SourceConnector} to interact with the Kafka Connect runtime. + */ +public interface SourceConnectorContext extends ConnectorContext { + + /** + * Returns the {@link OffsetStorageReader} for this SourceConnectorContext. + * @return the OffsetStorageReader for this connector. + */ + OffsetStorageReader offsetStorageReader(); +} diff --git a/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorTest.java new file mode 100644 index 0000000000000..7addf8f0f048e --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.connector; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.Before; +import org.junit.Test; + +public abstract class ConnectorTest { + + protected ConnectorContext context; + protected Connector connector; + protected AssertableConnector assertableConnector; + + @Before + public void beforeEach() { + connector = createConnector(); + context = createContext(); + assertableConnector = (AssertableConnector) connector; + } + + @Test + public void shouldInitializeContext() { + connector.initialize(context); + assertableConnector.assertInitialized(); + assertableConnector.assertContext(context); + assertableConnector.assertTaskConfigs(null); + } + + @Test + public void shouldInitializeContextWithTaskConfigs() { + List> taskConfigs = new ArrayList<>(); + connector.initialize(context, taskConfigs); + assertableConnector.assertInitialized(); + assertableConnector.assertContext(context); + assertableConnector.assertTaskConfigs(taskConfigs); + } + + @Test + public void shouldStopAndStartWhenReconfigure() { + Map props = new HashMap<>(); + connector.initialize(context); + assertableConnector.assertContext(context); + assertableConnector.assertStarted(false); + assertableConnector.assertStopped(false); + connector.reconfigure(props); + assertableConnector.assertStarted(true); + assertableConnector.assertStopped(true); + assertableConnector.assertProperties(props); + } + + protected abstract ConnectorContext createContext(); + + protected abstract Connector createConnector(); + + public interface AssertableConnector { + + void assertContext(ConnectorContext expected); + + void assertInitialized(); + + void assertTaskConfigs(List> expectedTaskConfigs); + + void assertStarted(boolean expected); + + void assertStopped(boolean expected); + + void assertProperties(Map expected); + } +} \ No newline at end of file diff --git a/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkConnectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkConnectorTest.java new file mode 100644 index 0000000000000..a4924ae3bbff6 --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkConnectorTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.sink; + +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectorContext; +import org.apache.kafka.connect.connector.ConnectorTest; +import org.apache.kafka.connect.connector.Task; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +public class SinkConnectorTest extends ConnectorTest { + + @Override + protected TestSinkConnectorContext createContext() { + return new TestSinkConnectorContext(); + } + + @Override + protected TestSinkConnector createConnector() { + return new TestSinkConnector(); + } + + private static class TestSinkConnectorContext implements SinkConnectorContext { + + @Override + public void requestTaskReconfiguration() { + // Unexpected in these tests + throw new UnsupportedOperationException(); + } + + @Override + public void raiseError(Exception e) { + // Unexpected in these tests + throw new UnsupportedOperationException(); + } + } + + protected static class TestSinkConnector extends SinkConnector implements ConnectorTest.AssertableConnector { + + public static final String VERSION = "an entirely different version"; + + private boolean initialized; + private List> taskConfigs; + private Map props; + private boolean started; + private boolean stopped; + + @Override + public String version() { + return VERSION; + } + + @Override + public void initialize(ConnectorContext ctx) { + super.initialize(ctx); + initialized = true; + this.taskConfigs = null; + } + + @Override + public void initialize(ConnectorContext ctx, List> taskConfigs) { + super.initialize(ctx, taskConfigs); + initialized = true; + this.taskConfigs = taskConfigs; + } + + @Override + public void start(Map props) { + this.props = props; + started = true; + } + + @Override + public Class taskClass() { + return null; + } + + @Override + public List> taskConfigs(int maxTasks) { + return null; + } + + @Override + public void stop() { + stopped = true; + } + + @Override + public ConfigDef config() { + return new ConfigDef() + .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs") + .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs"); + } + + @Override + public void assertContext(ConnectorContext expected) { + assertSame(expected, context); + assertSame(expected, context()); + } + + @Override + public void assertInitialized() { + assertTrue(initialized); + } + + @Override + public void assertTaskConfigs(List> expectedTaskConfigs) { + assertSame(expectedTaskConfigs, taskConfigs); + } + + @Override + public void assertStarted(boolean expected) { + assertEquals(expected, started); + } + + @Override + public void assertStopped(boolean expected) { + assertEquals(expected, stopped); + } + + @Override + public void assertProperties(Map expected) { + assertSame(expected, props); + } + } +} \ No newline at end of file diff --git a/connect/api/src/test/java/org/apache/kafka/connect/source/SourceConnectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/source/SourceConnectorTest.java new file mode 100644 index 0000000000000..9b105409fffba --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/source/SourceConnectorTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.source; + +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectorContext; +import org.apache.kafka.connect.connector.ConnectorTest; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.storage.OffsetStorageReader; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +public class SourceConnectorTest extends ConnectorTest { + + @Override + protected ConnectorContext createContext() { + return new TestSourceConnectorContext(); + } + + @Override + protected TestSourceConnector createConnector() { + return new TestSourceConnector(); + } + + private static class TestSourceConnectorContext implements SourceConnectorContext { + + @Override + public void requestTaskReconfiguration() { + // Unexpected in these tests + throw new UnsupportedOperationException(); + } + + @Override + public void raiseError(Exception e) { + // Unexpected in these tests + throw new UnsupportedOperationException(); + } + + @Override + public OffsetStorageReader offsetStorageReader() { + return null; + } + } + + private static class TestSourceConnector extends SourceConnector implements AssertableConnector { + + public static final String VERSION = "an entirely different version"; + + private boolean initialized; + private List> taskConfigs; + private Map props; + private boolean started; + private boolean stopped; + + @Override + public String version() { + return VERSION; + } + + @Override + public void initialize(ConnectorContext ctx) { + super.initialize(ctx); + initialized = true; + this.taskConfigs = null; + } + + @Override + public void initialize(ConnectorContext ctx, List> taskConfigs) { + super.initialize(ctx, taskConfigs); + initialized = true; + this.taskConfigs = taskConfigs; + } + + @Override + public void start(Map props) { + this.props = props; + started = true; + } + + @Override + public Class taskClass() { + return null; + } + + @Override + public List> taskConfigs(int maxTasks) { + return null; + } + + @Override + public void stop() { + stopped = true; + } + + @Override + public ConfigDef config() { + return new ConfigDef() + .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs") + .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs"); + } + + @Override + public void assertContext(ConnectorContext expected) { + assertSame(expected, context); + assertSame(expected, context()); + } + + @Override + public void assertInitialized() { + assertTrue(initialized); + } + + @Override + public void assertTaskConfigs(List> expectedTaskConfigs) { + assertSame(expectedTaskConfigs, taskConfigs); + } + + @Override + public void assertStarted(boolean expected) { + assertEquals(expected, started); + } + + @Override + public void assertStopped(boolean expected) { + assertEquals(expected, stopped); + } + + @Override + public void assertProperties(Map expected) { + assertSame(expected, props); + } + } +} \ No newline at end of file diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 99e44d470a22c..98e8e78e2358b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -54,6 +54,7 @@ import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.storage.OffsetBackingStore; +import org.apache.kafka.connect.storage.OffsetStorageReader; import org.apache.kafka.connect.storage.OffsetStorageReaderImpl; import org.apache.kafka.connect.storage.OffsetStorageWriter; import org.apache.kafka.connect.util.ConnectorTaskId; @@ -252,7 +253,21 @@ public boolean startConnector( final String connClass = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG); log.info("Creating connector {} of type {}", connName, connClass); final Connector connector = plugins.newConnector(connClass); - workerConnector = new WorkerConnector(connName, connector, ctx, metrics, statusListener); + + final OffsetStorageReader offsetReader = new OffsetStorageReaderImpl( + offsetBackingStore, + connName, + internalKeyConverter, + internalValueConverter + ); + workerConnector = new WorkerConnector( + connName, + connector, + ctx, + metrics, + statusListener, + offsetReader + ); log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass()); savedLoader = plugins.compareAndSwapLoaders(connector); workerConnector.initialize(connConfig); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java index 7923943d121f7..ead9d712a5c88 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java @@ -18,9 +18,13 @@ import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.source.SourceConnector; +import org.apache.kafka.connect.sink.SinkConnectorContext; +import org.apache.kafka.connect.source.SourceConnectorContext; +import org.apache.kafka.connect.storage.OffsetStorageReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,41 +62,36 @@ private enum State { private Map config; private State state; + private final OffsetStorageReader offsetStorageReader; public WorkerConnector(String connName, Connector connector, ConnectorContext ctx, ConnectMetrics metrics, - ConnectorStatus.Listener statusListener) { + ConnectorStatus.Listener statusListener, + OffsetStorageReader offsetStorageReader) { this.connName = connName; this.ctx = ctx; this.connector = connector; this.state = State.INIT; this.metrics = new ConnectorMetricsGroup(metrics, AbstractStatus.State.UNASSIGNED, statusListener); this.statusListener = this.metrics; + this.offsetStorageReader = offsetStorageReader; } public void initialize(ConnectorConfig connectorConfig) { try { + if (!isSourceConnector() && !isSinkConnector()) { + throw new ConnectException("Connector implementations must be a subclass of either SourceConnector or SinkConnector"); + } this.config = connectorConfig.originalsStrings(); log.debug("{} Initializing connector {}", this, connName); if (isSinkConnector()) { SinkConnectorConfig.validate(config); + connector.initialize(new WorkerSinkConnectorContext()); + } else { + connector.initialize(new WorkerSourceConnectorContext(offsetStorageReader)); } - - connector.initialize(new ConnectorContext() { - @Override - public void requestTaskReconfiguration() { - ctx.requestTaskReconfiguration(); - } - - @Override - public void raiseError(Exception e) { - log.error("{} Connector raised an error", WorkerConnector.this, e); - onFailure(e); - ctx.raiseError(e); - } - }); } catch (Throwable t) { log.error("{} Error initializing connector", this, t); onFailure(t); @@ -319,4 +318,36 @@ protected MetricGroup metricGroup() { return metricGroup; } } + + private abstract class WorkerConnectorContext implements ConnectorContext { + + @Override + public void requestTaskReconfiguration() { + WorkerConnector.this.ctx.requestTaskReconfiguration(); + } + + @Override + public void raiseError(Exception e) { + log.error("{} Connector raised an error", WorkerConnector.this, e); + onFailure(e); + WorkerConnector.this.ctx.raiseError(e); + } + } + + private class WorkerSinkConnectorContext extends WorkerConnectorContext implements SinkConnectorContext { + } + + private class WorkerSourceConnectorContext extends WorkerConnectorContext implements SourceConnectorContext { + + private final OffsetStorageReader offsetStorageReader; + + WorkerSourceConnectorContext(OffsetStorageReader offsetStorageReader) { + this.offsetStorageReader = offsetStorageReader; + } + + @Override + public OffsetStorageReader offsetStorageReader() { + return offsetStorageReader; + } + } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java index 10c413d23d600..088900b4c5a22 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java @@ -18,8 +18,15 @@ import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.sink.SinkConnectorContext; +import org.apache.kafka.connect.source.SourceConnector; +import org.apache.kafka.connect.source.SourceConnectorContext; +import org.apache.kafka.connect.storage.OffsetStorageReader; +import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.EasyMockSupport; @@ -47,14 +54,18 @@ public class WorkerConnectorTest extends EasyMockSupport { static { CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName()); CONFIG.put(ConnectorConfig.NAME_CONFIG, CONNECTOR); + CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "my-topic"); } public ConnectorConfig connectorConfig; public MockConnectMetrics metrics; @Mock Plugins plugins; + @Mock SourceConnector sourceConnector; + @Mock SinkConnector sinkConnector; @Mock Connector connector; @Mock ConnectorContext ctx; @Mock ConnectorStatus.Listener listener; + @Mock OffsetStorageReader offsetStorageReader; @Before public void setup() { @@ -70,11 +81,12 @@ public void tearDown() { @Test public void testInitializeFailure() { RuntimeException exception = new RuntimeException(); + connector = sourceConnector; connector.version(); expectLastCall().andReturn(VERSION); - connector.initialize(EasyMock.notNull(ConnectorContext.class)); + connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); expectLastCall().andThrow(exception); listener.onFailure(CONNECTOR, exception); @@ -85,7 +97,7 @@ public void testInitializeFailure() { replayAll(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); assertFailedMetric(workerConnector); @@ -98,11 +110,12 @@ public void testInitializeFailure() { @Test public void testFailureIsFinalState() { RuntimeException exception = new RuntimeException(); + connector = sinkConnector; connector.version(); expectLastCall().andReturn(VERSION); - connector.initialize(EasyMock.notNull(ConnectorContext.class)); + connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); expectLastCall().andThrow(exception); listener.onFailure(CONNECTOR, exception); @@ -115,7 +128,7 @@ public void testFailureIsFinalState() { replayAll(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); assertFailedMetric(workerConnector); @@ -129,10 +142,11 @@ public void testFailureIsFinalState() { @Test public void testStartupAndShutdown() { + connector = sourceConnector; connector.version(); expectLastCall().andReturn(VERSION); - connector.initialize(EasyMock.notNull(ConnectorContext.class)); + connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); expectLastCall(); connector.start(CONFIG); @@ -149,10 +163,10 @@ public void testStartupAndShutdown() { replayAll(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); - assertInitializedMetric(workerConnector); + assertInitializedSourceMetric(workerConnector); workerConnector.transitionTo(TargetState.STARTED); assertRunningMetric(workerConnector); workerConnector.shutdown(); @@ -163,10 +177,11 @@ public void testStartupAndShutdown() { @Test public void testStartupAndPause() { + connector = sinkConnector; connector.version(); expectLastCall().andReturn(VERSION); - connector.initialize(EasyMock.notNull(ConnectorContext.class)); + connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); expectLastCall(); connector.start(CONFIG); @@ -186,10 +201,10 @@ public void testStartupAndPause() { replayAll(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); - assertInitializedMetric(workerConnector); + assertInitializedSinkMetric(workerConnector); workerConnector.transitionTo(TargetState.STARTED); assertRunningMetric(workerConnector); workerConnector.transitionTo(TargetState.PAUSED); @@ -202,10 +217,11 @@ public void testStartupAndPause() { @Test public void testOnResume() { + connector = sourceConnector; connector.version(); expectLastCall().andReturn(VERSION); - connector.initialize(EasyMock.notNull(ConnectorContext.class)); + connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); expectLastCall(); listener.onPause(CONNECTOR); @@ -225,10 +241,10 @@ public void testOnResume() { replayAll(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); - assertInitializedMetric(workerConnector); + assertInitializedSourceMetric(workerConnector); workerConnector.transitionTo(TargetState.PAUSED); assertPausedMetric(workerConnector); workerConnector.transitionTo(TargetState.STARTED); @@ -241,10 +257,11 @@ public void testOnResume() { @Test public void testStartupPaused() { + connector = sinkConnector; connector.version(); expectLastCall().andReturn(VERSION); - connector.initialize(EasyMock.notNull(ConnectorContext.class)); + connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); expectLastCall(); // connector never gets started @@ -257,10 +274,10 @@ public void testStartupPaused() { replayAll(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); - assertInitializedMetric(workerConnector); + assertInitializedSinkMetric(workerConnector); workerConnector.transitionTo(TargetState.PAUSED); assertPausedMetric(workerConnector); workerConnector.shutdown(); @@ -273,10 +290,11 @@ public void testStartupPaused() { public void testStartupFailure() { RuntimeException exception = new RuntimeException(); + connector = sinkConnector; connector.version(); expectLastCall().andReturn(VERSION); - connector.initialize(EasyMock.notNull(ConnectorContext.class)); + connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); expectLastCall(); connector.start(CONFIG); @@ -290,10 +308,10 @@ public void testStartupFailure() { replayAll(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); - assertInitializedMetric(workerConnector); + assertInitializedSinkMetric(workerConnector); workerConnector.transitionTo(TargetState.STARTED); assertFailedMetric(workerConnector); workerConnector.shutdown(); @@ -305,11 +323,12 @@ public void testStartupFailure() { @Test public void testShutdownFailure() { RuntimeException exception = new RuntimeException(); + connector = sourceConnector; connector.version(); expectLastCall().andReturn(VERSION); - connector.initialize(EasyMock.notNull(ConnectorContext.class)); + connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); expectLastCall(); connector.start(CONFIG); @@ -326,10 +345,10 @@ public void testShutdownFailure() { replayAll(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); - assertInitializedMetric(workerConnector); + assertInitializedSourceMetric(workerConnector); workerConnector.transitionTo(TargetState.STARTED); assertRunningMetric(workerConnector); workerConnector.shutdown(); @@ -340,10 +359,11 @@ public void testShutdownFailure() { @Test public void testTransitionStartedToStarted() { + connector = sourceConnector; connector.version(); expectLastCall().andReturn(VERSION); - connector.initialize(EasyMock.notNull(ConnectorContext.class)); + connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); expectLastCall(); connector.start(CONFIG); @@ -361,10 +381,10 @@ public void testTransitionStartedToStarted() { replayAll(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); - assertInitializedMetric(workerConnector); + assertInitializedSourceMetric(workerConnector); workerConnector.transitionTo(TargetState.STARTED); assertRunningMetric(workerConnector); workerConnector.transitionTo(TargetState.STARTED); @@ -377,10 +397,11 @@ public void testTransitionStartedToStarted() { @Test public void testTransitionPausedToPaused() { + connector = sourceConnector; connector.version(); expectLastCall().andReturn(VERSION); - connector.initialize(EasyMock.notNull(ConnectorContext.class)); + connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); expectLastCall(); connector.start(CONFIG); @@ -400,10 +421,10 @@ public void testTransitionPausedToPaused() { replayAll(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); - assertInitializedMetric(workerConnector); + assertInitializedSourceMetric(workerConnector); workerConnector.transitionTo(TargetState.STARTED); assertRunningMetric(workerConnector); workerConnector.transitionTo(TargetState.PAUSED); @@ -416,6 +437,27 @@ public void testTransitionPausedToPaused() { verifyAll(); } + @Test + public void testFailConnectorThatIsNeitherSourceNorSink() { + connector.version(); + expectLastCall().andReturn(VERSION); + + Capture exceptionCapture = Capture.newInstance(); + listener.onFailure(EasyMock.eq(CONNECTOR), EasyMock.capture(exceptionCapture)); + expectLastCall(); + + replayAll(); + + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); + + workerConnector.initialize(connectorConfig); + Throwable e = exceptionCapture.getValue(); + assertTrue(e instanceof ConnectException); + assertTrue(e.getMessage().contains("must be a subclass of")); + + verifyAll(); + } + protected void assertFailedMetric(WorkerConnector workerConnector) { assertFalse(workerConnector.metrics().isUnassigned()); assertTrue(workerConnector.metrics().isFailed()); @@ -444,7 +486,15 @@ protected void assertStoppedMetric(WorkerConnector workerConnector) { assertFalse(workerConnector.metrics().isRunning()); } - protected void assertInitializedMetric(WorkerConnector workerConnector) { + protected void assertInitializedSinkMetric(WorkerConnector workerConnector) { + assertInitializedMetric(workerConnector, "sink"); + } + + protected void assertInitializedSourceMetric(WorkerConnector workerConnector) { + assertInitializedMetric(workerConnector, "source"); + } + + protected void assertInitializedMetric(WorkerConnector workerConnector, String expectedType) { assertTrue(workerConnector.metrics().isUnassigned()); assertFalse(workerConnector.metrics().isFailed()); assertFalse(workerConnector.metrics().isPaused()); @@ -454,12 +504,11 @@ protected void assertInitializedMetric(WorkerConnector workerConnector) { String type = metrics.currentMetricValueAsString(metricGroup, "connector-type"); String clazz = metrics.currentMetricValueAsString(metricGroup, "connector-class"); String version = metrics.currentMetricValueAsString(metricGroup, "connector-version"); - assertEquals(type, "unknown"); + assertEquals(expectedType, type); assertNotNull(clazz); assertEquals(VERSION, version); } private static abstract class TestConnector extends Connector { } - } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 1e8c0bccaffd3..b3fb6758f57f6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.config.provider.MockFileConfigProvider; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; @@ -48,7 +47,9 @@ import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; +import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.sink.SinkTask; +import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.storage.Converter; @@ -125,7 +126,8 @@ public class WorkerTest extends ThreadedTest { @Mock private Herder herder; @Mock private StatusBackingStore statusBackingStore; - @Mock private Connector connector; + @Mock private SourceConnector sourceConnector; + @Mock private SinkConnector sinkConnector; @Mock private ConnectorContext ctx; @Mock private TestSourceTask task; @Mock private WorkerSourceTask workerTask; @@ -185,8 +187,8 @@ public void testStartAndStopConnector() { // Create EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2); EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName())) - .andReturn(connector); - EasyMock.expect(connector.version()).andReturn("1.0"); + .andReturn(sourceConnector); + EasyMock.expect(sourceConnector.version()).andReturn("1.0"); Map props = new HashMap<>(); props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar"); @@ -194,16 +196,16 @@ public void testStartAndStopConnector() { props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName()); - EasyMock.expect(connector.version()).andReturn("1.0"); + EasyMock.expect(sourceConnector.version()).andReturn("1.0"); expectFileConfigProvider(); - EasyMock.expect(plugins.compareAndSwapLoaders(connector)) + EasyMock.expect(plugins.compareAndSwapLoaders(sourceConnector)) .andReturn(delegatingLoader) .times(2); - connector.initialize(anyObject(ConnectorContext.class)); + sourceConnector.initialize(anyObject(ConnectorContext.class)); EasyMock.expectLastCall(); - connector.start(props); + sourceConnector.start(props); EasyMock.expectLastCall(); EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)) @@ -213,7 +215,7 @@ public void testStartAndStopConnector() { EasyMock.expectLastCall(); // Remove - connector.stop(); + sourceConnector.stop(); EasyMock.expectLastCall(); connectorStatusListener.onShutdown(CONNECTOR_ID); @@ -313,8 +315,8 @@ public void testAddConnectorByAlias() { expectFileConfigProvider(); EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2); - EasyMock.expect(plugins.newConnector("WorkerTestConnector")).andReturn(connector); - EasyMock.expect(connector.version()).andReturn("1.0"); + EasyMock.expect(plugins.newConnector("WorkerTestConnector")).andReturn(sinkConnector); + EasyMock.expect(sinkConnector.version()).andReturn("1.0"); Map props = new HashMap<>(); props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar"); @@ -322,13 +324,13 @@ public void testAddConnectorByAlias() { props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTestConnector"); - EasyMock.expect(connector.version()).andReturn("1.0"); - EasyMock.expect(plugins.compareAndSwapLoaders(connector)) + EasyMock.expect(sinkConnector.version()).andReturn("1.0"); + EasyMock.expect(plugins.compareAndSwapLoaders(sinkConnector)) .andReturn(delegatingLoader) .times(2); - connector.initialize(anyObject(ConnectorContext.class)); + sinkConnector.initialize(anyObject(ConnectorContext.class)); EasyMock.expectLastCall(); - connector.start(props); + sinkConnector.start(props); EasyMock.expectLastCall(); EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)) @@ -339,7 +341,7 @@ public void testAddConnectorByAlias() { EasyMock.expectLastCall(); // Remove - connector.stop(); + sinkConnector.stop(); EasyMock.expectLastCall(); connectorStatusListener.onShutdown(CONNECTOR_ID); @@ -379,8 +381,8 @@ public void testAddConnectorByShortAlias() { expectFileConfigProvider(); EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2); - EasyMock.expect(plugins.newConnector("WorkerTest")).andReturn(connector); - EasyMock.expect(connector.version()).andReturn("1.0"); + EasyMock.expect(plugins.newConnector("WorkerTest")).andReturn(sinkConnector); + EasyMock.expect(sinkConnector.version()).andReturn("1.0"); Map props = new HashMap<>(); props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar"); @@ -388,13 +390,13 @@ public void testAddConnectorByShortAlias() { props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTest"); - EasyMock.expect(connector.version()).andReturn("1.0"); - EasyMock.expect(plugins.compareAndSwapLoaders(connector)) + EasyMock.expect(sinkConnector.version()).andReturn("1.0"); + EasyMock.expect(plugins.compareAndSwapLoaders(sinkConnector)) .andReturn(delegatingLoader) .times(2); - connector.initialize(anyObject(ConnectorContext.class)); + sinkConnector.initialize(anyObject(ConnectorContext.class)); EasyMock.expectLastCall(); - connector.start(props); + sinkConnector.start(props); EasyMock.expectLastCall(); EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)) @@ -405,7 +407,7 @@ public void testAddConnectorByShortAlias() { EasyMock.expectLastCall(); // Remove - connector.stop(); + sinkConnector.stop(); EasyMock.expectLastCall(); connectorStatusListener.onShutdown(CONNECTOR_ID); @@ -460,8 +462,8 @@ public void testReconfigureConnectorTasks() { EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(3); EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName())) - .andReturn(connector); - EasyMock.expect(connector.version()).andReturn("1.0"); + .andReturn(sinkConnector); + EasyMock.expect(sinkConnector.version()).andReturn("1.0"); Map props = new HashMap<>(); props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar"); @@ -469,13 +471,13 @@ public void testReconfigureConnectorTasks() { props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName()); - EasyMock.expect(connector.version()).andReturn("1.0"); - EasyMock.expect(plugins.compareAndSwapLoaders(connector)) + EasyMock.expect(sinkConnector.version()).andReturn("1.0"); + EasyMock.expect(plugins.compareAndSwapLoaders(sinkConnector)) .andReturn(delegatingLoader) .times(3); - connector.initialize(anyObject(ConnectorContext.class)); + sinkConnector.initialize(anyObject(ConnectorContext.class)); EasyMock.expectLastCall(); - connector.start(props); + sinkConnector.start(props); EasyMock.expectLastCall(); EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)) @@ -486,13 +488,13 @@ public void testReconfigureConnectorTasks() { EasyMock.expectLastCall(); // Reconfigure - EasyMock.>expect(connector.taskClass()).andReturn(TestSourceTask.class); + EasyMock.>expect(sinkConnector.taskClass()).andReturn(TestSourceTask.class); Map taskProps = new HashMap<>(); taskProps.put("foo", "bar"); - EasyMock.expect(connector.taskConfigs(2)).andReturn(Arrays.asList(taskProps, taskProps)); + EasyMock.expect(sinkConnector.taskConfigs(2)).andReturn(Arrays.asList(taskProps, taskProps)); // Remove - connector.stop(); + sinkConnector.stop(); EasyMock.expectLastCall(); connectorStatusListener.onShutdown(CONNECTOR_ID); @@ -1359,7 +1361,7 @@ private Map anyConnectorConfigMap() { } /* Name here needs to be unique as we are testing the aliasing mechanism */ - public static class WorkerTestConnector extends Connector { + public static class WorkerTestConnector extends SourceConnector { private static final ConfigDef CONFIG_DEF = new ConfigDef() .define("configName", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Test configName.");