diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index cb20946acd4fa..cdf36cf526b13 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2409,10 +2409,9 @@ class KafkaApis(val requestChannel: RequestChannel, // if the controller hasn't been upgraded to use KIP-380 if (brokerEpochInRequest == AbstractControlRequest.UNKNOWN_BROKER_EPOCH) false else { - val curBrokerEpoch = controller.brokerEpoch - if (brokerEpochInRequest < curBrokerEpoch) true - else if (brokerEpochInRequest == curBrokerEpoch) false - else throw new IllegalStateException(s"Epoch $brokerEpochInRequest larger than current broker epoch $curBrokerEpoch") + // brokerEpochInRequest > controller.brokerEpoch is possible in rare scenarios where the controller gets notified + // about the new broker epoch and sends a control request with this epoch before the broker learns about it + brokerEpochInRequest < controller.brokerEpoch } } diff --git a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala index d35b99712c494..1b60f97228838 100755 --- a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala @@ -93,15 +93,20 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness { @Test def testControlRequestWithCorrectBrokerEpoch() { - testControlRequestWithBrokerEpoch(false) + testControlRequestWithBrokerEpoch(0) } @Test def testControlRequestWithStaleBrokerEpoch() { - testControlRequestWithBrokerEpoch(true) + testControlRequestWithBrokerEpoch(-1) } - private def testControlRequestWithBrokerEpoch(isEpochInRequestStale: Boolean) { + @Test + def testControlRequestWithNewerBrokerEpoch(): Unit = { + testControlRequestWithBrokerEpoch(1) + } + + private def testControlRequestWithBrokerEpoch(epochInRequestDiffFromCurrentEpoch: Long) { val tp = new TopicPartition("new-topic", 0) // create topic with 1 partition, 2 replicas, one on each broker @@ -126,8 +131,7 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness { controllerChannelManager.startup() val broker2 = servers(brokerId2) - val epochInRequest = - if (isEpochInRequestStale) broker2.kafkaController.brokerEpoch - 1 else broker2.kafkaController.brokerEpoch + val epochInRequest = broker2.kafkaController.brokerEpoch + epochInRequestDiffFromCurrentEpoch try { // Send LeaderAndIsr request with correct broker epoch @@ -142,10 +146,12 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness { epochInRequest, partitionStates.asJava, nodes.toSet.asJava) - if (isEpochInRequestStale) { + if (epochInRequestDiffFromCurrentEpoch < 0) { + // stale broker epoch in LEADER_AND_ISR sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, ApiKeys.LEADER_AND_ISR, requestBuilder) } else { + // broker epoch in LEADER_AND_ISR >= current broker epoch sendAndVerifySuccessfulResponse(controllerChannelManager, ApiKeys.LEADER_AND_ISR, requestBuilder) TestUtils.waitUntilLeaderIsKnown(Seq(broker2), tp, 10000) } @@ -171,10 +177,12 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness { epochInRequest, partitionStates.asJava, liverBrokers.toSet.asJava) - if (isEpochInRequestStale) { + if (epochInRequestDiffFromCurrentEpoch < 0) { + // stale broker epoch in UPDATE_METADATA sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, ApiKeys.UPDATE_METADATA, requestBuilder) } else { + // broker epoch in UPDATE_METADATA >= current broker epoch sendAndVerifySuccessfulResponse(controllerChannelManager, ApiKeys.UPDATE_METADATA, requestBuilder) TestUtils.waitUntilMetadataIsPropagated(Seq(broker2), tp.topic(), tp.partition(), 10000) assertEquals(brokerId2, @@ -189,10 +197,12 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness { epochInRequest, // Correct broker epoch true, Set(tp).asJava) - if (isEpochInRequestStale) { + if (epochInRequestDiffFromCurrentEpoch < 0) { + // stale broker epoch in STOP_REPLICA sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, ApiKeys.STOP_REPLICA, requestBuilder) } else { + // broker epoch in STOP_REPLICA >= current broker epoch sendAndVerifySuccessfulResponse(controllerChannelManager, ApiKeys.STOP_REPLICA, requestBuilder) assertTrue(broker2.replicaManager.getPartition(tp).isEmpty) } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 249c171f10a66..1810d47b920f6 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -34,7 +34,7 @@ import kafka.security.auth.Authorizer import kafka.server.QuotaFactory.QuotaManagers import kafka.utils.{MockTime, TestUtils} import kafka.zk.KafkaZkClient -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.errors.UnsupportedVersionException import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.metrics.Metrics @@ -53,7 +53,7 @@ import org.junit.Assert.{assertEquals, assertNull, assertTrue} import org.junit.{After, Test} import scala.collection.JavaConverters._ -import scala.collection.Map +import scala.collection.{mutable, Map} class KafkaApisTest { @@ -259,7 +259,7 @@ class KafkaApisTest { val expectedErrors = Map(tp1 -> Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, tp2 -> Errors.NONE).asJava val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture() - val responseCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() + val responseCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() EasyMock.expect(replicaManager.getMagic(tp1)) .andReturn(Some(RecordBatch.MAGIC_VALUE_V1)) @@ -496,6 +496,157 @@ class KafkaApisTest { assertNull(partitionData.abortedTransactions) } + @Test + def testUpdateMetadataRequestWithCurrentBrokerEpoch(): Unit = { + val currentBrokerEpoch = 1239875L + testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE) + } + + @Test + def testUpdateMetadataRequestWithNewerBrokerEpochIsValid(): Unit = { + val currentBrokerEpoch = 1239875L + testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE) + } + + @Test + def testUpdateMetadataRequestWithStaleBrokerEpochIsRejected(): Unit = { + val currentBrokerEpoch = 1239875L + testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH) + } + + def testUpdateMetadataRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = { + val updateMetadataRequestBuilder = createBasicMetadataRequest("topicA", 1, brokerEpochInRequest) + val request = buildRequest(updateMetadataRequestBuilder) + + val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture() + + EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch) + EasyMock.expect(replicaManager.maybeUpdateMetadataCache( + EasyMock.eq(request._2.context.header.correlationId), + EasyMock.anyObject() + )).andStubReturn( + Seq() + ) + + EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse))) + EasyMock.replay(replicaManager, controller, requestChannel) + + createKafkaApis().handleUpdateMetadataRequest(request._2) + val updateMetadataResponse = readResponse(ApiKeys.UPDATE_METADATA, request._1, capturedResponse) + .asInstanceOf[UpdateMetadataResponse] + assertEquals(expectedError, updateMetadataResponse.error()) + EasyMock.verify(replicaManager) + } + + @Test + def testLeaderAndIsrRequestWithCurrentBrokerEpoch(): Unit = { + val currentBrokerEpoch = 1239875L + testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE) + } + + @Test + def testLeaderAndIsrRequestWithNewerBrokerEpochIsValid(): Unit = { + val currentBrokerEpoch = 1239875L + testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE) + } + + @Test + def testLeaderAndIsrRequestWithStaleBrokerEpochIsRejected(): Unit = { + val currentBrokerEpoch = 1239875L + testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH) + } + + def testLeaderAndIsrRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = { + val controllerId = 2 + val controllerEpoch = 6 + val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture() + val topicPartition = new TopicPartition("topicW", 1) + val partitionStates = Map(topicPartition -> + new LeaderAndIsrRequest.PartitionState(1, 0, 1, asList(0, 1), 2, asList(0, 1, 2), false) + ).asJava + val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder( + ApiKeys.LEADER_AND_ISR.latestVersion, + controllerId, + controllerEpoch, + brokerEpochInRequest, + partitionStates, + Set(new Node(0, "host0", 9090), new Node(1, "host1", 9091)).asJava + ) + val request = buildRequest(leaderAndIsrRequestBuilder) + val response = new LeaderAndIsrResponse(Errors.NONE, Map(topicPartition -> Errors.NONE).asJava) + + EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch) + EasyMock.expect(replicaManager.becomeLeaderOrFollower( + EasyMock.eq(request._2.context.header.correlationId), + EasyMock.anyObject(), + EasyMock.anyObject() + )).andStubReturn( + response + ) + + EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse))) + EasyMock.replay(replicaManager, controller, requestChannel) + + createKafkaApis().handleLeaderAndIsrRequest(request._2) + val leaderAndIsrResponse = readResponse(ApiKeys.LEADER_AND_ISR, request._1, capturedResponse) + .asInstanceOf[LeaderAndIsrResponse] + assertEquals(expectedError, leaderAndIsrResponse.error()) + EasyMock.verify(replicaManager) + } + + @Test + def testStopReplicaRequestWithCurrentBrokerEpoch(): Unit = { + val currentBrokerEpoch = 1239875L + testStopReplicaRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE) + } + + @Test + def testStopReplicaRequestWithNewerBrokerEpochIsValid(): Unit = { + val currentBrokerEpoch = 1239875L + testStopReplicaRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE) + } + + @Test + def testStopReplicaRequestWithStaleBrokerEpochIsRejected(): Unit = { + val currentBrokerEpoch = 1239875L + testStopReplicaRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH) + } + + def testStopReplicaRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = { + val controllerId = 0 + val controllerEpoch = 5 + val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture() + val fooPartition = new TopicPartition("foo", 0) + val stopReplicaRequest = new StopReplicaRequest.Builder( + ApiKeys.STOP_REPLICA.latestVersion, + controllerId, + controllerEpoch, + brokerEpochInRequest, + false, + util.Collections.singleton(fooPartition) + ) + val request = buildRequest(stopReplicaRequest) + + EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch) + + val requestBody: StopReplicaRequest = request._2.body[StopReplicaRequest] + EasyMock.expect(replicaManager.stopReplicas(EasyMock.eq(requestBody))) + .andStubReturn( + (mutable.Map( + fooPartition -> Errors.NONE + ), Errors.NONE) + ) + EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse))) + + EasyMock.replay(controller, replicaManager, requestChannel) + + createKafkaApis().handleStopReplicaRequest(request._2) + val stopReplicaResponse = readResponse(ApiKeys.STOP_REPLICA, request._1, capturedResponse) + .asInstanceOf[StopReplicaResponse] + assertEquals(expectedError, stopReplicaResponse.error()) + EasyMock.verify(replicaManager) + } + /** * Return pair of listener names in the metadataCache: PLAINTEXT and LISTENER2 respectively. */ @@ -601,14 +752,18 @@ class KafkaApisTest { capturedResponse } - private def setupBasicMetadataCache(topic: String, numPartitions: Int): Unit = { + private def createBasicMetadataRequest(topic: String, numPartitions: Int, brokerEpoch: Long): UpdateMetadataRequest.Builder = { val replicas = List(0.asInstanceOf[Integer]).asJava val partitionState = new UpdateMetadataRequest.PartitionState(1, 0, 1, replicas, 0, replicas, Collections.emptyList()) val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT) val broker = new Broker(0, Seq(new EndPoint("broker0", 9092, SecurityProtocol.PLAINTEXT, plaintextListener)).asJava, "rack") val partitions = (0 until numPartitions).map(new TopicPartition(topic, _) -> partitionState).toMap - val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0, - 0, 0, partitions.asJava, Set(broker).asJava).build() - metadataCache.updateMetadata(correlationId = 0, updateMetadataRequest) + new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0, + 0, brokerEpoch, partitions.asJava, Set(broker).asJava) + } + + private def setupBasicMetadataCache(topic: String, numPartitions: Int): Unit = { + val updateMetadataRequest = createBasicMetadataRequest(topic, numPartitions, 0) + metadataCache.updateMetadata(correlationId = 0, updateMetadataRequest.build()) } }