Merged
QuorumController.java

@@ -77,6 +77,7 @@
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.controller.errors.ControllerExceptions;
+import org.apache.kafka.controller.errors.EventHandlerExceptionInfo;
 import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
@@ -457,44 +458,39 @@ private void handleEventEnd(String name, long startProcessingTimeNs) {
         controllerMetrics.updateEventQueueProcessingTime(NANOSECONDS.toMillis(deltaNs));
     }
 
-    private Throwable handleEventException(String name,
-                                           OptionalLong startProcessingTimeNs,
-                                           Throwable exception) {
-        if (!startProcessingTimeNs.isPresent() &&
-                ControllerExceptions.isTimeoutException(exception)) {
-            // If the event never started, and the exception is a timeout, increment the timed
-            // out metric.
-            controllerMetrics.incrementOperationsTimedOut();
+    private Throwable handleEventException(
+        String name,
+        OptionalLong startProcessingTimeNs,
+        Throwable exception
+    ) {
+        OptionalLong deltaUs;
+        if (startProcessingTimeNs.isPresent()) {
+            long endProcessingTime = time.nanoseconds();
+            long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
+            deltaUs = OptionalLong.of(MICROSECONDS.convert(deltaNs, NANOSECONDS));
+        } else {
+            deltaUs = OptionalLong.empty();
         }
-        Throwable externalException =
-                ControllerExceptions.toExternalException(exception, () -> latestController());
-        if (!startProcessingTimeNs.isPresent()) {
-            log.error("{}: unable to start processing because of {}. Reason: {}", name,
-                exception.getClass().getSimpleName(), exception.getMessage());
-            return externalException;
+        EventHandlerExceptionInfo info = EventHandlerExceptionInfo.
+            fromInternal(exception, () -> latestController());
+        int epoch = curClaimEpoch;
+        if (epoch == -1) {
+            epoch = lastCommittedEpoch;
         }
-        long endProcessingTime = time.nanoseconds();
-        long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
-        long deltaUs = MICROSECONDS.convert(deltaNs, NANOSECONDS);
-        if (ControllerExceptions.isExpected(exception)) {
-            log.info("{}: failed with {} in {} us. Reason: {}", name,
-                exception.getClass().getSimpleName(), deltaUs, exception.getMessage());
-            return externalException;
+        String failureMessage = info.failureMessage(epoch, deltaUs,
+                isActiveController(), lastCommittedOffset);
+        if (info.isTimeoutException() && (!deltaUs.isPresent())) {
+            controllerMetrics.incrementOperationsTimedOut();
         }
-        if (isActiveController()) {
-            nonFatalFaultHandler.handleFault(String.format("%s: failed with unexpected server " +
-                "exception %s at epoch %d in %d us. Renouncing leadership and reverting " +
-                "to the last committed offset %d.",
-                name, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs,
-                lastCommittedOffset), exception);
-            renounce();
+        if (info.isFault()) {
+            nonFatalFaultHandler.handleFault(name + ": " + failureMessage, exception);
         } else {
-            nonFatalFaultHandler.handleFault(String.format("%s: failed with unexpected server " +
-                "exception %s in %d us. The controller is already in standby mode.",
-                name, exception.getClass().getSimpleName(), deltaUs),
-                exception);
+            log.info("{}: {}", name, failureMessage);
         }
-        return externalException;
+        if (info.causesFailover() && isActiveController()) {
+            renounce();
+        }
+        return info.effectiveExternalException();
     }
 
     private long updateEventStartMetricsAndGetTime(OptionalLong eventCreatedTimeNs) {
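Note on the new flow above: handleEventException now measures the elapsed time first, then delegates exception classification to the new EventHandlerExceptionInfo class, which is not itself part of this diff. The sketch below is illustrative only; it borrows the exception-to-behavior mapping from the removed ControllerExceptions.isExpected and toExternalException methods at the end of this diff, and its method names mirror the calls in the new handler. The class body, the class name, and the assumption that every fault also causes a failover are guesses, not the real implementation.

import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.server.mutable.BoundedListTooLongException;

import java.util.concurrent.RejectedExecutionException;

final class EventExceptionInfoSketch {
    private final boolean fault;       // unexpected -> logged through the fault handler
    private final Throwable external;  // what the request's caller ultimately sees

    private EventExceptionInfoSketch(boolean fault, Throwable external) {
        this.fault = fault;
        this.external = external;
    }

    static EventExceptionInfoSketch fromInternal(Throwable e) {
        if (e instanceof ApiException) {
            // Expected: returned to the user as-is.
            return new EventExceptionInfoSketch(false, e);
        } else if (e instanceof NotLeaderException) {
            // Expected: we were not the leader when appending; redirect the caller.
            return new EventExceptionInfoSketch(false,
                new NotControllerException("No controller appears to be active."));
        } else if (e instanceof RejectedExecutionException) {
            // Expected: the controller is shutting down.
            return new EventExceptionInfoSketch(false,
                new TimeoutException("The controller is shutting down.", e));
        } else if (e instanceof BoundedListTooLongException) {
            // Expected: the operation tried to create too many records.
            return new EventExceptionInfoSketch(false,
                new PolicyViolationException("Unable to perform excessively large batch " +
                    "operation."));
        }
        // Everything else is an unexpected fault.
        return new EventExceptionInfoSketch(true, new UnknownServerException(e));
    }

    boolean isFault() { return fault; }

    boolean causesFailover() { return fault; } // assumption: every fault triggers failover

    Throwable effectiveExternalException() { return external; }
}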
@@ -755,22 +751,28 @@ public Long apply(List<ApiMessageAndVersion> records) {
                     // Start by trying to apply the record to our in-memory state. This should always
                     // succeed; if it does not, that's a fatal error. It is important to do this before
                     // scheduling the record for Raft replication.
-                    int i = 1;
+                    int recordIndex = 0;
                     for (ApiMessageAndVersion message : records) {
+                        long recordOffset = prevEndOffset + 1 + recordIndex;
                         try {
-                            replay(message.message(), Optional.empty(), prevEndOffset + records.size());
+                            replay(message.message(), Optional.empty(), recordOffset);
                         } catch (Throwable e) {
-                            String failureMessage = String.format("Unable to apply %s record, which was " +
-                                "%d of %d record(s) in the batch following last write offset %d.",
-                                message.message().getClass().getSimpleName(), i, records.size(),
-                                prevEndOffset);
+                            String failureMessage = String.format("Unable to apply %s " +
+                                "record at offset %d on active controller, from the " +
+                                "batch with baseOffset %d",
+                                message.message().getClass().getSimpleName(),
+                                recordOffset, prevEndOffset + 1);
                             throw fatalFaultHandler.handleFault(failureMessage, e);
                         }
-                        i++;
+                        recordIndex++;
                     }
-                    prevEndOffset = raftClient.scheduleAtomicAppend(controllerEpoch, records);
-                    snapshotRegistry.getOrCreateSnapshot(prevEndOffset);
-                    return prevEndOffset;
+                    long nextEndOffset = prevEndOffset + recordIndex;
+                    raftClient.scheduleAtomicAppend(controllerEpoch,
+                        OptionalLong.of(prevEndOffset + 1),
+                        records);
+                    snapshotRegistry.getOrCreateSnapshot(nextEndOffset);
+                    prevEndOffset = nextEndOffset;
+                    return nextEndOffset;
                 }
             });
             op.processBatchEndOffset(offset);
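A worked example of the new offset bookkeeping above, with assumed numbers (the 99 and 3 are illustrative only):

// Suppose prevEndOffset == 99 and records.size() == 3. Each record is now
// replayed at its own offset, rather than every record being replayed at the
// batch's end offset as before:
//   recordIndex 0  ->  recordOffset == 99 + 1 + 0 == 100
//   recordIndex 1  ->  recordOffset == 101
//   recordIndex 2  ->  recordOffset == 102
// After the loop, nextEndOffset == 99 + 3 == 102. The in-memory snapshot is
// taken at 102, and the append is required to begin at prevEndOffset + 1 == 100.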
@@ -988,18 +990,20 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
                             log.debug("Replaying commits from the active node up to " +
                                 "offset {} and epoch {}.", offset, epoch);
                         }
-                        int i = 1;
+                        int recordIndex = 0;
                         for (ApiMessageAndVersion message : messages) {
+                            long recordOffset = batch.baseOffset() + recordIndex;
                             try {
-                                replay(message.message(), Optional.empty(), offset);
+                                replay(message.message(), Optional.empty(), recordOffset);
                             } catch (Throwable e) {
-                                String failureMessage = String.format("Unable to apply %s record on standby " +
-                                    "controller, which was %d of %d record(s) in the batch with baseOffset %d.",
-                                    message.message().getClass().getSimpleName(), i, messages.size(),
-                                    batch.baseOffset());
+                                String failureMessage = String.format("Unable to apply %s " +
+                                    "record at offset %d on standby controller, from the " +
+                                    "batch with baseOffset %d",
+                                    message.message().getClass().getSimpleName(),
+                                    recordOffset, batch.baseOffset());
                                 throw fatalFaultHandler.handleFault(failureMessage, e);
                             }
-                            i++;
+                            recordIndex++;
                         }
                     }
 
@@ -1008,13 +1012,6 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
                         epoch,
                         batch.appendTimestamp()
                     );
-
-                    if (offset >= raftClient.latestSnapshotId().map(OffsetAndEpoch::offset).orElse(0L)) {
-                        oldestNonSnapshottedTimestamp = Math.min(
-                            oldestNonSnapshottedTimestamp,
-                            batch.appendTimestamp()
-                        );
-                    }
                 }
             } finally {
                 reader.close();
@@ -1094,10 +1091,10 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
                         renounce();
                     }
                 } else if (newLeader.isLeader(nodeId)) {
-                    log.info("Becoming the active controller at epoch {}, committed offset {}, " +
-                        "committed epoch {}", newLeader.epoch(), lastCommittedOffset,
-                        lastCommittedEpoch);
-                    claim(newLeader.epoch());
+                    long newLastWriteOffset = raftClient.logEndOffset() - 1;
+                    log.info("Becoming the active controller at epoch {}, last write offset {}.",
+                        newLeader.epoch(), newLastWriteOffset);
+                    claim(newLeader.epoch(), newLastWriteOffset);
                 } else {
                     log.info("In the new epoch {}, the leader is {}.",
                         newLeader.epoch(), newLeaderName);
@@ -1168,15 +1165,15 @@ private void updateWriteOffset(long offset) {
         }
     }
 
-    private void claim(int epoch) {
+    private void claim(int epoch, long newLastWriteOffset) {
        try {
             if (curClaimEpoch != -1) {
                 throw new RuntimeException("Cannot claim leadership because we are already the " +
                     "active controller.");
             }
             curClaimEpoch = epoch;
             controllerMetrics.setActive(true);
-            updateWriteOffset(lastCommittedOffset);
+            updateWriteOffset(newLastWriteOffset);
Comment on lines -1179 to +1176

Member:
Previously, we would update lastCommittedOffset as we got the handleCommit callback from our RaftClient. Since we process Raft events sequentially (and they are delivered sequentially from a single thread), we always process any commit callbacks before a leader change, which means this offset is valid with respect to the end offset when the leadership changed.

Now, while we're processing a leader change, we ask RaftClient for its end offset. Is there any possibility that commits could be made that would make this end offset greater than we expect? Basically, can we be sure that the end offset doesn't change between the time Raft becomes the leader and this event is processed?

Contributor Author:
Well, there is not really any difference between the previous iterations of this PR and the current one in this regard. If some other component that isn't the controller is adding messages, our supplied requiredBaseOffset may be invalid. It is only a snapshot of the offset at a point in time, after all. This is why we check requiredBaseOffset in atomicAppend.

Member:
Understood, I just wanted to make sure I understood the behavior here regarding the new RaftClient API. It sounds like it's fine for us as long as we're the single writer.
             clusterControl.activate();
 
             // Before switching to active, create an in-memory snapshot at the last committed
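The thread above concerns the requiredBaseOffset argument that the write path now passes to scheduleAtomicAppend (the OptionalLong.of(prevEndOffset + 1) in the apply(List<ApiMessageAndVersion>) hunk earlier). Below is a minimal sketch of that kind of validation, assuming a hypothetical append path that compares the caller's expected base offset against the current log end offset; the real RaftClient API's exception type and exact semantics may differ.

import java.util.OptionalLong;

final class AtomicAppendSketch {
    // Rejects the append when another writer has moved the log end offset since the
    // caller snapshotted it; otherwise returns the last offset of the appended batch.
    static long scheduleAtomicAppend(OptionalLong requiredBaseOffset,
                                     int numRecords,
                                     long logEndOffset) {
        if (requiredBaseOffset.isPresent() &&
                requiredBaseOffset.getAsLong() != logEndOffset) {
            throw new IllegalStateException("Required base offset " +
                requiredBaseOffset.getAsLong() +
                " does not match log end offset " + logEndOffset);
        }
        return logEndOffset + numRecords - 1;
    }

    public static void main(String[] args) {
        // With a last write offset of 99, the controller requires base offset 100.
        System.out.println(scheduleAtomicAppend(OptionalLong.of(100), 3, 100)); // prints 102
    }
}

As the author notes, the supplied base offset is only a point-in-time snapshot of the log end offset; the check itself is what makes the single-writer assumption safe to rely on.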
@@ -1516,25 +1513,24 @@ private void handleFeatureControlChange() {
      *
      * @param message The metadata record
      * @param snapshotId The snapshotId if this record is from a snapshot
-     * @param batchLastOffset The offset of the last record in the log batch, or the lastContainedLogOffset
-     *                        if this record is from a snapshot, this is used along with RegisterBrokerRecord
+     * @param offset The offset of the record
      */
-    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long batchLastOffset) {
+    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long offset) {
         if (log.isTraceEnabled()) {
             if (snapshotId.isPresent()) {
                 log.trace("Replaying snapshot {} record {}",
                     Snapshots.filenameFromSnapshotId(snapshotId.get()),
                     recordRedactor.toLoggableString(message));
             } else {
-                log.trace("Replaying log record {} with batchLastOffset {}",
-                    recordRedactor.toLoggableString(message), batchLastOffset);
+                log.trace("Replaying log record {} with offset {}",
+                    recordRedactor.toLoggableString(message), offset);
             }
         }
         logReplayTracker.replay(message);
         MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
         switch (type) {
             case REGISTER_BROKER_RECORD:
-                clusterControl.replay((RegisterBrokerRecord) message, batchLastOffset);
+                clusterControl.replay((RegisterBrokerRecord) message, offset);
                 break;
             case UNREGISTER_BROKER_RECORD:
                 clusterControl.replay((UnregisterBrokerRecord) message);
@@ -1767,11 +1763,6 @@ private void resetToEmptyState() {
      */
     private long writeOffset;
 
-    /**
-     * Timestamp for the oldest record that was committed but not included in a snapshot.
-     */
-    private long oldestNonSnapshottedTimestamp = Long.MAX_VALUE;
-
     /**
      * How long to delay partition leader balancing operations.
      */
@@ -2328,4 +2319,11 @@ Time time() {
     QuorumControllerMetrics controllerMetrics() {
         return controllerMetrics;
     }
+
+    // VisibleForTesting
+    void setWriteOffset(long newWriteOffset) {
+        appendControlEvent("setWriteOffset", () -> {
+            this.writeOffset = newWriteOffset;
+        });
+    }
 }
ControllerExceptions.java
@@ -17,18 +17,11 @@
 
 package org.apache.kafka.controller.errors;
 
-import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.NotControllerException;
-import org.apache.kafka.common.errors.PolicyViolationException;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.UnknownServerException;
-import org.apache.kafka.raft.errors.NotLeaderException;
-import org.apache.kafka.server.mutable.BoundedListTooLongException;
 
 import java.util.OptionalInt;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.function.Supplier;
 
 
 public class ControllerExceptions {
@@ -93,58 +86,4 @@ public static NotControllerException newWrongControllerException(OptionalInt con
             return new NotControllerException("No controller appears to be active.");
         }
     }
-
-    /**
-     * Determine if an exception is expected. Unexpected exceptions trigger controller failovers
-     * when they are raised.
-     *
-     * @param exception The exception.
-     * @return True if the exception is expected.
-     */
-    public static boolean isExpected(Throwable exception) {
-        if (exception instanceof ApiException) {
-            // ApiExceptions indicate errors that should be returned to the user.
-            return true;
-        } else if (exception instanceof NotLeaderException) {
-            // NotLeaderException is thrown if we try to append records, but are not the leader.
-            return true;
-        } else if (exception instanceof RejectedExecutionException) {
-            // This can happen when the controller is shutting down.
-            return true;
-        } else if (exception instanceof BoundedListTooLongException) {
-            // This can happen if we tried to create too many records.
-            return true;
-        } else if (exception instanceof InterruptedException) {
-            // Interrupted exceptions are not expected. They might happen during junit tests if
-            // the test gets stuck and must be terminated by sending IE to all the threads.
-            return false;
-        }
-        // Other exceptions are unexpected.
-        return false;
-    }
-
-    /**
-     * Translate an internal controller exception to its external equivalent.
-     *
-     * @param exception The internal exception.
-     * @return Its external equivalent.
-     */
-    public static Throwable toExternalException(
-        Throwable exception,
-        Supplier<OptionalInt> latestControllerSupplier
-    ) {
-        if (exception instanceof ApiException) {
-            return exception;
-        } else if (exception instanceof NotLeaderException) {
-            return newWrongControllerException(latestControllerSupplier.get());
-        } else if (exception instanceof RejectedExecutionException) {
-            return new TimeoutException("The controller is shutting down.", exception);
-        } else if (exception instanceof BoundedListTooLongException) {
-            return new PolicyViolationException("Unable to perform excessively large batch " +
-                "operation.");
-        } else if (exception instanceof InterruptedException) {
-            return new UnknownServerException("The controller was interrupted.");
-        }
-        return new UnknownServerException(exception);
-    }
 }