diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java index a18c46334cf29..f707b3c3026ed 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java @@ -17,6 +17,8 @@ package org.apache.kafka.connect.health; +import java.util.Objects; + /** * Provides the current status along with identifier for Connect worker and tasks. */ @@ -34,10 +36,10 @@ public abstract class AbstractState { * @param traceMessage any error trace message associated with the connector or the task; may be null or empty */ public AbstractState(String state, String workerId, String traceMessage) { - if (state != null && !state.trim().isEmpty()) { + if (state == null || state.trim().isEmpty()) { throw new IllegalArgumentException("State must not be null or empty"); } - if (workerId != null && !workerId.trim().isEmpty()) { + if (workerId == null || workerId.trim().isEmpty()) { throw new IllegalArgumentException("Worker ID must not be null or empty"); } this.state = state; @@ -71,4 +73,21 @@ public String workerId() { public String traceMessage() { return traceMessage; } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + AbstractState that = (AbstractState) o; + return state.equals(that.state) + && Objects.equals(traceMessage, that.traceMessage) + && workerId.equals(that.workerId); + } + + @Override + public int hashCode() { + return Objects.hash(state, traceMessage, workerId); + } } diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java index 3a9efd15372ed..12fa6b76aff1e 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java @@ -35,7 +35,7 @@ public ConnectorHealth(String name, ConnectorState connectorState, Map tasks, ConnectorType type) { - if (name != null && !name.trim().isEmpty()) { + if (name == null || name.trim().isEmpty()) { throw new IllegalArgumentException("Connector name is required"); } Objects.requireNonNull(connectorState, "connectorState can't be null"); @@ -83,4 +83,31 @@ public ConnectorType type() { return type; } + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + ConnectorHealth that = (ConnectorHealth) o; + return name.equals(that.name) + && connectorState.equals(that.connectorState) + && tasks.equals(that.tasks) + && type == that.type; + } + + @Override + public int hashCode() { + return Objects.hash(name, connectorState, tasks, type); + } + + @Override + public String toString() { + return "ConnectorHealth{" + + "name='" + name + '\'' + + ", connectorState=" + connectorState + + ", tasks=" + tasks + + ", type=" + type + + '}'; + } } diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java index d5571bc4ff5d0..63044265bb999 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java @@ -32,4 +32,13 @@ public class ConnectorState extends AbstractState { public ConnectorState(String state, String workerId, String traceMessage) { super(state, workerId, traceMessage); } + + @Override + public String toString() { + return "ConnectorState{" + + "state='" + state() + '\'' + + ", traceMessage='" + traceMessage() + '\'' + + ", workerId='" + workerId() + '\'' + + '}'; + } } diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java index 1c1be159970d9..ae78a5f3af990 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java @@ -50,20 +50,28 @@ public int taskId() { @Override public boolean equals(Object o) { - if (this == o) { + if (this == o) return true; - } - if (o == null || getClass() != o.getClass()) { + if (o == null || getClass() != o.getClass()) + return false; + if (!super.equals(o)) return false; - } - TaskState taskState = (TaskState) o; - return taskId == taskState.taskId; } @Override public int hashCode() { - return Objects.hash(taskId); + return Objects.hash(super.hashCode(), taskId); + } + + @Override + public String toString() { + return "TaskState{" + + "taskId='" + taskId + '\'' + + "state='" + state() + '\'' + + ", traceMessage='" + traceMessage() + '\'' + + ", workerId='" + workerId() + '\'' + + '}'; } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java index e3a4833681b4a..43842127eac07 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java @@ -82,7 +82,7 @@ private Map taskStates(List st for (ConnectorStateInfo.TaskState state : states) { taskStates.put( state.id(), - new TaskState(state.id(), state.workerId(), state.state(), state.trace()) + new TaskState(state.id(), state.state(), state.workerId(), state.trace()) ); } return taskStates; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java index d4cac3976f8be..a3e814763751d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java @@ -16,10 +16,16 @@ */ package org.apache.kafka.connect.integration; +import org.apache.kafka.connect.health.ConnectClusterState; +import org.apache.kafka.connect.health.ConnectorHealth; +import org.apache.kafka.connect.health.ConnectorState; +import org.apache.kafka.connect.health.ConnectorType; +import org.apache.kafka.connect.health.TaskState; import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.rest.ConnectRestExtensionContext; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.WorkerHandle; import org.apache.kafka.test.IntegrationTest; import org.junit.After; import org.junit.Test; @@ -28,12 +34,18 @@ import javax.ws.rs.GET; import javax.ws.rs.Path; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.REST_EXTENSION_CLASSES_CONFIG; import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.junit.Assert.assertEquals; /** * A simple integration test to ensure that REST extensions are registered correctly. @@ -41,39 +53,86 @@ @Category(IntegrationTest.class) public class RestExtensionIntegrationTest { - private static final int NUM_WORKERS = 3; private static final long REST_EXTENSION_REGISTRATION_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1); + private static final long CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1); private EmbeddedConnectCluster connect; @Test - public void testImmediateRequestForListOfConnectors() throws IOException, InterruptedException { + public void testRestExtensionApi() throws IOException, InterruptedException { // setup Connect worker properties Map workerProps = new HashMap<>(); workerProps.put(REST_EXTENSION_CLASSES_CONFIG, IntegrationTestRestExtension.class.getName()); - + // build a Connect cluster backed by Kafka and Zk connect = new EmbeddedConnectCluster.Builder() - .name("connect-cluster") - .numWorkers(NUM_WORKERS) - .numBrokers(1) - .workerProps(workerProps) - .build(); - + .name("connect-cluster") + .numWorkers(1) + .numBrokers(1) + .workerProps(workerProps) + .build(); + // start the clusters connect.start(); + WorkerHandle worker = connect.workers().stream() + .findFirst() + .orElseThrow(() -> new AssertionError("At least one worker handle should be available")); + waitForCondition( this::extensionIsRegistered, REST_EXTENSION_REGISTRATION_TIMEOUT_MS, "REST extension was never registered" ); + + ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle("test-conn"); + try { + // setup up props for the connector + Map connectorProps = new HashMap<>(); + connectorProps.put(CONNECTOR_CLASS_CONFIG, MonitorableSinkConnector.class.getSimpleName()); + connectorProps.put(TASKS_MAX_CONFIG, String.valueOf(1)); + connectorProps.put(TOPICS_CONFIG, "test-topic"); + + // start a connector + connectorHandle.taskHandle(connectorHandle.name() + "-0"); + StartAndStopLatch connectorStartLatch = connectorHandle.expectedStarts(1); + connect.configureConnector(connectorHandle.name(), connectorProps); + connectorStartLatch.await(CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + String workerId = String.format("%s:%d", worker.url().getHost(), worker.url().getPort()); + ConnectorHealth expectedHealth = new ConnectorHealth( + connectorHandle.name(), + new ConnectorState( + "RUNNING", + workerId, + null + ), + Collections.singletonMap( + 0, + new TaskState(0, "RUNNING", workerId, null) + ), + ConnectorType.SINK + ); + + connectorProps.put(NAME_CONFIG, connectorHandle.name()); + + // Test the REST extension API; specifically, that the connector's health and configuration + // are available to the REST extension we registered and that they contain expected values + waitForCondition( + () -> verifyConnectorHealth(connectorHandle.name(), expectedHealth), + CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS, + "Connector health and/or config was never accessible by the REST extension" + ); + } finally { + RuntimeHandles.get().deleteConnector(connectorHandle.name()); + } } @After public void close() { // stop all Connect, Kafka and Zk threads. connect.stop(); + IntegrationTestRestExtension.instance = null; } private boolean extensionIsRegistered() { @@ -85,11 +144,38 @@ private boolean extensionIsRegistered() { } } + private boolean verifyConnectorHealth( + String connectorName, + ConnectorHealth expectedHealth + ) { + ConnectClusterState clusterState = + IntegrationTestRestExtension.instance.restPluginContext.clusterState(); + + ConnectorHealth actualHealth = clusterState.connectorHealth(connectorName); + if (actualHealth.tasksState().isEmpty()) { + // Happens if the task has been started but its status has not yet been picked up from + // the status topic by the worker. + return false; + } + assertEquals(expectedHealth, actualHealth); + + return true; + } + public static class IntegrationTestRestExtension implements ConnectRestExtension { + private static IntegrationTestRestExtension instance; + + public ConnectRestExtensionContext restPluginContext; @Override public void register(ConnectRestExtensionContext restPluginContext) { + instance = this; + this.restPluginContext = restPluginContext; + // Immediately request a list of connectors to confirm that the context and its fields + // has been fully initialized and there is no risk of deadlock restPluginContext.clusterState().connectors(); + // Install a new REST resource that can be used to confirm that the extension has been + // successfully registered restPluginContext.configurable().register(new IntegrationTestRestExtensionResource()); }