Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
<suppress checks="ClassFanOutComplexity"
files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/>
<suppress checks="NPath"
files="SaslServerAuthenticator.java"/>
files="(Microbenchmarks|SaslServerAuthenticator).java"/>
<suppress checks="ClassFanOutComplexity"
files="Errors.java"/>
<suppress checks="ClassFanOutComplexity"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import java.net.InetAddress;

class AddressChangeHostResolver implements HostResolver {
private final InetAddress[] initialAddresses;
private final InetAddress[] newAddresses;
private boolean useNewAddresses;
private InetAddress[] initialAddresses;
private InetAddress[] newAddresses;
private int resolutionCount = 0;

public AddressChangeHostResolver(InetAddress[] initialAddresses, InetAddress[] newAddresses) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void testParseAndValidateAddressesWithReverseLookup() {
// With lookup of example.com, either one or two addresses are expected depending on
// whether ipv4 and ipv6 are enabled
List<InetSocketAddress> validatedAddresses = checkWithLookup(asList("example.com:10000"));
assertTrue(validatedAddresses.size() >= 1, "Unexpected addresses " + validatedAddresses);
assertFalse(validatedAddresses.isEmpty(), "Unexpected addresses " + validatedAddresses);
List<String> validatedHostNames = validatedAddresses.stream().map(InetSocketAddress::getHostName)
.collect(Collectors.toList());
List<String> expectedHostNames = asList("93.184.215.14", "2606:2800:21f:cb07:6820:80da:af6b:8b2c");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,16 @@ public class ClusterConnectionStatesTest {
private final String nodeId2 = "2002";
private final String nodeId3 = "3003";
private final String hostTwoIps = "multiple.ip.address";
private ClusterConnectionStates connectionStates;

// For testing nodes with a single IP address, use localhost and default DNS resolution
private DefaultHostResolver singleIPHostResolver = new DefaultHostResolver();
private final DefaultHostResolver singleIPHostResolver = new DefaultHostResolver();

// For testing nodes with multiple IP addresses, mock DNS resolution to get consistent results
private AddressChangeHostResolver multipleIPHostResolver = new AddressChangeHostResolver(
private final AddressChangeHostResolver multipleIPHostResolver = new AddressChangeHostResolver(
initialAddresses.toArray(new InetAddress[0]), newAddresses.toArray(new InetAddress[0]));

private ClusterConnectionStates connectionStates;

@BeforeEach
public void setup() {
this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

Expand All @@ -67,12 +68,7 @@ public class FetchSessionHandlerTest {
* ordering for test purposes.
*/
private static Set<TopicPartition> toSet(TopicPartition... arr) {
TreeSet<TopicPartition> set = new TreeSet<>(new Comparator<TopicPartition>() {
@Override
public int compare(TopicPartition o1, TopicPartition o2) {
return o1.toString().compareTo(o2.toString());
}
});
TreeSet<TopicPartition> set = new TreeSet<>(Comparator.comparing(TopicPartition::toString));
set.addAll(Arrays.asList(arr));
return set;
}
Expand Down Expand Up @@ -317,12 +313,7 @@ public void testDoubleBuild() {
builder.add(new TopicPartition("foo", 0),
new FetchRequest.PartitionData(Uuid.randomUuid(), 0, 100, 200, Optional.empty()));
builder.build();
try {
builder.build();
fail("Expected calling build twice to fail.");
} catch (Throwable t) {
// expected
}
assertThrows(Throwable.class, builder::build, "Expected calling build twice to fail.");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@

public class InFlightRequestsTest {

private final String dest = "dest";
private InFlightRequests inFlightRequests;
private int correlationId;
private String dest = "dest";

@BeforeEach
public void setup() {
Expand Down
22 changes: 7 additions & 15 deletions clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import java.util.stream.Collectors;

import static org.apache.kafka.test.TestUtils.assertOptional;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand All @@ -80,9 +81,9 @@

public class MetadataTest {

private long refreshBackoffMs = 100;
private long refreshBackoffMaxMs = 1000;
private long metadataExpireMs = 1000;
private final long refreshBackoffMs = 100;
private final long refreshBackoffMaxMs = 1000;
private final long metadataExpireMs = 1000;
private Metadata metadata = new Metadata(refreshBackoffMs, refreshBackoffMaxMs,
metadataExpireMs, new LogContext(), new ClusterResourceListeners());

Expand Down Expand Up @@ -1210,8 +1211,7 @@ else if (partition.equals(internalPart))
metadata.update(versionAndBuilder.requestVersion,
RequestTestUtils.metadataUpdateWith(clusterId, numNodes, errorCounts, topicPartitionCounts, tp -> null, metadataSupplier, ApiKeys.METADATA.latestVersion(), topicIds),
false, time.milliseconds());
List<Node> nodes = new ArrayList<>();
nodes.addAll(metadata.fetch().nodes());
List<Node> nodes = new ArrayList<>(metadata.fetch().nodes());
Node controller = metadata.fetch().controller();
assertEquals(numNodes, nodes.size());
assertFalse(metadata.updateRequested());
Expand Down Expand Up @@ -1321,22 +1321,14 @@ public void testConcurrentUpdateAndFetchForSnapshotAndCluster() throws Interrupt
metadata.updateWithCurrentRequestVersion(newMetadataResponse, true, time.milliseconds());
atleastMetadataUpdatedOnceLatch.countDown();
} else { // Thread to read metadata snapshot, once its updated
try {
if (!atleastMetadataUpdatedOnceLatch.await(5, TimeUnit.MINUTES)) {
assertFalse(true, "Test had to wait more than 5 minutes, something went wrong.");
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertTrue(assertDoesNotThrow(() -> atleastMetadataUpdatedOnceLatch.await(5, TimeUnit.MINUTES)));
newSnapshot.set(metadata.fetchMetadataSnapshot());
newCluster.set(metadata.fetch());
}
allThreadsDoneLatch.countDown();
});
}
if (!allThreadsDoneLatch.await(5, TimeUnit.MINUTES)) {
assertFalse(true, "Test had to wait more than 5 minutes, something went wrong.");
}
assertTrue(allThreadsDoneLatch.await(5, TimeUnit.MINUTES));

// Validate new snapshot is upto-date. And has higher partition counts, nodes & leader epoch than earlier.
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ public void testTopicCollection() {
List<Uuid> topicIds = Arrays.asList(Uuid.randomUuid(), Uuid.randomUuid(), Uuid.randomUuid());
List<String> topicNames = Arrays.asList("foo", "bar");

TopicCollection idCollection = TopicCollection.ofTopicIds(topicIds);
TopicCollection nameCollection = TopicCollection.ofTopicNames(topicNames);
TopicIdCollection idCollection = TopicCollection.ofTopicIds(topicIds);
TopicNameCollection nameCollection = TopicCollection.ofTopicNames(topicNames);

assertTrue(((TopicIdCollection) idCollection).topicIds().containsAll(topicIds));
assertTrue(((TopicNameCollection) nameCollection).topicNames().containsAll(topicNames));
assertTrue(idCollection.topicIds().containsAll(topicIds));
assertTrue(nameCollection.topicNames().containsAll(topicNames));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private OffsetDeleteResponse buildGroupErrorResponse(Errors error) {
}

private OffsetDeleteResponse buildPartitionErrorResponse(Errors error) {
OffsetDeleteResponse response = new OffsetDeleteResponse(
return new OffsetDeleteResponse(
new OffsetDeleteResponseData()
.setThrottleTimeMs(0)
.setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
Expand All @@ -135,7 +135,6 @@ private OffsetDeleteResponse buildPartitionErrorResponse(Errors error) {
).iterator()))
).iterator()))
);
return response;
}

private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleWithGroupError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,12 @@ public void testFailedHandleResponse() {
}

private DeleteGroupsResponse buildResponse(Errors error) {
DeleteGroupsResponse response = new DeleteGroupsResponse(
return new DeleteGroupsResponse(
new DeleteGroupsResponseData()
.setResults(new DeletableGroupResultCollection(singletonList(
new DeletableGroupResult()
.setErrorCode(error.code())
.setGroupId(groupId1)).iterator())));
return response;
}

private AdminApiHandler.ApiResult<CoordinatorKey, Void> handleWithError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ private ConsumerGroupDescribeResponse buildConsumerGroupDescribeResponse(Errors
}

private DescribeGroupsResponse buildResponse(Errors error, String protocolType) {
DescribeGroupsResponse response = new DescribeGroupsResponse(
return new DescribeGroupsResponse(
new DescribeGroupsResponseData()
.setGroups(singletonList(
new DescribedGroup()
Expand All @@ -314,7 +314,6 @@ private DescribeGroupsResponse buildResponse(Errors error, String protocolType)
.setMemberAssignment(ConsumerProtocol.serializeAssignment(
new Assignment(new ArrayList<>(tps))).array())
)))));
return response;
}

private AdminApiHandler.ApiResult<CoordinatorKey, ConsumerGroupDescription> handleClassicGroupWithError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,6 @@ private ApiResult<CoordinatorKey, ProducerIdAndEpoch> handleResponseError(
String transactionalId,
Errors error
) {
int brokerId = 1;

CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId);
Set<CoordinatorKey> keys = mkSet(key);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,27 +90,25 @@ public void testFailedHandleResponseInMemberLevel() {
}

private LeaveGroupResponse buildResponse(Errors error) {
LeaveGroupResponse response = new LeaveGroupResponse(
return new LeaveGroupResponse(
new LeaveGroupResponseData()
.setErrorCode(error.code())
.setMembers(singletonList(
new MemberResponse()
.setErrorCode(Errors.NONE.code())
.setMemberId("m1")
.setGroupInstanceId("m1-gii"))));
return response;
}

private LeaveGroupResponse buildResponseWithMemberError(Errors error) {
LeaveGroupResponse response = new LeaveGroupResponse(
return new LeaveGroupResponse(
new LeaveGroupResponseData()
.setErrorCode(Errors.NONE.code())
.setMembers(singletonList(
new MemberResponse()
.setErrorCode(error.code())
.setMemberId("m1")
.setGroupInstanceId("m1-gii"))));
return response;
}

private AdminApiHandler.ApiResult<CoordinatorKey, Map<MemberIdentity, Errors>> handleWithGroupError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

public class ConsumerGroupMetadataTest {

private String groupId = "group";
private final String groupId = "group";

@Test
public void testAssignmentConstructor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,12 @@ public class KafkaConsumerTest {
private final String memberId = "memberId";
private final String leaderId = "leaderId";
private final Optional<String> groupInstanceId = Optional.of("mock-instance");
private Map<String, Uuid> topicIds = Stream.of(
private final Map<String, Uuid> topicIds = Stream.of(
new AbstractMap.SimpleEntry<>(topic, topicId),
new AbstractMap.SimpleEntry<>(topic2, topicId2),
new AbstractMap.SimpleEntry<>(topic3, topicId3))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
private Map<Uuid, String> topicNames = Stream.of(
private final Map<Uuid, String> topicNames = Stream.of(
new AbstractMap.SimpleEntry<>(topicId, topic),
new AbstractMap.SimpleEntry<>(topicId2, topic2),
new AbstractMap.SimpleEntry<>(topicId3, topic3))
Expand Down Expand Up @@ -1840,63 +1840,33 @@ public void testOperationsBySubscribingConsumerWithDefaultGroupId(GroupProtocol
// OK, expected
}

try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupProtocol, (String) null)) {
consumer.subscribe(Collections.singleton(topic));
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupProtocol, null)) {
assertThrows(InvalidGroupIdException.class, () -> consumer.subscribe(Collections.singleton(topic)));
}

try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupProtocol, (String) null)) {
consumer.committed(Collections.singleton(tp0)).get(tp0);
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupProtocol, null)) {
assertThrows(InvalidGroupIdException.class, () -> consumer.committed(Collections.singleton(tp0)).get(tp0));
}

try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupProtocol, (String) null)) {
consumer.commitAsync();
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupProtocol, null)) {
assertThrows(InvalidGroupIdException.class, () -> consumer.commitAsync());
}

try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupProtocol, (String) null)) {
consumer.commitSync();
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupProtocol, null)) {
assertThrows(InvalidGroupIdException.class, () -> consumer.commitSync());
}
}

