@@ -30,11 +30,6 @@ public interface ChangelogReader extends ChangelogRegister {
      */
     void restore();
 
-    /**
-     * Update offset limit of a given changelog partition
-     */
-    void updateLimitOffsets();
-
Contributor Author:
This function is only triggered internally and can be removed from the interface.


     /**
      * Transit to restore active changelogs mode
      */
@@ -24,6 +24,7 @@
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
@@ -99,11 +100,18 @@ static class ChangelogMetadata {
 
-        // only for active restoring tasks (for standby changelog it is null)
-        // NOTE we do not book keep the current offset since we leverage state manager as its source of truth
-        private Long restoreEndOffset;
-
-        // only for standby tasks that use source topics as changelogs (for active it is null);
-        // if it is not on source topics it is also null
-        private Long restoreLimitOffset;
Contributor Author:
Following the suggestion on the KAFKA-9113 PR by @ableegoldman, we consolidate the limit-offset with the end-offset.


+        // the end offset beyond which records should not be applied (yet) to restore the states
+        //
+        // for both active restoring tasks and standby updating tasks, it is defined as:
+        //  * log-end-offset if the changelog is not piggy-backed with source topic
+        //  * min(log-end-offset, committed-offset) if the changelog is piggy-backed with source topic
+        //
+        // the log-end-offset only needs to be updated once and only need to be for active tasks since for standby
+        // tasks it would never "complete" based on the end-offset;
+        // the committed-offset needs to be updated periodically for those standby tasks
+        private Long restoreEndOffset;
 
         // buffer records polled by the restore consumer;
         private final List<ConsumerRecord<byte[], byte[]>> bufferedRecords;
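
For illustration, the rule in the new comment block above reduces to a small pure function. The sketch below is hypothetical (EndOffsetRule and effectiveEndOffset are not part of this PR):

// Hypothetical sketch of the consolidated end-offset rule: for a changelog that is
// piggy-backed on a source topic, restoration/updating is bounded by
// min(log-end-offset, committed-offset); otherwise the log-end-offset alone bounds it.
class EndOffsetRule {
    static long effectiveEndOffset(final long logEndOffset,
                                   final Long committedOffset,    // null when not piggy-backed
                                   final boolean changelogAsSource) {
        if (changelogAsSource && committedOffset != null) {
            return Math.min(logEndOffset, committedOffset);
        }
        return logEndOffset;
    }
}

For example, a standby changelog that doubles as a source topic with log-end-offset 100 and committed offset 40 may only apply records up to (but not including) offset 40.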
@@ -113,14 +121,13 @@ static class ChangelogMetadata {
         private int bufferedLimitIndex;
 
         private ChangelogMetadata(final StateStoreMetadata storeMetadata, final ProcessorStateManager stateManager) {
-            this.changelogState = ChangelogState.REGISTERED;
             this.storeMetadata = storeMetadata;
             this.stateManager = stateManager;
+            this.changelogState = ChangelogState.REGISTERED;
             this.restoreEndOffset = null;
             this.totalRestored = 0L;
 
             this.bufferedRecords = new ArrayList<>();
-            this.restoreLimitOffset = null;
             this.bufferedLimitIndex = 0;
         }

@@ -139,7 +146,7 @@ private void transitTo(final ChangelogState newState) {
         public String toString() {
             final Long currentOffset = storeMetadata.offset();
             return changelogState + " " + stateManager.taskType() +
-                " (currentOffset " + currentOffset + ", endOffset " + restoreEndOffset + ", limitOffset " + restoreLimitOffset + ")";
+                " (currentOffset " + currentOffset + ", endOffset " + restoreEndOffset + ")";
         }
 
         // for testing only below
@@ -155,10 +162,6 @@ Long endOffset() {
             return restoreEndOffset;
         }
 
-        Long limitOffset() {
-            return restoreLimitOffset;
-        }
-
         List<ConsumerRecord<byte[], byte[]>> bufferedRecords() {
             return bufferedRecords;
         }
@@ -168,10 +171,14 @@ int bufferedLimitIndex() {
         }
     }
 
+    private final static long DEFAULT_OFFSET_UPDATE_MS = 5 * 60 * 1000; // five minutes
+
     private ChangelogReaderState state;
 
+    private final Time time;
     private final Logger log;
     private final Duration pollTime;
+    private final long updateOffsetIntervalMs;
 
     // 1) we keep adding partitions to restore consumer whenever new tasks are registered with the state manager;
     // 2) we do not unassign partitions when we switch between standbys and actives, we just pause / resume them;
@@ -188,18 +195,18 @@ int bufferedLimitIndex() {
     // to update offset limit for standby tasks;
     private Consumer<byte[], byte[]> mainConsumer;
 
-    // the flag indicating limit offsets could be updated --- this is only needed for standby tasks that have limit
-    // offsets enabled
-    private boolean updateLimitOffset;
+    private long lastUpdateOffsetTime;
 
     void setMainConsumer(final Consumer<byte[], byte[]> consumer) {
         this.mainConsumer = consumer;
     }
 
-    public StoreChangelogReader(final StreamsConfig config,
+    public StoreChangelogReader(final Time time,
+                                final StreamsConfig config,
                                 final LogContext logContext,
                                 final Consumer<byte[], byte[]> restoreConsumer,
                                 final StateRestoreListener stateRestoreListener) {
+        this.time = time;
         this.log = logContext.logger(StoreChangelogReader.class);
         this.state = ChangelogReaderState.ACTIVE_RESTORING;
         this.restoreConsumer = restoreConsumer;
@@ -209,9 +216,11 @@ public StoreChangelogReader(final StreamsConfig config,
         // in order to make sure we call the main consumer#poll in time.
         // TODO: once both of these are moved to a separate thread this may no longer be a concern
         this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
+        this.updateOffsetIntervalMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) == Long.MAX_VALUE ?
+            DEFAULT_OFFSET_UPDATE_MS : config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
Contributor Author:
If the user sets this value to infinity, we should still have a non-infinite value to take care of the manual-commit case.

+        this.lastUpdateOffsetTime = 0L;
 
         this.changelogs = new HashMap<>();
-        this.updateLimitOffset = false;
     }

     private static String recordEndOffset(final Long endOffset) {
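
For illustration, the commit-interval fallback wired into the constructor above boils down to a tiny pure function; the helper below is hypothetical, not code from this PR:

// Hypothetical sketch of the fallback: an effectively infinite commit.interval.ms still
// yields a finite update interval, so standby limit offsets keep advancing.
class UpdateIntervalFallback {
    static final long DEFAULT_OFFSET_UPDATE_MS = 5 * 60 * 1000L; // five minutes

    static long resolveUpdateIntervalMs(final long commitIntervalMs) {
        return commitIntervalMs == Long.MAX_VALUE ? DEFAULT_OFFSET_UPDATE_MS : commitIntervalMs;
    }
}

For example, resolveUpdateIntervalMs(Long.MAX_VALUE) yields the five-minute default, while any finite commit interval is used unchanged.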
@@ -311,7 +320,7 @@ public void register(final TopicPartition partition, final ProcessorStateManager

         // initializing limit offset to 0L for standby changelog to effectively disable any restoration until it is updated
         if (stateManager.taskType() == Task.TaskType.STANDBY && stateManager.changelogAsSource(partition)) {
-            changelogMetadata.restoreLimitOffset = 0L;
+            changelogMetadata.restoreEndOffset = 0L;
         }
 
         if (changelogs.putIfAbsent(partition, changelogMetadata) != null) {
@@ -391,10 +400,6 @@ public void restore() {
             return;
         }
 
-        if (updateLimitOffset) {
-            updateLimitOffsets();
-        }
-
         final Set<TopicPartition> restoringChangelogs = restoringChangelogs();
         if (!restoringChangelogs.isEmpty()) {
             final ConsumerRecords<byte[], byte[]> polledRecords;
@@ -420,28 +425,30 @@ public void restore() {
                 // small batches; this can be optimized in the future, e.g. wait longer for larger batches.
                 restoreChangelog(changelogs.get(partition));
             }
+
+            maybeUpdateLimitOffsetsForStandbyChangelogs();
         }
     }
 
-        // for standby changelogs, if there are buffered records not applicable, it means that the limit offset
-        // is there to prevent so, we can try to update the limit offset next time.
-        final Set<ChangelogMetadata> standbyChangelogs = changelogs.values().stream()
-            .filter(metadata -> metadata.stateManager.taskType() == Task.TaskType.STANDBY)
-            .collect(Collectors.toSet());
-        for (final ChangelogMetadata metadata : standbyChangelogs) {
-            if (!metadata.bufferedRecords().isEmpty()) {
-                updateLimitOffset = true;
-                break;
+    private void maybeUpdateLimitOffsetsForStandbyChangelogs() {
+        // for standby changelogs, if the interval has elapsed and there are buffered records not applicable,
+        // we can try to update the limit offset next time.
+        if (updateOffsetIntervalMs < time.milliseconds() - lastUpdateOffsetTime) {
+            final Set<ChangelogMetadata> standbyChangelogs = changelogs.values().stream()
+                .filter(metadata -> metadata.stateManager.taskType() == Task.TaskType.STANDBY)
+                .collect(Collectors.toSet());
+            for (final ChangelogMetadata metadata : standbyChangelogs) {
+                if (!metadata.bufferedRecords().isEmpty()) {
+                    updateLimitOffsets();
+                    break;
+                }
             }
         }
     }

     private void bufferChangelogRecords(final ChangelogMetadata changelogMetadata, final List<ConsumerRecord<byte[], byte[]>> records) {
         // update the buffered records and limit index with the fetched records
-        final long limitOffset = Math.min(
-            changelogMetadata.restoreEndOffset == null ? Long.MAX_VALUE : changelogMetadata.restoreEndOffset,
-            changelogMetadata.restoreLimitOffset == null ? Long.MAX_VALUE : changelogMetadata.restoreLimitOffset
-        );
-
         for (final ConsumerRecord<byte[], byte[]> record : records) {
             // filter polled records for null-keys and also possibly update buffer limit index
             if (record.key() == null) {
@@ -450,7 +457,7 @@ private void bufferChangelogRecords(final ChangelogMetadata changelogMetadata, f
             } else {
                 changelogMetadata.bufferedRecords.add(record);
                 final long offset = record.offset();
-                if (offset < limitOffset)
+                if (changelogMetadata.restoreEndOffset == null || offset < changelogMetadata.restoreEndOffset)
                     changelogMetadata.bufferedLimitIndex = changelogMetadata.bufferedRecords.size();
             }
         }
@@ -517,16 +524,21 @@ private Map<TopicPartition, Long> committedOffsetForChangelogs(final Set<TopicPa
         if (partitions.isEmpty())
             return Collections.emptyMap();
 
+        final Map<TopicPartition, Long> committedOffsets;
         try {
             // those do not have a committed offset would default to 0
-            return mainConsumer.committed(partitions).entrySet().stream()
+            committedOffsets = mainConsumer.committed(partitions).entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : e.getValue().offset()));
         } catch (final TimeoutException e) {
             // if it timed out we just retry next time.
             return Collections.emptyMap();
         } catch (final KafkaException e) {
             throw new StreamsException(String.format("Failed to retrieve end offsets for %s", partitions), e);
         }
 
+        lastUpdateOffsetTime = time.milliseconds();
Contributor Author:
If there's nothing to be updated, or the request timed out, we do not update the timer.

Contributor:
This would be less mysterious if this method were inlined into updateLimitOffsets. Right now, it's not terribly clear why it's ok to set the "last update offset time" in a method that doesn't update the offsets.

Contributor Author:
This function is triggered by another caller besides updateLimitOffsets, plus it is a bit close to the NPathComplexity threshold.


+        return committedOffsets;
     }

     private Map<TopicPartition, Long> endOffsetForChangelogs(final Set<TopicPartition> partitions) {
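
Taken together, the interval gate in maybeUpdateLimitOffsetsForStandbyChangelogs and the timer assignment above act as a simple throttle: committed offsets are fetched at most once per updateOffsetIntervalMs, and the timer only advances after a successful fetch, so a timed-out fetch is retried on the next pass. A self-contained sketch of that pattern, with hypothetical names (OffsetUpdateThrottle is not part of this PR):

import org.apache.kafka.common.utils.Time;

// Hypothetical sketch of the throttling pattern discussed in this thread.
class OffsetUpdateThrottle {
    private final Time time;
    private final long intervalMs;
    private long lastUpdateMs = 0L;

    OffsetUpdateThrottle(final Time time, final long intervalMs) {
        this.time = time;
        this.intervalMs = intervalMs;
    }

    // true once the interval has elapsed since the last successful update
    boolean shouldUpdate() {
        return intervalMs < time.milliseconds() - lastUpdateMs;
    }

    // called only after a successful fetch; a timeout skips this, so the next pass retries
    void markUpdated() {
        lastUpdateMs = time.milliseconds();
    }
}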
@@ -544,7 +556,10 @@ private Map<TopicPartition, Long> endOffsetForChangelogs(final Set<TopicPartitio
         }
     }
 
-    public void updateLimitOffsets() {
+    /**
+     * Update offset limit of a given changelog partition
+     */
+    private void updateLimitOffsets() {
         if (state != ChangelogReaderState.STANDBY_UPDATING) {
             throw new IllegalStateException("We should not try to update standby tasks limit offsets if there are still" +
                 " active tasks for restoring");
@@ -556,8 +571,6 @@ public void updateLimitOffsets() {
             .map(Map.Entry::getKey).collect(Collectors.toSet());
 
         updateLimitOffsetsForStandbyChangelogs(committedOffsetForChangelogs(changelogsWithLimitOffsets));
-
-        updateLimitOffset = false;
     }

     private void updateLimitOffsetsForStandbyChangelogs(final Map<TopicPartition, Long> committedOffsets) {
@@ -568,18 +581,18 @@ private void updateLimitOffsetsForStandbyChangelogs(final Map<TopicPartition, Lo
                 committedOffsets.containsKey(partition)) {
 
                 final Long newLimit = committedOffsets.get(partition);
-                final Long previousLimit = metadata.restoreLimitOffset;
+                final Long previousLimit = metadata.restoreEndOffset;
 
                 if (previousLimit != null && previousLimit > newLimit) {
                     throw new IllegalStateException("Offset limit should monotonically increase, but was reduced for partition " +
                         partition + ". New limit: " + newLimit + ". Previous limit: " + previousLimit);
                 }
 
-                metadata.restoreLimitOffset = newLimit;
+                metadata.restoreEndOffset = newLimit;
 
                 // update the limit index for buffered records
                 while (metadata.bufferedLimitIndex < metadata.bufferedRecords.size() &&
-                    metadata.bufferedRecords.get(metadata.bufferedLimitIndex).offset() < metadata.restoreLimitOffset)
+                    metadata.bufferedRecords.get(metadata.bufferedLimitIndex).offset() < metadata.restoreEndOffset)
                     metadata.bufferedLimitIndex++;
             }
         }
@@ -545,7 +545,7 @@ public static StreamThread create(final InternalTopologyBuilder builder,
         final Map<String, Object> restoreConsumerConfigs = config.getRestoreConsumerConfigs(getRestoreConsumerClientId(threadId));
         final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
 
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(config, logContext, restoreConsumer, userStateRestoreListener);
+        final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, restoreConsumer, userStateRestoreListener);
 
         final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
 
@@ -52,11 +52,6 @@ public void transitToUpdateStandby() {
         // do nothing
     }
 
-    @Override
-    public void updateLimitOffsets() {
-        // do nothing
-    }
-
     @Override
     public Set<TopicPartition> completedChangelogs() {
         // assuming all restoring partitions are completed