diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 2832ebc6266ed..c2c1f879e2d64 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -214,7 +214,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator, kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) - brokerState.newState(RunningAsBroker) Mx4jLoader.maybeLoad() @@ -249,6 +248,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr /* register broker metrics */ registerStats() + brokerState.newState(RunningAsBroker) shutdownLatch = new CountDownLatch(1) startupComplete.set(true) isStartingUp.set(false) diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala index 9b49365c77060..b5560c36d623c 100755 --- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala @@ -20,8 +20,8 @@ package kafka.server import kafka.utils.ZkUtils import kafka.utils.CoreUtils import kafka.utils.TestUtils - import kafka.zk.ZooKeeperTestHarness +import org.easymock.EasyMock import org.junit.Assert._ import org.junit.Test @@ -82,4 +82,30 @@ class ServerStartupTest extends ZooKeeperTestHarness { server.shutdown() CoreUtils.delete(server.config.logDirs) } + + @Test + def testBrokerStateRunningAfterZK { + val brokerId = 0 + val mockBrokerState = EasyMock.niceMock(classOf[kafka.server.BrokerState]) + + class BrokerStateInterceptor() extends BrokerState { + override def newState(newState: BrokerStates): Unit = { + val brokers = zkUtils.getAllBrokersInCluster() + assertEquals(1, brokers.size) + assertEquals(brokerId, brokers.head.id) + } + } + + class MockKafkaServer(override val config: KafkaConfig, override val brokerState: BrokerState = mockBrokerState) extends KafkaServer(config) {} + + val props = TestUtils.createBrokerConfig(brokerId, zkConnect) + val server = new MockKafkaServer(KafkaConfig.fromProps(props)) + + EasyMock.expect(mockBrokerState.newState(RunningAsBroker)).andDelegateTo(new BrokerStateInterceptor).once() + EasyMock.replay(mockBrokerState) + + server.startup() + server.shutdown() + CoreUtils.delete(server.config.logDirs) + } }