Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
b9499c2
Changed toString() override to toStringBase()
brenden20 May 24, 2024
58f1ca4
Added test for toStringBase()
brenden20 May 24, 2024
7a5f172
Added to toStringBase()
brenden20 May 24, 2024
4f56809
Add println and example hashcode
brenden20 May 24, 2024
1dea30d
Edited toStringBase test
brenden20 May 28, 2024
cee03b7
Added to toStringBase and updated test
brenden20 May 28, 2024
42c7826
Remove hashcode comment
brenden20 May 28, 2024
935f0b8
Add testing comment
brenden20 May 29, 2024
272d5de
Updated toStringBase and added toString
brenden20 May 29, 2024
7aa6ba5
Remove debugging print
brenden20 May 30, 2024
f5a3748
Merge branch 'apache:trunk' into 16557
brenden20 May 31, 2024
b560db3
Implementing PR suggestions
brenden20 Jun 3, 2024
8d7ddbb
Fixed build error
brenden20 Jun 3, 2024
3d9f58b
Small changes to testOffsetFetchRequestStateToStringBase()
brenden20 Jun 3, 2024
426313a
Remove debug print
brenden20 Jun 3, 2024
4461145
Whitespace fixes
brenden20 Jun 3, 2024
a252ec1
More whitespace changes
brenden20 Jun 3, 2024
33a7748
Fix style
brenden20 Jun 3, 2024
554ad53
Updated testOffsetFetchRequestStateToStringBase()
brenden20 Jun 3, 2024
f5abcc5
Merge branch 'apache:trunk' into 16557
brenden20 Jun 3, 2024
7e8be50
Revert "Merge branch 'apache:trunk' into 16557"
brenden20 Jun 3, 2024
97c29c6
Updated testOffsetFetchRequestStateToStringBase()
brenden20 Jun 10, 2024
df3e2ae
Revert "Updated testOffsetFetchRequestStateToStringBase()"
brenden20 Jun 10, 2024
c34f473
Merge branch 'trunk' into 16557
brenden20 Jun 10, 2024
9265258
Merge branch 'apache:trunk' into 16557
brenden20 Jun 11, 2024
74034dc
Revert "Merge branch 'trunk' into 16557"
brenden20 Jun 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
7 changes: 1 addition & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,16 +227,11 @@ There are two code quality analysis tools that we regularly run, spotbugs and ch
Checkstyle enforces a consistent coding style in Kafka.
You can run checkstyle using:

./gradlew checkstyleMain checkstyleTest spotlessCheck
./gradlew checkstyleMain checkstyleTest

The checkstyle warnings will be found in `reports/checkstyle/reports/main.html` and `reports/checkstyle/reports/test.html` files in the
subproject build directories. They are also printed to the console. The build will fail if Checkstyle fails.

#### Spotless ####
The import order is part of the static checks. Please call `spotlessApply` (requires JDK 11+) to optimize the imports of Java code before filing a pull request.

./gradlew spotlessApply

