Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2921,10 +2921,9 @@ class KafkaApis(val requestChannel: RequestChannel,
// if the controller hasn't been upgraded to use KIP-380
if (brokerEpochInRequest == AbstractControlRequest.UNKNOWN_BROKER_EPOCH) false
else {
val curBrokerEpoch = controller.brokerEpoch
if (brokerEpochInRequest < curBrokerEpoch) true
else if (brokerEpochInRequest == curBrokerEpoch) false
else throw new IllegalStateException(s"Epoch $brokerEpochInRequest larger than current broker epoch $curBrokerEpoch")
// brokerEpochInRequest > controller.brokerEpoch is possible in rare scenarios where the controller gets notified
// about the new broker epoch and sends a control request with this epoch before the broker learns about it
brokerEpochInRequest < controller.brokerEpoch
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,20 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {

@Test
def testControlRequestWithCorrectBrokerEpoch(): Unit = {
  // Epoch delta 0: the request carries the broker's current epoch and must be accepted.
  testControlRequestWithBrokerEpoch(0)
}

@Test
def testControlRequestWithStaleBrokerEpoch(): Unit = {
  // Epoch delta -1: the request carries an epoch older than the broker's current
  // epoch and must be rejected with STALE_BROKER_EPOCH.
  testControlRequestWithBrokerEpoch(-1)
}

private def testControlRequestWithBrokerEpoch(isEpochInRequestStale: Boolean): Unit = {
@Test
def testControlRequestWithNewerBrokerEpoch(): Unit = {
  // Epoch delta +1: a control request may legitimately carry an epoch the broker
  // has not yet learned about, so it must be accepted.
  val epochDelta = 1L
  testControlRequestWithBrokerEpoch(epochDelta)
}

private def testControlRequestWithBrokerEpoch(epochInRequestDiffFromCurrentEpoch: Long): Unit = {
val tp = new TopicPartition("new-topic", 0)

// create topic with 1 partition, 2 replicas, one on each broker
Expand All @@ -127,8 +132,7 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
controllerChannelManager.startup()

val broker2 = servers(brokerId2)
val epochInRequest =
if (isEpochInRequestStale) broker2.kafkaController.brokerEpoch - 1 else broker2.kafkaController.brokerEpoch
val epochInRequest = broker2.kafkaController.brokerEpoch + epochInRequestDiffFromCurrentEpoch

try {
// Send LeaderAndIsr request with correct broker epoch
Expand All @@ -150,10 +154,12 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
epochInRequest,
partitionStates.asJava, nodes.toSet.asJava)

if (isEpochInRequestStale) {
if (epochInRequestDiffFromCurrentEpoch < 0) {
// stale broker epoch in LEADER_AND_ISR
sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, requestBuilder)
}
else {
// broker epoch in LEADER_AND_ISR >= current broker epoch
sendAndVerifySuccessfulResponse(controllerChannelManager, requestBuilder)
TestUtils.waitUntilLeaderIsKnown(Seq(broker2), tp, 10000)
}
Expand Down Expand Up @@ -190,10 +196,12 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
epochInRequest,
partitionStates.asJava, liveBrokers.asJava)

if (isEpochInRequestStale) {
if (epochInRequestDiffFromCurrentEpoch < 0) {
// stale broker epoch in UPDATE_METADATA
sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, requestBuilder)
}
else {
// broker epoch in UPDATE_METADATA >= current broker epoch
sendAndVerifySuccessfulResponse(controllerChannelManager, requestBuilder)
TestUtils.waitUntilMetadataIsPropagated(Seq(broker2), tp.topic, tp.partition, 10000)
assertEquals(brokerId2,
Expand All @@ -208,9 +216,11 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
epochInRequest, // Correct broker epoch
true, Set(tp).asJava)

if (isEpochInRequestStale) {
if (epochInRequestDiffFromCurrentEpoch < 0) {
// stale broker epoch in STOP_REPLICA
sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, requestBuilder)
} else {
// broker epoch in STOP_REPLICA >= current broker epoch
sendAndVerifySuccessfulResponse(controllerChannelManager, requestBuilder)
assertEquals(HostedPartition.None, broker2.replicaManager.getPartition(tp))
}
Expand Down
182 changes: 173 additions & 9 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ package kafka.server
import java.net.InetAddress
import java.nio.charset.StandardCharsets
import java.util
import java.util.Random
import java.util.{Collections, Optional}
import java.util.{Collections, Optional, Random}
import java.util.Arrays.asList

import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1}
Expand All @@ -34,7 +33,7 @@ import kafka.network.RequestChannel.SendResponse
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.{MockTime, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.memory.MemoryPool
Expand All @@ -49,7 +48,7 @@ import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.easymock.{Capture, EasyMock, IAnswer}
import EasyMock._
import org.apache.kafka.common.message.{DescribeGroupsRequestData, HeartbeatRequestData, JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, SyncGroupRequestData, TxnOffsetCommitRequestData}
import org.apache.kafka.common.message.{DescribeGroupsRequestData, HeartbeatRequestData, JoinGroupRequestData, LeaderAndIsrRequestData, LeaderAndIsrResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, SyncGroupRequestData, TxnOffsetCommitRequestData}
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition, OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection}
Expand Down Expand Up @@ -285,7 +284,7 @@ class KafkaApisTest {
val expectedErrors = Map(tp1 -> Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, tp2 -> Errors.NONE).asJava

val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
val responseCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
val responseCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()

EasyMock.expect(replicaManager.getMagic(tp1))
.andReturn(Some(RecordBatch.MAGIC_VALUE_V1))
Expand Down Expand Up @@ -853,6 +852,167 @@ class KafkaApisTest {
EasyMock.replay(groupCoordinator)
}

@Test
def testUpdateMetadataRequestWithCurrentBrokerEpoch(): Unit = {
  // Request epoch equal to the controller's current epoch must succeed.
  val brokerEpoch = 1239875L
  testUpdateMetadataRequest(brokerEpoch, brokerEpoch, Errors.NONE)
}

@Test
def testUpdateMetadataRequestWithNewerBrokerEpochIsValid(): Unit = {
  // A request epoch one ahead of the current epoch is treated as valid.
  val brokerEpoch = 1239875L
  testUpdateMetadataRequest(brokerEpoch, brokerEpoch + 1, Errors.NONE)
}

@Test
def testUpdateMetadataRequestWithStaleBrokerEpochIsRejected(): Unit = {
  // A request epoch behind the current epoch must be rejected as stale.
  val brokerEpoch = 1239875L
  testUpdateMetadataRequest(brokerEpoch, brokerEpoch - 1, Errors.STALE_BROKER_EPOCH)
}

/**
 * Sends an UPDATE_METADATA request carrying `brokerEpochInRequest` while the mocked
 * controller reports `currentBrokerEpoch`, then asserts the response carries
 * `expectedError`.
 */
def testUpdateMetadataRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
  val builder = createBasicMetadataRequest("topicA", 1, brokerEpochInRequest)
  val (updateMetadataRequest, requestChannelRequest) = buildRequest(builder)

  val responseCapture: Capture[RequestChannel.Response] = EasyMock.newCapture()

  // The controller's view of the broker epoch drives the staleness check.
  EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
  EasyMock.expect(replicaManager.maybeUpdateMetadataCache(
    EasyMock.eq(requestChannelRequest.context.correlationId),
    EasyMock.anyObject()
  )).andStubReturn(Seq.empty)
  EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(responseCapture)))
  EasyMock.replay(replicaManager, controller, requestChannel)

  createKafkaApis().handleUpdateMetadataRequest(requestChannelRequest)

  val updateMetadataResponse = readResponse(ApiKeys.UPDATE_METADATA, updateMetadataRequest, responseCapture)
    .asInstanceOf[UpdateMetadataResponse]
  assertEquals(expectedError, updateMetadataResponse.error())
  EasyMock.verify(replicaManager)
}

@Test
def testLeaderAndIsrRequestWithCurrentBrokerEpoch(): Unit = {
  // Request epoch equal to the controller's current epoch must succeed.
  val brokerEpoch = 1239875L
  testLeaderAndIsrRequest(brokerEpoch, brokerEpoch, Errors.NONE)
}

@Test
def testLeaderAndIsrRequestWithNewerBrokerEpochIsValid(): Unit = {
  // A request epoch one ahead of the current epoch is treated as valid.
  val brokerEpoch = 1239875L
  testLeaderAndIsrRequest(brokerEpoch, brokerEpoch + 1, Errors.NONE)
}

@Test
def testLeaderAndIsrRequestWithStaleBrokerEpochIsRejected(): Unit = {
  // A request epoch behind the current epoch must be rejected as stale.
  val brokerEpoch = 1239875L
  testLeaderAndIsrRequest(brokerEpoch, brokerEpoch - 1, Errors.STALE_BROKER_EPOCH)
}

/**
 * Sends a LEADER_AND_ISR request carrying `brokerEpochInRequest` while the mocked
 * controller reports `currentBrokerEpoch`, and asserts the top-level response
 * error equals `expectedError`.
 */
def testLeaderAndIsrRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
val controllerId = 2
val controllerEpoch = 6
val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
// Single partition state for topicW-1; concrete values are arbitrary test data.
val partitionStates = Seq(
new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
.setTopicName("topicW")
.setPartitionIndex(1)
.setControllerEpoch(1)
.setLeader(0)
.setLeaderEpoch(1)
.setIsr(asList(0, 1))
.setZkVersion(2)
.setReplicas(asList(0, 1, 2))
.setIsNew(false)
).asJava
val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(
ApiKeys.LEADER_AND_ISR.latestVersion,
controllerId,
controllerEpoch,
brokerEpochInRequest,
partitionStates,
asList(new Node(0, "host0", 9090), new Node(1, "host1", 9091))
)
val request = buildRequest(leaderAndIsrRequestBuilder)
// Canned success response returned by the mocked replica manager when the
// epoch check passes and becomeLeaderOrFollower is actually invoked.
val response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.NONE.code)
.setPartitionErrors(asList()))

