From ea67545d78e804992b186e5fd30ccaf4a3720c10 Mon Sep 17 00:00:00 2001 From: dengziming Date: Thu, 12 Aug 2021 11:13:43 +0800 Subject: [PATCH 1/3] logic about inconsistent cluster id --- .../apache/kafka/raft/KafkaRaftClient.java | 39 +++--- .../java/org/apache/kafka/raft/RaftUtil.java | 16 +++ .../raft/KafkaRaftClientSnapshotTest.java | 44 ++++++- .../kafka/raft/KafkaRaftClientTest.java | 118 ++++++++++++++++++ 4 files changed, 199 insertions(+), 18 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index dced48da39a0d..bee2fddc5b961 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -17,8 +17,8 @@ package org.apache.kafka.raft; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ClusterAuthorizationException; +import org.apache.kafka.common.errors.InconsistentClusterIdException; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.memory.MemoryPool; import org.apache.kafka.common.message.BeginQuorumEpochRequestData; @@ -93,6 +93,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -151,6 +152,7 @@ public class KafkaRaftClient implements RaftClient { private final Time time; private final int fetchMaxWaitMs; private final String clusterId; + private final AtomicBoolean clusterIdAcknowledged; private final NetworkChannel channel; private final ReplicatedLog log; private final Random random; @@ -230,6 +232,7 @@ public KafkaRaftClient( this.appendPurgatory = new ThresholdPurgatory<>(expirationService); this.time = time; this.clusterId = clusterId; + this.clusterIdAcknowledged = new AtomicBoolean(false); this.fetchMaxWaitMs = fetchMaxWaitMs; this.logger = logContext.logger(KafkaRaftClient.class); this.random = random; @@ -592,6 +595,8 @@ private boolean handleVoteResponse( return handleTopLevelError(topLevelError, responseMetadata); } + recordAcknowledgedClusterId(); + if (!hasValidTopicPartition(response, log.topicPartition())) { return false; } @@ -721,6 +726,8 @@ private boolean handleBeginQuorumEpochResponse( return handleTopLevelError(topLevelError, responseMetadata); } + recordAcknowledgedClusterId(); + if (!hasValidTopicPartition(response, log.topicPartition())) { return false; } @@ -833,6 +840,8 @@ private boolean handleEndQuorumEpochResponse( return handleTopLevelError(topLevelError, responseMetadata); } + recordAcknowledgedClusterId(); + if (!hasValidTopicPartition(response, log.topicPartition())) { return false; } @@ -912,6 +921,10 @@ private boolean hasValidClusterId(String requestClusterId) { return clusterId.equals(requestClusterId); } + private void recordAcknowledgedClusterId() { + clusterIdAcknowledged.set(true); + } + /** * Handle a Fetch request. The fetch offset and last fetched epoch are always * validated against the current log. In the case that they do not match, the response will @@ -1047,6 +1060,8 @@ private boolean handleFetchResponse( return handleTopLevelError(topLevelError, responseMetadata); } + recordAcknowledgedClusterId(); + if (!RaftUtil.hasValidTopicPartition(response, log.topicPartition(), log.topicId())) { return false; } @@ -1203,25 +1218,12 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest( return new FetchSnapshotResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()); } - if (data.topics().size() != 1 && data.topics().get(0).partitions().size() != 1) { + if (!RaftUtil.hasValidTopicPartition(data, log.topicPartition())) { return FetchSnapshotResponse.withTopLevelError(Errors.INVALID_REQUEST); } Optional partitionSnapshotOpt = FetchSnapshotRequest .forTopicPartition(data, log.topicPartition()); - if (!partitionSnapshotOpt.isPresent()) { - // The Raft client assumes that there is only one topic partition. - TopicPartition unknownTopicPartition = new TopicPartition( - data.topics().get(0).name(), - data.topics().get(0).partitions().get(0).partition() - ); - - return FetchSnapshotResponse.singleton( - unknownTopicPartition, - responsePartitionSnapshot -> responsePartitionSnapshot - .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) - ); - } FetchSnapshotRequestData.PartitionSnapshot partitionSnapshot = partitionSnapshotOpt.get(); Optional leaderValidation = validateLeaderOnlyRequest( @@ -1304,7 +1306,9 @@ private boolean handleFetchSnapshotResponse( return handleTopLevelError(topLevelError, responseMetadata); } - if (data.topics().size() != 1 && data.topics().get(0).partitions().size() != 1) { + recordAcknowledgedClusterId(); + + if (!RaftUtil.hasValidTopicPartition(data, log.topicPartition())) { return false; } @@ -1526,6 +1530,9 @@ private boolean handleTopLevelError(Errors error, RaftResponse.Inbound response) return false; } else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED) { throw new ClusterAuthorizationException("Received cluster authorization error in response " + response); + } else if (error == Errors.INCONSISTENT_CLUSTER_ID && !clusterIdAcknowledged.get()) { + // When handling a response, invalid cluster id are fatal unless a previous response contained a valid cluster id. + throw new InconsistentClusterIdException("Received inconsistent clusterId error in response " + response); } else { return handleUnexpectedError(error, response); } diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java index 9ff03617e63fa..67ce1e5ad1032 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java @@ -25,6 +25,8 @@ import org.apache.kafka.common.message.EndQuorumEpochResponseData; import org.apache.kafka.common.message.FetchRequestData; import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.FetchSnapshotRequestData; +import org.apache.kafka.common.message.FetchSnapshotResponseData; import org.apache.kafka.common.message.VoteRequestData; import org.apache.kafka.common.message.VoteResponseData; import org.apache.kafka.common.protocol.ApiKeys; @@ -159,4 +161,18 @@ static boolean hasValidTopicPartition(DescribeQuorumRequestData data, TopicParti data.topics().get(0).partitions().size() == 1 && data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); } + + static boolean hasValidTopicPartition(FetchSnapshotRequestData data, TopicPartition topicPartition) { + return data.topics().size() == 1 && + data.topics().get(0).name().equals(topicPartition.topic()) && + data.topics().get(0).partitions().size() == 1 && + data.topics().get(0).partitions().get(0).partition() == topicPartition.partition(); + } + + static boolean hasValidTopicPartition(FetchSnapshotResponseData data, TopicPartition topicPartition) { + return data.topics().size() == 1 && + data.topics().get(0).name().equals(topicPartition.topic()) && + data.topics().get(0).partitions().size() == 1 && + data.topics().get(0).partitions().get(0).index() == topicPartition.partition(); + } } diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java index 754c2e2c55e3d..f5ea3748ab202 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java @@ -47,6 +47,7 @@ import java.util.OptionalInt; import java.util.Set; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -517,8 +518,7 @@ public void testFetchSnapshotRequestUnknownPartition() throws Exception { context.client.poll(); - FetchSnapshotResponseData.PartitionSnapshot response = context.assertSentFetchSnapshotResponse(topicPartition).get(); - assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.forCode(response.errorCode())); + context.assertSentFetchSnapshotResponse(Errors.INVALID_REQUEST); } @Test @@ -1618,6 +1618,46 @@ public void testCreateSnapshotAsFollowerWithInvalidSnapshotId() throws Exception assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId4.offset, invalidSnapshotId4.epoch, 0)); } + @Test + public void testInconsistentClusterIdInFetchSnapshotResponse() throws Exception { + int localId = 0; + int leaderId = localId + 1; + Set voters = Utils.mkSet(localId, leaderId); + int epoch = 2; + OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withElectedLeader(epoch, leaderId) + .build(); + + // Send a request + context.pollUntilRequest(); + RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); + + // Firstly receive a response with a valid cluster id + context.deliverResponse( + fetchRequest.correlationId, + fetchRequest.destinationId(), + snapshotFetchResponse(context.metadataPartition, context.metadataTopicId, epoch, leaderId, snapshotId, 200L) + ); + + // Send fetch snapshot request + context.pollUntilRequest(); + RaftRequest.Outbound snapshotRequest = context.assertSentFetchSnapshotRequest(); + + // Secondly receive a response with an inconsistent cluster id + context.deliverResponse( + snapshotRequest.correlationId, + snapshotRequest.destinationId(), + new FetchSnapshotResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()) + ); + + // Inconsistent cluster id are not fatal if a previous response contained a valid cluster id + assertDoesNotThrow(context.client::poll); + + // It's impossible to receive a be begin quorum response before any other request so we don't test + } + private static FetchSnapshotRequestData fetchSnapshotRequest( TopicPartition topicPartition, int epoch, diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 9b2771d2b34e4..129d46fd76755 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.raft; import org.apache.kafka.common.errors.ClusterAuthorizationException; +import org.apache.kafka.common.errors.InconsistentClusterIdException; import org.apache.kafka.common.errors.RecordBatchTooLargeException; import org.apache.kafka.common.memory.MemoryPool; import org.apache.kafka.common.message.BeginQuorumEpochResponseData; @@ -57,6 +58,7 @@ import static java.util.Collections.singletonList; import static org.apache.kafka.raft.RaftClientTestContext.Builder.DEFAULT_ELECTION_TIMEOUT_MS; import static org.apache.kafka.test.TestUtils.assertFutureThrows; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -1559,6 +1561,122 @@ public void testEndQuorumEpochRequestClusterIdValidation() throws Exception { context.assertSentEndQuorumEpochResponse(Errors.INCONSISTENT_CLUSTER_ID); } + @Test + public void testInconsistentClusterIdInFetchResponse() throws Exception { + int localId = 0; + int otherNodeId = 1; + int epoch = 5; + Set voters = Utils.mkSet(localId, otherNodeId); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withElectedLeader(epoch, otherNodeId) + .build(); + + // Send a request + context.pollUntilRequest(); + int correlationId = context.assertSentFetchRequest(epoch, 0L, 0); + + // Firstly receive a response with a valid cluster id + FetchResponseData fetchResponse = context.fetchResponse(epoch, otherNodeId, + MemoryRecords.EMPTY, 0L, Errors.NONE); + context.deliverResponse(correlationId, otherNodeId, fetchResponse); + + // Send fetch request + context.pollUntilRequest(); + correlationId = context.assertSentFetchRequest(epoch, 0L, 0); + + // Secondly receive a response with an inconsistent cluster id + context.deliverResponse(correlationId, otherNodeId, + new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code())); + + // Inconsistent cluster id are not fatal if a previous response contained a valid cluster id + assertDoesNotThrow(context.client::poll); + + // This time we receive a inconsistent cluster id directly + context = new RaftClientTestContext.Builder(localId, voters) + .withElectedLeader(epoch, otherNodeId) + .build(); + + context.pollUntilRequest(); + correlationId = context.assertSentFetchRequest(epoch, 0L, 0); + context.deliverResponse(correlationId, otherNodeId, + new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code())); + // Inconsistent cluster id are not fatal if a previous response contained a valid cluster id + assertThrows(InconsistentClusterIdException.class, context.client::poll); + } + + @Test + public void testInconsistentClusterIdInBeginQuorumResponse() throws Exception { + int localId = 0; + int otherNodeId = 1; + int epoch = 5; + Set voters = Utils.mkSet(localId, otherNodeId); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withVotedCandidate(epoch, localId) + .build(); + context.assertVotedCandidate(epoch, localId); + + // Send a request + context.pollUntilRequest(); + int correlationId = context.assertSentVoteRequest(epoch, 0, 0L, 1); + + // Firstly receive a response with a valid cluster id + context.deliverResponse(correlationId, otherNodeId, context.voteResponse(true, Optional.empty(), epoch)); + context.client.poll(); + context.assertElectedLeader(epoch, localId); + + // Send begin quorum request + context.pollUntilRequest(); + correlationId = context.assertSentBeginQuorumEpochRequest(epoch, 1); + + // Secondly receive a response with an inconsistent cluster id + context.deliverResponse(correlationId, otherNodeId, + new BeginQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code())); + + // Inconsistent cluster id are not fatal if a previous response contained a valid cluster id + assertDoesNotThrow(context.client::poll); + + // It's impossible to receive a be begin quorum response before any other request so we don't test + } + + @Test + public void testInconsistentClusterIdInEndQuorumResponse() throws Exception { + int localId = 0; + int otherNodeId = 1; + int epoch = 5; + Set voters = Utils.mkSet(localId, otherNodeId); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withVotedCandidate(epoch, localId) + .build(); + context.assertVotedCandidate(epoch, localId); + + // Send a request + context.pollUntilRequest(); + int correlationId = context.assertSentVoteRequest(epoch, 0, 0L, 1); + + // Firstly receive a response with a valid cluster id + context.deliverResponse(correlationId, otherNodeId, context.voteResponse(true, Optional.empty(), epoch)); + context.client.poll(); + context.assertElectedLeader(epoch, localId); + + context.client.shutdown(5000); + + // Send end quorum request + context.pollUntilRequest(); + correlationId = context.assertSentEndQuorumEpochRequest(epoch, 1); + + // Secondly receive a response with an inconsistent cluster id + context.deliverResponse(correlationId, otherNodeId, + new EndQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code())); + + // Inconsistent cluster id are not fatal if a previous response contained a valid cluster id + assertDoesNotThrow(context.client::poll); + + // It's impossible to receive a be end quorum response before any other request so we don't test + } + @Test public void testVoterOnlyRequestValidation() throws Exception { int localId = 0; From 67edb7e12508a5f324b58d8652cc1d832631acf3 Mon Sep 17 00:00:00 2001 From: dengziming Date: Mon, 7 Feb 2022 20:36:44 +0800 Subject: [PATCH 2/3] resolve feedbacks --- checkstyle/suppressions.xml | 2 +- .../apache/kafka/raft/KafkaRaftClient.java | 54 +- .../java/org/apache/kafka/raft/RaftUtil.java | 165 ++-- .../raft/KafkaRaftClientSnapshotTest.java | 4 +- .../kafka/raft/KafkaRaftClientTest.java | 6 +- .../kafka/raft/RaftClientTestContext.java | 6 +- .../org/apache/kafka/raft/RaftUtilTest.java | 835 ++++++++++++++++++ 7 files changed, 986 insertions(+), 86 deletions(-) create mode 100644 raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index f1007dc4c5c64..5e160b317c4da 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -60,7 +60,7 @@ files="MemoryRecordsBuilder.java"/> + files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest|RaftUtilTest).java"/> diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index bee2fddc5b961..d4b3d9f9f7557 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -99,7 +99,7 @@ import java.util.stream.Collectors; import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition; +import static org.apache.kafka.raft.RaftUtil.validateTopicPartition; /** * This class implements a Kafkaesque version of the Raft protocol. Leader election @@ -537,6 +537,7 @@ private VoteResponseData buildVoteResponse(Errors partitionLevelError, boolean v * - {@link Errors#INCONSISTENT_VOTER_SET} if the request suggests inconsistent voter membership (e.g. * if this node or the sender is not one of the current known voters) * - {@link Errors#INVALID_REQUEST} if the last epoch or offset are invalid + * - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} if the topic or partition doesn't match metadata topic */ private VoteResponseData handleVoteRequest( RaftRequest.Inbound requestMetadata @@ -547,9 +548,10 @@ private VoteResponseData handleVoteRequest( return new VoteResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()); } - if (!hasValidTopicPartition(request, log.topicPartition())) { + Errors topError = validateTopicPartition(request, log.topicPartition()); + if (topError != Errors.NONE) { // Until we support multi-raft, we treat individual topic partition mismatches as invalid requests - return new VoteResponseData().setErrorCode(Errors.INVALID_REQUEST.code()); + return new VoteResponseData().setErrorCode(topError.code()); } VoteRequestData.PartitionData partitionRequest = @@ -597,7 +599,7 @@ private boolean handleVoteResponse( recordAcknowledgedClusterId(); - if (!hasValidTopicPartition(response, log.topicPartition())) { + if (validateTopicPartition(response, log.topicPartition()) != Errors.NONE) { return false; } @@ -684,6 +686,7 @@ private BeginQuorumEpochResponseData buildBeginQuorumEpochResponse(Errors partit * - {@link Errors#INCONSISTENT_VOTER_SET} if the request suggests inconsistent voter membership (e.g. * if this node or the sender is not one of the current known voters) * - {@link Errors#FENCED_LEADER_EPOCH} if the epoch is smaller than this node's epoch + * - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} if the topic or partition doesn't match metadata topic */ private BeginQuorumEpochResponseData handleBeginQuorumEpochRequest( RaftRequest.Inbound requestMetadata, @@ -695,9 +698,10 @@ private BeginQuorumEpochResponseData handleBeginQuorumEpochRequest( return new BeginQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()); } - if (!hasValidTopicPartition(request, log.topicPartition())) { + Errors topError = validateTopicPartition(request, log.topicPartition()); + if (topError != Errors.NONE) { // Until we support multi-raft, we treat topic partition mismatches as invalid requests - return new BeginQuorumEpochResponseData().setErrorCode(Errors.INVALID_REQUEST.code()); + return new BeginQuorumEpochResponseData().setErrorCode(topError.code()); } BeginQuorumEpochRequestData.PartitionData partitionRequest = @@ -728,7 +732,7 @@ private boolean handleBeginQuorumEpochResponse( recordAcknowledgedClusterId(); - if (!hasValidTopicPartition(response, log.topicPartition())) { + if (validateTopicPartition(response, log.topicPartition()) != Errors.NONE) { return false; } @@ -775,6 +779,7 @@ private EndQuorumEpochResponseData buildEndQuorumEpochResponse(Errors partitionL * - {@link Errors#INCONSISTENT_VOTER_SET} if the request suggests inconsistent voter membership (e.g. * if this node or the sender is not one of the current known voters) * - {@link Errors#FENCED_LEADER_EPOCH} if the epoch is smaller than this node's epoch + * - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} if the topic or partition doesn't match metadata topic */ private EndQuorumEpochResponseData handleEndQuorumEpochRequest( RaftRequest.Inbound requestMetadata, @@ -786,9 +791,10 @@ private EndQuorumEpochResponseData handleEndQuorumEpochRequest( return new EndQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()); } - if (!hasValidTopicPartition(request, log.topicPartition())) { + Errors topError = validateTopicPartition(request, log.topicPartition()); + if (topError != Errors.NONE) { // Until we support multi-raft, we treat topic partition mismatches as invalid requests - return new EndQuorumEpochResponseData().setErrorCode(Errors.INVALID_REQUEST.code()); + return new EndQuorumEpochResponseData().setErrorCode(topError.code()); } EndQuorumEpochRequestData.PartitionData partitionRequest = @@ -842,7 +848,7 @@ private boolean handleEndQuorumEpochResponse( recordAcknowledgedClusterId(); - if (!hasValidTopicPartition(response, log.topicPartition())) { + if (validateTopicPartition(response, log.topicPartition()) != Errors.NONE) { return false; } @@ -939,6 +945,7 @@ private void recordAcknowledgedClusterId() { * - {@link Errors#FENCED_LEADER_EPOCH} if the epoch is smaller than this node's epoch * - {@link Errors#INVALID_REQUEST} if the request epoch is larger than the leader's current epoch * or if either the fetch offset or the last fetched epoch is invalid + * - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} if the topic or partition doesn't match metadata topic */ private CompletableFuture handleFetchRequest( RaftRequest.Inbound requestMetadata, @@ -950,9 +957,10 @@ private CompletableFuture handleFetchRequest( return completedFuture(new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code())); } - if (!hasValidTopicPartition(request, log.topicPartition(), log.topicId())) { + Errors topError = validateTopicPartition(request, log.topicPartition(), log.topicId()); + if (topError != Errors.NONE) { // Until we support multi-raft, we treat topic partition mismatches as invalid requests - return completedFuture(new FetchResponseData().setErrorCode(Errors.INVALID_REQUEST.code())); + return completedFuture(new FetchResponseData().setErrorCode(topError.code())); } // If the ID is valid, we can set the topic name. request.topics().get(0).setTopic(log.topicPartition().topic()); @@ -1062,7 +1070,7 @@ private boolean handleFetchResponse( recordAcknowledgedClusterId(); - if (!RaftUtil.hasValidTopicPartition(response, log.topicPartition(), log.topicId())) { + if (validateTopicPartition(response, log.topicPartition(), log.topicId()) != Errors.NONE) { return false; } // If the ID is valid, we can set the topic name. @@ -1175,9 +1183,10 @@ private DescribeQuorumResponseData handleDescribeQuorumRequest( long currentTimeMs ) { DescribeQuorumRequestData describeQuorumRequestData = (DescribeQuorumRequestData) requestMetadata.data; - if (!hasValidTopicPartition(describeQuorumRequestData, log.topicPartition())) { - return DescribeQuorumRequest.getPartitionLevelErrorResponse( - describeQuorumRequestData, Errors.UNKNOWN_TOPIC_OR_PARTITION); + + Errors topError = validateTopicPartition(describeQuorumRequestData, log.topicPartition()); + if (topError != Errors.NONE) { + return DescribeQuorumRequest.getPartitionLevelErrorResponse(describeQuorumRequestData, topError); } if (!quorum.isLeader()) { @@ -1208,6 +1217,7 @@ private DescribeQuorumResponseData handleDescribeQuorumRequest( * or if either the fetch offset or the last fetched epoch is invalid * - {@link Errors#SNAPSHOT_NOT_FOUND} if the request snapshot id does not exists * - {@link Errors#POSITION_OUT_OF_RANGE} if the request snapshot offset out of range + * - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} if the topic or partition doesn't match metadata topic */ private FetchSnapshotResponseData handleFetchSnapshotRequest( RaftRequest.Inbound requestMetadata @@ -1218,14 +1228,14 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest( return new FetchSnapshotResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()); } - if (!RaftUtil.hasValidTopicPartition(data, log.topicPartition())) { - return FetchSnapshotResponse.withTopLevelError(Errors.INVALID_REQUEST); + Errors topError = validateTopicPartition(data, log.topicPartition()); + if (topError != Errors.NONE) { + return FetchSnapshotResponse.withTopLevelError(topError); } - Optional partitionSnapshotOpt = FetchSnapshotRequest - .forTopicPartition(data, log.topicPartition()); + FetchSnapshotRequestData.PartitionSnapshot partitionSnapshot = FetchSnapshotRequest + .forTopicPartition(data, log.topicPartition()).get(); - FetchSnapshotRequestData.PartitionSnapshot partitionSnapshot = partitionSnapshotOpt.get(); Optional leaderValidation = validateLeaderOnlyRequest( partitionSnapshot.currentLeaderEpoch() ); @@ -1308,7 +1318,7 @@ private boolean handleFetchSnapshotResponse( recordAcknowledgedClusterId(); - if (!RaftUtil.hasValidTopicPartition(data, log.topicPartition())) { + if (validateTopicPartition(data, log.topicPartition()) != Errors.NONE) { return false; } diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java index 67ce1e5ad1032..45c237440e242 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java @@ -99,80 +99,135 @@ public static FetchResponseData singletonFetchResponse( .setResponses(Collections.singletonList(fetchableTopic)); } - static boolean hasValidTopicPartition(FetchRequestData data, TopicPartition topicPartition, Uuid topicId) { - return data.topics().size() == 1 && - data.topics().get(0).topicId().equals(topicId) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partition() == topicPartition.partition(); + static Errors validateTopicPartition(FetchRequestData data, TopicPartition topicPartition, Uuid topicId) { + if (data.topics().size() != 1 || + data.topics().get(0).partitions().size() != 1) { + return Errors.INVALID_REQUEST; + } + if (!data.topics().get(0).topicId().equals(topicId) || + data.topics().get(0).partitions().get(0).partition() != topicPartition.partition()) { + return Errors.UNKNOWN_TOPIC_OR_PARTITION; + } + return Errors.NONE; } - static boolean hasValidTopicPartition(FetchResponseData data, TopicPartition topicPartition, Uuid topicId) { - return data.responses().size() == 1 && - data.responses().get(0).topicId().equals(topicId) && - data.responses().get(0).partitions().size() == 1 && - data.responses().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + static Errors validateTopicPartition(FetchResponseData data, TopicPartition topicPartition, Uuid topicId) { + if (data.responses().size() != 1 || + data.responses().get(0).partitions().size() != 1) { + return Errors.INVALID_REQUEST; + } + if (!data.responses().get(0).topicId().equals(topicId) || + data.responses().get(0).partitions().get(0).partitionIndex() != topicPartition.partition()) { + return Errors.UNKNOWN_TOPIC_OR_PARTITION; + } + return Errors.NONE; } - static boolean hasValidTopicPartition(VoteResponseData data, TopicPartition topicPartition) { - return data.topics().size() == 1 && - data.topics().get(0).topicName().equals(topicPartition.topic()) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + static Errors validateTopicPartition(VoteResponseData data, TopicPartition topicPartition) { + if (data.topics().size() != 1 || + data.topics().get(0).partitions().size() != 1) { + return Errors.INVALID_REQUEST; + } + if (!data.topics().get(0).topicName().equals(topicPartition.topic()) || + data.topics().get(0).partitions().get(0).partitionIndex() != topicPartition.partition()) { + return Errors.UNKNOWN_TOPIC_OR_PARTITION; + } + return Errors.NONE; } - static boolean hasValidTopicPartition(VoteRequestData data, TopicPartition topicPartition) { - return data.topics().size() == 1 && - data.topics().get(0).topicName().equals(topicPartition.topic()) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + static Errors validateTopicPartition(VoteRequestData data, TopicPartition topicPartition) { + if (data.topics().size() != 1 || + data.topics().get(0).partitions().size() != 1) { + return Errors.INVALID_REQUEST; + } + if (!data.topics().get(0).topicName().equals(topicPartition.topic()) || + data.topics().get(0).partitions().get(0).partitionIndex() != topicPartition.partition()) { + return Errors.UNKNOWN_TOPIC_OR_PARTITION; + } + return Errors.NONE; } - static boolean hasValidTopicPartition(BeginQuorumEpochRequestData data, TopicPartition topicPartition) { - return data.topics().size() == 1 && - data.topics().get(0).topicName().equals(topicPartition.topic()) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + static Errors validateTopicPartition(BeginQuorumEpochRequestData data, TopicPartition topicPartition) { + if (data.topics().size() != 1 || + data.topics().get(0).partitions().size() != 1) { + return Errors.INVALID_REQUEST; + } + if (!data.topics().get(0).topicName().equals(topicPartition.topic()) || + data.topics().get(0).partitions().get(0).partitionIndex() != topicPartition.partition()) { + return Errors.UNKNOWN_TOPIC_OR_PARTITION; + } + return Errors.NONE; } - static boolean hasValidTopicPartition(BeginQuorumEpochResponseData data, TopicPartition topicPartition) { - return data.topics().size() == 1 && - data.topics().get(0).topicName().equals(topicPartition.topic()) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + static Errors validateTopicPartition(BeginQuorumEpochResponseData data, TopicPartition topicPartition) { + if (data.topics().size() != 1 || + data.topics().get(0).partitions().size() != 1) { + return Errors.INVALID_REQUEST; + } + if (!data.topics().get(0).topicName().equals(topicPartition.topic()) || + data.topics().get(0).partitions().get(0).partitionIndex() != topicPartition.partition()) { + return Errors.UNKNOWN_TOPIC_OR_PARTITION; + } + return Errors.NONE; } - static boolean hasValidTopicPartition(EndQuorumEpochRequestData data, TopicPartition topicPartition) { - return data.topics().size() == 1 && - data.topics().get(0).topicName().equals(topicPartition.topic()) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + static Errors validateTopicPartition(EndQuorumEpochRequestData data, TopicPartition topicPartition) { + if (data.topics().size() != 1 || + data.topics().get(0).partitions().size() != 1) { + return Errors.INVALID_REQUEST; + } + if (!data.topics().get(0).topicName().equals(topicPartition.topic()) || + data.topics().get(0).partitions().get(0).partitionIndex() != topicPartition.partition()) { + return Errors.UNKNOWN_TOPIC_OR_PARTITION; + } + return Errors.NONE; } - static boolean hasValidTopicPartition(EndQuorumEpochResponseData data, TopicPartition topicPartition) { - return data.topics().size() == 1 && - data.topics().get(0).topicName().equals(topicPartition.topic()) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + static Errors validateTopicPartition(EndQuorumEpochResponseData data, TopicPartition topicPartition) { + if (data.topics().size() != 1 || + data.topics().get(0).partitions().size() != 1) { + return Errors.INVALID_REQUEST; + } + if (!data.topics().get(0).topicName().equals(topicPartition.topic()) || + data.topics().get(0).partitions().get(0).partitionIndex() != topicPartition.partition()) { + return Errors.UNKNOWN_TOPIC_OR_PARTITION; + } + return Errors.NONE; } - static boolean hasValidTopicPartition(DescribeQuorumRequestData data, TopicPartition topicPartition) { - return data.topics().size() == 1 && - data.topics().get(0).topicName().equals(topicPartition.topic()) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + static Errors validateTopicPartition(DescribeQuorumRequestData data, TopicPartition topicPartition) { + if (data.topics().size() != 1 || + data.topics().get(0).partitions().size() != 1) { + return Errors.INVALID_REQUEST; + } + if (!data.topics().get(0).topicName().equals(topicPartition.topic()) || + data.topics().get(0).partitions().get(0).partitionIndex() != topicPartition.partition()) { + return Errors.UNKNOWN_TOPIC_OR_PARTITION; + } + return Errors.NONE; } - static boolean hasValidTopicPartition(FetchSnapshotRequestData data, TopicPartition topicPartition) { - return data.topics().size() == 1 && - data.topics().get(0).name().equals(topicPartition.topic()) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partition() == topicPartition.partition(); + static Errors validateTopicPartition(FetchSnapshotRequestData data, TopicPartition topicPartition) { + if (data.topics().size() != 1 || + data.topics().get(0).partitions().size() != 1) { + return Errors.INVALID_REQUEST; + } + if (!data.topics().get(0).name().equals(topicPartition.topic()) || + data.topics().get(0).partitions().get(0).partition() != topicPartition.partition()) { + return Errors.UNKNOWN_TOPIC_OR_PARTITION; + } + return Errors.NONE; } - static boolean hasValidTopicPartition(FetchSnapshotResponseData data, TopicPartition topicPartition) { - return data.topics().size() == 1 && - data.topics().get(0).name().equals(topicPartition.topic()) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).index() == topicPartition.partition(); + static Errors validateTopicPartition(FetchSnapshotResponseData data, TopicPartition topicPartition) { + if (data.topics().size() != 1 || + data.topics().get(0).partitions().size() != 1) { + return Errors.INVALID_REQUEST; + } + if (!data.topics().get(0).name().equals(topicPartition.topic()) || + data.topics().get(0).partitions().get(0).index() != topicPartition.partition()) { + return Errors.UNKNOWN_TOPIC_OR_PARTITION; + } + return Errors.NONE; } } diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java index f5ea3748ab202..2aa9f1bb486fc 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java @@ -518,7 +518,7 @@ public void testFetchSnapshotRequestUnknownPartition() throws Exception { context.client.poll(); - context.assertSentFetchSnapshotResponse(Errors.INVALID_REQUEST); + context.assertSentFetchSnapshotResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION); } @Test @@ -1655,7 +1655,7 @@ public void testInconsistentClusterIdInFetchSnapshotResponse() throws Exception // Inconsistent cluster id are not fatal if a previous response contained a valid cluster id assertDoesNotThrow(context.client::poll); - // It's impossible to receive a be begin quorum response before any other request so we don't test + // It's impossible to receive a FetchSnapshotResponse before any other request so we don't test } private static FetchSnapshotRequestData fetchSnapshotRequest( diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 129d46fd76755..d814fe8a0d444 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -1601,7 +1601,7 @@ public void testInconsistentClusterIdInFetchResponse() throws Exception { correlationId = context.assertSentFetchRequest(epoch, 0L, 0); context.deliverResponse(correlationId, otherNodeId, new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code())); - // Inconsistent cluster id are not fatal if a previous response contained a valid cluster id + // Inconsistent cluster id are fatal if this is the first rpc request assertThrows(InconsistentClusterIdException.class, context.client::poll); } @@ -1637,7 +1637,7 @@ public void testInconsistentClusterIdInBeginQuorumResponse() throws Exception { // Inconsistent cluster id are not fatal if a previous response contained a valid cluster id assertDoesNotThrow(context.client::poll); - // It's impossible to receive a be begin quorum response before any other request so we don't test + // It's impossible to receive a BeginQuorumEpochResponse before any other request so we don't test } @Test @@ -1674,7 +1674,7 @@ public void testInconsistentClusterIdInEndQuorumResponse() throws Exception { // Inconsistent cluster id are not fatal if a previous response contained a valid cluster id assertDoesNotThrow(context.client::poll); - // It's impossible to receive a be end quorum response before any other request so we don't test + // It's impossible to receive an EndQuorumEpochResponse before any other request so we don't test } @Test diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index d48e41fb31d0c..6e83be7145a24 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -78,7 +78,7 @@ import java.util.Set; import java.util.stream.Collectors; -import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition; +import static org.apache.kafka.raft.RaftUtil.validateTopicPartition; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -496,7 +496,7 @@ void assertSentVoteResponse( RaftMessage raftMessage = sentMessages.get(0); assertTrue(raftMessage.data() instanceof VoteResponseData); VoteResponseData response = (VoteResponseData) raftMessage.data(); - assertTrue(hasValidTopicPartition(response, metadataPartition)); + assertEquals(Errors.NONE, validateTopicPartition(response, metadataPartition)); VoteResponseData.PartitionData partitionResponse = response.topics().get(0).partitions().get(0); @@ -914,7 +914,7 @@ VoteResponseData voteResponse(boolean voteGranted, Optional leaderId, i } private VoteRequestData.PartitionData unwrap(VoteRequestData voteRequest) { - assertTrue(RaftUtil.hasValidTopicPartition(voteRequest, metadataPartition)); + assertEquals(Errors.NONE, validateTopicPartition(voteRequest, metadataPartition)); return voteRequest.topics().get(0).partitions().get(0); } diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java new file mode 100644 index 0000000000000..59e2480ecfd6a --- /dev/null +++ b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java @@ -0,0 +1,835 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.BeginQuorumEpochRequestData; +import org.apache.kafka.common.message.BeginQuorumEpochResponseData; +import org.apache.kafka.common.message.DescribeQuorumRequestData; +import org.apache.kafka.common.message.EndQuorumEpochRequestData; +import org.apache.kafka.common.message.EndQuorumEpochResponseData; +import org.apache.kafka.common.message.FetchRequestData; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.FetchSnapshotRequestData; +import org.apache.kafka.common.message.FetchSnapshotResponseData; +import org.apache.kafka.common.message.VoteRequestData; +import org.apache.kafka.common.message.VoteResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class RaftUtilTest { + + private String topic = "__cluster_metadata"; + private Uuid topicId = Uuid.METADATA_TOPIC_ID; + private int partition = 0; + + @Test + public void testValidateFetchRequestData() { + FetchRequestData none = new FetchRequestData() + .setTopics(Collections.singletonList( + new FetchRequestData.FetchTopic().setTopic(topic).setTopicId(topicId) + .setPartitions(Collections.singletonList(new FetchRequestData.FetchPartition().setPartition(partition))))); + assertEquals( + Errors.NONE, + RaftUtil.validateTopicPartition(none, new TopicPartition(topic, partition), topicId) + ); + + List invalidRequests = Arrays.asList( + new FetchRequestData(), + new FetchRequestData().setTopics( + Arrays.asList( + new FetchRequestData.FetchTopic(), + new FetchRequestData.FetchTopic())), + new FetchRequestData().setTopics( + Collections.singletonList( + new FetchRequestData.FetchTopic() + .setTopic(topic) + .setTopicId(topicId) + ) + ), + new FetchRequestData().setTopics( + Collections.singletonList( + new FetchRequestData.FetchTopic() + .setTopic(topic) + .setTopicId(topicId) + .setPartitions( + Arrays.asList( + new FetchRequestData.FetchPartition(), + new FetchRequestData.FetchPartition() + ) + ) + ) + ) + ); + for (FetchRequestData invalidRequest : invalidRequests) { + assertEquals( + Errors.INVALID_REQUEST, + RaftUtil.validateTopicPartition(invalidRequest, new TopicPartition(topic, partition), topicId) + ); + } + + List unknownTopicOrPartitionRequests = Arrays.asList( + new FetchRequestData().setTopics( + Collections.singletonList( + new FetchRequestData.FetchTopic() + .setTopic(topic) + .setTopicId(Uuid.randomUuid()) + .setPartitions( + Collections.singletonList(new FetchRequestData.FetchPartition().setPartition(0)) + ) + ) + ), + new FetchRequestData().setTopics( + Collections.singletonList( + new FetchRequestData.FetchTopic() + .setTopic(topic) + .setTopicId(topicId) + .setPartitions( + Collections.singletonList(new FetchRequestData.FetchPartition().setPartition(1)) + ) + ) + ) + ); + for (FetchRequestData unknownTopicOrPartitionRequest: unknownTopicOrPartitionRequests) { + assertEquals( + Errors.UNKNOWN_TOPIC_OR_PARTITION, + RaftUtil.validateTopicPartition(unknownTopicOrPartitionRequest, new TopicPartition(topic, partition), topicId) + ); + } + } + + @Test + public void testValidateFetchResponseData() { + FetchResponseData none = new FetchResponseData() + .setResponses(Collections.singletonList( + new FetchResponseData.FetchableTopicResponse().setTopic(topic).setTopicId(topicId) + .setPartitions(Collections.singletonList(new FetchResponseData.PartitionData().setPartitionIndex(partition))))); + assertEquals( + Errors.NONE, + RaftUtil.validateTopicPartition(none, new TopicPartition(topic, partition), topicId) + ); + + List invalidRequestResponses = Arrays.asList( + new FetchResponseData(), + new FetchResponseData().setResponses( + Arrays.asList( + new FetchResponseData.FetchableTopicResponse(), + new FetchResponseData.FetchableTopicResponse())), + new FetchResponseData().setResponses( + Collections.singletonList( + new FetchResponseData.FetchableTopicResponse() + .setTopic(topic) + .setTopicId(topicId) + ) + ), + new FetchResponseData().setResponses( + Collections.singletonList( + new FetchResponseData.FetchableTopicResponse() + .setTopic(topic) + .setTopicId(topicId) + .setPartitions( + Arrays.asList( + new FetchResponseData.PartitionData(), + new FetchResponseData.PartitionData() + ) + ) + ) + ) + ); + for (FetchResponseData invalidRequest : invalidRequestResponses) { + assertEquals( + Errors.INVALID_REQUEST, + RaftUtil.validateTopicPartition(invalidRequest, new TopicPartition(topic, partition), topicId) + ); + } + + List unknownTopicOrPartitionResponses = Arrays.asList( + new FetchResponseData().setResponses( + Collections.singletonList( + new FetchResponseData.FetchableTopicResponse() + .setTopic(topic) + .setTopicId(Uuid.randomUuid()) + .setPartitions( + Collections.singletonList(new FetchResponseData.PartitionData().setPartitionIndex(0)) + ) + ) + ), + new FetchResponseData().setResponses( + Collections.singletonList( + new FetchResponseData.FetchableTopicResponse() + .setTopic(topic) + .setTopicId(topicId) + .setPartitions( + Collections.singletonList(new FetchResponseData.PartitionData().setPartitionIndex(1)) + ) + ) + ) + ); + for (FetchResponseData unknownTopicOrPartition: unknownTopicOrPartitionResponses) { + assertEquals( + Errors.UNKNOWN_TOPIC_OR_PARTITION, + RaftUtil.validateTopicPartition(unknownTopicOrPartition, new TopicPartition(topic, partition), topicId) + ); + } + } + + @Test + public void testValidateVoteRequestData() { + VoteRequestData none = new VoteRequestData() + .setTopics(Collections.singletonList( + new VoteRequestData.TopicData().setTopicName(topic) + .setPartitions(Collections.singletonList(new VoteRequestData.PartitionData().setPartitionIndex(partition))))); + assertEquals( + Errors.NONE, + RaftUtil.validateTopicPartition(none, new TopicPartition(topic, partition)) + ); + + List invalidRequests = Arrays.asList( + new VoteRequestData(), + new VoteRequestData().setTopics( + Arrays.asList( + new VoteRequestData.TopicData(), + new VoteRequestData.TopicData())), + new VoteRequestData().setTopics( + Collections.singletonList( + new VoteRequestData.TopicData() + .setTopicName(topic) + ) + ), + new VoteRequestData().setTopics( + Collections.singletonList( + new VoteRequestData.TopicData() + .setTopicName(topic) + .setPartitions( + Arrays.asList( + new VoteRequestData.PartitionData(), + new VoteRequestData.PartitionData() + ) + ) + ) + ) + ); + for (VoteRequestData invalidRequest : invalidRequests) { + assertEquals( + Errors.INVALID_REQUEST, + RaftUtil.validateTopicPartition(invalidRequest, new TopicPartition(topic, partition)) + ); + } + + List unknownTopicOrPartitionRequests = Arrays.asList( + new VoteRequestData().setTopics( + Collections.singletonList( + new VoteRequestData.TopicData() + .setTopicName(topic + "1") + .setPartitions( + Collections.singletonList(new VoteRequestData.PartitionData().setPartitionIndex(0)) + ) + ) + ), + new VoteRequestData().setTopics( + Collections.singletonList( + new VoteRequestData.TopicData() + .setTopicName(topic) + .setPartitions( + Collections.singletonList(new VoteRequestData.PartitionData().setPartitionIndex(1)) + ) + ) + ) + ); + for (VoteRequestData unknownTopicOrPartitionRequest: unknownTopicOrPartitionRequests) { + assertEquals( + Errors.UNKNOWN_TOPIC_OR_PARTITION, + RaftUtil.validateTopicPartition(unknownTopicOrPartitionRequest, new TopicPartition(topic, partition)) + ); + } + } + + @Test + public void testValidateVoteResponseData() { + VoteResponseData none = new VoteResponseData() + .setTopics(Collections.singletonList( + new VoteResponseData.TopicData().setTopicName(topic) + .setPartitions(Collections.singletonList(new VoteResponseData.PartitionData().setPartitionIndex(partition))))); + assertEquals( + Errors.NONE, + RaftUtil.validateTopicPartition(none, new TopicPartition(topic, partition)) + ); + + List invalidRequests = Arrays.asList( + new VoteResponseData(), + new VoteResponseData().setTopics( + Arrays.asList( + new VoteResponseData.TopicData(), + new VoteResponseData.TopicData())), + new VoteResponseData().setTopics( + Collections.singletonList( + new VoteResponseData.TopicData() + .setTopicName(topic) + ) + ), + new VoteResponseData().setTopics( + Collections.singletonList( + new VoteResponseData.TopicData() + .setTopicName(topic) + .setPartitions( + Arrays.asList( + new VoteResponseData.PartitionData(), + new VoteResponseData.PartitionData() + ) + ) + ) + ) + ); + for (VoteResponseData invalidRequest : invalidRequests) { + assertEquals( + Errors.INVALID_REQUEST, + RaftUtil.validateTopicPartition(invalidRequest, new TopicPartition(topic, partition)) + ); + } + + List unknownTopicOrPartitionRequests = Arrays.asList( + new VoteResponseData().setTopics( + Collections.singletonList( + new VoteResponseData.TopicData() + .setTopicName(topic + "1") + .setPartitions( + Collections.singletonList(new VoteResponseData.PartitionData().setPartitionIndex(0)) + ) + ) + ), + new VoteResponseData().setTopics( + Collections.singletonList( + new VoteResponseData.TopicData() + .setTopicName(topic) + .setPartitions( + Collections.singletonList(new VoteResponseData.PartitionData().setPartitionIndex(1)) + ) + ) + ) + ); + for (VoteResponseData unknownTopicOrPartitionRequest: unknownTopicOrPartitionRequests) { + assertEquals( + Errors.UNKNOWN_TOPIC_OR_PARTITION, + RaftUtil.validateTopicPartition(unknownTopicOrPartitionRequest, new TopicPartition(topic, partition)) + ); + } + } + + @Test + public void testValidateBeginQuorumEpochRequestData() { + BeginQuorumEpochRequestData none = new BeginQuorumEpochRequestData() + .setTopics(Collections.singletonList( + new BeginQuorumEpochRequestData.TopicData().setTopicName(topic) + .setPartitions(Collections.singletonList(new BeginQuorumEpochRequestData.PartitionData().setPartitionIndex(partition))))); + assertEquals( + Errors.NONE, + RaftUtil.validateTopicPartition(none, new TopicPartition(topic, partition)) + ); + + List invalidRequests = Arrays.asList( + new BeginQuorumEpochRequestData(), + new BeginQuorumEpochRequestData().setTopics( + Arrays.asList( + new BeginQuorumEpochRequestData.TopicData(), + new BeginQuorumEpochRequestData.TopicData())), + new BeginQuorumEpochRequestData().setTopics( + Collections.singletonList( + new BeginQuorumEpochRequestData.TopicData() + .setTopicName(topic) + ) + ), + new BeginQuorumEpochRequestData().setTopics( + Collections.singletonList( + new BeginQuorumEpochRequestData.TopicData() + .setTopicName(topic) + .setPartitions( + Arrays.asList( + new BeginQuorumEpochRequestData.PartitionData(), + new BeginQuorumEpochRequestData.PartitionData() + ) + ) + ) + ) + ); + for (BeginQuorumEpochRequestData invalidRequest : invalidRequests) { + assertEquals( + Errors.INVALID_REQUEST, + RaftUtil.validateTopicPartition(invalidRequest, new TopicPartition(topic, partition)) + ); + } + + List unknownTopicOrPartitionRequests = Arrays.asList( + new BeginQuorumEpochRequestData().setTopics( + Collections.singletonList( + new BeginQuorumEpochRequestData.TopicData() + .setTopicName(topic + "1") + .setPartitions( + Collections.singletonList(new BeginQuorumEpochRequestData.PartitionData().setPartitionIndex(0)) + ) + ) + ), + new BeginQuorumEpochRequestData().setTopics( + Collections.singletonList( + new BeginQuorumEpochRequestData.TopicData() + .setTopicName(topic) + .setPartitions( + Collections.singletonList(new BeginQuorumEpochRequestData.PartitionData().setPartitionIndex(1)) + ) + ) + ) + ); + for (BeginQuorumEpochRequestData unknownTopicOrPartitionRequest: unknownTopicOrPartitionRequests) { + assertEquals( + Errors.UNKNOWN_TOPIC_OR_PARTITION, + RaftUtil.validateTopicPartition(unknownTopicOrPartitionRequest, new TopicPartition(topic, partition)) + ); + } + } + + @Test + public void testValidateBeginQuorumEpochResponseData() { + BeginQuorumEpochResponseData none = new BeginQuorumEpochResponseData() + .setTopics(Collections.singletonList( + new BeginQuorumEpochResponseData.TopicData().setTopicName(topic) + .setPartitions(Collections.singletonList(new BeginQuorumEpochResponseData.PartitionData().setPartitionIndex(partition))))); + assertEquals( + Errors.NONE, + RaftUtil.validateTopicPartition(none, new TopicPartition(topic, partition)) + ); + + List invalidRequests = Arrays.asList( + new BeginQuorumEpochResponseData(), + new BeginQuorumEpochResponseData().setTopics( + Arrays.asList( + new BeginQuorumEpochResponseData.TopicData(), + new BeginQuorumEpochResponseData.TopicData())), + new BeginQuorumEpochResponseData().setTopics( + Collections.singletonList( + new BeginQuorumEpochResponseData.TopicData() + .setTopicName(topic) + ) + ), + new BeginQuorumEpochResponseData().setTopics( + Collections.singletonList( + new BeginQuorumEpochResponseData.TopicData() + .setTopicName(topic) + .setPartitions( + Arrays.asList( + new BeginQuorumEpochResponseData.PartitionData(), + new BeginQuorumEpochResponseData.PartitionData() + ) + ) + ) + ) + ); + for (BeginQuorumEpochResponseData invalidRequest : invalidRequests) { + assertEquals( + Errors.INVALID_REQUEST, + RaftUtil.validateTopicPartition(invalidRequest, new TopicPartition(topic, partition)) + ); + } + + List unknownTopicOrPartitionRequests = Arrays.asList( + new BeginQuorumEpochResponseData().setTopics( + Collections.singletonList( + new BeginQuorumEpochResponseData.TopicData() + .setTopicName(topic + "1") + .setPartitions( + Collections.singletonList(new BeginQuorumEpochResponseData.PartitionData().setPartitionIndex(0)) + ) + ) + ), + new BeginQuorumEpochResponseData().setTopics( + Collections.singletonList( + new BeginQuorumEpochResponseData.TopicData() + .setTopicName(topic) + .setPartitions( + Collections.singletonList(new BeginQuorumEpochResponseData.PartitionData().setPartitionIndex(1)) + ) + ) + ) + ); + for (BeginQuorumEpochResponseData unknownTopicOrPartitionRequest: unknownTopicOrPartitionRequests) { + assertEquals( + Errors.UNKNOWN_TOPIC_OR_PARTITION, + RaftUtil.validateTopicPartition(unknownTopicOrPartitionRequest, new TopicPartition(topic, partition)) + ); + } + } + + @Test + public void testValidateEndQuorumEpochRequestData() { + EndQuorumEpochRequestData none = new EndQuorumEpochRequestData() + .setTopics(Collections.singletonList( + new EndQuorumEpochRequestData.TopicData().setTopicName(topic) + .setPartitions(Collections.singletonList(new EndQuorumEpochRequestData.PartitionData().setPartitionIndex(partition))))); + assertEquals( + Errors.NONE, + RaftUtil.validateTopicPartition(none, new TopicPartition(topic, partition)) + ); + + List invalidRequests = Arrays.asList( + new EndQuorumEpochRequestData(), + new EndQuorumEpochRequestData().setTopics( + Arrays.asList( + new EndQuorumEpochRequestData.TopicData(), + new EndQuorumEpochRequestData.TopicData())), + new EndQuorumEpochRequestData().setTopics( + Collections.singletonList( + new EndQuorumEpochRequestData.TopicData() + .setTopicName(topic) + ) + ), + new EndQuorumEpochRequestData().setTopics( + Collections.singletonList( + new EndQuorumEpochRequestData.TopicData() + .setTopicName(topic) + .setPartitions( + Arrays.asList( + new EndQuorumEpochRequestData.PartitionData(), + new EndQuorumEpochRequestData.PartitionData() + ) + ) + ) + ) + ); + for (EndQuorumEpochRequestData invalidRequest : invalidRequests) { + assertEquals( + Errors.INVALID_REQUEST, + RaftUtil.validateTopicPartition(invalidRequest, new TopicPartition(topic, partition)) + ); + } + + List unknownTopicOrPartitionRequests = Arrays.asList( + new EndQuorumEpochRequestData().setTopics( + Collections.singletonList( + new EndQuorumEpochRequestData.TopicData() + .setTopicName(topic + "1") + .setPartitions( + Collections.singletonList(new EndQuorumEpochRequestData.PartitionData().setPartitionIndex(0)) + ) + ) + ), + new EndQuorumEpochRequestData().setTopics( + Collections.singletonList( + new EndQuorumEpochRequestData.TopicData() + .setTopicName(topic) + .setPartitions( + Collections.singletonList(new EndQuorumEpochRequestData.PartitionData().setPartitionIndex(1)) + ) + ) + ) + ); + for (EndQuorumEpochRequestData unknownTopicOrPartitionRequest: unknownTopicOrPartitionRequests) { + assertEquals( + Errors.UNKNOWN_TOPIC_OR_PARTITION, + RaftUtil.validateTopicPartition(unknownTopicOrPartitionRequest, new TopicPartition(topic, partition)) + ); + } + } + + @Test + public void testValidateEndQuorumEpochResponseData() { + EndQuorumEpochResponseData none = new EndQuorumEpochResponseData() + .setTopics(Collections.singletonList( + new EndQuorumEpochResponseData.TopicData().setTopicName(topic) + .setPartitions(Collections.singletonList(new EndQuorumEpochResponseData.PartitionData().setPartitionIndex(partition))))); + assertEquals( + Errors.NONE, + RaftUtil.validateTopicPartition(none, new TopicPartition(topic, partition)) + ); + + List invalidRequests = Arrays.asList( + new EndQuorumEpochResponseData(), + new EndQuorumEpochResponseData().setTopics( + Arrays.asList( + new EndQuorumEpochResponseData.TopicData(), + new EndQuorumEpochResponseData.TopicData())), + new EndQuorumEpochResponseData().setTopics( + Collections.singletonList( + new EndQuorumEpochResponseData.TopicData() + .setTopicName(topic) + ) + ), + new EndQuorumEpochResponseData().setTopics( + Collections.singletonList( + new EndQuorumEpochResponseData.TopicData() + .setTopicName(topic) + .setPartitions( + Arrays.asList( + new EndQuorumEpochResponseData.PartitionData(), + new EndQuorumEpochResponseData.PartitionData() + ) + ) + ) + ) + ); + for (EndQuorumEpochResponseData invalidRequest : invalidRequests) { + assertEquals( + Errors.INVALID_REQUEST, + RaftUtil.validateTopicPartition(invalidRequest, new TopicPartition(topic, partition)) + ); + } + + List unknownTopicOrPartitionRequests = Arrays.asList( + new EndQuorumEpochResponseData().setTopics( + Collections.singletonList( + new EndQuorumEpochResponseData.TopicData() + .setTopicName(topic + "1") + .setPartitions( + Collections.singletonList(new EndQuorumEpochResponseData.PartitionData().setPartitionIndex(0)) + ) + ) + ), + new EndQuorumEpochResponseData().setTopics( + Collections.singletonList( + new EndQuorumEpochResponseData.TopicData() + .setTopicName(topic) + .setPartitions( + Collections.singletonList(new EndQuorumEpochResponseData.PartitionData().setPartitionIndex(1)) + ) + ) + ) + ); + for (EndQuorumEpochResponseData unknownTopicOrPartitionRequest: unknownTopicOrPartitionRequests) { + assertEquals( + Errors.UNKNOWN_TOPIC_OR_PARTITION, + RaftUtil.validateTopicPartition(unknownTopicOrPartitionRequest, new TopicPartition(topic, partition)) + ); + } + } + + @Test + public void testValidateDescribeQuorumRequestData() { + DescribeQuorumRequestData none = new DescribeQuorumRequestData() + .setTopics(Collections.singletonList( + new DescribeQuorumRequestData.TopicData().setTopicName(topic) + .setPartitions(Collections.singletonList(new DescribeQuorumRequestData.PartitionData().setPartitionIndex(partition))))); + assertEquals( + Errors.NONE, + RaftUtil.validateTopicPartition(none, new TopicPartition(topic, partition)) + ); + + List invalidRequests = Arrays.asList( + new DescribeQuorumRequestData(), + new DescribeQuorumRequestData().setTopics( + Arrays.asList( + new DescribeQuorumRequestData.TopicData(), + new DescribeQuorumRequestData.TopicData())), + new DescribeQuorumRequestData().setTopics( + Collections.singletonList( + new DescribeQuorumRequestData.TopicData() + .setTopicName(topic) + ) + ), + new DescribeQuorumRequestData().setTopics( + Collections.singletonList( + new DescribeQuorumRequestData.TopicData() + .setTopicName(topic) + .setPartitions( + Arrays.asList( + new DescribeQuorumRequestData.PartitionData(), + new DescribeQuorumRequestData.PartitionData() + ) + ) + ) + ) + ); + for (DescribeQuorumRequestData invalidRequest : invalidRequests) { + assertEquals( + Errors.INVALID_REQUEST, + RaftUtil.validateTopicPartition(invalidRequest, new TopicPartition(topic, partition)) + ); + } + + List unknownTopicOrPartitionRequests = Arrays.asList( + new DescribeQuorumRequestData().setTopics( + Collections.singletonList( + new DescribeQuorumRequestData.TopicData() + .setTopicName(topic + "1") + .setPartitions( + Collections.singletonList(new DescribeQuorumRequestData.PartitionData().setPartitionIndex(0)) + ) + ) + ), + new DescribeQuorumRequestData().setTopics( + Collections.singletonList( + new DescribeQuorumRequestData.TopicData() + .setTopicName(topic) + .setPartitions( + Collections.singletonList(new DescribeQuorumRequestData.PartitionData().setPartitionIndex(1)) + ) + ) + ) + ); + for (DescribeQuorumRequestData unknownTopicOrPartitionRequest: unknownTopicOrPartitionRequests) { + assertEquals( + Errors.UNKNOWN_TOPIC_OR_PARTITION, + RaftUtil.validateTopicPartition(unknownTopicOrPartitionRequest, new TopicPartition(topic, partition)) + ); + } + } + + @Test + public void testValidateFetchSnapshotRequestData() { + FetchSnapshotRequestData none = new FetchSnapshotRequestData() + .setTopics(Collections.singletonList( + new FetchSnapshotRequestData.TopicSnapshot().setName(topic) + .setPartitions(Collections.singletonList(new FetchSnapshotRequestData.PartitionSnapshot().setPartition(partition))))); + assertEquals( + Errors.NONE, + RaftUtil.validateTopicPartition(none, new TopicPartition(topic, partition)) + ); + + List invalidRequests = Arrays.asList( + new FetchSnapshotRequestData(), + new FetchSnapshotRequestData().setTopics( + Arrays.asList( + new FetchSnapshotRequestData.TopicSnapshot(), + new FetchSnapshotRequestData.TopicSnapshot())), + new FetchSnapshotRequestData().setTopics( + Collections.singletonList( + new FetchSnapshotRequestData.TopicSnapshot() + .setName(topic) + ) + ), + new FetchSnapshotRequestData().setTopics( + Collections.singletonList( + new FetchSnapshotRequestData.TopicSnapshot() + .setName(topic) + .setPartitions( + Arrays.asList( + new FetchSnapshotRequestData.PartitionSnapshot(), + new FetchSnapshotRequestData.PartitionSnapshot() + ) + ) + ) + ) + ); + for (FetchSnapshotRequestData invalidRequest : invalidRequests) { + assertEquals( + Errors.INVALID_REQUEST, + RaftUtil.validateTopicPartition(invalidRequest, new TopicPartition(topic, partition)) + ); + } + + List unknownTopicOrPartitionRequests = Arrays.asList( + new FetchSnapshotRequestData().setTopics( + Collections.singletonList( + new FetchSnapshotRequestData.TopicSnapshot() + .setName(topic + "1") + .setPartitions( + Collections.singletonList(new FetchSnapshotRequestData.PartitionSnapshot().setPartition(0)) + ) + ) + ), + new FetchSnapshotRequestData().setTopics( + Collections.singletonList( + new FetchSnapshotRequestData.TopicSnapshot() + .setName(topic) + .setPartitions( + Collections.singletonList(new FetchSnapshotRequestData.PartitionSnapshot().setPartition(1)) + ) + ) + ) + ); + for (FetchSnapshotRequestData unknownTopicOrPartitionRequest: unknownTopicOrPartitionRequests) { + assertEquals( + Errors.UNKNOWN_TOPIC_OR_PARTITION, + RaftUtil.validateTopicPartition(unknownTopicOrPartitionRequest, new TopicPartition(topic, partition)) + ); + } + } + + @Test + public void testValidateFetchSnapshotResponseData() { + FetchSnapshotResponseData none = new FetchSnapshotResponseData() + .setTopics(Collections.singletonList( + new FetchSnapshotResponseData.TopicSnapshot().setName(topic) + .setPartitions(Collections.singletonList(new FetchSnapshotResponseData.PartitionSnapshot().setIndex(partition))))); + assertEquals( + Errors.NONE, + RaftUtil.validateTopicPartition(none, new TopicPartition(topic, partition)) + ); + + List invalidRequests = Arrays.asList( + new FetchSnapshotResponseData(), + new FetchSnapshotResponseData().setTopics( + Arrays.asList( + new FetchSnapshotResponseData.TopicSnapshot(), + new FetchSnapshotResponseData.TopicSnapshot())), + new FetchSnapshotResponseData().setTopics( + Collections.singletonList( + new FetchSnapshotResponseData.TopicSnapshot() + .setName(topic) + ) + ), + new FetchSnapshotResponseData().setTopics( + Collections.singletonList( + new FetchSnapshotResponseData.TopicSnapshot() + .setName(topic) + .setPartitions( + Arrays.asList( + new FetchSnapshotResponseData.PartitionSnapshot(), + new FetchSnapshotResponseData.PartitionSnapshot() + ) + ) + ) + ) + ); + for (FetchSnapshotResponseData invalidRequest : invalidRequests) { + assertEquals( + Errors.INVALID_REQUEST, + RaftUtil.validateTopicPartition(invalidRequest, new TopicPartition(topic, partition)) + ); + } + + List unknownTopicOrPartitionRequests = Arrays.asList( + new FetchSnapshotResponseData().setTopics( + Collections.singletonList( + new FetchSnapshotResponseData.TopicSnapshot() + .setName(topic + "1") + .setPartitions( + Collections.singletonList(new FetchSnapshotResponseData.PartitionSnapshot().setIndex(0)) + ) + ) + ), + new FetchSnapshotResponseData().setTopics( + Collections.singletonList( + new FetchSnapshotResponseData.TopicSnapshot() + .setName(topic) + .setPartitions( + Collections.singletonList(new FetchSnapshotResponseData.PartitionSnapshot().setIndex(1)) + ) + ) + ) + ); + for (FetchSnapshotResponseData unknownTopicOrPartitionRequest: unknownTopicOrPartitionRequests) { + assertEquals( + Errors.UNKNOWN_TOPIC_OR_PARTITION, + RaftUtil.validateTopicPartition(unknownTopicOrPartitionRequest, new TopicPartition(topic, partition)) + ); + } + } +} From 66e23826f66fdd7e0b63e30ad213d5f3e0cad695 Mon Sep 17 00:00:00 2001 From: dengziming Date: Mon, 7 Feb 2022 21:47:47 +0800 Subject: [PATCH 3/3] Add test case for VoteResponse --- .../kafka/raft/KafkaRaftClientTest.java | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index d814fe8a0d444..007f8b7530e45 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -1605,6 +1605,50 @@ public void testInconsistentClusterIdInFetchResponse() throws Exception { assertThrows(InconsistentClusterIdException.class, context.client::poll); } + @Test + public void testInconsistentClusterIdInVoteResponse() throws Exception { + int localId = 0; + int otherNodeId = 1; + int epoch = 5; + Set voters = Utils.mkSet(localId, otherNodeId); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withVotedCandidate(epoch, localId) + .build(); + + // Send vote request + context.pollUntilRequest(); + int correlationId = context.assertSentVoteRequest(epoch, 0, 0L, 1); + + // Firstly receive a response with a valid cluster id + context.deliverResponse(correlationId, otherNodeId, + context.voteResponse(false, Optional.empty(), epoch)); + context.client.poll(); + + // Send vote request again + context.pollUntilRequest(); + correlationId = context.assertSentVoteRequest(epoch + 1, 0, 0L, 1); + + // Receive a response with an inconsistent cluster id + context.deliverResponse(correlationId, otherNodeId, + new VoteResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code())); + + // Inconsistent cluster id are not fatal if a previous response contained a valid cluster id + assertDoesNotThrow(context.client::poll); + + // This time we receive a inconsistent cluster id directly + context = new RaftClientTestContext.Builder(localId, voters) + .withVotedCandidate(epoch, localId) + .build(); + + context.pollUntilRequest(); + correlationId = context.assertSentVoteRequest(epoch, 0, 0L, 1); + context.deliverResponse(correlationId, otherNodeId, + new VoteResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code())); + // Inconsistent cluster id are fatal if this is the first rpc request + assertThrows(InconsistentClusterIdException.class, context.client::poll); + } + @Test public void testInconsistentClusterIdInBeginQuorumResponse() throws Exception { int localId = 0;