From 7c589e60f58d81fb1922ef6f5204be53af4d193c Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Fri, 8 May 2015 15:38:48 -0700 Subject: [PATCH 1/4] KAFKA-1695: Zookeeper acls should be set so only broker can modify zk entries with the exception of /consumers. --- core/src/main/scala/kafka/utils/ZkUtils.scala | 103 +++++++++++++----- 1 file changed, 74 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 4ae310e08d5f2..5794e816d2526 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -17,6 +17,11 @@ package kafka.utils +import java.io.File +import java.net.URI +import java.security.URIParameter +import javax.security.auth.login.Configuration + import kafka.cluster._ import kafka.consumer.{ConsumerThreadId, TopicCount} import kafka.server.ConfigType @@ -26,9 +31,11 @@ import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, import org.I0Itec.zkclient.serialize.ZkSerializer import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.protocol.SecurityProtocol -import collection._ + +import org.apache.zookeeper.ZooDefs +import scala.collection._ import kafka.api.LeaderAndIsr -import org.apache.zookeeper.data.Stat +import org.apache.zookeeper.data.{ACL, Stat} import kafka.admin._ import kafka.common.{KafkaException, NoEpochForPartitionException} import kafka.controller.ReassignedPartitionsContext @@ -50,6 +57,36 @@ object ZkUtils extends Logging { val EntityConfigPath = "/config" val EntityConfigChangesPath = "/config/changes" + /** true if java.security.auth.login.config is set to some jaas file which has "Client" entry. **/ + + val isSecure: Boolean = { + val loginConfigurationFile: String = System.getProperty("java.security.auth.login.config") + if (loginConfigurationFile != null && loginConfigurationFile.length > 0) { + val config_file: File = new File(loginConfigurationFile) + if (!config_file.canRead) { + throw new KafkaException("File " + loginConfigurationFile + " cannot be read.") + } + try { + val config_uri: URI = config_file.toURI + val login_conf = Configuration.getInstance("JavaLoginConfig", new URIParameter(config_uri)) + login_conf.getAppConfigurationEntry("Client") != null + } catch { + case ex: Exception => { + throw new KafkaException(ex) + } + } + } + false + } + + val DefaultAcls: List[ACL] = if (isSecure) { + import scala.collection.JavaConversions._ + (ZooDefs.Ids.CREATOR_ALL_ACL ++ ZooDefs.Ids.READ_ACL_UNSAFE).toList + } else { + import scala.collection.JavaConversions._ + ZooDefs.Ids.OPEN_ACL_UNSAFE.toList + } + def getTopicPath(topic: String): String = { BrokerTopicsPath + "/" + topic } @@ -240,31 +277,37 @@ object ZkUtils extends Logging { /** * make sure a persistent path exists in ZK. Create the path if not exist. */ - def makeSurePersistentPathExists(client: ZkClient, path: String) { + def makeSurePersistentPathExists(client: ZkClient, path: String, acls: List[ACL] = DefaultAcls) { + //Consumer path is kept open as different consumers will write under this node. + val acl = if(path == null || path.isEmpty || path.equals(ConsumersPath)) { + import scala.collection.JavaConversions._ + ZooDefs.Ids.OPEN_ACL_UNSAFE.toList + } else acls + if (!client.exists(path)) - ZkPath.createPersistent(client, path, true) //won't throw NoNodeException or NodeExistsException + ZkPath.createPersistent(client, path, true, acl) //won't throw NoNodeException or NodeExistsException } /** * create the parent path */ - private def createParentPath(client: ZkClient, path: String): Unit = { + private def createParentPath(client: ZkClient, path: String, acls: List[ACL] = DefaultAcls): Unit = { val parentDir = path.substring(0, path.lastIndexOf('/')) if (parentDir.length != 0) { - ZkPath.createPersistent(client, parentDir, true) + ZkPath.createPersistent(client, parentDir, true, acls) } } /** * Create an ephemeral node with the given path and data. Create parents if necessary. */ - private def createEphemeralPath(client: ZkClient, path: String, data: String): Unit = { + private def createEphemeralPath(client: ZkClient, path: String, data: String, acls: List[ACL] = DefaultAcls): Unit = { try { - ZkPath.createEphemeral(client, path, data) + ZkPath.createEphemeral(client, path, data, acls) } catch { case e: ZkNoNodeException => { createParentPath(client, path) - ZkPath.createEphemeral(client, path, data) + ZkPath.createEphemeral(client, path, data, acls) } } } @@ -342,19 +385,19 @@ object ZkUtils extends Logging { /** * Create an persistent node with the given path and data. Create parents if necessary. */ - def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = { + def createPersistentPath(client: ZkClient, path: String, data: String = "", acls: List[ACL] = DefaultAcls): Unit = { try { - ZkPath.createPersistent(client, path, data) + ZkPath.createPersistent(client, path, data, acls) } catch { case e: ZkNoNodeException => { createParentPath(client, path) - ZkPath.createPersistent(client, path, data) + ZkPath.createPersistent(client, path, data, acls) } } } - def createSequentialPersistentPath(client: ZkClient, path: String, data: String = ""): String = { - ZkPath.createPersistentSequential(client, path, data) + def createSequentialPersistentPath(client: ZkClient, path: String, data: String = "", acls: List[ACL] = DefaultAcls): String = { + ZkPath.createPersistentSequential(client, path, data, acls) } /** @@ -362,14 +405,14 @@ object ZkUtils extends Logging { * create parrent directory if necessary. Never throw NodeExistException. * Return the updated path zkVersion */ - def updatePersistentPath(client: ZkClient, path: String, data: String) = { + def updatePersistentPath(client: ZkClient, path: String, data: String, acls: List[ACL] = DefaultAcls) = { try { client.writeData(path, data) } catch { case e: ZkNoNodeException => { createParentPath(client, path) try { - ZkPath.createPersistent(client, path, data) + ZkPath.createPersistent(client, path, data, acls) } catch { case e: ZkNodeExistsException => client.writeData(path, data) @@ -434,13 +477,13 @@ object ZkUtils extends Logging { * Update the value of a persistent node with the given path and data. * create parrent directory if necessary. Never throw NodeExistException. */ - def updateEphemeralPath(client: ZkClient, path: String, data: String): Unit = { + def updateEphemeralPath(client: ZkClient, path: String, data: String, acls: List[ACL] = DefaultAcls): Unit = { try { client.writeData(path, data) } catch { case e: ZkNoNodeException => { createParentPath(client, path) - ZkPath.createEphemeral(client, path, data) + ZkPath.createEphemeral(client, path, data, acls) } case e2: Throwable => throw e2 } @@ -736,15 +779,16 @@ object ZkUtils extends Logging { * It uses the stat returned by the zookeeper and return the version. Every time * client updates the path stat.version gets incremented */ - def getSequenceId(client: ZkClient, path: String): Int = { + def getSequenceId(client: ZkClient, path: String, acls: List[ACL] = DefaultAcls): Int = { try { val stat = client.writeDataReturnStat(path, "", -1) stat.getVersion } catch { case e: ZkNoNodeException => { - createParentPath(client, BrokerSequenceIdPath) + createParentPath(client, BrokerSequenceIdPath, acls) try { - client.createPersistent(BrokerSequenceIdPath, "") + import scala.collection.JavaConversions._ + client.createPersistent(BrokerSequenceIdPath, "", acls) 0 } catch { case e: ZkNodeExistsException => @@ -854,6 +898,7 @@ class ZKConfig(props: VerifiableProperties) { object ZkPath { @volatile private var isNamespacePresent: Boolean = false + import scala.collection.JavaConversions._ def checkNamespace(client: ZkClient) { if(isNamespacePresent) @@ -869,23 +914,23 @@ object ZkPath { isNamespacePresent = false } - def createPersistent(client: ZkClient, path: String, data: Object) { + def createPersistent(client: ZkClient, path: String, data: Object, acls: List[ACL]) { checkNamespace(client) - client.createPersistent(path, data) + client.createPersistent(path, data, acls) } - def createPersistent(client: ZkClient, path: String, createParents: Boolean) { + def createPersistent(client: ZkClient, path: String, createParents: Boolean, acls: List[ACL]) { checkNamespace(client) - client.createPersistent(path, createParents) + client.createPersistent(path, createParents, acls) } - def createEphemeral(client: ZkClient, path: String, data: Object) { + def createEphemeral(client: ZkClient, path: String, data: Object, acls: List[ACL]) { checkNamespace(client) - client.createEphemeral(path, data) + client.createEphemeral(path, data, acls) } - def createPersistentSequential(client: ZkClient, path: String, data: Object): String = { + def createPersistentSequential(client: ZkClient, path: String, data: Object, acls: List[ACL]): String = { checkNamespace(client) - client.createPersistentSequential(path, data) + client.createPersistentSequential(path, data, acls) } } From d2dfd718715d2da31db059b7512ce1c60ecdeb88 Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Fri, 24 Jul 2015 14:52:34 -0700 Subject: [PATCH 2/4] KAFKA-1695: BugFix. --- core/src/main/scala/kafka/utils/ZkUtils.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 5794e816d2526..ffd551c2b6e72 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -61,6 +61,7 @@ object ZkUtils extends Logging { val isSecure: Boolean = { val loginConfigurationFile: String = System.getProperty("java.security.auth.login.config") + var isSecurityEnabled = false if (loginConfigurationFile != null && loginConfigurationFile.length > 0) { val config_file: File = new File(loginConfigurationFile) if (!config_file.canRead) { @@ -69,14 +70,14 @@ object ZkUtils extends Logging { try { val config_uri: URI = config_file.toURI val login_conf = Configuration.getInstance("JavaLoginConfig", new URIParameter(config_uri)) - login_conf.getAppConfigurationEntry("Client") != null + isSecurityEnabled = login_conf.getAppConfigurationEntry("Client") != null } catch { case ex: Exception => { throw new KafkaException(ex) } } } - false + isSecurityEnabled } val DefaultAcls: List[ACL] = if (isSecure) { From f59b3c9841fa59f3220fb12e3ede874cd55fda3b Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Mon, 24 Aug 2015 12:45:31 -0700 Subject: [PATCH 3/4] Addressing Ismael's comment. --- core/src/main/scala/kafka/utils/ZkUtils.scala | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index ffd551c2b6e72..fe650bfeb5b88 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -33,6 +33,7 @@ import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.zookeeper.ZooDefs +import scala.collection.JavaConverters._ import scala.collection._ import kafka.api.LeaderAndIsr import org.apache.zookeeper.data.{ACL, Stat} @@ -63,14 +64,14 @@ object ZkUtils extends Logging { val loginConfigurationFile: String = System.getProperty("java.security.auth.login.config") var isSecurityEnabled = false if (loginConfigurationFile != null && loginConfigurationFile.length > 0) { - val config_file: File = new File(loginConfigurationFile) - if (!config_file.canRead) { - throw new KafkaException("File " + loginConfigurationFile + " cannot be read.") + val configFile: File = new File(loginConfigurationFile) + if (!configFile.canRead) { + throw new KafkaException(s"File $loginConfigurationFile cannot be read.") } try { - val config_uri: URI = config_file.toURI - val login_conf = Configuration.getInstance("JavaLoginConfig", new URIParameter(config_uri)) - isSecurityEnabled = login_conf.getAppConfigurationEntry("Client") != null + val configUri: URI = configFile.toURI + val loginConf = Configuration.getInstance("JavaLoginConfig", new URIParameter(configUri)) + isSecurityEnabled = loginConf.getAppConfigurationEntry("Client") != null } catch { case ex: Exception => { throw new KafkaException(ex) @@ -81,11 +82,9 @@ object ZkUtils extends Logging { } val DefaultAcls: List[ACL] = if (isSecure) { - import scala.collection.JavaConversions._ - (ZooDefs.Ids.CREATOR_ALL_ACL ++ ZooDefs.Ids.READ_ACL_UNSAFE).toList + (ZooDefs.Ids.CREATOR_ALL_ACL.asScala ++ ZooDefs.Ids.READ_ACL_UNSAFE.asScala).toList } else { - import scala.collection.JavaConversions._ - ZooDefs.Ids.OPEN_ACL_UNSAFE.toList + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala.toList } def getTopicPath(topic: String): String = { From a9bdf5e81e97ae8772bc5c3d7e12236470714e34 Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Tue, 25 Aug 2015 15:07:51 -0700 Subject: [PATCH 4/4] Addressing some more styling comments. --- core/src/main/scala/kafka/utils/ZkUtils.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index fe650bfeb5b88..a2544b13d8b90 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -62,7 +62,7 @@ object ZkUtils extends Logging { val isSecure: Boolean = { val loginConfigurationFile: String = System.getProperty("java.security.auth.login.config") - var isSecurityEnabled = false + var isSecurityEnabled = false if (loginConfigurationFile != null && loginConfigurationFile.length > 0) { val configFile: File = new File(loginConfigurationFile) if (!configFile.canRead) { @@ -279,9 +279,8 @@ object ZkUtils extends Logging { */ def makeSurePersistentPathExists(client: ZkClient, path: String, acls: List[ACL] = DefaultAcls) { //Consumer path is kept open as different consumers will write under this node. - val acl = if(path == null || path.isEmpty || path.equals(ConsumersPath)) { - import scala.collection.JavaConversions._ - ZooDefs.Ids.OPEN_ACL_UNSAFE.toList + val acl = if (path == null || path.isEmpty || path.equals(ConsumersPath)) { + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala.toList } else acls if (!client.exists(path))