From e9f65e9782741d26c6e6d7cbc7b54e6257b29a63 Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Tue, 4 Jun 2024 11:45:11 +0100 Subject: [PATCH] KAFKA-16047: Use REQUEST_TIMEOUT_MS_CONFIG in AdminClient.fenceProducers (#16151) Use REQUEST_TIMEOUT_MS_CONFIG in AdminClient.fenceProducers, or options.timeoutMs if specified, as transaction timeout. No transaction will be started with this timeout, but ReplicaManager.appendRecords uses this value as its timeout. Use REQUEST_TIMEOUT_MS_CONFIG like a regular producer append to allow for replication to take place. Co-Authored-By: Adrian Preston --- .../kafka/clients/admin/KafkaAdminClient.java | 2 +- .../internals/FenceProducersHandler.java | 12 +- .../internals/FenceProducersHandlerTest.java | 23 +- ...ribeTopicPartitionsRequestHandlerTest.java | 49 +-- .../logger/RuntimeLoggerManagerTest.java | 44 +-- .../BootstrapControllersIntegrationTest.java | 15 +- .../kafka/testkit/KafkaClusterTestKit.java | 8 +- .../src/main/java/kafka/examples/Utils.java | 2 +- .../group/GroupMetadataManagerTest.java | 22 +- .../group/MetadataImageBuilder.java | 2 +- .../group/OffsetMetadataManagerTest.java | 44 +-- .../consumer/ConsumerGroupMemberTest.java | 10 +- .../group/runtime/CoordinatorRuntimeTest.java | 20 +- .../runtime/InMemoryPartitionWriter.java | 6 +- .../log4jappender/KafkaLog4jAppenderTest.java | 2 +- .../kafka/raft/internals/LogHistory.java | 4 +- .../kafka/raft/internals/ReplicaKey.java | 4 +- .../apache/kafka/raft/internals/VoterSet.java | 8 +- .../raft/KafkaRaftClientSnapshotTest.java | 25 +- .../kafka/raft/KafkaRaftClientTest.java | 20 +- .../raft/internals/BatchBuilderTest.java | 5 +- .../config/ServerTopicConfigSynonyms.java | 14 +- .../kafka/server/mutable/BoundedListTest.java | 7 +- .../network/EndpointReadyFuturesTest.java | 11 +- .../server/util/CommandLineUtilsTest.java | 7 +- .../kafka/timeline/TimelineHashMapTest.java | 3 +- .../kafka/timeline/TimelineHashSetTest.java | 5 +- .../shell/command/CatCommandHandler.java | 3 +- .../kafka/shell/command/CdCommandHandler.java | 3 +- .../command/ErroneousCommandHandler.java | 3 +- .../shell/command/ExitCommandHandler.java | 3 +- .../shell/command/FindCommandHandler.java | 3 +- .../shell/command/HelpCommandHandler.java | 3 +- .../kafka/shell/command/LsCommandHandler.java | 6 +- .../shell/command/ManCommandHandler.java | 3 +- .../shell/command/NoOpCommandHandler.java | 3 +- .../shell/command/PwdCommandHandler.java | 3 +- .../shell/command/TreeCommandHandler.java | 3 +- .../apache/kafka/shell/glob/GlobVisitor.java | 3 +- .../kafka/shell/command/CommandTest.java | 18 +- .../kafka/shell/glob/GlobVisitorTest.java | 15 +- .../storage/LocalTieredStorageEvent.java | 5 +- .../storage/LocalTieredStorageTest.java | 4 +- .../storage/utils/RecordsKeyValueMatcher.java | 11 +- .../kafka/tools/ConsumerPerformance.java | 4 +- .../kafka/tools/LeaderElectionCommand.java | 18 +- .../org/apache/kafka/tools/ToolsUtils.java | 4 +- .../org/apache/kafka/tools/TopicCommand.java | 10 +- .../kafka/tools/GetOffsetShellTest.java | 3 +- .../tools/TopicCommandIntegrationTest.java | 2 +- .../apache/kafka/tools/TopicCommandTest.java | 6 +- .../group/ConsumerGroupCommandTest.java | 336 ------------------ .../reassign/ReassignPartitionsUnitTest.java | 32 +- .../kafka/trogdor/rest/TasksRequest.java | 6 +- .../trogdor/common/StringExpanderTest.java | 4 +- 55 files changed, 271 insertions(+), 620 deletions(-) delete mode 100644 tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 71d39900cd5a8..92ba6ad3d6c1f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4569,7 +4569,7 @@ public ListTransactionsResult listTransactions(ListTransactionsOptions options) public FenceProducersResult fenceProducers(Collection transactionalIds, FenceProducersOptions options) { AdminApiFuture.SimpleAdminApiFuture future = FenceProducersHandler.newFuture(transactionalIds); - FenceProducersHandler handler = new FenceProducersHandler(logContext); + FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs); invokeDriver(handler, future, options.timeoutMs); return new FenceProducersResult(future.all()); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java index 23572dd4419ca..9a12bc1959609 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.admin.internals; +import org.apache.kafka.clients.admin.FenceProducersOptions; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; @@ -38,12 +39,16 @@ public class FenceProducersHandler extends AdminApiHandler.Unbatched { private final Logger log; private final AdminApiLookupStrategy lookupStrategy; + private final int txnTimeoutMs; public FenceProducersHandler( - LogContext logContext + FenceProducersOptions options, + LogContext logContext, + int requestTimeoutMs ) { this.log = logContext.logger(FenceProducersHandler.class); this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.TRANSACTION, logContext); + this.txnTimeoutMs = options.timeoutMs() != null ? options.timeoutMs() : requestTimeoutMs; } public static AdminApiFuture.SimpleAdminApiFuture newFuture( @@ -82,9 +87,8 @@ InitProducerIdRequest.Builder buildSingleRequest(int brokerId, CoordinatorKey ke .setProducerEpoch(ProducerIdAndEpoch.NONE.epoch) .setProducerId(ProducerIdAndEpoch.NONE.producerId) .setTransactionalId(key.idValue) - // Set transaction timeout to 1 since it's only being initialized to fence out older producers with the same transactional ID, - // and shouldn't be used for any actual record writes - .setTransactionTimeoutMs(1); + // This timeout is used by the coordinator to append the record with the new producer epoch to the transaction log. + .setTransactionTimeoutMs(txnTimeoutMs); return new InitProducerIdRequest.Builder(data); } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java index 34ed2e6772c2f..9665bd0bdf120 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.admin.internals; +import org.apache.kafka.clients.admin.FenceProducersOptions; import org.apache.kafka.clients.admin.internals.AdminApiHandler.ApiResult; import org.apache.kafka.common.Node; import org.apache.kafka.common.message.InitProducerIdResponseData; @@ -39,11 +40,21 @@ public class FenceProducersHandlerTest { private final LogContext logContext = new LogContext(); private final Node node = new Node(1, "host", 1234); + private final int requestTimeoutMs = 30000; + private final FenceProducersOptions options = new FenceProducersOptions(); @Test public void testBuildRequest() { - FenceProducersHandler handler = new FenceProducersHandler(logContext); - mkSet("foo", "bar", "baz").forEach(transactionalId -> assertLookup(handler, transactionalId)); + FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs); + mkSet("foo", "bar", "baz").forEach(transactionalId -> assertLookup(handler, transactionalId, requestTimeoutMs)); + } + + @Test + public void testBuildRequestOptionsTimeout() { + final int optionsTimeoutMs = 50000; + options.timeoutMs(optionsTimeoutMs); + FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs); + mkSet("foo", "bar", "baz").forEach(transactionalId -> assertLookup(handler, transactionalId, optionsTimeoutMs)); } @Test @@ -51,7 +62,7 @@ public void testHandleSuccessfulResponse() { String transactionalId = "foo"; CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId); - FenceProducersHandler handler = new FenceProducersHandler(logContext); + FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs); short epoch = 57; long producerId = 7; @@ -73,7 +84,7 @@ public void testHandleSuccessfulResponse() { @Test public void testHandleErrorResponse() { String transactionalId = "foo"; - FenceProducersHandler handler = new FenceProducersHandler(logContext); + FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs); assertFatalError(handler, transactionalId, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED); assertFatalError(handler, transactionalId, Errors.CLUSTER_AUTHORIZATION_FAILED); assertFatalError(handler, transactionalId, Errors.UNKNOWN_SERVER_ERROR); @@ -136,10 +147,10 @@ private ApiResult handleResponseError( return result; } - private void assertLookup(FenceProducersHandler handler, String transactionalId) { + private void assertLookup(FenceProducersHandler handler, String transactionalId, int txnTimeoutMs) { CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId); InitProducerIdRequest.Builder request = handler.buildSingleRequest(1, key); assertEquals(transactionalId, request.data.transactionalId()); - assertEquals(1, request.data.transactionTimeoutMs()); + assertEquals(txnTimeoutMs, request.data.transactionTimeoutMs()); } } diff --git a/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java b/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java index 058ab9522b283..af2c4ddfba466 100644 --- a/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java +++ b/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java @@ -71,6 +71,7 @@ import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -105,12 +106,12 @@ public KafkaPrincipal deserialize(byte[] bytes) throws SerializationException { UpdateMetadataBroker broker = new UpdateMetadataBroker() .setId(0) .setRack("rack") - .setEndpoints(Arrays.asList( - new UpdateMetadataRequestData.UpdateMetadataEndpoint() - .setHost("broker0") - .setPort(9092) - .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id) - .setListener(plaintextListener.value()) + .setEndpoints(Collections.singletonList( + new UpdateMetadataRequestData.UpdateMetadataEndpoint() + .setHost("broker0") + .setPort(9092) + .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id) + .setListener(plaintextListener.value()) )); @Test @@ -168,9 +169,9 @@ void testDescribeTopicPartitionsRequest() { .setPartitionId(1) .setReplicas(Arrays.asList(0, 1, 2)) .setLeader(0) - .setIsr(Arrays.asList(0)) - .setEligibleLeaderReplicas(Arrays.asList(1)) - .setLastKnownElr(Arrays.asList(2)) + .setIsr(Collections.singletonList(0)) + .setEligibleLeaderReplicas(Collections.singletonList(1)) + .setLastKnownElr(Collections.singletonList(2)) .setLeaderEpoch(0) .setPartitionEpoch(1) .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()), @@ -179,9 +180,9 @@ void testDescribeTopicPartitionsRequest() { .setPartitionId(0) .setReplicas(Arrays.asList(0, 1, 2)) .setLeader(0) - .setIsr(Arrays.asList(0)) - .setEligibleLeaderReplicas(Arrays.asList(1)) - .setLastKnownElr(Arrays.asList(2)) + .setIsr(Collections.singletonList(0)) + .setEligibleLeaderReplicas(Collections.singletonList(1)) + .setLastKnownElr(Collections.singletonList(2)) .setLeaderEpoch(0) .setPartitionEpoch(1) .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()), @@ -190,9 +191,9 @@ void testDescribeTopicPartitionsRequest() { .setPartitionId(0) .setReplicas(Arrays.asList(0, 1, 3)) .setLeader(0) - .setIsr(Arrays.asList(0)) - .setEligibleLeaderReplicas(Arrays.asList(1)) - .setLastKnownElr(Arrays.asList(3)) + .setIsr(Collections.singletonList(0)) + .setEligibleLeaderReplicas(Collections.singletonList(1)) + .setLastKnownElr(Collections.singletonList(3)) .setLeaderEpoch(0) .setPartitionEpoch(2) .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()) @@ -371,9 +372,9 @@ void testDescribeTopicPartitionsRequestWithEdgeCases() { .setPartitionId(0) .setReplicas(Arrays.asList(0, 1, 2)) .setLeader(0) - .setIsr(Arrays.asList(0)) - .setEligibleLeaderReplicas(Arrays.asList(1)) - .setLastKnownElr(Arrays.asList(2)) + .setIsr(Collections.singletonList(0)) + .setEligibleLeaderReplicas(Collections.singletonList(1)) + .setLastKnownElr(Collections.singletonList(2)) .setLeaderEpoch(0) .setPartitionEpoch(1) .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()), @@ -382,9 +383,9 @@ void testDescribeTopicPartitionsRequestWithEdgeCases() { .setPartitionId(1) .setReplicas(Arrays.asList(0, 1, 2)) .setLeader(0) - .setIsr(Arrays.asList(0)) - .setEligibleLeaderReplicas(Arrays.asList(1)) - .setLastKnownElr(Arrays.asList(2)) + .setIsr(Collections.singletonList(0)) + .setEligibleLeaderReplicas(Collections.singletonList(1)) + .setLastKnownElr(Collections.singletonList(2)) .setLeaderEpoch(0) .setPartitionEpoch(1) .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()), @@ -393,9 +394,9 @@ void testDescribeTopicPartitionsRequestWithEdgeCases() { .setPartitionId(0) .setReplicas(Arrays.asList(0, 1, 3)) .setLeader(0) - .setIsr(Arrays.asList(0)) - .setEligibleLeaderReplicas(Arrays.asList(1)) - .setLastKnownElr(Arrays.asList(3)) + .setIsr(Collections.singletonList(0)) + .setEligibleLeaderReplicas(Collections.singletonList(1)) + .setLastKnownElr(Collections.singletonList(3)) .setLeaderEpoch(0) .setPartitionEpoch(2) .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()) diff --git a/core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java b/core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java index 8f170950365cc..297bee6380173 100644 --- a/core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java +++ b/core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java @@ -29,7 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; +import java.util.Collections; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -40,18 +40,18 @@ public class RuntimeLoggerManagerTest { @Test public void testValidateSetLogLevelConfig() { - MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig(). - setName(LOG.getName()). - setConfigOperation(OpType.SET.id()). - setValue("TRACE"))); + MANAGER.validateLogLevelConfigs(Collections.singletonList(new AlterableConfig(). + setName(LOG.getName()). + setConfigOperation(OpType.SET.id()). + setValue("TRACE"))); } @Test public void testValidateDeleteLogLevelConfig() { - MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig(). - setName(LOG.getName()). - setConfigOperation(OpType.DELETE.id()). - setValue(""))); + MANAGER.validateLogLevelConfigs(Collections.singletonList(new AlterableConfig(). + setName(LOG.getName()). + setConfigOperation(OpType.DELETE.id()). + setValue(""))); } @ParameterizedTest @@ -60,10 +60,10 @@ public void testOperationNotAllowed(byte id) { OpType opType = AlterConfigOp.OpType.forId(id); assertEquals(opType + " operation is not allowed for the BROKER_LOGGER resource", Assertions.assertThrows(InvalidRequestException.class, - () -> MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig(). - setName(LOG.getName()). - setConfigOperation(id). - setValue("TRACE")))).getMessage()); + () -> MANAGER.validateLogLevelConfigs(Collections.singletonList(new AlterableConfig(). + setName(LOG.getName()). + setConfigOperation(id). + setValue("TRACE")))).getMessage()); } @Test @@ -71,15 +71,15 @@ public void testValidateBogusLogLevelNameNotAllowed() { assertEquals("Cannot set the log level of " + LOG.getName() + " to BOGUS as it is not " + "a supported log level. Valid log levels are DEBUG, ERROR, FATAL, INFO, TRACE, WARN", Assertions.assertThrows(InvalidConfigurationException.class, - () -> MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig(). - setName(LOG.getName()). - setConfigOperation(OpType.SET.id()). - setValue("BOGUS")))).getMessage()); + () -> MANAGER.validateLogLevelConfigs(Collections.singletonList(new AlterableConfig(). + setName(LOG.getName()). + setConfigOperation(OpType.SET.id()). + setValue("BOGUS")))).getMessage()); } @Test public void testValidateSetRootLogLevelConfig() { - MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig(). + MANAGER.validateLogLevelConfigs(Collections.singletonList(new AlterableConfig(). setName(Log4jController.ROOT_LOGGER()). setConfigOperation(OpType.SET.id()). setValue("TRACE"))); @@ -90,9 +90,9 @@ public void testValidateRemoveRootLogLevelConfigNotAllowed() { assertEquals("Removing the log level of the " + Log4jController.ROOT_LOGGER() + " logger is not allowed", Assertions.assertThrows(InvalidRequestException.class, - () -> MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig(). - setName(Log4jController.ROOT_LOGGER()). - setConfigOperation(OpType.DELETE.id()). - setValue("")))).getMessage()); + () -> MANAGER.validateLogLevelConfigs(Collections.singletonList(new AlterableConfig(). + setName(Log4jController.ROOT_LOGGER()). + setConfigOperation(OpType.DELETE.id()). + setValue("")))).getMessage()); } } diff --git a/core/src/test/java/kafka/test/server/BootstrapControllersIntegrationTest.java b/core/src/test/java/kafka/test/server/BootstrapControllersIntegrationTest.java index 5182293f4b7fd..e267e92a7be70 100644 --- a/core/src/test/java/kafka/test/server/BootstrapControllersIntegrationTest.java +++ b/core/src/test/java/kafka/test/server/BootstrapControllersIntegrationTest.java @@ -50,7 +50,6 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -220,15 +219,15 @@ public void testIncrementalAlterConfigs(boolean usingBootstrapControllers) throw ConfigResource nodeResource = new ConfigResource(BROKER, "" + nodeId); ConfigResource defaultResource = new ConfigResource(BROKER, ""); Map> alterations = new HashMap<>(); - alterations.put(nodeResource, Arrays.asList( - new AlterConfigOp(new ConfigEntry("my.custom.config", "foo"), - AlterConfigOp.OpType.SET))); - alterations.put(defaultResource, Arrays.asList( - new AlterConfigOp(new ConfigEntry("my.custom.config", "bar"), - AlterConfigOp.OpType.SET))); + alterations.put(nodeResource, Collections.singletonList( + new AlterConfigOp(new ConfigEntry("my.custom.config", "foo"), + AlterConfigOp.OpType.SET))); + alterations.put(defaultResource, Collections.singletonList( + new AlterConfigOp(new ConfigEntry("my.custom.config", "bar"), + AlterConfigOp.OpType.SET))); admin.incrementalAlterConfigs(alterations).all().get(1, TimeUnit.MINUTES); TestUtils.retryOnExceptionWithTimeout(30_000, () -> { - Config config = admin.describeConfigs(Arrays.asList(nodeResource)). + Config config = admin.describeConfigs(Collections.singletonList(nodeResource)). all().get(1, TimeUnit.MINUTES).get(nodeResource); ConfigEntry entry = config.entries().stream(). filter(e -> e.name().equals("my.custom.config")). diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java index 94d94dc71735e..5897f45cce9a3 100644 --- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java +++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java @@ -147,9 +147,9 @@ public FaultHandler build(String name, boolean fatal, Runnable action) { } public static class Builder { - private TestKitNodes nodes; - private Map configProps = new HashMap<>(); - private SimpleFaultHandlerFactory faultHandlerFactory = new SimpleFaultHandlerFactory(); + private final TestKitNodes nodes; + private final Map configProps = new HashMap<>(); + private final SimpleFaultHandlerFactory faultHandlerFactory = new SimpleFaultHandlerFactory(); public Builder(TestKitNodes nodes) { this.nodes = nodes; @@ -481,7 +481,7 @@ public String quorumVotersConfig() throws ExecutionException, InterruptedExcepti } public class ClientPropertiesBuilder { - private Properties properties; + private final Properties properties; private boolean usingBootstrapControllers = false; public ClientPropertiesBuilder() { diff --git a/examples/src/main/java/kafka/examples/Utils.java b/examples/src/main/java/kafka/examples/Utils.java index 8846879f67d5f..d0b7734becfba 100644 --- a/examples/src/main/java/kafka/examples/Utils.java +++ b/examples/src/main/java/kafka/examples/Utils.java @@ -39,7 +39,7 @@ private Utils() { } public static void printHelp(String message, Object... args) { - System.out.println(format(message, args)); + System.out.printf(message + "%n", args); } public static void printOut(String message, Object... args) { diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index abf48fd64158a..d84fb8b6c4ec5 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -9696,9 +9696,9 @@ public void testConsumerGroupHeartbeatWithPreparingRebalanceClassicGroup() throw .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription( Arrays.asList(fooTopicName, barTopicName), null, - Arrays.asList( - new TopicPartition(barTopicName, 0) - ) + Collections.singletonList( + new TopicPartition(barTopicName, 0) + ) )))) ); @@ -9713,8 +9713,8 @@ public void testConsumerGroupHeartbeatWithPreparingRebalanceClassicGroup() throw ); put( memberId2, - Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(Arrays.asList( - new TopicPartition(barTopicName, 0) + Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(Collections.singletonList( + new TopicPartition(barTopicName, 0) )))) ); } @@ -9943,9 +9943,9 @@ public void testConsumerGroupHeartbeatWithCompletingRebalanceClassicGroup() thro .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription( Arrays.asList(fooTopicName, barTopicName), null, - Arrays.asList( - new TopicPartition(barTopicName, 0) - ) + Collections.singletonList( + new TopicPartition(barTopicName, 0) + ) )))) ); @@ -9960,8 +9960,8 @@ public void testConsumerGroupHeartbeatWithCompletingRebalanceClassicGroup() thro ); put( memberId2, - Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(Arrays.asList( - new TopicPartition(barTopicName, 0) + Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(Collections.singletonList( + new TopicPartition(barTopicName, 0) )))) ); } @@ -10755,7 +10755,7 @@ public void testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() { .setPartitions(Arrays.asList(3, 4, 5)), new ConsumerGroupHeartbeatRequestData.TopicPartitions() .setTopicId(barTopicId) - .setPartitions(Arrays.asList(2)) + .setPartitions(Collections.singletonList(2)) )) ); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MetadataImageBuilder.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MetadataImageBuilder.java index 995f1ee74a50b..2f6aacccebabc 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MetadataImageBuilder.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MetadataImageBuilder.java @@ -27,7 +27,7 @@ import java.util.Arrays; public class MetadataImageBuilder { - private MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); + private final MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); public MetadataImageBuilder addTopic( Uuid topicId, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java index e2684a7cab0ae..3c1dbbf1e0850 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java @@ -101,7 +101,7 @@ public static class Builder { private GroupMetadataManager groupMetadataManager = null; private MetadataImage metadataImage = null; private GroupCoordinatorConfig config = null; - private GroupCoordinatorMetricsShard metrics = mock(GroupCoordinatorMetricsShard.class); + private final GroupCoordinatorMetricsShard metrics = mock(GroupCoordinatorMetricsShard.class); Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) { config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(offsetMetadataMaxSize, 60000L, 24 * 60 * 1000); @@ -2039,30 +2039,30 @@ public void testFetchAllOffsetsAtDifferentCommittedOffset() { assertEquals(Collections.emptyList(), context.fetchAllOffsets("group", 0L)); // Fetching with 1 should return data up to offset 1. - assertEquals(Arrays.asList( - new OffsetFetchResponseData.OffsetFetchResponseTopics() - .setName("foo") - .setPartitions(Arrays.asList( - mkOffsetPartitionResponse(0, 100L, 1, "metadata") - )) + assertEquals(Collections.singletonList( + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName("foo") + .setPartitions(Collections.singletonList( + mkOffsetPartitionResponse(0, 100L, 1, "metadata") + )) ), context.fetchAllOffsets("group", 1L)); // Fetching with 2 should return data up to offset 2. - assertEquals(Arrays.asList( - new OffsetFetchResponseData.OffsetFetchResponseTopics() - .setName("foo") - .setPartitions(Arrays.asList( - mkOffsetPartitionResponse(0, 100L, 1, "metadata"), - mkOffsetPartitionResponse(1, 110L, 1, "metadata") - )) + assertEquals(Collections.singletonList( + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName("foo") + .setPartitions(Arrays.asList( + mkOffsetPartitionResponse(0, 100L, 1, "metadata"), + mkOffsetPartitionResponse(1, 110L, 1, "metadata") + )) ), context.fetchAllOffsets("group", 2L)); // Fetching with 3 should return data up to offset 3. assertEquals(Arrays.asList( new OffsetFetchResponseData.OffsetFetchResponseTopics() .setName("bar") - .setPartitions(Arrays.asList( - mkOffsetPartitionResponse(0, 200L, 1, "metadata") + .setPartitions(Collections.singletonList( + mkOffsetPartitionResponse(0, 200L, 1, "metadata") )), new OffsetFetchResponseData.OffsetFetchResponseTopics() .setName("foo") @@ -2076,8 +2076,8 @@ public void testFetchAllOffsetsAtDifferentCommittedOffset() { assertEquals(Arrays.asList( new OffsetFetchResponseData.OffsetFetchResponseTopics() .setName("bar") - .setPartitions(Arrays.asList( - mkOffsetPartitionResponse(0, 200L, 1, "metadata") + .setPartitions(Collections.singletonList( + mkOffsetPartitionResponse(0, 200L, 1, "metadata") )), new OffsetFetchResponseData.OffsetFetchResponseTopics() .setName("foo") @@ -2130,8 +2130,8 @@ public void testFetchAllOffsetsWithPendingTransactionalOffsets() { assertEquals(Arrays.asList( new OffsetFetchResponseData.OffsetFetchResponseTopics() .setName("bar") - .setPartitions(Arrays.asList( - mkOffsetPartitionResponse(0, Errors.UNSTABLE_OFFSET_COMMIT) + .setPartitions(Collections.singletonList( + mkOffsetPartitionResponse(0, Errors.UNSTABLE_OFFSET_COMMIT) )), new OffsetFetchResponseData.OffsetFetchResponseTopics() .setName("foo") @@ -2146,8 +2146,8 @@ public void testFetchAllOffsetsWithPendingTransactionalOffsets() { assertEquals(Arrays.asList( new OffsetFetchResponseData.OffsetFetchResponseTopics() .setName("bar") - .setPartitions(Arrays.asList( - mkOffsetPartitionResponse(0, 200L, 1, "metadata") + .setPartitions(Collections.singletonList( + mkOffsetPartitionResponse(0, 200L, 1, "metadata") )), new OffsetFetchResponseData.OffsetFetchResponseTopics() .setName("foo") diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java index 44ed930f3903e..180ecd6465834 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java @@ -355,11 +355,11 @@ public void testAsConsumerGroupDescribeWithTopicNameNotFound() { @Test public void testClassicProtocolListFromJoinRequestProtocolCollection() { JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestData.JoinGroupRequestProtocolCollection(); - protocols.addAll(Arrays.asList( - new JoinGroupRequestData.JoinGroupRequestProtocol() - .setName("range") - .setMetadata(new byte[]{1, 2, 3}) - )); + protocols.add( + new JoinGroupRequestData.JoinGroupRequestProtocol() + .setName("range") + .setMetadata(new byte[]{1, 2, 3}) + ); assertEquals( toClassicProtocolCollection("range"), diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java index ae1d404792406..465e2e13f0e4d 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java @@ -147,7 +147,7 @@ public void close() throws Exception {} * when poll() is called. */ private static class ManualEventProcessor implements CoordinatorEventProcessor { - private Deque queue = new LinkedList<>(); + private final Deque queue = new LinkedList<>(); @Override public void enqueueLast(CoordinatorEvent event) throws RejectedExecutionException { @@ -986,13 +986,13 @@ public void testScheduleWriteOp() throws ExecutionException, InterruptedExceptio // Records have been replayed to the coordinator. assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records()); // Records have been written to the log. - assertEquals(Arrays.asList( - records(timer.time().milliseconds(), "record1", "record2") + assertEquals(Collections.singletonList( + records(timer.time().milliseconds(), "record1", "record2") ), writer.entries(TP)); // Write #2. CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT, - state -> new CoordinatorResult<>(Arrays.asList("record3"), "response2")); + state -> new CoordinatorResult<>(Collections.singletonList("record3"), "response2")); // Verify that the write is not committed yet. assertFalse(write2.isDone()); @@ -1540,8 +1540,8 @@ public void testScheduleTransactionCompletion(TransactionResult result) throws E 100L )); // Records have been written to the log. - assertEquals(Arrays.asList( - transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2") + assertEquals(Collections.singletonList( + transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2") ), writer.entries(TP)); // Complete transaction #1. @@ -1785,8 +1785,8 @@ public void replayEndTransactionMarker( assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L)); assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records()); - assertEquals(Arrays.asList( - transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2") + assertEquals(Collections.singletonList( + transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2") ), writer.entries(TP)); // Complete transaction #1. It should fail. @@ -1807,8 +1807,8 @@ public void replayEndTransactionMarker( assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L)); assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records()); - assertEquals(Arrays.asList( - transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2") + assertEquals(Collections.singletonList( + transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2") ), writer.entries(TP)); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java index adcf0fbe13706..cff65269c26db 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java @@ -38,9 +38,9 @@ public class InMemoryPartitionWriter implements PartitionWriter { private class PartitionState { - private ReentrantLock lock = new ReentrantLock(); - private List listeners = new ArrayList<>(); - private List entries = new ArrayList<>(); + private final ReentrantLock lock = new ReentrantLock(); + private final List listeners = new ArrayList<>(); + private final List entries = new ArrayList<>(); private long endOffset = 0L; private long committedOffset = 0L; } diff --git a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java index eb64a85bd8869..18ac97901cb0a 100644 --- a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java +++ b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java @@ -45,7 +45,7 @@ public class KafkaLog4jAppenderTest { - private Logger logger = Logger.getLogger(KafkaLog4jAppenderTest.class); + private final Logger logger = Logger.getLogger(KafkaLog4jAppenderTest.class); @BeforeEach public void setup() { diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/LogHistory.java b/raft/src/main/java/org/apache/kafka/raft/internals/LogHistory.java index 6751400678e14..6f77cab592abd 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/LogHistory.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/LogHistory.java @@ -101,9 +101,7 @@ public boolean equals(Object o) { Entry that = (Entry) o; if (offset != that.offset) return false; - if (!Objects.equals(value, that.value)) return false; - - return true; + return Objects.equals(value, that.value); } @Override diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/ReplicaKey.java b/raft/src/main/java/org/apache/kafka/raft/internals/ReplicaKey.java index 7d799a9bd6d40..002a2dee1914a 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/ReplicaKey.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/ReplicaKey.java @@ -45,9 +45,7 @@ public boolean equals(Object o) { ReplicaKey that = (ReplicaKey) o; if (id != that.id) return false; - if (!Objects.equals(directoryId, that.directoryId)) return false; - - return true; + return Objects.equals(directoryId, that.directoryId); } @Override diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java index 3ab41f5788cfd..393cb373b316c 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java @@ -248,9 +248,7 @@ public boolean hasOverlappingMajority(VoterSet that) { .collect(Collectors.toSet()); if (Utils.diff(HashSet::new, thisReplicaKeys, thatReplicaKeys).size() > 1) return false; - if (Utils.diff(HashSet::new, thatReplicaKeys, thisReplicaKeys).size() > 1) return false; - - return true; + return Utils.diff(HashSet::new, thatReplicaKeys, thisReplicaKeys).size() <= 1; } @Override @@ -314,9 +312,7 @@ public boolean equals(Object o) { if (!Objects.equals(voterKey, that.voterKey)) return false; if (!Objects.equals(supportedKRaftVersion, that.supportedKRaftVersion)) return false; - if (!Objects.equals(listeners, that.listeners)) return false; - - return true; + return Objects.equals(listeners, that.listeners); } @Override diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java index 299fa819d5882..5c7b00e30953b 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.OptionalLong; @@ -122,7 +123,7 @@ public void testLeaderListenerNotified(boolean entireLog) throws Exception { // Check that listener was notified of the new snapshot try (SnapshotReader snapshot = context.listener.drainHandledSnapshot().get()) { assertEquals(snapshotId, snapshot.snapshotId()); - SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot); + SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(), snapshot); } } @@ -164,7 +165,7 @@ public void testFollowerListenerNotified(boolean entireLog) throws Exception { // Check that listener was notified of the new snapshot try (SnapshotReader snapshot = context.listener.drainHandledSnapshot().get()) { assertEquals(snapshotId, snapshot.snapshotId()); - SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot); + SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(), snapshot); } } @@ -210,7 +211,7 @@ public void testSecondListenerNotified(boolean entireLog) throws Exception { // Check that the second listener was notified of the new snapshot try (SnapshotReader snapshot = secondListener.drainHandledSnapshot().get()) { assertEquals(snapshotId, snapshot.snapshotId()); - SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot); + SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(), snapshot); } } @@ -245,7 +246,7 @@ public void testListenerRenotified() throws Exception { // Check that listener was notified of the new snapshot try (SnapshotReader snapshot = context.listener.drainHandledSnapshot().get()) { assertEquals(snapshotId, snapshot.snapshotId()); - SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot); + SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(), snapshot); } // Generate a new snapshot @@ -264,7 +265,7 @@ public void testListenerRenotified() throws Exception { // Check that listener was notified of the second snapshot try (SnapshotReader snapshot = context.listener.drainHandledSnapshot().get()) { assertEquals(secondSnapshotId, snapshot.snapshotId()); - SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot); + SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(), snapshot); } } @@ -660,7 +661,7 @@ public void testFetchSnapshotRequestAsLeader() throws Exception { List records = Arrays.asList("foo", "bar"); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .appendToLog(snapshotId.epoch(), Arrays.asList("a")) + .appendToLog(snapshotId.epoch(), Collections.singletonList("a")) .build(); context.becomeLeader(); @@ -712,7 +713,7 @@ public void testLeaderShouldResignLeadershipIfNotGetFetchSnapshotRequestFromMajo List records = Arrays.asList("foo", "bar"); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .appendToLog(snapshotId.epoch(), Arrays.asList("a")) + .appendToLog(snapshotId.epoch(), Collections.singletonList("a")) .build(); int resignLeadershipTimeout = context.checkQuorumTimeoutMs; @@ -909,7 +910,7 @@ public void testFetchSnapshotRequestWithInvalidPosition() throws Exception { List records = Arrays.asList("foo", "bar"); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .appendToLog(snapshotId.epoch(), Arrays.asList("a")) + .appendToLog(snapshotId.epoch(), Collections.singletonList("a")) .build(); context.becomeLeader(); @@ -1136,12 +1137,12 @@ public void testFetchResponseWithSnapshotId() throws Exception { // Check that the snapshot was written to the log RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get(); assertEquals(memorySnapshot.buffer().remaining(), snapshot.sizeInBytes()); - SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), snapshot); + SnapshotWriterReaderTest.assertSnapshot(Collections.singletonList(records), snapshot); // Check that listener was notified of the new snapshot try (SnapshotReader reader = context.listener.drainHandledSnapshot().get()) { assertEquals(snapshotId, reader.snapshotId()); - SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), reader); + SnapshotWriterReaderTest.assertSnapshot(Collections.singletonList(records), reader); } } @@ -1239,12 +1240,12 @@ public void testFetchSnapshotResponsePartialData() throws Exception { // Check that the snapshot was written to the log RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get(); assertEquals(memorySnapshot.buffer().remaining(), snapshot.sizeInBytes()); - SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), snapshot); + SnapshotWriterReaderTest.assertSnapshot(Collections.singletonList(records), snapshot); // Check that listener was notified of the new snapshot try (SnapshotReader reader = context.listener.drainHandledSnapshot().get()) { assertEquals(snapshotId, reader.snapshotId()); - SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), reader); + SnapshotWriterReaderTest.assertSnapshot(Collections.singletonList(records), reader); } } diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 049b648d8811a..e1288be131d8a 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -465,7 +465,7 @@ public void testResignInOlderEpochIgnored() throws Exception { context.client.poll(); // Ensure we are still leader even after expiration of the election timeout. - context.time.sleep(context.electionTimeoutMs() * 2); + context.time.sleep(context.electionTimeoutMs() * 2L); context.client.poll(); context.assertElectedLeader(currentEpoch, localId); } @@ -607,7 +607,7 @@ public void testElectionTimeoutAfterUserInitiatedResign() throws Exception { resignedEpoch, OptionalInt.of(localId)); // After the election timer, we should become a candidate. - context.time.sleep(2 * context.electionTimeoutMs()); + context.time.sleep(2L * context.electionTimeoutMs()); context.pollUntil(context.client.quorum()::isCandidate); assertEquals(resignedEpoch + 1, context.currentEpoch()); assertEquals(new LeaderAndEpoch(OptionalInt.empty(), resignedEpoch + 1), @@ -693,7 +693,7 @@ public void testInitializeAsCandidateAndBecomeLeader() throws Exception { RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build(); context.assertUnknownLeader(0); - context.time.sleep(2 * context.electionTimeoutMs()); + context.time.sleep(2L * context.electionTimeoutMs()); context.pollUntilRequest(); context.assertVotedCandidate(1, localId); @@ -737,7 +737,7 @@ public void testInitializeAsCandidateAndBecomeLeaderQuorumOfThree() throws Excep RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build(); context.assertUnknownLeader(0); - context.time.sleep(2 * context.electionTimeoutMs()); + context.time.sleep(2L * context.electionTimeoutMs()); context.pollUntilRequest(); context.assertVotedCandidate(1, localId); @@ -1118,7 +1118,7 @@ public void testVoteRequestTimeout() throws Exception { RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build(); context.assertUnknownLeader(0); - context.time.sleep(2 * context.electionTimeoutMs()); + context.time.sleep(2L * context.electionTimeoutMs()); context.pollUntilRequest(); context.assertVotedCandidate(epoch, localId); @@ -1361,7 +1361,7 @@ public void testRetryElection() throws Exception { context.assertUnknownLeader(0); - context.time.sleep(2 * context.electionTimeoutMs()); + context.time.sleep(2L * context.electionTimeoutMs()); context.pollUntilRequest(); context.assertVotedCandidate(epoch, localId); @@ -2090,7 +2090,7 @@ public void testVoteResponseIgnoredAfterBecomingFollower() throws Exception { context.assertUnknownLeader(epoch - 1); // Sleep a little to ensure that we become a candidate - context.time.sleep(context.electionTimeoutMs() * 2); + context.time.sleep(context.electionTimeoutMs() * 2L); // Wait until the vote requests are inflight context.pollUntilRequest(); @@ -2696,7 +2696,7 @@ public void testFollowerLogReconciliation() throws Exception { RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) .withElectedLeader(epoch, otherNodeId) .appendToLog(lastEpoch, Arrays.asList("foo", "bar")) - .appendToLog(lastEpoch, Arrays.asList("baz")) + .appendToLog(lastEpoch, singletonList("baz")) .build(); context.assertElectedLeader(epoch, otherNodeId); @@ -2827,7 +2827,7 @@ public void testClusterAuthorizationFailedInVote() throws Exception { .build(); // Sleep a little to ensure that we become a candidate - context.time.sleep(context.electionTimeoutMs() * 2); + context.time.sleep(context.electionTimeoutMs() * 2L); context.pollUntilRequest(); context.assertVotedCandidate(epoch, localId); @@ -3186,7 +3186,7 @@ public void testHandleCommitCallbackFiresInCandidateState() throws Exception { // Timeout the election and become candidate int candidateEpoch = epoch + 2; - context.time.sleep(context.electionTimeoutMs() * 2); + context.time.sleep(context.electionTimeoutMs() * 2L); context.client.poll(); context.assertVotedCandidate(candidateEpoch, localId); diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java index 824ec20afc0c7..ba356fff4b6a1 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java @@ -28,6 +28,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -69,7 +70,7 @@ void testBuildBatch(CompressionType compressionType) { records.forEach(record -> builder.appendRecord(record, null)); MemoryRecords builtRecordSet = builder.build(); - assertTrue(builder.bytesNeeded(Arrays.asList("a"), null).isPresent()); + assertTrue(builder.bytesNeeded(Collections.singletonList("a"), null).isPresent()); assertThrows(IllegalStateException.class, () -> builder.appendRecord("a", null)); List builtBatches = Utils.toList(builtRecordSet.batchIterator()); @@ -113,7 +114,7 @@ public void testHasRoomForUncompressed(int batchSize) { String record = "i am a record"; - while (!builder.bytesNeeded(Arrays.asList(record), null).isPresent()) { + while (!builder.bytesNeeded(Collections.singletonList(record), null).isPresent()) { builder.appendRecord(record, null); } diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java index 757c65b6b7fa4..d0b628ab6cab0 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.server.config; -import static java.util.Arrays.asList; - import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -115,23 +113,23 @@ public static String serverSynonym(String topicConfigName) { } private static Entry> sameName(String configName) { - return Utils.mkEntry(configName, asList(new ConfigSynonym(configName))); + return Utils.mkEntry(configName, Collections.singletonList(new ConfigSynonym(configName))); } private static Entry> sameNameWithLogPrefix(String configName) { - return Utils.mkEntry(configName, asList(new ConfigSynonym(LOG_PREFIX + configName))); + return Utils.mkEntry(configName, Collections.singletonList(new ConfigSynonym(LOG_PREFIX + configName))); } private static Entry> sameNameWithLogCleanerPrefix(String configName) { - return Utils.mkEntry(configName, asList(new ConfigSynonym(LOG_CLEANER_PREFIX + configName))); + return Utils.mkEntry(configName, Collections.singletonList(new ConfigSynonym(LOG_CLEANER_PREFIX + configName))); } private static Entry> singleWithLogPrefix(String topicConfigName, String brokerConfigName) { - return Utils.mkEntry(topicConfigName, asList(new ConfigSynonym(LOG_PREFIX + brokerConfigName))); + return Utils.mkEntry(topicConfigName, Collections.singletonList(new ConfigSynonym(LOG_PREFIX + brokerConfigName))); } private static Entry> singleWithLogCleanerPrefix(String topicConfigName, String brokerConfigName) { - return Utils.mkEntry(topicConfigName, asList(new ConfigSynonym(LOG_CLEANER_PREFIX + brokerConfigName))); + return Utils.mkEntry(topicConfigName, Collections.singletonList(new ConfigSynonym(LOG_CLEANER_PREFIX + brokerConfigName))); } private static Entry> listWithLogPrefix(String topicConfigName, ConfigSynonym... synonyms) { @@ -142,6 +140,6 @@ private static Entry> listWithLogPrefix(String topic } private static Entry> single(String topicConfigName, String brokerConfigName) { - return Utils.mkEntry(topicConfigName, asList(new ConfigSynonym(brokerConfigName))); + return Utils.mkEntry(topicConfigName, Collections.singletonList(new ConfigSynonym(brokerConfigName))); } } diff --git a/server-common/src/test/java/org/apache/kafka/server/mutable/BoundedListTest.java b/server-common/src/test/java/org/apache/kafka/server/mutable/BoundedListTest.java index df2608430f4ae..70c38c63a7752 100644 --- a/server-common/src/test/java/org/apache/kafka/server/mutable/BoundedListTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/mutable/BoundedListTest.java @@ -21,6 +21,7 @@ import org.junit.jupiter.api.Timeout; import java.util.Arrays; +import java.util.Collections; import java.util.List; import static org.junit.jupiter.api.Assertions.assertArrayEquals; @@ -122,7 +123,7 @@ public void testRemove() { list.remove("a"); assertEquals(Arrays.asList("a", "c"), list); list.remove(0); - assertEquals(Arrays.asList("c"), list); + assertEquals(Collections.singletonList("c"), list); } @Test @@ -132,7 +133,7 @@ public void testClear() { list.add("a"); list.add("c"); list.clear(); - assertEquals(Arrays.asList(), list); + assertEquals(Collections.emptyList(), list); assertTrue(list.isEmpty()); } @@ -205,7 +206,7 @@ public void testSubList() { list.add(1); list.add(2); list.add(3); - assertEquals(Arrays.asList(2), list.subList(1, 2)); + assertEquals(Collections.singletonList(2), list.subList(1, 2)); assertThrows(UnsupportedOperationException.class, () -> list.subList(1, 2).remove(2)); } diff --git a/server-common/src/test/java/org/apache/kafka/server/network/EndpointReadyFuturesTest.java b/server-common/src/test/java/org/apache/kafka/server/network/EndpointReadyFuturesTest.java index 2d76e5df37dbd..b3347a9661ffa 100644 --- a/server-common/src/test/java/org/apache/kafka/server/network/EndpointReadyFuturesTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/network/EndpointReadyFuturesTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.server.network; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -43,11 +44,11 @@ final public class EndpointReadyFuturesTest { new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "127.0.0.1", 9093); private static final KafkaAuthorizerServerInfo INFO = new KafkaAuthorizerServerInfo( - new ClusterResource("S6-01LPiQOCBhhFIunQUcQ"), - 1, - Arrays.asList(EXTERNAL, INTERNAL), - INTERNAL, - Arrays.asList("INTERNAL")); + new ClusterResource("S6-01LPiQOCBhhFIunQUcQ"), + 1, + Arrays.asList(EXTERNAL, INTERNAL), + INTERNAL, + Collections.singletonList("INTERNAL")); static void assertComplete( EndpointReadyFutures readyFutures, diff --git a/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java b/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java index e52f39cf846a6..672eb93b65449 100644 --- a/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java @@ -22,6 +22,7 @@ import org.junit.jupiter.api.Test; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.Properties; @@ -33,14 +34,14 @@ public class CommandLineUtilsTest { @Test public void testParseEmptyArg() { - List argArray = Arrays.asList("my.empty.property="); + List argArray = Collections.singletonList("my.empty.property="); assertThrows(IllegalArgumentException.class, () -> CommandLineUtils.parseKeyValueArgs(argArray, false)); } @Test public void testParseEmptyArgWithNoDelimiter() { - List argArray = Arrays.asList("my.empty.property"); + List argArray = Collections.singletonList("my.empty.property"); assertThrows(IllegalArgumentException.class, () -> CommandLineUtils.parseKeyValueArgs(argArray, false)); } @@ -56,7 +57,7 @@ public void testParseEmptyArgAsValid() { @Test public void testParseSingleArg() { - List argArray = Arrays.asList("my.property=value"); + List argArray = Collections.singletonList("my.property=value"); Properties props = CommandLineUtils.parseKeyValueArgs(argArray); assertEquals(props.getProperty("my.property"), "value", "Value of a single property should be 'value'"); diff --git a/server-common/src/test/java/org/apache/kafka/timeline/TimelineHashMapTest.java b/server-common/src/test/java/org/apache/kafka/timeline/TimelineHashMapTest.java index 65413ab5cf432..1b4218c0c44e5 100644 --- a/server-common/src/test/java/org/apache/kafka/timeline/TimelineHashMapTest.java +++ b/server-common/src/test/java/org/apache/kafka/timeline/TimelineHashMapTest.java @@ -18,7 +18,6 @@ package org.apache.kafka.timeline; import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -96,7 +95,7 @@ public void testMapMethods() { assertNull(map.putIfAbsent(1, "xyz")); assertEquals("xyz", map.putIfAbsent(1, "123")); assertEquals("xyz", map.putIfAbsent(1, "ghi")); - map.putAll(Collections.singletonMap(2, "b")); + map.put(2, "b"); assertTrue(map.containsKey(2)); assertEquals("xyz", map.remove(1)); assertEquals("b", map.remove(2)); diff --git a/server-common/src/test/java/org/apache/kafka/timeline/TimelineHashSetTest.java b/server-common/src/test/java/org/apache/kafka/timeline/TimelineHashSetTest.java index 070893cdc84be..82c220f5969b4 100644 --- a/server-common/src/test/java/org/apache/kafka/timeline/TimelineHashSetTest.java +++ b/server-common/src/test/java/org/apache/kafka/timeline/TimelineHashSetTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.timeline; import java.util.Arrays; +import java.util.Collections; import org.apache.kafka.common.utils.LogContext; import org.junit.jupiter.api.Test; @@ -61,9 +62,9 @@ public void testIteration() { set.add("d"); assertTrue(set.retainAll(Arrays.asList("a", "b", "c"))); assertFalse(set.retainAll(Arrays.asList("a", "b", "c"))); - assertFalse(set.removeAll(Arrays.asList("d"))); + assertFalse(set.removeAll(Collections.singletonList("d"))); registry.getOrCreateSnapshot(2); - assertTrue(set.removeAll(Arrays.asList("c"))); + assertTrue(set.removeAll(Collections.singletonList("c"))); assertThat(TimelineHashMapTest.iteratorToList(set.iterator(2)), containsInAnyOrder("a", "b", "c")); assertThat(TimelineHashMapTest.iteratorToList(set.iterator()), diff --git a/shell/src/main/java/org/apache/kafka/shell/command/CatCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/CatCommandHandler.java index 9cd7603f94caa..51b2f6d3cb78d 100644 --- a/shell/src/main/java/org/apache/kafka/shell/command/CatCommandHandler.java +++ b/shell/src/main/java/org/apache/kafka/shell/command/CatCommandHandler.java @@ -121,7 +121,6 @@ public int hashCode() { public boolean equals(Object other) { if (!(other instanceof CatCommandHandler)) return false; CatCommandHandler o = (CatCommandHandler) other; - if (!Objects.equals(o.targets, targets)) return false; - return true; + return Objects.equals(o.targets, targets); } } diff --git a/shell/src/main/java/org/apache/kafka/shell/command/CdCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/CdCommandHandler.java index 71057a4ade75f..ba22c0bb8ecf6 100644 --- a/shell/src/main/java/org/apache/kafka/shell/command/CdCommandHandler.java +++ b/shell/src/main/java/org/apache/kafka/shell/command/CdCommandHandler.java @@ -112,7 +112,6 @@ public int hashCode() { public boolean equals(Object other) { if (!(other instanceof CdCommandHandler)) return false; CdCommandHandler o = (CdCommandHandler) other; - if (!o.target.equals(target)) return false; - return true; + return o.target.equals(target); } } diff --git a/shell/src/main/java/org/apache/kafka/shell/command/ErroneousCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/ErroneousCommandHandler.java index 27cb02a906b7c..e8b8096f8b2d4 100644 --- a/shell/src/main/java/org/apache/kafka/shell/command/ErroneousCommandHandler.java +++ b/shell/src/main/java/org/apache/kafka/shell/command/ErroneousCommandHandler.java @@ -52,8 +52,7 @@ public int hashCode() { public boolean equals(Object other) { if (!(other instanceof ErroneousCommandHandler)) return false; ErroneousCommandHandler o = (ErroneousCommandHandler) other; - if (!Objects.equals(o.message, message)) return false; - return true; + return Objects.equals(o.message, message); } @Override diff --git a/shell/src/main/java/org/apache/kafka/shell/command/ExitCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/ExitCommandHandler.java index 56f92de30f318..fab54be2c98be 100644 --- a/shell/src/main/java/org/apache/kafka/shell/command/ExitCommandHandler.java +++ b/shell/src/main/java/org/apache/kafka/shell/command/ExitCommandHandler.java @@ -89,7 +89,6 @@ public int hashCode() { @Override public boolean equals(Object other) { - if (!(other instanceof ExitCommandHandler)) return false; - return true; + return other instanceof ExitCommandHandler; } } diff --git a/shell/src/main/java/org/apache/kafka/shell/command/FindCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/FindCommandHandler.java index a41b0b21ca318..133cb988d01f9 100644 --- a/shell/src/main/java/org/apache/kafka/shell/command/FindCommandHandler.java +++ b/shell/src/main/java/org/apache/kafka/shell/command/FindCommandHandler.java @@ -127,7 +127,6 @@ public int hashCode() { public boolean equals(Object other) { if (!(other instanceof FindCommandHandler)) return false; FindCommandHandler o = (FindCommandHandler) other; - if (!Objects.equals(o.paths, paths)) return false; - return true; + return Objects.equals(o.paths, paths); } } diff --git a/shell/src/main/java/org/apache/kafka/shell/command/HelpCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/HelpCommandHandler.java index 52345487b44a5..e0a5aa03288b0 100644 --- a/shell/src/main/java/org/apache/kafka/shell/command/HelpCommandHandler.java +++ b/shell/src/main/java/org/apache/kafka/shell/command/HelpCommandHandler.java @@ -89,7 +89,6 @@ public int hashCode() { @Override public boolean equals(Object other) { - if (!(other instanceof HelpCommandHandler)) return false; - return true; + return other instanceof HelpCommandHandler; } } diff --git a/shell/src/main/java/org/apache/kafka/shell/command/LsCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/LsCommandHandler.java index 848b7bd5185be..e42f7414cf41c 100644 --- a/shell/src/main/java/org/apache/kafka/shell/command/LsCommandHandler.java +++ b/shell/src/main/java/org/apache/kafka/shell/command/LsCommandHandler.java @@ -271,8 +271,7 @@ public boolean equals(Object o) { if (!(o instanceof ColumnSchema)) return false; ColumnSchema other = (ColumnSchema) o; if (entriesPerColumn != other.entriesPerColumn) return false; - if (!Arrays.equals(columnWidths, other.columnWidths)) return false; - return true; + return Arrays.equals(columnWidths, other.columnWidths); } @Override @@ -298,7 +297,6 @@ public int hashCode() { public boolean equals(Object other) { if (!(other instanceof LsCommandHandler)) return false; LsCommandHandler o = (LsCommandHandler) other; - if (!Objects.equals(o.targets, targets)) return false; - return true; + return Objects.equals(o.targets, targets); } } diff --git a/shell/src/main/java/org/apache/kafka/shell/command/ManCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/ManCommandHandler.java index f10e89b2bffef..5892cdff4c5ad 100644 --- a/shell/src/main/java/org/apache/kafka/shell/command/ManCommandHandler.java +++ b/shell/src/main/java/org/apache/kafka/shell/command/ManCommandHandler.java @@ -110,7 +110,6 @@ public int hashCode() { public boolean equals(Object other) { if (!(other instanceof ManCommandHandler)) return false; ManCommandHandler o = (ManCommandHandler) other; - if (!o.cmd.equals(cmd)) return false; - return true; + return o.cmd.equals(cmd); } } diff --git a/shell/src/main/java/org/apache/kafka/shell/command/NoOpCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/NoOpCommandHandler.java index 106d2ddb0147c..e7168127e26c3 100644 --- a/shell/src/main/java/org/apache/kafka/shell/command/NoOpCommandHandler.java +++ b/shell/src/main/java/org/apache/kafka/shell/command/NoOpCommandHandler.java @@ -42,7 +42,6 @@ public int hashCode() { @Override public boolean equals(Object other) { - if (!(other instanceof NoOpCommandHandler)) return false; - return true; + return other instanceof NoOpCommandHandler; } } diff --git a/shell/src/main/java/org/apache/kafka/shell/command/PwdCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/PwdCommandHandler.java index 4a0752a4e701d..55046cf5e7243 100644 --- a/shell/src/main/java/org/apache/kafka/shell/command/PwdCommandHandler.java +++ b/shell/src/main/java/org/apache/kafka/shell/command/PwdCommandHandler.java @@ -88,7 +88,6 @@ public int hashCode() { @Override public boolean equals(Object other) { - if (!(other instanceof PwdCommandHandler)) return false; - return true; + return other instanceof PwdCommandHandler; } } diff --git a/shell/src/main/java/org/apache/kafka/shell/command/TreeCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/TreeCommandHandler.java index 1489e1f150090..ee937d1c2f5bb 100644 --- a/shell/src/main/java/org/apache/kafka/shell/command/TreeCommandHandler.java +++ b/shell/src/main/java/org/apache/kafka/shell/command/TreeCommandHandler.java @@ -117,7 +117,6 @@ public int hashCode() { public boolean equals(Object other) { if (!(other instanceof TreeCommandHandler)) return false; TreeCommandHandler o = (TreeCommandHandler) other; - if (!Objects.equals(o.targets, targets)) return false; - return true; + return Objects.equals(o.targets, targets); } } diff --git a/shell/src/main/java/org/apache/kafka/shell/glob/GlobVisitor.java b/shell/src/main/java/org/apache/kafka/shell/glob/GlobVisitor.java index 23b3a52d77623..6af3011d55946 100644 --- a/shell/src/main/java/org/apache/kafka/shell/glob/GlobVisitor.java +++ b/shell/src/main/java/org/apache/kafka/shell/glob/GlobVisitor.java @@ -76,8 +76,7 @@ public boolean equals(Object o) { if (!(o instanceof MetadataNodeInfo)) return false; MetadataNodeInfo other = (MetadataNodeInfo) o; if (!Arrays.equals(path, other.path)) return false; - if (!node.equals(other.node)) return false; - return true; + return node.equals(other.node); } @Override diff --git a/shell/src/test/java/org/apache/kafka/shell/command/CommandTest.java b/shell/src/test/java/org/apache/kafka/shell/command/CommandTest.java index 212ac11e69b5c..58528335c4be8 100644 --- a/shell/src/test/java/org/apache/kafka/shell/command/CommandTest.java +++ b/shell/src/test/java/org/apache/kafka/shell/command/CommandTest.java @@ -30,39 +30,39 @@ public class CommandTest { @Test public void testParseCommands() { - assertEquals(new CatCommandHandler(Arrays.asList("foo")), + assertEquals(new CatCommandHandler(Collections.singletonList("foo")), new Commands(true).parseCommand(Arrays.asList("cat", "foo"))); assertEquals(new CdCommandHandler(Optional.empty()), - new Commands(true).parseCommand(Arrays.asList("cd"))); + new Commands(true).parseCommand(Collections.singletonList("cd"))); assertEquals(new CdCommandHandler(Optional.of("foo")), new Commands(true).parseCommand(Arrays.asList("cd", "foo"))); assertEquals(new ExitCommandHandler(), - new Commands(true).parseCommand(Arrays.asList("exit"))); + new Commands(true).parseCommand(Collections.singletonList("exit"))); assertEquals(new HelpCommandHandler(), - new Commands(true).parseCommand(Arrays.asList("help"))); + new Commands(true).parseCommand(Collections.singletonList("help"))); assertEquals(new HistoryCommandHandler(3), new Commands(true).parseCommand(Arrays.asList("history", "3"))); assertEquals(new HistoryCommandHandler(Integer.MAX_VALUE), - new Commands(true).parseCommand(Arrays.asList("history"))); + new Commands(true).parseCommand(Collections.singletonList("history"))); assertEquals(new LsCommandHandler(Collections.emptyList()), - new Commands(true).parseCommand(Arrays.asList("ls"))); + new Commands(true).parseCommand(Collections.singletonList("ls"))); assertEquals(new LsCommandHandler(Arrays.asList("abc", "123")), new Commands(true).parseCommand(Arrays.asList("ls", "abc", "123"))); assertEquals(new PwdCommandHandler(), - new Commands(true).parseCommand(Arrays.asList("pwd"))); + new Commands(true).parseCommand(Collections.singletonList("pwd"))); } @Test public void testParseInvalidCommand() { assertEquals(new ErroneousCommandHandler("invalid choice: 'blah' (choose " + "from 'cat', 'cd', 'exit', 'find', 'help', 'history', 'ls', 'man', 'pwd', 'tree')"), - new Commands(true).parseCommand(Arrays.asList("blah"))); + new Commands(true).parseCommand(Collections.singletonList("blah"))); } @Test public void testEmptyCommandLine() { assertEquals(new NoOpCommandHandler(), - new Commands(true).parseCommand(Arrays.asList(""))); + new Commands(true).parseCommand(Collections.singletonList(""))); assertEquals(new NoOpCommandHandler(), new Commands(true).parseCommand(Collections.emptyList())); } diff --git a/shell/src/test/java/org/apache/kafka/shell/glob/GlobVisitorTest.java b/shell/src/test/java/org/apache/kafka/shell/glob/GlobVisitorTest.java index 000d8f2f01775..2bb74910f5ffc 100644 --- a/shell/src/test/java/org/apache/kafka/shell/glob/GlobVisitorTest.java +++ b/shell/src/test/java/org/apache/kafka/shell/glob/GlobVisitorTest.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -137,8 +138,8 @@ public void testDotDot() { InfoConsumer consumer = new InfoConsumer(); GlobVisitor visitor = new GlobVisitor("..", consumer); visitor.accept(DATA); - assertEquals(Optional.of(Arrays.asList( - new MetadataNodeInfo(new String[0], DATA.root()))), consumer.infos); + assertEquals(Optional.of(Collections.singletonList( + new MetadataNodeInfo(new String[0], DATA.root()))), consumer.infos); } @Test @@ -146,8 +147,8 @@ public void testDoubleDotDot() { InfoConsumer consumer = new InfoConsumer(); GlobVisitor visitor = new GlobVisitor("../..", consumer); visitor.accept(DATA); - assertEquals(Optional.of(Arrays.asList( - new MetadataNodeInfo(new String[0], DATA.root()))), consumer.infos); + assertEquals(Optional.of(Collections.singletonList( + new MetadataNodeInfo(new String[0], DATA.root()))), consumer.infos); } @Test @@ -189,8 +190,8 @@ public void testAbsoluteGlob() { InfoConsumer consumer = new InfoConsumer(); GlobVisitor visitor = new GlobVisitor("/a?pha", consumer); visitor.accept(DATA); - assertEquals(Optional.of(Arrays.asList( - new MetadataNodeInfo(new String[] {"alpha"}, - DATA.root().child("alpha")))), consumer.infos); + assertEquals(Optional.of(Collections.singletonList( + new MetadataNodeInfo(new String[]{"alpha"}, + DATA.root().child("alpha")))), consumer.infos); } } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java index 9617da17c6593..83884e6ce3da6 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java @@ -76,10 +76,7 @@ public boolean matches(final LocalTieredStorageCondition condition) { if (condition.baseOffset != null && !metadata.isPresent()) { return false; } - if (condition.baseOffset != null && metadata.get().startOffset() != condition.baseOffset) { - return false; - } - return true; + return condition.baseOffset == null || metadata.get().startOffset() == condition.baseOffset; } /** diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java index d0b4ee79ab62a..74ab28d9595d5 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java @@ -304,7 +304,7 @@ public void traverseMultipleOffloadedRecordsInOneSegment() throws RemoteStorageE final LocalTieredStorageSnapshot snapshot = takeSnapshot(tieredStorage); - assertEquals(asList(topicPartition), snapshot.getTopicPartitions()); + assertEquals(Collections.singletonList(topicPartition), snapshot.getTopicPartitions()); assertEquals(asList(wrap(record1), wrap(record2)), extractRecordsValue(snapshot, id)); } @@ -331,7 +331,7 @@ public void traverseMultipleOffloadedRecordsInTwoSegments() throws RemoteStorage actual.put(idA, extractRecordsValue(snapshot, idA)); actual.put(idB, extractRecordsValue(snapshot, idB)); - assertEquals(asList(topicPartition), snapshot.getTopicPartitions()); + assertEquals(Collections.singletonList(topicPartition), snapshot.getTopicPartitions()); assertEquals(expected, actual); } diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/RecordsKeyValueMatcher.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/RecordsKeyValueMatcher.java index 902b5c2d7132a..736a9cc4dbec1 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/RecordsKeyValueMatcher.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/RecordsKeyValueMatcher.java @@ -111,13 +111,10 @@ private boolean matches(R1 expected, R2 actual, Description mismatchDescription) .appendValue(actual.getClass().getSimpleName()); return false; } - if (!compare(expectedRecord.key(), actualRecord.key(), keySerde.deserializer(), "Record key", - mismatchDescription) || - !compare(expectedRecord.value(), actualRecord.value(), valueSerde.deserializer(), "Record value", - mismatchDescription)) { - return false; - } - return true; + return compare(expectedRecord.key(), actualRecord.key(), keySerde.deserializer(), "Record key", + mismatchDescription) && + compare(expectedRecord.value(), actualRecord.value(), valueSerde.deserializer(), "Record value", + mismatchDescription); } private boolean compare(ByteBuffer lhs, diff --git a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java index a907233057ea8..1373d274e6846 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java @@ -219,8 +219,8 @@ private static void printExtendedProgress(long bytesRead, } public static class ConsumerPerfRebListener implements ConsumerRebalanceListener { - private AtomicLong joinTimeMs; - private AtomicLong joinTimeMsInSingleRound; + private final AtomicLong joinTimeMs; + private final AtomicLong joinTimeMsInSingleRound; private long joinStartMs; public ConsumerPerfRebListener(AtomicLong joinTimeMs, long joinStartMs, AtomicLong joinTimeMsInSingleRound) { diff --git a/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java b/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java index 4db46867bc123..ace0add5f4e45 100644 --- a/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java @@ -160,28 +160,26 @@ private static void electLeaders(Admin client, ElectionType electionType, Option String partitionsAsString = succeeded.stream() .map(TopicPartition::toString) .collect(Collectors.joining(", ")); - System.out.println(String.format("Successfully completed leader election (%s) for partitions %s", - electionType, partitionsAsString)); + System.out.printf("Successfully completed leader election (%s) for partitions %s%n", + electionType, partitionsAsString); } if (!noop.isEmpty()) { String partitionsAsString = noop.stream() .map(TopicPartition::toString) .collect(Collectors.joining(", ")); - System.out.println(String.format("Valid replica already elected for partitions %s", partitionsAsString)); + System.out.printf("Valid replica already elected for partitions %s%n", partitionsAsString); } if (!failed.isEmpty()) { AdminCommandFailedException rootException = new AdminCommandFailedException(String.format("%s replica(s) could not be elected", failed.size())); failed.forEach((key, value) -> { - System.out.println( - String.format( - "Error completing leader election (%s) for partition: %s: %s", - electionType, - key, - value - ) + System.out.printf( + "Error completing leader election (%s) for partition: %s: %s%n", + electionType, + key, + value ); rootException.addSuppressed(value); }); diff --git a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java index 394f5078c4690..1a6558def9145 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java +++ b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java @@ -48,7 +48,7 @@ public static void printMetrics(Map metrics) { } String doubleOutputFormat = "%-" + maxLengthOfDisplayName + "s : %.3f"; String defaultOutputFormat = "%-" + maxLengthOfDisplayName + "s : %s"; - System.out.println(String.format("\n%-" + maxLengthOfDisplayName + "s %s", "Metric Name", "Value")); + System.out.printf("\n%-" + maxLengthOfDisplayName + "s %s%n", "Metric Name", "Value"); for (Map.Entry entry : sortedMetrics.entrySet()) { String outputFormat; @@ -56,7 +56,7 @@ public static void printMetrics(Map metrics) { outputFormat = doubleOutputFormat; else outputFormat = defaultOutputFormat; - System.out.println(String.format(outputFormat, entry.getKey(), entry.getValue())); + System.out.printf(outputFormat + "%n", entry.getKey(), entry.getValue()); } } } diff --git a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java index 93b19adcf3ca9..76971b28c6092 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java @@ -984,7 +984,7 @@ private void checkRequiredArgs() { CommandLineUtils.checkRequiredArgs(parser, options, topicOpt); if (has(alterOpt)) { Set> usedOptions = new HashSet<>(Arrays.asList(bootstrapServerOpt, configOpt)); - Set> invalidOptions = new HashSet<>(Arrays.asList(alterOpt)); + Set> invalidOptions = new HashSet<>(Collections.singletonList(alterOpt)); CommandLineUtils.checkInvalidArgsSet(parser, options, usedOptions, invalidOptions, Optional.of(KAFKA_CONFIGS_CLI_SUPPORTS_ALTERING_TOPIC_CONFIGS_WITH_A_BOOTSTRAP_SERVER)); CommandLineUtils.checkRequiredArgs(parser, options, partitionsOpt); } @@ -994,9 +994,9 @@ private void checkInvalidArgs() { // check invalid args CommandLineUtils.checkInvalidArgs(parser, options, configOpt, invalidOptions(Arrays.asList(alterOpt, createOpt))); CommandLineUtils.checkInvalidArgs(parser, options, deleteConfigOpt, - invalidOptions(new HashSet<>(Arrays.asList(bootstrapServerOpt)), Arrays.asList(alterOpt))); + invalidOptions(new HashSet<>(Collections.singletonList(bootstrapServerOpt)), Collections.singletonList(alterOpt))); CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, invalidOptions(Arrays.asList(alterOpt, createOpt))); - CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, invalidOptions(Arrays.asList(createOpt))); + CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, invalidOptions(Collections.singletonList(createOpt))); CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, invalidOptions(Arrays.asList(alterOpt, createOpt))); if (options.has(createOpt)) { CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, partitionsOpt, replicationFactorOpt); @@ -1012,10 +1012,10 @@ private void checkInvalidArgs() { CommandLineUtils.checkInvalidArgs(parser, options, reportUnavailablePartitionsOpt, invalidOptions(Collections.singleton(topicsWithOverridesOpt), Arrays.asList(describeOpt, reportUnavailablePartitionsOpt))); CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt, - invalidOptions(new HashSet<>(allReplicationReportOpts), Arrays.asList(describeOpt))); + invalidOptions(new HashSet<>(allReplicationReportOpts), Collections.singletonList(describeOpt))); CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, invalidOptions(Arrays.asList(alterOpt, deleteOpt, describeOpt))); - CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, invalidOptions(Arrays.asList(createOpt))); + CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, invalidOptions(Collections.singletonList(createOpt))); CommandLineUtils.checkInvalidArgs(parser, options, excludeInternalTopicOpt, invalidOptions(Arrays.asList(listOpt, describeOpt))); } diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java index 42d65816d63c8..e0e9239a2aab4 100644 --- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java @@ -40,6 +40,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Properties; @@ -325,7 +326,7 @@ public void testTopicPartitionsArgWithInternalIncluded() { List offsets = executeAndParse("--topic-partitions", "__.*:0"); - assertEquals(Arrays.asList(new Row("__consumer_offsets", 0, 0L)), offsets); + assertEquals(Collections.singletonList(new Row("__consumer_offsets", 0, 0L)), offsets); } @ClusterTest diff --git a/tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java index 1dd3082feba20..81147f48cba6c 100644 --- a/tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java @@ -930,7 +930,7 @@ public void testDescribeUnderMinIsrPartitionsMixed(String quorum) { fullyReplicatedReplicaAssignmentMap.put(0, JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 1, (Object) 2, (Object) 3)).asScala().toSeq()); scala.collection.mutable.HashMap> offlineReplicaAssignmentMap = new scala.collection.mutable.HashMap<>(); - offlineReplicaAssignmentMap.put(0, JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 0)).asScala().toSeq()); + offlineReplicaAssignmentMap.put(0, JavaConverters.asScalaBufferConverter(Collections.singletonList((Object) 0)).asScala().toSeq()); Properties topicConfig = new Properties(); topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6"); diff --git a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java index d4fff9d0c52f0..cf92593004f6c 100644 --- a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java @@ -79,7 +79,7 @@ public void testIsNotUnderReplicatedWhenAdding() { new TopicPartitionInfo(0, new Node(1, "localhost", 9091), replicas, Collections.singletonList(new Node(1, "localhost", 9091))), null, false, - new PartitionReassignment(replicaIds, Arrays.asList(2), Collections.emptyList()) + new PartitionReassignment(replicaIds, Collections.singletonList(2), Collections.emptyList()) ); assertFalse(partitionDescription.isUnderReplicated()); @@ -222,7 +222,7 @@ public void testCreateTopicDoesNotRetryThrottlingQuotaExceededException() { .configs(Collections.emptyMap()); verify(adminClient, times(1)).createTopics( - eq(new HashSet<>(Arrays.asList(expectedNewTopic))), + eq(new HashSet<>(Collections.singletonList(expectedNewTopic))), argThat(exception -> !exception.shouldRetryOnQuotaViolation()) ); } @@ -247,7 +247,7 @@ public void testDeleteTopicDoesNotRetryThrottlingQuotaExceededException() { assertInstanceOf(ThrottlingQuotaExceededException.class, exception.getCause()); verify(adminClient).deleteTopics( - argThat((Collection topics) -> topics.equals(Arrays.asList(topicName))), + argThat((Collection topics) -> topics.equals(Collections.singletonList(topicName))), argThat((DeleteTopicsOptions options) -> !options.shouldRetryOnQuotaViolation())); } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java deleted file mode 100644 index 4fbdeec0d946f..0000000000000 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java +++ /dev/null @@ -1,336 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.tools.consumer.group; - -import kafka.api.BaseConsumerTest; -import kafka.server.KafkaConfig; -import kafka.utils.TestUtils; -import org.apache.kafka.clients.admin.AdminClientConfig; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.GroupProtocol; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.RangeAssignor; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.TestInfo; -import org.junit.jupiter.params.provider.Arguments; -import scala.collection.JavaConverters; -import scala.collection.Seq; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import java.util.stream.Stream; - -public class ConsumerGroupCommandTest extends kafka.integration.KafkaServerTestHarness { - public static final String TOPIC = "foo"; - public static final String GROUP = "test.group"; - public static final String PROTOCOL_GROUP = "protocol-group"; - - List consumerGroupService = new ArrayList<>(); - List consumerGroupExecutors = new ArrayList<>(); - - @Override - public Seq generateConfigs() { - List cfgs = new ArrayList<>(); - - TestUtils.createBrokerConfigs( - 1, - zkConnectOrNull(), - false, - true, - scala.None$.empty(), - scala.None$.empty(), - scala.None$.empty(), - true, - false, - false, - false, - scala.collection.immutable.Map$.MODULE$.empty(), - 1, - false, - 1, - (short) 1, - 0, - false - ).foreach(props -> { - if (isNewGroupCoordinatorEnabled()) { - props.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true"); - props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer"); - } - cfgs.add(KafkaConfig.fromProps(props)); - return null; - }); - - return seq(cfgs); - } - - @BeforeEach - @Override - public void setUp(TestInfo testInfo) { - super.setUp(testInfo); - createTopic(TOPIC, 1, 1, new Properties(), listenerName(), new Properties()); - } - - @AfterEach - @Override - public void tearDown() { - consumerGroupService.forEach(ConsumerGroupCommand.ConsumerGroupService::close); - consumerGroupExecutors.forEach(AbstractConsumerGroupExecutor::shutdown); - super.tearDown(); - } - - Map committedOffsets(String topic, String group) { - try (Consumer consumer = createNoAutoCommitConsumer(group)) { - Set partitions = consumer.partitionsFor(topic).stream() - .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition())) - .collect(Collectors.toSet()); - return consumer.committed(partitions).entrySet().stream() - .filter(e -> e.getValue() != null) - .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset())); - } - } - - Consumer createNoAutoCommitConsumer(String group) { - Properties props = new Properties(); - props.put("bootstrap.servers", bootstrapServers(listenerName())); - props.put("group.id", group); - props.put("enable.auto.commit", "false"); - return new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer()); - } - - ConsumerGroupCommand.ConsumerGroupService getConsumerGroupService(String[] args) { - ConsumerGroupCommandOptions opts = ConsumerGroupCommandOptions.fromArgs(args); - ConsumerGroupCommand.ConsumerGroupService service = new ConsumerGroupCommand.ConsumerGroupService( - opts, - Collections.singletonMap(AdminClientConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE)) - ); - - consumerGroupService.add(0, service); - return service; - } - - ConsumerGroupExecutor addConsumerGroupExecutor(int numConsumers, String groupProtocol) { - return addConsumerGroupExecutor(numConsumers, TOPIC, GROUP, RangeAssignor.class.getName(), Optional.empty(), Optional.empty(), false, groupProtocol); - } - - ConsumerGroupExecutor addConsumerGroupExecutor(int numConsumers, String groupProtocol, Optional remoteAssignor) { - return addConsumerGroupExecutor(numConsumers, TOPIC, GROUP, RangeAssignor.class.getName(), remoteAssignor, Optional.empty(), false, groupProtocol); - } - - ConsumerGroupExecutor addConsumerGroupExecutor(int numConsumers, String topic, String group, String groupProtocol) { - return addConsumerGroupExecutor(numConsumers, topic, group, RangeAssignor.class.getName(), Optional.empty(), Optional.empty(), false, groupProtocol); - } - - ConsumerGroupExecutor addConsumerGroupExecutor(int numConsumers, String topic, String group, String strategy, Optional remoteAssignor, - Optional customPropsOpt, boolean syncCommit, String groupProtocol) { - ConsumerGroupExecutor executor = new ConsumerGroupExecutor(bootstrapServers(listenerName()), numConsumers, group, groupProtocol, - topic, strategy, remoteAssignor, customPropsOpt, syncCommit); - addExecutor(executor); - return executor; - } - - SimpleConsumerGroupExecutor addSimpleGroupExecutor(Collection partitions, String group) { - SimpleConsumerGroupExecutor executor = new SimpleConsumerGroupExecutor(bootstrapServers(listenerName()), group, partitions); - addExecutor(executor); - return executor; - } - - private AbstractConsumerGroupExecutor addExecutor(AbstractConsumerGroupExecutor executor) { - consumerGroupExecutors.add(0, executor); - return executor; - } - - abstract class AbstractConsumerRunnable implements Runnable { - final String broker; - final String groupId; - final Optional customPropsOpt; - final boolean syncCommit; - - final Properties props = new Properties(); - KafkaConsumer consumer; - - boolean configured = false; - - public AbstractConsumerRunnable(String broker, String groupId, Optional customPropsOpt, boolean syncCommit) { - this.broker = broker; - this.groupId = groupId; - this.customPropsOpt = customPropsOpt; - this.syncCommit = syncCommit; - } - - void configure() { - configured = true; - configure(props); - customPropsOpt.ifPresent(props::putAll); - consumer = new KafkaConsumer<>(props); - } - - void configure(Properties props) { - props.put("bootstrap.servers", broker); - props.put("group.id", groupId); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - } - - abstract void subscribe(); - - @Override - public void run() { - assert configured : "Must call configure before use"; - try { - subscribe(); - while (true) { - consumer.poll(Duration.ofMillis(Long.MAX_VALUE)); - if (syncCommit) - consumer.commitSync(); - } - } catch (WakeupException e) { - // OK - } finally { - consumer.close(); - } - } - - void shutdown() { - consumer.wakeup(); - } - } - - class ConsumerRunnable extends AbstractConsumerRunnable { - final String topic; - final String groupProtocol; - final String strategy; - final Optional remoteAssignor; - - public ConsumerRunnable(String broker, String groupId, String groupProtocol, String topic, String strategy, - Optional remoteAssignor, Optional customPropsOpt, boolean syncCommit) { - super(broker, groupId, customPropsOpt, syncCommit); - - this.topic = topic; - this.groupProtocol = groupProtocol; - this.strategy = strategy; - this.remoteAssignor = remoteAssignor; - } - - @Override - void configure(Properties props) { - super.configure(props); - props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol); - if (groupProtocol.toUpperCase(Locale.ROOT).equals(GroupProtocol.CONSUMER.toString())) { - remoteAssignor.ifPresent(assignor -> props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, assignor)); - } else { - props.put("partition.assignment.strategy", strategy); - } - } - - @Override - void subscribe() { - consumer.subscribe(Collections.singleton(topic)); - } - } - - class SimpleConsumerRunnable extends AbstractConsumerRunnable { - final Collection partitions; - - public SimpleConsumerRunnable(String broker, String groupId, Collection partitions) { - super(broker, groupId, Optional.empty(), false); - - this.partitions = partitions; - } - - @Override - void subscribe() { - consumer.assign(partitions); - } - } - - class AbstractConsumerGroupExecutor { - final int numThreads; - final ExecutorService executor; - final List consumers = new ArrayList<>(); - - public AbstractConsumerGroupExecutor(int numThreads) { - this.numThreads = numThreads; - this.executor = Executors.newFixedThreadPool(numThreads); - } - - void submit(AbstractConsumerRunnable consumerThread) { - consumers.add(consumerThread); - executor.submit(consumerThread); - } - - void shutdown() { - consumers.forEach(AbstractConsumerRunnable::shutdown); - executor.shutdown(); - try { - executor.awaitTermination(5000, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } - - class ConsumerGroupExecutor extends AbstractConsumerGroupExecutor { - public ConsumerGroupExecutor(String broker, int numConsumers, String groupId, String groupProtocol, String topic, String strategy, - Optional remoteAssignor, Optional customPropsOpt, boolean syncCommit) { - super(numConsumers); - IntStream.rangeClosed(1, numConsumers).forEach(i -> { - ConsumerRunnable th = new ConsumerRunnable(broker, groupId, groupProtocol, topic, strategy, remoteAssignor, customPropsOpt, syncCommit); - th.configure(); - submit(th); - }); - } - } - - class SimpleConsumerGroupExecutor extends AbstractConsumerGroupExecutor { - public SimpleConsumerGroupExecutor(String broker, String groupId, Collection partitions) { - super(1); - - SimpleConsumerRunnable th = new SimpleConsumerRunnable(broker, groupId, partitions); - th.configure(); - submit(th); - } - } - - - public static Stream getTestQuorumAndGroupProtocolParametersAll() { - return BaseConsumerTest.getTestQuorumAndGroupProtocolParametersAll(); - } - - @SuppressWarnings({"deprecation"}) - static Seq seq(Collection seq) { - return JavaConverters.asScalaIteratorConverter(seq.iterator()).asScala().toSeq(); - } -} diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java index 609b68c4cc90d..39a7844321dc5 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java @@ -144,10 +144,10 @@ private void addTopics(MockAdminClient adminClient) { asList(b.get(1), b.get(2), b.get(3)), asList(b.get(1), b.get(2), b.get(3))) ), Collections.emptyMap()); - adminClient.addTopic(false, "bar", asList( - new TopicPartitionInfo(0, b.get(2), - asList(b.get(2), b.get(3), b.get(0)), - asList(b.get(2), b.get(3), b.get(0))) + adminClient.addTopic(false, "bar", Collections.singletonList( + new TopicPartitionInfo(0, b.get(2), + asList(b.get(2), b.get(3), b.get(0)), + asList(b.get(2), b.get(3), b.get(0))) ), Collections.emptyMap()); } @@ -219,10 +219,10 @@ public void testFindLogDirMoveStates() throws Exception { addTopics(adminClient); List b = adminClient.brokers(); - adminClient.addTopic(false, "quux", asList( - new TopicPartitionInfo(0, b.get(2), - asList(b.get(1), b.get(2), b.get(3)), - asList(b.get(1), b.get(2), b.get(3)))), + adminClient.addTopic(false, "quux", Collections.singletonList( + new TopicPartitionInfo(0, b.get(2), + asList(b.get(1), b.get(2), b.get(3)), + asList(b.get(1), b.get(2), b.get(3)))), Collections.emptyMap()); Map replicaAssignment = new HashMap<>(); @@ -289,7 +289,7 @@ public void testGetReplicaAssignments() throws Exception { assignments.put(new TopicPartition("foo", 0), asList(0, 1, 2)); assignments.put(new TopicPartition("foo", 1), asList(1, 2, 3)); - assertEquals(assignments, getReplicaAssignmentForTopics(adminClient, asList("foo"))); + assertEquals(assignments, getReplicaAssignmentForTopics(adminClient, Collections.singletonList("foo"))); assignments.clear(); @@ -344,7 +344,7 @@ public void testParseGenerateAssignmentArgs() throws Exception { assertThrows(AdminCommandFailedException.class, () -> parseGenerateAssignmentArgs( "{\"topics\": [{\"topic\": \"foo\"}], \"version\":1}", "5,2,3,4,5"), "Expected to detect duplicate broker list entries").getMessage()); - assertEquals(new SimpleImmutableEntry<>(asList(5, 2, 3, 4), asList("foo")), + assertEquals(new SimpleImmutableEntry<>(asList(5, 2, 3, 4), Collections.singletonList("foo")), parseGenerateAssignmentArgs("{\"topics\": [{\"topic\": \"foo\"}], \"version\":1}", "5,2,3,4")); assertStartsWith("List of topics to reassign contains duplicate entries", assertThrows(AdminCommandFailedException.class, () -> parseGenerateAssignmentArgs( @@ -473,7 +473,7 @@ public void testMoveMap() { Map currentReassignments = new HashMap<>(); currentReassignments.put(new TopicPartition("foo", 0), new PartitionReassignment( - asList(1, 2, 3, 4), asList(4), asList(3))); + asList(1, 2, 3, 4), Collections.singletonList(4), Collections.singletonList(3))); currentReassignments.put(new TopicPartition("foo", 1), new PartitionReassignment( asList(4, 5, 6, 7, 8), asList(7, 8), asList(4, 5))); currentReassignments.put(new TopicPartition("foo", 2), new PartitionReassignment( @@ -490,7 +490,7 @@ public void testMoveMap() { proposedParts.put(new TopicPartition("foo", 0), asList(1, 2, 5)); proposedParts.put(new TopicPartition("foo", 2), asList(3, 4)); proposedParts.put(new TopicPartition("foo", 3), asList(5, 6)); - proposedParts.put(new TopicPartition("foo", 4), asList(3)); + proposedParts.put(new TopicPartition("foo", 4), Collections.singletonList(3)); proposedParts.put(new TopicPartition("foo", 5), asList(3, 4, 5, 6)); proposedParts.put(new TopicPartition("bar", 0), asList(1, 2, 3)); @@ -509,16 +509,16 @@ public void testMoveMap() { Map fooMoves = new HashMap<>(); - fooMoves.put(0, new PartitionMove(new HashSet<>(asList(1, 2, 3)), new HashSet<>(asList(5)))); + fooMoves.put(0, new PartitionMove(new HashSet<>(asList(1, 2, 3)), new HashSet<>(Collections.singletonList(5)))); fooMoves.put(1, new PartitionMove(new HashSet<>(asList(4, 5, 6)), new HashSet<>(asList(7, 8)))); fooMoves.put(2, new PartitionMove(new HashSet<>(asList(1, 2)), new HashSet<>(asList(3, 4)))); fooMoves.put(3, new PartitionMove(new HashSet<>(asList(1, 2)), new HashSet<>(asList(5, 6)))); - fooMoves.put(4, new PartitionMove(new HashSet<>(asList(1, 2)), new HashSet<>(asList(3)))); + fooMoves.put(4, new PartitionMove(new HashSet<>(asList(1, 2)), new HashSet<>(Collections.singletonList(3)))); fooMoves.put(5, new PartitionMove(new HashSet<>(asList(1, 2)), new HashSet<>(asList(3, 4, 5, 6)))); Map barMoves = new HashMap<>(); - barMoves.put(0, new PartitionMove(new HashSet<>(asList(2, 3, 4)), new HashSet<>(asList(1)))); + barMoves.put(0, new PartitionMove(new HashSet<>(asList(2, 3, 4)), new HashSet<>(Collections.singletonList(1)))); assertEquals(fooMoves, moveMap.get("foo")); assertEquals(barMoves, moveMap.get("bar")); @@ -747,7 +747,7 @@ public void testAlterReplicaLogDirs() throws Exception { assignment.put(new TopicPartitionReplica("quux", 1, 0), "/tmp/kafka-logs1"); assertEquals( - new HashSet<>(asList(new TopicPartitionReplica("foo", 0, 0))), + new HashSet<>(Collections.singletonList(new TopicPartitionReplica("foo", 0, 0))), alterReplicaLogDirs(adminClient, assignment) ); } diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/TasksRequest.java b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/TasksRequest.java index 7ed3d7c8d65f6..70ad19b4231fb 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/TasksRequest.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/TasksRequest.java @@ -133,10 +133,6 @@ public boolean matches(String taskId, long startMs, long endMs, TaskStateType st return false; } - if (this.state.isPresent() && !this.state.get().equals(state)) { - return false; - } - - return true; + return !this.state.isPresent() || this.state.get().equals(state); } } diff --git a/trogdor/src/test/java/org/apache/kafka/trogdor/common/StringExpanderTest.java b/trogdor/src/test/java/org/apache/kafka/trogdor/common/StringExpanderTest.java index 22da4987c3015..fc9ce2cb81dd0 100644 --- a/trogdor/src/test/java/org/apache/kafka/trogdor/common/StringExpanderTest.java +++ b/trogdor/src/test/java/org/apache/kafka/trogdor/common/StringExpanderTest.java @@ -45,8 +45,8 @@ public void testExpansions() { )); assertEquals(expected1, StringExpander.expand("foo[1-3]")); - HashSet expected2 = new HashSet<>(Arrays.asList( - "foo bar baz 0" + HashSet expected2 = new HashSet<>(Collections.singletonList( + "foo bar baz 0" )); assertEquals(expected2, StringExpander.expand("foo bar baz [0-0]"));