Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
5ea4672
KAFKA-6361: Fix log divergence between leader and follower after fast…
apovzner Apr 16, 2018
00bca81
set EpochEndOffset.UNDEFINED_EPOCH default to NO_PARTITION_LEADER_EPO…
apovzner Apr 16, 2018
bc51478
some cleanup and tests
apovzner Apr 25, 2018
4fce82f
Few small fixes to truncation logic
apovzner Apr 25, 2018
2f6cf3b
fixed merge errors
apovzner Apr 25, 2018
c69e85e
Fixed build failures and fixed logging
apovzner Apr 30, 2018
ab86ba3
Updated comments based on review comments
apovzner Apr 30, 2018
b2be14d
Updated the upgrade doc and minor fixes in comments
apovzner May 1, 2018
4dc679c
Addressed review comments in tests
apovzner May 1, 2018
818d29e
Truncate based on protocol version
apovzner May 1, 2018
a781aba
Moved common truncation logic to method in base class
apovzner May 1, 2018
ba620c2
Added test for truncation logic for protocol version < 2.0
apovzner May 1, 2018
d05e50d
Added test for truncating to largest common epoch in ReplicaAlterLogD…
apovzner May 1, 2018
5a290a9
removed dup test due to merge, small cleanup to address couple of com…
apovzner May 7, 2018
a71f6cd
Bounding future replica HW to its LOE, and some more cleanup to addre…
apovzner May 7, 2018
bcced69
fixes to comments and log
apovzner May 7, 2018
6ccaa51
updated unit test to properly simulate both brokers on older protocol…
apovzner May 7, 2018
ca63ae1
Various small fixes addressing review comments
apovzner May 8, 2018
b800cb5
Creating OffsetsForLeaderEpochRequests with version
apovzner May 8, 2018
64031fc
Minor changes to address review comments
apovzner May 9, 2018
4503ab4
Future replica truncation is consistent with protocol version
apovzner May 9, 2018
e5ad61f
future replica always uses leader epoch for truncation if available
apovzner May 9, 2018
84dc51e
reverted incorrect warn about truncating below HW
apovzner May 9, 2018
544f4fd
Output warning if truncating below HW
apovzner May 9, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class CommonFields {
public static final Field.Int32 PARTITION_ID = new Field.Int32("partition", "Topic partition id");
public static final Field.Int16 ERROR_CODE = new Field.Int16("error_code", "Response error code");
public static final Field.NullableStr ERROR_MESSAGE = new Field.NullableStr("error_message", "Response error message");
public static final Field.Int32 LEADER_EPOCH = new Field.Int32("leader_epoch", "The epoch");

// Group APIs
public static final Field.Str GROUP_ID = new Field.Str("group_id", "The unique group identifier");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,29 @@

import static org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH;

import java.util.Objects;

/**
* The offset, fetched from a leader, for a particular partition.
*/

public class EpochEndOffset {
public static final long UNDEFINED_EPOCH_OFFSET = NO_PARTITION_LEADER_EPOCH;
public static final int UNDEFINED_EPOCH = -1;
public static final int UNDEFINED_EPOCH = NO_PARTITION_LEADER_EPOCH;

private Errors error;
private int leaderEpoch; // introduced in V1
private long endOffset;

public EpochEndOffset(Errors error, long endOffset) {
/**
 * Full constructor (V1+ response shape): an error code, the leader epoch that the
 * returned end offset belongs to, and the end offset itself.
 */
public EpochEndOffset(Errors error, int leaderEpoch, long endOffset) {
this.error = error;
this.leaderEpoch = leaderEpoch;
this.endOffset = endOffset;
}

public EpochEndOffset(long endOffset) {
/**
 * Success-case constructor: the error is fixed to {@code Errors.NONE}.
 */
public EpochEndOffset(int leaderEpoch, long endOffset) {
this.error = Errors.NONE;
this.leaderEpoch = leaderEpoch;
this.endOffset = endOffset;
}

Expand All @@ -53,10 +58,15 @@ public long endOffset() {
return endOffset;
}

// Leader epoch of the reported end offset. Field was introduced in V1 of the
// response; for V0 responses it is filled with the undefined-epoch sentinel
// (NO_PARTITION_LEADER_EPOCH) when deserializing.
public int leaderEpoch() {
return leaderEpoch;
}

// Human-readable form for logging; includes the V1 leaderEpoch field.
@Override
public String toString() {
return "EpochEndOffset{" +
"error=" + error +
", leaderEpoch=" + leaderEpoch +
", endOffset=" + endOffset +
'}';
}
Expand All @@ -68,14 +78,13 @@ public boolean equals(Object o) {

EpochEndOffset that = (EpochEndOffset) o;

if (error != that.error) return false;
return endOffset == that.endOffset;
return Objects.equals(error, that.error)
&& Objects.equals(leaderEpoch, that.leaderEpoch)
&& Objects.equals(endOffset, that.endOffset);
}

@Override
public int hashCode() {
int result = (int) error.code();
result = 31 * result + (int) (endOffset ^ (endOffset >>> 32));
return result;
return Objects.hash(error, leaderEpoch, endOffset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,11 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V0 = new Schema(
new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_FOR_LEADER_EPOCH_REQUEST_TOPIC_V0), "An array of topics to get epochs for"));

/* v1 request is the same as v0. Per-partition leader epoch has been added to response */
private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V1 = OFFSET_FOR_LEADER_EPOCH_REQUEST_V0;

public static Schema[] schemaVersions() {
return new Schema[]{OFFSET_FOR_LEADER_EPOCH_REQUEST_V0};
return new Schema[]{OFFSET_FOR_LEADER_EPOCH_REQUEST_V0, OFFSET_FOR_LEADER_EPOCH_REQUEST_V1};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this affects inter broker protocol, we need to (1) document this api change for "2.0-IV0" in ApiVersion.scala, (2) update the upgrade section in the doc, (3) only use the new protocol if the inter broker protocol is 2.0-IV0 or above.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done all three.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding (3), the fetcher falls back to KIP-101 logic if inter-broker protocol version < KAFKA_2_0_IV0 (ignores leader epoch returned in the response and uses end offset).

}

private Map<TopicPartition, Integer> epochsByPartition;
Expand All @@ -63,12 +66,12 @@ public Map<TopicPartition, Integer> epochsByTopicPartition() {
public static class Builder extends AbstractRequest.Builder<OffsetsForLeaderEpochRequest> {
private Map<TopicPartition, Integer> epochsByPartition = new HashMap<>();

public Builder() {
super(ApiKeys.OFFSET_FOR_LEADER_EPOCH);
// Builds an OffsetsForLeaderEpoch request pinned to an explicit API version
// (the caller chooses the version based on the inter-broker protocol in use);
// the epoch map starts empty.
public Builder(short version) {
super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, version);
}

public Builder(Map<TopicPartition, Integer> epochsByPartition) {
super(ApiKeys.OFFSET_FOR_LEADER_EPOCH);
// Builds a version-pinned request pre-populated with the partition -> leader
// epoch entries to query.
public Builder(short version, Map<TopicPartition, Integer> epochsByPartition) {
super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, version);
this.epochsByPartition = epochsByPartition;
}

Expand Down Expand Up @@ -150,7 +153,8 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Errors error = Errors.forException(e);
Map<TopicPartition, EpochEndOffset> errorResponse = new HashMap<>();
for (TopicPartition tp : epochsByPartition.keySet()) {
errorResponse.put(tp, new EpochEndOffset(error, EpochEndOffset.UNDEFINED_EPOCH_OFFSET));
errorResponse.put(tp, new EpochEndOffset(
error, EpochEndOffset.UNDEFINED_EPOCH, EpochEndOffset.UNDEFINED_EPOCH_OFFSET));
}
return new OffsetsForLeaderEpochResponse(errorResponse);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.CollectionUtils;

import java.nio.ByteBuffer;
Expand All @@ -34,6 +35,7 @@
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
import static org.apache.kafka.common.protocol.CommonFields.LEADER_EPOCH;
import static org.apache.kafka.common.protocol.types.Type.INT64;

public class OffsetsForLeaderEpochResponse extends AbstractResponse {
Expand All @@ -52,8 +54,23 @@ public class OffsetsForLeaderEpochResponse extends AbstractResponse {
new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_FOR_LEADER_EPOCH_RESPONSE_TOPIC_V0),
"An array of topics for which we have leader offsets for some requested Partition Leader Epoch"));


// OFFSET_FOR_LEADER_EPOCH_RESPONSE_PARTITION_V1 added a per-partition leader epoch field,
// which specifies which leader epoch the end offset belongs to
private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_PARTITION_V1 = new Schema(
ERROR_CODE,
PARTITION_ID,
LEADER_EPOCH,
new Field(END_OFFSET_KEY_NAME, INT64, "The end offset"));
private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_TOPIC_V1 = new Schema(
TOPIC_NAME,
new Field(PARTITIONS_KEY_NAME, new ArrayOf(OFFSET_FOR_LEADER_EPOCH_RESPONSE_PARTITION_V1)));
private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1 = new Schema(
new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_FOR_LEADER_EPOCH_RESPONSE_TOPIC_V1),
"An array of topics for which we have leader offsets for some requested Partition Leader Epoch"));

public static Schema[] schemaVersions() {
return new Schema[]{OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0};
return new Schema[]{OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0, OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1};
}

private Map<TopicPartition, EpochEndOffset> epochEndOffsetsByPartition;
Expand All @@ -68,8 +85,9 @@ public OffsetsForLeaderEpochResponse(Struct struct) {
Errors error = Errors.forCode(partitionAndEpoch.get(ERROR_CODE));
int partitionId = partitionAndEpoch.get(PARTITION_ID);
TopicPartition tp = new TopicPartition(topic, partitionId);
int leaderEpoch = partitionAndEpoch.getOrElse(LEADER_EPOCH, RecordBatch.NO_PARTITION_LEADER_EPOCH);
long endOffset = partitionAndEpoch.getLong(END_OFFSET_KEY_NAME);
epochEndOffsetsByPartition.put(tp, new EpochEndOffset(error, endOffset));
epochEndOffsetsByPartition.put(tp, new EpochEndOffset(error, leaderEpoch, endOffset));
}
}
}
Expand Down Expand Up @@ -110,6 +128,7 @@ protected Struct toStruct(short version) {
Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME);
partitionStruct.set(ERROR_CODE, partitionEndOffset.getValue().error().code());
partitionStruct.set(PARTITION_ID, partitionEndOffset.getKey());
partitionStruct.setIfExists(LEADER_EPOCH, partitionEndOffset.getValue().leaderEpoch());
partitionStruct.set(END_OFFSET_KEY_NAME, partitionEndOffset.getValue().endOffset());
partitions.add(partitionStruct);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1020,15 +1020,15 @@ private OffsetsForLeaderEpochRequest createLeaderEpochRequest() {
epochs.put(new TopicPartition("topic1", 1), 1);
epochs.put(new TopicPartition("topic2", 2), 3);

return new OffsetsForLeaderEpochRequest.Builder(epochs).build();
return new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), epochs).build();
}

