diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 7a34541afc3f1..c72df406697a9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4232,7 +4232,9 @@ private List getListOffsetsCalls(MetadataOperationContext partitionsToQuery = new ArrayList<>(entry.getValue().values()); - private boolean supportsMaxTimestamp = true; + private boolean supportsMaxTimestamp = partitionsToQuery.stream() + .flatMap(t -> t.partitions().stream()) + .anyMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP); @Override ListOffsetsRequest.Builder createRequest(int timeoutMs) { @@ -4303,7 +4305,6 @@ boolean handleUnsupportedVersionException(UnsupportedVersionException exception) supportsMaxTimestamp = false; // fail any unsupported futures and remove partitions from the downgraded retry - boolean foundMaxTimestampPartition = false; Iterator topicIterator = partitionsToQuery.iterator(); while (topicIterator.hasNext()) { ListOffsetsTopic topic = topicIterator.next(); @@ -4311,7 +4312,6 @@ boolean handleUnsupportedVersionException(UnsupportedVersionException exception) while (partitionIterator.hasNext()) { ListOffsetsPartition partition = partitionIterator.next(); if (partition.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP) { - foundMaxTimestampPartition = true; futures.get(new TopicPartition(topic.name(), partition.partitionIndex())) .completeExceptionally(new UnsupportedVersionException( "Broker " + brokerId + " does not support MAX_TIMESTAMP offset spec")); @@ -4322,7 +4322,7 @@ boolean handleUnsupportedVersionException(UnsupportedVersionException exception) topicIterator.remove(); } } - return foundMaxTimestampPartition && !partitionsToQuery.isEmpty(); + return !partitionsToQuery.isEmpty(); } return false; } diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index f6942b7a5294d..28d363bb8ef50 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -237,23 +237,31 @@ public void send(ClientRequest request, long now) { continue; AbstractRequest.Builder builder = request.requestBuilder(); - short version = nodeApiVersions.latestUsableVersion(request.apiKey(), builder.oldestAllowedVersion(), - builder.latestAllowedVersion()); - UnsupportedVersionException unsupportedVersionException = null; - if (futureResp.isUnsupportedRequest) { - unsupportedVersionException = new UnsupportedVersionException( - "Api " + request.apiKey() + " with version " + version); - } else { - AbstractRequest abstractRequest = request.requestBuilder().build(version); - if (!futureResp.requestMatcher.matches(abstractRequest)) - throw new IllegalStateException("Request matcher did not match next-in-line request " - + abstractRequest + " with prepared response " + futureResp.responseBody); + try { + short version = nodeApiVersions.latestUsableVersion(request.apiKey(), builder.oldestAllowedVersion(), + builder.latestAllowedVersion()); + + UnsupportedVersionException unsupportedVersionException = null; + if (futureResp.isUnsupportedRequest) { + unsupportedVersionException = new UnsupportedVersionException( + "Api " + request.apiKey() + " with version " + version); + } else { + AbstractRequest abstractRequest = request.requestBuilder().build(version); + if (!futureResp.requestMatcher.matches(abstractRequest)) + throw new IllegalStateException("Request matcher did not match next-in-line request " + + abstractRequest + " with prepared response " + futureResp.responseBody); + } + + ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(), + request.createdTimeMs(), time.milliseconds(), futureResp.disconnected, + unsupportedVersionException, null, futureResp.responseBody); + responses.add(resp); + } catch (UnsupportedVersionException unsupportedVersionException) { + ClientResponse resp = new ClientResponse(request.makeHeader(builder.latestAllowedVersion()), request.callback(), request.destination(), + request.createdTimeMs(), time.milliseconds(), false, unsupportedVersionException, null, null); + responses.add(resp); } - ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(), - request.createdTimeMs(), time.milliseconds(), futureResp.disconnected, - unsupportedVersionException, null, futureResp.responseBody); - responses.add(resp); iterator.remove(); return; } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 4f6cddbfb568c..0ba2e13afec34 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -4249,7 +4249,8 @@ public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() { final TopicPartition tp0 = new TopicPartition("foo", 0); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) { - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create( + ApiKeys.LIST_OFFSETS.id, (short) 0, (short) 6)); env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); // listoffsets response from broker 0 @@ -4282,7 +4283,8 @@ public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Ex try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) { - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create( + ApiKeys.LIST_OFFSETS.id, (short) 0, (short) 6)); env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); // listoffsets response from broker 0 @@ -4332,7 +4334,8 @@ public void testListOffsetsUnsupportedNonMaxTimestamp() { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) { - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create( + ApiKeys.LIST_OFFSETS.id, (short) 0, (short) 0)); env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); // listoffsets response from broker 0 @@ -4346,6 +4349,49 @@ public void testListOffsetsUnsupportedNonMaxTimestamp() { } } + @Test + public void testListOffsetsNonMaxTimestampDowngradedImmediately() throws Exception { + Node node = new Node(0, "localhost", 8120); + List nodes = Collections.singletonList(node); + List pInfos = new ArrayList<>(); + pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})); + final Cluster cluster = new Cluster( + "mockClusterId", + nodes, + pInfos, + Collections.emptySet(), + Collections.emptySet(), + node); + final TopicPartition tp0 = new TopicPartition("foo", 0); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, + AdminClientConfig.RETRIES_CONFIG, "2")) { + + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create( + ApiKeys.LIST_OFFSETS.id, (short) 0, (short) 6)); + + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + ListOffsetsTopicResponse t0 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp0, Errors.NONE, -1L, 123L, 321); + ListOffsetsResponseData responseData = new ListOffsetsResponseData() + .setThrottleTimeMs(0) + .setTopics(Arrays.asList(t0)); + + // listoffsets response from broker 0 + env.kafkaClient().prepareResponse( + request -> request instanceof ListOffsetsRequest, + new ListOffsetsResponse(responseData)); + + ListOffsetsResult result = env.adminClient().listOffsets( + Collections.singletonMap(tp0, OffsetSpec.latest())); + + ListOffsetsResultInfo tp0Offset = result.partitionResult(tp0).get(); + assertEquals(123L, tp0Offset.offset()); + assertEquals(321, tp0Offset.leaderEpoch().get().intValue()); + assertEquals(-1L, tp0Offset.timestamp()); + } + } + private Map makeTestFeatureUpdates() { return Utils.mkMap( Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, false)), diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index a93af9dc92171..6a4afda15cece 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -2213,10 +2213,15 @@ public void testSendOffsetWithGroupMetadataFailAsAutoDowngradeTxnCommitNotEnable txnOffsetCommitResponse.put(tp0, Errors.NONE); txnOffsetCommitResponse.put(tp1, Errors.COORDINATOR_LOAD_IN_PROGRESS); - prepareGroupMetadataCommit( + TransactionalRequestResult addOffsetsResult = prepareGroupMetadataCommit( () -> prepareTxnOffsetCommitResponse(consumerGroupId, producerId, epoch, txnOffsetCommitResponse)); - assertThrows(UnsupportedVersionException.class, () -> sender.runOnce()); + sender.runOnce(); + + assertTrue(addOffsetsResult.isCompleted()); + assertFalse(addOffsetsResult.isSuccessful()); + assertTrue(addOffsetsResult.error() instanceof UnsupportedVersionException); + assertFatalError(UnsupportedVersionException.class); } private TransactionalRequestResult prepareGroupMetadataCommit(Runnable prepareTxnCommitResponse) {