Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -42,26 +42,52 @@

public class LeaderAndIsrRequest extends AbstractControlRequest {

public enum Type {
UNKNOWN(0),
INCREMENTAL(1),
FULL(2);

private final byte type;
private Type(int type) {
this.type = (byte) type;
}

public byte toByte() {
return type;
}

public static Type fromByte(byte type) {
for (Type t : Type.values()) {
if (t.type == type) {
return t;
}
}
return UNKNOWN;
}
}

public static class Builder extends AbstractControlRequest.Builder<LeaderAndIsrRequest> {

private final List<LeaderAndIsrPartitionState> partitionStates;
private final Map<String, Uuid> topicIds;
private final Collection<Node> liveLeaders;
private final Type updateType;

public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
List<LeaderAndIsrPartitionState> partitionStates, Map<String, Uuid> topicIds,
Collection<Node> liveLeaders) {
this(version, controllerId, controllerEpoch, brokerEpoch, partitionStates, topicIds,
liveLeaders, false);
liveLeaders, false, Type.UNKNOWN);
}

public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
List<LeaderAndIsrPartitionState> partitionStates, Map<String, Uuid> topicIds,
Collection<Node> liveLeaders, boolean kraftController) {
Collection<Node> liveLeaders, boolean kraftController, Type updateType) {
super(ApiKeys.LEADER_AND_ISR, version, controllerId, controllerEpoch, brokerEpoch, kraftController);
this.partitionStates = partitionStates;
this.topicIds = topicIds;
this.liveLeaders = liveLeaders;
this.updateType = updateType;
}

@Override
Expand All @@ -82,6 +108,10 @@ public LeaderAndIsrRequest build(short version) {
data.setIsKRaftController(kraftController);
}

if (version >= 5) {
Comment thread
cmccabe marked this conversation as resolved.
data.setType(updateType.toByte());
}

if (version >= 2) {
Map<String, LeaderAndIsrTopicState> topicStatesMap = groupByTopic(partitionStates, topicIds);
data.setTopicStates(new ArrayList<>(topicStatesMap.values()));
Expand Down Expand Up @@ -210,6 +240,10 @@ public List<LeaderAndIsrLiveLeader> liveLeaders() {
return Collections.unmodifiableList(data.liveLeaders());
}

public Type requestType() {
return Type.fromByte(data.type());
}

@Override
public LeaderAndIsrRequestData data() {
return data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
val stopReplicaRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, StopReplicaPartitionState]]
val updateMetadataRequestBrokerSet = mutable.Set.empty[Int]
val updateMetadataRequestPartitionInfoMap = mutable.Map.empty[TopicPartition, UpdateMetadataPartitionState]
private var updateType: LeaderAndIsrRequest.Type = LeaderAndIsrRequest.Type.UNKNOWN
private var metadataInstance: ControllerChannelContext = _

def sendRequest(brokerId: Int,
Expand All @@ -398,12 +399,17 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
metadataInstance = metadataProvider()
}

def setUpdateType(updateType: LeaderAndIsrRequest.Type): Unit = {
this.updateType = updateType
}

def clear(): Unit = {
leaderAndIsrRequestMap.clear()
stopReplicaRequestMap.clear()
updateMetadataRequestBrokerSet.clear()
updateMetadataRequestPartitionInfoMap.clear()
metadataInstance = null
updateType = LeaderAndIsrRequest.Type.UNKNOWN
}

def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int],
Expand Down Expand Up @@ -543,15 +549,25 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
.toSet[String]
.map(topic => (topic, metadataInstance.topicIds.getOrElse(topic, Uuid.ZERO_UUID)))
.toMap
val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(leaderAndIsrRequestVersion, controllerId,
controllerEpoch, brokerEpoch, leaderAndIsrPartitionStates.values.toBuffer.asJava, topicIds.asJava, leaders.asJava, kraftController)
val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(
leaderAndIsrRequestVersion,
controllerId,
controllerEpoch,
brokerEpoch,
leaderAndIsrPartitionStates.values.toBuffer.asJava,
topicIds.asJava,
leaders.asJava,
kraftController,
updateType
)
sendRequest(broker, leaderAndIsrRequestBuilder, (r: AbstractResponse) => {
val leaderAndIsrResponse = r.asInstanceOf[LeaderAndIsrResponse]
handleLeaderAndIsrResponse(leaderAndIsrResponse, broker)
})
}
}
leaderAndIsrRequestMap.clear()
updateType = LeaderAndIsrRequest.Type.UNKNOWN
}

