diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 0699684eca7d8..68c7347e20c41 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -676,13 +676,15 @@ private List> fetchRecords(CompletedFetch completedFetch, i if (completedFetch.nextFetchOffset == position.offset) { List> partRecords = completedFetch.fetchRecords(maxRecords); + log.trace("Returning {} fetched records at offset {} for assigned partition {}", + partRecords.size(), position, completedFetch.partition); + if (completedFetch.nextFetchOffset > position.offset) { SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition( completedFetch.nextFetchOffset, completedFetch.lastEpoch, position.currentLeader); - log.trace("Returning fetched records at offset {} for assigned partition {} and update " + - "position to {}", position, completedFetch.partition, nextPosition); + log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition); subscriptions.position(completedFetch.partition, nextPosition); } diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index d4a9587ffa56e..b49f2fd9d0f9e 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -484,6 +484,8 @@ static void writeHeader(ByteBuffer buffer, @Override public String toString() { return "RecordBatch(magic=" + magic() + ", offsets=[" + baseOffset() + ", " + lastOffset() + "], " + + "sequence=[" + baseSequence() + ", " + lastSequence() + "], " + + "isTransactional=" + isTransactional() + ", isControlBatch=" + isControlBatch() + ", " + "compression=" + compressionType() + ", timestampType=" + timestampType() + ", crc=" + checksum() + ")"; }