Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 22 additions & 21 deletions core/src/main/scala/kafka/utils/ZkUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package kafka.utils

import java.util.Collections
import java.util.concurrent.CountDownLatch

import kafka.admin._
Expand All @@ -42,6 +41,7 @@ import scala.collection.JavaConverters._

object ZkUtils {

private val UseDefaultAcls = new java.util.ArrayList[ACL]

// Important: it is necessary to add any new top level Zookeeper path here
val AdminPath = "/admin"
Expand Down Expand Up @@ -107,7 +107,7 @@ object ZkUtils {
}

def sensitivePath(path: String): Boolean = {
path != null && !SensitiveZkRootPaths.forall(!path.startsWith(_))
path != null && SensitiveZkRootPaths.exists(path.startsWith(_))
}

@deprecated("This is deprecated, use defaultAcls(isSecure, path) which doesn't make sensitive data world readable", since = "0.10.2.1")
Expand Down Expand Up @@ -235,10 +235,11 @@ class ZkUtils(val zkClient: ZkClient,
IsrChangeNotificationPath,
PidBlockPath)

import ZkUtils._

@deprecated("This is deprecated, use defaultAcls(path) which doesn't make sensitive data world readable", since = "0.10.2.1")
val DefaultAcls: java.util.List[ACL] = ZkUtils.defaultAcls(isSecure, "")

private val useDefaultAcl = Collections.emptyList[ACL]
def defaultAcls(path: String): java.util.List[ACL] = ZkUtils.defaultAcls(isSecure, path)

def getController(): Int = {
Expand Down Expand Up @@ -432,11 +433,11 @@ class ZkUtils(val zkClient: ZkClient,
/**
* make sure a persistent path exists in ZK. Create the path if not exist.
*/
def makeSurePersistentPathExists(path: String, acls: java.util.List[ACL] = useDefaultAcl) {
def makeSurePersistentPathExists(path: String, acls: java.util.List[ACL] = UseDefaultAcls) {
//Consumer path is kept open as different consumers will write under this node.
val acl = if (path == null || path.isEmpty || path.equals(ConsumersPath)) {
ZooDefs.Ids.OPEN_ACL_UNSAFE
} else if (acls == useDefaultAcl) {
} else if (acls eq UseDefaultAcls) {
ZkUtils.defaultAcls(isSecure, path)
} else {
acls
Expand All @@ -449,8 +450,8 @@ class ZkUtils(val zkClient: ZkClient,
/**
* create the parent path
*/
private def createParentPath(path: String, acls: java.util.List[ACL] = useDefaultAcl): Unit = {
val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls
private def createParentPath(path: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
val parentDir = path.substring(0, path.lastIndexOf('/'))
if (parentDir.length != 0) {
ZkPath.createPersistent(zkClient, parentDir, createParents = true, acl)
Expand All @@ -460,8 +461,8 @@ class ZkUtils(val zkClient: ZkClient,
/**
* Create an ephemeral node with the given path and data. Create parents if necessary.
*/
private def createEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = useDefaultAcl): Unit = {
val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls
private def createEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
try {
ZkPath.createEphemeral(zkClient, path, data, acl)
} catch {
Expand All @@ -475,8 +476,8 @@ class ZkUtils(val zkClient: ZkClient,
* Create an ephemeral node with the given path and data.
* Throw NodeExistException if node already exists.
*/
def createEphemeralPathExpectConflict(path: String, data: String, acls: java.util.List[ACL] = useDefaultAcl): Unit = {
val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls
def createEphemeralPathExpectConflict(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
try {
createEphemeralPath(path, data, acl)
} catch {
Expand All @@ -501,8 +502,8 @@ class ZkUtils(val zkClient: ZkClient,
/**
* Create a persistent node with the given path and data. Create parents if necessary.
*/
def createPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = useDefaultAcl): Unit = {
val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls
def createPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
try {
ZkPath.createPersistent(zkClient, path, data, acl)
} catch {
Expand All @@ -512,8 +513,8 @@ class ZkUtils(val zkClient: ZkClient,
}
}

def createSequentialPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = useDefaultAcl): String = {
val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls
def createSequentialPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = UseDefaultAcls): String = {
val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
ZkPath.createPersistentSequential(zkClient, path, data, acl)
}

Expand All @@ -522,8 +523,8 @@ class ZkUtils(val zkClient: ZkClient,
* create parent directory if necessary. Never throw NodeExistException.
* Return the updated path zkVersion
*/
def updatePersistentPath(path: String, data: String, acls: java.util.List[ACL] = useDefaultAcl) = {
val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls
def updatePersistentPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls) = {
val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
try {
zkClient.writeData(path, data)
} catch {
Expand Down Expand Up @@ -593,8 +594,8 @@ class ZkUtils(val zkClient: ZkClient,
* Update the value of a persistent node with the given path and data.
* create parent directory if necessary. Never throw NodeExistException.
*/
def updateEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = useDefaultAcl): Unit = {
val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls
def updateEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
try {
zkClient.writeData(path, data)
} catch {
Expand Down Expand Up @@ -871,8 +872,8 @@ class ZkUtils(val zkClient: ZkClient,
* It uses the stat returned by the zookeeper and return the version. Every time
* client updates the path stat.version gets incremented. Starting value of sequence number is 1.
*/
def getSequenceId(path: String, acls: java.util.List[ACL] = useDefaultAcl): Int = {
val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls
def getSequenceId(path: String, acls: java.util.List[ACL] = UseDefaultAcls): Int = {
val acl = if (acls == UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
def writeToZk: Int = zkClient.writeDataReturnStat(path, "", -1).getVersion
try {
writeToZk
Expand Down