Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition;
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
Expand Down Expand Up @@ -215,8 +215,8 @@
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.ListPartitionReassignmentsRequest;
import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
Expand Down Expand Up @@ -3975,7 +3975,7 @@ private List<Call> getListOffsetsCalls(MetadataOperationContext<ListOffsetsResul
MetadataResponse mr = context.response().orElseThrow(() -> new IllegalStateException("No Metadata response"));
List<Call> calls = new ArrayList<>();
// grouping topic partitions per leader
Map<Node, Map<String, ListOffsetTopic>> leaders = new HashMap<>();
Map<Node, Map<String, ListOffsetsTopic>> leaders = new HashMap<>();

for (Map.Entry<TopicPartition, OffsetSpec> entry: topicPartitionOffsets.entrySet()) {

Expand All @@ -3985,15 +3985,15 @@ private List<Call> getListOffsetsCalls(MetadataOperationContext<ListOffsetsResul
long offsetQuery = (offsetSpec instanceof TimestampSpec)
? ((TimestampSpec) offsetSpec).timestamp()
: (offsetSpec instanceof OffsetSpec.EarliestSpec)
? ListOffsetRequest.EARLIEST_TIMESTAMP
: ListOffsetRequest.LATEST_TIMESTAMP;
? ListOffsetsRequest.EARLIEST_TIMESTAMP
: ListOffsetsRequest.LATEST_TIMESTAMP;
// avoid sending listOffsets request for topics with errors
if (!mr.errors().containsKey(tp.topic())) {
Node node = mr.cluster().leaderFor(tp);
if (node != null) {
Map<String, ListOffsetTopic> leadersOnNode = leaders.computeIfAbsent(node, k -> new HashMap<String, ListOffsetTopic>());
ListOffsetTopic topic = leadersOnNode.computeIfAbsent(tp.topic(), k -> new ListOffsetTopic().setName(tp.topic()));
topic.partitions().add(new ListOffsetPartition().setPartitionIndex(tp.partition()).setTimestamp(offsetQuery));
Map<String, ListOffsetsTopic> leadersOnNode = leaders.computeIfAbsent(node, k -> new HashMap<>());
ListOffsetsTopic topic = leadersOnNode.computeIfAbsent(tp.topic(), k -> new ListOffsetsTopic().setName(tp.topic()));
topic.partitions().add(new ListOffsetsPartition().setPartitionIndex(tp.partition()).setTimestamp(offsetQuery));
} else {
future.completeExceptionally(Errors.LEADER_NOT_AVAILABLE.exception());
}
Expand All @@ -4002,27 +4002,27 @@ private List<Call> getListOffsetsCalls(MetadataOperationContext<ListOffsetsResul
}
}

for (final Map.Entry<Node, Map<String, ListOffsetTopic>> entry : leaders.entrySet()) {
for (final Map.Entry<Node, Map<String, ListOffsetsTopic>> entry : leaders.entrySet()) {
final int brokerId = entry.getKey().id();

calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) {

final List<ListOffsetTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());
final List<ListOffsetsTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());

@Override
ListOffsetRequest.Builder createRequest(int timeoutMs) {
return ListOffsetRequest.Builder
ListOffsetsRequest.Builder createRequest(int timeoutMs) {
return ListOffsetsRequest.Builder
.forConsumer(true, context.options().isolationLevel())
.setTargetTimes(partitionsToQuery);
}

@Override
void handleResponse(AbstractResponse abstractResponse) {
ListOffsetResponse response = (ListOffsetResponse) abstractResponse;
ListOffsetsResponse response = (ListOffsetsResponse) abstractResponse;
Map<TopicPartition, OffsetSpec> retryTopicPartitionOffsets = new HashMap<>();

for (ListOffsetTopicResponse topic : response.topics()) {
for (ListOffsetPartitionResponse partition : topic.partitions()) {
for (ListOffsetsTopicResponse topic : response.topics()) {
for (ListOffsetsPartitionResponse partition : topic.partitions()) {
TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp);
Errors error = Errors.forCode(partition.errorCode());
Expand All @@ -4032,7 +4032,7 @@ void handleResponse(AbstractResponse abstractResponse) {
} else if (MetadataOperationContext.shouldRefreshMetadata(error)) {
retryTopicPartitionOffsets.put(tp, offsetRequestSpec);
} else if (error == Errors.NONE) {
Optional<Integer> leaderEpoch = (partition.leaderEpoch() == ListOffsetResponse.UNKNOWN_EPOCH)
Optional<Integer> leaderEpoch = (partition.leaderEpoch() == ListOffsetsResponse.UNKNOWN_EPOCH)
? Optional.empty()
: Optional.of(partition.leaderEpoch());
future.complete(new ListOffsetsResultInfo(partition.offset(), partition.timestamp(), leaderEpoch));
Expand All @@ -4044,8 +4044,8 @@ void handleResponse(AbstractResponse abstractResponse) {

if (retryTopicPartitionOffsets.isEmpty()) {
// The server should send back a response for every topic partition. But do a sanity check anyway.
for (ListOffsetTopic topic : partitionsToQuery) {
for (ListOffsetPartition partition : topic.partitions()) {
for (ListOffsetsTopic topic : partitionsToQuery) {
for (ListOffsetsPartition partition : topic.partitions()) {
TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
ApiException error = new ApiException("The response from broker " + brokerId +
" did not contain a result for topic partition " + tp);
Expand All @@ -4063,8 +4063,8 @@ void handleResponse(AbstractResponse abstractResponse) {

@Override
void handleFailure(Throwable throwable) {
for (ListOffsetTopic topic : entry.getValue().values()) {
for (ListOffsetPartition partition : topic.partitions()) {
for (ListOffsetsTopic topic : entry.getValue().values()) {
for (ListOffsetsPartition partition : topic.partitions()) {
TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp);
future.completeExceptionally(throwable);
Expand Down
Loading