Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions core/src/main/java/kafka/server/share/DelayedShareFetch.java
Original file line number Diff line number Diff line change
Expand Up @@ -512,11 +512,11 @@ private LogOffsetMetadata endOffsetMetadataForTopicPartition(TopicIdPartition to
// extend it to support other FetchIsolation types.
FetchIsolation isolationType = shareFetch.fetchParams().isolation;
if (isolationType == FetchIsolation.LOG_END)
return offsetSnapshot.logEndOffset;
return offsetSnapshot.logEndOffset();
else if (isolationType == FetchIsolation.HIGH_WATERMARK)
return offsetSnapshot.highWatermark;
return offsetSnapshot.highWatermark();
else
return offsetSnapshot.lastStableOffset;
return offsetSnapshot.lastStableOffset();

}

Expand Down Expand Up @@ -835,8 +835,8 @@ private void completeRemoteStorageShareFetchRequest() {
for (RemoteFetch remoteFetch : pendingRemoteFetchesOpt.get().remoteFetches()) {
if (remoteFetch.remoteFetchResult().isDone()) {
RemoteLogReadResult remoteLogReadResult = remoteFetch.remoteFetchResult().get();
if (remoteLogReadResult.error.isPresent()) {
Throwable error = remoteLogReadResult.error.get();
if (remoteLogReadResult.error().isPresent()) {
Throwable error = remoteLogReadResult.error().get();
// If there is any error for the remote fetch topic partition, we populate the error accordingly.
shareFetchPartitionData.add(
new ShareFetchPartitionData(
Expand All @@ -846,7 +846,7 @@ private void completeRemoteStorageShareFetchRequest() {
)
);
} else {
FetchDataInfo info = remoteLogReadResult.fetchDataInfo.get();
FetchDataInfo info = remoteLogReadResult.fetchDataInfo().get();
TopicIdPartition topicIdPartition = remoteFetch.topicIdPartition();
LogReadResult logReadResult = remoteFetch.logReadResult();
shareFetchPartitionData.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,9 @@ void testUnscheduleProducerTask() throws IOException {
leaderEpochCache,
producerStateManager,
new ConcurrentHashMap<>(), false).load();
LocalLog localLog = new LocalLog(logDir, logConfig, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, scheduler, mockTime, topicPartition, logDirFailureChannel);
UnifiedLog log = new UnifiedLog(offsets.logStartOffset,
LocalLog localLog = new LocalLog(logDir, logConfig, segments, offsets.recoveryPoint(),
offsets.nextOffsetMetadata(), scheduler, mockTime, topicPartition, logDirFailureChannel);
UnifiedLog log = new UnifiedLog(offsets.logStartOffset(),
localLog,
brokerTopicStats,
producerIdExpirationCheckIntervalMs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,13 @@
*/
package org.apache.kafka.server.log.remote.quota;

public class RLMQuotaManagerConfig {
/**
* Configuration settings for quota management
*
* @param quotaBytesPerSecond The quota in bytes per second
* @param numQuotaSamples The number of samples to retain in memory
* @param quotaWindowSizeSeconds The time span of each sample
*/
public record RLMQuotaManagerConfig(long quotaBytesPerSecond, int numQuotaSamples, int quotaWindowSizeSeconds) {
public static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600;

private final long quotaBytesPerSecond;
private final int numQuotaSamples;
private final int quotaWindowSizeSeconds;

/**
* Configuration settings for quota management
*
* @param quotaBytesPerSecond The quota in bytes per second
* @param numQuotaSamples The number of samples to retain in memory
* @param quotaWindowSizeSeconds The time span of each sample
*/
public RLMQuotaManagerConfig(long quotaBytesPerSecond, int numQuotaSamples, int quotaWindowSizeSeconds) {
this.quotaBytesPerSecond = quotaBytesPerSecond;
this.numQuotaSamples = numQuotaSamples;
this.quotaWindowSizeSeconds = quotaWindowSizeSeconds;
}

public long quotaBytesPerSecond() {
return quotaBytesPerSecond;
}

public int numQuotaSamples() {
return numQuotaSamples;
}

public int quotaWindowSizeSeconds() {
return quotaWindowSizeSeconds;
}

@Override
public String toString() {
return "RLMQuotaManagerConfig{" +
"quotaBytesPerSecond=" + quotaBytesPerSecond +
", numQuotaSamples=" + numQuotaSamples +
", quotaWindowSizeSeconds=" + quotaWindowSizeSeconds +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,11 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.log.remote", "RemoteLogManager");

// The endpoint for remote log metadata manager to connect to
private Optional<Endpoint> endpoint = Optional.empty();
private final Optional<Endpoint> endpoint;
private final Timer remoteReadTimer;

private boolean closed = false;

private final Timer remoteReadTimer;
private volatile DelayedOperationPurgatory<DelayedRemoteListOffsets> delayedRemoteListOffsetsPurgatory;

/**
Expand Down Expand Up @@ -1010,7 +1011,7 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, RemoteLogSegment

List<EpochEntry> epochEntries = getLeaderEpochEntries(log, segment.baseOffset(), nextSegmentBaseOffset);
Map<Integer, Long> segmentLeaderEpochs = new HashMap<>(epochEntries.size());
epochEntries.forEach(entry -> segmentLeaderEpochs.put(entry.epoch, entry.startOffset));
epochEntries.forEach(entry -> segmentLeaderEpochs.put(entry.epoch(), entry.startOffset()));

boolean isTxnIdxEmpty = segment.txnIndex().isEmpty();
RemoteLogSegmentMetadata copySegmentStartedRlsm = new RemoteLogSegmentMetadata(segmentId, segment.baseOffset(), endOffset,
Expand Down Expand Up @@ -1071,7 +1072,7 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, RemoteLogSegment

// `epochEntries` cannot be empty, there is a pre-condition validation in RemoteLogSegmentMetadata
// constructor
int lastEpochInSegment = epochEntries.get(epochEntries.size() - 1).epoch;
int lastEpochInSegment = epochEntries.get(epochEntries.size() - 1).epoch();
copiedOffsetOption = Optional.of(new OffsetAndEpoch(endOffset, lastEpochInSegment));
// Update the highest offset in remote storage for this partition's log so that the local log segments
// are not deleted before they are copied to remote storage.
Expand Down Expand Up @@ -1206,7 +1207,7 @@ private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earl
RemoteLogSegmentMetadata metadata)
throws RemoteStorageException, ExecutionException, InterruptedException {
boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
ignored -> metadata.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
ignored -> metadata.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch()));
if (isSegmentDeleted) {
logger.info("Deleted remote log segment {} due to leader-epoch-cache truncation. " +
"Current earliest-epoch-entry: {}, segment-end-offset: {} and segment-epochs: {}",
Expand Down Expand Up @@ -1375,7 +1376,7 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
if (earliestEpochEntryOptional.isPresent()) {
EpochEntry earliestEpochEntry = earliestEpochEntryOptional.get();
Iterator<Integer> epochsToClean = remoteLeaderEpochs.stream()
.filter(remoteEpoch -> remoteEpoch < earliestEpochEntry.epoch)
.filter(remoteEpoch -> remoteEpoch < earliestEpochEntry.epoch())
.iterator();

List<RemoteLogSegmentMetadata> listOfSegmentsToBeCleaned = new ArrayList<>();
Expand Down Expand Up @@ -1658,11 +1659,11 @@ static NavigableMap<Integer, Long> buildFilteredLeaderEpochMap(NavigableMap<Inte
}

public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
TopicPartition tp = remoteStorageFetchInfo.topicIdPartition.topicPartition();
FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes();
TopicPartition tp = remoteStorageFetchInfo.topicIdPartition().topicPartition();
FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo();

boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation() == FetchIsolation.TXN_COMMITTED;

long offset = fetchInfo.fetchOffset;
int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
Expand Down Expand Up @@ -1714,14 +1715,14 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws
// An empty record is sent instead of an incomplete batch when
// - there is no minimum-one-message constraint and
// - the first batch size is more than maximum bytes that can be sent and
if (!remoteStorageFetchInfo.minOneMessage && firstBatchSize > maxBytes) {
if (!remoteStorageFetchInfo.minOneMessage() && firstBatchSize > maxBytes) {
LOGGER.debug("Returning empty record for offset {} in partition {} because the first batch size {} " +
"is greater than max fetch bytes {}", offset, tp, firstBatchSize, maxBytes);
return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY);
}

int updatedFetchSize =
remoteStorageFetchInfo.minOneMessage && firstBatchSize > maxBytes ? firstBatchSize : maxBytes;
remoteStorageFetchInfo.minOneMessage() && firstBatchSize > maxBytes ? firstBatchSize : maxBytes;

ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
int remainingBytes = updatedFetchSize;
Expand Down Expand Up @@ -1770,7 +1771,7 @@ private FetchDataInfo addAbortedTransactions(long startOffset,

OffsetIndex offsetIndex = indexCache.getIndexEntry(segmentMetadata).offsetIndex();
long upperBoundOffset = offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
.map(position -> position.offset).orElse(segmentMetadata.endOffset() + 1);
.map(OffsetPosition::offset).orElse(segmentMetadata.endOffset() + 1);

final Set<FetchResponseData.AbortedTransaction> abortedTransactions = new HashSet<>();

Expand Down Expand Up @@ -1815,8 +1816,8 @@ private void collectAbortedTransactions(long startOffset,
if (txnIndexOpt.isPresent()) {
TransactionIndex txnIndex = txnIndexOpt.get();
TxnIndexSearchResult searchResult = txnIndex.collectAbortedTxns(startOffset, upperBoundOffset);
accumulator.accept(searchResult.abortedTransactions);
isSearchComplete = searchResult.isComplete;
accumulator.accept(searchResult.abortedTransactions());
isSearchComplete = searchResult.isComplete();
}
if (!isSearchComplete) {
currentMetadataOpt = findNextSegmentWithTxnIndex(tp, currentMetadata.endOffset() + 1, leaderEpochCache);
Expand Down Expand Up @@ -1844,8 +1845,8 @@ private void collectAbortedTransactionInLocalSegments(long startOffset,
TransactionIndex txnIndex = localLogSegments.next().txnIndex();
if (txnIndex != null) {
TxnIndexSearchResult searchResult = txnIndex.collectAbortedTxns(startOffset, upperBoundOffset);
accumulator.accept(searchResult.abortedTransactions);
if (searchResult.isComplete) {
accumulator.accept(searchResult.abortedTransactions());
if (searchResult.isComplete()) {
return;
}
}
Expand Down Expand Up @@ -1886,9 +1887,9 @@ Optional<RemoteLogSegmentMetadata> findNextSegmentWithTxnIndex(TopicPartition tp
}
int initialEpoch = initialEpochOpt.getAsInt();
for (EpochEntry epochEntry : leaderEpochCache.epochEntries()) {
if (epochEntry.epoch >= initialEpoch) {
long startOffset = Math.max(epochEntry.startOffset, offset);
Optional<RemoteLogSegmentMetadata> metadataOpt = fetchNextSegmentWithTxnIndex(tp, epochEntry.epoch, startOffset);
if (epochEntry.epoch() >= initialEpoch) {
long startOffset = Math.max(epochEntry.startOffset(), offset);
Optional<RemoteLogSegmentMetadata> metadataOpt = fetchNextSegmentWithTxnIndex(tp, epochEntry.epoch(), startOffset);
if (metadataOpt.isPresent()) {
return metadataOpt;
}
Expand Down Expand Up @@ -1917,7 +1918,7 @@ OffsetAndEpoch findHighestRemoteOffset(TopicIdPartition topicIdPartition, Unifie
LeaderEpochFileCache leaderEpochCache = log.leaderEpochCache();
Optional<EpochEntry> maybeEpochEntry = leaderEpochCache.latestEntry();
while (offsetAndEpoch == null && maybeEpochEntry.isPresent()) {
int epoch = maybeEpochEntry.get().epoch;
int epoch = maybeEpochEntry.get().epoch();
Optional<Long> highestRemoteOffsetOpt =
remoteLogMetadataManagerPlugin.get().highestOffsetForEpoch(topicIdPartition, epoch);
if (highestRemoteOffsetOpt.isPresent()) {
Expand Down Expand Up @@ -1946,7 +1947,7 @@ long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throw
Optional<Long> logStartOffset = Optional.empty();
LeaderEpochFileCache leaderEpochCache = log.leaderEpochCache();
OptionalInt earliestEpochOpt = leaderEpochCache.earliestEntry()
.map(epochEntry -> OptionalInt.of(epochEntry.epoch))
.map(epochEntry -> OptionalInt.of(epochEntry.epoch()))
.orElseGet(OptionalInt::empty);
while (logStartOffset.isEmpty() && earliestEpochOpt.isPresent()) {
Iterator<RemoteLogSegmentMetadata> iterator =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
this.rlm = rlm;
this.brokerTopicStats = brokerTopicStats;
this.callback = callback;
this.brokerTopicStats.topicStats(fetchInfo.topicIdPartition.topic()).remoteFetchRequestRate().mark();
this.brokerTopicStats.topicStats(fetchInfo.topicIdPartition().topic()).remoteFetchRequestRate().mark();
this.brokerTopicStats.allTopicsStats().remoteFetchRequestRate().mark();
this.quotaManager = quotaManager;
this.remoteReadTimer = remoteReadTimer;
Expand All @@ -61,21 +61,21 @@ public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
public Void call() {
RemoteLogReadResult result;
try {
LOGGER.debug("Reading records from remote storage for topic partition {}", fetchInfo.topicIdPartition);
LOGGER.debug("Reading records from remote storage for topic partition {}", fetchInfo.topicIdPartition());
FetchDataInfo fetchDataInfo = remoteReadTimer.time(() -> rlm.read(fetchInfo));
brokerTopicStats.topicStats(fetchInfo.topicIdPartition.topic()).remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes());
brokerTopicStats.topicStats(fetchInfo.topicIdPartition().topic()).remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes());
brokerTopicStats.allTopicsStats().remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes());
result = new RemoteLogReadResult(Optional.of(fetchDataInfo), Optional.empty());
} catch (OffsetOutOfRangeException e) {
result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
} catch (Exception e) {
brokerTopicStats.topicStats(fetchInfo.topicIdPartition.topic()).failedRemoteFetchRequestRate().mark();
brokerTopicStats.topicStats(fetchInfo.topicIdPartition().topic()).failedRemoteFetchRequestRate().mark();
brokerTopicStats.allTopicsStats().failedRemoteFetchRequestRate().mark();
LOGGER.error("Error occurred while reading the remote data for {}", fetchInfo.topicIdPartition, e);
LOGGER.error("Error occurred while reading the remote data for {}", fetchInfo.topicIdPartition(), e);
result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
}
LOGGER.debug("Finished reading records from remote storage for topic partition {}", fetchInfo.topicIdPartition);
quotaManager.record(result.fetchDataInfo.map(fetchDataInfo -> fetchDataInfo.records.sizeInBytes()).orElse(0));
LOGGER.debug("Finished reading records from remote storage for topic partition {}", fetchInfo.topicIdPartition());
quotaManager.record(result.fetchDataInfo().map(fetchDataInfo -> fetchDataInfo.records.sizeInBytes()).orElse(0));
callback.accept(result);
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public OptionalLong read() {
Content content = Json.parseStringAs(text, Content.class);
return OptionalLong.of(content.brokerEpoch);
} catch (Exception e) {
logger.debug("Fail to read the clean shutdown file in " + cleanShutdownFile.toPath() + ":" + e);
logger.debug("Fail to read the clean shutdown file in {}", cleanShutdownFile.toPath(), e);
return OptionalLong.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public static File newFile(File dir) {
private static class Formatter implements EntryFormatter<EpochEntry> {

public String toString(EpochEntry entry) {
return entry.epoch + " " + entry.startOffset;
return entry.epoch() + " " + entry.startOffset();
}

public Optional<EpochEntry> fromString(String line) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,7 @@ public Optional<TopicPartitionOffset> fromString(String line) {
}
}

static class TopicPartitionOffset {

final TopicPartition tp;
final long offset;

TopicPartitionOffset(TopicPartition tp, long offset) {
this.tp = tp;
this.offset = offset;
}
record TopicPartitionOffset(TopicPartition tp, long offset) {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,7 @@

import org.apache.kafka.common.Uuid;

public class PartitionMetadata {
private final int version;
private final Uuid topicId;

public PartitionMetadata(int version, Uuid topicId) {
this.version = version;
this.topicId = topicId;
}

public int version() {
return version;
}

public Uuid topicId() {
return topicId;
}
public record PartitionMetadata(int version, Uuid topicId) {

public String encode() {
return "version: " + version + "\ntopic_id: " + topicId;
Expand Down
Loading
Loading