diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java index 60d244951df93..336eed4a3b4bf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java @@ -17,33 +17,37 @@ package org.apache.kafka.clients.consumer; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.utils.Utils; import java.util.Collections; import java.util.Map; -import java.util.function.Function; /** - * In the even of unclean leader election, the log will be truncated, + * In the event of an unclean leader election, the log will be truncated, * previously committed data will be lost, and new data will be written * over these offsets. When this happens, the consumer will detect the * truncation and raise this exception (if no automatic reset policy - * has been defined) with the first offset to diverge from what the - * consumer read. + * has been defined) with the first offset known to diverge from what the + * consumer previously read. */ public class LogTruncationException extends OffsetOutOfRangeException { private final Map divergentOffsets; - public LogTruncationException(Map divergentOffsets) { - super("Detected log truncation with diverging offsets " + divergentOffsets, - Utils.transformMap(divergentOffsets, Function.identity(), OffsetAndMetadata::offset)); + public LogTruncationException(String message, + Map fetchOffsets, + Map divergentOffsets) { + super(message, fetchOffsets); this.divergentOffsets = Collections.unmodifiableMap(divergentOffsets); } /** - * Get the offsets for the partitions which were truncated. This is the first offset which is known to diverge - * from what the consumer read. + * Get the divergent offsets for the partitions which were truncated. For each + * partition, this is the first offset which is known to diverge from what the + * consumer read. + * + * Note that there is no guarantee that this offset will be known. It is necessary + * to use {@link #partitions()} to see the set of partitions that were truncated + * and then check for the presence of a divergent offset in this map. */ public Map divergentOffsets() { return divergentOffsets; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java index 6b662932b2e90..c98e22fc82587 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java @@ -40,6 +40,9 @@ public OffsetOutOfRangeException(String message, Map offse this.offsetOutOfRangePartitions = offsetOutOfRangePartitions; } + /** + * Get a map of the topic partitions and the respective out-of-range fetch offsets. + */ public Map offsetOutOfRangePartitions() { return offsetOutOfRangePartitions; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index d6dcda49766f6..52c5155fe1fe0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -16,12 +16,12 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.ApiVersion; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.FetchSessionHandler; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NodeApiVersions; -import org.apache.kafka.clients.ApiVersion; import org.apache.kafka.clients.StaleMetadataException; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -812,7 +812,7 @@ private void validateOffsetsAsync(Map partitionsT future.addListener(new RequestFutureListener() { @Override public void onSuccess(OffsetForEpochResult offsetsResult) { - Map truncationWithoutResetPolicy = new HashMap<>(); + List truncations = new ArrayList<>(); if (!offsetsResult.partitionsToRetry().isEmpty()) { subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), time.milliseconds() + retryBackoffMs); metadata.requestUpdate(); @@ -824,32 +824,15 @@ public void onSuccess(OffsetForEpochResult offsetsResult) { // // In addition, check whether the returned offset and epoch are valid. If not, then we should reset // its offset if reset policy is configured, or throw out of range exception. - offsetsResult.endOffsets().forEach((respTopicPartition, respEndOffset) -> { - FetchPosition requestPosition = fetchPositions.get(respTopicPartition); - - if (respEndOffset.hasUndefinedEpochOrOffset()) { - try { - handleOffsetOutOfRange(requestPosition, respTopicPartition, - "Failed leader offset epoch validation for " + requestPosition - + " since no end offset larger than current fetch epoch was reported"); - } catch (OffsetOutOfRangeException e) { - // Catch the exception here to ensure finishing other partitions validation. - setFatalOffsetForLeaderException(e); - } - } else { - Optional divergentOffsetOpt = subscriptions.maybeCompleteValidation( - respTopicPartition, requestPosition, respEndOffset); - divergentOffsetOpt.ifPresent( - divergentOffset -> { - log.info("Detected log truncation for topic partition {} with diverging offset {}", - respTopicPartition, divergentOffset); - truncationWithoutResetPolicy.put(respTopicPartition, divergentOffset); - }); - } + offsetsResult.endOffsets().forEach((topicPartition, respEndOffset) -> { + FetchPosition requestPosition = fetchPositions.get(topicPartition); + Optional truncationOpt = + subscriptions.maybeCompleteValidation(topicPartition, requestPosition, respEndOffset); + truncationOpt.ifPresent(truncations::add); }); - if (!truncationWithoutResetPolicy.isEmpty()) { - setFatalOffsetForLeaderException(new LogTruncationException(truncationWithoutResetPolicy)); + if (!truncations.isEmpty()) { + maybeSetOffsetForLeaderException(buildLogTruncationException(truncations)); } } @@ -858,14 +841,28 @@ public void onFailure(RuntimeException e) { subscriptions.requestFailed(fetchPositions.keySet(), time.milliseconds() + retryBackoffMs); metadata.requestUpdate(); - setFatalOffsetForLeaderException(e); + if (!(e instanceof RetriableException)) { + maybeSetOffsetForLeaderException(e); + } } }); }); } - private void setFatalOffsetForLeaderException(RuntimeException e) { - if (!(e instanceof RetriableException) && !cachedOffsetForLeaderException.compareAndSet(null, e)) { + private LogTruncationException buildLogTruncationException(List truncations) { + Map divergentOffsets = new HashMap<>(); + Map truncatedFetchOffsets = new HashMap<>(); + for (SubscriptionState.LogTruncation truncation : truncations) { + truncation.divergentOffsetOpt.ifPresent(divergentOffset -> + divergentOffsets.put(truncation.topicPartition, divergentOffset)); + truncatedFetchOffsets.put(truncation.topicPartition, truncation.fetchPosition.offset); + } + return new LogTruncationException("Detected truncated partitions: " + truncations, + truncatedFetchOffsets, divergentOffsets); + } + + private void maybeSetOffsetForLeaderException(RuntimeException e) { + if (!cachedOffsetForLeaderException.compareAndSet(null, e)) { log.error("Discarding error in OffsetsForLeaderEpoch because another error is pending", e); } } @@ -1296,7 +1293,7 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " + "does not match the current offset {}", tp, fetchOffset, position); } else { - handleOffsetOutOfRange(position, tp, "error response in offset fetch"); + handleOffsetOutOfRange(position, tp); } } else { log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}", @@ -1336,23 +1333,15 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc return completedFetch; } - private void handleOffsetOutOfRange(FetchPosition fetchPosition, - TopicPartition topicPartition, - String reason) { + private void handleOffsetOutOfRange(FetchPosition fetchPosition, TopicPartition topicPartition) { + String errorMessage = "Fetch position " + fetchPosition + " is out of range for partition " + topicPartition; if (subscriptions.hasDefaultOffsetResetPolicy()) { - log.info("Fetch offset epoch {} is out of range for partition {}, resetting offset", - fetchPosition, topicPartition); + log.info("{}, resetting offset", errorMessage); subscriptions.requestOffsetReset(topicPartition); } else { - Map offsetOutOfRangePartitions = - Collections.singletonMap(topicPartition, fetchPosition.offset); - String errorMessage = String.format("Offsets out of range " + - "with no configured reset policy for partitions: %s" + - ", for fetch offset: %d, " + - "root cause: %s", - offsetOutOfRangePartitions, fetchPosition.offset, reason); - log.info(errorMessage); - throw new OffsetOutOfRangeException(errorMessage, offsetOutOfRangePartitions); + log.info("{}, raising error to the application since no reset policy is configured", errorMessage); + throw new OffsetOutOfRangeException(errorMessage, + Collections.singletonMap(topicPartition, fetchPosition.offset)); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index b9885d00075ba..13103bc0c038f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -464,11 +464,11 @@ public synchronized boolean maybeValidatePositionForCurrentLeader(ApiVersions ap /** * Attempt to complete validation with the end offset returned from the OffsetForLeaderEpoch request. - * @return The diverging offset if truncation was detected and no reset policy is defined. + * @return Log truncation details if detected and no reset policy is defined. */ - public synchronized Optional maybeCompleteValidation(TopicPartition tp, - FetchPosition requestPosition, - EpochEndOffset epochEndOffset) { + public synchronized Optional maybeCompleteValidation(TopicPartition tp, + FetchPosition requestPosition, + EpochEndOffset epochEndOffset) { TopicPartitionState state = assignedStateOrNull(tp); if (state == null) { log.debug("Skipping completed validation for partition {} which is not currently assigned.", tp); @@ -480,6 +480,17 @@ public synchronized Optional maybeCompleteValidation(TopicPar log.debug("Skipping completed validation for partition {} since the current position {} " + "no longer matches the position {} when the request was sent", tp, currentPosition, requestPosition); + } else if (epochEndOffset.hasUndefinedEpochOrOffset()) { + if (hasDefaultOffsetResetPolicy()) { + log.info("Truncation detected for partition {} at offset {}, resetting offset", + tp, currentPosition); + + requestOffsetReset(tp); + } else { + log.warn("Truncation detected for partition {} at offset {}, but no reset policy is set", + tp, currentPosition); + return Optional.of(new LogTruncation(tp, requestPosition, Optional.empty())); + } } else if (epochEndOffset.endOffset() < currentPosition.offset) { if (hasDefaultOffsetResetPolicy()) { SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition( @@ -489,11 +500,12 @@ public synchronized Optional maybeCompleteValidation(TopicPar "the first offset known to diverge {}", tp, currentPosition, newPosition); state.seekValidated(newPosition); } else { + OffsetAndMetadata divergentOffset = new OffsetAndMetadata(epochEndOffset.endOffset(), + Optional.of(epochEndOffset.leaderEpoch()), null); log.warn("Truncation detected for partition {} at offset {} (the end offset from the " + - "broker is {}), but no reset policy is set", - tp, currentPosition, epochEndOffset); - return Optional.of(new OffsetAndMetadata(epochEndOffset.endOffset(), - Optional.of(epochEndOffset.leaderEpoch()), null)); + "broker is {}), but no reset policy is set", + tp, currentPosition, divergentOffset); + return Optional.of(new LogTruncation(tp, requestPosition, Optional.of(divergentOffset))); } } else { state.completeValidation(); @@ -1083,4 +1095,43 @@ public String toString() { '}'; } } + + public static class LogTruncation { + public final TopicPartition topicPartition; + public final FetchPosition fetchPosition; + public final Optional divergentOffsetOpt; + + public LogTruncation(TopicPartition topicPartition, + FetchPosition fetchPosition, + Optional divergentOffsetOpt) { + this.topicPartition = topicPartition; + this.fetchPosition = fetchPosition; + this.divergentOffsetOpt = divergentOffsetOpt; + } + + @Override + public String toString() { + StringBuilder bldr = new StringBuilder() + .append("(partition=") + .append(topicPartition) + .append(", fetchOffset=") + .append(fetchPosition.offset) + .append(", fetchEpoch=") + .append(fetchPosition.offsetEpoch); + + if (divergentOffsetOpt.isPresent()) { + OffsetAndMetadata divergentOffset = divergentOffsetOpt.get(); + bldr.append(", divergentOffset=") + .append(divergentOffset.offset()) + .append(", divergentEpoch=") + .append(divergentOffset.leaderEpoch()); + } else { + bldr.append(", divergentOffset=unknown") + .append(", divergentEpoch=unknown"); + } + + return bldr.append(")").toString(); + + } + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 64aea29e7226d..801eddb587218 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.LogTruncationException; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.apache.kafka.clients.consumer.OffsetResetStrategy; @@ -3856,12 +3857,16 @@ private void testOffsetValidationWithGivenEpochOffset(final EpochEndOffset epoch consumerClient.poll(time.timer(Duration.ZERO)); if (offsetResetStrategy == OffsetResetStrategy.NONE) { - OffsetOutOfRangeException thrown = - assertThrows(OffsetOutOfRangeException.class, () -> fetcher.validateOffsetsIfNeeded()); + LogTruncationException thrown = + assertThrows(LogTruncationException.class, () -> fetcher.validateOffsetsIfNeeded()); + assertEquals(singletonMap(tp0, initialOffset), thrown.offsetOutOfRangePartitions()); - // If epoch offset is valid, we are testing for the log truncation case. - if (!epochEndOffset.hasUndefinedEpochOrOffset()) { - assertTrue(thrown instanceof LogTruncationException); + if (epochEndOffset.hasUndefinedEpochOrOffset()) { + assertEquals(Collections.emptyMap(), thrown.divergentOffsets()); + } else { + OffsetAndMetadata expectedDivergentOffset = new OffsetAndMetadata( + epochEndOffset.endOffset(), Optional.of(epochEndOffset.leaderEpoch()), ""); + assertEquals(singletonMap(tp0, expectedDivergentOffset), thrown.divergentOffsets()); } assertTrue(subscriptions.awaitingValidation(tp0)); } else { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index 6d71ebc3b861e..e27e6957f9a33 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.SubscriptionState.LogTruncation; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; @@ -517,9 +518,9 @@ public void testMaybeCompleteValidation() { state.seekUnvalidated(tp0, initialPosition); assertTrue(state.awaitingValidation(tp0)); - Optional divergentOffsetMetadataOpt = state.maybeCompleteValidation(tp0, initialPosition, + Optional truncationOpt = state.maybeCompleteValidation(tp0, initialPosition, new EpochEndOffset(initialOffsetEpoch, initialOffset + 5)); - assertEquals(Optional.empty(), divergentOffsetMetadataOpt); + assertEquals(Optional.empty(), truncationOpt); assertFalse(state.awaitingValidation(tp0)); assertEquals(initialPosition, state.position(tp0)); } @@ -572,9 +573,9 @@ public void testMaybeCompleteValidationAfterPositionChange() { Optional.of(updateOffsetEpoch), new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch))); state.seekUnvalidated(tp0, updatePosition); - Optional divergentOffsetMetadataOpt = state.maybeCompleteValidation(tp0, initialPosition, + Optional truncationOpt = state.maybeCompleteValidation(tp0, initialPosition, new EpochEndOffset(initialOffsetEpoch, initialOffset + 5)); - assertEquals(Optional.empty(), divergentOffsetMetadataOpt); + assertEquals(Optional.empty(), truncationOpt); assertTrue(state.awaitingValidation(tp0)); assertEquals(updatePosition, state.position(tp0)); } @@ -595,9 +596,9 @@ public void testMaybeCompleteValidationAfterOffsetReset() { state.requestOffsetReset(tp0); - Optional divergentOffsetMetadataOpt = state.maybeCompleteValidation(tp0, initialPosition, + Optional truncationOpt = state.maybeCompleteValidation(tp0, initialPosition, new EpochEndOffset(initialOffsetEpoch, initialOffset + 5)); - assertEquals(Optional.empty(), divergentOffsetMetadataOpt); + assertEquals(Optional.empty(), truncationOpt); assertFalse(state.awaitingValidation(tp0)); assertTrue(state.isOffsetResetNeeded(tp0)); assertNull(state.position(tp0)); @@ -619,9 +620,9 @@ public void testTruncationDetectionWithResetPolicy() { state.seekUnvalidated(tp0, initialPosition); assertTrue(state.awaitingValidation(tp0)); - Optional divergentOffsetMetadata = state.maybeCompleteValidation(tp0, initialPosition, + Optional truncationOpt = state.maybeCompleteValidation(tp0, initialPosition, new EpochEndOffset(divergentOffsetEpoch, divergentOffset)); - assertEquals(Optional.empty(), divergentOffsetMetadata); + assertEquals(Optional.empty(), truncationOpt); assertFalse(state.awaitingValidation(tp0)); SubscriptionState.FetchPosition updatedPosition = new SubscriptionState.FetchPosition(divergentOffset, @@ -646,10 +647,62 @@ public void testTruncationDetectionWithoutResetPolicy() { state.seekUnvalidated(tp0, initialPosition); assertTrue(state.awaitingValidation(tp0)); - Optional divergentOffsetMetadata = state.maybeCompleteValidation(tp0, initialPosition, + Optional truncationOpt = state.maybeCompleteValidation(tp0, initialPosition, new EpochEndOffset(divergentOffsetEpoch, divergentOffset)); + assertTrue(truncationOpt.isPresent()); + LogTruncation truncation = truncationOpt.get(); + assertEquals(Optional.of(new OffsetAndMetadata(divergentOffset, Optional.of(divergentOffsetEpoch), "")), - divergentOffsetMetadata); + truncation.divergentOffsetOpt); + assertEquals(initialPosition, truncation.fetchPosition); + assertTrue(state.awaitingValidation(tp0)); + } + + @Test + public void testTruncationDetectionUnknownDivergentOffsetWithResetPolicy() { + Node broker1 = new Node(1, "localhost", 9092); + state = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); + state.assignFromUser(Collections.singleton(tp0)); + + int currentEpoch = 10; + long initialOffset = 10L; + int initialOffsetEpoch = 5; + + SubscriptionState.FetchPosition initialPosition = new SubscriptionState.FetchPosition(initialOffset, + Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch))); + state.seekUnvalidated(tp0, initialPosition); + assertTrue(state.awaitingValidation(tp0)); + + Optional truncationOpt = state.maybeCompleteValidation(tp0, initialPosition, + new EpochEndOffset(EpochEndOffset.UNDEFINED_EPOCH, EpochEndOffset.UNDEFINED_EPOCH_OFFSET)); + assertEquals(Optional.empty(), truncationOpt); + assertFalse(state.awaitingValidation(tp0)); + assertTrue(state.isOffsetResetNeeded(tp0)); + assertEquals(OffsetResetStrategy.EARLIEST, state.resetStrategy(tp0)); + } + + @Test + public void testTruncationDetectionUnknownDivergentOffsetWithoutResetPolicy() { + Node broker1 = new Node(1, "localhost", 9092); + state = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + state.assignFromUser(Collections.singleton(tp0)); + + int currentEpoch = 10; + long initialOffset = 10L; + int initialOffsetEpoch = 5; + + SubscriptionState.FetchPosition initialPosition = new SubscriptionState.FetchPosition(initialOffset, + Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch))); + state.seekUnvalidated(tp0, initialPosition); + assertTrue(state.awaitingValidation(tp0)); + + Optional truncationOpt = state.maybeCompleteValidation(tp0, initialPosition, + new EpochEndOffset(EpochEndOffset.UNDEFINED_EPOCH, EpochEndOffset.UNDEFINED_EPOCH_OFFSET)); + assertTrue(truncationOpt.isPresent()); + LogTruncation truncation = truncationOpt.get(); + + assertEquals(Optional.empty(), truncation.divergentOffsetOpt); + assertEquals(initialPosition, truncation.fetchPosition); assertTrue(state.awaitingValidation(tp0)); }