diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala index e463487167df6..df308d4a0cc0c 100644 --- a/core/src/main/scala/kafka/server/AlterIsrManager.scala +++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala @@ -17,9 +17,7 @@ package kafka.server import java.util -import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import java.util.concurrent.{ConcurrentHashMap, TimeUnit} - import kafka.api.LeaderAndIsr import kafka.metrics.KafkaMetricsGroup import kafka.utils.{KafkaScheduler, Logging, Scheduler} @@ -32,6 +30,7 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} import org.apache.kafka.common.utils.Time +import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable import scala.collection.mutable.ListBuffer import scala.jdk.CollectionConverters._ @@ -112,17 +111,14 @@ class DefaultAlterIsrManager( val brokerEpochSupplier: () => Long ) extends AlterIsrManager with Logging with KafkaMetricsGroup { - // Used to allow only one pending ISR update per partition - private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new ConcurrentHashMap[TopicPartition, AlterIsrItem]() + // Used to allow only one pending ISR update per partition (visible for testing) + private[server] val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new ConcurrentHashMap[TopicPartition, AlterIsrItem]() // Used to allow only one in-flight request at a time private val inflightRequest: AtomicBoolean = new AtomicBoolean(false) - private val lastIsrPropagationMs = new AtomicLong(0) - override def start(): Unit = { controllerChannelManager.start() - scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS) } override def shutdown(): Unit = { @@ -130,50 +126,59 @@ class DefaultAlterIsrManager( } override def submit(alterIsrItem: AlterIsrItem): Boolean = { - unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null + val enqueued = unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null + maybePropagateIsrChanges() + enqueued } override def clearPending(topicPartition: TopicPartition): Unit = { unsentIsrUpdates.remove(topicPartition) } - private def propagateIsrChanges(): Unit = { + private[server] def maybePropagateIsrChanges(): Unit = { + // Send all pending items if there is not already a request in-flight. if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, true)) { - // Copy current unsent ISRs but don't remove from the map + // Copy current unsent ISRs but don't remove from the map, they get cleared in the response handler val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]() unsentIsrUpdates.values().forEach(item => inflightAlterIsrItems.append(item)) - - val now = time.milliseconds() - lastIsrPropagationMs.set(now) sendRequest(inflightAlterIsrItems.toSeq) } } - private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = { - val message = buildRequest(inflightAlterIsrItems) - - def clearInflightRequests(): Unit = { - // Be sure to clear the in-flight flag to allow future AlterIsr requests - if (!inflightRequest.compareAndSet(true, false)) { - throw new IllegalStateException("AlterIsr response callback called when no requests were in flight") - } + private[server] def clearInFlightRequest(): Unit = { + if (!inflightRequest.compareAndSet(true, false)) { + warn("Attempting to clear AlterIsr in-flight flag when no apparent request is in-flight") } + } + private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = { + val message = buildRequest(inflightAlterIsrItems) debug(s"Sending AlterIsr to controller $message") // We will not timeout AlterISR request, instead letting it retry indefinitely // until a response is received, or a new LeaderAndIsr overwrites the existing isrState - // which causes the inflight requests to be ignored. + // which causes the response for those partitions to be ignored. controllerChannelManager.sendRequest(new AlterIsrRequest.Builder(message), new ControllerRequestCompletionHandler { override def onComplete(response: ClientResponse): Unit = { - try { - debug(s"Received AlterIsr response $response") - val body = response.responseBody().asInstanceOf[AlterIsrResponse] + debug(s"Received AlterIsr response $response") + val body = response.responseBody().asInstanceOf[AlterIsrResponse] + val error = try { handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems) } finally { - clearInflightRequests() + // clear the flag so future requests can proceed + clearInFlightRequest() } + + // check if we need to send another request right away + error match { + case Errors.NONE => + // In the normal case, check for pending updates to send immediately + maybePropagateIsrChanges() + case _ => + // If we received a top-level error from the controller, retry the request in the near future + scheduler.schedule("send-alter-isr", () => maybePropagateIsrChanges(), 50, -1, TimeUnit.MILLISECONDS) + } } override def onTimeout(): Unit = { @@ -207,7 +212,7 @@ class DefaultAlterIsrManager( def handleAlterIsrResponse(alterIsrResponse: AlterIsrResponse, sentBrokerEpoch: Long, - inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = { + inflightAlterIsrItems: Seq[AlterIsrItem]): Errors = { val data: AlterIsrResponseData = alterIsrResponse.data Errors.forCode(data.errorCode) match { @@ -254,5 +259,7 @@ class DefaultAlterIsrManager( case e: Errors => warn(s"Controller returned an unexpected top-level error when handling AlterIsr request: $e") } + + Errors.forCode(data.errorCode) } } diff --git a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala index d053da5c6198a..b1336d317b3d9 100644 --- a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala @@ -34,7 +34,6 @@ import org.junit.{Before, Test} import org.mockito.ArgumentMatchers.{any, anyString} import org.mockito.{ArgumentMatchers, Mockito} - class AlterIsrManagerTest { val topic = "test-topic" @@ -63,9 +62,6 @@ class AlterIsrManagerTest { val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2) alterIsrManager.start() alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)) - time.sleep(50) - scheduler.tick() - EasyMock.verify(brokerToController) } @@ -83,10 +79,6 @@ class AlterIsrManagerTest { // Only send one ISR update for a given topic+partition assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))) assertFalse(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2), 10), _ => {}, 0))) - - time.sleep(50) - scheduler.tick() - EasyMock.verify(brokerToController) val request = capture.getValue.build() @@ -97,29 +89,37 @@ class AlterIsrManagerTest { @Test def testSingleBatch(): Unit = { val capture = EasyMock.newCapture[AbstractRequest.Builder[AlterIsrRequest]]() + val callbackCapture = EasyMock.newCapture[ControllerRequestCompletionHandler]() + EasyMock.expect(brokerToController.start()) - EasyMock.expect(brokerToController.sendRequest(EasyMock.capture(capture), EasyMock.anyObject())).once() + EasyMock.expect(brokerToController.sendRequest(EasyMock.capture(capture), EasyMock.capture(callbackCapture))).times(2) EasyMock.replay(brokerToController) val scheduler = new MockScheduler(time) val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2) alterIsrManager.start() - for (i <- 0 to 9) { + // First request will send batch of one + alterIsrManager.submit(AlterIsrItem(new TopicPartition(topic, 0), + new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)) + + // Other submissions will queue up until a response + for (i <- 1 to 9) { alterIsrManager.submit(AlterIsrItem(new TopicPartition(topic, i), new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)) - time.sleep(1) } - time.sleep(50) - scheduler.tick() + // Simulate response, omitting partition 0 will allow it to stay in unsent queue + val alterIsrResp = new AlterIsrResponse(new AlterIsrResponseData()) + val resp = new ClientResponse(null, null, "", 0L, 0L, + false, null, null, alterIsrResp) - // This should not be included in the batch - alterIsrManager.submit(AlterIsrItem(new TopicPartition(topic, 10), - new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)) + // On the callback, we check for unsent items and send another request + callbackCapture.getValue.onComplete(resp) EasyMock.verify(brokerToController) + // Verify the last request sent had all 10 items val request = capture.getValue.build() assertEquals(request.data().topics().size(), 1) assertEquals(request.data().topics().get(0).partitions().size(), 10) @@ -141,17 +141,12 @@ class AlterIsrManagerTest { } private def checkTopLevelError(error: Errors): Unit = { - val isrs = Seq(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => { }, 0)) - val alterIsrManager = testTopLevelError(isrs, error) - // Any top-level error, we want to retry, so we don't clear items from the pending map - assertFalse(alterIsrManager.submit(AlterIsrItem(tp0, null, _ => { }, 0))) - } - - private def testTopLevelError(isrs: Seq[AlterIsrItem], error: Errors): AlterIsrManager = { + val leaderAndIsr = new LeaderAndIsr(1, 1, List(1,2,3), 10) + val isrs = Seq(AlterIsrItem(tp0, leaderAndIsr, _ => { }, 0)) val callbackCapture = EasyMock.newCapture[ControllerRequestCompletionHandler]() EasyMock.expect(brokerToController.start()) - EasyMock.expect(brokerToController.sendRequest(EasyMock.anyObject(), EasyMock.capture(callbackCapture))).once() + EasyMock.expect(brokerToController.sendRequest(EasyMock.anyObject(), EasyMock.capture(callbackCapture))).times(1) EasyMock.replay(brokerToController) val scheduler = new MockScheduler(time) @@ -159,16 +154,33 @@ class AlterIsrManagerTest { alterIsrManager.start() isrs.foreach(alterIsrManager.submit) + EasyMock.verify(brokerToController) + + var alterIsrResp = new AlterIsrResponse(new AlterIsrResponseData().setErrorCode(error.code)) + var resp = new ClientResponse(null, null, "", 0L, 0L, + false, null, null, alterIsrResp) + callbackCapture.getValue.onComplete(resp) + + // Any top-level error, we want to retry, so we don't clear items from the pending map + assertTrue(alterIsrManager.unsentIsrUpdates.containsKey(tp0)) + + EasyMock.reset(brokerToController) + EasyMock.expect(brokerToController.sendRequest(EasyMock.anyObject(), EasyMock.capture(callbackCapture))).times(1) + EasyMock.replay(brokerToController) + + // After some time, we will retry failed requests time.sleep(100) scheduler.tick() - EasyMock.verify(brokerToController) - - val alterIsrResp = new AlterIsrResponse(new AlterIsrResponseData().setErrorCode(error.code)) - val resp = new ClientResponse(null, null, "", 0L, 0L, + // After a successful response, we can submit another AlterIsrItem + alterIsrResp = partitionResponse(tp0, Errors.NONE) + resp = new ClientResponse(null, null, "", 0L, 0L, false, null, null, alterIsrResp) callbackCapture.getValue.onComplete(resp) - alterIsrManager + + EasyMock.verify(brokerToController) + + assertFalse(alterIsrManager.unsentIsrUpdates.containsKey(tp0)) } @Test @@ -189,7 +201,7 @@ class AlterIsrManagerTest { private def checkPartitionError(error: Errors): Unit = { val alterIsrManager = testPartitionError(tp0, error) // Any partition-level error should clear the item from the pending queue allowing for future updates - assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, null, _ => {}, 0))) + assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))) } private def testPartitionError(tp: TopicPartition, error: Errors): AlterIsrManager = { @@ -213,19 +225,10 @@ class AlterIsrManagerTest { alterIsrManager.submit(AlterIsrItem(tp, new LeaderAndIsr(1, 1, List(1,2,3), 10), callback, 0)) - time.sleep(100) - scheduler.tick() - EasyMock.verify(brokerToController) + EasyMock.reset(brokerToController) - val alterIsrResp = new AlterIsrResponse(new AlterIsrResponseData() - .setTopics(Collections.singletonList( - new AlterIsrResponseData.TopicData() - .setName(tp.topic()) - .setPartitions(Collections.singletonList( - new AlterIsrResponseData.PartitionData() - .setPartitionIndex(tp.partition()) - .setErrorCode(error.code)))))) + val alterIsrResp = partitionResponse(tp, error) val resp = new ClientResponse(null, null, "", 0L, 0L, false, null, null, alterIsrResp) callbackCapture.getValue.onComplete(resp) @@ -245,32 +248,24 @@ class AlterIsrManagerTest { val scheduler = new MockScheduler(time) val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2) alterIsrManager.start() - alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)) - time.sleep(100) - scheduler.tick() // Triggers a request + // First submit will send the request + alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)) - // Enqueue more updates + // These will become pending unsent items alterIsrManager.submit(AlterIsrItem(tp1, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)) alterIsrManager.submit(AlterIsrItem(tp2, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)) - time.sleep(100) - scheduler.tick() // Trigger the schedule again, but no request this time - EasyMock.verify(brokerToController) - // Even an empty response will clear the in-flight + // Once the callback runs, another request will be sent + EasyMock.reset(brokerToController) + EasyMock.expect(brokerToController.sendRequest(EasyMock.anyObject(), EasyMock.capture(callbackCapture))).once() + EasyMock.replay(brokerToController) val alterIsrResp = new AlterIsrResponse(new AlterIsrResponseData()) val resp = new ClientResponse(null, null, "", 0L, 0L, false, null, null, alterIsrResp) callbackCapture.getValue.onComplete(resp) - - EasyMock.reset(brokerToController) - EasyMock.expect(brokerToController.sendRequest(EasyMock.anyObject(), EasyMock.capture(callbackCapture))).once() - EasyMock.replay(brokerToController) - - time.sleep(100) - scheduler.tick() EasyMock.verify(brokerToController) } @@ -295,21 +290,10 @@ class AlterIsrManagerTest { alterIsrManager.submit(AlterIsrItem(tp1, new LeaderAndIsr(1, 1, List(1,2,3), 10), callback, 0)) alterIsrManager.submit(AlterIsrItem(tp2, new LeaderAndIsr(1, 1, List(1,2,3), 10), callback, 0)) - - time.sleep(100) - scheduler.tick() - EasyMock.verify(brokerToController) // Three partitions were sent, but only one returned - val alterIsrResp = new AlterIsrResponse(new AlterIsrResponseData() - .setTopics(Collections.singletonList( - new AlterIsrResponseData.TopicData() - .setName(tp0.topic()) - .setPartitions(Collections.singletonList( - new AlterIsrResponseData.PartitionData() - .setPartitionIndex(tp0.partition()) - .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())))))) + val alterIsrResp = partitionResponse(tp0, Errors.UNKNOWN_SERVER_ERROR) val resp = new ClientResponse(null, null, "", 0L, 0L, false, null, null, alterIsrResp) callbackCapture.getValue.onComplete(resp) @@ -345,4 +329,15 @@ class AlterIsrManagerTest { assertTrue(zkIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 3), expectMatch(Left(Errors.INVALID_UPDATE_VERSION)), 0))) } + + private def partitionResponse(tp: TopicPartition, error: Errors): AlterIsrResponse = { + new AlterIsrResponse(new AlterIsrResponseData() + .setTopics(Collections.singletonList( + new AlterIsrResponseData.TopicData() + .setName(tp.topic()) + .setPartitions(Collections.singletonList( + new AlterIsrResponseData.PartitionData() + .setPartitionIndex(tp.partition()) + .setErrorCode(error.code)))))) + } }