diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceNameType.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceNameType.java index 7aa72170de973..0e4fc0f271216 100644 --- a/clients/src/main/java/org/apache/kafka/common/resource/ResourceNameType.java +++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceNameType.java @@ -62,6 +62,12 @@ public enum ResourceNameType { .collect(Collectors.toMap(ResourceNameType::code, Function.identity())) ); + private final static Map NAME_TO_VALUE = + Collections.unmodifiableMap( + Arrays.stream(ResourceNameType.values()) + .collect(Collectors.toMap(ResourceNameType::name, Function.identity())) + ); + private final byte code; ResourceNameType(byte code) { @@ -88,4 +94,11 @@ public boolean isUnknown() { public static ResourceNameType fromCode(byte code) { return CODE_TO_VALUE.getOrDefault(code, UNKNOWN); } + + /** + * Return the ResourceNameType with the provided name or {@link #UNKNOWN} if one cannot be found. + */ + public static ResourceNameType fromString(String name) { + return NAME_TO_VALUE.getOrDefault(name, UNKNOWN); + } } diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala index 51798519cb137..8ec7f95343125 100644 --- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala +++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala @@ -24,6 +24,8 @@ import kafka.zk.{KafkaZkClient, StateChangeHandlers} import kafka.zookeeper.{StateChangeHandler, ZNodeChildChangeHandler} import org.apache.kafka.common.utils.Time +import scala.util.{Failure, Try} + /** * Handle the notificationMessage. */ @@ -83,12 +85,7 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient, for (notification <- notifications) { val changeId = changeNumber(notification) if (changeId > lastExecutedChange) { - val changeZnode = seqNodeRoot + "/" + notification - val (data, _) = zkClient.getDataAndStat(changeZnode) - data match { - case Some(d) => notificationHandler.processNotification(d) - case None => warn(s"read null data from $changeZnode when processing notification $notification") - } + processNotification(notification) lastExecutedChange = changeId } } @@ -100,6 +97,18 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient, } } + private def processNotification(notification: String): Unit = { + val changeZnode = seqNodeRoot + "/" + notification + val (data, _) = zkClient.getDataAndStat(changeZnode) + data match { + case Some(d) => Try(notificationHandler.processNotification(d)) match { + case Failure(e) => error(s"error processing change notification from $changeZnode", e) + case _ => + } + case None => warn(s"read null data from $changeZnode") + } + } + private def addChangeNotification(): Unit = { if (!isClosed.get && queue.peek() == null) queue.put(new ChangeNotification) diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala index f07a11c730066..a3dd06528f1a6 100644 --- a/core/src/main/scala/kafka/security/auth/Resource.scala +++ b/core/src/main/scala/kafka/security/auth/Resource.scala @@ -16,6 +16,7 @@ */ package kafka.security.auth +import kafka.common.KafkaException import org.apache.kafka.common.resource.{ResourceNameType, ResourcePattern} object Resource { @@ -26,16 +27,18 @@ object Resource { val WildCardResource = "*" def fromString(str: String): Resource = { - ResourceNameType.values.find(nameType => str.startsWith(nameType.name)) match { - case Some(nameType) => - str.split(Separator, 3) match { - case Array(_, resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name, nameType) - case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str) - } - case _ => - str.split(Separator, 2) match { - case Array(resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name, ResourceNameType.LITERAL) - case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str) + ResourceType.values.find(resourceType => str.startsWith(resourceType.name + Separator)) match { + case None => throw new KafkaException("Invalid resource string: '" + str + "'") + case Some(resourceType) => + val remaining = str.substring(resourceType.name.length + 1) + + ResourceNameType.values.find(nameType => remaining.startsWith(nameType.name + Separator)) match { + case Some(nameType) => + val name = remaining.substring(nameType.name.length + 1) + Resource(resourceType, name, nameType) + + case None => + Resource(resourceType, remaining, ResourceNameType.LITERAL) } } } @@ -74,7 +77,7 @@ case class Resource(resourceType: ResourceType, name: String, nameType: Resource } override def toString: String = { - nameType + Resource.Separator + resourceType.name + Resource.Separator + name + resourceType.name + Resource.Separator + nameType + Resource.Separator + name } } diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index 601b5be78330c..cecad0ed763e3 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -20,13 +20,14 @@ import java.util import java.util.concurrent.locks.ReentrantReadWriteLock import com.typesafe.scalalogging.Logger -import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener} +import kafka.api.KAFKA_2_0_IV1 import kafka.network.RequestChannel.Session import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls import kafka.server.KafkaConfig import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils._ -import kafka.zk.{AclChangeNotificationSequenceZNode, KafkaZkClient, ZkAclStore} +import kafka.zk.{AclChangeNotificationHandler, AclChangeSubscription, KafkaZkClient, ZkAclChangeStore, ZkAclStore} +import org.apache.kafka.common.errors.UnsupportedVersionException import org.apache.kafka.common.resource.ResourceNameType import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{SecurityUtils, Time} @@ -55,7 +56,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging { private var superUsers = Set.empty[KafkaPrincipal] private var shouldAllowEveryoneIfNoAclIsFound = false private var zkClient: KafkaZkClient = _ - private var aclChangeListeners: Seq[ZkNodeChangeNotificationListener] = List() + private var aclChangeListeners: Iterable[AclChangeSubscription] = Iterable.empty + private var extendedAclSupport: Boolean = _ @volatile private var aclCache = new scala.collection.immutable.TreeMap[Resource, VersionedAcls]()(ResourceOrdering) @@ -96,6 +98,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging { zkMaxInFlightRequests, time, "kafka.security", "SimpleAclAuthorizer") zkClient.createAclPaths() + extendedAclSupport = kafkaConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1 + loadCache() startZkChangeListeners() @@ -161,6 +165,11 @@ class SimpleAclAuthorizer extends Authorizer with Logging { override def addAcls(acls: Set[Acl], resource: Resource) { if (acls != null && acls.nonEmpty) { + if (!extendedAclSupport && resource.nameType == ResourceNameType.PREFIXED) { + throw new UnsupportedVersionException(s"Adding ACLs on prefixed resource patterns requires " + + s"${KafkaConfig.InterBrokerProtocolVersionProp} of $KAFKA_2_0_IV1 or greater") + } + inWriteLock(lock) { updateResourceAcls(resource) { currentAcls => currentAcls ++ acls @@ -238,13 +247,14 @@ class SimpleAclAuthorizer extends Authorizer with Logging { private def loadCache() { inWriteLock(lock) { ZkAclStore.stores.foreach(store => { - val resourceTypes = zkClient.getResourceTypes(store.nameType) + val resourceTypes = zkClient.getResourceTypes(store.patternType) for (rType <- resourceTypes) { val resourceType = ResourceType.fromString(rType) - val resourceNames = zkClient.getResourceNames(store.nameType, resourceType) + val resourceNames = zkClient.getResourceNames(store.patternType, resourceType) for (resourceName <- resourceNames) { - val versionedAcls = getAclsFromZk(new Resource(resourceType, resourceName, store.nameType)) - updateCache(new Resource(resourceType, resourceName, store.nameType), versionedAcls) + val resource = new Resource(resourceType, resourceName, store.patternType) + val versionedAcls = getAclsFromZk(resource) + updateCache(resource, versionedAcls) } } }) @@ -252,13 +262,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging { } private def startZkChangeListeners(): Unit = { - aclChangeListeners = ZkAclStore.stores.map(store => { - val aclChangeListener = new ZkNodeChangeNotificationListener( - zkClient, store.aclChangePath, AclChangeNotificationSequenceZNode.SequenceNumberPrefix, new AclChangedNotificationHandler(store)) - - aclChangeListener.init() - aclChangeListener - }) + aclChangeListeners = ZkAclChangeStore.stores + .map(store => store.createListener(AclChangedNotificationHandler, zkClient)) } private def logAuditMessage(principal: KafkaPrincipal, authorized: Boolean, operation: Operation, resource: Resource, host: String) { @@ -343,17 +348,15 @@ class SimpleAclAuthorizer extends Authorizer with Logging { } private def updateAclChangedFlag(resource: Resource) { - zkClient.createAclChangeNotification(resource) + zkClient.createAclChangeNotification(resource) } private def backoffTime = { retryBackoffMs + Random.nextInt(retryBackoffJitterMs) } - class AclChangedNotificationHandler(store: ZkAclStore) extends NotificationHandler { - override def processNotification(notificationMessage: Array[Byte]) { - val resource: Resource = store.decode(notificationMessage) - + object AclChangedNotificationHandler extends AclChangeNotificationHandler { + override def processNotification(resource: Resource) { inWriteLock(lock) { val versionedAcls = getAclsFromZk(resource) updateCache(resource, versionedAcls) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 6ec8e30e0648a..ad55a6f578221 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -941,14 +941,15 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean //Acl management methods /** - * Creates the required zk nodes for Acl storage + * Creates the required zk nodes for Acl storage and Acl change storage. */ def createAclPaths(): Unit = { ZkAclStore.stores.foreach(store => { createRecursive(store.aclPath, throwIfPathExists = false) - createRecursive(store.aclChangePath, throwIfPathExists = false) ResourceType.values.foreach(resourceType => createRecursive(store.path(resourceType), throwIfPathExists = false)) }) + + ZkAclChangeStore.stores.foreach(store => createRecursive(store.aclChangePath, throwIfPathExists = false)) } /** @@ -1005,13 +1006,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean } /** - * Creates Acl change notification message - * @param resource resource name + * Creates an Acl change notification message. + * @param resource resource pattern that has changed */ def createAclChangeNotification(resource: Resource): Unit = { - val store = ZkAclStore(resource.nameType) - val path = store.changeSequenceZNode.createPath - val createRequest = CreateRequest(path, AclChangeNotificationSequenceZNode.encode(resource), acls(path), CreateMode.PERSISTENT_SEQUENTIAL) + val aclChange = ZkAclStore(resource.nameType).changeStore.createChangeNode(resource) + val createRequest = CreateRequest(aclChange.path, aclChange.bytes, acls(aclChange.path), CreateMode.PERSISTENT_SEQUENTIAL) val createResponse = retryRequestUntilConnected(createRequest) createResponse.maybeThrow } @@ -1034,10 +1034,10 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean * @throws KeeperException if there is an error while deleting Acl change notifications */ def deleteAclChangeNotifications(): Unit = { - ZkAclStore.stores.foreach(store => { + ZkAclChangeStore.stores.foreach(store => { val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(store.aclChangePath)) if (getChildrenResponse.resultCode == Code.OK) { - deleteAclChangeNotifications(store, getChildrenResponse.children) + deleteAclChangeNotifications(store.aclChangePath, getChildrenResponse.children) } else if (getChildrenResponse.resultCode != Code.NONODE) { getChildrenResponse.maybeThrow } @@ -1045,13 +1045,14 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean } /** - * Deletes the Acl change notifications associated with the given sequence nodes - * @param sequenceNodes - */ - private def deleteAclChangeNotifications(store: ZkAclStore, sequenceNodes: Seq[String]): Unit = { - val aclChangeNotificationSequenceZNode = store.changeSequenceZNode + * Deletes the Acl change notifications associated with the given sequence nodes + * + * @param aclChangePath the root path + * @param sequenceNodes the name of the node to delete. + */ + private def deleteAclChangeNotifications(aclChangePath: String, sequenceNodes: Seq[String]): Unit = { val deleteRequests = sequenceNodes.map { sequenceNode => - DeleteRequest(aclChangeNotificationSequenceZNode.deletePath(sequenceNode), ZkVersion.NoVersion) + DeleteRequest(s"$aclChangePath/$sequenceNode", ZkVersion.NoVersion) } val deleteResponses = retryRequestsUntilConnected(deleteRequests) diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala index d4470abfd5375..2cbdd8061d3ce 100644 --- a/core/src/main/scala/kafka/zk/ZkData.scala +++ b/core/src/main/scala/kafka/zk/ZkData.scala @@ -23,13 +23,15 @@ import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.core.JsonProcessingException import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr} import kafka.cluster.{Broker, EndPoint} -import kafka.common.KafkaException +import kafka.common.{KafkaException, NotificationHandler, ZkNodeChangeNotificationListener} import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch} +import kafka.security.auth.Resource.Separator import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls import kafka.security.auth.{Acl, Resource, ResourceType} import kafka.server.{ConfigType, DelegationTokenManager} import kafka.utils.Json import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.UnsupportedVersionException import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.resource.ResourceNameType import org.apache.kafka.common.security.auth.SecurityProtocol @@ -42,6 +44,7 @@ import scala.beans.BeanProperty import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.collection.{Seq, breakOut} +import scala.util.{Failure, Success, Try} // This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes). @@ -446,75 +449,205 @@ object StateChangeHandlers { } /** - * Acls for resources are stored in ZK under a root node that is determined by the [[ResourceNameType]]. - * Under each [[ResourceNameType]] node there will be one child node per resource type (Topic, Cluster, Group, etc). - * Under each resourceType there will be a unique child for each resource path and the data for that child will contain + * Acls for resources are stored in ZK under two root paths: + * + * + * Under each root node there will be one child node per resource type (Topic, Cluster, Group, etc). + * Under each resourceType there will be a unique child for each resource pattern and the data for that child will contain * list of its acls as a json object. Following gives an example: * *
+  * // Literal patterns:
   * /kafka-acl/Topic/topic-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
   * /kafka-acl/Cluster/kafka-cluster => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
-  * /kafka-prefixed-acl/Group/group-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
+  *
+  * // Prefixed patterns:
+  * /kafka-acl-extended/PREFIXED/Group/group-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
   * 
+ * + * Acl change events are also stored under two paths: + * */ -case class ZkAclStore(nameType: ResourceNameType) { - val aclPath: String = nameType match { - case ResourceNameType.LITERAL => "/kafka-acl" - case ResourceNameType.PREFIXED => "/kafka-prefixed-acl" - case _ => throw new IllegalArgumentException("Unknown name type:" + nameType) +sealed trait ZkAclStore { + val patternType: ResourceNameType + val aclPath: String + + def path(resourceType: ResourceType): String = s"$aclPath/$resourceType" + + def path(resourceType: ResourceType, resourceName: String): String = s"$aclPath/$resourceType/$resourceName" + + def changeStore: ZkAclChangeStore +} + +object ZkAclStore { + private val storesByType: Map[ResourceNameType, ZkAclStore] = ResourceNameType.values + .filter(nameType => nameType != ResourceNameType.ANY && nameType != ResourceNameType.UNKNOWN) + .map(nameType => (nameType, create(nameType))) + .toMap + + val stores: Iterable[ZkAclStore] = storesByType.values + + val securePaths: Iterable[String] = stores + .flatMap(store => Set(store.aclPath, store.changeStore.aclChangePath)) + + def apply(patternType: ResourceNameType): ZkAclStore = { + storesByType.get(patternType) match { + case Some(store) => store + case None => throw new KafkaException(s"Invalid pattern type: $patternType") + } } - val aclChangePath: String = nameType match { - case ResourceNameType.LITERAL => "/kafka-acl-changes" - case ResourceNameType.PREFIXED => "/kafka-prefixed-acl-changes" - case _ => throw new IllegalArgumentException("Unknown name type:" + nameType) + private def create(patternType: ResourceNameType) = { + patternType match { + case ResourceNameType.LITERAL => LiteralAclStore + case _ => new ExtendedAclStore(patternType) + } } +} - def path(resourceType: ResourceType) = s"$aclPath/$resourceType" +object LiteralAclStore extends ZkAclStore { + val patternType: ResourceNameType = ResourceNameType.LITERAL + val aclPath: String = "/kafka-acl" - def path(resourceType: ResourceType, resourceName: String): String = s"$aclPath/$resourceType/$resourceName" + def changeStore: ZkAclChangeStore = LiteralAclChangeStore +} + +class ExtendedAclStore(val patternType: ResourceNameType) extends ZkAclStore { + if (patternType == ResourceNameType.LITERAL) + throw new IllegalArgumentException("Literal pattern types are not supported") - def changeSequenceZNode: AclChangeNotificationSequenceZNode = AclChangeNotificationSequenceZNode(this) + val aclPath: String = s"/kafka-acl-extended/${patternType.name.toLowerCase}" - def decode(notificationMessage: Array[Byte]): Resource = AclChangeNotificationSequenceZNode.decode(nameType, notificationMessage) + def changeStore: ZkAclChangeStore = ExtendedAclChangeStore } -object ZkAclStore { - val stores: Seq[ZkAclStore] = ResourceNameType.values - .filter(nameType => nameType != ResourceNameType.ANY && nameType != ResourceNameType.UNKNOWN) - .map(nameType => ZkAclStore(nameType)) +trait AclChangeNotificationHandler { + def processNotification(resource: Resource): Unit +} - val securePaths: Seq[String] = stores - .flatMap(store => List(store.aclPath, store.aclChangePath)) +trait AclChangeSubscription extends AutoCloseable { + def close(): Unit } -object ResourceZNode { - def path(resource: Resource): String = ZkAclStore(resource.nameType).path(resource.resourceType, resource.name) +case class AclChangeNode(path: String, bytes: Array[Byte]) - def encode(acls: Set[Acl]): Array[Byte] = Json.encodeAsBytes(Acl.toJsonCompatibleMap(acls).asJava) - def decode(bytes: Array[Byte], stat: Stat): VersionedAcls = VersionedAcls(Acl.fromBytes(bytes), stat.getVersion) +sealed trait ZkAclChangeStore { + val aclChangePath: String + def createPath: String = s"$aclChangePath/${ZkAclChangeStore.SequenceNumberPrefix}" + + def decode(bytes: Array[Byte]): Resource + + protected def encode(resource: Resource): Array[Byte] + + def createChangeNode(resource: Resource): AclChangeNode = AclChangeNode(createPath, encode(resource)) + + def createListener(handler: AclChangeNotificationHandler, zkClient: KafkaZkClient): AclChangeSubscription = { + val rawHandler: NotificationHandler = new NotificationHandler { + def processNotification(bytes: Array[Byte]): Unit = + handler.processNotification(decode(bytes)) + } + + val aclChangeListener = new ZkNodeChangeNotificationListener( + zkClient, aclChangePath, ZkAclChangeStore.SequenceNumberPrefix, rawHandler) + + aclChangeListener.init() + + new AclChangeSubscription { + def close(): Unit = aclChangeListener.close() + } + } } -object AclChangeNotificationSequenceZNode { - val Separator = ":" +object ZkAclChangeStore { + val stores: Iterable[ZkAclChangeStore] = List(LiteralAclChangeStore, ExtendedAclChangeStore) + def SequenceNumberPrefix = "acl_changes_" +} + +case object LiteralAclChangeStore extends ZkAclChangeStore { + val name = "LiteralAclChangeStore" + val aclChangePath: String = "/kafka-acl-changes" def encode(resource: Resource): Array[Byte] = { - (resource.resourceType.name + Separator + resource.name).getBytes(UTF_8) + if (resource.nameType != ResourceNameType.LITERAL) + throw new IllegalArgumentException("Only literal resource patterns can be encoded") + + val legacyName = resource.resourceType + Resource.Separator + resource.name + legacyName.getBytes(UTF_8) + } + + def decode(bytes: Array[Byte]): Resource = { + val string = new String(bytes, UTF_8) + string.split(Separator, 2) match { + case Array(resourceType, resourceName, _*) => new Resource(ResourceType.fromString(resourceType), resourceName, ResourceNameType.LITERAL) + case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + string) + } } +} - def decode(nameType: ResourceNameType, bytes: Array[Byte]): Resource = { - val str = new String(bytes, UTF_8) - str.split(Separator, 2) match { - case Array(resourceType, name, _*) => Resource(ResourceType.fromString(resourceType), name, nameType) - case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str) +case object ExtendedAclChangeStore extends ZkAclChangeStore { + val name = "ExtendedAclChangeStore" + val aclChangePath: String = "/kafka-acl-extended-changes" + + def encode(resource: Resource): Array[Byte] = { + if (resource.nameType == ResourceNameType.LITERAL) + throw new IllegalArgumentException("Literal pattern types are not supported") + + Json.encodeAsBytes(ExtendedAclChangeEvent( + ExtendedAclChangeEvent.currentVersion, + resource.resourceType.name, + resource.name, + resource.nameType.name)) + } + + def decode(bytes: Array[Byte]): Resource = { + val changeEvent = Json.parseBytesAs[ExtendedAclChangeEvent](bytes) match { + case Right(event) => event + case Left(e) => throw new IllegalArgumentException("Failed to parse ACL change event", e) + } + + changeEvent.toResource match { + case Success(r) => r + case Failure(e) => throw new IllegalArgumentException("Failed to convert ACL change event to resource", e) } } } -case class AclChangeNotificationSequenceZNode(store: ZkAclStore) { - def createPath = s"${store.aclChangePath}/${AclChangeNotificationSequenceZNode.SequenceNumberPrefix}" - def deletePath(sequenceNode: String) = s"${store.aclChangePath}/$sequenceNode" +object ResourceZNode { + def path(resource: Resource): String = ZkAclStore(resource.nameType).path(resource.resourceType, resource.name) + + def encode(acls: Set[Acl]): Array[Byte] = Json.encodeAsBytes(Acl.toJsonCompatibleMap(acls).asJava) + def decode(bytes: Array[Byte], stat: Stat): VersionedAcls = VersionedAcls(Acl.fromBytes(bytes), stat.getVersion) +} + +object ExtendedAclChangeEvent { + val currentVersion: Int = 1 +} + +case class ExtendedAclChangeEvent(@BeanProperty @JsonProperty("version") version: Int, + @BeanProperty @JsonProperty("resourceType") resourceType: String, + @BeanProperty @JsonProperty("name") name: String, + @BeanProperty @JsonProperty("resourceNameType") resourceNameType: String) { + if (version > ExtendedAclChangeEvent.currentVersion) + throw new UnsupportedVersionException(s"Acl change event received for unsupported version: $version") + + def toResource: Try[Resource] = { + for { + resType <- Try(ResourceType.fromString(resourceType)) + nameType <- Try(ResourceNameType.fromString(resourceNameType)) + resource = Resource(resType, name, nameType) + } yield resource + } } object ClusterZNode { diff --git a/core/src/test/scala/kafka/security/auth/ResourceTest.scala b/core/src/test/scala/kafka/security/auth/ResourceTest.scala index 2924cff582387..c7ed94956533b 100644 --- a/core/src/test/scala/kafka/security/auth/ResourceTest.scala +++ b/core/src/test/scala/kafka/security/auth/ResourceTest.scala @@ -24,10 +24,15 @@ import org.junit.Assert._ class ResourceTest { @Test(expected = classOf[KafkaException]) - def shouldThrowTwoPartStringWithUnknownResourceType(): Unit = { + def shouldThrowOnTwoPartStringWithUnknownResourceType(): Unit = { Resource.fromString("Unknown:fred") } + @Test(expected = classOf[KafkaException]) + def shouldThrowOnBadResourceTypeSeparator(): Unit = { + Resource.fromString("Topic-fred") + } + @Test def shouldParseOldTwoPartString(): Unit = { assertEquals(Resource(Group, "fred", LITERAL), Resource.fromString("Group:fred")) @@ -41,14 +46,14 @@ class ResourceTest { @Test def shouldParseThreePartString(): Unit = { - assertEquals(Resource(Group, "fred", PREFIXED), Resource.fromString("PREFIXED:Group:fred")) - assertEquals(Resource(Topic, "t", LITERAL), Resource.fromString("LITERAL:Topic:t")) + assertEquals(Resource(Group, "fred", PREFIXED), Resource.fromString("Group:PREFIXED:fred")) + assertEquals(Resource(Topic, "t", LITERAL), Resource.fromString("Topic:LITERAL:t")) } @Test def shouldParseThreePartWithEmbeddedSeparators(): Unit = { - assertEquals(Resource(Group, ":This:is:a:weird:group:name:", PREFIXED), Resource.fromString("PREFIXED:Group::This:is:a:weird:group:name:")) - assertEquals(Resource(Group, ":This:is:a:weird:group:name:", LITERAL), Resource.fromString("LITERAL:Group::This:is:a:weird:group:name:")) + assertEquals(Resource(Group, ":This:is:a:weird:group:name:", PREFIXED), Resource.fromString("Group:PREFIXED::This:is:a:weird:group:name:")) + assertEquals(Resource(Group, ":This:is:a:weird:group:name:", LITERAL), Resource.fromString("Group:LITERAL::This:is:a:weird:group:name:")) } @Test diff --git a/core/src/test/scala/kafka/zk/ExtendedAclStoreTest.scala b/core/src/test/scala/kafka/zk/ExtendedAclStoreTest.scala new file mode 100644 index 0000000000000..4e8580b63e4ba --- /dev/null +++ b/core/src/test/scala/kafka/zk/ExtendedAclStoreTest.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.zk + +import kafka.security.auth.{Resource, Topic} +import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED} +import org.junit.Assert.assertEquals +import org.junit.Test + +class ExtendedAclStoreTest { + private val literalResource = Resource(Topic, "some-topic", LITERAL) + private val prefixedResource = Resource(Topic, "some-topic", PREFIXED) + private val store = new ExtendedAclStore(PREFIXED) + + @Test + def shouldHaveCorrectPaths(): Unit = { + assertEquals("/kafka-acl-extended/prefixed", store.aclPath) + assertEquals("/kafka-acl-extended/prefixed/Topic", store.path(Topic)) + assertEquals("/kafka-acl-extended-changes", store.changeStore.aclChangePath) + } + + @Test + def shouldHaveCorrectPatternType(): Unit = { + assertEquals(PREFIXED, store.patternType) + } + + @Test(expected = classOf[IllegalArgumentException]) + def shouldThrowIfConstructedWithLiteral(): Unit = { + new ExtendedAclStore(LITERAL) + } + + @Test(expected = classOf[IllegalArgumentException]) + def shouldThrowFromEncodeOnLiteral(): Unit = { + store.changeStore.createChangeNode(literalResource) + } + + @Test + def shouldWriteChangesToTheWritePath(): Unit = { + val changeNode = store.changeStore.createChangeNode(prefixedResource) + + assertEquals("/kafka-acl-extended-changes/acl_changes_", changeNode.path) + } + + @Test + def shouldRoundTripChangeNode(): Unit = { + val changeNode = store.changeStore.createChangeNode(prefixedResource) + + val actual = store.changeStore.decode(changeNode.bytes) + + assertEquals(prefixedResource, actual) + } +} \ No newline at end of file diff --git a/core/src/test/scala/kafka/zk/LiteralAclStoreTest.scala b/core/src/test/scala/kafka/zk/LiteralAclStoreTest.scala new file mode 100644 index 0000000000000..22d6f23547b56 --- /dev/null +++ b/core/src/test/scala/kafka/zk/LiteralAclStoreTest.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.zk + +import kafka.security.auth.{Resource, Topic} +import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED} +import org.junit.Assert.assertEquals +import org.junit.Test + +class LiteralAclStoreTest { + private val literalResource = Resource(Topic, "some-topic", LITERAL) + private val prefixedResource = Resource(Topic, "some-topic", PREFIXED) + private val store = LiteralAclStore + + @Test + def shouldHaveCorrectPaths(): Unit = { + assertEquals("/kafka-acl", store.aclPath) + assertEquals("/kafka-acl/Topic", store.path(Topic)) + assertEquals("/kafka-acl-changes", store.changeStore.aclChangePath) + } + + @Test + def shouldHaveCorrectPatternType(): Unit = { + assertEquals(LITERAL, store.patternType) + } + + @Test(expected = classOf[IllegalArgumentException]) + def shouldThrowFromEncodeOnNoneLiteral(): Unit = { + store.changeStore.createChangeNode(prefixedResource) + } + + @Test + def shouldWriteChangesToTheWritePath(): Unit = { + val changeNode = store.changeStore.createChangeNode(literalResource) + + assertEquals("/kafka-acl-changes/acl_changes_", changeNode.path) + } + + @Test + def shouldRoundTripChangeNode(): Unit = { + val changeNode = store.changeStore.createChangeNode(literalResource) + + val actual = store.changeStore.decode(changeNode.bytes) + + assertEquals(literalResource, actual) + } +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala index 02918d6b2e07b..58f0962b7bdec 100644 --- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala +++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala @@ -18,43 +18,44 @@ package kafka.common import kafka.security.auth.{Group, Resource} import kafka.utils.TestUtils -import kafka.zk.{AclChangeNotificationSequenceZNode, ZkAclStore, ZooKeeperTestHarness} +import kafka.zk.{LiteralAclChangeStore, LiteralAclStore, ZkAclChangeStore, ZooKeeperTestHarness} import org.apache.kafka.common.resource.ResourceNameType.LITERAL -import org.junit.{After, Test} +import org.junit.{After, Before, Test} + +import scala.collection.mutable.ArrayBuffer class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness { - var notificationListener: ZkNodeChangeNotificationListener = _ + private val changeExpirationMs = 1000 + private var notificationListener: ZkNodeChangeNotificationListener = _ + private var notificationHandler: TestNotificationHandler = _ + + @Before + override def setUp(): Unit = { + super.setUp() + zkClient.createAclPaths() + notificationHandler = new TestNotificationHandler() + } @After override def tearDown(): Unit = { if (notificationListener != null) { notificationListener.close() } + super.tearDown() } @Test def testProcessNotification() { - @volatile var notification: Resource = null - @volatile var invocationCount = 0 - val notificationHandler = new NotificationHandler { - override def processNotification(notificationMessage: Array[Byte]): Unit = { - notification = AclChangeNotificationSequenceZNode.decode(LITERAL, notificationMessage) - invocationCount += 1 - } - } - - zkClient.createAclPaths() val notificationMessage1 = Resource(Group, "messageA", LITERAL) val notificationMessage2 = Resource(Group, "messageB", LITERAL) - val changeExpirationMs = 1000 - notificationListener = new ZkNodeChangeNotificationListener(zkClient, ZkAclStore(LITERAL).aclChangePath, - AclChangeNotificationSequenceZNode.SequenceNumberPrefix, notificationHandler, changeExpirationMs) + notificationListener = new ZkNodeChangeNotificationListener(zkClient, LiteralAclChangeStore.aclChangePath, + ZkAclChangeStore.SequenceNumberPrefix, notificationHandler, changeExpirationMs) notificationListener.init() zkClient.createAclChangeNotification(notificationMessage1) - TestUtils.waitUntilTrue(() => invocationCount == 1 && notification == notificationMessage1, + TestUtils.waitUntilTrue(() => notificationHandler.received().size == 1 && notificationHandler.received().last == notificationMessage1, "Failed to send/process notification message in the timeout period.") /* @@ -66,12 +67,43 @@ class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness { */ zkClient.createAclChangeNotification(notificationMessage2) - TestUtils.waitUntilTrue(() => invocationCount == 2 && notification == notificationMessage2, + TestUtils.waitUntilTrue(() => notificationHandler.received().size == 2 && notificationHandler.received().last == notificationMessage2, "Failed to send/process notification message in the timeout period.") (3 to 10).foreach(i => zkClient.createAclChangeNotification(Resource(Group, "message" + i, LITERAL))) - TestUtils.waitUntilTrue(() => invocationCount == 10 , - s"Expected 10 invocations of processNotifications, but there were $invocationCount") + TestUtils.waitUntilTrue(() => notificationHandler.received().size == 10, + s"Expected 10 invocations of processNotifications, but there were ${notificationHandler.received()}") + } + + @Test + def testSwallowsProcessorException() : Unit = { + notificationHandler.setThrowSize(2) + notificationListener = new ZkNodeChangeNotificationListener(zkClient, LiteralAclChangeStore.aclChangePath, + ZkAclChangeStore.SequenceNumberPrefix, notificationHandler, changeExpirationMs) + notificationListener.init() + + zkClient.createAclChangeNotification(Resource(Group, "messageA", LITERAL)) + zkClient.createAclChangeNotification(Resource(Group, "messageB", LITERAL)) + zkClient.createAclChangeNotification(Resource(Group, "messageC", LITERAL)) + + TestUtils.waitUntilTrue(() => notificationHandler.received().size == 3, + s"Expected 2 invocations of processNotifications, but there were ${notificationHandler.received()}") + } + + private class TestNotificationHandler extends NotificationHandler { + private val messages = ArrayBuffer.empty[Resource] + @volatile private var throwSize = Option.empty[Int] + + override def processNotification(notificationMessage: Array[Byte]): Unit = { + messages += LiteralAclStore.changeStore.decode(notificationMessage) + + if (throwSize.contains(messages.size)) + throw new RuntimeException("Oh no, my processing failed!") + } + + def received(): Seq[Resource] = messages + + def setThrowSize(index: Int): Unit = throwSize = Option(index) } -} +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala index 05a433ce97bc9..b3012712c7a6c 100644 --- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala @@ -17,15 +17,21 @@ package kafka.security.auth import java.net.InetAddress +import java.nio.charset.StandardCharsets.UTF_8 import java.util.UUID +import kafka.api.{ApiVersion, KAFKA_2_0_IV0, KAFKA_2_0_IV1} import kafka.network.RequestChannel.Session import kafka.security.auth.Acl.{WildCardHost, WildCardResource} import kafka.server.KafkaConfig import kafka.utils.TestUtils -import kafka.zk.ZooKeeperTestHarness +import kafka.zk.{ZkAclStore, ZooKeeperTestHarness} +import kafka.zookeeper.{GetChildrenRequest, GetDataRequest, ZooKeeperClient} +import org.apache.kafka.common.errors.UnsupportedVersionException +import org.apache.kafka.common.resource.ResourceNameType import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED} import org.apache.kafka.common.security.auth.KafkaPrincipal +import org.apache.kafka.common.utils.Time import org.junit.Assert._ import org.junit.{After, Before, Test} @@ -47,7 +53,8 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { val username = "alice" val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) val session = Session(principal, testHostName) - var config: KafkaConfig = null + var config: KafkaConfig = _ + private var zooKeeperClient: ZooKeeperClient = _ @Before override def setUp() { @@ -64,12 +71,16 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { simpleAclAuthorizer.configure(config.originals) simpleAclAuthorizer2.configure(config.originals) resource = Resource(Topic, "foo-" + UUID.randomUUID(), LITERAL) + + zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, + Time.SYSTEM, "kafka.test", "SimpleAclAuthorizerTest") } @After override def tearDown(): Unit = { simpleAclAuthorizer.close() simpleAclAuthorizer2.close() + zooKeeperClient.close() super.tearDown() } @@ -553,6 +564,88 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { assertEquals(4, simpleAclAuthorizer.getAcls(principal).size) } + @Test(expected = classOf[UnsupportedVersionException]) + def testThrowsOnAddPrefixedAclIfInterBrokerProtocolVersionTooLow(): Unit = { + givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV0)) + simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", PREFIXED)) + } + + @Test + def testWritesExtendedAclChangeEventIfInterBrokerProtocolNotSet(): Unit = { + givenAuthorizerWithProtocolVersion(Option.empty) + val resource = Resource(Topic, "z_other", PREFIXED) + val expected = new String(ZkAclStore(PREFIXED).changeStore.createChangeNode(resource).bytes, UTF_8) + + simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource) + + val actual = getAclChangeEventAsString(PREFIXED) + + assertEquals(expected, actual) + } + + @Test + def testWritesExtendedAclChangeEventWhenInterBrokerProtocolAtLeastKafkaV2(): Unit = { + givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV1)) + val resource = Resource(Topic, "z_other", PREFIXED) + val expected = new String(ZkAclStore(PREFIXED).changeStore.createChangeNode(resource).bytes, UTF_8) + + simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource) + + val actual = getAclChangeEventAsString(PREFIXED) + + assertEquals(expected, actual) + } + + @Test + def testWritesLiteralWritesLiteralAclChangeEventWhenInterBrokerProtocolLessThanKafkaV2eralAclChangesForOlderProtocolVersions(): Unit = { + givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV0)) + val resource = Resource(Topic, "z_other", LITERAL) + val expected = new String(ZkAclStore(LITERAL).changeStore.createChangeNode(resource).bytes, UTF_8) + + simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource) + + val actual = getAclChangeEventAsString(LITERAL) + + assertEquals(expected, actual) + } + + @Test + def testWritesLiteralAclChangeEventWhenInterBrokerProtocolIsKafkaV2(): Unit = { + givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV1)) + val resource = Resource(Topic, "z_other", LITERAL) + val expected = new String(ZkAclStore(LITERAL).changeStore.createChangeNode(resource).bytes, UTF_8) + + simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource) + + val actual = getAclChangeEventAsString(LITERAL) + + assertEquals(expected, actual) + } + + private def givenAuthorizerWithProtocolVersion(protocolVersion: Option[ApiVersion]) { + simpleAclAuthorizer.close() + + val props = TestUtils.createBrokerConfig(0, zkConnect) + props.put(SimpleAclAuthorizer.SuperUsersProp, superUsers) + protocolVersion.foreach(version => props.put(KafkaConfig.InterBrokerProtocolVersionProp, version.toString)) + + config = KafkaConfig.fromProps(props) + + simpleAclAuthorizer.configure(config.originals) + } + + private def getAclChangeEventAsString(patternType: ResourceNameType) = { + val store = ZkAclStore(patternType) + val children = zooKeeperClient.handleRequest(GetChildrenRequest(store.changeStore.aclChangePath)) + children.maybeThrow() + assertEquals("Expecting 1 change event", 1, children.children.size) + + val data = zooKeeperClient.handleRequest(GetDataRequest(s"${store.changeStore.aclChangePath}/${children.children.head}")) + data.maybeThrow() + + new String(data.data, UTF_8) + } + private def changeAclAndVerify(originalAcls: Set[Acl], addedAcls: Set[Acl], removedAcls: Set[Acl], resource: Resource = resource): Set[Acl] = { var acls = originalAcls diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index cfaf731768019..cc67a010bc401 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -34,11 +34,11 @@ import org.apache.kafka.common.utils.{SecurityUtils, Time} import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException} import org.junit.Assert._ import org.junit.{After, Before, Test} + import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.collection.{Seq, mutable} import scala.util.Random - import kafka.controller.LeaderIsrAndControllerEpoch import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult import kafka.zookeeper._ @@ -426,10 +426,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @Test def testAclManagementMethods() { - ZkAclStore.stores.foreach(store => { assertFalse(zkClient.pathExists(store.aclPath)) - assertFalse(zkClient.pathExists(store.aclChangePath)) + assertFalse(zkClient.pathExists(store.changeStore.aclChangePath)) ResourceType.values.foreach(resource => assertFalse(zkClient.pathExists(store.path(resource)))) }) @@ -438,11 +437,11 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { ZkAclStore.stores.foreach(store => { assertTrue(zkClient.pathExists(store.aclPath)) - assertTrue(zkClient.pathExists(store.aclChangePath)) + assertTrue(zkClient.pathExists(store.changeStore.aclChangePath)) ResourceType.values.foreach(resource => assertTrue(zkClient.pathExists(store.path(resource)))) - val resource1 = new Resource(Topic, UUID.randomUUID().toString, store.nameType) - val resource2 = new Resource(Topic, UUID.randomUUID().toString, store.nameType) + val resource1 = new Resource(Topic, UUID.randomUUID().toString, store.patternType) + val resource2 = new Resource(Topic, UUID.randomUUID().toString, store.patternType) // try getting acls for non-existing resource var versionedAcls = zkClient.getVersionedAclsForResource(resource1) @@ -472,10 +471,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { assertEquals(1, versionedAcls.zkVersion) //get resource Types - assertTrue(ResourceType.values.map( rt => rt.name).toSet == zkClient.getResourceTypes(store.nameType).toSet) + assertTrue(ResourceType.values.map( rt => rt.name).toSet == zkClient.getResourceTypes(store.patternType).toSet) //get resource name - val resourceNames = zkClient.getResourceNames(store.nameType, Topic) + val resourceNames = zkClient.getResourceNames(store.patternType, Topic) assertEquals(2, resourceNames.size) assertTrue(Set(resource1.name,resource2.name) == resourceNames.toSet) @@ -488,14 +487,13 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { //delete with valid expected zk version assertTrue(zkClient.conditionalDelete(resource2, 0)) + zkClient.createAclChangeNotification(Resource(Group, "resource1", store.patternType)) + zkClient.createAclChangeNotification(Resource(Topic, "resource2", store.patternType)) - zkClient.createAclChangeNotification(Resource(Group, "resource1", store.nameType)) - zkClient.createAclChangeNotification(Resource(Topic, "resource2", store.nameType)) - - assertEquals(2, zkClient.getChildren(store.aclChangePath).size) + assertEquals(2, zkClient.getChildren(store.changeStore.aclChangePath).size) zkClient.deleteAclChangeNotifications() - assertTrue(zkClient.getChildren(store.aclChangePath).isEmpty) + assertTrue(zkClient.getChildren(store.changeStore.aclChangePath).isEmpty) }) } diff --git a/docs/upgrade.html b/docs/upgrade.html index 0430b43eb30ec..228798860b871 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -64,7 +64,7 @@

Upgrading from 0.8.x, 0.9.x, 0.1
  • ACLs should not be added to prefixed resources, (added in KIP-290), until all brokers in the cluster have been updated. -

    NOTE: any prefixed ACLs added to a cluster will be ignored should the cluster be downgraded again. +

    NOTE: any prefixed ACLs added to a cluster, even after the cluster is fully upgraded, will be ignored should the cluster be downgraded again.