From 987eea5ebbf10e5beacb3eddfee8d5903d2fa857 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Tue, 15 Oct 2019 15:27:26 -0700 Subject: [PATCH 1/2] KAFKA-8945/KAFKA-8947: Fix bugs in Connect REST extension API (#7392) Fix bug in Connect REST extension API caused by invalid constructor parameter validation, and update integration test to play nicely with Jenkins Fix instantiation of TaskState objects by Connect framework. Author: Chris Egerton Reviewers: Magesh Nandakumar , Randall Hauch --- .../kafka/connect/health/AbstractState.java | 23 +++- .../kafka/connect/health/ConnectorHealth.java | 29 ++++- .../kafka/connect/health/ConnectorState.java | 9 ++ .../kafka/connect/health/TaskState.java | 22 ++-- .../health/ConnectClusterStateImpl.java | 2 +- .../RestExtensionIntegrationTest.java | 108 ++++++++++++++++-- 6 files changed, 173 insertions(+), 20 deletions(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java index a18c46334cf29..f707b3c3026ed 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java @@ -17,6 +17,8 @@ package org.apache.kafka.connect.health; +import java.util.Objects; + /** * Provides the current status along with identifier for Connect worker and tasks. */ @@ -34,10 +36,10 @@ public abstract class AbstractState { * @param traceMessage any error trace message associated with the connector or the task; may be null or empty */ public AbstractState(String state, String workerId, String traceMessage) { - if (state != null && !state.trim().isEmpty()) { + if (state == null || state.trim().isEmpty()) { throw new IllegalArgumentException("State must not be null or empty"); } - if (workerId != null && !workerId.trim().isEmpty()) { + if (workerId == null || workerId.trim().isEmpty()) { throw new IllegalArgumentException("Worker ID must not be null or empty"); } this.state = state; @@ -71,4 +73,21 @@ public String workerId() { public String traceMessage() { return traceMessage; } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + AbstractState that = (AbstractState) o; + return state.equals(that.state) + && Objects.equals(traceMessage, that.traceMessage) + && workerId.equals(that.workerId); + } + + @Override + public int hashCode() { + return Objects.hash(state, traceMessage, workerId); + } } diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java index 3a9efd15372ed..12fa6b76aff1e 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java @@ -35,7 +35,7 @@ public ConnectorHealth(String name, ConnectorState connectorState, Map tasks, ConnectorType type) { - if (name != null && !name.trim().isEmpty()) { + if (name == null || name.trim().isEmpty()) { throw new IllegalArgumentException("Connector name is required"); } Objects.requireNonNull(connectorState, "connectorState can't be null"); @@ -83,4 +83,31 @@ public ConnectorType type() { return type; } + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + ConnectorHealth that = (ConnectorHealth) o; + return name.equals(that.name) + && connectorState.equals(that.connectorState) + && tasks.equals(that.tasks) + && type == that.type; + } + + @Override + public int hashCode() { + return Objects.hash(name, connectorState, tasks, type); + } + + @Override + public String toString() { + return "ConnectorHealth{" + + "name='" + name + '\'' + + ", connectorState=" + connectorState + + ", tasks=" + tasks + + ", type=" + type + + '}'; + } } diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java index d5571bc4ff5d0..63044265bb999 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java @@ -32,4 +32,13 @@ public class ConnectorState extends AbstractState { public ConnectorState(String state, String workerId, String traceMessage) { super(state, workerId, traceMessage); } + + @Override + public String toString() { + return "ConnectorState{" + + "state='" + state() + '\'' + + ", traceMessage='" + traceMessage() + '\'' + + ", workerId='" + workerId() + '\'' + + '}'; + } } diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java index 1c1be159970d9..ae78a5f3af990 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java @@ -50,20 +50,28 @@ public int taskId() { @Override public boolean equals(Object o) { - if (this == o) { + if (this == o) return true; - } - if (o == null || getClass() != o.getClass()) { + if (o == null || getClass() != o.getClass()) + return false; + if (!super.equals(o)) return false; - } - TaskState taskState = (TaskState) o; - return taskId == taskState.taskId; } @Override public int hashCode() { - return Objects.hash(taskId); + return Objects.hash(super.hashCode(), taskId); + } + + @Override + public String toString() { + return "TaskState{" + + "taskId='" + taskId + '\'' + + "state='" + state() + '\'' + + ", traceMessage='" + traceMessage() + '\'' + + ", workerId='" + workerId() + '\'' + + '}'; } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java index e3a4833681b4a..43842127eac07 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java @@ -82,7 +82,7 @@ private Map taskStates(List st for (ConnectorStateInfo.TaskState state : states) { taskStates.put( state.id(), - new TaskState(state.id(), state.workerId(), state.state(), state.trace()) + new TaskState(state.id(), state.state(), state.workerId(), state.trace()) ); } return taskStates; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java index d4cac3976f8be..087246b917767 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java @@ -16,10 +16,16 @@ */ package org.apache.kafka.connect.integration; +import org.apache.kafka.connect.health.ConnectClusterState; +import org.apache.kafka.connect.health.ConnectorHealth; +import org.apache.kafka.connect.health.ConnectorState; +import org.apache.kafka.connect.health.ConnectorType; +import org.apache.kafka.connect.health.TaskState; import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.rest.ConnectRestExtensionContext; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.WorkerHandle; import org.apache.kafka.test.IntegrationTest; import org.junit.After; import org.junit.Test; @@ -28,12 +34,18 @@ import javax.ws.rs.GET; import javax.ws.rs.Path; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.REST_EXTENSION_CLASSES_CONFIG; import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.junit.Assert.assertEquals; /** * A simple integration test to ensure that REST extensions are registered correctly. @@ -41,39 +53,86 @@ @Category(IntegrationTest.class) public class RestExtensionIntegrationTest { - private static final int NUM_WORKERS = 3; private static final long REST_EXTENSION_REGISTRATION_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1); + private static final long CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1); private EmbeddedConnectCluster connect; @Test - public void testImmediateRequestForListOfConnectors() throws IOException, InterruptedException { + public void testRestExtensionApi() throws IOException, InterruptedException { // setup Connect worker properties Map workerProps = new HashMap<>(); workerProps.put(REST_EXTENSION_CLASSES_CONFIG, IntegrationTestRestExtension.class.getName()); - + // build a Connect cluster backed by Kafka and Zk connect = new EmbeddedConnectCluster.Builder() - .name("connect-cluster") - .numWorkers(NUM_WORKERS) - .numBrokers(1) - .workerProps(workerProps) - .build(); - + .name("connect-cluster") + .numWorkers(1) + .numBrokers(1) + .workerProps(workerProps) + .build(); + // start the clusters connect.start(); + WorkerHandle worker = connect.workers().stream() + .findFirst() + .orElseThrow(() -> new AssertionError("At least one worker handle should be available")); + waitForCondition( this::extensionIsRegistered, REST_EXTENSION_REGISTRATION_TIMEOUT_MS, "REST extension was never registered" ); + + ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle("test-conn"); + try { + // setup up props for the connector + Map connectorProps = new HashMap<>(); + connectorProps.put(CONNECTOR_CLASS_CONFIG, MonitorableSinkConnector.class.getSimpleName()); + connectorProps.put(TASKS_MAX_CONFIG, String.valueOf(1)); + connectorProps.put(TOPICS_CONFIG, "test-topic"); + + // start a connector + connectorHandle.taskHandle(connectorHandle.name() + "-0"); + StartAndStopLatch connectorStartLatch = connectorHandle.expectedStarts(1); + connect.configureConnector(connectorHandle.name(), connectorProps); + connectorStartLatch.await(CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + String workerId = String.format("%s:%d", worker.url().getHost(), worker.url().getPort()); + ConnectorHealth expectedHealth = new ConnectorHealth( + connectorHandle.name(), + new ConnectorState( + "RUNNING", + workerId, + null + ), + Collections.singletonMap( + 0, + new TaskState(0, "RUNNING", workerId, null) + ), + ConnectorType.SINK + ); + + connectorProps.put(NAME_CONFIG, connectorHandle.name()); + + // Test the REST extension API; specifically, that the connector's health and configuration + // are available to the REST extension we registered and that they contain expected values + waitForCondition( + () -> verifyConnectorHealthAndConfig(connectorHandle.name(), expectedHealth, connectorProps), + CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS, + "Connector health and/or config was never accessible by the REST extension" + ); + } finally { + RuntimeHandles.get().deleteConnector(connectorHandle.name()); + } } @After public void close() { // stop all Connect, Kafka and Zk threads. connect.stop(); + IntegrationTestRestExtension.instance = null; } private boolean extensionIsRegistered() { @@ -85,11 +144,42 @@ private boolean extensionIsRegistered() { } } + private boolean verifyConnectorHealthAndConfig( + String connectorName, + ConnectorHealth expectedHealth, + Map expectedConfig + ) { + ConnectClusterState clusterState = + IntegrationTestRestExtension.instance.restPluginContext.clusterState(); + + ConnectorHealth actualHealth = clusterState.connectorHealth(connectorName); + if (actualHealth.tasksState().isEmpty()) { + // Happens if the task has been started but its status has not yet been picked up from + // the status topic by the worker. + return false; + } + Map actualConfig = clusterState.connectorConfig(connectorName); + + assertEquals(expectedConfig, actualConfig); + assertEquals(expectedHealth, actualHealth); + + return true; + } + public static class IntegrationTestRestExtension implements ConnectRestExtension { + private static IntegrationTestRestExtension instance; + + public ConnectRestExtensionContext restPluginContext; @Override public void register(ConnectRestExtensionContext restPluginContext) { + instance = this; + this.restPluginContext = restPluginContext; + // Immediately request a list of connectors to confirm that the context and its fields + // has been fully initialized and there is no risk of deadlock restPluginContext.clusterState().connectors(); + // Install a new REST resource that can be used to confirm that the extension has been + // successfully registered restPluginContext.configurable().register(new IntegrationTestRestExtensionResource()); } From 705dd3d24c6cae0b1fc269de28fca07f266ed035 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Tue, 15 Oct 2019 16:19:33 -0700 Subject: [PATCH 2/2] KAFKA-8945/KAFKA-8947: Adjust REST extension integration test to be be compatible with pre-2.3 versions --- .../integration/RestExtensionIntegrationTest.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java index 087246b917767..a3e814763751d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java @@ -119,7 +119,7 @@ public void testRestExtensionApi() throws IOException, InterruptedException { // Test the REST extension API; specifically, that the connector's health and configuration // are available to the REST extension we registered and that they contain expected values waitForCondition( - () -> verifyConnectorHealthAndConfig(connectorHandle.name(), expectedHealth, connectorProps), + () -> verifyConnectorHealth(connectorHandle.name(), expectedHealth), CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS, "Connector health and/or config was never accessible by the REST extension" ); @@ -144,10 +144,9 @@ private boolean extensionIsRegistered() { } } - private boolean verifyConnectorHealthAndConfig( + private boolean verifyConnectorHealth( String connectorName, - ConnectorHealth expectedHealth, - Map expectedConfig + ConnectorHealth expectedHealth ) { ConnectClusterState clusterState = IntegrationTestRestExtension.instance.restPluginContext.clusterState(); @@ -158,9 +157,6 @@ private boolean verifyConnectorHealthAndConfig( // the status topic by the worker. return false; } - Map actualConfig = clusterState.connectorConfig(connectorName); - - assertEquals(expectedConfig, actualConfig); assertEquals(expectedHealth, actualHealth); return true;