diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 31c2cc7fcf45e..f6cabe31114be 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -283,7 +283,6 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; /** * A unit test for KafkaAdminClient. @@ -328,7 +327,7 @@ public void testParseSuccessfulDescribeClusterResponse(boolean includeController setControllerId(includeController ? 0 : -1). setEndpointType(EndpointType.CONTROLLER.id()). setClusterId("Ek8tjqq1QBWfnaoyHFZqDg"). - setBrokers(new DescribeClusterResponseData.DescribeClusterBrokerCollection(Arrays.asList( + setBrokers(new DescribeClusterResponseData.DescribeClusterBrokerCollection(asList( new DescribeClusterBroker(). setBrokerId(0). setHost("controller0.com"). @@ -348,7 +347,7 @@ public void testParseSuccessfulDescribeClusterResponse(boolean includeController assertNull(cluster.controller()); } assertEquals("Ek8tjqq1QBWfnaoyHFZqDg", cluster.clusterResource().clusterId()); - assertEquals(new HashSet<>(Arrays.asList( + assertEquals(new HashSet<>(asList( new Node(0, "controller0.com", 9092), new Node(1, "controller1.com", 9092), new Node(2, "controller2.com", 9092))), new HashSet<>(cluster.nodes())); @@ -584,7 +583,7 @@ public static DeletableTopicResult deletableTopicResultWithId(Uuid topicId, Erro public static CreatePartitionsResponse prepareCreatePartitionsResponse(int throttleTimeMs, CreatePartitionsTopicResult... 
topics) { CreatePartitionsResponseData data = new CreatePartitionsResponseData() .setThrottleTimeMs(throttleTimeMs) - .setResults(Arrays.asList(topics)); + .setResults(asList(topics)); return new CreatePartitionsResponse(data); } @@ -1371,7 +1370,7 @@ private MockClient.RequestMatcher expectDeleteTopicsRequestWithTopics(final Stri return body -> { if (body instanceof DeleteTopicsRequest) { DeleteTopicsRequest request = (DeleteTopicsRequest) body; - return request.topicNames().equals(Arrays.asList(topics)); + return request.topicNames().equals(asList(topics)); } return false; }; @@ -1381,7 +1380,7 @@ private MockClient.RequestMatcher expectDeleteTopicsRequestWithTopicIds(final Uu return body -> { if (body instanceof DeleteTopicsRequest) { DeleteTopicsRequest request = (DeleteTopicsRequest) body; - return request.topicIds().equals(Arrays.asList(topicIds)); + return request.topicIds().equals(asList(topicIds)); } return false; }; @@ -1421,7 +1420,7 @@ public void testInvalidTopicNames() throws Exception { @SuppressWarnings("NPathComplexity") @Test - public void testDescribeTopicsWithDescribeTopicPartitionsApiBasic() { + public void testDescribeTopicsWithDescribeTopicPartitionsApiBasic() throws ExecutionException, InterruptedException { try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); String topicName0 = "test-0"; @@ -1463,22 +1462,19 @@ public void testDescribeTopicsWithDescribeTopicPartitionsApiBasic() { DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor(); return cursor != null && cursor.topicName() == topicName0 && cursor.partitionIndex() == 1; }, new DescribeTopicPartitionsResponse(dataSecondPart)); - try { - DescribeTopicsResult result = env.adminClient().describeTopics( - Arrays.asList(topicName0, topicName1), new DescribeTopicsOptions() - ); - Map topicDescriptions = result.allTopicNames().get(); - assertEquals(2, topicDescriptions.size()); - TopicDescription topicDescription = 
topicDescriptions.get(topicName0); - assertEquals(2, topicDescription.partitions().size()); - assertEquals(0, topicDescription.partitions().get(0).partition()); - assertEquals(1, topicDescription.partitions().get(1).partition()); - topicDescription = topicDescriptions.get(topicName1); - assertNull(topicDescription.authorizedOperations()); - assertEquals(1, topicDescription.partitions().size()); - } catch (Exception e) { - fail("describe using DescribeTopics API should not fail", e); - } + + DescribeTopicsResult result = env.adminClient().describeTopics( + asList(topicName0, topicName1), new DescribeTopicsOptions() + ); + Map topicDescriptions = result.allTopicNames().get(); + assertEquals(2, topicDescriptions.size()); + TopicDescription topicDescription = topicDescriptions.get(topicName0); + assertEquals(2, topicDescription.partitions().size()); + assertEquals(0, topicDescription.partitions().get(0).partition()); + assertEquals(1, topicDescription.partitions().get(1).partition()); + topicDescription = topicDescriptions.get(topicName1); + assertEquals(1, topicDescription.partitions().size()); + assertNull(topicDescription.authorizedOperations()); } } @@ -1555,7 +1551,7 @@ public void testDescribeTopicPartitionsApiWithoutAuthorizedOps() throws Executio @SuppressWarnings({"NPathComplexity", "CyclomaticComplexity"}) @Test - public void testDescribeTopicsWithDescribeTopicPartitionsApiEdgeCase() { + public void testDescribeTopicsWithDescribeTopicPartitionsApiEdgeCase() throws ExecutionException, InterruptedException { try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); String topicName0 = "test-0"; @@ -1613,23 +1609,20 @@ public void testDescribeTopicsWithDescribeTopicPartitionsApiEdgeCase() { DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor(); return cursor != null && cursor.topicName().equals(topicName2) && cursor.partitionIndex() == 1; }, new 
DescribeTopicPartitionsResponse(dataThirdPart)); - try { - DescribeTopicsResult result = env.adminClient().describeTopics( - Arrays.asList(topicName1, topicName0, topicName2), new DescribeTopicsOptions() - ); - Map topicDescriptions = result.allTopicNames().get(); - assertEquals(3, topicDescriptions.size()); - TopicDescription topicDescription = topicDescriptions.get(topicName0); - assertEquals(1, topicDescription.partitions().size()); - assertEquals(0, topicDescription.partitions().get(0).partition()); - topicDescription = topicDescriptions.get(topicName1); - assertEquals(2, topicDescription.partitions().size()); - topicDescription = topicDescriptions.get(topicName2); - assertEquals(2, topicDescription.partitions().size()); - assertNull(topicDescription.authorizedOperations()); - } catch (Exception e) { - fail("describe using DescribeTopics API should not fail", e); - } + + DescribeTopicsResult result = env.adminClient().describeTopics( + asList(topicName1, topicName0, topicName2), new DescribeTopicsOptions() + ); + Map topicDescriptions = result.allTopicNames().get(); + assertEquals(3, topicDescriptions.size()); + TopicDescription topicDescription = topicDescriptions.get(topicName0); + assertEquals(1, topicDescription.partitions().size()); + assertEquals(0, topicDescription.partitions().get(0).partition()); + topicDescription = topicDescriptions.get(topicName1); + assertEquals(2, topicDescription.partitions().size()); + topicDescription = topicDescriptions.get(topicName2); + assertEquals(2, topicDescription.partitions().size()); + assertNull(topicDescription.authorizedOperations()); } } @@ -1648,7 +1641,7 @@ private void addPartitionToDescribeTopicPartitionsResponse( .setEligibleLeaderReplicas(singletonList(1)) .setLastKnownElr(singletonList(2)) .setPartitionIndex(partition) - .setReplicaNodes(Arrays.asList(0, 1, 2))) + .setReplicaNodes(asList(0, 1, 2))) ); data.topics().add(new DescribeTopicPartitionsResponseTopic() .setErrorCode((short) 0) @@ -1660,7 +1653,7 @@ 
private void addPartitionToDescribeTopicPartitionsResponse( @SuppressWarnings("NPathComplexity") @Test - public void testDescribeTopicsWithDescribeTopicPartitionsApiErrorHandling() { + public void testDescribeTopicsWithDescribeTopicPartitionsApiErrorHandling() throws InterruptedException { try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); String topicName0 = "test-0"; @@ -1707,13 +1700,11 @@ public void testDescribeTopicsWithDescribeTopicPartitionsApiErrorHandling() { return request.cursor() == null; }, new DescribeTopicPartitionsResponse(dataFirstPart)); DescribeTopicsResult result = env.adminClient().describeTopics( - Arrays.asList(topicName1, topicName0), new DescribeTopicsOptions() + asList(topicName1, topicName0), new DescribeTopicsOptions() ); - try { - TestUtils.assertFutureError(result.allTopicNames(), TopicAuthorizationException.class); - } catch (Exception e) { - fail("describe using DescribeTopics API should not have other exceptions", e); - } + + TestUtils.assertFutureError(result.allTopicNames(), TopicAuthorizationException.class); + } } @@ -2723,7 +2714,7 @@ public void testDeleteRecords() throws Exception { } @Test - public void testDescribeTopicsByIds() { + public void testDescribeTopicsByIds() throws ExecutionException, InterruptedException { try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); @@ -2748,13 +2739,10 @@ public void testDescribeTopicsByIds() { singletonList(partitionMetadata), MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED)))); TopicCollection.TopicIdCollection topicIds = TopicCollection.ofTopicIds( singletonList(topicId)); - try { - DescribeTopicsResult result = env.adminClient().describeTopics(topicIds); - Map allTopicIds = result.allTopicIds().get(); - assertEquals(topicName, allTopicIds.get(topicId).name()); - } catch (Exception e) { - fail("describe with valid topicId should not fail", e); - } + + 
DescribeTopicsResult describeTopicsResult = env.adminClient().describeTopics(topicIds); + Map allTopicIds = describeTopicsResult.allTopicIds().get(); + assertEquals(topicName, allTopicIds.get(topicId).name()); // ID not exist in brokers Uuid nonExistID = Uuid.randomUuid(); @@ -2764,29 +2752,18 @@ public void testDescribeTopicsByIds() { env.cluster().clusterResource().clusterId(), env.cluster().controller().id(), emptyList())); - try { - DescribeTopicsResult result = env.adminClient().describeTopics( - TopicCollection.ofTopicIds(singletonList(nonExistID))); - TestUtils.assertFutureError(result.allTopicIds(), UnknownTopicIdException.class); - result.allTopicIds().get(); - fail("describe with non-exist topic ID should throw exception"); - } catch (Exception e) { - assertEquals( - String.format("org.apache.kafka.common.errors.UnknownTopicIdException: TopicId %s not found.", nonExistID), - e.getMessage()); - } - // Invalid ID - try { - DescribeTopicsResult result = env.adminClient().describeTopics( - TopicCollection.ofTopicIds(singletonList(Uuid.ZERO_UUID))); - TestUtils.assertFutureError(result.allTopicIds(), InvalidTopicException.class); - result.allTopicIds().get(); - fail("describe with Uuid.ZERO_UUID should throw exception"); - } catch (Exception e) { - assertEquals("The given topic id 'AAAAAAAAAAAAAAAAAAAAAA' cannot be represented in a request.", - e.getCause().getMessage()); - } + DescribeTopicsResult result1 = env.adminClient().describeTopics( + TopicCollection.ofTopicIds(singletonList(nonExistID))); + TestUtils.assertFutureError(result1.allTopicIds(), UnknownTopicIdException.class); + Exception e = assertThrows(Exception.class, () -> result1.allTopicIds().get(), "describe with non-exist topic ID should throw exception"); + assertEquals(String.format("org.apache.kafka.common.errors.UnknownTopicIdException: TopicId %s not found.", nonExistID), e.getMessage()); + + DescribeTopicsResult result2 = env.adminClient().describeTopics( + 
TopicCollection.ofTopicIds(singletonList(Uuid.ZERO_UUID))); + TestUtils.assertFutureError(result2.allTopicIds(), InvalidTopicException.class); + e = assertThrows(Exception.class, () -> result2.allTopicIds().get(), "describe with Uuid.ZERO_UUID should throw exception"); + assertEquals("The given topic id 'AAAAAAAAAAAAAAAAAAAAAA' cannot be represented in a request.", e.getCause().getMessage()); } } @@ -2825,7 +2802,7 @@ public void testDescribeCluster() throws Exception { assertEquals(env.cluster().clusterResource().clusterId(), result2.clusterId().get()); assertEquals(new HashSet<>(env.cluster().nodes()), new HashSet<>(result2.nodes().get())); assertEquals(3, result2.controller().get().id()); - assertEquals(new HashSet<>(Arrays.asList(AclOperation.DESCRIBE, AclOperation.ALTER)), + assertEquals(new HashSet<>(asList(AclOperation.DESCRIBE, AclOperation.ALTER)), result2.authorizedOperations().get()); } } @@ -2933,7 +2910,7 @@ public void testListConsumerGroups() throws Exception { new ListGroupsResponse( new ListGroupsResponseData() .setErrorCode(Errors.NONE.code()) - .setGroups(Arrays.asList( + .setGroups(asList( new ListGroupsResponseData.ListedGroup() .setGroupId("group-1") .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) @@ -2964,7 +2941,7 @@ public void testListConsumerGroups() throws Exception { new ListGroupsResponse( new ListGroupsResponseData() .setErrorCode(Errors.NONE.code()) - .setGroups(Arrays.asList( + .setGroups(asList( new ListGroupsResponseData.ListedGroup() .setGroupId("group-2") .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) @@ -2980,7 +2957,7 @@ public void testListConsumerGroups() throws Exception { new ListGroupsResponse( new ListGroupsResponseData() .setErrorCode(Errors.NONE.code()) - .setGroups(Arrays.asList( + .setGroups(asList( new ListGroupsResponseData.ListedGroup() .setGroupId("group-3") .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) @@ -3050,7 +3027,7 @@ public void testListConsumerGroupsWithStates() throws Exception { 
env.kafkaClient().prepareResponseFrom( new ListGroupsResponse(new ListGroupsResponseData() .setErrorCode(Errors.NONE.code()) - .setGroups(Arrays.asList( + .setGroups(asList( new ListGroupsResponseData.ListedGroup() .setGroupId("group-1") .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) @@ -3110,7 +3087,7 @@ public void testListConsumerGroupsWithTypes() throws Exception { expectListGroupsRequestWithFilters(Collections.emptySet(), singleton(GroupType.CONSUMER.toString())), new ListGroupsResponse(new ListGroupsResponseData() .setErrorCode(Errors.NONE.code()) - .setGroups(Arrays.asList( + .setGroups(asList( new ListGroupsResponseData.ListedGroup() .setGroupId("group-1") .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) @@ -3690,7 +3667,7 @@ public void testDescribeOldAndNewConsumerGroups() throws Exception { env.kafkaClient().prepareResponse(new FindCoordinatorResponse( new FindCoordinatorResponseData() - .setCoordinators(Arrays.asList( + .setCoordinators(asList( FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, "grp1", env.cluster().controller()), FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, "grp2", env.cluster().controller()) )) @@ -3698,7 +3675,7 @@ public void testDescribeOldAndNewConsumerGroups() throws Exception { env.kafkaClient().prepareResponse(new ConsumerGroupDescribeResponse( new ConsumerGroupDescribeResponseData() - .setGroups(Arrays.asList( + .setGroups(asList( new ConsumerGroupDescribeResponseData.DescribedGroup() .setGroupId("grp1") .setGroupState("Stable") @@ -3766,7 +3743,7 @@ public void testDescribeOldAndNewConsumerGroups() throws Exception { )); DescribeConsumerGroupsResult result = env.adminClient() - .describeConsumerGroups(Arrays.asList("grp1", "grp2")); + .describeConsumerGroups(asList("grp1", "grp2")); Map expectedResult = new HashMap<>(); expectedResult.put("grp1", new ConsumerGroupDescription( @@ -3974,7 +3951,7 @@ public void testListConsumerGroupOffsetsRetriableErrors() throws Exception { @Test public void 
testListConsumerGroupOffsetsNonRetriableErrors() throws Exception { // Non-retriable errors throw an exception - final List nonRetriableErrors = Arrays.asList( + final List nonRetriableErrors = asList( Errors.GROUP_AUTHORIZATION_FAILED, Errors.INVALID_GROUP_ID, Errors.GROUP_ID_NOT_FOUND, Errors.UNKNOWN_MEMBER_ID, Errors.STALE_MEMBER_EPOCH); @@ -4081,7 +4058,7 @@ public void testBatchedListConsumerGroupOffsetsWithNoFindCoordinatorBatching() t .setMaxVersion((short) 7); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0")) { - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Arrays.asList(findCoordinatorV3, offsetFetchV7))); + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(asList(findCoordinatorV3, offsetFetchV7))); env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); @@ -4283,7 +4260,7 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception { .setMaxVersion(ApiKeys.DELETE_GROUPS.latestVersion()); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Arrays.asList(findCoordinatorV3, describeGroups))); + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(asList(findCoordinatorV3, describeGroups))); // Retriable FindCoordinatorResponse errors should be retried env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); @@ -4380,7 +4357,7 @@ public void testDeleteMultipleConsumerGroupsWithOlderBroker() throws Exception { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { 
env.kafkaClient().setNodeApiVersions( - NodeApiVersions.create(Arrays.asList(findCoordinatorV3, describeGroups))); + NodeApiVersions.create(asList(findCoordinatorV3, describeGroups))); // Dummy response for MockClient to handle the UnsupportedVersionException correctly to switch from batched to un-batched env.kafkaClient().prepareResponse(null); @@ -4579,7 +4556,7 @@ public void testDeleteConsumerGroupOffsetsNonRetriableErrors() throws Exception // Non-retriable errors throw an exception final TopicPartition tp1 = new TopicPartition("foo", 0); - final List nonRetriableErrors = Arrays.asList( + final List nonRetriableErrors = asList( Errors.GROUP_AUTHORIZATION_FAILED, Errors.INVALID_GROUP_ID, Errors.GROUP_ID_NOT_FOUND); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { @@ -4738,7 +4715,7 @@ public void testRemoveMembersFromGroupNumRetries() throws Exception { env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NOT_COORDINATOR.code()))); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - Collection membersToRemove = Arrays.asList(new MemberToRemove("instance-1"), new MemberToRemove("instance-2")); + Collection membersToRemove = asList(new MemberToRemove("instance-1"), new MemberToRemove("instance-2")); final RemoveMembersFromConsumerGroupResult result = env.adminClient().removeMembersFromConsumerGroup( GROUP_ID, new RemoveMembersFromConsumerGroupOptions(membersToRemove)); @@ -4859,7 +4836,7 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception { public void testRemoveMembersFromGroupNonRetriableErrors() throws Exception { // Non-retriable errors throw an exception - final List nonRetriableErrors = Arrays.asList( + final List nonRetriableErrors = asList( Errors.GROUP_AUTHORIZATION_FAILED, Errors.INVALID_GROUP_ID, Errors.GROUP_ID_NOT_FOUND); try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) { @@ -4905,7 +4882,7 @@ public void testRemoveMembersFromGroup() throws Exception { env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData() .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()))); - Collection membersToRemove = Arrays.asList(new MemberToRemove(instanceOne), + Collection membersToRemove = asList(new MemberToRemove(instanceOne), new MemberToRemove(instanceTwo)); final RemoveMembersFromConsumerGroupResult unknownErrorResult = env.adminClient().removeMembersFromConsumerGroup( GROUP_ID, @@ -4930,7 +4907,7 @@ public void testRemoveMembersFromGroup() throws Exception { env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData() .setErrorCode(Errors.NONE.code()) - .setMembers(Arrays.asList(responseOne, responseTwo)))); + .setMembers(asList(responseOne, responseTwo)))); final RemoveMembersFromConsumerGroupResult memberLevelErrorResult = env.adminClient().removeMembersFromConsumerGroup( GROUP_ID, @@ -4962,7 +4939,7 @@ public void testRemoveMembersFromGroup() throws Exception { env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); env.kafkaClient().prepareResponse(new LeaveGroupResponse( new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()).setMembers( - Arrays.asList(responseTwo, + asList(responseTwo, new MemberResponse().setGroupInstanceId(instanceOne).setErrorCode(Errors.NONE.code()) )) )); @@ -4979,7 +4956,7 @@ public void testRemoveMembersFromGroup() throws Exception { final List topicPartitions = Stream.of(1, 2, 3).map(partition -> new TopicPartition("my_topic", partition)) .collect(Collectors.toList()); // construct the DescribeGroupsResponse - DescribeGroupsResponseData data = prepareDescribeGroupsResponseData(GROUP_ID, Arrays.asList(instanceOne, instanceTwo), topicPartitions); + 
DescribeGroupsResponseData data = prepareDescribeGroupsResponseData(GROUP_ID, asList(instanceOne, instanceTwo), topicPartitions); // Return with partial failure for "removeAll" scenario // 1 prepare response for AdminClient.describeConsumerGroups @@ -4990,7 +4967,7 @@ public void testRemoveMembersFromGroup() throws Exception { env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); env.kafkaClient().prepareResponse(new LeaveGroupResponse( new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()).setMembers( - Arrays.asList(responseOne, responseTwo)) + asList(responseOne, responseTwo)) )); final RemoveMembersFromConsumerGroupResult partialFailureResults = env.adminClient().removeMembersFromConsumerGroup( GROUP_ID, @@ -5009,7 +4986,7 @@ public void testRemoveMembersFromGroup() throws Exception { env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); env.kafkaClient().prepareResponse(new LeaveGroupResponse( new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()).setMembers( - Arrays.asList(responseTwo, + asList(responseTwo, new MemberResponse().setGroupInstanceId(instanceOne).setErrorCode(Errors.NONE.code()) )) )); @@ -5040,7 +5017,7 @@ private void testRemoveMembersFromGroup(String reason, String expectedReason) th member -> member.reason().equals(expectedReason) ); }, new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()).setMembers( - Arrays.asList( + asList( new MemberResponse().setGroupInstanceId("instance-1"), new MemberResponse().setGroupInstanceId("instance-2") )) @@ -5049,7 +5026,7 @@ private void testRemoveMembersFromGroup(String reason, String expectedReason) th MemberToRemove memberToRemove1 = new MemberToRemove("instance-1"); MemberToRemove memberToRemove2 = new MemberToRemove("instance-2"); - RemoveMembersFromConsumerGroupOptions options = new RemoveMembersFromConsumerGroupOptions(Arrays.asList( + 
RemoveMembersFromConsumerGroupOptions options = new RemoveMembersFromConsumerGroupOptions(asList( memberToRemove1, memberToRemove2 )); @@ -5093,7 +5070,7 @@ public void testAlterPartitionReassignments() throws Exception { TopicPartition tp2 = new TopicPartition("B", 0); Map> reassignments = new HashMap<>(); reassignments.put(tp1, Optional.empty()); - reassignments.put(tp2, Optional.of(new NewPartitionReassignment(Arrays.asList(1, 2, 3)))); + reassignments.put(tp2, Optional.of(new NewPartitionReassignment(asList(1, 2, 3)))); // 1. server returns less responses than number of partitions we sent AlterPartitionReassignmentsResponseData responseData1 = new AlterPartitionReassignmentsResponseData(); @@ -5114,7 +5091,7 @@ public void testAlterPartitionReassignments() throws Exception { new AlterPartitionReassignmentsResponseData() .setErrorCode(Errors.NOT_CONTROLLER.code()) .setErrorMessage(Errors.NOT_CONTROLLER.message()) - .setResponses(Arrays.asList( + .setResponses(asList( new ReassignableTopicResponse() .setName("A") .setPartitions(Collections.singletonList(normalPartitionResponse)), @@ -5126,7 +5103,7 @@ public void testAlterPartitionReassignments() throws Exception { env.cluster().clusterResource().clusterId(), 1, Collections.emptyList()); AlterPartitionReassignmentsResponseData normalResponse = new AlterPartitionReassignmentsResponseData() - .setResponses(Arrays.asList( + .setResponses(asList( new ReassignableTopicResponse() .setName("A") .setPartitions(Collections.singletonList(normalPartitionResponse)), @@ -5145,7 +5122,7 @@ public void testAlterPartitionReassignments() throws Exception { // 3. 
partition-level error AlterPartitionReassignmentsResponseData partitionLevelErrData = new AlterPartitionReassignmentsResponseData() - .setResponses(Arrays.asList( + .setResponses(asList( new ReassignableTopicResponse() .setName("A") .setPartitions(Collections.singletonList(new ReassignablePartitionResponse() @@ -5167,7 +5144,7 @@ public void testAlterPartitionReassignments() throws Exception { new AlterPartitionReassignmentsResponseData() .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code()) .setErrorMessage(errorMessage) - .setResponses(Arrays.asList( + .setResponses(asList( new ReassignableTopicResponse() .setName("A") .setPartitions(Collections.singletonList(normalPartitionResponse)), @@ -5185,9 +5162,9 @@ public void testAlterPartitionReassignments() throws Exception { TopicPartition invalidTopicTP = new TopicPartition("", 0); TopicPartition invalidPartitionTP = new TopicPartition("ABC", -1); Map> invalidTopicReassignments = new HashMap<>(); - invalidTopicReassignments.put(invalidPartitionTP, Optional.of(new NewPartitionReassignment(Arrays.asList(1, 2, 3)))); - invalidTopicReassignments.put(invalidTopicTP, Optional.of(new NewPartitionReassignment(Arrays.asList(1, 2, 3)))); - invalidTopicReassignments.put(tp1, Optional.of(new NewPartitionReassignment(Arrays.asList(1, 2, 3)))); + invalidTopicReassignments.put(invalidPartitionTP, Optional.of(new NewPartitionReassignment(asList(1, 2, 3)))); + invalidTopicReassignments.put(invalidTopicTP, Optional.of(new NewPartitionReassignment(asList(1, 2, 3)))); + invalidTopicReassignments.put(tp1, Optional.of(new NewPartitionReassignment(asList(1, 2, 3)))); AlterPartitionReassignmentsResponseData singlePartResponseData = new AlterPartitionReassignmentsResponseData() @@ -5207,7 +5184,7 @@ public void testAlterPartitionReassignments() throws Exception { new AlterPartitionReassignmentsResponseData() .setErrorCode(Errors.NONE.code()) .setErrorMessage(Errors.NONE.message()) - .setResponses(Arrays.asList( + .setResponses(asList( 
new ReassignableTopicResponse() .setName("A") .setPartitions(Collections.singletonList(normalPartitionResponse)), @@ -5231,18 +5208,18 @@ public void testListPartitionReassignments() throws Exception { TopicPartition tp1 = new TopicPartition("A", 0); OngoingPartitionReassignment tp1PartitionReassignment = new OngoingPartitionReassignment() .setPartitionIndex(0) - .setRemovingReplicas(Arrays.asList(1, 2, 3)) - .setAddingReplicas(Arrays.asList(4, 5, 6)) - .setReplicas(Arrays.asList(1, 2, 3, 4, 5, 6)); + .setRemovingReplicas(asList(1, 2, 3)) + .setAddingReplicas(asList(4, 5, 6)) + .setReplicas(asList(1, 2, 3, 4, 5, 6)); OngoingTopicReassignment tp1Reassignment = new OngoingTopicReassignment().setName("A") .setPartitions(Collections.singletonList(tp1PartitionReassignment)); TopicPartition tp2 = new TopicPartition("B", 0); OngoingPartitionReassignment tp2PartitionReassignment = new OngoingPartitionReassignment() .setPartitionIndex(0) - .setRemovingReplicas(Arrays.asList(1, 2, 3)) - .setAddingReplicas(Arrays.asList(4, 5, 6)) - .setReplicas(Arrays.asList(1, 2, 3, 4, 5, 6)); + .setRemovingReplicas(asList(1, 2, 3)) + .setAddingReplicas(asList(4, 5, 6)) + .setReplicas(asList(1, 2, 3, 4, 5, 6)); OngoingTopicReassignment tp2Reassignment = new OngoingTopicReassignment().setName("B") .setPartitions(Collections.singletonList(tp2PartitionReassignment)); @@ -5253,7 +5230,7 @@ public void testListPartitionReassignments() throws Exception { MetadataResponse controllerNodeResponse = RequestTestUtils.metadataResponse(env.cluster().nodes(), env.cluster().clusterResource().clusterId(), 1, Collections.emptyList()); ListPartitionReassignmentsResponseData reassignmentsData = new ListPartitionReassignmentsResponseData() - .setTopics(Arrays.asList(tp1Reassignment, tp2Reassignment)); + .setTopics(asList(tp1Reassignment, tp2Reassignment)); env.kafkaClient().prepareResponse(new ListPartitionReassignmentsResponse(notControllerData)); env.kafkaClient().prepareResponse(controllerNodeResponse); 
env.kafkaClient().prepareResponse(new ListPartitionReassignmentsResponse(reassignmentsData)); @@ -5267,12 +5244,12 @@ public void testListPartitionReassignments() throws Exception { .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()); env.kafkaClient().prepareResponse(new ListPartitionReassignmentsResponse(unknownTpData)); - ListPartitionReassignmentsResult unknownTpResult = env.adminClient().listPartitionReassignments(new HashSet<>(Arrays.asList(tp1, tp2))); + ListPartitionReassignmentsResult unknownTpResult = env.adminClient().listPartitionReassignments(new HashSet<>(asList(tp1, tp2))); TestUtils.assertFutureError(unknownTpResult.reassignments(), UnknownTopicOrPartitionException.class); // 3. Success ListPartitionReassignmentsResponseData responseData = new ListPartitionReassignmentsResponseData() - .setTopics(Arrays.asList(tp1Reassignment, tp2Reassignment)); + .setTopics(asList(tp1Reassignment, tp2Reassignment)); env.kafkaClient().prepareResponse(new ListPartitionReassignmentsResponse(responseData)); ListPartitionReassignmentsResult responseResult = env.adminClient().listPartitionReassignments(); @@ -5371,7 +5348,7 @@ public void testAlterConsumerGroupOffsetsNonRetriableErrors() throws Exception { // Non-retriable errors throw an exception final TopicPartition tp1 = new TopicPartition("foo", 0); - final List nonRetriableErrors = Arrays.asList( + final List nonRetriableErrors = asList( Errors.GROUP_AUTHORIZATION_FAILED, Errors.INVALID_GROUP_ID, Errors.GROUP_ID_NOT_FOUND, Errors.STALE_MEMBER_EPOCH); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { @@ -5481,7 +5458,7 @@ public void testListOffsets() throws Exception { ListOffsetsTopicResponse t3 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp3, Errors.NONE, 234567890L, 456L, 654); ListOffsetsResponseData responseData = new ListOffsetsResponseData() .setThrottleTimeMs(0) - .setTopics(Arrays.asList(t0, t1, t2, t3)); + .setTopics(asList(t0, t1, t2, t3)); 
env.kafkaClient().prepareResponse(new ListOffsetsResponse(responseData)); Map partitions = new HashMap<>(); @@ -5509,10 +5486,7 @@ public void testListOffsets() throws Exception { assertEquals(offsets.get(tp1), result.partitionResult(tp1).get()); assertEquals(offsets.get(tp2), result.partitionResult(tp2).get()); assertEquals(offsets.get(tp3), result.partitionResult(tp3).get()); - try { - result.partitionResult(new TopicPartition("unknown", 0)).get(); - fail("should have thrown IllegalArgumentException"); - } catch (IllegalArgumentException expected) { } + assertThrows(IllegalArgumentException.class, () -> result.partitionResult(new TopicPartition("unknown", 0)).get()); } } @@ -5521,7 +5495,7 @@ public void testListOffsetsRetriableErrors() throws Exception { Node node0 = new Node(0, "localhost", 8120); Node node1 = new Node(1, "localhost", 8121); - List nodes = Arrays.asList(node0, node1); + List nodes = asList(node0, node1); List pInfos = new ArrayList<>(); pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0, node1}, new Node[]{node0, node1})); pInfos.add(new PartitionInfo("foo", 1, node0, new Node[]{node0, node1}, new Node[]{node0, node1})); @@ -5547,7 +5521,7 @@ public void testListOffsetsRetriableErrors() throws Exception { ListOffsetsTopicResponse t1 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 987L, 789); ListOffsetsResponseData responseData = new ListOffsetsResponseData() .setThrottleTimeMs(0) - .setTopics(Arrays.asList(t0, t1)); + .setTopics(asList(t0, t1)); env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node0); // listoffsets response from broker 1 ListOffsetsTopicResponse t2 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp2, Errors.NONE, -1L, 456L, 654); @@ -5590,7 +5564,7 @@ public void testListOffsetsNonRetriableErrors() throws Exception { Node node0 = new Node(0, "localhost", 8120); Node node1 = new Node(1, "localhost", 8121); - List nodes = Arrays.asList(node0, node1); 
+ List nodes = asList(node0, node1); List pInfos = new ArrayList<>(); pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0, node1}, new Node[]{node0, node1})); final Cluster cluster = @@ -5731,13 +5705,13 @@ public void testListOffsetsHandlesFulfillmentTimeouts() throws Exception { ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543); ListOffsetsResponseData responseDataWithError = new ListOffsetsResponseData() .setThrottleTimeMs(0) - .setTopics(Arrays.asList(tp0ErrorResponse, tp1Response)); + .setTopics(asList(tp0ErrorResponse, tp1Response)); ListOffsetsTopicResponse tp0Response = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp0, Errors.NONE, -1L, 789L, 987); ListOffsetsResponseData responseData = new ListOffsetsResponseData() .setThrottleTimeMs(0) - .setTopics(Arrays.asList(tp0Response, tp1Response)); + .setTopics(asList(tp0Response, tp1Response)); // Test that one-too-many timeouts for partition 0 result in partial success overall - // timeout for partition 0 and success for partition 1. 
@@ -6147,7 +6121,7 @@ public void testDescribeMetadataQuorumFailure() { public void testListOffsetsMetadataRetriableErrors() throws Exception { Node node0 = new Node(0, "localhost", 8120); Node node1 = new Node(1, "localhost", 8121); - List nodes = Arrays.asList(node0, node1); + List nodes = asList(node0, node1); List pInfos = new ArrayList<>(); pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new Node[]{node0})); pInfos.add(new PartitionInfo("foo", 1, node1, new Node[]{node1}, new Node[]{node1})); @@ -6205,13 +6179,13 @@ public void testListOffsetsWithMultiplePartitionsLeaderChange() throws Exception Node node0 = new Node(0, "localhost", 8120); Node node1 = new Node(1, "localhost", 8121); Node node2 = new Node(2, "localhost", 8122); - List nodes = Arrays.asList(node0, node1, node2); + List nodes = asList(node0, node1, node2); final PartitionInfo oldPInfo1 = new PartitionInfo("foo", 0, node0, new Node[]{node0, node1, node2}, new Node[]{node0, node1, node2}); final PartitionInfo oldPnfo2 = new PartitionInfo("foo", 1, node0, new Node[]{node0, node1, node2}, new Node[]{node0, node1, node2}); - List oldPInfos = Arrays.asList(oldPInfo1, oldPnfo2); + List oldPInfos = asList(oldPInfo1, oldPnfo2); final Cluster oldCluster = new Cluster("mockClusterId", nodes, oldPInfos, Collections.emptySet(), Collections.emptySet(), node0); @@ -6227,14 +6201,14 @@ public void testListOffsetsWithMultiplePartitionsLeaderChange() throws Exception ListOffsetsTopicResponse t1 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.LEADER_NOT_AVAILABLE, -2L, 123L, 456); ListOffsetsResponseData responseData = new ListOffsetsResponseData() .setThrottleTimeMs(0) - .setTopics(Arrays.asList(t0, t1)); + .setTopics(asList(t0, t1)); env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node0); final PartitionInfo newPInfo1 = new PartitionInfo("foo", 0, node1, new Node[]{node0, node1, node2}, new Node[]{node0, node1, node2}); final PartitionInfo 
newPInfo2 = new PartitionInfo("foo", 1, node2, new Node[]{node0, node1, node2}, new Node[]{node0, node1, node2}); - List newPInfos = Arrays.asList(newPInfo1, newPInfo2); + List newPInfos = asList(newPInfo1, newPInfo2); final Cluster newCluster = new Cluster("mockClusterId", nodes, newPInfos, Collections.emptySet(), Collections.emptySet(), node0); @@ -6274,7 +6248,7 @@ public void testListOffsetsWithLeaderChange() throws Exception { Node node0 = new Node(0, "localhost", 8120); Node node1 = new Node(1, "localhost", 8121); Node node2 = new Node(2, "localhost", 8122); - List nodes = Arrays.asList(node0, node1, node2); + List nodes = asList(node0, node1, node2); final PartitionInfo oldPartitionInfo = new PartitionInfo("foo", 0, node0, new Node[]{node0, node1, node2}, new Node[]{node0, node1, node2}); @@ -6328,7 +6302,7 @@ public void testListOffsetsMetadataNonRetriableErrors( ) throws Exception { Node node0 = new Node(0, "localhost", 8120); Node node1 = new Node(1, "localhost", 8121); - List nodes = Arrays.asList(node0, node1); + List nodes = asList(node0, node1); List pInfos = new ArrayList<>(); pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0, node1}, new Node[]{node0, node1})); final Cluster cluster = @@ -6390,7 +6364,7 @@ private static Stream listOffsetsMetadataNonRetriableErrors() { public void testListOffsetsPartialResponse() throws Exception { Node node0 = new Node(0, "localhost", 8120); Node node1 = new Node(1, "localhost", 8121); - List nodes = Arrays.asList(node0, node1); + List nodes = asList(node0, node1); List pInfos = new ArrayList<>(); pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0, node1}, new Node[]{node0, node1})); pInfos.add(new PartitionInfo("foo", 1, node0, new Node[]{node0, node1}, new Node[]{node0, node1})); @@ -6429,7 +6403,7 @@ public void testListOffsetsPartialResponse() throws Exception { @Test public void testGetSubLevelError() { - List memberIdentities = Arrays.asList( + List memberIdentities = asList( new 
MemberIdentity().setGroupInstanceId("instance-0"), new MemberIdentity().setGroupInstanceId("instance-1")); Map errorsMap = new HashMap<>(); @@ -6808,10 +6782,10 @@ public void testDescribeUserScramCredentials() throws Exception { user1CredentialInfo.setIterations(user1Iterations); final DescribeUserScramCredentialsResponseData responseData = new DescribeUserScramCredentialsResponseData(); - responseData.setResults(Arrays.asList( + responseData.setResults(asList( new DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult() .setUser(user0Name) - .setCredentialInfos(Arrays.asList(user0CredentialInfo0, user0CredentialInfo1)), + .setCredentialInfos(asList(user0CredentialInfo0, user0CredentialInfo1)), new DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult() .setUser(user1Name) .setCredentialInfos(singletonList(user1CredentialInfo)))); @@ -6855,7 +6829,7 @@ public void testDescribeUserScramCredentials() throws Exception { } @Test - public void testAlterUserScramCredentialsUnknownMechanism() { + public void testAlterUserScramCredentialsUnknownMechanism() throws ExecutionException, InterruptedException { try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); @@ -6874,7 +6848,7 @@ public void testAlterUserScramCredentialsUnknownMechanism() { env.kafkaClient().prepareResponse(new AlterUserScramCredentialsResponse(responseData)); - AlterUserScramCredentialsResult result = env.adminClient().alterUserScramCredentials(Arrays.asList( + AlterUserScramCredentialsResult result = env.adminClient().alterUserScramCredentials(asList( new UserScramCredentialDeletion(user0Name, user0ScramMechanism0), new UserScramCredentialUpsertion(user1Name, new ScramCredentialInfo(user1ScramMechanism0, 8192), "password"), new UserScramCredentialUpsertion(user2Name, new ScramCredentialInfo(user2ScramMechanism0, 4096), "password"))); @@ -6882,25 +6856,12 @@ public void 
testAlterUserScramCredentialsUnknownMechanism() { assertEquals(3, resultData.size()); Stream.of(user0Name, user1Name).forEach(u -> { assertTrue(resultData.containsKey(u)); - try { - resultData.get(u).get(); - fail("Expected request for user " + u + " to complete exceptionally, but it did not"); - } catch (Exception expected) { - // ignore - } + assertThrows(Exception.class, () -> resultData.get(u).get(), "Expected request for user " + u + " to complete exceptionally, but it did not"); }); assertTrue(resultData.containsKey(user2Name)); - try { - resultData.get(user2Name).get(); - } catch (Exception e) { - fail("Expected request for user " + user2Name + " to NOT complete excdptionally, but it did"); - } - try { - result.all().get(); - fail("Expected 'result.all().get()' to throw an exception since at least one user failed, but it did not"); - } catch (final Exception expected) { - // ignore, expected - } + resultData.get(user2Name).get(); + + assertThrows(Exception.class, () -> result.all().get(), "Expected 'result.all().get()' to throw an exception since at least one user failed, but it did not"); } } @@ -6923,7 +6884,7 @@ public void testAlterUserScramCredentials() { env.kafkaClient().prepareResponse(new AlterUserScramCredentialsResponse(responseData)); - AlterUserScramCredentialsResult result = env.adminClient().alterUserScramCredentials(Arrays.asList( + AlterUserScramCredentialsResult result = env.adminClient().alterUserScramCredentials(asList( new UserScramCredentialDeletion(user0Name, user0ScramMechanism0), new UserScramCredentialUpsertion(user0Name, new ScramCredentialInfo(user0ScramMechanism1, 8192), "password"), new UserScramCredentialUpsertion(user1Name, new ScramCredentialInfo(user1ScramMechanism0, 8192), "password"), @@ -6964,7 +6925,7 @@ public void testDescribeLogDirsPartialFailure() throws Exception { prepareDescribeLogDirsResponse(Errors.NONE, "/data"), env.cluster().nodeById(1)); - DescribeLogDirsResult result = 
env.adminClient().describeLogDirs(Arrays.asList(0, 1)); + DescribeLogDirsResult result = env.adminClient().describeLogDirs(asList(0, 1)); // Wait until the prepared attempt has been consumed TestUtils.waitForCondition(() -> env.kafkaClient().numAwaitingResponses() == 0, @@ -7086,7 +7047,7 @@ public void testDescribeProducers() throws Exception { Node leader = env.cluster().nodes().iterator().next(); expectMetadataRequest(env, topicPartition, leader); - List expected = Arrays.asList( + List expected = asList( new ProducerState(12345L, 15, 30, env.time().milliseconds(), OptionalInt.of(99), OptionalLong.empty()), new ProducerState(12345L, 15, 30, env.time().milliseconds(), @@ -7153,7 +7114,7 @@ public void testDescribeProducersRetryAfterDisconnect() throws Exception { Node initialLeader = nodeIterator.next(); expectMetadataRequest(env, topicPartition, initialLeader); - List expected = Arrays.asList( + List expected = asList( new ProducerState(12345L, 15, 30, env.time().milliseconds(), OptionalInt.of(99), OptionalLong.empty()), new ProducerState(12345L, 15, 30, env.time().milliseconds(), @@ -7395,7 +7356,7 @@ public void testListTransactions() throws Exception { MetadataResponseData.HIGHEST_SUPPORTED_VERSION) ); - List expected = Arrays.asList( + List expected = asList( new TransactionListing("foo", 12345L, TransactionState.ONGOING), new TransactionListing("bar", 98765L, TransactionState.PREPARE_ABORT), new TransactionListing("baz", 13579L, TransactionState.COMPLETE_COMMIT) @@ -7704,7 +7665,7 @@ private static MemberDescription convertToMemberDescriptions(DescribedGroupMembe @Test public void testListClientMetricsResources() throws Exception { try (AdminClientUnitTestEnv env = mockClientEnv()) { - List expected = Arrays.asList( + List expected = asList( new ClientMetricsResourceListing("one"), new ClientMetricsResourceListing("two") );