Skip to content
61 changes: 34 additions & 27 deletions core/src/main/scala/kafka/server/AlterIsrManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
package kafka.server

import java.util
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}

import kafka.api.LeaderAndIsr
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.{KafkaScheduler, Logging, Scheduler}
Expand All @@ -32,6 +30,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
import org.apache.kafka.common.utils.Time

import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -112,68 +111,74 @@ class DefaultAlterIsrManager(
val brokerEpochSupplier: () => Long
) extends AlterIsrManager with Logging with KafkaMetricsGroup {

// Used to allow only one pending ISR update per partition (visible for testing).
// Declared exactly once: a second declaration of the same name in this class does not compile.
private[server] val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new ConcurrentHashMap[TopicPartition, AlterIsrItem]()

// Used to allow only one in-flight request at a time
private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)

// Clock reading (ms) stored each time a batch of pending updates is handed to sendRequest
private val lastIsrPropagationMs = new AtomicLong(0)

/**
 * Starts the controller channel manager and registers the "send-alter-isr"
 * task with the scheduler so that pending ISR updates get propagated.
 */
override def start(): Unit = {
  controllerChannelManager.start()
  // Explicit lambda, matching how the retry task is registered in the response handler
  scheduler.schedule("send-alter-isr", () => propagateIsrChanges(), 50, 50, TimeUnit.MILLISECONDS)
}

/** Shuts down the controller channel manager used to send AlterIsr requests. */
override def shutdown(): Unit =
  controllerChannelManager.shutdown()

/**
 * Registers an ISR update for its partition and immediately attempts to send
 * pending updates to the controller.
 *
 * @param alterIsrItem the update to enqueue, keyed by its topic partition
 * @return true if the item was enqueued; false if an update for the same
 *         partition is already pending (the existing one is kept)
 */
override def submit(alterIsrItem: AlterIsrItem): Boolean = {
  // putIfAbsent must run exactly once: it returns null only when no update was pending,
  // so a second call for the same key would always report "already pending".
  val enqueued = unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null
  // Attempt a send whether or not this item was accepted; this is a no-op while a request is in flight
  maybePropagateIsrChanges()
  enqueued
}

/**
 * Drops any pending ISR update for the given partition so a new one can be submitted.
 *
 * @param topicPartition the partition whose pending update should be discarded
 */
override def clearPending(topicPartition: TopicPartition): Unit = {
  // The removed item (if any) is not needed by the caller
  unsentIsrUpdates.remove(topicPartition)
  ()
}

private def propagateIsrChanges(): Unit = {
/**
 * Sends all currently pending ISR updates to the controller in one request,
 * unless nothing is pending or another request is already in flight.
 */
private[server] def maybePropagateIsrChanges(): Unit = {
  // The compare-and-set both checks for and claims the single in-flight slot;
  // it is only attempted when there is something to send.
  val claimedInflightSlot = !unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, true)
  if (claimedInflightSlot) {
    // Snapshot the unsent items but leave them in the map, they get cleared in the response handler
    val pendingSnapshot = unsentIsrUpdates.values().asScala.toList

    lastIsrPropagationMs.set(time.milliseconds())
    sendRequest(pendingSnapshot)
  }
}

private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
val message = buildRequest(inflightAlterIsrItems)

