Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
files="MemoryRecordsBuilder.java"/>

<suppress checks="ClassDataAbstractionCoupling"
files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest).java"/>
files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest|RaftUtilTest).java"/>
<suppress checks="ClassDataAbstractionCoupling"
files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest).java"/>

Expand Down
89 changes: 53 additions & 36 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package org.apache.kafka.raft;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
Expand Down Expand Up @@ -93,12 +93,13 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
import static org.apache.kafka.raft.RaftUtil.validateTopicPartition;

/**
* This class implements a Kafkaesque version of the Raft protocol. Leader election
Expand Down Expand Up @@ -151,6 +152,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private final Time time;
private final int fetchMaxWaitMs;
private final String clusterId;
private final AtomicBoolean clusterIdAcknowledged;
private final NetworkChannel channel;
private final ReplicatedLog log;
private final Random random;
Expand Down Expand Up @@ -230,6 +232,7 @@ public KafkaRaftClient(
this.appendPurgatory = new ThresholdPurgatory<>(expirationService);
this.time = time;
this.clusterId = clusterId;
this.clusterIdAcknowledged = new AtomicBoolean(false);
this.fetchMaxWaitMs = fetchMaxWaitMs;
this.logger = logContext.logger(KafkaRaftClient.class);
this.random = random;
Expand Down Expand Up @@ -534,6 +537,7 @@ private VoteResponseData buildVoteResponse(Errors partitionLevelError, boolean v
* - {@link Errors#INCONSISTENT_VOTER_SET} if the request suggests inconsistent voter membership (e.g.
* if this node or the sender is not one of the current known voters)
* - {@link Errors#INVALID_REQUEST} if the last epoch or offset are invalid
* - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} if the topic or partition doesn't match metadata topic
*/
private VoteResponseData handleVoteRequest(
RaftRequest.Inbound requestMetadata
Expand All @@ -544,9 +548,10 @@ private VoteResponseData handleVoteRequest(
return new VoteResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
}

if (!hasValidTopicPartition(request, log.topicPartition())) {
Errors topError = validateTopicPartition(request, log.topicPartition());
if (topError != Errors.NONE) {
// Until we support multi-raft, we reject any request whose topic partition fails validation against the local log
return new VoteResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
return new VoteResponseData().setErrorCode(topError.code());
}

VoteRequestData.PartitionData partitionRequest =
Expand Down Expand Up @@ -592,7 +597,9 @@ private boolean handleVoteResponse(
return handleTopLevelError(topLevelError, responseMetadata);
}

if (!hasValidTopicPartition(response, log.topicPartition())) {
recordAcknowledgedClusterId();

if (validateTopicPartition(response, log.topicPartition()) != Errors.NONE) {
return false;
}

Expand Down Expand Up @@ -679,6 +686,7 @@ private BeginQuorumEpochResponseData buildBeginQuorumEpochResponse(Errors partit
* - {@link Errors#INCONSISTENT_VOTER_SET} if the request suggests inconsistent voter membership (e.g.
* if this node or the sender is not one of the current known voters)
* - {@link Errors#FENCED_LEADER_EPOCH} if the epoch is smaller than this node's epoch
* - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} if the topic or partition doesn't match metadata topic
*/
private BeginQuorumEpochResponseData handleBeginQuorumEpochRequest(
RaftRequest.Inbound requestMetadata,
Expand All @@ -690,9 +698,10 @@ private BeginQuorumEpochResponseData handleBeginQuorumEpochRequest(
return new BeginQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
}

if (!hasValidTopicPartition(request, log.topicPartition())) {
Errors topError = validateTopicPartition(request, log.topicPartition());
if (topError != Errors.NONE) {
// Until we support multi-raft, we reject any request whose topic partition fails validation against the local log
return new BeginQuorumEpochResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
return new BeginQuorumEpochResponseData().setErrorCode(topError.code());
}

BeginQuorumEpochRequestData.PartitionData partitionRequest =
Expand Down Expand Up @@ -721,7 +730,9 @@ private boolean handleBeginQuorumEpochResponse(
return handleTopLevelError(topLevelError, responseMetadata);
}

if (!hasValidTopicPartition(response, log.topicPartition())) {
recordAcknowledgedClusterId();

if (validateTopicPartition(response, log.topicPartition()) != Errors.NONE) {
return false;
}

Expand Down Expand Up @@ -768,6 +779,7 @@ private EndQuorumEpochResponseData buildEndQuorumEpochResponse(Errors partitionL
* - {@link Errors#INCONSISTENT_VOTER_SET} if the request suggests inconsistent voter membership (e.g.
* if this node or the sender is not one of the current known voters)
* - {@link Errors#FENCED_LEADER_EPOCH} if the epoch is smaller than this node's epoch
* - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} if the topic or partition doesn't match metadata topic
*/
private EndQuorumEpochResponseData handleEndQuorumEpochRequest(
RaftRequest.Inbound requestMetadata,
Expand All @@ -779,9 +791,10 @@ private EndQuorumEpochResponseData handleEndQuorumEpochRequest(
return new EndQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
}

if (!hasValidTopicPartition(request, log.topicPartition())) {
Errors topError = validateTopicPartition(request, log.topicPartition());
if (topError != Errors.NONE) {
// Until we support multi-raft, we reject any request whose topic partition fails validation against the local log
return new EndQuorumEpochResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
return new EndQuorumEpochResponseData().setErrorCode(topError.code());
}

EndQuorumEpochRequestData.PartitionData partitionRequest =
Expand Down Expand Up @@ -833,7 +846,9 @@ private boolean handleEndQuorumEpochResponse(
return handleTopLevelError(topLevelError, responseMetadata);
}

if (!hasValidTopicPartition(response, log.topicPartition())) {
recordAcknowledgedClusterId();

if (validateTopicPartition(response, log.topicPartition()) != Errors.NONE) {
return false;
}

Expand Down Expand Up @@ -912,6 +927,10 @@ private boolean hasValidClusterId(String requestClusterId) {
return clusterId.equals(requestClusterId);
}

/**
 * Record that a response without a top-level error has been received by
 * setting {@code clusterIdAcknowledged} to true.
 *
 * The response handlers call this right after the top-level error check passes.
 * {@code handleTopLevelError} reads the flag: an {@code INCONSISTENT_CLUSTER_ID}
 * error in a response throws {@code InconsistentClusterIdException} only while
 * the flag is still false; once it is set, that error is passed to
 * {@code handleUnexpectedError} instead.
 */
private void recordAcknowledgedClusterId() {
clusterIdAcknowledged.set(true);
}

/**
* Handle a Fetch request. The fetch offset and last fetched epoch are always
* validated against the current log. In the case that they do not match, the response will
Expand All @@ -926,6 +945,7 @@ private boolean hasValidClusterId(String requestClusterId) {
* - {@link Errors#FENCED_LEADER_EPOCH} if the epoch is smaller than this node's epoch
* - {@link Errors#INVALID_REQUEST} if the request epoch is larger than the leader's current epoch
* or if either the fetch offset or the last fetched epoch is invalid
* - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} if the topic or partition doesn't match metadata topic
*/
private CompletableFuture<FetchResponseData> handleFetchRequest(
RaftRequest.Inbound requestMetadata,
Expand All @@ -937,9 +957,10 @@ private CompletableFuture<FetchResponseData> handleFetchRequest(
return completedFuture(new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
}

if (!hasValidTopicPartition(request, log.topicPartition(), log.topicId())) {
Errors topError = validateTopicPartition(request, log.topicPartition(), log.topicId());
if (topError != Errors.NONE) {
// Until we support multi-raft, we reject any request whose topic partition fails validation against the local log
return completedFuture(new FetchResponseData().setErrorCode(Errors.INVALID_REQUEST.code()));
return completedFuture(new FetchResponseData().setErrorCode(topError.code()));
}
// If the ID is valid, we can set the topic name.
request.topics().get(0).setTopic(log.topicPartition().topic());
Expand Down Expand Up @@ -1047,7 +1068,9 @@ private boolean handleFetchResponse(
return handleTopLevelError(topLevelError, responseMetadata);
}

if (!RaftUtil.hasValidTopicPartition(response, log.topicPartition(), log.topicId())) {
recordAcknowledgedClusterId();

if (validateTopicPartition(response, log.topicPartition(), log.topicId()) != Errors.NONE) {
return false;
}
// If the ID is valid, we can set the topic name.
Expand Down Expand Up @@ -1160,9 +1183,10 @@ private DescribeQuorumResponseData handleDescribeQuorumRequest(
long currentTimeMs
) {
DescribeQuorumRequestData describeQuorumRequestData = (DescribeQuorumRequestData) requestMetadata.data;
if (!hasValidTopicPartition(describeQuorumRequestData, log.topicPartition())) {
return DescribeQuorumRequest.getPartitionLevelErrorResponse(
describeQuorumRequestData, Errors.UNKNOWN_TOPIC_OR_PARTITION);

Errors topError = validateTopicPartition(describeQuorumRequestData, log.topicPartition());
if (topError != Errors.NONE) {
return DescribeQuorumRequest.getPartitionLevelErrorResponse(describeQuorumRequestData, topError);
}

if (!quorum.isLeader()) {
Expand Down Expand Up @@ -1193,6 +1217,7 @@ private DescribeQuorumResponseData handleDescribeQuorumRequest(
* or if either the fetch offset or the last fetched epoch is invalid
* - {@link Errors#SNAPSHOT_NOT_FOUND} if the requested snapshot id does not exist
* - {@link Errors#POSITION_OUT_OF_RANGE} if the requested snapshot offset is out of range
* - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} if the topic or partition doesn't match metadata topic
*/
private FetchSnapshotResponseData handleFetchSnapshotRequest(
RaftRequest.Inbound requestMetadata
Expand All @@ -1203,27 +1228,14 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest(
return new FetchSnapshotResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
}

if (data.topics().size() != 1 && data.topics().get(0).partitions().size() != 1) {
return FetchSnapshotResponse.withTopLevelError(Errors.INVALID_REQUEST);
Errors topError = validateTopicPartition(data, log.topicPartition());
if (topError != Errors.NONE) {
return FetchSnapshotResponse.withTopLevelError(topError);
}

Optional<FetchSnapshotRequestData.PartitionSnapshot> partitionSnapshotOpt = FetchSnapshotRequest
.forTopicPartition(data, log.topicPartition());
if (!partitionSnapshotOpt.isPresent()) {
// The Raft client assumes that there is only one topic partition.
TopicPartition unknownTopicPartition = new TopicPartition(
data.topics().get(0).name(),
data.topics().get(0).partitions().get(0).partition()
);
FetchSnapshotRequestData.PartitionSnapshot partitionSnapshot = FetchSnapshotRequest
.forTopicPartition(data, log.topicPartition()).get();

return FetchSnapshotResponse.singleton(
unknownTopicPartition,
responsePartitionSnapshot -> responsePartitionSnapshot
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
);
}

FetchSnapshotRequestData.PartitionSnapshot partitionSnapshot = partitionSnapshotOpt.get();
Optional<Errors> leaderValidation = validateLeaderOnlyRequest(
partitionSnapshot.currentLeaderEpoch()
);
Expand Down Expand Up @@ -1304,7 +1316,9 @@ private boolean handleFetchSnapshotResponse(
return handleTopLevelError(topLevelError, responseMetadata);
}

if (data.topics().size() != 1 && data.topics().get(0).partitions().size() != 1) {
recordAcknowledgedClusterId();

if (validateTopicPartition(data, log.topicPartition()) != Errors.NONE) {
return false;
}

Expand Down Expand Up @@ -1526,6 +1540,9 @@ private boolean handleTopLevelError(Errors error, RaftResponse.Inbound response)
return false;
} else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
throw new ClusterAuthorizationException("Received cluster authorization error in response " + response);
} else if (error == Errors.INCONSISTENT_CLUSTER_ID && !clusterIdAcknowledged.get()) {
// When handling a response, an inconsistent cluster id is fatal unless a previous response contained a valid cluster id.
throw new InconsistentClusterIdException("Received inconsistent clusterId error in response " + response);
} else {
return handleUnexpectedError(error, response);
}
Expand Down
Loading