diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java index cd99b54c72a80..2ba86cfffea09 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java @@ -111,6 +111,7 @@ public ApiResult> handleResponse( List unmapped = new ArrayList<>(); Map partitions = new HashMap<>(); + int totalPartitionCount = 0; for (OffsetCommitResponseTopic topic : response.data().topics()) { for (OffsetCommitResponsePartition partition : topic.partitions()) { TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); @@ -120,9 +121,14 @@ public ApiResult> handleResponse( } else { partitions.put(tp, error); } + totalPartitionCount++; } } - if (failed.isEmpty() && unmapped.isEmpty()) + // only complete this request when: + // 1. no fail + // 2. no unmapped + // 3. all partitions are handled (i.e. no need to retry) + if (failed.isEmpty() && unmapped.isEmpty() && partitions.size() == totalPartitionCount) completed.put(groupId, partitions); return new ApiResult<>(completed, failed, unmapped); @@ -136,21 +142,28 @@ private void handleError( ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `OffsetCommit` response", groupId, - error.exception()); + log.error("Received authorization failure for group {} in `{}` response", groupId, + apiName(), error.exception()); failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`{}` request for group {} failed because the coordinator" + + " is still in the process of loading state. Will retry.", apiName(), groupId); + break; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: - log.debug("OffsetCommit request for group {} returned error {}. Will retry", groupId, error); + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("`{}` request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry.", apiName(), groupId, error); unmapped.add(groupId); break; default: - log.error("Received unexpected error for group {} in `OffsetCommit` response", - groupId, error.exception()); - failed.put(groupId, error.exception( - "Received unexpected error for group " + groupId + " in `OffsetCommit` response")); + final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", + groupId, apiName()); + log.error(unexpectedErrorMsg, error.exception()); + failed.put(groupId, error.exception(unexpectedErrorMsg)); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java index 7e8b549b323c2..c4fe653334810 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java @@ -104,15 +104,18 @@ public ApiResult> handleResponse( final Errors error = Errors.forCode(response.data().errorCode()); if (error != Errors.NONE) { - handleError(groupId, error, failed, unmapped); + handleGroupLevelError(groupId, error, failed, unmapped); } else { final Map partitions = new HashMap<>(); response.data().topics().forEach(topic -> - topic.partitions().forEach(partition -> { - Errors partitionError = Errors.forCode(partition.errorCode()); - if (!handleError(groupId, partitionError, failed, unmapped)) { - partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError); + topic.partitions().forEach(partitionOffsetDeleteResponse -> { + Errors partitionError = Errors.forCode(partitionOffsetDeleteResponse.errorCode()); + TopicPartition tp = new TopicPartition(topic.name(), partitionOffsetDeleteResponse.partitionIndex()); + if (log.isDebugEnabled() && partitionError != Errors.NONE) { + log.debug("`{}` request for group {} returned error {} in the partition {}.", + apiName(), groupId, partitionError, tp); } + partitions.put(tp, partitionError); }) ); if (!partitions.isEmpty()) @@ -121,7 +124,7 @@ public ApiResult> handleResponse( return new ApiResult<>(completed, failed, unmapped); } - private boolean handleError( + private void handleGroupLevelError( CoordinatorKey groupId, Errors error, Map failed, @@ -131,20 +134,29 @@ private boolean handleError( case GROUP_AUTHORIZATION_FAILED: case GROUP_ID_NOT_FOUND: case INVALID_GROUP_ID: - log.error("Received non retriable error for group {} in `DeleteConsumerGroupOffsets` response", groupId, - error.exception()); + case NON_EMPTY_GROUP: + log.error("Received non retriable error for group {} in `{}` response", groupId, + apiName(), error.exception()); failed.put(groupId, error.exception()); - return true; + break; case COORDINATOR_LOAD_IN_PROGRESS: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`{}` request for group {} failed because the coordinator" + + " is still in the process of loading state. Will retry.", apiName(), groupId); + break; case COORDINATOR_NOT_AVAILABLE: - return true; case NOT_COORDINATOR: - log.debug("DeleteConsumerGroupOffsets request for group {} returned error {}. Will retry", - groupId, error); + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("`{}` request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry.", apiName(), groupId, error); unmapped.add(groupId); - return true; + break; default: - return false; + final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", + groupId, apiName()); + log.error(unexpectedErrorMsg, error.exception()); + failed.put(groupId, error.exception()); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java index c5d220538129a..09b7a7187fc49 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java @@ -112,21 +112,34 @@ private void handleError( ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `DeleteConsumerGroups` response", groupId, - error.exception()); + log.error("Received authorization failure for group {} in `{}` response", groupId, + apiName(), error.exception()); + failed.put(groupId, error.exception()); + break; + case INVALID_GROUP_ID: + case NON_EMPTY_GROUP: + case GROUP_ID_NOT_FOUND: + log.error("Received non retriable failure for group {} in `{}` response", groupId, + apiName(), error.exception()); failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: - case COORDINATOR_NOT_AVAILABLE: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`{}` request for group {} failed because the coordinator " + + "is still in the process of loading state. Will retry", apiName(), groupId); break; + case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: - log.debug("DeleteConsumerGroups request for group {} returned error {}. Will retry", - groupId, error); + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("`{}` request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry", apiName(), groupId, error); unmapped.add(groupId); break; default: - log.error("Received unexpected error for group {} in `DeleteConsumerGroups` response", - groupId, error.exception()); + final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", + groupId, apiName()); + log.error(unexpectedErrorMsg, error.exception()); failed.put(groupId, error.exception()); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java index 8a94becef1a07..978576c0c95cc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java @@ -166,23 +166,28 @@ private void handleError( ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `DescribeGroups` response", groupId, - error.exception()); + log.error("Received authorization failure for group {} in `{}` response", groupId, + apiName(), error.exception()); failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: - case COORDINATOR_NOT_AVAILABLE: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`{}` request for group {} failed because the coordinator " + + "is still in the process of loading state. Will retry", apiName(), groupId); break; + case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: - log.debug("DescribeGroups request for group {} returned error {}. Will retry", - groupId, error); + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("`{}` request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry", apiName(), groupId, error); unmapped.add(groupId); break; default: - log.error("Received unexpected error for group {} in `DescribeGroups` response", - groupId, error.exception()); - failed.put(groupId, error.exception( - "Received unexpected error for group " + groupId + " in `DescribeGroups` response")); + final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", + groupId, apiName()); + log.error(unexpectedErrorMsg, error.exception()); + failed.put(groupId, error.exception(unexpectedErrorMsg)); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java index d270145a423da..640265bd028bc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java @@ -159,21 +159,21 @@ private void handleError( switch (error) { case TRANSACTIONAL_ID_AUTHORIZATION_FAILED: failed.put(transactionalIdKey, new TransactionalIdAuthorizationException( - "DescribeTransactions request for transactionalId `" + transactionalIdKey.idValue + "` " + + apiName() + " request for transactionalId `" + transactionalIdKey.idValue + "` " + "failed due to authorization failure")); break; case TRANSACTIONAL_ID_NOT_FOUND: failed.put(transactionalIdKey, new TransactionalIdNotFoundException( - "DescribeTransactions request for transactionalId `" + transactionalIdKey.idValue + "` " + + apiName() + " request for transactionalId `" + transactionalIdKey.idValue + "` " + "failed because the ID could not be found")); break; case COORDINATOR_LOAD_IN_PROGRESS: // If the coordinator is in the middle of loading, then we just need to retry - log.debug("DescribeTransactions request for transactionalId `{}` failed because the " + + log.debug("{} request for transactionalId `{}` failed because the " + "coordinator is still in the process of loading state. Will retry", - transactionalIdKey.idValue); + apiName(), transactionalIdKey.idValue); break; case NOT_COORDINATOR: @@ -181,12 +181,12 @@ private void handleError( // If the coordinator is unavailable or there was a coordinator change, then we unmap // the key so that we retry the `FindCoordinator` request unmapped.add(transactionalIdKey); - log.debug("DescribeTransactions request for transactionalId `{}` returned error {}. Will attempt " + - "to find the coordinator again and retry", transactionalIdKey.idValue, error); + log.debug("{} request for transactionalId `{}` returned error {}. Will attempt " + + "to find the coordinator again and retry", apiName(), transactionalIdKey.idValue, error); break; default: - failed.put(transactionalIdKey, error.exception("DescribeTransactions request for " + + failed.put(transactionalIdKey, error.exception(apiName() + " request for " + "transactionalId `" + transactionalIdKey.idValue + "` failed due to unexpected error")); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java index 4439bc3b02d76..9cf3d6e572110 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java @@ -125,23 +125,30 @@ private void handleError( ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `OffsetFetch` response", groupId, - error.exception()); + log.error("Received authorization failure for group {} in `{}` response", groupId, + apiName(), error.exception()); failed.put(groupId, error.exception()); break; + case COORDINATOR_LOAD_IN_PROGRESS: - case COORDINATOR_NOT_AVAILABLE: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`{}` request for group {} failed because the coordinator " + + "is still in the process of loading state. Will retry", apiName(), groupId); break; + case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: - log.debug("OffsetFetch request for group {} returned error {}. Will retry", - groupId, error); + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("`{}` request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry", apiName(), groupId, error); unmapped.add(groupId); break; + default: - log.error("Received unexpected error for group {} in `OffsetFetch` response", - groupId, error.exception()); - failed.put(groupId, error.exception( - "Received unexpected error for group " + groupId + " in `OffsetFetch` response")); + final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", + groupId, apiName()); + log.error(unexpectedErrorMsg, error.exception()); + failed.put(groupId, error.exception(unexpectedErrorMsg)); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java index d60580c85bf22..2eb8f1247d6e7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java @@ -89,18 +89,18 @@ public ApiResult> h Errors error = Errors.forCode(response.data().errorCode()); if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) { - log.debug("The `ListTransactions` request sent to broker {} failed because the " + - "coordinator is still loading state. Will try again after backing off", brokerId); + log.debug("The `{}` request sent to broker {} failed because the " + + "coordinator is still loading state. Will try again after backing off", apiName(), brokerId); return ApiResult.empty(); } else if (error == Errors.COORDINATOR_NOT_AVAILABLE) { - log.debug("The `ListTransactions` request sent to broker {} failed because the " + - "coordinator is shutting down", brokerId); - return ApiResult.failed(key, new CoordinatorNotAvailableException("ListTransactions " + - "request sent to broker " + brokerId + " failed because the coordinator is shutting down")); + log.debug("The `{}` request sent to broker {} failed because the " + + "coordinator is shutting down", apiName(), brokerId); + return ApiResult.failed(key, new CoordinatorNotAvailableException(apiName() + + " request sent to broker " + brokerId + " failed because the coordinator is shutting down")); } else if (error != Errors.NONE) { - log.error("The `ListTransactions` request sent to broker {} failed because of an " + - "unexpected error {}", brokerId, error); - return ApiResult.failed(key, error.exception("ListTransactions request " + + log.error("The `{}` request sent to broker {} failed because of an " + + "unexpected error {}", apiName(), brokerId, error); + return ApiResult.failed(key, error.exception(apiName() + " request " + "sent to broker " + brokerId + " failed with an unexpected exception")); } else { List listings = response.data().transactionStates().stream() diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java index c6af2d4a3db7b..446eb2710cf69 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java @@ -109,24 +109,30 @@ private void handleError( ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `LeaveGroup` response", groupId, - error.exception()); + log.error("Received authorization failure for group {} in `{}` response", groupId, + apiName(), error.exception()); failed.put(groupId, error.exception()); break; + case COORDINATOR_LOAD_IN_PROGRESS: - case COORDINATOR_NOT_AVAILABLE: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`{}` request for group {} failed because the coordinator " + + "is still in the process of loading state. Will retry", apiName(), groupId); break; + case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: - log.debug("LeaveGroup request for group {} returned error {}. Will retry", - groupId, error); + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("`{}` request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry", apiName(), groupId, error); unmapped.add(groupId); break; + default: - log.error("Received unexpected error for group {} in `LeaveGroup` response", - groupId, error.exception()); - failed.put(groupId, error.exception( - "Received unexpected error for group " + groupId + " in `LeaveGroup` response")); - break; + final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", + groupId, apiName()); + log.error(unexpectedErrorMsg, error.exception()); + failed.put(groupId, error.exception(unexpectedErrorMsg)); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 53e326ad8955f..f29ab4f872fe5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -2689,7 +2689,6 @@ public void testDescribeConsumerGroups() throws Exception { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); //Retriable FindCoordinatorResponse errors should be retried - env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); @@ -2707,21 +2706,12 @@ public void testDescribeConsumerGroups() throws Exception { Collections.emptySet())); env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); - data = new DescribeGroupsResponseData(); - data.groups().add(DescribeGroupsResponse.groupMetadata( - GROUP_ID, - Errors.COORDINATOR_NOT_AVAILABLE, - "", - "", - "", - Collections.emptyList(), - Collections.emptySet())); - env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); - /* * We need to return two responses here, one with NOT_COORDINATOR error when calling describe consumer group * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a * FindCoordinatorResponse. + * + * And the same reason for COORDINATOR_NOT_AVAILABLE error response */ data = new DescribeGroupsResponseData(); data.groups().add(DescribeGroupsResponse.groupMetadata( @@ -2735,6 +2725,18 @@ public void testDescribeConsumerGroups() throws Exception { env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + data = new DescribeGroupsResponseData(); + data.groups().add(DescribeGroupsResponse.groupMetadata( + GROUP_ID, + Errors.COORDINATOR_NOT_AVAILABLE, + "", + "", + "", + Collections.emptyList(), + Collections.emptySet())); + env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + data = new DescribeGroupsResponseData(); TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0); TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1); @@ -2960,9 +2962,6 @@ public void testListConsumerGroupOffsetsRetriableErrors() throws Exception { env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - env.kafkaClient().prepareResponse( - new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap())); - env.kafkaClient().prepareResponse( new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap())); @@ -2970,6 +2969,8 @@ public void testListConsumerGroupOffsetsRetriableErrors() throws Exception { * We need to return two responses here, one for NOT_COORDINATOR call when calling list consumer offsets * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a * FindCoordinatorResponse. + * + * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response */ env.kafkaClient().prepareResponse( new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap())); @@ -2977,6 +2978,12 @@ public void testListConsumerGroupOffsetsRetriableErrors() throws Exception { env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse( + new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap())); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse( new OffsetFetchResponse(Errors.NONE, Collections.emptyMap())); @@ -3015,22 +3022,26 @@ public void testListConsumerGroupOffsets() throws Exception { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); // Retriable FindCoordinatorResponse errors should be retried - env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); // Retriable errors should be retried - env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap())); env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap())); /* * We need to return two responses here, one for NOT_COORDINATOR error when calling list consumer group offsets * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a * FindCoordinatorResponse. + * + * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response */ env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap())); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0); TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1); TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2); @@ -3156,7 +3167,6 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Arrays.asList(findCoordinatorV3, describeGroups))); //Retriable FindCoordinatorResponse errors should be retried - env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode())); env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); @@ -3179,41 +3189,47 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception { env.kafkaClient().prepareResponse( prepareOldFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode())); - final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds); + DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds); TestUtils.assertFutureError(errorResult.deletedGroups().get("groupId"), GroupAuthorizationException.class); - - //Retriable errors should be retried + + // Retriable errors should be retried env.kafkaClient().prepareResponse( prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - final DeletableGroupResultCollection errorResponse1 = new DeletableGroupResultCollection(); - errorResponse1.add(new DeletableGroupResult() - .setGroupId("groupId") - .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) - ); - env.kafkaClient().prepareResponse(new DeleteGroupsResponse( - new DeleteGroupsResponseData() - .setResults(errorResponse1))); - - final DeletableGroupResultCollection errorResponse2 = new DeletableGroupResultCollection(); - errorResponse2.add(new DeletableGroupResult() - .setGroupId("groupId") - .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) + final DeletableGroupResultCollection errorResponse = new DeletableGroupResultCollection(); + errorResponse.add(new DeletableGroupResult() + .setGroupId("groupId") + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) ); env.kafkaClient().prepareResponse(new DeleteGroupsResponse( new DeleteGroupsResponseData() - .setResults(errorResponse2))); + .setResults(errorResponse))); /* * We need to return two responses here, one for NOT_COORDINATOR call when calling delete a consumer group * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a * FindCoordinatorResponse. + * + * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response */ - final DeletableGroupResultCollection coordinatorMoved = new DeletableGroupResultCollection(); + + DeletableGroupResultCollection coordinatorMoved = new DeletableGroupResultCollection(); coordinatorMoved.add(new DeletableGroupResult() - .setGroupId("groupId") - .setErrorCode(Errors.NOT_COORDINATOR.code()) + .setGroupId("groupId") + .setErrorCode(Errors.NOT_COORDINATOR.code()) ); + + env.kafkaClient().prepareResponse(new DeleteGroupsResponse( + new DeleteGroupsResponseData() + .setResults(coordinatorMoved))); + env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + coordinatorMoved = new DeletableGroupResultCollection(); + coordinatorMoved.add(new DeletableGroupResult() + .setGroupId("groupId") + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) + ); + env.kafkaClient().prepareResponse(new DeleteGroupsResponse( new DeleteGroupsResponseData() .setResults(coordinatorMoved))); @@ -3223,9 +3239,9 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception { new DeleteGroupsResponseData() .setResults(validResponse))); - final DeleteConsumerGroupsResult errorResult1 = env.adminClient().deleteConsumerGroups(groupIds); + errorResult = env.adminClient().deleteConsumerGroups(groupIds); - final KafkaFuture errorResults = errorResult1.deletedGroups().get("groupId"); + final KafkaFuture errorResults = errorResult.deletedGroups().get("groupId"); assertNull(errorResults.get()); } } @@ -3401,9 +3417,6 @@ public void testDeleteConsumerGroupOffsetsRetriableErrors() throws Exception { env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - env.kafkaClient().prepareResponse( - prepareOffsetDeleteResponse(Errors.COORDINATOR_NOT_AVAILABLE)); - env.kafkaClient().prepareResponse( prepareOffsetDeleteResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS)); @@ -3411,6 +3424,8 @@ public void testDeleteConsumerGroupOffsetsRetriableErrors() throws Exception { * We need to return two responses here, one for NOT_COORDINATOR call when calling delete a consumer group * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a * FindCoordinatorResponse. + * + * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response */ env.kafkaClient().prepareResponse( prepareOffsetDeleteResponse(Errors.NOT_COORDINATOR)); @@ -3418,6 +3433,12 @@ public void testDeleteConsumerGroupOffsetsRetriableErrors() throws Exception { env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse( + prepareOffsetDeleteResponse(Errors.COORDINATOR_NOT_AVAILABLE)); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse( prepareOffsetDeleteResponse("foo", 0, Errors.NONE)); @@ -3641,10 +3662,6 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception { env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - env.kafkaClient().prepareResponse( - new LeaveGroupResponse(new LeaveGroupResponseData() - .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()))); - env.kafkaClient().prepareResponse( new LeaveGroupResponse(new LeaveGroupResponseData() .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()))); @@ -3653,6 +3670,8 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception { * We need to return two responses here, one for NOT_COORDINATOR call when calling remove member * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a * FindCoordinatorResponse. + * + * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response */ env.kafkaClient().prepareResponse( new LeaveGroupResponse(new LeaveGroupResponseData() @@ -3661,6 +3680,13 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception { env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse( + new LeaveGroupResponse(new LeaveGroupResponseData() + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()))); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + MemberResponse memberResponse = new MemberResponse() .setGroupInstanceId("instance-1") .setErrorCode(Errors.NONE.code()); @@ -3719,13 +3745,10 @@ public void testRemoveMembersFromGroup() throws Exception { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); // Retriable FindCoordinatorResponse errors should be retried - env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); // Retriable errors should be retried - env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData() - .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()))); env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData() .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()))); @@ -4101,19 +4124,24 @@ public void testAlterConsumerGroupOffsetsRetriableErrors() throws Exception { prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); env.kafkaClient().prepareResponse( - prepareOffsetCommitResponse(tp1, Errors.COORDINATOR_NOT_AVAILABLE)); + prepareOffsetCommitResponse(tp1, Errors.COORDINATOR_LOAD_IN_PROGRESS)); - env.kafkaClient().prepareResponse( - prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + /* + * We need to return two responses here, one for NOT_COORDINATOR call when calling remove member + * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a + * FindCoordinatorResponse. + * + * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response + */ env.kafkaClient().prepareResponse( - prepareOffsetCommitResponse(tp1, Errors.COORDINATOR_LOAD_IN_PROGRESS)); + prepareOffsetCommitResponse(tp1, Errors.NOT_COORDINATOR)); env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); env.kafkaClient().prepareResponse( - prepareOffsetCommitResponse(tp1, Errors.NOT_COORDINATOR)); + prepareOffsetCommitResponse(tp1, Errors.COORDINATOR_NOT_AVAILABLE)); env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java index c20107f67aa74..4a1e029c2ffa3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java @@ -81,12 +81,16 @@ public void testSuccessfulHandleResponse() { } @Test - public void testRetriableHandleResponse() { + public void testUnmappedHandleResponse() { assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); - assertUnmapped(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); } + @Test + public void testRetriableHandleResponse() { + assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); + } + @Test public void testFailedHandleResponse() { assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED)); @@ -102,6 +106,14 @@ private AdminApiHandler.ApiResult> h return handler.handleResponse(node, singleton(CoordinatorKey.byGroupId(groupId)), response); } + private void assertRetriable( + AdminApiHandler.ApiResult> result + ) { + assertEquals(emptySet(), result.completedKeys.keySet()); + assertEquals(emptySet(), result.failedKeys.keySet()); + assertEquals(emptyList(), result.unmappedKeys); + } + private void assertUnmapped( AdminApiHandler.ApiResult> result ) { diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java index 439b37733d98c..745d80e558d3b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java @@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Map; @@ -67,28 +68,58 @@ public void testBuildRequest() { @Test public void testSuccessfulHandleResponse() { Map responseData = Collections.singletonMap(t0p0, Errors.NONE); - assertCompleted(handleWithError(Errors.NONE), responseData); + assertCompleted(handleWithGroupError(Errors.NONE), responseData); } @Test public void testUnmappedHandleResponse() { - assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); + assertUnmapped(handleWithGroupError(Errors.NOT_COORDINATOR)); + assertUnmapped(handleWithGroupError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test public void testRetriableHandleResponse() { - assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); - assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); + assertRetriable(handleWithGroupError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); } @Test - public void testFailedHandleResponse() { - assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED)); - assertFailed(GroupIdNotFoundException.class, handleWithError(Errors.GROUP_ID_NOT_FOUND)); - assertFailed(InvalidGroupIdException.class, handleWithError(Errors.INVALID_GROUP_ID)); + public void testFailedHandleResponseWithGroupError() { + assertGroupFailed(GroupAuthorizationException.class, handleWithGroupError(Errors.GROUP_AUTHORIZATION_FAILED)); + assertGroupFailed(GroupIdNotFoundException.class, handleWithGroupError(Errors.GROUP_ID_NOT_FOUND)); + assertGroupFailed(InvalidGroupIdException.class, handleWithGroupError(Errors.INVALID_GROUP_ID)); } - private OffsetDeleteResponse buildResponse(Errors error) { + @Test + public void testFailedHandleResponseWithPartitionError() { + assertPartitionFailed(Collections.singletonMap(t0p0, Errors.GROUP_SUBSCRIBED_TO_TOPIC), + handleWithPartitionError(Errors.GROUP_SUBSCRIBED_TO_TOPIC)); + assertPartitionFailed(Collections.singletonMap(t0p0, Errors.TOPIC_AUTHORIZATION_FAILED), + handleWithPartitionError(Errors.TOPIC_AUTHORIZATION_FAILED)); + assertPartitionFailed(Collections.singletonMap(t0p0, Errors.UNKNOWN_TOPIC_OR_PARTITION), + handleWithPartitionError(Errors.UNKNOWN_TOPIC_OR_PARTITION)); + } + + private OffsetDeleteResponse buildGroupErrorResponse(Errors error) { + OffsetDeleteResponse response = new OffsetDeleteResponse( + new OffsetDeleteResponseData() + .setErrorCode(error.code())); + if (error == Errors.NONE) { + response.data() + .setThrottleTimeMs(0) + .setTopics(new OffsetDeleteResponseTopicCollection(singletonList( + new OffsetDeleteResponseTopic() + .setName("t0") + .setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList( + new OffsetDeleteResponsePartition() + .setPartitionIndex(0) + .setErrorCode(error.code()) + ).iterator())) + ).iterator())); + } + return response; + } + + private OffsetDeleteResponse buildPartitionErrorResponse(Errors error) { OffsetDeleteResponse response = new OffsetDeleteResponse( new OffsetDeleteResponseData() .setThrottleTimeMs(0) @@ -100,15 +131,24 @@ private OffsetDeleteResponse buildResponse(Errors error) { .setPartitionIndex(0) .setErrorCode(error.code()) ).iterator())) - ).iterator()))); + ).iterator())) + ); return response; } - private AdminApiHandler.ApiResult> handleWithError( + private AdminApiHandler.ApiResult> handleWithGroupError( Errors error ) { DeleteConsumerGroupOffsetsHandler handler = new DeleteConsumerGroupOffsetsHandler(groupId, tps, logContext); - OffsetDeleteResponse response = buildResponse(error); + OffsetDeleteResponse response = buildGroupErrorResponse(error); + return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response); + } + + private AdminApiHandler.ApiResult> handleWithPartitionError( + Errors error + ) { + DeleteConsumerGroupOffsetsHandler handler = new DeleteConsumerGroupOffsetsHandler(groupId, tps, logContext); + OffsetDeleteResponse response = buildPartitionErrorResponse(error); return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response); } @@ -139,7 +179,7 @@ private void assertCompleted( assertEquals(expected, result.completedKeys.get(key)); } - private void assertFailed( + private void assertGroupFailed( Class expectedExceptionType, AdminApiHandler.ApiResult> result ) { @@ -149,4 +189,21 @@ private void assertFailed( assertEquals(singleton(key), result.failedKeys.keySet()); assertTrue(expectedExceptionType.isInstance(result.failedKeys.get(key))); } + + private void assertPartitionFailed( + Map expectedResult, + AdminApiHandler.ApiResult> result + ) { + CoordinatorKey key = CoordinatorKey.byGroupId(groupId); + assertEquals(singleton(key), result.completedKeys.keySet()); + + // verify the completed value is expected result + Collection> completeCollection = result.completedKeys.values(); + assertEquals(1, completeCollection.size()); + Map completeMap = completeCollection.iterator().next(); + assertEquals(expectedResult, completeMap); + + assertEquals(emptyList(), result.unmappedKeys); + assertEquals(emptySet(), result.failedKeys.keySet()); + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java index 30a65f4c8fcc6..85aeaac5e551c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java @@ -57,12 +57,12 @@ public void testSuccessfulHandleResponse() { @Test public void testUnmappedHandleResponse() { assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); + assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test public void testRetriableHandleResponse() { assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); - assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java index fe26043613773..8cf7ef86d771d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java @@ -105,12 +105,12 @@ public void testSuccessfulHandleResponse() { @Test public void testUnmappedHandleResponse() { assertUnmapped(handleWithError(Errors.NOT_COORDINATOR, "")); + assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE, "")); } @Test public void testRetriableHandleResponse() { assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS, "")); - assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE, "")); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java index 5c98940645199..cbe78217cb43f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java @@ -70,12 +70,12 @@ public void testSuccessfulHandleResponse() { @Test public void testUnmappedHandleResponse() { assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); + assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test public void testRetriableHandleResponse() { assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); - assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java index 0ffa43b4c0a80..854bbe7ab3c4e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java @@ -69,12 +69,12 @@ public void testSuccessfulHandleResponse() { @Test public void testUnmappedHandleResponse() { assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); + assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test public void testRetriableHandleResponse() { assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); - assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test