From e32700b00db7e59ec1507a0e1c8872d021698a0c Mon Sep 17 00:00:00 2001 From: m1a2st Date: Fri, 5 Jul 2024 08:33:00 +0800 Subject: [PATCH 1/5] remove FetchResponse cache --- .../kafka/common/requests/FetchResponse.java | 36 +++++++------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index dbf73584b777c..a9ab8a7f4e9d6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -73,8 +73,6 @@ public class FetchResponse extends AbstractResponse { public static final int INVALID_PREFERRED_REPLICA_ID = -1; private final FetchResponseData data; - // we build responseData when needed. - private volatile LinkedHashMap responseData = null; @Override public FetchResponseData data() { @@ -99,29 +97,19 @@ public Errors error() { } public LinkedHashMap responseData(Map topicNames, short version) { - if (responseData == null) { - synchronized (this) { - if (responseData == null) { - // Assigning the lazy-initialized `responseData` in the last step - // to avoid other threads accessing a half-initialized object. - final LinkedHashMap responseDataTmp = - new LinkedHashMap<>(); - data.responses().forEach(topicResponse -> { - String name; - if (version < 13) { - name = topicResponse.topic(); - } else { - name = topicNames.get(topicResponse.topicId()); - } - if (name != null) { - topicResponse.partitions().forEach(partition -> - responseDataTmp.put(new TopicPartition(name, partition.partitionIndex()), partition)); - } - }); - responseData = responseDataTmp; - } + final LinkedHashMap responseData = new LinkedHashMap<>(); + data.responses().forEach(topicResponse -> { + String name; + if (version < 13) { + name = topicResponse.topic(); + } else { + name = topicNames.get(topicResponse.topicId()); } - } + if (name != null) { + topicResponse.partitions().forEach(partition -> + responseData.put(new TopicPartition(name, partition.partitionIndex()), partition)); + } + }); return responseData; } From e396bcdbf2f4726b0b7fd5dbf8418c7692823dc6 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Fri, 5 Jul 2024 22:27:21 +0800 Subject: [PATCH 2/5] add new test for fetcher --- .../consumer/internals/FetcherTest.java | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index cb48a2289a16d..e460ca11d94aa 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -2884,6 +2884,11 @@ public void handleError(Throwable t) { verifySessionPartitions(); handler.handleError(t); } + + @Override + public Map sessionTopicNames() { + return handler.sessionTopicNames(); + } // Verify that session partitions can be traversed safely. private void verifySessionPartitions() { @@ -3665,6 +3670,52 @@ public void testWhenFetchResponseReturnsALeaderShipChangeErrorAndNewLeaderInform // Validate subscription is still valid & fetch-able for tp1. assertTrue(subscriptions.isFetchable(tp1)); } + + @Test + public void testFetcherDontCacheAnyData() { + buildFetcher(); + + subscriptions.assignFromUser(singleton(tp0)); + client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 4), + tp -> validLeaderEpoch, topicIds, false)); + subscriptions.seek(tp0, 0); + + // Send fetches + assertEquals(1, sendFetches()); + assertFalse(fetcher.hasCompletedFetches()); + + // Prepare response with records + client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0)); + consumerClient.poll(time.timer(0)); + assertTrue(fetcher.hasCompletedFetches()); + fetchRecords().forEach((tp, records) -> { + assertEquals(3, records.size()); + assertEquals(4L, subscriptions.position(tp).offset); + }); + + // Send fetches again + assertEquals(1, sendFetches()); + assertFalse(fetcher.hasCompletedFetches()); + + // Prepare response with no records + client.prepareResponse(fullFetchResponse(tidp0, emptyRecords, Errors.NONE, 100L, 0)); + consumerClient.poll(time.timer(0)); + assertTrue(fetcher.hasCompletedFetches()); + fetchRecords(); + + // Send fetches again + assertEquals(1, sendFetches()); + assertFalse(fetcher.hasCompletedFetches()); + + // Prepare response with records + client.prepareResponse(fullFetchResponse(tidp0, nextRecords, Errors.NONE, 100L, 0)); + consumerClient.poll(time.timer(0)); + assertTrue(fetcher.hasCompletedFetches()); + fetchRecords().forEach((tp, records) -> { + assertEquals(2, records.size()); + assertEquals(6L, subscriptions.position(tp).offset); + }); + } private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse( TopicPartition topicPartition, From 2eecce2be1ac68c34552ef27c2228247cedd9711 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sat, 6 Jul 2024 09:38:39 +0800 Subject: [PATCH 3/5] simplify cache test --- .../consumer/internals/FetcherTest.java | 48 +++---------------- 1 file changed, 6 insertions(+), 42 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index e460ca11d94aa..720f5225d9896 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -3673,48 +3673,12 @@ public void testWhenFetchResponseReturnsALeaderShipChangeErrorAndNewLeaderInform @Test public void testFetcherDontCacheAnyData() { - buildFetcher(); - - subscriptions.assignFromUser(singleton(tp0)); - client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 4), - tp -> validLeaderEpoch, topicIds, false)); - subscriptions.seek(tp0, 0); - - // Send fetches - assertEquals(1, sendFetches()); - assertFalse(fetcher.hasCompletedFetches()); - - // Prepare response with records - client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0)); - consumerClient.poll(time.timer(0)); - assertTrue(fetcher.hasCompletedFetches()); - fetchRecords().forEach((tp, records) -> { - assertEquals(3, records.size()); - assertEquals(4L, subscriptions.position(tp).offset); - }); - - // Send fetches again - assertEquals(1, sendFetches()); - assertFalse(fetcher.hasCompletedFetches()); - - // Prepare response with no records - client.prepareResponse(fullFetchResponse(tidp0, emptyRecords, Errors.NONE, 100L, 0)); - consumerClient.poll(time.timer(0)); - assertTrue(fetcher.hasCompletedFetches()); - fetchRecords(); - - // Send fetches again - assertEquals(1, sendFetches()); - assertFalse(fetcher.hasCompletedFetches()); - - // Prepare response with records - client.prepareResponse(fullFetchResponse(tidp0, nextRecords, Errors.NONE, 100L, 0)); - consumerClient.poll(time.timer(0)); - assertTrue(fetcher.hasCompletedFetches()); - fetchRecords().forEach((tp, records) -> { - assertEquals(2, records.size()); - assertEquals(6L, subscriptions.position(tp).offset); - }); + short version = 17; + FetchResponse fetchResponse = fetchResponse(tidp0, records, Errors.NONE, 100L, -1L, 0L, 0); + fetchResponse.responseData(topicNames, version) + .forEach((topicPartition, partitionData) -> assertEquals(records, partitionData.records())); + fetchResponse.responseData(new HashMap<>(), version) + .forEach((topicPartition, partitionData) -> assertEquals(MemoryRecords.EMPTY, partitionData.records())); } private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse( From a3ace40c455a3cc571fc737d6e27149eb0394675 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Mon, 8 Jul 2024 06:23:59 +0800 Subject: [PATCH 4/5] change to use Collections.emptyMap() --- .../apache/kafka/clients/consumer/internals/FetcherTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 720f5225d9896..a9f37ecfcb773 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -3677,7 +3677,7 @@ public void testFetcherDontCacheAnyData() { FetchResponse fetchResponse = fetchResponse(tidp0, records, Errors.NONE, 100L, -1L, 0L, 0); fetchResponse.responseData(topicNames, version) .forEach((topicPartition, partitionData) -> assertEquals(records, partitionData.records())); - fetchResponse.responseData(new HashMap<>(), version) + fetchResponse.responseData(Collections.emptyMap(), version) .forEach((topicPartition, partitionData) -> assertEquals(MemoryRecords.EMPTY, partitionData.records())); } From 12784e165a789c5facc4ba493dd4b7d9e7f5f4be Mon Sep 17 00:00:00 2001 From: m1a2st Date: Mon, 8 Jul 2024 17:49:41 +0800 Subject: [PATCH 5/5] assert size first --- .../kafka/clients/consumer/internals/FetcherTest.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index a9f37ecfcb773..94d88d5f4211c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -3675,10 +3675,12 @@ public void testWhenFetchResponseReturnsALeaderShipChangeErrorAndNewLeaderInform public void testFetcherDontCacheAnyData() { short version = 17; FetchResponse fetchResponse = fetchResponse(tidp0, records, Errors.NONE, 100L, -1L, 0L, 0); - fetchResponse.responseData(topicNames, version) - .forEach((topicPartition, partitionData) -> assertEquals(records, partitionData.records())); - fetchResponse.responseData(Collections.emptyMap(), version) - .forEach((topicPartition, partitionData) -> assertEquals(MemoryRecords.EMPTY, partitionData.records())); + LinkedHashMap responseData = fetchResponse.responseData(topicNames, version); + assertEquals(topicNames.size(), responseData.size()); + responseData.forEach((topicPartition, partitionData) -> assertEquals(records, partitionData.records())); + LinkedHashMap nonResponseData = fetchResponse.responseData(emptyMap(), version); + assertEquals(emptyMap().size(), nonResponseData.size()); + nonResponseData.forEach((topicPartition, partitionData) -> assertEquals(MemoryRecords.EMPTY, partitionData.records())); } private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse(