diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 199c679cb35c0..c9f17c422a98f 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -77,6 +77,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.controller.errors.ControllerExceptions; +import org.apache.kafka.controller.errors.EventHandlerExceptionInfo; import org.apache.kafka.controller.metrics.QuorumControllerMetrics; import org.apache.kafka.metadata.BrokerHeartbeatReply; import org.apache.kafka.metadata.BrokerRegistrationReply; @@ -457,44 +458,39 @@ private void handleEventEnd(String name, long startProcessingTimeNs) { controllerMetrics.updateEventQueueProcessingTime(NANOSECONDS.toMillis(deltaNs)); } - private Throwable handleEventException(String name, - OptionalLong startProcessingTimeNs, - Throwable exception) { - if (!startProcessingTimeNs.isPresent() && - ControllerExceptions.isTimeoutException(exception)) { - // If the event never started, and the exception is a timeout, increment the timed - // out metric. - controllerMetrics.incrementOperationsTimedOut(); + private Throwable handleEventException( + String name, + OptionalLong startProcessingTimeNs, + Throwable exception + ) { + OptionalLong deltaUs; + if (startProcessingTimeNs.isPresent()) { + long endProcessingTime = time.nanoseconds(); + long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong(); + deltaUs = OptionalLong.of(MICROSECONDS.convert(deltaNs, NANOSECONDS)); + } else { + deltaUs = OptionalLong.empty(); } - Throwable externalException = - ControllerExceptions.toExternalException(exception, () -> latestController()); - if (!startProcessingTimeNs.isPresent()) { - log.error("{}: unable to start processing because of {}. Reason: {}", name, - exception.getClass().getSimpleName(), exception.getMessage()); - return externalException; + EventHandlerExceptionInfo info = EventHandlerExceptionInfo. + fromInternal(exception, () -> latestController()); + int epoch = curClaimEpoch; + if (epoch == -1) { + epoch = lastCommittedEpoch; } - long endProcessingTime = time.nanoseconds(); - long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong(); - long deltaUs = MICROSECONDS.convert(deltaNs, NANOSECONDS); - if (ControllerExceptions.isExpected(exception)) { - log.info("{}: failed with {} in {} us. Reason: {}", name, - exception.getClass().getSimpleName(), deltaUs, exception.getMessage()); - return externalException; + String failureMessage = info.failureMessage(epoch, deltaUs, + isActiveController(), lastCommittedOffset); + if (info.isTimeoutException() && (!deltaUs.isPresent())) { + controllerMetrics.incrementOperationsTimedOut(); } - if (isActiveController()) { - nonFatalFaultHandler.handleFault(String.format("%s: failed with unexpected server " + - "exception %s at epoch %d in %d us. Renouncing leadership and reverting " + - "to the last committed offset %d.", - name, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs, - lastCommittedOffset), exception); - renounce(); + if (info.isFault()) { + nonFatalFaultHandler.handleFault(name + ": " + failureMessage, exception); } else { - nonFatalFaultHandler.handleFault(String.format("%s: failed with unexpected server " + - "exception %s in %d us. The controller is already in standby mode.", - name, exception.getClass().getSimpleName(), deltaUs), - exception); + log.info("{}: {}", name, failureMessage); + } + if (info.causesFailover() && isActiveController()) { + renounce(); } - return externalException; + return info.effectiveExternalException(); } private long updateEventStartMetricsAndGetTime(OptionalLong eventCreatedTimeNs) { @@ -755,22 +751,28 @@ public Long apply(List records) { // Start by trying to apply the record to our in-memory state. This should always // succeed; if it does not, that's a fatal error. It is important to do this before // scheduling the record for Raft replication. - int i = 1; + int recordIndex = 0; for (ApiMessageAndVersion message : records) { + long recordOffset = prevEndOffset + 1 + recordIndex; try { - replay(message.message(), Optional.empty(), prevEndOffset + records.size()); + replay(message.message(), Optional.empty(), recordOffset); } catch (Throwable e) { - String failureMessage = String.format("Unable to apply %s record, which was " + - "%d of %d record(s) in the batch following last write offset %d.", - message.message().getClass().getSimpleName(), i, records.size(), - prevEndOffset); + String failureMessage = String.format("Unable to apply %s " + + "record at offset %d on active controller, from the " + + "batch with baseOffset %d", + message.message().getClass().getSimpleName(), + recordOffset, prevEndOffset + 1); throw fatalFaultHandler.handleFault(failureMessage, e); } - i++; + recordIndex++; } - prevEndOffset = raftClient.scheduleAtomicAppend(controllerEpoch, records); - snapshotRegistry.getOrCreateSnapshot(prevEndOffset); - return prevEndOffset; + long nextEndOffset = prevEndOffset + recordIndex; + raftClient.scheduleAtomicAppend(controllerEpoch, + OptionalLong.of(prevEndOffset + 1), + records); + snapshotRegistry.getOrCreateSnapshot(nextEndOffset); + prevEndOffset = nextEndOffset; + return nextEndOffset; } }); op.processBatchEndOffset(offset); @@ -988,18 +990,20 @@ public void handleCommit(BatchReader reader) { log.debug("Replaying commits from the active node up to " + "offset {} and epoch {}.", offset, epoch); } - int i = 1; + int recordIndex = 0; for (ApiMessageAndVersion message : messages) { + long recordOffset = batch.baseOffset() + recordIndex; try { - replay(message.message(), Optional.empty(), offset); + replay(message.message(), Optional.empty(), recordOffset); } catch (Throwable e) { - String failureMessage = String.format("Unable to apply %s record on standby " + - "controller, which was %d of %d record(s) in the batch with baseOffset %d.", - message.message().getClass().getSimpleName(), i, messages.size(), - batch.baseOffset()); + String failureMessage = String.format("Unable to apply %s " + + "record at offset %d on standby controller, from the " + + "batch with baseOffset %d", + message.message().getClass().getSimpleName(), + recordOffset, batch.baseOffset()); throw fatalFaultHandler.handleFault(failureMessage, e); } - i++; + recordIndex++; } } @@ -1008,13 +1012,6 @@ public void handleCommit(BatchReader reader) { epoch, batch.appendTimestamp() ); - - if (offset >= raftClient.latestSnapshotId().map(OffsetAndEpoch::offset).orElse(0L)) { - oldestNonSnapshottedTimestamp = Math.min( - oldestNonSnapshottedTimestamp, - batch.appendTimestamp() - ); - } } } finally { reader.close(); @@ -1094,10 +1091,10 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) { renounce(); } } else if (newLeader.isLeader(nodeId)) { - log.info("Becoming the active controller at epoch {}, committed offset {}, " + - "committed epoch {}", newLeader.epoch(), lastCommittedOffset, - lastCommittedEpoch); - claim(newLeader.epoch()); + long newLastWriteOffset = raftClient.logEndOffset() - 1; + log.info("Becoming the active controller at epoch {}, last write offset {}.", + newLeader.epoch(), newLastWriteOffset); + claim(newLeader.epoch(), newLastWriteOffset); } else { log.info("In the new epoch {}, the leader is {}.", newLeader.epoch(), newLeaderName); @@ -1168,7 +1165,7 @@ private void updateWriteOffset(long offset) { } } - private void claim(int epoch) { + private void claim(int epoch, long newLastWriteOffset) { try { if (curClaimEpoch != -1) { throw new RuntimeException("Cannot claim leadership because we are already the " + @@ -1176,7 +1173,7 @@ private void claim(int epoch) { } curClaimEpoch = epoch; controllerMetrics.setActive(true); - updateWriteOffset(lastCommittedOffset); + updateWriteOffset(newLastWriteOffset); clusterControl.activate(); // Before switching to active, create an in-memory snapshot at the last committed @@ -1516,25 +1513,24 @@ private void handleFeatureControlChange() { * * @param message The metadata record * @param snapshotId The snapshotId if this record is from a snapshot - * @param batchLastOffset The offset of the last record in the log batch, or the lastContainedLogOffset - * if this record is from a snapshot, this is used along with RegisterBrokerRecord + * @param offset The offset of the record */ - private void replay(ApiMessage message, Optional snapshotId, long batchLastOffset) { + private void replay(ApiMessage message, Optional snapshotId, long offset) { if (log.isTraceEnabled()) { if (snapshotId.isPresent()) { log.trace("Replaying snapshot {} record {}", Snapshots.filenameFromSnapshotId(snapshotId.get()), recordRedactor.toLoggableString(message)); } else { - log.trace("Replaying log record {} with batchLastOffset {}", - recordRedactor.toLoggableString(message), batchLastOffset); + log.trace("Replaying log record {} with offset {}", + recordRedactor.toLoggableString(message), offset); } } logReplayTracker.replay(message); MetadataRecordType type = MetadataRecordType.fromId(message.apiKey()); switch (type) { case REGISTER_BROKER_RECORD: - clusterControl.replay((RegisterBrokerRecord) message, batchLastOffset); + clusterControl.replay((RegisterBrokerRecord) message, offset); break; case UNREGISTER_BROKER_RECORD: clusterControl.replay((UnregisterBrokerRecord) message); @@ -1767,11 +1763,6 @@ private void resetToEmptyState() { */ private long writeOffset; - /** - * Timestamp for the oldest record that was committed but not included in a snapshot. - */ - private long oldestNonSnapshottedTimestamp = Long.MAX_VALUE; - /** * How long to delay partition leader balancing operations. */ @@ -2328,4 +2319,11 @@ Time time() { QuorumControllerMetrics controllerMetrics() { return controllerMetrics; } + + // VisibleForTesting + void setWriteOffset(long newWriteOffset) { + appendControlEvent("setWriteOffset", () -> { + this.writeOffset = newWriteOffset; + }); + } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java b/metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java index 6f99ea45f6b5d..b7e74446a4b6b 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java +++ b/metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java @@ -17,18 +17,11 @@ package org.apache.kafka.controller.errors; -import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.NotControllerException; -import org.apache.kafka.common.errors.PolicyViolationException; import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.errors.UnknownServerException; -import org.apache.kafka.raft.errors.NotLeaderException; -import org.apache.kafka.server.mutable.BoundedListTooLongException; import java.util.OptionalInt; import java.util.concurrent.ExecutionException; -import java.util.concurrent.RejectedExecutionException; -import java.util.function.Supplier; public class ControllerExceptions { @@ -93,58 +86,4 @@ public static NotControllerException newWrongControllerException(OptionalInt con return new NotControllerException("No controller appears to be active."); } } - - /** - * Determine if an exception is expected. Unexpected exceptions trigger controller failovers - * when they are raised. - * - * @param exception The exception. - * @return True if the exception is expected. - */ - public static boolean isExpected(Throwable exception) { - if (exception instanceof ApiException) { - // ApiExceptions indicate errors that should be returned to the user. - return true; - } else if (exception instanceof NotLeaderException) { - // NotLeaderException is thrown if we try to append records, but are not the leader. - return true; - } else if (exception instanceof RejectedExecutionException) { - // This can happen when the controller is shutting down. - return true; - } else if (exception instanceof BoundedListTooLongException) { - // This can happen if we tried to create too many records. - return true; - } else if (exception instanceof InterruptedException) { - // Interrupted exceptions are not expected. They might happen during junit tests if - // the test gets stuck and must be terminated by sending IE to all the threads. - return false; - } - // Other exceptions are unexpected. - return false; - } - - /** - * Translate an internal controller exception to its external equivalent. - * - * @param exception The internal exception. - * @return Its external equivalent. - */ - public static Throwable toExternalException( - Throwable exception, - Supplier latestControllerSupplier - ) { - if (exception instanceof ApiException) { - return exception; - } else if (exception instanceof NotLeaderException) { - return newWrongControllerException(latestControllerSupplier.get()); - } else if (exception instanceof RejectedExecutionException) { - return new TimeoutException("The controller is shutting down.", exception); - } else if (exception instanceof BoundedListTooLongException) { - return new PolicyViolationException("Unable to perform excessively large batch " + - "operation."); - } else if (exception instanceof InterruptedException) { - return new UnknownServerException("The controller was interrupted."); - } - return new UnknownServerException(exception); - } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java b/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java new file mode 100644 index 0000000000000..d75c4544eae2b --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller.errors; + +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.NotControllerException; +import org.apache.kafka.common.errors.PolicyViolationException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.raft.errors.NotLeaderException; +import org.apache.kafka.raft.errors.UnexpectedBaseOffsetException; +import org.apache.kafka.server.mutable.BoundedListTooLongException; + +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.concurrent.RejectedExecutionException; +import java.util.function.Supplier; + + +public final class EventHandlerExceptionInfo { + /** + * True if this exception should be treated as a fault, and tracked via the metadata errors + * metric. + */ + private final boolean isFault; + + /** + * True if this exception should cause a controller failover. + * All faults cause failover + */ + private final boolean causesFailover; + + /** + * The internal exception. + */ + private final Throwable internalException; + + /** + * The exception to present to RPC callers, or Optional.empty if the internal exception should + * be presented directly. + */ + private final Optional externalException; + + /** + * Create an EventHandlerExceptionInfo object from an internal exception. + * + * @param internal The internal exception. + * @param latestControllerSupplier A function we can call to obtain the latest leader id. + * + * @return The new immutable info object. + */ + public static EventHandlerExceptionInfo fromInternal( + Throwable internal, + Supplier latestControllerSupplier + ) { + if (internal instanceof ApiException) { + // This exception is a standard API error response from the controller, which can pass + // through without modification. + return new EventHandlerExceptionInfo(false, false, internal); + } else if (internal instanceof NotLeaderException) { + // The controller has lost leadership. + return new EventHandlerExceptionInfo(false, true, internal, + ControllerExceptions.newWrongControllerException(latestControllerSupplier.get())); + } else if (internal instanceof RejectedExecutionException) { + // The controller event queue is shutting down. + return new EventHandlerExceptionInfo(false, false, internal, + new TimeoutException("The controller is shutting down.", internal)); + } else if (internal instanceof BoundedListTooLongException) { + // The operation could not be performed because it would have created an overly large + // batch. + return new EventHandlerExceptionInfo(false, false, internal, + new PolicyViolationException("Unable to perform excessively large batch " + + "operation.")); + } else if (internal instanceof UnexpectedBaseOffsetException) { + // The active controller picked the wrong end offset for its next batch. It must now + // fail over. This should be pretty rare. + return new EventHandlerExceptionInfo(false, true, internal, + new NotControllerException("Unexpected end offset. Controller will resign.")); + } else if (internal instanceof InterruptedException) { + // The controller event queue has been interrupted. This normally only happens during + // a JUnit test that has hung. The test framework sometimes sends an InterruptException + // to all threads to try to get them to shut down. This isn't the correct way to shut + // the test, but it may happen if something hung. + return new EventHandlerExceptionInfo(true, true, internal, + new UnknownServerException("The controller was interrupted.")); + } else { + // This is the catch-all case for things that aren't supposed to happen. Null pointer + // exceptions, illegal argument exceptions, etc. They get translated into an + // UnknownServerException and a controller failover. + return new EventHandlerExceptionInfo(true, true, internal, + new UnknownServerException(internal)); + } + } + + /** + * Returns true if the class and message fields match for two exceptions. Handles nulls. + */ + static boolean exceptionClassesAndMessagesMatch(Throwable a, Throwable b) { + if (a == null) return b == null; + if (b == null) return false; + if (!a.getClass().equals(b.getClass())) return false; + if (!Objects.equals(a.getMessage(), b.getMessage())) return false; + return true; + } + + EventHandlerExceptionInfo( + boolean isFault, + boolean causesFailover, + Throwable internalException + ) { + this.isFault = isFault; + this.causesFailover = causesFailover; + this.internalException = internalException; + this.externalException = Optional.empty(); + } + + EventHandlerExceptionInfo( + boolean isFault, + boolean causesFailover, + Throwable internalException, + Throwable externalException + ) { + this.isFault = isFault; + this.causesFailover = causesFailover; + this.internalException = internalException; + this.externalException = Optional.of(externalException); + } + + public boolean isFault() { + return isFault; + } + + public boolean causesFailover() { + return causesFailover; + } + + public Throwable effectiveExternalException() { + return externalException.orElse(internalException); + } + + public boolean isTimeoutException() { + return internalException instanceof TimeoutException; + } + + public String failureMessage( + int epoch, + OptionalLong deltaUs, + boolean isActiveController, + long lastCommittedOffset + ) { + StringBuilder bld = new StringBuilder(); + if (deltaUs.isPresent()) { + bld.append("event failed with "); + } else { + bld.append("event unable to start processing because of "); + } + bld.append(internalException.getClass().getSimpleName()); + if (externalException.isPresent()) { + bld.append(" (treated as "). + append(externalException.get().getClass().getSimpleName()).append(")"); + } + if (causesFailover()) { + bld.append(" at epoch ").append(epoch); + } + if (deltaUs.isPresent()) { + bld.append(" in ").append(deltaUs.getAsLong()).append(" microseconds"); + } + if (causesFailover()) { + if (isActiveController) { + bld.append(". Renouncing leadership and reverting to the last committed offset "); + bld.append(lastCommittedOffset); + } else { + bld.append(". The controller is already in standby mode"); + } + } + bld.append("."); + return bld.toString(); + } + + @Override + public int hashCode() { + return Objects.hash(isFault, + causesFailover, + internalException.getClass().getCanonicalName(), + internalException.getMessage(), + externalException.orElse(internalException).getClass().getCanonicalName(), + externalException.orElse(internalException).getMessage()); + } + + @Override + public boolean equals(Object o) { + if (o == null || !(o.getClass().equals(EventHandlerExceptionInfo.class))) return false; + EventHandlerExceptionInfo other = (EventHandlerExceptionInfo) o; + return isFault == other.isFault && + causesFailover == other.causesFailover && + exceptionClassesAndMessagesMatch(internalException, other.internalException) && + exceptionClassesAndMessagesMatch(externalException.orElse(null), + other.externalException.orElse(null)); + } + + @Override + public String toString() { + return "EventHandlerExceptionInfo" + + "(isFault=" + isFault + + ", causesFailover=" + causesFailover + + ", internalException.class=" + internalException.getClass().getCanonicalName() + + ", externalException.class=" + (externalException.isPresent() ? + externalException.get().getClass().getCanonicalName() : "(none)") + + ")"; + } +} diff --git a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java index 6f82d915d8d5d..0ccf39a85ac8f 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java @@ -34,6 +34,7 @@ import java.nio.file.StandardOpenOption; import java.util.Collections; import java.util.List; +import java.util.OptionalLong; import static org.apache.kafka.raft.KafkaRaftClient.MAX_BATCH_SIZE_BYTES; @@ -61,11 +62,11 @@ private BatchFileWriter( } public void append(ApiMessageAndVersion apiMessageAndVersion) { - batchAccumulator.append(0, Collections.singletonList(apiMessageAndVersion)); + batchAccumulator.append(0, Collections.singletonList(apiMessageAndVersion), OptionalLong.empty(), false); } public void append(List messageBatch) { - batchAccumulator.append(0, messageBatch); + batchAccumulator.append(0, messageBatch, OptionalLong.empty(), false); } public void close() throws IOException { diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java index aec2a4a513f0b..ff61e1dfdb28c 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.metadata.BrokerHeartbeatReply; import org.apache.kafka.metadata.BrokerRegistrationReply; import org.apache.kafka.server.common.MetadataVersion; @@ -168,14 +169,16 @@ static void createTopics( request.topics().add( new CreatableTopic(). setName(prefix + i). - setNumPartitions(-1). + setNumPartitions(1). setReplicationFactor((short) replicationFactor)); } CreateTopicsResponseData response = controller.createTopics(ANONYMOUS_CONTEXT, request, describable).get(); for (int i = 0; i < numTopics; i++) { CreatableTopicResult result = response.topics().find(prefix + i); - assertEquals((short) 0, result.errorCode()); + if (result.errorCode() != Errors.TOPIC_ALREADY_EXISTS.code()) { + assertEquals((short) 0, result.errorCode()); + } } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java index 949d63fa375ed..9dc538f4e2fcc 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java @@ -28,6 +28,8 @@ import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +41,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.createTopics; import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.forceRenounce; import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.pause; import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence; @@ -89,21 +92,33 @@ public void testClosingQuorumControllerClosesMetrics() throws Throwable { * Test that failing over to a new controller increments NewActiveControllersCount on both the * active and inactive controllers. */ - @Test - public void testFailingOverIncrementsNewActiveControllerCount() throws Throwable { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testFailingOverIncrementsNewActiveControllerCount( + boolean forceFailoverUsingLogLayer + ) throws Throwable { try ( LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). build(); QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). build() ) { - controlEnv.activeController(); // wait for a controller to become active. + registerBrokersAndUnfence(controlEnv.activeController(), 1); // wait for a controller to become active. TestUtils.retryOnExceptionWithTimeout(30_000, () -> { for (QuorumController controller : controlEnv.controllers()) { assertEquals(1, controller.controllerMetrics().newActiveControllers()); } }); - forceRenounce(controlEnv.activeController()); + if (forceFailoverUsingLogLayer) { + controlEnv.activeController().setWriteOffset(123L); + + TestUtils.retryOnExceptionWithTimeout(30_000, () -> { + createTopics(controlEnv.activeController(), "test_", 1, 1); + }); + } else { + // Directly call QuorumController.renounce. + forceRenounce(controlEnv.activeController()); + } TestUtils.retryOnExceptionWithTimeout(30_000, () -> { for (QuorumController controller : controlEnv.controllers()) { assertEquals(2, controller.controllerMetrics().newActiveControllers()); diff --git a/metadata/src/test/java/org/apache/kafka/controller/errors/ControllerExceptionsTest.java b/metadata/src/test/java/org/apache/kafka/controller/errors/ControllerExceptionsTest.java index 81c234491b558..2d1905b1a5c1c 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/errors/ControllerExceptionsTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/errors/ControllerExceptionsTest.java @@ -20,20 +20,15 @@ import org.apache.kafka.common.errors.NotControllerException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicExistsException; -import org.apache.kafka.common.errors.UnknownServerException; -import org.apache.kafka.raft.errors.NotLeaderException; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import java.util.OptionalInt; import java.util.concurrent.ExecutionException; -import java.util.concurrent.RejectedExecutionException; -import static org.apache.kafka.controller.errors.ControllerExceptions.isExpected; import static org.apache.kafka.controller.errors.ControllerExceptions.isTimeoutException; import static org.apache.kafka.controller.errors.ControllerExceptions.newPreMigrationException; import static org.apache.kafka.controller.errors.ControllerExceptions.newWrongControllerException; -import static org.apache.kafka.controller.errors.ControllerExceptions.toExternalException; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -99,32 +94,7 @@ public void testNewWrongControllerExceptionWithActiveController() { newWrongControllerException(OptionalInt.of(1))); } - @Test - public void testApiExceptionIsExpected() { - assertTrue(isExpected(new TopicExistsException(""))); - } - - @Test - public void testNotLeaderExceptionIsExpected() { - assertTrue(isExpected(new NotLeaderException(""))); - } - - @Test - public void testRejectedExecutionExceptionIsExpected() { - assertTrue(isExpected(new RejectedExecutionException())); - } - - @Test - public void testInterruptedExceptionIsNotExpected() { - assertFalse(isExpected(new InterruptedException())); - } - - @Test - public void testRuntimeExceptionIsNotExpected() { - assertFalse(isExpected(new NullPointerException())); - } - - private static void assertExceptionsMatch(Throwable a, Throwable b) { + static void assertExceptionsMatch(Throwable a, Throwable b) { assertEquals(a.getClass(), b.getClass()); assertEquals(a.getMessage(), b.getMessage()); if (a.getCause() != null) { @@ -134,40 +104,4 @@ private static void assertExceptionsMatch(Throwable a, Throwable b) { assertNull(b.getCause()); } } - - @Test - public void testApiExceptionToExternalException() { - assertExceptionsMatch(new TopicExistsException("Topic foo exists"), - toExternalException(new TopicExistsException("Topic foo exists"), - () -> OptionalInt.of(1))); - } - - @Test - public void testNotLeaderExceptionToExternalException() { - assertExceptionsMatch(new NotControllerException("The active controller appears to be node 1."), - toExternalException(new NotLeaderException("Append failed because the given epoch 123 is stale."), - () -> OptionalInt.of(1))); - } - - @Test - public void testRejectedExecutionExceptionToExternalException() { - assertExceptionsMatch(new TimeoutException("The controller is shutting down.", - new RejectedExecutionException("The event queue is shutting down")), - toExternalException(new RejectedExecutionException("The event queue is shutting down"), - () -> OptionalInt.empty())); - } - - @Test - public void testInterruptedExceptionToExternalException() { - assertExceptionsMatch(new UnknownServerException("The controller was interrupted."), - toExternalException(new InterruptedException(), - () -> OptionalInt.empty())); - } - - @Test - public void testRuntimeExceptionToExternalException() { - assertExceptionsMatch(new UnknownServerException(new NullPointerException("Null pointer exception")), - toExternalException(new NullPointerException("Null pointer exception"), - () -> OptionalInt.empty())); - } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java b/metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java new file mode 100644 index 0000000000000..bacb54b1b5082 --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller.errors; + +import org.apache.kafka.common.errors.NotControllerException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.raft.errors.NotLeaderException; +import org.apache.kafka.raft.errors.UnexpectedBaseOffsetException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.concurrent.RejectedExecutionException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +@Timeout(value = 40) +public class EventHandlerExceptionInfoTest { + private static final EventHandlerExceptionInfo TOPIC_EXISTS = + EventHandlerExceptionInfo.fromInternal( + new TopicExistsException("Topic exists."), + () -> OptionalInt.empty()); + + private static final EventHandlerExceptionInfo REJECTED_EXECUTION = + EventHandlerExceptionInfo.fromInternal( + new RejectedExecutionException(), + () -> OptionalInt.empty()); + + private static final EventHandlerExceptionInfo INTERRUPTED = + EventHandlerExceptionInfo.fromInternal( + new InterruptedException(), + () -> OptionalInt.of(1)); + + private static final EventHandlerExceptionInfo NULL_POINTER = + EventHandlerExceptionInfo.fromInternal( + new NullPointerException(), + () -> OptionalInt.of(1)); + + private static final EventHandlerExceptionInfo NOT_LEADER = + EventHandlerExceptionInfo.fromInternal( + new NotLeaderException("Append failed"), + () -> OptionalInt.of(2)); + + private static final EventHandlerExceptionInfo UNEXPECTED_END_OFFSET = + EventHandlerExceptionInfo.fromInternal( + new UnexpectedBaseOffsetException("Wanted base offset 3, but the next offset was 4"), + () -> OptionalInt.of(1)); + + private static final EventHandlerExceptionInfo TIMEOUT = + EventHandlerExceptionInfo.fromInternal( + new TimeoutException(), + () -> OptionalInt.of(1)); + + @Test + public void testTopicExistsExceptionInfo() { + assertEquals(new EventHandlerExceptionInfo(false, false, + new TopicExistsException("Topic exists.")), + TOPIC_EXISTS); + } + + @Test + public void testTopicExistsExceptionFailureMessage() { + assertEquals("event failed with TopicExistsException in 234 microseconds.", + TOPIC_EXISTS.failureMessage(123, OptionalLong.of(234L), true, 456L)); + } + + @Test + public void testRejectedExecutionExceptionInfo() { + assertEquals(new EventHandlerExceptionInfo(false, false, + new RejectedExecutionException(), + new TimeoutException("The controller is shutting down.", new RejectedExecutionException())), + REJECTED_EXECUTION); + } + + @Test + public void testRejectedExecutionExceptionFailureMessage() { + assertEquals("event unable to start processing because of RejectedExecutionException (treated " + + "as TimeoutException).", + REJECTED_EXECUTION.failureMessage(123, OptionalLong.empty(), true, 456L)); + } + + @Test + public void testInterruptedExceptionInfo() { + assertEquals(new EventHandlerExceptionInfo(true, true, + new InterruptedException(), + new UnknownServerException("The controller was interrupted.")), + INTERRUPTED); + } + + @Test + public void testInterruptedExceptionFailureMessageWhenActive() { + assertEquals("event unable to start processing because of InterruptedException (treated as " + + "UnknownServerException) at epoch 123. Renouncing leadership and reverting to the " + + "last committed offset 456.", + INTERRUPTED.failureMessage(123, OptionalLong.empty(), true, 456L)); + } + + @Test + public void testInterruptedExceptionFailureMessageWhenInactive() { + assertEquals("event unable to start processing because of InterruptedException (treated as " + + "UnknownServerException) at epoch 123. The controller is already in standby mode.", + INTERRUPTED.failureMessage(123, OptionalLong.empty(), false, 456L)); + } + + @Test + public void testNullPointerExceptionInfo() { + assertEquals(new EventHandlerExceptionInfo(true, true, + new NullPointerException(), + new UnknownServerException(new NullPointerException())), + NULL_POINTER); + } + + @Test + public void testNullPointerExceptionFailureMessageWhenActive() { + assertEquals("event failed with NullPointerException (treated as UnknownServerException) " + + "at epoch 123 in 40 microseconds. Renouncing leadership and reverting to the last " + + "committed offset 456.", + NULL_POINTER.failureMessage(123, OptionalLong.of(40L), true, 456L)); + } + + @Test + public void testNullPointerExceptionFailureMessageWhenInactive() { + assertEquals("event failed with NullPointerException (treated as UnknownServerException) " + + "at epoch 123 in 40 microseconds. The controller is already in standby mode.", + NULL_POINTER.failureMessage(123, OptionalLong.of(40L), false, 456L)); + } + + @Test + public void testNotLeaderExceptionInfo() { + assertEquals(new EventHandlerExceptionInfo(false, true, + new NotLeaderException("Append failed"), + new NotControllerException("The active controller appears to be node 2.")), + NOT_LEADER); + } + + @Test + public void testNotLeaderExceptionFailureMessage() { + assertEquals("event unable to start processing because of NotLeaderException (treated as " + + "NotControllerException) at epoch 123. Renouncing leadership and reverting to the " + + "last committed offset 456.", + NOT_LEADER.failureMessage(123, OptionalLong.empty(), true, 456L)); + } + + @Test + public void testUnexpectedBaseOffsetExceptionInfo() { + assertEquals(new EventHandlerExceptionInfo(false, true, + new UnexpectedBaseOffsetException("Wanted base offset 3, but the next offset was 4"), + new NotControllerException("Unexpected end offset. Controller will resign.")), + UNEXPECTED_END_OFFSET); + } + + @Test + public void testUnexpectedBaseOffsetFailureMessage() { + assertEquals("event failed with UnexpectedBaseOffsetException (treated as " + + "NotControllerException) at epoch 123 in 90 microseconds. Renouncing leadership " + + "and reverting to the last committed offset 456.", + UNEXPECTED_END_OFFSET.failureMessage(123, OptionalLong.of(90L), true, 456L)); + } + + @Test + public void testIsNotTimeoutException() { + assertFalse(TOPIC_EXISTS.isTimeoutException()); + assertFalse(REJECTED_EXECUTION.isTimeoutException()); + assertFalse(INTERRUPTED.isTimeoutException()); + assertFalse(NULL_POINTER.isTimeoutException()); + assertFalse(NOT_LEADER.isTimeoutException()); + assertFalse(UNEXPECTED_END_OFFSET.isTimeoutException()); + } + + @Test + public void testIsTimeoutException() { + assertTrue(TIMEOUT.isTimeoutException()); + } +} diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java index be4285f6bbf3e..e018424cc5317 100644 --- a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java @@ -82,7 +82,11 @@ public long scheduleAppend(int epoch, List records) { } @Override - public long scheduleAtomicAppend(int epoch, List records) { + public long scheduleAtomicAppend( + int epoch, + OptionalLong requiredEndOffset, + List records + ) { return 0; } @@ -119,6 +123,11 @@ public Optional latestSnapshotId() { } } + @Override + public long logEndOffset() { + return 0; + } + @Override public void close() throws Exception { // nothing to do diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java index 46f99db5d1d1e..c33cdf2a1b65d 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java @@ -32,6 +32,7 @@ import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.RaftClient; import org.apache.kafka.raft.errors.NotLeaderException; +import org.apache.kafka.raft.errors.UnexpectedBaseOffsetException; import org.apache.kafka.raft.internals.MemoryBatchReader; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.snapshot.MockRawSnapshotReader; @@ -221,13 +222,26 @@ synchronized void unregisterLogManager(LocalLogManager logManager) { } } - synchronized long tryAppend(int nodeId, int epoch, List batch) { + synchronized long tryAppend( + int nodeId, + int epoch, + OptionalLong requiredBaseOffset, + List batch + ) { // No easy access to the concept of time. Use the base offset as the append timestamp long appendTimestamp = (prevOffset + 1) * 10; - return tryAppend(nodeId, epoch, new LocalRecordBatch(epoch, appendTimestamp, batch)); - } - - synchronized long tryAppend(int nodeId, int epoch, LocalBatch batch) { + return tryAppend(nodeId, + epoch, + requiredBaseOffset, + new LocalRecordBatch(epoch, appendTimestamp, batch)); + } + + synchronized long tryAppend( + int nodeId, + int epoch, + OptionalLong requiredBaseOffset, + LocalBatch batch + ) { if (!leader.isLeader(nodeId)) { log.debug("tryAppend(nodeId={}, epoch={}): the given node id does not " + "match the current leader id of {}.", nodeId, epoch, leader.leaderId()); @@ -243,15 +257,24 @@ synchronized long tryAppend(int nodeId, int epoch, LocalBatch batch) { } log.trace("tryAppend(nodeId={}): appending {}.", nodeId, batch); - long offset = append(batch); + long offset = append(requiredBaseOffset, batch); electLeaderIfNeeded(); return offset; } - public synchronized long append(LocalBatch batch) { - prevOffset += batch.size(); - log.debug("append(batch={}, prevOffset={})", batch, prevOffset); - batches.put(prevOffset, batch); + public synchronized long append( + OptionalLong requiredBaseOffset, + LocalBatch batch + ) { + long nextEndOffset = prevOffset + batch.size(); + requiredBaseOffset.ifPresent(r -> { + if (r != prevOffset + 1) { + throw new UnexpectedBaseOffsetException("Wanted base offset " + r + + ", but the next offset was " + nextEndOffset); + } + }); + log.debug("append(batch={}, nextEndOffset={})", batch, nextEndOffset); + batches.put(nextEndOffset, batch); if (batch instanceof LeaderChangeBatch) { LeaderChangeBatch leaderChangeBatch = (LeaderChangeBatch) batch; leader = leaderChangeBatch.newLeader; @@ -259,7 +282,8 @@ public synchronized long append(LocalBatch batch) { for (LocalLogManager logManager : logManagers.values()) { logManager.scheduleLogCheck(); } - return prevOffset; + prevOffset = nextEndOffset; + return nextEndOffset; } synchronized void electLeaderIfNeeded() { @@ -274,7 +298,7 @@ synchronized void electLeaderIfNeeded() { } LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(nextLeaderNode), leader.epoch() + 1); log.info("Elected new leader: {}.", newLeader); - append(new LeaderChangeBatch(newLeader)); + append(OptionalLong.empty(), new LeaderChangeBatch(newLeader)); } synchronized LeaderAndEpoch leaderAndEpoch() { @@ -737,7 +761,9 @@ public long scheduleAppend(int epoch, List batch) { OptionalLong firstOffset = first .stream() - .mapToLong(record -> scheduleAtomicAppend(epoch, Collections.singletonList(record))) + .mapToLong(record -> scheduleAtomicAppend(epoch, + OptionalLong.empty(), + Collections.singletonList(record))) .max(); if (firstOffset.isPresent() && resignAfterNonAtomicCommit.getAndSet(false)) { @@ -749,15 +775,24 @@ public long scheduleAppend(int epoch, List batch) { } else { return second .stream() - .mapToLong(record -> scheduleAtomicAppend(epoch, Collections.singletonList(record))) + .mapToLong(record -> scheduleAtomicAppend(epoch, + OptionalLong.empty(), + Collections.singletonList(record))) .max() .getAsLong(); } } @Override - public long scheduleAtomicAppend(int epoch, List batch) { - return shared.tryAppend(nodeId, leader.epoch(), batch); + public long scheduleAtomicAppend( + int epoch, + OptionalLong requiredEndOffset, + List batch + ) { + if (batch.isEmpty()) { + throw new IllegalArgumentException("Batch cannot be empty"); + } + return shared.tryAppend(nodeId, leader.epoch(), requiredEndOffset, batch); } @Override @@ -784,7 +819,10 @@ public void resign(int epoch) { LeaderAndEpoch nextLeader = new LeaderAndEpoch(OptionalInt.empty(), currentEpoch + 1); try { - shared.tryAppend(nodeId, currentEpoch, new LeaderChangeBatch(nextLeader)); + shared.tryAppend(nodeId, + currentEpoch, + OptionalLong.empty(), + new LeaderChangeBatch(nextLeader)); } catch (NotLeaderException exp) { // the leader epoch has already advanced. resign is a no op. log.debug("Ignoring call to resign from epoch {}. Either we are not the leader or the provided epoch is " + @@ -822,6 +860,11 @@ public synchronized Optional latestSnapshotId() { return shared.latestSnapshotId(); } + @Override + public synchronized long logEndOffset() { + return shared.prevOffset + 1; + } + @Override public LeaderAndEpoch leaderAndEpoch() { return leader; diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java index d72b7557b48e0..c3a7283ab9edc 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java @@ -36,6 +36,7 @@ import java.util.List; import java.util.Optional; import java.util.OptionalInt; +import java.util.OptionalLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -155,11 +156,12 @@ public List allRecords() { */ public void appendInitialRecords(List records) { int initialLeaderEpoch = 1; - shared.append(new LeaderChangeBatch( + shared.append(OptionalLong.empty(), new LeaderChangeBatch( new LeaderAndEpoch(OptionalInt.empty(), initialLeaderEpoch + 1))); - shared.append(new LocalRecordBatch(initialLeaderEpoch + 1, 0, records)); - shared.append(new LeaderChangeBatch( - new LeaderAndEpoch(OptionalInt.of(0), initialLeaderEpoch + 2))); + shared.append(OptionalLong.empty(), + new LocalRecordBatch(initialLeaderEpoch + 1, 0, records)); + shared.append(OptionalLong.empty(), + new LeaderChangeBatch(new LeaderAndEpoch(OptionalInt.of(0), initialLeaderEpoch + 2))); } public String clusterId() { diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 40c6a69590804..28ba2f8d83f53 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -2287,27 +2287,22 @@ public void poll() { @Override public long scheduleAppend(int epoch, List records) { - return append(epoch, records, false); + return append(epoch, records, OptionalLong.empty(), false); } @Override - public long scheduleAtomicAppend(int epoch, List records) { - return append(epoch, records, true); + public long scheduleAtomicAppend(int epoch, OptionalLong requiredBaseOffset, List records) { + return append(epoch, records, requiredBaseOffset, true); } - private long append(int epoch, List records, boolean isAtomic) { + private long append(int epoch, List records, OptionalLong requiredBaseOffset, boolean isAtomic) { LeaderState leaderState = quorum.maybeLeaderState().orElseThrow( () -> new NotLeaderException("Append failed because the replication is not the current leader") ); BatchAccumulator accumulator = leaderState.accumulator(); boolean isFirstAppend = accumulator.isEmpty(); - final long offset; - if (isAtomic) { - offset = accumulator.appendAtomic(epoch, records); - } else { - offset = accumulator.append(epoch, records); - } + final long offset = accumulator.append(epoch, records, requiredBaseOffset, isAtomic); // Wakeup the network channel if either this is the first append // or the accumulator is ready to drain now. Checking for the first @@ -2399,6 +2394,11 @@ public Optional latestSnapshotId() { return log.latestSnapshotId(); } + @Override + public long logEndOffset() { + return log.endOffset().offset; + } + @Override public void close() { log.flush(true); @@ -2570,10 +2570,10 @@ private void fireHandleCommit(long baseOffset, Records records) { /** * This API is used for committed records originating from {@link #scheduleAppend(int, List)} - * or {@link #scheduleAtomicAppend(int, List)} on this instance. In this case, we are able to - * save the original record objects, which saves the need to read them back from disk. This is - * a nice optimization for the leader which is typically doing more work than all of the - * followers. + * or {@link #scheduleAtomicAppend(int, OptionalLong, List)} on this instance. In this case, + * we are able to save the original record objects, which saves the need to read them back + * from disk. This is a nice optimization for the leader which is typically doing more work + * than all of the * followers. */ private void fireHandleCommit( long baseOffset, diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java index 6422fb5f3a095..485d8101c7a36 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java @@ -18,6 +18,7 @@ import org.apache.kafka.raft.errors.BufferAllocationException; import org.apache.kafka.raft.errors.NotLeaderException; +import org.apache.kafka.raft.errors.UnexpectedBaseOffsetException; import org.apache.kafka.snapshot.SnapshotReader; import org.apache.kafka.snapshot.SnapshotWriter; @@ -36,10 +37,10 @@ interface Listener { * after consuming the reader. * * Note that there is not a one-to-one correspondence between writes through - * {@link #scheduleAppend(int, List)} or {@link #scheduleAtomicAppend(int, List)} + * {@link #scheduleAppend(int, List)} or {@link #scheduleAtomicAppend(int, OptionalLong, List)} * and this callback. The Raft implementation is free to batch together the records * from multiple append calls provided that batch boundaries are respected. Records - * specified through {@link #scheduleAtomicAppend(int, List)} are guaranteed to be a + * specified through {@link #scheduleAtomicAppend(int, OptionalLong, List)} are guaranteed to be a * subset of a batch provided by the {@link BatchReader}. Records specified through * {@link #scheduleAppend(int, List)} are guaranteed to be in the same order but * they can map to any number of batches provided by the {@link BatchReader}. @@ -171,7 +172,11 @@ default void beginShutdown() {} * to resign its leadership. The state machine is expected to discard all * uncommitted entries after observing an epoch change. * + * If the current base offset does not match the supplied required base offset, + * then this method will throw {@link UnexpectedBaseOffsetException}. + * * @param epoch the current leader epoch + * @param requiredBaseOffset if this is set, it is the offset we must use as the base offset. * @param records the list of records to append * @return the expected offset of the last record if append succeed * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if the size of the records is greater than the maximum @@ -179,8 +184,9 @@ default void beginShutdown() {} * committed * @throws NotLeaderException if we are not the current leader or the epoch doesn't match the leader epoch * @throws BufferAllocationException we failed to allocate memory for the records + * @throws UnexpectedBaseOffsetException the requested base offset could not be obtained. */ - long scheduleAtomicAppend(int epoch, List records); + long scheduleAtomicAppend(int epoch, OptionalLong requiredBaseOffset, List records); /** * Attempt a graceful shutdown of the client. This allows the leader to proactively @@ -241,4 +247,12 @@ default void beginShutdown() {} * @return the id of the latest snapshot, if it exists */ Optional latestSnapshotId(); + + /** + * Returns the current end of the log. This method is thread-safe. + * + * @return the log end offset, which is one greater than the offset of the last record written, + * or 0 if there have not been any records written. + */ + long logEndOffset(); } diff --git a/raft/src/main/java/org/apache/kafka/raft/errors/UnexpectedBaseOffsetException.java b/raft/src/main/java/org/apache/kafka/raft/errors/UnexpectedBaseOffsetException.java new file mode 100644 index 0000000000000..192e8b2bb66ce --- /dev/null +++ b/raft/src/main/java/org/apache/kafka/raft/errors/UnexpectedBaseOffsetException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.errors; + +/** + * Indicates that an append operation cannot be completed because it would have resulted in an + * unexpected base offset. + */ +public class UnexpectedBaseOffsetException extends RaftException { + private final static long serialVersionUID = 1L; + + public UnexpectedBaseOffsetException(String s) { + super(s); + } +} diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java index b84a7d57b8a72..53d7db99340c6 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.raft.errors.BufferAllocationException; import org.apache.kafka.raft.errors.NotLeaderException; +import org.apache.kafka.raft.errors.UnexpectedBaseOffsetException; import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.common.message.LeaderChangeMessage; @@ -38,6 +39,7 @@ import java.util.Objects; import java.util.Optional; import java.util.OptionalInt; +import java.util.OptionalLong; import java.util.function.Function; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; @@ -90,51 +92,28 @@ public BatchAccumulator( } /** - * Append a list of records into as many batches as necessary. + * Append to the accumulator. * - * The order of the elements in the records argument will match the order in the batches. - * This method will use as many batches as necessary to serialize all of the records. Since - * this method can split the records into multiple batches it is possible that some of the - * records will get committed while other will not when the leader fails. + * @param epoch The leader epoch to append at. + * @param records The records to append. + * @param requiredBaseOffset If this is non-empty, the base offset which we must use. + * @param isAtomic True if we should append the records as a single batch. + * @return The end offset. * - * @param epoch the expected leader epoch. If this does not match, then {@link NotLeaderException} - * will be thrown - * @param records the list of records to include in the batches - * @return the expected offset of the last record - * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum - * batch size; if this exception is throw some of the elements in records may have - * been committed - * @throws NotLeaderException if the epoch is less than the leader epoch - * @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch) - * @throws BufferAllocationException if we failed to allocate memory for the records - * @throws IllegalStateException if we tried to append new records after the batch has been built + * @throws NotLeaderException Indicates that an append operation cannot be completed + * because the provided leader epoch was too old. + * @throws IllegalArgumentException Indicates that an append operation cannot be completed + * because the provided leader epoch was too new. + * @throws UnexpectedBaseOffsetException Indicates that an append operation cannot + * be completed because it would have resulted + * in an unexpected base offset. */ - public long append(int epoch, List records) { - return append(epoch, records, false); - } - - /** - * Append a list of records into an atomic batch. We guarantee all records are included in the - * same underlying record batch so that either all of the records become committed or none of - * them do. - * - * @param epoch the expected leader epoch. If this does not match, then {@link NotLeaderException} - * will be thrown - * @param records the list of records to include in a batch - * @return the expected offset of the last record - * @throws RecordBatchTooLargeException if the size of the records is greater than the maximum - * batch size; if this exception is throw none of the elements in records were - * committed - * @throws NotLeaderException if the epoch is less than the leader epoch - * @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch) - * @throws BufferAllocationException if we failed to allocate memory for the records - * @throws IllegalStateException if we tried to append new records after the batch has been built - */ - public long appendAtomic(int epoch, List records) { - return append(epoch, records, true); - } - - private long append(int epoch, List records, boolean isAtomic) { + public long append( + int epoch, + List records, + OptionalLong requiredBaseOffset, + boolean isAtomic + ) { if (epoch < this.epoch) { throw new NotLeaderException("Append failed because the given epoch " + epoch + " is stale. " + "Current leader epoch = " + this.epoch()); @@ -147,6 +126,13 @@ private long append(int epoch, List records, boolean isAtomic) { appendLock.lock(); try { + long lastOffset = nextOffset + records.size() - 1; + requiredBaseOffset.ifPresent(r -> { + if (r != nextOffset) { + throw new UnexpectedBaseOffsetException("Wanted base offset " + r + + ", but the next offset was " + nextOffset); + } + }); maybeCompleteDrain(); BatchBuilder batch = null; @@ -164,12 +150,12 @@ private long append(int epoch, List records, boolean isAtomic) { } batch.appendRecord(record, serializationCache); - nextOffset += 1; } maybeResetLinger(); - return nextOffset - 1; + nextOffset = lastOffset + 1; + return lastOffset; } finally { appendLock.unlock(); } @@ -408,7 +394,9 @@ public int epoch() { * This call will not block, but the drain may require multiple attempts before * it can be completed if the thread responsible for appending is holding the * append lock. In the worst case, the append will be completed on the next - * call to {@link #append(int, List)} following the initial call to this method. + * call to {@link #append(int, List, OptionalLong, boolean)} following the + * initial call to this method. + * * The caller should respect the time to the next flush as indicated by * {@link #timeUntilDrain(long)}. * diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java index ae6202426b81d..818a346ac0aef 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java @@ -30,6 +30,7 @@ import java.util.Optional; import java.util.List; +import java.util.OptionalLong; import java.util.function.Supplier; final public class RecordsSnapshotWriter implements SnapshotWriter { @@ -184,7 +185,7 @@ public void append(List records) { throw new IllegalStateException(message); } - accumulator.append(snapshot.snapshotId().epoch(), records); + accumulator.append(snapshot.snapshotId().epoch(), records, OptionalLong.empty(), false); if (accumulator.needsDrain(time.milliseconds())) { appendBatches(accumulator.drain()); diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 7cbeb11f4f780..c69245e972378 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -42,9 +42,11 @@ import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; import org.apache.kafka.raft.errors.BufferAllocationException; import org.apache.kafka.raft.errors.NotLeaderException; +import org.apache.kafka.raft.errors.UnexpectedBaseOffsetException; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mockito; import java.io.IOException; @@ -78,6 +80,7 @@ public void testInitializeSingleMemberQuorum() throws IOException { int localId = 0; RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Collections.singleton(localId)).build(); context.assertElectedLeader(1, localId); + assertEquals(context.log.endOffset().offset, context.client.logEndOffset()); } @Test @@ -355,7 +358,8 @@ public void testAppendFailedWithRecordBatchTooLargeException() throws Exception for (int i = 0; i < size; i++) batchToLarge.add("a"); - assertThrows(RecordBatchTooLargeException.class, () -> context.client.scheduleAtomicAppend(epoch, batchToLarge)); + assertThrows(RecordBatchTooLargeException.class, + () -> context.client.scheduleAtomicAppend(epoch, OptionalLong.empty(), batchToLarge)); } @Test @@ -2856,4 +2860,31 @@ private static KafkaMetric getMetric(final Metrics metrics, final String name) { return metrics.metrics().get(metrics.metricName(name, "raft-metrics")); } + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testAppendWithRequiredBaseOffset(boolean correctOffset) throws Exception { + int localId = 0; + int otherNodeId = 1; + Set voters = Utils.mkSet(localId, otherNodeId); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .build(); + context.becomeLeader(); + assertEquals(OptionalInt.of(localId), context.currentLeader()); + int epoch = context.currentEpoch(); + + if (correctOffset) { + assertEquals(1L, context.client.scheduleAtomicAppend(epoch, + OptionalLong.of(1), + singletonList("a"))); + context.deliverRequest(context.beginEpochRequest(epoch + 1, otherNodeId)); + context.pollUntilResponse(); + } else { + assertThrows(UnexpectedBaseOffsetException.class, () -> { + context.client.scheduleAtomicAppend(epoch, + OptionalLong.of(2), + singletonList("a")); + }); + } + } } diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java index 11499771790f9..70ae8cc3e209b 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java @@ -27,12 +27,16 @@ import org.apache.kafka.common.utils.ByteUtils; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.raft.errors.UnexpectedBaseOffsetException; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mockito; import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; +import java.util.OptionalLong; import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -41,6 +45,7 @@ import static java.util.Collections.singletonList; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; class BatchAccumulatorTest { @@ -232,7 +237,7 @@ public void testLingerBeginsOnFirstWrite() { ); time.sleep(15); - assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a"))); + assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a"), OptionalLong.empty(), false)); assertEquals(lingerMs, acc.timeUntilDrain(time.milliseconds())); assertFalse(acc.isEmpty()); @@ -264,7 +269,7 @@ public void testCompletedBatchReleaseBuffer() { maxBatchSize ); - assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a"))); + assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a"), OptionalLong.empty(), false)); time.sleep(lingerMs); List> batches = acc.drain(); @@ -293,7 +298,7 @@ public void testUnflushedBuffersReleasedByClose() { maxBatchSize ); - assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a"))); + assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a"), OptionalLong.empty(), false)); acc.close(); Mockito.verify(memoryPool).release(buffer); } @@ -396,7 +401,7 @@ public void testRecordsAreSplit() { .generate(() -> record) .limit(numberOfRecords) .collect(Collectors.toList()); - assertEquals(baseOffset + numberOfRecords - 1, acc.append(leaderEpoch, records)); + assertEquals(baseOffset + numberOfRecords - 1, acc.append(leaderEpoch, records, OptionalLong.empty(), false)); time.sleep(lingerMs); assertTrue(acc.needsDrain(time.milliseconds())); @@ -451,7 +456,7 @@ public void testDrainDoesNotBlockWithConcurrentAppend() throws Exception { // Do the first append outside the thread to start the linger timer Mockito.when(memoryPool.tryAllocate(maxBatchSize)) .thenReturn(ByteBuffer.allocate(maxBatchSize)); - acc.append(leaderEpoch, singletonList("a")); + acc.append(leaderEpoch, singletonList("a"), OptionalLong.empty(), false); // Let the serde block to simulate a slow append Mockito.doAnswer(invocation -> { @@ -466,7 +471,7 @@ public void testDrainDoesNotBlockWithConcurrentAppend() throws Exception { Mockito.any(Writable.class) ); - Thread appendThread = new Thread(() -> acc.append(leaderEpoch, singletonList("b"))); + Thread appendThread = new Thread(() -> acc.append(leaderEpoch, singletonList("b"), OptionalLong.empty(), false)); appendThread.start(); // Attempt to drain while the append thread is holding the lock @@ -509,14 +514,50 @@ static interface Appender { static final Appender APPEND_ATOMIC = new Appender() { @Override public Long call(BatchAccumulator acc, int epoch, List records) { - return acc.appendAtomic(epoch, records); + return acc.append(epoch, records, OptionalLong.empty(), true); } }; static final Appender APPEND = new Appender() { @Override public Long call(BatchAccumulator acc, int epoch, List records) { - return acc.append(epoch, records); + return acc.append(epoch, records, OptionalLong.empty(), false); } }; + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testAppendWithRequiredBaseOffset(boolean correctOffset) { + int leaderEpoch = 17; + long baseOffset = 157; + int lingerMs = 50; + int maxBatchSize = 512; + + ByteBuffer buffer = ByteBuffer.allocate(maxBatchSize); + Mockito.when(memoryPool.tryAllocate(maxBatchSize)) + .thenReturn(buffer); + + BatchAccumulator acc = buildAccumulator( + leaderEpoch, + baseOffset, + lingerMs, + maxBatchSize + ); + + if (correctOffset) { + assertEquals(baseOffset, acc.append(leaderEpoch, + singletonList("a"), + OptionalLong.of(baseOffset), + true)); + } else { + assertEquals("Wanted base offset 156, but the next offset was 157", + assertThrows(UnexpectedBaseOffsetException.class, () -> { + acc.append(leaderEpoch, + singletonList("a"), + OptionalLong.of(baseOffset - 1), + true); + }).getMessage()); + } + acc.close(); + } }