Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 73 additions & 29 deletions core/src/main/scala/kafka/utils/ZkUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

package kafka.utils

import java.io.File
import java.net.URI
import java.security.URIParameter
import javax.security.auth.login.Configuration

import kafka.cluster._
import kafka.consumer.{ConsumerThreadId, TopicCount}
import kafka.server.ConfigType
Expand All @@ -26,9 +31,12 @@ import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
import org.I0Itec.zkclient.serialize.ZkSerializer
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.protocol.SecurityProtocol
import collection._

import org.apache.zookeeper.ZooDefs
import scala.collection.JavaConverters._
import scala.collection._
import kafka.api.LeaderAndIsr
import org.apache.zookeeper.data.Stat
import org.apache.zookeeper.data.{ACL, Stat}
import kafka.admin._
import kafka.common.{KafkaException, NoEpochForPartitionException}
import kafka.controller.ReassignedPartitionsContext
Expand All @@ -50,6 +58,35 @@ object ZkUtils extends Logging {
val EntityConfigPath = "/config"
val EntityConfigChangesPath = "/config/changes"

/** true if java.security.auth.login.config is set to some jaas file which has "Client" entry. **/

val isSecure: Boolean = {
val loginConfigurationFile: String = System.getProperty("java.security.auth.login.config")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that this is how JAAS is normally configured, but we don't usually use system properties to configure Kafka. Should we be providing an alternative way?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could set the config in server.properties and set the system property during server startup but I am not sure what are we exactly providing by moving away from the defacto way of using jaas. Once we go that route we will have to deal with question of what to do if jaas property is passed through command line and server.properties also has the property set to a different jaas file. I feel doing it through server.properties will just cause more confusion and add un needed complexity.

ZkClient also assumes that the jaas file is configured using system property, it looks for the Client section and login using that section. If we don't set the system property at all we will have to duplicate that code in kafka.

In any case if we want to provide a server.properties and set the system property in kafka code, that should all happen outside of this code somewhere in KafkaServer. I think it is ok to assume here that the system property will be set by the time this code is invoked but I need to confirm when does Scala Object instances are initialized.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, both options have drawbacks. I think there's a benefit in being able to use server.properties instead of the system property, but it is true that it introduces some complexity. I'll leave it up to @gwenshap and/or @junrao to give their opinions.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding scala object initialisation, it happens on first reference.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the standard way of using JAAS is to get the config through system properties, we can keep it that way in Kafka.

var isSecurityEnabled = false
if (loginConfigurationFile != null && loginConfigurationFile.length > 0) {
val configFile: File = new File(loginConfigurationFile)
if (!configFile.canRead) {
throw new KafkaException(s"File $loginConfigurationFile cannot be read.")
}
try {
val configUri: URI = configFile.toURI
val loginConf = Configuration.getInstance("JavaLoginConfig", new URIParameter(configUri))
isSecurityEnabled = loginConf.getAppConfigurationEntry("Client") != null
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we add SASL for the broker, we need to pass in the JAAS login config for the broker, which can be different from that for ZK. So, instead of using "Client" as the name, perhaps, we should name it "ZookeeperClient" to make it clear?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Client name is defined by zookeeper , just like "KafkaClient" is defined/reserved by Kafka. Zookeeper's code specifically looks for a section "Client" and uses the information in that section for authentication so we can not change this name.

} catch {
case ex: Exception => {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for opening and closing brackets, right?

throw new KafkaException(ex)
}
}
}
isSecurityEnabled
}

val DefaultAcls: List[ACL] = if (isSecure) {
(ZooDefs.Ids.CREATOR_ALL_ACL.asScala ++ ZooDefs.Ids.READ_ACL_UNSAFE.asScala).toList
} else {
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala.toList
}

def getTopicPath(topic: String): String = {
BrokerTopicsPath + "/" + topic
}
Expand Down Expand Up @@ -240,31 +277,36 @@ object ZkUtils extends Logging {
/**
* make sure a persistent path exists in ZK. Create the path if not exist.
*/
def makeSurePersistentPathExists(client: ZkClient, path: String) {
def makeSurePersistentPathExists(client: ZkClient, path: String, acls: List[ACL] = DefaultAcls) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to have a default parameter of DefaultAcls here? Is it right that we would want to be sure to pass this parameter (excluding tests)?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I looked at this in more detail and it seems like it's the other way around. No-one is passing the acls parameter at the moment. So, why is it a parameter? Is it because we think it may be useful in the future? Just trying to understand the reasoning, thanks.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, for future. If you think this is preMature I can delete the extra param.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see the argument for both approaches. From an internal usage perspective, I think I would lean towards not having the argument just yet because it makes it easier to reason about (we don't have to check all the calls to see if other ACLs are being used). If other users are also using these methods as a way to interact with ZK, however, they may find it useful to be able to set their own ACLs. I don't have a strong opinion either way, I think.

//Consumer path is kept open as different consumers will write under this node.
val acl = if (path == null || path.isEmpty || path.equals(ConsumersPath)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should keep open for all paths prefixed with /ConsumerPath, right?

ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala.toList
} else acls
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I apologise if this has already been explained, but why do we need to do this only in makeSurePersistentPathExists and not other create*Path methods?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/consumers should always be created as part of makeSurePersistentPathExists as this is created during server startup and I explain below why consumer path is kept open. I don't see why /consumer will be created as part of any of the other create*Path method which is why i did not pollute the code with getDefaultAclForPath() calls.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood, thanks.


if (!client.exists(path))
ZkPath.createPersistent(client, path, true) //won't throw NoNodeException or NodeExistsException
ZkPath.createPersistent(client, path, true, acl) //won't throw NoNodeException or NodeExistsException
}

/**
* create the parent path
*/
private def createParentPath(client: ZkClient, path: String): Unit = {
private def createParentPath(client: ZkClient, path: String, acls: List[ACL] = DefaultAcls): Unit = {
val parentDir = path.substring(0, path.lastIndexOf('/'))
if (parentDir.length != 0) {
ZkPath.createPersistent(client, parentDir, true)
ZkPath.createPersistent(client, parentDir, true, acls)
}
}

/**
* Create an ephemeral node with the given path and data. Create parents if necessary.
*/
private def createEphemeralPath(client: ZkClient, path: String, data: String): Unit = {
private def createEphemeralPath(client: ZkClient, path: String, data: String, acls: List[ACL] = DefaultAcls): Unit = {
try {
ZkPath.createEphemeral(client, path, data)
ZkPath.createEphemeral(client, path, data, acls)
} catch {
case e: ZkNoNodeException => {
createParentPath(client, path)
ZkPath.createEphemeral(client, path, data)
ZkPath.createEphemeral(client, path, data, acls)
}
}
}
Expand Down Expand Up @@ -342,34 +384,34 @@ object ZkUtils extends Logging {
/**
* Create an persistent node with the given path and data. Create parents if necessary.
*/
def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = {
def createPersistentPath(client: ZkClient, path: String, data: String = "", acls: List[ACL] = DefaultAcls): Unit = {
try {
ZkPath.createPersistent(client, path, data)
ZkPath.createPersistent(client, path, data, acls)
} catch {
case e: ZkNoNodeException => {
createParentPath(client, path)
ZkPath.createPersistent(client, path, data)
ZkPath.createPersistent(client, path, data, acls)
}
}
}

def createSequentialPersistentPath(client: ZkClient, path: String, data: String = ""): String = {
ZkPath.createPersistentSequential(client, path, data)
def createSequentialPersistentPath(client: ZkClient, path: String, data: String = "", acls: List[ACL] = DefaultAcls): String = {
ZkPath.createPersistentSequential(client, path, data, acls)
}

/**
* Update the value of a persistent node with the given path and data.
* create parrent directory if necessary. Never throw NodeExistException.
* Return the updated path zkVersion
*/
def updatePersistentPath(client: ZkClient, path: String, data: String) = {
def updatePersistentPath(client: ZkClient, path: String, data: String, acls: List[ACL] = DefaultAcls) = {
try {
client.writeData(path, data)
} catch {
case e: ZkNoNodeException => {
createParentPath(client, path)
try {
ZkPath.createPersistent(client, path, data)
ZkPath.createPersistent(client, path, data, acls)
} catch {
case e: ZkNodeExistsException =>
client.writeData(path, data)
Expand Down Expand Up @@ -434,13 +476,13 @@ object ZkUtils extends Logging {
* Update the value of a persistent node with the given path and data.
* create parrent directory if necessary. Never throw NodeExistException.
*/
def updateEphemeralPath(client: ZkClient, path: String, data: String): Unit = {
def updateEphemeralPath(client: ZkClient, path: String, data: String, acls: List[ACL] = DefaultAcls): Unit = {
try {
client.writeData(path, data)
} catch {
case e: ZkNoNodeException => {
createParentPath(client, path)
ZkPath.createEphemeral(client, path, data)
ZkPath.createEphemeral(client, path, data, acls)
}
case e2: Throwable => throw e2
}
Expand Down Expand Up @@ -736,15 +778,16 @@ object ZkUtils extends Logging {
* It uses the stat returned by the zookeeper and return the version. Every time
* client updates the path stat.version gets incremented
*/
def getSequenceId(client: ZkClient, path: String): Int = {
def getSequenceId(client: ZkClient, path: String, acls: List[ACL] = DefaultAcls): Int = {
try {
val stat = client.writeDataReturnStat(path, "", -1)
stat.getVersion
} catch {
case e: ZkNoNodeException => {
createParentPath(client, BrokerSequenceIdPath)
createParentPath(client, BrokerSequenceIdPath, acls)
try {
client.createPersistent(BrokerSequenceIdPath, "")
import scala.collection.JavaConversions._
client.createPersistent(BrokerSequenceIdPath, "", acls)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could also do asJava here and remove the import.

0
} catch {
case e: ZkNodeExistsException =>
Expand Down Expand Up @@ -854,6 +897,7 @@ class ZKConfig(props: VerifiableProperties) {

object ZkPath {
@volatile private var isNamespacePresent: Boolean = false
import scala.collection.JavaConversions._

def checkNamespace(client: ZkClient) {
if(isNamespacePresent)
Expand All @@ -869,23 +913,23 @@ object ZkPath {
isNamespacePresent = false
}

def createPersistent(client: ZkClient, path: String, data: Object) {
def createPersistent(client: ZkClient, path: String, data: Object, acls: List[ACL]) {
checkNamespace(client)
client.createPersistent(path, data)
client.createPersistent(path, data, acls)
}

def createPersistent(client: ZkClient, path: String, createParents: Boolean) {
def createPersistent(client: ZkClient, path: String, createParents: Boolean, acls: List[ACL]) {
checkNamespace(client)
client.createPersistent(path, createParents)
client.createPersistent(path, createParents, acls)
}

def createEphemeral(client: ZkClient, path: String, data: Object) {
def createEphemeral(client: ZkClient, path: String, data: Object, acls: List[ACL]) {
checkNamespace(client)
client.createEphemeral(path, data)
client.createEphemeral(path, data, acls)
}

def createPersistentSequential(client: ZkClient, path: String, data: Object): String = {
def createPersistentSequential(client: ZkClient, path: String, data: Object, acls: List[ACL]): String = {
checkNamespace(client)
client.createPersistentSequential(path, data)
client.createPersistentSequential(path, data, acls)
}
}