Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,11 @@
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.function.ThrowingRunnable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -68,16 +67,16 @@
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Tests situations during which certain connector operations, such as start, validation,
* configuration and others, take longer than expected.
*/
@Category(IntegrationTest.class)
@Tag("integration")
public class BlockingConnectorTest {

private static final Logger log = LoggerFactory.getLogger(BlockingConnectorTest.class);
Expand Down Expand Up @@ -121,7 +120,7 @@ public class BlockingConnectorTest {
private EmbeddedConnectCluster connect;
private ConnectorHandle normalConnectorHandle;

@Before
@BeforeEach
public void setup() throws Exception {
// build a Connect cluster backed by Kafka and Zk
connect = new EmbeddedConnectCluster.Builder()
Expand All @@ -136,7 +135,7 @@ public void setup() throws Exception {
connect.start();
}

@After
@AfterEach
public void close() {
// stop all Connect, Kafka and Zk threads.
connect.stop();
Expand Down Expand Up @@ -354,26 +353,26 @@ private void verifyNormalConnector() throws InterruptedException {
normalConnectorHandle.awaitCommits(RECORD_TRANSFER_TIMEOUT_MS);
}

private void assertRequestTimesOut(String requestDescription, ThrowingRunnable request, String expectedTimeoutMessage) {
private void assertRequestTimesOut(String requestDescription, Executable request, String expectedTimeoutMessage) {
// Artificially reduce the REST request timeout so that these don't take 90 seconds
connect.requestTimeout(REDUCED_REST_REQUEST_TIMEOUT);
ConnectRestException exception = assertThrows(
"Should have failed to " + requestDescription,
ConnectRestException.class, request
ConnectRestException.class, request,
"Should have failed to " + requestDescription
);
assertEquals(
"Should have gotten 500 error from trying to " + requestDescription,
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), exception.statusCode()
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), exception.statusCode(),
"Should have gotten 500 error from trying to " + requestDescription
);
assertTrue(
exception.getMessage().contains("Request timed out"),
"Should have gotten timeout message from trying to " + requestDescription
+ "; instead, message was: " + exception.getMessage(),
exception.getMessage().contains("Request timed out")
+ "; instead, message was: " + exception.getMessage()
);
if (expectedTimeoutMessage != null) {
assertTrue(
"Timeout error message '" + exception.getMessage() + "' does not match expected format",
exception.getMessage().contains(expectedTimeoutMessage)
exception.getMessage().contains(expectedTimeoutMessage),
"Timeout error message '" + exception.getMessage() + "' does not match expected format"
);
}
// Reset the REST request timeout so that other requests aren't impacted
Expand Down Expand Up @@ -510,8 +509,8 @@ public void maybeBlockOn(String block) {
CountDownLatch blockLatch;
synchronized (Block.class) {
assertNotNull(
"Block was reset prematurely",
awaitBlockLatch
awaitBlockLatch,
"Block was reset prematurely"
);
awaitBlockLatch.countDown();
blockLatch = newBlockLatch();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,20 @@
import org.apache.kafka.connect.util.SinkUtils;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.WorkerHandle;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestRule;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

import java.io.File;
import java.io.FileOutputStream;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -98,16 +97,16 @@
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;

/**
* Test simple operations on the workers of a Connect cluster.
*/
@Category(IntegrationTest.class)
@Tag("integration")
public class ConnectWorkerIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(ConnectWorkerIntegrationTest.class);

Expand All @@ -124,14 +123,9 @@ public class ConnectWorkerIntegrationTest {
private Map<String, String> workerProps;
private Properties brokerProps;

@Rule
public TestRule watcher = ConnectIntegrationTestUtils.newTestWatcher(log);

@Rule
public TemporaryFolder tmp = new TemporaryFolder();

@Before
public void setup() {
@BeforeEach
public void setup(TestInfo testInfo) {
log.info("Starting test {}", testInfo.getDisplayName());
// setup Connect worker properties
workerProps = new HashMap<>();
workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
Expand All @@ -150,8 +144,9 @@ public void setup() {
.maskExitProcedures(true); // true is the default, setting here as example
}

@After
public void close() {
@AfterEach
public void close(TestInfo testInfo) {
log.info("Finished test {}", testInfo.getDisplayName());
// stop all Connect, Kafka and Zk threads.
connect.stop();
}
Expand Down Expand Up @@ -268,9 +263,9 @@ public void testBrokerCoordinator() throws Exception {
Thread.sleep(TimeUnit.SECONDS.toMillis(10));

// Wait for the connector to be stopped
assertTrue("Failed to stop connector and tasks after coordinator failure within "
+ CONNECTOR_SETUP_DURATION_MS + "ms",
stopLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS));
assertTrue(stopLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS),
"Failed to stop connector and tasks after coordinator failure within "
+ CONNECTOR_SETUP_DURATION_MS + "ms");

StartAndStopLatch startLatch = connectorHandle.expectedStarts(1, false);
connect.kafka().startOnlyKafkaOnSamePorts();
Expand All @@ -288,9 +283,9 @@ public void testBrokerCoordinator() throws Exception {
"Connector tasks did not start in time.");

// Expect that the connector has started again
assertTrue("Failed to stop connector and tasks after coordinator failure within "
+ CONNECTOR_SETUP_DURATION_MS + "ms",
startLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS));
assertTrue(startLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS),
"Failed to stop connector and tasks after coordinator failure within "
+ CONNECTOR_SETUP_DURATION_MS + "ms");
}

/**
Expand Down Expand Up @@ -358,7 +353,7 @@ public void testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exce
StartAndStopLatch stopCounter = connector.expectedStops(1);
connect.deleteConnector(CONNECTOR_NAME);

assertTrue("Connector and all tasks were not stopped in time", stopCounter.await(1, TimeUnit.MINUTES));
assertTrue(stopCounter.await(1, TimeUnit.MINUTES), "Connector and all tasks were not stopped in time");
}

/**
Expand Down Expand Up @@ -927,8 +922,8 @@ private void assertTimeoutException(Runnable operation, String expectedStageDesc
assertEquals(INTERNAL_SERVER_ERROR.getStatusCode(), restException.statusCode());
assertNotNull(restException.getMessage());
assertTrue(
"Message '" + restException.getMessage() + "' does not match expected format",
restException.getMessage().contains("Request timed out. The worker is currently " + expectedStageDescription)
restException.getMessage().contains("Request timed out. The worker is currently " + expectedStageDescription),
"Message '" + restException.getMessage() + "' does not match expected format"
);

return true;
Expand Down Expand Up @@ -1166,8 +1161,8 @@ public void testCompactedDeletedOlderConnectorConfig() throws Exception {
final TopicPartition connectorTopicPartition = new TopicPartition(connectorTopic, 0);
final long initialEndOffset = connect.kafka().endOffset(connectorTopicPartition);
assertTrue(
"Source connector should have published at least one record to Kafka",
initialEndOffset > 0
initialEndOffset > 0,
"Source connector should have published at least one record to Kafka"
);

connectorHandle.expectedCommits(NUM_TASKS * 2);
Expand All @@ -1187,9 +1182,9 @@ public void testCompactedDeletedOlderConnectorConfig() throws Exception {
// See if any new records got written to the old topic
final long nextEndOffset = connect.kafka().endOffset(connectorTopicPartition);
assertEquals(
"No new records should have been written to the older topic",
initialEndOffset,
nextEndOffset
nextEndOffset,
"No new records should have been written to the older topic"
);
}

Expand All @@ -1203,7 +1198,7 @@ public void testCompactedDeletedOlderConnectorConfig() throws Exception {
* an invalid config provider reference, it will still be possible to reconfigure the connector.
*/
@Test
public void testReconfigureConnectorWithFailingTaskConfigs() throws Exception {
public void testReconfigureConnectorWithFailingTaskConfigs(@TempDir Path tmp) throws Exception {
final int offsetCommitIntervalMs = 100;
workerProps.put(CONFIG_PROVIDERS_CONFIG, "file");
workerProps.put(CONFIG_PROVIDERS_CONFIG + ".file.class", FileConfigProvider.class.getName());
Expand All @@ -1219,7 +1214,7 @@ public void testReconfigureConnectorWithFailingTaskConfigs() throws Exception {
final String firstConnectorTopic = "connector-topic-1";
connect.kafka().createTopic(firstConnectorTopic);

final File secretsFile = tmp.newFile("test-secrets");
final File secretsFile = tmp.resolve("test-secrets").toFile();
final Properties secrets = new Properties();
final String throughputSecretKey = "secret-throughput";
secrets.put(throughputSecretKey, "10");
Expand All @@ -1244,7 +1239,7 @@ public void testReconfigureConnectorWithFailingTaskConfigs() throws Exception {
connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);

// Delete the secrets file, which should render the old task configs invalid
assertTrue("Failed to delete secrets file", secretsFile.delete());
assertTrue(secretsFile.delete(), "Failed to delete secrets file");

// Use a start latch here instead of assertConnectorAndExactlyNumTasksAreRunning
// since failure to reconfigure the tasks (which may occur if the bug this test was written
Expand All @@ -1260,8 +1255,8 @@ public void testReconfigureConnectorWithFailingTaskConfigs() throws Exception {
connectorConfig.put(TOPIC_CONFIG, secondConnectorTopic);
connect.configureConnector(CONNECTOR_NAME, connectorConfig);
assertTrue(
"Connector tasks were not restarted in time",
restarts.await(10, TimeUnit.SECONDS)
restarts.await(10, TimeUnit.SECONDS),
"Connector tasks were not restarted in time"
);

// Wait for at least one task to commit offsets after being restarted
Expand All @@ -1270,9 +1265,9 @@ public void testReconfigureConnectorWithFailingTaskConfigs() throws Exception {

final long endOffset = connect.kafka().endOffset(new TopicPartition(secondConnectorTopic, 0));
assertTrue(
endOffset > 0,
"Source connector should have published at least one record to new Kafka topic "
+ "after being reconfigured",
endOffset > 0
+ "after being reconfigured"
);
}

Expand Down Expand Up @@ -1307,9 +1302,9 @@ public void testRuntimePropertyReconfiguration() throws Exception {
"Connector did not start or task did not fail in time"
);
assertEquals(
"Connector should not have any committed offsets when only task fails on first record",
new ConnectorOffsets(Collections.emptyList()),
connect.connectorOffsets(CONNECTOR_NAME)
connect.connectorOffsets(CONNECTOR_NAME),
"Connector should not have any committed offsets when only task fails on first record"
);

// Reconfigure the connector to use the string converter, which should not cause any more task failures
Expand Down
Loading