From 54052cb17b1572b50ce770168a30c5d9cbcb278a Mon Sep 17 00:00:00 2001 From: Roger Hoover Date: Tue, 24 May 2016 18:37:24 -0700 Subject: [PATCH 1/2] Setting broker state as running after publishing to ZK --- .../main/scala/kafka/server/KafkaServer.scala | 2 +- .../unit/kafka/server/ServerStartupTest.scala | 32 ++++++++++++++++--- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 2832ebc6266ed..c2c1f879e2d64 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -214,7 +214,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator, kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) - brokerState.newState(RunningAsBroker) Mx4jLoader.maybeLoad() @@ -249,6 +248,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr /* register broker metrics */ registerStats() + brokerState.newState(RunningAsBroker) shutdownLatch = new CountDownLatch(1) startupComplete.set(true) isStartingUp.set(false) diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala index 9b49365c77060..bdebd07964cf5 100755 --- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala @@ -17,11 +17,9 @@ package kafka.server -import kafka.utils.ZkUtils -import kafka.utils.CoreUtils -import kafka.utils.TestUtils - +import kafka.utils._ import kafka.zk.ZooKeeperTestHarness +import org.easymock.EasyMock import org.junit.Assert._ import org.junit.Test @@ -82,4 +80,30 @@ class ServerStartupTest extends ZooKeeperTestHarness { server.shutdown() CoreUtils.delete(server.config.logDirs) } + + @Test + def testBrokerStateRunningAfterZK { + val brokerId = 0 + val mockBrokerState = EasyMock.niceMock(classOf[kafka.server.BrokerState]) + + class BrokerStateInterceptor() extends BrokerState { + override def newState(newState: BrokerStates): Unit = { + val brokers = zkUtils.getAllBrokersInCluster() + assertEquals(1, brokers.size) + assertEquals(brokerId, brokers.head.id) + } + } + + class MockKafkaServer(override val config: KafkaConfig, override val brokerState: BrokerState = mockBrokerState) extends KafkaServer(config) {} + + val props = TestUtils.createBrokerConfig(brokerId, zkConnect) + val server = new MockKafkaServer(KafkaConfig.fromProps(props)) + + EasyMock.expect(mockBrokerState.newState(RunningAsBroker)).andDelegateTo(new BrokerStateInterceptor).once() + EasyMock.replay(mockBrokerState) + + server.startup() + server.shutdown() + CoreUtils.delete(server.config.logDirs) + } } From cc8ce55f874121dc4c26a63ffd6c8c7eb3a8107d Mon Sep 17 00:00:00 2001 From: Roger Hoover Date: Tue, 24 May 2016 18:40:51 -0700 Subject: [PATCH 2/2] Restore imports --- core/src/test/scala/unit/kafka/server/ServerStartupTest.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala index bdebd07964cf5..b5560c36d623c 100755 --- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala @@ -17,7 +17,9 @@ package kafka.server -import kafka.utils._ +import kafka.utils.ZkUtils +import kafka.utils.CoreUtils +import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness import org.easymock.EasyMock import org.junit.Assert._