Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class ControllerChannelManagerTest {

@Test
def testLeaderAndIsrRequestSent(): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar"))
val batch = new MockControllerBrokerRequestBatch(context)

val partitions = Map(
Expand Down Expand Up @@ -98,7 +98,7 @@ class ControllerChannelManagerTest {

@Test
def testLeaderAndIsrRequestIsNew(): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar"))
val batch = new MockControllerBrokerRequestBatch(context)

val partition = new TopicPartition("foo", 0)
Expand Down Expand Up @@ -126,7 +126,7 @@ class ControllerChannelManagerTest {

@Test
def testLeaderAndIsrRequestSentToLiveOrShuttingDownBrokers(): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar"))
val batch = new MockControllerBrokerRequestBatch(context)

// 2 is shutting down, 3 is dead
Expand Down Expand Up @@ -176,7 +176,7 @@ class ControllerChannelManagerTest {

private def testLeaderAndIsrRequestFollowsInterBrokerProtocolVersion(interBrokerProtocolVersion: ApiVersion,
expectedLeaderAndIsrVersion: Short): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar"))
val config = createConfig(interBrokerProtocolVersion)
val batch = new MockControllerBrokerRequestBatch(context, config)

Expand Down Expand Up @@ -215,7 +215,7 @@ class ControllerChannelManagerTest {
def testUpdateMetadataRequestSent(): Unit = {

val topicIds = Map("foo" -> Uuid.randomUuid(), "bar" -> Uuid.randomUuid())
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, topicIds)
val context = initContext(Seq(1, 2, 3), 2, 3, topicIds)
val batch = new MockControllerBrokerRequestBatch(context)

val partitions = Map(
Expand Down Expand Up @@ -264,7 +264,7 @@ class ControllerChannelManagerTest {

@Test
def testUpdateMetadataDoesNotIncludePartitionsWithoutLeaderAndIsr(): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar"))
val batch = new MockControllerBrokerRequestBatch(context)

val partitions = Set(
Expand Down Expand Up @@ -292,7 +292,7 @@ class ControllerChannelManagerTest {

@Test
def testUpdateMetadataRequestDuringTopicDeletion(): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar"))
val batch = new MockControllerBrokerRequestBatch(context)

val partitions = Map(
Expand Down Expand Up @@ -334,7 +334,7 @@ class ControllerChannelManagerTest {

@Test
def testUpdateMetadataIncludesLiveOrShuttingDownBrokers(): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar"))
val batch = new MockControllerBrokerRequestBatch(context)

// 2 is shutting down, 3 is dead
Expand Down Expand Up @@ -379,7 +379,7 @@ class ControllerChannelManagerTest {

private def testUpdateMetadataFollowsInterBrokerProtocolVersion(interBrokerProtocolVersion: ApiVersion,
expectedUpdateMetadataVersion: Short): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar"))
val config = createConfig(interBrokerProtocolVersion)
val batch = new MockControllerBrokerRequestBatch(context, config)

Expand All @@ -400,7 +400,7 @@ class ControllerChannelManagerTest {

@Test
def testStopReplicaRequestSent(): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar"))
val batch = new MockControllerBrokerRequestBatch(context)

val partitions = Map(
Expand Down Expand Up @@ -435,7 +435,7 @@ class ControllerChannelManagerTest {

@Test
def testStopReplicaRequestWithAlreadyDefinedDeletedPartition(): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar"))
val batch = new MockControllerBrokerRequestBatch(context)

val partition = new TopicPartition("foo", 0)
Expand Down Expand Up @@ -463,7 +463,7 @@ class ControllerChannelManagerTest {
}

private def testStopReplicaRequestsWhileTopicQueuedForDeletion(interBrokerProtocolVersion: ApiVersion): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar"))
val config = createConfig(interBrokerProtocolVersion)
val batch = new MockControllerBrokerRequestBatch(context, config)

Expand Down Expand Up @@ -510,7 +510,7 @@ class ControllerChannelManagerTest {
}

private def testStopReplicaRequestsWhileTopicDeletionStarted(interBrokerProtocolVersion: ApiVersion): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar"))
val config = createConfig(interBrokerProtocolVersion)
val batch = new MockControllerBrokerRequestBatch(context, config)

Expand Down Expand Up @@ -565,7 +565,7 @@ class ControllerChannelManagerTest {
}

private def testStopReplicaRequestWithoutDeletePartitionWhileTopicDeletionStarted(interBrokerProtocolVersion: ApiVersion): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar"))
val config = createConfig(interBrokerProtocolVersion)
val batch = new MockControllerBrokerRequestBatch(context, config)

Expand Down Expand Up @@ -623,7 +623,7 @@ class ControllerChannelManagerTest {

private def testMixedDeleteAndNotDeleteStopReplicaRequests(interBrokerProtocolVersion: ApiVersion,
expectedStopReplicaRequestVersion: Short): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar"))
val config = createConfig(interBrokerProtocolVersion)
val batch = new MockControllerBrokerRequestBatch(context, config)

Expand Down Expand Up @@ -682,7 +682,7 @@ class ControllerChannelManagerTest {

@Test
def testStopReplicaGroupsByBroker(): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar"))
val batch = new MockControllerBrokerRequestBatch(context)

val partitions = Map(
Expand Down Expand Up @@ -720,7 +720,7 @@ class ControllerChannelManagerTest {

@Test
def testStopReplicaSentOnlyToLiveAndShuttingDownBrokers(): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty)
val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar"))
val batch = new MockControllerBrokerRequestBatch(context)

// 2 is shutting down, 3 is dead
Expand Down Expand Up @@ -772,7 +772,7 @@ class ControllerChannelManagerTest {

private def testStopReplicaFollowsInterBrokerProtocolVersion(interBrokerProtocolVersion: ApiVersion,
expectedStopReplicaRequestVersion: Short): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo"), 2, 3, Map.empty)
val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo"))
val config = createConfig(interBrokerProtocolVersion)
val batch = new MockControllerBrokerRequestBatch(context, config)

Expand Down Expand Up @@ -884,7 +884,12 @@ class ControllerChannelManagerTest {
private def replicaAssignment(replicas: Seq[Int]): ReplicaAssignment = ReplicaAssignment(replicas, Seq(), Seq())

private def initContext(brokers: Seq[Int],
topics: Set[String],
numPartitions: Int,
replicationFactor: Int,
topics: Set[String]): ControllerContext = initContext(brokers, numPartitions,
replicationFactor, topics.map(_ -> Uuid.randomUuid()).toMap)

private def initContext(brokers: Seq[Int],
numPartitions: Int,
replicationFactor: Int,
topicIds: Map[String, Uuid]): ControllerContext = {
Expand All @@ -896,15 +901,12 @@ class ControllerChannelManagerTest {
}.toMap

context.setLiveBrokers(brokerEpochs)
context.setAllTopics(topics)

for (topic <- topics) {
context.addTopicId(topic, Uuid.randomUuid())
}
context.setAllTopics(topicIds.keySet)
topicIds.foreach { case (name, id) => context.addTopicId(name, id) }

// Simple round-robin replica assignment
var leaderIndex = 0
for (topic <- topics; partitionId <- 0 until numPartitions) {
for (topic <- topicIds.keys; partitionId <- 0 until numPartitions) {
val partition = new TopicPartition(topic, partitionId)
val replicas = (0 until replicationFactor).map { i =>
val replica = brokers((i + leaderIndex) % brokers.size)
Expand All @@ -914,8 +916,6 @@ class ControllerChannelManagerTest {
leaderIndex += 1
}

context.allTopics ++= topics
topicIds.foreach { case (name, id) => context.addTopicId(name, id) }
context
}

Expand Down