diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f1007dc4c5c64..5e160b317c4da 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -60,7 +60,7 @@
files="MemoryRecordsBuilder.java"/>
+ files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest|RaftUtilTest).java"/>
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index dced48da39a0d..d4b3d9f9f7557 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -17,8 +17,8 @@
package org.apache.kafka.raft;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
@@ -93,12 +93,13 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static java.util.concurrent.CompletableFuture.completedFuture;
-import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
+import static org.apache.kafka.raft.RaftUtil.validateTopicPartition;
/**
* This class implements a Kafkaesque version of the Raft protocol. Leader election
@@ -151,6 +152,7 @@ public class KafkaRaftClient implements RaftClient {
private final Time time;
private final int fetchMaxWaitMs;
private final String clusterId;
+ private final AtomicBoolean clusterIdAcknowledged;
private final NetworkChannel channel;
private final ReplicatedLog log;
private final Random random;
@@ -230,6 +232,7 @@ public KafkaRaftClient(
this.appendPurgatory = new ThresholdPurgatory<>(expirationService);
this.time = time;
this.clusterId = clusterId;
+ this.clusterIdAcknowledged = new AtomicBoolean(false);
this.fetchMaxWaitMs = fetchMaxWaitMs;
this.logger = logContext.logger(KafkaRaftClient.class);
this.random = random;
@@ -534,6 +537,7 @@ private VoteResponseData buildVoteResponse(Errors partitionLevelError, boolean v
* - {@link Errors#INCONSISTENT_VOTER_SET} if the request suggests inconsistent voter membership (e.g.
* if this node or the sender is not one of the current known voters)
* - {@link Errors#INVALID_REQUEST} if the last epoch or offset are invalid
+ * - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} if the topic or partition doesn't match metadata topic
*/
private VoteResponseData handleVoteRequest(
RaftRequest.Inbound requestMetadata
@@ -544,9 +548,10 @@ private VoteResponseData handleVoteRequest(
return new VoteResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
}
- if (!hasValidTopicPartition(request, log.topicPartition())) {
+ Errors topError = validateTopicPartition(request, log.topicPartition());
+ if (topError != Errors.NONE) {
// Until we support multi-raft, we treat individual topic partition mismatches as invalid requests
- return new VoteResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
+ return new VoteResponseData().setErrorCode(topError.code());
}
VoteRequestData.PartitionData partitionRequest =
@@ -592,7 +597,9 @@ private boolean handleVoteResponse(
return handleTopLevelError(topLevelError, responseMetadata);
}
- if (!hasValidTopicPartition(response, log.topicPartition())) {
+ recordAcknowledgedClusterId();
+
+ if (validateTopicPartition(response, log.topicPartition()) != Errors.NONE) {
return false;
}
@@ -679,6 +686,7 @@ private BeginQuorumEpochResponseData buildBeginQuorumEpochResponse(Errors partit
* - {@link Errors#INCONSISTENT_VOTER_SET} if the request suggests inconsistent voter membership (e.g.
* if this node or the sender is not one of the current known voters)
* - {@link Errors#FENCED_LEADER_EPOCH} if the epoch is smaller than this node's epoch
+ * - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} if the topic or partition doesn't match metadata topic
*/
private BeginQuorumEpochResponseData handleBeginQuorumEpochRequest(
RaftRequest.Inbound requestMetadata,
@@ -690,9 +698,10 @@ private BeginQuorumEpochResponseData handleBeginQuorumEpochRequest(
return new BeginQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
}
- if (!hasValidTopicPartition(request, log.topicPartition())) {
+ Errors topError = validateTopicPartition(request, log.topicPartition());
+ if (topError != Errors.NONE) {
// Until we support multi-raft, we treat topic partition mismatches as invalid requests
- return new BeginQuorumEpochResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
+ return new BeginQuorumEpochResponseData().setErrorCode(topError.code());
}
BeginQuorumEpochRequestData.PartitionData partitionRequest =
@@ -721,7 +730,9 @@ private boolean handleBeginQuorumEpochResponse(
return handleTopLevelError(topLevelError, responseMetadata);
}
- if (!hasValidTopicPartition(response, log.topicPartition())) {
+ recordAcknowledgedClusterId();
+
+ if (validateTopicPartition(response, log.topicPartition()) != Errors.NONE) {
return false;
}
@@ -768,6 +779,7 @@ private EndQuorumEpochResponseData buildEndQuorumEpochResponse(Errors partitionL
* - {@link Errors#INCONSISTENT_VOTER_SET} if the request suggests inconsistent voter membership (e.g.
* if this node or the sender is not one of the current known voters)
* - {@link Errors#FENCED_LEADER_EPOCH} if the epoch is smaller than this node's epoch
+ * - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} if the topic or partition doesn't match metadata topic
*/
private EndQuorumEpochResponseData handleEndQuorumEpochRequest(
RaftRequest.Inbound requestMetadata,
@@ -779,9 +791,10 @@ private EndQuorumEpochResponseData handleEndQuorumEpochRequest(
return new EndQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
}
- if (!hasValidTopicPartition(request, log.topicPartition())) {
+ Errors topError = validateTopicPartition(request, log.topicPartition());
+ if (topError != Errors.NONE) {
// Until we support multi-raft, we treat topic partition mismatches as invalid requests
- return new EndQuorumEpochResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
+ return new EndQuorumEpochResponseData().setErrorCode(topError.code());
}
EndQuorumEpochRequestData.PartitionData partitionRequest =
@@ -833,7 +846,9 @@ private boolean handleEndQuorumEpochResponse(
return handleTopLevelError(topLevelError, responseMetadata);
}
- if (!hasValidTopicPartition(response, log.topicPartition())) {
+ recordAcknowledgedClusterId();
+
+ if (validateTopicPartition(response, log.topicPartition()) != Errors.NONE) {
return false;
}
@@ -912,6 +927,10 @@ private boolean hasValidClusterId(String requestClusterId) {
return clusterId.equals(requestClusterId);
}
+ private void recordAcknowledgedClusterId() {
+ clusterIdAcknowledged.set(true);
+ }
+
/**
* Handle a Fetch request. The fetch offset and last fetched epoch are always
* validated against the current log. In the case that they do not match, the response will
@@ -926,6 +945,7 @@ private boolean hasValidClusterId(String requestClusterId) {
* - {@link Errors#FENCED_LEADER_EPOCH} if the epoch is smaller than this node's epoch
* - {@link Errors#INVALID_REQUEST} if the request epoch is larger than the leader's current epoch
* or if either the fetch offset or the last fetched epoch is invalid
+ * - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} if the topic or partition doesn't match metadata topic
*/
private CompletableFuture handleFetchRequest(
RaftRequest.Inbound requestMetadata,
@@ -937,9 +957,10 @@ private CompletableFuture handleFetchRequest(
return completedFuture(new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
}
- if (!hasValidTopicPartition(request, log.topicPartition(), log.topicId())) {
+ Errors topError = validateTopicPartition(request, log.topicPartition(), log.topicId());
+ if (topError != Errors.NONE) {
// Until we support multi-raft, we treat topic partition mismatches as invalid requests
- return completedFuture(new FetchResponseData().setErrorCode(Errors.INVALID_REQUEST.code()));
+ return completedFuture(new FetchResponseData().setErrorCode(topError.code()));
}
// If the ID is valid, we can set the topic name.
request.topics().get(0).setTopic(log.topicPartition().topic());
@@ -1047,7 +1068,9 @@ private boolean handleFetchResponse(
return handleTopLevelError(topLevelError, responseMetadata);
}
- if (!RaftUtil.hasValidTopicPartition(response, log.topicPartition(), log.topicId())) {
+ recordAcknowledgedClusterId();
+
+ if (validateTopicPartition(response, log.topicPartition(), log.topicId()) != Errors.NONE) {
return false;
}
// If the ID is valid, we can set the topic name.
@@ -1160,9 +1183,10 @@ private DescribeQuorumResponseData handleDescribeQuorumRequest(
long currentTimeMs
) {
DescribeQuorumRequestData describeQuorumRequestData = (DescribeQuorumRequestData) requestMetadata.data;
- if (!hasValidTopicPartition(describeQuorumRequestData, log.topicPartition())) {
- return DescribeQuorumRequest.getPartitionLevelErrorResponse(
- describeQuorumRequestData, Errors.UNKNOWN_TOPIC_OR_PARTITION);
+
+ Errors topError = validateTopicPartition(describeQuorumRequestData, log.topicPartition());
+ if (topError != Errors.NONE) {
+ return DescribeQuorumRequest.getPartitionLevelErrorResponse(describeQuorumRequestData, topError);
}
if (!quorum.isLeader()) {
@@ -1193,6 +1217,7 @@ private DescribeQuorumResponseData handleDescribeQuorumRequest(
* or if either the fetch offset or the last fetched epoch is invalid
* - {@link Errors#SNAPSHOT_NOT_FOUND} if the request snapshot id does not exists
* - {@link Errors#POSITION_OUT_OF_RANGE} if the request snapshot offset out of range
+ * - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} if the topic or partition doesn't match metadata topic
*/
private FetchSnapshotResponseData handleFetchSnapshotRequest(
RaftRequest.Inbound requestMetadata
@@ -1203,27 +1228,14 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest(
return new FetchSnapshotResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
}
- if (data.topics().size() != 1 && data.topics().get(0).partitions().size() != 1) {
- return FetchSnapshotResponse.withTopLevelError(Errors.INVALID_REQUEST);
+ Errors topError = validateTopicPartition(data, log.topicPartition());
+ if (topError != Errors.NONE) {
+ return FetchSnapshotResponse.withTopLevelError(topError);
}
- Optional partitionSnapshotOpt = FetchSnapshotRequest
- .forTopicPartition(data, log.topicPartition());
- if (!partitionSnapshotOpt.isPresent()) {
- // The Raft client assumes that there is only one topic partition.
- TopicPartition unknownTopicPartition = new TopicPartition(
- data.topics().get(0).name(),
- data.topics().get(0).partitions().get(0).partition()
- );
+ FetchSnapshotRequestData.PartitionSnapshot partitionSnapshot = FetchSnapshotRequest
+ .forTopicPartition(data, log.topicPartition()).get();
- return FetchSnapshotResponse.singleton(
- unknownTopicPartition,
- responsePartitionSnapshot -> responsePartitionSnapshot
- .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
- );
- }
-
- FetchSnapshotRequestData.PartitionSnapshot partitionSnapshot = partitionSnapshotOpt.get();
Optional leaderValidation = validateLeaderOnlyRequest(
partitionSnapshot.currentLeaderEpoch()
);
@@ -1304,7 +1316,9 @@ private boolean handleFetchSnapshotResponse(
return handleTopLevelError(topLevelError, responseMetadata);
}
- if (data.topics().size() != 1 && data.topics().get(0).partitions().size() != 1) {
+ recordAcknowledgedClusterId();
+
+ if (validateTopicPartition(data, log.topicPartition()) != Errors.NONE) {
return false;
}
@@ -1526,6 +1540,9 @@ private boolean handleTopLevelError(Errors error, RaftResponse.Inbound response)
return false;
} else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
throw new ClusterAuthorizationException("Received cluster authorization error in response " + response);
+ } else if (error == Errors.INCONSISTENT_CLUSTER_ID && !clusterIdAcknowledged.get()) {
+ // When handling a response, invalid cluster id are fatal unless a previous response contained a valid cluster id.
+ throw new InconsistentClusterIdException("Received inconsistent clusterId error in response " + response);
} else {
return handleUnexpectedError(error, response);
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
index 9ff03617e63fa..45c237440e242 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
@@ -25,6 +25,8 @@
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.FetchSnapshotRequestData;
+import org.apache.kafka.common.message.FetchSnapshotResponseData;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -97,66 +99,135 @@ public static FetchResponseData singletonFetchResponse(
.setResponses(Collections.singletonList(fetchableTopic));
}
- static boolean hasValidTopicPartition(FetchRequestData data, TopicPartition topicPartition, Uuid topicId) {
- return data.topics().size() == 1 &&
- data.topics().get(0).topicId().equals(topicId) &&
- data.topics().get(0).partitions().size() == 1 &&
- data.topics().get(0).partitions().get(0).partition() == topicPartition.partition();
+ static Errors validateTopicPartition(FetchRequestData data, TopicPartition topicPartition, Uuid topicId) {
+ if (data.topics().size() != 1 ||
+ data.topics().get(0).partitions().size() != 1) {
+ return Errors.INVALID_REQUEST;
+ }
+ if (!data.topics().get(0).topicId().equals(topicId) ||
+ data.topics().get(0).partitions().get(0).partition() != topicPartition.partition()) {
+ return Errors.UNKNOWN_TOPIC_OR_PARTITION;
+ }
+ return Errors.NONE;
}
- static boolean hasValidTopicPartition(FetchResponseData data, TopicPartition topicPartition, Uuid topicId) {
- return data.responses().size() == 1 &&
- data.responses().get(0).topicId().equals(topicId) &&
- data.responses().get(0).partitions().size() == 1 &&
- data.responses().get(0).partitions().get(0).partitionIndex() == topicPartition.partition();
+ static Errors validateTopicPartition(FetchResponseData data, TopicPartition topicPartition, Uuid topicId) {
+ if (data.responses().size() != 1 ||
+ data.responses().get(0).partitions().size() != 1) {
+ return Errors.INVALID_REQUEST;
+ }
+ if (!data.responses().get(0).topicId().equals(topicId) ||
+ data.responses().get(0).partitions().get(0).partitionIndex() != topicPartition.partition()) {
+ return Errors.UNKNOWN_TOPIC_OR_PARTITION;
+ }
+ return Errors.NONE;
}
- static boolean hasValidTopicPartition(VoteResponseData data, TopicPartition topicPartition) {
- return data.topics().size() == 1 &&
- data.topics().get(0).topicName().equals(topicPartition.topic()) &&
- data.topics().get(0).partitions().size() == 1 &&
- data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition();
+ static Errors validateTopicPartition(VoteResponseData data, TopicPartition topicPartition) {
+ if (data.topics().size() != 1 ||
+ data.topics().get(0).partitions().size() != 1) {
+ return Errors.INVALID_REQUEST;
+ }
+ if (!data.topics().get(0).topicName().equals(topicPartition.topic()) ||
+ data.topics().get(0).partitions().get(0).partitionIndex() != topicPartition.partition()) {
+ return Errors.UNKNOWN_TOPIC_OR_PARTITION;
+ }
+ return Errors.NONE;
+ }
+
+ static Errors validateTopicPartition(VoteRequestData data, TopicPartition topicPartition) {
+ if (data.topics().size() != 1 ||
+ data.topics().get(0).partitions().size() != 1) {
+ return Errors.INVALID_REQUEST;
+ }
+ if (!data.topics().get(0).topicName().equals(topicPartition.topic()) ||
+ data.topics().get(0).partitions().get(0).partitionIndex() != topicPartition.partition()) {
+ return Errors.UNKNOWN_TOPIC_OR_PARTITION;
+ }
+ return Errors.NONE;
+ }
+
+ static Errors validateTopicPartition(BeginQuorumEpochRequestData data, TopicPartition topicPartition) {
+ if (data.topics().size() != 1 ||
+ data.topics().get(0).partitions().size() != 1) {
+ return Errors.INVALID_REQUEST;
+ }
+ if (!data.topics().get(0).topicName().equals(topicPartition.topic()) ||
+ data.topics().get(0).partitions().get(0).partitionIndex() != topicPartition.partition()) {
+ return Errors.UNKNOWN_TOPIC_OR_PARTITION;
+ }
+ return Errors.NONE;
}
- static boolean hasValidTopicPartition(VoteRequestData data, TopicPartition topicPartition) {
- return data.topics().size() == 1 &&
- data.topics().get(0).topicName().equals(topicPartition.topic()) &&
- data.topics().get(0).partitions().size() == 1 &&
- data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition();
+ static Errors validateTopicPartition(BeginQuorumEpochResponseData data, TopicPartition topicPartition) {
+ if (data.topics().size() != 1 ||
+ data.topics().get(0).partitions().size() != 1) {
+ return Errors.INVALID_REQUEST;
+ }
+ if (!data.topics().get(0).topicName().equals(topicPartition.topic()) ||
+ data.topics().get(0).partitions().get(0).partitionIndex() != topicPartition.partition()) {
+ return Errors.UNKNOWN_TOPIC_OR_PARTITION;
+ }
+ return Errors.NONE;
}
- static boolean hasValidTopicPartition(BeginQuorumEpochRequestData data, TopicPartition topicPartition) {
- return data.topics().size() == 1 &&
- data.topics().get(0).topicName().equals(topicPartition.topic()) &&
- data.topics().get(0).partitions().size() == 1 &&
- data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition();
+ static Errors validateTopicPartition(EndQuorumEpochRequestData data, TopicPartition topicPartition) {
+ if (data.topics().size() != 1 ||
+ data.topics().get(0).partitions().size() != 1) {
+ return Errors.INVALID_REQUEST;
+ }
+ if (!data.topics().get(0).topicName().equals(topicPartition.topic()) ||
+ data.topics().get(0).partitions().get(0).partitionIndex() != topicPartition.partition()) {
+ return Errors.UNKNOWN_TOPIC_OR_PARTITION;
+ }
+ return Errors.NONE;
}
- static boolean hasValidTopicPartition(BeginQuorumEpochResponseData data, TopicPartition topicPartition) {
- return data.topics().size() == 1 &&
- data.topics().get(0).topicName().equals(topicPartition.topic()) &&
- data.topics().get(0).partitions().size() == 1 &&
- data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition();
+ static Errors validateTopicPartition(EndQuorumEpochResponseData data, TopicPartition topicPartition) {
+ if (data.topics().size() != 1 ||
+ data.topics().get(0).partitions().size() != 1) {
+ return Errors.INVALID_REQUEST;
+ }
+ if (!data.topics().get(0).topicName().equals(topicPartition.topic()) ||
+ data.topics().get(0).partitions().get(0).partitionIndex() != topicPartition.partition()) {
+ return Errors.UNKNOWN_TOPIC_OR_PARTITION;
+ }
+ return Errors.NONE;
}
- static boolean hasValidTopicPartition(EndQuorumEpochRequestData data, TopicPartition topicPartition) {
- return data.topics().size() == 1 &&
- data.topics().get(0).topicName().equals(topicPartition.topic()) &&
- data.topics().get(0).partitions().size() == 1 &&
- data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition();
+ static Errors validateTopicPartition(DescribeQuorumRequestData data, TopicPartition topicPartition) {
+ if (data.topics().size() != 1 ||
+ data.topics().get(0).partitions().size() != 1) {
+ return Errors.INVALID_REQUEST;
+ }
+ if (!data.topics().get(0).topicName().equals(topicPartition.topic()) ||
+ data.topics().get(0).partitions().get(0).partitionIndex() != topicPartition.partition()) {
+ return Errors.UNKNOWN_TOPIC_OR_PARTITION;
+ }
+ return Errors.NONE;
}
- static boolean hasValidTopicPartition(EndQuorumEpochResponseData data, TopicPartition topicPartition) {
- return data.topics().size() == 1 &&
- data.topics().get(0).topicName().equals(topicPartition.topic()) &&
- data.topics().get(0).partitions().size() == 1 &&
- data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition();
+ static Errors validateTopicPartition(FetchSnapshotRequestData data, TopicPartition topicPartition) {
+ if (data.topics().size() != 1 ||
+ data.topics().get(0).partitions().size() != 1) {
+ return Errors.INVALID_REQUEST;
+ }
+ if (!data.topics().get(0).name().equals(topicPartition.topic()) ||
+ data.topics().get(0).partitions().get(0).partition() != topicPartition.partition()) {
+ return Errors.UNKNOWN_TOPIC_OR_PARTITION;
+ }
+ return Errors.NONE;
}
- static boolean hasValidTopicPartition(DescribeQuorumRequestData data, TopicPartition topicPartition) {
- return data.topics().size() == 1 &&
- data.topics().get(0).topicName().equals(topicPartition.topic()) &&
- data.topics().get(0).partitions().size() == 1 &&
- data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition();
+ static Errors validateTopicPartition(FetchSnapshotResponseData data, TopicPartition topicPartition) {
+ if (data.topics().size() != 1 ||
+ data.topics().get(0).partitions().size() != 1) {
+ return Errors.INVALID_REQUEST;
+ }
+ if (!data.topics().get(0).name().equals(topicPartition.topic()) ||
+ data.topics().get(0).partitions().get(0).index() != topicPartition.partition()) {
+ return Errors.UNKNOWN_TOPIC_OR_PARTITION;
+ }
+ return Errors.NONE;
}
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index 754c2e2c55e3d..2aa9f1bb486fc 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -47,6 +47,7 @@
import java.util.OptionalInt;
import java.util.Set;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -517,8 +518,7 @@ public void testFetchSnapshotRequestUnknownPartition() throws Exception {
context.client.poll();
- FetchSnapshotResponseData.PartitionSnapshot response = context.assertSentFetchSnapshotResponse(topicPartition).get();
- assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.forCode(response.errorCode()));
+ context.assertSentFetchSnapshotResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION);
}
@Test
@@ -1618,6 +1618,46 @@ public void testCreateSnapshotAsFollowerWithInvalidSnapshotId() throws Exception
assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId4.offset, invalidSnapshotId4.epoch, 0));
}
+ @Test
+ public void testInconsistentClusterIdInFetchSnapshotResponse() throws Exception {
+ int localId = 0;
+ int leaderId = localId + 1;
+ Set voters = Utils.mkSet(localId, leaderId);
+ int epoch = 2;
+ OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+ .withElectedLeader(epoch, leaderId)
+ .build();
+
+ // Send a request
+ context.pollUntilRequest();
+ RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+
+ // Firstly receive a response with a valid cluster id
+ context.deliverResponse(
+ fetchRequest.correlationId,
+ fetchRequest.destinationId(),
+ snapshotFetchResponse(context.metadataPartition, context.metadataTopicId, epoch, leaderId, snapshotId, 200L)
+ );
+
+ // Send fetch snapshot request
+ context.pollUntilRequest();
+ RaftRequest.Outbound snapshotRequest = context.assertSentFetchSnapshotRequest();
+
+ // Secondly receive a response with an inconsistent cluster id
+ context.deliverResponse(
+ snapshotRequest.correlationId,
+ snapshotRequest.destinationId(),
+ new FetchSnapshotResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code())
+ );
+
+ // Inconsistent cluster id are not fatal if a previous response contained a valid cluster id
+ assertDoesNotThrow(context.client::poll);
+
+ // It's impossible to receive a FetchSnapshotResponse before any other request so we don't test
+ }
+
private static FetchSnapshotRequestData fetchSnapshotRequest(
TopicPartition topicPartition,
int epoch,
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 9b2771d2b34e4..007f8b7530e45 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.raft;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
@@ -57,6 +58,7 @@
import static java.util.Collections.singletonList;
import static org.apache.kafka.raft.RaftClientTestContext.Builder.DEFAULT_ELECTION_TIMEOUT_MS;
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -1559,6 +1561,166 @@ public void testEndQuorumEpochRequestClusterIdValidation() throws Exception {
context.assertSentEndQuorumEpochResponse(Errors.INCONSISTENT_CLUSTER_ID);
}
+ @Test
+ public void testInconsistentClusterIdInFetchResponse() throws Exception {
+ int localId = 0;
+ int otherNodeId = 1;
+ int epoch = 5;
+ Set voters = Utils.mkSet(localId, otherNodeId);
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+ .withElectedLeader(epoch, otherNodeId)
+ .build();
+
+ // Send a request
+ context.pollUntilRequest();
+ int correlationId = context.assertSentFetchRequest(epoch, 0L, 0);
+
+ // Firstly receive a response with a valid cluster id
+ FetchResponseData fetchResponse = context.fetchResponse(epoch, otherNodeId,
+ MemoryRecords.EMPTY, 0L, Errors.NONE);
+ context.deliverResponse(correlationId, otherNodeId, fetchResponse);
+
+ // Send fetch request
+ context.pollUntilRequest();
+ correlationId = context.assertSentFetchRequest(epoch, 0L, 0);
+
+ // Secondly receive a response with an inconsistent cluster id
+ context.deliverResponse(correlationId, otherNodeId,
+ new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
+
+ // Inconsistent cluster id are not fatal if a previous response contained a valid cluster id
+ assertDoesNotThrow(context.client::poll);
+
+ // This time we receive a inconsistent cluster id directly
+ context = new RaftClientTestContext.Builder(localId, voters)
+ .withElectedLeader(epoch, otherNodeId)
+ .build();
+
+ context.pollUntilRequest();
+ correlationId = context.assertSentFetchRequest(epoch, 0L, 0);
+ context.deliverResponse(correlationId, otherNodeId,
+ new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
+ // Inconsistent cluster id are fatal if this is the first rpc request
+ assertThrows(InconsistentClusterIdException.class, context.client::poll);
+ }
+
+ @Test
+ public void testInconsistentClusterIdInVoteResponse() throws Exception {
+ int localId = 0;
+ int otherNodeId = 1;
+ int epoch = 5;
+ Set voters = Utils.mkSet(localId, otherNodeId);
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+ .withVotedCandidate(epoch, localId)
+ .build();
+
+ // Send vote request
+ context.pollUntilRequest();
+ int correlationId = context.assertSentVoteRequest(epoch, 0, 0L, 1);
+
+ // Firstly receive a response with a valid cluster id
+ context.deliverResponse(correlationId, otherNodeId,
+ context.voteResponse(false, Optional.empty(), epoch));
+ context.client.poll();
+
+ // Send vote request again
+ context.pollUntilRequest();
+ correlationId = context.assertSentVoteRequest(epoch + 1, 0, 0L, 1);
+
+ // Receive a response with an inconsistent cluster id
+ context.deliverResponse(correlationId, otherNodeId,
+ new VoteResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
+
+ // Inconsistent cluster id are not fatal if a previous response contained a valid cluster id
+ assertDoesNotThrow(context.client::poll);
+
+ // This time we receive a inconsistent cluster id directly
+ context = new RaftClientTestContext.Builder(localId, voters)
+ .withVotedCandidate(epoch, localId)
+ .build();
+
+ context.pollUntilRequest();
+ correlationId = context.assertSentVoteRequest(epoch, 0, 0L, 1);
+ context.deliverResponse(correlationId, otherNodeId,
+ new VoteResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
+ // Inconsistent cluster id are fatal if this is the first rpc request
+ assertThrows(InconsistentClusterIdException.class, context.client::poll);
+ }
+
+ @Test
+ public void testInconsistentClusterIdInBeginQuorumResponse() throws Exception {
+ int localId = 0;
+ int otherNodeId = 1;
+ int epoch = 5;
+ Set voters = Utils.mkSet(localId, otherNodeId);
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+ .withVotedCandidate(epoch, localId)
+ .build();
+ context.assertVotedCandidate(epoch, localId);
+
+ // Send a request
+ context.pollUntilRequest();
+ int correlationId = context.assertSentVoteRequest(epoch, 0, 0L, 1);
+
+ // Firstly receive a response with a valid cluster id
+ context.deliverResponse(correlationId, otherNodeId, context.voteResponse(true, Optional.empty(), epoch));
+ context.client.poll();
+ context.assertElectedLeader(epoch, localId);
+
+ // Send begin quorum request
+ context.pollUntilRequest();
+ correlationId = context.assertSentBeginQuorumEpochRequest(epoch, 1);
+
+ // Secondly receive a response with an inconsistent cluster id
+ context.deliverResponse(correlationId, otherNodeId,
+ new BeginQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
+
+ // Inconsistent cluster id are not fatal if a previous response contained a valid cluster id
+ assertDoesNotThrow(context.client::poll);
+
+ // It's impossible to receive a BeginQuorumEpochResponse before any other request so we don't test
+ }
+
+ @Test
+ public void testInconsistentClusterIdInEndQuorumResponse() throws Exception {
+ int localId = 0;
+ int otherNodeId = 1;
+ int epoch = 5;
+ Set voters = Utils.mkSet(localId, otherNodeId);
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+ .withVotedCandidate(epoch, localId)
+ .build();
+ context.assertVotedCandidate(epoch, localId);
+
+ // Send a request
+ context.pollUntilRequest();
+ int correlationId = context.assertSentVoteRequest(epoch, 0, 0L, 1);
+
+ // Firstly receive a response with a valid cluster id
+ context.deliverResponse(correlationId, otherNodeId, context.voteResponse(true, Optional.empty(), epoch));
+ context.client.poll();
+ context.assertElectedLeader(epoch, localId);
+
+ context.client.shutdown(5000);
+
+ // Send end quorum request
+ context.pollUntilRequest();
+ correlationId = context.assertSentEndQuorumEpochRequest(epoch, 1);
+
+ // Secondly receive a response with an inconsistent cluster id
+ context.deliverResponse(correlationId, otherNodeId,
+ new EndQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
+
+ // Inconsistent cluster id are not fatal if a previous response contained a valid cluster id
+ assertDoesNotThrow(context.client::poll);
+
+ // It's impossible to receive an EndQuorumEpochResponse before any other request so we don't test
+ }
+
@Test
public void testVoterOnlyRequestValidation() throws Exception {
int localId = 0;
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index d48e41fb31d0c..6e83be7145a24 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -78,7 +78,7 @@
import java.util.Set;
import java.util.stream.Collectors;
-import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
+import static org.apache.kafka.raft.RaftUtil.validateTopicPartition;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -496,7 +496,7 @@ void assertSentVoteResponse(
RaftMessage raftMessage = sentMessages.get(0);
assertTrue(raftMessage.data() instanceof VoteResponseData);
VoteResponseData response = (VoteResponseData) raftMessage.data();
- assertTrue(hasValidTopicPartition(response, metadataPartition));
+ assertEquals(Errors.NONE, validateTopicPartition(response, metadataPartition));
VoteResponseData.PartitionData partitionResponse = response.topics().get(0).partitions().get(0);
@@ -914,7 +914,7 @@ VoteResponseData voteResponse(boolean voteGranted, Optional leaderId, i
}
private VoteRequestData.PartitionData unwrap(VoteRequestData voteRequest) {
- assertTrue(RaftUtil.hasValidTopicPartition(voteRequest, metadataPartition));
+ assertEquals(Errors.NONE, validateTopicPartition(voteRequest, metadataPartition));
return voteRequest.topics().get(0).partitions().get(0);
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
new file mode 100644
index 0000000000000..59e2480ecfd6a
--- /dev/null
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
@@ -0,0 +1,835 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.DescribeQuorumRequestData;
+import org.apache.kafka.common.message.EndQuorumEpochRequestData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchRequestData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.FetchSnapshotRequestData;
+import org.apache.kafka.common.message.FetchSnapshotResponseData;
+import org.apache.kafka.common.message.VoteRequestData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class RaftUtilTest {
+
+ private String topic = "__cluster_metadata";
+ private Uuid topicId = Uuid.METADATA_TOPIC_ID;
+ private int partition = 0;
+
+ @Test
+ public void testValidateFetchRequestData() {
+ FetchRequestData none = new FetchRequestData()
+ .setTopics(Collections.singletonList(
+ new FetchRequestData.FetchTopic().setTopic(topic).setTopicId(topicId)
+ .setPartitions(Collections.singletonList(new FetchRequestData.FetchPartition().setPartition(partition)))));
+ assertEquals(
+ Errors.NONE,
+ RaftUtil.validateTopicPartition(none, new TopicPartition(topic, partition), topicId)
+ );
+
+ List invalidRequests = Arrays.asList(
+ new FetchRequestData(),
+ new FetchRequestData().setTopics(
+ Arrays.asList(
+ new FetchRequestData.FetchTopic(),
+ new FetchRequestData.FetchTopic())),
+ new FetchRequestData().setTopics(
+ Collections.singletonList(
+ new FetchRequestData.FetchTopic()
+ .setTopic(topic)
+ .setTopicId(topicId)
+ )
+ ),
+ new FetchRequestData().setTopics(
+ Collections.singletonList(
+ new FetchRequestData.FetchTopic()
+ .setTopic(topic)
+ .setTopicId(topicId)
+ .setPartitions(
+ Arrays.asList(
+ new FetchRequestData.FetchPartition(),
+ new FetchRequestData.FetchPartition()
+ )
+ )
+ )
+ )
+ );
+ for (FetchRequestData invalidRequest : invalidRequests) {
+ assertEquals(
+ Errors.INVALID_REQUEST,
+ RaftUtil.validateTopicPartition(invalidRequest, new TopicPartition(topic, partition), topicId)
+ );
+ }
+
+ List unknownTopicOrPartitionRequests = Arrays.asList(
+ new FetchRequestData().setTopics(
+ Collections.singletonList(
+ new FetchRequestData.FetchTopic()
+ .setTopic(topic)
+ .setTopicId(Uuid.randomUuid())
+ .setPartitions(
+ Collections.singletonList(new FetchRequestData.FetchPartition().setPartition(0))
+ )
+ )
+ ),
+ new FetchRequestData().setTopics(
+ Collections.singletonList(
+ new FetchRequestData.FetchTopic()
+ .setTopic(topic)
+ .setTopicId(topicId)
+ .setPartitions(
+ Collections.singletonList(new FetchRequestData.FetchPartition().setPartition(1))
+ )
+ )
+ )
+ );
+ for (FetchRequestData unknownTopicOrPartitionRequest: unknownTopicOrPartitionRequests) {
+ assertEquals(
+ Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ RaftUtil.validateTopicPartition(unknownTopicOrPartitionRequest, new TopicPartition(topic, partition), topicId)
+ );
+ }
+ }
+
+ @Test
+ public void testValidateFetchResponseData() {
+ FetchResponseData none = new FetchResponseData()
+ .setResponses(Collections.singletonList(
+ new FetchResponseData.FetchableTopicResponse().setTopic(topic).setTopicId(topicId)
+ .setPartitions(Collections.singletonList(new FetchResponseData.PartitionData().setPartitionIndex(partition)))));
+ assertEquals(
+ Errors.NONE,
+ RaftUtil.validateTopicPartition(none, new TopicPartition(topic, partition), topicId)
+ );
+
+ List invalidRequestResponses = Arrays.asList(
+ new FetchResponseData(),
+ new FetchResponseData().setResponses(
+ Arrays.asList(
+ new FetchResponseData.FetchableTopicResponse(),
+ new FetchResponseData.FetchableTopicResponse())),
+ new FetchResponseData().setResponses(
+ Collections.singletonList(
+ new FetchResponseData.FetchableTopicResponse()
+ .setTopic(topic)
+ .setTopicId(topicId)
+ )
+ ),
+ new FetchResponseData().setResponses(
+ Collections.singletonList(
+ new FetchResponseData.FetchableTopicResponse()
+ .setTopic(topic)
+ .setTopicId(topicId)
+ .setPartitions(
+ Arrays.asList(
+ new FetchResponseData.PartitionData(),
+ new FetchResponseData.PartitionData()
+ )
+ )
+ )
+ )
+ );
+ for (FetchResponseData invalidRequest : invalidRequestResponses) {
+ assertEquals(
+ Errors.INVALID_REQUEST,
+ RaftUtil.validateTopicPartition(invalidRequest, new TopicPartition(topic, partition), topicId)
+ );
+ }
+
+ List unknownTopicOrPartitionResponses = Arrays.asList(
+ new FetchResponseData().setResponses(
+ Collections.singletonList(
+ new FetchResponseData.FetchableTopicResponse()
+ .setTopic(topic)
+ .setTopicId(Uuid.randomUuid())
+ .setPartitions(
+ Collections.singletonList(new FetchResponseData.PartitionData().setPartitionIndex(0))
+ )
+ )
+ ),
+ new FetchResponseData().setResponses(
+ Collections.singletonList(
+ new FetchResponseData.FetchableTopicResponse()
+ .setTopic(topic)
+ .setTopicId(topicId)
+ .setPartitions(
+ Collections.singletonList(new FetchResponseData.PartitionData().setPartitionIndex(1))
+ )
+ )
+ )
+ );
+ for (FetchResponseData unknownTopicOrPartition: unknownTopicOrPartitionResponses) {
+ assertEquals(
+ Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ RaftUtil.validateTopicPartition(unknownTopicOrPartition, new TopicPartition(topic, partition), topicId)
+ );
+ }
+ }
+
+ @Test
+ public void testValidateVoteRequestData() {
+ VoteRequestData none = new VoteRequestData()
+ .setTopics(Collections.singletonList(
+ new VoteRequestData.TopicData().setTopicName(topic)
+ .setPartitions(Collections.singletonList(new VoteRequestData.PartitionData().setPartitionIndex(partition)))));
+ assertEquals(
+ Errors.NONE,
+ RaftUtil.validateTopicPartition(none, new TopicPartition(topic, partition))
+ );
+
+ List invalidRequests = Arrays.asList(
+ new VoteRequestData(),
+ new VoteRequestData().setTopics(
+ Arrays.asList(
+ new VoteRequestData.TopicData(),
+ new VoteRequestData.TopicData())),
+ new VoteRequestData().setTopics(
+ Collections.singletonList(
+ new VoteRequestData.TopicData()
+ .setTopicName(topic)
+ )
+ ),
+ new VoteRequestData().setTopics(
+ Collections.singletonList(
+ new VoteRequestData.TopicData()
+ .setTopicName(topic)
+ .setPartitions(
+ Arrays.asList(
+ new VoteRequestData.PartitionData(),
+ new VoteRequestData.PartitionData()
+ )
+ )
+ )
+ )
+ );
+ for (VoteRequestData invalidRequest : invalidRequests) {
+ assertEquals(
+ Errors.INVALID_REQUEST,
+ RaftUtil.validateTopicPartition(invalidRequest, new TopicPartition(topic, partition))
+ );
+ }
+
+ List unknownTopicOrPartitionRequests = Arrays.asList(
+ new VoteRequestData().setTopics(
+ Collections.singletonList(
+ new VoteRequestData.TopicData()
+ .setTopicName(topic + "1")
+ .setPartitions(
+ Collections.singletonList(new VoteRequestData.PartitionData().setPartitionIndex(0))
+ )
+ )
+ ),
+ new VoteRequestData().setTopics(
+ Collections.singletonList(
+ new VoteRequestData.TopicData()
+ .setTopicName(topic)
+ .setPartitions(
+ Collections.singletonList(new VoteRequestData.PartitionData().setPartitionIndex(1))
+ )
+ )
+ )
+ );
+ for (VoteRequestData unknownTopicOrPartitionRequest: unknownTopicOrPartitionRequests) {
+ assertEquals(
+ Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ RaftUtil.validateTopicPartition(unknownTopicOrPartitionRequest, new TopicPartition(topic, partition))
+ );
+ }
+ }
+
+ @Test
+ public void testValidateVoteResponseData() {
+ VoteResponseData none = new VoteResponseData()
+ .setTopics(Collections.singletonList(
+ new VoteResponseData.TopicData().setTopicName(topic)
+ .setPartitions(Collections.singletonList(new VoteResponseData.PartitionData().setPartitionIndex(partition)))));
+ assertEquals(
+ Errors.NONE,
+ RaftUtil.validateTopicPartition(none, new TopicPartition(topic, partition))
+ );
+
+ List invalidRequests = Arrays.asList(
+ new VoteResponseData(),
+ new VoteResponseData().setTopics(
+ Arrays.asList(
+ new VoteResponseData.TopicData(),
+ new VoteResponseData.TopicData())),
+ new VoteResponseData().setTopics(
+ Collections.singletonList(
+ new VoteResponseData.TopicData()
+ .setTopicName(topic)
+ )
+ ),
+ new VoteResponseData().setTopics(
+ Collections.singletonList(
+ new VoteResponseData.TopicData()
+ .setTopicName(topic)
+ .setPartitions(
+ Arrays.asList(
+ new VoteResponseData.PartitionData(),
+ new VoteResponseData.PartitionData()
+ )
+ )
+ )
+ )
+ );
+ for (VoteResponseData invalidRequest : invalidRequests) {
+ assertEquals(
+ Errors.INVALID_REQUEST,
+ RaftUtil.validateTopicPartition(invalidRequest, new TopicPartition(topic, partition))
+ );
+ }
+
+ List unknownTopicOrPartitionRequests = Arrays.asList(
+ new VoteResponseData().setTopics(
+ Collections.singletonList(
+ new VoteResponseData.TopicData()
+ .setTopicName(topic + "1")
+ .setPartitions(
+ Collections.singletonList(new VoteResponseData.PartitionData().setPartitionIndex(0))
+ )
+ )
+ ),
+ new VoteResponseData().setTopics(
+ Collections.singletonList(
+ new VoteResponseData.TopicData()
+ .setTopicName(topic)
+ .setPartitions(
+ Collections.singletonList(new VoteResponseData.PartitionData().setPartitionIndex(1))
+ )
+ )
+ )
+ );
+ for (VoteResponseData unknownTopicOrPartitionRequest: unknownTopicOrPartitionRequests) {
+ assertEquals(
+ Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ RaftUtil.validateTopicPartition(unknownTopicOrPartitionRequest, new TopicPartition(topic, partition))
+ );
+ }
+ }
+
+ @Test
+ public void testValidateBeginQuorumEpochRequestData() {
+ BeginQuorumEpochRequestData none = new BeginQuorumEpochRequestData()
+ .setTopics(Collections.singletonList(
+ new BeginQuorumEpochRequestData.TopicData().setTopicName(topic)
+ .setPartitions(Collections.singletonList(new BeginQuorumEpochRequestData.PartitionData().setPartitionIndex(partition)))));
+ assertEquals(
+ Errors.NONE,
+ RaftUtil.validateTopicPartition(none, new TopicPartition(topic, partition))
+ );
+
+ List invalidRequests = Arrays.asList(
+ new BeginQuorumEpochRequestData(),
+ new BeginQuorumEpochRequestData().setTopics(
+ Arrays.asList(
+ new BeginQuorumEpochRequestData.TopicData(),
+ new BeginQuorumEpochRequestData.TopicData())),
+ new BeginQuorumEpochRequestData().setTopics(
+ Collections.singletonList(
+ new BeginQuorumEpochRequestData.TopicData()
+ .setTopicName(topic)
+ )
+ ),
+ new BeginQuorumEpochRequestData().setTopics(
+ Collections.singletonList(
+ new BeginQuorumEpochRequestData.TopicData()
+ .setTopicName(topic)
+ .setPartitions(
+ Arrays.asList(
+ new BeginQuorumEpochRequestData.PartitionData(),
+ new BeginQuorumEpochRequestData.PartitionData()
+ )
+ )
+ )
+ )
+ );
+ for (BeginQuorumEpochRequestData invalidRequest : invalidRequests) {
+ assertEquals(
+ Errors.INVALID_REQUEST,
+ RaftUtil.validateTopicPartition(invalidRequest, new TopicPartition(topic, partition))
+ );
+ }
+
+ List unknownTopicOrPartitionRequests = Arrays.asList(
+ new BeginQuorumEpochRequestData().setTopics(
+ Collections.singletonList(
+ new BeginQuorumEpochRequestData.TopicData()
+ .setTopicName(topic + "1")
+ .setPartitions(
+ Collections.singletonList(new BeginQuorumEpochRequestData.PartitionData().setPartitionIndex(0))
+ )
+ )
+ ),
+ new BeginQuorumEpochRequestData().setTopics(
+ Collections.singletonList(
+ new BeginQuorumEpochRequestData.TopicData()
+ .setTopicName(topic)
+ .setPartitions(
+ Collections.singletonList(new BeginQuorumEpochRequestData.PartitionData().setPartitionIndex(1))
+ )
+ )
+ )
+ );
+ for (BeginQuorumEpochRequestData unknownTopicOrPartitionRequest: unknownTopicOrPartitionRequests) {
+ assertEquals(
+ Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ RaftUtil.validateTopicPartition(unknownTopicOrPartitionRequest, new TopicPartition(topic, partition))
+ );
+ }
+ }
+
+ @Test
+ public void testValidateBeginQuorumEpochResponseData() {
+ BeginQuorumEpochResponseData none = new BeginQuorumEpochResponseData()
+ .setTopics(Collections.singletonList(
+ new BeginQuorumEpochResponseData.TopicData().setTopicName(topic)
+ .setPartitions(Collections.singletonList(new BeginQuorumEpochResponseData.PartitionData().setPartitionIndex(partition)))));
+ assertEquals(
+ Errors.NONE,
+ RaftUtil.validateTopicPartition(none, new TopicPartition(topic, partition))
+ );
+
+ List invalidRequests = Arrays.asList(
+ new BeginQuorumEpochResponseData(),
+ new BeginQuorumEpochResponseData().setTopics(
+ Arrays.asList(
+ new BeginQuorumEpochResponseData.TopicData(),
+ new BeginQuorumEpochResponseData.TopicData())),
+ new BeginQuorumEpochResponseData().setTopics(
+ Collections.singletonList(
+ new BeginQuorumEpochResponseData.TopicData()
+ .setTopicName(topic)
+ )
+ ),
+ new BeginQuorumEpochResponseData().setTopics(
+ Collections.singletonList(
+ new BeginQuorumEpochResponseData.TopicData()
+ .setTopicName(topic)
+ .setPartitions(
+ Arrays.asList(
+ new BeginQuorumEpochResponseData.PartitionData(),
+ new BeginQuorumEpochResponseData.PartitionData()
+ )
+ )
+ )
+ )
+ );
+ for (BeginQuorumEpochResponseData invalidRequest : invalidRequests) {
+ assertEquals(
+ Errors.INVALID_REQUEST,
+ RaftUtil.validateTopicPartition(invalidRequest, new TopicPartition(topic, partition))
+ );
+ }
+
+ List unknownTopicOrPartitionRequests = Arrays.asList(
+ new BeginQuorumEpochResponseData().setTopics(
+ Collections.singletonList(
+ new BeginQuorumEpochResponseData.TopicData()
+ .setTopicName(topic + "1")
+ .setPartitions(
+ Collections.singletonList(new BeginQuorumEpochResponseData.PartitionData().setPartitionIndex(0))
+ )
+ )
+ ),
+ new BeginQuorumEpochResponseData().setTopics(
+ Collections.singletonList(
+ new BeginQuorumEpochResponseData.TopicData()
+ .setTopicName(topic)
+ .setPartitions(
+ Collections.singletonList(new BeginQuorumEpochResponseData.PartitionData().setPartitionIndex(1))
+ )
+ )
+ )
+ );
+ for (BeginQuorumEpochResponseData unknownTopicOrPartitionRequest: unknownTopicOrPartitionRequests) {
+ assertEquals(
+ Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ RaftUtil.validateTopicPartition(unknownTopicOrPartitionRequest, new TopicPartition(topic, partition))
+ );
+ }
+ }
+
+ @Test
+ public void testValidateEndQuorumEpochRequestData() {
+ EndQuorumEpochRequestData none = new EndQuorumEpochRequestData()
+ .setTopics(Collections.singletonList(
+ new EndQuorumEpochRequestData.TopicData().setTopicName(topic)
+ .setPartitions(Collections.singletonList(new EndQuorumEpochRequestData.PartitionData().setPartitionIndex(partition)))));
+ assertEquals(
+ Errors.NONE,
+ RaftUtil.validateTopicPartition(none, new TopicPartition(topic, partition))
+ );
+
+ List invalidRequests = Arrays.asList(
+ new EndQuorumEpochRequestData(),
+ new EndQuorumEpochRequestData().setTopics(
+ Arrays.asList(
+ new EndQuorumEpochRequestData.TopicData(),
+ new EndQuorumEpochRequestData.TopicData())),
+ new EndQuorumEpochRequestData().setTopics(
+ Collections.singletonList(
+ new EndQuorumEpochRequestData.TopicData()
+ .setTopicName(topic)
+ )
+ ),
+ new EndQuorumEpochRequestData().setTopics(
+ Collections.singletonList(
+ new EndQuorumEpochRequestData.TopicData()
+ .setTopicName(topic)
+ .setPartitions(
+ Arrays.asList(
+ new EndQuorumEpochRequestData.PartitionData(),
+ new EndQuorumEpochRequestData.PartitionData()
+ )
+ )
+ )
+ )
+ );
+ for (EndQuorumEpochRequestData invalidRequest : invalidRequests) {
+ assertEquals(
+ Errors.INVALID_REQUEST,
+ RaftUtil.validateTopicPartition(invalidRequest, new TopicPartition(topic, partition))
+ );
+ }
+
+ List unknownTopicOrPartitionRequests = Arrays.asList(
+ new EndQuorumEpochRequestData().setTopics(
+ Collections.singletonList(
+ new EndQuorumEpochRequestData.TopicData()
+ .setTopicName(topic + "1")
+ .setPartitions(
+ Collections.singletonList(new EndQuorumEpochRequestData.PartitionData().setPartitionIndex(0))
+ )
+ )
+ ),
+ new EndQuorumEpochRequestData().setTopics(
+ Collections.singletonList(
+ new EndQuorumEpochRequestData.TopicData()
+ .setTopicName(topic)
+ .setPartitions(
+ Collections.singletonList(new EndQuorumEpochRequestData.PartitionData().setPartitionIndex(1))
+ )
+ )
+ )
+ );
+ for (EndQuorumEpochRequestData unknownTopicOrPartitionRequest: unknownTopicOrPartitionRequests) {
+ assertEquals(
+ Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ RaftUtil.validateTopicPartition(unknownTopicOrPartitionRequest, new TopicPartition(topic, partition))
+ );
+ }
+ }
+
+ @Test
+ public void testValidateEndQuorumEpochResponseData() {
+ EndQuorumEpochResponseData none = new EndQuorumEpochResponseData()
+ .setTopics(Collections.singletonList(
+ new EndQuorumEpochResponseData.TopicData().setTopicName(topic)
+ .setPartitions(Collections.singletonList(new EndQuorumEpochResponseData.PartitionData().setPartitionIndex(partition)))));
+ assertEquals(
+ Errors.NONE,
+ RaftUtil.validateTopicPartition(none, new TopicPartition(topic, partition))
+ );
+
+ List invalidRequests = Arrays.asList(
+ new EndQuorumEpochResponseData(),
+ new EndQuorumEpochResponseData().setTopics(
+ Arrays.asList(
+ new EndQuorumEpochResponseData.TopicData(),
+ new EndQuorumEpochResponseData.TopicData())),
+ new EndQuorumEpochResponseData().setTopics(
+ Collections.singletonList(
+ new EndQuorumEpochResponseData.TopicData()
+ .setTopicName(topic)
+ )
+ ),
+ new EndQuorumEpochResponseData().setTopics(
+ Collections.singletonList(
+ new EndQuorumEpochResponseData.TopicData()
+ .setTopicName(topic)
+ .setPartitions(
+ Arrays.asList(
+ new EndQuorumEpochResponseData.PartitionData(),
+ new EndQuorumEpochResponseData.PartitionData()
+ )
+ )
+ )
+ )
+ );
+ for (EndQuorumEpochResponseData invalidRequest : invalidRequests) {
+ assertEquals(
+ Errors.INVALID_REQUEST,
+ RaftUtil.validateTopicPartition(invalidRequest, new TopicPartition(topic, partition))
+ );
+ }
+
+ List unknownTopicOrPartitionRequests = Arrays.asList(
+ new EndQuorumEpochResponseData().setTopics(
+ Collections.singletonList(
+ new EndQuorumEpochResponseData.TopicData()
+ .setTopicName(topic + "1")
+ .setPartitions(
+ Collections.singletonList(new EndQuorumEpochResponseData.PartitionData().setPartitionIndex(0))
+ )
+ )
+ ),
+ new EndQuorumEpochResponseData().setTopics(
+ Collections.singletonList(
+ new EndQuorumEpochResponseData.TopicData()
+ .setTopicName(topic)
+ .setPartitions(
+ Collections.singletonList(new EndQuorumEpochResponseData.PartitionData().setPartitionIndex(1))
+ )
+ )
+ )
+ );
+ for (EndQuorumEpochResponseData unknownTopicOrPartitionRequest: unknownTopicOrPartitionRequests) {
+ assertEquals(
+ Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ RaftUtil.validateTopicPartition(unknownTopicOrPartitionRequest, new TopicPartition(topic, partition))
+ );
+ }
+ }
+
+ @Test
+ public void testValidateDescribeQuorumRequestData() {
+ DescribeQuorumRequestData none = new DescribeQuorumRequestData()
+ .setTopics(Collections.singletonList(
+ new DescribeQuorumRequestData.TopicData().setTopicName(topic)
+ .setPartitions(Collections.singletonList(new DescribeQuorumRequestData.PartitionData().setPartitionIndex(partition)))));
+ assertEquals(
+ Errors.NONE,
+ RaftUtil.validateTopicPartition(none, new TopicPartition(topic, partition))
+ );
+
+ List invalidRequests = Arrays.asList(
+ new DescribeQuorumRequestData(),
+ new DescribeQuorumRequestData().setTopics(
+ Arrays.asList(
+ new DescribeQuorumRequestData.TopicData(),
+ new DescribeQuorumRequestData.TopicData())),
+ new DescribeQuorumRequestData().setTopics(
+ Collections.singletonList(
+ new DescribeQuorumRequestData.TopicData()
+ .setTopicName(topic)
+ )
+ ),
+ new DescribeQuorumRequestData().setTopics(
+ Collections.singletonList(
+ new DescribeQuorumRequestData.TopicData()
+ .setTopicName(topic)
+ .setPartitions(
+ Arrays.asList(
+ new DescribeQuorumRequestData.PartitionData(),
+ new DescribeQuorumRequestData.PartitionData()
+ )
+ )
+ )
+ )
+ );
+ for (DescribeQuorumRequestData invalidRequest : invalidRequests) {
+ assertEquals(
+ Errors.INVALID_REQUEST,
+ RaftUtil.validateTopicPartition(invalidRequest, new TopicPartition(topic, partition))
+ );
+ }
+
+ List unknownTopicOrPartitionRequests = Arrays.asList(
+ new DescribeQuorumRequestData().setTopics(
+ Collections.singletonList(
+ new DescribeQuorumRequestData.TopicData()
+ .setTopicName(topic + "1")
+ .setPartitions(
+ Collections.singletonList(new DescribeQuorumRequestData.PartitionData().setPartitionIndex(0))
+ )
+ )
+ ),
+ new DescribeQuorumRequestData().setTopics(
+ Collections.singletonList(
+ new DescribeQuorumRequestData.TopicData()
+ .setTopicName(topic)
+ .setPartitions(
+ Collections.singletonList(new DescribeQuorumRequestData.PartitionData().setPartitionIndex(1))
+ )
+ )
+ )
+ );
+ for (DescribeQuorumRequestData unknownTopicOrPartitionRequest: unknownTopicOrPartitionRequests) {
+ assertEquals(
+ Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ RaftUtil.validateTopicPartition(unknownTopicOrPartitionRequest, new TopicPartition(topic, partition))
+ );
+ }
+ }
+
+ @Test
+ public void testValidateFetchSnapshotRequestData() {
+ FetchSnapshotRequestData none = new FetchSnapshotRequestData()
+ .setTopics(Collections.singletonList(
+ new FetchSnapshotRequestData.TopicSnapshot().setName(topic)
+ .setPartitions(Collections.singletonList(new FetchSnapshotRequestData.PartitionSnapshot().setPartition(partition)))));
+ assertEquals(
+ Errors.NONE,
+ RaftUtil.validateTopicPartition(none, new TopicPartition(topic, partition))
+ );
+
+ List invalidRequests = Arrays.asList(
+ new FetchSnapshotRequestData(),
+ new FetchSnapshotRequestData().setTopics(
+ Arrays.asList(
+ new FetchSnapshotRequestData.TopicSnapshot(),
+ new FetchSnapshotRequestData.TopicSnapshot())),
+ new FetchSnapshotRequestData().setTopics(
+ Collections.singletonList(
+ new FetchSnapshotRequestData.TopicSnapshot()
+ .setName(topic)
+ )
+ ),
+ new FetchSnapshotRequestData().setTopics(
+ Collections.singletonList(
+ new FetchSnapshotRequestData.TopicSnapshot()
+ .setName(topic)
+ .setPartitions(
+ Arrays.asList(
+ new FetchSnapshotRequestData.PartitionSnapshot(),
+ new FetchSnapshotRequestData.PartitionSnapshot()
+ )
+ )
+ )
+ )
+ );
+ for (FetchSnapshotRequestData invalidRequest : invalidRequests) {
+ assertEquals(
+ Errors.INVALID_REQUEST,
+ RaftUtil.validateTopicPartition(invalidRequest, new TopicPartition(topic, partition))
+ );
+ }
+
+ List unknownTopicOrPartitionRequests = Arrays.asList(
+ new FetchSnapshotRequestData().setTopics(
+ Collections.singletonList(
+ new FetchSnapshotRequestData.TopicSnapshot()
+ .setName(topic + "1")
+ .setPartitions(
+ Collections.singletonList(new FetchSnapshotRequestData.PartitionSnapshot().setPartition(0))
+ )
+ )
+ ),
+ new FetchSnapshotRequestData().setTopics(
+ Collections.singletonList(
+ new FetchSnapshotRequestData.TopicSnapshot()
+ .setName(topic)
+ .setPartitions(
+ Collections.singletonList(new FetchSnapshotRequestData.PartitionSnapshot().setPartition(1))
+ )
+ )
+ )
+ );
+ for (FetchSnapshotRequestData unknownTopicOrPartitionRequest: unknownTopicOrPartitionRequests) {
+ assertEquals(
+ Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ RaftUtil.validateTopicPartition(unknownTopicOrPartitionRequest, new TopicPartition(topic, partition))
+ );
+ }
+ }
+
+ @Test
+ public void testValidateFetchSnapshotResponseData() {
+ FetchSnapshotResponseData none = new FetchSnapshotResponseData()
+ .setTopics(Collections.singletonList(
+ new FetchSnapshotResponseData.TopicSnapshot().setName(topic)
+ .setPartitions(Collections.singletonList(new FetchSnapshotResponseData.PartitionSnapshot().setIndex(partition)))));
+ assertEquals(
+ Errors.NONE,
+ RaftUtil.validateTopicPartition(none, new TopicPartition(topic, partition))
+ );
+
+ List invalidRequests = Arrays.asList(
+ new FetchSnapshotResponseData(),
+ new FetchSnapshotResponseData().setTopics(
+ Arrays.asList(
+ new FetchSnapshotResponseData.TopicSnapshot(),
+ new FetchSnapshotResponseData.TopicSnapshot())),
+ new FetchSnapshotResponseData().setTopics(
+ Collections.singletonList(
+ new FetchSnapshotResponseData.TopicSnapshot()
+ .setName(topic)
+ )
+ ),
+ new FetchSnapshotResponseData().setTopics(
+ Collections.singletonList(
+ new FetchSnapshotResponseData.TopicSnapshot()
+ .setName(topic)
+ .setPartitions(
+ Arrays.asList(
+ new FetchSnapshotResponseData.PartitionSnapshot(),
+ new FetchSnapshotResponseData.PartitionSnapshot()
+ )
+ )
+ )
+ )
+ );
+ for (FetchSnapshotResponseData invalidRequest : invalidRequests) {
+ assertEquals(
+ Errors.INVALID_REQUEST,
+ RaftUtil.validateTopicPartition(invalidRequest, new TopicPartition(topic, partition))
+ );
+ }
+
+ List unknownTopicOrPartitionRequests = Arrays.asList(
+ new FetchSnapshotResponseData().setTopics(
+ Collections.singletonList(
+ new FetchSnapshotResponseData.TopicSnapshot()
+ .setName(topic + "1")
+ .setPartitions(
+ Collections.singletonList(new FetchSnapshotResponseData.PartitionSnapshot().setIndex(0))
+ )
+ )
+ ),
+ new FetchSnapshotResponseData().setTopics(
+ Collections.singletonList(
+ new FetchSnapshotResponseData.TopicSnapshot()
+ .setName(topic)
+ .setPartitions(
+ Collections.singletonList(new FetchSnapshotResponseData.PartitionSnapshot().setIndex(1))
+ )
+ )
+ )
+ );
+ for (FetchSnapshotResponseData unknownTopicOrPartitionRequest: unknownTopicOrPartitionRequests) {
+ assertEquals(
+ Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ RaftUtil.validateTopicPartition(unknownTopicOrPartitionRequest, new TopicPartition(topic, partition))
+ );
+ }
+ }
+}