From 601e6f618944a0b27b8c3444ee053d898899b24d Mon Sep 17 00:00:00 2001 From: Florian Hussonnois Date: Mon, 27 Feb 2017 23:35:33 +0100 Subject: [PATCH 1/5] KAFKA-4794: Add access to OffsetStorageReader from SourceConnector Add two interfaces SinkConnectorContext/SourceConnectContext that extend ConnectorContext in order to expose an OffsetStorageReader instance. --- .../kafka/connect/connector/Connector.java | 9 ++ .../kafka/connect/sink/SinkConnector.java | 8 ++ .../connect/sink/SinkConnectorContext.java | 25 ++++ .../kafka/connect/source/SourceConnector.java | 7 ++ .../source/SourceConnectorContext.java | 32 ++++++ .../apache/kafka/connect/runtime/Worker.java | 18 ++- .../connect/runtime/WorkerConnector.java | 107 ++++++++++++++---- .../connect/runtime/WorkerConnectorTest.java | 22 ++-- 8 files changed, 198 insertions(+), 30 deletions(-) create mode 100644 connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnectorContext.java create mode 100644 connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnectorContext.java diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java index 79caeebf21041..6d54aabf9f587 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java @@ -79,6 +79,15 @@ public void initialize(ConnectorContext ctx, List> taskConfi // are very different, but reduces the difficulty of implementing a Connector } + /** + * Returns the context object used to interact with the Kafka Connect runtime. + * + * @return the context for this Connector. + */ + protected ConnectorContext context() { + return context; + } + /** * Start this Connector. This method will only be called on a clean Connector, i.e. it has * either just been instantiated and initialized or {@link #stop()} has been invoked. diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java index 5c5886163e23d..511bab363956b 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java @@ -34,4 +34,12 @@ public abstract class SinkConnector extends Connector { */ public static final String TOPICS_CONFIG = "topics"; + /** + * {@inheritDoc} + */ + @Override + protected SinkConnectorContext context() { + return (SinkConnectorContext) context; + } + } diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnectorContext.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnectorContext.java new file mode 100644 index 0000000000000..5e2b07a9fbb99 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnectorContext.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.sink; + +import org.apache.kafka.connect.connector.ConnectorContext; + +/** + * A context to allow a {@link SinkConnector} to interact with the Kafka Connect runtime. + */ +public interface SinkConnectorContext extends ConnectorContext { +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java index 0ca3b335fc289..d9cdc22891adb 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java @@ -24,4 +24,11 @@ */ public abstract class SourceConnector extends Connector { + /** + * {@inheritDoc} + */ + @Override + protected SourceConnectorContext context() { + return (SourceConnectorContext) context; + } } diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnectorContext.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnectorContext.java new file mode 100644 index 0000000000000..417fbddd08d46 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnectorContext.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.source; + +import org.apache.kafka.connect.connector.ConnectorContext; +import org.apache.kafka.connect.storage.OffsetStorageReader; + +/** + * A context to allow a {@link SourceConnector} to interact with the Kafka Connect runtime. + */ +public interface SourceConnectorContext extends ConnectorContext { + + /** + * Returns the {@link OffsetStorageReader} for this SourceConnectorContext. + * @return the OffsetStorageReader for this connector. + */ + OffsetStorageReader offsetStorageReader(); +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 99e44d470a22c..e269df66e9778 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -54,6 +54,7 @@ import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.storage.OffsetBackingStore; +import org.apache.kafka.connect.storage.OffsetStorageReader; import org.apache.kafka.connect.storage.OffsetStorageReaderImpl; import org.apache.kafka.connect.storage.OffsetStorageWriter; import org.apache.kafka.connect.util.ConnectorTaskId; @@ -241,6 +242,7 @@ public boolean startConnector( ConnectorStatus.Listener statusListener, TargetState initialState ) { + try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) { if (connectors.containsKey(connName)) throw new ConnectException("Connector with name " + connName + " already exists"); @@ -252,7 +254,21 @@ public boolean startConnector( final String connClass = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG); log.info("Creating connector {} of type {}", connName, connClass); final Connector connector = plugins.newConnector(connClass); - workerConnector = new WorkerConnector(connName, connector, ctx, metrics, statusListener); + + final OffsetStorageReader offsetReader = new OffsetStorageReaderImpl( + offsetBackingStore, + connName, + internalKeyConverter, + internalValueConverter + ); + workerConnector = new WorkerConnector( + connName, + connector, + ctx, + metrics, + statusListener, + offsetReader + ); log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass()); savedLoader = plugins.compareAndSwapLoaders(connector); workerConnector.initialize(connConfig); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java index 7923943d121f7..b9ba8efddcc9b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java @@ -21,6 +21,9 @@ import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.source.SourceConnector; +import org.apache.kafka.connect.sink.SinkConnectorContext; +import org.apache.kafka.connect.source.SourceConnectorContext; +import org.apache.kafka.connect.storage.OffsetStorageReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,41 +61,40 @@ private enum State { private Map config; private State state; - - public WorkerConnector(String connName, - Connector connector, - ConnectorContext ctx, - ConnectMetrics metrics, - ConnectorStatus.Listener statusListener) { + private final OffsetStorageReader offsetStorageReader; + + public WorkerConnector(final String connName, + final Connector connector, + final ConnectorContext ctx, + final ConnectMetrics metrics, + final ConnectorStatus.Listener statusListener, + final OffsetStorageReader offsetStorageReader) { this.connName = connName; this.ctx = ctx; this.connector = connector; this.state = State.INIT; this.metrics = new ConnectorMetricsGroup(metrics, AbstractStatus.State.UNASSIGNED, statusListener); this.statusListener = this.metrics; + this.offsetStorageReader = offsetStorageReader; } public void initialize(ConnectorConfig connectorConfig) { try { this.config = connectorConfig.originalsStrings(); + log.debug("{} Initializing connector {}", this, connName); if (isSinkConnector()) { SinkConnectorConfig.validate(config); } - connector.initialize(new ConnectorContext() { - @Override - public void requestTaskReconfiguration() { - ctx.requestTaskReconfiguration(); - } - - @Override - public void raiseError(Exception e) { - log.error("{} Connector raised an error", WorkerConnector.this, e); - onFailure(e); - ctx.raiseError(e); - } - }); + final ConnectorContext delegateCtx = new DelegateToWorkerConnectorContext(); + + if (isSinkConnector()) { + SinkConnectorConfig.validate(config); + connector.initialize(new DelegateSinkConnectorContext(delegateCtx)); + } else { + connector.initialize(new DelegateSourceConnectorContext(delegateCtx, offsetStorageReader)); + } } catch (Throwable t) { log.error("{} Error initializing connector", this, t); onFailure(t); @@ -319,4 +321,71 @@ protected MetricGroup metricGroup() { return metricGroup; } } + + private class DelegateToWorkerConnectorContext implements ConnectorContext { + + @Override + public void requestTaskReconfiguration() { + WorkerConnector.this.ctx.requestTaskReconfiguration(); + } + + @Override + public void raiseError(Exception e) { + log.error("{} Connector raised an error", WorkerConnector.this, e); + onFailure(e); + WorkerConnector.this.ctx.raiseError(e); + } + } + + /** + * An internal SinkConnectorContext that delegates to worker connector. + */ + private static class DelegateSinkConnectorContext implements SinkConnectorContext { + + private final ConnectorContext delegateCtx; + + DelegateSinkConnectorContext(final ConnectorContext delegateCtx) { + this.delegateCtx = delegateCtx; + } + + @Override + public void requestTaskReconfiguration() { + delegateCtx.requestTaskReconfiguration(); + } + + @Override + public void raiseError(Exception e) { + delegateCtx.raiseError(e); + } + } + + /** + * An internal SourceConnectorContext that delegates to worker connector. + */ + private static class DelegateSourceConnectorContext implements SourceConnectorContext { + + private final ConnectorContext delegateCtx; + private final OffsetStorageReader offsetStorageReader; + + DelegateSourceConnectorContext(final ConnectorContext delegateCtx, + final OffsetStorageReader offsetStorageReader) { + this.delegateCtx = delegateCtx; + this.offsetStorageReader = offsetStorageReader; + } + + @Override + public void requestTaskReconfiguration() { + delegateCtx.requestTaskReconfiguration(); + } + + @Override + public void raiseError(Exception e) { + delegateCtx.raiseError(e); + } + + @Override + public OffsetStorageReader offsetStorageReader() { + return offsetStorageReader; + } + } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java index 10c413d23d600..bdd49cb301160 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.storage.OffsetStorageReader; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.EasyMockSupport; @@ -55,6 +56,7 @@ public class WorkerConnectorTest extends EasyMockSupport { @Mock Connector connector; @Mock ConnectorContext ctx; @Mock ConnectorStatus.Listener listener; + @Mock OffsetStorageReader offsetStorageReader; @Before public void setup() { @@ -85,7 +87,7 @@ public void testInitializeFailure() { replayAll(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); assertFailedMetric(workerConnector); @@ -115,7 +117,7 @@ public void testFailureIsFinalState() { replayAll(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); assertFailedMetric(workerConnector); @@ -149,7 +151,7 @@ public void testStartupAndShutdown() { replayAll(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); assertInitializedMetric(workerConnector); @@ -186,7 +188,7 @@ public void testStartupAndPause() { replayAll(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); assertInitializedMetric(workerConnector); @@ -225,7 +227,7 @@ public void testOnResume() { replayAll(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); assertInitializedMetric(workerConnector); @@ -257,7 +259,7 @@ public void testStartupPaused() { replayAll(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); assertInitializedMetric(workerConnector); @@ -290,7 +292,7 @@ public void testStartupFailure() { replayAll(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); assertInitializedMetric(workerConnector); @@ -326,7 +328,7 @@ public void testShutdownFailure() { replayAll(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); assertInitializedMetric(workerConnector); @@ -361,7 +363,7 @@ public void testTransitionStartedToStarted() { replayAll(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); assertInitializedMetric(workerConnector); @@ -400,7 +402,7 @@ public void testTransitionPausedToPaused() { replayAll(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); assertInitializedMetric(workerConnector); From 868d6fed0c0d8d281d140045f9f12f2f281b9ce1 Mon Sep 17 00:00:00 2001 From: Randall Hauch Date: Thu, 21 May 2020 17:51:37 -0500 Subject: [PATCH 2/5] KAFKA-4794: Avoid unnecessary lines --- .../src/main/java/org/apache/kafka/connect/runtime/Worker.java | 1 - .../java/org/apache/kafka/connect/runtime/WorkerConnector.java | 1 - 2 files changed, 2 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index e269df66e9778..98e8e78e2358b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -242,7 +242,6 @@ public boolean startConnector( ConnectorStatus.Listener statusListener, TargetState initialState ) { - try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) { if (connectors.containsKey(connName)) throw new ConnectException("Connector with name " + connName + " already exists"); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java index b9ba8efddcc9b..61b55df56fc2a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java @@ -81,7 +81,6 @@ public WorkerConnector(final String connName, public void initialize(ConnectorConfig connectorConfig) { try { this.config = connectorConfig.originalsStrings(); - log.debug("{} Initializing connector {}", this, connName); if (isSinkConnector()) { SinkConnectorConfig.validate(config); From d1d6dee07179fd024e9d0f4c10304ac7201a024d Mon Sep 17 00:00:00 2001 From: Randall Hauch Date: Thu, 21 May 2020 17:52:20 -0500 Subject: [PATCH 3/5] KAFKA-4794: Simplify SourceConnectorContext and SinkConnectorContext initialization in WorkerConnector --- .../connect/runtime/WorkerConnector.java | 51 ++------ .../connect/runtime/WorkerConnectorTest.java | 109 ++++++++++++++---- 2 files changed, 96 insertions(+), 64 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java index 61b55df56fc2a..4cc11fc765557 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java @@ -86,13 +86,13 @@ public void initialize(ConnectorConfig connectorConfig) { SinkConnectorConfig.validate(config); } - final ConnectorContext delegateCtx = new DelegateToWorkerConnectorContext(); - if (isSinkConnector()) { SinkConnectorConfig.validate(config); - connector.initialize(new DelegateSinkConnectorContext(delegateCtx)); + connector.initialize(new WorkerSinkConnectorContext()); + } else if (isSourceConnector()) { + connector.initialize(new WorkerSourceConnectorContext(offsetStorageReader)); } else { - connector.initialize(new DelegateSourceConnectorContext(delegateCtx, offsetStorageReader)); + connector.initialize(new WorkerConnectorContext()); } } catch (Throwable t) { log.error("{} Error initializing connector", this, t); @@ -321,7 +321,7 @@ protected MetricGroup metricGroup() { } } - private class DelegateToWorkerConnectorContext implements ConnectorContext { + private class WorkerConnectorContext implements ConnectorContext { @Override public void requestTaskReconfiguration() { @@ -336,52 +336,17 @@ public void raiseError(Exception e) { } } - /** - * An internal SinkConnectorContext that delegates to worker connector. - */ - private static class DelegateSinkConnectorContext implements SinkConnectorContext { - - private final ConnectorContext delegateCtx; - - DelegateSinkConnectorContext(final ConnectorContext delegateCtx) { - this.delegateCtx = delegateCtx; - } - - @Override - public void requestTaskReconfiguration() { - delegateCtx.requestTaskReconfiguration(); - } - - @Override - public void raiseError(Exception e) { - delegateCtx.raiseError(e); - } + private class WorkerSinkConnectorContext extends WorkerConnectorContext implements SinkConnectorContext { } - /** - * An internal SourceConnectorContext that delegates to worker connector. - */ - private static class DelegateSourceConnectorContext implements SourceConnectorContext { + private class WorkerSourceConnectorContext extends WorkerConnectorContext implements SourceConnectorContext { - private final ConnectorContext delegateCtx; private final OffsetStorageReader offsetStorageReader; - DelegateSourceConnectorContext(final ConnectorContext delegateCtx, - final OffsetStorageReader offsetStorageReader) { - this.delegateCtx = delegateCtx; + WorkerSourceConnectorContext(final OffsetStorageReader offsetStorageReader) { this.offsetStorageReader = offsetStorageReader; } - @Override - public void requestTaskReconfiguration() { - delegateCtx.requestTaskReconfiguration(); - } - - @Override - public void raiseError(Exception e) { - delegateCtx.raiseError(e); - } - @Override public OffsetStorageReader offsetStorageReader() { return offsetStorageReader; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java index bdd49cb301160..53bc1eea09cbc 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java @@ -20,7 +20,12 @@ import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.sink.SinkConnectorContext; +import org.apache.kafka.connect.source.SourceConnector; +import org.apache.kafka.connect.source.SourceConnectorContext; import org.apache.kafka.connect.storage.OffsetStorageReader; +import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.EasyMockSupport; @@ -48,11 +53,14 @@ public class WorkerConnectorTest extends EasyMockSupport { static { CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName()); CONFIG.put(ConnectorConfig.NAME_CONFIG, CONNECTOR); + CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "my-topic"); } public ConnectorConfig connectorConfig; public MockConnectMetrics metrics; @Mock Plugins plugins; + @Mock SourceConnector sourceConnector; + @Mock SinkConnector sinkConnector; @Mock Connector connector; @Mock ConnectorContext ctx; @Mock ConnectorStatus.Listener listener; @@ -72,11 +80,12 @@ public void tearDown() { @Test public void testInitializeFailure() { RuntimeException exception = new RuntimeException(); + connector = sourceConnector; connector.version(); expectLastCall().andReturn(VERSION); - connector.initialize(EasyMock.notNull(ConnectorContext.class)); + connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); expectLastCall().andThrow(exception); listener.onFailure(CONNECTOR, exception); @@ -100,11 +109,12 @@ public void testInitializeFailure() { @Test public void testFailureIsFinalState() { RuntimeException exception = new RuntimeException(); + connector = sinkConnector; connector.version(); expectLastCall().andReturn(VERSION); - connector.initialize(EasyMock.notNull(ConnectorContext.class)); + connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); expectLastCall().andThrow(exception); listener.onFailure(CONNECTOR, exception); @@ -131,10 +141,11 @@ public void testFailureIsFinalState() { @Test public void testStartupAndShutdown() { + connector = sourceConnector; connector.version(); expectLastCall().andReturn(VERSION); - connector.initialize(EasyMock.notNull(ConnectorContext.class)); + connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); expectLastCall(); connector.start(CONFIG); @@ -154,7 +165,7 @@ public void testStartupAndShutdown() { WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); - assertInitializedMetric(workerConnector); + assertInitializedSourceMetric(workerConnector); workerConnector.transitionTo(TargetState.STARTED); assertRunningMetric(workerConnector); workerConnector.shutdown(); @@ -165,10 +176,11 @@ public void testStartupAndShutdown() { @Test public void testStartupAndPause() { + connector = sinkConnector; connector.version(); expectLastCall().andReturn(VERSION); - connector.initialize(EasyMock.notNull(ConnectorContext.class)); + connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); expectLastCall(); connector.start(CONFIG); @@ -191,7 +203,7 @@ public void testStartupAndPause() { WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); - assertInitializedMetric(workerConnector); + assertInitializedSinkMetric(workerConnector); workerConnector.transitionTo(TargetState.STARTED); assertRunningMetric(workerConnector); workerConnector.transitionTo(TargetState.PAUSED); @@ -204,10 +216,11 @@ public void testStartupAndPause() { @Test public void testOnResume() { + connector = sourceConnector; connector.version(); expectLastCall().andReturn(VERSION); - connector.initialize(EasyMock.notNull(ConnectorContext.class)); + connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); expectLastCall(); listener.onPause(CONNECTOR); @@ -230,7 +243,7 @@ public void testOnResume() { WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); - assertInitializedMetric(workerConnector); + assertInitializedSourceMetric(workerConnector); workerConnector.transitionTo(TargetState.PAUSED); assertPausedMetric(workerConnector); workerConnector.transitionTo(TargetState.STARTED); @@ -243,10 +256,11 @@ public void testOnResume() { @Test public void testStartupPaused() { + connector = sinkConnector; connector.version(); expectLastCall().andReturn(VERSION); - connector.initialize(EasyMock.notNull(ConnectorContext.class)); + connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); expectLastCall(); // connector never gets started @@ -262,7 +276,7 @@ public void testStartupPaused() { WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); - assertInitializedMetric(workerConnector); + assertInitializedSinkMetric(workerConnector); workerConnector.transitionTo(TargetState.PAUSED); assertPausedMetric(workerConnector); workerConnector.shutdown(); @@ -275,10 +289,11 @@ public void testStartupPaused() { public void testStartupFailure() { RuntimeException exception = new RuntimeException(); + connector = sinkConnector; connector.version(); expectLastCall().andReturn(VERSION); - connector.initialize(EasyMock.notNull(ConnectorContext.class)); + connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); expectLastCall(); connector.start(CONFIG); @@ -295,7 +310,7 @@ public void testStartupFailure() { WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); - assertInitializedMetric(workerConnector); + assertInitializedSinkMetric(workerConnector); workerConnector.transitionTo(TargetState.STARTED); assertFailedMetric(workerConnector); workerConnector.shutdown(); @@ -307,11 +322,12 @@ public void testStartupFailure() { @Test public void testShutdownFailure() { RuntimeException exception = new RuntimeException(); + connector = sourceConnector; connector.version(); expectLastCall().andReturn(VERSION); - connector.initialize(EasyMock.notNull(ConnectorContext.class)); + connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); expectLastCall(); connector.start(CONFIG); @@ -331,7 +347,7 @@ public void testShutdownFailure() { WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); - assertInitializedMetric(workerConnector); + assertInitializedSourceMetric(workerConnector); workerConnector.transitionTo(TargetState.STARTED); assertRunningMetric(workerConnector); workerConnector.shutdown(); @@ -342,10 +358,11 @@ public void testShutdownFailure() { @Test public void testTransitionStartedToStarted() { + connector = sourceConnector; connector.version(); expectLastCall().andReturn(VERSION); - connector.initialize(EasyMock.notNull(ConnectorContext.class)); + connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); expectLastCall(); connector.start(CONFIG); @@ -366,7 +383,7 @@ public void testTransitionStartedToStarted() { WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); - assertInitializedMetric(workerConnector); + assertInitializedSourceMetric(workerConnector); workerConnector.transitionTo(TargetState.STARTED); assertRunningMetric(workerConnector); workerConnector.transitionTo(TargetState.STARTED); @@ -379,10 +396,11 @@ public void testTransitionStartedToStarted() { @Test public void testTransitionPausedToPaused() { + connector = sourceConnector; connector.version(); expectLastCall().andReturn(VERSION); - connector.initialize(EasyMock.notNull(ConnectorContext.class)); + connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); expectLastCall(); connector.start(CONFIG); @@ -405,7 +423,7 @@ public void testTransitionPausedToPaused() { WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); - assertInitializedMetric(workerConnector); + assertInitializedSourceMetric(workerConnector); workerConnector.transitionTo(TargetState.STARTED); assertRunningMetric(workerConnector); workerConnector.transitionTo(TargetState.PAUSED); @@ -418,6 +436,48 @@ public void testTransitionPausedToPaused() { verifyAll(); } + @Test + public void testInitializeConnectorThatIsNeitherSourceNorSink() { + connector.version(); + expectLastCall().andReturn(VERSION); + + Capture captureContext = Capture.newInstance(); + connector.initialize(EasyMock.capture(captureContext)); + expectLastCall(); + + connector.start(CONFIG); + expectLastCall(); + + listener.onStartup(CONNECTOR); + expectLastCall(); + + connector.stop(); + expectLastCall(); + + listener.onShutdown(CONNECTOR); + expectLastCall(); + + replayAll(); + + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); + + workerConnector.initialize(connectorConfig); + assertInitializedMetric(workerConnector, "unknown"); + + assertTrue(captureContext.hasCaptured()); + assertNotNull(captureContext.getValue()); + assertFalse(captureContext.getValue() instanceof SinkConnectorContext); + assertFalse(captureContext.getValue() instanceof SourceConnectorContext); + assertTrue(captureContext.getValue() instanceof ConnectorContext); + + workerConnector.transitionTo(TargetState.STARTED); + assertRunningMetric(workerConnector); + workerConnector.shutdown(); + assertStoppedMetric(workerConnector); + + verifyAll(); + } + protected void assertFailedMetric(WorkerConnector workerConnector) { assertFalse(workerConnector.metrics().isUnassigned()); assertTrue(workerConnector.metrics().isFailed()); @@ -446,7 +506,15 @@ protected void assertStoppedMetric(WorkerConnector workerConnector) { assertFalse(workerConnector.metrics().isRunning()); } - protected void assertInitializedMetric(WorkerConnector workerConnector) { + protected void assertInitializedSinkMetric(WorkerConnector workerConnector) { + assertInitializedMetric(workerConnector, "sink"); + } + + protected void assertInitializedSourceMetric(WorkerConnector workerConnector) { + assertInitializedMetric(workerConnector, "source"); + } + + protected void assertInitializedMetric(WorkerConnector workerConnector, String expectedType) { assertTrue(workerConnector.metrics().isUnassigned()); assertFalse(workerConnector.metrics().isFailed()); assertFalse(workerConnector.metrics().isPaused()); @@ -456,12 +524,11 @@ protected void assertInitializedMetric(WorkerConnector workerConnector) { String type = metrics.currentMetricValueAsString(metricGroup, "connector-type"); String clazz = metrics.currentMetricValueAsString(metricGroup, "connector-class"); String version = metrics.currentMetricValueAsString(metricGroup, "connector-version"); - assertEquals(type, "unknown"); + assertEquals(expectedType, type); assertNotNull(clazz); assertEquals(VERSION, version); } private static abstract class TestConnector extends Connector { } - } From d8b4c2623db01c9450208be26811c2b1f5be2052 Mon Sep 17 00:00:00 2001 From: Randall Hauch Date: Thu, 21 May 2020 17:52:51 -0500 Subject: [PATCH 4/5] KAFKA-4794: Add unit tests for Connector, SinkConnector and SourceConnector default methods --- .../connect/connector/ConnectorTest.java | 198 ++++++++++++++++++ .../kafka/connect/sink/SinkConnectorTest.java | 134 ++++++++++++ .../connect/source/SourceConnectorTest.java | 140 +++++++++++++ 3 files changed, 472 insertions(+) create mode 100644 connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorTest.java create mode 100644 connect/api/src/test/java/org/apache/kafka/connect/sink/SinkConnectorTest.java create mode 100644 connect/api/src/test/java/org/apache/kafka/connect/source/SourceConnectorTest.java diff --git a/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorTest.java new file mode 100644 index 0000000000000..b2fa54aebba25 --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorTest.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.connector; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +public class ConnectorTest { + + protected ConnectorContext context; + protected Connector connector; + protected AssertableConnector assertableConnector; + + @Before + public void beforeEach() { + connector = createConnector(); + context = createContext(); + assertableConnector = (AssertableConnector) connector; + } + + @Test + public void shouldInitializeContext() { + connector.initialize(context); + assertableConnector.assertInitialized(); + assertableConnector.assertContext(context); + assertableConnector.assertTaskConfigs(null); + } + + @Test + public void shouldInitializeContextWithTaskConfigs() { + List> taskConfigs = new ArrayList<>(); + connector.initialize(context, taskConfigs); + assertableConnector.assertInitialized(); + assertableConnector.assertContext(context); + assertableConnector.assertTaskConfigs(taskConfigs); + } + + @Test + public void shouldStopAndStartWhenReconfigure() { + Map props = new HashMap<>(); + connector.initialize(context); + assertableConnector.assertContext(context); + assertableConnector.assertStarted(false); + assertableConnector.assertStopped(false); + connector.reconfigure(props); + assertableConnector.assertStarted(true); + assertableConnector.assertStopped(true); + assertableConnector.assertProperties(props); + } + + protected ConnectorContext createContext() { + return new TestConnectorContext(); + } + + protected Connector createConnector() { + return new TestConnector(); + } + + public static class TestConnectorContext implements ConnectorContext { + + @Override + public void requestTaskReconfiguration() { + } + + @Override + public void raiseError(Exception e) { + } + } + + public interface AssertableConnector { + + void assertContext(ConnectorContext expected); + + void assertInitialized(); + + void assertTaskConfigs(List> expectedTaskConfigs); + + void assertStarted(boolean expected); + + void assertStopped(boolean expected); + + void assertProperties(Map expected); + + } + + public static class TestConnector extends Connector implements AssertableConnector { + + public static final String VERSION = "an entirely different version"; + + private boolean initialized; + private List> taskConfigs; + private Map props; + private boolean started; + private boolean stopped; + + @Override + public String version() { + return VERSION; + } + + @Override + public void initialize(ConnectorContext ctx) { + super.initialize(ctx); + initialized = true; + this.taskConfigs = null; + } + + @Override + public void initialize(ConnectorContext ctx, List> taskConfigs) { + super.initialize(ctx, taskConfigs); + initialized = true; + this.taskConfigs = taskConfigs; + } + + @Override + public void start(Map props) { + this.props = props; + started = true; + } + + @Override + public Class taskClass() { + return null; + } + + @Override + public List> taskConfigs(int maxTasks) { + return null; + } + + @Override + public void stop() { + stopped = true; + } + + @Override + public ConfigDef config() { + return new ConfigDef() + .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs") + .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs"); + } + + @Override + public void assertContext(ConnectorContext expected) { + assertSame(expected, context); + assertSame(expected, context()); + } + + @Override + public void assertInitialized() { + assertTrue(initialized); + } + + @Override + public void assertTaskConfigs(List> expectedTaskConfigs) { + assertSame(expectedTaskConfigs, taskConfigs); + } + + @Override + public void assertStarted(boolean expected) { + assertEquals(expected, started); + } + + @Override + public void assertStopped(boolean expected) { + assertEquals(expected, stopped); + } + + @Override + public void assertProperties(Map expected) { + assertSame(expected, props); + } + } +} \ No newline at end of file diff --git a/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkConnectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkConnectorTest.java new file mode 100644 index 0000000000000..57c4766bf8b2c --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkConnectorTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.sink; + +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectorContext; +import org.apache.kafka.connect.connector.ConnectorTest; +import org.apache.kafka.connect.connector.Task; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +public class SinkConnectorTest extends ConnectorTest { + + @Override + protected ConnectorContext createContext() { + return new TestSinkConnectorContext(); + } + + @Override + protected SinkConnector createConnector() { + return new TestSinkConnector(); + } + + private static class TestSinkConnectorContext extends TestConnectorContext implements SinkConnectorContext { + } + + private static class TestSinkConnector extends SinkConnector implements AssertableConnector { + + public static final String VERSION = "an entirely different version"; + + private boolean initialized; + private List> taskConfigs; + private Map props; + private boolean started; + private boolean stopped; + + @Override + public String version() { + return VERSION; + } + + @Override + public void initialize(ConnectorContext ctx) { + super.initialize(ctx); + initialized = true; + this.taskConfigs = null; + } + + @Override + public void initialize(ConnectorContext ctx, List> taskConfigs) { + super.initialize(ctx, taskConfigs); + initialized = true; + this.taskConfigs = taskConfigs; + } + + @Override + public void start(Map props) { + this.props = props; + started = true; + } + + @Override + public Class taskClass() { + return null; + } + + @Override + public List> taskConfigs(int maxTasks) { + return null; + } + + @Override + public void stop() { + stopped = true; + } + + @Override + public ConfigDef config() { + return new ConfigDef() + .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs") + .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs"); + } + + @Override + public void assertContext(ConnectorContext expected) { + assertSame(expected, context); + assertSame(expected, context()); + } + + @Override + public void assertInitialized() { + assertTrue(initialized); + } + + @Override + public void assertTaskConfigs(List> expectedTaskConfigs) { + assertSame(expectedTaskConfigs, taskConfigs); + } + + @Override + public void assertStarted(boolean expected) { + assertEquals(expected, started); + } + + @Override + public void assertStopped(boolean expected) { + assertEquals(expected, stopped); + } + + @Override + public void assertProperties(Map expected) { + assertSame(expected, props); + } + } +} \ No newline at end of file diff --git a/connect/api/src/test/java/org/apache/kafka/connect/source/SourceConnectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/source/SourceConnectorTest.java new file mode 100644 index 0000000000000..2798a794e6df4 --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/source/SourceConnectorTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.source; + +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectorContext; +import org.apache.kafka.connect.connector.ConnectorTest; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.storage.OffsetStorageReader; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +public class SourceConnectorTest extends ConnectorTest { + + @Override + protected ConnectorContext createContext() { + return new TestSourceConnectorContext(); + } + + @Override + protected SourceConnector createConnector() { + return new TestSourceConnector(); + } + + private static class TestSourceConnectorContext extends TestConnectorContext implements SourceConnectorContext { + + @Override + public OffsetStorageReader offsetStorageReader() { + return null; + } + } + + private static class TestSourceConnector extends SourceConnector implements AssertableConnector { + + public static final String VERSION = "an entirely different version"; + + private boolean initialized; + private List> taskConfigs; + private Map props; + private boolean started; + private boolean stopped; + + @Override + public String version() { + return VERSION; + } + + @Override + public void initialize(ConnectorContext ctx) { + super.initialize(ctx); + initialized = true; + this.taskConfigs = null; + } + + @Override + public void initialize(ConnectorContext ctx, List> taskConfigs) { + super.initialize(ctx, taskConfigs); + initialized = true; + this.taskConfigs = taskConfigs; + } + + @Override + public void start(Map props) { + this.props = props; + started = true; + } + + @Override + public Class taskClass() { + return null; + } + + @Override + public List> taskConfigs(int maxTasks) { + return null; + } + + @Override + public void stop() { + stopped = true; + } + + @Override + public ConfigDef config() { + return new ConfigDef() + .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs") + .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs"); + } + + @Override + public void assertContext(ConnectorContext expected) { + assertSame(expected, context); + assertSame(expected, context()); + } + + @Override + public void assertInitialized() { + assertTrue(initialized); + } + + @Override + public void assertTaskConfigs(List> expectedTaskConfigs) { + assertSame(expectedTaskConfigs, taskConfigs); + } + + @Override + public void assertStarted(boolean expected) { + assertEquals(expected, started); + } + + @Override + public void assertStopped(boolean expected) { + assertEquals(expected, stopped); + } + + @Override + public void assertProperties(Map expected) { + assertSame(expected, props); + } + } +} \ No newline at end of file From 7784286397f1d8e23c7a18d67f1b62a720e222ab Mon Sep 17 00:00:00 2001 From: Randall Hauch Date: Sun, 24 May 2020 11:32:19 -0500 Subject: [PATCH 5/5] KAFKA-4794: Removed ambiguous case of connector not implementing source or sink, and incorporated other feedback --- .../kafka/connect/sink/SinkConnector.java | 3 - .../kafka/connect/source/SourceConnector.java | 3 - .../connect/connector/ConnectorTest.java | 116 +----------------- .../kafka/connect/sink/SinkConnectorTest.java | 20 ++- .../connect/source/SourceConnectorTest.java | 16 ++- .../connect/runtime/WorkerConnector.java | 28 ++--- .../connect/runtime/WorkerConnectorTest.java | 34 ++--- .../kafka/connect/runtime/WorkerTest.java | 68 +++++----- 8 files changed, 88 insertions(+), 200 deletions(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java index 511bab363956b..9627571482bc4 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java @@ -34,9 +34,6 @@ public abstract class SinkConnector extends Connector { */ public static final String TOPICS_CONFIG = "topics"; - /** - * {@inheritDoc} - */ @Override protected SinkConnectorContext context() { return (SinkConnectorContext) context; diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java index d9cdc22891adb..6e9694024d334 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java @@ -24,9 +24,6 @@ */ public abstract class SourceConnector extends Connector { - /** - * {@inheritDoc} - */ @Override protected SourceConnectorContext context() { return (SourceConnectorContext) context; diff --git a/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorTest.java index b2fa54aebba25..7addf8f0f048e 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorTest.java @@ -21,15 +21,10 @@ import java.util.List; import java.util.Map; -import org.apache.kafka.common.config.ConfigDef; import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; - -public class ConnectorTest { +public abstract class ConnectorTest { protected ConnectorContext context; protected Connector connector; @@ -72,24 +67,9 @@ public void shouldStopAndStartWhenReconfigure() { assertableConnector.assertProperties(props); } - protected ConnectorContext createContext() { - return new TestConnectorContext(); - } - - protected Connector createConnector() { - return new TestConnector(); - } - - public static class TestConnectorContext implements ConnectorContext { + protected abstract ConnectorContext createContext(); - @Override - public void requestTaskReconfiguration() { - } - - @Override - public void raiseError(Exception e) { - } - } + protected abstract Connector createConnector(); public interface AssertableConnector { @@ -104,95 +84,5 @@ public interface AssertableConnector { void assertStopped(boolean expected); void assertProperties(Map expected); - - } - - public static class TestConnector extends Connector implements AssertableConnector { - - public static final String VERSION = "an entirely different version"; - - private boolean initialized; - private List> taskConfigs; - private Map props; - private boolean started; - private boolean stopped; - - @Override - public String version() { - return VERSION; - } - - @Override - public void initialize(ConnectorContext ctx) { - super.initialize(ctx); - initialized = true; - this.taskConfigs = null; - } - - @Override - public void initialize(ConnectorContext ctx, List> taskConfigs) { - super.initialize(ctx, taskConfigs); - initialized = true; - this.taskConfigs = taskConfigs; - } - - @Override - public void start(Map props) { - this.props = props; - started = true; - } - - @Override - public Class taskClass() { - return null; - } - - @Override - public List> taskConfigs(int maxTasks) { - return null; - } - - @Override - public void stop() { - stopped = true; - } - - @Override - public ConfigDef config() { - return new ConfigDef() - .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs") - .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs"); - } - - @Override - public void assertContext(ConnectorContext expected) { - assertSame(expected, context); - assertSame(expected, context()); - } - - @Override - public void assertInitialized() { - assertTrue(initialized); - } - - @Override - public void assertTaskConfigs(List> expectedTaskConfigs) { - assertSame(expectedTaskConfigs, taskConfigs); - } - - @Override - public void assertStarted(boolean expected) { - assertEquals(expected, started); - } - - @Override - public void assertStopped(boolean expected) { - assertEquals(expected, stopped); - } - - @Override - public void assertProperties(Map expected) { - assertSame(expected, props); - } } } \ No newline at end of file diff --git a/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkConnectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkConnectorTest.java index 57c4766bf8b2c..a4924ae3bbff6 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkConnectorTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkConnectorTest.java @@ -31,19 +31,31 @@ public class SinkConnectorTest extends ConnectorTest { @Override - protected ConnectorContext createContext() { + protected TestSinkConnectorContext createContext() { return new TestSinkConnectorContext(); } @Override - protected SinkConnector createConnector() { + protected TestSinkConnector createConnector() { return new TestSinkConnector(); } - private static class TestSinkConnectorContext extends TestConnectorContext implements SinkConnectorContext { + private static class TestSinkConnectorContext implements SinkConnectorContext { + + @Override + public void requestTaskReconfiguration() { + // Unexpected in these tests + throw new UnsupportedOperationException(); + } + + @Override + public void raiseError(Exception e) { + // Unexpected in these tests + throw new UnsupportedOperationException(); + } } - private static class TestSinkConnector extends SinkConnector implements AssertableConnector { + protected static class TestSinkConnector extends SinkConnector implements ConnectorTest.AssertableConnector { public static final String VERSION = "an entirely different version"; diff --git a/connect/api/src/test/java/org/apache/kafka/connect/source/SourceConnectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/source/SourceConnectorTest.java index 2798a794e6df4..9b105409fffba 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/source/SourceConnectorTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/source/SourceConnectorTest.java @@ -37,11 +37,23 @@ protected ConnectorContext createContext() { } @Override - protected SourceConnector createConnector() { + protected TestSourceConnector createConnector() { return new TestSourceConnector(); } - private static class TestSourceConnectorContext extends TestConnectorContext implements SourceConnectorContext { + private static class TestSourceConnectorContext implements SourceConnectorContext { + + @Override + public void requestTaskReconfiguration() { + // Unexpected in these tests + throw new UnsupportedOperationException(); + } + + @Override + public void raiseError(Exception e) { + // Unexpected in these tests + throw new UnsupportedOperationException(); + } @Override public OffsetStorageReader offsetStorageReader() { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java index 4cc11fc765557..ead9d712a5c88 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java @@ -18,6 +18,7 @@ import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.source.SourceConnector; @@ -63,12 +64,12 @@ private enum State { private State state; private final OffsetStorageReader offsetStorageReader; - public WorkerConnector(final String connName, - final Connector connector, - final ConnectorContext ctx, - final ConnectMetrics metrics, - final ConnectorStatus.Listener statusListener, - final OffsetStorageReader offsetStorageReader) { + public WorkerConnector(String connName, + Connector connector, + ConnectorContext ctx, + ConnectMetrics metrics, + ConnectorStatus.Listener statusListener, + OffsetStorageReader offsetStorageReader) { this.connName = connName; this.ctx = ctx; this.connector = connector; @@ -80,19 +81,16 @@ public WorkerConnector(final String connName, public void initialize(ConnectorConfig connectorConfig) { try { + if (!isSourceConnector() && !isSinkConnector()) { + throw new ConnectException("Connector implementations must be a subclass of either SourceConnector or SinkConnector"); + } this.config = connectorConfig.originalsStrings(); log.debug("{} Initializing connector {}", this, connName); - if (isSinkConnector()) { - SinkConnectorConfig.validate(config); - } - if (isSinkConnector()) { SinkConnectorConfig.validate(config); connector.initialize(new WorkerSinkConnectorContext()); - } else if (isSourceConnector()) { - connector.initialize(new WorkerSourceConnectorContext(offsetStorageReader)); } else { - connector.initialize(new WorkerConnectorContext()); + connector.initialize(new WorkerSourceConnectorContext(offsetStorageReader)); } } catch (Throwable t) { log.error("{} Error initializing connector", this, t); @@ -321,7 +319,7 @@ protected MetricGroup metricGroup() { } } - private class WorkerConnectorContext implements ConnectorContext { + private abstract class WorkerConnectorContext implements ConnectorContext { @Override public void requestTaskReconfiguration() { @@ -343,7 +341,7 @@ private class WorkerSourceConnectorContext extends WorkerConnectorContext implem private final OffsetStorageReader offsetStorageReader; - WorkerSourceConnectorContext(final OffsetStorageReader offsetStorageReader) { + WorkerSourceConnectorContext(OffsetStorageReader offsetStorageReader) { this.offsetStorageReader = offsetStorageReader; } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java index 53bc1eea09cbc..088900b4c5a22 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.sink.SinkConnector; @@ -437,24 +438,12 @@ public void testTransitionPausedToPaused() { } @Test - public void testInitializeConnectorThatIsNeitherSourceNorSink() { + public void testFailConnectorThatIsNeitherSourceNorSink() { connector.version(); expectLastCall().andReturn(VERSION); - Capture captureContext = Capture.newInstance(); - connector.initialize(EasyMock.capture(captureContext)); - expectLastCall(); - - connector.start(CONFIG); - expectLastCall(); - - listener.onStartup(CONNECTOR); - expectLastCall(); - - connector.stop(); - expectLastCall(); - - listener.onShutdown(CONNECTOR); + Capture exceptionCapture = Capture.newInstance(); + listener.onFailure(EasyMock.eq(CONNECTOR), EasyMock.capture(exceptionCapture)); expectLastCall(); replayAll(); @@ -462,18 +451,9 @@ public void testInitializeConnectorThatIsNeitherSourceNorSink() { WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader); workerConnector.initialize(connectorConfig); - assertInitializedMetric(workerConnector, "unknown"); - - assertTrue(captureContext.hasCaptured()); - assertNotNull(captureContext.getValue()); - assertFalse(captureContext.getValue() instanceof SinkConnectorContext); - assertFalse(captureContext.getValue() instanceof SourceConnectorContext); - assertTrue(captureContext.getValue() instanceof ConnectorContext); - - workerConnector.transitionTo(TargetState.STARTED); - assertRunningMetric(workerConnector); - workerConnector.shutdown(); - assertStoppedMetric(workerConnector); + Throwable e = exceptionCapture.getValue(); + assertTrue(e instanceof ConnectException); + assertTrue(e.getMessage().contains("must be a subclass of")); verifyAll(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 1e8c0bccaffd3..b3fb6758f57f6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.config.provider.MockFileConfigProvider; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; @@ -48,7 +47,9 @@ import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; +import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.sink.SinkTask; +import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.storage.Converter; @@ -125,7 +126,8 @@ public class WorkerTest extends ThreadedTest { @Mock private Herder herder; @Mock private StatusBackingStore statusBackingStore; - @Mock private Connector connector; + @Mock private SourceConnector sourceConnector; + @Mock private SinkConnector sinkConnector; @Mock private ConnectorContext ctx; @Mock private TestSourceTask task; @Mock private WorkerSourceTask workerTask; @@ -185,8 +187,8 @@ public void testStartAndStopConnector() { // Create EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2); EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName())) - .andReturn(connector); - EasyMock.expect(connector.version()).andReturn("1.0"); + .andReturn(sourceConnector); + EasyMock.expect(sourceConnector.version()).andReturn("1.0"); Map props = new HashMap<>(); props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar"); @@ -194,16 +196,16 @@ public void testStartAndStopConnector() { props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName()); - EasyMock.expect(connector.version()).andReturn("1.0"); + EasyMock.expect(sourceConnector.version()).andReturn("1.0"); expectFileConfigProvider(); - EasyMock.expect(plugins.compareAndSwapLoaders(connector)) + EasyMock.expect(plugins.compareAndSwapLoaders(sourceConnector)) .andReturn(delegatingLoader) .times(2); - connector.initialize(anyObject(ConnectorContext.class)); + sourceConnector.initialize(anyObject(ConnectorContext.class)); EasyMock.expectLastCall(); - connector.start(props); + sourceConnector.start(props); EasyMock.expectLastCall(); EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)) @@ -213,7 +215,7 @@ public void testStartAndStopConnector() { EasyMock.expectLastCall(); // Remove - connector.stop(); + sourceConnector.stop(); EasyMock.expectLastCall(); connectorStatusListener.onShutdown(CONNECTOR_ID); @@ -313,8 +315,8 @@ public void testAddConnectorByAlias() { expectFileConfigProvider(); EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2); - EasyMock.expect(plugins.newConnector("WorkerTestConnector")).andReturn(connector); - EasyMock.expect(connector.version()).andReturn("1.0"); + EasyMock.expect(plugins.newConnector("WorkerTestConnector")).andReturn(sinkConnector); + EasyMock.expect(sinkConnector.version()).andReturn("1.0"); Map props = new HashMap<>(); props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar"); @@ -322,13 +324,13 @@ public void testAddConnectorByAlias() { props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTestConnector"); - EasyMock.expect(connector.version()).andReturn("1.0"); - EasyMock.expect(plugins.compareAndSwapLoaders(connector)) + EasyMock.expect(sinkConnector.version()).andReturn("1.0"); + EasyMock.expect(plugins.compareAndSwapLoaders(sinkConnector)) .andReturn(delegatingLoader) .times(2); - connector.initialize(anyObject(ConnectorContext.class)); + sinkConnector.initialize(anyObject(ConnectorContext.class)); EasyMock.expectLastCall(); - connector.start(props); + sinkConnector.start(props); EasyMock.expectLastCall(); EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)) @@ -339,7 +341,7 @@ public void testAddConnectorByAlias() { EasyMock.expectLastCall(); // Remove - connector.stop(); + sinkConnector.stop(); EasyMock.expectLastCall(); connectorStatusListener.onShutdown(CONNECTOR_ID); @@ -379,8 +381,8 @@ public void testAddConnectorByShortAlias() { expectFileConfigProvider(); EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2); - EasyMock.expect(plugins.newConnector("WorkerTest")).andReturn(connector); - EasyMock.expect(connector.version()).andReturn("1.0"); + EasyMock.expect(plugins.newConnector("WorkerTest")).andReturn(sinkConnector); + EasyMock.expect(sinkConnector.version()).andReturn("1.0"); Map props = new HashMap<>(); props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar"); @@ -388,13 +390,13 @@ public void testAddConnectorByShortAlias() { props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTest"); - EasyMock.expect(connector.version()).andReturn("1.0"); - EasyMock.expect(plugins.compareAndSwapLoaders(connector)) + EasyMock.expect(sinkConnector.version()).andReturn("1.0"); + EasyMock.expect(plugins.compareAndSwapLoaders(sinkConnector)) .andReturn(delegatingLoader) .times(2); - connector.initialize(anyObject(ConnectorContext.class)); + sinkConnector.initialize(anyObject(ConnectorContext.class)); EasyMock.expectLastCall(); - connector.start(props); + sinkConnector.start(props); EasyMock.expectLastCall(); EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)) @@ -405,7 +407,7 @@ public void testAddConnectorByShortAlias() { EasyMock.expectLastCall(); // Remove - connector.stop(); + sinkConnector.stop(); EasyMock.expectLastCall(); connectorStatusListener.onShutdown(CONNECTOR_ID); @@ -460,8 +462,8 @@ public void testReconfigureConnectorTasks() { EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(3); EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName())) - .andReturn(connector); - EasyMock.expect(connector.version()).andReturn("1.0"); + .andReturn(sinkConnector); + EasyMock.expect(sinkConnector.version()).andReturn("1.0"); Map props = new HashMap<>(); props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar"); @@ -469,13 +471,13 @@ public void testReconfigureConnectorTasks() { props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName()); - EasyMock.expect(connector.version()).andReturn("1.0"); - EasyMock.expect(plugins.compareAndSwapLoaders(connector)) + EasyMock.expect(sinkConnector.version()).andReturn("1.0"); + EasyMock.expect(plugins.compareAndSwapLoaders(sinkConnector)) .andReturn(delegatingLoader) .times(3); - connector.initialize(anyObject(ConnectorContext.class)); + sinkConnector.initialize(anyObject(ConnectorContext.class)); EasyMock.expectLastCall(); - connector.start(props); + sinkConnector.start(props); EasyMock.expectLastCall(); EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)) @@ -486,13 +488,13 @@ public void testReconfigureConnectorTasks() { EasyMock.expectLastCall(); // Reconfigure - EasyMock.>expect(connector.taskClass()).andReturn(TestSourceTask.class); + EasyMock.>expect(sinkConnector.taskClass()).andReturn(TestSourceTask.class); Map taskProps = new HashMap<>(); taskProps.put("foo", "bar"); - EasyMock.expect(connector.taskConfigs(2)).andReturn(Arrays.asList(taskProps, taskProps)); + EasyMock.expect(sinkConnector.taskConfigs(2)).andReturn(Arrays.asList(taskProps, taskProps)); // Remove - connector.stop(); + sinkConnector.stop(); EasyMock.expectLastCall(); connectorStatusListener.onShutdown(CONNECTOR_ID); @@ -1359,7 +1361,7 @@ private Map anyConnectorConfigMap() { } /* Name here needs to be unique as we are testing the aliasing mechanism */ - public static class WorkerTestConnector extends Connector { + public static class WorkerTestConnector extends SourceConnector { private static final ConfigDef CONFIG_DEF = new ConfigDef() .define("configName", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Test configName.");