Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator,
kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
brokerState.newState(RunningAsBroker)

Mx4jLoader.maybeLoad()

Expand Down Expand Up @@ -249,6 +248,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
/* register broker metrics */
registerStats()

brokerState.newState(RunningAsBroker)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, one issue that I noticed when doing some other work is that this will override RunningAsController even though it probably should not (it was already doing that before this change).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's a good point. It seems that it's more natural to start the controller after the broker registration in ZK. Could you file a separate jira to track that?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do.

shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
isStartingUp.set(false)
Expand Down
28 changes: 27 additions & 1 deletion core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package kafka.server
import kafka.utils.ZkUtils
import kafka.utils.CoreUtils
import kafka.utils.TestUtils

import kafka.zk.ZooKeeperTestHarness
import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.Test

Expand Down Expand Up @@ -82,4 +82,30 @@ class ServerStartupTest extends ZooKeeperTestHarness {
server.shutdown()
CoreUtils.delete(server.config.logDirs)
}

@Test
def testBrokerStateRunningAfterZK {
val brokerId = 0
val mockBrokerState = EasyMock.niceMock(classOf[kafka.server.BrokerState])

class BrokerStateInterceptor() extends BrokerState {
override def newState(newState: BrokerStates): Unit = {
val brokers = zkUtils.getAllBrokersInCluster()
assertEquals(1, brokers.size)
assertEquals(brokerId, brokers.head.id)
}
}

class MockKafkaServer(override val config: KafkaConfig, override val brokerState: BrokerState = mockBrokerState) extends KafkaServer(config) {}

val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
val server = new MockKafkaServer(KafkaConfig.fromProps(props))

EasyMock.expect(mockBrokerState.newState(RunningAsBroker)).andDelegateTo(new BrokerStateInterceptor).once()
EasyMock.replay(mockBrokerState)

server.startup()
server.shutdown()
CoreUtils.delete(server.config.logDirs)
}
}