def clearInflightRequests(): Unit = {
// Be sure to clear the in-flight flag to allow future AlterIsr requests
if (!inflightRequest.compareAndSet(true, false)) {
throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
}
/**
 * Resets the in-flight flag so that a later AlterIsr request can be sent.
 * Logs a warning, rather than failing, when the flag was not set.
 */
private[server] def clearInFlightRequest(): Unit = {
  val wasInFlight = inflightRequest.compareAndSet(true, false)
  if (!wasInFlight)
    warn("Attempting to clear AlterIsr in-flight flag when no apparent request is in-flight")
}

private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
val message = buildRequest(inflightAlterIsrItems)
debug(s"Sending AlterIsr to controller $message")

// We will not time out the AlterIsr request, instead letting it retry indefinitely
// until a response is received, or until a new LeaderAndIsr overwrites the existing isrState,
// which causes the response for those in-flight partitions to be ignored.
controllerChannelManager.sendRequest(new AlterIsrRequest.Builder(message),
new ControllerRequestCompletionHandler {
override def onComplete(response: ClientResponse): Unit = {
try {
debug(s"Received AlterIsr response $response")
val body = response.responseBody().asInstanceOf[AlterIsrResponse]
debug(s"Received AlterIsr response $response")
val body = response.responseBody().asInstanceOf[AlterIsrResponse]
val error = try {
handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems)
} finally {
clearInflightRequests()
// clear the flag so future requests can proceed
clearInFlightRequest()
}

// check if we need to send another request right away
error match {
case Errors.NONE =>
// In the normal case, check for pending updates to send immediately
maybePropagateIsrChanges()
case _ =>
// If we received a top-level error from the controller, retry the request in the near future
scheduler.schedule("send-alter-isr", () => maybePropagateIsrChanges(), 50, -1, TimeUnit.MILLISECONDS)
}
}

override def onTimeout(): Unit = {
Expand Down Expand Up @@ -207,7 +212,7 @@ class DefaultAlterIsrManager(

def handleAlterIsrResponse(alterIsrResponse: AlterIsrResponse,
sentBrokerEpoch: Long,
inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
inflightAlterIsrItems: Seq[AlterIsrItem]): Errors = {
val data: AlterIsrResponseData = alterIsrResponse.data

Errors.forCode(data.errorCode) match {
Expand Down Expand Up @@ -254,5 +259,7 @@ class DefaultAlterIsrManager(
case e: Errors =>
warn(s"Controller returned an unexpected top-level error when handling AlterIsr request: $e")
}

Errors.forCode(data.errorCode)
}
}
131 changes: 63 additions & 68 deletions core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import org.junit.{Before, Test}
import org.mockito.ArgumentMatchers.{any, anyString}
import org.mockito.{ArgumentMatchers, Mockito}


class AlterIsrManagerTest {

val topic = "test-topic"
Expand Down Expand Up @@ -63,9 +62,6 @@ class AlterIsrManagerTest {
val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2)
alterIsrManager.start()
alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))
time.sleep(50)
scheduler.tick()

EasyMock.verify(brokerToController)
}

Expand All @@ -83,10 +79,6 @@ class AlterIsrManagerTest {
// Only send one ISR update for a given topic+partition
assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)))
assertFalse(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2), 10), _ => {}, 0)))

time.sleep(50)
scheduler.tick()

EasyMock.verify(brokerToController)