#### Spotbugs ####
Spotbugs uses static analysis to look for bugs in the code.
You can run spotbugs using:
Expand Down
8 changes: 4 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,7 @@ subprojects {
jacoco {
toolVersion = versions.jacoco
}

jacocoTestReport {
dependsOn tasks.test
sourceSets sourceSets.main
Expand All @@ -838,8 +838,8 @@ subprojects {
skipProjects = [ ":jmh-benchmarks", ":trogdor" ]
skipConfigurations = [ "zinc" ]
}
// the task `removeUnusedImports` is implemented by google-java-format,
// and unfortunately the google-java-format version used by spotless 6.14.0 can't work with JDK 21.
// the task `removeUnusedImports` is implemented by google-java-format,
// and unfortunately the google-java-format version used by spotless 6.14.0 can't work with JDK 21.
// Hence, we apply spotless tasks only if the env is either JDK11 or JDK17
if ((JavaVersion.current().isJava11() || (JavaVersion.current() == JavaVersion.VERSION_17)) && project.path !in excludedSpotlessModules) {
apply plugin: 'com.diffplug.spotless'
Expand Down Expand Up @@ -3243,7 +3243,7 @@ project(':connect:runtime') {
testImplementation project(':storage')
testImplementation project(':connect:test-plugins')
testImplementation project(':group-coordinator')

testImplementation libs.junitJupiterApi
testImplementation libs.junitVintageEngine
testImplementation libs.mockitoCore
Expand Down
2 changes: 0 additions & 2 deletions checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@
<property name="file" value="${config_loc}/${importControlFile}"/>
</module>

<!-- don't define any import order here! Import order check/format is addressed by spotless.-->

<!-- whitespace -->
<module name="GenericWhitespace"/>
<module name="NoWhitespaceBefore"/>
Expand Down
6 changes: 5 additions & 1 deletion checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
<allow pkg="org.opentest4j" />
<allow pkg="org.hamcrest" />
<allow pkg="org.mockito" />
<allow pkg="org.easymock" />
<allow pkg="org.powermock" />
<allow pkg="java.security" />
<allow pkg="javax.net.ssl" />
<allow pkg="javax.security" />
Expand Down Expand Up @@ -439,7 +441,6 @@
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
Expand Down Expand Up @@ -620,6 +621,9 @@
<subpackage name="file">
<allow pkg="org.apache.kafka.connect" />
<allow pkg="org.apache.kafka.clients.consumer" />
<!-- for tests -->
<allow pkg="org.easymock" />
<allow pkg="org.powermock" />
</subpackage>

<subpackage name="tools">
Expand Down
4 changes: 2 additions & 2 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@
files="StreamThread.java"/>

<suppress checks="ClassDataAbstractionCoupling"
files="(InternalTopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl).java"/>
files="(KafkaStreams|KStreamImpl|KTableImpl).java"/>

<suppress checks="CyclomaticComplexity"
files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|PartitionGroup|SubscriptionWrapperSerde|AssignorConfiguration).java"/>
Expand All @@ -209,7 +209,7 @@
files="StreamsMetricsImpl.java"/>

<suppress checks="NPathComplexity"
files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|TaskAssignmentUtils|GlobalStateManagerImpl|KStreamImplJoin|TopologyConfig|KTableKTableOuterJoin).java"/>
files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|GlobalStateManagerImpl|KStreamImplJoin|TopologyConfig|KTableKTableOuterJoin).java"/>

<suppress checks="(FinalLocalVariable|UnnecessaryParentheses|BooleanExpressionComplexity|CyclomaticComplexity|WhitespaceAfter|LocalVariableName)"
files="Murmur3.java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ static List<InetAddress> resolve(String host, HostResolver hostResolver) throws
InetAddress[] addresses = hostResolver.resolve(host);
List<InetAddress> result = filterPreferredAddresses(addresses);
if (log.isDebugEnabled())
log.debug("Resolved host {} as {}", host, result.stream().map(InetAddress::getHostAddress).collect(Collectors.joining(",")));
log.debug("Resolved host {} as {}", host, result.stream().map(i -> i.getHostAddress()).collect(Collectors.joining(",")));
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public Iterable<NetworkClient.InFlightRequest> clearAll(String node) {
} else {
final Deque<NetworkClient.InFlightRequest> clearedRequests = requests.remove(node);
inFlightRequestCount.getAndAdd(-clearedRequests.size());
return clearedRequests::descendingIterator;
return () -> clearedRequests.descendingIterator();
}
}

