Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ public enum ResourceNameType {
.collect(Collectors.toMap(ResourceNameType::code, Function.identity()))
);

private final static Map<String, ResourceNameType> NAME_TO_VALUE =
Collections.unmodifiableMap(
Arrays.stream(ResourceNameType.values())
.collect(Collectors.toMap(ResourceNameType::name, Function.identity()))
);

private final byte code;

ResourceNameType(byte code) {
Expand All @@ -88,4 +94,11 @@ public boolean isUnknown() {
public static ResourceNameType fromCode(byte code) {
return CODE_TO_VALUE.getOrDefault(code, UNKNOWN);
}

/**
* Return the ResourceNameType with the provided name or {@link #UNKNOWN} if one cannot be found.
*/
public static ResourceNameType fromString(String name) {
return NAME_TO_VALUE.getOrDefault(name, UNKNOWN);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import kafka.zk.{KafkaZkClient, StateChangeHandlers}
import kafka.zookeeper.{StateChangeHandler, ZNodeChildChangeHandler}
import org.apache.kafka.common.utils.Time

import scala.util.{Failure, Try}

/**
* Handle the notificationMessage.
*/
Expand Down Expand Up @@ -83,12 +85,7 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
for (notification <- notifications) {
val changeId = changeNumber(notification)
if (changeId > lastExecutedChange) {
val changeZnode = seqNodeRoot + "/" + notification
val (data, _) = zkClient.getDataAndStat(changeZnode)
data match {
case Some(d) => notificationHandler.processNotification(d)
case None => warn(s"read null data from $changeZnode when processing notification $notification")
}
processNotification(notification)
lastExecutedChange = changeId
}
}
Expand All @@ -100,6 +97,18 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
}
}

private def processNotification(notification: String): Unit = {
val changeZnode = seqNodeRoot + "/" + notification
val (data, _) = zkClient.getDataAndStat(changeZnode)
data match {
case Some(d) => Try(notificationHandler.processNotification(d)) match {
case Failure(e) => error(s"error processing change notification from $changeZnode", e)
case _ =>
}
case None => warn(s"read null data from $changeZnode")
}
}

private def addChangeNotification(): Unit = {
if (!isClosed.get && queue.peek() == null)
queue.put(new ChangeNotification)
Expand Down
25 changes: 14 additions & 11 deletions core/src/main/scala/kafka/security/auth/Resource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package kafka.security.auth

import kafka.common.KafkaException
import org.apache.kafka.common.resource.{ResourceNameType, ResourcePattern}

object Resource {
Expand All @@ -26,16 +27,18 @@ object Resource {
val WildCardResource = "*"

def fromString(str: String): Resource = {
ResourceNameType.values.find(nameType => str.startsWith(nameType.name)) match {
case Some(nameType) =>
str.split(Separator, 3) match {
case Array(_, resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name, nameType)
case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str)
}
case _ =>
str.split(Separator, 2) match {
case Array(resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name, ResourceNameType.LITERAL)
case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str)
ResourceType.values.find(resourceType => str.startsWith(resourceType.name + Separator)) match {
case None => throw new KafkaException("Invalid resource string: '" + str + "'")
case Some(resourceType) =>
val remaining = str.substring(resourceType.name.length + 1)

ResourceNameType.values.find(nameType => remaining.startsWith(nameType.name + Separator)) match {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this is called by LiteralAclChangeStore.decode(), which should only decode a 2-part name.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good spot. I've changed LiteralAclChangeStore.decode() to have its own impl. (Basically the old contents of Resource.fromString), to maintain v1.1 behaviour

case Some(nameType) =>
val name = remaining.substring(nameType.name.length + 1)
Resource(resourceType, name, nameType)

case None =>
Resource(resourceType, remaining, ResourceNameType.LITERAL)
}
}
}
Expand Down Expand Up @@ -74,7 +77,7 @@ case class Resource(resourceType: ResourceType, name: String, nameType: Resource
}

override def toString: String = {
nameType + Resource.Separator + resourceType.name + Resource.Separator + name
resourceType.name + Resource.Separator + nameType + Resource.Separator + name
}
}

41 changes: 22 additions & 19 deletions core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import java.util
import java.util.concurrent.locks.ReentrantReadWriteLock

import com.typesafe.scalalogging.Logger
import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
import kafka.api.KAFKA_2_0_IV1
import kafka.network.RequestChannel.Session
import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
import kafka.server.KafkaConfig
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
import kafka.zk.{AclChangeNotificationSequenceZNode, KafkaZkClient, ZkAclStore}
import kafka.zk.{AclChangeNotificationHandler, AclChangeSubscription, KafkaZkClient, ZkAclChangeStore, ZkAclStore}
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.resource.ResourceNameType
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{SecurityUtils, Time}
Expand Down Expand Up @@ -55,7 +56,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
private var superUsers = Set.empty[KafkaPrincipal]
private var shouldAllowEveryoneIfNoAclIsFound = false
private var zkClient: KafkaZkClient = _
private var aclChangeListeners: Seq[ZkNodeChangeNotificationListener] = List()
private var aclChangeListeners: Iterable[AclChangeSubscription] = Iterable.empty
private var extendedAclSupport: Boolean = _

@volatile
private var aclCache = new scala.collection.immutable.TreeMap[Resource, VersionedAcls]()(ResourceOrdering)
Expand Down Expand Up @@ -96,6 +98,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
zkMaxInFlightRequests, time, "kafka.security", "SimpleAclAuthorizer")
zkClient.createAclPaths()

extendedAclSupport = kafkaConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1

loadCache()

startZkChangeListeners()
Expand Down Expand Up @@ -161,6 +165,11 @@ class SimpleAclAuthorizer extends Authorizer with Logging {

override def addAcls(acls: Set[Acl], resource: Resource) {
if (acls != null && acls.nonEmpty) {
if (!extendedAclSupport && resource.nameType == ResourceNameType.PREFIXED) {
throw new UnsupportedVersionException(s"Adding ACLs on prefixed resource patterns requires " +
s"${KafkaConfig.InterBrokerProtocolVersionProp} of $KAFKA_2_0_IV1 or greater")
}

inWriteLock(lock) {
updateResourceAcls(resource) { currentAcls =>
currentAcls ++ acls
Expand Down Expand Up @@ -238,27 +247,23 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
private def loadCache() {
inWriteLock(lock) {
ZkAclStore.stores.foreach(store => {
val resourceTypes = zkClient.getResourceTypes(store.nameType)
val resourceTypes = zkClient.getResourceTypes(store.patternType)
for (rType <- resourceTypes) {
val resourceType = ResourceType.fromString(rType)
val resourceNames = zkClient.getResourceNames(store.nameType, resourceType)
val resourceNames = zkClient.getResourceNames(store.patternType, resourceType)
for (resourceName <- resourceNames) {
val versionedAcls = getAclsFromZk(new Resource(resourceType, resourceName, store.nameType))
updateCache(new Resource(resourceType, resourceName, store.nameType), versionedAcls)
val resource = new Resource(resourceType, resourceName, store.patternType)
val versionedAcls = getAclsFromZk(resource)
updateCache(resource, versionedAcls)
}
}
})
}
}

private def startZkChangeListeners(): Unit = {
aclChangeListeners = ZkAclStore.stores.map(store => {
val aclChangeListener = new ZkNodeChangeNotificationListener(
zkClient, store.aclChangePath, AclChangeNotificationSequenceZNode.SequenceNumberPrefix, new AclChangedNotificationHandler(store))

aclChangeListener.init()
aclChangeListener
})
aclChangeListeners = ZkAclChangeStore.stores
.map(store => store.createListener(AclChangedNotificationHandler, zkClient))
}

private def logAuditMessage(principal: KafkaPrincipal, authorized: Boolean, operation: Operation, resource: Resource, host: String) {
Expand Down Expand Up @@ -343,17 +348,15 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
}

private def updateAclChangedFlag(resource: Resource) {
zkClient.createAclChangeNotification(resource)
zkClient.createAclChangeNotification(resource)
}

private def backoffTime = {
retryBackoffMs + Random.nextInt(retryBackoffJitterMs)
}

class AclChangedNotificationHandler(store: ZkAclStore) extends NotificationHandler {
override def processNotification(notificationMessage: Array[Byte]) {
val resource: Resource = store.decode(notificationMessage)

object AclChangedNotificationHandler extends AclChangeNotificationHandler {
override def processNotification(resource: Resource) {
inWriteLock(lock) {
val versionedAcls = getAclsFromZk(resource)
updateCache(resource, versionedAcls)
Expand Down
31 changes: 16 additions & 15 deletions core/src/main/scala/kafka/zk/KafkaZkClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -941,14 +941,15 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
//Acl management methods

/**
* Creates the required zk nodes for Acl storage
* Creates the required zk nodes for Acl storage and Acl change storage.
*/
def createAclPaths(): Unit = {
ZkAclStore.stores.foreach(store => {
createRecursive(store.aclPath, throwIfPathExists = false)
createRecursive(store.aclChangePath, throwIfPathExists = false)
ResourceType.values.foreach(resourceType => createRecursive(store.path(resourceType), throwIfPathExists = false))
})

ZkAclChangeStore.stores.foreach(store => createRecursive(store.aclChangePath, throwIfPathExists = false))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment of the method should be adjusted to "Creates the required zk nodes for Acl storage and Acl change storage".

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

/**
Expand Down Expand Up @@ -1005,13 +1006,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
}

/**
* Creates Acl change notification message
* @param resource resource name
* Creates an Acl change notification message.
* @param resource resource pattern that has changed
*/
def createAclChangeNotification(resource: Resource): Unit = {
val store = ZkAclStore(resource.nameType)
val path = store.changeSequenceZNode.createPath
val createRequest = CreateRequest(path, AclChangeNotificationSequenceZNode.encode(resource), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
val aclChange = ZkAclStore(resource.nameType).changeStore.createChangeNode(resource)
val createRequest = CreateRequest(aclChange.path, aclChange.bytes, acls(aclChange.path), CreateMode.PERSISTENT_SEQUENTIAL)
val createResponse = retryRequestUntilConnected(createRequest)
createResponse.maybeThrow
}
Expand All @@ -1034,24 +1034,25 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
* @throws KeeperException if there is an error while deleting Acl change notifications
*/
def deleteAclChangeNotifications(): Unit = {
ZkAclStore.stores.foreach(store => {
ZkAclChangeStore.stores.foreach(store => {
val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(store.aclChangePath))
if (getChildrenResponse.resultCode == Code.OK) {
deleteAclChangeNotifications(store, getChildrenResponse.children)
deleteAclChangeNotifications(store.aclChangePath, getChildrenResponse.children)
} else if (getChildrenResponse.resultCode != Code.NONODE) {
getChildrenResponse.maybeThrow
}
})
}

/**
* Deletes the Acl change notifications associated with the given sequence nodes
* @param sequenceNodes
*/
private def deleteAclChangeNotifications(store: ZkAclStore, sequenceNodes: Seq[String]): Unit = {
val aclChangeNotificationSequenceZNode = store.changeSequenceZNode
* Deletes the Acl change notifications associated with the given sequence nodes
*
* @param aclChangePath the root path
* @param sequenceNodes the name of the node to delete.
*/
private def deleteAclChangeNotifications(aclChangePath: String, sequenceNodes: Seq[String]): Unit = {
val deleteRequests = sequenceNodes.map { sequenceNode =>
DeleteRequest(aclChangeNotificationSequenceZNode.deletePath(sequenceNode), ZkVersion.NoVersion)
DeleteRequest(s"$aclChangePath/$sequenceNode", ZkVersion.NoVersion)
}

val deleteResponses = retryRequestsUntilConnected(deleteRequests)
Expand Down
Loading