diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 4ae310e08d5f2..a2544b13d8b90 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -17,6 +17,11 @@ package kafka.utils +import java.io.File +import java.net.URI +import java.security.URIParameter +import javax.security.auth.login.Configuration + import kafka.cluster._ import kafka.consumer.{ConsumerThreadId, TopicCount} import kafka.server.ConfigType @@ -26,9 +31,12 @@ import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, import org.I0Itec.zkclient.serialize.ZkSerializer import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.protocol.SecurityProtocol -import collection._ + +import org.apache.zookeeper.ZooDefs +import scala.collection.JavaConverters._ +import scala.collection._ import kafka.api.LeaderAndIsr -import org.apache.zookeeper.data.Stat +import org.apache.zookeeper.data.{ACL, Stat} import kafka.admin._ import kafka.common.{KafkaException, NoEpochForPartitionException} import kafka.controller.ReassignedPartitionsContext @@ -50,6 +58,35 @@ object ZkUtils extends Logging { val EntityConfigPath = "/config" val EntityConfigChangesPath = "/config/changes" + /** true if java.security.auth.login.config is set to some jaas file which has "Client" entry. **/ + + val isSecure: Boolean = { + val loginConfigurationFile: String = System.getProperty("java.security.auth.login.config") + var isSecurityEnabled = false + if (loginConfigurationFile != null && loginConfigurationFile.length > 0) { + val configFile: File = new File(loginConfigurationFile) + if (!configFile.canRead) { + throw new KafkaException(s"File $loginConfigurationFile cannot be read.") + } + try { + val configUri: URI = configFile.toURI + val loginConf = Configuration.getInstance("JavaLoginConfig", new URIParameter(configUri)) + isSecurityEnabled = loginConf.getAppConfigurationEntry("Client") != null + } catch { + case ex: Exception => { + throw new KafkaException(ex) + } + } + } + isSecurityEnabled + } + + val DefaultAcls: List[ACL] = if (isSecure) { + (ZooDefs.Ids.CREATOR_ALL_ACL.asScala ++ ZooDefs.Ids.READ_ACL_UNSAFE.asScala).toList + } else { + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala.toList + } + def getTopicPath(topic: String): String = { BrokerTopicsPath + "/" + topic } @@ -240,31 +277,36 @@ object ZkUtils extends Logging { /** * make sure a persistent path exists in ZK. Create the path if not exist. */ - def makeSurePersistentPathExists(client: ZkClient, path: String) { + def makeSurePersistentPathExists(client: ZkClient, path: String, acls: List[ACL] = DefaultAcls) { + //Consumer path is kept open as different consumers will write under this node. + val acl = if (path == null || path.isEmpty || path.equals(ConsumersPath)) { + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala.toList + } else acls + if (!client.exists(path)) - ZkPath.createPersistent(client, path, true) //won't throw NoNodeException or NodeExistsException + ZkPath.createPersistent(client, path, true, acl) //won't throw NoNodeException or NodeExistsException } /** * create the parent path */ - private def createParentPath(client: ZkClient, path: String): Unit = { + private def createParentPath(client: ZkClient, path: String, acls: List[ACL] = DefaultAcls): Unit = { val parentDir = path.substring(0, path.lastIndexOf('/')) if (parentDir.length != 0) { - ZkPath.createPersistent(client, parentDir, true) + ZkPath.createPersistent(client, parentDir, true, acls) } } /** * Create an ephemeral node with the given path and data. Create parents if necessary. */ - private def createEphemeralPath(client: ZkClient, path: String, data: String): Unit = { + private def createEphemeralPath(client: ZkClient, path: String, data: String, acls: List[ACL] = DefaultAcls): Unit = { try { - ZkPath.createEphemeral(client, path, data) + ZkPath.createEphemeral(client, path, data, acls) } catch { case e: ZkNoNodeException => { createParentPath(client, path) - ZkPath.createEphemeral(client, path, data) + ZkPath.createEphemeral(client, path, data, acls) } } } @@ -342,19 +384,19 @@ object ZkUtils extends Logging { /** * Create an persistent node with the given path and data. Create parents if necessary. */ - def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = { + def createPersistentPath(client: ZkClient, path: String, data: String = "", acls: List[ACL] = DefaultAcls): Unit = { try { - ZkPath.createPersistent(client, path, data) + ZkPath.createPersistent(client, path, data, acls) } catch { case e: ZkNoNodeException => { createParentPath(client, path) - ZkPath.createPersistent(client, path, data) + ZkPath.createPersistent(client, path, data, acls) } } } - def createSequentialPersistentPath(client: ZkClient, path: String, data: String = ""): String = { - ZkPath.createPersistentSequential(client, path, data) + def createSequentialPersistentPath(client: ZkClient, path: String, data: String = "", acls: List[ACL] = DefaultAcls): String = { + ZkPath.createPersistentSequential(client, path, data, acls) } /** @@ -362,14 +404,14 @@ object ZkUtils extends Logging { * create parrent directory if necessary. Never throw NodeExistException. * Return the updated path zkVersion */ - def updatePersistentPath(client: ZkClient, path: String, data: String) = { + def updatePersistentPath(client: ZkClient, path: String, data: String, acls: List[ACL] = DefaultAcls) = { try { client.writeData(path, data) } catch { case e: ZkNoNodeException => { createParentPath(client, path) try { - ZkPath.createPersistent(client, path, data) + ZkPath.createPersistent(client, path, data, acls) } catch { case e: ZkNodeExistsException => client.writeData(path, data) @@ -434,13 +476,13 @@ object ZkUtils extends Logging { * Update the value of a persistent node with the given path and data. * create parrent directory if necessary. Never throw NodeExistException. */ - def updateEphemeralPath(client: ZkClient, path: String, data: String): Unit = { + def updateEphemeralPath(client: ZkClient, path: String, data: String, acls: List[ACL] = DefaultAcls): Unit = { try { client.writeData(path, data) } catch { case e: ZkNoNodeException => { createParentPath(client, path) - ZkPath.createEphemeral(client, path, data) + ZkPath.createEphemeral(client, path, data, acls) } case e2: Throwable => throw e2 } @@ -736,15 +778,16 @@ object ZkUtils extends Logging { * It uses the stat returned by the zookeeper and return the version. Every time * client updates the path stat.version gets incremented */ - def getSequenceId(client: ZkClient, path: String): Int = { + def getSequenceId(client: ZkClient, path: String, acls: List[ACL] = DefaultAcls): Int = { try { val stat = client.writeDataReturnStat(path, "", -1) stat.getVersion } catch { case e: ZkNoNodeException => { - createParentPath(client, BrokerSequenceIdPath) + createParentPath(client, BrokerSequenceIdPath, acls) try { - client.createPersistent(BrokerSequenceIdPath, "") + import scala.collection.JavaConversions._ + client.createPersistent(BrokerSequenceIdPath, "", acls) 0 } catch { case e: ZkNodeExistsException => @@ -854,6 +897,7 @@ class ZKConfig(props: VerifiableProperties) { object ZkPath { @volatile private var isNamespacePresent: Boolean = false + import scala.collection.JavaConversions._ def checkNamespace(client: ZkClient) { if(isNamespacePresent) @@ -869,23 +913,23 @@ object ZkPath { isNamespacePresent = false } - def createPersistent(client: ZkClient, path: String, data: Object) { + def createPersistent(client: ZkClient, path: String, data: Object, acls: List[ACL]) { checkNamespace(client) - client.createPersistent(path, data) + client.createPersistent(path, data, acls) } - def createPersistent(client: ZkClient, path: String, createParents: Boolean) { + def createPersistent(client: ZkClient, path: String, createParents: Boolean, acls: List[ACL]) { checkNamespace(client) - client.createPersistent(path, createParents) + client.createPersistent(path, createParents, acls) } - def createEphemeral(client: ZkClient, path: String, data: Object) { + def createEphemeral(client: ZkClient, path: String, data: Object, acls: List[ACL]) { checkNamespace(client) - client.createEphemeral(path, data) + client.createEphemeral(path, data, acls) } - def createPersistentSequential(client: ZkClient, path: String, data: Object): String = { + def createPersistentSequential(client: ZkClient, path: String, data: Object, acls: List[ACL]): String = { checkNamespace(client) - client.createPersistentSequential(path, data) + client.createPersistentSequential(path, data, acls) } }