Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,37 @@
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;

import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

/**
* In the even of unclean leader election, the log will be truncated,
* In the event of an unclean leader election, the log will be truncated,
* previously committed data will be lost, and new data will be written
* over these offsets. When this happens, the consumer will detect the
* truncation and raise this exception (if no automatic reset policy
* has been defined) with the first offset to diverge from what the
* consumer read.
* has been defined) with the first offset known to diverge from what the
* consumer previously read.
*/
public class LogTruncationException extends OffsetOutOfRangeException {

private final Map<TopicPartition, OffsetAndMetadata> divergentOffsets;

public LogTruncationException(Map<TopicPartition, OffsetAndMetadata> divergentOffsets) {
super("Detected log truncation with diverging offsets " + divergentOffsets,
Utils.transformMap(divergentOffsets, Function.identity(), OffsetAndMetadata::offset));
/**
 * Create the exception with the positions at which truncation was detected.
 *
 * @param message exception message passed through to {@link OffsetOutOfRangeException}
 * @param fetchOffsets the fetch offset for each truncated partition at the time
 *                     truncation was detected (forwarded to the superclass)
 * @param divergentOffsets for each truncated partition, the first offset known to
 *                         diverge from what the consumer read, when the broker
 *                         reported one; stored as an unmodifiable map
 */
public LogTruncationException(String message,
Map<TopicPartition, Long> fetchOffsets,
Map<TopicPartition, OffsetAndMetadata> divergentOffsets) {
super(message, fetchOffsets);
this.divergentOffsets = Collections.unmodifiableMap(divergentOffsets);
}

/**
* Get the offsets for the partitions which were truncated. This is the first offset which is known to diverge
* from what the consumer read.
* Get the divergent offsets for the partitions which were truncated. For each
* partition, this is the first offset which is known to diverge from what the
* consumer read.
*
* Note that there is no guarantee that this offset will be known. It is necessary
* to use {@link #partitions()} to see the set of partitions that were truncated
* and then check for the presence of a divergent offset in this map.
*/
public Map<TopicPartition, OffsetAndMetadata> divergentOffsets() {
return divergentOffsets;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ public OffsetOutOfRangeException(String message, Map<TopicPartition, Long> offse
this.offsetOutOfRangePartitions = offsetOutOfRangePartitions;
}

/**
 * Get a map of the topic partitions and the respective out-of-range fetch offsets.
 *
 * @return map from each affected topic partition to the fetch offset that was
 *         out of range, as supplied to the constructor
 */
public Map<TopicPartition, Long> offsetOutOfRangePartitions() {
return offsetOutOfRangePartitions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.ApiVersion;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.ApiVersion;
import org.apache.kafka.clients.StaleMetadataException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -812,7 +812,7 @@ private void validateOffsetsAsync(Map<TopicPartition, FetchPosition> partitionsT
future.addListener(new RequestFutureListener<OffsetForEpochResult>() {
@Override
public void onSuccess(OffsetForEpochResult offsetsResult) {
Map<TopicPartition, OffsetAndMetadata> truncationWithoutResetPolicy = new HashMap<>();
List<SubscriptionState.LogTruncation> truncations = new ArrayList<>();
if (!offsetsResult.partitionsToRetry().isEmpty()) {
subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), time.milliseconds() + retryBackoffMs);
metadata.requestUpdate();
Expand All @@ -824,32 +824,15 @@ public void onSuccess(OffsetForEpochResult offsetsResult) {
//
// In addition, check whether the returned offset and epoch are valid. If not, then we should reset
// its offset if reset policy is configured, or throw out of range exception.
offsetsResult.endOffsets().forEach((respTopicPartition, respEndOffset) -> {
FetchPosition requestPosition = fetchPositions.get(respTopicPartition);

if (respEndOffset.hasUndefinedEpochOrOffset()) {
try {
handleOffsetOutOfRange(requestPosition, respTopicPartition,
"Failed leader offset epoch validation for " + requestPosition
+ " since no end offset larger than current fetch epoch was reported");
} catch (OffsetOutOfRangeException e) {
// Catch the exception here to ensure finishing other partitions validation.
setFatalOffsetForLeaderException(e);
}
} else {
Optional<OffsetAndMetadata> divergentOffsetOpt = subscriptions.maybeCompleteValidation(
respTopicPartition, requestPosition, respEndOffset);
divergentOffsetOpt.ifPresent(
divergentOffset -> {
log.info("Detected log truncation for topic partition {} with diverging offset {}",
respTopicPartition, divergentOffset);
truncationWithoutResetPolicy.put(respTopicPartition, divergentOffset);
});
}
offsetsResult.endOffsets().forEach((topicPartition, respEndOffset) -> {
FetchPosition requestPosition = fetchPositions.get(topicPartition);
Optional<SubscriptionState.LogTruncation> truncationOpt =
subscriptions.maybeCompleteValidation(topicPartition, requestPosition, respEndOffset);
truncationOpt.ifPresent(truncations::add);
});

if (!truncationWithoutResetPolicy.isEmpty()) {
setFatalOffsetForLeaderException(new LogTruncationException(truncationWithoutResetPolicy));
if (!truncations.isEmpty()) {
maybeSetOffsetForLeaderException(buildLogTruncationException(truncations));
}
}

Expand All @@ -858,14 +841,28 @@ public void onFailure(RuntimeException e) {
subscriptions.requestFailed(fetchPositions.keySet(), time.milliseconds() + retryBackoffMs);
metadata.requestUpdate();

setFatalOffsetForLeaderException(e);
if (!(e instanceof RetriableException)) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we move this check out of setFatalOffsetForLeaderException?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seemed inconsistent to have a method named setFatal which checks for retriable exceptions.

maybeSetOffsetForLeaderException(e);
}
}
});
});
}

private void setFatalOffsetForLeaderException(RuntimeException e) {
if (!(e instanceof RetriableException) && !cachedOffsetForLeaderException.compareAndSet(null, e)) {
/**
 * Convert the collected truncation events into a single {@link LogTruncationException}
 * carrying the truncated fetch offsets and, where known, the divergent offsets.
 */
private LogTruncationException buildLogTruncationException(List<SubscriptionState.LogTruncation> truncations) {
    Map<TopicPartition, Long> truncatedFetchOffsets = new HashMap<>();
    Map<TopicPartition, OffsetAndMetadata> divergentOffsets = new HashMap<>();
    truncations.forEach(truncation -> {
        truncatedFetchOffsets.put(truncation.topicPartition, truncation.fetchPosition.offset);
        // The divergent offset is only present when the broker reported a concrete end offset.
        truncation.divergentOffsetOpt.ifPresent(divergentOffset ->
            divergentOffsets.put(truncation.topicPartition, divergentOffset));
    });
    return new LogTruncationException("Detected truncated partitions: " + truncations,
        truncatedFetchOffsets, divergentOffsets);
}

/**
 * Record the given exception so it can be raised to the application on the next poll.
 * Only the first pending error is kept; any later error is logged and discarded.
 */
private void maybeSetOffsetForLeaderException(RuntimeException e) {
    boolean recorded = cachedOffsetForLeaderException.compareAndSet(null, e);
    if (!recorded) {
        log.error("Discarding error in OffsetsForLeaderEpoch because another error is pending", e);
    }
}
Expand Down Expand Up @@ -1296,7 +1293,7 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc
log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
"does not match the current offset {}", tp, fetchOffset, position);
} else {
handleOffsetOutOfRange(position, tp, "error response in offset fetch");
handleOffsetOutOfRange(position, tp);
}
} else {
log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}",
Expand Down Expand Up @@ -1336,23 +1333,15 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc
return completedFetch;
}

private void handleOffsetOutOfRange(FetchPosition fetchPosition,
TopicPartition topicPartition,
String reason) {
private void handleOffsetOutOfRange(FetchPosition fetchPosition, TopicPartition topicPartition) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel we could still share handleOffsetOutOfRange in two places by letting it return an Optional<LogTruncation>, and letting the caller decide when to throw it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After thinking about it, it seemed simpler to always use LogTruncationException for validation failures, even if the divergent offset is not known. Then direct OffsetOutOfRange errors are reserved for fetch responses which indicate the OFFSET_OUT_OF_RANGE error.

String errorMessage = "Fetch position " + fetchPosition + " is out of range for partition " + topicPartition;
if (subscriptions.hasDefaultOffsetResetPolicy()) {
log.info("Fetch offset epoch {} is out of range for partition {}, resetting offset",
fetchPosition, topicPartition);
log.info("{}, resetting offset", errorMessage);
subscriptions.requestOffsetReset(topicPartition);
} else {
Map<TopicPartition, Long> offsetOutOfRangePartitions =
Collections.singletonMap(topicPartition, fetchPosition.offset);
String errorMessage = String.format("Offsets out of range " +
"with no configured reset policy for partitions: %s" +
", for fetch offset: %d, " +
"root cause: %s",
offsetOutOfRangePartitions, fetchPosition.offset, reason);
log.info(errorMessage);
throw new OffsetOutOfRangeException(errorMessage, offsetOutOfRangePartitions);
log.info("{}, raising error to the application since no reset policy is configured", errorMessage);
throw new OffsetOutOfRangeException(errorMessage,
Comment thread
hachikuji marked this conversation as resolved.
Outdated
Collections.singletonMap(topicPartition, fetchPosition.offset));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,11 +464,11 @@ public synchronized boolean maybeValidatePositionForCurrentLeader(ApiVersions ap

/**
* Attempt to complete validation with the end offset returned from the OffsetForLeaderEpoch request.
* @return The diverging offset if truncation was detected and no reset policy is defined.
* @return Log truncation details if detected and no reset policy is defined.
*/
public synchronized Optional<OffsetAndMetadata> maybeCompleteValidation(TopicPartition tp,
FetchPosition requestPosition,
EpochEndOffset epochEndOffset) {
public synchronized Optional<LogTruncation> maybeCompleteValidation(TopicPartition tp,
FetchPosition requestPosition,
EpochEndOffset epochEndOffset) {
TopicPartitionState state = assignedStateOrNull(tp);
if (state == null) {
log.debug("Skipping completed validation for partition {} which is not currently assigned.", tp);
Expand All @@ -480,6 +480,17 @@ public synchronized Optional<OffsetAndMetadata> maybeCompleteValidation(TopicPar
log.debug("Skipping completed validation for partition {} since the current position {} " +
"no longer matches the position {} when the request was sent",
tp, currentPosition, requestPosition);
} else if (epochEndOffset.hasUndefinedEpochOrOffset()) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering where the best place to put this check is, since previously it was before maybeCompleteValidation. If the partition is not awaiting validation or the returned result doesn't match our current position, should we still trigger the undefined epoch/offset logic here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the fetch position has changed or we are no longer awaiting validation, we want to ignore the result. This was a bug in the previous patch which we didn't catch.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so before this change, we were raising OffsetOutOfRangeException regardless of the state of the subscription which meant that a regular truncation case was being masked as a failed offset validation?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was what @abbccdda and I had agreed in the previous PR. The problem was that we didn't have divergent offsets to include in the exception, so we just raised it as OffsetOutOfRange. After I noticed the problem with LogTruncationException here, I decided to just simplify the logic here and return the truncation exception with the divergent offsets undefined.

if (hasDefaultOffsetResetPolicy()) {
log.info("Truncation detected for partition {} at offset {}, resetting offset",
tp, currentPosition);

requestOffsetReset(tp);
} else {
log.warn("Truncation detected for partition {} at offset {}, but no reset policy is set",
tp, currentPosition);
return Optional.of(new LogTruncation(tp, requestPosition, Optional.empty()));
}
} else if (epochEndOffset.endOffset() < currentPosition.offset) {
if (hasDefaultOffsetResetPolicy()) {
SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
Expand All @@ -489,11 +500,12 @@ public synchronized Optional<OffsetAndMetadata> maybeCompleteValidation(TopicPar
"the first offset known to diverge {}", tp, currentPosition, newPosition);
state.seekValidated(newPosition);
} else {
OffsetAndMetadata divergentOffset = new OffsetAndMetadata(epochEndOffset.endOffset(),
Optional.of(epochEndOffset.leaderEpoch()), null);
log.warn("Truncation detected for partition {} at offset {} (the end offset from the " +
"broker is {}), but no reset policy is set",
tp, currentPosition, epochEndOffset);
return Optional.of(new OffsetAndMetadata(epochEndOffset.endOffset(),
Optional.of(epochEndOffset.leaderEpoch()), null));
"broker is {}), but no reset policy is set",
tp, currentPosition, divergentOffset);
return Optional.of(new LogTruncation(tp, requestPosition, Optional.of(divergentOffset)));
}
} else {
state.completeValidation();
Expand Down Expand Up @@ -1083,4 +1095,43 @@ public String toString() {
'}';
}
}

public static class LogTruncation {
public final TopicPartition topicPartition;
public final FetchPosition fetchPosition;
public final Optional<OffsetAndMetadata> divergentOffsetOpt;

public LogTruncation(TopicPartition topicPartition,
FetchPosition fetchPosition,
Optional<OffsetAndMetadata> divergentOffsetOpt) {
this.topicPartition = topicPartition;
this.fetchPosition = fetchPosition;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should require non-null for fetchPosition

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, why fetchPosition specifically and not the other fields?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After a second thought, I don't feel strong about it.

this.divergentOffsetOpt = divergentOffsetOpt;
}

@Override
public String toString() {
    // Render the divergent offset/epoch as "unknown" when the broker did not report one.
    String divergentSuffix = divergentOffsetOpt
        .map(divergentOffset -> ", divergentOffset=" + divergentOffset.offset()
            + ", divergentEpoch=" + divergentOffset.leaderEpoch())
        .orElse(", divergentOffset=unknown, divergentEpoch=unknown");

    return "(partition=" + topicPartition
        + ", fetchOffset=" + fetchPosition.offset
        + ", fetchEpoch=" + fetchPosition.offsetEpoch
        + divergentSuffix
        + ")";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.LogTruncationException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
Expand Down Expand Up @@ -3856,12 +3857,16 @@ private void testOffsetValidationWithGivenEpochOffset(final EpochEndOffset epoch
consumerClient.poll(time.timer(Duration.ZERO));

if (offsetResetStrategy == OffsetResetStrategy.NONE) {
OffsetOutOfRangeException thrown =
assertThrows(OffsetOutOfRangeException.class, () -> fetcher.validateOffsetsIfNeeded());
LogTruncationException thrown =
assertThrows(LogTruncationException.class, () -> fetcher.validateOffsetsIfNeeded());
assertEquals(singletonMap(tp0, initialOffset), thrown.offsetOutOfRangePartitions());

// If epoch offset is valid, we are testing for the log truncation case.
if (!epochEndOffset.hasUndefinedEpochOrOffset()) {
assertTrue(thrown instanceof LogTruncationException);
if (epochEndOffset.hasUndefinedEpochOrOffset()) {
assertEquals(Collections.emptyMap(), thrown.divergentOffsets());
} else {
OffsetAndMetadata expectedDivergentOffset = new OffsetAndMetadata(
epochEndOffset.endOffset(), Optional.of(epochEndOffset.leaderEpoch()), "");
assertEquals(singletonMap(tp0, expectedDivergentOffset), thrown.divergentOffsets());
}
assertTrue(subscriptions.awaitingValidation(tp0));
} else {
Expand Down
Loading