diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java index 3f169b46920a8..d49435ab7ac35 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java @@ -219,16 +219,12 @@ public void startClusters(Map additionalMM2Config) throws Except .build(); primary.start(); - primary.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Workers of " + PRIMARY_CLUSTER_ALIAS + "-connect-cluster did not start in time."); waitForTopicCreated(primary, "mm2-status.backup.internal"); waitForTopicCreated(primary, "mm2-offsets.backup.internal"); waitForTopicCreated(primary, "mm2-configs.backup.internal"); backup.start(); - backup.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Workers of " + BACKUP_CLUSTER_ALIAS + "-connect-cluster did not start in time."); primaryProducer = initializeProducer(primary); backupProducer = initializeProducer(backup); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java index 3eefee64c0d1f..532ab1baaf02f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java @@ -134,11 +134,6 @@ public void setup() throws Exception { // start the clusters connect.start(); - - connect.assertions().assertAtLeastNumWorkersAreUp( - NUM_WORKERS, - "Initial group of workers did not start in time" - ); } @After diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index c540016f104bb..84ff88013fe41 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -172,9 +172,6 @@ public void testAddAndRemoveWorker() throws Exception { // set up props for the source connector Map props = defaultSourceConnectorProps(TOPIC_NAME); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - // start a source connector connect.configureConnector(CONNECTOR_NAME, props); @@ -218,9 +215,6 @@ public void testRestartFailedTask() throws Exception { props.put(TASKS_MAX_CONFIG, Objects.toString(numTasks)); props.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG, "nobrokerrunningatthisaddress"); - connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - // Try to start the connector and its single task. connect.configureConnector(CONNECTOR_NAME, props); @@ -258,9 +252,6 @@ public void testBrokerCoordinator() throws Exception { // set up props for the source connector Map props = defaultSourceConnectorProps(TOPIC_NAME); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - // start a source connector connect.configureConnector(CONNECTOR_NAME, props); @@ -312,9 +303,6 @@ public void testTaskStatuses() throws Exception { // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - // base connector props Map props = defaultSourceConnectorProps(TOPIC_NAME); props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName()); @@ -352,8 +340,6 @@ public void testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exce .build(); connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(1, "Initial group of workers did not start in time."); - // and when the connector is not configured to create topics Map props = defaultSourceConnectorProps("nonexistenttopic"); props.remove(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG); @@ -405,9 +391,6 @@ public void testPauseStopResume() throws Exception { // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - // Want to make sure to use multiple tasks final int numTasks = 4; Map props = defaultSourceConnectorProps(TOPIC_NAME); @@ -497,9 +480,6 @@ public void testStoppedState() throws Exception { // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - Map props = defaultSourceConnectorProps(TOPIC_NAME); // Fail the connector on startup props.put("connector.start.inject.error", "true"); @@ -572,11 +552,6 @@ public void testTasksConfigDeprecation() throws Exception { // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp( - NUM_WORKERS, - "Initial group of workers did not start in time." - ); - connect.configureConnector(CONNECTOR_NAME, defaultSourceConnectorProps(TOPIC_NAME)); connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( CONNECTOR_NAME, @@ -600,9 +575,6 @@ public void testCreateConnectorWithPausedInitialState() throws Exception { // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - CreateConnectorRequest createConnectorRequest = new CreateConnectorRequest( CONNECTOR_NAME, defaultSourceConnectorProps(TOPIC_NAME), @@ -634,9 +606,6 @@ public void testCreateSourceConnectorWithStoppedInitialStateAndModifyOffsets() t // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - Map props = defaultSourceConnectorProps(TOPIC_NAME); // Configure the connector to produce a maximum of 10 messages @@ -687,9 +656,6 @@ public void testCreateSinkConnectorWithStoppedInitialStateAndModifyOffsets() thr // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - // Create topic and produce 10 messages connect.kafka().createTopic(TOPIC_NAME); for (int i = 0; i < 10; i++) { @@ -754,9 +720,6 @@ public void testDeleteConnectorCreatedWithPausedOrStoppedInitialState() throws E // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - // Create a connector with PAUSED initial state CreateConnectorRequest createConnectorRequest = new CreateConnectorRequest( CONNECTOR_NAME, @@ -806,9 +769,6 @@ public void testPatchConnectorConfig() throws Exception { // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - connect.kafka().createTopic(TOPIC_NAME); Map props = defaultSinkConnectorProps(TOPIC_NAME); @@ -859,8 +819,6 @@ public void testRequestTimeouts() throws Exception { .numWorkers(1) .build(); connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(1, - "Worker did not start in time"); Map connectorConfig1 = defaultSourceConnectorProps(TOPIC_NAME); Map connectorConfig2 = new HashMap<>(connectorConfig1); @@ -927,8 +885,6 @@ public void testPollTimeoutExpiry() throws Exception { connect.start(); - connect.assertions().assertExactlyNumWorkersAreUp(1, "Worker not brought up in time"); - Map connectorWithBlockingTaskStopConfig = new HashMap<>(); connectorWithBlockingTaskStopConfig.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSourceConnector.class.getName()); connectorWithBlockingTaskStopConfig.put(TASKS_MAX_CONFIG, "1"); @@ -1008,11 +964,6 @@ public void testTasksMaxEnforcement() throws Exception { // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp( - NUM_WORKERS, - "Initial group of workers did not start in time." - ); - Map connectorProps = defaultSourceConnectorProps(TOPIC_NAME); int maxTasks = 1; connectorProps.put(TASKS_MAX_CONFIG, Integer.toString(maxTasks)); @@ -1177,11 +1128,6 @@ public void testCompactedDeletedOlderConnectorConfig() throws Exception { // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp( - numWorkers, - "Initial group of workers did not start in time." - ); - final String connectorTopic = "connector-topic"; connect.kafka().createTopic(connectorTopic, 1); @@ -1214,7 +1160,7 @@ public void testCompactedDeletedOlderConnectorConfig() throws Exception { connect.assertions().assertAtLeastNumWorkersAreUp( numWorkers, - "Initial group of workers did not start in time." + "Workers did not start in time after cluster was rolled." ); final TopicPartition connectorTopicPartition = new TopicPartition(connectorTopic, 0); @@ -1270,11 +1216,6 @@ public void testReconfigureConnectorWithFailingTaskConfigs() throws Exception { // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp( - numWorkers, - "Initial group of workers did not start in time." - ); - final String firstConnectorTopic = "connector-topic-1"; connect.kafka().createTopic(firstConnectorTopic); @@ -1344,11 +1285,6 @@ public void testRuntimePropertyReconfiguration() throws Exception { // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp( - NUM_WORKERS, - "Initial group of workers did not start in time." - ); - final String topic = "kafka9228"; connect.kafka().createTopic(topic, 1); connect.kafka().produce(topic, "non-json-value"); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java index a0abece17d750..f09a949010c69 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java @@ -121,8 +121,6 @@ private EmbeddedConnectCluster connectClusterWithPolicy(String policy) throws In // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); return connect; } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java index a512eeaae0a71..e957b97f6114d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java @@ -113,8 +113,6 @@ private void startOrReuseConnectWithNumWorkers(int numWorkers) throws Exception connect.start(); return connect; }); - connect.assertions().assertExactlyNumWorkersAreUp(numWorkers, - "Initial group of workers did not start in time."); } @After diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java index eb055ab13fb11..e9fdd91c38796 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java @@ -115,8 +115,6 @@ public void testGetActiveTopics() throws InterruptedException { connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS); connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time."); - connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.emptyList(), "Active topic set is not empty for connector: " + FOO_CONNECTOR); @@ -179,8 +177,6 @@ public void testTopicTrackingResetIsDisabled() throws InterruptedException { connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS); connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time."); - connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.emptyList(), "Active topic set is not empty for connector: " + FOO_CONNECTOR); @@ -235,8 +231,6 @@ public void testTopicTrackingIsDisabled() throws InterruptedException { connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS); connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time."); - // start a source connector connect.configureConnector(FOO_CONNECTOR, defaultSourceConnectorProps(FOO_TOPIC)); connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(FOO_CONNECTOR, NUM_TASKS, diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java index 1a76956be610c..30fe48116fc97 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java @@ -94,8 +94,6 @@ public void setup() throws InterruptedException { // start Connect cluster connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); // get connector handles before starting test. connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java index 04dd4c7de67c4..6d4b648201ac3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java @@ -500,9 +500,6 @@ public void testFencedLeaderRecovery() throws Exception { connectorHandle.expectedRecords(MINIMUM_MESSAGES); connectorHandle.expectedCommits(MINIMUM_MESSAGES); - // make sure the worker is actually up (otherwise, it may fence out our simulated zombie leader, instead of the other way around) - connect.assertions().assertExactlyNumWorkersAreUp(1, "Connect worker did not complete startup in time"); - // fence out the leader of the cluster Producer zombieLeader = transactionalProducer( "simulated-zombie-leader", diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java index d73d1c4ed069e..c044bb8229855 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java @@ -72,8 +72,6 @@ public void testCreateInternalTopicsWithDefaultSettings() throws InterruptedExce // Start the Connect cluster connect.start(); - connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Brokers did not start in time."); - connect.assertions().assertExactlyNumWorkersAreUp(numWorkers, "Worker did not start in time."); log.info("Completed startup of {} Kafka brokers and {} Connect workers", numBrokers, numWorkers); // Check the topics @@ -111,9 +109,6 @@ public void testCreateInternalTopicsWithFewerReplicasThanBrokers() throws Interr // Start the Connect cluster connect.start(); - connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker did not start in time."); - connect.assertions().assertAtLeastNumWorkersAreUp(numWorkers, "Worker did not start in time."); - log.info("Completed startup of {} Kafka brokers and {} Connect workers", numBrokers, numWorkers); // Check the topics log.info("Verifying the internal topics for Connect"); @@ -126,7 +121,7 @@ public void testFailToCreateInternalTopicsWithMoreReplicasThanBrokers() throws I workerProps.put(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, "3"); workerProps.put(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, "2"); workerProps.put(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, "1"); - int numWorkers = 1; + int numWorkers = 0; int numBrokers = 1; connect = new EmbeddedConnectCluster.Builder().name("connect-cluster-1") .workerProps(workerProps) @@ -137,11 +132,14 @@ public void testFailToCreateInternalTopicsWithMoreReplicasThanBrokers() throws I // Start the brokers and Connect, but Connect should fail to create config and offset topic connect.start(); - connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker did not start in time."); log.info("Completed startup of {} Kafka broker. Expected Connect worker to fail", numBrokers); + // Try to start a worker + connect.addWorker(); + // Verify that the offset and config topic don't exist; // the status topic may have been created if timing was right but we don't care + // TODO: Synchronously await and verify that the worker fails during startup log.info("Verifying the internal topics for Connect"); connect.assertions().assertTopicsDoNotExist(configTopic(), offsetTopic()); } @@ -169,7 +167,6 @@ public void testFailToStartWhenInternalTopicsAreNotCompacted() throws Interrupte // Start the brokers but not Connect log.info("Starting {} Kafka brokers, but no Connect workers yet", numBrokers); connect.start(); - connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker did not start in time."); log.info("Completed startup of {} Kafka broker. Expected Connect worker to fail", numBrokers); // Create the good topics @@ -243,7 +240,6 @@ public void testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefau // Start the brokers but not Connect log.info("Starting {} Kafka brokers, but no Connect workers yet", numBrokers); connect.start(); - connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker did not start in time."); log.info("Completed startup of {} Kafka broker. Expected Connect worker to fail", numBrokers); // Create the valid internal topics w/o topic settings, so these will use the broker's diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java index 2da52cf9abd05..dc507b68df7c6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java @@ -143,15 +143,6 @@ private static EmbeddedConnectCluster createOrReuseConnectWithWorkerProps(Map props = defaultSourceConnectorProps(TOPIC_NAME); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Connect workers did not start in time."); - // start a source connector connect.configureConnector(CONNECTOR_NAME, props); @@ -147,9 +144,6 @@ public void testReconfigConnector() throws Exception { // setup up props for the source connector Map props = defaultSourceConnectorProps(TOPIC_NAME); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Connect workers did not start in time."); - // start a source connector connect.configureConnector(CONNECTOR_NAME, props); @@ -194,9 +188,6 @@ public void testDeleteConnector() throws Exception { // setup up props for the source connector Map props = defaultSourceConnectorProps(TOPIC_NAME); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Connect workers did not start in time."); - // start several source connectors IntStream.range(0, 4).forEachOrdered(i -> connect.configureConnector(CONNECTOR_NAME + i, props)); @@ -221,9 +212,6 @@ public void testAddingWorker() throws Exception { // setup up props for the source connector Map props = defaultSourceConnectorProps(TOPIC_NAME); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Connect workers did not start in time."); - // start a source connector IntStream.range(0, 4).forEachOrdered(i -> connect.configureConnector(CONNECTOR_NAME + i, props)); @@ -250,9 +238,6 @@ public void testRemovingWorker() throws Exception { // setup up props for the source connector Map props = defaultSourceConnectorProps(TOPIC_NAME); - connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS, - "Connect workers did not start in time."); - // start a source connector IntStream.range(0, 4).forEachOrdered(i -> connect.configureConnector(CONNECTOR_NAME + i, props)); @@ -276,9 +261,6 @@ public void testMultipleWorkersRejoining() throws Exception { // setup up props for the source connector Map props = defaultSourceConnectorProps(TOPIC_NAME); - connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS, - "Connect workers did not start in time."); - // start a source connector IntStream.range(0, 4).forEachOrdered(i -> connect.configureConnector(CONNECTOR_NAME + i, props)); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java index a0f993f2f63a8..112dd219811fb 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java @@ -77,9 +77,6 @@ public void testRestExtensionApi() throws InterruptedException { // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, - "Initial group of workers did not start in time."); - WorkerHandle worker = connect.workers().stream() .findFirst() .orElseThrow(() -> new AssertionError("At least one worker handle should be available")); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java index 7ced24c82f8c1..8f71033b798a6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java @@ -19,9 +19,7 @@ import org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; -import org.apache.kafka.connect.util.clusters.WorkerHandle; import org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -98,10 +96,6 @@ public void ensureInternalEndpointIsSecured() throws Throwable { invalidSignatureHeaders.put(SIGNATURE_HEADER, "S2Fma2Flc3F1ZQ=="); invalidSignatureHeaders.put(SIGNATURE_ALGORITHM_HEADER, "HmacSHA256"); - TestUtils.waitForCondition( - () -> connect.workers().stream().allMatch(WorkerHandle::isRunning), - 30000L, "Timed out waiting for workers to start"); - // We haven't created the connector yet, but this should still return a 400 instead of a 404 // if the endpoint is secured log.info( @@ -120,9 +114,10 @@ public void ensureInternalEndpointIsSecured() throws Throwable { + "expecting 403 error response", connectorTasksEndpoint ); - TestUtils.waitForCondition( - () -> connect.requestPost(connectorTasksEndpoint, "[]", invalidSignatureHeaders).getStatus() == FORBIDDEN.getStatusCode(), - 30000L, "Timed out waiting for workers to start"); + assertEquals( + FORBIDDEN.getStatusCode(), + connect.requestPost(connectorTasksEndpoint, "[]", invalidSignatureHeaders).getStatus() + ); // Create the connector now // setup up props for the sink connector diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java index a8bfbb291ffa1..f42addf396c94 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java @@ -83,7 +83,6 @@ public void setup() throws Exception { .brokerProps(brokerProps) .build(); connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time."); } @After @@ -209,7 +208,6 @@ public void testCooperativeConsumerPartitionAssignment() throws Exception { final Collection topics = Arrays.asList(topic1, topic2, topic3); Map connectorProps = baseSinkConnectorProps(String.join(",", topics)); - // Need an eager assignor here; round robin is as good as any connectorProps.put( CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java index b35b072080257..046e5a9ccf7c2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java @@ -103,8 +103,6 @@ public void testTopicsAreCreatedWhenAutoCreateTopicsIsEnabledAtTheBroker() throw // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time."); - Map fooProps = sourceConnectorPropsWithGroups(FOO_TOPIC); // start a source connector @@ -128,8 +126,6 @@ public void testTopicsAreCreatedWhenTopicCreationIsEnabled() throws InterruptedE // start the clusters connect.start(); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time."); - Map fooProps = sourceConnectorPropsWithGroups(FOO_TOPIC); // start a source connector @@ -160,8 +156,6 @@ public void testSwitchingToTopicCreationEnabled() throws InterruptedException { connect.assertions().assertTopicSettings(BAR_TOPIC, DEFAULT_REPLICATION_FACTOR, DEFAULT_PARTITIONS, "Topic " + BAR_TOPIC + " does not have the expected settings"); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time."); - Map barProps = defaultSourceConnectorProps(BAR_TOPIC); // start a source connector with topic creation properties connect.configureConnector(BAR_CONNECTOR, barProps); @@ -195,7 +189,7 @@ public void testSwitchingToTopicCreationEnabled() throws InterruptedException { workerProps.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(true)); IntStream.range(0, 3).forEach(i -> connect.addWorker()); - connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time."); + connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Workers did not start in time after cluster was rolled."); connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(FOO_CONNECTOR, NUM_TASKS, "Connector tasks did not start in time."); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java index 02d8c7f71b1ac..760684425df1f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java @@ -115,8 +115,6 @@ public void close() { */ @Test public void testFilterOnTopicNameWithSinkConnector() throws Exception { - assertConnectReady(); - Map observedRecords = observeRecords(); // create test topics @@ -180,12 +178,6 @@ public void testFilterOnTopicNameWithSinkConnector() throws Exception { connect.deleteConnector(CONNECTOR_NAME); } - private void assertConnectReady() throws InterruptedException { - connect.assertions().assertExactlyNumBrokersAreUp(1, "Brokers did not start in time."); - connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS, "Worker did not start in time."); - log.info("Completed startup of {} Kafka brokers and {} Connect workers", 1, NUM_WORKERS); - } - private void assertConnectorRunning() throws InterruptedException { connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, "Connector tasks did not start in time."); @@ -212,8 +204,6 @@ record -> observedRecords.compute(record.topic(), */ @Test public void testFilterOnTombstonesWithSinkConnector() throws Exception { - assertConnectReady(); - Map observedRecords = observeRecords(); // create test topics @@ -273,8 +263,6 @@ public void testFilterOnTombstonesWithSinkConnector() throws Exception { */ @Test public void testFilterOnHasHeaderKeyWithSourceConnectorAndTopicCreation() throws Exception { - assertConnectReady(); - // setup up props for the sink connector Map props = new HashMap<>(); props.put("name", CONNECTOR_NAME); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java index a37de76d1ae03..e7e268425c710 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java @@ -118,7 +118,8 @@ protected EmbeddedConnect( }; /** - * Start the connect cluster and the embedded Kafka and Zookeeper cluster. + * Start the Connect cluster and the embedded Kafka and Zookeeper cluster, + * and wait for the Kafka and Connect clusters to become healthy. */ public void start() { if (maskExitProcedures) { @@ -132,6 +133,27 @@ public void start() { } catch (Exception e) { throw new ConnectException("Failed to start HTTP client", e); } + + try { + if (numBrokers > 0) { + assertions().assertExactlyNumBrokersAreUp( + numBrokers, + "Kafka cluster did not start in time" + ); + log.info("Completed startup of {} Kafka brokers", numBrokers); + } + + int numWorkers = workers().size(); + if (numWorkers > 0) { + assertions().assertExactlyNumWorkersAreUp( + numWorkers, + "Connect cluster did not start in time" + ); + log.info("Completed startup of {} Connect workers", numWorkers); + } + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while awaiting cluster startup", e); + } } /**