diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index e42d10488f8df..2027ec8aadadf 100755 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -36,13 +36,12 @@ import kafka.utils.CoreUtils.inLock import kafka.utils.ZkUtils._ import kafka.utils._ import org.I0Itec.zkclient.exception.ZkNodeExistsException -import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient} +import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection} import org.apache.zookeeper.Watcher.Event.KeeperState import scala.collection._ import scala.collection.JavaConversions._ - /** * This class handles the consumers interaction with zookeeper * @@ -90,6 +89,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private val rebalanceLock = new Object private var fetcher: Option[ConsumerFetcherManager] = None private var zkClient: ZkClient = null + private var zkConnection : ZkConnection = null private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]] private val checkpointedZkOffsets = new Pool[TopicAndPartition, Long] private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]] @@ -178,7 +178,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def connectZk() { info("Connecting to zookeeper instance at " + config.zkConnect) - zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs) + val (client, connection) = ZkUtils.createZkClientAndConnection(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs) + zkClient = client + zkConnection = connection } // Blocks until the offset manager is located and a channel is established to it. @@ -261,9 +263,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val timestamp = SystemTime.milliseconds.toString val consumerRegistrationInfo = Json.encode(Map("version" -> 1, "subscription" -> topicCount.getTopicCountMap, "pattern" -> topicCount.pattern, "timestamp" -> timestamp)) + val zkWatchedEphemeral = new ZKCheckedEphemeral(dirs. + consumerRegistryDir + "/" + consumerIdString, + consumerRegistrationInfo, + zkConnection.getZookeeper) + zkWatchedEphemeral.create() - createEphemeralPathExpectConflictHandleZKBug(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, null, - (consumerZKString, consumer) => true, config.zkSessionTimeoutMs) info("end registering consumer " + consumerIdString + " in ZK") } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 54a31c6cffd54..a7b44cab501a9 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -37,13 +37,14 @@ import kafka.utils.CoreUtils._ import org.apache.zookeeper.Watcher.Event.KeeperState import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.utils.Time -import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient} +import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection} import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException} import java.util.concurrent.locks.ReentrantLock import kafka.server._ import kafka.common.TopicAndPartition class ControllerContext(val zkClient: ZkClient, + val zkConnection: ZkConnection, val zkSessionTimeout: Int) { var controllerChannelManager: ControllerChannelManager = null val controllerLock: ReentrantLock = new ReentrantLock() @@ -153,11 +154,11 @@ object KafkaController extends Logging { } } -class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup { +class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection: ZkConnection, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup { this.logIdent = "[Controller " + config.brokerId + "]: " private var isRunning = true private val stateChangeLogger = KafkaController.stateChangeLogger - val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs) + val controllerContext = new ControllerContext(zkClient, zkConnection, config.zkSessionTimeoutMs) val partitionStateMachine = new PartitionStateMachine(this) val replicaStateMachine = new ReplicaStateMachine(this) private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index e6e270bbdea6c..16760d4f6d3e6 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -21,7 +21,7 @@ import kafka.cluster.EndPoint import kafka.utils._ import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.zookeeper.Watcher.Event.KeeperState -import org.I0Itec.zkclient.{IZkStateListener, ZkClient} +import org.I0Itec.zkclient.{IZkStateListener, ZkClient, ZkConnection} import java.net.InetAddress @@ -35,8 +35,8 @@ import java.net.InetAddress */ class KafkaHealthcheck(private val brokerId: Int, private val advertisedEndpoints: Map[SecurityProtocol, EndPoint], - private val zkSessionTimeoutMs: Int, - private val zkClient: ZkClient) extends Logging { + private val zkClient: ZkClient, + private val zkConnection: ZkConnection) extends Logging { val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId val sessionExpireListener = new SessionExpireListener @@ -62,7 +62,7 @@ class KafkaHealthcheck(private val brokerId: Int, // only PLAINTEXT is supported as default // if the broker doesn't listen on PLAINTEXT protocol, an empty endpoint will be registered and older clients will be unable to connect val plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT, new EndPoint(null,-1,null)) - ZkUtils.registerBrokerInZk(zkClient, brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, zkSessionTimeoutMs, jmxPort) + ZkUtils.registerBrokerInZk(zkClient, zkConnection, brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort) } /** @@ -71,9 +71,7 @@ class KafkaHealthcheck(private val brokerId: Int, */ class SessionExpireListener() extends IZkStateListener { @throws(classOf[Exception]) - def handleStateChanged(state: KeeperState) { - // do nothing, since zkclient will do reconnect for us. - } + def handleStateChanged(state: KeeperState) {} /** * Called after the zookeeper session has expired and a new session has been created. You would have to re-create diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 5cc9c5d5a1eb1..ba3333ddee24b 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -43,7 +43,7 @@ import org.apache.kafka.common.utils.AppInfoParser import scala.collection.mutable import scala.collection.JavaConverters._ -import org.I0Itec.zkclient.ZkClient +import org.I0Itec.zkclient.{ZkClient, ZkConnection} import kafka.controller.{ControllerStats, KafkaController} import kafka.cluster.{EndPoint, Broker} import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBrokerIdException} @@ -129,6 +129,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr val metadataCache: MetadataCache = new MetadataCache(config.brokerId) var zkClient: ZkClient = null + var zkConnection: ZkConnection = null val correlationId: AtomicInteger = new AtomicInteger(0) val brokerMetaPropsFile = "meta.properties" val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +brokerMetaPropsFile)))).toMap @@ -164,7 +165,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr kafkaScheduler.startup() /* setup zookeeper */ - zkClient = initZk() + val (client, connection) = initZk() + zkClient = client + zkConnection = connection /* start log manager */ logManager = createLogManager(zkClient, brokerState) @@ -183,7 +186,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr replicaManager.startup() /* start kafka controller */ - kafkaController = new KafkaController(config, zkClient, brokerState, kafkaMetricsTime, metrics, threadNamePrefix) + kafkaController = new KafkaController(config, zkClient, zkConnection, brokerState, kafkaMetricsTime, metrics, threadNamePrefix) kafkaController.startup() /* start kafka coordinator */ @@ -220,7 +223,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr else (protocol, endpoint) } - kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, config.zkSessionTimeoutMs, zkClient) + kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkClient, zkConnection) kafkaHealthcheck.startup() /* register broker metrics */ @@ -242,7 +245,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr } } - private def initZk(): ZkClient = { + private def initZk(): (ZkClient, ZkConnection) = { info("Connecting to zookeeper on " + config.zkConnect) val chroot = { @@ -260,9 +263,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr zkClientForChrootCreation.close() } - val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs) + val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs) ZkUtils.setupCommonPaths(zkClient) - zkClient + (zkClient, zkConnection) } diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala index 2b36b2de3e7ea..b283e0a37b410 100755 --- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala +++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala @@ -18,7 +18,7 @@ package kafka.server import kafka.utils.ZkUtils._ import kafka.utils.CoreUtils._ -import kafka.utils.{Json, SystemTime, Logging} +import kafka.utils.{Json, SystemTime, Logging, ZKCheckedEphemeral} import org.I0Itec.zkclient.exception.ZkNodeExistsException import org.I0Itec.zkclient.IZkDataListener import kafka.controller.ControllerContext @@ -56,7 +56,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, case None => -1 } } - + def elect: Boolean = { val timestamp = SystemTime.milliseconds.toString val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp)) @@ -73,9 +73,10 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, } try { - createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId, - (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int], - controllerContext.zkSessionTimeout) + val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath, + electString, + controllerContext.zkConnection.getZookeeper) + zkCheckedEphemeral.create() info(brokerId + " successfully elected as leader") leaderId = brokerId onBecomingLeader() diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 74b587e04cdd6..e1cfa2e795964 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -17,11 +17,12 @@ package kafka.utils +import java.util.concurrent.CountDownLatch import kafka.cluster._ import kafka.consumer.{ConsumerThreadId, TopicCount} import kafka.server.ConfigType -import org.I0Itec.zkclient.ZkClient -import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, +import org.I0Itec.zkclient.{ZkClient,ZkConnection} +import org.I0Itec.zkclient.exception.{ZkException, ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError, ZkBadVersionException} import org.I0Itec.zkclient.serialize.ZkSerializer import org.apache.kafka.common.config.ConfigException @@ -36,6 +37,14 @@ import kafka.controller.KafkaController import kafka.controller.LeaderIsrAndControllerEpoch import kafka.common.TopicAndPartition +import org.apache.zookeeper.AsyncCallback.{DataCallback,StringCallback} +import org.apache.zookeeper.CreateMode +import org.apache.zookeeper.KeeperException +import org.apache.zookeeper.KeeperException.Code +import org.apache.zookeeper.ZooDefs.Ids +import org.apache.zookeeper.ZooKeeper + + object ZkUtils extends Logging { val ConsumersPath = "/consumers" val BrokerIdsPath = "/brokers/ids" @@ -195,24 +204,22 @@ object ZkUtils extends Logging { * @param timeout * @param jmxPort */ - def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, advertisedEndpoints: immutable.Map[SecurityProtocol, EndPoint], timeout: Int, jmxPort: Int) { + def registerBrokerInZk(zkClient: ZkClient, zkConnection: ZkConnection, id: Int, host: String, port: Int, advertisedEndpoints: immutable.Map[SecurityProtocol, EndPoint], jmxPort: Int) { val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id val timestamp = SystemTime.milliseconds.toString val brokerInfo = Json.encode(Map("version" -> 2, "host" -> host, "port" -> port, "endpoints"->advertisedEndpoints.values.map(_.connectionString).toArray, "jmx_port" -> jmxPort, "timestamp" -> timestamp)) - val expectedBroker = new Broker(id, advertisedEndpoints) - - registerBrokerInZk(zkClient, brokerIdPath, brokerInfo, expectedBroker, timeout) + registerBrokerInZk(zkClient, zkConnection, brokerIdPath, brokerInfo) info("Registered broker %d at path %s with addresses: %s".format(id, brokerIdPath, advertisedEndpoints.mkString(","))) } - private def registerBrokerInZk(zkClient: ZkClient, brokerIdPath: String, brokerInfo: String, expectedBroker: Broker, timeout: Int) { + private def registerBrokerInZk(zkClient: ZkClient, zkConnection: ZkConnection, brokerIdPath: String, brokerInfo: String) { try { - createEphemeralPathExpectConflictHandleZKBug(zkClient, brokerIdPath, brokerInfo, expectedBroker, - (brokerString: String, broker: Any) => Broker.createBroker(broker.asInstanceOf[Broker].id, brokerString).equals(broker.asInstanceOf[Broker]), - timeout) - + val zkCheckedEphemeral = new ZKCheckedEphemeral(brokerIdPath, + brokerInfo, + zkConnection.getZookeeper) + zkCheckedEphemeral.create() } catch { case e: ZkNodeExistsException => throw new RuntimeException("A broker is already registered on the path " + brokerIdPath @@ -301,47 +308,6 @@ object ZkUtils extends Logging { } } - /** - * Create an ephemeral node with the given path and data. - * Throw NodeExistsException if node already exists. - * Handles the following ZK session timeout bug: - * - * https://issues.apache.org/jira/browse/ZOOKEEPER-1740 - * - * Upon receiving a NodeExistsException, read the data from the conflicted path and - * trigger the checker function comparing the read data and the expected data, - * If the checker function returns true then the above bug might be encountered, back off and retry; - * otherwise re-throw the exception - */ - def createEphemeralPathExpectConflictHandleZKBug(zkClient: ZkClient, path: String, data: String, expectedCallerData: Any, checker: (String, Any) => Boolean, backoffTime: Int): Unit = { - while (true) { - try { - createEphemeralPathExpectConflict(zkClient, path, data) - return - } catch { - case e: ZkNodeExistsException => { - // An ephemeral node may still exist even after its corresponding session has expired - // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted - // and hence the write succeeds without ZkNodeExistsException - ZkUtils.readDataMaybeNull(zkClient, path)._1 match { - case Some(writtenData) => { - if (checker(writtenData, expectedCallerData)) { - info("I wrote this conflicted ephemeral node [%s] at %s a while back in a different session, ".format(data, path) - + "hence I will backoff for this node to be deleted by Zookeeper and retry") - - Thread.sleep(backoffTime) - } else { - throw e - } - } - case None => // the node disappeared; retry creating the ephemeral node immediately - } - } - case e2: Throwable => throw e2 - } - } - } - /** * Create an persistent node with the given path and data. Create parents if necessary. */ @@ -809,7 +775,13 @@ object ZkUtils extends Logging { def createZkClient(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): ZkClient = { val zkClient = new ZkClient(zkUrl, sessionTimeout, connectionTimeout, ZKStringSerializer) - zkClient + zkClient + } + + def createZkClientAndConnection(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): (ZkClient, ZkConnection) = { + val zkConnection = new ZkConnection(zkUrl, sessionTimeout) + val zkClient = new ZkClient(zkConnection, connectionTimeout, ZKStringSerializer) + (zkClient, zkConnection) } } @@ -892,3 +864,157 @@ object ZkPath { client.createPersistentSequential(path, data) } } + +/** + * Creates an ephemeral znode checking the session owner + * in the case of conflict. In the regular case, the + * znode is created and the create call returns OK. If + * the call receives a node exists event, then it checks + * if the session matches. If it does, then it returns OK, + * and otherwise it fails the operation. + */ + +class ZKCheckedEphemeral(path: String, + data: String, + zkHandle: ZooKeeper) extends Logging { + private val createCallback = new CreateCallback + private val getDataCallback = new GetDataCallback + val latch: CountDownLatch = new CountDownLatch(1) + var result: Code = Code.OK + + private class CreateCallback extends StringCallback { + def processResult(rc: Int, + path: String, + ctx: Object, + name: String) { + Code.get(rc) match { + case Code.OK => + setResult(Code.OK) + case Code.CONNECTIONLOSS => + // try again + createEphemeral + case Code.NONODE => + error("No node for path %s (could be the parent missing)".format(path)) + setResult(Code.NONODE) + case Code.NODEEXISTS => + zkHandle.getData(path, false, getDataCallback, null) + case Code.SESSIONEXPIRED => + error("Session has expired while creating %s".format(path)) + setResult(Code.SESSIONEXPIRED) + case _ => + warn("ZooKeeper event while creating registration node: %s %s".format(path, Code.get(rc))) + setResult(Code.get(rc)) + } + } + } + + private class GetDataCallback extends DataCallback { + def processResult(rc: Int, + path: String, + ctx: Object, + readData: Array[Byte], + stat: Stat) { + Code.get(rc) match { + case Code.OK => + if (stat.getEphemeralOwner != zkHandle.getSessionId) + setResult(Code.NODEEXISTS) + else + setResult(Code.OK) + case Code.NONODE => + info("The ephemeral node [%s] at %s has gone away while reading it, ".format(data, path)) + createEphemeral + case Code.SESSIONEXPIRED => + error("Session has expired while reading znode %s".format(path)) + setResult(Code.SESSIONEXPIRED) + case _ => + warn("ZooKeeper event while getting znode data: %s %s".format(path, Code.get(rc))) + setResult(Code.get(rc)) + } + } + } + + private def createEphemeral() { + zkHandle.create(path, + ZKStringSerializer.serialize(data), + Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL, + createCallback, + null) + } + + private def createRecursive(prefix: String, suffix: String) { + debug("Path: %s, Prefix: %s, Suffix: %s".format(path, prefix, suffix)) + if(suffix.isEmpty()) { + createEphemeral + } else { + zkHandle.create(prefix, + new Array[Byte](0), + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + new StringCallback() { + def processResult(rc : Int, + path : String, + ctx : Object, + name : String) { + Code.get(rc) match { + case Code.OK | Code.NODEEXISTS => + // Nothing to do + case Code.CONNECTIONLOSS => + // try again + val suffix = ctx.asInstanceOf[String] + createRecursive(path, suffix) + case Code.NONODE => + error("No node for path %s (could be the parent missing)".format(path)) + setResult(Code.get(rc)) + case Code.SESSIONEXPIRED => + error("Session has expired while creating %s".format(path)) + setResult(Code.get(rc)) + case _ => + warn("ZooKeeper event while creating registration node: %s %s".format(path, Code.get(rc))) + setResult(Code.get(rc)) + } + } + }, + suffix) + // Update prefix and suffix + val index = suffix.indexOf('/', 1) match { + case -1 => suffix.length + case x : Int => x + } + // Get new prefix + val newPrefix = prefix + suffix.substring(0, index) + // Get new suffix + val newSuffix = suffix.substring(index, suffix.length) + createRecursive(newPrefix, newSuffix) + } + } + + private def setResult(code: Code) { + result = code + latch.countDown() + } + + private def waitUntilResolved(): Code = { + latch.await() + result + } + + def create() { + val index = path.indexOf('/', 1) match { + case -1 => path.length + case x : Int => x + } + val prefix = path.substring(0, index) + val suffix = path.substring(index, path.length) + debug("Path: %s, Prefix: %s, Suffix: %s".format(path, prefix, suffix)) + createRecursive(prefix, suffix) + val result = waitUntilResolved() + info("Result of znode creation is: %s".format(result)) + result match { + case Code.OK => + // Nothing to do + case _ => + throw ZkException.create(KeeperException.create(result)) + } + } +} diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 9bd8171f484c1..2d18069884d18 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -66,7 +66,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging { @Test def testManualReplicaAssignment() { val brokers = List(0, 1, 2, 3, 4) - TestUtils.createBrokersInZk(zkClient, brokers) + TestUtils.createBrokersInZk(zkClient, zkConnection, brokers) // duplicate brokers intercept[IllegalArgumentException] { @@ -117,7 +117,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging { 11 -> 1 ) val topic = "test" - TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4)) + TestUtils.createBrokersInZk(zkClient, zkConnection, List(0, 1, 2, 3, 4)) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) // create leaders for all partitions @@ -137,7 +137,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging { def testTopicCreationWithCollision() { val topic = "test.topic" val collidingTopic = "test_topic" - TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4)) + TestUtils.createBrokersInZk(zkClient, zkConnection, List(0, 1, 2, 3, 4)) // create the topic AdminUtils.createTopic(zkClient, topic, 3, 1) diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala index 9bfec72261eaf..d4fa0d5c1b58a 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -36,7 +36,7 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging { val cleanupVal = "compact" // create brokers val brokers = List(0, 1, 2) - TestUtils.createBrokersInZk(zkClient, brokers) + TestUtils.createBrokersInZk(zkClient, zkConnection, brokers) // create the topic val createOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString, "--replication-factor", "1", @@ -67,7 +67,7 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging { // create brokers val brokers = List(0, 1, 2) - TestUtils.createBrokersInZk(zkClient, brokers) + TestUtils.createBrokersInZk(zkClient, zkConnection, brokers) // create the NormalTopic val createOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString, diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index ff1783096ef88..ac347ef307663 100755 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -136,7 +136,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness { new LeaderAndIsrRequest.EndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port) } - val controllerContext = new ControllerContext(zkClient, 6000) + val controllerContext = new ControllerContext(zkClient, zkConnection, 6000) controllerContext.liveBrokers = brokers.toSet val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, new SystemTime, new Metrics) controllerChannelManager.startup() diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index b01adc860b416..7f482fb61adcf 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -31,7 +31,7 @@ import org.apache.kafka.common.utils.Utils._ import collection.mutable.ListBuffer -import org.I0Itec.zkclient.ZkClient +import org.I0Itec.zkclient.{ZkClient, ZkConnection} import kafka.server._ import kafka.producer._ @@ -500,9 +500,9 @@ object TestUtils extends Logging { } } - def createBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = { + def createBrokersInZk(zkClient: ZkClient, zkConnection: ZkConnection, ids: Seq[Int]): Seq[Broker] = { val brokers = ids.map(id => new Broker(id, "localhost", 6667, SecurityProtocol.PLAINTEXT)) - brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, "localhost", 6667, b.endPoints, 6000, jmxPort = -1)) + brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, zkConnection, b.id, "localhost", 6667, b.endPoints, jmxPort = -1)) brokers } diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala index f240e89b67d3f..2bf658c5f244c 100644 --- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala +++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala @@ -19,7 +19,13 @@ package kafka.zk import kafka.consumer.ConsumerConfig import kafka.utils.ZkUtils +import kafka.utils.ZKCheckedEphemeral import kafka.utils.TestUtils +import org.apache.zookeeper.CreateMode +import org.apache.zookeeper.WatchedEvent +import org.apache.zookeeper.Watcher +import org.apache.zookeeper.ZooDefs.Ids +import org.I0Itec.zkclient.exception.{ZkException,ZkNodeExistsException} import org.junit.{Test, Assert} class ZKEphemeralTest extends ZooKeeperTestHarness { @@ -44,4 +50,96 @@ class ZKEphemeralTest extends ZooKeeperTestHarness { val nodeExists = ZkUtils.pathExists(zkClient, "/tmp/zktest") Assert.assertFalse(nodeExists) } + + /***** + ***** Tests for ZkWatchedEphemeral + *****/ + + /** + * Tests basic creation + */ + @Test + def testZkWatchedEphemeral = { + val path = "/zwe-test" + testCreation(path) + } + + /** + * Tests recursive creation + */ + @Test + def testZkWatchedEphemeralRecursive = { + val path = "/zwe-test-parent/zwe-test" + testCreation(path) + } + + private def testCreation(path: String) { + val zk = zkConnection.getZookeeper + val zwe = new ZKCheckedEphemeral(path, "", zk) + var created = false + var counter = 10 + + zk.exists(path, new Watcher() { + def process(event: WatchedEvent) { + if(event.getType == Watcher.Event.EventType.NodeCreated) { + created = true + } + } + }) + zwe.create() + // Waits until the znode is created + TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, path), + "Znode %s wasn't created".format(path)) + } + + /** + * Tests that it fails in the presence of an overlapping + * session. + */ + @Test + def testOverlappingSessions = { + val path = "/zwe-test" + val zk1 = zkConnection.getZookeeper + + //Creates a second session + val (_, zkConnection2) = ZkUtils.createZkClientAndConnection(zkConnect, zkSessionTimeoutMs, zkConnectionTimeout) + val zk2 = zkConnection2.getZookeeper + var zwe = new ZKCheckedEphemeral(path, "", zk2) + + // Creates znode for path in the first session + zk1.create(path, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) + + //Bootstraps the ZKWatchedEphemeral object + var gotException = false; + try { + zwe.create() + } catch { + case e: ZkNodeExistsException => + gotException = true + } + Assert.assertTrue(gotException) + } + + /** + * Tests if succeeds with znode from the same session + * + */ + @Test + def testSameSession = { + val path = "/zwe-test" + val zk = zkConnection.getZookeeper + // Creates znode for path in the first session + zk.create(path, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) + + var zwe = new ZKCheckedEphemeral(path, "", zk) + //Bootstraps the ZKWatchedEphemeral object + var gotException = false; + try { + zwe.create() + } catch { + case e: ZkNodeExistsException => + gotException = true + } + Assert.assertFalse(gotException) + } } diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala index e4bfb48c2602d..3e1c6e0ceecc1 100755 --- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala @@ -17,7 +17,7 @@ package kafka.zk -import org.I0Itec.zkclient.ZkClient +import org.I0Itec.zkclient.{ZkClient, ZkConnection} import kafka.utils.{ZkUtils, CoreUtils} import org.junit.{After, Before} import org.scalatest.junit.JUnitSuite @@ -26,6 +26,7 @@ trait ZooKeeperTestHarness extends JUnitSuite { var zkPort: Int = -1 var zookeeper: EmbeddedZookeeper = null var zkClient: ZkClient = null + var zkConnection : ZkConnection = null val zkConnectionTimeout = 6000 val zkSessionTimeout = 6000 @@ -35,7 +36,9 @@ trait ZooKeeperTestHarness extends JUnitSuite { def setUp() { zookeeper = new EmbeddedZookeeper() zkPort = zookeeper.port - zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout) + val (client, connection) = ZkUtils.createZkClientAndConnection(zkConnect, zkSessionTimeout, zkConnectionTimeout) + zkClient = client + zkConnection = connection } @After