diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index 6b6b56059c84d..4ebd600cbd854 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -124,7 +124,7 @@ static List resolve(String host, HostResolver hostResolver) throws InetAddress[] addresses = hostResolver.resolve(host); List result = filterPreferredAddresses(addresses); if (log.isDebugEnabled()) - log.debug("Resolved host {} as {}", host, result.stream().map(i -> i.getHostAddress()).collect(Collectors.joining(","))); + log.debug("Resolved host {} as {}", host, result.stream().map(InetAddress::getHostAddress).collect(Collectors.joining(","))); return result; } diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java index 2bd70206fbe0f..dc3d9f4d7458b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java +++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java @@ -148,7 +148,7 @@ public Iterable clearAll(String node) { } else { final Deque clearedRequests = requests.remove(node); inFlightRequestCount.getAndAdd(-clearedRequests.size()); - return () -> clearedRequests.descendingIterator(); + return clearedRequests::descendingIterator; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 30cad44a4bc97..0aec2287c5d4b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -425,8 +425,8 @@ public synchronized Set updatePartitionLeadership(Map existingTopicIds = this.metadataSnapshot.topicIds(); Map topicIdsForUpdatedTopics = updatedTopics.stream() - .filter(e -> existingTopicIds.containsKey(e)) - .collect(Collectors.toMap(e -> e, e -> existingTopicIds.get(e))); + .filter(existingTopicIds::containsKey) + .collect(Collectors.toMap(e -> e, existingTopicIds::get)); if (log.isDebugEnabled()) { updatePartitionMetadata.forEach( diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java index 8e3c2d2822615..ebab1507e6cc2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java @@ -44,9 +44,7 @@ public DescribeConsumerGroupsResult(final Map> describedGroups() { - Map> describedGroups = new HashMap<>(); - futures.forEach((key, future) -> describedGroups.put(key, future)); - return describedGroups; + return new HashMap<>(futures); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java index ef9f105850a5f..fabd91b58f782 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeDelegationTokenOptions.java @@ -32,9 +32,9 @@ public class DescribeDelegationTokenOptions extends AbstractOptions owners; /** - * if owners is null, all the user owned tokens and tokens where user have Describe permission + * If owners is null, all the user owned 
tokens and tokens where user have Describe permission * will be returned. - * @param owners + * @param owners The owners that we want to describe delegation tokens for * @return this instance */ public DescribeDelegationTokenOptions owners(List owners) { diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java index c57867aad7c8d..d6f1114d61f4a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java @@ -52,7 +52,7 @@ public Map> return descriptions().entrySet().stream() .collect(Collectors.toMap( Map.Entry::getKey, - entry -> entry.getValue().thenApply(map -> convertMapValues(map)))); + entry -> entry.getValue().thenApply(this::convertMapValues))); } @SuppressWarnings("deprecation") @@ -88,7 +88,7 @@ public Map>> descriptions() @Deprecated public KafkaFuture>> all() { return allDescriptions().thenApply(map -> map.entrySet().stream().collect(Collectors.toMap( - entry -> entry.getKey(), + Map.Entry::getKey, entry -> convertMapValues(entry.getValue()) ))); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java index 54bd9c142b0b0..3e7457bda5946 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java @@ -51,21 +51,18 @@ public Map> values() { * Return a future which succeeds if log directory information of all replicas are available */ public KafkaFuture> all() { - return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])). - thenApply(new KafkaFuture.BaseFunction>() { - @Override - public Map apply(Void v) { - Map replicaLogDirInfos = new HashMap<>(); - for (Map.Entry> entry : futures.entrySet()) { - try { - replicaLogDirInfos.put(entry.getKey(), entry.getValue().get()); - } catch (InterruptedException | ExecutionException e) { - // This should be unreachable, because allOf ensured that all the futures completed successfully. - throw new RuntimeException(e); - } + return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])) + .thenApply(v -> { + Map replicaLogDirInfos = new HashMap<>(); + for (Map.Entry> entry : futures.entrySet()) { + try { + replicaLogDirInfos.put(entry.getKey(), entry.getValue().get()); + } catch (InterruptedException | ExecutionException e) { + // This should be unreachable, because allOf ensured that all the futures completed successfully. 
+ throw new RuntimeException(e); } - return replicaLogDirInfos; } + return replicaLogDirInfos; }); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 92ba6ad3d6c1f..dc68f3f7a7c13 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -2195,9 +2195,9 @@ Call generateDescribeTopicsCallWithDescribeTopicPartitionsApi( long now ) { final Map topicsRequests = new LinkedHashMap<>(); - topicNamesList.stream().sorted().forEach(topic -> { - topicsRequests.put(topic, new TopicRequest().setName(topic)); - }); + topicNamesList.stream().sorted().forEach(topic -> + topicsRequests.put(topic, new TopicRequest().setName(topic)) + ); return new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { TopicDescription partiallyFinishedTopicDescription = null; @@ -3043,7 +3043,7 @@ public DescribeLogDirsRequest.Builder createRequest(int timeoutMs) { public void handleResponse(AbstractResponse abstractResponse) { DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse; Map descriptions = logDirDescriptions(response); - if (descriptions.size() > 0) { + if (!descriptions.isEmpty()) { future.complete(descriptions); } else { // Up to v3 DescribeLogDirsResponse did not have an error code field, hence it defaults to None @@ -3555,10 +3555,10 @@ private void maybeAddConsumerGroup(ListGroupsResponseData.ListedGroup group) { String protocolType = group.protocolType(); if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) { final String groupId = group.groupId(); - final Optional state = group.groupState().equals("") + final Optional state = group.groupState().isEmpty() ? Optional.empty() : Optional.of(ConsumerGroupState.parse(group.groupState())); - final Optional type = group.groupType().equals("") + final Optional type = group.groupType().isEmpty() ? Optional.empty() : Optional.of(GroupType.parse(group.groupType())); final ConsumerGroupListing groupListing = new ConsumerGroupListing( @@ -4210,9 +4210,9 @@ public void handleResponse(AbstractResponse abstractResponse) { * Be sure to do this after the NOT_CONTROLLER error check above * so that all errors are consistent in that case. */ - userIllegalAlterationExceptions.entrySet().stream().forEach(entry -> { - futures.get(entry.getKey()).completeExceptionally(entry.getValue()); - }); + userIllegalAlterationExceptions.entrySet().stream().forEach(entry -> + futures.get(entry.getKey()).completeExceptionally(entry.getValue()) + ); response.data().results().forEach(result -> { KafkaFutureImpl future = futures.get(result.user()); if (future == null) { diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java index 21540732d30d2..15ea042eb1f16 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java @@ -48,13 +48,13 @@ public KafkaFuture> namesToListings() { * Return a future which yields a collection of TopicListing objects. 
*/ public KafkaFuture> listings() { - return future.thenApply(namesToDescriptions -> namesToDescriptions.values()); + return future.thenApply(Map::values); } /** * Return a future which yields a collection of topic names. */ public KafkaFuture> names() { - return future.thenApply(namesToListings -> namesToListings.keySet()); + return future.thenApply(Map::keySet); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java index 49cf484f5d625..ba950566d25b8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java @@ -78,7 +78,7 @@ public ListTransactionsOptions filterOnDuration(long durationMs) { /** * Returns the set of states to be filtered or empty if no states have been specified. * - * @return the current set of filtered states (empty means that no states are filtered and all + * @return the current set of filtered states (empty means that no states are filtered and * all transactions will be returned) */ public Set filteredStates() { diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsResult.java index c9670dba55245..c22f8b7791baf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsResult.java @@ -103,7 +103,7 @@ public KafkaFuture>> allByBrokerId() } Set remainingResponses = new HashSet<>(map.keySet()); - map.forEach((brokerId, future) -> { + map.forEach((brokerId, future) -> future.whenComplete((listings, brokerException) -> { if (brokerException != null) { allFuture.completeExceptionally(brokerException); @@ -115,8 +115,8 @@ public KafkaFuture>> allByBrokerId() allFuture.complete(allListingsMap); } } - }); - }); + }) + ); }); return allFuture; diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java index fcb657fff8f92..184608ee95f91 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java @@ -212,7 +212,7 @@ private ApiResult handledConsumerGroup final Set authorizedOperations = validAclOperations(describedGroup.authorizedOperations()); final List memberDescriptions = new ArrayList<>(describedGroup.members().size()); - describedGroup.members().forEach(groupMember -> { + describedGroup.members().forEach(groupMember -> memberDescriptions.add(new MemberDescription( groupMember.memberId(), Optional.ofNullable(groupMember.instanceId()), @@ -220,8 +220,8 @@ private ApiResult handledConsumerGroup groupMember.clientHost(), new MemberAssignment(convertAssignment(groupMember.assignment())), Optional.of(new MemberAssignment(convertAssignment(groupMember.targetAssignment()))) - )); - }); + )) + ); final ConsumerGroupDescription consumerGroupDescription = new ConsumerGroupDescription( diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java index fe8e48e705dc0..2a2192a58a9a1 
100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java @@ -121,9 +121,9 @@ private void failAllPartitionsForTopic( Map failed, Function exceptionGenerator ) { - partitions.stream().filter(tp -> tp.topic().equals(topic)).forEach(tp -> { - failed.put(tp, exceptionGenerator.apply(tp)); - }); + partitions.stream().filter(tp -> tp.topic().equals(topic)).forEach(tp -> + failed.put(tp, exceptionGenerator.apply(tp)) + ); } private void handlePartitionError( diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java index 2f43b603fc8ff..8884c0393d608 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java @@ -152,7 +152,7 @@ public interface ConsumerRebalanceListener { * {@link #onPartitionsRevoked(Collection)} callback before any instance executes its * {@link #onPartitionsAssigned(Collection)} callback. During exceptional scenarios, partitions may be migrated * without the old owner being notified (i.e. their {@link #onPartitionsRevoked(Collection)} callback not triggered), - * and later when the old owner consumer realized this event, the {@link #onPartitionsLost(Collection)} (Collection)} callback + * and later when the old owner consumer realized this event, the {@link #onPartitionsLost(Collection)} callback * will be triggered by the consumer then. *
<p>
* It is common for the assignment callback to use the consumer instance in order to query offsets. It is possible diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 27faa80c65421..600f8bbd07ef4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -253,7 +253,7 @@ public synchronized ConsumerRecords poll(final Duration timeout) { } } - toClear.forEach(p -> this.records.remove(p)); + toClear.forEach(records::remove); return new ConsumerRecords<>(results); } @@ -263,7 +263,7 @@ public synchronized void addRecord(ConsumerRecord record) { Set currentAssigned = this.subscriptions.assignedPartitions(); if (!currentAssigned.contains(tp)) throw new IllegalStateException("Cannot add records for a partition that is not assigned to the consumer"); - List> recs = this.records.computeIfAbsent(tp, k -> new ArrayList<>()); + List> recs = records.computeIfAbsent(tp, k -> new ArrayList<>()); recs.add(record); } @@ -286,8 +286,7 @@ public synchronized void setOffsetsException(KafkaException exception) { @Override public synchronized void commitAsync(Map offsets, OffsetCommitCallback callback) { ensureNotClosed(); - for (Map.Entry entry : offsets.entrySet()) - committed.put(entry.getKey(), entry.getValue()); + committed.putAll(offsets); if (callback != null) { callback.onComplete(offsets, null); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java index e95d4f1efd7b0..6bc064006fe6f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java @@ -170,7 +170,7 @@ private void assignRanges(TopicAssignmentState assignmentState, private void assignWithRackMatching(Collection assignmentStates, Map> assignment) { - assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> { + assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> { if (coPartitionedStates.size() > 1) assignCoPartitionedWithRackMatching(consumers, numPartitions, coPartitionedStates, assignment); @@ -179,8 +179,8 @@ private void assignWithRackMatching(Collection assignmentS if (state.needsRackAwareAssignment) assignRanges(state, state::racksMatch, assignment); } - }); - }); + }) + ); } private void assignCoPartitionedWithRackMatching(LinkedHashMap> consumers, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java index 7e7350a5946e8..0d3e4a256e2be 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java @@ -236,8 +236,7 @@ static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) { topicAssignments.add(topicAssignment); } struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray()); - if (memberData.generation.isPresent()) - struct.set(GENERATION_KEY_NAME, memberData.generation.get()); + memberData.generation.ifPresent(integer -> 
struct.set(GENERATION_KEY_NAME, integer)); ByteBuffer buffer = ByteBuffer.allocate(STICKY_ASSIGNOR_USER_DATA_V1.sizeOf(struct)); STICKY_ASSIGNOR_USER_DATA_V1.write(buffer, struct); buffer.flip(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java index a6cc28fb0f4c6..aba539b876326 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java @@ -28,7 +28,6 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import java.util.Objects; @@ -51,7 +50,6 @@ */ public class CoordinatorRequestManager implements RequestManager { private static final long COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS = 60 * 1000; - private final Time time; private final Logger log; private final BackgroundEventHandler backgroundEventHandler; private final String groupId; @@ -62,7 +60,6 @@ public class CoordinatorRequestManager implements RequestManager { private Node coordinator; public CoordinatorRequestManager( - final Time time, final LogContext logContext, final long retryBackoffMs, final long retryBackoffMaxMs, @@ -70,7 +67,6 @@ public CoordinatorRequestManager( final String groupId ) { Objects.requireNonNull(groupId); - this.time = time; this.log = logContext.logger(this.getClass()); this.backgroundEventHandler = errorHandler; this.groupId = groupId; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java index d31d412c65503..0ed4a67b5869d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java @@ -569,7 +569,7 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { sentFields.rebalanceTimeoutMs = rebalanceTimeoutMs; } - // SubscribedTopicNames - only sent if has changed since the last heartbeat + // SubscribedTopicNames - only sent if it has changed since the last heartbeat TreeSet subscribedTopicNames = new TreeSet<>(this.subscriptions.subscription()); if (sendAllFields || !subscribedTopicNames.equals(sentFields.subscribedTopicNames)) { data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription())); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index 75d87432db680..b198cdbefe017 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -163,7 +163,7 @@ protected RequestManagers create() { if (groupRebalanceConfig != null && groupRebalanceConfig.groupId != null) { Optional serverAssignor = Optional.ofNullable(config.getString(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG)); - coordinator = new CoordinatorRequestManager(time, + coordinator = new CoordinatorRequestManager( logContext, retryBackoffMs, retryBackoffMaxMs, 
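A minimal sketch, outside the diff itself, of the Optional cleanup that several hunks above apply (for example the StickyAssignor change): an explicit isPresent()/get() pair becomes a single ifPresent call. The map and key name below are invented stand-ins for the Struct API:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    public class IfPresentExample {
        public static void main(String[] args) {
            Map<String, Integer> struct = new HashMap<>();
            Optional<Integer> generation = Optional.of(5);

            // Before: explicit presence check followed by get()
            if (generation.isPresent())
                struct.put("generation", generation.get());

            // After: same behavior expressed with ifPresent, as in the hunk above
            generation.ifPresent(g -> struct.put("generation", g));

            System.out.println(struct); // prints {generation=5}
        }
    }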
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java index ce8aa4a824f90..3b3a711990822 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java @@ -48,7 +48,7 @@ TxnPartitionEntry get(TopicPartition topicPartition) { } TxnPartitionEntry getOrCreate(TopicPartition topicPartition) { - return topicPartitions.computeIfAbsent(topicPartition, tp -> new TxnPartitionEntry(tp)); + return topicPartitions.computeIfAbsent(topicPartition, TxnPartitionEntry::new); } boolean contains(TopicPartition topicPartition) { diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java index 820adbdb5fbfa..93f2f4225bc74 100644 --- a/clients/src/main/java/org/apache/kafka/common/Cluster.java +++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java @@ -246,8 +246,8 @@ public Node nodeById(int id) { /** * Get the node by node id if the replica for the given partition is online - * @param partition - * @param id + * @param partition The TopicPartition + * @param id The node id * @return the node */ public Optional nodeIfOnline(TopicPartition partition, int id) { diff --git a/clients/src/main/java/org/apache/kafka/common/ClusterResource.java b/clients/src/main/java/org/apache/kafka/common/ClusterResource.java index 749f2d124e077..2f857ff560975 100644 --- a/clients/src/main/java/org/apache/kafka/common/ClusterResource.java +++ b/clients/src/main/java/org/apache/kafka/common/ClusterResource.java @@ -30,7 +30,7 @@ public class ClusterResource { * Create {@link ClusterResource} with a cluster id. Note that cluster id may be {@code null} if the * metadata request was sent to a broker without support for cluster ids. The first version of Kafka * to support cluster id is 0.10.1.0. 
- * @param clusterId + * @param clusterId The cluster id */ public ClusterResource(String clusterId) { this.clusterId = clusterId; diff --git a/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockInputStream.java index e2c9ef8a63b6c..75576eedb5d75 100644 --- a/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockInputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockInputStream.java @@ -276,11 +276,6 @@ public void reset() { throw new RuntimeException("reset not supported"); } - @Override - public boolean markSupported() { - return false; - } - /** * Checks whether the version of lz4 on the classpath has the fix for reading from ByteBuffers with * non-zero array offsets (see https://github.com/lz4/lz4-java/pull/65) diff --git a/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockOutputStream.java index 97e370a383caf..0f1d213f50e36 100644 --- a/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockOutputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockOutputStream.java @@ -333,10 +333,6 @@ public boolean isBlockChecksumSet() { return blockChecksum == 1; } - public boolean isBlockIndependenceSet() { - return blockIndependence == 1; - } - public int getVersion() { return version; } diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index b69ca06219a04..f7632c8ca56df 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -113,9 +113,7 @@ public AbstractConfig(ConfigDef definition, Map originals, Map this.originals = resolveConfigVariables(configProviderProps, originalMap); this.values = definition.parse(this.originals); Map configUpdates = postProcessParsedConfig(Collections.unmodifiableMap(this.values)); - for (Map.Entry update : configUpdates.entrySet()) { - this.values.put(update.getKey(), update.getValue()); - } + this.values.putAll(configUpdates); definition.parse(this.values); this.definition = definition; if (doLog) diff --git a/clients/src/main/java/org/apache/kafka/common/config/internals/AllowedPaths.java b/clients/src/main/java/org/apache/kafka/common/config/internals/AllowedPaths.java index 4fb6a483f66e7..b0f1f1a29c78f 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/internals/AllowedPaths.java +++ b/clients/src/main/java/org/apache/kafka/common/config/internals/AllowedPaths.java @@ -70,7 +70,7 @@ public Path parseUntrustedPath(String path) { if (allowedPaths != null) { Path normalisedPath = parsedPath.normalize(); - long allowed = allowedPaths.stream().filter(allowedPath -> normalisedPath.startsWith(allowedPath)).count(); + long allowed = allowedPaths.stream().filter(normalisedPath::startsWith).count(); if (allowed == 0) { return null; } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java index 43eb8cb319465..5fafa144956b5 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java @@ -21,7 +21,7 @@ import java.util.Map; /** - * A implementation of 
MetricsContext, it encapsulates required metrics context properties for Kafka services and clients + * An implementation of MetricsContext, it encapsulates required metrics context properties for Kafka services and clients */ public class KafkaMetricsContext implements MetricsContext { /** diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java index b52285dac63f4..9e987aaedaed6 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java @@ -392,7 +392,7 @@ public synchronized Sensor sensor(String name, MetricConfig config, Sensor.Recor * receive every value recorded with this sensor. * @param name The name of the sensor * @param config A default configuration to use for this sensor for metrics that don't have their own config - * @param inactiveSensorExpirationTimeSeconds If no value if recorded on the Sensor for this duration of time, + * @param inactiveSensorExpirationTimeSeconds If no value is recorded on the Sensor for this duration of time, * it is eligible for removal * @param parents The parent sensors * @param recordingLevel The recording level. @@ -419,7 +419,7 @@ public synchronized Sensor sensor(String name, MetricConfig config, long inactiv * receive every value recorded with this sensor. This uses a default recording level of INFO. * @param name The name of the sensor * @param config A default configuration to use for this sensor for metrics that don't have their own config - * @param inactiveSensorExpirationTimeSeconds If no value if recorded on the Sensor for this duration of time, + * @param inactiveSensorExpirationTimeSeconds If no value is recorded on the Sensor for this duration of time, * it is eligible for removal * @param parents The parent sensors * @return The sensor that is created diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java index 75771fb4acedc..e9b4582096dd1 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java @@ -40,13 +40,13 @@ public interface MetricsReporter extends Reconfigurable, AutoCloseable { /** * This is called whenever a metric is updated or added - * @param metric + * @param metric The metric that has been added or changed */ void metricChange(KafkaMetric metric); /** * This is called whenever a metric is removed - * @param metric + * @param metric The metric that has been removed */ void metricRemoval(KafkaMetric metric); diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java index 97f91822d8261..aa7de6fa6c931 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java @@ -116,7 +116,6 @@ public interface BinScheme { public static class ConstantBinScheme implements BinScheme { private static final int MIN_BIN_NUMBER = 0; private final double min; - private final double max; private final int bins; private final double bucketWidth; private final int maxBinNumber; @@ -132,7 +131,6 @@ public ConstantBinScheme(int bins, double min, double max) { if (bins < 2) throw new IllegalArgumentException("Must have at least 2 bins."); 
this.min = min; - this.max = max; this.bins = bins; this.bucketWidth = (max - min) / bins; this.maxBinNumber = bins - 1; diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java index 4ff41f81b594c..c6181b81c5e73 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.utils.Utils; import java.io.Closeable; -import java.io.IOException; import java.net.InetAddress; import java.nio.channels.SelectionKey; import java.util.Map; @@ -72,7 +71,7 @@ KafkaChannel buildChannel(String id, TransportLayer transportLayer, Supplier= RECORD_OVERHEAD_V0 && checksum() == computeChecksum(); } - public Long wrapperRecordTimestamp() { - return wrapperRecordTimestamp; - } - - public TimestampType wrapperRecordTimestampType() { - return wrapperRecordTimestampType; - } - /** * Throw an InvalidRecordException if isValid is false for this record */ diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index a5985103ec0a0..70f279a6c29bc 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -80,10 +80,10 @@ public void write(int b) { private long producerId; private short producerEpoch; private int baseSequence; - private int uncompressedRecordsSizeInBytes = 0; // Number of bytes (excluding the header) written before compression - private int numRecords = 0; - private float actualCompressionRatio = 1; - private long maxTimestamp = RecordBatch.NO_TIMESTAMP; + private int uncompressedRecordsSizeInBytes; // Number of bytes (excluding the header) written before compression + private int numRecords; + private float actualCompressionRatio; + private long maxTimestamp; private long offsetOfMaxTimestamp = -1; private Long lastOffset = null; private Long baseTimestamp = null; @@ -814,7 +814,7 @@ private void ensureOpenForRecordBatchWrite() { } /** - * Get an estimate of the number of bytes written (based on the estimation factor hard-coded in {@link CompressionType}. + * Get an estimate of the number of bytes written (based on the estimation factor hard-coded in {@link CompressionType}). * @return The estimated number of bytes written */ private int estimatedBytesWritten() { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java index 0aae47aa24a96..142210f765d01 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java @@ -83,20 +83,20 @@ public Builder(AlterPartitionRequestData data, boolean canUseTopicIds) { @Override public AlterPartitionRequest build(short version) { if (version < 3) { - data.topics().forEach(topicData -> { + data.topics().forEach(topicData -> topicData.partitions().forEach(partitionData -> { // The newIsrWithEpochs will be empty after build. Then we can skip the conversion if the build // is called again. 
if (!partitionData.newIsrWithEpochs().isEmpty()) { List newIsr = new ArrayList<>(partitionData.newIsrWithEpochs().size()); - partitionData.newIsrWithEpochs().forEach(brokerState -> { - newIsr.add(brokerState.brokerId()); - }); + partitionData.newIsrWithEpochs().forEach(brokerState -> + newIsr.add(brokerState.brokerId()) + ); partitionData.setNewIsr(newIsr); partitionData.setNewIsrWithEpochs(Collections.emptyList()); } - }); - }); + }) + ); } return new AlterPartitionRequest(data, version); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionResponse.java index 9ee92f7b809cd..38b8eaf275bbd 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionResponse.java @@ -44,9 +44,9 @@ public AlterPartitionResponseData data() { public Map errorCounts() { Map counts = new HashMap<>(); updateErrorCounts(counts, Errors.forCode(data.errorCode())); - data.topics().forEach(topicResponse -> topicResponse.partitions().forEach(partitionResponse -> { - updateErrorCounts(counts, Errors.forCode(partitionResponse.errorCode())); - })); + data.topics().forEach(topicResponse -> topicResponse.partitions().forEach(partitionResponse -> + updateErrorCounts(counts, Errors.forCode(partitionResponse.errorCode())) + )); return counts; } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java index bc5aa0ba35a33..d3b606eeac63a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java @@ -76,11 +76,11 @@ public static ControlledShutdownResponse prepareResponse(Errors error, Set { + tps.forEach(tp -> pSet.add(new RemainingPartition() .setTopicName(tp.topic()) - .setPartitionIndex(tp.partition())); - }); + .setPartitionIndex(tp.partition())) + ); data.setRemainingPartitions(pSet); return new ControlledShutdownResponse(data); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java index 27e7c581c33bb..2cf8dd40a15c9 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java @@ -107,7 +107,7 @@ private void validate(short version) { } public static DeleteAclsFilterResult filterResult(AclDeleteResult result) { - ApiError error = result.exception().map(e -> ApiError.fromThrowable(e)).orElse(ApiError.NONE); + ApiError error = result.exception().map(ApiError::fromThrowable).orElse(ApiError.NONE); List matchingAcls = result.aclBindingDeleteResults().stream() .map(DeleteAclsResponse::matchingAcl) .collect(Collectors.toList()); @@ -118,7 +118,7 @@ public static DeleteAclsFilterResult filterResult(AclDeleteResult result) { } private static DeleteAclsMatchingAcl matchingAcl(AclDeleteResult.AclBindingDeleteResult result) { - ApiError error = result.exception().map(e -> ApiError.fromThrowable(e)).orElse(ApiError.NONE); + ApiError error = result.exception().map(ApiError::fromThrowable).orElse(ApiError.NONE); AclBinding acl = result.aclBinding(); return matchingAcl(acl, error); } diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java index 006491f74c067..60b4031725e80 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java @@ -92,7 +92,7 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { public List topicNames() { if (version() >= 6) - return data.topics().stream().map(topic -> topic.name()).collect(Collectors.toList()); + return data.topics().stream().map(DeleteTopicState::name).collect(Collectors.toList()); return data.topicNames(); } @@ -104,7 +104,7 @@ public int numberOfTopics() { public List topicIds() { if (version() >= 6) - return data.topics().stream().map(topic -> topic.topicId()).collect(Collectors.toList()); + return data.topics().stream().map(DeleteTopicState::topicId).collect(Collectors.toList()); return Collections.emptyList(); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java index d612ca80949a0..1bae21a9e9c91 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java @@ -58,12 +58,12 @@ public DescribeConfigsResponse getErrorResponse(int throttleTimeMs, Throwable e) Errors error = Errors.forException(e); return new DescribeConfigsResponse(new DescribeConfigsResponseData() .setThrottleTimeMs(throttleTimeMs) - .setResults(data.resources().stream().map(result -> { - return new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(error.code()) + .setResults(data.resources().stream().map(result -> + new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(error.code()) .setErrorMessage(error.message()) .setResourceName(result.resourceName()) - .setResourceType(result.resourceType()); - }).collect(Collectors.toList()) + .setResourceType(result.resourceType())) + .collect(Collectors.toList()) )); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java index 3d06cff712c18..e0bba392a076b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java @@ -59,9 +59,9 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) { public Map errorCounts() { Map errorCounts = new HashMap<>(); errorCounts.put(Errors.forCode(data.errorCode()), 1); - data.results().forEach(result -> { - updateErrorCounts(errorCounts, Errors.forCode(result.errorCode())); - }); + data.results().forEach(result -> + updateErrorCounts(errorCounts, Errors.forCode(result.errorCode())) + ); return errorCounts; } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotResponse.java index c86ba9cfc6616..2ea0265f6992d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotResponse.java @@ -84,7 +84,7 @@ public static FetchSnapshotResponseData withTopLevelError(Errors error) { /** * Creates a 
FetchSnapshotResponseData with a single PartitionSnapshot for the topic partition. * - * The partition index will already by populated when calling operator. + * The partition index will already be populated when calling operator. * * @param topicPartition the topic partition to include * @param operator unary operator responsible for populating all of the appropriate fields diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java index af10a6c78aebd..10041633faeed 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java @@ -86,7 +86,7 @@ public static String maybeTruncateReason(final String reason) { /** * Since JoinGroupRequest version 4, a client that sends a join group request with - * {@link UNKNOWN_MEMBER_ID} needs to rejoin with a new member id generated + * {@link #UNKNOWN_MEMBER_ID} needs to rejoin with a new member id generated * by the server. Once the second join group request is complete, the client is * added as a new member of the group. * diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java index c32d61442632c..77a72d5532bc4 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java @@ -138,9 +138,9 @@ public Map errorCounts() { updateErrorCounts(combinedErrorCounts, Errors.forCode(data.errorCode())); // Member level error. - data.members().forEach(memberResponse -> { - updateErrorCounts(combinedErrorCounts, Errors.forCode(memberResponse.errorCode())); - }); + data.members().forEach(memberResponse -> + updateErrorCounts(combinedErrorCounts, Errors.forCode(memberResponse.errorCode())) + ); return combinedErrorCounts; } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java index fc996453d6470..57aeb9de1bbb5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java @@ -181,13 +181,4 @@ public static List toListOffsetsTopics(Map(topics.values()); } - - public static ListOffsetsTopic singletonRequestData(String topic, int partitionIndex, long timestamp, int maxNumOffsets) { - return new ListOffsetsTopic() - .setName(topic) - .setPartitions(Collections.singletonList(new ListOffsetsPartition() - .setPartitionIndex(partitionIndex) - .setTimestamp(timestamp) - .setMaxNumOffsets(maxNumOffsets))); - } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java index 0eba5f29ab4a6..88111b1007717 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -101,11 +101,11 @@ public static OffsetCommitResponseData getErrorResponse( .setName(topic.name()); response.topics().add(responseTopic); - topic.partitions().forEach(partition -> { + topic.partitions().forEach(partition -> responseTopic.partitions().add(new OffsetCommitResponsePartition() 
.setPartitionIndex(partition.partitionIndex()) - .setErrorCode(error.code())); - }); + .setErrorCode(error.code())) + ); }); return response; } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java index 9848ad380fab0..2b6d00b1a47f6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java @@ -160,13 +160,11 @@ public
<P>
Builder addPartitions( Errors error ) { final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName); - - partitions.forEach(partition -> { + partitions.forEach(partition -> topicResponse.partitions().add(new OffsetCommitResponsePartition() .setPartitionIndex(partitionIndex.apply(partition)) - .setErrorCode(error.code())); - }); - + .setErrorCode(error.code())) + ); return this; } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java index e8c21969e9e3b..aa9b4bc4ffe66 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java @@ -83,13 +83,11 @@ public
<P>
Builder addPartitions( Errors error ) { final OffsetDeleteResponseTopic topicResponse = getOrCreateTopic(topicName); - - partitions.forEach(partition -> { + partitions.forEach(partition -> topicResponse.partitions().add(new OffsetDeleteResponsePartition() .setPartitionIndex(partitionIndex.apply(partition)) - .setErrorCode(error.code())); - }); - + .setErrorCode(error.code())) + ); return this; } @@ -113,9 +111,9 @@ public Builder merge( // Otherwise, we add the partitions to the existing one. Note we // expect non-overlapping partitions here as we don't verify // if the partition is already in the list before adding it. - newTopic.partitions().forEach(partition -> { - existingTopic.partitions().add(partition.duplicate()); - }); + newTopic.partitions().forEach(partition -> + existingTopic.partitions().add(partition.duplicate()) + ); } }); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java index 70473565f63d5..1b748dca28e61 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java @@ -237,12 +237,12 @@ public List groups() { group.setTopics(null); } else { // Otherwise, topics are translated to the new structure. - data.topics().forEach(topic -> { + data.topics().forEach(topic -> group.topics().add(new OffsetFetchRequestTopics() .setName(topic.name()) .setPartitionIndexes(topic.partitionIndexes()) - ); - }); + ) + ); } return Collections.singletonList(group); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java index 8242b71a03fbd..d2da48fb018ee 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java @@ -77,7 +77,7 @@ public Map groupAssignments() { } /** - * ProtocolType and ProtocolName are mandatory since version 5. This methods verifies that + * ProtocolType and ProtocolName are mandatory since version 5. This method verifies that * they are defined for version 5 or higher, or returns true otherwise for older versions. 
*/ public boolean areMandatoryProtocolTypeAndNamePresent() { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java index e70fe11891afe..e79b3bbc7b3be 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java @@ -199,11 +199,11 @@ public static TxnOffsetCommitResponseData getErrorResponse( .setName(topic.name()); response.topics().add(responseTopic); - topic.partitions().forEach(partition -> { + topic.partitions().forEach(partition -> responseTopic.partitions().add(new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() .setPartitionIndex(partition.partitionIndex()) - .setErrorCode(error.code())); - }); + .setErrorCode(error.code())) + ); }); return response; } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java index 58f9d5c099c24..ce7dd9e7f1cbb 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java @@ -88,11 +88,11 @@ public
<P>
Builder addPartitions( ) { final TxnOffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName); - partitions.forEach(partition -> { + partitions.forEach(partition -> topicResponse.partitions().add(new TxnOffsetCommitResponsePartition() .setPartitionIndex(partitionIndex.apply(partition)) - .setErrorCode(error.code())); - }); + .setErrorCode(error.code())) + ); return this; } diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java index f84a8ac9b7532..49e537c1f3a1f 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java @@ -76,7 +76,7 @@ private LoginManager(JaasContext jaasContext, String saslMechanism, Map payload, String claimName) { diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ValidatorAccessTokenValidator.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ValidatorAccessTokenValidator.java index 5f51d456efdcb..d07663b723e7e 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ValidatorAccessTokenValidator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ValidatorAccessTokenValidator.java @@ -181,13 +181,11 @@ else if (scopeRaw instanceof Collection) Long issuedAt = ClaimValidationUtils.validateIssuedAt(ReservedClaimNames.ISSUED_AT, issuedAtRaw != null ? issuedAtRaw.getValueInMillis() : null); - OAuthBearerToken token = new BasicOAuthBearerToken(accessToken, + return new BasicOAuthBearerToken(accessToken, scopes, expiration, sub, issuedAt); - - return token; } private T getClaim(ClaimSupplier supplier, String claimName) throws ValidateException { diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java index 4c776159580e5..629053e6550c3 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java @@ -188,7 +188,7 @@ private void handleTokenCallback(OAuthBearerTokenCallback callback) { callback.token(null); return; } - if (moduleOptions.keySet().stream().noneMatch(name -> !name.startsWith(EXTENSION_PREFIX))) { + if (moduleOptions.keySet().stream().allMatch(name -> name.startsWith(EXTENSION_PREFIX))) { throw new OAuthBearerConfigException("Extensions provided in login context without a token"); } String principalClaimNameValue = optionValue(PRINCIPAL_CLAIM_NAME_OPTION); diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java index 7a81521518cd1..e9fa9e8bce850 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java +++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java @@ -117,9 +117,8 @@ public void configure(Map configs, String saslMechanism, List unmodifiableModuleOptions = Collections + this.moduleOptions = Collections .unmodifiableMap((Map) jaasConfigEntries.get(0).getOptions()); - this.moduleOptions = unmodifiableModuleOptions; configured = true; } diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java index d5d55a65021c8..28c2e5568ee40 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java @@ -70,7 +70,6 @@ enum State { private final ScramFormatter formatter; private final CallbackHandler callbackHandler; private State state; - private String username; private ClientFirstMessage clientFirstMessage; private ServerFirstMessage serverFirstMessage; private ScramExtensions scramExtensions; @@ -108,7 +107,7 @@ public byte[] evaluateResponse(byte[] response) throws SaslException, SaslAuthen String serverNonce = formatter.secureRandomString(); try { String saslName = clientFirstMessage.saslName(); - this.username = ScramFormatter.username(saslName); + String username = ScramFormatter.username(saslName); NameCallback nameCallback = new NameCallback("username", username); ScramCredentialCallback credentialCallback; if (scramExtensions.tokenAuthenticated()) { diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferUnmapper.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferUnmapper.java index 4777f7bf96b49..68e182d22784a 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferUnmapper.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferUnmapper.java @@ -117,9 +117,8 @@ private static MethodHandle unmapJava7Or8(MethodHandles.Lookup lookup) throws Re MethodHandle nonNullTest = lookup.findStatic(ByteBufferUnmapper.class, "nonNull", methodType(boolean.class, Object.class)).asType(methodType(boolean.class, cleanerClass)); MethodHandle noop = dropArguments(constant(Void.class, null).asType(methodType(void.class)), 0, cleanerClass); - MethodHandle unmapper = filterReturnValue(directBufferCleanerMethod, guardWithTest(nonNullTest, cleanMethod, noop)) + return filterReturnValue(directBufferCleanerMethod, guardWithTest(nonNullTest, cleanMethod, noop)) .asType(methodType(void.class, ByteBuffer.class)); - return unmapper; } private static MethodHandle unmapJava9(MethodHandles.Lookup lookup) throws ReflectiveOperationException { diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Java.java b/clients/src/main/java/org/apache/kafka/common/utils/Java.java index 2310f422d0e1b..2552db250d55a 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Java.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Java.java @@ -36,7 +36,7 @@ static Version parseVersion(String versionString) { return new Version(majorVersion, minorVersion); } - // Having these as static final provides the best opportunity for compilar optimization + // Having these as static final provides the best opportunity for compiler optimization public static final boolean IS_JAVA9_COMPATIBLE = VERSION.isJava9Compatible(); public static final boolean IS_JAVA11_COMPATIBLE = 
VERSION.isJava11Compatible(); diff --git a/clients/src/main/java/org/apache/kafka/common/utils/LoggingSignalHandler.java b/clients/src/main/java/org/apache/kafka/common/utils/LoggingSignalHandler.java index 55eb49a2704aa..824a1c4ddb0d3 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/LoggingSignalHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/LoggingSignalHandler.java @@ -37,7 +37,6 @@ public class LoggingSignalHandler { private final Constructor signalConstructor; private final Class signalHandlerClass; - private final Class signalClass; private final Method signalHandleMethod; private final Method signalGetNameMethod; private final Method signalHandlerHandleMethod; @@ -48,7 +47,7 @@ public class LoggingSignalHandler { * @throws ReflectiveOperationException if the underlying API has changed in an incompatible manner. */ public LoggingSignalHandler() throws ReflectiveOperationException { - signalClass = Class.forName("sun.misc.Signal"); + Class signalClass = Class.forName("sun.misc.Signal"); signalConstructor = signalClass.getConstructor(String.class); signalHandlerClass = Class.forName("sun.misc.SignalHandler"); signalHandlerHandleMethod = signalHandlerClass.getMethod("handle", signalClass); diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java index 5030ecfef10af..a6371c6db6086 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -1359,7 +1359,7 @@ public void testConcurrentUpdateAndFetchForSnapshotAndCluster() throws Interrupt } /** - * For testUpdatePartially, validates that updatedMetadata is matching expected part1Metadata, part2Metadata, interalPartMetadata, nodes & more. + * For testUpdatePartially, validates that updatedMetadata is matching expected part1Metadata, part2Metadata, internalPartMetadata, nodes & more. 
*/ void validateForUpdatePartitionLeadership(Metadata updatedMetadata, PartitionMetadata part1Metadata, PartitionMetadata part2Metadata, PartitionMetadata part12Metadata, @@ -1379,7 +1379,7 @@ void validateForUpdatePartitionLeadership(Metadata updatedMetadata, assertEquals(expectedController, updatedCluster.controller()); assertEquals(expectedTopicIds, updatedMetadata.topicIds()); - Map nodeMap = expectedNodes.stream().collect(Collectors.toMap(e -> e.id(), e -> e)); + Map nodeMap = expectedNodes.stream().collect(Collectors.toMap(Node::id, e -> e)); for (PartitionMetadata partitionMetadata: Arrays.asList(part1Metadata, part2Metadata, part12Metadata, internalPartMetadata)) { TopicPartition tp = new TopicPartition(partitionMetadata.topic(), partitionMetadata.partition()); diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index cd3ec36f38593..4c22108b2b3b6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -1351,17 +1351,17 @@ public TestMetadataUpdater(List nodes) { @Override public void handleServerDisconnect(long now, String destinationId, Optional maybeAuthException) { - maybeAuthException.ifPresent(exception -> { - failure = exception; - }); + maybeAuthException.ifPresent(exception -> + failure = exception + ); super.handleServerDisconnect(now, destinationId, maybeAuthException); } @Override public void handleFailedRequest(long now, Optional maybeFatalException) { - maybeFatalException.ifPresent(exception -> { - failure = exception; - }); + maybeFatalException.ifPresent(exception -> + failure = exception + ); } public KafkaException getAndClearFailure() { diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java index 8c9244e23d80d..a84927c735f58 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java @@ -156,7 +156,7 @@ public static ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult(Stri public static ListClientMetricsResourcesResult listClientMetricsResourcesResult(String... names) { return new ListClientMetricsResourcesResult( KafkaFuture.completedFuture(Arrays.stream(names) - .map(name -> new ClientMetricsResourceListing(name)) + .map(ClientMetricsResourceListing::new) .collect(Collectors.toList()))); } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 3d73cacbb77a9..bd7c7d0b1ab47 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -749,7 +749,7 @@ private static DescribeQuorumResponse prepareDescribeQuorumResponse( Boolean partitionIndexError, Boolean emptyOptionals) { String topicName = topicNameError ? "RANDOM" : Topic.CLUSTER_METADATA_TOPIC_NAME; - Integer partitionIndex = partitionIndexError ? 1 : Topic.CLUSTER_METADATA_TOPIC_PARTITION.partition(); + int partitionIndex = partitionIndexError ? 1 : Topic.CLUSTER_METADATA_TOPIC_PARTITION.partition(); List topics = new ArrayList<>(); List partitions = new ArrayList<>(); for (int i = 0; i < (partitionCountError ? 
2 : 1); i++) { @@ -1553,7 +1553,7 @@ private void addPartitionToDescribeTopicPartitionsResponse( Uuid topicId, List partitions) { List addingPartitions = new ArrayList<>(); - partitions.forEach(partition -> { + partitions.forEach(partition -> addingPartitions.add(new DescribeTopicPartitionsResponsePartition() .setIsrNodes(singletonList(0)) .setErrorCode((short) 0) @@ -1562,8 +1562,8 @@ private void addPartitionToDescribeTopicPartitionsResponse( .setEligibleLeaderReplicas(singletonList(1)) .setLastKnownElr(singletonList(2)) .setPartitionIndex(partition) - .setReplicaNodes(Arrays.asList(0, 1, 2))); - }); + .setReplicaNodes(Arrays.asList(0, 1, 2))) + ); data.topics().add(new DescribeTopicPartitionsResponseTopic() .setErrorCode((short) 0) .setTopicId(topicId) @@ -4033,7 +4033,7 @@ public void testBatchedListConsumerGroupOffsetsWithNoOffsetFetchBatching() throw ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(groupSpecs); // The request handler attempts both FindCoordinator and OffsetFetch requests. This seems - // ok since since we expect this scenario only during upgrades from versions < 3.0.0 where + // ok since we expect this scenario only during upgrades from versions < 3.0.0 where // some upgraded brokers could handle batched FindCoordinator while non-upgraded coordinators // rejected batched OffsetFetch requests. sendFindCoordinatorResponse(env.kafkaClient(), env.cluster().controller()); @@ -4890,7 +4890,7 @@ public void testRemoveMembersFromGroup() throws Exception { assertNull(noErrorResult.memberResult(memberTwo).get()); // Test the "removeAll" scenario - final List topicPartitions = Arrays.asList(1, 2, 3).stream().map(partition -> new TopicPartition("my_topic", partition)) + final List topicPartitions = Stream.of(1, 2, 3).map(partition -> new TopicPartition("my_topic", partition)) .collect(Collectors.toList()); // construct the DescribeGroupsResponse DescribeGroupsResponseData data = prepareDescribeGroupsResponseData(GROUP_ID, Arrays.asList(instanceOne, instanceTwo), topicPartitions); @@ -6794,7 +6794,7 @@ public void testAlterUserScramCredentialsUnknownMechanism() { new UserScramCredentialUpsertion(user2Name, new ScramCredentialInfo(user2ScramMechanism0, 4096), "password"))); Map> resultData = result.values(); assertEquals(3, resultData.size()); - Arrays.asList(user0Name, user1Name).stream().forEach(u -> { + Stream.of(user0Name, user1Name).forEach(u -> { assertTrue(resultData.containsKey(u)); try { resultData.get(u).get(); @@ -6819,7 +6819,7 @@ public void testAlterUserScramCredentialsUnknownMechanism() { } @Test - public void testAlterUserScramCredentials() throws Exception { + public void testAlterUserScramCredentials() { try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); @@ -6831,7 +6831,7 @@ public void testAlterUserScramCredentials() throws Exception { final String user2Name = "user2"; ScramMechanism user2ScramMechanism0 = ScramMechanism.SCRAM_SHA_512; AlterUserScramCredentialsResponseData responseData = new AlterUserScramCredentialsResponseData(); - responseData.setResults(Arrays.asList(user0Name, user1Name, user2Name).stream().map(u -> + responseData.setResults(Stream.of(user0Name, user1Name, user2Name).map(u -> new AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult() .setUser(u).setErrorCode(Errors.NONE.code())).collect(Collectors.toList())); @@ -6844,7 +6844,7 @@ public void testAlterUserScramCredentials() throws Exception { new 
UserScramCredentialDeletion(user2Name, user2ScramMechanism0))); Map> resultData = result.values(); assertEquals(3, resultData.size()); - Arrays.asList(user0Name, user1Name, user2Name).stream().forEach(u -> { + Stream.of(user0Name, user1Name, user2Name).forEach(u -> { assertTrue(resultData.containsKey(u)); assertFalse(resultData.get(u).isCompletedExceptionally()); }); @@ -7294,14 +7294,14 @@ public void testListTransactions() throws Exception { MetadataResponseData.MetadataResponseBrokerCollection brokers = new MetadataResponseData.MetadataResponseBrokerCollection(); - env.cluster().nodes().forEach(node -> { + env.cluster().nodes().forEach(node -> brokers.add(new MetadataResponseData.MetadataResponseBroker() .setHost(node.host()) .setNodeId(node.id()) .setPort(node.port()) .setRack(node.rack()) - ); - }); + ) + ); env.kafkaClient().prepareResponse( request -> request instanceof MetadataRequest, @@ -7658,7 +7658,7 @@ public void testListClientMetricsResourcesEmpty() throws Exception { } @Test - public void testListClientMetricsResourcesNotSupported() throws Exception { + public void testListClientMetricsResourcesNotSupported() { try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().prepareResponse( request -> request instanceof ListClientMetricsResourcesRequest, diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index f72362715e26e..4d52a81480d63 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -20,7 +20,6 @@ import org.apache.kafka.clients.admin.internals.CoordinatorKey; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.ElectionType; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; @@ -103,8 +102,6 @@ public class MockAdminClient extends AdminClient { private Time mockTime; private long blockingTimeMs; - private KafkaException listConsumerGroupOffsetsException; - private final Map mockMetrics = new HashMap<>(); private final List allTokens = new ArrayList<>(); @@ -646,8 +643,8 @@ synchronized public CreateDelegationTokenResult createDelegationToken(CreateDele synchronized public RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options) { KafkaFutureImpl future = new KafkaFutureImpl<>(); - Boolean tokenFound = false; - Long expiryTimestamp = options.renewTimePeriodMs(); + boolean tokenFound = false; + long expiryTimestamp = options.renewTimePeriodMs(); for (DelegationToken token : allTokens) { if (Arrays.equals(token.hmac(), hmac)) { token.tokenInfo().setExpiryTimestamp(expiryTimestamp); @@ -668,9 +665,9 @@ synchronized public RenewDelegationTokenResult renewDelegationToken(byte[] hmac, synchronized public ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options) { KafkaFutureImpl future = new KafkaFutureImpl<>(); - Long expiryTimestamp = options.expiryTimePeriodMs(); + long expiryTimestamp = options.expiryTimePeriodMs(); List tokensToRemove = new ArrayList<>(); - Boolean tokenFound = false; + boolean tokenFound = false; for (DelegationToken token : allTokens) { if (Arrays.equals(token.hmac(), hmac)) { if (expiryTimestamp == -1 || expiryTimestamp < System.currentTimeMillis()) { @@ -728,20 +725,9 @@ 
synchronized public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map< String group = groupSpecs.keySet().iterator().next(); Collection topicPartitions = groupSpecs.get(group).topicPartitions(); final KafkaFutureImpl> future = new KafkaFutureImpl<>(); - - if (listConsumerGroupOffsetsException != null) { - future.completeExceptionally(listConsumerGroupOffsetsException); - } else { - if (topicPartitions.isEmpty()) { - future.complete(committedOffsets.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, entry -> new OffsetAndMetadata(entry.getValue())))); - } else { - future.complete(committedOffsets.entrySet().stream() - .filter(entry -> topicPartitions.contains(entry.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, entry -> new OffsetAndMetadata(entry.getValue())))); - } - } - + future.complete(committedOffsets.entrySet().stream() + .filter(entry -> topicPartitions.isEmpty() || topicPartitions.contains(entry.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, entry -> new OffsetAndMetadata(entry.getValue())))); return new ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group), future)); } @@ -1335,10 +1321,6 @@ public synchronized void updateConsumerGroupOffsets(final Map groupIds = new HashSet<>(Arrays.asList("g1", "g2")); DeleteConsumerGroupsHandler handler = new DeleteConsumerGroupsHandler(lc); AdminApiFuture future = AdminApiFuture.forKeys( - groupIds.stream().map(g -> CoordinatorKey.byGroupId(g)).collect(Collectors.toSet())); + groupIds.stream().map(CoordinatorKey::byGroupId).collect(Collectors.toSet())); AdminApiDriver driver = new AdminApiDriver<>( handler, @@ -660,13 +660,13 @@ public TestContext( new LogContext() ); - staticKeys.forEach((key, brokerId) -> { - assertMappedKey(this, key, brokerId); - }); + staticKeys.forEach((key, brokerId) -> + assertMappedKey(this, key, brokerId) + ); - dynamicKeys.keySet().forEach(key -> { - assertUnmappedKey(this, key); - }); + dynamicKeys.keySet().forEach(key -> + assertUnmappedKey(this, key) + ); } public static TestContext staticMapped(Map staticKeys) { @@ -681,22 +681,22 @@ private void assertLookupResponse( RequestSpec requestSpec, LookupResult result ) { - requestSpec.keys.forEach(key -> { - assertUnmappedKey(this, key); - }); + requestSpec.keys.forEach(key -> + assertUnmappedKey(this, key) + ); // The response is just a placeholder. The result is all we are interested in MetadataResponse response = new MetadataResponse(new MetadataResponseData(), ApiKeys.METADATA.latestVersion()); driver.onResponse(time.milliseconds(), requestSpec, response, Node.noNode()); - result.mappedKeys.forEach((key, brokerId) -> { - assertMappedKey(this, key, brokerId); - }); + result.mappedKeys.forEach((key, brokerId) -> + assertMappedKey(this, key, brokerId) + ); - result.failedKeys.forEach((key, exception) -> { - assertFailedKey(this, key, exception); - }); + result.failedKeys.forEach((key, exception) -> + assertFailedKey(this, key, exception) + ); } private void assertResponse( @@ -707,9 +707,9 @@ private void assertResponse( int brokerId = requestSpec.scope.destinationBrokerId().orElseThrow(() -> new AssertionError("Fulfillment requests must specify a target brokerId")); - requestSpec.keys.forEach(key -> { - assertMappedKey(this, key, brokerId); - }); + requestSpec.keys.forEach(key -> + assertMappedKey(this, key, brokerId) + ); // The response is just a placeholder. 
The result is all we are interested in MetadataResponse response = new MetadataResponse(new MetadataResponseData(), @@ -717,17 +717,17 @@ private void assertResponse( driver.onResponse(time.milliseconds(), requestSpec, response, node); - result.unmappedKeys.forEach(key -> { - assertUnmappedKey(this, key); - }); + result.unmappedKeys.forEach(key -> + assertUnmappedKey(this, key) + ); - result.failedKeys.forEach((key, exception) -> { - assertFailedKey(this, key, exception); - }); + result.failedKeys.forEach((key, exception) -> + assertFailedKey(this, key, exception) + ); - result.completedKeys.forEach((key, value) -> { - assertCompletedKey(this, key, value); - }); + result.completedKeys.forEach((key, value) -> + assertCompletedKey(this, key, value) + ); } private MockLookupStrategy lookupStrategy() { diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategyTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategyTest.java index 8e4b961394cbe..8c30d93038f70 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategyTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategyTest.java @@ -85,9 +85,9 @@ public void testHandleResponse() { ); assertEquals(expectedMappedKeys, lookupResult.mappedKeys.keySet()); - lookupResult.mappedKeys.forEach((brokerKey, brokerId) -> { - assertEquals(OptionalInt.of(brokerId), brokerKey.brokerId); - }); + lookupResult.mappedKeys.forEach((brokerKey, brokerId) -> + assertEquals(OptionalInt.of(brokerId), brokerKey.brokerId) + ); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index fc6d2c85ba561..7984aaf6d98f3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -299,9 +299,9 @@ public void testWakeupFromEnsureCoordinatorReady() { coordinator.ensureCoordinatorReadyAsync(); // But should wakeup in sync variation even if timer is 0. 
- assertThrows(WakeupException.class, () -> { - coordinator.ensureCoordinatorReady(mockTime.timer(0)); - }); + assertThrows(WakeupException.class, () -> + coordinator.ensureCoordinatorReady(mockTime.timer(0)) + ); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 66ee724a0e515..8a90a5965063e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -824,9 +824,7 @@ public void testCompleteQuietly() { AtomicReference exception = new AtomicReference<>(); CompletableFuture future = CompletableFuture.completedFuture(null); consumer = newConsumer(); - assertDoesNotThrow(() -> consumer.completeQuietly(() -> { - future.get(0, TimeUnit.MILLISECONDS); - }, "test", exception)); + assertDoesNotThrow(() -> consumer.completeQuietly(() -> future.get(0, TimeUnit.MILLISECONDS), "test", exception)); assertNull(exception.get()); assertDoesNotThrow(() -> consumer.completeQuietly(() -> { @@ -1764,9 +1762,9 @@ public void testLongPollWaitIsLimited() { // Mock the subscription being assigned as the first fetch is collected consumer.subscriptions().assignFromSubscribed(Collections.singleton(tp)); return Fetch.empty(); - }).doAnswer(invocation -> { - return Fetch.forPartition(tp, records, true); - }).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + }).doAnswer(invocation -> + Fetch.forPartition(tp, records, true) + ).when(fetchCollector).collectFetch(any(FetchBuffer.class)); // And then poll for up to 10000ms, which should return 2 records without timing out ConsumerRecords returnedRecords = consumer.poll(Duration.ofMillis(10000)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java index 9f6fd4a764b0a..acd5ed45cc21a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java @@ -179,7 +179,6 @@ public ConsumerTestBuilder(Optional groupInfo, boolean enableA if (groupInfo.isPresent()) { GroupInformation gi = groupInfo.get(); CoordinatorRequestManager coordinator = spy(new CoordinatorRequestManager( - time, logContext, DEFAULT_RETRY_BACKOFF_MS, DEFAULT_RETRY_BACKOFF_MAX_MS, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java index d4496522c07b8..01ba51134d23f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java @@ -189,7 +189,6 @@ private void expectFindCoordinatorRequest( private CoordinatorRequestManager setupCoordinatorManager(String groupId) { return new CoordinatorRequestManager( - time, new LogContext(), RETRY_BACKOFF_MS, RETRY_BACKOFF_MS, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index 
451743ae2ad83..6a4b9faf479d6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -35,7 +35,6 @@ import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; @@ -47,7 +46,6 @@ public class ApplicationEventProcessorTest { private final Time time = new MockTime(1); - private final BlockingQueue applicationEventQueue = mock(BlockingQueue.class); private final ConsumerMetadata metadata = mock(ConsumerMetadata.class); private ApplicationEventProcessor processor; private CommitRequestManager commitRequestManager; @@ -55,7 +53,6 @@ public class ApplicationEventProcessorTest { private MembershipManager membershipManager; @BeforeEach - @SuppressWarnings("unchecked") public void setup() { LogContext logContext = new LogContext(); OffsetsRequestManager offsetsRequestManager = mock(OffsetsRequestManager.class); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 643dffb617918..81a7804b97ea6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -615,10 +615,9 @@ public void testInterceptorConstructorConfigurationWithExceptionShouldCloseRemai MockProducerInterceptor.setThrowOnConfigExceptionThreshold(targetInterceptor); - assertThrows(KafkaException.class, () -> { - new KafkaProducer<>( - props, new StringSerializer(), new StringSerializer()); - }); + assertThrows(KafkaException.class, () -> + new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()) + ); assertEquals(3, MockProducerInterceptor.CONFIG_COUNT.get()); assertEquals(3, MockProducerInterceptor.CLOSE_COUNT.get()); @@ -1207,9 +1206,8 @@ public void testInitTransactionsResponseAfterTimeout() throws Exception { ExecutorService executor = Executors.newFixedThreadPool(1); - Producer producer = kafkaProducer(configs, new StringSerializer(), - new StringSerializer(), metadata, client, null, time); - try { + try (Producer producer = kafkaProducer(configs, new StringSerializer(), + new StringSerializer(), metadata, client, null, time)) { client.prepareResponse( request -> request instanceof FindCoordinatorRequest && ((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(), @@ -1226,8 +1224,6 @@ public void testInitTransactionsResponseAfterTimeout() throws Exception { Thread.sleep(1000); producer.initTransactions(); - } finally { - producer.close(Duration.ZERO); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java index ed372e452f426..1578c2e22492a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java @@ -89,7 +89,7 @@ public void testSimple() throws Exception { } /** - * Test that we cannot try to allocate more memory then we have in the whole pool + * Test that we cannot 
try to allocate more memory than we have in the whole pool */ @Test public void testCantAllocateMoreMemoryThanWeHave() throws Exception { diff --git a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java index 81d2530650dfa..ad904d6e73cc0 100644 --- a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java +++ b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java @@ -99,7 +99,7 @@ private Throwable awaitAndAssertFailure(KafkaFuture future, assertEquals(expectedException, executionException.getCause().getClass()); assertEquals(expectedMessage, executionException.getCause().getMessage()); - executionException = assertThrows(ExecutionException.class, () -> future.get()); + executionException = assertThrows(ExecutionException.class, future::get); assertEquals(expectedException, executionException.getCause().getClass()); assertEquals(expectedMessage, executionException.getCause().getMessage()); @@ -114,7 +114,7 @@ private void awaitAndAssertCancelled(KafkaFuture future, String expectedMessa assertEquals(expectedMessage, cancellationException.getMessage()); assertEquals(CancellationException.class, cancellationException.getClass()); - cancellationException = assertThrows(CancellationException.class, () -> future.get()); + cancellationException = assertThrows(CancellationException.class, future::get); assertEquals(expectedMessage, cancellationException.getMessage()); assertEquals(CancellationException.class, cancellationException.getClass()); @@ -155,7 +155,7 @@ public void testCompleteFuturesExceptionally() { assertFalse(futureFail.completeExceptionally(new RuntimeException("We require more minerals"))); assertFalse(futureFail.cancel(true)); - ExecutionException executionException = assertThrows(ExecutionException.class, () -> futureFail.get()); + ExecutionException executionException = assertThrows(ExecutionException.class, futureFail::get); assertEquals(RuntimeException.class, executionException.getCause().getClass()); assertEquals("We require more vespene gas", executionException.getCause().getMessage()); diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java index 8add9c95f3071..7d8ca5fdff7ee 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java @@ -502,7 +502,7 @@ public void toRst() { .define("opt3", Type.LIST, Arrays.asList("a", "b"), Importance.LOW, "docs3") .define("opt4", Type.BOOLEAN, false, Importance.LOW, null); - final String expectedRst = "" + + final String expectedRst = "``opt2``\n" + " docs2\n" + "\n" + @@ -547,7 +547,7 @@ public void toEnrichedRst() { "Group Two", 0, Width.NONE, "..", singletonList("some.option")) .define("poor.opt", Type.STRING, "foo", Importance.HIGH, "Doc doc doc doc."); - final String expectedRst = "" + + final String expectedRst = "``poor.opt``\n" + " Doc doc doc doc.\n" + "\n" + diff --git a/clients/src/test/java/org/apache/kafka/common/network/KafkaChannelTest.java b/clients/src/test/java/org/apache/kafka/common/network/KafkaChannelTest.java index f83ea7db87187..bd03a60935107 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/KafkaChannelTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/KafkaChannelTest.java @@ -73,9 +73,9 @@ public void testReceiving() throws IOException { ChannelMetadataRegistry 
metadataRegistry = Mockito.mock(ChannelMetadataRegistry.class); ArgumentCaptor sizeCaptor = ArgumentCaptor.forClass(Integer.class); - Mockito.when(pool.tryAllocate(sizeCaptor.capture())).thenAnswer(invocation -> { - return ByteBuffer.allocate(sizeCaptor.getValue()); - }); + Mockito.when(pool.tryAllocate(sizeCaptor.capture())).thenAnswer(invocation -> + ByteBuffer.allocate(sizeCaptor.getValue()) + ); KafkaChannel channel = new KafkaChannel("0", transport, () -> authenticator, 1024, pool, metadataRegistry); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index a46a38af30c13..f805ba807d69d 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -272,7 +272,7 @@ static List cipherMetrics(Metrics metrics) { return metrics.metrics().entrySet().stream(). filter(e -> e.getKey().description(). contains("The number of connections with this SSL cipher and protocol.")). - map(e -> e.getValue()). + map(Map.Entry::getValue). collect(Collectors.toList()); } diff --git a/clients/src/test/java/org/apache/kafka/common/replica/ReplicaSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/replica/ReplicaSelectorTest.java index 15ddef649a7f6..15249bd8383cc 100644 --- a/clients/src/test/java/org/apache/kafka/common/replica/ReplicaSelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/replica/ReplicaSelectorTest.java @@ -50,9 +50,9 @@ public void testSameRackSelector() { }); selected = selector.select(tp, metadata("not-a-rack"), partitionView); - assertOptional(selected, replicaInfo -> { - assertEquals(replicaInfo, leader, "Expect leader when we can't find any nodes in given rack"); - }); + assertOptional(selected, replicaInfo -> + assertEquals(replicaInfo, leader, "Expect leader when we can't find any nodes in given rack") + ); selected = selector.select(tp, metadata("rack-a"), partitionView); assertOptional(selected, replicaInfo -> { diff --git a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java index 339ef9be4a5df..ae7b603d41b16 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java @@ -39,13 +39,13 @@ public class DeleteAclsResponseTest { private static final short V1 = 1; private static final DeleteAclsMatchingAcl LITERAL_ACL1 = new DeleteAclsMatchingAcl() - .setResourceType(ResourceType.TOPIC.code()) - .setResourceName("foo") - .setPatternType(PatternType.LITERAL.code()) - .setPrincipal("User:ANONYMOUS") - .setHost("127.0.0.1") - .setOperation(AclOperation.READ.code()) - .setPermissionType(AclPermissionType.DENY.code()); + .setResourceType(ResourceType.TOPIC.code()) + .setResourceName("foo") + .setPatternType(PatternType.LITERAL.code()) + .setPrincipal("User:ANONYMOUS") + .setHost("127.0.0.1") + .setOperation(AclOperation.READ.code()) + .setPermissionType(AclPermissionType.DENY.code()); private static final DeleteAclsMatchingAcl LITERAL_ACL2 = new DeleteAclsMatchingAcl() .setResourceType(ResourceType.GROUP.code()) diff --git a/clients/src/test/java/org/apache/kafka/common/requests/DeleteTopicsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/DeleteTopicsRequestTest.java index 
897797a350a56..d7d7eb985cd8a 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/DeleteTopicsRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/DeleteTopicsRequestTest.java @@ -77,7 +77,7 @@ public void testNewTopicsField() { } else { // We should fail if version is less than 6. - assertThrows(UnsupportedVersionException.class, () -> requestWithNames.serialize()); + assertThrows(UnsupportedVersionException.class, requestWithNames::serialize); } } } @@ -105,7 +105,7 @@ public void testTopicIdsField() { requestWithIdsSerialized.data().topics().forEach(topic -> assertNull(topic.name())); } else { // We should fail if version is less than 6. - assertThrows(UnsupportedVersionException.class, () -> requestWithIds.serialize()); + assertThrows(UnsupportedVersionException.class, requestWithIds::serialize); } } } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java index 702341fee836c..4f13c914f0f71 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java @@ -42,7 +42,7 @@ public class FetchRequestTest { private static Stream fetchVersions() { - return ApiKeys.FETCH.allVersions().stream().map(version -> Arguments.of(version)); + return ApiKeys.FETCH.allVersions().stream().map(Arguments::of); } @ParameterizedTest @@ -64,7 +64,7 @@ public void testToReplaceWithDifferentVersions(short version) { // If version < 13, we should not see any partitions in forgottenTopics. This is because we can not // distinguish different topic IDs on versions earlier than 13. - assertEquals(fetchRequestUsesTopicIds, fetchRequest.data().forgottenTopicsData().size() > 0); + assertEquals(fetchRequestUsesTopicIds, !fetchRequest.data().forgottenTopicsData().isEmpty()); fetchRequest.data().forgottenTopicsData().forEach(forgottenTopic -> { // Since we didn't serialize, we should see the topic name and ID regardless of the version. assertEquals(tp.topic(), forgottenTopic.topic()); @@ -228,9 +228,9 @@ public void testFetchRequestSimpleBuilderReplicaStateDowngrade(short version) { public void testFetchRequestSimpleBuilderReplicaIdNotSupported(short version) { FetchRequestData fetchRequestData = new FetchRequestData().setReplicaId(1); FetchRequest.SimpleBuilder builder = new FetchRequest.SimpleBuilder(fetchRequestData); - assertThrows(IllegalStateException.class, () -> { - builder.build(version); - }); + assertThrows(IllegalStateException.class, () -> + builder.build(version) + ); } @Test diff --git a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java index ce3e1548d31d8..cea12640269f7 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java @@ -101,7 +101,7 @@ public void testGetErrorResponse() { /** * Verifies the logic we have in LeaderAndIsrRequest to present a unified interface across the various versions - * works correctly. For example, `LeaderAndIsrPartitionState.topicName` is not serialiazed/deserialized in + * works correctly. 
For example, `LeaderAndIsrPartitionState.topicName` is not serialized/deserialized in * recent versions, but we set it manually so that we can always present the ungrouped partition states * independently of the version. */ diff --git a/clients/src/test/java/org/apache/kafka/common/requests/UpdateMetadataRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/UpdateMetadataRequestTest.java index 1a909579fbee5..7be139c439964 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/UpdateMetadataRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/UpdateMetadataRequestTest.java @@ -73,7 +73,7 @@ public void testGetErrorResponse() { /** * Verifies the logic we have in UpdateMetadataRequest to present a unified interface across the various versions - * works correctly. For example, `UpdateMetadataPartitionState.topicName` is not serialiazed/deserialized in + * works correctly. For example, `UpdateMetadataPartitionState.topicName` is not serialized/deserialized in * recent versions, but we set it manually so that we can always present the ungrouped partition states * independently of the version. */ diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index 590514a6691b5..4972f3ea43037 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -1884,7 +1884,7 @@ private void verifySslClientAuthForSaslSslListener(boolean useListenerPrefix, // Client configures untrusted key store CertStores newStore = new CertStores(false, "localhost"); - newStore.keyStoreProps().forEach((k, v) -> saslClientConfigs.put(k, v)); + saslClientConfigs.putAll(newStore.keyStoreProps()); if (expectedClientAuth == SslClientAuth.NONE) { createAndCheckClientConnectionAndPrincipal(securityProtocol, "2", principalWithOneWayTls); } else { diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/internals/ScramFormatterTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/internals/ScramFormatterTest.java index 8d7a8ecd84407..715c97fe9270e 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/scram/internals/ScramFormatterTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/scram/internals/ScramFormatterTest.java @@ -78,9 +78,8 @@ public void rfc7677Example() throws Exception { * Tests encoding of username */ @Test - public void saslName() throws Exception { + public void saslName() { String[] usernames = {"user1", "123", "1,2", "user=A", "user==B", "user,1", "user 1", ",", "=", ",=", "=="}; - ScramFormatter formatter = new ScramFormatter(ScramMechanism.SCRAM_SHA_256); for (String username : usernames) { String saslName = ScramFormatter.saslName(username); // There should be no commas in saslName (comma is used as field separator in SASL messages) diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/CommonNameLoggingTrustManagerFactoryWrapperTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/CommonNameLoggingTrustManagerFactoryWrapperTest.java index 129e383221e86..b9aefc9a44ce4 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/ssl/CommonNameLoggingTrustManagerFactoryWrapperTest.java +++ 
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/CommonNameLoggingTrustManagerFactoryWrapperTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/CommonNameLoggingTrustManagerFactoryWrapperTest.java index 129e383221e86..b9aefc9a44ce4 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/ssl/CommonNameLoggingTrustManagerFactoryWrapperTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/CommonNameLoggingTrustManagerFactoryWrapperTest.java @@ -152,7 +152,7 @@ void testNeverExpiringX509Certificate() throws Exception { if (cert.getNotBefore().before(dateNow) && cert.getNotAfter().after(dateNow)) { assertDoesNotThrow(() -> cert.checkValidity()); } else { - assertThrows(CertificateException.class, () -> cert.checkValidity()); + assertThrows(CertificateException.class, cert::checkValidity); } // The wrappedCert must never throw due to being expired assertDoesNotThrow(() -> wrappedCert.checkValidity()); diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestTrustManagerFactory.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestTrustManagerFactory.java index 8e5e584f97c91..82e4c4b4594b8 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestTrustManagerFactory.java +++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestTrustManagerFactory.java @@ -45,8 +45,6 @@ protected TrustManager[] engineGetTrustManagers() { public static class TestTrustManager extends X509ExtendedTrustManager { - public static final String ALIAS = "TestAlias"; - @Override public void checkClientTrusted(X509Certificate[] x509Certificates, String s) { diff --git a/clients/src/test/java/org/apache/kafka/common/utils/FlattenedIteratorTest.java b/clients/src/test/java/org/apache/kafka/common/utils/FlattenedIteratorTest.java index da54c1d98a4ae..9023e00be1ea3 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/FlattenedIteratorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/FlattenedIteratorTest.java @@ -19,6 +19,7 @@ import org.junit.jupiter.api.Test; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -36,11 +37,11 @@ public void testNestedLists() { Collections.singletonList("ddddd"), asList("", "bar2", "baz45")); - Iterable flattenedIterable = () -> new FlattenedIterator<>(list.iterator(), l -> l.iterator()); + Iterable flattenedIterable = () -> new FlattenedIterator<>(list.iterator(), List::iterator); List flattened = new ArrayList<>(); flattenedIterable.forEach(flattened::add); - assertEquals(list.stream().flatMap(l -> l.stream()).collect(Collectors.toList()), flattened); + assertEquals(list.stream().flatMap(Collection::stream).collect(Collectors.toList()), flattened); // Ensure we can iterate multiple times List flattened2 = new ArrayList<>(); @@ -53,7 +54,7 @@ public void testNestedLists() { public void testEmptyList() { List> list = emptyList(); - Iterable flattenedIterable = () -> new FlattenedIterator<>(list.iterator(), l -> l.iterator()); + Iterable flattenedIterable = () -> new FlattenedIterator<>(list.iterator(), List::iterator); List flattened = new ArrayList<>(); flattenedIterable.forEach(flattened::add); @@ -64,7 +65,7 @@ public void testNestedSingleEmptyList() { List> list = Collections.singletonList(emptyList()); - Iterable flattenedIterable = () -> new FlattenedIterator<>(list.iterator(), l -> l.iterator()); + Iterable flattenedIterable = () -> new FlattenedIterator<>(list.iterator(), List::iterator); List flattened = new ArrayList<>(); flattenedIterable.forEach(flattened::add); @@ -77,11 +78,11 @@ public void testEmptyListFollowedByNonEmpty() { emptyList(), asList("boo", "b", "de")); - Iterable flattenedIterable = () -> new FlattenedIterator<>(list.iterator(), l -> l.iterator()); +
Iterable flattenedIterable = () -> new FlattenedIterator<>(list.iterator(), List::iterator); List flattened = new ArrayList<>(); flattenedIterable.forEach(flattened::add); - assertEquals(list.stream().flatMap(l -> l.stream()).collect(Collectors.toList()), flattened); + assertEquals(list.stream().flatMap(Collection::stream).collect(Collectors.toList()), flattened); } @Test @@ -91,11 +92,11 @@ public void testEmptyListInBetweenNonEmpty() { emptyList(), asList("ee", "aa", "dd")); - Iterable flattenedIterable = () -> new FlattenedIterator<>(list.iterator(), l -> l.iterator()); + Iterable flattenedIterable = () -> new FlattenedIterator<>(list.iterator(), List::iterator); List flattened = new ArrayList<>(); flattenedIterable.forEach(flattened::add); - assertEquals(list.stream().flatMap(l -> l.stream()).collect(Collectors.toList()), flattened); + assertEquals(list.stream().flatMap(Collection::stream).collect(Collectors.toList()), flattened); } @Test @@ -105,11 +106,11 @@ public void testEmptyListAtTheEnd() { Collections.singletonList("e"), emptyList()); - Iterable flattenedIterable = () -> new FlattenedIterator<>(list.iterator(), l -> l.iterator()); + Iterable flattenedIterable = () -> new FlattenedIterator<>(list.iterator(), List::iterator); List flattened = new ArrayList<>(); flattenedIterable.forEach(flattened::add); - assertEquals(list.stream().flatMap(l -> l.stream()).collect(Collectors.toList()), flattened); + assertEquals(list.stream().flatMap(Collection::stream).collect(Collectors.toList()), flattened); } } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java index 0c5828f565a0b..93d03a3837aa0 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java @@ -625,13 +625,7 @@ public int compare(TestElement a, TestElement b) { return -1; } else if (a.key > b.key) { return 1; - } else if (a.val < b.val) { - return -1; - } else if (a.val > b.val) { - return 1; - } else { - return 0; - } + } else return Integer.compare(a.val, b.val); } } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MappedIteratorTest.java b/clients/src/test/java/org/apache/kafka/common/utils/MappedIteratorTest.java index 058f2cd6a9b44..12b4367b0295e 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/MappedIteratorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/MappedIteratorTest.java @@ -32,7 +32,7 @@ public class MappedIteratorTest { @Test public void testStringToInteger() { List list = asList("foo", "", "bar2", "baz45"); - Function mapper = s -> s.length(); + Function mapper = String::length; Iterable mappedIterable = () -> new MappedIterator<>(list.iterator(), mapper); List mapped = new ArrayList<>(); @@ -49,7 +49,7 @@ public void testStringToInteger() { @Test public void testEmptyList() { List list = emptyList(); - Function mapper = s -> s.length(); + Function mapper = String::length; Iterable mappedIterable = () -> new MappedIterator<>(list.iterator(), mapper); List mapped = new ArrayList<>(); diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java index 08e2cebc3124a..012151dd34e71 100755 --- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java +++ 
b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java @@ -823,19 +823,19 @@ public void shouldAcceptValidDateFormats() throws ParseException { public void shouldThrowOnInvalidDateFormatOrNullTimestamp() { // check some invalid formats // test null timestamp - assertTrue(assertThrows(IllegalArgumentException.class, () -> { - Utils.getDateTime(null); - }).getMessage().contains("Error parsing timestamp with null value")); + assertTrue(assertThrows(IllegalArgumentException.class, () -> + Utils.getDateTime(null) + ).getMessage().contains("Error parsing timestamp with null value")); // test pattern: yyyy-MM-dd'T'HH:mm:ss.X - checkExceptionForGetDateTimeMethod(() -> { - invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.X")); - }); + checkExceptionForGetDateTimeMethod(() -> + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.X")) + ); // test pattern: yyyy-MM-dd HH:mm:ss - assertTrue(assertThrows(ParseException.class, () -> { - invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); - }).getMessage().contains("It does not contain a 'T' according to ISO8601 format")); + assertTrue(assertThrows(ParseException.class, () -> + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")) + ).getMessage().contains("It does not contain a 'T' according to ISO8601 format")); // KAFKA-10685: use DateTimeFormatter generate micro/nano second timestamp final DateTimeFormatter formatter = new DateTimeFormatterBuilder() @@ -847,19 +847,19 @@ public void shouldThrowOnInvalidDateFormatOrNullTimestamp() { final LocalDateTime timestampWithSeconds = timestampWithNanoSeconds.truncatedTo(ChronoUnit.SECONDS); // test pattern: yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS - checkExceptionForGetDateTimeMethod(() -> { - Utils.getDateTime(formatter.format(timestampWithNanoSeconds)); - }); + checkExceptionForGetDateTimeMethod(() -> + Utils.getDateTime(formatter.format(timestampWithNanoSeconds)) + ); // test pattern: yyyy-MM-dd'T'HH:mm:ss.SSSSSS - checkExceptionForGetDateTimeMethod(() -> { - Utils.getDateTime(formatter.format(timestampWithMicroSeconds)); - }); + checkExceptionForGetDateTimeMethod(() -> + Utils.getDateTime(formatter.format(timestampWithMicroSeconds)) + ); // test pattern: yyyy-MM-dd'T'HH:mm:ss - checkExceptionForGetDateTimeMethod(() -> { - Utils.getDateTime(formatter.format(timestampWithSeconds)); - }); + checkExceptionForGetDateTimeMethod(() -> + Utils.getDateTime(formatter.format(timestampWithSeconds)) + ); } private void checkExceptionForGetDateTimeMethod(Executable executable) {
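A note on the MockAdminClient hunks above, which swap boxed Boolean and Long locals for boolean and long: beyond skipping a pointless allocation, primitives avoid two classic wrapper traps. A minimal standalone sketch, independent of any Kafka class:

public class BoxingPitfalls {
    public static void main(String[] args) {
        // Identity trap: wrapper values outside the small cached range
        // (-128..127) are distinct objects, so == compares references.
        Long a = 1000L;
        Long b = 1000L;
        System.out.println(a == b);      // false on standard JVMs
        System.out.println(a.equals(b)); // true

        // Unboxing trap: a null wrapper throws the moment a primitive is
        // required.
        Boolean tokenFound = null;
        try {
            if (tokenFound) { // implicit tokenFound.booleanValue()
                System.out.println("found");
            }
        } catch (NullPointerException e) {
            System.out.println("NPE on unboxing");
        }

        // Primitives have neither trap and allocate nothing.
        long expiryTimestamp = 1000L;
        boolean found = false;
        System.out.println(expiryTimestamp == 1000L && !found); // true
    }
}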
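The KafkaProducerTest hunk above converts a manual try/finally around producer.close() into try-with-resources. A minimal sketch of the pattern with a hypothetical resource type, not a Kafka class:

// The resource only needs to implement AutoCloseable.
class Connection implements AutoCloseable {
    void send(String msg) {
        System.out.println("sent: " + msg);
    }

    @Override
    public void close() {
        System.out.println("closed");
    }
}

public class TryWithResources {
    public static void main(String[] args) {
        // Old shape: manual close in finally, easy to forget or misorder.
        Connection c1 = new Connection();
        try {
            c1.send("hello");
        } finally {
            c1.close();
        }

        // New shape: close() runs automatically, even on exceptions, and an
        // exception thrown by close() is attached as suppressed instead of
        // masking the original failure.
        try (Connection c2 = new Connection()) {
            c2.send("world");
        }
    }
}

One nuance visible in that hunk: try-with-resources invokes the no-argument close(), whereas the removed finally block called close(Duration.ZERO), so the shutdown timeout is not byte-for-byte identical; the test presumably tolerates the default.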
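Finally, the ImplicitLinkedHashCollectionTest hunk above collapses a cascade of manual less-than/greater-than branches into Integer.compare for the tie-breaking field. A standalone sketch of the same two-level ordering with a hypothetical element type mirroring the test's TestElement:

import java.util.Arrays;
import java.util.Comparator;

public class CompareSketch {
    static final class Element {
        final int key;
        final int val;
        Element(int key, int val) { this.key = key; this.val = val; }
        @Override public String toString() { return "(" + key + "," + val + ")"; }
    }

    public static void main(String[] args) {
        // Shape used in the refactored comparator: explicit branches on the
        // primary field, Integer.compare for the tie-breaker.
        Comparator<Element> manual = (a, b) -> {
            if (a.key < b.key) return -1;
            if (a.key > b.key) return 1;
            return Integer.compare(a.val, b.val);
        };

        // Fully declarative equivalent. Both forms avoid the overflow bug of
        // the "return a.val - b.val" shortcut, which misorders operands near
        // Integer.MIN_VALUE and Integer.MAX_VALUE.
        Comparator<Element> declarative =
                Comparator.<Element>comparingInt(e -> e.key).thenComparingInt(e -> e.val);

        Element[] xs = {new Element(2, 1), new Element(1, 9), new Element(1, 3)};
        Arrays.sort(xs, manual);
        System.out.println(Arrays.toString(xs)); // [(1,3), (1,9), (2,1)]
        Arrays.sort(xs, declarative);
        System.out.println(Arrays.toString(xs)); // [(1,3), (1,9), (2,1)]
    }
}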