def handleLeaderAndIsrResponse(response: LeaderAndIsrResponse, broker: Int): Unit
Expand Down
37 changes: 31 additions & 6 deletions core/src/main/scala/kafka/log/LocalLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -610,8 +610,12 @@ object LocalLog extends Logging {
/** a directory that is used for future partition */
private[log] val FutureDirSuffix = "-future"

/** a directory that is used for stray partition */
private[log] val StrayDirSuffix = "-stray"

private[log] val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix")
private[log] val FutureDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$FutureDirSuffix")
private[log] val StrayDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$StrayDirSuffix")

private[log] val UnknownOffset = -1L

Expand All @@ -622,10 +626,17 @@ object LocalLog extends Logging {
* from exceeding 255 characters.
*/
private[log] def logDeleteDirName(topicPartition: TopicPartition): String = {
val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
val suffix = s"-${topicPartition.partition()}.$uniqueId$DeleteDirSuffix"
val prefixLength = Math.min(topicPartition.topic().size, 255 - suffix.size)
s"${topicPartition.topic().substring(0, prefixLength)}$suffix"
logDirNameWithSuffixCappedLength(topicPartition, DeleteDirSuffix)
}

/**
* Return a directory name to rename the log directory to for stray partition deletion.
* The name will be in the following format: "topic-partitionId.uniqueId-stray".
* If the topic name is too long, it will be truncated to prevent the total name
* from exceeding 255 characters.
*/
private[log] def logStrayDirName(topicPartition: TopicPartition): String = {
logDirNameWithSuffixCappedLength(topicPartition, StrayDirSuffix)
}

/**
Expand All @@ -636,6 +647,18 @@ object LocalLog extends Logging {
logDirNameWithSuffix(topicPartition, FutureDirSuffix)
}

/**
* Return a new directory name in the following format: "${topic}-${partitionId}.${uniqueId}${suffix}".
* If the topic name is too long, it will be truncated to prevent the total name
* from exceeding 255 characters.
*/
private[log] def logDirNameWithSuffixCappedLength(topicPartition: TopicPartition, suffix: String): String = {
val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
val fullSuffix = s"-${topicPartition.partition()}.$uniqueId$suffix"
val prefixLength = Math.min(topicPartition.topic().size, 255 - fullSuffix.size)
s"${topicPartition.topic().substring(0, prefixLength)}$fullSuffix"
}

private[log] def logDirNameWithSuffix(topicPartition: TopicPartition, suffix: String): String = {
val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
s"${logDirName(topicPartition)}.$uniqueId$suffix"
Expand Down Expand Up @@ -666,11 +689,13 @@ object LocalLog extends Logging {
if (dirName == null || dirName.isEmpty || !dirName.contains('-'))
throw exception(dir)
if (dirName.endsWith(DeleteDirSuffix) && !DeleteDirPattern.matcher(dirName).matches ||
dirName.endsWith(FutureDirSuffix) && !FutureDirPattern.matcher(dirName).matches)
dirName.endsWith(FutureDirSuffix) && !FutureDirPattern.matcher(dirName).matches ||
dirName.endsWith(StrayDirSuffix) && !StrayDirPattern.matcher(dirName).matches)
throw exception(dir)

val name: String =
if (dirName.endsWith(DeleteDirSuffix) || dirName.endsWith(FutureDirSuffix)) dirName.substring(0, dirName.lastIndexOf('.'))
if (dirName.endsWith(DeleteDirSuffix) || dirName.endsWith(FutureDirSuffix) || dirName.endsWith(StrayDirSuffix))
dirName.substring(0, dirName.lastIndexOf('.'))
else dirName

val index = name.lastIndexOf('-')
Expand Down
31 changes: 25 additions & 6 deletions core/src/main/scala/kafka/log/LogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ class LogManager(logDirs: Seq[File],
// Each element in the queue contains the log object to be deleted and the time it is scheduled for deletion.
private val logsToBeDeleted = new LinkedBlockingQueue[(UnifiedLog, Long)]()

// Map of stray partition to stray log. This holds all stray logs detected on the broker.
// Visible for testing
private val strayLogs = new Pool[TopicPartition, UnifiedLog]()

private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
@volatile private var _currentDefaultConfig = initialDefaultConfig
@volatile private var numRecoveryThreadsPerDataDir = recoveryThreadsPerDataDir
Expand Down Expand Up @@ -302,6 +306,10 @@ class LogManager(logDirs: Seq[File],
this.logsToBeDeleted.add((log, time.milliseconds()))
}

def addStrayLog(strayPartition: TopicPartition, strayLog: UnifiedLog): Unit = {
this.strayLogs.put(strayPartition, strayLog)
}

// Only for testing
private[log] def hasLogsToBeDeleted: Boolean = !logsToBeDeleted.isEmpty

Expand Down Expand Up @@ -337,6 +345,9 @@ class LogManager(logDirs: Seq[File],

if (logDir.getName.endsWith(UnifiedLog.DeleteDirSuffix)) {
addLogToBeDeleted(log)
} else if (logDir.getName.endsWith(UnifiedLog.StrayDirSuffix)) {
addStrayLog(topicPartition, log)
warn(s"Loaded stray log: $logDir")
} else {
val previous = {
if (log.isFuture)
Expand Down Expand Up @@ -1203,7 +1214,8 @@ class LogManager(logDirs: Seq[File],
*/
def asyncDelete(topicPartition: TopicPartition,
isFuture: Boolean = false,
checkpoint: Boolean = true): Option[UnifiedLog] = {
checkpoint: Boolean = true,
isStray: Boolean = false): Option[UnifiedLog] = {
val removedLog: Option[UnifiedLog] = logCreationOrDeletionLock synchronized {
removeLogAndMetrics(if (isFuture) futureLogs else currentLogs, topicPartition)
}
Expand All @@ -1216,15 +1228,21 @@ class LogManager(logDirs: Seq[File],
cleaner.updateCheckpoints(removedLog.parentDirFile, partitionToRemove = Option(topicPartition))
}
}
removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), false)
if (isStray) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we are only renaming the stray log dir and removing it from LogManager. Do we also want to add the logic on delayed stray log deletion? If we want to delete the stray logs immediately (more risky, and it might create conflicts with AK merge), I think we need to add it to the log deletion queue (by calling addLogToBeDeleted(strayLog)).

Right now the log dir will be renamed to "-stray" but it will not be deleted by the broker.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, the desired behavior is to stop tracking the partition but not delete the files. Since migrations are are one-off and inherently risky, I didn't want to take any destructive actions like deleting the logs (immediately or delayed). The stray'd partitions are logged at the INFO level when they are detected, and at WARN on subsequent startups.

This gives give operators the information needed to clean up stray partitions if desired.

I filed https://issues.apache.org/jira/browse/KAFKA-15698 to track automatic clean up of the stray partitions.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Makes sense.

// Move aside stray partitions, don't delete them
removedLog.renameDir(UnifiedLog.logStrayDirName(topicPartition), false)
warn(s"Log for partition ${removedLog.topicPartition} is marked as stray and renamed to ${removedLog.dir.getAbsolutePath}")
} else {
removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), false)
addLogToBeDeleted(removedLog)
info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
}
if (checkpoint) {
val logDir = removedLog.parentDirFile
val logsToCheckpoint = logsInDir(logDir)
checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint)
checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
}
addLogToBeDeleted(removedLog)
info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")

case None =>
if (offlineLogDirs.nonEmpty) {
Expand All @@ -1244,18 +1262,19 @@ class LogManager(logDirs: Seq[File],
* topic-partition is raised
*/
def asyncDelete(topicPartitions: Set[TopicPartition],
isStray: Boolean,
errorHandler: (TopicPartition, Throwable) => Unit): Unit = {
val logDirs = mutable.Set.empty[File]

topicPartitions.foreach { topicPartition =>
try {
getLog(topicPartition).foreach { log =>
logDirs += log.parentDirFile
asyncDelete(topicPartition, checkpoint = false)
asyncDelete(topicPartition, checkpoint = false, isStray = isStray)
}
getLog(topicPartition, isFuture = true).foreach { log =>
logDirs += log.parentDirFile
asyncDelete(topicPartition, isFuture = true, checkpoint = false)
asyncDelete(topicPartition, isFuture = true, checkpoint = false, isStray = isStray)
}
} catch {
case e: Throwable => errorHandler(topicPartition, e)
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1869,6 +1869,8 @@ object UnifiedLog extends Logging {

val DeleteDirSuffix = LocalLog.DeleteDirSuffix

val StrayDirSuffix = LocalLog.StrayDirSuffix

val FutureDirSuffix = LocalLog.FutureDirSuffix

private[log] val DeleteDirPattern = LocalLog.DeleteDirPattern
Expand Down Expand Up @@ -1951,6 +1953,8 @@ object UnifiedLog extends Logging {

def logFutureDirName(topicPartition: TopicPartition): String = LocalLog.logFutureDirName(topicPartition)

def logStrayDirName(topicPartition: TopicPartition): String = LocalLog.logStrayDirName(topicPartition)

def logDirName(topicPartition: TopicPartition): String = LocalLog.logDirName(topicPartition)

def transactionIndexFile(dir: File, offset: Long, suffix: String = ""): File = LogFileUtils.transactionIndexFile(dir, offset, suffix)
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/migration/MigrationPropagator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import kafka.controller.{ControllerChannelContext, ControllerChannelManager, Rep
import kafka.server.KafkaConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.requests.LeaderAndIsrRequest
import org.apache.kafka.common.utils.Time
import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage, TopicsImage}
import org.apache.kafka.metadata.PartitionRegistration
Expand Down Expand Up @@ -225,6 +226,7 @@ class MigrationPropagator(
requestBatch.sendRequestsToBrokers(zkControllerEpoch)

requestBatch.newBatch()
requestBatch.setUpdateType(LeaderAndIsrRequest.Type.FULL)
// When we need to send RPCs from the image, we're sending 'full' requests meaning we let
// every broker know about all the metadata and all the LISR requests it needs to handle.
// Note that we cannot send StopReplica requests from the image. We don't have any state
Expand Down
Loading