@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testOperationsByAssigningConsumerWithDefaultGroupId(GroupProtocol groupProtocol) {
KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupProtocol, null);
consumer.assign(singleton(tp0));
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupProtocol, null)) {
consumer.assign(singleton(tp0));

try {
consumer.committed(Collections.singleton(tp0)).get(tp0);
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
}

try {
consumer.commitAsync();
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
assertThrows(InvalidGroupIdException.class, () -> consumer.committed(Collections.singleton(tp0)).get(tp0));
assertThrows(InvalidGroupIdException.class, () -> consumer.commitAsync());
assertThrows(InvalidGroupIdException.class, () -> consumer.commitSync());
}

try {
consumer.commitSync();
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
}

consumer.close();
}

@ParameterizedTest
Expand Down Expand Up @@ -2055,12 +2025,7 @@ private void consumerCloseTest(GroupProtocol groupProtocol,
}
if (i < nonCloseRequests) {
// the close request should not complete until non-close requests (commit requests) have completed.
try {
future.get(100, TimeUnit.MILLISECONDS);
fail("Close completed without waiting for response");
} catch (TimeoutException e) {
// Expected exception
}
assertThrows(TimeoutException.class, () -> future.get(100, TimeUnit.MILLISECONDS));
}
}

Expand Down Expand Up @@ -2288,23 +2253,15 @@ public void testRebalanceException(GroupProtocol groupProtocol) {
client.prepareResponseFrom(syncGroupResponse(singletonList(tp0), Errors.NONE), coordinator);

// assign throws
try {
consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
fail("Should throw exception");
} catch (Throwable e) {
assertEquals(partitionAssigned + singleTopicPartition, e.getCause().getMessage());
}
KafkaException exc = assertThrows(KafkaException.class, () -> consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE)));
assertEquals(partitionAssigned + singleTopicPartition, exc.getCause().getMessage());

// the assignment is still updated regardless of the exception
assertEquals(singleton(tp0), subscription.assignedPartitions());

// close's revoke throws
try {
consumer.close(Duration.ofMillis(0));
fail("Should throw exception");
} catch (Throwable e) {
assertEquals(partitionRevoked + singleTopicPartition, e.getCause().getCause().getMessage());
}
exc = assertThrows(KafkaException.class, () -> consumer.close(Duration.ofMillis(0)));
assertEquals(partitionRevoked + singleTopicPartition, exc.getCause().getCause().getMessage());

consumer.close(Duration.ofMillis(0));

Expand Down
Loading