val request = capture.getValue.build()
Expand All @@ -97,29 +89,37 @@ class AlterIsrManagerTest {
@Test
def testSingleBatch(): Unit = {
val capture = EasyMock.newCapture[AbstractRequest.Builder[AlterIsrRequest]]()
val callbackCapture = EasyMock.newCapture[ControllerRequestCompletionHandler]()

EasyMock.expect(brokerToController.start())
EasyMock.expect(brokerToController.sendRequest(EasyMock.capture(capture), EasyMock.anyObject())).once()
EasyMock.expect(brokerToController.sendRequest(EasyMock.capture(capture), EasyMock.capture(callbackCapture))).times(2)
EasyMock.replay(brokerToController)

val scheduler = new MockScheduler(time)
val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2)
alterIsrManager.start()

for (i <- 0 to 9) {
// First request will send batch of one
alterIsrManager.submit(AlterIsrItem(new TopicPartition(topic, 0),
new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))

// Other submissions will queue up until a response
for (i <- 1 to 9) {
alterIsrManager.submit(AlterIsrItem(new TopicPartition(topic, i),
new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))
time.sleep(1)
}

time.sleep(50)
scheduler.tick()
// Simulate response, omitting partition 0 will allow it to stay in unsent queue
val alterIsrResp = new AlterIsrResponse(new AlterIsrResponseData())
val resp = new ClientResponse(null, null, "", 0L, 0L,
false, null, null, alterIsrResp)

// This should not be included in the batch
alterIsrManager.submit(AlterIsrItem(new TopicPartition(topic, 10),
new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))
// On the callback, we check for unsent items and send another request
callbackCapture.getValue.onComplete(resp)

EasyMock.verify(brokerToController)

// Verify the last request sent had all 10 items
val request = capture.getValue.build()
assertEquals(request.data().topics().size(), 1)
assertEquals(request.data().topics().get(0).partitions().size(), 10)
Expand All @@ -141,34 +141,46 @@ class AlterIsrManagerTest {
}

/**
 * Exercises the retry path for a top-level error in an AlterIsr response:
 * the pending item must stay queued after the error, a second request must be
 * sent once the scheduler runs, and a successful partition response must
 * finally clear the item.
 *
 * @param error the top-level error code the simulated controller returns first
 */
private def checkTopLevelError(error: Errors): Unit = {
  val leaderAndIsr = new LeaderAndIsr(1, 1, List(1,2,3), 10)
  val isrs = Seq(AlterIsrItem(tp0, leaderAndIsr, _ => { }, 0))
  val callbackCapture = EasyMock.newCapture[ControllerRequestCompletionHandler]()

  // Exactly one request is expected for the initial submit
  EasyMock.expect(brokerToController.start())
  EasyMock.expect(brokerToController.sendRequest(EasyMock.anyObject(), EasyMock.capture(callbackCapture))).times(1)
  EasyMock.replay(brokerToController)

  val scheduler = new MockScheduler(time)
  val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2)
  alterIsrManager.start()
  isrs.foreach(alterIsrManager.submit)

  EasyMock.verify(brokerToController)

  // Simulate the controller answering with the top-level error
  val errorResponse = new ClientResponse(null, null, "", 0L, 0L,
    false, null, null, new AlterIsrResponse(new AlterIsrResponseData().setErrorCode(error.code)))
  callbackCapture.getValue.onComplete(errorResponse)

  // Any top-level error, we want to retry, so we don't clear items from the pending map
  assertTrue(alterIsrManager.unsentIsrUpdates.containsKey(tp0))

  // Re-arm the mock: the retry must produce exactly one more request
  EasyMock.reset(brokerToController)
  EasyMock.expect(brokerToController.sendRequest(EasyMock.anyObject(), EasyMock.capture(callbackCapture))).times(1)
  EasyMock.replay(brokerToController)

  // After some time, we will retry failed requests
  time.sleep(100)
  scheduler.tick()

  EasyMock.verify(brokerToController)

  // After a successful response, we can submit another AlterIsrItem
  val successResponse = new ClientResponse(null, null, "", 0L, 0L,
    false, null, null, partitionResponse(tp0, Errors.NONE))
  callbackCapture.getValue.onComplete(successResponse)

  EasyMock.verify(brokerToController)

  assertFalse(alterIsrManager.unsentIsrUpdates.containsKey(tp0))
}

@Test
Expand All @@ -189,7 +201,7 @@ class AlterIsrManagerTest {
/**
 * Asserts that a partition-level error in the AlterIsr response frees the
 * partition's pending slot, so a new update for it is accepted.
 *
 * @param error the partition-level error code the simulated controller returns
 */
private def checkPartitionError(error: Errors): Unit = {
  val alterIsrManager = testPartitionError(tp0, error)
  // Any partition-level error should clear the item from the pending queue allowing for future updates.
  // Submit exactly once, with a real LeaderAndIsr: an earlier submit for tp0 would take the pending
  // slot and make this one return false, and submit now also tries to send the item right away.
  assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)))
}

