From 3019459a6065c1b09871bd1abe7f33c3c42d9511 Mon Sep 17 00:00:00 2001 From: "Colin P. McCabe" Date: Tue, 25 Apr 2023 16:09:35 -0700 Subject: [PATCH 01/11] MINOR: provide the exact offset to QuorumController.replay Provide the exact record offset to QuorumController.replay() in all cases. There are several situations where this is useful, such as logging, implementing metadata transactions, or handling broker registration records. In the case where the QC is inactive, and simply replaying records, it is easy to compute the exact record offset from the batch base offset and the record index. The active QC case is more difficult. Technically, when we submit records to the Raft layer, it can choose a batch end offset later than the one we expect, if someone else is also adding records. While the QC is the only entity submitting data records, control records may be added at any time. In the current implementation, these are really only used for leadership elections. However, this could change with the addition of quorum reconfiguration or similar features. Therefore, this PR allows the QC to tell the Raft layer that a record append should fail if it would have resulted in a batch end offset other than what was expected. This in turn will trigger a controller failover. In the future, if automatically added control records become more common, we may wish to have a more sophisticated system than this simple optimistic concurrency mechanism. But for now, this will allow us to rely on the offset as correct. In order that the active QC can learn what offset to start writing at, the PR also adds a new endOffset parameter to handleLeaderChange. Since the Raft layer only invokes handleLeaderChange on the active once it has replayed the log, this information should always be up-to-date in that context. At the Raft level, this PR adds a new exception, UnexpectedEndOffsetException. This gets thrown when we request an end offset that doesn't match the one the Raft layer would have given us. Although this exception should cause a failover, it should not be considered a fault. This complicated the exception handling a bit and motivated splitting more of it out into the new EventHandlerExceptionInfo class. This will also let us unit test things like slf4j log messages a bit better. --- .../scala/kafka/tools/TestRaftServer.scala | 2 +- .../kafka/controller/QuorumController.java | 100 ++++---- .../errors/ControllerExceptions.java | 61 ----- .../errors/EventHandlerExceptionInfo.java | 223 ++++++++++++++++++ .../kafka/image/loader/MetadataLoader.java | 2 +- .../kafka/metadata/util/BatchFileWriter.java | 5 +- .../metadata/util/SnapshotFileReader.java | 2 +- .../errors/ControllerExceptionsTest.java | 68 +----- .../errors/EventHandlerExceptionInfoTest.java | 173 ++++++++++++++ .../image/publisher/SnapshotEmitterTest.java | 6 +- .../apache/kafka/metalog/LocalLogManager.java | 76 ++++-- .../kafka/metalog/LocalLogManagerTestEnv.java | 10 +- .../metalog/MockMetaLogManagerListener.java | 2 +- .../apache/kafka/raft/KafkaRaftClient.java | 27 +-- .../org/apache/kafka/raft/RaftClient.java | 14 +- .../apache/kafka/raft/ReplicatedCounter.java | 2 +- .../errors/UnexpectedEndOffsetException.java | 29 +++ .../raft/internals/BatchAccumulator.java | 69 ++---- .../kafka/snapshot/RecordsSnapshotWriter.java | 3 +- .../kafka/raft/KafkaRaftClientTest.java | 3 +- .../kafka/raft/RaftClientTestContext.java | 2 +- .../raft/internals/BatchAccumulatorTest.java | 17 +- .../deferred/DeferredEventQueueTest.java | 26 ++ 23 files changed, 632 insertions(+), 290 deletions(-) create mode 100644 metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java create mode 100644 metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java create mode 100644 raft/src/main/java/org/apache/kafka/raft/errors/UnexpectedEndOffsetException.java diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala index 1026c52847337..6d2653979ada0 100644 --- a/core/src/main/scala/kafka/tools/TestRaftServer.scala +++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala @@ -173,7 +173,7 @@ class TestRaftServer( raftManager.register(this) - override def handleLeaderChange(newLeaderAndEpoch: LeaderAndEpoch): Unit = { + override def handleLeaderChange(newLeaderAndEpoch: LeaderAndEpoch, endOffset: Long): Unit = { if (newLeaderAndEpoch.isLeader(config.nodeId)) { eventQueue.offer(HandleClaim(newLeaderAndEpoch.epoch)) } else if (claimedEpoch.isDefined) { diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 582d774e6e9b2..3026154fb4fc4 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -77,6 +77,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.controller.errors.ControllerExceptions; +import org.apache.kafka.controller.errors.EventHandlerExceptionInfo; import org.apache.kafka.controller.metrics.QuorumControllerMetrics; import org.apache.kafka.metadata.BrokerHeartbeatReply; import org.apache.kafka.metadata.BrokerRegistrationReply; @@ -458,38 +459,32 @@ private void handleEventEnd(String name, long startProcessingTimeNs) { controllerMetrics.updateEventQueueProcessingTime(NANOSECONDS.toMillis(deltaNs)); } - private Throwable handleEventException(String name, - OptionalLong startProcessingTimeNs, - Throwable exception) { - Throwable externalException = - ControllerExceptions.toExternalException(exception, () -> latestController()); - if (!startProcessingTimeNs.isPresent()) { - log.error("{}: unable to start processing because of {}. Reason: {}", name, - exception.getClass().getSimpleName(), exception.getMessage()); - return externalException; - } - long endProcessingTime = time.nanoseconds(); - long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong(); - long deltaUs = MICROSECONDS.convert(deltaNs, NANOSECONDS); - if (ControllerExceptions.isExpected(exception)) { - log.info("{}: failed with {} in {} us. Reason: {}", name, - exception.getClass().getSimpleName(), deltaUs, exception.getMessage()); - return externalException; + private Throwable handleEventException( + String name, + OptionalLong startProcessingTimeNs, + Throwable exception + ) { + OptionalLong deltaUs; + if (startProcessingTimeNs.isPresent()) { + long endProcessingTime = time.nanoseconds(); + long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong(); + deltaUs = OptionalLong.of(MICROSECONDS.convert(deltaNs, NANOSECONDS)); + } else { + deltaUs = OptionalLong.empty(); + } + EventHandlerExceptionInfo info = EventHandlerExceptionInfo. + fromInternal(exception, () -> latestController()); + String failureMessage = info.failureMessage(lastCommittedEpoch, deltaUs, + isActiveController(), lastCommittedOffset); + if (info.isFault()) { + nonFatalFaultHandler.handleFault(name + ": " + failureMessage, exception); + } else { + log.info("{}: {}", name, failureMessage); } - if (isActiveController()) { - nonFatalFaultHandler.handleFault(String.format("%s: failed with unexpected server " + - "exception %s at epoch %d in %d us. Renouncing leadership and reverting " + - "to the last committed offset %d.", - name, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs, - lastCommittedOffset), exception); + if (info.causesFailover() && isActiveController()) { renounce(); - } else { - nonFatalFaultHandler.handleFault(String.format("%s: failed with unexpected server " + - "exception %s in %d us. The controller is already in standby mode.", - name, exception.getClass().getSimpleName(), deltaUs), - exception); } - return externalException; + return info.effectiveExternalException(); } /** @@ -740,22 +735,24 @@ public Long apply(List records) { // Start by trying to apply the record to our in-memory state. This should always // succeed; if it does not, that's a fatal error. It is important to do this before // scheduling the record for Raft replication. - int i = 1; + int i = 0; for (ApiMessageAndVersion message : records) { try { - replay(message.message(), Optional.empty(), prevEndOffset + records.size()); + replay(message.message(), Optional.empty(), prevEndOffset + i + 1); } catch (Throwable e) { String failureMessage = String.format("Unable to apply %s record, which was " + "%d of %d record(s) in the batch following last write offset %d.", - message.message().getClass().getSimpleName(), i, records.size(), + message.message().getClass().getSimpleName(), i + 1, records.size(), prevEndOffset); throw fatalFaultHandler.handleFault(failureMessage, e); } i++; } - prevEndOffset = raftClient.scheduleAtomicAppend(controllerEpoch, records); - snapshotRegistry.getOrCreateSnapshot(prevEndOffset); - return prevEndOffset; + long nextEndOffset = prevEndOffset + i; + raftClient.scheduleAtomicAppend(controllerEpoch, OptionalLong.of(nextEndOffset), records); + snapshotRegistry.getOrCreateSnapshot(nextEndOffset); + prevEndOffset = nextEndOffset; + return nextEndOffset; } }); op.processBatchEndOffset(offset); @@ -973,14 +970,14 @@ public void handleCommit(BatchReader reader) { log.debug("Replaying commits from the active node up to " + "offset {} and epoch {}.", offset, epoch); } - int i = 1; + int i = 0; for (ApiMessageAndVersion message : messages) { try { - replay(message.message(), Optional.empty(), offset); + replay(message.message(), Optional.empty(), batch.baseOffset() + i); } catch (Throwable e) { String failureMessage = String.format("Unable to apply %s record on standby " + "controller, which was %d of %d record(s) in the batch with baseOffset %d.", - message.message().getClass().getSimpleName(), i, messages.size(), + message.message().getClass().getSimpleName(), i + 1, messages.size(), batch.baseOffset()); throw fatalFaultHandler.handleFault(failureMessage, e); } @@ -1059,7 +1056,7 @@ public void handleLoadSnapshot(SnapshotReader reader) { } @Override - public void handleLeaderChange(LeaderAndEpoch newLeader) { + public void handleLeaderChange(LeaderAndEpoch newLeader, long endOffset) { appendRaftEvent("handleLeaderChange[" + newLeader.epoch() + "]", () -> { final String newLeaderName = newLeader.leaderId().isPresent() ? String.valueOf(newLeader.leaderId().getAsInt()) : "(none)"; @@ -1076,10 +1073,10 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) { renounce(); } } else if (newLeader.isLeader(nodeId)) { - log.info("Becoming the active controller at epoch {}, committed offset {}, " + - "committed epoch {}", newLeader.epoch(), lastCommittedOffset, - lastCommittedEpoch); - claim(newLeader.epoch()); + long newLastWriteOffset = endOffset - 1; + log.info("Becoming the active controller at epoch {}, last write offset {}.", + newLeader.epoch(), newLastWriteOffset); + claim(newLeader.epoch(), newLastWriteOffset); } else { log.info("In the new epoch {}, the leader is {}.", newLeader.epoch(), newLeaderName); @@ -1150,15 +1147,17 @@ private void updateWriteOffset(long offset) { } } - private void claim(int epoch) { + private void claim(int epoch, long newLastWriteOffset) { try { if (curClaimEpoch != -1) { throw new RuntimeException("Cannot claim leadership because we are already the " + "active controller."); } curClaimEpoch = epoch; + lastCommittedOffset = newLastWriteOffset; + lastCommittedEpoch = epoch; controllerMetrics.setActive(true); - updateWriteOffset(lastCommittedOffset); + updateWriteOffset(newLastWriteOffset); clusterControl.activate(); // Before switching to active, create an in-memory snapshot at the last committed @@ -1498,25 +1497,24 @@ private void handleFeatureControlChange() { * * @param message The metadata record * @param snapshotId The snapshotId if this record is from a snapshot - * @param batchLastOffset The offset of the last record in the log batch, or the lastContainedLogOffset - * if this record is from a snapshot, this is used along with RegisterBrokerRecord + * @param offset The offset of the record */ - private void replay(ApiMessage message, Optional snapshotId, long batchLastOffset) { + private void replay(ApiMessage message, Optional snapshotId, long offset) { if (log.isTraceEnabled()) { if (snapshotId.isPresent()) { log.trace("Replaying snapshot {} record {}", Snapshots.filenameFromSnapshotId(snapshotId.get()), recordRedactor.toLoggableString(message)); } else { - log.trace("Replaying log record {} with batchLastOffset {}", - recordRedactor.toLoggableString(message), batchLastOffset); + log.trace("Replaying log record {} with offset {}", + recordRedactor.toLoggableString(message), offset); } } logReplayTracker.replay(message); MetadataRecordType type = MetadataRecordType.fromId(message.apiKey()); switch (type) { case REGISTER_BROKER_RECORD: - clusterControl.replay((RegisterBrokerRecord) message, batchLastOffset); + clusterControl.replay((RegisterBrokerRecord) message, offset); break; case UNREGISTER_BROKER_RECORD: clusterControl.replay((UnregisterBrokerRecord) message); diff --git a/metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java b/metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java index 6f99ea45f6b5d..b7e74446a4b6b 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java +++ b/metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java @@ -17,18 +17,11 @@ package org.apache.kafka.controller.errors; -import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.NotControllerException; -import org.apache.kafka.common.errors.PolicyViolationException; import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.errors.UnknownServerException; -import org.apache.kafka.raft.errors.NotLeaderException; -import org.apache.kafka.server.mutable.BoundedListTooLongException; import java.util.OptionalInt; import java.util.concurrent.ExecutionException; -import java.util.concurrent.RejectedExecutionException; -import java.util.function.Supplier; public class ControllerExceptions { @@ -93,58 +86,4 @@ public static NotControllerException newWrongControllerException(OptionalInt con return new NotControllerException("No controller appears to be active."); } } - - /** - * Determine if an exception is expected. Unexpected exceptions trigger controller failovers - * when they are raised. - * - * @param exception The exception. - * @return True if the exception is expected. - */ - public static boolean isExpected(Throwable exception) { - if (exception instanceof ApiException) { - // ApiExceptions indicate errors that should be returned to the user. - return true; - } else if (exception instanceof NotLeaderException) { - // NotLeaderException is thrown if we try to append records, but are not the leader. - return true; - } else if (exception instanceof RejectedExecutionException) { - // This can happen when the controller is shutting down. - return true; - } else if (exception instanceof BoundedListTooLongException) { - // This can happen if we tried to create too many records. - return true; - } else if (exception instanceof InterruptedException) { - // Interrupted exceptions are not expected. They might happen during junit tests if - // the test gets stuck and must be terminated by sending IE to all the threads. - return false; - } - // Other exceptions are unexpected. - return false; - } - - /** - * Translate an internal controller exception to its external equivalent. - * - * @param exception The internal exception. - * @return Its external equivalent. - */ - public static Throwable toExternalException( - Throwable exception, - Supplier latestControllerSupplier - ) { - if (exception instanceof ApiException) { - return exception; - } else if (exception instanceof NotLeaderException) { - return newWrongControllerException(latestControllerSupplier.get()); - } else if (exception instanceof RejectedExecutionException) { - return new TimeoutException("The controller is shutting down.", exception); - } else if (exception instanceof BoundedListTooLongException) { - return new PolicyViolationException("Unable to perform excessively large batch " + - "operation."); - } else if (exception instanceof InterruptedException) { - return new UnknownServerException("The controller was interrupted."); - } - return new UnknownServerException(exception); - } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java b/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java new file mode 100644 index 0000000000000..4e74b71a677b0 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller.errors; + +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.NotControllerException; +import org.apache.kafka.common.errors.PolicyViolationException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.raft.errors.NotLeaderException; +import org.apache.kafka.raft.errors.UnexpectedEndOffsetException; +import org.apache.kafka.server.mutable.BoundedListTooLongException; + +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.concurrent.RejectedExecutionException; +import java.util.function.Supplier; + + +public final class EventHandlerExceptionInfo { + /** + * True if this exception should be treated as a fault. + */ + private final boolean isFault; + + /** + * True if this exception should cause a controller failover. + * All faults cause failover + */ + private final boolean causesFailover; + + /** + * The internal exception. + */ + private final Throwable internalException; + + /** + * The exception to present to RPC callers, or Optional.empty if the internal exception should + * be presented directly. + */ + private final Optional externalException; + + /** + * Create an EventHandlerExceptionInfo object from an internal exception. + * + * @param internal The internal exception. + * @param latestControllerSupplier A function we can call to obtain the latest leader id. + * + * @return The new immutable info object. + */ + public static EventHandlerExceptionInfo fromInternal( + Throwable internal, + Supplier latestControllerSupplier + ) { + if (internal instanceof ApiException) { + // This exception is a standard API error response from the controller, which can pass + // through without modification. + return new EventHandlerExceptionInfo(false, false, internal); + } else if (internal instanceof NotLeaderException) { + // The controller has lost leadership. + return new EventHandlerExceptionInfo(false, true, internal, + ControllerExceptions.newWrongControllerException(latestControllerSupplier.get())); + } else if (internal instanceof RejectedExecutionException) { + // The controller event queue is shutting down. + return new EventHandlerExceptionInfo(false, false, internal, + new TimeoutException("The controller is shutting down.", internal)); + } else if (internal instanceof BoundedListTooLongException) { + // The operation could not be performed because it would have created an overly large + // batch. + return new EventHandlerExceptionInfo(false, false, internal, + new PolicyViolationException("Unable to perform excessively large batch " + + "operation.")); + } else if (internal instanceof UnexpectedEndOffsetException) { + // The active controller picked the wrong end offset for its next batch. It must now + // fail over. This should be pretty rare. + return new EventHandlerExceptionInfo(false, true, internal, + new NotControllerException("Unexpected end offset. Controller not known.")); + } else if (internal instanceof InterruptedException) { + // The controller event queue has been interrupted. This normally only happens during + // a JUnit test that has hung. The test framework sometimes sends an InterruptException + // to all threads to try to get them to shut down. This isn't the correct way to shut + // the test, but it may happen if something hung. + return new EventHandlerExceptionInfo(true, true, internal, + new UnknownServerException("The controller was interrupted.")); + } else { + // This is the catch-all case for things that aren't supposed to happen. Null pointer + // exceptions, illegal argument exceptions, etc. They get translated into an + // UnknownServerException and a controller failover. + return new EventHandlerExceptionInfo(true, true, internal, + new UnknownServerException(internal)); + } + } + + /** + * Returns true if the class and message fields match for two exceptions. Handles nulls. + */ + static boolean exceptionClassesAndMessagesMatch(Throwable a, Throwable b) { + if (a == null) return b == null; + if (b == null) return false; + if (!a.getClass().equals(b.getClass())) return false; + if (!Objects.equals(a.getMessage(), b.getMessage())) return false; + return true; + } + + EventHandlerExceptionInfo( + boolean isFault, + boolean causesFailover, + Throwable internalException + ) { + this.isFault = isFault; + this.causesFailover = causesFailover; + this.internalException = internalException; + this.externalException = Optional.empty(); + } + + EventHandlerExceptionInfo( + boolean isFault, + boolean causesFailover, + Throwable internalException, + Throwable externalException + ) { + this.isFault = isFault; + this.causesFailover = causesFailover; + this.internalException = internalException; + this.externalException = Optional.of(externalException); + } + + public boolean isFault() { + return isFault; + } + + public boolean causesFailover() { + return causesFailover; + } + + public Throwable effectiveExternalException() { + return externalException.orElse(internalException); + } + + public String failureMessage( + int epoch, + OptionalLong deltaUs, + boolean isActiveController, + long lastCommittedOffset + ) { + StringBuilder bld = new StringBuilder(); + if (deltaUs.isPresent()) { + bld.append("failed with "); + } else { + bld.append("unable to start processing because of "); + } + bld.append(internalException.getClass().getSimpleName()); + if (externalException.isPresent()) { + bld.append(" (treated as "). + append(externalException.get().getClass().getSimpleName()).append(")"); + } + if (causesFailover()) { + bld.append(" at epoch ").append(epoch); + } + if (deltaUs.isPresent()) { + bld.append(" in ").append(deltaUs.getAsLong()).append(" microseconds"); + } + if (causesFailover()) { + if (isActiveController) { + bld.append(". Renouncing leadership and reverting to the last committed offset "); + bld.append(lastCommittedOffset); + } else { + bld.append(". The controller is already in standby mode"); + } + } + bld.append("."); + return bld.toString(); + } + + @Override + public int hashCode() { + return Objects.hash(isFault, + causesFailover, + internalException.getClass().getCanonicalName(), + internalException.getMessage(), + externalException.orElse(internalException).getClass().getCanonicalName(), + externalException.orElse(internalException).getMessage()); + } + + @Override + public boolean equals(Object o) { + if (o == null || !(o.getClass().equals(EventHandlerExceptionInfo.class))) return false; + EventHandlerExceptionInfo other = (EventHandlerExceptionInfo) o; + return isFault == other.isFault && + causesFailover == other.causesFailover && + exceptionClassesAndMessagesMatch(internalException, other.internalException) && + exceptionClassesAndMessagesMatch(externalException.orElse(null), + other.externalException.orElse(null)); + } + + @Override + public String toString() { + return "EventHandlerExceptionInfo" + + "(isFault=" + isFault + + ", causesFailover=" + causesFailover + + ", internalException.class=" + internalException.getClass().getCanonicalName() + + ", externalException.class=" + (externalException.isPresent() ? + externalException.get().getClass().getCanonicalName() : "(none)") + + ")"; + } +} \ No newline at end of file diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java index 768fcb2574b2f..c923742af9fa7 100644 --- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java +++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java @@ -495,7 +495,7 @@ SnapshotManifest loadSnapshot( } @Override - public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) { + public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch, long endOffset) { eventQueue.append(() -> { currentLeaderAndEpoch = leaderAndEpoch; for (MetadataPublisher publisher : publishers.values()) { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java index 6f82d915d8d5d..0ccf39a85ac8f 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java @@ -34,6 +34,7 @@ import java.nio.file.StandardOpenOption; import java.util.Collections; import java.util.List; +import java.util.OptionalLong; import static org.apache.kafka.raft.KafkaRaftClient.MAX_BATCH_SIZE_BYTES; @@ -61,11 +62,11 @@ private BatchFileWriter( } public void append(ApiMessageAndVersion apiMessageAndVersion) { - batchAccumulator.append(0, Collections.singletonList(apiMessageAndVersion)); + batchAccumulator.append(0, Collections.singletonList(apiMessageAndVersion), OptionalLong.empty(), false); } public void append(List messageBatch) { - batchAccumulator.append(0, messageBatch); + batchAccumulator.append(0, messageBatch, OptionalLong.empty(), false); } public void close() throws IOException { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java index 19875741ec2de..5db1e575e7967 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java @@ -137,7 +137,7 @@ private void handleControlBatch(FileChannelRecordBatch batch) { listener.handleLeaderChange(new LeaderAndEpoch( OptionalInt.of(message.leaderId()), batch.partitionLeaderEpoch() - )); + ), batch.lastOffset() + 1); break; default: log.error("Ignoring control record with type {} at offset {}", diff --git a/metadata/src/test/java/org/apache/kafka/controller/errors/ControllerExceptionsTest.java b/metadata/src/test/java/org/apache/kafka/controller/errors/ControllerExceptionsTest.java index 81c234491b558..2d1905b1a5c1c 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/errors/ControllerExceptionsTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/errors/ControllerExceptionsTest.java @@ -20,20 +20,15 @@ import org.apache.kafka.common.errors.NotControllerException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicExistsException; -import org.apache.kafka.common.errors.UnknownServerException; -import org.apache.kafka.raft.errors.NotLeaderException; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import java.util.OptionalInt; import java.util.concurrent.ExecutionException; -import java.util.concurrent.RejectedExecutionException; -import static org.apache.kafka.controller.errors.ControllerExceptions.isExpected; import static org.apache.kafka.controller.errors.ControllerExceptions.isTimeoutException; import static org.apache.kafka.controller.errors.ControllerExceptions.newPreMigrationException; import static org.apache.kafka.controller.errors.ControllerExceptions.newWrongControllerException; -import static org.apache.kafka.controller.errors.ControllerExceptions.toExternalException; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -99,32 +94,7 @@ public void testNewWrongControllerExceptionWithActiveController() { newWrongControllerException(OptionalInt.of(1))); } - @Test - public void testApiExceptionIsExpected() { - assertTrue(isExpected(new TopicExistsException(""))); - } - - @Test - public void testNotLeaderExceptionIsExpected() { - assertTrue(isExpected(new NotLeaderException(""))); - } - - @Test - public void testRejectedExecutionExceptionIsExpected() { - assertTrue(isExpected(new RejectedExecutionException())); - } - - @Test - public void testInterruptedExceptionIsNotExpected() { - assertFalse(isExpected(new InterruptedException())); - } - - @Test - public void testRuntimeExceptionIsNotExpected() { - assertFalse(isExpected(new NullPointerException())); - } - - private static void assertExceptionsMatch(Throwable a, Throwable b) { + static void assertExceptionsMatch(Throwable a, Throwable b) { assertEquals(a.getClass(), b.getClass()); assertEquals(a.getMessage(), b.getMessage()); if (a.getCause() != null) { @@ -134,40 +104,4 @@ private static void assertExceptionsMatch(Throwable a, Throwable b) { assertNull(b.getCause()); } } - - @Test - public void testApiExceptionToExternalException() { - assertExceptionsMatch(new TopicExistsException("Topic foo exists"), - toExternalException(new TopicExistsException("Topic foo exists"), - () -> OptionalInt.of(1))); - } - - @Test - public void testNotLeaderExceptionToExternalException() { - assertExceptionsMatch(new NotControllerException("The active controller appears to be node 1."), - toExternalException(new NotLeaderException("Append failed because the given epoch 123 is stale."), - () -> OptionalInt.of(1))); - } - - @Test - public void testRejectedExecutionExceptionToExternalException() { - assertExceptionsMatch(new TimeoutException("The controller is shutting down.", - new RejectedExecutionException("The event queue is shutting down")), - toExternalException(new RejectedExecutionException("The event queue is shutting down"), - () -> OptionalInt.empty())); - } - - @Test - public void testInterruptedExceptionToExternalException() { - assertExceptionsMatch(new UnknownServerException("The controller was interrupted."), - toExternalException(new InterruptedException(), - () -> OptionalInt.empty())); - } - - @Test - public void testRuntimeExceptionToExternalException() { - assertExceptionsMatch(new UnknownServerException(new NullPointerException("Null pointer exception")), - toExternalException(new NullPointerException("Null pointer exception"), - () -> OptionalInt.empty())); - } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java b/metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java new file mode 100644 index 0000000000000..ccffde2831e1f --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller.errors; + +import org.apache.kafka.common.errors.NotControllerException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.raft.errors.NotLeaderException; +import org.apache.kafka.raft.errors.UnexpectedEndOffsetException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.concurrent.RejectedExecutionException; + +import static org.junit.jupiter.api.Assertions.assertEquals; + + +@Timeout(value = 40) +public class EventHandlerExceptionInfoTest { + private static final EventHandlerExceptionInfo TOPIC_EXISTS = + EventHandlerExceptionInfo.fromInternal( + new TopicExistsException("Topic exists."), + () -> OptionalInt.empty()); + + private static final EventHandlerExceptionInfo REJECTED_EXECUTION = + EventHandlerExceptionInfo.fromInternal( + new RejectedExecutionException(), + () -> OptionalInt.empty()); + + private static final EventHandlerExceptionInfo INTERRUPTED = + EventHandlerExceptionInfo.fromInternal( + new InterruptedException(), + () -> OptionalInt.of(1)); + + private static final EventHandlerExceptionInfo NULL_POINTER = + EventHandlerExceptionInfo.fromInternal( + new NullPointerException(), + () -> OptionalInt.of(1)); + + private static final EventHandlerExceptionInfo NOT_LEADER = + EventHandlerExceptionInfo.fromInternal( + new NotLeaderException("Append failed"), + () -> OptionalInt.of(2)); + + private static final EventHandlerExceptionInfo UNEXPECTED_END_OFFSET = + EventHandlerExceptionInfo.fromInternal( + new UnexpectedEndOffsetException("Wanted end offset 3, but next available was 4"), + () -> OptionalInt.of(1)); + + @Test + public void testTopicExistsExceptionInfo() { + assertEquals(new EventHandlerExceptionInfo(false, false, + new TopicExistsException("Topic exists.")), + TOPIC_EXISTS); + } + + @Test + public void testTopicExistsExceptionFailureMessage() { + assertEquals("failed with TopicExistsException in 234 microseconds.", + TOPIC_EXISTS.failureMessage(123, OptionalLong.of(234L), true, 456L)); + } + + @Test + public void testRejectedExecutionExceptionInfo() { + assertEquals(new EventHandlerExceptionInfo(false, false, + new RejectedExecutionException(), + new TimeoutException("The controller is shutting down.", new RejectedExecutionException())), + REJECTED_EXECUTION); + } + + @Test + public void testRejectedExecutionExceptionFailureMessage() { + assertEquals("unable to start processing because of RejectedExecutionException (treated " + + "as TimeoutException).", + REJECTED_EXECUTION.failureMessage(123, OptionalLong.empty(), true, 456L)); + } + + @Test + public void testInterruptedExceptionInfo() { + assertEquals(new EventHandlerExceptionInfo(true, true, + new InterruptedException(), + new UnknownServerException("The controller was interrupted.")), + INTERRUPTED); + } + + @Test + public void testInterruptedExceptionFailureMessageWhenActive() { + assertEquals("unable to start processing because of InterruptedException (treated as " + + "UnknownServerException) at epoch 123. Renouncing leadership and reverting to the " + + "last committed offset 456.", + INTERRUPTED.failureMessage(123, OptionalLong.empty(), true, 456L)); + } + + @Test + public void testInterruptedExceptionFailureMessageWhenInactive() { + assertEquals("unable to start processing because of InterruptedException (treated as " + + "UnknownServerException) at epoch 123. The controller is already in standby mode.", + INTERRUPTED.failureMessage(123, OptionalLong.empty(), false, 456L)); + } + + @Test + public void testNullPointerExceptionInfo() { + assertEquals(new EventHandlerExceptionInfo(true, true, + new NullPointerException(), + new UnknownServerException(new NullPointerException())), + NULL_POINTER); + } + + @Test + public void testNullPointerExceptionFailureMessageWhenActive() { + assertEquals("failed with NullPointerException (treated as UnknownServerException) " + + "at epoch 123 in 40 microseconds. Renouncing leadership and reverting to the last " + + "committed offset 456.", + NULL_POINTER.failureMessage(123, OptionalLong.of(40L), true, 456L)); + } + + @Test + public void testNullPointerExceptionFailureMessageWhenInactive() { + assertEquals("failed with NullPointerException (treated as UnknownServerException) " + + "at epoch 123 in 40 microseconds. The controller is already in standby mode.", + NULL_POINTER.failureMessage(123, OptionalLong.of(40L), false, 456L)); + } + + @Test + public void testNotLeaderExceptionInfo() { + assertEquals(new EventHandlerExceptionInfo(false, true, + new NotLeaderException("Append failed"), + new NotControllerException("The active controller appears to be node 2.")), + NOT_LEADER); + } + + @Test + public void testNotLeaderExceptionFailureMessage() { + assertEquals("unable to start processing because of NotLeaderException (treated as " + + "NotControllerException) at epoch 123. Renouncing leadership and reverting to the " + + "last committed offset 456.", + NOT_LEADER.failureMessage(123, OptionalLong.empty(), true, 456L)); + } + + @Test + public void testUnexpectedEndOffsetExceptionInfo() { + assertEquals(new EventHandlerExceptionInfo(false, true, + new UnexpectedEndOffsetException("Wanted end offset 3, but next available was 4"), + new NotControllerException("Unexpected end offset. Controller not known.")), + UNEXPECTED_END_OFFSET); + } + + @Test + public void testUnepxectedEndOffsetFailureMessage() { + assertEquals("failed with UnexpectedEndOffsetException (treated as " + + "NotControllerException) at epoch 123 in 90 microseconds. Renouncing leadership " + + "and reverting to the last committed offset 456.", + UNEXPECTED_END_OFFSET.failureMessage(123, OptionalLong.of(90L), true, 456L)); + } +} diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java index ca72aa058ef9d..038de599009f2 100644 --- a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java @@ -81,7 +81,11 @@ public long scheduleAppend(int epoch, List records) { } @Override - public long scheduleAtomicAppend(int epoch, List records) { + public long scheduleAtomicAppend( + int epoch, + OptionalLong requiredEndOffset, + List records + ) { return 0; } diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java index 46f99db5d1d1e..1f1b5d72e4754 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java @@ -32,6 +32,7 @@ import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.RaftClient; import org.apache.kafka.raft.errors.NotLeaderException; +import org.apache.kafka.raft.errors.UnexpectedEndOffsetException; import org.apache.kafka.raft.internals.MemoryBatchReader; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.snapshot.MockRawSnapshotReader; @@ -221,13 +222,26 @@ synchronized void unregisterLogManager(LocalLogManager logManager) { } } - synchronized long tryAppend(int nodeId, int epoch, List batch) { + synchronized long tryAppend( + int nodeId, + int epoch, + OptionalLong requiredEndOffset, + List batch + ) { // No easy access to the concept of time. Use the base offset as the append timestamp long appendTimestamp = (prevOffset + 1) * 10; - return tryAppend(nodeId, epoch, new LocalRecordBatch(epoch, appendTimestamp, batch)); - } - - synchronized long tryAppend(int nodeId, int epoch, LocalBatch batch) { + return tryAppend(nodeId, + epoch, + requiredEndOffset, + new LocalRecordBatch(epoch, appendTimestamp, batch)); + } + + synchronized long tryAppend( + int nodeId, + int epoch, + OptionalLong requiredEndOffset, + LocalBatch batch + ) { if (!leader.isLeader(nodeId)) { log.debug("tryAppend(nodeId={}, epoch={}): the given node id does not " + "match the current leader id of {}.", nodeId, epoch, leader.leaderId()); @@ -243,15 +257,24 @@ synchronized long tryAppend(int nodeId, int epoch, LocalBatch batch) { } log.trace("tryAppend(nodeId={}): appending {}.", nodeId, batch); - long offset = append(batch); + long offset = append(requiredEndOffset, batch); electLeaderIfNeeded(); return offset; } - public synchronized long append(LocalBatch batch) { - prevOffset += batch.size(); - log.debug("append(batch={}, prevOffset={})", batch, prevOffset); - batches.put(prevOffset, batch); + public synchronized long append( + OptionalLong requiredEndOffset, + LocalBatch batch + ) { + long nextEndOffset = prevOffset + batch.size(); + requiredEndOffset.ifPresent(r -> { + if (r != nextEndOffset) { + throw new UnexpectedEndOffsetException("Wanted end offset " + r + + ", but next available was " + nextEndOffset); + } + }); + log.debug("append(batch={}, nextEndOffset={})", batch, nextEndOffset); + batches.put(nextEndOffset, batch); if (batch instanceof LeaderChangeBatch) { LeaderChangeBatch leaderChangeBatch = (LeaderChangeBatch) batch; leader = leaderChangeBatch.newLeader; @@ -259,7 +282,8 @@ public synchronized long append(LocalBatch batch) { for (LocalLogManager logManager : logManagers.values()) { logManager.scheduleLogCheck(); } - return prevOffset; + prevOffset = nextEndOffset; + return nextEndOffset; } synchronized void electLeaderIfNeeded() { @@ -274,7 +298,7 @@ synchronized void electLeaderIfNeeded() { } LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(nextLeaderNode), leader.epoch() + 1); log.info("Elected new leader: {}.", newLeader); - append(new LeaderChangeBatch(newLeader)); + append(OptionalLong.empty(), new LeaderChangeBatch(newLeader)); } synchronized LeaderAndEpoch leaderAndEpoch() { @@ -431,8 +455,8 @@ void handleLoadSnapshot(SnapshotReader reader) { void handleLeaderChange(long offset, LeaderAndEpoch leader) { // Simulate KRaft implementation by first bumping the epoch before assigning a leader - listener.handleLeaderChange(new LeaderAndEpoch(OptionalInt.empty(), leader.epoch())); - listener.handleLeaderChange(leader); + listener.handleLeaderChange(new LeaderAndEpoch(OptionalInt.empty(), leader.epoch()), offset + 1); + listener.handleLeaderChange(leader, offset + 1); notifiedLeader = leader; this.offset = offset; @@ -737,7 +761,9 @@ public long scheduleAppend(int epoch, List batch) { OptionalLong firstOffset = first .stream() - .mapToLong(record -> scheduleAtomicAppend(epoch, Collections.singletonList(record))) + .mapToLong(record -> scheduleAtomicAppend(epoch, + OptionalLong.empty(), + Collections.singletonList(record))) .max(); if (firstOffset.isPresent() && resignAfterNonAtomicCommit.getAndSet(false)) { @@ -749,15 +775,24 @@ public long scheduleAppend(int epoch, List batch) { } else { return second .stream() - .mapToLong(record -> scheduleAtomicAppend(epoch, Collections.singletonList(record))) + .mapToLong(record -> scheduleAtomicAppend(epoch, + OptionalLong.empty(), + Collections.singletonList(record))) .max() .getAsLong(); } } @Override - public long scheduleAtomicAppend(int epoch, List batch) { - return shared.tryAppend(nodeId, leader.epoch(), batch); + public long scheduleAtomicAppend( + int epoch, + OptionalLong requiredEndOffset, + List batch + ) { + if (batch.isEmpty()) { + throw new IllegalArgumentException("Batch cannot be empty"); + } + return shared.tryAppend(nodeId, leader.epoch(), requiredEndOffset, batch); } @Override @@ -784,7 +819,10 @@ public void resign(int epoch) { LeaderAndEpoch nextLeader = new LeaderAndEpoch(OptionalInt.empty(), currentEpoch + 1); try { - shared.tryAppend(nodeId, currentEpoch, new LeaderChangeBatch(nextLeader)); + shared.tryAppend(nodeId, + currentEpoch, + OptionalLong.empty(), + new LeaderChangeBatch(nextLeader)); } catch (NotLeaderException exp) { // the leader epoch has already advanced. resign is a no op. log.debug("Ignoring call to resign from epoch {}. Either we are not the leader or the provided epoch is " + diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java index d72b7557b48e0..c3a7283ab9edc 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java @@ -36,6 +36,7 @@ import java.util.List; import java.util.Optional; import java.util.OptionalInt; +import java.util.OptionalLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -155,11 +156,12 @@ public List allRecords() { */ public void appendInitialRecords(List records) { int initialLeaderEpoch = 1; - shared.append(new LeaderChangeBatch( + shared.append(OptionalLong.empty(), new LeaderChangeBatch( new LeaderAndEpoch(OptionalInt.empty(), initialLeaderEpoch + 1))); - shared.append(new LocalRecordBatch(initialLeaderEpoch + 1, 0, records)); - shared.append(new LeaderChangeBatch( - new LeaderAndEpoch(OptionalInt.of(0), initialLeaderEpoch + 2))); + shared.append(OptionalLong.empty(), + new LocalRecordBatch(initialLeaderEpoch + 1, 0, records)); + shared.append(OptionalLong.empty(), + new LeaderChangeBatch(new LeaderAndEpoch(OptionalInt.of(0), initialLeaderEpoch + 2))); } public String clusterId() { diff --git a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java b/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java index 3d7267d94a5ee..026f0ac530191 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java @@ -90,7 +90,7 @@ public synchronized void handleLoadSnapshot(SnapshotReader } @Override - public synchronized void handleLeaderChange(LeaderAndEpoch newLeaderAndEpoch) { + public synchronized void handleLeaderChange(LeaderAndEpoch newLeaderAndEpoch, long endOffset) { LeaderAndEpoch oldLeaderAndEpoch = this.leaderAndEpoch; this.leaderAndEpoch = newLeaderAndEpoch; diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 40c6a69590804..764c0999558c9 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -2287,27 +2287,22 @@ public void poll() { @Override public long scheduleAppend(int epoch, List records) { - return append(epoch, records, false); + return append(epoch, records, OptionalLong.empty(), false); } @Override - public long scheduleAtomicAppend(int epoch, List records) { - return append(epoch, records, true); + public long scheduleAtomicAppend(int epoch, OptionalLong requiredEndOffset, List records) { + return append(epoch, records, requiredEndOffset, true); } - private long append(int epoch, List records, boolean isAtomic) { + private long append(int epoch, List records, OptionalLong requiredEndOffset, boolean isAtomic) { LeaderState leaderState = quorum.maybeLeaderState().orElseThrow( () -> new NotLeaderException("Append failed because the replication is not the current leader") ); BatchAccumulator accumulator = leaderState.accumulator(); boolean isFirstAppend = accumulator.isEmpty(); - final long offset; - if (isAtomic) { - offset = accumulator.appendAtomic(epoch, records); - } else { - offset = accumulator.append(epoch, records); - } + final long offset = accumulator.append(epoch, records, requiredEndOffset, isAtomic); // Wakeup the network channel if either this is the first append // or the accumulator is ready to drain now. Checking for the first @@ -2570,10 +2565,10 @@ private void fireHandleCommit(long baseOffset, Records records) { /** * This API is used for committed records originating from {@link #scheduleAppend(int, List)} - * or {@link #scheduleAtomicAppend(int, List)} on this instance. In this case, we are able to - * save the original record objects, which saves the need to read them back from disk. This is - * a nice optimization for the leader which is typically doing more work than all of the - * followers. + * or {@link #scheduleAtomicAppend(int, OptionalLong, List)} on this instance. In this case, + * we are able to save the original record objects, which saves the need to read them back + * from disk. This is a nice optimization for the leader which is typically doing more work + * than all of the * followers. */ private void fireHandleCommit( long baseOffset, @@ -2608,7 +2603,7 @@ private void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch) { if (shouldFireLeaderChange(leaderAndEpoch)) { lastFiredLeaderChange = leaderAndEpoch; logger.debug("Notifying listener {} of leader change {}", listenerName(), leaderAndEpoch); - listener.handleLeaderChange(leaderAndEpoch); + listener.handleLeaderChange(leaderAndEpoch, log.endOffset().offset); } } @@ -2630,7 +2625,7 @@ private void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch, long epochStar // leader and begins writing to the log. if (shouldFireLeaderChange(leaderAndEpoch) && nextOffset() >= epochStartOffset) { lastFiredLeaderChange = leaderAndEpoch; - listener.handleLeaderChange(leaderAndEpoch); + listener.handleLeaderChange(leaderAndEpoch, log.endOffset().offset); } } diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java index 6422fb5f3a095..2b899923f91cb 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java @@ -18,6 +18,7 @@ import org.apache.kafka.raft.errors.BufferAllocationException; import org.apache.kafka.raft.errors.NotLeaderException; +import org.apache.kafka.raft.errors.UnexpectedEndOffsetException; import org.apache.kafka.snapshot.SnapshotReader; import org.apache.kafka.snapshot.SnapshotWriter; @@ -36,10 +37,10 @@ interface Listener { * after consuming the reader. * * Note that there is not a one-to-one correspondence between writes through - * {@link #scheduleAppend(int, List)} or {@link #scheduleAtomicAppend(int, List)} + * {@link #scheduleAppend(int, List)} or {@link #scheduleAtomicAppend(int, OptionalLong, List)} * and this callback. The Raft implementation is free to batch together the records * from multiple append calls provided that batch boundaries are respected. Records - * specified through {@link #scheduleAtomicAppend(int, List)} are guaranteed to be a + * specified through {@link #scheduleAtomicAppend(int, OptionalLong, List)} are guaranteed to be a * subset of a batch provided by the {@link BatchReader}. Records specified through * {@link #scheduleAppend(int, List)} are guaranteed to be in the same order but * they can map to any number of batches provided by the {@link BatchReader}. @@ -80,8 +81,11 @@ interface Listener { * epoch. * * @param leader the current leader and epoch + * @param endOffset the current log end offset (exclusive). This is useful for nodes that + * are claiming leadership, because it lets them know what log offset they + * should attempt to write to next. */ - default void handleLeaderChange(LeaderAndEpoch leader) {} + default void handleLeaderChange(LeaderAndEpoch leader, long endOffset) {} default void beginShutdown() {} } @@ -172,6 +176,7 @@ default void beginShutdown() {} * uncommitted entries after observing an epoch change. * * @param epoch the current leader epoch + * @param requiredEndOffset if this is set, it is the offset we must use. * @param records the list of records to append * @return the expected offset of the last record if append succeed * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if the size of the records is greater than the maximum @@ -179,8 +184,9 @@ default void beginShutdown() {} * committed * @throws NotLeaderException if we are not the current leader or the epoch doesn't match the leader epoch * @throws BufferAllocationException we failed to allocate memory for the records + * @throws UnexpectedEndOffsetException the requested end offset could not be obtained. */ - long scheduleAtomicAppend(int epoch, List records); + long scheduleAtomicAppend(int epoch, OptionalLong requiredEndOffset, List records); /** * Attempt a graceful shutdown of the client. This allows the leader to proactively diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java index cceb65930edbf..36fa99e12c910 100644 --- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java +++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java @@ -165,7 +165,7 @@ public synchronized void handleLoadSnapshot(SnapshotReader reader) { } @Override - public synchronized void handleLeaderChange(LeaderAndEpoch newLeader) { + public synchronized void handleLeaderChange(LeaderAndEpoch newLeader, long endOffset) { if (newLeader.isLeader(nodeId)) { log.debug("Counter uncommitted value initialized to {} after claiming leadership in epoch {}", committed, newLeader); diff --git a/raft/src/main/java/org/apache/kafka/raft/errors/UnexpectedEndOffsetException.java b/raft/src/main/java/org/apache/kafka/raft/errors/UnexpectedEndOffsetException.java new file mode 100644 index 0000000000000..f470134c9a376 --- /dev/null +++ b/raft/src/main/java/org/apache/kafka/raft/errors/UnexpectedEndOffsetException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.errors; + +/** + * Indicates that an append operation cannot be completed because it would have resulted in an + * unexpected end offset. + */ +public class UnexpectedEndOffsetException extends RaftException { + private final static long serialVersionUID = 1L; + + public UnexpectedEndOffsetException(String s) { + super(s); + } +} diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java index b84a7d57b8a72..87717d206d41c 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.raft.errors.BufferAllocationException; import org.apache.kafka.raft.errors.NotLeaderException; +import org.apache.kafka.raft.errors.UnexpectedEndOffsetException; import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.common.message.LeaderChangeMessage; @@ -38,6 +39,7 @@ import java.util.Objects; import java.util.Optional; import java.util.OptionalInt; +import java.util.OptionalLong; import java.util.function.Function; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; @@ -89,52 +91,12 @@ public BatchAccumulator( this.appendLock = new ReentrantLock(); } - /** - * Append a list of records into as many batches as necessary. - * - * The order of the elements in the records argument will match the order in the batches. - * This method will use as many batches as necessary to serialize all of the records. Since - * this method can split the records into multiple batches it is possible that some of the - * records will get committed while other will not when the leader fails. - * - * @param epoch the expected leader epoch. If this does not match, then {@link NotLeaderException} - * will be thrown - * @param records the list of records to include in the batches - * @return the expected offset of the last record - * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum - * batch size; if this exception is throw some of the elements in records may have - * been committed - * @throws NotLeaderException if the epoch is less than the leader epoch - * @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch) - * @throws BufferAllocationException if we failed to allocate memory for the records - * @throws IllegalStateException if we tried to append new records after the batch has been built - */ - public long append(int epoch, List records) { - return append(epoch, records, false); - } - - /** - * Append a list of records into an atomic batch. We guarantee all records are included in the - * same underlying record batch so that either all of the records become committed or none of - * them do. - * - * @param epoch the expected leader epoch. If this does not match, then {@link NotLeaderException} - * will be thrown - * @param records the list of records to include in a batch - * @return the expected offset of the last record - * @throws RecordBatchTooLargeException if the size of the records is greater than the maximum - * batch size; if this exception is throw none of the elements in records were - * committed - * @throws NotLeaderException if the epoch is less than the leader epoch - * @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch) - * @throws BufferAllocationException if we failed to allocate memory for the records - * @throws IllegalStateException if we tried to append new records after the batch has been built - */ - public long appendAtomic(int epoch, List records) { - return append(epoch, records, true); - } - - private long append(int epoch, List records, boolean isAtomic) { + public long append( + int epoch, + List records, + OptionalLong requiredEndOffset, + boolean isAtomic + ) { if (epoch < this.epoch) { throw new NotLeaderException("Append failed because the given epoch " + epoch + " is stale. " + "Current leader epoch = " + this.epoch()); @@ -147,6 +109,13 @@ private long append(int epoch, List records, boolean isAtomic) { appendLock.lock(); try { + long endOffset = nextOffset + records.size() - 1; + requiredEndOffset.ifPresent(r -> { + if (r != endOffset) { + throw new UnexpectedEndOffsetException("Wanted end offset " + r + + ", but next available was " + endOffset); + } + }); maybeCompleteDrain(); BatchBuilder batch = null; @@ -164,12 +133,12 @@ private long append(int epoch, List records, boolean isAtomic) { } batch.appendRecord(record, serializationCache); - nextOffset += 1; } maybeResetLinger(); - return nextOffset - 1; + nextOffset = endOffset + 1; + return endOffset; } finally { appendLock.unlock(); } @@ -408,7 +377,9 @@ public int epoch() { * This call will not block, but the drain may require multiple attempts before * it can be completed if the thread responsible for appending is holding the * append lock. In the worst case, the append will be completed on the next - * call to {@link #append(int, List)} following the initial call to this method. + * call to {@link #append(int, List, OptionalLong, boolean)} following the + * initial call to this method. + * * The caller should respect the time to the next flush as indicated by * {@link #timeUntilDrain(long)}. * diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java index eeacf608a9ff0..a3f726be63e46 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java @@ -30,6 +30,7 @@ import java.util.Optional; import java.util.List; +import java.util.OptionalLong; import java.util.function.Supplier; final public class RecordsSnapshotWriter implements SnapshotWriter { @@ -184,7 +185,7 @@ public void append(List records) { throw new IllegalStateException(message); } - accumulator.append(snapshot.snapshotId().epoch(), records); + accumulator.append(snapshot.snapshotId().epoch(), records, OptionalLong.empty(), false); if (accumulator.needsDrain(time.milliseconds())) { appendBatches(accumulator.drain()); diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 7cbeb11f4f780..44704e901f685 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -355,7 +355,8 @@ public void testAppendFailedWithRecordBatchTooLargeException() throws Exception for (int i = 0; i < size; i++) batchToLarge.add("a"); - assertThrows(RecordBatchTooLargeException.class, () -> context.client.scheduleAtomicAppend(epoch, batchToLarge)); + assertThrows(RecordBatchTooLargeException.class, + () -> context.client.scheduleAtomicAppend(epoch, OptionalLong.empty(), batchToLarge)); } @Test diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 06a300f666c0f..6f8e3e422e304 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -1197,7 +1197,7 @@ void readBatch(BatchReader reader) { } @Override - public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) { + public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch, long endOffset) { // We record the next expected offset as the claimed epoch's start // offset. This is useful to verify that the `handleLeaderChange` callback // was not received early. diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java index 11499771790f9..ed7f6c94122f8 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java @@ -33,6 +33,7 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; +import java.util.OptionalLong; import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -232,7 +233,7 @@ public void testLingerBeginsOnFirstWrite() { ); time.sleep(15); - assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a"))); + assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a"), OptionalLong.empty(), false)); assertEquals(lingerMs, acc.timeUntilDrain(time.milliseconds())); assertFalse(acc.isEmpty()); @@ -264,7 +265,7 @@ public void testCompletedBatchReleaseBuffer() { maxBatchSize ); - assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a"))); + assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a"), OptionalLong.empty(), false)); time.sleep(lingerMs); List> batches = acc.drain(); @@ -293,7 +294,7 @@ public void testUnflushedBuffersReleasedByClose() { maxBatchSize ); - assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a"))); + assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a"), OptionalLong.empty(), false)); acc.close(); Mockito.verify(memoryPool).release(buffer); } @@ -396,7 +397,7 @@ public void testRecordsAreSplit() { .generate(() -> record) .limit(numberOfRecords) .collect(Collectors.toList()); - assertEquals(baseOffset + numberOfRecords - 1, acc.append(leaderEpoch, records)); + assertEquals(baseOffset + numberOfRecords - 1, acc.append(leaderEpoch, records, OptionalLong.empty(), false)); time.sleep(lingerMs); assertTrue(acc.needsDrain(time.milliseconds())); @@ -451,7 +452,7 @@ public void testDrainDoesNotBlockWithConcurrentAppend() throws Exception { // Do the first append outside the thread to start the linger timer Mockito.when(memoryPool.tryAllocate(maxBatchSize)) .thenReturn(ByteBuffer.allocate(maxBatchSize)); - acc.append(leaderEpoch, singletonList("a")); + acc.append(leaderEpoch, singletonList("a"), OptionalLong.empty(), false); // Let the serde block to simulate a slow append Mockito.doAnswer(invocation -> { @@ -466,7 +467,7 @@ public void testDrainDoesNotBlockWithConcurrentAppend() throws Exception { Mockito.any(Writable.class) ); - Thread appendThread = new Thread(() -> acc.append(leaderEpoch, singletonList("b"))); + Thread appendThread = new Thread(() -> acc.append(leaderEpoch, singletonList("b"), OptionalLong.empty(), false)); appendThread.start(); // Attempt to drain while the append thread is holding the lock @@ -509,14 +510,14 @@ static interface Appender { static final Appender APPEND_ATOMIC = new Appender() { @Override public Long call(BatchAccumulator acc, int epoch, List records) { - return acc.appendAtomic(epoch, records); + return acc.append(epoch, records, OptionalLong.empty(), true); } }; static final Appender APPEND = new Appender() { @Override public Long call(BatchAccumulator acc, int epoch, List records) { - return acc.append(epoch, records); + return acc.append(epoch, records, OptionalLong.empty(), false); } }; } diff --git a/server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java b/server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java index 7c4f0e62a95fa..9f017c8414a65 100644 --- a/server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java +++ b/server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java @@ -24,6 +24,8 @@ import org.apache.kafka.common.utils.LogContext; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -101,4 +103,28 @@ public void testFailEvents() { assertEquals(RuntimeException.class, assertThrows(ExecutionException.class, () -> event3.future.get()).getCause().getClass()); } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testReEntrantCompletion(boolean completeExceptionally) { + final DeferredEventQueue deferredEventQueue = new DeferredEventQueue(new LogContext()); + SampleDeferredEvent event1 = new SampleDeferredEvent(); + SampleDeferredEvent event2 = new SampleDeferredEvent(); + event1.future().thenAccept(__ -> { + deferredEventQueue.completeUpTo(3); + }); + deferredEventQueue.add(1, event1); + deferredEventQueue.add(3, event2); + deferredEventQueue.completeUpTo(0); + assertFalse(event1.future().isDone()); + assertFalse(event2.future().isDone()); + if (completeExceptionally) { + deferredEventQueue.failAll(new RuntimeException("oops")); + } else { + deferredEventQueue.completeUpTo(1); + } + assertTrue(event1.future().isDone()); + assertEquals(completeExceptionally, event1.future().isCompletedExceptionally()); + assertTrue(event2.future().isDone()); + } } From 395216e397d665d61d7bd0b69ed0cd2abc9aea65 Mon Sep 17 00:00:00 2001 From: "Colin P. McCabe" Date: Fri, 14 Jul 2023 13:08:05 -0700 Subject: [PATCH 02/11] Address review comments --- .../kafka/controller/QuorumController.java | 55 +++++++++---------- .../errors/EventHandlerExceptionInfo.java | 11 ++-- .../errors/EventHandlerExceptionInfoTest.java | 18 +++--- .../org/apache/kafka/raft/RaftClient.java | 2 +- .../deferred/DeferredEventQueueTest.java | 26 --------- 5 files changed, 41 insertions(+), 71 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 3026154fb4fc4..b74dda23902c2 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -474,7 +474,11 @@ private Throwable handleEventException( } EventHandlerExceptionInfo info = EventHandlerExceptionInfo. fromInternal(exception, () -> latestController()); - String failureMessage = info.failureMessage(lastCommittedEpoch, deltaUs, + int epoch = curClaimEpoch; + if (epoch == -1) { + epoch = lastCommittedEpoch; + } + String failureMessage = info.failureMessage(epoch, deltaUs, isActiveController(), lastCommittedOffset); if (info.isFault()) { nonFatalFaultHandler.handleFault(name + ": " + failureMessage, exception); @@ -735,22 +739,25 @@ public Long apply(List records) { // Start by trying to apply the record to our in-memory state. This should always // succeed; if it does not, that's a fatal error. It is important to do this before // scheduling the record for Raft replication. - int i = 0; + int recordIndex = 0; for (ApiMessageAndVersion message : records) { + long recordOffset = prevEndOffset + 1 + recordIndex; try { - replay(message.message(), Optional.empty(), prevEndOffset + i + 1); + replay(message.message(), Optional.empty(), recordOffset); } catch (Throwable e) { - String failureMessage = String.format("Unable to apply %s record, which was " + - "%d of %d record(s) in the batch following last write offset %d.", - message.message().getClass().getSimpleName(), i + 1, records.size(), - prevEndOffset); + String failureMessage = String.format("Unable to apply %s " + + "record at offset %d on active controller, from the " + + "batch with baseOffset %d", + message.message().getClass().getSimpleName(), + recordOffset, prevEndOffset + 1); throw fatalFaultHandler.handleFault(failureMessage, e); } - i++; + recordIndex++; } - long nextEndOffset = prevEndOffset + i; + long nextEndOffset = prevEndOffset + recordIndex; raftClient.scheduleAtomicAppend(controllerEpoch, OptionalLong.of(nextEndOffset), records); snapshotRegistry.getOrCreateSnapshot(nextEndOffset); + snapshotRegistry.getOrCreateSnapshot(nextEndOffset); prevEndOffset = nextEndOffset; return nextEndOffset; } @@ -970,18 +977,20 @@ public void handleCommit(BatchReader reader) { log.debug("Replaying commits from the active node up to " + "offset {} and epoch {}.", offset, epoch); } - int i = 0; + int recordIndex = 0; for (ApiMessageAndVersion message : messages) { + long recordOffset = batch.baseOffset() + recordIndex; try { - replay(message.message(), Optional.empty(), batch.baseOffset() + i); + replay(message.message(), Optional.empty(), recordOffset); } catch (Throwable e) { - String failureMessage = String.format("Unable to apply %s record on standby " + - "controller, which was %d of %d record(s) in the batch with baseOffset %d.", - message.message().getClass().getSimpleName(), i + 1, messages.size(), - batch.baseOffset()); + String failureMessage = String.format("Unable to apply %s " + + "record at offset %d on standby controller, from the " + + "batch with baseOffset %d", + message.message().getClass().getSimpleName(), + recordOffset, batch.baseOffset()); throw fatalFaultHandler.handleFault(failureMessage, e); } - i++; + recordIndex++; } } @@ -990,13 +999,6 @@ public void handleCommit(BatchReader reader) { epoch, batch.appendTimestamp() ); - - if (offset >= raftClient.latestSnapshotId().map(OffsetAndEpoch::offset).orElse(0L)) { - oldestNonSnapshottedTimestamp = Math.min( - oldestNonSnapshottedTimestamp, - batch.appendTimestamp() - ); - } } } finally { reader.close(); @@ -1154,8 +1156,6 @@ private void claim(int epoch, long newLastWriteOffset) { "active controller."); } curClaimEpoch = epoch; - lastCommittedOffset = newLastWriteOffset; - lastCommittedEpoch = epoch; controllerMetrics.setActive(true); updateWriteOffset(newLastWriteOffset); clusterControl.activate(); @@ -1747,11 +1747,6 @@ private void resetToEmptyState() { */ private long writeOffset; - /** - * Timestamp for the oldest record that was committed but not included in a snapshot. - */ - private long oldestNonSnapshottedTimestamp = Long.MAX_VALUE; - /** * How long to delay partition leader balancing operations. */ diff --git a/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java b/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java index 4e74b71a677b0..5f68baa6c8096 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java +++ b/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java @@ -36,7 +36,8 @@ public final class EventHandlerExceptionInfo { /** - * True if this exception should be treated as a fault. + * True if this exception should be treated as a fault, and tracked via the metadata errors + * metric. */ private final boolean isFault; @@ -91,7 +92,7 @@ public static EventHandlerExceptionInfo fromInternal( // The active controller picked the wrong end offset for its next batch. It must now // fail over. This should be pretty rare. return new EventHandlerExceptionInfo(false, true, internal, - new NotControllerException("Unexpected end offset. Controller not known.")); + new NotControllerException("Unexpected end offset. Controller will resign.")); } else if (internal instanceof InterruptedException) { // The controller event queue has been interrupted. This normally only happens during // a JUnit test that has hung. The test framework sometimes sends an InterruptException @@ -162,9 +163,9 @@ public String failureMessage( ) { StringBuilder bld = new StringBuilder(); if (deltaUs.isPresent()) { - bld.append("failed with "); + bld.append("event failed with "); } else { - bld.append("unable to start processing because of "); + bld.append("event unable to start processing because of "); } bld.append(internalException.getClass().getSimpleName()); if (externalException.isPresent()) { @@ -220,4 +221,4 @@ public String toString() { externalException.get().getClass().getCanonicalName() : "(none)") + ")"; } -} \ No newline at end of file +} diff --git a/metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java b/metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java index ccffde2831e1f..6fb5fc4a6c71f 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java @@ -74,7 +74,7 @@ public void testTopicExistsExceptionInfo() { @Test public void testTopicExistsExceptionFailureMessage() { - assertEquals("failed with TopicExistsException in 234 microseconds.", + assertEquals("event failed with TopicExistsException in 234 microseconds.", TOPIC_EXISTS.failureMessage(123, OptionalLong.of(234L), true, 456L)); } @@ -88,7 +88,7 @@ public void testRejectedExecutionExceptionInfo() { @Test public void testRejectedExecutionExceptionFailureMessage() { - assertEquals("unable to start processing because of RejectedExecutionException (treated " + + assertEquals("event unable to start processing because of RejectedExecutionException (treated " + "as TimeoutException).", REJECTED_EXECUTION.failureMessage(123, OptionalLong.empty(), true, 456L)); } @@ -103,7 +103,7 @@ public void testInterruptedExceptionInfo() { @Test public void testInterruptedExceptionFailureMessageWhenActive() { - assertEquals("unable to start processing because of InterruptedException (treated as " + + assertEquals("event unable to start processing because of InterruptedException (treated as " + "UnknownServerException) at epoch 123. Renouncing leadership and reverting to the " + "last committed offset 456.", INTERRUPTED.failureMessage(123, OptionalLong.empty(), true, 456L)); @@ -111,7 +111,7 @@ public void testInterruptedExceptionFailureMessageWhenActive() { @Test public void testInterruptedExceptionFailureMessageWhenInactive() { - assertEquals("unable to start processing because of InterruptedException (treated as " + + assertEquals("event unable to start processing because of InterruptedException (treated as " + "UnknownServerException) at epoch 123. The controller is already in standby mode.", INTERRUPTED.failureMessage(123, OptionalLong.empty(), false, 456L)); } @@ -126,7 +126,7 @@ public void testNullPointerExceptionInfo() { @Test public void testNullPointerExceptionFailureMessageWhenActive() { - assertEquals("failed with NullPointerException (treated as UnknownServerException) " + + assertEquals("event failed with NullPointerException (treated as UnknownServerException) " + "at epoch 123 in 40 microseconds. Renouncing leadership and reverting to the last " + "committed offset 456.", NULL_POINTER.failureMessage(123, OptionalLong.of(40L), true, 456L)); @@ -134,7 +134,7 @@ public void testNullPointerExceptionFailureMessageWhenActive() { @Test public void testNullPointerExceptionFailureMessageWhenInactive() { - assertEquals("failed with NullPointerException (treated as UnknownServerException) " + + assertEquals("event failed with NullPointerException (treated as UnknownServerException) " + "at epoch 123 in 40 microseconds. The controller is already in standby mode.", NULL_POINTER.failureMessage(123, OptionalLong.of(40L), false, 456L)); } @@ -149,7 +149,7 @@ public void testNotLeaderExceptionInfo() { @Test public void testNotLeaderExceptionFailureMessage() { - assertEquals("unable to start processing because of NotLeaderException (treated as " + + assertEquals("event unable to start processing because of NotLeaderException (treated as " + "NotControllerException) at epoch 123. Renouncing leadership and reverting to the " + "last committed offset 456.", NOT_LEADER.failureMessage(123, OptionalLong.empty(), true, 456L)); @@ -159,13 +159,13 @@ public void testNotLeaderExceptionFailureMessage() { public void testUnexpectedEndOffsetExceptionInfo() { assertEquals(new EventHandlerExceptionInfo(false, true, new UnexpectedEndOffsetException("Wanted end offset 3, but next available was 4"), - new NotControllerException("Unexpected end offset. Controller not known.")), + new NotControllerException("Unexpected end offset. Controller will resign.")), UNEXPECTED_END_OFFSET); } @Test public void testUnepxectedEndOffsetFailureMessage() { - assertEquals("failed with UnexpectedEndOffsetException (treated as " + + assertEquals("event failed with UnexpectedEndOffsetException (treated as " + "NotControllerException) at epoch 123 in 90 microseconds. Renouncing leadership " + "and reverting to the last committed offset 456.", UNEXPECTED_END_OFFSET.failureMessage(123, OptionalLong.of(90L), true, 456L)); diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java index 2b899923f91cb..d9414706ad41b 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java @@ -176,7 +176,7 @@ default void beginShutdown() {} * uncommitted entries after observing an epoch change. * * @param epoch the current leader epoch - * @param requiredEndOffset if this is set, it is the offset we must use. + * @param requiredEndOffset if this is set, it is the offset we must use as the end offset (inclusive). * @param records the list of records to append * @return the expected offset of the last record if append succeed * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if the size of the records is greater than the maximum diff --git a/server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java b/server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java index 9f017c8414a65..7c4f0e62a95fa 100644 --- a/server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java +++ b/server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java @@ -24,8 +24,6 @@ import org.apache.kafka.common.utils.LogContext; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -103,28 +101,4 @@ public void testFailEvents() { assertEquals(RuntimeException.class, assertThrows(ExecutionException.class, () -> event3.future.get()).getCause().getClass()); } - - @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void testReEntrantCompletion(boolean completeExceptionally) { - final DeferredEventQueue deferredEventQueue = new DeferredEventQueue(new LogContext()); - SampleDeferredEvent event1 = new SampleDeferredEvent(); - SampleDeferredEvent event2 = new SampleDeferredEvent(); - event1.future().thenAccept(__ -> { - deferredEventQueue.completeUpTo(3); - }); - deferredEventQueue.add(1, event1); - deferredEventQueue.add(3, event2); - deferredEventQueue.completeUpTo(0); - assertFalse(event1.future().isDone()); - assertFalse(event2.future().isDone()); - if (completeExceptionally) { - deferredEventQueue.failAll(new RuntimeException("oops")); - } else { - deferredEventQueue.completeUpTo(1); - } - assertTrue(event1.future().isDone()); - assertEquals(completeExceptionally, event1.future().isCompletedExceptionally()); - assertTrue(event2.future().isDone()); - } } From 9cc3d9b0d54b252f8109d48c422cb9a6aecb9962 Mon Sep 17 00:00:00 2001 From: "Colin P. McCabe" Date: Sun, 16 Jul 2023 10:20:56 -0700 Subject: [PATCH 03/11] remove duplciate getOrCreateSnapshot call --- .../main/java/org/apache/kafka/controller/QuorumController.java | 1 - 1 file changed, 1 deletion(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index b74dda23902c2..17075721e59a7 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -757,7 +757,6 @@ public Long apply(List records) { long nextEndOffset = prevEndOffset + recordIndex; raftClient.scheduleAtomicAppend(controllerEpoch, OptionalLong.of(nextEndOffset), records); snapshotRegistry.getOrCreateSnapshot(nextEndOffset); - snapshotRegistry.getOrCreateSnapshot(nextEndOffset); prevEndOffset = nextEndOffset; return nextEndOffset; } From fb681d35365b4432d11b6a02072a52541110fb0b Mon Sep 17 00:00:00 2001 From: "Colin P. McCabe" Date: Tue, 18 Jul 2023 11:23:31 -0700 Subject: [PATCH 04/11] address review comments --- .../kafka/controller/QuorumController.java | 4 ++- .../errors/EventHandlerExceptionInfo.java | 4 +-- .../errors/EventHandlerExceptionInfoTest.java | 12 ++++---- .../apache/kafka/metalog/LocalLogManager.java | 20 ++++++------- .../apache/kafka/raft/KafkaRaftClient.java | 8 ++--- .../org/apache/kafka/raft/RaftClient.java | 8 ++--- ...ava => UnexpectedBaseOffsetException.java} | 6 ++-- .../raft/internals/BatchAccumulator.java | 29 +++++++++++++++---- 8 files changed, 55 insertions(+), 36 deletions(-) rename raft/src/main/java/org/apache/kafka/raft/errors/{UnexpectedEndOffsetException.java => UnexpectedBaseOffsetException.java} (87%) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 17075721e59a7..c950817a224e0 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -755,7 +755,9 @@ public Long apply(List records) { recordIndex++; } long nextEndOffset = prevEndOffset + recordIndex; - raftClient.scheduleAtomicAppend(controllerEpoch, OptionalLong.of(nextEndOffset), records); + raftClient.scheduleAtomicAppend(controllerEpoch, + OptionalLong.of(prevEndOffset + 1), + records); snapshotRegistry.getOrCreateSnapshot(nextEndOffset); prevEndOffset = nextEndOffset; return nextEndOffset; diff --git a/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java b/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java index 5f68baa6c8096..f545099242f59 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java +++ b/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.raft.errors.NotLeaderException; -import org.apache.kafka.raft.errors.UnexpectedEndOffsetException; +import org.apache.kafka.raft.errors.UnexpectedBaseOffsetException; import org.apache.kafka.server.mutable.BoundedListTooLongException; import java.util.Objects; @@ -88,7 +88,7 @@ public static EventHandlerExceptionInfo fromInternal( return new EventHandlerExceptionInfo(false, false, internal, new PolicyViolationException("Unable to perform excessively large batch " + "operation.")); - } else if (internal instanceof UnexpectedEndOffsetException) { + } else if (internal instanceof UnexpectedBaseOffsetException) { // The active controller picked the wrong end offset for its next batch. It must now // fail over. This should be pretty rare. return new EventHandlerExceptionInfo(false, true, internal, diff --git a/metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java b/metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java index 6fb5fc4a6c71f..87c934855025b 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java @@ -22,7 +22,7 @@ import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.raft.errors.NotLeaderException; -import org.apache.kafka.raft.errors.UnexpectedEndOffsetException; +import org.apache.kafka.raft.errors.UnexpectedBaseOffsetException; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -62,7 +62,7 @@ public class EventHandlerExceptionInfoTest { private static final EventHandlerExceptionInfo UNEXPECTED_END_OFFSET = EventHandlerExceptionInfo.fromInternal( - new UnexpectedEndOffsetException("Wanted end offset 3, but next available was 4"), + new UnexpectedBaseOffsetException("Wanted base offset 3, but the next offset was 4"), () -> OptionalInt.of(1)); @Test @@ -156,16 +156,16 @@ public void testNotLeaderExceptionFailureMessage() { } @Test - public void testUnexpectedEndOffsetExceptionInfo() { + public void testUnexpectedBaseOffsetExceptionInfo() { assertEquals(new EventHandlerExceptionInfo(false, true, - new UnexpectedEndOffsetException("Wanted end offset 3, but next available was 4"), + new UnexpectedBaseOffsetException("Wanted base offset 3, but the next offset was 4"), new NotControllerException("Unexpected end offset. Controller will resign.")), UNEXPECTED_END_OFFSET); } @Test - public void testUnepxectedEndOffsetFailureMessage() { - assertEquals("event failed with UnexpectedEndOffsetException (treated as " + + public void testUnexpectedBaseOffsetFailureMessage() { + assertEquals("event failed with UnexpectedBaseOffsetException (treated as " + "NotControllerException) at epoch 123 in 90 microseconds. Renouncing leadership " + "and reverting to the last committed offset 456.", UNEXPECTED_END_OFFSET.failureMessage(123, OptionalLong.of(90L), true, 456L)); diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java index 1f1b5d72e4754..88237d98ce679 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java @@ -32,7 +32,7 @@ import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.RaftClient; import org.apache.kafka.raft.errors.NotLeaderException; -import org.apache.kafka.raft.errors.UnexpectedEndOffsetException; +import org.apache.kafka.raft.errors.UnexpectedBaseOffsetException; import org.apache.kafka.raft.internals.MemoryBatchReader; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.snapshot.MockRawSnapshotReader; @@ -225,21 +225,21 @@ synchronized void unregisterLogManager(LocalLogManager logManager) { synchronized long tryAppend( int nodeId, int epoch, - OptionalLong requiredEndOffset, + OptionalLong requiredBaseOffset, List batch ) { // No easy access to the concept of time. Use the base offset as the append timestamp long appendTimestamp = (prevOffset + 1) * 10; return tryAppend(nodeId, epoch, - requiredEndOffset, + requiredBaseOffset, new LocalRecordBatch(epoch, appendTimestamp, batch)); } synchronized long tryAppend( int nodeId, int epoch, - OptionalLong requiredEndOffset, + OptionalLong requiredBaseOffset, LocalBatch batch ) { if (!leader.isLeader(nodeId)) { @@ -257,20 +257,20 @@ synchronized long tryAppend( } log.trace("tryAppend(nodeId={}): appending {}.", nodeId, batch); - long offset = append(requiredEndOffset, batch); + long offset = append(requiredBaseOffset, batch); electLeaderIfNeeded(); return offset; } public synchronized long append( - OptionalLong requiredEndOffset, + OptionalLong requiredBaseOffset, LocalBatch batch ) { long nextEndOffset = prevOffset + batch.size(); - requiredEndOffset.ifPresent(r -> { - if (r != nextEndOffset) { - throw new UnexpectedEndOffsetException("Wanted end offset " + r + - ", but next available was " + nextEndOffset); + requiredBaseOffset.ifPresent(r -> { + if (r != prevOffset + 1) { + throw new UnexpectedBaseOffsetException("Wanted base offset " + r + + ", but the next offset was " + nextEndOffset); } }); log.debug("append(batch={}, nextEndOffset={})", batch, nextEndOffset); diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 764c0999558c9..6d99768b521a6 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -2291,18 +2291,18 @@ public long scheduleAppend(int epoch, List records) { } @Override - public long scheduleAtomicAppend(int epoch, OptionalLong requiredEndOffset, List records) { - return append(epoch, records, requiredEndOffset, true); + public long scheduleAtomicAppend(int epoch, OptionalLong requiredBaseOffset, List records) { + return append(epoch, records, requiredBaseOffset, true); } - private long append(int epoch, List records, OptionalLong requiredEndOffset, boolean isAtomic) { + private long append(int epoch, List records, OptionalLong requiredBaseOffset, boolean isAtomic) { LeaderState leaderState = quorum.maybeLeaderState().orElseThrow( () -> new NotLeaderException("Append failed because the replication is not the current leader") ); BatchAccumulator accumulator = leaderState.accumulator(); boolean isFirstAppend = accumulator.isEmpty(); - final long offset = accumulator.append(epoch, records, requiredEndOffset, isAtomic); + final long offset = accumulator.append(epoch, records, requiredBaseOffset, isAtomic); // Wakeup the network channel if either this is the first append // or the accumulator is ready to drain now. Checking for the first diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java index d9414706ad41b..071f99f65fc10 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java @@ -18,7 +18,7 @@ import org.apache.kafka.raft.errors.BufferAllocationException; import org.apache.kafka.raft.errors.NotLeaderException; -import org.apache.kafka.raft.errors.UnexpectedEndOffsetException; +import org.apache.kafka.raft.errors.UnexpectedBaseOffsetException; import org.apache.kafka.snapshot.SnapshotReader; import org.apache.kafka.snapshot.SnapshotWriter; @@ -176,7 +176,7 @@ default void beginShutdown() {} * uncommitted entries after observing an epoch change. * * @param epoch the current leader epoch - * @param requiredEndOffset if this is set, it is the offset we must use as the end offset (inclusive). + * @param requiredBaseOffset if this is set, it is the offset we must use as the base offset. * @param records the list of records to append * @return the expected offset of the last record if append succeed * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if the size of the records is greater than the maximum @@ -184,9 +184,9 @@ default void beginShutdown() {} * committed * @throws NotLeaderException if we are not the current leader or the epoch doesn't match the leader epoch * @throws BufferAllocationException we failed to allocate memory for the records - * @throws UnexpectedEndOffsetException the requested end offset could not be obtained. + * @throws UnexpectedBaseOffsetException the requested end offset could not be obtained. */ - long scheduleAtomicAppend(int epoch, OptionalLong requiredEndOffset, List records); + long scheduleAtomicAppend(int epoch, OptionalLong requiredBaseOffset, List records); /** * Attempt a graceful shutdown of the client. This allows the leader to proactively diff --git a/raft/src/main/java/org/apache/kafka/raft/errors/UnexpectedEndOffsetException.java b/raft/src/main/java/org/apache/kafka/raft/errors/UnexpectedBaseOffsetException.java similarity index 87% rename from raft/src/main/java/org/apache/kafka/raft/errors/UnexpectedEndOffsetException.java rename to raft/src/main/java/org/apache/kafka/raft/errors/UnexpectedBaseOffsetException.java index f470134c9a376..192e8b2bb66ce 100644 --- a/raft/src/main/java/org/apache/kafka/raft/errors/UnexpectedEndOffsetException.java +++ b/raft/src/main/java/org/apache/kafka/raft/errors/UnexpectedBaseOffsetException.java @@ -18,12 +18,12 @@ /** * Indicates that an append operation cannot be completed because it would have resulted in an - * unexpected end offset. + * unexpected base offset. */ -public class UnexpectedEndOffsetException extends RaftException { +public class UnexpectedBaseOffsetException extends RaftException { private final static long serialVersionUID = 1L; - public UnexpectedEndOffsetException(String s) { + public UnexpectedBaseOffsetException(String s) { super(s); } } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java index 87717d206d41c..1d50d7c4c90d8 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java @@ -24,7 +24,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.raft.errors.BufferAllocationException; import org.apache.kafka.raft.errors.NotLeaderException; -import org.apache.kafka.raft.errors.UnexpectedEndOffsetException; +import org.apache.kafka.raft.errors.UnexpectedBaseOffsetException; import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.common.message.LeaderChangeMessage; @@ -91,10 +91,27 @@ public BatchAccumulator( this.appendLock = new ReentrantLock(); } + /** + * Append to the accumulator. + * + * @param epoch The leader epoch to append at. + * @param records The records to append. + * @param requiredBaseOffset If this is non-empty, the base offset which we must use. + * @param isAtomic True if we should append the records as a single batch. + * @return The end offset. + * + * @throws NotLeaderException Indicates that an append operation cannot be completed + * because the provided leader epoch was too old. + * @throws IllegalArgumentException Indicates that an append operation cannot be completed + * because the provided leader epoch was too new. + * @throws UnexpectedBaseOffsetException Indicates that an append operation cannot + * be completed because it would have resulted + * in an unexpected base offset. + */ public long append( int epoch, List records, - OptionalLong requiredEndOffset, + OptionalLong requiredBaseOffset, boolean isAtomic ) { if (epoch < this.epoch) { @@ -110,10 +127,10 @@ public long append( appendLock.lock(); try { long endOffset = nextOffset + records.size() - 1; - requiredEndOffset.ifPresent(r -> { - if (r != endOffset) { - throw new UnexpectedEndOffsetException("Wanted end offset " + r + - ", but next available was " + endOffset); + requiredBaseOffset.ifPresent(r -> { + if (r != nextOffset) { + throw new UnexpectedBaseOffsetException("Wanted base offset " + r + + ", but the next offset was " + nextOffset); } }); maybeCompleteDrain(); From 1e6437a2f0a5c61a107a49ba2179f62b4227731d Mon Sep 17 00:00:00 2001 From: "Colin P. McCabe" Date: Mon, 24 Jul 2023 15:29:07 -0700 Subject: [PATCH 05/11] remove endOffset from handleLeaderChange. add RaftClient#endOffset method --- .../src/main/scala/kafka/tools/TestRaftServer.scala | 2 +- .../apache/kafka/controller/QuorumController.java | 4 ++-- .../apache/kafka/image/loader/MetadataLoader.java | 2 +- .../kafka/metadata/util/SnapshotFileReader.java | 2 +- .../kafka/image/publisher/SnapshotEmitterTest.java | 5 +++++ .../org/apache/kafka/metalog/LocalLogManager.java | 9 +++++++-- .../kafka/metalog/MockMetaLogManagerListener.java | 2 +- .../java/org/apache/kafka/raft/KafkaRaftClient.java | 9 +++++++-- .../main/java/org/apache/kafka/raft/RaftClient.java | 13 +++++++++---- .../org/apache/kafka/raft/ReplicatedCounter.java | 2 +- .../apache/kafka/raft/RaftClientTestContext.java | 2 +- 11 files changed, 36 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala index 6d2653979ada0..1026c52847337 100644 --- a/core/src/main/scala/kafka/tools/TestRaftServer.scala +++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala @@ -173,7 +173,7 @@ class TestRaftServer( raftManager.register(this) - override def handleLeaderChange(newLeaderAndEpoch: LeaderAndEpoch, endOffset: Long): Unit = { + override def handleLeaderChange(newLeaderAndEpoch: LeaderAndEpoch): Unit = { if (newLeaderAndEpoch.isLeader(config.nodeId)) { eventQueue.offer(HandleClaim(newLeaderAndEpoch.epoch)) } else if (claimedEpoch.isDefined) { diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index c950817a224e0..9453614cbb461 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -1059,7 +1059,7 @@ public void handleLoadSnapshot(SnapshotReader reader) { } @Override - public void handleLeaderChange(LeaderAndEpoch newLeader, long endOffset) { + public void handleLeaderChange(LeaderAndEpoch newLeader) { appendRaftEvent("handleLeaderChange[" + newLeader.epoch() + "]", () -> { final String newLeaderName = newLeader.leaderId().isPresent() ? String.valueOf(newLeader.leaderId().getAsInt()) : "(none)"; @@ -1076,7 +1076,7 @@ public void handleLeaderChange(LeaderAndEpoch newLeader, long endOffset) { renounce(); } } else if (newLeader.isLeader(nodeId)) { - long newLastWriteOffset = endOffset - 1; + long newLastWriteOffset = raftClient.logEndOffset() - 1; log.info("Becoming the active controller at epoch {}, last write offset {}.", newLeader.epoch(), newLastWriteOffset); claim(newLeader.epoch(), newLastWriteOffset); diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java index c923742af9fa7..768fcb2574b2f 100644 --- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java +++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java @@ -495,7 +495,7 @@ SnapshotManifest loadSnapshot( } @Override - public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch, long endOffset) { + public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) { eventQueue.append(() -> { currentLeaderAndEpoch = leaderAndEpoch; for (MetadataPublisher publisher : publishers.values()) { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java index 5db1e575e7967..19875741ec2de 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java @@ -137,7 +137,7 @@ private void handleControlBatch(FileChannelRecordBatch batch) { listener.handleLeaderChange(new LeaderAndEpoch( OptionalInt.of(message.leaderId()), batch.partitionLeaderEpoch() - ), batch.lastOffset() + 1); + )); break; default: log.error("Ignoring control record with type {} at offset {}", diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java index 038de599009f2..13ed3fb9a5d53 100644 --- a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java @@ -122,6 +122,11 @@ public Optional latestSnapshotId() { } } + @Override + public long logEndOffset() { + return 0; + } + @Override public void close() throws Exception { // nothing to do diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java index 88237d98ce679..c33cdf2a1b65d 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java @@ -455,8 +455,8 @@ void handleLoadSnapshot(SnapshotReader reader) { void handleLeaderChange(long offset, LeaderAndEpoch leader) { // Simulate KRaft implementation by first bumping the epoch before assigning a leader - listener.handleLeaderChange(new LeaderAndEpoch(OptionalInt.empty(), leader.epoch()), offset + 1); - listener.handleLeaderChange(leader, offset + 1); + listener.handleLeaderChange(new LeaderAndEpoch(OptionalInt.empty(), leader.epoch())); + listener.handleLeaderChange(leader); notifiedLeader = leader; this.offset = offset; @@ -860,6 +860,11 @@ public synchronized Optional latestSnapshotId() { return shared.latestSnapshotId(); } + @Override + public synchronized long logEndOffset() { + return shared.prevOffset + 1; + } + @Override public LeaderAndEpoch leaderAndEpoch() { return leader; diff --git a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java b/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java index 026f0ac530191..3d7267d94a5ee 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java @@ -90,7 +90,7 @@ public synchronized void handleLoadSnapshot(SnapshotReader } @Override - public synchronized void handleLeaderChange(LeaderAndEpoch newLeaderAndEpoch, long endOffset) { + public synchronized void handleLeaderChange(LeaderAndEpoch newLeaderAndEpoch) { LeaderAndEpoch oldLeaderAndEpoch = this.leaderAndEpoch; this.leaderAndEpoch = newLeaderAndEpoch; diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 6d99768b521a6..28ba2f8d83f53 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -2394,6 +2394,11 @@ public Optional latestSnapshotId() { return log.latestSnapshotId(); } + @Override + public long logEndOffset() { + return log.endOffset().offset; + } + @Override public void close() { log.flush(true); @@ -2603,7 +2608,7 @@ private void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch) { if (shouldFireLeaderChange(leaderAndEpoch)) { lastFiredLeaderChange = leaderAndEpoch; logger.debug("Notifying listener {} of leader change {}", listenerName(), leaderAndEpoch); - listener.handleLeaderChange(leaderAndEpoch, log.endOffset().offset); + listener.handleLeaderChange(leaderAndEpoch); } } @@ -2625,7 +2630,7 @@ private void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch, long epochStar // leader and begins writing to the log. if (shouldFireLeaderChange(leaderAndEpoch) && nextOffset() >= epochStartOffset) { lastFiredLeaderChange = leaderAndEpoch; - listener.handleLeaderChange(leaderAndEpoch, log.endOffset().offset); + listener.handleLeaderChange(leaderAndEpoch); } } diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java index 071f99f65fc10..2d93051b78fff 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java @@ -81,11 +81,8 @@ interface Listener { * epoch. * * @param leader the current leader and epoch - * @param endOffset the current log end offset (exclusive). This is useful for nodes that - * are claiming leadership, because it lets them know what log offset they - * should attempt to write to next. */ - default void handleLeaderChange(LeaderAndEpoch leader, long endOffset) {} + default void handleLeaderChange(LeaderAndEpoch leader) {} default void beginShutdown() {} } @@ -247,4 +244,12 @@ default void beginShutdown() {} * @return the id of the latest snapshot, if it exists */ Optional latestSnapshotId(); + + /** + * Returns the current end of the log. This method is thread-safe. + * + * @return the log end offset, which is one greater than the offset of the last record written, + * or 0 if there have not been any records written. + */ + long logEndOffset(); } diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java index 36fa99e12c910..cceb65930edbf 100644 --- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java +++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java @@ -165,7 +165,7 @@ public synchronized void handleLoadSnapshot(SnapshotReader reader) { } @Override - public synchronized void handleLeaderChange(LeaderAndEpoch newLeader, long endOffset) { + public synchronized void handleLeaderChange(LeaderAndEpoch newLeader) { if (newLeader.isLeader(nodeId)) { log.debug("Counter uncommitted value initialized to {} after claiming leadership in epoch {}", committed, newLeader); diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 6f8e3e422e304..06a300f666c0f 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -1197,7 +1197,7 @@ void readBatch(BatchReader reader) { } @Override - public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch, long endOffset) { + public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) { // We record the next expected offset as the claimed epoch's start // offset. This is useful to verify that the `handleLeaderChange` callback // was not received early. From 9996d3a645eda731c8266d2be47db1c08585ca89 Mon Sep 17 00:00:00 2001 From: "Colin P. McCabe" Date: Tue, 25 Jul 2023 08:46:45 -0700 Subject: [PATCH 06/11] address review comments BatchAccumulator: rename endOffset to lastOffset. BatchAccumulatorTest: add test of append with required offset. --- .../raft/internals/BatchAccumulator.java | 6 +-- .../raft/internals/BatchAccumulatorTest.java | 40 +++++++++++++++++++ 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java index 1d50d7c4c90d8..53d7db99340c6 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java @@ -126,7 +126,7 @@ public long append( appendLock.lock(); try { - long endOffset = nextOffset + records.size() - 1; + long lastOffset = nextOffset + records.size() - 1; requiredBaseOffset.ifPresent(r -> { if (r != nextOffset) { throw new UnexpectedBaseOffsetException("Wanted base offset " + r + @@ -154,8 +154,8 @@ public long append( maybeResetLinger(); - nextOffset = endOffset + 1; - return endOffset; + nextOffset = lastOffset + 1; + return lastOffset; } finally { appendLock.unlock(); } diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java index ed7f6c94122f8..70ae8cc3e209b 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java @@ -27,7 +27,10 @@ import org.apache.kafka.common.utils.ByteUtils; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.raft.errors.UnexpectedBaseOffsetException; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mockito; import java.nio.ByteBuffer; @@ -42,6 +45,7 @@ import static java.util.Collections.singletonList; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; class BatchAccumulatorTest { @@ -520,4 +524,40 @@ public Long call(BatchAccumulator acc, int epoch, List records) return acc.append(epoch, records, OptionalLong.empty(), false); } }; + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testAppendWithRequiredBaseOffset(boolean correctOffset) { + int leaderEpoch = 17; + long baseOffset = 157; + int lingerMs = 50; + int maxBatchSize = 512; + + ByteBuffer buffer = ByteBuffer.allocate(maxBatchSize); + Mockito.when(memoryPool.tryAllocate(maxBatchSize)) + .thenReturn(buffer); + + BatchAccumulator acc = buildAccumulator( + leaderEpoch, + baseOffset, + lingerMs, + maxBatchSize + ); + + if (correctOffset) { + assertEquals(baseOffset, acc.append(leaderEpoch, + singletonList("a"), + OptionalLong.of(baseOffset), + true)); + } else { + assertEquals("Wanted base offset 156, but the next offset was 157", + assertThrows(UnexpectedBaseOffsetException.class, () -> { + acc.append(leaderEpoch, + singletonList("a"), + OptionalLong.of(baseOffset - 1), + true); + }).getMessage()); + } + acc.close(); + } } From 0b56700113d679337748fb034bf6a1f797cf6e58 Mon Sep 17 00:00:00 2001 From: "Colin P. McCabe" Date: Tue, 25 Jul 2023 09:46:08 -0700 Subject: [PATCH 07/11] KafkaRaftClientTest: test KafkaRaftClient.logEndOffset --- .../src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 44704e901f685..d455956c9ba6e 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -78,6 +78,7 @@ public void testInitializeSingleMemberQuorum() throws IOException { int localId = 0; RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Collections.singleton(localId)).build(); context.assertElectedLeader(1, localId); + assertEquals(context.log.endOffset().offset, context.client.logEndOffset()); } @Test From c660f5fb05deab25269eaf7858c18c29d8f040fa Mon Sep 17 00:00:00 2001 From: "Colin P. McCabe" Date: Tue, 25 Jul 2023 12:21:15 -0700 Subject: [PATCH 08/11] QuorumController: fix indentation --- .../java/org/apache/kafka/controller/QuorumController.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 4e127eb0accd7..8cbe2a6053b45 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -768,8 +768,8 @@ public Long apply(List records) { } long nextEndOffset = prevEndOffset + recordIndex; raftClient.scheduleAtomicAppend(controllerEpoch, - OptionalLong.of(prevEndOffset + 1), - records); + OptionalLong.of(prevEndOffset + 1), + records); snapshotRegistry.getOrCreateSnapshot(nextEndOffset); prevEndOffset = nextEndOffset; return nextEndOffset; From e6c8fcd2dab9d19400553a8fbfa915d59dec2076 Mon Sep 17 00:00:00 2001 From: "Colin P. McCabe" Date: Tue, 25 Jul 2023 13:09:44 -0700 Subject: [PATCH 09/11] Add test that failover happens when write offset doesn't match log --- .../kafka/controller/QuorumController.java | 7 ++++++ .../QuorumControllerIntegrationTestUtils.java | 7 ++++-- ...uorumControllerMetricsIntegrationTest.java | 23 +++++++++++++++---- 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 8cbe2a6053b45..c9f17c422a98f 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -2319,4 +2319,11 @@ Time time() { QuorumControllerMetrics controllerMetrics() { return controllerMetrics; } + + // VisibleForTesting + void setWriteOffset(long newWriteOffset) { + appendControlEvent("setWriteOffset", () -> { + this.writeOffset = newWriteOffset; + }); + } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java index aec2a4a513f0b..ff61e1dfdb28c 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.metadata.BrokerHeartbeatReply; import org.apache.kafka.metadata.BrokerRegistrationReply; import org.apache.kafka.server.common.MetadataVersion; @@ -168,14 +169,16 @@ static void createTopics( request.topics().add( new CreatableTopic(). setName(prefix + i). - setNumPartitions(-1). + setNumPartitions(1). setReplicationFactor((short) replicationFactor)); } CreateTopicsResponseData response = controller.createTopics(ANONYMOUS_CONTEXT, request, describable).get(); for (int i = 0; i < numTopics; i++) { CreatableTopicResult result = response.topics().find(prefix + i); - assertEquals((short) 0, result.errorCode()); + if (result.errorCode() != Errors.TOPIC_ALREADY_EXISTS.code()) { + assertEquals((short) 0, result.errorCode()); + } } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java index 949d63fa375ed..9dc538f4e2fcc 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java @@ -28,6 +28,8 @@ import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +41,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.createTopics; import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.forceRenounce; import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.pause; import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence; @@ -89,21 +92,33 @@ public void testClosingQuorumControllerClosesMetrics() throws Throwable { * Test that failing over to a new controller increments NewActiveControllersCount on both the * active and inactive controllers. */ - @Test - public void testFailingOverIncrementsNewActiveControllerCount() throws Throwable { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testFailingOverIncrementsNewActiveControllerCount( + boolean forceFailoverUsingLogLayer + ) throws Throwable { try ( LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). build(); QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). build() ) { - controlEnv.activeController(); // wait for a controller to become active. + registerBrokersAndUnfence(controlEnv.activeController(), 1); // wait for a controller to become active. TestUtils.retryOnExceptionWithTimeout(30_000, () -> { for (QuorumController controller : controlEnv.controllers()) { assertEquals(1, controller.controllerMetrics().newActiveControllers()); } }); - forceRenounce(controlEnv.activeController()); + if (forceFailoverUsingLogLayer) { + controlEnv.activeController().setWriteOffset(123L); + + TestUtils.retryOnExceptionWithTimeout(30_000, () -> { + createTopics(controlEnv.activeController(), "test_", 1, 1); + }); + } else { + // Directly call QuorumController.renounce. + forceRenounce(controlEnv.activeController()); + } TestUtils.retryOnExceptionWithTimeout(30_000, () -> { for (QuorumController controller : controlEnv.controllers()) { assertEquals(2, controller.controllerMetrics().newActiveControllers()); From ca999fb62bf74fce4c425a8f130c09f02ea05bd7 Mon Sep 17 00:00:00 2001 From: "Colin P. McCabe" Date: Wed, 26 Jul 2023 13:54:18 -0700 Subject: [PATCH 10/11] Update RaftClient JavaDoc --- raft/src/main/java/org/apache/kafka/raft/RaftClient.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java index 2d93051b78fff..485d8101c7a36 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java @@ -172,6 +172,9 @@ default void beginShutdown() {} * to resign its leadership. The state machine is expected to discard all * uncommitted entries after observing an epoch change. * + * If the current base offset does not match the supplied required base offset, + * then this method will throw {@link UnexpectedBaseOffsetException}. + * * @param epoch the current leader epoch * @param requiredBaseOffset if this is set, it is the offset we must use as the base offset. * @param records the list of records to append @@ -181,7 +184,7 @@ default void beginShutdown() {} * committed * @throws NotLeaderException if we are not the current leader or the epoch doesn't match the leader epoch * @throws BufferAllocationException we failed to allocate memory for the records - * @throws UnexpectedBaseOffsetException the requested end offset could not be obtained. + * @throws UnexpectedBaseOffsetException the requested base offset could not be obtained. */ long scheduleAtomicAppend(int epoch, OptionalLong requiredBaseOffset, List records); From fb67f9db614daa2badc44caefbdc8dabce2d4004 Mon Sep 17 00:00:00 2001 From: "Colin P. McCabe" Date: Thu, 27 Jul 2023 12:00:03 -0700 Subject: [PATCH 11/11] Add KafkaRaftClientTest.testAppendWithRequiredBaseOffset --- .../kafka/raft/KafkaRaftClientTest.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index d455956c9ba6e..c69245e972378 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -42,9 +42,11 @@ import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; import org.apache.kafka.raft.errors.BufferAllocationException; import org.apache.kafka.raft.errors.NotLeaderException; +import org.apache.kafka.raft.errors.UnexpectedBaseOffsetException; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mockito; import java.io.IOException; @@ -2858,4 +2860,31 @@ private static KafkaMetric getMetric(final Metrics metrics, final String name) { return metrics.metrics().get(metrics.metricName(name, "raft-metrics")); } + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testAppendWithRequiredBaseOffset(boolean correctOffset) throws Exception { + int localId = 0; + int otherNodeId = 1; + Set voters = Utils.mkSet(localId, otherNodeId); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .build(); + context.becomeLeader(); + assertEquals(OptionalInt.of(localId), context.currentLeader()); + int epoch = context.currentEpoch(); + + if (correctOffset) { + assertEquals(1L, context.client.scheduleAtomicAppend(epoch, + OptionalLong.of(1), + singletonList("a"))); + context.deliverRequest(context.beginEpochRequest(epoch + 1, otherNodeId)); + context.pollUntilResponse(); + } else { + assertThrows(UnexpectedBaseOffsetException.class, () -> { + context.client.scheduleAtomicAppend(epoch, + OptionalLong.of(2), + singletonList("a")); + }); + } + } }