Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.kafka.connect.health;

import java.util.Objects;

/**
* Provides the current status along with identifier for Connect worker and tasks.
*/
Expand All @@ -34,10 +36,10 @@ public abstract class AbstractState {
* @param traceMessage any error trace message associated with the connector or the task; may be null or empty
*/
public AbstractState(String state, String workerId, String traceMessage) {
if (state != null && !state.trim().isEmpty()) {
if (state == null || state.trim().isEmpty()) {
throw new IllegalArgumentException("State must not be null or empty");
}
if (workerId != null && !workerId.trim().isEmpty()) {
if (workerId == null || workerId.trim().isEmpty()) {
throw new IllegalArgumentException("Worker ID must not be null or empty");
}
this.state = state;
Expand Down Expand Up @@ -71,4 +73,21 @@ public String workerId() {
public String traceMessage() {
return traceMessage;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
AbstractState that = (AbstractState) o;
return state.equals(that.state)
&& Objects.equals(traceMessage, that.traceMessage)
&& workerId.equals(that.workerId);
}

@Override
public int hashCode() {
return Objects.hash(state, traceMessage, workerId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public ConnectorHealth(String name,
ConnectorState connectorState,
Map<Integer, TaskState> tasks,
ConnectorType type) {
if (name != null && !name.trim().isEmpty()) {
if (name == null || name.trim().isEmpty()) {
throw new IllegalArgumentException("Connector name is required");
}
Objects.requireNonNull(connectorState, "connectorState can't be null");
Expand Down Expand Up @@ -83,4 +83,31 @@ public ConnectorType type() {
return type;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
ConnectorHealth that = (ConnectorHealth) o;
return name.equals(that.name)
&& connectorState.equals(that.connectorState)
&& tasks.equals(that.tasks)
&& type == that.type;
}

@Override
public int hashCode() {
return Objects.hash(name, connectorState, tasks, type);
}

@Override
public String toString() {
return "ConnectorHealth{"
+ "name='" + name + '\''
+ ", connectorState=" + connectorState
+ ", tasks=" + tasks
+ ", type=" + type
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,13 @@ public class ConnectorState extends AbstractState {
public ConnectorState(String state, String workerId, String traceMessage) {
super(state, workerId, traceMessage);
}

@Override
public String toString() {
return "ConnectorState{"
+ "state='" + state() + '\''
+ ", traceMessage='" + traceMessage() + '\''
+ ", workerId='" + workerId() + '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,28 @@ public int taskId() {

@Override
public boolean equals(Object o) {
if (this == o) {
if (this == o)
return true;
}
if (o == null || getClass() != o.getClass()) {
if (o == null || getClass() != o.getClass())
return false;
if (!super.equals(o))
return false;
}

TaskState taskState = (TaskState) o;

return taskId == taskState.taskId;
}

@Override
public int hashCode() {
return Objects.hash(taskId);
return Objects.hash(super.hashCode(), taskId);
}

@Override
public String toString() {
return "TaskState{"
+ "taskId='" + taskId + '\''
+ "state='" + state() + '\''
+ ", traceMessage='" + traceMessage() + '\''
+ ", workerId='" + workerId() + '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private Map<Integer, TaskState> taskStates(List<ConnectorStateInfo.TaskState> st
for (ConnectorStateInfo.TaskState state : states) {
taskStates.put(
state.id(),
new TaskState(state.id(), state.workerId(), state.state(), state.trace())
new TaskState(state.id(), state.state(), state.workerId(), state.trace())
);
}
return taskStates;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@
*/
package org.apache.kafka.connect.integration;

import org.apache.kafka.connect.health.ConnectClusterState;
import org.apache.kafka.connect.health.ConnectorHealth;
import org.apache.kafka.connect.health.ConnectorState;
import org.apache.kafka.connect.health.ConnectorType;
import org.apache.kafka.connect.health.TaskState;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.WorkerHandle;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Test;
Expand All @@ -28,52 +34,105 @@
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.REST_EXTENSION_CLASSES_CONFIG;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertEquals;

/**
* A simple integration test to ensure that REST extensions are registered correctly.
*/
@Category(IntegrationTest.class)
public class RestExtensionIntegrationTest {

private static final int NUM_WORKERS = 3;
private static final long REST_EXTENSION_REGISTRATION_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);
private static final long CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);

private EmbeddedConnectCluster connect;

@Test
public void testImmediateRequestForListOfConnectors() throws IOException, InterruptedException {
public void testRestExtensionApi() throws IOException, InterruptedException {
// setup Connect worker properties
Map<String, String> workerProps = new HashMap<>();
workerProps.put(REST_EXTENSION_CLASSES_CONFIG, IntegrationTestRestExtension.class.getName());

// build a Connect cluster backed by Kafka and Zk
connect = new EmbeddedConnectCluster.Builder()
.name("connect-cluster")
.numWorkers(NUM_WORKERS)
.numBrokers(1)
.workerProps(workerProps)
.build();
.name("connect-cluster")
.numWorkers(1)
.numBrokers(1)
.workerProps(workerProps)
.build();

// start the clusters
connect.start();

WorkerHandle worker = connect.workers().stream()
.findFirst()
.orElseThrow(() -> new AssertionError("At least one worker handle should be available"));

waitForCondition(
this::extensionIsRegistered,
REST_EXTENSION_REGISTRATION_TIMEOUT_MS,
"REST extension was never registered"
);

ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle("test-conn");
try {
// setup up props for the connector
Map<String, String> connectorProps = new HashMap<>();
connectorProps.put(CONNECTOR_CLASS_CONFIG, MonitorableSinkConnector.class.getSimpleName());
connectorProps.put(TASKS_MAX_CONFIG, String.valueOf(1));
connectorProps.put(TOPICS_CONFIG, "test-topic");

// start a connector
connectorHandle.taskHandle(connectorHandle.name() + "-0");
StartAndStopLatch connectorStartLatch = connectorHandle.expectedStarts(1);
connect.configureConnector(connectorHandle.name(), connectorProps);
connectorStartLatch.await(CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS, TimeUnit.MILLISECONDS);

String workerId = String.format("%s:%d", worker.url().getHost(), worker.url().getPort());
ConnectorHealth expectedHealth = new ConnectorHealth(
connectorHandle.name(),
new ConnectorState(
"RUNNING",
workerId,
null
),
Collections.singletonMap(
0,
new TaskState(0, "RUNNING", workerId, null)
),
ConnectorType.SINK
);

connectorProps.put(NAME_CONFIG, connectorHandle.name());

// Test the REST extension API; specifically, that the connector's health and configuration
// are available to the REST extension we registered and that they contain expected values
waitForCondition(
() -> verifyConnectorHealth(connectorHandle.name(), expectedHealth),
CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS,
"Connector health and/or config was never accessible by the REST extension"
);
} finally {
RuntimeHandles.get().deleteConnector(connectorHandle.name());
}
}

@After
public void close() {
// stop all Connect, Kafka and Zk threads.
connect.stop();
IntegrationTestRestExtension.instance = null;
}

private boolean extensionIsRegistered() {
Expand All @@ -85,11 +144,38 @@ private boolean extensionIsRegistered() {
}
}

private boolean verifyConnectorHealth(
String connectorName,
ConnectorHealth expectedHealth
) {
ConnectClusterState clusterState =
IntegrationTestRestExtension.instance.restPluginContext.clusterState();

ConnectorHealth actualHealth = clusterState.connectorHealth(connectorName);
if (actualHealth.tasksState().isEmpty()) {
// Happens if the task has been started but its status has not yet been picked up from
// the status topic by the worker.
return false;
}
assertEquals(expectedHealth, actualHealth);

return true;
}

public static class IntegrationTestRestExtension implements ConnectRestExtension {
private static IntegrationTestRestExtension instance;

public ConnectRestExtensionContext restPluginContext;

@Override
public void register(ConnectRestExtensionContext restPluginContext) {
instance = this;
this.restPluginContext = restPluginContext;
// Immediately request a list of connectors to confirm that the context and its fields
// has been fully initialized and there is no risk of deadlock
restPluginContext.clusterState().connectors();
// Install a new REST resource that can be used to confirm that the extension has been
// successfully registered
restPluginContext.configurable().register(new IntegrationTestRestExtensionResource());
}

Expand Down