Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ class PartitionStateMachine(config: KafkaConfig,
if (leaderIsrAndControllerEpochOpt.nonEmpty) {
val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochOpt.get
val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled)
val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled, controllerContext)
val newLeaderAndIsrOpt = leaderOpt.map { leader =>
val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
else List(leader)
Expand Down Expand Up @@ -435,10 +435,13 @@ class PartitionStateMachine(config: KafkaConfig,
}

object PartitionLeaderElectionAlgorithms {
def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean): Option[Int] = {
def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean, controllerContext: ControllerContext): Option[Int] = {
assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse {
if (uncleanLeaderElectionEnabled) {
assignment.find(liveReplicas.contains)
val leaderOpt = assignment.find(liveReplicas.contains)
if (!leaderOpt.isEmpty)
controllerContext.stats.uncleanLeaderElectionRate.mark()
leaderOpt
} else {
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,17 @@
package kafka.controller

import org.junit.Assert._
import org.junit.Test
import org.junit.{Before, Test}
import org.scalatest.junit.JUnitSuite

class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite {
private var controllerContext: ControllerContext = null

@Before
def setUp(): Unit = {
controllerContext = new ControllerContext
controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec")
}

@Test
def testOfflinePartitionLeaderElection(): Unit = {
Expand All @@ -30,7 +37,8 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite {
val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment,
isr,
liveReplicas,
uncleanLeaderElectionEnabled = false)
uncleanLeaderElectionEnabled = false,
controllerContext)
assertEquals(Option(4), leaderOpt)
}

Expand All @@ -42,9 +50,12 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite {
val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment,
isr,
liveReplicas,
uncleanLeaderElectionEnabled = false)
uncleanLeaderElectionEnabled = false,
controllerContext)
assertEquals(None, leaderOpt)
assertEquals(0, controllerContext.stats.uncleanLeaderElectionRate.count())
}

@Test
def testOfflinePartitionLeaderElectionLastIsrOfflineUncleanLeaderElectionEnabled(): Unit = {
val assignment = Seq(2, 4)
Expand All @@ -53,19 +64,20 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite {
val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment,
isr,
liveReplicas,
uncleanLeaderElectionEnabled = true)
uncleanLeaderElectionEnabled = true,
controllerContext)
assertEquals(Option(4), leaderOpt)
assertEquals(1, controllerContext.stats.uncleanLeaderElectionRate.count())
}

@Test
def testReassignPartitionLeaderElection(): Unit = {
val reassignment = Seq(2, 4)
val isr = Seq(2, 4)
val liveReplicas = Set(4)
val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(reassignment,
val leaderOpt = PartitionLeaderElectionAlgorithms.reassignPartitionLeaderElection(reassignment,
isr,
liveReplicas,
uncleanLeaderElectionEnabled = false)
liveReplicas)
assertEquals(Option(4), leaderOpt)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,17 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
produceMessage(servers, topic, "second")
assertEquals(List("first", "second"), consumeAllMessages(topic))

//remove any previous unclean election metric
servers.map(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))

// shutdown leader and then restart follower
servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
servers.filter(server => server.config.brokerId == followerId).map(server => server.startup())
val followerServer = servers.find(_.config.brokerId == followerId).get
followerServer.startup()

// wait until new leader is (uncleanly) elected
waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId))
assertEquals(1, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count())

produceMessage(servers, topic, "third")

Expand Down Expand Up @@ -224,12 +229,17 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
produceMessage(servers, topic, "second")
assertEquals(List("first", "second"), consumeAllMessages(topic))

//remove any previous unclean election metric
servers.map(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))

// shutdown leader and then restart follower
servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
servers.filter(server => server.config.brokerId == followerId).map(server => server.startup())
val followerServer = servers.find(_.config.brokerId == followerId).get
followerServer.startup()

// verify that unclean election to non-ISR follower does not occur
waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(-1))
assertEquals(0, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count())

// message production and consumption should both fail while leader is down
try {
Expand Down