From b9a3c4d74a0c48df5c2b1875adbd4eca6453f9d1 Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Tue, 14 Nov 2023 12:12:52 -0800 Subject: [PATCH 1/4] KAFKA-15816: Fix leaked sockets in runtime tests Signed-off-by: Greg Harris --- .../ConnectorTopicsIntegrationTest.java | 1 + .../ExactlyOnceSourceIntegrationTest.java | 49 ++++---- .../kafka/connect/runtime/WorkerTest.java | 116 +++++++++++++++--- .../distributed/WorkerGroupMemberTest.java | 1 + 4 files changed, 130 insertions(+), 37 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java index 0614ba8a9f741..61be2769f3b24 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java @@ -296,6 +296,7 @@ public void assertNoTopicStatusInStatusTopic() { } } } + verifiableConsumer.close(); } private Map defaultSourceConnectorProps(String topic) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java index 4eac236810a80..b793cbf020911 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java @@ -655,8 +655,9 @@ public void testTasksFailOnInabilityToFence() throws Exception { startConnect(); String topic = "test-topic"; - Admin admin = connect.kafka().createAdminClient(); - admin.createTopics(Collections.singleton(new NewTopic(topic, 3, (short) 1))).all().get(); + try (Admin admin = connect.kafka().createAdminClient()) { + admin.createTopics(Collections.singleton(new NewTopic(topic, 3, (short) 1))).all().get(); + } Map props = new HashMap<>(); int tasksMax = 2; // Use two tasks since single-task connectors don't require zombie fencing @@ -680,16 +681,18 @@ public void testTasksFailOnInabilityToFence() throws Exception { + "password=\"connector_pwd\";"); // Grant the connector's admin permissions to access the topics for its records and offsets // Intentionally leave out permissions required for fencing - admin.createAcls(Arrays.asList( - new AclBinding( - new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL), - new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW) - ), - new AclBinding( - new ResourcePattern(ResourceType.TOPIC, globalOffsetsTopic, PatternType.LITERAL), - new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW) - ) - )).all().get(); + try (Admin admin = connect.kafka().createAdminClient()) { + admin.createAcls(Arrays.asList( + new AclBinding( + new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL), + new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW) + ), + new AclBinding( + new ResourcePattern(ResourceType.TOPIC, globalOffsetsTopic, PatternType.LITERAL), + new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW) + ) + )).all().get(); + } StartAndStopLatch connectorStart = connectorAndTaskStart(tasksMax); @@ -707,16 +710,18 @@ public void testTasksFailOnInabilityToFence() throws Exception { connect.assertions().assertConnectorIsRunningAndTasksHaveFailed(CONNECTOR_NAME, tasksMax, "Task should have failed on startup"); // Now grant the necessary permissions for fencing to the connector's admin - admin.createAcls(Arrays.asList( - new AclBinding( - new ResourcePattern(ResourceType.TRANSACTIONAL_ID, Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, 0), PatternType.LITERAL), - new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW) - ), - new AclBinding( - new ResourcePattern(ResourceType.TRANSACTIONAL_ID, Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, 1), PatternType.LITERAL), - new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW) - ) - )); + try (Admin admin = connect.kafka().createAdminClient()) { + admin.createAcls(Arrays.asList( + new AclBinding( + new ResourcePattern(ResourceType.TRANSACTIONAL_ID, Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, 0), PatternType.LITERAL), + new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW) + ), + new AclBinding( + new ResourcePattern(ResourceType.TRANSACTIONAL_ID, Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, 1), PatternType.LITERAL), + new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW) + ) + )); + } log.info("Restarting connector after tweaking its ACLs; fencing should succeed this time"); connect.restartConnectorAndTasks(CONNECTOR_NAME, false, true, false); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 0de444be2025e..6a34fad6e9383 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -44,6 +44,7 @@ import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.connector.Task; @@ -97,6 +98,7 @@ import org.mockito.MockedConstruction; import org.mockito.Mockito; import org.mockito.MockitoSession; +import org.mockito.invocation.InvocationOnMock; import org.mockito.quality.Strictness; import javax.management.MBeanServer; @@ -162,7 +164,6 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockConstructionWithAnswer; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; @@ -235,6 +236,8 @@ public class WorkerTest { private final boolean enableTopicCreation; private MockedConstruction sourceTaskMockedConstruction; + private MockedConstruction eosSourceTaskMockedConstruction; + private MockedConstruction sinkTaskMockedConstruction; private MockitoSession mockitoSession; @ParameterizedTest.Parameters @@ -293,20 +296,18 @@ public void setup() { connectorProps = anyConnectorConfigMap(); // Make calls to new WorkerSourceTask() return a mock to avoid the source task trying to connect to a broker. - sourceTaskMockedConstruction = mockConstructionWithAnswer(WorkerSourceTask.class, invocation -> { - - // provide implementations of three methods used during testing - switch (invocation.getMethod().getName()) { - case "id": - return TASK_ID; - case "loader": - return pluginLoader; - case "awaitStop": - return true; - default: - return null; - } - }); + sourceTaskMockedConstruction = Mockito.mockConstruction( + WorkerSourceTask.class, + context -> Mockito.withSettings().defaultAnswer(this::workerTaskMethod), + WorkerTest::workerTaskConstructor); + eosSourceTaskMockedConstruction = Mockito.mockConstruction( + ExactlyOnceWorkerSourceTask.class, + context -> Mockito.withSettings().defaultAnswer(this::workerTaskMethod), + WorkerTest::workerTaskConstructor); + sinkTaskMockedConstruction = Mockito.mockConstruction( + WorkerSinkTask.class, + context -> Mockito.withSettings().defaultAnswer(this::workerTaskMethod), + WorkerTest::workerTaskConstructor); } @After @@ -315,6 +316,8 @@ public void teardown() { // Ideal would be to use try-with-resources in an individual test, but it introduced a rather large level of // indentation of most test bodies, hence sticking with setup() / teardown() sourceTaskMockedConstruction.close(); + eosSourceTaskMockedConstruction.close(); + sinkTaskMockedConstruction.close(); mockitoSession.finishMocking(); } @@ -1309,48 +1312,60 @@ public void testOffsetStoreForRegularSourceConnector() { SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With no connector-specific offsets topic in the config, we should only use the worker-global offsets store ConnectorOffsetBackingStore connectorStore = worker.offsetStoreForRegularSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertFalse(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "connector-offsets-topic"); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config (whose name differs from the worker's offsets topic), we should use both a // connector-specific store and the worker-global store connectorStore = worker.offsetStoreForRegularSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, workerOffsetsTopic); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and no overridden bootstrap.servers // for the connector, we should only use a connector-specific offsets store connectorStore = worker.offsetStoreForRegularSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertFalse(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG, workerBootstrapServers); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and an overridden bootstrap.servers // for the connector that exactly matches the worker's, we should only use a connector-specific offsets store connectorStore = worker.offsetStoreForRegularSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertFalse(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG, "localhost:1111"); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and an overridden bootstrap.servers // for the connector that doesn't match the worker's, we should use both a connector-specific store and the worker-global store connectorStore = worker.offsetStoreForRegularSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.remove(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With no connector-specific offsets topic in the config, even with an overridden bootstrap.servers // for the connector that doesn't match the worker's, we should still only use the worker-global offsets store connectorStore = worker.offsetStoreForRegularSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertFalse(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); worker.stop(); @@ -1384,48 +1399,60 @@ public void testOffsetStoreForExactlyOnceSourceConnector() { SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With no connector-specific offsets topic in the config, we should only use a connector-specific offsets store ConnectorOffsetBackingStore connectorStore = worker.offsetStoreForExactlyOnceSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertFalse(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "connector-offsets-topic"); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config (whose name differs from the worker's offsets topic), we should use both a // connector-specific store and the worker-global store connectorStore = worker.offsetStoreForExactlyOnceSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, workerOffsetsTopic); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and no overridden bootstrap.servers // for the connector, we should only use a connector-specific store connectorStore = worker.offsetStoreForExactlyOnceSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertFalse(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG, workerBootstrapServers); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and an overridden bootstrap.servers // for the connector that exactly matches the worker's, we should only use a connector-specific store connectorStore = worker.offsetStoreForExactlyOnceSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertFalse(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG, "localhost:1111"); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and an overridden bootstrap.servers // for the connector that doesn't match the worker's, we should use both a connector-specific store and the worker-global store connectorStore = worker.offsetStoreForExactlyOnceSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.remove(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With no connector-specific offsets topic in the config and an overridden bootstrap.servers // for the connector that doesn't match the worker's, we should use both a connector-specific store and the worker-global store connectorStore = worker.offsetStoreForExactlyOnceSourceConnector(sourceConfig, CONNECTOR_ID, sourceConnector, null); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); worker.stop(); @@ -1467,14 +1494,17 @@ public void testOffsetStoreForRegularSourceTask() { // With no connector-specific offsets topic in the config, we should only use the worker-global store // Pass in a null topic admin to make sure that with these parameters, the method doesn't require a topic admin ConnectorOffsetBackingStore connectorStore = worker.offsetStoreForRegularSourceTask(TASK_ID, sourceConfigWithoutOffsetsTopic, sourceConnector.getClass(), producer, producerProps, null); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertFalse(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "connector-offsets-topic"); final SourceConnectorConfig sourceConfigWithOffsetsTopic = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config (whose name differs from the worker's offsets topic), we should use both a // connector-specific store and the worker-global store connectorStore = worker.offsetStoreForRegularSourceTask(TASK_ID, sourceConfigWithOffsetsTopic, sourceConnector.getClass(), producer, producerProps, topicAdmin); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); assertThrows(NullPointerException.class, @@ -1482,12 +1512,14 @@ public void testOffsetStoreForRegularSourceTask() { TASK_ID, sourceConfigWithOffsetsTopic, sourceConnector.getClass(), producer, producerProps, null ) ); + connectorStore.stop(); connectorProps.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, workerOffsetsTopic); final SourceConnectorConfig sourceConfigWithSameOffsetsTopicAsWorker = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and no overridden bootstrap.servers // for the connector, we should only use a connector-specific store connectorStore = worker.offsetStoreForRegularSourceTask(TASK_ID, sourceConfigWithSameOffsetsTopicAsWorker, sourceConnector.getClass(), producer, producerProps, topicAdmin); + connectorStore.configure(config); assertFalse(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); assertThrows( @@ -1496,11 +1528,13 @@ public void testOffsetStoreForRegularSourceTask() { TASK_ID, sourceConfigWithSameOffsetsTopicAsWorker, sourceConnector.getClass(), producer, producerProps, null ) ); + connectorStore.stop(); producerProps.put(BOOTSTRAP_SERVERS_CONFIG, workerBootstrapServers); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and an overridden bootstrap.servers // for the connector that exactly matches the worker's, we should only use a connector-specific store connectorStore = worker.offsetStoreForRegularSourceTask(TASK_ID, sourceConfigWithSameOffsetsTopicAsWorker, sourceConnector.getClass(), producer, producerProps, topicAdmin); + connectorStore.configure(config); assertFalse(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); assertThrows( @@ -1509,11 +1543,13 @@ public void testOffsetStoreForRegularSourceTask() { TASK_ID, sourceConfigWithSameOffsetsTopicAsWorker, sourceConnector.getClass(), producer, producerProps, null ) ); + connectorStore.stop(); producerProps.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:1111"); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and an overridden bootstrap.servers // for the connector that doesn't match the worker's, we should use both a connector-specific store and the worker-global store connectorStore = worker.offsetStoreForRegularSourceTask(TASK_ID, sourceConfigWithSameOffsetsTopicAsWorker, sourceConnector.getClass(), producer, producerProps, topicAdmin); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); assertThrows( @@ -1522,14 +1558,17 @@ public void testOffsetStoreForRegularSourceTask() { TASK_ID, sourceConfigWithSameOffsetsTopicAsWorker, sourceConnector.getClass(), producer, producerProps, null ) ); + connectorStore.stop(); connectorProps.remove(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG); // With no connector-specific offsets topic in the config and an overridden bootstrap.servers // for the connector that doesn't match the worker's, we should still only use the worker-global store // Pass in a null topic admin to make sure that with these parameters, the method doesn't require a topic admin connectorStore = worker.offsetStoreForRegularSourceTask(TASK_ID, sourceConfigWithoutOffsetsTopic, sourceConnector.getClass(), producer, producerProps, null); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertFalse(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); worker.stop(); @@ -1569,48 +1608,60 @@ public void testOffsetStoreForExactlyOnceSourceTask() { producerProps.put(BOOTSTRAP_SERVERS_CONFIG, workerBootstrapServers); // With no connector-specific offsets topic in the config, we should only use a connector-specific offsets store ConnectorOffsetBackingStore connectorStore = worker.offsetStoreForExactlyOnceSourceTask(TASK_ID, sourceConfig, sourceConnector.getClass(), producer, producerProps, topicAdmin); + connectorStore.configure(config); assertFalse(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "connector-offsets-topic"); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config (whose name differs from the worker's offsets topic), we should use both a // connector-specific store and the worker-global store connectorStore = worker.offsetStoreForExactlyOnceSourceTask(TASK_ID, sourceConfig, sourceConnector.getClass(), producer, producerProps, topicAdmin); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, workerOffsetsTopic); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and no overridden bootstrap.servers // for the connector, we should only use a connector-specific store connectorStore = worker.offsetStoreForExactlyOnceSourceTask(TASK_ID, sourceConfig, sourceConnector.getClass(), producer, producerProps, topicAdmin); + connectorStore.configure(config); assertFalse(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); producerProps.put(BOOTSTRAP_SERVERS_CONFIG, workerBootstrapServers); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and an overridden bootstrap.servers // for the connector that exactly matches the worker's, we should only use a connector-specific store connectorStore = worker.offsetStoreForExactlyOnceSourceTask(TASK_ID, sourceConfig, sourceConnector.getClass(), producer, producerProps, topicAdmin); + connectorStore.configure(config); assertFalse(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); producerProps.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:1111"); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With a connector-specific offsets topic in the config whose name matches the worker's offsets topic, and an overridden bootstrap.servers // for the connector that doesn't match the worker's, we should use both a connector-specific store and the worker-global store connectorStore = worker.offsetStoreForExactlyOnceSourceTask(TASK_ID, sourceConfig, sourceConnector.getClass(), producer, producerProps, topicAdmin); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); connectorProps.remove(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); // With no connector-specific offsets topic in the config and an overridden bootstrap.servers // for the connector that doesn't match the worker's, we should use both a connector-specific store and the worker-global store connectorStore = worker.offsetStoreForExactlyOnceSourceTask(TASK_ID, sourceConfig, sourceConnector.getClass(), producer, producerProps, topicAdmin); + connectorStore.configure(config); assertTrue(connectorStore.hasWorkerGlobalStore()); assertTrue(connectorStore.hasConnectorSpecificStore()); + connectorStore.stop(); worker.stop(); @@ -1703,6 +1754,8 @@ public void testExecutorServiceShutdownWhenTerminationThrowsException() throws I assertEquals(Collections.emptySet(), worker.connectorNames()); worker.stop(); + // Clear the interrupted status so that the test infrastructure doesn't hit an unexpected interrupt. + assertTrue(Thread.interrupted()); verifyKafkaClusterId(); verify(executorService, times(1)).shutdown(); verify(executorService, times(1)).shutdownNow(); @@ -2643,6 +2696,39 @@ private Map anyConnectorConfigMap() { return props; } + /** + * This method is called in place of the constructor of WorkerTask subclasses. + * All AutoClosable objects (producers, consumers, admin clients, etc.) are closed, as their lifetimes + * are managed by the WorkerTask. While the worker task is mocked, it cannot manage the lifetimes itself. + */ + private static void workerTaskConstructor(WorkerTask mock, MockedConstruction.Context context) { + for (Object argument : context.arguments()) { + if (argument instanceof AutoCloseable) { + Utils.closeQuietly((AutoCloseable) argument, "worker task client"); + } + if (argument instanceof OffsetBackingStore) { + Utils.closeQuietly(((OffsetBackingStore) argument)::stop, "offset backing store"); + } + } + } + + /** + * This method is called in place of methods on WorkerTask subclasses. + */ + private Object workerTaskMethod(InvocationOnMock invocation) { + // provide implementations of three methods used during testing + switch (invocation.getMethod().getName()) { + case "id": + return TASK_ID; + case "loader": + return pluginLoader; + case "awaitStop": + return true; + default: + return null; + } + } + private static class TestSourceTask extends SourceTask { public TestSourceTask() { } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java index ea7ee186f807a..77d70d8fd9a7b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java @@ -91,6 +91,7 @@ public void testMetrics() throws Exception { MBeanServer server = ManagementFactory.getPlatformMBeanServer(); //verify metric exists with correct prefix assertNotNull(server.getObjectInstance(new ObjectName("kafka.connect:type=grp1,client-id=client-1"))); + member.stop(); } @Test From 9cb074ec342b210ce95e848cf72cc8589daa6101 Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Mon, 11 Dec 2023 13:23:53 -0800 Subject: [PATCH 2/4] Allow Block.resetBlockLatch to release blocked operation for end-of-test cleanup Signed-off-by: Greg Harris --- .../connect/integration/BlockingConnectorTest.java | 13 ++++++++++--- .../integration/OffsetsApiIntegrationTest.java | 2 ++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java index 33614e317c490..cc3463f018b4f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java @@ -138,8 +138,8 @@ public void setup() throws Exception { @After public void close() { // stop all Connect, Kafka and Zk threads. - connect.stop(); Block.resetBlockLatch(); + connect.stop(); } @Test @@ -349,8 +349,9 @@ private void assertRequestTimesOut(String requestDescription, ThrowingRunnable r connect.requestTimeout(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); } - private static class Block { + public static class Block { private static CountDownLatch blockLatch; + private static CountDownLatch clearLatch; private final String block; @@ -392,6 +393,10 @@ public static void resetBlockLatch() { blockLatch.countDown(); blockLatch = null; } + if (clearLatch != null) { + clearLatch.countDown(); + clearLatch = null; + } } } @@ -406,6 +411,7 @@ public Block(String block) { blockLatch.countDown(); } blockLatch = new CountDownLatch(1); + clearLatch = new CountDownLatch(1); } } @@ -415,7 +421,8 @@ public void maybeBlockOn(String block) { blockLatch.countDown(); while (true) { try { - Thread.sleep(Long.MAX_VALUE); + clearLatch.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + return; } catch (InterruptedException e) { // No-op. Just keep blocking. } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java index ad2a5f168ff8d..a71b48eda9891 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java @@ -429,6 +429,7 @@ public void testAlterSinkConnectorOffsetsZombieSinkTasks() throws Exception { ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter))); assertThat(e.getMessage(), containsString("zombie sink task")); + BlockingConnectorTest.Block.resetBlockLatch(); } @Test @@ -785,6 +786,7 @@ public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception { // Try to reset the offsets ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(CONNECTOR_NAME)); assertThat(e.getMessage(), containsString("zombie sink task")); + BlockingConnectorTest.Block.resetBlockLatch(); } @Test From f55e673347112943ad3a959068a40f22f1f9f6f1 Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Mon, 11 Dec 2023 13:24:54 -0800 Subject: [PATCH 3/4] Fix admin and consumer resource leak in EmbeddedKafkaCluster Signed-off-by: Greg Harris --- .../util/clusters/EmbeddedKafkaCluster.java | 59 ++++++++++--------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java index 19b8090f69db0..e3694cae2911a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java @@ -554,42 +554,47 @@ public ConsumerRecords consumeAll( ) throws TimeoutException, InterruptedException, ExecutionException { long endTimeMs = System.currentTimeMillis() + maxDurationMs; - Consumer consumer = createConsumer(consumerProps != null ? consumerProps : Collections.emptyMap()); - Admin admin = createAdminClient(adminProps != null ? adminProps : Collections.emptyMap()); + long remainingTimeMs; + Set topicPartitions; + Map endOffsets; + try (Admin admin = createAdminClient(adminProps != null ? adminProps : Collections.emptyMap())) { - long remainingTimeMs = endTimeMs - System.currentTimeMillis(); - Set topicPartitions = listPartitions(remainingTimeMs, admin, Arrays.asList(topics)); + remainingTimeMs = endTimeMs - System.currentTimeMillis(); + topicPartitions = listPartitions(remainingTimeMs, admin, Arrays.asList(topics)); - remainingTimeMs = endTimeMs - System.currentTimeMillis(); - Map endOffsets = readEndOffsets(remainingTimeMs, admin, topicPartitions); + remainingTimeMs = endTimeMs - System.currentTimeMillis(); + endOffsets = readEndOffsets(remainingTimeMs, admin, topicPartitions); + } Map>> records = topicPartitions.stream() .collect(Collectors.toMap( Function.identity(), tp -> new ArrayList<>() )); - consumer.assign(topicPartitions); - - while (!endOffsets.isEmpty()) { - Iterator> it = endOffsets.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry entry = it.next(); - TopicPartition topicPartition = entry.getKey(); - long endOffset = entry.getValue(); - long lastConsumedOffset = consumer.position(topicPartition); - if (lastConsumedOffset >= endOffset) { - // We've reached the end offset for the topic partition; can stop polling it now - it.remove(); - } else { - remainingTimeMs = endTimeMs - System.currentTimeMillis(); - if (remainingTimeMs <= 0) { - throw new AssertionError("failed to read to end of topic(s) " + Arrays.asList(topics) + " within " + maxDurationMs + "ms"); + try (Consumer consumer = createConsumer(consumerProps != null ? consumerProps : Collections.emptyMap())) { + consumer.assign(topicPartitions); + + while (!endOffsets.isEmpty()) { + Iterator> it = endOffsets.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + TopicPartition topicPartition = entry.getKey(); + long endOffset = entry.getValue(); + long lastConsumedOffset = consumer.position(topicPartition); + if (lastConsumedOffset >= endOffset) { + // We've reached the end offset for the topic partition; can stop polling it now + it.remove(); + } else { + remainingTimeMs = endTimeMs - System.currentTimeMillis(); + if (remainingTimeMs <= 0) { + throw new AssertionError("failed to read to end of topic(s) " + Arrays.asList(topics) + " within " + maxDurationMs + "ms"); + } + // We haven't reached the end offset yet; need to keep polling + ConsumerRecords recordBatch = consumer.poll(Duration.ofMillis(remainingTimeMs)); + recordBatch.partitions().forEach(tp -> records.get(tp) + .addAll(recordBatch.records(tp)) + ); } - // We haven't reached the end offset yet; need to keep polling - ConsumerRecords recordBatch = consumer.poll(Duration.ofMillis(remainingTimeMs)); - recordBatch.partitions().forEach(tp -> records.get(tp) - .addAll(recordBatch.records(tp)) - ); } } } From d30d51914d479e097b0811509aa2a55d43bc930a Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Mon, 11 Dec 2023 14:14:47 -0800 Subject: [PATCH 4/4] Revert "Allow Block.resetBlockLatch to release blocked operation for end-of-test cleanup" This reverts commit 9cb074ec342b210ce95e848cf72cc8589daa6101. --- .../connect/integration/BlockingConnectorTest.java | 13 +++---------- .../integration/OffsetsApiIntegrationTest.java | 2 -- 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java index cc3463f018b4f..33614e317c490 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java @@ -138,8 +138,8 @@ public void setup() throws Exception { @After public void close() { // stop all Connect, Kafka and Zk threads. - Block.resetBlockLatch(); connect.stop(); + Block.resetBlockLatch(); } @Test @@ -349,9 +349,8 @@ private void assertRequestTimesOut(String requestDescription, ThrowingRunnable r connect.requestTimeout(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); } - public static class Block { + private static class Block { private static CountDownLatch blockLatch; - private static CountDownLatch clearLatch; private final String block; @@ -393,10 +392,6 @@ public static void resetBlockLatch() { blockLatch.countDown(); blockLatch = null; } - if (clearLatch != null) { - clearLatch.countDown(); - clearLatch = null; - } } } @@ -411,7 +406,6 @@ public Block(String block) { blockLatch.countDown(); } blockLatch = new CountDownLatch(1); - clearLatch = new CountDownLatch(1); } } @@ -421,8 +415,7 @@ public void maybeBlockOn(String block) { blockLatch.countDown(); while (true) { try { - clearLatch.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS); - return; + Thread.sleep(Long.MAX_VALUE); } catch (InterruptedException e) { // No-op. Just keep blocking. } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java index a71b48eda9891..ad2a5f168ff8d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java @@ -429,7 +429,6 @@ public void testAlterSinkConnectorOffsetsZombieSinkTasks() throws Exception { ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter))); assertThat(e.getMessage(), containsString("zombie sink task")); - BlockingConnectorTest.Block.resetBlockLatch(); } @Test @@ -786,7 +785,6 @@ public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception { // Try to reset the offsets ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(CONNECTOR_NAME)); assertThat(e.getMessage(), containsString("zombie sink task")); - BlockingConnectorTest.Block.resetBlockLatch(); } @Test