From b145989baa267cd728a6753b1ceadf1cf76d194f Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Wed, 2 Apr 2025 15:44:52 +0000 Subject: [PATCH 1/5] fix production --- .../common/requests/ShareFetchResponse.java | 25 ++++++------------- 1 file changed, 7 insertions(+), 18 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java index 0c40ea2039a40..4248ce65046fb 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java @@ -55,8 +55,6 @@ public class ShareFetchResponse extends AbstractResponse { private final ShareFetchResponseData data; - private volatile LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData = null; - public ShareFetchResponse(ShareFetchResponseData data) { super(ApiKeys.SHARE_FETCH); this.data = data; @@ -84,23 +82,14 @@ public Map<Errors, Integer> errorCounts() { } public LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData(Map<Uuid, String> topicNames) { - if (responseData == null) { - synchronized (this) { - // Assigning the lazy-initialized `responseData` in the last step - // to avoid other threads accessing a half-initialized object. 
- if (responseData == null) { - final LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseDataTmp = new LinkedHashMap<>(); - data.responses().forEach(topicResponse -> { - String name = topicNames.get(topicResponse.topicId()); - if (name != null) { - topicResponse.partitions().forEach(partitionData -> responseDataTmp.put(new TopicIdPartition(topicResponse.topicId(), - new TopicPartition(name, partitionData.partitionIndex())), partitionData)); - } - }); - responseData = responseDataTmp; - } + final LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData = new LinkedHashMap<>(); + data.responses().forEach(topicResponse -> { + String name = topicNames.get(topicResponse.topicId()); + if (name != null) { + topicResponse.partitions().forEach(partitionData -> responseData.put(new TopicIdPartition(topicResponse.topicId(), + new TopicPartition(name, partitionData.partitionIndex())), partitionData)); } - } + }); return responseData; } From eb477847a87437f54fa6645e223c85fc37c8f5c2 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Thu, 3 Apr 2025 10:36:27 +0000 Subject: [PATCH 2/5] add test --- .../server/share/fetch/ShareFetchTest.java | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java index f7e29e2484f5f..c4d651d2f2931 100644 --- a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java +++ b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java @@ -19,7 +19,14 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.message.ShareFetchResponseData; import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; +import 
org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ShareFetchResponse; import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; @@ -27,6 +34,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.nio.ByteBuffer; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -43,11 +52,15 @@ public class ShareFetchTest { private static final String MEMBER_ID = "memberId"; private static final int BATCH_SIZE = 500; + private final TopicIdPartition tidp0 = new TopicIdPartition(Uuid.randomUuid(), 0, "topic"); + private MemoryRecords records; + private BrokerTopicStats brokerTopicStats; @BeforeEach public void setUp() { brokerTopicStats = new BrokerTopicStats(); + records = buildRecords(1L, 3, 1); } @AfterEach @@ -66,6 +79,19 @@ public void testErrorInAllPartitions() { assertTrue(shareFetch.errorInAllPartitions()); } + @Test + public void testDontCacheAnyData() { + ShareFetchResponse shareFetch = shareFetchResponse(tidp0, records, Errors.NONE, "", (short) 0, + "", List.of(), 0); + LinkedHashMap<TopicIdPartition, PartitionData> responseData = shareFetch.responseData(Map.of(tidp0.topicId(), tidp0.topic())); + assertEquals(1, responseData.size()); + responseData.forEach((topicIdPartition, partitionData) -> assertEquals(records, partitionData.records())); + + LinkedHashMap<TopicIdPartition, PartitionData> nonResponseData = shareFetch.responseData(Map.of()); + assertEquals(0, nonResponseData.size()); + nonResponseData.forEach((topicIdPartition, partitionData) -> assertEquals(MemoryRecords.EMPTY, partitionData.records())); + } + @Test public void testErrorInAllPartitionsWithMultipleTopicIdPartitions() { TopicIdPartition topicIdPartition0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); @@ -201,4 +227,27 @@ public void testMaybeCompleteWithExceptionWithExistingErroneousTopicPartition() 
assertEquals(1, brokerTopicStats.allTopicsStats().failedShareFetchRequestRate().count()); assertEquals(1, brokerTopicStats.topicStats("foo").failedShareFetchRequestRate().count()); } + + private MemoryRecords buildRecords(long baseOffset, int count, long firstMessageId) { + MemoryRecordsBuilder builder = MemoryRecords.builder( + ByteBuffer.allocate(1024), Compression.NONE, TimestampType.CREATE_TIME, baseOffset); + for (int i = 0; i < count; i++) + builder.append(0L, "key".getBytes(), ("value-" + (firstMessageId + i)).getBytes()); + return builder.build(); + } + + private ShareFetchResponse shareFetchResponse(TopicIdPartition tp, MemoryRecords records, Errors error, + String errorMessage, short acknowledgeErrorCode, String acknowledgeErrorMessage, + List<ShareFetchResponseData.AcquiredRecords> acquiredRecords, int throttleTime) { + Map<TopicIdPartition, ShareFetchResponseData.PartitionData> partitions = Map.of(tp, + new ShareFetchResponseData.PartitionData() + .setPartitionIndex(tp.topicPartition().partition()) + .setErrorCode(error.code()) + .setErrorMessage(errorMessage) + .setAcknowledgeErrorCode(acknowledgeErrorCode) + .setAcknowledgeErrorMessage(acknowledgeErrorMessage) + .setRecords(records) + .setAcquiredRecords(acquiredRecords)); + return ShareFetchResponse.of(Errors.NONE, throttleTime, new LinkedHashMap<>(partitions), List.of(), Integer.MAX_VALUE); + } } From 321c3b090f08ad25fa95d2d546e589bd4fa764ed Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 4 Apr 2025 09:37:35 +0000 Subject: [PATCH 3/5] address PaAn comment --- .../java/org/apache/kafka/server/share/fetch/ShareFetchTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java index c4d651d2f2931..da2141bfdb6d0 100644 --- a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java +++ b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java @@ -89,7 +89,6 @@ public void testDontCacheAnyData() { 
LinkedHashMap<TopicIdPartition, PartitionData> nonResponseData = shareFetch.responseData(Map.of()); assertEquals(0, nonResponseData.size()); - nonResponseData.forEach((topicIdPartition, partitionData) -> assertEquals(MemoryRecords.EMPTY, partitionData.records())); } @@ Test From 0d7dfa83016ba3bc98510ffd09f05fcc97925022 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 4 Apr 2025 15:24:16 +0000 Subject: [PATCH 4/5] address PaAn comment round 2 --- .../kafka/server/share/fetch/ShareFetchTest.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java index da2141bfdb6d0..e3eb5e60a4741 100644 --- a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java +++ b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java @@ -52,15 +52,11 @@ public class ShareFetchTest { private static final String MEMBER_ID = "memberId"; private static final int BATCH_SIZE = 500; - private final TopicIdPartition tidp0 = new TopicIdPartition(Uuid.randomUuid(), 0, "topic"); - private MemoryRecords records; - private BrokerTopicStats brokerTopicStats; @BeforeEach public void setUp() { brokerTopicStats = new BrokerTopicStats(); - records = buildRecords(1L, 3, 1); } @AfterEach @@ -81,9 +77,12 @@ public void testErrorInAllPartitions() { @Test public void testDontCacheAnyData() { - ShareFetchResponse shareFetch = shareFetchResponse(tidp0, records, Errors.NONE, "", (short) 0, + final TopicIdPartition tidp = new TopicIdPartition(Uuid.randomUuid(), 0, "topic"); + MemoryRecords records = buildRecords(1L, 3, 1);; + + ShareFetchResponse shareFetch = shareFetchResponse(tidp, records, Errors.NONE, "", (short) 0, "", List.of(), 0); - LinkedHashMap<TopicIdPartition, PartitionData> responseData = shareFetch.responseData(Map.of(tidp0.topicId(), tidp0.topic())); + LinkedHashMap<TopicIdPartition, PartitionData> responseData = shareFetch.responseData(Map.of(tidp.topicId(), tidp.topic())); 
assertEquals(1, responseData.size()); responseData.forEach((topicIdPartition, partitionData) -> assertEquals(records, partitionData.records())); From 11338a7c1e35f1fc59d0a4f279e8a5d7de934a1d Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 4 Apr 2025 15:25:52 +0000 Subject: [PATCH 5/5] fix build --- .../org/apache/kafka/server/share/fetch/ShareFetchTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java index e3eb5e60a4741..2bd63df869ecd 100644 --- a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java +++ b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java @@ -78,7 +78,7 @@ public void testErrorInAllPartitions() { @Test public void testDontCacheAnyData() { final TopicIdPartition tidp = new TopicIdPartition(Uuid.randomUuid(), 0, "topic"); - MemoryRecords records = buildRecords(1L, 3, 1);; + MemoryRecords records = buildRecords(1L, 3, 1); ShareFetchResponse shareFetch = shareFetchResponse(tidp, records, Errors.NONE, "", (short) 0, "", List.of(), 0);