private OffsetsForLeaderEpochResponse createLeaderEpochResponse() {
Map<TopicPartition, EpochEndOffset> epochs = new HashMap<>();

epochs.put(new TopicPartition("topic1", 0), new EpochEndOffset(Errors.NONE, 0));
epochs.put(new TopicPartition("topic1", 1), new EpochEndOffset(Errors.NONE, 1));
epochs.put(new TopicPartition("topic2", 2), new EpochEndOffset(Errors.NONE, 2));
epochs.put(new TopicPartition("topic1", 0), new EpochEndOffset(Errors.NONE, 1, 0));
epochs.put(new TopicPartition("topic1", 1), new EpochEndOffset(Errors.NONE, 1, 1));
epochs.put(new TopicPartition("topic2", 2), new EpochEndOffset(Errors.NONE, 1, 2));

return new OffsetsForLeaderEpochResponse(epochs);
}
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/api/ApiVersion.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ object ApiVersion {
// and KafkaStorageException for fetch requests.
"1.1-IV0" -> KAFKA_1_1_IV0,
"1.1" -> KAFKA_1_1_IV0,
// Introduced OffsetsForLeaderEpochRequest V1 via KIP-279
"2.0-IV0" -> KAFKA_2_0_IV0,
"2.0" -> KAFKA_2_0_IV0
)
Expand Down
11 changes: 8 additions & 3 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -647,15 +647,20 @@ class Partition(val topic: String,

/**
* @param leaderEpoch Requested leader epoch
* @return The last offset of messages published under this leader epoch.
* @return The requested leader epoch and the end offset of this leader epoch, or if the requested
* leader epoch is unknown, the leader epoch less than the requested leader epoch and the end offset
* of this leader epoch. The end offset of a leader epoch is defined as the start
* offset of the first leader epoch larger than the leader epoch, or else the log end
* offset if the leader epoch is the latest leader epoch.
*/
def lastOffsetForLeaderEpoch(leaderEpoch: Int): EpochEndOffset = {
inReadLock(leaderIsrUpdateLock) {
leaderReplicaIfLocal match {
case Some(leaderReplica) =>
new EpochEndOffset(NONE, leaderReplica.epochs.get.endOffsetFor(leaderEpoch))
val (epoch, offset) = leaderReplica.epochs.get.endOffsetFor(leaderEpoch)
new EpochEndOffset(NONE, epoch, offset)
case None =>
new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET)
new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package kafka.consumer
import kafka.api.{FetchRequestBuilder, FetchResponsePartitionData, OffsetRequest, Request}
import kafka.cluster.BrokerEndPoint
import kafka.message.ByteBufferMessageSet
import kafka.server.{AbstractFetcherThread, PartitionFetchState}
import kafka.server.{AbstractFetcherThread, PartitionFetchState, OffsetTruncationState}
import AbstractFetcherThread.ResultWithPartitions
import kafka.common.{ErrorMapping, TopicAndPartition}

Expand Down Expand Up @@ -129,7 +129,7 @@ class ConsumerFetcherThread(consumerIdString: String,

// The old consumer fetcher does not support leader-epoch based truncation,
// so it fetches no epochs from the leader and always returns an empty map.
override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { Map() }

override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = {
// No-op for the old consumer fetcher: since no epochs are fetched (see
// fetchEpochsFromLeader above in this class), there is never anything to
// truncate — returns empty truncation states and no failed partitions.
override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = {
ResultWithPartitions(Map(), Set())
}
}
Expand Down
Loading