// The controller's view of the broker epoch drives the staleness check.
EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
EasyMock.expect(replicaManager.becomeLeaderOrFollower(
EasyMock.eq(request._2.context.correlationId),
EasyMock.anyObject(),
EasyMock.anyObject()
)).andStubReturn(
response
)

EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
EasyMock.replay(replicaManager, controller, requestChannel)

createKafkaApis().handleLeaderAndIsrRequest(request._2)
// Deserialize the captured response and check the top-level error code.
val leaderAndIsrResponse = readResponse(ApiKeys.LEADER_AND_ISR, request._1, capturedResponse)
.asInstanceOf[LeaderAndIsrResponse]
assertEquals(expectedError, leaderAndIsrResponse.error())
EasyMock.verify(replicaManager)
}

@Test
def testStopReplicaRequestWithCurrentBrokerEpoch(): Unit = {
  // Request epoch equal to the controller's current epoch must succeed.
  val brokerEpoch = 1239875L
  testStopReplicaRequest(brokerEpoch, brokerEpoch, Errors.NONE)
}

@Test
def testStopReplicaRequestWithNewerBrokerEpochIsValid(): Unit = {
  // A request epoch one ahead of the current epoch is treated as valid.
  val brokerEpoch = 1239875L
  testStopReplicaRequest(brokerEpoch, brokerEpoch + 1, Errors.NONE)
}

