diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index da8fed5b66d34..420cf5e745ced 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -56,11 +56,14 @@ import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.FutureCallback; -import org.junit.Test; -import org.junit.runner.RunWith; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; import java.util.ArrayList; import java.util.Arrays; @@ -79,12 +82,12 @@ import java.util.stream.Collectors; import static org.apache.kafka.connect.runtime.AbstractHerder.keysWithVariableValues; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; @@ -98,7 +101,8 @@ import 
static org.mockito.Mockito.when; import static org.mockito.Mockito.withSettings; -@RunWith(MockitoJUnitRunner.StrictStubs.class) +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) public class AbstractHerderTest { private static final String CONN1 = "sourceA"; @@ -455,6 +459,7 @@ public void testConfigValidationEmptyConfig() { assertThrows(BadRequestException.class, () -> herder.validateConnectorConfig(Collections.emptyMap(), s -> null, false)); verify(transformer).transform(Collections.emptyMap()); + assertEquals(worker.getPlugins(), plugins); } @Test @@ -1050,16 +1055,16 @@ private void testConnectorPluginConfig( verify(plugins).withClassLoader(newPluginInstance.get().getClass().getClassLoader()); } - @Test(expected = NotFoundException.class) + @Test public void testGetConnectorConfigDefWithBadName() throws Exception { String connName = "AnotherPlugin"; AbstractHerder herder = testHerder(); when(worker.getPlugins()).thenReturn(plugins); when(plugins.pluginClass(anyString())).thenThrow(new ClassNotFoundException()); - herder.connectorPluginConfig(connName); + assertThrows(NotFoundException.class, () -> herder.connectorPluginConfig(connName)); } - @Test(expected = BadRequestException.class) + @Test @SuppressWarnings({"rawtypes", "unchecked"}) public void testGetConnectorConfigDefWithInvalidPluginType() throws Exception { String connName = "AnotherPlugin"; @@ -1067,7 +1072,7 @@ public void testGetConnectorConfigDefWithInvalidPluginType() throws Exception { when(worker.getPlugins()).thenReturn(plugins); when(plugins.pluginClass(anyString())).thenReturn((Class) Object.class); when(plugins.newPlugin(anyString())).thenReturn(new DirectoryConfigProvider()); - herder.connectorPluginConfig(connName); + assertThrows(BadRequestException.class, () -> herder.connectorPluginConfig(connName)); } @Test @@ -1228,7 +1233,7 @@ private void testConfigProviderRegex(String rawConnConfig) { private void testConfigProviderRegex(String 
rawConnConfig, boolean expected) { Set keys = keysWithVariableValues(Collections.singletonMap("key", rawConnConfig), ConfigTransformer.DEFAULT_PATTERN); boolean actual = keys != null && !keys.isEmpty() && keys.contains("key"); - assertEquals(String.format("%s should have matched regex", rawConnConfig), expected, actual); + assertEquals(expected, actual, String.format("%s should have matched regex", rawConnConfig)); } private AbstractHerder createConfigValidationHerder(Class connectorClass, diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java index 62cac0d37877f..0cb4db7064726 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java @@ -55,14 +55,17 @@ import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.TopicAdmin; import org.apache.kafka.connect.util.TopicCreationGroup; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.AdditionalAnswers; import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; import org.mockito.stubbing.Answer; import java.nio.ByteBuffer; @@ -90,15 +93,16 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG; import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG; import static 
org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -107,7 +111,8 @@ import static org.mockito.Mockito.when; @SuppressWarnings("unchecked") -@RunWith(MockitoJUnitRunner.StrictStubs.class) +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) public class AbstractWorkerSourceTaskTest { private static final String TOPIC = "topic"; @@ -151,7 +156,7 @@ public class AbstractWorkerSourceTaskTest { private AbstractWorkerSourceTask workerTask; - @Before + @BeforeEach public void setup() { Map workerProps = workerProps(); plugins = new Plugins(workerProps); @@ -187,7 +192,7 @@ private Map sourceConnectorPropsWithGroups() { return props; } - @After + @AfterEach public void tearDown() { if (metrics != null) metrics.stop(); verifyNoMoreInteractions(statusListener); @@ -249,6 +254,7 @@ public void testSendRecordsConvertsData() { ); expectSendRecord(emptyHeaders()); + expectApplyTransformationChain(); expectTopicCreation(TOPIC); workerTask.toSend = records; @@ -269,6 
+275,7 @@ public void testSendRecordsPropagatesTimestamp() { createWorkerTask(); expectSendRecord(emptyHeaders()); + expectApplyTransformationChain(); expectTopicCreation(TOPIC); workerTask.toSend = Collections.singletonList( @@ -288,7 +295,8 @@ public void testSendRecordsCorruptTimestamp() { final Long timestamp = -3L; createWorkerTask(); - expectSendRecord(emptyHeaders()); + expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC); + expectApplyTransformationChain(); workerTask.toSend = Collections.singletonList( new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp) @@ -304,6 +312,7 @@ public void testSendRecordsNoTimestamp() { createWorkerTask(); expectSendRecord(emptyHeaders()); + expectApplyTransformationChain(); expectTopicCreation(TOPIC); workerTask.toSend = Collections.singletonList( @@ -329,6 +338,7 @@ public void testHeaders() { createWorkerTask(); expectSendRecord(headers); + expectApplyTransformationChain(); expectTopicCreation(TOPIC); workerTask.toSend = Collections.singletonList( @@ -356,6 +366,7 @@ public void testHeadersWithCustomConverter() throws Exception { Collections::emptyList); expectSendRecord(null); + expectApplyTransformationChain(); expectTopicCreation(TOPIC); String stringA = "Árvíztűrő tükörfúrógép"; @@ -617,6 +628,7 @@ public void testTopicCreateSucceedsWhenCreateReturnsExistingTopicFound() { SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); expectSendRecord(emptyHeaders()); + expectApplyTransformationChain(); when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap()); when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(foundTopic(TOPIC)); @@ -641,6 +653,7 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() { SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); expectSendRecord(emptyHeaders()); + expectApplyTransformationChain(); 
when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap()); when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC)); @@ -666,7 +679,6 @@ public void testSendRecordsRetriableException() { SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC); - expectTaskGetTopic(); when(transformationChain.apply(any(), eq(record1))).thenReturn(null); when(transformationChain.apply(any(), eq(record2))).thenReturn(null); @@ -698,8 +710,6 @@ private void expectSendRecord(Headers headers) { if (headers != null) expectConvertHeadersAndKeyValue(headers, TOPIC); - expectApplyTransformationChain(); - expectTaskGetTopic(); } @@ -792,11 +802,15 @@ private void expectConvertHeadersAndKeyValue(Headers headers, String topic) { when(valueConverter.fromConnectData(eq(topic), any(Headers.class), eq(RECORD_SCHEMA), eq(RECORD))) .thenReturn(SERIALIZED_RECORD); + assertEquals(keyConverter.fromConnectData(topic, headers, KEY_SCHEMA, KEY), SERIALIZED_KEY); + assertEquals(valueConverter.fromConnectData(topic, headers, RECORD_SCHEMA, RECORD), SERIALIZED_RECORD); } private void expectApplyTransformationChain() { when(transformationChain.apply(any(), any(SourceRecord.class))) .thenAnswer(AdditionalAnswers.returnsSecondArg()); + SourceRecord randomString = mock(SourceRecord.class); + assertEquals(transformationChain.apply(null, randomString), randomString); } private RecordHeaders emptyHeaders() { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java index 11de95284044e..b4f53e630125d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java @@ -24,21 +24,22 @@ import 
org.apache.kafka.common.utils.MockTime; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroupId; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; public class ConnectMetricsTest { @@ -51,12 +52,12 @@ public class ConnectMetricsTest { private ConnectMetrics metrics; - @Before + @BeforeEach public void setUp() { metrics = new ConnectMetrics("worker1", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), new MockTime(), "cluster-1"); } - @After + @AfterEach public void tearDown() { if (metrics != null) metrics.stop(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java index 6fa2808c19ea3..6092f8ca7bdc7 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java +++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java @@ -26,7 +26,8 @@ import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.predicates.Predicate; -import org.junit.Test; + +import org.junit.jupiter.api.Test; import java.util.Collections; import java.util.HashMap; @@ -34,12 +35,12 @@ import java.util.Map; import java.util.Set; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; public class ConnectorConfigTest> { @@ -469,8 +470,8 @@ public void testEnrichedConfigDef() { private static void assertEnrichedConfigDef(ConfigDef def, String prefix, String keyName, ConfigDef.Type expectedType) { assertNull(def.configKeys().get(keyName)); ConfigDef.ConfigKey configKey = def.configKeys().get(prefix + keyName); - assertNotNull(prefix + keyName + "' config must be present", configKey); - assertEquals(prefix + keyName + "' config should be a " + expectedType, expectedType, configKey.type); + assertNotNull(configKey, prefix + keyName + "' config must be present"); + assertEquals(expectedType, configKey.type, prefix + keyName + "' config should be a " + expectedType); } public static class HasDuplicateConfigTransformation> implements Transformation, Versioned { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java index 3aeecc1d757c3..fbb045d3976fb 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java @@ -61,23 +61,20 @@ import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.TopicAdmin; import org.apache.kafka.connect.util.TopicCreationGroup; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.time.Duration; -import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -98,7 +95,7 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG; import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG; -import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; @@ -108,10 +105,9 @@ import static org.mockito.Mockito.when; -@RunWith(Parameterized.class) 
+@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) public class ErrorHandlingTaskTest { - @Rule - public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS); private static final String TOPIC = "test"; private static final int PARTITION1 = 12; @@ -179,19 +175,11 @@ public class ErrorHandlingTaskTest { private ErrorHandlingMetrics errorHandlingMetrics; - private final boolean enableTopicCreation; + private boolean enableTopicCreation; - @Parameterized.Parameters - public static Collection parameters() { - return Arrays.asList(false, true); - } - public ErrorHandlingTaskTest(boolean enableTopicCreation) { + public void setup(boolean enableTopicCreation) { this.enableTopicCreation = enableTopicCreation; - } - - @Before - public void setup() { time = new MockTime(0, 0, 0); metrics = new MockConnectMetrics(); Map workerProps = new HashMap<>(); @@ -220,15 +208,17 @@ private Map sourceConnectorProps(String topic) { return props; } - @After + @AfterEach public void tearDown() { if (metrics != null) { metrics.stop(); } } - @Test - public void testErrorHandlingInSinkTasks() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testErrorHandlingInSinkTasks(boolean enableTopicCreation) { + setup(enableTopicCreation); Map reportProps = new HashMap<>(); reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); @@ -277,8 +267,10 @@ private RetryWithToleranceOperator operator() { SYSTEM, errorHandlingMetrics); } - @Test - public void testErrorHandlingInSourceTasks() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testErrorHandlingInSourceTasks(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); Map reportProps = new HashMap<>(); reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); 
reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); @@ -337,8 +329,10 @@ private ConnectorConfig connConfig(Map connProps) { return new ConnectorConfig(plugins, props); } - @Test - public void testErrorHandlingInSourceTasksWithBadConverter() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testErrorHandlingInSourceTasksWithBadConverter(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); Map reportProps = new HashMap<>(); reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java index 6d5e6350357ed..be3dc2401ad69 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java @@ -51,24 +51,22 @@ import org.apache.kafka.connect.test.util.MockitoUtils; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectorTaskId; -import org.apache.kafka.connect.util.ParameterizedTest; import org.apache.kafka.connect.util.TopicAdmin; import org.apache.kafka.connect.util.TopicCreationGroup; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; +import 
org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; import org.mockito.stubbing.OngoingStubbing; import org.mockito.verification.VerificationMode; import java.time.Duration; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -99,11 +97,11 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG; import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; @@ -115,7 +113,8 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -@RunWith(Parameterized.class) +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.WARN) public class ExactlyOnceWorkerSourceTaskTest { private static final String TOPIC = "topic"; private static final Map PARTITION = Collections.singletonMap("key", "partition".getBytes()); @@ -157,8 +156,6 @@ public class ExactlyOnceWorkerSourceTaskTest { @Mock private Runnable preProducerCheck; @Mock private Runnable postProducerCheck; - @Rule public MockitoRule rule = MockitoJUnit.rule(); - private static final 
Map TASK_PROPS = new HashMap<>(); static { TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); @@ -174,23 +171,15 @@ public class ExactlyOnceWorkerSourceTaskTest { private final AtomicReference pollLatch = new AtomicReference<>(new CountDownLatch(0)); private final AtomicReference> pollRecords = new AtomicReference<>(RECORDS); - private final boolean enableTopicCreation; + private boolean enableTopicCreation; private boolean taskStarted; private Future workerTaskFuture; - @ParameterizedTest.Parameters - public static Collection parameters() { - return Arrays.asList(false, true); - } - public ExactlyOnceWorkerSourceTaskTest(boolean enableTopicCreation) { + public void setup(boolean enableTopicCreation) throws Exception { this.enableTopicCreation = enableTopicCreation; this.taskStarted = false; - } - - @Before - public void setup() throws Exception { Map workerProps = workerProps(); plugins = new Plugins(workerProps); config = new StandaloneConfig(workerProps); @@ -209,7 +198,7 @@ public void setup() throws Exception { }); } - @After + @AfterEach public void teardown() throws Exception { // In most tests, we don't really care about how many times the task got polled, // how many times we prepared to write offsets, etc. 
@@ -293,8 +282,10 @@ private void createWorkerTask(TargetState initialState, Converter keyConverter, sourceConfig, Runnable::run, preProducerCheck, postProducerCheck, Collections::emptyList); } - @Test - public void testRemoveMetrics() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testRemoveMetrics(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); createWorkerTask(); workerTask.removeMetrics(); @@ -310,8 +301,10 @@ private Set filterToTaskMetrics(Set metricNames) { .collect(Collectors.toSet()); } - @Test - public void testStartPaused() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testStartPaused(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); createWorkerTask(TargetState.PAUSED); final CountDownLatch pauseLatch = new CountDownLatch(1); @@ -333,8 +326,10 @@ public void testStartPaused() throws Exception { assertPollMetrics(0); } - @Test - public void testPause() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testPause(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); createWorkerTask(); expectSuccessfulSends(); @@ -372,8 +367,10 @@ public void testPause() throws Exception { verifyCleanShutdown(); } - @Test - public void testFailureInPreProducerCheck() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testFailureInPreProducerCheck(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); createWorkerTask(); Exception exception = new ConnectException("Failed to perform zombie fencing"); @@ -386,8 +383,10 @@ public void testFailureInPreProducerCheck() throws Exception { verifyShutdown(true, false); } - @Test - public void testFailureInProducerInitialization() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void 
testFailureInProducerInitialization(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); createWorkerTask(); Exception exception = new ConnectException("You can't do that!"); @@ -401,8 +400,10 @@ public void testFailureInProducerInitialization() throws Exception { verifyShutdown(true, false); } - @Test - public void testFailureInPostProducerCheck() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testFailureInPostProducerCheck(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); createWorkerTask(); Exception exception = new ConnectException("New task configs for the connector have already been generated"); @@ -417,8 +418,10 @@ public void testFailureInPostProducerCheck() throws Exception { verifyShutdown(true, false); } - @Test - public void testFailureInOffsetStoreStart() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testFailureInOffsetStoreStart(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); createWorkerTask(); Exception exception = new ConnectException("No soup for you!"); @@ -434,8 +437,10 @@ public void testFailureInOffsetStoreStart() throws Exception { verifyShutdown(true, false); } - @Test - public void testPollsInBackground() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testPollsInBackground(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); createWorkerTask(); expectSuccessfulSends(); @@ -459,8 +464,10 @@ public void testPollsInBackground() throws Exception { assertTransactionMetrics(RECORDS.size()); } - @Test - public void testFailureInPoll() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testFailureInPoll(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); createWorkerTask(); final CountDownLatch pollLatch = new CountDownLatch(1); @@ 
-482,8 +489,10 @@ public void testFailureInPoll() throws Exception { assertPollMetrics(0); } - @Test - public void testFailureInPollAfterCancel() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testFailureInPollAfterCancel(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); createWorkerTask(); final CountDownLatch pollLatch = new CountDownLatch(1); @@ -508,8 +517,10 @@ public void testFailureInPollAfterCancel() throws Exception { assertPollMetrics(0); } - @Test - public void testFailureInPollAfterStop() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testFailureInPollAfterStop(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); createWorkerTask(); final CountDownLatch pollLatch = new CountDownLatch(1); @@ -537,8 +548,10 @@ public void testFailureInPollAfterStop() throws Exception { assertPollMetrics(0); } - @Test - public void testPollReturnsNoRecords() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testPollReturnsNoRecords(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); // Test that the task handles an empty list of records createWorkerTask(); @@ -562,8 +575,10 @@ public void testPollReturnsNoRecords() throws Exception { assertPollMetrics(0); } - @Test - public void testPollBasedCommit() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testPollBasedCommit(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); Map connectorProps = sourceConnectorProps(SourceTask.TransactionBoundary.POLL); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); @@ -590,8 +605,10 @@ public void testPollBasedCommit() throws Exception { assertTransactionMetrics(RECORDS.size()); } - @Test - public void testIntervalBasedCommit() throws Exception { + @ParameterizedTest 
+ @ValueSource(booleans = {true, false}) + public void testIntervalBasedCommit(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); long commitInterval = 618; Map connectorProps = sourceConnectorProps(SourceTask.TransactionBoundary.INTERVAL); connectorProps.put(TRANSACTION_BOUNDARY_INTERVAL_CONFIG, Long.toString(commitInterval)); @@ -607,15 +624,16 @@ public void testIntervalBasedCommit() throws Exception { startTaskThread(); awaitPolls(2); - assertEquals("No flushes should have taken place before offset commit interval has elapsed", 0, flushCount()); + assertEquals(0, flushCount(), "No flushes should have taken place before offset commit interval has elapsed"); time.sleep(commitInterval); awaitPolls(2); - assertEquals("One flush should have taken place after offset commit interval has elapsed", 1, flushCount()); + assertEquals(1, flushCount(), "One flush should have taken place after offset commit interval has elapsed"); time.sleep(commitInterval * 2); awaitPolls(2); - assertEquals("Two flushes should have taken place after offset commit interval has elapsed again", 2, flushCount()); + assertEquals(2, flushCount(), + "Two flushes should have taken place after offset commit interval has elapsed again"); awaitShutdown(); @@ -631,27 +649,36 @@ public void testIntervalBasedCommit() throws Exception { assertTransactionMetrics(RECORDS.size() * 2); } - @Test - public void testConnectorCommitOnBatch() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testConnectorCommitOnBatch(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); testConnectorBasedCommit(TransactionContext::commitTransaction, false); } - @Test - public void testConnectorCommitOnRecord() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testConnectorCommitOnRecord(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); testConnectorBasedCommit(ctx 
-> ctx.commitTransaction(SOURCE_RECORD_2), false); } - @Test - public void testConnectorAbortOnBatch() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testConnectorAbortOnBatch(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); testConnectorBasedCommit(TransactionContext::abortTransaction, true); } - @Test - public void testConnectorAbortOnRecord() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testConnectorAbortOnRecord(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); testConnectorBasedCommit(ctx -> ctx.abortTransaction(SOURCE_RECORD_2), true); } private void testConnectorBasedCommit(Consumer requestCommit, boolean abort) throws Exception { Map connectorProps = sourceConnectorProps(SourceTask.TransactionBoundary.CONNECTOR); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); createWorkerTask(); @@ -664,22 +691,22 @@ private void testConnectorBasedCommit(Consumer requestCommit startTaskThread(); awaitPolls(3); - assertEquals("No flushes should have taken place without connector requesting transaction commit", - 0, flushCount()); + assertEquals(0, flushCount(), + "No flushes should have taken place without connector requesting transaction commit"); requestCommit.accept(transactionContext); awaitPolls(3); - assertEquals("One flush should have taken place after transaction commit/abort was requested", - 1, flushCount()); + assertEquals(1, flushCount(), + "One flush should have taken place after transaction commit/abort was requested"); awaitPolls(3); - assertEquals("Only one flush should still have taken place without connector re-requesting commit/abort, even on identical records", - 1, flushCount()); + assertEquals(1, flushCount(), + "Only one flush should still have taken place without connector re-requesting commit/abort, even on identical records"); 
awaitShutdown(); - assertEquals("Task should have flushed offsets once based on connector-defined boundaries, and skipped final end-of-life offset commit", - 1, flushCount()); + assertEquals(1, flushCount(), + "Task should have flushed offsets once based on connector-defined boundaries, and skipped final end-of-life offset commit"); // We begin a new transaction after connector-requested aborts so that we can still write offsets for the source records that were aborted verify(producer, times(abort ? 3 : 2)).beginTransaction(); verifySends(); @@ -696,8 +723,10 @@ private void testConnectorBasedCommit(Consumer requestCommit assertTransactionMetrics(abort ? 0 : (3 * RECORDS.size())); } - @Test - public void testConnectorAbortsEmptyTransaction() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testConnectorAbortsEmptyTransaction(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); Map connectorProps = sourceConnectorProps(SourceTask.TransactionBoundary.CONNECTOR); sourceConfig = new SourceConnectorConfig(plugins, connectorProps, enableTopicCreation); createWorkerTask(); @@ -741,8 +770,10 @@ public void testConnectorAbortsEmptyTransaction() throws Exception { verifyPossibleTopicCreation(); } - @Test - public void testMixedConnectorTransactionBoundaryCommitLastRecordAbortBatch() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testMixedConnectorTransactionBoundaryCommitLastRecordAbortBatch(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); // We fail tasks that try to abort and commit a transaction for the same record or same batch // But we don't fail if they try to commit the last record of a batch and abort the entire batch // Instead, we give precedence to the record-based operation @@ -779,8 +810,10 @@ public void testMixedConnectorTransactionBoundaryCommitLastRecordAbortBatch() th verifyPossibleTopicCreation(); } - @Test - 
public void testMixedConnectorTransactionBoundaryAbortLastRecordCommitBatch() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testMixedConnectorTransactionBoundaryAbortLastRecordCommitBatch(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); // We fail tasks that try to abort and commit a transaction for the same record or same batch // But we don't fail if they try to abort the last record of a batch and commit the entire batch // Instead, we give precedence to the record-based operation @@ -822,8 +855,10 @@ public void testMixedConnectorTransactionBoundaryAbortLastRecordCommitBatch() th verifyPossibleTopicCreation(); } - @Test - public void testCommitFlushSyncCallbackFailure() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testCommitFlushSyncCallbackFailure(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); Exception failure = new RecordTooLargeException(); when(offsetWriter.beginFlush()).thenReturn(true); when(offsetWriter.doFlush(any())).thenAnswer(invocation -> { @@ -834,8 +869,10 @@ public void testCommitFlushSyncCallbackFailure() throws Exception { testCommitFailure(failure, false); } - @Test - public void testCommitFlushAsyncCallbackFailure() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testCommitFlushAsyncCallbackFailure(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); Exception failure = new RecordTooLargeException(); when(offsetWriter.beginFlush()).thenReturn(true); // doFlush delegates its callback to the producer, @@ -852,8 +889,10 @@ public void testCommitFlushAsyncCallbackFailure() throws Exception { testCommitFailure(failure, true); } - @Test - public void testCommitTransactionFailure() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testCommitTransactionFailure(boolean 
enableTopicCreation) throws Exception { + setup(enableTopicCreation); Exception failure = new RecordTooLargeException(); when(offsetWriter.beginFlush()).thenReturn(true); doThrow(failure).when(producer).commitTransaction(); @@ -894,8 +933,10 @@ private void testCommitFailure(Exception commitException, boolean executeCommit) verifyShutdown(true, false); } - @Test - public void testSendRecordsRetries() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testSendRecordsRetries(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); createWorkerTask(); // Differentiate only by Kafka partition so we can reuse conversion expectations @@ -938,8 +979,10 @@ public void testSendRecordsRetries() { verify(offsetWriter).offset(PARTITION, offset(3)); } - @Test - public void testSendRecordsProducerSendFailsImmediately() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testSendRecordsProducerSendFailsImmediately(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); createWorkerTask(); SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, VALUE_1); @@ -961,8 +1004,10 @@ public void testSendRecordsProducerSendFailsImmediately() { verifyPossibleTopicCreation(); } - @Test - public void testSlowTaskStart() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testSlowTaskStart(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); final CountDownLatch startupLatch = new CountDownLatch(1); final CountDownLatch finishStartupLatch = new CountDownLatch(1); @@ -995,8 +1040,10 @@ public void testSlowTaskStart() throws Exception { verifyCleanShutdown(); } - @Test - public void testCancel() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testCancel(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); createWorkerTask(); // 
workerTask said something dumb on twitter @@ -1205,8 +1252,8 @@ private void assertTransactionMetrics(int minimumMaxSizeExpected) { if (actualMax - actualMin <= 0.000001d) { assertEquals(actualMax, actualAvg, 0.000002d); } else { - assertTrue("Average transaction size should be greater than minimum transaction size", actualAvg > actualMin); - assertTrue("Average transaction size should be less than maximum transaction size", actualAvg < actualMax); + assertTrue(actualAvg > actualMin, "Average transaction size should be greater than minimum transaction size"); + assertTrue(actualAvg < actualMax, "Average transaction size should be less than maximum transaction size"); } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/InternalSinkRecordTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/InternalSinkRecordTest.java index 28ffcfd8d0848..f76c05a005169 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/InternalSinkRecordTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/InternalSinkRecordTest.java @@ -20,17 +20,21 @@ import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.runtime.errors.ProcessingContext; import org.apache.kafka.connect.sink.SinkRecord; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.junit.MockitoJUnitRunner; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; import java.util.Collections; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; -@RunWith(MockitoJUnitRunner.StrictStubs.class) +@ExtendWith(MockitoExtension.class) 
+@MockitoSettings(strictness = Strictness.STRICT_STUBS) public class InternalSinkRecordTest { private static final String TOPIC = "test-topic"; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/LoggersTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/LoggersTest.java index 184724ef25503..3dbe688a076a2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/LoggersTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/LoggersTest.java @@ -19,13 +19,16 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel; + import org.apache.log4j.Hierarchy; import org.apache.log4j.Level; import org.apache.log4j.Logger; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.junit.MockitoJUnitRunner; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; import java.util.Arrays; import java.util.Collections; @@ -38,17 +41,18 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; -@RunWith(MockitoJUnitRunner.StrictStubs.class) +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) public class LoggersTest { private static final long INITIAL_TIME = 1696951712135L; private Time time; - @Before + @BeforeEach public void setup() { time = new MockTime(0, 
INITIAL_TIME, 0); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/RestartPlanTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/RestartPlanTest.java index 480ba2bae9ebd..8d6f54ce2581b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/RestartPlanTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/RestartPlanTest.java @@ -19,14 +19,15 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.TaskState; import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; -import org.junit.Test; + +import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.List; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class RestartPlanTest { private static final String CONNECTOR_NAME = "foo"; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/RestartRequestTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/RestartRequestTest.java index c4be5ca88f60e..734bfbe440808 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/RestartRequestTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/RestartRequestTest.java @@ -16,15 +16,15 @@ */ package org.apache.kafka.connect.runtime; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; import java.util.List; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static 
org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class RestartRequestTest { private static final String CONNECTOR_NAME = "foo"; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleConverterWithHeaders.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleConverterWithHeaders.java index 562507425589c..19320736dc95c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleConverterWithHeaders.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleConverterWithHeaders.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.connect.runtime; -import java.io.UnsupportedEncodingException; -import java.util.Map; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.utils.AppInfoParser; @@ -27,6 +25,9 @@ import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.storage.Converter; +import java.io.UnsupportedEncodingException; +import java.util.Map; + /** * This is a simple Converter implementation that uses "encoding" header to encode/decode strings via provided charset name */ diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorConfigTest.java index 251bb72fbe2d9..b3ad1db30e71d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorConfigTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorConfigTest.java @@ -18,7 +18,8 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.common.config.ConfigException; -import org.junit.Test; + +import org.junit.jupiter.api.Test; import java.util.HashMap; import java.util.Map; @@ -40,10 +41,10 @@ 
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; public class SourceConnectorConfigTest { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java index ef962cf5b445e..158e51dc8e22c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java @@ -21,12 +21,15 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.util.ConnectorTaskId; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; import java.util.HashMap; import java.util.Map; @@ -38,15 +41,16 @@ import java.util.concurrent.TimeUnit; import static java.util.Collections.singletonMap; -import static org.junit.Assert.assertEquals; 
-import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.StrictStubs.class) +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) public class SourceTaskOffsetCommitterTest { private final ConcurrentHashMap> committers = new ConcurrentHashMap<>(); @@ -66,7 +70,7 @@ public class SourceTaskOffsetCommitterTest { private static final long DEFAULT_OFFSET_COMMIT_INTERVAL_MS = 1000; - @Before + @BeforeEach public void setup() { Map workerProps = new HashMap<>(); workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/StateTrackerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/StateTrackerTest.java index 507d196d42982..8c5535e809abf 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/StateTrackerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/StateTrackerTest.java @@ -16,13 +16,14 @@ */ package org.apache.kafka.connect.runtime; -import org.apache.kafka.connect.runtime.AbstractStatus.State; import org.apache.kafka.common.utils.MockTime; -import org.junit.Before; -import org.junit.Test; +import org.apache.kafka.connect.runtime.AbstractStatus.State; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; +import static org.junit.jupiter.api.Assertions.assertEquals; 
+import static org.junit.jupiter.api.Assertions.assertNull; public class StateTrackerTest { @@ -31,7 +32,7 @@ public class StateTrackerTest { private StateTracker tracker; private MockTime time; - @Before + @BeforeEach public void setUp() { time = new MockTime(); time.sleep(1000L); @@ -100,4 +101,4 @@ public void calculateDurations() { } -} \ No newline at end of file +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java index 39d680a7d46be..8feeee0588a0f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java @@ -17,8 +17,9 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord; -import org.junit.Before; -import org.junit.Test; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; @@ -30,9 +31,9 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.apache.kafka.connect.runtime.SubmittedRecords.CommittableOffsets; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class SubmittedRecordsTest { @@ -44,7 +45,7 @@ public class SubmittedRecordsTest { SubmittedRecords submittedRecords; - @Before + @BeforeEach public void setup() { submittedRecords = new SubmittedRecords(); offset = new AtomicInteger(); @@ -178,8 +179,8 @@ public void testRemoveLastSubmittedRecord() { assertEquals(Collections.emptyMap(), committableOffsets.offsets()); 
assertMetadata(committableOffsets, 0, 1, 1, 1, PARTITION1); - assertTrue("First attempt to remove record from submitted queue should succeed", submittedRecord.drop()); - assertFalse("Attempt to remove already-removed record from submitted queue should fail", submittedRecord.drop()); + assertTrue(submittedRecord.drop(), "First attempt to remove record from submitted queue should succeed"); + assertFalse(submittedRecord.drop(), "Attempt to remove already-removed record from submitted queue should fail"); committableOffsets = submittedRecords.committableOffsets(); // Even if SubmittedRecords::remove is broken, we haven't ack'd anything yet, so there should be no committable offsets @@ -203,7 +204,7 @@ public void testRemoveNotLastSubmittedRecord() { assertMetadata(committableOffsets, 0, 2, 2, 1, PARTITION1, PARTITION2); assertNoEmptyDeques(); - assertTrue("First attempt to remove record from submitted queue should succeed", recordToRemove.drop()); + assertTrue(recordToRemove.drop(), "First attempt to remove record from submitted queue should succeed"); committableOffsets = submittedRecords.committableOffsets(); // Even if SubmittedRecords::remove is broken, we haven't ack'd anything yet, so there should be no committable offsets @@ -265,27 +266,27 @@ public void testAwaitMessagesAfterAllRemoved() { SubmittedRecord recordToRemove1 = submittedRecords.submit(PARTITION1, newOffset()); SubmittedRecord recordToRemove2 = submittedRecords.submit(PARTITION1, newOffset()); assertFalse( - "Await should fail since neither of the in-flight records has been removed so far", - submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS) + submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS), + "Await should fail since neither of the in-flight records has been removed so far" ); recordToRemove1.drop(); assertFalse( - "Await should fail since only one of the two submitted records has been removed so far", - submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS) + 
submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS), + "Await should fail since only one of the two submitted records has been removed so far" ); recordToRemove1.drop(); assertFalse( + submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS), "Await should fail since only one of the two submitted records has been removed so far, " - + "even though that record has been removed twice", - submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS) + + "even though that record has been removed twice" ); recordToRemove2.drop(); assertTrue( - "Await should succeed since both submitted records have now been removed", - submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS) + submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS), + "Await should succeed since both submitted records have now been removed" ); } @@ -308,48 +309,48 @@ public void testAwaitMessagesReturnsAfterAsynchronousAck() throws Exception { }).start(); assertTrue( - "Should not have finished awaiting message delivery before either in-flight record was acknowledged", - awaitComplete.getCount() > 0 + awaitComplete.getCount() > 0, + "Should not have finished awaiting message delivery before either in-flight record was acknowledged" ); inFlightRecord1.ack(); assertTrue( - "Should not have finished awaiting message delivery before one in-flight record was acknowledged", - awaitComplete.getCount() > 0 + awaitComplete.getCount() > 0, + "Should not have finished awaiting message delivery before one in-flight record was acknowledged" ); inFlightRecord1.ack(); assertTrue( + awaitComplete.getCount() > 0, "Should not have finished awaiting message delivery before one in-flight record was acknowledged, " - + "even though the other record has been acknowledged twice", - awaitComplete.getCount() > 0 + + "even though the other record has been acknowledged twice" ); inFlightRecord2.ack(); assertTrue( - "Should have finished awaiting message delivery after both in-flight records were acknowledged", - 
awaitComplete.await(1, TimeUnit.SECONDS) + awaitComplete.await(1, TimeUnit.SECONDS), + "Should have finished awaiting message delivery after both in-flight records were acknowledged" ); assertTrue( - "Await of in-flight messages should have succeeded", - awaitResult.get() + awaitResult.get(), + "Await of in-flight messages should have succeeded" ); } private void assertNoRemainingDeques() { - assertEquals("Internal records map should be completely empty", Collections.emptyMap(), submittedRecords.records); + assertEquals(Collections.emptyMap(), submittedRecords.records, "Internal records map should be completely empty"); } @SafeVarargs private final void assertRemovedDeques(Map... partitions) { for (Map partition : partitions) { - assertFalse("Deque for partition " + partition + " should have been cleaned up from internal records map", submittedRecords.records.containsKey(partition)); + assertFalse(submittedRecords.records.containsKey(partition), "Deque for partition " + partition + " should have been cleaned up from internal records map"); } } private void assertNoEmptyDeques() { submittedRecords.records.forEach((partition, deque) -> - assertFalse("Empty deque for partition " + partition + " should have been cleaned up from internal records map", deque.isEmpty()) + assertFalse(deque.isEmpty(), "Empty deque for partition " + partition + " should have been cleaned up from internal records map") ); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java index 1d63d7db55f34..2a6c0ed2b9d1d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java @@ -30,7 +30,8 @@ import org.apache.kafka.connect.transforms.TimestampConverter; import 
org.apache.kafka.connect.transforms.TimestampRouter; import org.apache.kafka.connect.transforms.ValueToKey; -import org.junit.Test; + +import org.junit.jupiter.api.Test; import java.util.HashMap; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java index 6d6b8dcaa814f..6a4bc1816a5fe 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java @@ -19,18 +19,18 @@ import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.predicates.Predicate; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.junit.MockitoJUnitRunner; +import org.junit.jupiter.api.Test; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; import static java.util.Collections.singletonMap; -import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.StrictStubs.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) public class TransformationStageTest { private final SourceRecord initial = new SourceRecord(singletonMap("initial", 1), null, null, null, null); @@ -52,7 +52,9 @@ private void applyAndAssert(boolean predicateResult, boolean negate, when(predicate.test(any())).thenReturn(predicateResult); @SuppressWarnings("unchecked") Transformation transformation = mock(Transformation.class); - when(transformation.apply(any())).thenReturn(transformed); + if (expectedResult == transformed) { + 
when(transformation.apply(any())).thenReturn(transformed); + } TransformationStage stage = new TransformationStage<>( predicate, negate, diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java index dbb25cdafac15..4ad4c11ee89cd 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java @@ -20,20 +20,21 @@ import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.common.Node; import org.apache.kafka.connect.errors.ConnectException; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.mockito.MockedStatic; import org.mockito.internal.stubbing.answers.CallsRealMethods; import java.util.Arrays; import java.util.HashMap; -import java.util.Map; import java.util.List; +import java.util.Map; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.times; @@ -43,13 +44,13 @@ public class WorkerConfigTest { private static final String CLUSTER_ID = "cluster-id"; private MockedStatic workerConfigMockedStatic; - @Before + @BeforeEach public void setup() { workerConfigMockedStatic = mockStatic(WorkerConfig.class, new CallsRealMethods()); workerConfigMockedStatic.when(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))).thenReturn(CLUSTER_ID); } - @After + 
@AfterEach public void teardown() { workerConfigMockedStatic.close(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java index 4af49c0ea1600..c3a8f151750ec 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java @@ -18,11 +18,14 @@ import org.apache.kafka.common.config.ConfigData; import org.apache.kafka.common.config.provider.ConfigProvider; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; import java.util.Collections; import java.util.HashMap; @@ -31,15 +34,16 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONFIG_RELOAD_ACTION_NONE; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.notNull; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.StrictStubs.class) +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) public class WorkerConfigTransformerTest { public static final String MY_KEY = 
"myKey"; @@ -60,7 +64,7 @@ public class WorkerConfigTransformerTest { private HerderRequest requestId; private WorkerConfigTransformer configTransformer; - @Before + @BeforeEach public void setup() { configTransformer = new WorkerConfigTransformer(worker, Collections.singletonMap("test", new TestConfigProvider())); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java index ab8cdb65a0cb8..1fd45e2bf00ed 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java @@ -28,30 +28,26 @@ import org.apache.kafka.connect.storage.CloseableOffsetStorageReader; import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore; import org.apache.kafka.connect.util.Callback; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; - -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import org.mockito.Mock; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import java.util.HashMap; +import java.util.Map; + +import static 
org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; @@ -63,12 +59,14 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -@RunWith(Parameterized.class) +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) public class WorkerConnectorTest { private static final String VERSION = "1.1"; public static final String CONNECTOR = "connector"; public static final Map CONFIG = new HashMap<>(); + static { CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName()); CONFIG.put(ConnectorConfig.NAME_CONFIG, CONNECTOR); @@ -77,25 +75,18 @@ public class WorkerConnectorTest { public ConnectorConfig connectorConfig; public MockConnectMetrics metrics; - @Rule - public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS); @Mock private Plugins plugins; @Mock private CloseableConnectorContext ctx; @Mock private ConnectorStatus.Listener listener; @Mock private ClassLoader classLoader; - private final ConnectorType connectorType; - private final Connector connector; - private final CloseableOffsetStorageReader offsetStorageReader; - private final ConnectorOffsetBackingStore offsetStore; + private ConnectorType connectorType; + private Connector connector; + private CloseableOffsetStorageReader offsetStorageReader; + private ConnectorOffsetBackingStore offsetStore; - @Parameterized.Parameters - public static Collection parameters() { - return Arrays.asList(ConnectorType.SOURCE, ConnectorType.SINK); - } - - public WorkerConnectorTest(ConnectorType connectorType) { + private 
void setConnector(ConnectorType connectorType) { this.connectorType = connectorType; switch (connectorType) { case SINK: @@ -112,20 +103,22 @@ public WorkerConnectorTest(ConnectorType connectorType) { throw new IllegalStateException("Unexpected connector type: " + connectorType); } } - - @Before - public void setup() { + + public void setup(ConnectorType connectorType) { + setConnector(connectorType); connectorConfig = new ConnectorConfig(plugins, CONFIG); metrics = new MockConnectMetrics(); } - @After + @AfterEach public void tearDown() { if (metrics != null) metrics.stop(); } - @Test - public void testInitializeFailure() { + @ParameterizedTest + @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"}) + public void testInitializeFailure(ConnectorType connectorType) { + setup(connectorType); RuntimeException exception = new RuntimeException(); when(connector.version()).thenReturn(VERSION); @@ -144,8 +137,10 @@ public void testInitializeFailure() { verifyCleanShutdown(false); } - @Test - public void testFailureIsFinalState() { + @ParameterizedTest + @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"}) + public void testFailureIsFinalState(ConnectorType connectorType) { + setup(connectorType); RuntimeException exception = new RuntimeException(); when(connector.version()).thenReturn(VERSION); @@ -171,8 +166,10 @@ public void testFailureIsFinalState() { verifyNoMoreInteractions(onStateChange); } - @Test - public void testStartupAndShutdown() { + @ParameterizedTest + @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"}) + public void testStartupAndShutdown(ConnectorType connectorType) { + setup(connectorType); when(connector.version()).thenReturn(VERSION); Callback onStateChange = mockCallback(); @@ -195,8 +192,10 @@ public void testStartupAndShutdown() { verifyNoMoreInteractions(onStateChange); } - @Test - public void testStartupAndPause() { + @ParameterizedTest + @EnumSource(value = ConnectorType.class, names = {"SOURCE", 
"SINK"}) + public void testStartupAndPause(ConnectorType connectorType) { + setup(connectorType); when(connector.version()).thenReturn(VERSION); Callback onStateChange = mockCallback(); @@ -224,8 +223,10 @@ public void testStartupAndPause() { verifyNoMoreInteractions(onStateChange); } - @Test - public void testStartupAndStop() { + @ParameterizedTest + @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"}) + public void testStartupAndStop(ConnectorType connectorType) { + setup(connectorType); when(connector.version()).thenReturn(VERSION); Callback onStateChange = mockCallback(); @@ -254,8 +255,10 @@ public void testStartupAndStop() { verifyNoMoreInteractions(onStateChange); } - @Test - public void testOnResume() { + @ParameterizedTest + @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"}) + public void testOnResume(ConnectorType connectorType) { + setup(connectorType); when(connector.version()).thenReturn(VERSION); Callback onStateChange = mockCallback(); @@ -284,8 +287,10 @@ public void testOnResume() { verifyNoMoreInteractions(onStateChange); } - @Test - public void testStartupPaused() { + @ParameterizedTest + @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"}) + public void testStartupPaused(ConnectorType connectorType) { + setup(connectorType); when(connector.version()).thenReturn(VERSION); Callback onStateChange = mockCallback(); @@ -308,8 +313,10 @@ public void testStartupPaused() { verifyNoMoreInteractions(onStateChange); } - @Test - public void testStartupStopped() { + @ParameterizedTest + @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"}) + public void testStartupStopped(ConnectorType connectorType) { + setup(connectorType); when(connector.version()).thenReturn(VERSION); Callback onStateChange = mockCallback(); @@ -332,8 +339,10 @@ public void testStartupStopped() { verifyNoMoreInteractions(onStateChange); } - @Test - public void testStartupFailure() { + @ParameterizedTest + @EnumSource(value 
= ConnectorType.class, names = {"SOURCE", "SINK"}) + public void testStartupFailure(ConnectorType connectorType) { + setup(connectorType); RuntimeException exception = new RuntimeException(); when(connector.version()).thenReturn(VERSION); @@ -359,8 +368,10 @@ public void testStartupFailure() { verifyNoMoreInteractions(onStateChange); } - @Test - public void testStopFailure() { + @ParameterizedTest + @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"}) + public void testStopFailure(ConnectorType connectorType) { + setup(connectorType); RuntimeException exception = new RuntimeException(); when(connector.version()).thenReturn(VERSION); @@ -401,8 +412,10 @@ public void testStopFailure() { verifyNoMoreInteractions(listener); } - @Test - public void testShutdownFailure() { + @ParameterizedTest + @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"}) + public void testShutdownFailure(ConnectorType connectorType) { + setup(connectorType); RuntimeException exception = new RuntimeException(); when(connector.version()).thenReturn(VERSION); @@ -429,8 +442,10 @@ public void testShutdownFailure() { verifyShutdown(false, true); } - @Test - public void testTransitionStartedToStarted() { + @ParameterizedTest + @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"}) + public void testTransitionStartedToStarted(ConnectorType connectorType) { + setup(connectorType); when(connector.version()).thenReturn(VERSION); Callback onStateChange = mockCallback(); @@ -456,8 +471,10 @@ public void testTransitionStartedToStarted() { verifyNoMoreInteractions(onStateChange); } - @Test - public void testTransitionPausedToPaused() { + @ParameterizedTest + @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"}) + public void testTransitionPausedToPaused(ConnectorType connectorType) { + setup(connectorType); when(connector.version()).thenReturn(VERSION); Callback onStateChange = mockCallback(); @@ -487,8 +504,10 @@ public void 
testTransitionPausedToPaused() { verifyNoMoreInteractions(onStateChange); } - @Test - public void testTransitionStoppedToStopped() { + @ParameterizedTest + @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"}) + public void testTransitionStoppedToStopped(ConnectorType connectorType) { + setup(connectorType); when(connector.version()).thenReturn(VERSION); Callback onStateChange = mockCallback(); @@ -518,8 +537,10 @@ public void testTransitionStoppedToStopped() { verifyNoMoreInteractions(onStateChange); } - @Test - public void testFailConnectorThatIsNeitherSourceNorSink() { + @ParameterizedTest + @EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"}) + public void testFailConnectorThatIsNeitherSourceNorSink(ConnectorType connectorType) { + setup(connectorType); Connector badConnector = mock(Connector.class); when(badConnector.version()).thenReturn(VERSION); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, badConnector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java index 9d17b638d5c2e..9077c7fa58723 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java @@ -18,19 +18,19 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.MetricNameTemplate; -import org.apache.kafka.common.metrics.CompoundStat; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.CumulativeSum; import org.apache.kafka.connect.util.ConnectorTaskId; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import 
org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; import java.util.HashMap; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.mock; @@ -38,7 +38,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.StrictStubs.class) +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) public class WorkerMetricsGroupTest { private final String connector = "org.FakeConnector"; private final ConnectorTaskId task = new ConnectorTaskId(connector, 0); @@ -62,7 +63,7 @@ public class WorkerMetricsGroupTest { @Mock private ConnectMetrics.MetricGroup metricGroup; @Mock private MetricName metricName; - @Before + @BeforeEach public void setup() { // We don't expect metricGroup.metricName to be invoked with null in practice, // but it's easier to test this way, and should have no impact @@ -86,8 +87,6 @@ public void setup() { private Sensor mockSensor(ConnectMetrics.MetricGroup metricGroup, String name) { Sensor sensor = mock(Sensor.class); when(metricGroup.sensor(name)).thenReturn(sensor); - when(sensor.add(any(CompoundStat.class))).thenReturn(true); - when(sensor.add(any(MetricName.class), any(CumulativeSum.class))).thenReturn(true); return sensor; } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 75f942a6871d8..baa6d81aa83fe 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -16,45 +16,6 @@ */ package org.apache.kafka.connect.runtime; -import static java.util.Arrays.asList; -import static java.util.Collections.singleton; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.ArgumentMatchers.anyMap; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.regex.Pattern; -import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -95,21 +56,61 @@ import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.storage.StringConverter; import 
org.apache.kafka.connect.util.ConnectorTaskId; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoJUnitRunner; -import org.mockito.junit.MockitoRule; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; import org.mockito.stubbing.Answer; import org.mockito.stubbing.OngoingStubbing; -@RunWith(MockitoJUnitRunner.StrictStubs.class) +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static java.util.Arrays.asList; +import static java.util.Collections.singleton; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyMap; 
+import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) public class WorkerSinkTaskTest { // These are fixed to keep this code simpler. In this example we assume byte[] raw values // with mix of integer/string in Connect @@ -170,13 +171,11 @@ public class WorkerSinkTaskTest { @Mock private ErrorHandlingMetrics errorHandlingMetrics; private final ArgumentCaptor rebalanceListener = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); - @Rule - public final MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS); private long recordsReturnedTp1; private long recordsReturnedTp3; - @Before + @BeforeEach public void setUp() { time = new MockTime(); Map workerProps = new HashMap<>(); @@ -207,7 +206,7 @@ private void createTask(TargetState initialState, Converter keyConverter, Conver retryWithToleranceOperator, null, statusBackingStore, errorReportersSupplier); } - @After + @AfterEach public void tearDown() { if (metrics != null) metrics.stop(); } @@ -920,10 +919,10 @@ public void testRequestCommit() { // is the normal commit time less the two sleeps since it started each // of those sleeps were 10 seconds. 
// KAFKA-8229 - assertEquals("Should have only advanced by 40 seconds", - previousCommitValue + + assertEquals(previousCommitValue + (WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT - 10000L * 2), - workerTask.getNextCommit()); + workerTask.getNextCommit(), + "Should have only advanced by 40 seconds"); assertSinkMetricValue("partition-count", 2); assertSinkMetricValue("sink-record-read-total", 1.0); @@ -1110,7 +1109,7 @@ public void testLongRunningCommitWithoutTimeout() { workerTask.iteration(); // iter 3 -- commit in progress // Make sure the "committing" flag didn't immediately get flipped back to false due to an incorrect timeout - assertTrue("Expected worker to be in the process of committing offsets", workerTask.isCommitting()); + assertTrue(workerTask.isCommitting(), "Expected worker to be in the process of committing offsets"); // Delay the result of trying to commit offsets to Kafka via the consumer.commitAsync method. ArgumentCaptor offsetCommitCallbackArgumentCaptor = @@ -1196,8 +1195,8 @@ public void testSuppressCloseErrors() { workerTask.initializeAndStart(); RuntimeException thrownException = assertThrows(ConnectException.class, () -> workerTask.execute()); - assertEquals("Exception from put should be the cause", putException, thrownException.getCause()); - assertTrue("Exception from close should be suppressed", thrownException.getSuppressed().length > 0); + assertEquals(putException, thrownException.getCause(), "Exception from put should be the cause"); + assertTrue(thrownException.getSuppressed().length > 0, "Exception from close should be suppressed"); assertEquals(closeException, thrownException.getSuppressed()[0]); } @@ -1889,4 +1888,4 @@ private void assertTaskMetricValue(String name, String expected) { String measured = metrics.currentMetricValueAsString(taskGroup, name); assertEquals(expected, measured); } -} \ No newline at end of file +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index facdf9da72a1c..c91223af7ed97 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -25,33 +25,36 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; import org.apache.kafka.connect.runtime.errors.ProcessingContext; -import org.apache.kafka.connect.storage.ClusterConfigState; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest; -import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; +import org.apache.kafka.connect.storage.ClusterConfigState; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.ConnectorTaskId; -import org.apache.kafka.common.utils.MockTime; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import 
org.junit.jupiter.api.extension.ExtendWith; import org.mockito.AdditionalAnswers; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; -import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; import org.mockito.stubbing.Answer; import java.io.IOException; @@ -68,10 +71,10 @@ import java.util.function.Function; import static java.util.Collections.singletonList; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; @@ -84,7 +87,8 @@ import static org.mockito.Mockito.when; @SuppressWarnings("unchecked") -@RunWith(MockitoJUnitRunner.StrictStubs.class) +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) public class WorkerSinkTaskThreadedTest { // These are fixed to keep this code simpler. 
In this example we assume byte[] raw values @@ -161,7 +165,7 @@ public class WorkerSinkTaskThreadedTest { return offsetsToCommit; }; - @Before + @BeforeEach public void setup() { time = new MockTime(); metrics = new MockConnectMetrics(); @@ -178,7 +182,7 @@ public void setup() { recordsReturned = 0; } - @After + @AfterEach public void tearDown() { if (metrics != null) metrics.stop(); } @@ -241,8 +245,10 @@ public void testCommit() { expectTaskGetTopic(); expectInitialAssignment(); expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); - expectOffsetCommit(new ExpectOffsetCommitCommand( - expectedMessages, null, null, 0, true)); + ExpectOffsetCommitCommand command = new ExpectOffsetCommitCommand( + expectedMessages, null, null, 0, true); + expectPreCommit(command); + expectOffsetCommit(command); workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); @@ -276,7 +282,7 @@ public void testCommitFailure() { expectTaskGetTopic(); expectInitialAssignment(); expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); - expectOffsetCommit(new ExpectOffsetCommitCommand( + expectPreCommit(new ExpectOffsetCommitCommand( 1L, new RuntimeException(), null, 0, true)); workerTask.initialize(TASK_CONFIG); @@ -313,10 +319,12 @@ public void testCommitSuccessFollowedByFailure() { expectTaskGetTopic(); expectInitialAssignment(); expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); - expectOffsetCommit( - new ExpectOffsetCommitCommand(1L, null, null, 0, true), - new ExpectOffsetCommitCommand(2L, new RuntimeException(), null, 0, true) - ); + ExpectOffsetCommitCommand[] commands = new ExpectOffsetCommitCommand[]{ + new ExpectOffsetCommitCommand(1L, null, null, 0, true), + new ExpectOffsetCommitCommand(2L, new RuntimeException(), null, 0, true) + }; + expectPreCommit(commands); + expectOffsetCommit(commands); workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); @@ -351,8 +359,10 @@ public void testCommitConsumerFailure() { expectTaskGetTopic(); 
expectInitialAssignment(); expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); - expectOffsetCommit(new ExpectOffsetCommitCommand( - 1L, null, new Exception(), 0, true)); + ExpectOffsetCommitCommand command = new ExpectOffsetCommitCommand( + 1L, null, new Exception(), 0, true); + expectPreCommit(command); + expectOffsetCommit(command); workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); @@ -383,8 +393,10 @@ public void testCommitTimeout() { expectInitialAssignment(); // Cut down amount of time to pass in each poll so we trigger exactly 1 offset commit expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT / 2); - expectOffsetCommit(new ExpectOffsetCommitCommand( - 2L, null, null, WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, false)); + ExpectOffsetCommitCommand command = new ExpectOffsetCommitCommand( + 2L, null, null, WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, false); + expectPreCommit(command); + expectOffsetCommit(command); workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); @@ -626,15 +638,17 @@ private void expectRebalanceDuringPoll(long startOffset) { when(valueConverter.toConnectData(TOPIC, emptyHeaders(), RAW_VALUE)).thenReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)); } - private void expectOffsetCommit(ExpectOffsetCommitCommand... commands) { + private void expectPreCommit(ExpectOffsetCommitCommand... 
commands) { doAnswer(new Answer() { int index = 0; @Override public Object answer(InvocationOnMock invocation) { ExpectOffsetCommitCommand commitCommand = commands[index++]; - // All assigned partitions will have offsets committed, but we've only processed messages/updated offsets for one - final Map offsetsToCommit = offsetsToCommitFn.apply(commitCommand.expectedMessages); + // All assigned partitions will have offsets committed, but we've only processed messages/updated + // offsets for one + final Map offsetsToCommit = + offsetsToCommitFn.apply(commitCommand.expectedMessages); if (commitCommand.error != null) { throw commitCommand.error; @@ -643,7 +657,9 @@ public Object answer(InvocationOnMock invocation) { } } }).when(sinkTask).preCommit(anyMap()); - + } + + private void expectOffsetCommit(ExpectOffsetCommitCommand... commands) { doAnswer(new Answer() { int index = 0; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 00cc7b7bb6382..f55a9d66690af 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -22,26 +22,27 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.TopicAuthorizationException; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.Schema; import 
org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.integration.MonitorableSourceConnector; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; -import org.apache.kafka.connect.storage.ClusterConfigState; +import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest; -import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.source.SourceTaskContext; import org.apache.kafka.connect.storage.CloseableOffsetStorageReader; +import org.apache.kafka.connect.storage.ClusterConfigState; import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; @@ -50,20 +51,18 @@ import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.test.util.ConcurrencyUtils; import org.apache.kafka.connect.util.ConnectorTaskId; -import org.apache.kafka.connect.util.ParameterizedTest; import org.apache.kafka.connect.util.TopicAdmin; import org.apache.kafka.connect.util.TopicCreationGroup; -import org.apache.kafka.common.utils.LogCaptureAppender; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.AdditionalAnswers; import org.mockito.ArgumentCaptor; import 
org.mockito.Mock; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; import org.mockito.stubbing.Answer; @@ -71,7 +70,6 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; @@ -99,12 +97,12 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG; import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -121,12 +119,11 @@ import static org.mockito.Mockito.when; @SuppressWarnings({"unchecked"}) -@RunWith(Parameterized.class) +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) public class WorkerSourceTaskTest { public static final String POLL_TIMEOUT_MSG = "Timeout waiting for poll"; - @org.junit.Rule - public MockitoRule rule = 
MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS); private static final String TOPIC = "topic"; private static final Map PARTITION = Collections.singletonMap("key", "partition".getBytes()); @@ -190,27 +187,16 @@ public class WorkerSourceTaskTest { new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD) ); - private final boolean enableTopicCreation; - - @ParameterizedTest.Parameters - public static Collection parameters() { - return Arrays.asList(false, true); - } - - public WorkerSourceTaskTest(boolean enableTopicCreation) { - this.enableTopicCreation = enableTopicCreation; - } - @Before - public void setup() { - Map workerProps = workerProps(); + public void setup(boolean enableTopicCreation) { + Map workerProps = workerProps(enableTopicCreation); plugins = new Plugins(workerProps); config = new StandaloneConfig(workerProps); sourceConfig = new SourceConnectorConfig(plugins, sourceConnectorPropsWithGroups(TOPIC), true); metrics = new MockConnectMetrics(); } - private Map workerProps() { + private Map workerProps(boolean enableTopicCreation) { Map props = new HashMap<>(); props.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); props.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); @@ -237,7 +223,7 @@ private Map sourceConnectorPropsWithGroups(String topic) { return props; } - @After + @AfterEach public void tearDown() { if (metrics != null) metrics.stop(); verifyNoMoreInteractions(statusListener); @@ -267,8 +253,10 @@ private void createWorkerTask(TargetState initialState, Converter keyConverter, retryWithToleranceOperator, statusBackingStore, Runnable::run, Collections::emptyList); } - @Test - public void testStartPaused() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testStartPaused(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); final CountDownLatch pauseLatch = new CountDownLatch(1); 
createWorkerTask(TargetState.PAUSED); @@ -291,8 +279,10 @@ public void testStartPaused() throws Exception { verifyClose(); } - @Test - public void testPause() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testPause(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); createWorkerTask(); AtomicInteger count = new AtomicInteger(0); @@ -330,8 +320,10 @@ public void testPause() throws Exception { verifyClose(); } - @Test - public void testPollsInBackground() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testPollsInBackground(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); createWorkerTask(); final CountDownLatch pollLatch = expectPolls(10); @@ -355,8 +347,10 @@ public void testPollsInBackground() throws Exception { verifyClose(); } - @Test - public void testFailureInPoll() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testFailureInPoll(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); createWorkerTask(); final CountDownLatch pollLatch = new CountDownLatch(1); @@ -386,8 +380,10 @@ public void testFailureInPoll() throws Exception { verifyClose(); } - @Test - public void testFailureInPollAfterCancel() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testFailureInPollAfterCancel(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); createWorkerTask(); final CountDownLatch pollLatch = new CountDownLatch(1); @@ -425,8 +421,10 @@ public void testFailureInPollAfterCancel() throws Exception { } } - @Test - public void testFailureInPollAfterStop() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testFailureInPollAfterStop(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); createWorkerTask(); final CountDownLatch 
pollLatch = new CountDownLatch(1); @@ -458,8 +456,10 @@ public void testFailureInPollAfterStop() throws Exception { verifyClose(); } - @Test - public void testPollReturnsNoRecords() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testPollReturnsNoRecords(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); // Test that the task handles an empty list of records createWorkerTask(); @@ -488,8 +488,10 @@ public void testPollReturnsNoRecords() throws Exception { verifyClose(); } - @Test - public void testCommit() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testCommit(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); // Test that the task commits properly when prompted createWorkerTask(); @@ -522,8 +524,10 @@ public void testCommit() throws Exception { verifyClose(); } - @Test - public void testCommitFailure() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testCommitFailure(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); // Test that the task commits properly when prompted createWorkerTask(); @@ -556,8 +560,10 @@ public void testCommitFailure() throws Exception { verifyClose(); } - @Test - public void testSendRecordsRetries() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testSendRecordsRetries(boolean enableTopicCreation) { + setup(enableTopicCreation); createWorkerTask(); // Differentiate only by Kafka partition, so we can reuse conversion expectations @@ -589,8 +595,10 @@ public void testSendRecordsRetries() { assertNull(workerTask.toSend); } - @Test - public void testSendRecordsProducerCallbackFail() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testSendRecordsProducerCallbackFail(boolean enableTopicCreation) { + setup(enableTopicCreation); createWorkerTask(); SourceRecord record1 
= new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); @@ -608,8 +616,10 @@ public void testSendRecordsProducerCallbackFail() { verify(valueConverter, times(2)).fromConnectData(anyString(), any(Headers.class), eq(RECORD_SCHEMA), eq(RECORD)); } - @Test - public void testSendRecordsProducerSendFailsImmediately() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testSendRecordsProducerSendFailsImmediately(boolean enableTopicCreation) { + setup(enableTopicCreation); createWorkerTask(); SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); @@ -625,8 +635,10 @@ public void testSendRecordsProducerSendFailsImmediately() { assertThrows(ConnectException.class, () -> workerTask.sendRecords()); } - @Test - public void testSendRecordsTaskCommitRecordFail() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testSendRecordsTaskCommitRecordFail(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); createWorkerTask(); // Differentiate only by Kafka partition, so we can reuse conversion expectations @@ -648,8 +660,10 @@ public void testSendRecordsTaskCommitRecordFail() throws Exception { assertNull(workerTask.toSend); } - @Test - public void testSourceTaskIgnoresProducerException() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testSourceTaskIgnoresProducerException(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); createWorkerTaskWithErrorToleration(); expectTopicCreation(TOPIC); @@ -686,8 +700,10 @@ public void testSourceTaskIgnoresProducerException() throws Exception { assertEquals(0, workerTask.submittedRecords.records.size()); } - @Test - public void testSlowTaskStart() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testSlowTaskStart(boolean enableTopicCreation) throws 
Exception { + setup(enableTopicCreation); final CountDownLatch startupLatch = new CountDownLatch(1); final CountDownLatch finishStartupLatch = new CountDownLatch(1); @@ -722,8 +738,10 @@ public void testSlowTaskStart() throws Exception { verifyClose(); } - @Test - public void testCancel() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testCancel(boolean enableTopicCreation) { + setup(enableTopicCreation); createWorkerTask(); workerTask.cancel(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java index 859ec64f656a7..c8c8cc49d0565 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.runtime; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.TaskStatus.Listener; @@ -27,13 +28,15 @@ import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.ConnectorTaskId; -import org.apache.kafka.common.utils.MockTime; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; import java.util.ArrayList; import java.util.HashMap; @@ -42,15 +45,16 @@ import 
java.util.concurrent.CountDownLatch; import java.util.function.Supplier; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.StrictStubs.class) +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) public class WorkerTaskTest { private static final Map TASK_PROPS = new HashMap<>(); @@ -68,12 +72,12 @@ public class WorkerTaskTest { @Mock private TransformationChain transformationChain; @Mock private Supplier>> errorReportersSupplier; - @Before + @BeforeEach public void setup() { metrics = new MockConnectMetrics(); } - @After + @AfterEach public void tearDown() { if (metrics != null) metrics.stop(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 4c5a04533e226..bd7b980b4089d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -85,14 +85,12 @@ import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.FutureCallback; -import org.apache.kafka.connect.util.ParameterizedTest; import org.apache.kafka.connect.util.SinkUtils; import org.apache.kafka.connect.util.TopicAdmin; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; + +import 
org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.AdditionalAnswers; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; @@ -103,8 +101,6 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.quality.Strictness; -import javax.management.MBeanServer; -import javax.management.ObjectName; import java.lang.management.ManagementFactory; import java.util.Arrays; import java.util.Collection; @@ -124,6 +120,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import javax.management.MBeanServer; +import javax.management.ObjectName; + import static org.apache.kafka.clients.admin.AdminClientConfig.RETRY_BACKOFF_MS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG; @@ -148,14 +147,14 @@ import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static 
org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyCollection; import static org.mockito.ArgumentMatchers.anyLong; @@ -176,7 +175,6 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -@RunWith(Parameterized.class) public class WorkerTest { private static final String CONNECTOR_ID = "test-connector"; @@ -238,24 +236,14 @@ public class WorkerTest { private String mockFileProviderTestId; private Map connectorProps; - private final boolean enableTopicCreation; private MockedConstruction sourceTaskMockedConstruction; private MockedConstruction eosSourceTaskMockedConstruction; private MockedConstruction sinkTaskMockedConstruction; private MockitoSession mockitoSession; - @ParameterizedTest.Parameters - public static Collection parameters() { - return Arrays.asList(false, true); - } - public WorkerTest(boolean enableTopicCreation) { - this.enableTopicCreation = enableTopicCreation; - } - - @Before - public void setup() { + public void setup(boolean enableTopicCreation) { // Use strict mode to detect unused mocks mockitoSession = Mockito.mockitoSession() .initMocks(this) @@ -315,7 +303,7 @@ public void setup() { WorkerTest::workerTaskConstructor); } - @After + @AfterEach public void teardown() { // Critical to always close MockedStatics // Ideal would be to use try-with-resources in an individual test, but it introduced a rather large level of @@ -327,8 +315,10 @@ public void teardown() { mockitoSession.finishMocking(); } - @Test - public void testStartAndStopConnector() throws Throwable { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testStartAndStopConnector(boolean enableTopicCreation) throws Throwable { + setup(enableTopicCreation); final String connectorClass = SampleSourceConnector.class.getName(); connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass); @@ -394,8 +384,10 @@ private void mockFileConfigProvider() { .thenReturn(mockFileConfigProvider); } - 
@Test - public void testStartConnectorFailure() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testStartConnectorFailure(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); final String nonConnectorClass = "java.util.HashMap"; connectorProps.put(CONNECTOR_CLASS_CONFIG, nonConnectorClass); // Bad connector class name @@ -434,8 +426,10 @@ public void testStartConnectorFailure() throws Exception { verify(connectorStatusListener).onFailure(eq(CONNECTOR_ID), any(ConnectException.class)); } - @Test - public void testAddConnectorByAlias() throws Throwable { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testAddConnectorByAlias(boolean enableTopicCreation) throws Throwable { + setup(enableTopicCreation); final String connectorAlias = "SampleSourceConnector"; mockKafkaClusterId(); mockConnectorIsolation(connectorAlias, sinkConnector); @@ -477,8 +471,10 @@ public void testAddConnectorByAlias() throws Throwable { verify(ctx).close(); } - @Test - public void testAddConnectorByShortAlias() throws Throwable { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testAddConnectorByShortAlias(boolean enableTopicCreation) throws Throwable { + setup(enableTopicCreation); final String shortConnectorAlias = "WorkerTest"; mockKafkaClusterId(); @@ -519,8 +515,10 @@ public void testAddConnectorByShortAlias() throws Throwable { verifyExecutorSubmit(); } - @Test - public void testStopInvalidConnector() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testStopInvalidConnector(boolean enableTopicCreation) { + setup(enableTopicCreation); mockKafkaClusterId(); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); worker.herder = herder; @@ -532,8 +530,10 @@ public void testStopInvalidConnector() { verifyConverters(); } - @Test - public void testReconfigureConnectorTasks() 
throws Throwable { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testReconfigureConnectorTasks(boolean enableTopicCreation) throws Throwable { + setup(enableTopicCreation); final String connectorClass = SampleSourceConnector.class.getName(); mockKafkaClusterId(); @@ -606,8 +606,10 @@ public void testReconfigureConnectorTasks() throws Throwable { verify(ctx).close(); } - @Test - public void testAddRemoveSourceTask() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testAddRemoveSourceTask(boolean enableTopicCreation) { + setup(enableTopicCreation); mockKafkaClusterId(); mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, task); mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter); @@ -658,8 +660,10 @@ public void testAddRemoveSourceTask() { verifyExecutorSubmit(); } - @Test - public void testAddRemoveSinkTask() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testAddRemoveSinkTask(boolean enableTopicCreation) { + setup(enableTopicCreation); // Most of the other cases use source tasks; we make sure to get code coverage for sink tasks here as well SinkTask task = mock(TestSinkTask.class); mockKafkaClusterId(); @@ -713,8 +717,10 @@ public void testAddRemoveSinkTask() { verifyExecutorSubmit(); } - @Test - public void testAddRemoveExactlyOnceSourceTask() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testAddRemoveExactlyOnceSourceTask(boolean enableTopicCreation) { + setup(enableTopicCreation); Map workerProps = new HashMap<>(); workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); @@ -785,8 +791,10 @@ public void testAddRemoveExactlyOnceSourceTask() { verifyExecutorSubmit(); } - @Test - public void testTaskStatusMetricsStatuses() { + @ParameterizedTest + 
@ValueSource(booleans = {true, false}) + public void testTaskStatusMetricsStatuses(boolean enableTopicCreation) { + setup(enableTopicCreation); mockInternalConverters(); mockStorage(); mockFileConfigProvider(); @@ -867,8 +875,10 @@ public void testTaskStatusMetricsStatuses() { verifyTaskHeaderConverter(); } - @Test - public void testConnectorStatusMetricsGroup_taskStatusCounter() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testConnectorStatusMetricsGroup_taskStatusCounter(boolean enableTopicCreation) { + setup(enableTopicCreation); ConcurrentMap> tasks = new ConcurrentHashMap<>(); tasks.put(new ConnectorTaskId("c1", 0), mock(WorkerSourceTask.class)); tasks.put(new ConnectorTaskId("c1", 1), mock(WorkerSourceTask.class)); @@ -896,8 +906,10 @@ public void testConnectorStatusMetricsGroup_taskStatusCounter() { verifyKafkaClusterId(); } - @Test - public void testStartTaskFailure() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testStartTaskFailure(boolean enableTopicCreation) { + setup(enableTopicCreation); mockInternalConverters(); mockFileConfigProvider(); @@ -924,8 +936,10 @@ public void testStartTaskFailure() { verifyGenericIsolation(); } - @Test - public void testCleanupTasksOnStop() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testCleanupTasksOnStop(boolean enableTopicCreation) { + setup(enableTopicCreation); mockInternalConverters(); mockStorage(); mockFileConfigProvider(); @@ -969,8 +983,10 @@ public void testCleanupTasksOnStop() { verifyExecutorSubmit(); } - @Test - public void testConverterOverrides() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testConverterOverrides(boolean enableTopicCreation) { + setup(enableTopicCreation); mockInternalConverters(); mockStorage(); mockFileConfigProvider(); @@ -1023,8 +1039,10 @@ public void testConverterOverrides() { verifyStorage(); } - @Test - public void testProducerConfigsWithoutOverrides() 
{ + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testProducerConfigsWithoutOverrides(boolean enableTopicCreation) { + setup(enableTopicCreation); when(connectorConfig.originalsWithPrefix(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX)).thenReturn(new HashMap<>()); Map expectedConfigs = new HashMap<>(defaultProducerConfigs); expectedConfigs.put("client.id", "connector-producer-job-0"); @@ -1034,8 +1052,10 @@ public void testProducerConfigsWithoutOverrides() { verify(connectorConfig).originalsWithPrefix(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX); } - @Test - public void testProducerConfigsWithOverrides() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testProducerConfigsWithOverrides(boolean enableTopicCreation) { + setup(enableTopicCreation); Map props = new HashMap<>(workerProps); props.put("producer.acks", "-1"); props.put("producer.linger.ms", "1000"); @@ -1055,8 +1075,10 @@ public void testProducerConfigsWithOverrides() { verify(connectorConfig).originalsWithPrefix(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX); } - @Test - public void testProducerConfigsWithClientOverrides() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testProducerConfigsWithClientOverrides(boolean enableTopicCreation) { + setup(enableTopicCreation); Map props = new HashMap<>(workerProps); props.put("producer.acks", "-1"); props.put("producer.linger.ms", "1000"); @@ -1081,8 +1103,10 @@ public void testProducerConfigsWithClientOverrides() { verify(connectorConfig).originalsWithPrefix(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX); } - @Test - public void testConsumerConfigsWithoutOverrides() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testConsumerConfigsWithoutOverrides(boolean enableTopicCreation) { + setup(enableTopicCreation); Map expectedConfigs = new HashMap<>(defaultConsumerConfigs); expectedConfigs.put("group.id", "connect-test-connector"); expectedConfigs.put("client.id", 
"connector-consumer-job-0"); @@ -1094,8 +1118,10 @@ public void testConsumerConfigsWithoutOverrides() { null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID, ConnectorType.SINK)); } - @Test - public void testConsumerConfigsWithOverrides() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testConsumerConfigsWithOverrides(boolean enableTopicCreation) { + setup(enableTopicCreation); Map props = new HashMap<>(workerProps); props.put("consumer.group.id", "connect-test"); props.put("consumer.auto.offset.reset", "latest"); @@ -1117,8 +1143,10 @@ public void testConsumerConfigsWithOverrides() { verify(connectorConfig).originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX); } - @Test - public void testConsumerConfigsWithClientOverrides() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testConsumerConfigsWithClientOverrides(boolean enableTopicCreation) { + setup(enableTopicCreation); Map props = new HashMap<>(workerProps); props.put("consumer.auto.offset.reset", "latest"); props.put("consumer.max.poll.records", "5000"); @@ -1143,8 +1171,10 @@ public void testConsumerConfigsWithClientOverrides() { verify(connectorConfig).originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX); } - @Test - public void testConsumerConfigsClientOverridesWithNonePolicy() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testConsumerConfigsClientOverridesWithNonePolicy(boolean enableTopicCreation) { + setup(enableTopicCreation); Map props = new HashMap<>(workerProps); props.put("consumer.auto.offset.reset", "latest"); props.put("consumer.max.poll.records", "5000"); @@ -1160,8 +1190,10 @@ public void testConsumerConfigsClientOverridesWithNonePolicy() { verify(connectorConfig).originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX); } - @Test - public void testAdminConfigsClientOverridesWithAllPolicy() { + @ParameterizedTest + 
@ValueSource(booleans = {true, false}) + public void testAdminConfigsClientOverridesWithAllPolicy(boolean enableTopicCreation) { + setup(enableTopicCreation); Map props = new HashMap<>(workerProps); props.put("admin.client.id", "testid"); props.put("admin.metadata.max.age.ms", "5000"); @@ -1186,8 +1218,10 @@ public void testAdminConfigsClientOverridesWithAllPolicy() { verify(connectorConfig).originalsWithPrefix(CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX); } - @Test - public void testAdminConfigsClientOverridesWithNonePolicy() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testAdminConfigsClientOverridesWithNonePolicy(boolean enableTopicCreation) { + setup(enableTopicCreation); Map props = new HashMap<>(workerProps); props.put("admin.client.id", "testid"); props.put("admin.metadata.max.age.ms", "5000"); @@ -1201,8 +1235,10 @@ public void testAdminConfigsClientOverridesWithNonePolicy() { verify(connectorConfig).originalsWithPrefix(CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX); } - @Test - public void testRegularSourceOffsetsConsumerConfigs() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testRegularSourceOffsetsConsumerConfigs(boolean enableTopicCreation) { + setup(enableTopicCreation); final Map connectorConsumerOverrides = new HashMap<>(); when(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).thenReturn(connectorConsumerOverrides); @@ -1240,8 +1276,10 @@ public void testRegularSourceOffsetsConsumerConfigs() { assertEquals("read_uncommitted", consumerConfigs.get(ISOLATION_LEVEL_CONFIG)); } - @Test - public void testExactlyOnceSourceOffsetsConsumerConfigs() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testExactlyOnceSourceOffsetsConsumerConfigs(boolean enableTopicCreation) { + setup(enableTopicCreation); final Map connectorConsumerOverrides = new HashMap<>(); 
when(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).thenReturn(connectorConsumerOverrides); @@ -1279,8 +1317,10 @@ public void testExactlyOnceSourceOffsetsConsumerConfigs() { assertEquals("read_committed", consumerConfigs.get(ISOLATION_LEVEL_CONFIG)); } - @Test - public void testExactlyOnceSourceTaskProducerConfigs() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testExactlyOnceSourceTaskProducerConfigs(boolean enableTopicCreation) { + setup(enableTopicCreation); final Map connectorProducerOverrides = new HashMap<>(); when(connectorConfig.originalsWithPrefix(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX)).thenReturn(connectorProducerOverrides); @@ -1335,8 +1375,10 @@ public void testExactlyOnceSourceTaskProducerConfigs() { assertEquals(transactionalId, producerConfigs.get(TRANSACTIONAL_ID_CONFIG)); } - @Test - public void testOffsetStoreForRegularSourceConnector() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testOffsetStoreForRegularSourceConnector(boolean enableTopicCreation) { + setup(enableTopicCreation); mockInternalConverters(); mockFileConfigProvider(); @@ -1422,8 +1464,10 @@ public void testOffsetStoreForRegularSourceConnector() { verifyKafkaClusterId(); } - @Test - public void testOffsetStoreForExactlyOnceSourceConnector() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testOffsetStoreForExactlyOnceSourceConnector(boolean enableTopicCreation) { + setup(enableTopicCreation); mockInternalConverters(); mockFileConfigProvider(); @@ -1509,8 +1553,10 @@ public void testOffsetStoreForExactlyOnceSourceConnector() { verifyKafkaClusterId(); } - @Test - public void testOffsetStoreForRegularSourceTask() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testOffsetStoreForRegularSourceTask(boolean enableTopicCreation) { + setup(enableTopicCreation); mockInternalConverters(); mockFileConfigProvider(); @@ 
-1625,8 +1671,10 @@ public void testOffsetStoreForRegularSourceTask() { verifyKafkaClusterId(); } - @Test - public void testOffsetStoreForExactlyOnceSourceTask() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testOffsetStoreForExactlyOnceSourceTask(boolean enableTopicCreation) { + setup(enableTopicCreation); mockInternalConverters(); mockFileConfigProvider(); @@ -1718,8 +1766,10 @@ public void testOffsetStoreForExactlyOnceSourceTask() { verifyKafkaClusterId(); } - @Test - public void testWorkerMetrics() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testWorkerMetrics(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); mockKafkaClusterId(); mockInternalConverters(); mockFileConfigProvider(); @@ -1749,8 +1799,10 @@ public void testWorkerMetrics() throws Exception { verifyKafkaClusterId(); } - @Test - public void testExecutorServiceShutdown() throws InterruptedException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testExecutorServiceShutdown(boolean enableTopicCreation) throws InterruptedException { + setup(enableTopicCreation); mockKafkaClusterId(); ExecutorService executorService = mock(ExecutorService.class); doNothing().when(executorService).shutdown(); @@ -1770,8 +1822,10 @@ public void testExecutorServiceShutdown() throws InterruptedException { } - @Test - public void testExecutorServiceShutdownWhenTerminationFails() throws InterruptedException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testExecutorServiceShutdownWhenTerminationFails(boolean enableTopicCreation) throws InterruptedException { + setup(enableTopicCreation); mockKafkaClusterId(); ExecutorService executorService = mock(ExecutorService.class); doNothing().when(executorService).shutdown(); @@ -1791,8 +1845,10 @@ public void testExecutorServiceShutdownWhenTerminationFails() throws Interrupted } - @Test - public void 
testExecutorServiceShutdownWhenTerminationThrowsException() throws InterruptedException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testExecutorServiceShutdownWhenTerminationThrowsException(boolean enableTopicCreation) throws InterruptedException { + setup(enableTopicCreation); mockKafkaClusterId(); ExecutorService executorService = mock(ExecutorService.class); doNothing().when(executorService).shutdown(); @@ -1813,9 +1869,11 @@ public void testExecutorServiceShutdownWhenTerminationThrowsException() throws I verifyNoMoreInteractions(executorService); } - @Test + @ParameterizedTest + @ValueSource(booleans = {true, false}) @SuppressWarnings("unchecked") - public void testZombieFencing() { + public void testZombieFencing(boolean enableTopicCreation) { + setup(enableTopicCreation); Admin admin = mock(Admin.class); AtomicReference> adminConfig = new AtomicReference<>(); Function, Admin> mockAdminConstructor = actualAdminConfig -> { @@ -1845,17 +1903,19 @@ public void testZombieFencing() { assertEquals(expectedZombieFenceFuture, actualZombieFenceFuture); assertNotNull(adminConfig.get()); - assertEquals("Admin should be configured with user-specified overrides", - "4761", - adminConfig.get().get(RETRY_BACKOFF_MS_CONFIG) + assertEquals("4761", + adminConfig.get().get(RETRY_BACKOFF_MS_CONFIG), + "Admin should be configured with user-specified overrides" ); verifyKafkaClusterId(); verifyGenericIsolation(); } - @Test - public void testGetSinkConnectorOffsets() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testGetSinkConnectorOffsets(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); mockKafkaClusterId(); String connectorClass = SampleSinkConnector.class.getName(); @@ -1881,8 +1941,10 @@ public void testGetSinkConnectorOffsets() throws Exception { verifyKafkaClusterId(); } - @Test - public void testGetSinkConnectorOffsetsAdminClientSynchronousError() { + 
@ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testGetSinkConnectorOffsetsAdminClientSynchronousError(boolean enableTopicCreation) { + setup(enableTopicCreation); mockKafkaClusterId(); String connectorClass = SampleSinkConnector.class.getName(); @@ -1905,8 +1967,10 @@ public void testGetSinkConnectorOffsetsAdminClientSynchronousError() { verifyKafkaClusterId(); } - @Test - public void testGetSinkConnectorOffsetsAdminClientAsynchronousError() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testGetSinkConnectorOffsetsAdminClientAsynchronousError(boolean enableTopicCreation) { + setup(enableTopicCreation); mockKafkaClusterId(); String connectorClass = SampleSinkConnector.class.getName(); @@ -1950,8 +2014,10 @@ private void mockAdminListConsumerGroupOffsets(Admin admin, Map, Map> offsets = Collections.singletonMap( Collections.singletonMap("filename", "/path/to/filename"), Collections.singletonMap("position", 20) @@ -2144,8 +2220,10 @@ public void testNormalizeSourceConnectorOffsets() throws Exception { assertInstanceOf(Long.class, normalizedOffsets.values().iterator().next().get("position")); } - @Test - public void testAlterOffsetsSinkConnectorNoDeletes() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testAlterOffsetsSinkConnectorNoDeletes(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); @SuppressWarnings("unchecked") ArgumentCaptor> alterOffsetsMapCapture = ArgumentCaptor.forClass(Map.class); Map, Map> partitionOffsets = new HashMap<>(); @@ -2166,8 +2244,10 @@ public void testAlterOffsetsSinkConnectorNoDeletes() throws Exception { assertEquals(100, alterOffsetsMapCapture.getValue().get(new TopicPartition("test_topic", 20)).offset()); } - @Test - public void testAlterOffsetSinkConnectorOnlyDeletes() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void 
testAlterOffsetSinkConnectorOnlyDeletes(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); @SuppressWarnings("unchecked") ArgumentCaptor> deleteOffsetsSetCapture = ArgumentCaptor.forClass(Set.class); Map, Map> partitionOffsets = new HashMap<>(); @@ -2191,8 +2271,10 @@ public void testAlterOffsetSinkConnectorOnlyDeletes() throws Exception { assertEquals(expectedTopicPartitionsForOffsetDelete, deleteOffsetsSetCapture.getValue()); } - @Test - public void testAlterOffsetsSinkConnectorAltersAndDeletes() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testAlterOffsetsSinkConnectorAltersAndDeletes(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); @SuppressWarnings("unchecked") ArgumentCaptor> alterOffsetsMapCapture = ArgumentCaptor.forClass(Map.class); @SuppressWarnings("unchecked") @@ -2260,8 +2342,10 @@ private void alterOffsetsSinkConnector(Map, Map> parti verifyKafkaClusterId(); } - @Test - public void testAlterOffsetsSinkConnectorAlterOffsetsError() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testAlterOffsetsSinkConnectorAlterOffsetsError(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); mockKafkaClusterId(); String connectorClass = SampleSinkConnector.class.getName(); connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass); @@ -2299,8 +2383,10 @@ public void testAlterOffsetsSinkConnectorAlterOffsetsError() throws Exception { verifyKafkaClusterId(); } - @Test - public void testAlterOffsetsSinkConnectorDeleteOffsetsError() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testAlterOffsetsSinkConnectorDeleteOffsetsError(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); mockKafkaClusterId(); String connectorClass = SampleSinkConnector.class.getName(); connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass); @@ 
-2348,8 +2434,10 @@ public void testAlterOffsetsSinkConnectorDeleteOffsetsError() throws Exception { verifyKafkaClusterId(); } - @Test - public void testAlterOffsetsSinkConnectorSynchronousError() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testAlterOffsetsSinkConnectorSynchronousError(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); mockKafkaClusterId(); String connectorClass = SampleSinkConnector.class.getName(); connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass); @@ -2383,9 +2471,11 @@ public void testAlterOffsetsSinkConnectorSynchronousError() throws Exception { verifyKafkaClusterId(); } - @Test + @ParameterizedTest + @ValueSource(booleans = {true, false}) @SuppressWarnings("unchecked") - public void testResetOffsetsSourceConnectorExactlyOnceSupportEnabled() throws Exception { + public void testResetOffsetsSourceConnectorExactlyOnceSupportEnabled(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); Map workerProps = new HashMap<>(this.workerProps); workerProps.put("exactly.once.source.support", "enabled"); workerProps.put("bootstrap.servers", "localhost:9092"); @@ -2432,8 +2522,10 @@ public void testResetOffsetsSourceConnectorExactlyOnceSupportEnabled() throws Ex verifyKafkaClusterId(); } - @Test - public void testResetOffsetsSinkConnector() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testResetOffsetsSinkConnector(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); mockKafkaClusterId(); String connectorClass = SampleSinkConnector.class.getName(); connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass); @@ -2473,8 +2565,10 @@ public void testResetOffsetsSinkConnector() throws Exception { verifyKafkaClusterId(); } - @Test - public void testResetOffsetsSinkConnectorDeleteConsumerGroupError() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + 
public void testResetOffsetsSinkConnectorDeleteConsumerGroupError(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); mockKafkaClusterId(); String connectorClass = SampleSinkConnector.class.getName(); connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass); @@ -2507,9 +2601,11 @@ public void testResetOffsetsSinkConnectorDeleteConsumerGroupError() throws Excep verifyKafkaClusterId(); } - @Test + @ParameterizedTest + @ValueSource(booleans = {true, false}) @SuppressWarnings("unchecked") - public void testModifySourceConnectorOffsetsTimeout() throws Exception { + public void testModifySourceConnectorOffsetsTimeout(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); mockKafkaClusterId(); Time time = new MockTime(); mockInternalConverters(); @@ -2542,8 +2638,10 @@ public void testModifySourceConnectorOffsetsTimeout() throws Exception { verifyKafkaClusterId(); } - @Test - public void testModifyOffsetsSinkConnectorTimeout() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testModifyOffsetsSinkConnectorTimeout(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); mockKafkaClusterId(); String connectorClass = SampleSinkConnector.class.getName(); connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass); @@ -2572,13 +2670,17 @@ public void testModifyOffsetsSinkConnectorTimeout() throws Exception { verifyKafkaClusterId(); } - @Test - public void testConnectorGeneratesTooManyTasksButMaxNotEnforced() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testConnectorGeneratesTooManyTasksButMaxNotEnforced(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); testConnectorGeneratesTooManyTasks(false); } - @Test - public void testConnectorGeneratesTooManyTasksAndMaxEnforced() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void 
testConnectorGeneratesTooManyTasksAndMaxEnforced(boolean enableTopicCreation) throws Exception { + setup(enableTopicCreation); testConnectorGeneratesTooManyTasks(true); } @@ -2689,13 +2791,17 @@ private void testConnectorGeneratesTooManyTasks(boolean enforced) throws Excepti worker.stop(); } - @Test - public void testStartTaskWithTooManyTaskConfigsButMaxNotEnforced() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testStartTaskWithTooManyTaskConfigsButMaxNotEnforced(boolean enableTopicCreation) { + setup(enableTopicCreation); testStartTaskWithTooManyTaskConfigs(false); } - @Test - public void testStartTaskWithTooManyTaskConfigsAndMaxEnforced() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testStartTaskWithTooManyTaskConfigsAndMaxEnforced(boolean enableTopicCreation) { + setup(enableTopicCreation); testStartTaskWithTooManyTaskConfigs(true); } @@ -2751,11 +2857,8 @@ private void testStartTaskWithTooManyTaskConfigs(boolean enforced) { ArgumentCaptor failureCaptor = ArgumentCaptor.forClass(Throwable.class); verify(taskStatusListener, times(1)).onFailure(eq(TASK_ID), failureCaptor.capture()); - assertTrue( - "Expected task start exception to be TooManyTasksException, but was " - + failureCaptor.getValue().getClass() + " instead", - failureCaptor.getValue() instanceof TooManyTasksException - ); + assertInstanceOf(TooManyTasksException.class, failureCaptor.getValue(), + "Expected task start exception to be TooManyTasksException, but was " + failureCaptor.getValue().getClass() + " instead"); tasksMaxExceededMessage = failureCaptor.getValue().getMessage(); } else { @@ -2788,10 +2891,10 @@ private void assertTasksMaxExceededMessage(String connector, int numTasks, int m + numTasks + " tasks, which is greater than " + maxTasks; assertTrue( + message.startsWith(expectedPrefix), "Warning/exception message '" - + message + "' did not start with the expected prefix '" - + expectedPrefix + "'", - 
message.startsWith(expectedPrefix) + + message + "' did not start with the expected prefix '" + + expectedPrefix + "'" ); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java index 6101dc48c6b11..f85e7e9bb0b04 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java @@ -16,10 +16,10 @@ */ package org.apache.kafka.connect.runtime; -import org.apache.kafka.connect.storage.AppliedConnectorConfig; -import org.apache.kafka.connect.storage.ClusterConfigState; import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment; import org.apache.kafka.connect.runtime.distributed.ExtendedWorkerState; +import org.apache.kafka.connect.storage.AppliedConnectorConfig; +import org.apache.kafka.connect.storage.ClusterConfigState; import org.apache.kafka.connect.util.ConnectorTaskId; import java.util.AbstractMap.SimpleEntry; @@ -33,8 +33,8 @@ import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; public class WorkerTestUtils { @@ -176,29 +176,30 @@ public static void assertAssignment(boolean expectFailed, int expectedRevokedTaskNum, int expectedDelay, ExtendedAssignment assignment) { - assertNotNull("Assignment can't be null", assignment); + assertNotNull(assignment, "Assignment can't be null"); - assertEquals("Wrong status in " + assignment, expectFailed, assignment.failed()); + assertEquals(expectFailed, assignment.failed(), "Wrong status in " + assignment); - 
assertEquals("Wrong leader in " + assignment, expectedLeader, assignment.leader()); + assertEquals(expectedLeader, assignment.leader(), "Wrong leader in " + assignment); - assertEquals("Wrong leaderUrl in " + assignment, expectedLeaderUrl(expectedLeader), - assignment.leaderUrl()); + assertEquals(expectedLeaderUrl(expectedLeader), + assignment.leaderUrl(), "Wrong leaderUrl in " + assignment); - assertEquals("Wrong offset in " + assignment, expectedOffset, assignment.offset()); + assertEquals(expectedOffset, assignment.offset(), "Wrong offset in " + assignment); assertThat("Wrong set of assigned connectors in " + assignment, assignment.connectors(), is(expectedAssignedConnectors)); - assertEquals("Wrong number of assigned tasks in " + assignment, - expectedAssignedTaskNum, assignment.tasks().size()); + assertEquals(expectedAssignedTaskNum, assignment.tasks().size(), + "Wrong number of assigned tasks in " + assignment); assertThat("Wrong set of revoked connectors in " + assignment, assignment.revokedConnectors(), is(expectedRevokedConnectors)); - assertEquals("Wrong number of revoked tasks in " + assignment, - expectedRevokedTaskNum, assignment.revokedTasks().size()); + assertEquals(expectedRevokedTaskNum, assignment.revokedTasks().size(), + "Wrong number of revoked tasks in " + assignment); - assertEquals("Wrong rebalance delay in " + assignment, expectedDelay, assignment.delay()); + assertEquals(expectedDelay, assignment.delay(), + "Wrong rebalance delay in " + assignment); } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTransactionContextTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTransactionContextTest.java index 3bc2b2155d1f1..e9cc66d05aa33 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTransactionContextTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTransactionContextTest.java @@ -17,11 +17,12 @@ package 
org.apache.kafka.connect.runtime; import org.apache.kafka.connect.source.SourceRecord; -import org.junit.Test; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; public class WorkerTransactionContextTest {