diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java index 0c40ea2039a40..4248ce65046fb 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java @@ -55,8 +55,6 @@ public class ShareFetchResponse extends AbstractResponse { private final ShareFetchResponseData data; - private volatile LinkedHashMap responseData = null; - public ShareFetchResponse(ShareFetchResponseData data) { super(ApiKeys.SHARE_FETCH); this.data = data; @@ -84,23 +82,14 @@ public Map errorCounts() { } public LinkedHashMap responseData(Map topicNames) { - if (responseData == null) { - synchronized (this) { - // Assigning the lazy-initialized `responseData` in the last step - // to avoid other threads accessing a half-initialized object. - if (responseData == null) { - final LinkedHashMap responseDataTmp = new LinkedHashMap<>(); - data.responses().forEach(topicResponse -> { - String name = topicNames.get(topicResponse.topicId()); - if (name != null) { - topicResponse.partitions().forEach(partitionData -> responseDataTmp.put(new TopicIdPartition(topicResponse.topicId(), - new TopicPartition(name, partitionData.partitionIndex())), partitionData)); - } - }); - responseData = responseDataTmp; - } + final LinkedHashMap responseData = new LinkedHashMap<>(); + data.responses().forEach(topicResponse -> { + String name = topicNames.get(topicResponse.topicId()); + if (name != null) { + topicResponse.partitions().forEach(partitionData -> responseData.put(new TopicIdPartition(topicResponse.topicId(), + new TopicPartition(name, partitionData.partitionIndex())), partitionData)); } - } + }); return responseData; } diff --git a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java index f7e29e2484f5f..2bd63df869ecd 100644 --- a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java +++ b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java @@ -19,7 +19,14 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.message.ShareFetchResponseData; import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ShareFetchResponse; import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; @@ -27,6 +34,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.nio.ByteBuffer; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -66,6 +75,21 @@ public void testErrorInAllPartitions() { assertTrue(shareFetch.errorInAllPartitions()); } + @Test + public void testDontCacheAnyData() { + final TopicIdPartition tidp = new TopicIdPartition(Uuid.randomUuid(), 0, "topic"); + MemoryRecords records = buildRecords(1L, 3, 1); + + ShareFetchResponse shareFetch = shareFetchResponse(tidp, records, Errors.NONE, "", (short) 0, + "", List.of(), 0); + LinkedHashMap responseData = shareFetch.responseData(Map.of(tidp.topicId(), tidp.topic())); + assertEquals(1, responseData.size()); + responseData.forEach((topicIdPartition, partitionData) -> assertEquals(records, partitionData.records())); + + LinkedHashMap nonResponseData = shareFetch.responseData(Map.of()); + assertEquals(0, nonResponseData.size()); + } + @Test public void testErrorInAllPartitionsWithMultipleTopicIdPartitions() { TopicIdPartition topicIdPartition0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); @@ -201,4 +225,27 @@ public void testMaybeCompleteWithExceptionWithExistingErroneousTopicPartition() assertEquals(1, brokerTopicStats.allTopicsStats().failedShareFetchRequestRate().count()); assertEquals(1, brokerTopicStats.topicStats("foo").failedShareFetchRequestRate().count()); } + + private MemoryRecords buildRecords(long baseOffset, int count, long firstMessageId) { + MemoryRecordsBuilder builder = MemoryRecords.builder( + ByteBuffer.allocate(1024), Compression.NONE, TimestampType.CREATE_TIME, baseOffset); + for (int i = 0; i < count; i++) + builder.append(0L, "key".getBytes(), ("value-" + (firstMessageId + i)).getBytes()); + return builder.build(); + } + + private ShareFetchResponse shareFetchResponse(TopicIdPartition tp, MemoryRecords records, Errors error, + String errorMessage, short acknowledgeErrorCode, String acknowledgeErrorMessage, + List acquiredRecords, int throttleTime) { + Map partitions = Map.of(tp, + new ShareFetchResponseData.PartitionData() + .setPartitionIndex(tp.topicPartition().partition()) + .setErrorCode(error.code()) + .setErrorMessage(errorMessage) + .setAcknowledgeErrorCode(acknowledgeErrorCode) + .setAcknowledgeErrorMessage(acknowledgeErrorMessage) + .setRecords(records) + .setAcquiredRecords(acquiredRecords)); + return ShareFetchResponse.of(Errors.NONE, throttleTime, new LinkedHashMap<>(partitions), List.of(), Integer.MAX_VALUE); + } }