From d774fe2aedcb9e8c0b0085eed1819b5f8909eef6 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Wed, 12 Jun 2024 12:57:49 -0400 Subject: [PATCH] MINOR: Add readiness check for connector and separate Kafka cluster in ExactlyOnceSourceIntegrationTest::testSeparateOffsetsTopic --- .../ExactlyOnceSourceIntegrationTest.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java index 04dd4c7de67c4..4695f39a3b02f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java @@ -99,6 +99,7 @@ import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.CONNECTOR; import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.INTERVAL; import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.POLL; +import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -750,9 +751,18 @@ public void testSeparateOffsetsTopic() throws Exception { workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, globalOffsetsTopic); startConnect(); - EmbeddedKafkaCluster connectorTargetedCluster = new EmbeddedKafkaCluster(1, brokerProps); + + int numConnectorTargetedBrokers = 1; + EmbeddedKafkaCluster connectorTargetedCluster = new EmbeddedKafkaCluster(numConnectorTargetedBrokers, brokerProps); try (Closeable clusterShutdown = connectorTargetedCluster::stop) { connectorTargetedCluster.start(); + // Wait for the connector-targeted Kafka cluster to get on its feet + waitForCondition( + () -> connectorTargetedCluster.runningBrokers().size() == numConnectorTargetedBrokers, + ConnectAssertions.WORKER_SETUP_DURATION_MS, + "Separate Kafka cluster did not start in time" + ); + String topic = "test-topic"; connectorTargetedCluster.createTopic(topic, 3); @@ -780,6 +790,11 @@ public void testSeparateOffsetsTopic() throws Exception { // start a source connector connect.configureConnector(CONNECTOR_NAME, props); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + numTasks, + "connector and tasks did not start in time" + ); log.info("Waiting for records to be provided to worker by task"); // wait for the connector tasks to produce enough records