From 430ce4b8ae5bf44263a4bd32c803f12008dcf79e Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 14 Dec 2020 12:29:45 -0500 Subject: [PATCH 1/8] Only schedule AlterIsr thread when we have an ISR change --- .../scala/kafka/server/AlterIsrManager.scala | 22 ++++++++++++++----- .../kafka/server/AlterIsrManagerTest.scala | 4 ++++ 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala index 16fe620dee2d7..84300123ad213 100644 --- a/core/src/main/scala/kafka/server/AlterIsrManager.scala +++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala @@ -19,7 +19,6 @@ package kafka.server import java.util import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import java.util.concurrent.{ConcurrentHashMap, TimeUnit} - import kafka.api.LeaderAndIsr import kafka.metrics.KafkaMetricsGroup import kafka.utils.{Logging, Scheduler} @@ -62,12 +61,19 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne private val lastIsrPropagationMs = new AtomicLong(0) - override def start(): Unit = { - scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS) - } + override def start(): Unit = { } override def enqueue(alterIsrItem: AlterIsrItem): Boolean = { - unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null + if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) { + if (inflightRequest.compareAndSet(false, true)) { + // optimistically set the inflight flag even though we haven't sent the request yet + scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS) + } + true + } else { + false + } + } override def clearPending(topicPartition: TopicPartition): Unit = { @@ -75,7 +81,8 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne } private def propagateIsrChanges(): Unit = { - if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, true)) { + // Updates could have been cleared by new LeaderAndIsr, so check again + if (!unsentIsrUpdates.isEmpty) { // Copy current unsent ISRs but don't remove from the map val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]() unsentIsrUpdates.values().forEach(item => inflightAlterIsrItems.append(item)) @@ -83,6 +90,9 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne val now = time.milliseconds() lastIsrPropagationMs.set(now) sendRequest(inflightAlterIsrItems.toSeq) + } else { + // Never sent a request, so clear the flag + inflightRequest.set(false) } } diff --git a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala index 1f451fef090c1..68c738597b6fa 100644 --- a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala @@ -254,6 +254,10 @@ class AlterIsrManagerTest { EasyMock.expect(brokerToController.sendRequest(EasyMock.anyObject(), EasyMock.capture(callbackCapture), EasyMock.eq(requestTimeout))).once() EasyMock.replay(brokerToController) + // Need to re-enqueue again to trigger the thread to be scheduled + alterIsrManager.clearPending(tp2) + alterIsrManager.enqueue(AlterIsrItem(tp2, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {})) + time.sleep(100) scheduler.tick() EasyMock.verify(brokerToController) From 7463b2c5da59fa2b8c09a61a9fcef4afe603de65 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Thu, 7 Jan 2021 14:39:56 -0500 Subject: [PATCH 2/8] Don't delay, make sure to submit any waiting items in callback --- .../scala/kafka/server/AlterIsrManager.scala | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala index 13490db548d3f..a3c5b5e82db21 100644 --- a/core/src/main/scala/kafka/server/AlterIsrManager.scala +++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala @@ -18,7 +18,7 @@ package kafka.server import java.util import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} -import java.util.concurrent.{ConcurrentHashMap, TimeUnit} +import java.util.concurrent.ConcurrentHashMap import kafka.api.LeaderAndIsr import kafka.metrics.KafkaMetricsGroup import kafka.utils.{KafkaScheduler, Logging, Scheduler} @@ -131,7 +131,7 @@ class DefaultAlterIsrManager( if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) { if (inflightRequest.compareAndSet(false, true)) { // optimistically set the inflight flag even though we haven't sent the request yet - scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS) + scheduler.schedule("send-alter-isr", propagateIsrChanges) } true } else { @@ -162,13 +162,6 @@ class DefaultAlterIsrManager( private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = { val message = buildRequest(inflightAlterIsrItems) - def clearInflightRequests(): Unit = { - // Be sure to clear the in-flight flag to allow future AlterIsr requests - if (!inflightRequest.compareAndSet(true, false)) { - throw new IllegalStateException("AlterIsr response callback called when no requests were in flight") - } - } - debug(s"Sending AlterIsr to controller $message") // We will not timeout AlterISR request, instead letting it retry indefinitely @@ -182,7 +175,11 @@ class DefaultAlterIsrManager( val body = response.responseBody().asInstanceOf[AlterIsrResponse] handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems) } finally { - clearInflightRequests() + // Make sure we send any waiting items or clear the inflight flag + if (!inflightRequest.get()) { + throw new IllegalStateException("AlterIsr response callback called when no requests were in flight") + } + propagateIsrChanges() } } From 261670e909334bf6c7888c96f2a0b3ee9097974b Mon Sep 17 00:00:00 2001 From: David Arthur Date: Fri, 8 Jan 2021 11:23:38 -0500 Subject: [PATCH 3/8] Re-send request with delay on top-level errors --- .../scala/kafka/server/AlterIsrManager.scala | 29 ++++++++++--------- .../kafka/server/AlterIsrManagerTest.scala | 9 +++--- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala index a3c5b5e82db21..202defa61fcb1 100644 --- a/core/src/main/scala/kafka/server/AlterIsrManager.scala +++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala @@ -18,7 +18,7 @@ package kafka.server import java.util import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} -import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import kafka.api.LeaderAndIsr import kafka.metrics.KafkaMetricsGroup import kafka.utils.{KafkaScheduler, Logging, Scheduler} @@ -131,7 +131,7 @@ class DefaultAlterIsrManager( if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) { if (inflightRequest.compareAndSet(false, true)) { // optimistically set the inflight flag even though we haven't sent the request yet - scheduler.schedule("send-alter-isr", propagateIsrChanges) + scheduler.schedule("send-alter-isr", propagateIsrChanges, 1, -1, TimeUnit.MILLISECONDS) } true } else { @@ -154,7 +154,7 @@ class DefaultAlterIsrManager( lastIsrPropagationMs.set(now) sendRequest(inflightAlterIsrItems.toSeq) } else { - // Never sent a request, so clear the flag + // No request was sent, so clear the flag inflightRequest.set(false) } } @@ -170,16 +170,15 @@ class DefaultAlterIsrManager( controllerChannelManager.sendRequest(new AlterIsrRequest.Builder(message), new ControllerRequestCompletionHandler { override def onComplete(response: ClientResponse): Unit = { - try { - debug(s"Received AlterIsr response $response") - val body = response.responseBody().asInstanceOf[AlterIsrResponse] - handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems) - } finally { - // Make sure we send any waiting items or clear the inflight flag - if (!inflightRequest.get()) { - throw new IllegalStateException("AlterIsr response callback called when no requests were in flight") - } - propagateIsrChanges() + debug(s"Received AlterIsr response $response") + val body = response.responseBody().asInstanceOf[AlterIsrResponse] + handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems) match { + case Errors.NONE => + // In the normal case, check for pending updates to send immediately + scheduler.schedule("send-alter-isr", propagateIsrChanges, 1, -1, TimeUnit.MILLISECONDS) + case _ => + // If we received a top-level error from the controller, retry a request in the near future + scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS) } } @@ -214,7 +213,7 @@ class DefaultAlterIsrManager( def handleAlterIsrResponse(alterIsrResponse: AlterIsrResponse, sentBrokerEpoch: Long, - inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = { + inflightAlterIsrItems: Seq[AlterIsrItem]): Errors = { val data: AlterIsrResponseData = alterIsrResponse.data Errors.forCode(data.errorCode) match { @@ -261,5 +260,7 @@ class DefaultAlterIsrManager( case e: Errors => warn(s"Controller returned an unexpected top-level error when handling AlterIsr request: $e") } + + Errors.forCode(data.errorCode) } } diff --git a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala index 4e317affdbe27..f6710a17e1e08 100644 --- a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala @@ -108,10 +108,9 @@ class AlterIsrManagerTest { for (i <- 0 to 9) { alterIsrManager.submit(AlterIsrItem(new TopicPartition(topic, i), new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)) - time.sleep(1) } - time.sleep(50) + time.sleep(1) scheduler.tick() // This should not be included in the batch @@ -161,7 +160,7 @@ class AlterIsrManagerTest { alterIsrManager.start() isrs.foreach(alterIsrManager.submit) - time.sleep(100) + time.sleep(1) scheduler.tick() EasyMock.verify(brokerToController) @@ -238,14 +237,14 @@ class AlterIsrManagerTest { alterIsrManager.start() alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)) - time.sleep(100) + time.sleep(1) scheduler.tick() // Triggers a request // Enqueue more updates alterIsrManager.submit(AlterIsrItem(tp1, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)) alterIsrManager.submit(AlterIsrItem(tp2, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)) - time.sleep(100) + time.sleep(1) scheduler.tick() // Trigger the schedule again, but no request this time EasyMock.verify(brokerToController) From 6345f632515c761af5871def8b86d5a45b862747 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Fri, 8 Jan 2021 15:44:19 -0500 Subject: [PATCH 4/8] Use a read/write lock to guard the inflight request flag and the collection of unsent items --- .../scala/kafka/server/AlterIsrManager.scala | 61 +++++++++++-------- 1 file changed, 35 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala index 202defa61fcb1..97dc0c9e065ad 100644 --- a/core/src/main/scala/kafka/server/AlterIsrManager.scala +++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala @@ -17,10 +17,10 @@ package kafka.server import java.util -import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import kafka.api.LeaderAndIsr import kafka.metrics.KafkaMetricsGroup +import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils.{KafkaScheduler, Logging, Scheduler} import kafka.zk.KafkaZkClient import org.apache.kafka.clients.ClientResponse @@ -31,6 +31,7 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} import org.apache.kafka.common.utils.Time +import java.util.concurrent.locks.ReentrantReadWriteLock import scala.collection.mutable import scala.collection.mutable.ListBuffer import scala.jdk.CollectionConverters._ @@ -115,9 +116,12 @@ class DefaultAlterIsrManager( private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new ConcurrentHashMap[TopicPartition, AlterIsrItem]() // Used to allow only one in-flight request at a time - private val inflightRequest: AtomicBoolean = new AtomicBoolean(false) + @volatile + private var inflightRequest: Boolean = false - private val lastIsrPropagationMs = new AtomicLong(0) + // Protects the updates of the inflight flag and prevents new pending items from being submitted while we are + // preparing a request + private val inflightLock: ReentrantReadWriteLock = new ReentrantReadWriteLock() override def start(): Unit = { controllerChannelManager.start() @@ -128,34 +132,39 @@ class DefaultAlterIsrManager( } override def submit(alterIsrItem: AlterIsrItem): Boolean = { - if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) { - if (inflightRequest.compareAndSet(false, true)) { - // optimistically set the inflight flag even though we haven't sent the request yet - scheduler.schedule("send-alter-isr", propagateIsrChanges, 1, -1, TimeUnit.MILLISECONDS) + val (didSubmit, needsPropagate) = inReadLock(inflightLock) { + if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) { + (true, !inflightRequest) + } else { + (false, false) } - true - } else { - false } + if (needsPropagate) { + propagateIsrChanges(true) + } + didSubmit } override def clearPending(topicPartition: TopicPartition): Unit = { unsentIsrUpdates.remove(topicPartition) } - private def propagateIsrChanges(): Unit = { - // Updates could have been cleared by new LeaderAndIsr, so check again - if (!unsentIsrUpdates.isEmpty) { - // Copy current unsent ISRs but don't remove from the map - val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]() - unsentIsrUpdates.values().forEach(item => inflightAlterIsrItems.append(item)) + private def propagateIsrChanges(checkInflight: Boolean): Unit = inWriteLock(inflightLock) { + // If we're checking for unsent items in the response handler, we ignore the inflight flag and send a request if + // there are any unsent items. Otherwise, we got here from the submit(AlterIsrItem) method and need to check that + // there is not an inflight request. + if (!checkInflight || !inflightRequest) { + if (!unsentIsrUpdates.isEmpty) { + // Copy current unsent ISRs but don't remove from the map + val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]() + unsentIsrUpdates.values().forEach(item => inflightAlterIsrItems.append(item)) - val now = time.milliseconds() - lastIsrPropagationMs.set(now) - sendRequest(inflightAlterIsrItems.toSeq) - } else { - // No request was sent, so clear the flag - inflightRequest.set(false) + sendRequest(inflightAlterIsrItems.toSeq) + inflightRequest = true + } else { + // No items were pending, so no request was sent -- clear the inflight flag + inflightRequest = false + } } } @@ -166,7 +175,7 @@ class DefaultAlterIsrManager( // We will not timeout AlterISR request, instead letting it retry indefinitely // until a response is received, or a new LeaderAndIsr overwrites the existing isrState - // which causes the inflight requests to be ignored. + // which causes the response for those partitions to be ignored. controllerChannelManager.sendRequest(new AlterIsrRequest.Builder(message), new ControllerRequestCompletionHandler { override def onComplete(response: ClientResponse): Unit = { @@ -175,10 +184,10 @@ class DefaultAlterIsrManager( handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems) match { case Errors.NONE => // In the normal case, check for pending updates to send immediately - scheduler.schedule("send-alter-isr", propagateIsrChanges, 1, -1, TimeUnit.MILLISECONDS) + propagateIsrChanges(false) case _ => - // If we received a top-level error from the controller, retry a request in the near future - scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS) + // If we received a top-level error from the controller, retry the request in the near future + scheduler.schedule("send-alter-isr", () => propagateIsrChanges(false), 50, -1, TimeUnit.MILLISECONDS) } } From 9f1d3d9af0b42ac0fe76ae516493f53a164986b3 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 11 Jan 2021 12:25:59 -0500 Subject: [PATCH 5/8] Simplify locking and fixup tests --- .../scala/kafka/server/AlterIsrManager.scala | 55 +++++++++--------- .../kafka/server/AlterIsrManagerTest.scala | 56 +++++++++---------- 2 files changed, 51 insertions(+), 60 deletions(-) diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala index 97dc0c9e065ad..db515fb9bfcf9 100644 --- a/core/src/main/scala/kafka/server/AlterIsrManager.scala +++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala @@ -20,7 +20,7 @@ import java.util import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import kafka.api.LeaderAndIsr import kafka.metrics.KafkaMetricsGroup -import kafka.utils.CoreUtils.{inReadLock, inWriteLock} +import kafka.utils.CoreUtils.inLock import kafka.utils.{KafkaScheduler, Logging, Scheduler} import kafka.zk.KafkaZkClient import org.apache.kafka.clients.ClientResponse @@ -31,7 +31,7 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} import org.apache.kafka.common.utils.Time -import java.util.concurrent.locks.ReentrantReadWriteLock +import java.util.concurrent.locks.ReentrantLock import scala.collection.mutable import scala.collection.mutable.ListBuffer import scala.jdk.CollectionConverters._ @@ -119,9 +119,9 @@ class DefaultAlterIsrManager( @volatile private var inflightRequest: Boolean = false - // Protects the updates of the inflight flag and prevents new pending items from being submitted while we are + // Protect updates of the inflight flag and prevent additional pending items from being submitted while we are // preparing a request - private val inflightLock: ReentrantReadWriteLock = new ReentrantReadWriteLock() + private val inflightLock: ReentrantLock = new ReentrantLock() override def start(): Unit = { controllerChannelManager.start() @@ -132,45 +132,40 @@ class DefaultAlterIsrManager( } override def submit(alterIsrItem: AlterIsrItem): Boolean = { - val (didSubmit, needsPropagate) = inReadLock(inflightLock) { + inLock(inflightLock) { if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) { - (true, !inflightRequest) + maybePropagateIsrChanges() + true } else { - (false, false) + false } } - if (needsPropagate) { - propagateIsrChanges(true) - } - didSubmit } override def clearPending(topicPartition: TopicPartition): Unit = { unsentIsrUpdates.remove(topicPartition) } - private def propagateIsrChanges(checkInflight: Boolean): Unit = inWriteLock(inflightLock) { - // If we're checking for unsent items in the response handler, we ignore the inflight flag and send a request if - // there are any unsent items. Otherwise, we got here from the submit(AlterIsrItem) method and need to check that - // there is not an inflight request. - if (!checkInflight || !inflightRequest) { - if (!unsentIsrUpdates.isEmpty) { - // Copy current unsent ISRs but don't remove from the map - val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]() - unsentIsrUpdates.values().forEach(item => inflightAlterIsrItems.append(item)) + private[server] def maybePropagateIsrChanges(): Unit = inLock(inflightLock) { + // Send all pending items if there is not already a request in-flight. + if (!inflightRequest && !unsentIsrUpdates.isEmpty) { + // Copy current unsent ISRs but don't remove from the map, they get cleared in the response handler + val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]() + unsentIsrUpdates.values().forEach(item => inflightAlterIsrItems.append(item)) + sendRequest(inflightAlterIsrItems.toSeq) + inflightRequest = true + } + } - sendRequest(inflightAlterIsrItems.toSeq) - inflightRequest = true - } else { - // No items were pending, so no request was sent -- clear the inflight flag - inflightRequest = false - } + private[server] def clearInFlightRequest(): Unit = inLock(inflightLock) { + if (!inflightRequest) { + warn("Attempting to clear AlterIsr in-flight flag when no apparent request is in-flight") } + inflightRequest = false } private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = { val message = buildRequest(inflightAlterIsrItems) - debug(s"Sending AlterIsr to controller $message") // We will not timeout AlterISR request, instead letting it retry indefinitely @@ -184,10 +179,12 @@ class DefaultAlterIsrManager( handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems) match { case Errors.NONE => // In the normal case, check for pending updates to send immediately - propagateIsrChanges(false) + clearInFlightRequest() + maybePropagateIsrChanges() case _ => // If we received a top-level error from the controller, retry the request in the near future - scheduler.schedule("send-alter-isr", () => propagateIsrChanges(false), 50, -1, TimeUnit.MILLISECONDS) + clearInFlightRequest() + scheduler.schedule("send-alter-isr", () => maybePropagateIsrChanges(), 50, -1, TimeUnit.MILLISECONDS) } } diff --git a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala index f6710a17e1e08..0b74408c97193 100644 --- a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala @@ -34,7 +34,6 @@ import org.junit.{Before, Test} import org.mockito.ArgumentMatchers.{any, anyString} import org.mockito.{ArgumentMatchers, Mockito} - class AlterIsrManagerTest { val topic = "test-topic" @@ -97,28 +96,37 @@ class AlterIsrManagerTest { @Test def testSingleBatch(): Unit = { val capture = EasyMock.newCapture[AbstractRequest.Builder[AlterIsrRequest]]() + val callbackCapture = EasyMock.newCapture[ControllerRequestCompletionHandler]() + EasyMock.expect(brokerToController.start()) - EasyMock.expect(brokerToController.sendRequest(EasyMock.capture(capture), EasyMock.anyObject())).once() + EasyMock.expect(brokerToController.sendRequest(EasyMock.capture(capture), EasyMock.capture(callbackCapture))).times(2) EasyMock.replay(brokerToController) val scheduler = new MockScheduler(time) val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2) alterIsrManager.start() - for (i <- 0 to 9) { + // First request will send batch of one + alterIsrManager.submit(AlterIsrItem(new TopicPartition(topic, 0), + new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)) + + // Other submissions will queue up until a response + for (i <- 1 to 9) { alterIsrManager.submit(AlterIsrItem(new TopicPartition(topic, i), new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)) } - time.sleep(1) - scheduler.tick() + // Simulate response, omitting partition 0 will allow it to stay in unsent queue + val alterIsrResp = new AlterIsrResponse(new AlterIsrResponseData()) + val resp = new ClientResponse(null, null, "", 0L, 0L, + false, null, null, alterIsrResp) - // This should not be included in the batch - alterIsrManager.submit(AlterIsrItem(new TopicPartition(topic, 10), - new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)) + // On the callback, we check for unsent items and send another request + callbackCapture.getValue.onComplete(resp) EasyMock.verify(brokerToController) + // Verify the last request sent had all 10 items val request = capture.getValue.build() assertEquals(request.data().topics().size(), 1) assertEquals(request.data().topics().get(0).partitions().size(), 10) @@ -178,7 +186,7 @@ class AlterIsrManagerTest { errors.foreach(error => { val alterIsrManager = testPartitionError(tp0, error) // Any partition-level error should clear the item from the pending queue allowing for future updates - assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, null, _ => { }, 0))) + assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => { }, 0))) }) } @@ -203,10 +211,8 @@ class AlterIsrManagerTest { alterIsrManager.submit(AlterIsrItem(tp, new LeaderAndIsr(1, 1, List(1,2,3), 10), callback, 0)) - time.sleep(100) - scheduler.tick() - EasyMock.verify(brokerToController) + EasyMock.reset(brokerToController) val alterIsrResp = new AlterIsrResponse(new AlterIsrResponseData() .setTopics(Collections.singletonList( @@ -235,36 +241,24 @@ class AlterIsrManagerTest { val scheduler = new MockScheduler(time) val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2) alterIsrManager.start() - alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)) - time.sleep(1) - scheduler.tick() // Triggers a request + // First submit will send the request + alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)) - // Enqueue more updates + // These will become pending unsent items alterIsrManager.submit(AlterIsrItem(tp1, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)) alterIsrManager.submit(AlterIsrItem(tp2, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)) - time.sleep(1) - scheduler.tick() // Trigger the schedule again, but no request this time - EasyMock.verify(brokerToController) - // Even an empty response will clear the in-flight + // Once the callback runs, another request will be sent + EasyMock.reset(brokerToController) + EasyMock.expect(brokerToController.sendRequest(EasyMock.anyObject(), EasyMock.capture(callbackCapture))).once() + EasyMock.replay(brokerToController) val alterIsrResp = new AlterIsrResponse(new AlterIsrResponseData()) val resp = new ClientResponse(null, null, "", 0L, 0L, false, null, null, alterIsrResp) callbackCapture.getValue.onComplete(resp) - - EasyMock.reset(brokerToController) - EasyMock.expect(brokerToController.sendRequest(EasyMock.anyObject(), EasyMock.capture(callbackCapture))).once() - EasyMock.replay(brokerToController) - - // Need to re-enqueue again to trigger the thread to be scheduled - alterIsrManager.clearPending(tp2) - alterIsrManager.submit(AlterIsrItem(tp2, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)) - - time.sleep(100) - scheduler.tick() EasyMock.verify(brokerToController) } From 303a2b4ea7733e50d337a8a38fe671085912280b Mon Sep 17 00:00:00 2001 From: David Arthur Date: Tue, 12 Jan 2021 09:12:25 -0500 Subject: [PATCH 6/8] Revert back to a simple AtmoicBoolean --- .../scala/kafka/server/AlterIsrManager.scala | 55 ++++++++----------- 1 file changed, 24 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala index db515fb9bfcf9..906d62e6877d5 100644 --- a/core/src/main/scala/kafka/server/AlterIsrManager.scala +++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala @@ -20,7 +20,6 @@ import java.util import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import kafka.api.LeaderAndIsr import kafka.metrics.KafkaMetricsGroup -import kafka.utils.CoreUtils.inLock import kafka.utils.{KafkaScheduler, Logging, Scheduler} import kafka.zk.KafkaZkClient import org.apache.kafka.clients.ClientResponse @@ -31,7 +30,7 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} import org.apache.kafka.common.utils.Time -import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable import scala.collection.mutable.ListBuffer import scala.jdk.CollectionConverters._ @@ -116,12 +115,7 @@ class DefaultAlterIsrManager( private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new ConcurrentHashMap[TopicPartition, AlterIsrItem]() // Used to allow only one in-flight request at a time - @volatile - private var inflightRequest: Boolean = false - - // Protect updates of the inflight flag and prevent additional pending items from being submitted while we are - // preparing a request - private val inflightLock: ReentrantLock = new ReentrantLock() + private val inflightRequest: AtomicBoolean = new AtomicBoolean(false) override def start(): Unit = { controllerChannelManager.start() @@ -132,36 +126,29 @@ class DefaultAlterIsrManager( } override def submit(alterIsrItem: AlterIsrItem): Boolean = { - inLock(inflightLock) { - if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) { - maybePropagateIsrChanges() - true - } else { - false - } - } + val enqueued = unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null + maybePropagateIsrChanges() + enqueued } override def clearPending(topicPartition: TopicPartition): Unit = { unsentIsrUpdates.remove(topicPartition) } - private[server] def maybePropagateIsrChanges(): Unit = inLock(inflightLock) { + private[server] def maybePropagateIsrChanges(): Unit = { // Send all pending items if there is not already a request in-flight. - if (!inflightRequest && !unsentIsrUpdates.isEmpty) { + if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, true)) { // Copy current unsent ISRs but don't remove from the map, they get cleared in the response handler val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]() unsentIsrUpdates.values().forEach(item => inflightAlterIsrItems.append(item)) sendRequest(inflightAlterIsrItems.toSeq) - inflightRequest = true } } - private[server] def clearInFlightRequest(): Unit = inLock(inflightLock) { - if (!inflightRequest) { + private[server] def clearInFlightRequest(): Unit = { + if(!inflightRequest.compareAndSet(true, false)) { warn("Attempting to clear AlterIsr in-flight flag when no apparent request is in-flight") } - inflightRequest = false } private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = { @@ -176,16 +163,22 @@ class DefaultAlterIsrManager( override def onComplete(response: ClientResponse): Unit = { debug(s"Received AlterIsr response $response") val body = response.responseBody().asInstanceOf[AlterIsrResponse] - handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems) match { - case Errors.NONE => - // In the normal case, check for pending updates to send immediately - clearInFlightRequest() - maybePropagateIsrChanges() - case _ => - // If we received a top-level error from the controller, retry the request in the near future - clearInFlightRequest() - scheduler.schedule("send-alter-isr", () => maybePropagateIsrChanges(), 50, -1, TimeUnit.MILLISECONDS) + val error = try { + handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems) + } finally { + // clear the flag so future requests can proceed + clearInFlightRequest() } + + // check if we need to send another request right away + error match { + case Errors.NONE => + // In the normal case, check for pending updates to send immediately + maybePropagateIsrChanges() + case _ => + // If we received a top-level error from the controller, retry the request in the near future + scheduler.schedule("send-alter-isr", () => maybePropagateIsrChanges(), 50, -1, TimeUnit.MILLISECONDS) + } } override def onTimeout(): Unit = { From 9d44d25d7d2e72c374fa777abc1f738a970c9861 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Tue, 12 Jan 2021 12:15:31 -0500 Subject: [PATCH 7/8] Fixup unit tests --- .../scala/kafka/server/AlterIsrManager.scala | 4 +- .../kafka/server/AlterIsrManagerTest.scala | 80 +++++++++---------- 2 files changed, 41 insertions(+), 43 deletions(-) diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala index 906d62e6877d5..a0f35e65577e8 100644 --- a/core/src/main/scala/kafka/server/AlterIsrManager.scala +++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala @@ -111,8 +111,8 @@ class DefaultAlterIsrManager( val brokerEpochSupplier: () => Long ) extends AlterIsrManager with Logging with KafkaMetricsGroup { - // Used to allow only one pending ISR update per partition - private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new ConcurrentHashMap[TopicPartition, AlterIsrItem]() + // Used to allow only one pending ISR update per partition (visible for testing) + private[server] val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new ConcurrentHashMap[TopicPartition, AlterIsrItem]() // Used to allow only one in-flight request at a time private val inflightRequest: AtomicBoolean = new AtomicBoolean(false) diff --git a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala index 8617b90f6a0ee..b1336d317b3d9 100644 --- a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala @@ -62,9 +62,6 @@ class AlterIsrManagerTest { val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2) alterIsrManager.start() alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)) - time.sleep(50) - scheduler.tick() - EasyMock.verify(brokerToController) } @@ -82,10 +79,6 @@ class AlterIsrManagerTest { // Only send one ISR update for a given topic+partition assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))) assertFalse(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2), 10), _ => {}, 0))) - - time.sleep(50) - scheduler.tick() - EasyMock.verify(brokerToController) val request = capture.getValue.build() @@ -148,17 +141,12 @@ class AlterIsrManagerTest { } private def checkTopLevelError(error: Errors): Unit = { - val isrs = Seq(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => { }, 0)) - val alterIsrManager = testTopLevelError(isrs, error) - // Any top-level error, we want to retry, so we don't clear items from the pending map - assertFalse(alterIsrManager.submit(AlterIsrItem(tp0, null, _ => { }, 0))) - } - - private def testTopLevelError(isrs: Seq[AlterIsrItem], error: Errors): AlterIsrManager = { + val leaderAndIsr = new LeaderAndIsr(1, 1, List(1,2,3), 10) + val isrs = Seq(AlterIsrItem(tp0, leaderAndIsr, _ => { }, 0)) val callbackCapture = EasyMock.newCapture[ControllerRequestCompletionHandler]() EasyMock.expect(brokerToController.start()) - EasyMock.expect(brokerToController.sendRequest(EasyMock.anyObject(), EasyMock.capture(callbackCapture))).once() + EasyMock.expect(brokerToController.sendRequest(EasyMock.anyObject(), EasyMock.capture(callbackCapture))).times(1) EasyMock.replay(brokerToController) val scheduler = new MockScheduler(time) @@ -166,16 +154,33 @@ class AlterIsrManagerTest { alterIsrManager.start() isrs.foreach(alterIsrManager.submit) - time.sleep(1) - scheduler.tick() - EasyMock.verify(brokerToController) - val alterIsrResp = new AlterIsrResponse(new AlterIsrResponseData().setErrorCode(error.code)) - val resp = new ClientResponse(null, null, "", 0L, 0L, + var alterIsrResp = new AlterIsrResponse(new AlterIsrResponseData().setErrorCode(error.code)) + var resp = new ClientResponse(null, null, "", 0L, 0L, false, null, null, alterIsrResp) callbackCapture.getValue.onComplete(resp) - alterIsrManager + + // Any top-level error, we want to retry, so we don't clear items from the pending map + assertTrue(alterIsrManager.unsentIsrUpdates.containsKey(tp0)) + + EasyMock.reset(brokerToController) + EasyMock.expect(brokerToController.sendRequest(EasyMock.anyObject(), EasyMock.capture(callbackCapture))).times(1) + EasyMock.replay(brokerToController) + + // After some time, we will retry failed requests + time.sleep(100) + scheduler.tick() + + // After a successful response, we can submit another AlterIsrItem + alterIsrResp = partitionResponse(tp0, Errors.NONE) + resp = new ClientResponse(null, null, "", 0L, 0L, + false, null, null, alterIsrResp) + callbackCapture.getValue.onComplete(resp) + + EasyMock.verify(brokerToController) + + assertFalse(alterIsrManager.unsentIsrUpdates.containsKey(tp0)) } @Test @@ -223,14 +228,7 @@ class AlterIsrManagerTest { EasyMock.verify(brokerToController) EasyMock.reset(brokerToController) - val alterIsrResp = new AlterIsrResponse(new AlterIsrResponseData() - .setTopics(Collections.singletonList( - new AlterIsrResponseData.TopicData() - .setName(tp.topic()) - .setPartitions(Collections.singletonList( - new AlterIsrResponseData.PartitionData() - .setPartitionIndex(tp.partition()) - .setErrorCode(error.code)))))) + val alterIsrResp = partitionResponse(tp, error) val resp = new ClientResponse(null, null, "", 0L, 0L, false, null, null, alterIsrResp) callbackCapture.getValue.onComplete(resp) @@ -292,21 +290,10 @@ class AlterIsrManagerTest { alterIsrManager.submit(AlterIsrItem(tp1, new LeaderAndIsr(1, 1, List(1,2,3), 10), callback, 0)) alterIsrManager.submit(AlterIsrItem(tp2, new LeaderAndIsr(1, 1, List(1,2,3), 10), callback, 0)) - - time.sleep(100) - scheduler.tick() - EasyMock.verify(brokerToController) // Three partitions were sent, but only one returned - val alterIsrResp = new AlterIsrResponse(new AlterIsrResponseData() - .setTopics(Collections.singletonList( - new AlterIsrResponseData.TopicData() - .setName(tp0.topic()) - .setPartitions(Collections.singletonList( - new AlterIsrResponseData.PartitionData() - .setPartitionIndex(tp0.partition()) - .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())))))) + val alterIsrResp = partitionResponse(tp0, Errors.UNKNOWN_SERVER_ERROR) val resp = new ClientResponse(null, null, "", 0L, 0L, false, null, null, alterIsrResp) callbackCapture.getValue.onComplete(resp) @@ -342,4 +329,15 @@ class AlterIsrManagerTest { assertTrue(zkIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 3), expectMatch(Left(Errors.INVALID_UPDATE_VERSION)), 0))) } + + private def partitionResponse(tp: TopicPartition, error: Errors): AlterIsrResponse = { + new AlterIsrResponse(new AlterIsrResponseData() + .setTopics(Collections.singletonList( + new AlterIsrResponseData.TopicData() + .setName(tp.topic()) + .setPartitions(Collections.singletonList( + new AlterIsrResponseData.PartitionData() + .setPartitionIndex(tp.partition()) + .setErrorCode(error.code)))))) + } } From 70b20ce03d0d77577bf7828e971a590a967e367e Mon Sep 17 00:00:00 2001 From: David Arthur Date: Wed, 13 Jan 2021 13:49:27 -0500 Subject: [PATCH 8/8] Fix spacing --- core/src/main/scala/kafka/server/AlterIsrManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala index a0f35e65577e8..df308d4a0cc0c 100644 --- a/core/src/main/scala/kafka/server/AlterIsrManager.scala +++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala @@ -146,7 +146,7 @@ class DefaultAlterIsrManager( } private[server] def clearInFlightRequest(): Unit = { - if(!inflightRequest.compareAndSet(true, false)) { + if (!inflightRequest.compareAndSet(true, false)) { warn("Attempting to clear AlterIsr in-flight flag when no apparent request is in-flight") } }