Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
5a7c00d
KAFKA-12541 add MAX_TIMESTAMP spec to listOffsets api
thomaskwscott May 25, 2021
9cc3c0c
Merge remote-tracking branch 'upstream/trunk' into trunk
thomaskwscott May 25, 2021
5f5d46b
KAFKA-12541 updated replica fetcher to use latest ListOffsets request…
thomaskwscott May 25, 2021
4fc4059
KAFKA-12541 refactor with retry approach in KafkaAdminClient
thomaskwscott Jun 5, 2021
51aaa70
Merge remote-tracking branch 'upstream/trunk' into trunk
thomaskwscott Jun 8, 2021
ae0b6ed
KAFKA-12541 added LogOffsetTest tests
thomaskwscott Jun 8, 2021
24be1c7
KAFKA-12541 tidy up
thomaskwscott Jun 8, 2021
fb85422
KAFKA-12541 fixes per pr review
thomaskwscott Jun 18, 2021
32e6b2b
KAFKA-12541 adminclient simplification
thomaskwscott Jun 21, 2021
a0e9c10
KAFKA-12541 refactor of listoffsets retry per pr review comments
thomaskwscott Jun 21, 2021
32241da
KAFKA-12541 fixes per pr review
thomaskwscott Jun 21, 2021
ea3cd65
KAFKA-12541 stopped retries for partitions that cannot use max_timestamp
thomaskwscott Jun 22, 2021
7355f61
KAFKA-12541 roll back changes to RequestResponseTest
thomaskwscott Jun 23, 2021
b003e84
Merge remote-tracking branch 'upstream/trunk' into trunk
thomaskwscott Jun 23, 2021
4f4620d
KAFKA-12541 fixes per pr review
thomaskwscott Jun 23, 2021
aada3fc
KAFKA-12541 fixes per pr review
thomaskwscott Jun 24, 2021
19f98ab
KAFKA-12541 fixes per pr review
thomaskwscott Jun 25, 2021
3f73ce5
Merge remote-tracking branch 'upstream/trunk' into trunk
thomaskwscott Jun 28, 2021
1e43e02
KAFKA-13002 fix for immediate downgrade cases for non max timestamp r…
thomaskwscott Jun 28, 2021
25d5bc4
KAFKA-13002 fixes per pr review
thomaskwscott Jun 29, 2021
cd5ca9f
KAFKA-13002 fixes per pr review
thomaskwscott Jun 30, 2021
81a15c1
KAFKA-13002 update failing test after mockClient updates
thomaskwscott Jun 30, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4232,7 +4232,9 @@ private List<Call> getListOffsetsCalls(MetadataOperationContext<ListOffsetsResul

final List<ListOffsetsTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());

private boolean supportsMaxTimestamp = true;
private boolean supportsMaxTimestamp = partitionsToQuery.stream()
.flatMap(t -> t.partitions().stream())
.anyMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP);

@Override
ListOffsetsRequest.Builder createRequest(int timeoutMs) {
Expand Down Expand Up @@ -4303,15 +4305,13 @@ boolean handleUnsupportedVersionException(UnsupportedVersionException exception)
supportsMaxTimestamp = false;

// fail any unsupported futures and remove partitions from the downgraded retry
boolean foundMaxTimestampPartition = false;
Iterator<ListOffsetsTopic> topicIterator = partitionsToQuery.iterator();
while (topicIterator.hasNext()) {
ListOffsetsTopic topic = topicIterator.next();
Iterator<ListOffsetsPartition> partitionIterator = topic.partitions().iterator();
while (partitionIterator.hasNext()) {
ListOffsetsPartition partition = partitionIterator.next();
if (partition.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP) {
foundMaxTimestampPartition = true;
futures.get(new TopicPartition(topic.name(), partition.partitionIndex()))
.completeExceptionally(new UnsupportedVersionException(
"Broker " + brokerId + " does not support MAX_TIMESTAMP offset spec"));
Expand All @@ -4322,7 +4322,7 @@ boolean handleUnsupportedVersionException(UnsupportedVersionException exception)
topicIterator.remove();
}
}
return foundMaxTimestampPartition && !partitionsToQuery.isEmpty();
return !partitionsToQuery.isEmpty();
}
return false;
}
Expand Down
38 changes: 23 additions & 15 deletions clients/src/test/java/org/apache/kafka/clients/MockClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -237,23 +237,31 @@ public void send(ClientRequest request, long now) {
continue;

AbstractRequest.Builder<?> builder = request.requestBuilder();
short version = nodeApiVersions.latestUsableVersion(request.apiKey(), builder.oldestAllowedVersion(),
builder.latestAllowedVersion());

UnsupportedVersionException unsupportedVersionException = null;
if (futureResp.isUnsupportedRequest) {
unsupportedVersionException = new UnsupportedVersionException(
"Api " + request.apiKey() + " with version " + version);
} else {
AbstractRequest abstractRequest = request.requestBuilder().build(version);
if (!futureResp.requestMatcher.matches(abstractRequest))
throw new IllegalStateException("Request matcher did not match next-in-line request "
+ abstractRequest + " with prepared response " + futureResp.responseBody);
try {
short version = nodeApiVersions.latestUsableVersion(request.apiKey(), builder.oldestAllowedVersion(),
builder.latestAllowedVersion());

UnsupportedVersionException unsupportedVersionException = null;
if (futureResp.isUnsupportedRequest) {
unsupportedVersionException = new UnsupportedVersionException(
"Api " + request.apiKey() + " with version " + version);
} else {
AbstractRequest abstractRequest = request.requestBuilder().build(version);
if (!futureResp.requestMatcher.matches(abstractRequest))
throw new IllegalStateException("Request matcher did not match next-in-line request "
+ abstractRequest + " with prepared response " + futureResp.responseBody);
}

ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
request.createdTimeMs(), time.milliseconds(), futureResp.disconnected,
unsupportedVersionException, null, futureResp.responseBody);
responses.add(resp);
} catch (UnsupportedVersionException unsupportedVersionException) {
ClientResponse resp = new ClientResponse(request.makeHeader(builder.latestAllowedVersion()), request.callback(), request.destination(),
request.createdTimeMs(), time.milliseconds(), false, unsupportedVersionException, null, null);
responses.add(resp);
}
ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
request.createdTimeMs(), time.milliseconds(), futureResp.disconnected,
unsupportedVersionException, null, futureResp.responseBody);
responses.add(resp);
iterator.remove();
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4249,7 +4249,8 @@ public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
final TopicPartition tp0 = new TopicPartition("foo", 0);

