From 5a7c00d65faa6e21324d37c855ef7f80c86997ee Mon Sep 17 00:00:00 2001 From: thomaskwscott Date: Tue, 25 May 2021 12:00:29 +0100 Subject: [PATCH 01/18] KAFKA-12541 add MAX_TIMESTAMP spec to listOffsets api --- .../kafka/clients/admin/KafkaAdminClient.java | 148 ++++++++++-------- .../kafka/clients/admin/OffsetSpec.java | 10 ++ .../common/requests/ListOffsetsRequest.java | 5 + .../common/message/ListOffsetsRequest.json | 4 +- .../common/message/ListOffsetsResponse.json | 4 +- core/src/main/scala/kafka/log/Log.scala | 10 ++ .../admin/ListOffsetsIntegrationTest.scala | 96 ++++++++++++ 7 files changed, 214 insertions(+), 63 deletions(-) create mode 100644 core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index e49f28a62f61b..2339a1665ef0c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -388,6 +388,12 @@ public class KafkaAdminClient extends AdminClient { private final long retryBackoffMs; + + private enum ListOffsetRequestVersion { + V0AndAbove, + V7AndAbove + } + /** * Get or create a list value from a map. * @@ -4197,25 +4203,26 @@ private List getListOffsetsCalls(MetadataOperationContext new IllegalStateException("No Metadata response")); List calls = new ArrayList<>(); - // grouping topic partitions per leader - Map> leaders = new HashMap<>(); + // grouping topic partitions per leader/request version + Map>> leaders = new HashMap<>(); for (Map.Entry entry: topicPartitionOffsets.entrySet()) { OffsetSpec offsetSpec = entry.getValue(); TopicPartition tp = entry.getKey(); KafkaFutureImpl future = futures.get(tp); - long offsetQuery = (offsetSpec instanceof TimestampSpec) - ? ((TimestampSpec) offsetSpec).timestamp() - : (offsetSpec instanceof OffsetSpec.EarliestSpec) - ? ListOffsetsRequest.EARLIEST_TIMESTAMP - : ListOffsetsRequest.LATEST_TIMESTAMP; + long offsetQuery = getOffsetFromOffsetSpec(offsetSpec); // avoid sending listOffsets request for topics with errors if (!mr.errors().containsKey(tp.topic())) { Node node = mr.cluster().leaderFor(tp); if (node != null) { - Map leadersOnNode = leaders.computeIfAbsent(node, k -> new HashMap<>()); - ListOffsetsTopic topic = leadersOnNode.computeIfAbsent(tp.topic(), k -> new ListOffsetsTopic().setName(tp.topic())); + Map> leadersOnNode = leaders.computeIfAbsent(node, k -> new HashMap<>()); + + ListOffsetRequestVersion requiredRequestVersion = offsetQuery == ListOffsetsRequest.MAX_TIMESTAMP + ? 
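+                            // MAX_TIMESTAMP lookups are only understood by brokers speaking
+                            // ListOffsets v7+, so they are batched apart from the earliest/
+                            // latest/by-timestamp lookups, which any broker version can serve.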
ListOffsetRequestVersion.V7AndAbove : + ListOffsetRequestVersion.V0AndAbove; + Map requestVersionsOnLeader = leadersOnNode.computeIfAbsent(requiredRequestVersion, k -> new HashMap<>()); + ListOffsetsTopic topic = requestVersionsOnLeader.computeIfAbsent(tp.topic(), k -> new ListOffsetsTopic().setName(tp.topic())); topic.partitions().add(new ListOffsetsPartition().setPartitionIndex(tp.partition()).setTimestamp(offsetQuery)); } else { future.completeExceptionally(Errors.LEADER_NOT_AVAILABLE.exception()); @@ -4225,76 +4232,84 @@ private List getListOffsetsCalls(MetadataOperationContext> entry : leaders.entrySet()) { - final int brokerId = entry.getKey().id(); + for (final Map.Entry>> versionedEntry : leaders.entrySet()) { + for (final Map.Entry> entry : versionedEntry.getValue().entrySet()) { + final int brokerId = versionedEntry.getKey().id(); - calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) { + calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) { - final List partitionsToQuery = new ArrayList<>(entry.getValue().values()); + final List partitionsToQuery = new ArrayList<>(entry.getValue().values()); - @Override - ListOffsetsRequest.Builder createRequest(int timeoutMs) { - return ListOffsetsRequest.Builder + @Override + ListOffsetsRequest.Builder createRequest(int timeoutMs) { + ListOffsetRequestVersion requestVersion = entry.getKey(); + if (requestVersion == ListOffsetRequestVersion.V7AndAbove) { + return ListOffsetsRequest.Builder + .forMaxTimestamp(context.options().isolationLevel()) + .setTargetTimes(partitionsToQuery); + } + return ListOffsetsRequest.Builder .forConsumer(true, context.options().isolationLevel()) .setTargetTimes(partitionsToQuery); - } + } - @Override - void handleResponse(AbstractResponse abstractResponse) { - ListOffsetsResponse response = (ListOffsetsResponse) abstractResponse; - Map retryTopicPartitionOffsets = new HashMap<>(); - - for (ListOffsetsTopicResponse topic : response.topics()) { - for (ListOffsetsPartitionResponse partition : topic.partitions()) { - TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); - KafkaFutureImpl future = futures.get(tp); - Errors error = Errors.forCode(partition.errorCode()); - OffsetSpec offsetRequestSpec = topicPartitionOffsets.get(tp); - if (offsetRequestSpec == null) { - log.warn("Server response mentioned unknown topic partition {}", tp); - } else if (MetadataOperationContext.shouldRefreshMetadata(error)) { - retryTopicPartitionOffsets.put(tp, offsetRequestSpec); - } else if (error == Errors.NONE) { - Optional leaderEpoch = (partition.leaderEpoch() == ListOffsetsResponse.UNKNOWN_EPOCH) + @Override + void handleResponse(AbstractResponse abstractResponse) { + ListOffsetsResponse response = (ListOffsetsResponse) abstractResponse; + Map retryTopicPartitionOffsets = new HashMap<>(); + + for (ListOffsetsTopicResponse topic : response.topics()) { + for (ListOffsetsPartitionResponse partition : topic.partitions()) { + TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); + KafkaFutureImpl future = futures.get(tp); + Errors error = Errors.forCode(partition.errorCode()); + OffsetSpec offsetRequestSpec = topicPartitionOffsets.get(tp); + if (offsetRequestSpec == null) { + log.warn("Server response mentioned unknown topic partition {}", tp); + } else if (MetadataOperationContext.shouldRefreshMetadata(error)) { + retryTopicPartitionOffsets.put(tp, 
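+                                // remembered so the lookup can be retried once metadata is refreshed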
offsetRequestSpec); + } else if (error == Errors.NONE) { + Optional leaderEpoch = (partition.leaderEpoch() == ListOffsetsResponse.UNKNOWN_EPOCH) ? Optional.empty() : Optional.of(partition.leaderEpoch()); - future.complete(new ListOffsetsResultInfo(partition.offset(), partition.timestamp(), leaderEpoch)); - } else { - future.completeExceptionally(error.exception()); + future.complete(new ListOffsetsResultInfo(partition.offset(), partition.timestamp(), leaderEpoch)); + } else { + future.completeExceptionally(error.exception()); + } } } - } - if (retryTopicPartitionOffsets.isEmpty()) { - // The server should send back a response for every topic partition. But do a sanity check anyway. - for (ListOffsetsTopic topic : partitionsToQuery) { - for (ListOffsetsPartition partition : topic.partitions()) { - TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); - ApiException error = new ApiException("The response from broker " + brokerId + + if (retryTopicPartitionOffsets.isEmpty()) { + // The server should send back a response for every topic partition. But do a sanity check anyway. + for (ListOffsetsTopic topic : partitionsToQuery) { + for (ListOffsetsPartition partition : topic.partitions()) { + TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); + ApiException error = new ApiException("The response from broker " + brokerId + " did not contain a result for topic partition " + tp); - futures.get(tp).completeExceptionally(error); + futures.get(tp).completeExceptionally(error); + } } + } else { + Set retryTopics = retryTopicPartitionOffsets.keySet().stream().map( + TopicPartition::topic).collect(Collectors.toSet()); + MetadataOperationContext retryContext = + new MetadataOperationContext<>(retryTopics, context.options(), context.deadline(), futures); + rescheduleMetadataTask(retryContext, () -> getListOffsetsCalls(retryContext, retryTopicPartitionOffsets, futures)); } - } else { - Set retryTopics = retryTopicPartitionOffsets.keySet().stream().map( - TopicPartition::topic).collect(Collectors.toSet()); - MetadataOperationContext retryContext = - new MetadataOperationContext<>(retryTopics, context.options(), context.deadline(), futures); - rescheduleMetadataTask(retryContext, () -> getListOffsetsCalls(retryContext, retryTopicPartitionOffsets, futures)); } - } - @Override - void handleFailure(Throwable throwable) { - for (ListOffsetsTopic topic : entry.getValue().values()) { - for (ListOffsetsPartition partition : topic.partitions()) { - TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); - KafkaFutureImpl future = futures.get(tp); - future.completeExceptionally(throwable); + @Override + void handleFailure(Throwable throwable) { + for (ListOffsetsTopic topic : entry.getValue().values()) { + for (ListOffsetsPartition partition : topic.partitions()) { + TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); + KafkaFutureImpl future = futures.get(tp); + future.completeExceptionally(throwable); + } } } - } - }); + }); + } } return calls; } @@ -4821,6 +4836,17 @@ void maybeRetry(long currentTimeMs, Throwable throwable) { }; } + private long getOffsetFromOffsetSpec(OffsetSpec offsetSpec) { + if (offsetSpec instanceof TimestampSpec) { + return ((TimestampSpec) offsetSpec).timestamp(); + } else if (offsetSpec instanceof OffsetSpec.EarliestSpec) { + return ListOffsetsRequest.EARLIEST_TIMESTAMP; + } else if (offsetSpec instanceof OffsetSpec.MaxTimestampSpec) { + return 
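+                   // sentinel -3L, alongside EARLIEST_TIMESTAMP (-2L) and LATEST_TIMESTAMP (-1L)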
ListOffsetsRequest.MAX_TIMESTAMP;
+        }
+        return ListOffsetsRequest.LATEST_TIMESTAMP;
+    }
+
     /**
      * Get a sub level error when the request is in batch. If given key was not found,
      * return an {@link IllegalArgumentException}.
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
index 339e9cf815e87..dcf90452c55e7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
@@ -25,6 +25,7 @@ public class OffsetSpec {
 
     public static class EarliestSpec extends OffsetSpec { }
     public static class LatestSpec extends OffsetSpec { }
+    public static class MaxTimestampSpec extends OffsetSpec { }
 
     public static class TimestampSpec extends OffsetSpec {
         private final long timestamp;
@@ -60,4 +61,13 @@ public static OffsetSpec forTimestamp(long timestamp) {
         return new TimestampSpec(timestamp);
     }
 
+    /**
+     * Used to retrieve the offset with the largest timestamp of a partition.
+     * As message timestamps can be specified client side, this may not match
+     * the log end offset returned by LatestSpec.
+     */
+    public static OffsetSpec maxTimestamp() {
+        return new MaxTimestampSpec();
+    }
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
index 658188da7e5dc..5036746622dfc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
@@ -40,6 +40,7 @@ public class ListOffsetsRequest extends AbstractRequest {
 
     public static final long EARLIEST_TIMESTAMP = -2L;
     public static final long LATEST_TIMESTAMP = -1L;
+    public static final long MAX_TIMESTAMP = -3L;
 
     public static final int CONSUMER_REPLICA_ID = -1;
     public static final int DEBUGGING_REPLICA_ID = -2;
@@ -63,6 +64,10 @@ else if (requireTimestamp)
             return new Builder(minVersion, ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel);
         }
 
+        public static Builder forMaxTimestamp(IsolationLevel isolationLevel) {
+            return new Builder((short) 7, ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel);
+        }
+
         private Builder(short oldestAllowedVersion,
                         short latestAllowedVersion,
                         int replicaId,
diff --git a/clients/src/main/resources/common/message/ListOffsetsRequest.json b/clients/src/main/resources/common/message/ListOffsetsRequest.json
index a464c9376444f..4ee739b286588 100644
--- a/clients/src/main/resources/common/message/ListOffsetsRequest.json
+++ b/clients/src/main/resources/common/message/ListOffsetsRequest.json
@@ -30,7 +30,9 @@
   // Version 5 is the same as version 4.
   //
   // Version 6 enables flexible versions.
-  "validVersions": "0-6",
+  //
+  // Version 7 enables listing offsets by max timestamp.
+  "validVersions": "0-7",
   "flexibleVersions": "6+",
   "fields": [
     { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
diff --git a/clients/src/main/resources/common/message/ListOffsetsResponse.json b/clients/src/main/resources/common/message/ListOffsetsResponse.json
index 727bb8f274a99..e20959bdf14b4 100644
--- a/clients/src/main/resources/common/message/ListOffsetsResponse.json
+++ b/clients/src/main/resources/common/message/ListOffsetsResponse.json
@@ -29,7 +29,9 @@
   // Version 5 adds a new error code, OFFSET_NOT_AVAILABLE.
   //
   // Version 6 enables flexible versions.
- "validVersions": "0-6", + // + // Version 7 is the same as version 6. + "validVersions": "0-7", "flexibleVersions": "6+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true, diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index ef0d6aed5f0d8..21d11f1775303 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1316,6 +1316,16 @@ class Log(@volatile private var _dir: File, val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer]) val epochOptional = Optional.ofNullable(latestEpochOpt.orNull) Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epochOptional)) + } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) { + // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides + // constant time access while being safe to use with concurrent collections unlike `toArray`. + val segmentsCopy = logSegments.toBuffer + val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar) + val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer]) + val epochOptional = Optional.ofNullable(latestEpochOpt.orNull) + Some(new TimestampAndOffset(latestTimestampSegment.maxTimestampSoFar, + latestTimestampSegment.offsetOfMaxTimestampSoFar, + epochOptional)) } else { // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides // constant time access while being safe to use with concurrent collections unlike `toArray`. diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala new file mode 100644 index 0000000000000..2bac9347d4472 --- /dev/null +++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package integration.kafka.admin + +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.apache.kafka.clients.admin._ +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.utils.Utils +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} + +import scala.collection.{Map, Seq} +import scala.jdk.CollectionConverters._ + +class ListOffsetsIntegrationTest extends KafkaServerTestHarness { + + val topicName = "foo" + var adminClient: Admin = null + + @BeforeEach + override def setUp(): Unit = { + super.setUp() + createTopic(topicName,1,1.asInstanceOf[Short]) + produceMessages() + adminClient = Admin.create(Map[String, Object]( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList + ).asJava) + } + + @AfterEach + override def tearDown(): Unit = { + Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") + super.tearDown() + } + + @Test + def testEarliestOffset(): Unit = { + val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest()) + assertEquals(0,earliestOffset.offset()) + } + + @Test + def testLatestOffset(): Unit = { + val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest()) + assertEquals(3,latestOffset.offset()) + } + + @Test + def testMaxTimestampOffset(): Unit = { + val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp()) + assertEquals(1,maxTimestampOffset.offset()) + } + + private def runFetchOffsets(adminClient: Admin, + offsetSpec: OffsetSpec): ListOffsetsResult.ListOffsetsResultInfo = { + println(s"==> listOffsets(${topicName} -> ${offsetSpec}, new ListOffsetsOptions())") + val tp = new TopicPartition(topicName,0) + adminClient.listOffsets(Map( + tp -> offsetSpec + ).asJava,new ListOffsetsOptions()).all().get().get(tp) + } + + def produceMessages(): Unit = { + val records = Seq( + new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 100L, + null, new Array[Byte](10000)), + new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 999L, + null, new Array[Byte](10000)), + new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L, + null, new Array[Byte](10000)), + ) + TestUtils.produceMessages(servers, records, -1) + } + + def generateConfigs: Seq[KafkaConfig] = + TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps) +} + From 5f5d46b6a1bd6e0d9ab38329c26ef9d5b4d4fe7e Mon Sep 17 00:00:00 2001 From: thomaskwscott Date: Tue, 25 May 2021 14:21:14 +0100 Subject: [PATCH 02/18] KAFKA-12541 updated replica fetcher to use latest ListOffsets request version --- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 170b5b9a889d5..ae88a22961890 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -94,7 +94,8 @@ class ReplicaFetcherThread(name: String, // Visible for testing private[server] val listOffsetRequestVersion: Short = - if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_8_IV0) 6 + if (brokerConfig.interBrokerProtocolVersion >= KAFKA_3_0_IV0) 7 + else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_8_IV0) 6 else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_2_IV1) 5 else if 
(brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV1) 4 else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1) 3 From 4fc4059625041fb6a87b5befbd0421fdc5d97745 Mon Sep 17 00:00:00 2001 From: thomaskwscott Date: Sat, 5 Jun 2021 19:13:51 +0100 Subject: [PATCH 03/18] KAFKA-12541 refactor with retry approach in KafkaAdminClient --- .../kafka/clients/admin/KafkaAdminClient.java | 161 ++++++++++-------- .../clients/admin/KafkaAdminClientTest.java | 132 ++++++++++++++ .../common/requests/RequestResponseTest.java | 44 ++++- .../admin/ListOffsetsIntegrationTest.scala | 10 +- .../test/scala/unit/kafka/log/LogTest.scala | 24 +++ .../kafka/server/ListOffsetsRequestTest.scala | 19 ++- .../unit/kafka/server/LogOffsetTest.scala | 33 ++++ 7 files changed, 332 insertions(+), 91 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 2339a1665ef0c..834a6e32292ca 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -388,12 +388,6 @@ public class KafkaAdminClient extends AdminClient { private final long retryBackoffMs; - - private enum ListOffsetRequestVersion { - V0AndAbove, - V7AndAbove - } - /** * Get or create a list value from a map. * @@ -4203,8 +4197,8 @@ private List getListOffsetsCalls(MetadataOperationContext new IllegalStateException("No Metadata response")); List calls = new ArrayList<>(); - // grouping topic partitions per leader/request version - Map>> leaders = new HashMap<>(); + // grouping topic partitions per leader + Map> leaders = new HashMap<>(); for (Map.Entry entry: topicPartitionOffsets.entrySet()) { @@ -4216,13 +4210,8 @@ private List getListOffsetsCalls(MetadataOperationContext> leadersOnNode = leaders.computeIfAbsent(node, k -> new HashMap<>()); - - ListOffsetRequestVersion requiredRequestVersion = offsetQuery == ListOffsetsRequest.MAX_TIMESTAMP - ? 
ListOffsetRequestVersion.V7AndAbove : - ListOffsetRequestVersion.V0AndAbove; - Map requestVersionsOnLeader = leadersOnNode.computeIfAbsent(requiredRequestVersion, k -> new HashMap<>()); - ListOffsetsTopic topic = requestVersionsOnLeader.computeIfAbsent(tp.topic(), k -> new ListOffsetsTopic().setName(tp.topic())); + Map leadersOnNode = leaders.computeIfAbsent(node, k -> new HashMap<>()); + ListOffsetsTopic topic = leadersOnNode.computeIfAbsent(tp.topic(), k -> new ListOffsetsTopic().setName(tp.topic())); topic.partitions().add(new ListOffsetsPartition().setPartitionIndex(tp.partition()).setTimestamp(offsetQuery)); } else { future.completeExceptionally(Errors.LEADER_NOT_AVAILABLE.exception()); @@ -4232,84 +4221,108 @@ private List getListOffsetsCalls(MetadataOperationContext>> versionedEntry : leaders.entrySet()) { - for (final Map.Entry> entry : versionedEntry.getValue().entrySet()) { - final int brokerId = versionedEntry.getKey().id(); + for (final Map.Entry> entry : leaders.entrySet()) { + final int brokerId = entry.getKey().id(); - calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) { + calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) { - final List partitionsToQuery = new ArrayList<>(entry.getValue().values()); + final List partitionsToQuery = new ArrayList<>(entry.getValue().values()); - @Override - ListOffsetsRequest.Builder createRequest(int timeoutMs) { - ListOffsetRequestVersion requestVersion = entry.getKey(); - if (requestVersion == ListOffsetRequestVersion.V7AndAbove) { - return ListOffsetsRequest.Builder - .forMaxTimestamp(context.options().isolationLevel()) - .setTargetTimes(partitionsToQuery); - } + private boolean supportsMaxTimestamp = true; + + @Override + ListOffsetsRequest.Builder createRequest(int timeoutMs) { + if (supportsMaxTimestamp) { return ListOffsetsRequest.Builder - .forConsumer(true, context.options().isolationLevel()) + .forMaxTimestamp(context.options().isolationLevel()) .setTargetTimes(partitionsToQuery); + } else { + // we shouldn't request offsets for max timestamp partitions + final List remainingPartitionsToQuery = new ArrayList<>(); + partitionsToQuery.stream().forEach(t -> + remainingPartitionsToQuery.add(new ListOffsetsTopic() + .setName(t.name()) + .setPartitions(t.partitions().stream() + .filter(p -> p.timestamp() != ListOffsetsRequest.MAX_TIMESTAMP) + .collect(Collectors.toList())) + )); + return ListOffsetsRequest.Builder + .forConsumer(true, context.options().isolationLevel()) + .setTargetTimes(remainingPartitionsToQuery); } + } - @Override - void handleResponse(AbstractResponse abstractResponse) { - ListOffsetsResponse response = (ListOffsetsResponse) abstractResponse; - Map retryTopicPartitionOffsets = new HashMap<>(); - - for (ListOffsetsTopicResponse topic : response.topics()) { - for (ListOffsetsPartitionResponse partition : topic.partitions()) { - TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); - KafkaFutureImpl future = futures.get(tp); - Errors error = Errors.forCode(partition.errorCode()); - OffsetSpec offsetRequestSpec = topicPartitionOffsets.get(tp); - if (offsetRequestSpec == null) { - log.warn("Server response mentioned unknown topic partition {}", tp); - } else if (MetadataOperationContext.shouldRefreshMetadata(error)) { - retryTopicPartitionOffsets.put(tp, offsetRequestSpec); - } else if (error == Errors.NONE) { - Optional leaderEpoch = (partition.leaderEpoch() == 
ListOffsetsResponse.UNKNOWN_EPOCH) + @Override + void handleResponse(AbstractResponse abstractResponse) { + ListOffsetsResponse response = (ListOffsetsResponse) abstractResponse; + Map retryTopicPartitionOffsets = new HashMap<>(); + + for (ListOffsetsTopicResponse topic : response.topics()) { + for (ListOffsetsPartitionResponse partition : topic.partitions()) { + TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); + KafkaFutureImpl future = futures.get(tp); + Errors error = Errors.forCode(partition.errorCode()); + OffsetSpec offsetRequestSpec = topicPartitionOffsets.get(tp); + if (offsetRequestSpec == null) { + log.warn("Server response mentioned unknown topic partition {}", tp); + } else if (MetadataOperationContext.shouldRefreshMetadata(error)) { + retryTopicPartitionOffsets.put(tp, offsetRequestSpec); + } else if (error == Errors.NONE) { + Optional leaderEpoch = (partition.leaderEpoch() == ListOffsetsResponse.UNKNOWN_EPOCH) ? Optional.empty() : Optional.of(partition.leaderEpoch()); - future.complete(new ListOffsetsResultInfo(partition.offset(), partition.timestamp(), leaderEpoch)); - } else { - future.completeExceptionally(error.exception()); - } + future.complete(new ListOffsetsResultInfo(partition.offset(), partition.timestamp(), leaderEpoch)); + } else { + future.completeExceptionally(error.exception()); } } + } - if (retryTopicPartitionOffsets.isEmpty()) { - // The server should send back a response for every topic partition. But do a sanity check anyway. - for (ListOffsetsTopic topic : partitionsToQuery) { - for (ListOffsetsPartition partition : topic.partitions()) { - TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); - ApiException error = new ApiException("The response from broker " + brokerId + + if (retryTopicPartitionOffsets.isEmpty()) { + // The server should send back a response for every topic partition. But do a sanity check anyway. + for (ListOffsetsTopic topic : partitionsToQuery) { + for (ListOffsetsPartition partition : topic.partitions()) { + TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); + // we don't retry MAX_TIMESTAMP requests if the broker doesn't support it so this may be why there was no response. + ApiException error = !supportsMaxTimestamp && partition.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP ? 
+ new UnsupportedVersionException("Broker " + brokerId + " does not support MAX_TIMESTAMP offset spec") : + new ApiException("The response from broker " + brokerId + " did not contain a result for topic partition " + tp); - futures.get(tp).completeExceptionally(error); - } + futures.get(tp).completeExceptionally(error); } - } else { - Set retryTopics = retryTopicPartitionOffsets.keySet().stream().map( - TopicPartition::topic).collect(Collectors.toSet()); - MetadataOperationContext retryContext = - new MetadataOperationContext<>(retryTopics, context.options(), context.deadline(), futures); - rescheduleMetadataTask(retryContext, () -> getListOffsetsCalls(retryContext, retryTopicPartitionOffsets, futures)); } + } else { + Set retryTopics = retryTopicPartitionOffsets.keySet().stream().map( + TopicPartition::topic).collect(Collectors.toSet()); + MetadataOperationContext retryContext = + new MetadataOperationContext<>(retryTopics, context.options(), context.deadline(), futures); + rescheduleMetadataTask(retryContext, () -> getListOffsetsCalls(retryContext, retryTopicPartitionOffsets, futures)); } + } - @Override - void handleFailure(Throwable throwable) { - for (ListOffsetsTopic topic : entry.getValue().values()) { - for (ListOffsetsPartition partition : topic.partitions()) { - TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); - KafkaFutureImpl future = futures.get(tp); - future.completeExceptionally(throwable); - } + @Override + void handleFailure(Throwable throwable) { + for (ListOffsetsTopic topic : entry.getValue().values()) { + for (ListOffsetsPartition partition : topic.partitions()) { + TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); + KafkaFutureImpl future = futures.get(tp); + future.completeExceptionally(throwable); } } - }); - } + } + + @Override + boolean handleUnsupportedVersionException(UnsupportedVersionException exception) { + if (supportsMaxTimestamp) { + supportsMaxTimestamp = false; + // check if there are any non MAX_TIMESTAMPS partitions left to be downgraded + return partitionsToQuery.stream().anyMatch( + t -> t.partitions().stream().anyMatch( + p -> p.timestamp() != ListOffsetsRequest.MAX_TIMESTAMP)); + } + return false; + } + }); } return calls; } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 4cad255ba0241..044ccecc64024 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -171,6 +171,7 @@ import org.apache.kafka.common.requests.LeaveGroupResponse; import org.apache.kafka.common.requests.ListGroupsRequest; import org.apache.kafka.common.requests.ListGroupsResponse; +import org.apache.kafka.common.requests.ListOffsetsRequest; import org.apache.kafka.common.requests.ListOffsetsResponse; import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse; import org.apache.kafka.common.requests.MetadataRequest; @@ -4223,6 +4224,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception { } } + @Test + public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() { + + Node node = new Node(0, "localhost", 8120); + List nodes = Collections.singletonList(node); + final Cluster cluster = new Cluster( + "mockClusterId", + nodes, + Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})), + 
Collections.emptySet(), + Collections.emptySet(), + node); + final TopicPartition tp0 = new TopicPartition("foo", 0); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + // listoffsets response from broker 0 + env.kafkaClient().prepareUnsupportedVersionResponse( + request -> request instanceof ListOffsetsRequest); + + ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp())); + + Exception exception = assertThrows(ExecutionException.class, () -> { + Map offsets = result.all().get(3, TimeUnit.SECONDS); + }); + assertTrue(exception.getCause() instanceof UnsupportedVersionException); + } + } + + @Test + public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception { + + Node node = new Node(0, "localhost", 8120); + List nodes = Collections.singletonList(node); + List pInfos = new ArrayList<>(); + pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})); + pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node})); + final Cluster cluster = new Cluster( + "mockClusterId", + nodes, + pInfos, + Collections.emptySet(), + Collections.emptySet(), + node); + final TopicPartition tp0 = new TopicPartition("foo", 0); + final TopicPartition tp1 = new TopicPartition("foo", 1); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, + AdminClientConfig.RETRIES_CONFIG, "2")) { + + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + // listoffsets response from broker 0 + env.kafkaClient().prepareUnsupportedVersionResponse( + request -> request instanceof ListOffsetsRequest); + + ListOffsetsResult result = env.adminClient().listOffsets(new HashMap() {{ + put(tp0, OffsetSpec.maxTimestamp()); + put(tp1, OffsetSpec.latest()); + }}); + + ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543); + ListOffsetsResponseData responseData = new ListOffsetsResponseData() + .setThrottleTimeMs(0) + .setTopics(Arrays.asList(topicResponse)); + env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node); + + Exception exception = assertThrows(ExecutionException.class, () -> { + result.partitionResult(tp0).get(); + }); + assertTrue(exception.getCause() instanceof UnsupportedVersionException); + + ListOffsetsResultInfo tp1Offset = result.partitionResult(tp1).get(); + assertEquals(345L, tp1Offset.offset()); + assertEquals(543, tp1Offset.leaderEpoch().get().intValue()); + assertEquals(-1L, tp1Offset.timestamp()); + } + } + + @Test + public void testListOffsetsMaxTimestampAndNoBrokerResponse() { + Node node = new Node(0, "localhost", 8120); + List nodes = Collections.singletonList(node); + List pInfos = new ArrayList<>(); + pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})); + pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node})); + final Cluster cluster = new Cluster( + "mockClusterId", + nodes, + pInfos, + Collections.emptySet(), + Collections.emptySet(), + node); + final TopicPartition tp0 = new TopicPartition("foo", 0); + final TopicPartition tp1 = new TopicPartition("foo", 1); + + try (AdminClientUnitTestEnv env = 
new AdminClientUnitTestEnv(cluster, + AdminClientConfig.RETRIES_CONFIG, "2")) { + + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + // listoffsets response from broker 0 + env.kafkaClient().prepareUnsupportedVersionResponse( + request -> request instanceof ListOffsetsRequest); + + ListOffsetsResult result = env.adminClient().listOffsets(new HashMap() {{ + put(tp0, OffsetSpec.maxTimestamp()); + put(tp1, OffsetSpec.latest()); + }}); + + ListOffsetsResponseData responseData = new ListOffsetsResponseData() + .setThrottleTimeMs(0) + .setTopics(new ArrayList<>()); + env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node); + + Exception maxTimestampException = assertThrows(ExecutionException.class, () -> { + result.partitionResult(tp0).get(); + }); + assertTrue(maxTimestampException.getCause() instanceof UnsupportedVersionException); + + Exception nopResponseException = assertThrows(ExecutionException.class, () -> { + result.partitionResult(tp1).get(); + }); + assertTrue(nopResponseException.getCause() instanceof ApiException); + } + } + private Map makeTestFeatureUpdates() { return Utils.mkMap( Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, false)), diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index fc709dc584327..d9e8e577f72c9 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -302,10 +302,16 @@ public void testSerialization() throws Exception { checkErrorResponse(createDeleteGroupsRequest(), unknownServerException, true); checkResponse(createDeleteGroupsResponse(), 0, true); for (short version : LIST_OFFSETS.allVersions()) { - checkRequest(createListOffsetRequest(version), true); - checkErrorResponse(createListOffsetRequest(version), unknownServerException, true); + checkRequest(createListOffsetRequest(version, 1000000L), true); + checkErrorResponse(createListOffsetRequest(version, 1000000L), unknownServerException, true); checkResponse(createListOffsetResponse(version), version, true); } + LIST_OFFSETS.allVersions().stream().filter(version -> version >= (short) 7).forEach( + version -> { + checkRequest(createListOffsetRequest(version, ListOffsetsRequest.MAX_TIMESTAMP), true); + checkErrorResponse(createListOffsetRequest(version, ListOffsetsRequest.MAX_TIMESTAMP), unknownServerException, true); + } + ); checkRequest(MetadataRequest.Builder.allTopics().build((short) 2), true); checkRequest(createMetadataRequest(1, Collections.singletonList("topic1")), true); checkErrorResponse(createMetadataRequest(1, Collections.singletonList("topic1")), unknownServerException, true); @@ -432,8 +438,8 @@ public void testSerialization() throws Exception { checkRequest(createUpdateMetadataRequest(5, null), false); checkErrorResponse(createUpdateMetadataRequest(5, "rack1"), unknownServerException, true); checkResponse(createUpdateMetadataResponse(), 0, true); - checkRequest(createListOffsetRequest(0), true); - checkErrorResponse(createListOffsetRequest(0), unknownServerException, true); + checkRequest(createListOffsetRequest(0, 1000000L), true); + checkErrorResponse(createListOffsetRequest(0, 1000000L), unknownServerException, true); checkResponse(createListOffsetResponse(0), 0, true); 
checkRequest(createLeaderEpochRequestForReplica(0, 1), true); checkRequest(createLeaderEpochRequestForConsumer(), true); @@ -1444,13 +1450,13 @@ private DeleteGroupsResponse createDeleteGroupsResponse() { ); } - private ListOffsetsRequest createListOffsetRequest(int version) { + private ListOffsetsRequest createListOffsetRequest(int version, long timestamp) { if (version == 0) { ListOffsetsTopic topic = new ListOffsetsTopic() .setName("test") .setPartitions(Arrays.asList(new ListOffsetsPartition() .setPartitionIndex(0) - .setTimestamp(1000000L) + .setTimestamp(timestamp) .setMaxNumOffsets(10) .setCurrentLeaderEpoch(5))); return ListOffsetsRequest.Builder @@ -1462,16 +1468,16 @@ private ListOffsetsRequest createListOffsetRequest(int version) { .setName("test") .setPartitions(Arrays.asList(new ListOffsetsPartition() .setPartitionIndex(0) - .setTimestamp(1000000L) + .setTimestamp(timestamp) .setCurrentLeaderEpoch(5))); return ListOffsetsRequest.Builder .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) .setTargetTimes(Collections.singletonList(topic)) .build((short) version); - } else if (version >= 2 && version <= LIST_OFFSETS.latestVersion()) { + } else if (version >= 2 && version <= 6) { ListOffsetsPartition partition = new ListOffsetsPartition() .setPartitionIndex(0) - .setTimestamp(1000000L) + .setTimestamp(timestamp) .setCurrentLeaderEpoch(5); ListOffsetsTopic topic = new ListOffsetsTopic() @@ -1481,6 +1487,26 @@ private ListOffsetsRequest createListOffsetRequest(int version) { .forConsumer(true, IsolationLevel.READ_COMMITTED) .setTargetTimes(Collections.singletonList(topic)) .build((short) version); + } else if (version >= 7 && version <= LIST_OFFSETS.latestVersion()) { + ListOffsetsPartition partition = new ListOffsetsPartition() + .setPartitionIndex(0) + .setTimestamp(timestamp) + .setCurrentLeaderEpoch(5); + + ListOffsetsTopic topic = new ListOffsetsTopic() + .setName("test") + .setPartitions(Arrays.asList(partition)); + if (timestamp == ListOffsetsRequest.MAX_TIMESTAMP) { + return ListOffsetsRequest.Builder + .forMaxTimestamp(IsolationLevel.READ_COMMITTED) + .setTargetTimes(Collections.singletonList(topic)) + .build((short) version); + } else { + return ListOffsetsRequest.Builder + .forConsumer(true, IsolationLevel.READ_COMMITTED) + .setTargetTimes(Collections.singletonList(topic)) + .build((short) version); + } } else { throw new IllegalArgumentException("Illegal ListOffsetRequest version " + version); } diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala index 2bac9347d4472..c95920da3adab 100644 --- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala @@ -38,7 +38,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { @BeforeEach override def setUp(): Unit = { super.setUp() - createTopic(topicName,1,1.asInstanceOf[Short]) + createTopic(topicName, 1, 1.asInstanceOf[Short]) produceMessages() adminClient = Admin.create(Map[String, Object]( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList @@ -54,19 +54,19 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { @Test def testEarliestOffset(): Unit = { val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest()) - assertEquals(0,earliestOffset.offset()) + assertEquals(0, earliestOffset.offset()) } @Test def testLatestOffset(): Unit = { val latestOffset = 
runFetchOffsets(adminClient, OffsetSpec.latest()) - assertEquals(3,latestOffset.offset()) + assertEquals(3, latestOffset.offset()) } @Test def testMaxTimestampOffset(): Unit = { val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp()) - assertEquals(1,maxTimestampOffset.offset()) + assertEquals(1, maxTimestampOffset.offset()) } private def runFetchOffsets(adminClient: Admin, @@ -75,7 +75,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { val tp = new TopicPartition(topicName,0) adminClient.listOffsets(Map( tp -> offsetSpec - ).asJava,new ListOffsetsOptions()).all().get().get(tp) + ).asJava, new ListOffsetsOptions()).all().get().get(tp) } def produceMessages(): Unit = { diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 30541165a8289..2b0eea4ccf7e4 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -2072,6 +2072,30 @@ class LogTest { log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)) } + @Test + def testFetchOffsetByTimestampWithMaxTimestampIncludesTimestamp(): Unit = { + val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) + val log = createLog(logDir, logConfig) + + assertEquals(None, log.fetchOffsetByTimestamp(0L)) + + val firstTimestamp = mockTime.milliseconds + val leaderEpoch = 0 + log.appendAsLeader(TestUtils.singletonRecords( + value = TestUtils.randomBytes(10), + timestamp = firstTimestamp), + leaderEpoch = leaderEpoch) + + val secondTimestamp = firstTimestamp + 1 + log.appendAsLeader(TestUtils.singletonRecords( + value = TestUtils.randomBytes(10), + timestamp = secondTimestamp), + leaderEpoch = leaderEpoch) + + assertEquals(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(leaderEpoch))), + log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)) + } + /** * Test the Log truncate operations */ diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala index 6bbf0a0d46169..e0c9d2bd206ee 100644 --- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala @@ -162,11 +162,15 @@ class ListOffsetsRequestTest extends BaseRequestTest { val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers) val firstLeaderId = partitionToLeader(partition.partition) - TestUtils.generateAndProduceMessages(servers, topic, 10) + // produce in 2 batches to ensure the max timestamp matches the last message + TestUtils.generateAndProduceMessages(servers, topic, 9) + Thread.sleep(10) + TestUtils.generateAndProduceMessages(servers, topic, 1) assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, -1)) assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, -1)) assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1)) + assertEquals((9L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1)) // Kill the first leader so that we can verify the epoch change when fetching the latest offset killBroker(firstLeaderId) @@ -185,6 +189,7 @@ class ListOffsetsRequestTest extends BaseRequestTest { // The latest offset reflects the updated epoch assertEquals((10L, secondLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, 
ListOffsetsRequest.LATEST_TIMESTAMP, -1)) + assertEquals((9L, secondLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1)) } @Test @@ -192,7 +197,10 @@ class ListOffsetsRequestTest extends BaseRequestTest { val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers) val firstLeaderId = partitionToLeader(partition.partition) - TestUtils.generateAndProduceMessages(servers, topic, 10) + // produce in 2 batches to ensure the max timestamp matches the last message + TestUtils.generateAndProduceMessages(servers, topic, 9) + Thread.sleep(10) + TestUtils.generateAndProduceMessages(servers, topic, 1) for (version <- ApiKeys.LIST_OFFSETS.oldestVersion to ApiKeys.LIST_OFFSETS.latestVersion) { if (version == 0) { @@ -203,10 +211,15 @@ class ListOffsetsRequestTest extends BaseRequestTest { assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort)) assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort)) assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort)) - } else if (version >= 4) { + } else if (version >= 4 && version <= 6) { assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort)) assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort)) assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort)) + } else if (version >= 7) { + assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort)) + assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort)) + assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort)) + assertEquals((9L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, version.toShort)) } } } diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index bf9ad3e435504..7ff5ccaf24dff 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -92,6 +92,34 @@ class LogOffsetTest extends BaseRequestTest { assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), consumerOffsets) } + @Test + def testFetchOffsetForMaxTimestampAfterTruncate(): Unit = { + val topic = "kafka-" + val topicPartition = new TopicPartition(topic, 0) + + createTopic(topic, 1, 1) + + val logManager = server.getLogManager + TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined, + "Log for partition [topic,0] should be created") + val log = logManager.getLog(topicPartition).get + + for (_ <- 0 until 20) + log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0) + log.flush() + + log.updateHighWatermark(log.logEndOffset) + + val firstOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP) + assertEquals(19, firstOffset.get.offset) + + log.truncateTo(0) + + val secondOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP) + assertEquals(0, secondOffset.get.offset) + + } + @Test def testGetOffsetsBeforeLatestTime(): Unit = { val topic = "kafka-" @@ -149,6 +177,11 @@ class LogOffsetTest extends BaseRequestTest { assertFalse(offsetChanged) } + @Test + def 
testEmptyLogsGetMaxTimestampOffsets(): Unit = { + fail() + } + @deprecated("legacyFetchOffsetsBefore", since = "") @Test def testGetOffsetsBeforeNow(): Unit = { From ae0b6eda1cc4d68f8340a353fa6b5b196f1da321 Mon Sep 17 00:00:00 2001 From: thomaskwscott Date: Tue, 8 Jun 2021 14:55:33 +0100 Subject: [PATCH 04/18] KAFKA-12541 added LogOffsetTest tests --- .../unit/kafka/server/LogOffsetTest.scala | 52 ++++++++++++++++--- 1 file changed, 45 insertions(+), 7 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 7ff5ccaf24dff..48ff731e39027 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -93,7 +93,7 @@ class LogOffsetTest extends BaseRequestTest { } @Test - def testFetchOffsetForMaxTimestampAfterTruncate(): Unit = { + def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = { val topic = "kafka-" val topicPartition = new TopicPartition(topic, 0) @@ -104,22 +104,45 @@ class LogOffsetTest extends BaseRequestTest { "Log for partition [topic,0] should be created") val log = logManager.getLog(topicPartition).get - for (_ <- 0 until 20) - log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0) + for (timestamp <- 0 until 20) + log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes(), timestamp = timestamp.toLong), leaderEpoch = 0) log.flush() log.updateHighWatermark(log.logEndOffset) val firstOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP) - assertEquals(19, firstOffset.get.offset) + assertEquals(19L, firstOffset.get.offset) log.truncateTo(0) val secondOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP) - assertEquals(0, secondOffset.get.offset) + assertEquals(0L, secondOffset.get.offset) } + @Test + def testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps(): Unit = { + val topic = "kafka-" + val topicPartition = new TopicPartition(topic, 0) + + createTopic(topic, 1, 1) + + val logManager = server.getLogManager + TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined, + "Log for partition [topic,0] should be created") + val log = logManager.getLog(topicPartition).get + + for (timestamp <- List(0L, 1L, 2L, 3L, 4L, 6L, 5L)) + log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes(), timestamp = timestamp), leaderEpoch = 0) + log.flush() + + log.updateHighWatermark(log.logEndOffset) + + val maxTimestampOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP) + assertEquals(7L, log.logEndOffset) + assertEquals(5L, maxTimestampOffset.get.offset) + } + @Test def testGetOffsetsBeforeLatestTime(): Unit = { val topic = "kafka-" @@ -178,8 +201,23 @@ class LogOffsetTest extends BaseRequestTest { } @Test - def testEmptyLogsGetMaxTimestampOffsets(): Unit = { - fail() + def testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(): Unit = { + val topic = "kafka-" + val topicPartition = new TopicPartition(topic, 0) + + createTopic(topic, 1, 1) + + val logManager = server.getLogManager + TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined, + "Log for partition [topic,0] should be created") + val log = logManager.getLog(topicPartition).get + + log.updateHighWatermark(log.logEndOffset) + + val maxTimestampOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP) + assertEquals(0L, log.logEndOffset) + 
assertEquals(0L, maxTimestampOffset.get.offset) + assertEquals(-1L, maxTimestampOffset.get.timestamp) } @deprecated("legacyFetchOffsetsBefore", since = "") From 24be1c79b3de59af2e7287b5871ac3c5cd0876f2 Mon Sep 17 00:00:00 2001 From: thomaskwscott Date: Tue, 8 Jun 2021 18:13:02 +0100 Subject: [PATCH 05/18] KAFKA-12541 tidy up --- .../integration/kafka/admin/ListOffsetsIntegrationTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala index c95920da3adab..35db582861ea3 100644 --- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala @@ -72,7 +72,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { private def runFetchOffsets(adminClient: Admin, offsetSpec: OffsetSpec): ListOffsetsResult.ListOffsetsResultInfo = { println(s"==> listOffsets(${topicName} -> ${offsetSpec}, new ListOffsetsOptions())") - val tp = new TopicPartition(topicName,0) + val tp = new TopicPartition(topicName, 0) adminClient.listOffsets(Map( tp -> offsetSpec ).asJava, new ListOffsetsOptions()).all().get().get(tp) From fb85422ec55f6b7ea0a574109f36b53e907e93dc Mon Sep 17 00:00:00 2001 From: thomaskwscott Date: Fri, 18 Jun 2021 17:36:32 +0100 Subject: [PATCH 06/18] KAFKA-12541 fixes per pr review --- .../kafka/clients/admin/KafkaAdminClient.java | 19 +++++-- .../clients/admin/KafkaAdminClientTest.java | 49 ++++++++-------- .../admin/ListOffsetsIntegrationTest.scala | 1 - .../test/scala/unit/kafka/log/LogTest.scala | 5 ++ .../kafka/server/ListOffsetsRequestTest.scala | 8 +-- .../unit/kafka/server/LogOffsetTest.scala | 56 +++++++------------ .../scala/unit/kafka/utils/TestUtils.scala | 5 +- 7 files changed, 67 insertions(+), 76 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 6cd379969c3b7..059bd9b88608d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4287,10 +4287,7 @@ void handleResponse(AbstractResponse abstractResponse) { for (ListOffsetsTopic topic : partitionsToQuery) { for (ListOffsetsPartition partition : topic.partitions()) { TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); - // we don't retry MAX_TIMESTAMP requests if the broker doesn't support it so this may be why there was no response. - ApiException error = !supportsMaxTimestamp && partition.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP ? 
- new UnsupportedVersionException("Broker " + brokerId + " does not support MAX_TIMESTAMP offset spec") : - new ApiException("The response from broker " + brokerId + + ApiException error = new ApiException("The response from broker " + brokerId + " did not contain a result for topic partition " + tp); futures.get(tp).completeExceptionally(error); } @@ -4319,6 +4316,20 @@ void handleFailure(Throwable throwable) { boolean handleUnsupportedVersionException(UnsupportedVersionException exception) { if (supportsMaxTimestamp) { supportsMaxTimestamp = false; + + // fail any unsupported futures + partitionsToQuery.stream().forEach( + t -> t.partitions().stream() + .filter(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP) + .forEach( + p -> futures.get(new TopicPartition(t.name(), p.partitionIndex())) + .completeExceptionally( + new UnsupportedVersionException( + "Broker " + brokerId + + " does not support MAX_TIMESTAMP offset spec")) + ) + ); + // check if there are any non MAX_TIMESTAMPS partitions left to be downgraded return partitionsToQuery.stream().anyMatch( t -> t.partitions().stream().anyMatch( diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 068011fa29dd4..83a73ac82abc0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -4033,6 +4033,7 @@ public void testListOffsets() throws Exception { pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new Node[]{node0})); pInfos.add(new PartitionInfo("bar", 0, node0, new Node[]{node0}, new Node[]{node0})); pInfos.add(new PartitionInfo("baz", 0, node0, new Node[]{node0}, new Node[]{node0})); + pInfos.add(new PartitionInfo("qux", 0, node0, new Node[]{node0}, new Node[]{node0})); final Cluster cluster = new Cluster( "mockClusterId", @@ -4045,6 +4046,7 @@ public void testListOffsets() throws Exception { final TopicPartition tp0 = new TopicPartition("foo", 0); final TopicPartition tp1 = new TopicPartition("bar", 0); final TopicPartition tp2 = new TopicPartition("baz", 0); + final TopicPartition tp3 = new TopicPartition("qux", 0); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); @@ -4054,15 +4056,17 @@ public void testListOffsets() throws Exception { ListOffsetsTopicResponse t0 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp0, Errors.NONE, -1L, 123L, 321); ListOffsetsTopicResponse t1 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 234L, 432); ListOffsetsTopicResponse t2 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp2, Errors.NONE, 123456789L, 345L, 543); + ListOffsetsTopicResponse t3 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp3, Errors.NONE, 234567890L, 456L, 654); ListOffsetsResponseData responseData = new ListOffsetsResponseData() .setThrottleTimeMs(0) - .setTopics(Arrays.asList(t0, t1, t2)); + .setTopics(Arrays.asList(t0, t1, t2, t3)); env.kafkaClient().prepareResponse(new ListOffsetsResponse(responseData)); Map partitions = new HashMap<>(); partitions.put(tp0, OffsetSpec.latest()); partitions.put(tp1, OffsetSpec.earliest()); partitions.put(tp2, OffsetSpec.forTimestamp(System.currentTimeMillis())); + partitions.put(tp3, OffsetSpec.maxTimestamp()); ListOffsetsResult result = env.adminClient().listOffsets(partitions); Map offsets = 
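        // all() folds the per-partition futures into one map; get() fails if any spec failed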
result.all().get(); @@ -4076,9 +4080,13 @@ public void testListOffsets() throws Exception { assertEquals(345L, offsets.get(tp2).offset()); assertEquals(543, offsets.get(tp2).leaderEpoch().get().intValue()); assertEquals(123456789L, offsets.get(tp2).timestamp()); + assertEquals(456L, offsets.get(tp3).offset()); + assertEquals(654, offsets.get(tp3).leaderEpoch().get().intValue()); + assertEquals(234567890L, offsets.get(tp3).timestamp()); assertEquals(offsets.get(tp0), result.partitionResult(tp0).get()); assertEquals(offsets.get(tp1), result.partitionResult(tp1).get()); assertEquals(offsets.get(tp2), result.partitionResult(tp2).get()); + assertEquals(offsets.get(tp3), result.partitionResult(tp3).get()); try { result.partitionResult(new TopicPartition("unknown", 0)).get(); fail("should have thrown IllegalArgumentException"); @@ -4251,10 +4259,7 @@ public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() { ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp())); - Exception exception = assertThrows(ExecutionException.class, () -> { - Map offsets = result.all().get(3, TimeUnit.SECONDS); - }); - assertTrue(exception.getCause() instanceof UnsupportedVersionException); + TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class); } } @@ -4286,21 +4291,18 @@ public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Ex env.kafkaClient().prepareUnsupportedVersionResponse( request -> request instanceof ListOffsetsRequest); - ListOffsetsResult result = env.adminClient().listOffsets(new HashMap() {{ - put(tp0, OffsetSpec.maxTimestamp()); - put(tp1, OffsetSpec.latest()); - }}); - ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543); ListOffsetsResponseData responseData = new ListOffsetsResponseData() .setThrottleTimeMs(0) .setTopics(Arrays.asList(topicResponse)); env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node); - Exception exception = assertThrows(ExecutionException.class, () -> { - result.partitionResult(tp0).get(); - }); - assertTrue(exception.getCause() instanceof UnsupportedVersionException); + ListOffsetsResult result = env.adminClient().listOffsets(new HashMap() {{ + put(tp0, OffsetSpec.maxTimestamp()); + put(tp1, OffsetSpec.latest()); + }}); + + TestUtils.assertFutureThrows(result.partitionResult(tp0), UnsupportedVersionException.class); ListOffsetsResultInfo tp1Offset = result.partitionResult(tp1).get(); assertEquals(345L, tp1Offset.offset()); @@ -4336,25 +4338,18 @@ public void testListOffsetsMaxTimestampAndNoBrokerResponse() { env.kafkaClient().prepareUnsupportedVersionResponse( request -> request instanceof ListOffsetsRequest); - ListOffsetsResult result = env.adminClient().listOffsets(new HashMap() {{ - put(tp0, OffsetSpec.maxTimestamp()); - put(tp1, OffsetSpec.latest()); - }}); - ListOffsetsResponseData responseData = new ListOffsetsResponseData() .setThrottleTimeMs(0) .setTopics(new ArrayList<>()); env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node); - Exception maxTimestampException = assertThrows(ExecutionException.class, () -> { - result.partitionResult(tp0).get(); - }); - assertTrue(maxTimestampException.getCause() instanceof UnsupportedVersionException); + ListOffsetsResult result = env.adminClient().listOffsets(new HashMap() {{ + put(tp0, OffsetSpec.maxTimestamp()); + put(tp1, OffsetSpec.latest()); + }}); - Exception nopResponseException 
= assertThrows(ExecutionException.class, () -> { - result.partitionResult(tp1).get(); - }); - assertTrue(nopResponseException.getCause() instanceof ApiException); + TestUtils.assertFutureThrows(result.partitionResult(tp0), UnsupportedVersionException.class); + TestUtils.assertFutureThrows(result.partitionResult(tp1), ApiException.class); } } diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala index 35db582861ea3..83a78624f358e 100644 --- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala @@ -71,7 +71,6 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { private def runFetchOffsets(adminClient: Admin, offsetSpec: OffsetSpec): ListOffsetsResult.ListOffsetsResultInfo = { - println(s"==> listOffsets(${topicName} -> ${offsetSpec}, new ListOffsetsOptions())") val tp = new TopicPartition(topicName, 0) adminClient.listOffsets(Map( tp -> offsetSpec diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 4bc087c1b9fae..25a966789b998 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -2092,6 +2092,11 @@ class LogTest { timestamp = secondTimestamp), leaderEpoch = leaderEpoch) + log.appendAsLeader(TestUtils.singletonRecords( + value = TestUtils.randomBytes(10), + timestamp = firstTimestamp), + leaderEpoch = leaderEpoch) + assertEquals(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(leaderEpoch))), log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)) } diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala index e0c9d2bd206ee..e47c62dcc59a6 100644 --- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala @@ -162,10 +162,8 @@ class ListOffsetsRequestTest extends BaseRequestTest { val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers) val firstLeaderId = partitionToLeader(partition.partition) - // produce in 2 batches to ensure the max timestamp matches the last message TestUtils.generateAndProduceMessages(servers, topic, 9) - Thread.sleep(10) - TestUtils.generateAndProduceMessages(servers, topic, 1) + TestUtils.produceMessage(servers, topic, "test-10", System.currentTimeMillis() + 10L) assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, -1)) assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, -1)) @@ -197,10 +195,8 @@ class ListOffsetsRequestTest extends BaseRequestTest { val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers) val firstLeaderId = partitionToLeader(partition.partition) - // produce in 2 batches to ensure the max timestamp matches the last message TestUtils.generateAndProduceMessages(servers, topic, 9) - Thread.sleep(10) - TestUtils.generateAndProduceMessages(servers, topic, 1) + TestUtils.produceMessage(servers, topic, "test-10", System.currentTimeMillis() + 10L) for (version <- ApiKeys.LIST_OFFSETS.oldestVersion to ApiKeys.LIST_OFFSETS.latestVersion) { if (version == 0) { diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 
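
Seen from the caller's side, the behaviour these tests pin down is reachable through the public Admin API. The following is a minimal usage sketch, not code from this patch: the topic name, partition and method name are invented for illustration, and it assumes a cluster reachable through an already-constructed Admin client.

    import java.util.Collections;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.ListOffsetsResult;
    import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.UnsupportedVersionException;

    public class MaxTimestampExample {
        static void printMaxTimestampOffset(Admin admin) throws Exception {
            TopicPartition tp = new TopicPartition("foo", 0);
            ListOffsetsResult result =
                admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.maxTimestamp()));
            try {
                // offset() points at the record with the largest timestamp, which,
                // as the unordered-timestamp tests above show, need not be the
                // last record in the partition
                ListOffsetsResultInfo info = result.partitionResult(tp).get();
                System.out.printf("offset=%d timestamp=%d%n", info.offset(), info.timestamp());
            } catch (ExecutionException e) {
                if (e.getCause() instanceof UnsupportedVersionException) {
                    // pre-KIP-734 broker: ListOffsets v7 is unavailable, fall back here
                } else {
                    throw e;
                }
            }
        }
    }

The per-partition futures matter here: as the multiple-offset-spec test demonstrates, only the MAX_TIMESTAMP lookups fail against an old broker while latest/earliest lookups in the same call still complete.
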
b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 48ff731e39027..ebbd1a87e7ef2 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -65,13 +65,7 @@ class LogOffsetTest extends BaseRequestTest { def testGetOffsetsAfterDeleteRecords(): Unit = { val topic = "kafka-" val topicPartition = new TopicPartition(topic, 0) - - createTopic(topic, 1, 1) - - val logManager = server.getLogManager - TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined, - "Log for partition [topic,0] should be created") - val log = logManager.getLog(topicPartition).get + val log = createTopicAndGetLog(topic, topicPartition) for (_ <- 0 until 20) log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0) @@ -96,13 +90,7 @@ class LogOffsetTest extends BaseRequestTest { def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = { val topic = "kafka-" val topicPartition = new TopicPartition(topic, 0) - - createTopic(topic, 1, 1) - - val logManager = server.getLogManager - TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined, - "Log for partition [topic,0] should be created") - val log = logManager.getLog(topicPartition).get + val log = createTopicAndGetLog(topic, topicPartition) for (timestamp <- 0 until 20) log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes(), timestamp = timestamp.toLong), leaderEpoch = 0) @@ -112,11 +100,13 @@ class LogOffsetTest extends BaseRequestTest { val firstOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP) assertEquals(19L, firstOffset.get.offset) + assertEquals(19L, firstOffset.get.timestamp) log.truncateTo(0) val secondOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP) assertEquals(0L, secondOffset.get.offset) + assertEquals(-1L, secondOffset.get.timestamp) } @@ -124,13 +114,7 @@ class LogOffsetTest extends BaseRequestTest { def testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps(): Unit = { val topic = "kafka-" val topicPartition = new TopicPartition(topic, 0) - - createTopic(topic, 1, 1) - - val logManager = server.getLogManager - TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined, - "Log for partition [topic,0] should be created") - val log = logManager.getLog(topicPartition).get + val log = createTopicAndGetLog(topic, topicPartition) for (timestamp <- List(0L, 1L, 2L, 3L, 4L, 6L, 5L)) log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes(), timestamp = timestamp), leaderEpoch = 0) @@ -141,19 +125,14 @@ class LogOffsetTest extends BaseRequestTest { val maxTimestampOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP) assertEquals(7L, log.logEndOffset) assertEquals(5L, maxTimestampOffset.get.offset) + assertEquals(6L, maxTimestampOffset.get.timestamp) } @Test def testGetOffsetsBeforeLatestTime(): Unit = { val topic = "kafka-" val topicPartition = new TopicPartition(topic, 0) - - createTopic(topic, 1, 1) - - val logManager = server.getLogManager - TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined, - s"Log for partition $topicPartition should be created") - val log = logManager.getLog(topicPartition).get + val log = createTopicAndGetLog(topic, topicPartition) for (_ <- 0 until 20) log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0) @@ -162,7 +141,7 @@ class LogOffsetTest 
extends BaseRequestTest { val offsets = log.legacyFetchOffsetsBefore(ListOffsetsRequest.LATEST_TIMESTAMP, 15) assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), offsets) - TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server), + TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, 0, server), "Leader should be elected") val request = ListOffsetsRequest.Builder.forReplica(0, 0) .setTargetTimes(buildTargetTimes(topicPartition, ListOffsetsRequest.LATEST_TIMESTAMP, 15).asJava).build() @@ -204,13 +183,7 @@ class LogOffsetTest extends BaseRequestTest { def testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(): Unit = { val topic = "kafka-" val topicPartition = new TopicPartition(topic, 0) - - createTopic(topic, 1, 1) - - val logManager = server.getLogManager - TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined, - "Log for partition [topic,0] should be created") - val log = logManager.getLog(topicPartition).get + val log = createTopicAndGetLog(topic, topicPartition) log.updateHighWatermark(log.logEndOffset) @@ -218,6 +191,7 @@ class LogOffsetTest extends BaseRequestTest { assertEquals(0L, log.logEndOffset) assertEquals(0L, maxTimestampOffset.get.offset) assertEquals(-1L, maxTimestampOffset.get.timestamp) + } @deprecated("legacyFetchOffsetsBefore", since = "") @@ -337,4 +311,14 @@ class LogOffsetTest extends BaseRequestTest { .partitions.asScala.find(_.partitionIndex == tp.partition).get } + private def createTopicAndGetLog(topic: String, topicPartition: TopicPartition): Log = { + + createTopic(topic, 1, 1) + + val logManager = server.getLogManager + TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined, + "Log for partition [topic,0] should be created") + logManager.getLog(topicPartition).get + } + } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 53bc88e249874..0802d87112f54 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1207,17 +1207,18 @@ object TestUtils extends Logging { values } - def produceMessage(servers: Seq[KafkaServer], topic: String, message: String, + def produceMessage(servers: Seq[KafkaServer], topic: String, message: String, timestamp: java.lang.Long = null, deliveryTimeoutMs: Int = 30 * 1000, requestTimeoutMs: Int = 20 * 1000): Unit = { val producer = createProducer(TestUtils.getBrokerListStrFromServers(servers), deliveryTimeoutMs = deliveryTimeoutMs, requestTimeoutMs = requestTimeoutMs) try { - producer.send(new ProducerRecord(topic, topic.getBytes, message.getBytes)).get + producer.send(new ProducerRecord(topic, null, timestamp, topic.getBytes, message.getBytes)).get } finally { producer.close() } } + def verifyTopicDeletion(zkClient: KafkaZkClient, topic: String, numPartitions: Int, servers: Seq[KafkaServer]): Unit = { val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)) // wait until admin path for delete topic is deleted, signaling completion of topic deletion From 32e6b2b6073cad41ca51a0db341e1f8a7678a6e1 Mon Sep 17 00:00:00 2001 From: thomaskwscott Date: Mon, 21 Jun 2021 09:59:34 +0100 Subject: [PATCH 07/18] KAFKA-12541 adminclient simplification --- .../kafka/clients/admin/KafkaAdminClient.java | 20 +++---------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
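
The extra parameter threaded through produceMessage above relies on the stock ProducerRecord constructor that takes an explicit timestamp; passing null keeps the previous behaviour of stamping the producer's current time. A fragment-level sketch, with the producer, topic and message variables assumed from the surrounding helper:

    // ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value):
    // a null partition defers to the partitioner, a null timestamp defers to the
    // producer's clock. The tests above pass a deliberately future timestamp so that
    // this record, not the last one appended, wins the MAX_TIMESTAMP lookup.
    producer.send(new ProducerRecord<>(topic, null, System.currentTimeMillis() + 10L,
        topic.getBytes(), message.getBytes())).get();
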
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 059bd9b88608d..6e35d840db19d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4236,23 +4236,14 @@ private List getListOffsetsCalls(MetadataOperationContext remainingPartitionsToQuery = new ArrayList<>(); - partitionsToQuery.stream().forEach(t -> - remainingPartitionsToQuery.add(new ListOffsetsTopic() - .setName(t.name()) - .setPartitions(t.partitions().stream() - .filter(p -> p.timestamp() != ListOffsetsRequest.MAX_TIMESTAMP) - .collect(Collectors.toList())) - )); + }else { return ListOffsetsRequest.Builder .forConsumer(true, context.options().isolationLevel()) - .setTargetTimes(remainingPartitionsToQuery); + .setTargetTimes(partitionsToQuery); } } @@ -4329,11 +4320,6 @@ boolean handleUnsupportedVersionException(UnsupportedVersionException exception) + " does not support MAX_TIMESTAMP offset spec")) ) ); - - // check if there are any non MAX_TIMESTAMPS partitions left to be downgraded - return partitionsToQuery.stream().anyMatch( - t -> t.partitions().stream().anyMatch( - p -> p.timestamp() != ListOffsetsRequest.MAX_TIMESTAMP)); } return false; } From a0e9c10ebe9330eb84318ae481581e9fdd34f205 Mon Sep 17 00:00:00 2001 From: thomaskwscott Date: Mon, 21 Jun 2021 16:44:34 +0100 Subject: [PATCH 08/18] KAFKA-12541 refactor of listoffsets retry per pr review comments --- .../kafka/clients/admin/KafkaAdminClient.java | 11 +++-------- .../kafka/clients/consumer/internals/Fetcher.java | 2 +- .../kafka/common/requests/ListOffsetsRequest.java | 10 ++++------ .../common/requests/ListOffsetsRequestTest.java | 4 ++-- .../kafka/common/requests/RequestResponseTest.java | 14 +++++++------- .../kafka/api/AuthorizerIntegrationTest.scala | 2 +- .../scala/unit/kafka/server/KafkaApisTest.scala | 4 ++-- .../unit/kafka/server/ListOffsetsRequestTest.scala | 6 +++--- .../scala/unit/kafka/server/LogOffsetTest.scala | 2 +- .../scala/unit/kafka/server/RequestQuotaTest.scala | 2 +- .../jmh/common/ListOffsetRequestBenchmark.java | 2 +- 11 files changed, 26 insertions(+), 33 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 6e35d840db19d..b1649ab03e3e5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4236,15 +4236,9 @@ private List getListOffsetsCalls(MetadataOperationContext sendListOffsetRequest(final Node node, final Map timestampsToSearch, boolean requireTimestamp) { ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(requireTimestamp, isolationLevel) + .forConsumer(requireTimestamp, isolationLevel, false) .setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(timestampsToSearch)); log.debug("Sending ListOffsetRequest {} to broker {}", builder, node); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java index 5036746622dfc..6b7734aca406c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java @@ -55,19 +55,17 @@ public static Builder forReplica(short allowedVersion, int replicaId) { return new 
Builder((short) 0, allowedVersion, replicaId, IsolationLevel.READ_UNCOMMITTED); } - public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel) { + public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel, boolean requireMaxTimestamp) { short minVersion = 0; - if (isolationLevel == IsolationLevel.READ_COMMITTED) + if (requireMaxTimestamp) + minVersion = 7; + else if (isolationLevel == IsolationLevel.READ_COMMITTED) minVersion = 2; else if (requireTimestamp) minVersion = 1; return new Builder(minVersion, ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel); } - public static Builder forMaxTimestamp(IsolationLevel isolationLevel) { - return new Builder((short) 7, ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel); - } - private Builder(short oldestAllowedVersion, short latestAllowedVersion, int replicaId, diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java index a7c83d6f73361..83c4b101d8969 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java @@ -67,7 +67,7 @@ public void testGetErrorResponse() { new ListOffsetsPartition() .setPartitionIndex(0)))); ListOffsetsRequest request = ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_COMMITTED) + .forConsumer(true, IsolationLevel.READ_COMMITTED, false) .setTargetTimes(topics) .build(version); ListOffsetsResponse response = (ListOffsetsResponse) request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception()); @@ -100,7 +100,7 @@ public void testGetErrorResponseV0() { new ListOffsetsPartition() .setPartitionIndex(0)))); ListOffsetsRequest request = ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(topics) .build((short) 0); ListOffsetsResponse response = (ListOffsetsResponse) request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception()); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index d9e8e577f72c9..1e630f55d8360 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -1460,7 +1460,7 @@ private ListOffsetsRequest createListOffsetRequest(int version, long timestamp) .setMaxNumOffsets(10) .setCurrentLeaderEpoch(5))); return ListOffsetsRequest.Builder - .forConsumer(false, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(Collections.singletonList(topic)) .build((short) version); } else if (version == 1) { @@ -1471,7 +1471,7 @@ private ListOffsetsRequest createListOffsetRequest(int version, long timestamp) .setTimestamp(timestamp) .setCurrentLeaderEpoch(5))); return ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(Collections.singletonList(topic)) .build((short) version); } else if (version >= 2 && version <= 6) { @@ -1484,7 +1484,7 @@ private ListOffsetsRequest createListOffsetRequest(int version, long timestamp) .setName("test") 
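
Folding forMaxTimestamp into forConsumer reduces version selection to a single ladder of minimum versions. A condensed restatement of that rule as a standalone method, written for illustration only (the real logic lives inline in the builder factory above):

    // Each capability raises the floor of the negotiable ListOffsets version range;
    // the highest applicable floor wins, and the ceiling stays at the latest
    // version this client supports.
    static short minListOffsetsVersion(boolean requireTimestamp,
                                       boolean readCommitted,
                                       boolean requireMaxTimestamp) {
        if (requireMaxTimestamp) return 7; // MAX_TIMESTAMP specs need v7+ (KIP-734)
        if (readCommitted)       return 2; // isolation level was added in v2
        if (requireTimestamp)    return 1; // timestamp lookup was added in v1
        return 0;
    }

With one factory, a request mixing capabilities always carries the strictest floor, so version negotiation fails fast with UnsupportedVersionException instead of silently serving a weaker answer.
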
.setPartitions(Arrays.asList(partition)); return ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_COMMITTED) + .forConsumer(true, IsolationLevel.READ_COMMITTED, false) .setTargetTimes(Collections.singletonList(topic)) .build((short) version); } else if (version >= 7 && version <= LIST_OFFSETS.latestVersion()) { @@ -1498,12 +1498,12 @@ private ListOffsetsRequest createListOffsetRequest(int version, long timestamp) .setPartitions(Arrays.asList(partition)); if (timestamp == ListOffsetsRequest.MAX_TIMESTAMP) { return ListOffsetsRequest.Builder - .forMaxTimestamp(IsolationLevel.READ_COMMITTED) - .setTargetTimes(Collections.singletonList(topic)) - .build((short) version); + .forConsumer(true, IsolationLevel.READ_COMMITTED, false) + .setTargetTimes(Collections.singletonList(topic)) + .build((short) version); } else { return ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_COMMITTED) + .forConsumer(true, IsolationLevel.READ_COMMITTED, false) .setTargetTimes(Collections.singletonList(topic)) .build((short) version); } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index c730c476249ca..0204b8917b048 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -354,7 +354,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } private def createListOffsetsRequest = { - requests.ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED).setTargetTimes( + requests.ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false).setTargetTimes( List(new ListOffsetsTopic() .setName(tp.topic) .setPartitions(List(new ListOffsetsPartition() diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 9a0bdb04211ca..8c1b06c416d98 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -2000,7 +2000,7 @@ class KafkaApisTest { .setPartitionIndex(tp.partition) .setTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP) .setCurrentLeaderEpoch(currentLeaderEpoch.get)).asJava)).asJava - val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel) + val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false) .setTargetTimes(targetTimes).build() val request = buildRequest(listOffsetRequest) val capturedResponse = expectNoThrottling(request) @@ -3112,7 +3112,7 @@ class KafkaApisTest { .setPartitions(List(new ListOffsetsPartition() .setPartitionIndex(tp.partition) .setTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)).asJava)).asJava - val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel) + val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false) .setTargetTimes(targetTimes).build() val request = buildRequest(listOffsetRequest) val capturedResponse = expectNoThrottling(request) diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala index e47c62dcc59a6..961e966c09a9d 100644 --- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala @@ -43,7 +43,7 @@ class 
ListOffsetsRequestTest extends BaseRequestTest { .setCurrentLeaderEpoch(0)).asJava)).asJava val consumerRequest = ListOffsetsRequest.Builder - .forConsumer(false, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(targetTimes) .build() @@ -90,7 +90,7 @@ class ListOffsetsRequestTest extends BaseRequestTest { .setName(topic) .setPartitions(List(listOffsetPartition).asJava)).asJava val request = ListOffsetsRequest.Builder - .forConsumer(false, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(targetTimes) .build() assertResponseError(error, brokerId, request) @@ -133,7 +133,7 @@ class ListOffsetsRequestTest extends BaseRequestTest { .setTimestamp(timestamp)).asJava)).asJava val builder = ListOffsetsRequest.Builder - .forConsumer(false, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(targetTimes) val request = if (version == -1) builder.build() else builder.build(version) diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index ebbd1a87e7ef2..59c001b6b7f03 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -54,7 +54,7 @@ class LogOffsetTest extends BaseRequestTest { @Test def testGetOffsetsForUnknownTopic(): Unit = { val topicPartition = new TopicPartition("foo", 0) - val request = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED) + val request = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(buildTargetTimes(topicPartition, ListOffsetsRequest.LATEST_TIMESTAMP, 10).asJava).build(0) val response = sendListOffsetsRequest(request) assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, findPartition(response.topics.asScala, topicPartition).errorCode) diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 757b82c13dbe8..1f04f08e8d982 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -240,7 +240,7 @@ class RequestQuotaTest extends BaseRequestTest { .setPartitionIndex(tp.partition) .setTimestamp(0L) .setCurrentLeaderEpoch(15)).asJava) - ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED) + ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(List(topic).asJava) case ApiKeys.LEADER_AND_ISR => diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java index 326916e5f14ac..e6fc2dc141c17 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java @@ -69,7 +69,7 @@ public void setup() { } } - this.offsetRequest = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED) + this.offsetRequest = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false) .build(ApiKeys.LIST_OFFSETS.latestVersion()); } From 32241da50403ed5a484ad1524cd0ac453ab1ea64 Mon Sep 17 00:00:00 2001 From: thomaskwscott Date: Mon, 21 Jun 2021 
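
All of the call sites updated above pass false because they predate the max-timestamp semantics; a caller that wants them opts in with true, which pins the oldest negotiable version at 7. An illustrative construction, with the topic and partition invented for the sketch:

    List<ListOffsetsTopic> targets = Collections.singletonList(
        new ListOffsetsTopic()
            .setName("foo")
            .setPartitions(Collections.singletonList(
                new ListOffsetsPartition()
                    .setPartitionIndex(0)
                    .setTimestamp(ListOffsetsRequest.MAX_TIMESTAMP))));
    // requireMaxTimestamp = true makes v7 the oldest allowed version
    ListOffsetsRequest request = ListOffsetsRequest.Builder
        .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, true)
        .setTargetTimes(targets)
        .build();
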
16:49:24 +0100 Subject: [PATCH 09/18] KAFKA-12541 fixes per pr review --- .../integration/kafka/admin/ListOffsetsIntegrationTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala index 83a78624f358e..c937030521203 100644 --- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala @@ -38,7 +38,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { @BeforeEach override def setUp(): Unit = { super.setUp() - createTopic(topicName, 1, 1.asInstanceOf[Short]) + createTopic(topicName, 1, 1.toShort) produceMessages() adminClient = Admin.create(Map[String, Object]( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList From ea3cd6571450371ee4af6c0482ae3f604aff1f23 Mon Sep 17 00:00:00 2001 From: thomaskwscott Date: Tue, 22 Jun 2021 13:21:42 +0100 Subject: [PATCH 10/18] KAFKA-12541 stopped retries for partitions that cannot use max_timestamp --- .../kafka/clients/admin/KafkaAdminClient.java | 33 ++++++++++++------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index b1649ab03e3e5..76b38a8727aaf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4302,19 +4302,30 @@ boolean handleUnsupportedVersionException(UnsupportedVersionException exception) if (supportsMaxTimestamp) { supportsMaxTimestamp = false; - // fail any unsupported futures + // fail any unsupported futures and remove partitions from the downgraded retry + List topicsToRemove = new ArrayList<>(); partitionsToQuery.stream().forEach( - t -> t.partitions().stream() - .filter(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP) - .forEach( - p -> futures.get(new TopicPartition(t.name(), p.partitionIndex())) - .completeExceptionally( - new UnsupportedVersionException( - "Broker " + brokerId - + " does not support MAX_TIMESTAMP offset spec")) - ) + t -> { + List partitionsToRemove = new ArrayList<>(); + t.partitions().stream() + .filter(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP) + .forEach( + p -> { + futures.get(new TopicPartition(t.name(), p.partitionIndex())) + .completeExceptionally( + new UnsupportedVersionException( + "Broker " + brokerId + + " does not support MAX_TIMESTAMP offset spec")); + partitionsToRemove.add(p); + + }); + t.partitions().removeAll(partitionsToRemove); + if (t.partitions().isEmpty()) topicsToRemove.add(t); + } ); - return true; + partitionsToQuery.removeAll(topicsToRemove); + + return !partitionsToQuery.isEmpty(); } return false; } From 7355f61ac2b34746a6810f29ea248698d02e8b7c Mon Sep 17 00:00:00 2001 From: thomaskwscott Date: Wed, 23 Jun 2021 12:36:04 +0100 Subject: [PATCH 11/18] KAFKA-12541 roll back changes to RequestResponseTest --- .../common/requests/RequestResponseTest.java | 50 +++++-------------- 1 file changed, 12 insertions(+), 38 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 1e630f55d8360..fc709dc584327 100644 --- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -302,16 +302,10 @@ public void testSerialization() throws Exception { checkErrorResponse(createDeleteGroupsRequest(), unknownServerException, true); checkResponse(createDeleteGroupsResponse(), 0, true); for (short version : LIST_OFFSETS.allVersions()) { - checkRequest(createListOffsetRequest(version, 1000000L), true); - checkErrorResponse(createListOffsetRequest(version, 1000000L), unknownServerException, true); + checkRequest(createListOffsetRequest(version), true); + checkErrorResponse(createListOffsetRequest(version), unknownServerException, true); checkResponse(createListOffsetResponse(version), version, true); } - LIST_OFFSETS.allVersions().stream().filter(version -> version >= (short) 7).forEach( - version -> { - checkRequest(createListOffsetRequest(version, ListOffsetsRequest.MAX_TIMESTAMP), true); - checkErrorResponse(createListOffsetRequest(version, ListOffsetsRequest.MAX_TIMESTAMP), unknownServerException, true); - } - ); checkRequest(MetadataRequest.Builder.allTopics().build((short) 2), true); checkRequest(createMetadataRequest(1, Collections.singletonList("topic1")), true); checkErrorResponse(createMetadataRequest(1, Collections.singletonList("topic1")), unknownServerException, true); @@ -438,8 +432,8 @@ public void testSerialization() throws Exception { checkRequest(createUpdateMetadataRequest(5, null), false); checkErrorResponse(createUpdateMetadataRequest(5, "rack1"), unknownServerException, true); checkResponse(createUpdateMetadataResponse(), 0, true); - checkRequest(createListOffsetRequest(0, 1000000L), true); - checkErrorResponse(createListOffsetRequest(0, 1000000L), unknownServerException, true); + checkRequest(createListOffsetRequest(0), true); + checkErrorResponse(createListOffsetRequest(0), unknownServerException, true); checkResponse(createListOffsetResponse(0), 0, true); checkRequest(createLeaderEpochRequestForReplica(0, 1), true); checkRequest(createLeaderEpochRequestForConsumer(), true); @@ -1450,17 +1444,17 @@ private DeleteGroupsResponse createDeleteGroupsResponse() { ); } - private ListOffsetsRequest createListOffsetRequest(int version, long timestamp) { + private ListOffsetsRequest createListOffsetRequest(int version) { if (version == 0) { ListOffsetsTopic topic = new ListOffsetsTopic() .setName("test") .setPartitions(Arrays.asList(new ListOffsetsPartition() .setPartitionIndex(0) - .setTimestamp(timestamp) + .setTimestamp(1000000L) .setMaxNumOffsets(10) .setCurrentLeaderEpoch(5))); return ListOffsetsRequest.Builder - .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false) + .forConsumer(false, IsolationLevel.READ_UNCOMMITTED) .setTargetTimes(Collections.singletonList(topic)) .build((short) version); } else if (version == 1) { @@ -1468,45 +1462,25 @@ private ListOffsetsRequest createListOffsetRequest(int version, long timestamp) .setName("test") .setPartitions(Arrays.asList(new ListOffsetsPartition() .setPartitionIndex(0) - .setTimestamp(timestamp) + .setTimestamp(1000000L) .setCurrentLeaderEpoch(5))); return ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) .setTargetTimes(Collections.singletonList(topic)) .build((short) version); - } else if (version >= 2 && version <= 6) { + } else if (version >= 2 && version <= LIST_OFFSETS.latestVersion()) { ListOffsetsPartition partition = new 
ListOffsetsPartition() .setPartitionIndex(0) - .setTimestamp(timestamp) + .setTimestamp(1000000L) .setCurrentLeaderEpoch(5); ListOffsetsTopic topic = new ListOffsetsTopic() .setName("test") .setPartitions(Arrays.asList(partition)); return ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_COMMITTED, false) + .forConsumer(true, IsolationLevel.READ_COMMITTED) .setTargetTimes(Collections.singletonList(topic)) .build((short) version); - } else if (version >= 7 && version <= LIST_OFFSETS.latestVersion()) { - ListOffsetsPartition partition = new ListOffsetsPartition() - .setPartitionIndex(0) - .setTimestamp(timestamp) - .setCurrentLeaderEpoch(5); - - ListOffsetsTopic topic = new ListOffsetsTopic() - .setName("test") - .setPartitions(Arrays.asList(partition)); - if (timestamp == ListOffsetsRequest.MAX_TIMESTAMP) { - return ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_COMMITTED, false) - .setTargetTimes(Collections.singletonList(topic)) - .build((short) version); - } else { - return ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_COMMITTED, false) - .setTargetTimes(Collections.singletonList(topic)) - .build((short) version); - } } else { throw new IllegalArgumentException("Illegal ListOffsetRequest version " + version); } From 4f4620de89ea907a284c0a3008a516fc368e1746 Mon Sep 17 00:00:00 2001 From: thomaskwscott Date: Wed, 23 Jun 2021 13:10:17 +0100 Subject: [PATCH 12/18] KAFKA-12541 fixes per pr review --- .../kafka/clients/admin/KafkaAdminClient.java | 43 +++++++------- .../common/message/ListOffsetsRequest.json | 2 +- .../common/message/ListOffsetsResponse.json | 2 +- .../clients/admin/KafkaAdminClientTest.java | 56 ++++--------------- .../common/requests/RequestResponseTest.java | 6 +- .../kafka/server/ListOffsetsRequestTest.scala | 12 ++++ .../unit/kafka/server/LogOffsetTest.scala | 3 - 7 files changed, 50 insertions(+), 74 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 76b38a8727aaf..efa2a2f663ed8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4299,31 +4299,34 @@ void handleFailure(Throwable throwable) { @Override boolean handleUnsupportedVersionException(UnsupportedVersionException exception) { + + // if no max timestamp requests were submitted we should not retry + if (partitionsToQuery.stream() + .flatMap(t -> t.partitions().stream()) + .noneMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP)) + return false; + if (supportsMaxTimestamp) { supportsMaxTimestamp = false; // fail any unsupported futures and remove partitions from the downgraded retry - List topicsToRemove = new ArrayList<>(); - partitionsToQuery.stream().forEach( - t -> { - List partitionsToRemove = new ArrayList<>(); - t.partitions().stream() - .filter(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP) - .forEach( - p -> { - futures.get(new TopicPartition(t.name(), p.partitionIndex())) - .completeExceptionally( - new UnsupportedVersionException( - "Broker " + brokerId - + " does not support MAX_TIMESTAMP offset spec")); - partitionsToRemove.add(p); - - }); - t.partitions().removeAll(partitionsToRemove); - if (t.partitions().isEmpty()) topicsToRemove.add(t); + Iterator topicIterator = partitionsToQuery.iterator(); + while (topicIterator.hasNext()) { + ListOffsetsTopic topic = 
topicIterator.next(); + Iterator partitionIterator = topic.partitions().iterator(); + while (partitionIterator.hasNext()) { + ListOffsetsPartition partition = partitionIterator.next(); + if (partition.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP) { + futures.get(new TopicPartition(topic.name(), partition.partitionIndex())) + .completeExceptionally(new UnsupportedVersionException( + "Broker " + brokerId + " does not support MAX_TIMESTAMP offset spec")); + partitionIterator.remove(); + } } - ); - partitionsToQuery.removeAll(topicsToRemove); + if (topic.partitions().isEmpty()) { + topicIterator.remove(); + } + } return !partitionsToQuery.isEmpty(); } diff --git a/clients/src/main/resources/common/message/ListOffsetsRequest.json b/clients/src/main/resources/common/message/ListOffsetsRequest.json index 4ee739b286588..93c920ee2fe97 100644 --- a/clients/src/main/resources/common/message/ListOffsetsRequest.json +++ b/clients/src/main/resources/common/message/ListOffsetsRequest.json @@ -31,7 +31,7 @@ // // Version 6 enables flexible versions. // - // Version 7 enables listing offsets by max timestamp. + // Version 7 enables listing offsets by max timestamp (KIP-734). "validVersions": "0-7", "flexibleVersions": "6+", "fields": [ diff --git a/clients/src/main/resources/common/message/ListOffsetsResponse.json b/clients/src/main/resources/common/message/ListOffsetsResponse.json index e20959bdf14b4..6d6be0fdf4f59 100644 --- a/clients/src/main/resources/common/message/ListOffsetsResponse.json +++ b/clients/src/main/resources/common/message/ListOffsetsResponse.json @@ -30,7 +30,7 @@ // // Version 6 enables flexible versions. // - // Version 7 is the same as version 6. + // Version 7 is the same as version 6 (KIP-734). "validVersions": "0-7", "flexibleVersions": "6+", "fields": [ diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 83a73ac82abc0..50dd41e393097 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -4237,7 +4237,6 @@ public void testListOffsetsNonRetriableErrors() throws Exception { @Test public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() { - Node node = new Node(0, "localhost", 8120); List nodes = Collections.singletonList(node); final Cluster cluster = new Cluster( @@ -4265,7 +4264,6 @@ public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() { @Test public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception { - Node node = new Node(0, "localhost", 8120); List nodes = Collections.singletonList(node); List pInfos = new ArrayList<>(); @@ -4289,13 +4287,21 @@ public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Ex // listoffsets response from broker 0 env.kafkaClient().prepareUnsupportedVersionResponse( - request -> request instanceof ListOffsetsRequest); + // ensure that the initial request contains max timestamp requests + request -> request instanceof ListOffsetsRequest && ((ListOffsetsRequest) request).topics().stream() + .flatMap(t -> t.partitions().stream()) + .anyMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP)); ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543); ListOffsetsResponseData responseData = new ListOffsetsResponseData() .setThrottleTimeMs(0) 
.setTopics(Arrays.asList(topicResponse)); - env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node); + env.kafkaClient().prepareResponseFrom( + // ensure that no max timestamp requests are retried + request -> request instanceof ListOffsetsRequest && ((ListOffsetsRequest) request).topics().stream() + .flatMap(t -> t.partitions().stream()) + .noneMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP), + new ListOffsetsResponse(responseData), node); ListOffsetsResult result = env.adminClient().listOffsets(new HashMap() {{ put(tp0, OffsetSpec.maxTimestamp()); @@ -4311,48 +4317,6 @@ public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Ex } } - @Test - public void testListOffsetsMaxTimestampAndNoBrokerResponse() { - Node node = new Node(0, "localhost", 8120); - List nodes = Collections.singletonList(node); - List pInfos = new ArrayList<>(); - pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})); - pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node})); - final Cluster cluster = new Cluster( - "mockClusterId", - nodes, - pInfos, - Collections.emptySet(), - Collections.emptySet(), - node); - final TopicPartition tp0 = new TopicPartition("foo", 0); - final TopicPartition tp1 = new TopicPartition("foo", 1); - - try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, - AdminClientConfig.RETRIES_CONFIG, "2")) { - - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); - - // listoffsets response from broker 0 - env.kafkaClient().prepareUnsupportedVersionResponse( - request -> request instanceof ListOffsetsRequest); - - ListOffsetsResponseData responseData = new ListOffsetsResponseData() - .setThrottleTimeMs(0) - .setTopics(new ArrayList<>()); - env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node); - - ListOffsetsResult result = env.adminClient().listOffsets(new HashMap() {{ - put(tp0, OffsetSpec.maxTimestamp()); - put(tp1, OffsetSpec.latest()); - }}); - - TestUtils.assertFutureThrows(result.partitionResult(tp0), UnsupportedVersionException.class); - TestUtils.assertFutureThrows(result.partitionResult(tp1), ApiException.class); - } - } - private Map makeTestFeatureUpdates() { return Utils.mkMap( Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, false)), diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index fc709dc584327..0208c05ab9ccf 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -1454,7 +1454,7 @@ private ListOffsetsRequest createListOffsetRequest(int version) { .setMaxNumOffsets(10) .setCurrentLeaderEpoch(5))); return ListOffsetsRequest.Builder - .forConsumer(false, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(Collections.singletonList(topic)) .build((short) version); } else if (version == 1) { @@ -1465,7 +1465,7 @@ private ListOffsetsRequest createListOffsetRequest(int version) { .setTimestamp(1000000L) .setCurrentLeaderEpoch(5))); return ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) 
.setTargetTimes(Collections.singletonList(topic)) .build((short) version); } else if (version >= 2 && version <= LIST_OFFSETS.latestVersion()) { @@ -1478,7 +1478,7 @@ private ListOffsetsRequest createListOffsetRequest(int version) { .setName("test") .setPartitions(Arrays.asList(partition)); return ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_COMMITTED) + .forConsumer(true, IsolationLevel.READ_COMMITTED, false) .setTargetTimes(Collections.singletonList(topic)) .build((short) version); } else { diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala index 961e966c09a9d..1988ad6afcae1 100644 --- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala @@ -80,6 +80,18 @@ class ListOffsetsRequestTest extends BaseRequestTest { assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, nonReplica, debugReplicaRequest) } + @Test + def testListOffsetsMaxTimeStampOldestVersion(): Unit = { + val consumerRequestBuilder = ListOffsetsRequest.Builder + .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false) + + val maxTimestampRequestBuilder = ListOffsetsRequest.Builder + .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, true) + + assertEquals(0.toShort, consumerRequestBuilder.oldestAllowedVersion()) + assertEquals(7.toShort, maxTimestampRequestBuilder.oldestAllowedVersion()) + } + def assertResponseErrorForEpoch(error: Errors, brokerId: Int, currentLeaderEpoch: Optional[Integer]): Unit = { val listOffsetPartition = new ListOffsetsPartition() .setPartitionIndex(partition.partition) diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 59c001b6b7f03..b742bfe68722c 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -107,7 +107,6 @@ class LogOffsetTest extends BaseRequestTest { val secondOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP) assertEquals(0L, secondOffset.get.offset) assertEquals(-1L, secondOffset.get.timestamp) - } @Test @@ -191,7 +190,6 @@ class LogOffsetTest extends BaseRequestTest { assertEquals(0L, log.logEndOffset) assertEquals(0L, maxTimestampOffset.get.offset) assertEquals(-1L, maxTimestampOffset.get.timestamp) - } @deprecated("legacyFetchOffsetsBefore", since = "") @@ -312,7 +310,6 @@ class LogOffsetTest extends BaseRequestTest { } private def createTopicAndGetLog(topic: String, topicPartition: TopicPartition): Log = { - createTopic(topic, 1, 1) val logManager = server.getLogManager From aada3fcb66dd7acda953fb523bb12cc6d445ef32 Mon Sep 17 00:00:00 2001 From: thomaskwscott Date: Thu, 24 Jun 2021 14:46:13 +0100 Subject: [PATCH 13/18] KAFKA-12541 fixes per pr review --- .../kafka/clients/admin/KafkaAdminClient.java | 12 ++----- .../clients/admin/KafkaAdminClientTest.java | 36 +++++++++++++++++++ .../src/main/scala/kafka/api/ApiVersion.scala | 11 +++++- .../kafka/server/ReplicaFetcherThread.scala | 2 +- 4 files changed, 50 insertions(+), 11 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index efa2a2f663ed8..7a34541afc3f1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4299,17 +4299,11 @@ void handleFailure(Throwable throwable) { @Override boolean handleUnsupportedVersionException(UnsupportedVersionException exception) { - - // if no max timestamp requests were submitted we should not retry - if (partitionsToQuery.stream() - .flatMap(t -> t.partitions().stream()) - .noneMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP)) - return false; - if (supportsMaxTimestamp) { supportsMaxTimestamp = false; // fail any unsupported futures and remove partitions from the downgraded retry + boolean foundMaxTimestampPartition = false; Iterator topicIterator = partitionsToQuery.iterator(); while (topicIterator.hasNext()) { ListOffsetsTopic topic = topicIterator.next(); @@ -4317,6 +4311,7 @@ boolean handleUnsupportedVersionException(UnsupportedVersionException exception) while (partitionIterator.hasNext()) { ListOffsetsPartition partition = partitionIterator.next(); if (partition.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP) { + foundMaxTimestampPartition = true; futures.get(new TopicPartition(topic.name(), partition.partitionIndex())) .completeExceptionally(new UnsupportedVersionException( "Broker " + brokerId + " does not support MAX_TIMESTAMP offset spec")); @@ -4327,8 +4322,7 @@ boolean handleUnsupportedVersionException(UnsupportedVersionException exception) topicIterator.remove(); } } - - return !partitionsToQuery.isEmpty(); + return foundMaxTimestampPartition && !partitionsToQuery.isEmpty(); } return false; } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 50dd41e393097..06831eac8128b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -4317,6 +4317,42 @@ public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Ex } } + @Test + public void testListOffsetsUnsupportedNonMaxTimestamp() { + Node node = new Node(0, "localhost", 8120); + List nodes = Collections.singletonList(node); + List pInfos = new ArrayList<>(); + pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})); + final Cluster cluster = new Cluster( + "mockClusterId", + nodes, + pInfos, + Collections.emptySet(), + Collections.emptySet(), + node); + final TopicPartition tp0 = new TopicPartition("foo", 0); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, + AdminClientConfig.RETRIES_CONFIG, "2")) { + + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + // listoffsets response from broker 0 + env.kafkaClient().prepareUnsupportedVersionResponse( + // ensure that the initial request doesn't contain max timestamp requests + request -> request instanceof ListOffsetsRequest && ((ListOffsetsRequest) request).topics().stream() + .flatMap(t -> t.partitions().stream()) + .noneMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP)); + + ListOffsetsResult result = env.adminClient().listOffsets(new HashMap() {{ + put(tp0, OffsetSpec.latest()); + }}); + + TestUtils.assertFutureThrows(result.partitionResult(tp0), UnsupportedVersionException.class); + } + } + private Map makeTestFeatureUpdates() { return Utils.mkMap( Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, false)), diff --git 
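
Across these revisions the downgrade contract of handleUnsupportedVersionException settles into three steps: fail every MAX_TIMESTAMP future, prune those partitions from the request, and retry only if the pruned request still has work to do. A condensed equivalent of that shape, sketched with removeIf instead of the explicit iterators used in the production code (partitionsToQuery, futures and brokerId as in the surrounding Call):

    partitionsToQuery.removeIf(topic -> {
        topic.partitions().removeIf(p -> {
            if (p.timestamp() != ListOffsetsRequest.MAX_TIMESTAMP)
                return false; // keep: servable by ListOffsets v0-v6 after downgrade
            futures.get(new TopicPartition(topic.name(), p.partitionIndex()))
                   .completeExceptionally(new UnsupportedVersionException(
                       "Broker " + brokerId + " does not support MAX_TIMESTAMP offset spec"));
            return true;      // prune: no broker below v7 can answer this
        });
        return topic.partitions().isEmpty(); // drop topics left with no partitions
    });
    // retry with the downgraded request only if partitions remain
    return !partitionsToQuery.isEmpty();
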
a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala index 724bc9894045a..dc3b1caca042f 100644 --- a/core/src/main/scala/kafka/api/ApiVersion.scala +++ b/core/src/main/scala/kafka/api/ApiVersion.scala @@ -114,7 +114,9 @@ object ApiVersion { // Introduced topic IDs to LeaderAndIsr and UpdateMetadata requests/responses (KIP-516) KAFKA_2_8_IV1, // Introduce AllocateProducerIds (KIP-730) - KAFKA_3_0_IV0 + KAFKA_3_0_IV0, + // Introduce ListOffsets maxTimestamps (KIP-734) + KAFKA_3_0_IV1 ) // Map keys are the union of the short and full versions @@ -458,6 +460,13 @@ case object KAFKA_3_0_IV0 extends DefaultApiVersion { val id: Int = 33 } +case object KAFKA_3_0_IV1 extends DefaultApiVersion { + val shortVersion: String = "3.0" + val subVersion = "IV1" + val recordVersion = RecordVersion.V2 + val id: Int = 34 +} + object ApiVersionValidator extends Validator { override def ensureValid(name: String, value: Any): Unit = { diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index ae88a22961890..840c5674b0b4f 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -94,7 +94,7 @@ class ReplicaFetcherThread(name: String, // Visible for testing private[server] val listOffsetRequestVersion: Short = - if (brokerConfig.interBrokerProtocolVersion >= KAFKA_3_0_IV0) 7 + if (brokerConfig.interBrokerProtocolVersion >= KAFKA_3_0_IV1) 7 else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_8_IV0) 6 else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_2_IV1) 5 else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV1) 4 From 19f98ab39f040f3072aa2242eedb3f76465fd65f Mon Sep 17 00:00:00 2001 From: thomaskwscott Date: Fri, 25 Jun 2021 10:34:13 +0100 Subject: [PATCH 14/18] KAFKA-12541 fixes per pr review --- .../kafka/clients/admin/KafkaAdminClientTest.java | 15 ++++----------- core/src/main/scala/kafka/api/ApiVersion.scala | 2 +- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 06831eac8128b..4f6cddbfb568c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -4287,10 +4287,7 @@ public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Ex // listoffsets response from broker 0 env.kafkaClient().prepareUnsupportedVersionResponse( - // ensure that the initial request contains max timestamp requests - request -> request instanceof ListOffsetsRequest && ((ListOffsetsRequest) request).topics().stream() - .flatMap(t -> t.partitions().stream()) - .anyMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP)); + request -> request instanceof ListOffsetsRequest); ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543); ListOffsetsResponseData responseData = new ListOffsetsResponseData() @@ -4340,14 +4337,10 @@ public void testListOffsetsUnsupportedNonMaxTimestamp() { // listoffsets response from broker 0 env.kafkaClient().prepareUnsupportedVersionResponse( - // ensure that the initial request doesn't contain max timestamp requests - request -> request instanceof ListOffsetsRequest && 
((ListOffsetsRequest) request).topics().stream() - .flatMap(t -> t.partitions().stream()) - .noneMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP)); + request -> request instanceof ListOffsetsRequest); - ListOffsetsResult result = env.adminClient().listOffsets(new HashMap() {{ - put(tp0, OffsetSpec.latest()); - }}); + ListOffsetsResult result = env.adminClient().listOffsets( + Collections.singletonMap(tp0, OffsetSpec.latest())); TestUtils.assertFutureThrows(result.partitionResult(tp0), UnsupportedVersionException.class); } diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala index dc3b1caca042f..d73b0a2a9ede1 100644 --- a/core/src/main/scala/kafka/api/ApiVersion.scala +++ b/core/src/main/scala/kafka/api/ApiVersion.scala @@ -115,7 +115,7 @@ object ApiVersion { KAFKA_2_8_IV1, // Introduce AllocateProducerIds (KIP-730) KAFKA_3_0_IV0, - // Introduce ListOffsets maxTimestamps (KIP-734) + // Introduce ListOffsets V7 which supports listing offsets by max timestamp (KIP-734) KAFKA_3_0_IV1 ) From 1e43e02decd3161eb8894aa08d880261b8c0af44 Mon Sep 17 00:00:00 2001 From: thomaskwscott Date: Mon, 28 Jun 2021 18:30:06 +0100 Subject: [PATCH 15/18] KAFKA-13002 fix for immediate downgrade cases for non max timestamp requests --- .../kafka/clients/admin/KafkaAdminClient.java | 8 ++-- .../clients/admin/KafkaAdminClientTest.java | 45 +++++++++++++++++++ 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 7a34541afc3f1..c72df406697a9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4232,7 +4232,9 @@ private List getListOffsetsCalls(MetadataOperationContext partitionsToQuery = new ArrayList<>(entry.getValue().values()); - private boolean supportsMaxTimestamp = true; + private boolean supportsMaxTimestamp = partitionsToQuery.stream() + .flatMap(t -> t.partitions().stream()) + .anyMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP); @Override ListOffsetsRequest.Builder createRequest(int timeoutMs) { @@ -4303,7 +4305,6 @@ boolean handleUnsupportedVersionException(UnsupportedVersionException exception) supportsMaxTimestamp = false; // fail any unsupported futures and remove partitions from the downgraded retry - boolean foundMaxTimestampPartition = false; Iterator topicIterator = partitionsToQuery.iterator(); while (topicIterator.hasNext()) { ListOffsetsTopic topic = topicIterator.next(); @@ -4311,7 +4312,6 @@ boolean handleUnsupportedVersionException(UnsupportedVersionException exception) while (partitionIterator.hasNext()) { ListOffsetsPartition partition = partitionIterator.next(); if (partition.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP) { - foundMaxTimestampPartition = true; futures.get(new TopicPartition(topic.name(), partition.partitionIndex())) .completeExceptionally(new UnsupportedVersionException( "Broker " + brokerId + " does not support MAX_TIMESTAMP offset spec")); @@ -4322,7 +4322,7 @@ boolean handleUnsupportedVersionException(UnsupportedVersionException exception) topicIterator.remove(); } } - return foundMaxTimestampPartition && !partitionsToQuery.isEmpty(); + return !partitionsToQuery.isEmpty(); } return false; } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java 
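
Deriving supportsMaxTimestamp from the request contents is what makes the non-max-timestamp case downgrade immediately: a call that never asks for MAX_TIMESTAMP keeps a v0 floor, so against an older broker ordinary version negotiation picks v6 or below and the exception handler is never involved at all. The field initializer, restated with that intent spelled out in comments:

    // Only treat this Call as a max-timestamp request if it actually carries one;
    // otherwise createRequest() builds a plain forConsumer(..., false) request whose
    // allowed range (v0 and up) any broker can satisfy without a downgrade round-trip.
    private boolean supportsMaxTimestamp = partitionsToQuery.stream()
        .flatMap(t -> t.partitions().stream())
        .anyMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP);
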
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 4f6cddbfb568c..7aa0f92dc7cfb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -4346,6 +4346,51 @@ public void testListOffsetsUnsupportedNonMaxTimestamp() {
         }
     }
 
+    @Test
+    public void testListOffsetsNonMaxTimestampDowngradedImmediately() throws Exception {
+        Node node = new Node(0, "localhost", 8120);
+        List nodes = Collections.singletonList(node);
+        List pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            ApiVersion oldBrokerVersion = new ApiVersion();
+            oldBrokerVersion.setApiKey(ApiKeys.LIST_OFFSETS.id);
+            oldBrokerVersion.setMinVersion((short) 0);
+            oldBrokerVersion.setMaxVersion((short) 6);
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(
+                Arrays.asList(oldBrokerVersion)));
+
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            ListOffsetsTopicResponse t0 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp0, Errors.NONE, -1L, 123L, 321);
+            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(Arrays.asList(t0));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareResponse(
+                request -> request instanceof ListOffsetsRequest,
+                new ListOffsetsResponse(responseData));
+
+            ListOffsetsResult result = env.adminClient().listOffsets(
+                Collections.singletonMap(tp0, OffsetSpec.latest()));
+
+            // trigger the request
+            result.partitionResult(tp0).get();
+        }
+    }
+
     private Map makeTestFeatureUpdates() {
         return Utils.mkMap(
             Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, false)),
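The new test pins down the user-visible contract: a plain OffsetSpec.latest() lookup against a broker that only speaks ListOffsets v6 or lower must succeed on the first, already-downgraded request instead of surfacing an UnsupportedVersionException. From the public Admin API the scenario looks like this; a minimal usage sketch (the broker address and topic name are placeholders):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListOffsetsResult;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.common.TopicPartition;

    public class LatestOffsetLookup {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                TopicPartition tp = new TopicPartition("foo", 0);
                // Plain latest-offset lookup: must work against old and new brokers alike.
                ListOffsetsResult result = admin.listOffsets(
                    Collections.singletonMap(tp, OffsetSpec.latest()));
                // Only OffsetSpec.maxTimestamp() requires a v7-capable (3.0+) broker.
                System.out.println("latest offset: " + result.partitionResult(tp).get().offset());
            }
        }
    }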
version " + version); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 7aa0f92dc7cfb..6df6b28094457 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -4249,7 +4249,8 @@ public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() { final TopicPartition tp0 = new TopicPartition("foo", 0); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) { - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create( + ApiKeys.LIST_OFFSETS.id, (short) 0, (short) 6)); env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); // listoffsets response from broker 0 @@ -4282,7 +4283,8 @@ public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Ex try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) { - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create( + ApiKeys.LIST_OFFSETS.id, (short) 0, (short) 6)); env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); // listoffsets response from broker 0 @@ -4332,7 +4334,8 @@ public void testListOffsetsUnsupportedNonMaxTimestamp() { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) { - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create( + ApiKeys.LIST_OFFSETS.id, (short) 0, (short) 6)); env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); // listoffsets response from broker 0 @@ -4364,12 +4367,8 @@ public void testListOffsetsNonMaxTimestampDowngradedImmediately() throws Excepti try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) { - ApiVersion oldBrokerVersion = new ApiVersion(); - oldBrokerVersion.setApiKey(ApiKeys.LIST_OFFSETS.id); - oldBrokerVersion.setMinVersion((short) 0); - oldBrokerVersion.setMaxVersion((short) 6); env.kafkaClient().setNodeApiVersions(NodeApiVersions.create( - Arrays.asList(oldBrokerVersion))); + ApiKeys.LIST_OFFSETS.id, (short) 0, (short) 6)); env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); @@ -4386,8 +4385,10 @@ public void testListOffsetsNonMaxTimestampDowngradedImmediately() throws Excepti ListOffsetsResult result = env.adminClient().listOffsets( Collections.singletonMap(tp0, OffsetSpec.latest())); - // trigger the request - result.partitionResult(tp0).get(); + ListOffsetsResultInfo tp0Offset = result.partitionResult(tp0).get(); + assertEquals(123L, tp0Offset.offset()); + assertEquals(321, tp0Offset.leaderEpoch().get().intValue()); + assertEquals(-1L, tp0Offset.timestamp()); } } From cd5ca9f61e2c35a4f9187e0d0ce6e4af3990417f Mon Sep 17 00:00:00 2001 From: thomaskwscott Date: Wed, 30 Jun 2021 11:52:02 +0100 Subject: [PATCH 17/18] KAFKA-13002 fixes per pr review --- .../org/apache/kafka/clients/MockClient.java | 38 ++++++++++--------- .../clients/admin/KafkaAdminClientTest.java | 2 +- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java 
From cd5ca9f61e2c35a4f9187e0d0ce6e4af3990417f Mon Sep 17 00:00:00 2001
From: thomaskwscott
Date: Wed, 30 Jun 2021 11:52:02 +0100
Subject: [PATCH 17/18] KAFKA-13002 fixes per pr review

---
 .../org/apache/kafka/clients/MockClient.java  | 38 ++++++++++---------
 .../clients/admin/KafkaAdminClientTest.java   |  2 +-
 2 files changed, 21 insertions(+), 19 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index aed415794e45e..28d363bb8ef50 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -238,28 +238,30 @@ public void send(ClientRequest request, long now) {
 
             AbstractRequest.Builder builder = request.requestBuilder();
 
-            UnsupportedVersionException unsupportedVersionException = null;
-            short version = 0;
             try {
-                version = nodeApiVersions.latestUsableVersion(request.apiKey(), builder.oldestAllowedVersion(),
+                short version = nodeApiVersions.latestUsableVersion(request.apiKey(), builder.oldestAllowedVersion(),
                     builder.latestAllowedVersion());
-            } catch (UnsupportedVersionException e) {
-                unsupportedVersionException = e;
-            }
 
-            if (futureResp.isUnsupportedRequest) {
-                unsupportedVersionException = new UnsupportedVersionException(
-                    "Api " + request.apiKey() + " with version " + version);
-            } else {
-                AbstractRequest abstractRequest = request.requestBuilder().build(version);
-                if (!futureResp.requestMatcher.matches(abstractRequest))
-                    throw new IllegalStateException("Request matcher did not match next-in-line request "
-                        + abstractRequest + " with prepared response " + futureResp.responseBody);
+                UnsupportedVersionException unsupportedVersionException = null;
+                if (futureResp.isUnsupportedRequest) {
+                    unsupportedVersionException = new UnsupportedVersionException(
+                        "Api " + request.apiKey() + " with version " + version);
+                } else {
+                    AbstractRequest abstractRequest = request.requestBuilder().build(version);
+                    if (!futureResp.requestMatcher.matches(abstractRequest))
+                        throw new IllegalStateException("Request matcher did not match next-in-line request "
+                            + abstractRequest + " with prepared response " + futureResp.responseBody);
+                }
+
+                ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
+                    request.createdTimeMs(), time.milliseconds(), futureResp.disconnected,
+                    unsupportedVersionException, null, futureResp.responseBody);
+                responses.add(resp);
+            } catch (UnsupportedVersionException unsupportedVersionException) {
+                ClientResponse resp = new ClientResponse(request.makeHeader(builder.latestAllowedVersion()), request.callback(), request.destination(),
+                    request.createdTimeMs(), time.milliseconds(), false, unsupportedVersionException, null, null);
+                responses.add(resp);
             }
-            ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
-                request.createdTimeMs(), time.milliseconds(), futureResp.disconnected,
-                unsupportedVersionException, null, futureResp.responseBody);
-            responses.add(resp);
 
             iterator.remove();
             return;
         }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 6df6b28094457..0ba2e13afec34 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -4335,7 +4335,7 @@ public void testListOffsetsUnsupportedNonMaxTimestamp() {
             AdminClientConfig.RETRIES_CONFIG, "2")) {
 
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(
-                ApiKeys.LIST_OFFSETS.id, (short) 0, (short) 6));
+                ApiKeys.LIST_OFFSETS.id, (short) 0, (short) 0));
             env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
 
             // listoffsets response from broker 0
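Patch 17 restructures rather than extends: the whole matcher/response path moves inside the try, so a failed negotiation no longer leaks a half-initialized version. Instead it produces a ClientResponse that carries the UnsupportedVersionException, with its header built from builder.latestAllowedVersion() since no version was ever agreed. The point of the shape is that errors travel the same callback channel as successes, which is what lets handleUnsupportedVersionException run at all. A simplified sketch of the pattern, with stand-in types rather than the MockClient classes:

    import java.util.ArrayList;
    import java.util.List;

    class ErrorAsResponseSketch {
        static final class Response {
            final short version;
            final Exception error;  // null on success
            final Object body;      // null on failure
            Response(short version, Exception error, Object body) {
                this.version = version;
                this.error = error;
                this.body = body;
            }
        }

        final List<Response> responses = new ArrayList<>();

        void send(short clientMinVersion, short brokerMaxVersion) {
            try {
                short version = negotiate(clientMinVersion, brokerMaxVersion); // may throw
                responses.add(new Response(version, null, "ok"));              // normal path
            } catch (IllegalStateException e) {
                // A failed negotiation becomes a response carrying the error, so the
                // caller's completion handling still runs instead of send() throwing.
                responses.add(new Response(brokerMaxVersion, e, null));
            }
        }

        static short negotiate(short clientMin, short brokerMax) {
            if (brokerMax < clientMin)
                throw new IllegalStateException("no usable version");
            return brokerMax; // newest version the broker offers
        }
    }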
From 81a15c1000cdfae8b7e27384aa318b9b0df5a511 Mon Sep 17 00:00:00 2001
From: thomaskwscott
Date: Wed, 30 Jun 2021 16:55:03 +0100
Subject: [PATCH 18/18] KAFKA-13002 update failing test after mockClient updates

---
 .../producer/internals/TransactionManagerTest.java | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index a93af9dc92171..6a4afda15cece 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -2213,10 +2213,15 @@ public void testSendOffsetWithGroupMetadataFailAsAutoDowngradeTxnCommitNotEnable
         txnOffsetCommitResponse.put(tp0, Errors.NONE);
         txnOffsetCommitResponse.put(tp1, Errors.COORDINATOR_LOAD_IN_PROGRESS);
 
-        prepareGroupMetadataCommit(
+        TransactionalRequestResult addOffsetsResult = prepareGroupMetadataCommit(
             () -> prepareTxnOffsetCommitResponse(consumerGroupId, producerId, epoch, txnOffsetCommitResponse));
 
-        assertThrows(UnsupportedVersionException.class, () -> sender.runOnce());
+        sender.runOnce();
+
+        assertTrue(addOffsetsResult.isCompleted());
+        assertFalse(addOffsetsResult.isSuccessful());
+        assertTrue(addOffsetsResult.error() instanceof UnsupportedVersionException);
+        assertFatalError(UnsupportedVersionException.class);
     }
 
     private TransactionalRequestResult prepareGroupMetadataCommit(Runnable prepareTxnCommitResponse) {
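With MockClient now routing the UnsupportedVersionException through the response path, sender.runOnce() in this last test no longer throws; the failure arrives as a completed-but-unsuccessful TransactionalRequestResult, which is why the assertion style changes from assertThrows to inspecting the result. The same shift expressed with plain java.util.concurrent types, as a hedged stand-in for the Kafka classes:

    import java.util.concurrent.CompletableFuture;

    public class ErrorViaResultSketch {
        public static void main(String[] args) {
            CompletableFuture<Void> result = new CompletableFuture<>();
            // The transport layer reports the problem by completing the result...
            result.completeExceptionally(new UnsupportedOperationException("stand-in error"));

            // ...so the test asserts on the completed state instead of expecting a throw.
            System.out.println(result.isDone());                   // true: completed
            System.out.println(result.isCompletedExceptionally()); // true: not successful
        }
    }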