From e2db77afb0adae97c6564befbd1b3e46161d746d Mon Sep 17 00:00:00 2001 From: Manikumar Reddy Date: Sat, 31 Oct 2020 20:26:26 +0530 Subject: [PATCH 1/3] KAFKA-10669: Make CurrentLeaderEpoch field ignorable and set default for MaxNumOffsets to 1 --- .../main/resources/common/message/ListOffsetRequest.json | 4 ++-- .../apache/kafka/common/requests/RequestResponseTest.java | 7 +++---- .../scala/unit/kafka/server/ListOffsetsRequestTest.scala | 8 ++++++-- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/clients/src/main/resources/common/message/ListOffsetRequest.json b/clients/src/main/resources/common/message/ListOffsetRequest.json index 259d7bf1da236..5ecc2d61bcd21 100644 --- a/clients/src/main/resources/common/message/ListOffsetRequest.json +++ b/clients/src/main/resources/common/message/ListOffsetRequest.json @@ -42,11 +42,11 @@ "about": "Each partition in the request.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." }, - { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "4+", "default": "-1", + { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "4+", "default": "-1", "ignorable": true, "about": "The current leader epoch." }, { "name": "Timestamp", "type": "int64", "versions": "0+", "about": "The current timestamp." }, - { "name": "MaxNumOffsets", "type": "int32", "versions": "0", + { "name": "MaxNumOffsets", "type": "int32", "versions": "0", "default": "1", "about": "The maximum number of offsets to report." } ]} ]} diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 0862e2b6c3bc2..e5cc95bcb0a60 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -1261,10 +1261,9 @@ private ListOffsetRequest createListOffsetRequest(int version) { } else if (version >= 2 && version <= 5) { ListOffsetPartition partition = new ListOffsetPartition() .setPartitionIndex(0) - .setTimestamp(1000000L); - if (version >= 4) { - partition.setCurrentLeaderEpoch(5); - } + .setTimestamp(1000000L) + .setCurrentLeaderEpoch(5); + ListOffsetTopic topic = new ListOffsetTopic() .setName("test") .setPartitions(Arrays.asList(partition)); diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala index cedbf0a833caf..1bd057375dfd9 100644 --- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala @@ -143,7 +143,10 @@ class ListOffsetsRequestTest extends BaseRequestTest { val partitionData = response.topics.asScala.find(_.name == topic).get .partitions.asScala.find(_.partitionIndex == partition.partition).get - (partitionData.offset, partitionData.leaderEpoch) + if (version == 0) + (partitionData.oldStyleOffsets().asScala.head, partitionData.leaderEpoch) + else + (partitionData.offset, partitionData.leaderEpoch) } @Test @@ -180,7 +183,8 @@ class ListOffsetsRequestTest extends BaseRequestTest { TestUtils.generateAndProduceMessages(servers, topic, 10) - assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 0)) + assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP, 0)) + assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP, 0)) assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 1)) assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 2)) assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 3)) From f60228671ac378a2a9a56fc206fb8c2512e199fa Mon Sep 17 00:00:00 2001 From: Manikumar Reddy Date: Mon, 2 Nov 2020 17:10:22 +0530 Subject: [PATCH 2/3] Address review comments --- .../common/requests/RequestResponseTest.java | 6 ++-- .../kafka/server/ListOffsetsRequestTest.scala | 30 +++++++++++++------ 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index e5cc95bcb0a60..71048d8752dde 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -1243,7 +1243,8 @@ private ListOffsetRequest createListOffsetRequest(int version) { .setPartitions(Arrays.asList(new ListOffsetPartition() .setPartitionIndex(0) .setTimestamp(1000000L) - .setMaxNumOffsets(10))); + .setMaxNumOffsets(10) + .setCurrentLeaderEpoch(5))); return ListOffsetRequest.Builder .forConsumer(false, IsolationLevel.READ_UNCOMMITTED) .setTargetTimes(Collections.singletonList(topic)) @@ -1253,7 +1254,8 @@ private ListOffsetRequest createListOffsetRequest(int version) { .setName("test") .setPartitions(Arrays.asList(new ListOffsetPartition() .setPartitionIndex(0) - .setTimestamp(1000000L))); + .setTimestamp(1000000L) + .setCurrentLeaderEpoch(5))); return ListOffsetRequest.Builder .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) .setTargetTimes(Collections.singletonList(topic)) diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala index 1bd057375dfd9..a273421a4ba04 100644 --- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala @@ -143,9 +143,12 @@ class ListOffsetsRequestTest extends BaseRequestTest { val partitionData = response.topics.asScala.find(_.name == topic).get .partitions.asScala.find(_.partitionIndex == partition.partition).get - if (version == 0) - (partitionData.oldStyleOffsets().asScala.head, partitionData.leaderEpoch) - else + if (version == 0) { + if (partitionData.oldStyleOffsets().isEmpty) + (-1, partitionData.leaderEpoch) + else + (partitionData.oldStyleOffsets().asScala.head, partitionData.leaderEpoch) + } else (partitionData.offset, partitionData.leaderEpoch) } @@ -183,12 +186,21 @@ class ListOffsetsRequestTest extends BaseRequestTest { TestUtils.generateAndProduceMessages(servers, topic, 10) - assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP, 0)) - assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP, 0)) - assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 1)) - assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 2)) - assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 3)) - assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, 4)) + for (version <- ApiKeys.LIST_OFFSETS.oldestVersion to ApiKeys.LIST_OFFSETS.latestVersion) { + if (version == 0) { + assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort)) + assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP, version.toShort)) + assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP, version.toShort)) + } else if (version >= 1 && version <=3) { + assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort)) + assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP, version.toShort)) + assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP, version.toShort)) + } else if (version >=4) { + assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort)) + assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP, version.toShort)) + assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP, version.toShort)) + } + } } private def assertResponseError(error: Errors, brokerId: Int, request: ListOffsetRequest): Unit = { From 54acc6dcf38133cd8b97b5e97ce8123ffd93ae5a Mon Sep 17 00:00:00 2001 From: Manikumar Reddy Date: Mon, 2 Nov 2020 20:50:24 +0530 Subject: [PATCH 3/3] Address review commnents --- .../scala/unit/kafka/server/ListOffsetsRequestTest.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala index a273421a4ba04..ce324c77cd9a2 100644 --- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala @@ -180,7 +180,7 @@ class ListOffsetsRequestTest extends BaseRequestTest { } @Test - def testResponseDefaultOffsetAndLeaderEpochForLowerVersions(): Unit = { + def testResponseDefaultOffsetAndLeaderEpochForAllVersions(): Unit = { val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers) val firstLeaderId = partitionToLeader(partition.partition) @@ -191,11 +191,11 @@ class ListOffsetsRequestTest extends BaseRequestTest { assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort)) assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP, version.toShort)) assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP, version.toShort)) - } else if (version >= 1 && version <=3) { + } else if (version >= 1 && version <= 3) { assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort)) assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP, version.toShort)) assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP, version.toShort)) - } else if (version >=4) { + } else if (version >= 4) { assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort)) assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP, version.toShort)) assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP, version.toShort))