try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(
ApiKeys.LIST_OFFSETS.id, (short) 0, (short) 6));
Comment on lines +4252 to +4253
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a bit annoying that we have to keep calling prepareUnsupportedVersionResponse below even when we provide the versions. In the end, setting the versions in this case does not change much because prepareUnsupportedVersionResponse will make the request fail anyway. This is inherently due to the implementation of the MockClient, which requires queued responses in order to process queued requests. It is fine to keep it as is in this patch, but we should really rework the MockClient.

env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));

// listoffsets response from broker 0
Expand Down Expand Up @@ -4282,7 +4283,8 @@ public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Ex
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
AdminClientConfig.RETRIES_CONFIG, "2")) {

env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(
ApiKeys.LIST_OFFSETS.id, (short) 0, (short) 6));
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));

// listoffsets response from broker 0
Expand Down Expand Up @@ -4332,7 +4334,8 @@ public void testListOffsetsUnsupportedNonMaxTimestamp() {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
AdminClientConfig.RETRIES_CONFIG, "2")) {

env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(
ApiKeys.LIST_OFFSETS.id, (short) 0, (short) 0));
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));

// listoffsets response from broker 0
Expand All @@ -4346,6 +4349,49 @@ public void testListOffsetsUnsupportedNonMaxTimestamp() {
}
}

@Test
public void testListOffsetsNonMaxTimestampDowngradedImmediately() throws Exception {
Node node = new Node(0, "localhost", 8120);
List<Node> nodes = Collections.singletonList(node);
List<PartitionInfo> pInfos = new ArrayList<>();
pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
final Cluster cluster = new Cluster(
"mockClusterId",
nodes,
pInfos,
Collections.emptySet(),
Collections.emptySet(),
node);
final TopicPartition tp0 = new TopicPartition("foo", 0);

try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
AdminClientConfig.RETRIES_CONFIG, "2")) {

env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(
Comment thread
thomaskwscott marked this conversation as resolved.
ApiKeys.LIST_OFFSETS.id, (short) 0, (short) 6));

env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));

ListOffsetsTopicResponse t0 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp0, Errors.NONE, -1L, 123L, 321);
ListOffsetsResponseData responseData = new ListOffsetsResponseData()
.setThrottleTimeMs(0)
.setTopics(Arrays.asList(t0));

// listoffsets response from broker 0
env.kafkaClient().prepareResponse(
request -> request instanceof ListOffsetsRequest,
new ListOffsetsResponse(responseData));

ListOffsetsResult result = env.adminClient().listOffsets(
Collections.singletonMap(tp0, OffsetSpec.latest()));

ListOffsetsResultInfo tp0Offset = result.partitionResult(tp0).get();
assertEquals(123L, tp0Offset.offset());
assertEquals(321, tp0Offset.leaderEpoch().get().intValue());
assertEquals(-1L, tp0Offset.timestamp());
}
}

private Map<String, FeatureUpdate> makeTestFeatureUpdates() {
return Utils.mkMap(
Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, false)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2213,10 +2213,15 @@ public void testSendOffsetWithGroupMetadataFailAsAutoDowngradeTxnCommitNotEnable
txnOffsetCommitResponse.put(tp0, Errors.NONE);
txnOffsetCommitResponse.put(tp1, Errors.COORDINATOR_LOAD_IN_PROGRESS);

prepareGroupMetadataCommit(
TransactionalRequestResult addOffsetsResult = prepareGroupMetadataCommit(
() -> prepareTxnOffsetCommitResponse(consumerGroupId, producerId, epoch, txnOffsetCommitResponse));

assertThrows(UnsupportedVersionException.class, () -> sender.runOnce());
sender.runOnce();

assertTrue(addOffsetsResult.isCompleted());
assertFalse(addOffsetsResult.isSuccessful());
assertTrue(addOffsetsResult.error() instanceof UnsupportedVersionException);
assertFatalError(UnsupportedVersionException.class);
}

private TransactionalRequestResult prepareGroupMetadataCommit(Runnable prepareTxnCommitResponse) {
Expand Down