diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index dbf73584b777c..a9ab8a7f4e9d6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -73,8 +73,6 @@ public class FetchResponse extends AbstractResponse { public static final int INVALID_PREFERRED_REPLICA_ID = -1; private final FetchResponseData data; - // we build responseData when needed. - private volatile LinkedHashMap responseData = null; @Override public FetchResponseData data() { @@ -99,29 +97,19 @@ public Errors error() { } public LinkedHashMap responseData(Map topicNames, short version) { - if (responseData == null) { - synchronized (this) { - if (responseData == null) { - // Assigning the lazy-initialized `responseData` in the last step - // to avoid other threads accessing a half-initialized object. - final LinkedHashMap responseDataTmp = - new LinkedHashMap<>(); - data.responses().forEach(topicResponse -> { - String name; - if (version < 13) { - name = topicResponse.topic(); - } else { - name = topicNames.get(topicResponse.topicId()); - } - if (name != null) { - topicResponse.partitions().forEach(partition -> - responseDataTmp.put(new TopicPartition(name, partition.partitionIndex()), partition)); - } - }); - responseData = responseDataTmp; - } + final LinkedHashMap responseData = new LinkedHashMap<>(); + data.responses().forEach(topicResponse -> { + String name; + if (version < 13) { + name = topicResponse.topic(); + } else { + name = topicNames.get(topicResponse.topicId()); } - } + if (name != null) { + topicResponse.partitions().forEach(partition -> + responseData.put(new TopicPartition(name, partition.partitionIndex()), partition)); + } + }); return responseData; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index cb48a2289a16d..94d88d5f4211c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -2884,6 +2884,11 @@ public void handleError(Throwable t) { verifySessionPartitions(); handler.handleError(t); } + + @Override + public Map sessionTopicNames() { + return handler.sessionTopicNames(); + } // Verify that session partitions can be traversed safely. private void verifySessionPartitions() { @@ -3665,6 +3670,18 @@ public void testWhenFetchResponseReturnsALeaderShipChangeErrorAndNewLeaderInform // Validate subscription is still valid & fetch-able for tp1. assertTrue(subscriptions.isFetchable(tp1)); } + + @Test + public void testFetcherDontCacheAnyData() { + short version = 17; + FetchResponse fetchResponse = fetchResponse(tidp0, records, Errors.NONE, 100L, -1L, 0L, 0); + LinkedHashMap responseData = fetchResponse.responseData(topicNames, version); + assertEquals(topicNames.size(), responseData.size()); + responseData.forEach((topicPartition, partitionData) -> assertEquals(records, partitionData.records())); + LinkedHashMap nonResponseData = fetchResponse.responseData(emptyMap(), version); + assertEquals(emptyMap().size(), nonResponseData.size()); + nonResponseData.forEach((topicPartition, partitionData) -> assertEquals(MemoryRecords.EMPTY, partitionData.records())); + } private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse( TopicPartition topicPartition,