private def testPartitionError(tp: TopicPartition, error: Errors): AlterIsrManager = {
Expand All @@ -213,19 +225,10 @@ class AlterIsrManagerTest {

alterIsrManager.submit(AlterIsrItem(tp, new LeaderAndIsr(1, 1, List(1,2,3), 10), callback, 0))

time.sleep(100)
scheduler.tick()

EasyMock.verify(brokerToController)
EasyMock.reset(brokerToController)

val alterIsrResp = new AlterIsrResponse(new AlterIsrResponseData()
.setTopics(Collections.singletonList(
new AlterIsrResponseData.TopicData()
.setName(tp.topic())
.setPartitions(Collections.singletonList(
new AlterIsrResponseData.PartitionData()
.setPartitionIndex(tp.partition())
.setErrorCode(error.code))))))
val alterIsrResp = partitionResponse(tp, error)
val resp = new ClientResponse(null, null, "", 0L, 0L,
false, null, null, alterIsrResp)
callbackCapture.getValue.onComplete(resp)
Expand All @@ -245,32 +248,24 @@ class AlterIsrManagerTest {
val scheduler = new MockScheduler(time)
val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2)
alterIsrManager.start()
alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))

time.sleep(100)
scheduler.tick() // Triggers a request
// First submit will send the request
alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))

// Enqueue more updates
// These will become pending unsent items
alterIsrManager.submit(AlterIsrItem(tp1, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))
alterIsrManager.submit(AlterIsrItem(tp2, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))

time.sleep(100)
scheduler.tick() // Trigger the schedule again, but no request this time

EasyMock.verify(brokerToController)

// Even an empty response will clear the in-flight
// Once the callback runs, another request will be sent
EasyMock.reset(brokerToController)
EasyMock.expect(brokerToController.sendRequest(EasyMock.anyObject(), EasyMock.capture(callbackCapture))).once()
EasyMock.replay(brokerToController)
val alterIsrResp = new AlterIsrResponse(new AlterIsrResponseData())
val resp = new ClientResponse(null, null, "", 0L, 0L,
false, null, null, alterIsrResp)
callbackCapture.getValue.onComplete(resp)

EasyMock.reset(brokerToController)
EasyMock.expect(brokerToController.sendRequest(EasyMock.anyObject(), EasyMock.capture(callbackCapture))).once()
EasyMock.replay(brokerToController)

time.sleep(100)
scheduler.tick()
EasyMock.verify(brokerToController)
}

Expand All @@ -295,21 +290,10 @@ class AlterIsrManagerTest {
alterIsrManager.submit(AlterIsrItem(tp1, new LeaderAndIsr(1, 1, List(1,2,3), 10), callback, 0))
alterIsrManager.submit(AlterIsrItem(tp2, new LeaderAndIsr(1, 1, List(1,2,3), 10), callback, 0))


time.sleep(100)
scheduler.tick()

EasyMock.verify(brokerToController)

// Three partitions were sent, but only one returned
val alterIsrResp = new AlterIsrResponse(new AlterIsrResponseData()
.setTopics(Collections.singletonList(
new AlterIsrResponseData.TopicData()
.setName(tp0.topic())
.setPartitions(Collections.singletonList(
new AlterIsrResponseData.PartitionData()
.setPartitionIndex(tp0.partition())
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()))))))
val alterIsrResp = partitionResponse(tp0, Errors.UNKNOWN_SERVER_ERROR)
val resp = new ClientResponse(null, null, "", 0L, 0L,
false, null, null, alterIsrResp)
callbackCapture.getValue.onComplete(resp)
Expand Down Expand Up @@ -345,4 +329,15 @@ class AlterIsrManagerTest {
assertTrue(zkIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 3),
expectMatch(Left(Errors.INVALID_UPDATE_VERSION)), 0)))
}

/**
 * Builds an AlterIsrResponse holding a single topic entry with a single
 * partition entry that carries the given error code.
 *
 * @param tp    the topic partition the response entry refers to
 * @param error the error whose code is set on the partition entry
 * @return the assembled response
 */
private def partitionResponse(tp: TopicPartition, error: Errors): AlterIsrResponse = {
  val partitionData = new AlterIsrResponseData.PartitionData()
    .setPartitionIndex(tp.partition())
    .setErrorCode(error.code)

  val topicData = new AlterIsrResponseData.TopicData()
    .setName(tp.topic())
    .setPartitions(Collections.singletonList(partitionData))

  val responseData = new AlterIsrResponseData()
    .setTopics(Collections.singletonList(topicData))

  new AlterIsrResponse(responseData)
}
}