Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions core/src/main/scala/kafka/utils/ZKLock.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.utils

import org.I0Itec.zkclient.ZkClient

/**
* This class implements a ZK based exclusive reeentrant lock, according to classic recipe.
* After Curator integration this class should be replaced by a respective recipe in Curator.
*/
class ZKLock {

private[this] var lockPath: String = null
private[this] var childLockPath: String = null
private[this] var zkClient: ZkClient = null
private[this] var backoffMs: Int = 0
private[this] val DEFAULT_BACKOFF_MS: Int = 500

@volatile var holdsLock: Boolean = false

def this(zkClient: ZkClient, path: String, backoff: Option[Int] = None) {
this()

if (zkClient == null)
throw new IllegalArgumentException("zkClient cannot be null")
this.zkClient = zkClient

if (path == null || path.isEmpty())
throw new IllegalArgumentException("Invalid lock path: %s".format(path))
this.lockPath = path

this.backoffMs = backoff match {
case Some(x) => x
case None => DEFAULT_BACKOFF_MS
}
}

@throws(classOf[InterruptedException])
def acquire(): Unit = {
this.synchronized {
if (holdsLock)
return

if (!ZkUtils.pathExists(zkClient, lockPath))
throw new RuntimeException("Persistent lock path '%s' doesn't exists".format(lockPath))

var acquired: Boolean = false
do {
acquired = true

childLockPath = ZkUtils.createEphemeralSequential(zkClient, lockPath + "/exclusive-lock-", "")
val candidateId: Long = childLockPath.substring(childLockPath.lastIndexOf("-") + 1).toLong

val children = ZkUtils.getChildren(zkClient, lockPath)
for (child <- children) {
val childId: Long = child.substring(child.lastIndexOf("-") + 1).toLong
if (childId < candidateId)
acquired = false
}

if (!acquired) {
ZkUtils.deletePath(zkClient, childLockPath)
childLockPath = null
Thread.sleep(backoffMs)
}
} while (!acquired)
holdsLock = acquired
}

}

def release(): Unit = {
this.synchronized {
if (!holdsLock)
throw new IllegalStateException("Locking is not held for path '%s'".format(childLockPath))
ZkUtils.deletePath(zkClient, childLockPath)
}
}
}
31 changes: 28 additions & 3 deletions core/src/main/scala/kafka/utils/ZkUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ object ZkUtils extends Logging {
val ConsumersPath = "/consumers"
val BrokerIdsPath = "/brokers/ids"
val BrokerTopicsPath = "/brokers/topics"
val BrokerRegisterLocksPath = "/brokers/register-locks"
val ControllerPath = "/controller"
val ControllerEpochPath = "/controller_epoch"
val ReassignPartitionsPath = "/admin/reassign_partitions"
Expand All @@ -62,6 +63,7 @@ object ZkUtils extends Logging {
val persistentZkPaths = Seq(ConsumersPath,
BrokerIdsPath,
BrokerTopicsPath,
BrokerRegisterLocksPath,
EntityConfigChangesPath,
ZkUtils.getEntityConfigRootPath(ConfigType.Topic),
ZkUtils.getEntityConfigRootPath(ConfigType.Client),
Expand Down Expand Up @@ -207,11 +209,25 @@ object ZkUtils extends Logging {
def registerBrokerInZk(zkClient: ZkClient, zkConnection: ZkConnection, id: Int, host: String, port: Int, advertisedEndpoints: immutable.Map[SecurityProtocol, EndPoint], jmxPort: Int) {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
val timestamp = SystemTime.milliseconds.toString
val zkLock = new ZKLock(zkClient, ZkUtils.BrokerRegisterLocksPath, Some(5))
zkLock.acquire()
try {
verifyUniqueHostPort(host, port, zkClient)
val brokerInfo = Json.encode(Map("version" -> 2, "host" -> host, "port" -> port, "endpoints"->advertisedEndpoints.values.map(_.connectionString).toArray, "jmx_port" -> jmxPort, "timestamp" -> timestamp))
registerBrokerInZk(zkClient, zkConnection, brokerIdPath, brokerInfo)

val brokerInfo = Json.encode(Map("version" -> 2, "host" -> host, "port" -> port, "endpoints"->advertisedEndpoints.values.map(_.connectionString).toArray, "jmx_port" -> jmxPort, "timestamp" -> timestamp))
registerBrokerInZk(zkClient, zkConnection, brokerIdPath, brokerInfo)
info("Registered broker %d at path %s with addresses: %s".format(id, brokerIdPath, advertisedEndpoints.mkString(",")))
} finally {
zkLock.release()
}
}

info("Registered broker %d at path %s with addresses: %s".format(id, brokerIdPath, advertisedEndpoints.mkString(",")))

private def verifyUniqueHostPort(host:String, port:Int, zkClient: ZkClient): Unit = {
val allBrokers = ZkUtils.getAllBrokersInCluster(zkClient)
for (e <- allBrokers flatMap { broker => broker.endPoints.values })
if (e.host == host && e.port == port)
throw new RuntimeException("Host/port combination %s:%d is already registered by an existing broker".format(host, port))
}

private def registerBrokerInZk(zkClient: ZkClient, zkConnection: ZkConnection, brokerIdPath: String, brokerInfo: String) {
Expand Down Expand Up @@ -326,6 +342,10 @@ object ZkUtils extends Logging {
ZkPath.createPersistentSequential(client, path, data)
}

def createEphemeralSequential(client: ZkClient, path: String, data: String = ""): String = {
ZkPath.createEphemeralSequential(client, path, data)
}

/**
* Update the value of a persistent node with the given path and data.
* create parrent directory if necessary. Never throw NodeExistException.
Expand Down Expand Up @@ -863,6 +883,11 @@ object ZkPath {
checkNamespace(client)
client.createPersistentSequential(path, data)
}

def createEphemeralSequential(client: ZkClient, path: String, data: Object): String = {
checkNamespace(client)
client.createEphemeralSequential(path, data)
}
}

/**
Expand Down
16 changes: 16 additions & 0 deletions core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package kafka.server

import kafka.cluster.Broker
import kafka.utils.ZkUtils
import kafka.utils.CoreUtils
import kafka.utils.TestUtils

import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.protocol.SecurityProtocol
import org.junit.Assert._
import org.junit.Test

Expand Down Expand Up @@ -68,4 +70,18 @@ class ServerStartupTest extends ZooKeeperTestHarness {
server1.shutdown()
CoreUtils.rm(server1.config.logDirs)
}

@Test
def testConflictBrokerRegistrationWithSameHostAndPort() {

val brokers = List(0, 1)

intercept[RuntimeException] {
for (id <- brokers) {
TestUtils.maybeCreateBrokerRegisterLockPath(zkClient)
val broker = new Broker(id, "localhost", 6667, SecurityProtocol.PLAINTEXT)
ZkUtils.registerBrokerInZk(zkClient, id, "localhost", 6667, broker.endPoints, 6000, jmxPort = -1)
}
}
}
}
14 changes: 13 additions & 1 deletion core/src/test/scala/unit/kafka/utils/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,17 @@ object TestUtils extends Logging {
bytes
}

def maybeCreateBrokerRegisterLockPath(zkClient: ZkClient): Unit = {
if (!ZkUtils.pathExists(zkClient, ZkUtils.BrokerRegisterLocksPath)) {
try {
ZkUtils.createPersistentPath(zkClient, ZkUtils.BrokerRegisterLocksPath, "")
}
catch {
case e: Exception => // do nothing
}
}
}

/**
* Generate a random string of letters and digits of the given length
* @param len The length of the string
Expand Down Expand Up @@ -501,13 +512,14 @@ object TestUtils extends Logging {
}

def createBrokersInZk(zkClient: ZkClient, zkConnection: ZkConnection, ids: Seq[Int]): Seq[Broker] = {
maybeCreateBrokerRegisterLockPath(zkClient)
val brokers = ids.map(id => new Broker(id, "localhost", 6667, SecurityProtocol.PLAINTEXT))
brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, zkConnection, b.id, "localhost", 6667, b.endPoints, jmxPort = -1))
brokers
}

def deleteBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = {
val brokers = ids.map(id => new Broker(id, "localhost", 6667, SecurityProtocol.PLAINTEXT))
val brokers = ids.map(id => new Broker(id, "localhost", 6667 + id, SecurityProtocol.PLAINTEXT))
brokers.foreach(b => ZkUtils.deletePath(zkClient, ZkUtils.BrokerIdsPath + "/" + b))
brokers
}
Expand Down
90 changes: 90 additions & 0 deletions core/src/test/scala/unit/kafka/zk/ZKLockTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package unit.kafka.zk

import java.util.concurrent.CountDownLatch

import org.junit.Assert.assertEquals

import kafka.utils.{ZKLock, ZkUtils}
import kafka.zk.ZooKeeperTestHarness
import java.util.concurrent.atomic.AtomicInteger

import org.junit.Assert._
import org.junit.{Test, Before}

class ZKLockTest extends ZooKeeperTestHarness {

val lockPath: String = "/lock_dir"
val zkSessionTimeoutMs = 1000
val zkConnectionTimeoutMs = 3000

@Test
def testExclusiveZKLock {

val zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, zkConnectionTimeoutMs)

ZkUtils.createPersistentPath(zkClient, lockPath)

assertTrue(ZkUtils.pathExists(zkClient, lockPath))

val threads = 10
var update = 0
var notUpdated = 0
val lockHolders = new AtomicInteger();
val startBarrier = new CountDownLatch(1)
val finishBarrier = new CountDownLatch(threads)

val task = new Runnable () {
override def run {

startBarrier.await() // wait for all threads to be started
val zkLock = new ZKLock(zkClient, lockPath)
try {
zkLock.acquire()
assertEquals(1, lockHolders.incrementAndGet())
if (update == 0)
update += 1
else
notUpdated += 1
}
catch {
case e: Throwable => fail(e.getMessage)
}
finally {
finishBarrier.countDown()
assertEquals(0, lockHolders.decrementAndGet())
zkLock.release()
}
}
}

for (i <- 1 to threads)
new Thread(task).start()

startBarrier.countDown() // start threads
finishBarrier.await() // wait for all threads to finish

assertEquals(0, ZkUtils.getChildren(zkClient, lockPath).size)

assertEquals(1, update)
assertEquals(9, notUpdated)

zkClient.close()
}
}