@Test
def testStopReplicaRequestWithStaleBrokerEpochIsRejected(): Unit = {
  // A request epoch behind the current epoch must be rejected as stale.
  val brokerEpoch = 1239875L
  testStopReplicaRequest(brokerEpoch, brokerEpoch - 1, Errors.STALE_BROKER_EPOCH)
}

/**
 * Sends a STOP_REPLICA request carrying `brokerEpochInRequest` while the mocked
 * controller reports `currentBrokerEpoch`, and asserts the top-level response
 * error equals `expectedError`.
 */
def testStopReplicaRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
val controllerId = 0
val controllerEpoch = 5
val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
val fooPartition = new TopicPartition("foo", 0)
// deletePartitions = false: stop the replica without deleting its log.
val stopReplicaRequest = new StopReplicaRequest.Builder(
ApiKeys.STOP_REPLICA.latestVersion,
controllerId,
controllerEpoch,
brokerEpochInRequest,
false,
util.Collections.singleton(fooPartition)
)
val request = buildRequest(stopReplicaRequest)

// The controller's view of the broker epoch drives the staleness check.
EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)

// stopReplicas is only reached when the epoch check passes; stub a per-partition
// success map plus a NONE top-level error.
val requestBody: StopReplicaRequest = request._2.body[StopReplicaRequest]
EasyMock.expect(replicaManager.stopReplicas(EasyMock.eq(requestBody)))
.andStubReturn(
(mutable.Map(
fooPartition -> Errors.NONE
), Errors.NONE)
)
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))

EasyMock.replay(controller, replicaManager, requestChannel)

createKafkaApis().handleStopReplicaRequest(request._2)
// Deserialize the captured response and check the top-level error code.
val stopReplicaResponse = readResponse(ApiKeys.STOP_REPLICA, request._1, capturedResponse)
.asInstanceOf[StopReplicaResponse]
assertEquals(expectedError, stopReplicaResponse.error())
EasyMock.verify(replicaManager)
}

/**
* Return pair of listener names in the metadataCache: PLAINTEXT and LISTENER2 respectively.
*/
Expand Down Expand Up @@ -978,7 +1138,7 @@ class KafkaApisTest {
capturedResponse
}

private def setupBasicMetadataCache(topic: String, numPartitions: Int): Unit = {
private def createBasicMetadataRequest(topic: String, numPartitions: Int, brokerEpoch: Long): UpdateMetadataRequest.Builder = {
val replicas = List(0.asInstanceOf[Integer]).asJava

def createPartitionState(partition: Int) = new UpdateMetadataPartitionState()
Expand All @@ -1001,8 +1161,12 @@ class KafkaApisTest {
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
.setListener(plaintextListener.value)).asJava)
val partitionStates = (0 until numPartitions).map(createPartitionState)
val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
0, 0, partitionStates.asJava, Seq(broker).asJava).build()
metadataCache.updateMetadata(correlationId = 0, updateMetadataRequest)
new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
0, brokerEpoch, partitionStates.asJava, Seq(broker).asJava)
}

// Seeds the metadataCache with the single-broker layout for `topic`, using broker epoch 0.
private def setupBasicMetadataCache(topic: String, numPartitions: Int): Unit = {
  metadataCache.updateMetadata(
    correlationId = 0,
    createBasicMetadataRequest(topic, numPartitions, brokerEpoch = 0).build())
}
}