From 859e03c0db2f9f6f7098e8a1bfca7cc4aadb7fa7 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Wed, 6 May 2020 22:50:17 -0700 Subject: [PATCH 1/3] the fix --- .../kafka/clients/consumer/internals/Fetcher.java | 14 ++++++++------ .../kafka/common/record/DefaultRecordBatch.java | 2 ++ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 0699684eca7d8..2216cc4e22581 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -258,12 +258,12 @@ public synchronized int sendFetches() { if (log.isDebugEnabled()) { log.debug("Sending {} {} to broker {}", isolationLevel, data.toString(), fetchTarget); } - RequestFuture future = client.send(fetchTarget, request); - // We add the node to the set of nodes with pending fetch requests before adding the - // listener because the future may have been fulfilled on another thread (e.g. during a + // We add the node to the set of nodes with pending fetch requests before sending the + // request to the client because the future may have been fulfilled on another thread (e.g. during a // disconnection being handled by the heartbeat thread) which will mean the listener - // will be invoked synchronously. + // will be invoked synchronously, and hence the added id would not be removed anymore. this.nodesWithPendingFetchRequests.add(entry.getKey().id()); + RequestFuture future = client.send(fetchTarget, request); future.addListener(new RequestFutureListener() { @Override public void onSuccess(ClientResponse resp) { @@ -676,13 +676,15 @@ private List> fetchRecords(CompletedFetch completedFetch, i if (completedFetch.nextFetchOffset == position.offset) { List> partRecords = completedFetch.fetchRecords(maxRecords); + log.trace("Returning fetched records {} at offset {} for assigned partition {}", + partRecords, position, completedFetch.partition); + if (completedFetch.nextFetchOffset > position.offset) { SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition( completedFetch.nextFetchOffset, completedFetch.lastEpoch, position.currentLeader); - log.trace("Returning fetched records at offset {} for assigned partition {} and update " + - "position to {}", position, completedFetch.partition, nextPosition); + log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition); subscriptions.position(completedFetch.partition, nextPosition); } diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index d4a9587ffa56e..b49f2fd9d0f9e 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -484,6 +484,8 @@ static void writeHeader(ByteBuffer buffer, @Override public String toString() { return "RecordBatch(magic=" + magic() + ", offsets=[" + baseOffset() + ", " + lastOffset() + "], " + + "sequence=[" + baseSequence() + ", " + lastSequence() + "], " + + "isTransactional=" + isTransactional() + ", isControlBatch=" + isControlBatch() + ", " + "compression=" + compressionType() + ", timestampType=" + timestampType() + ", crc=" + checksum() + ")"; } From 662364f34c93783cf9ef99377443c102b30afa7d Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 7 May 2020 08:56:02 -0700 Subject: [PATCH 2/3] revert --- .../apache/kafka/clients/consumer/internals/Fetcher.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 2216cc4e22581..1d2c90e7aeead 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -258,12 +258,12 @@ public synchronized int sendFetches() { if (log.isDebugEnabled()) { log.debug("Sending {} {} to broker {}", isolationLevel, data.toString(), fetchTarget); } - // We add the node to the set of nodes with pending fetch requests before sending the - // request to the client because the future may have been fulfilled on another thread (e.g. during a + RequestFuture future = client.send(fetchTarget, request); + // We add the node to the set of nodes with pending fetch requests before adding the + // listenerbecause the future may have been fulfilled on another thread (e.g. during a // disconnection being handled by the heartbeat thread) which will mean the listener - // will be invoked synchronously, and hence the added id would not be removed anymore. + // will be invoked synchronously. this.nodesWithPendingFetchRequests.add(entry.getKey().id()); - RequestFuture future = client.send(fetchTarget, request); future.addListener(new RequestFutureListener() { @Override public void onSuccess(ClientResponse resp) { From eab85ef671c48ddd0aa4f2f1e4f5cfb81449b0f2 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 7 May 2020 10:00:39 -0700 Subject: [PATCH 3/3] minor fixes --- .../apache/kafka/clients/consumer/internals/Fetcher.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 1d2c90e7aeead..68c7347e20c41 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -260,7 +260,7 @@ public synchronized int sendFetches() { } RequestFuture future = client.send(fetchTarget, request); // We add the node to the set of nodes with pending fetch requests before adding the - // listenerbecause the future may have been fulfilled on another thread (e.g. during a + // listener because the future may have been fulfilled on another thread (e.g. during a // disconnection being handled by the heartbeat thread) which will mean the listener // will be invoked synchronously. this.nodesWithPendingFetchRequests.add(entry.getKey().id()); @@ -676,8 +676,8 @@ private List> fetchRecords(CompletedFetch completedFetch, i if (completedFetch.nextFetchOffset == position.offset) { List> partRecords = completedFetch.fetchRecords(maxRecords); - log.trace("Returning fetched records {} at offset {} for assigned partition {}", - partRecords, position, completedFetch.partition); + log.trace("Returning {} fetched records at offset {} for assigned partition {}", + partRecords.size(), position, completedFetch.partition); if (completedFetch.nextFetchOffset > position.offset) { SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(