diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index c8bbfa3077809..e2631b534ce37 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -472,46 +472,39 @@ public Map>> fetchedRecords() { Map>> drained = new HashMap<>(); int recordsRemaining = maxPollRecords; - TopicPartition fetchedPartition = null; - long fetchedOffsets = -1L; - - try { - while (recordsRemaining > 0) { - if (nextInLineRecords == null || nextInLineRecords.isDrained()) { - CompletedFetch completedFetch = completedFetches.poll(); - if (completedFetch == null) break; - - fetchedPartition = completedFetch.partition; - fetchedOffsets = completedFetch.fetchedOffset; + while (recordsRemaining > 0) { + if (nextInLineRecords == null || nextInLineRecords.isDrained()) { + CompletedFetch completedFetch = completedFetches.poll(); + if (completedFetch == null) break; + try { nextInLineRecords = parseCompletedFetch(completedFetch); - } else { - TopicPartition partition = nextInLineRecords.partition; - fetchedPartition = partition; - fetchedOffsets = subscriptions.position(partition); - List> records = drainRecords(nextInLineRecords, recordsRemaining); - if (!records.isEmpty()) { - List> currentRecords = drained.get(partition); - if (currentRecords == null) { - drained.put(partition, records); - } else { - // this case shouldn't usually happen because we only send one fetch at a time per partition, - // but it might conceivably happen in some rare cases (such as partition leader changes). - // we have to copy to a new list because the old one may be immutable - List> newRecords = new ArrayList<>(records.size() + currentRecords.size()); - newRecords.addAll(currentRecords); - newRecords.addAll(records); - drained.put(partition, newRecords); - } - recordsRemaining -= records.size(); + } catch (KafkaException e) { + if (drained.isEmpty()) + throw e; + nextInLineExceptionMetadata = new ExceptionMetadata(completedFetch.partition, completedFetch.fetchedOffset, e); + } + } else { + TopicPartition partition = nextInLineRecords.partition; + List> records = drainRecords(nextInLineRecords, recordsRemaining); + if (!records.isEmpty()) { + List> currentRecords = drained.get(partition); + if (currentRecords == null) { + drained.put(partition, records); + } else { + // this case shouldn't usually happen because we only send one fetch at a time per partition, + // but it might conceivably happen in some rare cases (such as partition leader changes). + // we have to copy to a new list because the old one may be immutable + List> newRecords = new ArrayList<>(records.size() + currentRecords.size()); + newRecords.addAll(currentRecords); + newRecords.addAll(records); + drained.put(partition, newRecords); } + recordsRemaining -= records.size(); } } - } catch (KafkaException e) { - if (drained.isEmpty()) - throw e; - nextInLineExceptionMetadata = new ExceptionMetadata(fetchedPartition, fetchedOffsets, e); } + return drained; }