diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index da402ec4f8764..9016ab111033b 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2921,10 +2921,9 @@ class KafkaApis(val requestChannel: RequestChannel, // if the controller hasn't been upgraded to use KIP-380 if (brokerEpochInRequest == AbstractControlRequest.UNKNOWN_BROKER_EPOCH) false else { - val curBrokerEpoch = controller.brokerEpoch - if (brokerEpochInRequest < curBrokerEpoch) true - else if (brokerEpochInRequest == curBrokerEpoch) false - else throw new IllegalStateException(s"Epoch $brokerEpochInRequest larger than current broker epoch $curBrokerEpoch") + // brokerEpochInRequest > controller.brokerEpoch is possible in rare scenarios where the controller gets notified + // about the new broker epoch and sends a control request with this epoch before the broker learns about it + brokerEpochInRequest < controller.brokerEpoch } } diff --git a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala index 23836dd66d8c7..e7b43f5671dfa 100755 --- a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala @@ -94,15 +94,20 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness { @Test def testControlRequestWithCorrectBrokerEpoch(): Unit = { - testControlRequestWithBrokerEpoch(false) + testControlRequestWithBrokerEpoch(0) } @Test def testControlRequestWithStaleBrokerEpoch(): Unit = { - testControlRequestWithBrokerEpoch(true) + testControlRequestWithBrokerEpoch(-1) } - private def testControlRequestWithBrokerEpoch(isEpochInRequestStale: Boolean): Unit = { + @Test + def testControlRequestWithNewerBrokerEpoch(): Unit = { + testControlRequestWithBrokerEpoch(1) + } + + private def testControlRequestWithBrokerEpoch(epochInRequestDiffFromCurrentEpoch: Long): Unit = { val tp = new TopicPartition("new-topic", 0) // create topic with 1 partition, 2 replicas, one on each broker @@ -127,8 +132,7 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness { controllerChannelManager.startup() val broker2 = servers(brokerId2) - val epochInRequest = - if (isEpochInRequestStale) broker2.kafkaController.brokerEpoch - 1 else broker2.kafkaController.brokerEpoch + val epochInRequest = broker2.kafkaController.brokerEpoch + epochInRequestDiffFromCurrentEpoch try { // Send LeaderAndIsr request with correct broker epoch @@ -150,10 +154,12 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness { epochInRequest, partitionStates.asJava, nodes.toSet.asJava) - if (isEpochInRequestStale) { + if (epochInRequestDiffFromCurrentEpoch < 0) { + // stale broker epoch in LEADER_AND_ISR sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, requestBuilder) } else { + // broker epoch in LEADER_AND_ISR >= current broker epoch sendAndVerifySuccessfulResponse(controllerChannelManager, requestBuilder) TestUtils.waitUntilLeaderIsKnown(Seq(broker2), tp, 10000) } @@ -190,10 +196,12 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness { epochInRequest, partitionStates.asJava, liveBrokers.asJava) - if (isEpochInRequestStale) { + if (epochInRequestDiffFromCurrentEpoch < 0) { + // stale broker epoch in UPDATE_METADATA sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, requestBuilder) } else { + // broker epoch in UPDATE_METADATA >= current broker epoch sendAndVerifySuccessfulResponse(controllerChannelManager, requestBuilder) TestUtils.waitUntilMetadataIsPropagated(Seq(broker2), tp.topic, tp.partition, 10000) assertEquals(brokerId2, @@ -208,9 +216,11 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness { epochInRequest, // Correct broker epoch true, Set(tp).asJava) - if (isEpochInRequestStale) { + if (epochInRequestDiffFromCurrentEpoch < 0) { + // stale broker epoch in STOP_REPLICA sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, requestBuilder) } else { + // broker epoch in STOP_REPLICA >= current broker epoch sendAndVerifySuccessfulResponse(controllerChannelManager, requestBuilder) assertEquals(HostedPartition.None, broker2.replicaManager.getPartition(tp)) } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index a3f58e9e6cad3..c255d99c7d0c4 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -20,8 +20,7 @@ package kafka.server import java.net.InetAddress import java.nio.charset.StandardCharsets import java.util -import java.util.Random -import java.util.{Collections, Optional} +import java.util.{Collections, Optional, Random} import java.util.Arrays.asList import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1} @@ -34,7 +33,7 @@ import kafka.network.RequestChannel.SendResponse import kafka.server.QuotaFactory.QuotaManagers import kafka.utils.{MockTime, TestUtils} import kafka.zk.KafkaZkClient -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.errors.UnsupportedVersionException import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.memory.MemoryPool @@ -49,7 +48,7 @@ import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.easymock.{Capture, EasyMock, IAnswer} import EasyMock._ -import org.apache.kafka.common.message.{DescribeGroupsRequestData, HeartbeatRequestData, JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, SyncGroupRequestData, TxnOffsetCommitRequestData} +import org.apache.kafka.common.message.{DescribeGroupsRequestData, HeartbeatRequestData, JoinGroupRequestData, LeaderAndIsrRequestData, LeaderAndIsrResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, SyncGroupRequestData, TxnOffsetCommitRequestData} import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition, OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection} @@ -285,7 +284,7 @@ class KafkaApisTest { val expectedErrors = Map(tp1 -> Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, tp2 -> Errors.NONE).asJava val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture() - val responseCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() + val responseCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() EasyMock.expect(replicaManager.getMagic(tp1)) .andReturn(Some(RecordBatch.MAGIC_VALUE_V1)) @@ -853,6 +852,167 @@ class KafkaApisTest { EasyMock.replay(groupCoordinator) } + @Test + def testUpdateMetadataRequestWithCurrentBrokerEpoch(): Unit = { + val currentBrokerEpoch = 1239875L + testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE) + } + + @Test + def testUpdateMetadataRequestWithNewerBrokerEpochIsValid(): Unit = { + val currentBrokerEpoch = 1239875L + testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE) + } + + @Test + def testUpdateMetadataRequestWithStaleBrokerEpochIsRejected(): Unit = { + val currentBrokerEpoch = 1239875L + testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH) + } + + def testUpdateMetadataRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = { + val updateMetadataRequestBuilder = createBasicMetadataRequest("topicA", 1, brokerEpochInRequest) + val request = buildRequest(updateMetadataRequestBuilder) + + val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture() + + EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch) + EasyMock.expect(replicaManager.maybeUpdateMetadataCache( + EasyMock.eq(request._2.context.correlationId), + EasyMock.anyObject() + )).andStubReturn( + Seq() + ) + + EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse))) + EasyMock.replay(replicaManager, controller, requestChannel) + + createKafkaApis().handleUpdateMetadataRequest(request._2) + val updateMetadataResponse = readResponse(ApiKeys.UPDATE_METADATA, request._1, capturedResponse) + .asInstanceOf[UpdateMetadataResponse] + assertEquals(expectedError, updateMetadataResponse.error()) + EasyMock.verify(replicaManager) + } + + @Test + def testLeaderAndIsrRequestWithCurrentBrokerEpoch(): Unit = { + val currentBrokerEpoch = 1239875L + testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE) + } + + @Test + def testLeaderAndIsrRequestWithNewerBrokerEpochIsValid(): Unit = { + val currentBrokerEpoch = 1239875L + testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE) + } + + @Test + def testLeaderAndIsrRequestWithStaleBrokerEpochIsRejected(): Unit = { + val currentBrokerEpoch = 1239875L + testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH) + } + + def testLeaderAndIsrRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = { + val controllerId = 2 + val controllerEpoch = 6 + val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture() + val partitionStates = Seq( + new LeaderAndIsrRequestData.LeaderAndIsrPartitionState() + .setTopicName("topicW") + .setPartitionIndex(1) + .setControllerEpoch(1) + .setLeader(0) + .setLeaderEpoch(1) + .setIsr(asList(0, 1)) + .setZkVersion(2) + .setReplicas(asList(0, 1, 2)) + .setIsNew(false) + ).asJava + val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder( + ApiKeys.LEADER_AND_ISR.latestVersion, + controllerId, + controllerEpoch, + brokerEpochInRequest, + partitionStates, + asList(new Node(0, "host0", 9090), new Node(1, "host1", 9091)) + ) + val request = buildRequest(leaderAndIsrRequestBuilder) + val response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData() + .setErrorCode(Errors.NONE.code) + .setPartitionErrors(asList())) + + EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch) + EasyMock.expect(replicaManager.becomeLeaderOrFollower( + EasyMock.eq(request._2.context.correlationId), + EasyMock.anyObject(), + EasyMock.anyObject() + )).andStubReturn( + response + ) + + EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse))) + EasyMock.replay(replicaManager, controller, requestChannel) + + createKafkaApis().handleLeaderAndIsrRequest(request._2) + val leaderAndIsrResponse = readResponse(ApiKeys.LEADER_AND_ISR, request._1, capturedResponse) + .asInstanceOf[LeaderAndIsrResponse] + assertEquals(expectedError, leaderAndIsrResponse.error()) + EasyMock.verify(replicaManager) + } + + @Test + def testStopReplicaRequestWithCurrentBrokerEpoch(): Unit = { + val currentBrokerEpoch = 1239875L + testStopReplicaRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE) + } + + @Test + def testStopReplicaRequestWithNewerBrokerEpochIsValid(): Unit = { + val currentBrokerEpoch = 1239875L + testStopReplicaRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE) + } + + @Test + def testStopReplicaRequestWithStaleBrokerEpochIsRejected(): Unit = { + val currentBrokerEpoch = 1239875L + testStopReplicaRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH) + } + + def testStopReplicaRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = { + val controllerId = 0 + val controllerEpoch = 5 + val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture() + val fooPartition = new TopicPartition("foo", 0) + val stopReplicaRequest = new StopReplicaRequest.Builder( + ApiKeys.STOP_REPLICA.latestVersion, + controllerId, + controllerEpoch, + brokerEpochInRequest, + false, + util.Collections.singleton(fooPartition) + ) + val request = buildRequest(stopReplicaRequest) + + EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch) + + val requestBody: StopReplicaRequest = request._2.body[StopReplicaRequest] + EasyMock.expect(replicaManager.stopReplicas(EasyMock.eq(requestBody))) + .andStubReturn( + (mutable.Map( + fooPartition -> Errors.NONE + ), Errors.NONE) + ) + EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse))) + + EasyMock.replay(controller, replicaManager, requestChannel) + + createKafkaApis().handleStopReplicaRequest(request._2) + val stopReplicaResponse = readResponse(ApiKeys.STOP_REPLICA, request._1, capturedResponse) + .asInstanceOf[StopReplicaResponse] + assertEquals(expectedError, stopReplicaResponse.error()) + EasyMock.verify(replicaManager) + } + /** * Return pair of listener names in the metadataCache: PLAINTEXT and LISTENER2 respectively. */ @@ -978,7 +1138,7 @@ class KafkaApisTest { capturedResponse } - private def setupBasicMetadataCache(topic: String, numPartitions: Int): Unit = { + private def createBasicMetadataRequest(topic: String, numPartitions: Int, brokerEpoch: Long): UpdateMetadataRequest.Builder = { val replicas = List(0.asInstanceOf[Integer]).asJava def createPartitionState(partition: Int) = new UpdateMetadataPartitionState() @@ -1001,8 +1161,12 @@ class KafkaApisTest { .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id) .setListener(plaintextListener.value)).asJava) val partitionStates = (0 until numPartitions).map(createPartitionState) - val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0, - 0, 0, partitionStates.asJava, Seq(broker).asJava).build() - metadataCache.updateMetadata(correlationId = 0, updateMetadataRequest) + new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0, + 0, brokerEpoch, partitionStates.asJava, Seq(broker).asJava) + } + + private def setupBasicMetadataCache(topic: String, numPartitions: Int): Unit = { + val updateMetadataRequest = createBasicMetadataRequest(topic, numPartitions, 0) + metadataCache.updateMetadata(correlationId = 0, updateMetadataRequest.build()) } }