From 0f1d2e2c1c0c68fd20db8b3df1dcb2e32371a109 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Tue, 11 Jun 2024 14:33:37 -0400 Subject: [PATCH 1/3] KAFKA-16935: Automatically wait for cluster startup in embedded Connect integration tests --- .../MirrorConnectorsIntegrationBaseTest.java | 4 -- .../integration/BlockingConnectorTest.java | 5 -- .../ConnectWorkerIntegrationTest.java | 63 +------------------ .../ConnectorClientPolicyIntegrationTest.java | 2 - .../ConnectorRestartApiIntegrationTest.java | 2 - .../ConnectorTopicsIntegrationTest.java | 6 -- .../ErrorHandlingIntegrationTest.java | 2 - .../ExactlyOnceSourceIntegrationTest.java | 3 - .../InternalTopicsIntegrationTest.java | 9 +-- ...alanceSourceConnectorsIntegrationTest.java | 18 ------ .../RestExtensionIntegrationTest.java | 3 - .../SessionedProtocolIntegrationTest.java | 13 ++-- .../SinkConnectorsIntegrationTest.java | 2 - .../SourceConnectorsIntegrationTest.java | 6 -- .../TransformationIntegrationTest.java | 12 ---- .../util/clusters/EmbeddedConnect.java | 43 ++++++++++++- 16 files changed, 48 insertions(+), 145 deletions(-) diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java index 3f169b46920a8..d49435ab7ac35 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java @@ -219,16 +219,12 @@ public void startClusters(Map additionalMM2Config) throws Except .build(); primary.start(); - primary.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Workers of " + PRIMARY_CLUSTER_ALIAS + "-connect-cluster did not start in time."); waitForTopicCreated(primary, "mm2-status.backup.internal"); waitForTopicCreated(primary, "mm2-offsets.backup.internal"); waitForTopicCreated(primary, "mm2-configs.backup.internal"); backup.start(); - backup.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Workers of " + BACKUP_CLUSTER_ALIAS + "-connect-cluster did not start in time."); primaryProducer = initializeProducer(primary); backupProducer = initializeProducer(backup); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java index 3eefee64c0d1f..532ab1baaf02f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java @@ -134,11 +134,6 @@ public void setup() throws Exception { // start the clusters connect.start(); - - connect.assertions().assertAtLeastNumWorkersAreUp( - NUM_WORKERS, - "Initial group of workers did not start in time" - ); } @After diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index c540016f104bb..a7bb6c1959660 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -218,9 +218,6 @@ public void testRestartFailedTask() throws Exception { props.put(TASKS_MAX_CONFIG, Objects.toString(numTasks)); props.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG, "nobrokerrunningatthisaddress"); - connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - // Try to start the connector and its single task. connect.configureConnector(CONNECTOR_NAME, props); @@ -258,9 +255,6 @@ public void testBrokerCoordinator() throws Exception { // set up props for the source connector Map props = defaultSourceConnectorProps(TOPIC_NAME); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - // start a source connector connect.configureConnector(CONNECTOR_NAME, props); @@ -312,9 +306,6 @@ public void testTaskStatuses() throws Exception { // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - // base connector props Map props = defaultSourceConnectorProps(TOPIC_NAME); props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName()); @@ -352,8 +343,6 @@ public void testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exce .build(); connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(1, "Initial group of workers did not start in time."); - // and when the connector is not configured to create topics Map props = defaultSourceConnectorProps("nonexistenttopic"); props.remove(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG); @@ -405,9 +394,6 @@ public void testPauseStopResume() throws Exception { // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - // Want to make sure to use multiple tasks final int numTasks = 4; Map props = defaultSourceConnectorProps(TOPIC_NAME); @@ -497,9 +483,6 @@ public void testStoppedState() throws Exception { // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - Map props = defaultSourceConnectorProps(TOPIC_NAME); // Fail the connector on startup props.put("connector.start.inject.error", "true"); @@ -572,11 +555,6 @@ public void testTasksConfigDeprecation() throws Exception { // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp( - NUM_WORKERS, - "Initial group of workers did not start in time." - ); - connect.configureConnector(CONNECTOR_NAME, defaultSourceConnectorProps(TOPIC_NAME)); connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( CONNECTOR_NAME, @@ -600,9 +578,6 @@ public void testCreateConnectorWithPausedInitialState() throws Exception { // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - CreateConnectorRequest createConnectorRequest = new CreateConnectorRequest( CONNECTOR_NAME, defaultSourceConnectorProps(TOPIC_NAME), @@ -634,9 +609,6 @@ public void testCreateSourceConnectorWithStoppedInitialStateAndModifyOffsets() t // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - Map props = defaultSourceConnectorProps(TOPIC_NAME); // Configure the connector to produce a maximum of 10 messages @@ -687,9 +659,6 @@ public void testCreateSinkConnectorWithStoppedInitialStateAndModifyOffsets() thr // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - // Create topic and produce 10 messages connect.kafka().createTopic(TOPIC_NAME); for (int i = 0; i < 10; i++) { @@ -754,9 +723,6 @@ public void testDeleteConnectorCreatedWithPausedOrStoppedInitialState() throws E // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - // Create a connector with PAUSED initial state CreateConnectorRequest createConnectorRequest = new CreateConnectorRequest( CONNECTOR_NAME, @@ -806,9 +772,6 @@ public void testPatchConnectorConfig() throws Exception { // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - connect.kafka().createTopic(TOPIC_NAME); Map props = defaultSinkConnectorProps(TOPIC_NAME); @@ -859,8 +822,6 @@ public void testRequestTimeouts() throws Exception { .numWorkers(1) .build(); connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(1, - "Worker did not start in time"); Map connectorConfig1 = defaultSourceConnectorProps(TOPIC_NAME); Map connectorConfig2 = new HashMap<>(connectorConfig1); @@ -927,8 +888,6 @@ public void testPollTimeoutExpiry() throws Exception { connect.start(); - connect.assertions().assertExactlyNumWorkersAreUp(1, "Worker not brought up in time"); - Map connectorWithBlockingTaskStopConfig = new HashMap<>(); connectorWithBlockingTaskStopConfig.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSourceConnector.class.getName()); connectorWithBlockingTaskStopConfig.put(TASKS_MAX_CONFIG, "1"); @@ -1008,11 +967,6 @@ public void testTasksMaxEnforcement() throws Exception { // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp( - NUM_WORKERS, - "Initial group of workers did not start in time." - ); - Map connectorProps = defaultSourceConnectorProps(TOPIC_NAME); int maxTasks = 1; connectorProps.put(TASKS_MAX_CONFIG, Integer.toString(maxTasks)); @@ -1177,11 +1131,6 @@ public void testCompactedDeletedOlderConnectorConfig() throws Exception { // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp( - numWorkers, - "Initial group of workers did not start in time." - ); - final String connectorTopic = "connector-topic"; connect.kafka().createTopic(connectorTopic, 1); @@ -1214,7 +1163,7 @@ public void testCompactedDeletedOlderConnectorConfig() throws Exception { connect.assertions().assertAtLeastNumWorkersAreUp( numWorkers, - "Initial group of workers did not start in time." + "Workers did not start in time after cluster was rolled." ); final TopicPartition connectorTopicPartition = new TopicPartition(connectorTopic, 0); @@ -1270,11 +1219,6 @@ public void testReconfigureConnectorWithFailingTaskConfigs() throws Exception { // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp( - numWorkers, - "Initial group of workers did not start in time." - ); - final String firstConnectorTopic = "connector-topic-1"; connect.kafka().createTopic(firstConnectorTopic); @@ -1344,11 +1288,6 @@ public void testRuntimePropertyReconfiguration() throws Exception { // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp( - NUM_WORKERS, - "Initial group of workers did not start in time." - ); - final String topic = "kafka9228"; connect.kafka().createTopic(topic, 1); connect.kafka().produce(topic, "non-json-value"); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java index a0abece17d750..f09a949010c69 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java @@ -121,8 +121,6 @@ private EmbeddedConnectCluster connectClusterWithPolicy(String policy) throws In // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); return connect; } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java index a512eeaae0a71..e957b97f6114d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java @@ -113,8 +113,6 @@ private void startOrReuseConnectWithNumWorkers(int numWorkers) throws Exception connect.start(); return connect; }); - connect.assertions().assertExactlyNumWorkersAreUp(numWorkers, - "Initial group of workers did not start in time."); } @After diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java index eb055ab13fb11..e9fdd91c38796 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java @@ -115,8 +115,6 @@ public void testGetActiveTopics() throws InterruptedException { connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS); connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time."); - connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.emptyList(), "Active topic set is not empty for connector: " + FOO_CONNECTOR); @@ -179,8 +177,6 @@ public void testTopicTrackingResetIsDisabled() throws InterruptedException { connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS); connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time."); - connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.emptyList(), "Active topic set is not empty for connector: " + FOO_CONNECTOR); @@ -235,8 +231,6 @@ public void testTopicTrackingIsDisabled() throws InterruptedException { connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS); connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time."); - // start a source connector connect.configureConnector(FOO_CONNECTOR, defaultSourceConnectorProps(FOO_TOPIC)); connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(FOO_CONNECTOR, NUM_TASKS, diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java index 1a76956be610c..30fe48116fc97 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java @@ -94,8 +94,6 @@ public void setup() throws InterruptedException { // start Connect cluster connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); // get connector handles before starting test. connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java index 04dd4c7de67c4..6d4b648201ac3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java @@ -500,9 +500,6 @@ public void testFencedLeaderRecovery() throws Exception { connectorHandle.expectedRecords(MINIMUM_MESSAGES); connectorHandle.expectedCommits(MINIMUM_MESSAGES); - // make sure the worker is actually up (otherwise, it may fence out our simulated zombie leader, instead of the other way around) - connect.assertions().assertExactlyNumWorkersAreUp(1, "Connect worker did not complete startup in time"); - // fence out the leader of the cluster Producer zombieLeader = transactionalProducer( "simulated-zombie-leader", diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java index d73d1c4ed069e..6eb8122598bbe 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java @@ -72,8 +72,6 @@ public void testCreateInternalTopicsWithDefaultSettings() throws InterruptedExce // Start the Connect cluster connect.start(); - connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Brokers did not start in time."); - connect.assertions().assertExactlyNumWorkersAreUp(numWorkers, "Worker did not start in time."); log.info("Completed startup of {} Kafka brokers and {} Connect workers", numBrokers, numWorkers); // Check the topics @@ -111,9 +109,6 @@ public void testCreateInternalTopicsWithFewerReplicasThanBrokers() throws Interr // Start the Connect cluster connect.start(); - connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker did not start in time."); - connect.assertions().assertAtLeastNumWorkersAreUp(numWorkers, "Worker did not start in time."); - log.info("Completed startup of {} Kafka brokers and {} Connect workers", numBrokers, numWorkers); // Check the topics log.info("Verifying the internal topics for Connect"); @@ -136,7 +131,7 @@ public void testFailToCreateInternalTopicsWithMoreReplicasThanBrokers() throws I .build(); // Start the brokers and Connect, but Connect should fail to create config and offset topic - connect.start(); + connect.start(false); connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker did not start in time."); log.info("Completed startup of {} Kafka broker. Expected Connect worker to fail", numBrokers); @@ -169,7 +164,6 @@ public void testFailToStartWhenInternalTopicsAreNotCompacted() throws Interrupte // Start the brokers but not Connect log.info("Starting {} Kafka brokers, but no Connect workers yet", numBrokers); connect.start(); - connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker did not start in time."); log.info("Completed startup of {} Kafka broker. Expected Connect worker to fail", numBrokers); // Create the good topics @@ -243,7 +237,6 @@ public void testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefau // Start the brokers but not Connect log.info("Starting {} Kafka brokers, but no Connect workers yet", numBrokers); connect.start(); - connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker did not start in time."); log.info("Completed startup of {} Kafka broker. Expected Connect worker to fail", numBrokers); // Create the valid internal topics w/o topic settings, so these will use the broker's diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java index 82004c8dc3aa8..4c803a349212a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java @@ -116,9 +116,6 @@ public void testStartTwoConnectors() throws Exception { // setup up props for the source connector Map props = defaultSourceConnectorProps(TOPIC_NAME); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Connect workers did not start in time."); - // start a source connector connect.configureConnector(CONNECTOR_NAME, props); @@ -147,9 +144,6 @@ public void testReconfigConnector() throws Exception { // setup up props for the source connector Map props = defaultSourceConnectorProps(TOPIC_NAME); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Connect workers did not start in time."); - // start a source connector connect.configureConnector(CONNECTOR_NAME, props); @@ -194,9 +188,6 @@ public void testDeleteConnector() throws Exception { // setup up props for the source connector Map props = defaultSourceConnectorProps(TOPIC_NAME); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Connect workers did not start in time."); - // start several source connectors IntStream.range(0, 4).forEachOrdered(i -> connect.configureConnector(CONNECTOR_NAME + i, props)); @@ -221,9 +212,6 @@ public void testAddingWorker() throws Exception { // setup up props for the source connector Map props = defaultSourceConnectorProps(TOPIC_NAME); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Connect workers did not start in time."); - // start a source connector IntStream.range(0, 4).forEachOrdered(i -> connect.configureConnector(CONNECTOR_NAME + i, props)); @@ -250,9 +238,6 @@ public void testRemovingWorker() throws Exception { // setup up props for the source connector Map props = defaultSourceConnectorProps(TOPIC_NAME); - connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS, - "Connect workers did not start in time."); - // start a source connector IntStream.range(0, 4).forEachOrdered(i -> connect.configureConnector(CONNECTOR_NAME + i, props)); @@ -276,9 +261,6 @@ public void testMultipleWorkersRejoining() throws Exception { // setup up props for the source connector Map props = defaultSourceConnectorProps(TOPIC_NAME); - connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS, - "Connect workers did not start in time."); - // start a source connector IntStream.range(0, 4).forEachOrdered(i -> connect.configureConnector(CONNECTOR_NAME + i, props)); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java index a0f993f2f63a8..112dd219811fb 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java @@ -77,9 +77,6 @@ public void testRestExtensionApi() throws InterruptedException { // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - WorkerHandle worker = connect.workers().stream() .findFirst() .orElseThrow(() -> new AssertionError("At least one worker handle should be available")); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java index 7ced24c82f8c1..8f71033b798a6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java @@ -19,9 +19,7 @@ import org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; -import org.apache.kafka.connect.util.clusters.WorkerHandle; import org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -98,10 +96,6 @@ public void ensureInternalEndpointIsSecured() throws Throwable { invalidSignatureHeaders.put(SIGNATURE_HEADER, "S2Fma2Flc3F1ZQ=="); invalidSignatureHeaders.put(SIGNATURE_ALGORITHM_HEADER, "HmacSHA256"); - TestUtils.waitForCondition( - () -> connect.workers().stream().allMatch(WorkerHandle::isRunning), - 30000L, "Timed out waiting for workers to start"); - // We haven't created the connector yet, but this should still return a 400 instead of a 404 // if the endpoint is secured log.info( @@ -120,9 +114,10 @@ public void ensureInternalEndpointIsSecured() throws Throwable { + "expecting 403 error response", connectorTasksEndpoint ); - TestUtils.waitForCondition( - () -> connect.requestPost(connectorTasksEndpoint, "[]", invalidSignatureHeaders).getStatus() == FORBIDDEN.getStatusCode(), - 30000L, "Timed out waiting for workers to start"); + assertEquals( + FORBIDDEN.getStatusCode(), + connect.requestPost(connectorTasksEndpoint, "[]", invalidSignatureHeaders).getStatus() + ); // Create the connector now // setup up props for the sink connector diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java index a8bfbb291ffa1..f42addf396c94 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java @@ -83,7 +83,6 @@ public void setup() throws Exception { .brokerProps(brokerProps) .build(); connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time."); } @After @@ -209,7 +208,6 @@ public void testCooperativeConsumerPartitionAssignment() throws Exception { final Collection topics = Arrays.asList(topic1, topic2, topic3); Map connectorProps = baseSinkConnectorProps(String.join(",", topics)); - // Need an eager assignor here; round robin is as good as any connectorProps.put( CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java index b35b072080257..28db8ae94f064 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java @@ -103,8 +103,6 @@ public void testTopicsAreCreatedWhenAutoCreateTopicsIsEnabledAtTheBroker() throw // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time."); - Map fooProps = sourceConnectorPropsWithGroups(FOO_TOPIC); // start a source connector @@ -128,8 +126,6 @@ public void testTopicsAreCreatedWhenTopicCreationIsEnabled() throws InterruptedE // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time."); - Map fooProps = sourceConnectorPropsWithGroups(FOO_TOPIC); // start a source connector @@ -160,8 +156,6 @@ public void testSwitchingToTopicCreationEnabled() throws InterruptedException { connect.assertions().assertTopicSettings(BAR_TOPIC, DEFAULT_REPLICATION_FACTOR, DEFAULT_PARTITIONS, "Topic " + BAR_TOPIC + " does not have the expected settings"); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time."); - Map barProps = defaultSourceConnectorProps(BAR_TOPIC); // start a source connector with topic creation properties connect.configureConnector(BAR_CONNECTOR, barProps); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java index 02d8c7f71b1ac..760684425df1f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java @@ -115,8 +115,6 @@ public void close() { */ @Test public void testFilterOnTopicNameWithSinkConnector() throws Exception { - assertConnectReady(); - Map observedRecords = observeRecords(); // create test topics @@ -180,12 +178,6 @@ public void testFilterOnTopicNameWithSinkConnector() throws Exception { connect.deleteConnector(CONNECTOR_NAME); } - private void assertConnectReady() throws InterruptedException { - connect.assertions().assertExactlyNumBrokersAreUp(1, "Brokers did not start in time."); - connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS, "Worker did not start in time."); - log.info("Completed startup of {} Kafka brokers and {} Connect workers", 1, NUM_WORKERS); - } - private void assertConnectorRunning() throws InterruptedException { connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, "Connector tasks did not start in time."); @@ -212,8 +204,6 @@ record -> observedRecords.compute(record.topic(), */ @Test public void testFilterOnTombstonesWithSinkConnector() throws Exception { - assertConnectReady(); - Map observedRecords = observeRecords(); // create test topics @@ -273,8 +263,6 @@ public void testFilterOnTombstonesWithSinkConnector() throws Exception { */ @Test public void testFilterOnHasHeaderKeyWithSourceConnectorAndTopicCreation() throws Exception { - assertConnectReady(); - // setup up props for the sink connector Map props = new HashMap<>(); props.put("name", CONNECTOR_NAME); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java index a37de76d1ae03..88aea97f7ee75 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java @@ -118,9 +118,27 @@ protected EmbeddedConnect( }; /** - * Start the connect cluster and the embedded Kafka and Zookeeper cluster. + * Start the Connect cluster and the embedded Kafka and Zookeeper cluster, + * and wait for the Kafka and Connect clusters to become healthy. */ public void start() { + start(true); + } + + /** + * Start the Connect cluster and the embedded Kafka and Zookeeper cluster. + *

+ * Note that in most cases, {@link #start()} is preferable. This method should only + * be used if it is expected that either Connect or the underlying Kafka cluster will + * not be able to complete startup successfully. + * + * @param awaitStartupCompletion whether to + * {@link ConnectAssertions#assertExactlyNumBrokersAreUp(int, String) await} + * the successful startup of each broker in the Kafka cluster, and + * {@link ConnectAssertions#assertExactlyNumWorkersAreUp(int, String) await} + * the successful startup of each worker in the Connect cluster + */ + public void start(boolean awaitStartupCompletion) { if (maskExitProcedures) { Exit.setExitProcedure(exitProcedure); Exit.setHaltProcedure(haltProcedure); @@ -132,6 +150,29 @@ public void start() { } catch (Exception e) { throw new ConnectException("Failed to start HTTP client", e); } + + if (awaitStartupCompletion) { + try { + if (numBrokers > 0) { + assertions().assertExactlyNumBrokersAreUp( + numBrokers, + "Kafka cluster did not start in time" + ); + log.info("Completed startup of {} Kafka brokers", numBrokers); + } + + int numWorkers = workers().size(); + if (numWorkers > 0) { + assertions().assertExactlyNumWorkersAreUp( + numWorkers, + "Connect cluster did not start in time" + ); + log.info("Completed startup of {} Connect workers", numWorkers); + } + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while awaiting cluster startup", e); + } + } } /** From 5327ddaaec44b476a784fc32f4a0f2e8abcfc9f3 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Wed, 12 Jun 2024 10:45:10 -0400 Subject: [PATCH 2/3] Address review comments --- .../ConnectWorkerIntegrationTest.java | 3 -- .../InternalTopicsIntegrationTest.java | 9 ++-- .../SourceConnectorsIntegrationTest.java | 2 +- .../util/clusters/EmbeddedConnect.java | 53 ++++++------------- 4 files changed, 24 insertions(+), 43 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index a7bb6c1959660..84ff88013fe41 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -172,9 +172,6 @@ public void testAddAndRemoveWorker() throws Exception { // set up props for the source connector Map props = defaultSourceConnectorProps(TOPIC_NAME); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - // start a source connector connect.configureConnector(CONNECTOR_NAME, props); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java index 6eb8122598bbe..c044bb8229855 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java @@ -121,7 +121,7 @@ public void testFailToCreateInternalTopicsWithMoreReplicasThanBrokers() throws I workerProps.put(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, "3"); workerProps.put(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, "2"); workerProps.put(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, "1"); - int numWorkers = 1; + int numWorkers = 0; int numBrokers = 1; connect = new EmbeddedConnectCluster.Builder().name("connect-cluster-1") .workerProps(workerProps) @@ -131,12 +131,15 @@ public void testFailToCreateInternalTopicsWithMoreReplicasThanBrokers() throws I .build(); // Start the brokers and Connect, but Connect should fail to create config and offset topic - connect.start(false); - connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker did not start in time."); + connect.start(); log.info("Completed startup of {} Kafka broker. Expected Connect worker to fail", numBrokers); + // Try to start a worker + connect.addWorker(); + // Verify that the offset and config topic don't exist; // the status topic may have been created if timing was right but we don't care + // TODO: Synchronously await and verify that the worker fails during startup log.info("Verifying the internal topics for Connect"); connect.assertions().assertTopicsDoNotExist(configTopic(), offsetTopic()); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java index 28db8ae94f064..046e5a9ccf7c2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java @@ -189,7 +189,7 @@ public void testSwitchingToTopicCreationEnabled() throws InterruptedException { workerProps.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(true)); IntStream.range(0, 3).forEach(i -> connect.addWorker()); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time."); + connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Workers did not start in time after cluster was rolled."); connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(FOO_CONNECTOR, NUM_TASKS, "Connector tasks did not start in time."); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java index 88aea97f7ee75..e7e268425c710 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java @@ -122,23 +122,6 @@ protected EmbeddedConnect( * and wait for the Kafka and Connect clusters to become healthy. */ public void start() { - start(true); - } - - /** - * Start the Connect cluster and the embedded Kafka and Zookeeper cluster. - *

- * Note that in most cases, {@link #start()} is preferable. This method should only - * be used if it is expected that either Connect or the underlying Kafka cluster will - * not be able to complete startup successfully. - * - * @param awaitStartupCompletion whether to - * {@link ConnectAssertions#assertExactlyNumBrokersAreUp(int, String) await} - * the successful startup of each broker in the Kafka cluster, and - * {@link ConnectAssertions#assertExactlyNumWorkersAreUp(int, String) await} - * the successful startup of each worker in the Connect cluster - */ - public void start(boolean awaitStartupCompletion) { if (maskExitProcedures) { Exit.setExitProcedure(exitProcedure); Exit.setHaltProcedure(haltProcedure); @@ -151,27 +134,25 @@ public void start(boolean awaitStartupCompletion) { throw new ConnectException("Failed to start HTTP client", e); } - if (awaitStartupCompletion) { - try { - if (numBrokers > 0) { - assertions().assertExactlyNumBrokersAreUp( - numBrokers, - "Kafka cluster did not start in time" - ); - log.info("Completed startup of {} Kafka brokers", numBrokers); - } + try { + if (numBrokers > 0) { + assertions().assertExactlyNumBrokersAreUp( + numBrokers, + "Kafka cluster did not start in time" + ); + log.info("Completed startup of {} Kafka brokers", numBrokers); + } - int numWorkers = workers().size(); - if (numWorkers > 0) { - assertions().assertExactlyNumWorkersAreUp( - numWorkers, - "Connect cluster did not start in time" - ); - log.info("Completed startup of {} Connect workers", numWorkers); - } - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while awaiting cluster startup", e); + int numWorkers = workers().size(); + if (numWorkers > 0) { + assertions().assertExactlyNumWorkersAreUp( + numWorkers, + "Connect cluster did not start in time" + ); + log.info("Completed startup of {} Connect workers", numWorkers); } + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while awaiting cluster startup", e); } } From cffb1e149193c9619dd1ac7c5e5228cac2d8c94c Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Wed, 12 Jun 2024 11:57:21 -0400 Subject: [PATCH 3/3] Remove unnecessary cluster startup await in OffsetsApiIntegrationTest --- .../connect/integration/OffsetsApiIntegrationTest.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java index 2da52cf9abd05..dc507b68df7c6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java @@ -143,15 +143,6 @@ private static EmbeddedConnectCluster createOrReuseConnectWithWorkerProps(Map