Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ public class ShareFetchResponse extends AbstractResponse {

private final ShareFetchResponseData data;

private volatile LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData = null;

public ShareFetchResponse(ShareFetchResponseData data) {
super(ApiKeys.SHARE_FETCH);
this.data = data;
Expand Down Expand Up @@ -84,23 +82,14 @@ public Map<Errors, Integer> errorCounts() {
}

public LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData(Map<Uuid, String> topicNames) {
if (responseData == null) {
synchronized (this) {
// Assigning the lazy-initialized `responseData` in the last step
// to avoid other threads accessing a half-initialized object.
if (responseData == null) {
final LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseDataTmp = new LinkedHashMap<>();
data.responses().forEach(topicResponse -> {
String name = topicNames.get(topicResponse.topicId());
if (name != null) {
topicResponse.partitions().forEach(partitionData -> responseDataTmp.put(new TopicIdPartition(topicResponse.topicId(),
new TopicPartition(name, partitionData.partitionIndex())), partitionData));
}
});
responseData = responseDataTmp;
}
final LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData = new LinkedHashMap<>();
data.responses().forEach(topicResponse -> {
String name = topicNames.get(topicResponse.topicId());
if (name != null) {
topicResponse.partitions().forEach(partitionData -> responseData.put(new TopicIdPartition(topicResponse.topicId(),
new TopicPartition(name, partitionData.partitionIndex())), partitionData));
}
}
});
return responseData;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,23 @@
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.ShareFetchResponse;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -66,6 +75,21 @@ public void testErrorInAllPartitions() {
assertTrue(shareFetch.errorInAllPartitions());
}

@Test
public void testDontCacheAnyData() {
    final TopicIdPartition tidp = new TopicIdPartition(Uuid.randomUuid(), 0, "topic");
    MemoryRecords records = buildRecords(1L, 3, 1);

    ShareFetchResponse shareFetch = shareFetchResponse(tidp, records, Errors.NONE, "", (short) 0,
        "", List.of(), 0);

    // First call with a resolvable topic id: exactly one entry carrying the records.
    LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> withNames =
        shareFetch.responseData(Map.of(tidp.topicId(), tidp.topic()));
    assertEquals(1, withNames.size());
    withNames.forEach((partition, partitionData) -> assertEquals(records, partitionData.records()));

    // Second call with an empty name mapping must not hand back data cached from
    // the first call — the result has to be recomputed for the new mapping.
    LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> withoutNames =
        shareFetch.responseData(Map.of());
    assertEquals(0, withoutNames.size());
}

@Test
public void testErrorInAllPartitionsWithMultipleTopicIdPartitions() {
TopicIdPartition topicIdPartition0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
Expand Down Expand Up @@ -201,4 +225,27 @@ public void testMaybeCompleteWithExceptionWithExistingErroneousTopicPartition()
assertEquals(1, brokerTopicStats.allTopicsStats().failedShareFetchRequestRate().count());
assertEquals(1, brokerTopicStats.topicStats("foo").failedShareFetchRequestRate().count());
}

/**
 * Builds a small in-memory record batch for use in test responses: {@code count}
 * records starting at {@code baseOffset}, valued "value-&lt;id&gt;" for ids
 * starting at {@code firstMessageId}.
 */
private MemoryRecords buildRecords(long baseOffset, int count, long firstMessageId) {
    MemoryRecordsBuilder builder = MemoryRecords.builder(
        ByteBuffer.allocate(1024), Compression.NONE, TimestampType.CREATE_TIME, baseOffset);
    for (long id = firstMessageId; id < firstMessageId + count; id++) {
        builder.append(0L, "key".getBytes(), ("value-" + id).getBytes());
    }
    return builder.build();
}

/**
 * Builds a {@link ShareFetchResponse} containing a single partition entry for
 * {@code tp} populated from the given records, error codes, and messages.
 */
private ShareFetchResponse shareFetchResponse(TopicIdPartition tp, MemoryRecords records, Errors error,
                                              String errorMessage, short acknowledgeErrorCode, String acknowledgeErrorMessage,
                                              List<ShareFetchResponseData.AcquiredRecords> acquiredRecords, int throttleTime) {
    ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
        .setPartitionIndex(tp.topicPartition().partition())
        .setErrorCode(error.code())
        .setErrorMessage(errorMessage)
        .setAcknowledgeErrorCode(acknowledgeErrorCode)
        .setAcknowledgeErrorMessage(acknowledgeErrorMessage)
        .setRecords(records)
        .setAcquiredRecords(acquiredRecords);
    // ShareFetchResponse.of expects an ordered map; with one entry this is
    // equivalent to wrapping a Map.of(...) in a LinkedHashMap.
    LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> partitions = new LinkedHashMap<>();
    partitions.put(tp, partitionData);
    return ShareFetchResponse.of(Errors.NONE, throttleTime, partitions, List.of(), Integer.MAX_VALUE);
}
}