Expand Down
4 changes: 2 additions & 2 deletions clients/src/main/java/org/apache/kafka/clients/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,8 @@ public synchronized Set<TopicPartition> updatePartitionLeadership(Map<TopicParti
// Get topic-ids for updated topics from existing topic-ids.
Map<String, Uuid> existingTopicIds = this.metadataSnapshot.topicIds();
Map<String, Uuid> topicIdsForUpdatedTopics = updatedTopics.stream()
.filter(existingTopicIds::containsKey)
.collect(Collectors.toMap(e -> e, existingTopicIds::get));
.filter(e -> existingTopicIds.containsKey(e))
.collect(Collectors.toMap(e -> e, e -> existingTopicIds.get(e)));

if (log.isDebugEnabled()) {
updatePartitionMetadata.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -821,8 +821,9 @@ private void processDisconnection(List<ClientResponse> responses,
break;
case AUTHENTICATE:
log.warn("Connection to node {} ({}) terminated during authentication. This may happen " +
"due to any of the following reasons: (1) Firewall blocking Kafka TLS " +
"traffic (eg it may only allow HTTPS traffic), (2) Transient network issue.",
"due to any of the following reasons: (1) Authentication failed due to invalid " +
"credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS " +
"traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.",
nodeId, disconnectState.remoteAddress());
break;
case NOT_CONNECTED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,10 @@ public String toString(boolean lineBreaks) {
// which may happen when the remote is too old.
for (ApiKeys apiKey : ApiKeys.clientApis()) {
if (!apiKeysText.containsKey(apiKey.id)) {
String bld = apiKey.name + "(" +
apiKey.id + "): " + "UNSUPPORTED";
apiKeysText.put(apiKey.id, bld);
StringBuilder bld = new StringBuilder();
bld.append(apiKey.name).append("(").
append(apiKey.id).append("): ").append("UNSUPPORTED");
apiKeysText.put(apiKey.id, bld.toString());
}
}
String separator = lineBreaks ? ",\n\t" : ", ";
Expand Down

This file was deleted.

This file was deleted.

64 changes: 4 additions & 60 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,7 @@ default DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> g
* List the consumer groups available in the cluster.
*
* @param options The options to use when listing the consumer groups.
* @return The ListConsumerGroupsResult.
* @return The ListGroupsResult.
*/
ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options);

Expand All @@ -911,7 +911,7 @@ default DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> g
* This is a convenience method for {@link #listConsumerGroups(ListConsumerGroupsOptions)} with default options.
* See the overload for more details.
*
* @return The ListConsumerGroupsResult.
* @return The ListGroupsResult.
*/
default ListConsumerGroupsResult listConsumerGroups() {
return listConsumerGroups(new ListConsumerGroupsOptions());
Expand All @@ -921,7 +921,7 @@ default ListConsumerGroupsResult listConsumerGroups() {
* List the consumer group offsets available in the cluster.
*
* @param options The options to use when listing the consumer group offsets.
* @return The ListConsumerGroupOffsetsResult
* @return The ListGroupOffsetsResult
*/
default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) {
@SuppressWarnings("deprecation")
Expand All @@ -939,7 +939,7 @@ default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId,
* This is a convenience method for {@link #listConsumerGroupOffsets(Map, ListConsumerGroupOffsetsOptions)}
* to list offsets of all partitions of one group with default options.
*
* @return The ListConsumerGroupOffsetsResult.
* @return The ListGroupOffsetsResult.
*/
default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
Expand Down Expand Up @@ -1711,62 +1711,6 @@ default ListClientMetricsResourcesResult listClientMetricsResources() {
*/
Uuid clientInstanceId(Duration timeout);

/**
* Add a new voter node to the KRaft metadata quorum.
*
* @param voterId The node ID of the voter.
* @param voterDirectoryId The directory ID of the voter.
* @param endpoints The endpoints that the new voter has.
*/
default AddRaftVoterResult addRaftVoter(
int voterId,
Uuid voterDirectoryId,
Set<RaftVoterEndpoint> endpoints
) {
return addRaftVoter(voterId, voterDirectoryId, endpoints, new AddRaftVoterOptions());
}

/**
* Add a new voter node to the KRaft metadata quorum.
*
* @param voterId The node ID of the voter.
* @param voterDirectoryId The directory ID of the voter.
* @param endpoints The endpoints that the new voter has.
* @param options The options to use when adding the new voter node.
*/
AddRaftVoterResult addRaftVoter(
int voterId,
Uuid voterDirectoryId,
Set<RaftVoterEndpoint> endpoints,
AddRaftVoterOptions options
);

/**
* Remove a voter node from the KRaft metadata quorum.
*
* @param voterId The node ID of the voter.
* @param voterDirectoryId The directory ID of the voter.
*/
default RemoveRaftVoterResult removeRaftVoter(
int voterId,
Uuid voterDirectoryId
) {
return removeRaftVoter(voterId, voterDirectoryId, new RemoveRaftVoterOptions());
}

/**
* Remove a voter node from the KRaft metadata quorum.
*
* @param voterId The node ID of the voter.
* @param voterDirectoryId The directory ID of the voter.
* @param options The options to use when removing the voter node.
*/
RemoveRaftVoterResult removeRaftVoter(
int voterId,
Uuid voterDirectoryId,
RemoveRaftVoterOptions options
);

/**
* Get the metrics kept by the adminClient
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ public DescribeConsumerGroupsResult(final Map<String, KafkaFuture<ConsumerGroupD
* Return a map from group id to futures which yield group descriptions.
*/
public Map<String, KafkaFuture<ConsumerGroupDescription>> describedGroups() {
return new HashMap<>(futures);
Map<String, KafkaFuture<ConsumerGroupDescription>> describedGroups = new HashMap<>();
futures.forEach((key, future) -> describedGroups.put(key, future));
return describedGroups;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ public class DescribeDelegationTokenOptions extends AbstractOptions<DescribeDele
private List<KafkaPrincipal> owners;

/**
 * If owners is null, all the user-owned tokens and tokens for which the user has Describe permission
 * if owners is null, all the user-owned tokens and tokens for which the user has Describe permission
* will be returned.
* @param owners The owners that we want to describe delegation tokens for
 * @param owners the token owners to describe delegation tokens for, or null for all accessible tokens
* @return this instance
*/
public DescribeDelegationTokenOptions owners(List<KafkaPrincipal> owners) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>
return descriptions().entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().thenApply(this::convertMapValues)));
entry -> entry.getValue().thenApply(map -> convertMapValues(map))));
}

@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -88,7 +88,7 @@ public Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions()
@Deprecated
public KafkaFuture<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> all() {
return allDescriptions().thenApply(map -> map.entrySet().stream().collect(Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getKey(),
entry -> convertMapValues(entry.getValue())
)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,21 @@ public Map<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> values() {
* Return a future which succeeds if log directory information of all replicas are available
*/
public KafkaFuture<Map<TopicPartitionReplica, ReplicaLogDirInfo>> all() {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]))
.thenApply(v -> {
Map<TopicPartitionReplica, ReplicaLogDirInfo> replicaLogDirInfos = new HashMap<>();
for (Map.Entry<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> entry : futures.entrySet()) {
try {
replicaLogDirInfos.put(entry.getKey(), entry.getValue().get());
} catch (InterruptedException | ExecutionException e) {
// This should be unreachable, because allOf ensured that all the futures completed successfully.
throw new RuntimeException(e);
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
thenApply(new KafkaFuture.BaseFunction<Void, Map<TopicPartitionReplica, ReplicaLogDirInfo>>() {
@Override
public Map<TopicPartitionReplica, ReplicaLogDirInfo> apply(Void v) {
Map<TopicPartitionReplica, ReplicaLogDirInfo> replicaLogDirInfos = new HashMap<>();
for (Map.Entry<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> entry : futures.entrySet()) {
try {
replicaLogDirInfos.put(entry.getKey(), entry.getValue().get());
} catch (InterruptedException | ExecutionException e) {
// This should be unreachable, because allOf ensured that all the futures completed successfully.
throw new RuntimeException(e);
}
}
return replicaLogDirInfos;
}
return replicaLogDirInfos;
});
}

Expand Down
Loading