From 93dfd5a35beb0a1dbd63ae541811e1f831d61e27 Mon Sep 17 00:00:00 2001 From: Eno Thereska Date: Thu, 20 Apr 2017 10:25:24 +0100 Subject: [PATCH 1/3] Guard against unassigned partitions --- .../apache/kafka/clients/consumer/internals/Fetcher.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index c8bbfa3077809..c1e52765058ae 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -487,7 +487,12 @@ public Map>> fetchedRecords() { } else { TopicPartition partition = nextInLineRecords.partition; fetchedPartition = partition; - fetchedOffsets = subscriptions.position(partition); + if (subscriptions.isAssigned(partition)) { + fetchedOffsets = subscriptions.position(partition); + } else { + // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call + log.debug("Not positioning for partition {} since it is no longer assigned", partition); + } List> records = drainRecords(nextInLineRecords, recordsRemaining); if (!records.isEmpty()) { List> currentRecords = drained.get(partition); From 6bde1de7105ff7d5b5bf0c5faf3f76577ec7771f Mon Sep 17 00:00:00 2001 From: Eno Thereska Date: Thu, 20 Apr 2017 11:48:11 +0100 Subject: [PATCH 2/3] Remove debug msg --- .../org/apache/kafka/clients/consumer/internals/Fetcher.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index c1e52765058ae..e6eed0efd9c78 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -489,10 +489,7 @@ public Map>> fetchedRecords() { fetchedPartition = partition; if (subscriptions.isAssigned(partition)) { fetchedOffsets = subscriptions.position(partition); - } else { - // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call - log.debug("Not positioning for partition {} since it is no longer assigned", partition); - } + } List> records = drainRecords(nextInLineRecords, recordsRemaining); if (!records.isEmpty()) { List> currentRecords = drained.get(partition); From e2c83a51a606b585403b36b462111bb7610fb390 Mon Sep 17 00:00:00 2001 From: Eno Thereska Date: Thu, 20 Apr 2017 19:46:34 +0100 Subject: [PATCH 3/3] Jason's suggestion --- .../clients/consumer/internals/Fetcher.java | 63 ++++++++----------- 1 file changed, 27 insertions(+), 36 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index e6eed0efd9c78..e2631b534ce37 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -472,48 +472,39 @@ public Map>> fetchedRecords() { Map>> drained = new HashMap<>(); int recordsRemaining = maxPollRecords; - TopicPartition fetchedPartition = null; - long fetchedOffsets = -1L; - - try { - while (recordsRemaining > 0) { - if (nextInLineRecords == null || nextInLineRecords.isDrained()) { - CompletedFetch completedFetch = completedFetches.poll(); - if (completedFetch == null) break; - - fetchedPartition = completedFetch.partition; - fetchedOffsets = completedFetch.fetchedOffset; + while (recordsRemaining > 0) { + if (nextInLineRecords == null || nextInLineRecords.isDrained()) { + CompletedFetch completedFetch = completedFetches.poll(); + if (completedFetch == null) break; + try { nextInLineRecords = parseCompletedFetch(completedFetch); - } else { - TopicPartition partition = nextInLineRecords.partition; - fetchedPartition = partition; - if (subscriptions.isAssigned(partition)) { - fetchedOffsets = subscriptions.position(partition); - } - List> records = drainRecords(nextInLineRecords, recordsRemaining); - if (!records.isEmpty()) { - List> currentRecords = drained.get(partition); - if (currentRecords == null) { - drained.put(partition, records); - } else { - // this case shouldn't usually happen because we only send one fetch at a time per partition, - // but it might conceivably happen in some rare cases (such as partition leader changes). - // we have to copy to a new list because the old one may be immutable - List> newRecords = new ArrayList<>(records.size() + currentRecords.size()); - newRecords.addAll(currentRecords); - newRecords.addAll(records); - drained.put(partition, newRecords); - } - recordsRemaining -= records.size(); + } catch (KafkaException e) { + if (drained.isEmpty()) + throw e; + nextInLineExceptionMetadata = new ExceptionMetadata(completedFetch.partition, completedFetch.fetchedOffset, e); + } + } else { + TopicPartition partition = nextInLineRecords.partition; + List> records = drainRecords(nextInLineRecords, recordsRemaining); + if (!records.isEmpty()) { + List> currentRecords = drained.get(partition); + if (currentRecords == null) { + drained.put(partition, records); + } else { + // this case shouldn't usually happen because we only send one fetch at a time per partition, + // but it might conceivably happen in some rare cases (such as partition leader changes). + // we have to copy to a new list because the old one may be immutable + List> newRecords = new ArrayList<>(records.size() + currentRecords.size()); + newRecords.addAll(currentRecords); + newRecords.addAll(records); + drained.put(partition, newRecords); } + recordsRemaining -= records.size(); } } - } catch (KafkaException e) { - if (drained.isEmpty()) - throw e; - nextInLineExceptionMetadata = new ExceptionMetadata(fetchedPartition, fetchedOffsets, e); } + return drained; }