From 16f55977ced5eaa437e00b4c9c18f6f01a0521df Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Thu, 7 Jun 2018 21:40:57 +0100 Subject: [PATCH 01/10] KAFKA-7007: All ACL changes should use single /kafka-acl-changes path --- .../security/auth/SimpleAclAuthorizer.scala | 23 ++---- .../main/scala/kafka/zk/KafkaZkClient.scala | 25 +++--- core/src/main/scala/kafka/zk/ZkData.scala | 82 +++++++++++++------ ...ZkNodeChangeNotificationListenerTest.scala | 6 +- .../unit/kafka/zk/KafkaZkClientTest.scala | 16 ++-- docs/upgrade.html | 7 +- 6 files changed, 89 insertions(+), 70 deletions(-) diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index c828970cc0d79..40ca6a628caeb 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -26,7 +26,7 @@ import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls import kafka.server.KafkaConfig import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils._ -import kafka.zk.{AclChangeNotificationSequenceZNode, KafkaZkClient, ZkAclStore} +import kafka.zk.{AclChangeNotificationSequenceZNode, AclChangeNotificationZNode, KafkaZkClient, ZkAclStore} import org.apache.kafka.common.resource.{ResourceFilter, ResourceNameType => JResourceNameType} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{SecurityUtils, Time} @@ -55,7 +55,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging { private var superUsers = Set.empty[KafkaPrincipal] private var shouldAllowEveryoneIfNoAclIsFound = false private var zkClient: KafkaZkClient = _ - private var aclChangeListeners: Seq[ZkNodeChangeNotificationListener] = List() + private var aclChangeListener: ZkNodeChangeNotificationListener = _ @volatile private var aclCache = new scala.collection.immutable.TreeMap[Resource, VersionedAcls]()(ResourceOrdering) @@ -98,7 +98,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging { loadCache() - startZkChangeListeners() + aclChangeListener = new ZkNodeChangeNotificationListener(zkClient, AclChangeNotificationZNode.path, AclChangeNotificationSequenceZNode.SequenceNumberPrefix, AclChangedNotificationHandler) + aclChangeListener.init() } override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = { @@ -230,7 +231,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging { } def close() { - aclChangeListeners.foreach(listener => listener.close()) + if (aclChangeListener != null) aclChangeListener.close() if (zkClient != null) zkClient.close() } @@ -250,16 +251,6 @@ class SimpleAclAuthorizer extends Authorizer with Logging { } } - private def startZkChangeListeners(): Unit = { - aclChangeListeners = ZkAclStore.stores.map(store => { - val aclChangeListener = new ZkNodeChangeNotificationListener( - zkClient, store.aclChangePath, AclChangeNotificationSequenceZNode.SequenceNumberPrefix, new AclChangedNotificationHandler(store)) - - aclChangeListener.init() - aclChangeListener - }) - } - private def logAuditMessage(principal: KafkaPrincipal, authorized: Boolean, operation: Operation, resource: Resource, host: String) { def logMessage: String = { val authResult = if (authorized) "Allowed" else "Denied" @@ -349,9 +340,9 @@ class SimpleAclAuthorizer extends Authorizer with Logging { retryBackoffMs + Random.nextInt(retryBackoffJitterMs) } - class AclChangedNotificationHandler(store: ZkAclStore) extends NotificationHandler { + object AclChangedNotificationHandler extends NotificationHandler { override def processNotification(notificationMessage: Array[Byte]) { - val resource: Resource = store.decode(notificationMessage) + val resource: Resource = AclChangeNotificationSequenceZNode.decode(notificationMessage) inWriteLock(lock) { val versionedAcls = getAclsFromZk(resource) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 90f71a1e1d201..534cf0279ed8b 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -943,9 +943,10 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean * Creates the required zk nodes for Acl storage */ def createAclPaths(): Unit = { + createRecursive(AclChangeNotificationZNode.path, throwIfPathExists = false) + ZkAclStore.stores.foreach(store => { createRecursive(store.aclPath, throwIfPathExists = false) - createRecursive(store.aclChangePath, throwIfPathExists = false) ResourceType.values.foreach(resourceType => createRecursive(store.path(resourceType), throwIfPathExists = false)) }) } @@ -1008,8 +1009,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean * @param resource resource name */ def createAclChangeNotification(resource: Resource): Unit = { - val store = ZkAclStore(resource.resourceNameType) - val path = store.changeSequenceZNode.createPath + val path = AclChangeNotificationSequenceZNode.createPath val createRequest = CreateRequest(path, AclChangeNotificationSequenceZNode.encode(resource), acls(path), CreateMode.PERSISTENT_SEQUENTIAL) val createResponse = retryRequestUntilConnected(createRequest) createResponse.maybeThrow @@ -1033,24 +1033,21 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean * @throws KeeperException if there is an error while deleting Acl change notifications */ def deleteAclChangeNotifications(): Unit = { - ZkAclStore.stores.foreach(store => { - val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(store.aclChangePath)) - if (getChildrenResponse.resultCode == Code.OK) { - deleteAclChangeNotifications(store, getChildrenResponse.children) - } else if (getChildrenResponse.resultCode != Code.NONODE) { - getChildrenResponse.maybeThrow - } - }) + val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(AclChangeNotificationZNode.path)) + if (getChildrenResponse.resultCode == Code.OK) { + deleteAclChangeNotifications(getChildrenResponse.children) + } else if (getChildrenResponse.resultCode != Code.NONODE) { + getChildrenResponse.maybeThrow + } } /** * Deletes the Acl change notifications associated with the given sequence nodes * @param sequenceNodes */ - private def deleteAclChangeNotifications(store: ZkAclStore, sequenceNodes: Seq[String]): Unit = { - val aclChangeNotificationSequenceZNode = store.changeSequenceZNode + private def deleteAclChangeNotifications(sequenceNodes: Seq[String]): Unit = { val deleteRequests = sequenceNodes.map { sequenceNode => - DeleteRequest(aclChangeNotificationSequenceZNode.deletePath(sequenceNode), ZkVersion.NoVersion) + DeleteRequest(AclChangeNotificationSequenceZNode.deletePath(sequenceNode), ZkVersion.NoVersion) } val deleteResponses = retryRequestsUntilConnected(deleteRequests) diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala index 0524b4599c11d..12349d0f51e52 100644 --- a/core/src/main/scala/kafka/zk/ZkData.scala +++ b/core/src/main/scala/kafka/zk/ZkData.scala @@ -30,6 +30,7 @@ import kafka.security.auth.{Acl, Literal, Prefixed, Resource, ResourceNameType, import kafka.server.{ConfigType, DelegationTokenManager} import kafka.utils.Json import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.UnsupportedVersionException import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation} @@ -41,6 +42,7 @@ import scala.beans.BeanProperty import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.collection.{Seq, breakOut} +import scala.util.{Failure, Success, Try} // This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes). @@ -460,22 +462,12 @@ case class ZkAclStore(nameType: ResourceNameType) { val aclPath: String = nameType match { case Literal => "/kafka-acl" case Prefixed => "/kafka-prefixed-acl" - case _ => throw new IllegalArgumentException("Unknown name type:" + nameType) - } - - val aclChangePath: String = nameType match { - case Literal => "/kafka-acl-changes" - case Prefixed => "/kafka-prefixed-acl-changes" - case _ => throw new IllegalArgumentException("Unknown name type:" + nameType) + case _ => throw new IllegalArgumentException("Invalid name type:" + nameType) } def path(resourceType: ResourceType) = s"$aclPath/$resourceType" def path(resourceType: ResourceType, resourceName: String): String = s"$aclPath/$resourceType/$resourceName" - - def changeSequenceZNode: AclChangeNotificationSequenceZNode = AclChangeNotificationSequenceZNode(this) - - def decode(notificationMessage: Array[Byte]): Resource = AclChangeNotificationSequenceZNode.decode(nameType, notificationMessage) } object ZkAclStore { @@ -483,7 +475,17 @@ object ZkAclStore { .map(nameType => ZkAclStore(nameType)) val securePaths: Seq[String] = stores - .flatMap(store => List(store.aclPath, store.aclChangePath)) + .flatMap(store => List(store.aclPath)) ++ Seq(AclChangeNotificationZNode.path) +} + +@deprecated("There are now multiple roots for ACLs within ZK. Use ZkAclStore", "2.0") +object AclZNode { + def path = "/kafka-acl" +} + +@deprecated("There are now multiple roots for ACLs within ZK. Use ZkAclStore", "2.0") +object ResourceTypeZNode { + def path(resourceType: String) = s"${AclZNode.path}/$resourceType" } object ResourceZNode { @@ -493,26 +495,54 @@ object ResourceZNode { def decode(bytes: Array[Byte], stat: Stat): VersionedAcls = VersionedAcls(Acl.fromBytes(bytes), stat.getVersion) } +object AclChangeNotificationZNode { + def path = "/kafka-acl-changes" +} + +case class AclChangeEvent(@BeanProperty @JsonProperty("version") version: Int, + @BeanProperty @JsonProperty("resourceType") resourceType: String, + @BeanProperty @JsonProperty("name") name: String, + @BeanProperty @JsonProperty("resourceNameType") resourceNameType: String) { + if (version > AclChangeEvent.currentVersion) + throw new UnsupportedVersionException(s"Acl change event received for unsupported version: $version") + + def toResource : Try[Resource] = { + for { + resType <- Try(ResourceType.fromString(resourceType)) + nameType <- Try(ResourceNameType.fromString(resourceNameType)) + resource = Resource(resType, name, nameType) + } yield resource + } +} + +object AclChangeEvent { + val currentVersion: Int = 1 +} + object AclChangeNotificationSequenceZNode { - val Separator = ":" def SequenceNumberPrefix = "acl_changes_" - def encode(resource: Resource): Array[Byte] = { - (resource.resourceType.name + Separator + resource.name).getBytes(UTF_8) - } + def createPath = s"${AclChangeNotificationZNode.path}/$SequenceNumberPrefix" + def deletePath(sequenceNode: String) = s"${AclChangeNotificationZNode.path}/$sequenceNode" - def decode(nameType: ResourceNameType, bytes: Array[Byte]): Resource = { - val str = new String(bytes, UTF_8) - str.split(Separator, 2) match { - case Array(resourceType, name, _*) => Resource(ResourceType.fromString(resourceType), name, nameType) - case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str) + def encode(resource: Resource): Array[Byte] = + Json.encodeAsBytes(AclChangeEvent( + AclChangeEvent.currentVersion, + resource.resourceType.name, + resource.name, + resource.resourceNameType.name)) + + def decode(bytes: Array[Byte]): Resource = { + val changeEvent = Json.parseBytesAs[AclChangeEvent](bytes) match { + case Right(event) => event + case Left(e) => throw new IllegalArgumentException("Failed to parse ACL change event", e) } - } -} -case class AclChangeNotificationSequenceZNode(store: ZkAclStore) { - def createPath = s"${store.aclChangePath}/${AclChangeNotificationSequenceZNode.SequenceNumberPrefix}" - def deletePath(sequenceNode: String) = s"${store.aclChangePath}/$sequenceNode" + changeEvent.toResource match { + case Success(r) => r + case Failure(e) => throw new IllegalArgumentException("Failed to parse ACL change event", e) + } + } } object ClusterZNode { diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala index cee0bd6b72582..e53c60e4e7ca6 100644 --- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala +++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala @@ -18,7 +18,7 @@ package kafka.common import kafka.security.auth.{Group, Literal, Resource} import kafka.utils.TestUtils -import kafka.zk.{AclChangeNotificationSequenceZNode, ZkAclStore, ZooKeeperTestHarness} +import kafka.zk.{AclChangeNotificationSequenceZNode, AclChangeNotificationZNode, ZooKeeperTestHarness} import org.junit.{After, Test} class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness { @@ -38,7 +38,7 @@ class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness { @volatile var invocationCount = 0 val notificationHandler = new NotificationHandler { override def processNotification(notificationMessage: Array[Byte]): Unit = { - notification = AclChangeNotificationSequenceZNode.decode(Literal, notificationMessage) + notification = AclChangeNotificationSequenceZNode.decode(notificationMessage) invocationCount += 1 } } @@ -48,7 +48,7 @@ class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness { val notificationMessage2 = Resource(Group, "messageB", Literal) val changeExpirationMs = 1000 - notificationListener = new ZkNodeChangeNotificationListener(zkClient, ZkAclStore(Literal).aclChangePath, + notificationListener = new ZkNodeChangeNotificationListener(zkClient, AclChangeNotificationZNode.path, AclChangeNotificationSequenceZNode.SequenceNumberPrefix, notificationHandler, changeExpirationMs) notificationListener.init() diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index cfaf731768019..98b0e3dd6a3bf 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -426,19 +426,19 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @Test def testAclManagementMethods() { - + assertFalse(zkClient.pathExists(AclChangeNotificationZNode.path)) ZkAclStore.stores.foreach(store => { assertFalse(zkClient.pathExists(store.aclPath)) - assertFalse(zkClient.pathExists(store.aclChangePath)) ResourceType.values.foreach(resource => assertFalse(zkClient.pathExists(store.path(resource)))) }) // create acl paths zkClient.createAclPaths + assertTrue(zkClient.pathExists(AclChangeNotificationZNode.path)) + ZkAclStore.stores.foreach(store => { assertTrue(zkClient.pathExists(store.aclPath)) - assertTrue(zkClient.pathExists(store.aclChangePath)) ResourceType.values.foreach(resource => assertTrue(zkClient.pathExists(store.path(resource)))) val resource1 = new Resource(Topic, UUID.randomUUID().toString, store.nameType) @@ -488,15 +488,15 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { //delete with valid expected zk version assertTrue(zkClient.conditionalDelete(resource2, 0)) - zkClient.createAclChangeNotification(Resource(Group, "resource1", store.nameType)) zkClient.createAclChangeNotification(Resource(Topic, "resource2", store.nameType)) + }) - assertEquals(2, zkClient.getChildren(store.aclChangePath).size) + val expectedChangeEvents = ResourceNameType.values.size * 2 + assertEquals(expectedChangeEvents, zkClient.getChildren(AclChangeNotificationZNode.path).size) - zkClient.deleteAclChangeNotifications() - assertTrue(zkClient.getChildren(store.aclChangePath).isEmpty) - }) + zkClient.deleteAclChangeNotifications() + assertTrue(zkClient.getChildren(AclChangeNotificationZNode.path).isEmpty) } @Test diff --git a/docs/upgrade.html b/docs/upgrade.html index 0430b43eb30ec..0f483066060bf 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -42,6 +42,7 @@

Upgrading from 0.8.x, 0.9.x, 0.1
  • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1, 1.2).
  • +
  • IMPORTANT: Do not change any ACLs during the upgrade. (The format of the ACL change event has changed and is not compatible with old brokers).
  • Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
  • Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 2.0.
  • Restart the brokers one by one for the new protocol version to take effect.
  • @@ -61,10 +62,10 @@

    Upgrading from 0.8.x, 0.9.x, 0.1 Similarly for the message format version.
  • If you are using Java8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguities. Hot-swapping the jar-file only might not work.
  • -
  • ACLs should not be added to prefixed resources, +
  • ACLs should not be added during the upgrade process. +

    NOTE: Any ACLs added to prefixed resources, (added in KIP-290), - until all brokers in the cluster have been updated. -

    NOTE: any prefixed ACLs added to a cluster will be ignored should the cluster be downgraded again. + will be ignored should the cluster be downgraded again.

  • From 60139255d674649101382ffeb5992a7a1e249a21 Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Sat, 9 Jun 2018 13:21:59 +0100 Subject: [PATCH 02/10] Removed unused from ZkData --- core/src/main/scala/kafka/zk/ZkData.scala | 12 +----------- .../test/scala/unit/kafka/admin/AclCommandTest.scala | 2 +- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala index 2f25c9643f173..578a8ea9ad5df 100644 --- a/core/src/main/scala/kafka/zk/ZkData.scala +++ b/core/src/main/scala/kafka/zk/ZkData.scala @@ -478,16 +478,6 @@ object ZkAclStore { .flatMap(store => List(store.aclPath)) ++ Seq(AclChangeNotificationZNode.path) } -@deprecated("There are now multiple roots for ACLs within ZK. Use ZkAclStore", "2.0") -object AclZNode { - def path = "/kafka-acl" -} - -@deprecated("There are now multiple roots for ACLs within ZK. Use ZkAclStore", "2.0") -object ResourceTypeZNode { - def path(resourceType: String) = s"${AclZNode.path}/$resourceType" -} - object ResourceZNode { def path(resource: Resource): String = ZkAclStore(resource.nameType).path(resource.resourceType, resource.name) @@ -530,7 +520,7 @@ object AclChangeNotificationSequenceZNode { AclChangeEvent.currentVersion, resource.resourceType.name, resource.name, - resource.resourceNameType.name)) + resource.nameType.name)) def decode(bytes: Array[Byte]): Resource = { val changeEvent = Json.parseBytesAs[AclChangeEvent](bytes) match { diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala index 71754ba263318..2a0bb7efa46b2 100644 --- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala @@ -24,7 +24,7 @@ import kafka.server.KafkaConfig import kafka.utils.{Logging, TestUtils} import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.junit.{After, Before, Test} +import org.junit.{Before, Test} class AclCommandTest extends ZooKeeperTestHarness with Logging { From 237db81715171d0937fccac5240c1cfeabd2d5bf Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Sat, 9 Jun 2018 13:42:32 +0100 Subject: [PATCH 03/10] Post merge fix up --- core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index 98b0e3dd6a3bf..844690f13cb2e 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -34,14 +34,15 @@ import org.apache.kafka.common.utils.{SecurityUtils, Time} import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException} import org.junit.Assert._ import org.junit.{After, Before, Test} + import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.collection.{Seq, mutable} import scala.util.Random - import kafka.controller.LeaderIsrAndControllerEpoch import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult import kafka.zookeeper._ +import org.apache.kafka.common.resource.ResourceNameType import org.apache.kafka.common.security.JaasUtils import org.apache.zookeeper.data.Stat @@ -492,7 +493,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { zkClient.createAclChangeNotification(Resource(Topic, "resource2", store.nameType)) }) - val expectedChangeEvents = ResourceNameType.values.size * 2 + val expectedChangeEvents = ResourceNameType.values() + .count(nameType => nameType != ResourceNameType.ANY && nameType != ResourceNameType.UNKNOWN) * 2 + assertEquals(expectedChangeEvents, zkClient.getChildren(AclChangeNotificationZNode.path).size) zkClient.deleteAclChangeNotifications() From 9185513cf8b2acef27133c5290bc15a25f77fcc6 Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Sat, 9 Jun 2018 15:17:58 +0100 Subject: [PATCH 04/10] Jun's requested changes --- .../security/auth/SimpleAclAuthorizer.scala | 15 +++- .../main/scala/kafka/zk/KafkaZkClient.scala | 20 ++++- core/src/main/scala/kafka/zk/ZkData.scala | 18 +++- ...lChangeNotificationSequenceZNodeTest.scala | 49 +++++++++++ .../auth/SimpleAclAuthorizerTest.scala | 88 ++++++++++++++++++- docs/upgrade.html | 12 ++- 6 files changed, 191 insertions(+), 11 deletions(-) create mode 100644 core/src/test/scala/kafka/zk/AclChangeNotificationSequenceZNodeTest.scala diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index b78efb26e8d95..5670985b352df 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -20,6 +20,7 @@ import java.util import java.util.concurrent.locks.ReentrantReadWriteLock import com.typesafe.scalalogging.Logger +import kafka.api.KAFKA_2_0_IV1 import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener} import kafka.network.RequestChannel.Session import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls @@ -27,6 +28,7 @@ import kafka.server.KafkaConfig import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils._ import kafka.zk.{AclChangeNotificationSequenceZNode, AclChangeNotificationZNode, KafkaZkClient, ZkAclStore} +import org.apache.kafka.common.errors.UnsupportedVersionException import org.apache.kafka.common.resource.ResourceNameType import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{SecurityUtils, Time} @@ -56,6 +58,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging { private var shouldAllowEveryoneIfNoAclIsFound = false private var zkClient: KafkaZkClient = _ private var aclChangeListener: ZkNodeChangeNotificationListener = _ + private var legacyChangeEvent: Boolean = _ @volatile private var aclCache = new scala.collection.immutable.TreeMap[Resource, VersionedAcls]()(ResourceOrdering) @@ -96,6 +99,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging { zkMaxInFlightRequests, time, "kafka.security", "SimpleAclAuthorizer") zkClient.createAclPaths() + legacyChangeEvent = kafkaConfig.interBrokerProtocolVersion < KAFKA_2_0_IV1 + loadCache() aclChangeListener = new ZkNodeChangeNotificationListener(zkClient, AclChangeNotificationZNode.path, AclChangeNotificationSequenceZNode.SequenceNumberPrefix, AclChangedNotificationHandler) @@ -162,6 +167,11 @@ class SimpleAclAuthorizer extends Authorizer with Logging { override def addAcls(acls: Set[Acl], resource: Resource) { if (acls != null && acls.nonEmpty) { + if (legacyChangeEvent && resource.nameType == ResourceNameType.PREFIXED) { + throw new UnsupportedVersionException(s"Adding ACLs on prefixed resource patterns requires " + + s"${KafkaConfig.InterBrokerProtocolVersionProp} of $KAFKA_2_0_IV1 or greater") + } + inWriteLock(lock) { updateResourceAcls(resource) { currentAcls => currentAcls ++ acls @@ -334,7 +344,10 @@ class SimpleAclAuthorizer extends Authorizer with Logging { } private def updateAclChangedFlag(resource: Resource) { - zkClient.createAclChangeNotification(resource) + if (legacyChangeEvent) + zkClient.createLegacyAclChangeNotification(resource) + else + zkClient.createAclChangeNotification(resource) } private def backoffTime = { diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index e4ad57b56f877..4fd37d06f6428 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -1006,8 +1006,24 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean } /** - * Creates Acl change notification message - * @param resource resource name + * Creates colon separated Acl change notification message. + * + * Kafka 2.0 saw the format of the ACL change event change from a 'colon separated' string, to a JSON message. + * This method will create a message in the old format, which is still needed while a cluster has + * 'inter.broker.protocol.version' < 2.0. + * + * @param resource resource pattern that has changed + */ + def createLegacyAclChangeNotification(resource: Resource): Unit = { + val path = AclChangeNotificationSequenceZNode.createPath + val createRequest = CreateRequest(path, AclChangeNotificationSequenceZNode.encodeLegacy(resource), acls(path), CreateMode.PERSISTENT_SEQUENTIAL) + val createResponse = retryRequestUntilConnected(createRequest) + createResponse.maybeThrow + } + + /** + * Creates JSON Acl change notification message. + * @param resource resource pattern that has changed */ def createAclChangeNotification(resource: Resource): Unit = { val path = AclChangeNotificationSequenceZNode.createPath diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala index 613550d9e2467..3da61fd346c40 100644 --- a/core/src/main/scala/kafka/zk/ZkData.scala +++ b/core/src/main/scala/kafka/zk/ZkData.scala @@ -517,6 +517,14 @@ object AclChangeNotificationSequenceZNode { def createPath = s"${AclChangeNotificationZNode.path}/$SequenceNumberPrefix" def deletePath(sequenceNode: String) = s"${AclChangeNotificationZNode.path}/$sequenceNode" + def encodeLegacy(resource: Resource): Array[Byte] = { + if (resource.nameType != ResourceNameType.LITERAL) + throw new IllegalArgumentException("Only literal resource patterns can be encoded") + + val legacyName = resource.resourceType + Resource.Separator + resource.name + legacyName.getBytes(UTF_8) + } + def encode(resource: Resource): Array[Byte] = Json.encodeAsBytes(AclChangeEvent( AclChangeEvent.currentVersion, @@ -525,7 +533,15 @@ object AclChangeNotificationSequenceZNode { resource.nameType.name)) def decode(bytes: Array[Byte]): Resource = { - val changeEvent = Json.parseBytesAs[AclChangeEvent](bytes) match { + val string = new String(bytes, UTF_8) + if (string.startsWith("{")) + decode(string) + else + Resource.fromString(string) + } + + private def decode(string: String): Resource = { + val changeEvent = Json.parseStringAs[AclChangeEvent](string) match { case Right(event) => event case Left(e) => throw new IllegalArgumentException("Failed to parse ACL change event", e) } diff --git a/core/src/test/scala/kafka/zk/AclChangeNotificationSequenceZNodeTest.scala b/core/src/test/scala/kafka/zk/AclChangeNotificationSequenceZNodeTest.scala new file mode 100644 index 0000000000000..f0481adfefe73 --- /dev/null +++ b/core/src/test/scala/kafka/zk/AclChangeNotificationSequenceZNodeTest.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.zk + +import kafka.security.auth.{Resource, Topic} +import org.apache.kafka.common.resource.ResourceNameType +import org.junit.Test +import org.junit.Assert.assertEquals + +class AclChangeNotificationSequenceZNodeTest { + private val literalResource = Resource(Topic, "some-topic", ResourceNameType.LITERAL) + private val prefixedResource = Resource(Topic, "some-topic", ResourceNameType.PREFIXED) + + @Test(expected = classOf[IllegalArgumentException]) + def shouldThrowFromLegacyEncodeOnNoneLiteral(): Unit = { + AclChangeNotificationSequenceZNode.encodeLegacy(prefixedResource) + } + + @Test + def shouldRoundTripLegacyString(): Unit = { + val bytes = AclChangeNotificationSequenceZNode.encodeLegacy(literalResource) + val actual = AclChangeNotificationSequenceZNode.decode(bytes) + + assertEquals(literalResource, actual) + } + + @Test + def shouldRoundTripJSON(): Unit = { + val bytes = AclChangeNotificationSequenceZNode.encode(prefixedResource) + val actual = AclChangeNotificationSequenceZNode.decode(bytes) + + assertEquals(prefixedResource, actual) + } +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala index 05a433ce97bc9..7132f2e0755e0 100644 --- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala @@ -17,15 +17,20 @@ package kafka.security.auth import java.net.InetAddress +import java.nio.charset.StandardCharsets.UTF_8 import java.util.UUID +import kafka.api.{ApiVersion, KAFKA_2_0_IV0, KAFKA_2_0_IV1} import kafka.network.RequestChannel.Session import kafka.security.auth.Acl.{WildCardHost, WildCardResource} import kafka.server.KafkaConfig -import kafka.utils.TestUtils -import kafka.zk.ZooKeeperTestHarness +import kafka.utils.{Json, TestUtils} +import kafka.zk.{AclChangeEvent, AclChangeNotificationZNode, ZooKeeperTestHarness} +import kafka.zookeeper.{GetChildrenRequest, GetDataRequest, ZooKeeperClient} +import org.apache.kafka.common.errors.UnsupportedVersionException import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED} import org.apache.kafka.common.security.auth.KafkaPrincipal +import org.apache.kafka.common.utils.Time import org.junit.Assert._ import org.junit.{After, Before, Test} @@ -47,7 +52,8 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { val username = "alice" val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) val session = Session(principal, testHostName) - var config: KafkaConfig = null + var config: KafkaConfig = _ + private var zooKeeperClient: ZooKeeperClient = _ @Before override def setUp() { @@ -64,12 +70,16 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { simpleAclAuthorizer.configure(config.originals) simpleAclAuthorizer2.configure(config.originals) resource = Resource(Topic, "foo-" + UUID.randomUUID(), LITERAL) + + zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, + Time.SYSTEM, "kafka.test", "SimpleAclAuthorizerTest") } @After override def tearDown(): Unit = { simpleAclAuthorizer.close() simpleAclAuthorizer2.close() + zooKeeperClient.close() super.tearDown() } @@ -553,6 +563,78 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { assertEquals(4, simpleAclAuthorizer.getAcls(principal).size) } + @Test(expected = classOf[UnsupportedVersionException]) + def testThrowsOnAddPrefixedAclIfInterBrokerProtocolVersionTooLow(): Unit = { + givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV0)) + simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", PREFIXED)) + } + + @Test + def testWritesAclChangeEventAsJSONIfInterBrokerProtocolNotSet(): Unit = { + givenAuthorizerWithProtocolVersion(Option.empty) + + simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", PREFIXED)) + + val string = getAclChangeEventAsString + + val changeEvent = Json.parseStringAs[AclChangeEvent](string) match { + case Right(event) => event + case Left(e) => fail("Failed to parse ACL change event", e) + } + + assertEquals(AclChangeEvent(AclChangeEvent.currentVersion, "Topic", "z_other", "PREFIXED"), changeEvent) + } + + @Test + def testWritesAclChangeEventAsJSONWhenInterBrokerProtocolAtLeastKafka20V1(): Unit = { + givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV1)) + + simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", PREFIXED)) + + val string = getAclChangeEventAsString + + val changeEvent = Json.parseStringAs[AclChangeEvent](string) match { + case Right(event) => event + case Left(e) => fail("Failed to parse ACL change event", e) + } + + assertEquals(AclChangeEvent(AclChangeEvent.currentVersion, "Topic", "z_other", "PREFIXED"), changeEvent) + } + + @Test + def testWritesAclChangeEventAsLegacyStringWhenInterBrokerProtocolLessThanKafka20V1(): Unit = { + givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV0)) + + simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", LITERAL)) + + val string = getAclChangeEventAsString + + assertEquals("Topic" + Resource.Separator + "z_other", string) + } + + private def givenAuthorizerWithProtocolVersion(protocolVersion: Option[ApiVersion]) { + simpleAclAuthorizer.close() + + val props = TestUtils.createBrokerConfig(0, zkConnect) + props.put(SimpleAclAuthorizer.SuperUsersProp, superUsers) + protocolVersion.foreach(version => props.put(KafkaConfig.InterBrokerProtocolVersionProp, version.toString)) + + config = KafkaConfig.fromProps(props) + + simpleAclAuthorizer.configure(config.originals) + } + + private def getAclChangeEventAsString = { + val children = zooKeeperClient.handleRequest(GetChildrenRequest(AclChangeNotificationZNode.path)) + children.maybeThrow() + assertEquals("Expecting 1 change event", 1, children.children.size) + + val data = zooKeeperClient.handleRequest(GetDataRequest(s"${AclChangeNotificationZNode.path}/${children.children.head}")) + data.maybeThrow() + + new String(data.data, UTF_8) + } + private def changeAclAndVerify(originalAcls: Set[Acl], addedAcls: Set[Acl], removedAcls: Set[Acl], resource: Resource = resource): Set[Acl] = { var acls = originalAcls diff --git a/docs/upgrade.html b/docs/upgrade.html index 0f483066060bf..91f26ded8f068 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -42,7 +42,6 @@

    Upgrading from 0.8.x, 0.9.x, 0.1
  • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1, 1.2).
  • -
  • IMPORTANT: Do not change any ACLs during the upgrade. (The format of the ACL change event has changed and is not compatible with old brokers).
  • Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
  • Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 2.0.
  • Restart the brokers one by one for the new protocol version to take effect.
  • @@ -62,10 +61,15 @@

    Upgrading from 0.8.x, 0.9.x, 0.1 Similarly for the message format version.
  • If you are using Java8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguities. Hot-swapping the jar-file only might not work.
  • -
  • ACLs should not be added during the upgrade process. -

    NOTE: Any ACLs added to prefixed resources, +

  • Do not use the 2.0 AclCommand tool, a.k.a kafka-acl script, until all brokers are upgraded to 2.0. + You can continue to use the previous version of the tool while the cluster is upgrading. + (This is to avoid issues with the new JSON format for ACL change events introduced as part of + KIP-290) +
  • +
  • ACLs should not be added to prefixed resources, (added in KIP-290), - will be ignored should the cluster be downgraded again. + until all brokers in the cluster have been updated. +

    NOTE: any prefixed ACLs added to a cluster will be ignored should the cluster be downgraded again.

  • From 5769d56fce87663567736b70843a65467039216f Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Sun, 10 Jun 2018 11:39:43 +0100 Subject: [PATCH 05/10] Revert back from JSON ACL change event to String based, with either 2 part, (`:`), or 3 part, (`::`), depending on if broker api version is less than 2.0V1 or not, respectively. --- .../scala/kafka/security/auth/Resource.scala | 25 ++++++----- core/src/main/scala/kafka/zk/ZkData.scala | 27 ++---------- .../kafka/security/auth/ResourceTest.scala | 15 ++++--- ...lChangeNotificationSequenceZNodeTest.scala | 39 ++++++++++++++-- .../auth/SimpleAclAuthorizerTest.scala | 44 +++++++++---------- docs/upgrade.html | 5 ++- 6 files changed, 85 insertions(+), 70 deletions(-) diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala index f07a11c730066..a3dd06528f1a6 100644 --- a/core/src/main/scala/kafka/security/auth/Resource.scala +++ b/core/src/main/scala/kafka/security/auth/Resource.scala @@ -16,6 +16,7 @@ */ package kafka.security.auth +import kafka.common.KafkaException import org.apache.kafka.common.resource.{ResourceNameType, ResourcePattern} object Resource { @@ -26,16 +27,18 @@ object Resource { val WildCardResource = "*" def fromString(str: String): Resource = { - ResourceNameType.values.find(nameType => str.startsWith(nameType.name)) match { - case Some(nameType) => - str.split(Separator, 3) match { - case Array(_, resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name, nameType) - case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str) - } - case _ => - str.split(Separator, 2) match { - case Array(resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name, ResourceNameType.LITERAL) - case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str) + ResourceType.values.find(resourceType => str.startsWith(resourceType.name + Separator)) match { + case None => throw new KafkaException("Invalid resource string: '" + str + "'") + case Some(resourceType) => + val remaining = str.substring(resourceType.name.length + 1) + + ResourceNameType.values.find(nameType => remaining.startsWith(nameType.name + Separator)) match { + case Some(nameType) => + val name = remaining.substring(nameType.name.length + 1) + Resource(resourceType, name, nameType) + + case None => + Resource(resourceType, remaining, ResourceNameType.LITERAL) } } } @@ -74,7 +77,7 @@ case class Resource(resourceType: ResourceType, name: String, nameType: Resource } override def toString: String = { - nameType + Resource.Separator + resourceType.name + Resource.Separator + name + resourceType.name + Resource.Separator + nameType + Resource.Separator + name } } diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala index 3da61fd346c40..a63df2dfe6e36 100644 --- a/core/src/main/scala/kafka/zk/ZkData.scala +++ b/core/src/main/scala/kafka/zk/ZkData.scala @@ -526,31 +526,10 @@ object AclChangeNotificationSequenceZNode { } def encode(resource: Resource): Array[Byte] = - Json.encodeAsBytes(AclChangeEvent( - AclChangeEvent.currentVersion, - resource.resourceType.name, - resource.name, - resource.nameType.name)) - - def decode(bytes: Array[Byte]): Resource = { - val string = new String(bytes, UTF_8) - if (string.startsWith("{")) - decode(string) - else - Resource.fromString(string) - } - - private def decode(string: String): Resource = { - val changeEvent = Json.parseStringAs[AclChangeEvent](string) match { - case Right(event) => event - case Left(e) => throw new IllegalArgumentException("Failed to parse ACL change event", e) - } + resource.toString.getBytes(UTF_8) - changeEvent.toResource match { - case Success(r) => r - case Failure(e) => throw new IllegalArgumentException("Failed to parse ACL change event", e) - } - } + def decode(bytes: Array[Byte]): Resource = + Resource.fromString(new String(bytes, UTF_8)) } object ClusterZNode { diff --git a/core/src/test/scala/kafka/security/auth/ResourceTest.scala b/core/src/test/scala/kafka/security/auth/ResourceTest.scala index 2924cff582387..c7ed94956533b 100644 --- a/core/src/test/scala/kafka/security/auth/ResourceTest.scala +++ b/core/src/test/scala/kafka/security/auth/ResourceTest.scala @@ -24,10 +24,15 @@ import org.junit.Assert._ class ResourceTest { @Test(expected = classOf[KafkaException]) - def shouldThrowTwoPartStringWithUnknownResourceType(): Unit = { + def shouldThrowOnTwoPartStringWithUnknownResourceType(): Unit = { Resource.fromString("Unknown:fred") } + @Test(expected = classOf[KafkaException]) + def shouldThrowOnBadResourceTypeSeparator(): Unit = { + Resource.fromString("Topic-fred") + } + @Test def shouldParseOldTwoPartString(): Unit = { assertEquals(Resource(Group, "fred", LITERAL), Resource.fromString("Group:fred")) @@ -41,14 +46,14 @@ class ResourceTest { @Test def shouldParseThreePartString(): Unit = { - assertEquals(Resource(Group, "fred", PREFIXED), Resource.fromString("PREFIXED:Group:fred")) - assertEquals(Resource(Topic, "t", LITERAL), Resource.fromString("LITERAL:Topic:t")) + assertEquals(Resource(Group, "fred", PREFIXED), Resource.fromString("Group:PREFIXED:fred")) + assertEquals(Resource(Topic, "t", LITERAL), Resource.fromString("Topic:LITERAL:t")) } @Test def shouldParseThreePartWithEmbeddedSeparators(): Unit = { - assertEquals(Resource(Group, ":This:is:a:weird:group:name:", PREFIXED), Resource.fromString("PREFIXED:Group::This:is:a:weird:group:name:")) - assertEquals(Resource(Group, ":This:is:a:weird:group:name:", LITERAL), Resource.fromString("LITERAL:Group::This:is:a:weird:group:name:")) + assertEquals(Resource(Group, ":This:is:a:weird:group:name:", PREFIXED), Resource.fromString("Group:PREFIXED::This:is:a:weird:group:name:")) + assertEquals(Resource(Group, ":This:is:a:weird:group:name:", LITERAL), Resource.fromString("Group:LITERAL::This:is:a:weird:group:name:")) } @Test diff --git a/core/src/test/scala/kafka/zk/AclChangeNotificationSequenceZNodeTest.scala b/core/src/test/scala/kafka/zk/AclChangeNotificationSequenceZNodeTest.scala index f0481adfefe73..8ba5afc31575c 100644 --- a/core/src/test/scala/kafka/zk/AclChangeNotificationSequenceZNodeTest.scala +++ b/core/src/test/scala/kafka/zk/AclChangeNotificationSequenceZNodeTest.scala @@ -17,14 +17,17 @@ package kafka.zk -import kafka.security.auth.{Resource, Topic} -import org.apache.kafka.common.resource.ResourceNameType +import java.nio.charset.StandardCharsets.UTF_8 + +import kafka.security.auth.Resource.Separator +import kafka.security.auth.{Resource, ResourceType, Topic} +import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED} import org.junit.Test import org.junit.Assert.assertEquals class AclChangeNotificationSequenceZNodeTest { - private val literalResource = Resource(Topic, "some-topic", ResourceNameType.LITERAL) - private val prefixedResource = Resource(Topic, "some-topic", ResourceNameType.PREFIXED) + private val literalResource = Resource(Topic, "some-topic", LITERAL) + private val prefixedResource = Resource(Topic, "some-topic", PREFIXED) @Test(expected = classOf[IllegalArgumentException]) def shouldThrowFromLegacyEncodeOnNoneLiteral(): Unit = { @@ -46,4 +49,32 @@ class AclChangeNotificationSequenceZNodeTest { assertEquals(prefixedResource, actual) } + + @Test + def shouldNotThrowIfOldBrokerParsingNewFormatWithLiteralAcl(): Unit = { + val bytes = AclChangeNotificationSequenceZNode.encode(literalResource) + val actual = legacyDecode(bytes) + + assertEquals(Resource(Topic, "LITERAL:some-topic", LITERAL), actual) + } + + @Test + def shouldNotThrowIfOldBrokerParsingNewFormatWithPrefixedAcl(): Unit = { + val bytes = AclChangeNotificationSequenceZNode.encode(prefixedResource) + val actual = legacyDecode(bytes) + + assertEquals(Resource(Topic, "PREFIXED:some-topic", LITERAL), actual) + } + + private def legacyDecode(bytes: Array[Byte]): Resource = + legacyFromString(new String(bytes, UTF_8)) + + //noinspection ScalaDeprecation + // Old version of kafka.auth.Resource.fromString used in pre-2.0 Kafka: + private def legacyFromString(str: String): Resource = { + str.split(Separator, 2) match { + case Array(resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name) + case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str) + } + } } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala index 7132f2e0755e0..98606f03e2122 100644 --- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala @@ -24,8 +24,8 @@ import kafka.api.{ApiVersion, KAFKA_2_0_IV0, KAFKA_2_0_IV1} import kafka.network.RequestChannel.Session import kafka.security.auth.Acl.{WildCardHost, WildCardResource} import kafka.server.KafkaConfig -import kafka.utils.{Json, TestUtils} -import kafka.zk.{AclChangeEvent, AclChangeNotificationZNode, ZooKeeperTestHarness} +import kafka.utils.TestUtils +import kafka.zk.{AclChangeNotificationSequenceZNode, AclChangeNotificationZNode, ZooKeeperTestHarness} import kafka.zookeeper.{GetChildrenRequest, GetDataRequest, ZooKeeperClient} import org.apache.kafka.common.errors.UnsupportedVersionException import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED} @@ -570,46 +570,42 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { } @Test - def testWritesAclChangeEventAsJSONIfInterBrokerProtocolNotSet(): Unit = { + def testWritesAclChangeEventAsNewFormatIfInterBrokerProtocolNotSet(): Unit = { givenAuthorizerWithProtocolVersion(Option.empty) + val resource = Resource(Topic, "z_other", PREFIXED) + val expected = new String(AclChangeNotificationSequenceZNode.encode(resource), UTF_8) - simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", PREFIXED)) - - val string = getAclChangeEventAsString + simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource) - val changeEvent = Json.parseStringAs[AclChangeEvent](string) match { - case Right(event) => event - case Left(e) => fail("Failed to parse ACL change event", e) - } + val actual = getAclChangeEventAsString - assertEquals(AclChangeEvent(AclChangeEvent.currentVersion, "Topic", "z_other", "PREFIXED"), changeEvent) + assertEquals(expected, actual) } @Test - def testWritesAclChangeEventAsJSONWhenInterBrokerProtocolAtLeastKafka20V1(): Unit = { + def testWritesAclChangeEventAsNewFormatWhenInterBrokerProtocolAtLeastKafka20V1(): Unit = { givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV1)) + val resource = Resource(Topic, "z_other", PREFIXED) + val expected = new String(AclChangeNotificationSequenceZNode.encode(resource), UTF_8) - simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", PREFIXED)) + simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource) - val string = getAclChangeEventAsString + val actual = getAclChangeEventAsString - val changeEvent = Json.parseStringAs[AclChangeEvent](string) match { - case Right(event) => event - case Left(e) => fail("Failed to parse ACL change event", e) - } - - assertEquals(AclChangeEvent(AclChangeEvent.currentVersion, "Topic", "z_other", "PREFIXED"), changeEvent) + assertEquals(expected, actual) } @Test - def testWritesAclChangeEventAsLegacyStringWhenInterBrokerProtocolLessThanKafka20V1(): Unit = { + def testWritesAclChangeEventAsLegacyFormatWhenInterBrokerProtocolLessThanKafka20V1(): Unit = { givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV0)) + val resource = Resource(Topic, "z_other", LITERAL) + val expected = new String(AclChangeNotificationSequenceZNode.encodeLegacy(resource), UTF_8) - simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", LITERAL)) + simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource) - val string = getAclChangeEventAsString + val actual = getAclChangeEventAsString - assertEquals("Topic" + Resource.Separator + "z_other", string) + assertEquals(expected, actual) } private def givenAuthorizerWithProtocolVersion(protocolVersion: Option[ApiVersion]) { diff --git a/docs/upgrade.html b/docs/upgrade.html index 91f26ded8f068..d77657af929b6 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -63,13 +63,14 @@

    Upgrading from 0.8.x, 0.9.x, 0.1 Hot-swapping the jar-file only might not work.
  • Do not use the 2.0 AclCommand tool, a.k.a kafka-acl script, until all brokers are upgraded to 2.0. You can continue to use the previous version of the tool while the cluster is upgrading. - (This is to avoid issues with the new JSON format for ACL change events introduced as part of + ACLs added by the 2.0 AclCommand tool will be ignored by older brokers. + (This is due to a change in the ACL change event format introduced as part of KIP-290)
  • ACLs should not be added to prefixed resources, (added in KIP-290), until all brokers in the cluster have been updated. -

    NOTE: any prefixed ACLs added to a cluster will be ignored should the cluster be downgraded again. +

    NOTE: any prefixed ACLs added to a cluster, even after the cluster is fully upgraded, will be ignored should the cluster be downgraded again.

  • From 7046b0c1f441acff16d534dc70b4fff2db66928d Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Mon, 11 Jun 2018 19:55:11 +0100 Subject: [PATCH 06/10] Jun's change requests --- .../ZkNodeChangeNotificationListener.scala | 21 ++++-- .../main/scala/kafka/zk/KafkaZkClient.scala | 4 +- core/src/main/scala/kafka/zk/ZkData.scala | 22 ------ ...lChangeNotificationSequenceZNodeTest.scala | 4 +- ...ZkNodeChangeNotificationListenerTest.scala | 69 ++++++++++++++----- 5 files changed, 69 insertions(+), 51 deletions(-) diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala index 51798519cb137..285a2ebf3d739 100644 --- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala +++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala @@ -24,6 +24,8 @@ import kafka.zk.{KafkaZkClient, StateChangeHandlers} import kafka.zookeeper.{StateChangeHandler, ZNodeChildChangeHandler} import org.apache.kafka.common.utils.Time +import scala.util.{Failure, Try} + /** * Handle the notificationMessage. */ @@ -83,12 +85,7 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient, for (notification <- notifications) { val changeId = changeNumber(notification) if (changeId > lastExecutedChange) { - val changeZnode = seqNodeRoot + "/" + notification - val (data, _) = zkClient.getDataAndStat(changeZnode) - data match { - case Some(d) => notificationHandler.processNotification(d) - case None => warn(s"read null data from $changeZnode when processing notification $notification") - } + processNotification(notification) lastExecutedChange = changeId } } @@ -100,6 +97,18 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient, } } + private def processNotification(notification: String): Unit = { + val changeZnode = seqNodeRoot + "/" + notification + val (data, _) = zkClient.getDataAndStat(changeZnode) + data match { + case Some(d) => Try(notificationHandler.processNotification(d)) match { + case Failure(e) => error(s"error processing change notification from $changeZnode when processing notification $notification", e) + case _ => + } + case None => warn(s"read null data from $changeZnode when processing notification $notification") + } + } + private def addChangeNotification(): Unit = { if (!isClosed.get && queue.peek() == null) queue.put(new ChangeNotification) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 4fd37d06f6428..51f1c412c5a6e 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -1008,7 +1008,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean /** * Creates colon separated Acl change notification message. * - * Kafka 2.0 saw the format of the ACL change event change from a 'colon separated' string, to a JSON message. + * Kafka 2.0 saw the format of the ACL change event change from a 2-part to a 3-part colon-separated string. * This method will create a message in the old format, which is still needed while a cluster has * 'inter.broker.protocol.version' < 2.0. * @@ -1022,7 +1022,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean } /** - * Creates JSON Acl change notification message. + * Creates an Acl change notification message. * @param resource resource pattern that has changed */ def createAclChangeNotification(resource: Resource): Unit = { diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala index a63df2dfe6e36..6d40bb0d8dd9f 100644 --- a/core/src/main/scala/kafka/zk/ZkData.scala +++ b/core/src/main/scala/kafka/zk/ZkData.scala @@ -30,7 +30,6 @@ import kafka.security.auth.{Acl, Resource, ResourceType} import kafka.server.{ConfigType, DelegationTokenManager} import kafka.utils.Json import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.UnsupportedVersionException import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.resource.ResourceNameType import org.apache.kafka.common.security.auth.SecurityProtocol @@ -43,7 +42,6 @@ import scala.beans.BeanProperty import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.collection.{Seq, breakOut} -import scala.util.{Failure, Success, Try} // This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes). @@ -491,26 +489,6 @@ object AclChangeNotificationZNode { def path = "/kafka-acl-changes" } -case class AclChangeEvent(@BeanProperty @JsonProperty("version") version: Int, - @BeanProperty @JsonProperty("resourceType") resourceType: String, - @BeanProperty @JsonProperty("name") name: String, - @BeanProperty @JsonProperty("resourceNameType") resourceNameType: String) { - if (version > AclChangeEvent.currentVersion) - throw new UnsupportedVersionException(s"Acl change event received for unsupported version: $version") - - def toResource : Try[Resource] = { - for { - resType <- Try(ResourceType.fromString(resourceType)) - nameType <- Try(ResourceNameType.valueOf(resourceNameType)) - resource = Resource(resType, name, nameType) - } yield resource - } -} - -object AclChangeEvent { - val currentVersion: Int = 1 -} - object AclChangeNotificationSequenceZNode { def SequenceNumberPrefix = "acl_changes_" diff --git a/core/src/test/scala/kafka/zk/AclChangeNotificationSequenceZNodeTest.scala b/core/src/test/scala/kafka/zk/AclChangeNotificationSequenceZNodeTest.scala index 8ba5afc31575c..9b2335a20fff1 100644 --- a/core/src/test/scala/kafka/zk/AclChangeNotificationSequenceZNodeTest.scala +++ b/core/src/test/scala/kafka/zk/AclChangeNotificationSequenceZNodeTest.scala @@ -35,7 +35,7 @@ class AclChangeNotificationSequenceZNodeTest { } @Test - def shouldRoundTripLegacyString(): Unit = { + def shouldRoundTripLegacyTwoPartString(): Unit = { val bytes = AclChangeNotificationSequenceZNode.encodeLegacy(literalResource) val actual = AclChangeNotificationSequenceZNode.decode(bytes) @@ -43,7 +43,7 @@ class AclChangeNotificationSequenceZNodeTest { } @Test - def shouldRoundTripJSON(): Unit = { + def shouldRoundTripThreePartString(): Unit = { val bytes = AclChangeNotificationSequenceZNode.encode(prefixedResource) val actual = AclChangeNotificationSequenceZNode.decode(bytes) diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala index 6456e25b71aa2..cc754a35e85ee 100644 --- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala +++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala @@ -19,42 +19,43 @@ package kafka.common import kafka.security.auth.{Group, Resource} import kafka.utils.TestUtils import kafka.zk.{AclChangeNotificationSequenceZNode, AclChangeNotificationZNode, ZooKeeperTestHarness} -import org.apache.kafka.common.resource.ResourceNameType.LITERAL -import org.junit.{After, Test} +import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED} +import org.junit.{After, Before, Test} + +import scala.collection.mutable.ArrayBuffer class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness { - var notificationListener: ZkNodeChangeNotificationListener = _ + private val changeExpirationMs = 1000 + private var notificationListener: ZkNodeChangeNotificationListener = _ + private var notificationHandler: TestNotificationHandler = _ + + @Before + override def setUp(): Unit = { + super.setUp() + zkClient.createAclPaths() + notificationHandler = new TestNotificationHandler() + } @After override def tearDown(): Unit = { if (notificationListener != null) { notificationListener.close() } + super.tearDown() } @Test def testProcessNotification() { - @volatile var notification: Resource = null - @volatile var invocationCount = 0 - val notificationHandler = new NotificationHandler { - override def processNotification(notificationMessage: Array[Byte]): Unit = { - notification = AclChangeNotificationSequenceZNode.decode(notificationMessage) - invocationCount += 1 - } - } - - zkClient.createAclPaths() val notificationMessage1 = Resource(Group, "messageA", LITERAL) val notificationMessage2 = Resource(Group, "messageB", LITERAL) - val changeExpirationMs = 1000 notificationListener = new ZkNodeChangeNotificationListener(zkClient, AclChangeNotificationZNode.path, AclChangeNotificationSequenceZNode.SequenceNumberPrefix, notificationHandler, changeExpirationMs) notificationListener.init() zkClient.createAclChangeNotification(notificationMessage1) - TestUtils.waitUntilTrue(() => invocationCount == 1 && notification == notificationMessage1, + TestUtils.waitUntilTrue(() => notificationHandler.received().size == 1 && notificationHandler.received().last == notificationMessage1, "Failed to send/process notification message in the timeout period.") /* @@ -66,12 +67,42 @@ class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness { */ zkClient.createAclChangeNotification(notificationMessage2) - TestUtils.waitUntilTrue(() => invocationCount == 2 && notification == notificationMessage2, + TestUtils.waitUntilTrue(() => notificationHandler.received().size == 2 && notificationHandler.received().last == notificationMessage2, "Failed to send/process notification message in the timeout period.") (3 to 10).foreach(i => zkClient.createAclChangeNotification(Resource(Group, "message" + i, LITERAL))) - TestUtils.waitUntilTrue(() => invocationCount == 10 , - s"Expected 10 invocations of processNotifications, but there were $invocationCount") + TestUtils.waitUntilTrue(() => notificationHandler.received().size == 10, + s"Expected 10 invocations of processNotifications, but there were ${notificationHandler.received()}") + } + + @Test + def testSwallowsProcessorException() : Unit = { + notificationHandler.setThrowSize(1) + notificationListener = new ZkNodeChangeNotificationListener(zkClient, AclChangeNotificationZNode.path, + AclChangeNotificationSequenceZNode.SequenceNumberPrefix, notificationHandler, changeExpirationMs) + notificationListener.init() + + zkClient.createAclChangeNotification(Resource(Group, "messageA", PREFIXED)) + zkClient.createAclChangeNotification(Resource(Group, "messageB", LITERAL)) + + TestUtils.waitUntilTrue(() => notificationHandler.received().size == 2, + s"Expected 2 invocations of processNotifications, but there were ${notificationHandler.received()}") + } + + private class TestNotificationHandler extends NotificationHandler { + private val messages = ArrayBuffer.empty[Resource] + @volatile private var throwSize = Option.empty[Int] + + override def processNotification(notificationMessage: Array[Byte]): Unit = { + messages += AclChangeNotificationSequenceZNode.decode(notificationMessage) + + if (throwSize.contains(messages.size)) + throw new RuntimeException("Oh no, my processing failed!") + } + + def received(): Seq[Resource] = messages + + def setThrowSize(index: Int): Unit = throwSize = Option(index) } -} +} \ No newline at end of file From b919bd13dc685180eec75973a07bcf969960aa0a Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Tue, 12 Jun 2018 12:07:47 +0100 Subject: [PATCH 07/10] Switch to a hybrid: - Literal ACLs remain on the old paths, using the old formats. - Prefixed, and any latter types, go on new paths, using JSON. - Check remains in place to reject PREFIXED acls until inter.broker.protocol.version is bumped. --- .../common/resource/ResourceNameType.java | 13 ++ .../ZkNodeChangeNotificationListener.scala | 4 +- .../security/auth/SimpleAclAuthorizer.scala | 39 ++-- .../main/scala/kafka/zk/KafkaZkClient.scala | 49 ++--- core/src/main/scala/kafka/zk/ZkData.scala | 194 +++++++++++++++--- ...lChangeNotificationSequenceZNodeTest.scala | 80 -------- .../kafka/zk/ExtendedZkAclStoreTest.scala | 67 ++++++ .../kafka/zk/LiteralZkAclStoreTest.scala | 62 ++++++ ...ZkNodeChangeNotificationListenerTest.scala | 17 +- .../auth/SimpleAclAuthorizerTest.scala | 41 ++-- .../unit/kafka/zk/KafkaZkClientTest.scala | 29 ++- docs/upgrade.html | 6 - 12 files changed, 394 insertions(+), 207 deletions(-) delete mode 100644 core/src/test/scala/kafka/zk/AclChangeNotificationSequenceZNodeTest.scala create mode 100644 core/src/test/scala/kafka/zk/ExtendedZkAclStoreTest.scala create mode 100644 core/src/test/scala/kafka/zk/LiteralZkAclStoreTest.scala diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceNameType.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceNameType.java index 7aa72170de973..0e4fc0f271216 100644 --- a/clients/src/main/java/org/apache/kafka/common/resource/ResourceNameType.java +++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceNameType.java @@ -62,6 +62,12 @@ public enum ResourceNameType { .collect(Collectors.toMap(ResourceNameType::code, Function.identity())) ); + private final static Map NAME_TO_VALUE = + Collections.unmodifiableMap( + Arrays.stream(ResourceNameType.values()) + .collect(Collectors.toMap(ResourceNameType::name, Function.identity())) + ); + private final byte code; ResourceNameType(byte code) { @@ -88,4 +94,11 @@ public boolean isUnknown() { public static ResourceNameType fromCode(byte code) { return CODE_TO_VALUE.getOrDefault(code, UNKNOWN); } + + /** + * Return the ResourceNameType with the provided name or {@link #UNKNOWN} if one cannot be found. + */ + public static ResourceNameType fromString(String name) { + return NAME_TO_VALUE.getOrDefault(name, UNKNOWN); + } } diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala index 285a2ebf3d739..8ec7f95343125 100644 --- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala +++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala @@ -102,10 +102,10 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient, val (data, _) = zkClient.getDataAndStat(changeZnode) data match { case Some(d) => Try(notificationHandler.processNotification(d)) match { - case Failure(e) => error(s"error processing change notification from $changeZnode when processing notification $notification", e) + case Failure(e) => error(s"error processing change notification from $changeZnode", e) case _ => } - case None => warn(s"read null data from $changeZnode when processing notification $notification") + case None => warn(s"read null data from $changeZnode") } } diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index 5670985b352df..904516dc5e22c 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -21,13 +21,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import com.typesafe.scalalogging.Logger import kafka.api.KAFKA_2_0_IV1 -import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener} import kafka.network.RequestChannel.Session import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls import kafka.server.KafkaConfig import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils._ -import kafka.zk.{AclChangeNotificationSequenceZNode, AclChangeNotificationZNode, KafkaZkClient, ZkAclStore} +import kafka.zk.{AclChangeNotificationHandler, AclChangeSubscription, KafkaZkClient, ZkAclStore} import org.apache.kafka.common.errors.UnsupportedVersionException import org.apache.kafka.common.resource.ResourceNameType import org.apache.kafka.common.security.auth.KafkaPrincipal @@ -57,8 +56,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging { private var superUsers = Set.empty[KafkaPrincipal] private var shouldAllowEveryoneIfNoAclIsFound = false private var zkClient: KafkaZkClient = _ - private var aclChangeListener: ZkNodeChangeNotificationListener = _ - private var legacyChangeEvent: Boolean = _ + private var aclChangeListeners: Iterable[AclChangeSubscription] = Iterable.empty + private var extendedAclSupport: Boolean = _ @volatile private var aclCache = new scala.collection.immutable.TreeMap[Resource, VersionedAcls]()(ResourceOrdering) @@ -99,12 +98,11 @@ class SimpleAclAuthorizer extends Authorizer with Logging { zkMaxInFlightRequests, time, "kafka.security", "SimpleAclAuthorizer") zkClient.createAclPaths() - legacyChangeEvent = kafkaConfig.interBrokerProtocolVersion < KAFKA_2_0_IV1 + extendedAclSupport = kafkaConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1 loadCache() - aclChangeListener = new ZkNodeChangeNotificationListener(zkClient, AclChangeNotificationZNode.path, AclChangeNotificationSequenceZNode.SequenceNumberPrefix, AclChangedNotificationHandler) - aclChangeListener.init() + startZkChangeListeners() } override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = { @@ -167,7 +165,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging { override def addAcls(acls: Set[Acl], resource: Resource) { if (acls != null && acls.nonEmpty) { - if (legacyChangeEvent && resource.nameType == ResourceNameType.PREFIXED) { + if (!extendedAclSupport && resource.nameType == ResourceNameType.PREFIXED) { throw new UnsupportedVersionException(s"Adding ACLs on prefixed resource patterns requires " + s"${KafkaConfig.InterBrokerProtocolVersionProp} of $KAFKA_2_0_IV1 or greater") } @@ -242,26 +240,32 @@ class SimpleAclAuthorizer extends Authorizer with Logging { } def close() { - if (aclChangeListener != null) aclChangeListener.close() + aclChangeListeners.foreach(listener => listener.close()) if (zkClient != null) zkClient.close() } private def loadCache() { inWriteLock(lock) { ZkAclStore.stores.foreach(store => { - val resourceTypes = zkClient.getResourceTypes(store.nameType) + val resourceTypes = zkClient.getResourceTypes(store.patternType) for (rType <- resourceTypes) { val resourceType = ResourceType.fromString(rType) - val resourceNames = zkClient.getResourceNames(store.nameType, resourceType) + val resourceNames = zkClient.getResourceNames(store.patternType, resourceType) for (resourceName <- resourceNames) { - val versionedAcls = getAclsFromZk(new Resource(resourceType, resourceName, store.nameType)) - updateCache(new Resource(resourceType, resourceName, store.nameType), versionedAcls) + val resource = new Resource(resourceType, resourceName, store.patternType) + val versionedAcls = getAclsFromZk(resource) + updateCache(resource, versionedAcls) } } }) } } + private def startZkChangeListeners(): Unit = { + aclChangeListeners = ZkAclStore.stores + .map(store => store.changeNode.createListener(AclChangedNotificationHandler, zkClient)) + } + private def logAuditMessage(principal: KafkaPrincipal, authorized: Boolean, operation: Operation, resource: Resource, host: String) { def logMessage: String = { val authResult = if (authorized) "Allowed" else "Denied" @@ -344,9 +348,6 @@ class SimpleAclAuthorizer extends Authorizer with Logging { } private def updateAclChangedFlag(resource: Resource) { - if (legacyChangeEvent) - zkClient.createLegacyAclChangeNotification(resource) - else zkClient.createAclChangeNotification(resource) } @@ -354,10 +355,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging { retryBackoffMs + Random.nextInt(retryBackoffJitterMs) } - object AclChangedNotificationHandler extends NotificationHandler { - override def processNotification(notificationMessage: Array[Byte]) { - val resource: Resource = AclChangeNotificationSequenceZNode.decode(notificationMessage) - + object AclChangedNotificationHandler extends AclChangeNotificationHandler { + override def processNotification(resource: Resource) { inWriteLock(lock) { val versionedAcls = getAclsFromZk(resource) updateCache(resource, versionedAcls) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 51f1c412c5a6e..26a8576f04732 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -944,10 +944,9 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean * Creates the required zk nodes for Acl storage */ def createAclPaths(): Unit = { - createRecursive(AclChangeNotificationZNode.path, throwIfPathExists = false) - ZkAclStore.stores.foreach(store => { createRecursive(store.aclPath, throwIfPathExists = false) + createRecursive(store.aclChangePath, throwIfPathExists = false) ResourceType.values.foreach(resourceType => createRecursive(store.path(resourceType), throwIfPathExists = false)) }) } @@ -1005,29 +1004,13 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean } } - /** - * Creates colon separated Acl change notification message. - * - * Kafka 2.0 saw the format of the ACL change event change from a 2-part to a 3-part colon-separated string. - * This method will create a message in the old format, which is still needed while a cluster has - * 'inter.broker.protocol.version' < 2.0. - * - * @param resource resource pattern that has changed - */ - def createLegacyAclChangeNotification(resource: Resource): Unit = { - val path = AclChangeNotificationSequenceZNode.createPath - val createRequest = CreateRequest(path, AclChangeNotificationSequenceZNode.encodeLegacy(resource), acls(path), CreateMode.PERSISTENT_SEQUENTIAL) - val createResponse = retryRequestUntilConnected(createRequest) - createResponse.maybeThrow - } - /** * Creates an Acl change notification message. * @param resource resource pattern that has changed */ def createAclChangeNotification(resource: Resource): Unit = { - val path = AclChangeNotificationSequenceZNode.createPath - val createRequest = CreateRequest(path, AclChangeNotificationSequenceZNode.encode(resource), acls(path), CreateMode.PERSISTENT_SEQUENTIAL) + val aclChange = ZkAclStore(resource.nameType).changeNode.createChangeNode(resource) + val createRequest = CreateRequest(aclChange.path, aclChange.bytes, acls(aclChange.path), CreateMode.PERSISTENT_SEQUENTIAL) val createResponse = retryRequestUntilConnected(createRequest) createResponse.maybeThrow } @@ -1050,21 +1033,25 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean * @throws KeeperException if there is an error while deleting Acl change notifications */ def deleteAclChangeNotifications(): Unit = { - val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(AclChangeNotificationZNode.path)) - if (getChildrenResponse.resultCode == Code.OK) { - deleteAclChangeNotifications(getChildrenResponse.children) - } else if (getChildrenResponse.resultCode != Code.NONODE) { - getChildrenResponse.maybeThrow - } + ZkAclStore.stores.foreach(store => { + val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(store.aclChangePath)) + if (getChildrenResponse.resultCode == Code.OK) { + deleteAclChangeNotifications(store.aclChangePath, getChildrenResponse.children) + } else if (getChildrenResponse.resultCode != Code.NONODE) { + getChildrenResponse.maybeThrow + } + }) } /** - * Deletes the Acl change notifications associated with the given sequence nodes - * @param sequenceNodes - */ - private def deleteAclChangeNotifications(sequenceNodes: Seq[String]): Unit = { + * Deletes the Acl change notifications associated with the given sequence nodes + * + * @param aclChangePath the root path + * @param sequenceNodes the name of the node to delete. + */ + private def deleteAclChangeNotifications(aclChangePath: String, sequenceNodes: Seq[String]): Unit = { val deleteRequests = sequenceNodes.map { sequenceNode => - DeleteRequest(AclChangeNotificationSequenceZNode.deletePath(sequenceNode), ZkVersion.NoVersion) + DeleteRequest(s"$aclChangePath/$sequenceNode", ZkVersion.NoVersion) } val deleteResponses = retryRequestsUntilConnected(deleteRequests) diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala index 6d40bb0d8dd9f..1b4e93ec79bac 100644 --- a/core/src/main/scala/kafka/zk/ZkData.scala +++ b/core/src/main/scala/kafka/zk/ZkData.scala @@ -23,13 +23,14 @@ import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.core.JsonProcessingException import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr} import kafka.cluster.{Broker, EndPoint} -import kafka.common.KafkaException +import kafka.common.{KafkaException, NotificationHandler, ZkNodeChangeNotificationListener} import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch} import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls import kafka.security.auth.{Acl, Resource, ResourceType} import kafka.server.{ConfigType, DelegationTokenManager} import kafka.utils.Json import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.UnsupportedVersionException import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.resource.ResourceNameType import org.apache.kafka.common.security.auth.SecurityProtocol @@ -42,6 +43,7 @@ import scala.beans.BeanProperty import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.collection.{Seq, breakOut} +import scala.util.{Failure, Success, Try} // This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes). @@ -445,37 +447,131 @@ object StateChangeHandlers { def zkNodeChangeListenerHandler(seqNodeRoot: String) = s"change-notification-$seqNodeRoot" } +// Todo(ac): Rename ResourceNameType to PatternType (and nameType name-Type etc). + + /** - * Acls for resources are stored in ZK under a root node that is determined by the [[ResourceNameType]]. - * Under each [[ResourceNameType]] node there will be one child node per resource type (Topic, Cluster, Group, etc). - * Under each resourceType there will be a unique child for each resource path and the data for that child will contain + * Acls for resources are stored in ZK under two root paths: + *
      + *
    • [[org.apache.kafka.common.resource.ResourceNameType#LITERAL Literal]] patterns are stored under '/kafka-acl'. + * The format is JSON. See [[kafka.zk.ResourceZNode]] for details.
    • + *
    • All other patterns are stored under '/kafka-acl-extended/pattern-type'. + * The format is JSON. See [[kafka.zk.ResourceZNode]] for details.
    • + *
    + * + * Under each root node there will be one child node per resource type (Topic, Cluster, Group, etc). + * Under each resourceType there will be a unique child for each resource pattern and the data for that child will contain * list of its acls as a json object. Following gives an example: * *
    +  * // Literal patterns:
       * /kafka-acl/Topic/topic-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
       * /kafka-acl/Cluster/kafka-cluster => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
    -  * /kafka-prefixed-acl/Group/group-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
    +  *
    +  * // Prefixed patterns:
    +  * /kafka-acl-extended/PREFIXED/Group/group-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
       * 
    + * + * Acl change events are also stored under two paths: + *
      + *
    • [[org.apache.kafka.common.resource.ResourceNameType#LITERAL Literal]] patterns are stored under '/kafka-acl-changes'. + * The format is a UTF8 string in the form: <resource-type>:<resource-name>
    • + *
    • All other patterns are stored under '/kafka-acl-extended-changes/pattern-type' + * The format is JSON, as defined by [[kafka.zk.ExtendedAclChangeEvent]]
    • + *
    */ -case class ZkAclStore(nameType: ResourceNameType) { - val aclPath: String = nameType match { - case ResourceNameType.LITERAL => "/kafka-acl" - case ResourceNameType.PREFIXED => "/kafka-prefixed-acl" - case _ => throw new IllegalArgumentException("Invalid name type:" + nameType) - } +trait ZkAclStore { + val patternType: ResourceNameType + val aclPath: String + val aclChangePath: String - def path(resourceType: ResourceType) = s"$aclPath/$resourceType" + def path(resourceType: ResourceType): String = s"$aclPath/$resourceType" def path(resourceType: ResourceType, resourceName: String): String = s"$aclPath/$resourceType/$resourceName" + + def changeNode: AclChangeNotificationSequenceZNode } object ZkAclStore { - val stores: Seq[ZkAclStore] = ResourceNameType.values + private val storesByType = ResourceNameType.values .filter(nameType => nameType != ResourceNameType.ANY && nameType != ResourceNameType.UNKNOWN) - .map(nameType => ZkAclStore(nameType)) + .map(nameType => (nameType, create(nameType))) + .toMap + + val stores: Iterable[ZkAclStore] = storesByType.values + + val securePaths: Iterable[String] = stores + .flatMap(store => List(store.aclPath, store.aclChangePath)) + + def apply(patternType: ResourceNameType): ZkAclStore = { + storesByType.get(patternType) match { + case Some(store) => store + case None => throw new KafkaException(s"Invalid pattern type: $patternType") + } + } + + private def create(patternType: ResourceNameType) = { + patternType match { + case ResourceNameType.LITERAL => LiteralZkAclStore + case _ => new ExtendedZkAclStore(patternType) + } + } +} + +object LiteralZkAclStore extends ZkAclStore { + val patternType: ResourceNameType = ResourceNameType.LITERAL + val aclPath: String = "/kafka-acl" + val aclChangePath: String = "/kafka-acl-changes" + + def changeNode: AclChangeNotificationSequenceZNode = new AclChangeNotificationSequenceZNode { + def path: String = LiteralZkAclStore.aclChangePath + + def encode(resource: Resource): Array[Byte] = { + if (resource.nameType != ResourceNameType.LITERAL) + throw new IllegalArgumentException("Only literal resource patterns can be encoded") + + val legacyName = resource.resourceType + Resource.Separator + resource.name + legacyName.getBytes(UTF_8) + } + + def decode(bytes: Array[Byte]): Resource = + Resource.fromString(new String(bytes, UTF_8)) + } +} + +class ExtendedZkAclStore(val patternType: ResourceNameType) extends ZkAclStore { + if (patternType == ResourceNameType.LITERAL) + throw new IllegalArgumentException("Literal pattern types are not supported") + + val aclPath: String = s"/kafka-acl-extended/${patternType.name.toLowerCase}" + val aclChangePath: String = s"/kafka-acl-extended-changes/${patternType.name.toLowerCase}" + + def changeNode: AclChangeNotificationSequenceZNode = new AclChangeNotificationSequenceZNode { + def path: String = aclChangePath - val securePaths: Seq[String] = stores - .flatMap(store => List(store.aclPath)) ++ Seq(AclChangeNotificationZNode.path) + def encode(resource: Resource): Array[Byte] = { + if (resource.nameType == ResourceNameType.LITERAL) + throw new IllegalArgumentException("Literal pattern types are not supported") + + Json.encodeAsBytes(ExtendedAclChangeEvent( + ExtendedAclChangeEvent.currentVersion, + resource.resourceType.name, + resource.name, + resource.nameType.name)) + } + + def decode(bytes: Array[Byte]): Resource = { + val changeEvent = Json.parseBytesAs[ExtendedAclChangeEvent](bytes) match { + case Right(event) => event + case Left(e) => throw new IllegalArgumentException("Failed to parse ACL change event", e) + } + + changeEvent.toResource match { + case Success(r) => r + case Failure(e) => throw new IllegalArgumentException("Failed to convert ACL change event to resource", e) + } + } + } } object ResourceZNode { @@ -485,29 +581,67 @@ object ResourceZNode { def decode(bytes: Array[Byte], stat: Stat): VersionedAcls = VersionedAcls(Acl.fromBytes(bytes), stat.getVersion) } -object AclChangeNotificationZNode { - def path = "/kafka-acl-changes" +object ExtendedAclChangeEvent { + val currentVersion: Int = 1 +} + +case class ExtendedAclChangeEvent(@BeanProperty @JsonProperty("version") version: Int, + @BeanProperty @JsonProperty("resourceType") resourceType: String, + @BeanProperty @JsonProperty("name") name: String, + @BeanProperty @JsonProperty("resourceNameType") resourceNameType: String) { + if (version > ExtendedAclChangeEvent.currentVersion) + throw new UnsupportedVersionException(s"Acl change event received for unsupported version: $version") + + def toResource: Try[Resource] = { + for { + resType <- Try(ResourceType.fromString(resourceType)) + nameType <- Try(ResourceNameType.fromString(resourceNameType)) + resource = Resource(resType, name, nameType) + } yield resource + } +} + +trait AclChangeNotificationHandler { + def processNotification(resource: Resource): Unit } +trait AclChangeSubscription extends AutoCloseable { + def close(): Unit +} + +case class AclChangeNode(path: String, bytes: Array[Byte]) + object AclChangeNotificationSequenceZNode { def SequenceNumberPrefix = "acl_changes_" +} - def createPath = s"${AclChangeNotificationZNode.path}/$SequenceNumberPrefix" - def deletePath(sequenceNode: String) = s"${AclChangeNotificationZNode.path}/$sequenceNode" +trait AclChangeNotificationSequenceZNode { - def encodeLegacy(resource: Resource): Array[Byte] = { - if (resource.nameType != ResourceNameType.LITERAL) - throw new IllegalArgumentException("Only literal resource patterns can be encoded") + protected def path: String - val legacyName = resource.resourceType + Resource.Separator + resource.name - legacyName.getBytes(UTF_8) - } + def decode(bytes: Array[Byte]): Resource + + protected def encode(resource: Resource): Array[Byte] + + protected def createPath: String = s"$path/${AclChangeNotificationSequenceZNode.SequenceNumberPrefix}" - def encode(resource: Resource): Array[Byte] = - resource.toString.getBytes(UTF_8) + def createChangeNode(resource: Resource): AclChangeNode = AclChangeNode(createPath, encode(resource)) - def decode(bytes: Array[Byte]): Resource = - Resource.fromString(new String(bytes, UTF_8)) + def createListener(handler: AclChangeNotificationHandler, zkClient: KafkaZkClient): AclChangeSubscription = { + val rawHandler: NotificationHandler = new NotificationHandler { + def processNotification(bytes: Array[Byte]): Unit = + handler.processNotification(decode(bytes)) + } + + val aclChangeListener = new ZkNodeChangeNotificationListener( + zkClient, path, AclChangeNotificationSequenceZNode.SequenceNumberPrefix, rawHandler) + + aclChangeListener.init() + + new AclChangeSubscription { + def close(): Unit = aclChangeListener.close() + } + } } object ClusterZNode { diff --git a/core/src/test/scala/kafka/zk/AclChangeNotificationSequenceZNodeTest.scala b/core/src/test/scala/kafka/zk/AclChangeNotificationSequenceZNodeTest.scala deleted file mode 100644 index 9b2335a20fff1..0000000000000 --- a/core/src/test/scala/kafka/zk/AclChangeNotificationSequenceZNodeTest.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.zk - -import java.nio.charset.StandardCharsets.UTF_8 - -import kafka.security.auth.Resource.Separator -import kafka.security.auth.{Resource, ResourceType, Topic} -import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED} -import org.junit.Test -import org.junit.Assert.assertEquals - -class AclChangeNotificationSequenceZNodeTest { - private val literalResource = Resource(Topic, "some-topic", LITERAL) - private val prefixedResource = Resource(Topic, "some-topic", PREFIXED) - - @Test(expected = classOf[IllegalArgumentException]) - def shouldThrowFromLegacyEncodeOnNoneLiteral(): Unit = { - AclChangeNotificationSequenceZNode.encodeLegacy(prefixedResource) - } - - @Test - def shouldRoundTripLegacyTwoPartString(): Unit = { - val bytes = AclChangeNotificationSequenceZNode.encodeLegacy(literalResource) - val actual = AclChangeNotificationSequenceZNode.decode(bytes) - - assertEquals(literalResource, actual) - } - - @Test - def shouldRoundTripThreePartString(): Unit = { - val bytes = AclChangeNotificationSequenceZNode.encode(prefixedResource) - val actual = AclChangeNotificationSequenceZNode.decode(bytes) - - assertEquals(prefixedResource, actual) - } - - @Test - def shouldNotThrowIfOldBrokerParsingNewFormatWithLiteralAcl(): Unit = { - val bytes = AclChangeNotificationSequenceZNode.encode(literalResource) - val actual = legacyDecode(bytes) - - assertEquals(Resource(Topic, "LITERAL:some-topic", LITERAL), actual) - } - - @Test - def shouldNotThrowIfOldBrokerParsingNewFormatWithPrefixedAcl(): Unit = { - val bytes = AclChangeNotificationSequenceZNode.encode(prefixedResource) - val actual = legacyDecode(bytes) - - assertEquals(Resource(Topic, "PREFIXED:some-topic", LITERAL), actual) - } - - private def legacyDecode(bytes: Array[Byte]): Resource = - legacyFromString(new String(bytes, UTF_8)) - - //noinspection ScalaDeprecation - // Old version of kafka.auth.Resource.fromString used in pre-2.0 Kafka: - private def legacyFromString(str: String): Resource = { - str.split(Separator, 2) match { - case Array(resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name) - case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str) - } - } -} \ No newline at end of file diff --git a/core/src/test/scala/kafka/zk/ExtendedZkAclStoreTest.scala b/core/src/test/scala/kafka/zk/ExtendedZkAclStoreTest.scala new file mode 100644 index 0000000000000..6b0c7f8fe0c68 --- /dev/null +++ b/core/src/test/scala/kafka/zk/ExtendedZkAclStoreTest.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.zk + +import kafka.security.auth.{Resource, Topic} +import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED} +import org.junit.Assert.assertEquals +import org.junit.Test + +class ExtendedZkAclStoreTest { + private val literalResource = Resource(Topic, "some-topic", LITERAL) + private val prefixedResource = Resource(Topic, "some-topic", PREFIXED) + private val store = new ExtendedZkAclStore(PREFIXED) + + @Test + def shouldHaveCorrectPaths(): Unit = { + assertEquals("/kafka-acl-extended/prefixed", store.aclPath) + assertEquals("/kafka-acl-extended/prefixed/Topic", store.path(Topic)) + assertEquals("/kafka-acl-extended-changes/prefixed", store.aclChangePath) + } + + @Test + def shouldHaveCorrectPatternType(): Unit = { + assertEquals(PREFIXED, store.patternType) + } + + @Test(expected = classOf[IllegalArgumentException]) + def shouldThrowIfConstructedWithLiteral(): Unit = { + new ExtendedZkAclStore(LITERAL) + } + + @Test(expected = classOf[IllegalArgumentException]) + def shouldThrowFromEncodeOnLiteral(): Unit = { + store.changeNode.createChangeNode(literalResource) + } + + @Test + def shouldWriteChangesToTheWritePath(): Unit = { + val changeNode = store.changeNode.createChangeNode(prefixedResource) + + assertEquals("/kafka-acl-extended-changes/prefixed/acl_changes_", changeNode.path) + } + + @Test + def shouldRoundTripChangeNode(): Unit = { + val changeNode = store.changeNode.createChangeNode(prefixedResource) + + val actual = store.changeNode.decode(changeNode.bytes) + + assertEquals(prefixedResource, actual) + } +} \ No newline at end of file diff --git a/core/src/test/scala/kafka/zk/LiteralZkAclStoreTest.scala b/core/src/test/scala/kafka/zk/LiteralZkAclStoreTest.scala new file mode 100644 index 0000000000000..777c216aabfc9 --- /dev/null +++ b/core/src/test/scala/kafka/zk/LiteralZkAclStoreTest.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.zk + +import kafka.security.auth.{Resource, Topic} +import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED} +import org.junit.Assert.assertEquals +import org.junit.Test + +class LiteralZkAclStoreTest { + private val literalResource = Resource(Topic, "some-topic", LITERAL) + private val prefixedResource = Resource(Topic, "some-topic", PREFIXED) + private val store = LiteralZkAclStore + + @Test + def shouldHaveCorrectPaths(): Unit = { + assertEquals("/kafka-acl", store.aclPath) + assertEquals("/kafka-acl/Topic", store.path(Topic)) + assertEquals("/kafka-acl-changes", store.aclChangePath) + } + + @Test + def shouldHaveCorrectPatternType(): Unit = { + assertEquals(LITERAL, store.patternType) + } + + @Test(expected = classOf[IllegalArgumentException]) + def shouldThrowFromEncodeOnNoneLiteral(): Unit = { + store.changeNode.createChangeNode(prefixedResource) + } + + @Test + def shouldWriteChangesToTheWritePath(): Unit = { + val changeNode = store.changeNode.createChangeNode(literalResource) + + assertEquals("/kafka-acl-changes/acl_changes_", changeNode.path) + } + + @Test + def shouldRoundTripChangeNode(): Unit = { + val changeNode = store.changeNode.createChangeNode(literalResource) + + val actual = store.changeNode.decode(changeNode.bytes) + + assertEquals(literalResource, actual) + } +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala index cc754a35e85ee..301cb3e5dce6d 100644 --- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala +++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala @@ -18,8 +18,8 @@ package kafka.common import kafka.security.auth.{Group, Resource} import kafka.utils.TestUtils -import kafka.zk.{AclChangeNotificationSequenceZNode, AclChangeNotificationZNode, ZooKeeperTestHarness} -import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED} +import kafka.zk.{AclChangeNotificationSequenceZNode, LiteralZkAclStore, ZooKeeperTestHarness} +import org.apache.kafka.common.resource.ResourceNameType.LITERAL import org.junit.{After, Before, Test} import scala.collection.mutable.ArrayBuffer @@ -50,7 +50,7 @@ class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness { val notificationMessage1 = Resource(Group, "messageA", LITERAL) val notificationMessage2 = Resource(Group, "messageB", LITERAL) - notificationListener = new ZkNodeChangeNotificationListener(zkClient, AclChangeNotificationZNode.path, + notificationListener = new ZkNodeChangeNotificationListener(zkClient, LiteralZkAclStore.aclChangePath, AclChangeNotificationSequenceZNode.SequenceNumberPrefix, notificationHandler, changeExpirationMs) notificationListener.init() @@ -78,15 +78,16 @@ class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness { @Test def testSwallowsProcessorException() : Unit = { - notificationHandler.setThrowSize(1) - notificationListener = new ZkNodeChangeNotificationListener(zkClient, AclChangeNotificationZNode.path, + notificationHandler.setThrowSize(2) + notificationListener = new ZkNodeChangeNotificationListener(zkClient, LiteralZkAclStore.aclChangePath, AclChangeNotificationSequenceZNode.SequenceNumberPrefix, notificationHandler, changeExpirationMs) notificationListener.init() - zkClient.createAclChangeNotification(Resource(Group, "messageA", PREFIXED)) + zkClient.createAclChangeNotification(Resource(Group, "messageA", LITERAL)) zkClient.createAclChangeNotification(Resource(Group, "messageB", LITERAL)) + zkClient.createAclChangeNotification(Resource(Group, "messageC", LITERAL)) - TestUtils.waitUntilTrue(() => notificationHandler.received().size == 2, + TestUtils.waitUntilTrue(() => notificationHandler.received().size == 3, s"Expected 2 invocations of processNotifications, but there were ${notificationHandler.received()}") } @@ -95,7 +96,7 @@ class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness { @volatile private var throwSize = Option.empty[Int] override def processNotification(notificationMessage: Array[Byte]): Unit = { - messages += AclChangeNotificationSequenceZNode.decode(notificationMessage) + messages += LiteralZkAclStore.changeNode.decode(notificationMessage) if (throwSize.contains(messages.size)) throw new RuntimeException("Oh no, my processing failed!") diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala index 98606f03e2122..6636698716052 100644 --- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala @@ -25,9 +25,10 @@ import kafka.network.RequestChannel.Session import kafka.security.auth.Acl.{WildCardHost, WildCardResource} import kafka.server.KafkaConfig import kafka.utils.TestUtils -import kafka.zk.{AclChangeNotificationSequenceZNode, AclChangeNotificationZNode, ZooKeeperTestHarness} +import kafka.zk.{ZkAclStore, ZooKeeperTestHarness} import kafka.zookeeper.{GetChildrenRequest, GetDataRequest, ZooKeeperClient} import org.apache.kafka.common.errors.UnsupportedVersionException +import org.apache.kafka.common.resource.ResourceNameType import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.Time @@ -570,40 +571,53 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { } @Test - def testWritesAclChangeEventAsNewFormatIfInterBrokerProtocolNotSet(): Unit = { + def testWritesExtendedAclChangeEventIfInterBrokerProtocolNotSet(): Unit = { givenAuthorizerWithProtocolVersion(Option.empty) val resource = Resource(Topic, "z_other", PREFIXED) - val expected = new String(AclChangeNotificationSequenceZNode.encode(resource), UTF_8) + val expected = new String(ZkAclStore(PREFIXED).changeNode.createChangeNode(resource).bytes, UTF_8) simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource) - val actual = getAclChangeEventAsString + val actual = getAclChangeEventAsString(PREFIXED) assertEquals(expected, actual) } @Test - def testWritesAclChangeEventAsNewFormatWhenInterBrokerProtocolAtLeastKafka20V1(): Unit = { + def testWritesExtendedAclChangeEventWhenInterBrokerProtocolAtLeastKafkaV2(): Unit = { givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV1)) val resource = Resource(Topic, "z_other", PREFIXED) - val expected = new String(AclChangeNotificationSequenceZNode.encode(resource), UTF_8) + val expected = new String(ZkAclStore(PREFIXED).changeNode.createChangeNode(resource).bytes, UTF_8) simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource) - val actual = getAclChangeEventAsString + val actual = getAclChangeEventAsString(PREFIXED) assertEquals(expected, actual) } @Test - def testWritesAclChangeEventAsLegacyFormatWhenInterBrokerProtocolLessThanKafka20V1(): Unit = { + def testWritesLiteralWritesLiteralAclChangeEventWhenInterBrokerProtocolLessThanKafkaV2eralAclChangesForOlderProtocolVersions(): Unit = { givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV0)) val resource = Resource(Topic, "z_other", LITERAL) - val expected = new String(AclChangeNotificationSequenceZNode.encodeLegacy(resource), UTF_8) + val expected = new String(ZkAclStore(LITERAL).changeNode.createChangeNode(resource).bytes, UTF_8) simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource) - val actual = getAclChangeEventAsString + val actual = getAclChangeEventAsString(LITERAL) + + assertEquals(expected, actual) + } + + @Test + def testWritesLiteralAclChangeEventWhenInterBrokerProtocolIsKafkaV2(): Unit = { + givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV1)) + val resource = Resource(Topic, "z_other", LITERAL) + val expected = new String(ZkAclStore(LITERAL).changeNode.createChangeNode(resource).bytes, UTF_8) + + simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource) + + val actual = getAclChangeEventAsString(LITERAL) assertEquals(expected, actual) } @@ -620,12 +634,13 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { simpleAclAuthorizer.configure(config.originals) } - private def getAclChangeEventAsString = { - val children = zooKeeperClient.handleRequest(GetChildrenRequest(AclChangeNotificationZNode.path)) + private def getAclChangeEventAsString(patternType: ResourceNameType) = { + val store = ZkAclStore(patternType) + val children = zooKeeperClient.handleRequest(GetChildrenRequest(store.aclChangePath)) children.maybeThrow() assertEquals("Expecting 1 change event", 1, children.children.size) - val data = zooKeeperClient.handleRequest(GetDataRequest(s"${AclChangeNotificationZNode.path}/${children.children.head}")) + val data = zooKeeperClient.handleRequest(GetDataRequest(s"${store.aclChangePath}/${children.children.head}")) data.maybeThrow() new String(data.data, UTF_8) diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index 844690f13cb2e..021c44c5f45c3 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -42,7 +42,6 @@ import scala.util.Random import kafka.controller.LeaderIsrAndControllerEpoch import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult import kafka.zookeeper._ -import org.apache.kafka.common.resource.ResourceNameType import org.apache.kafka.common.security.JaasUtils import org.apache.zookeeper.data.Stat @@ -427,23 +426,22 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @Test def testAclManagementMethods() { - assertFalse(zkClient.pathExists(AclChangeNotificationZNode.path)) ZkAclStore.stores.foreach(store => { assertFalse(zkClient.pathExists(store.aclPath)) + assertFalse(zkClient.pathExists(store.aclChangePath)) ResourceType.values.foreach(resource => assertFalse(zkClient.pathExists(store.path(resource)))) }) // create acl paths zkClient.createAclPaths - assertTrue(zkClient.pathExists(AclChangeNotificationZNode.path)) - ZkAclStore.stores.foreach(store => { assertTrue(zkClient.pathExists(store.aclPath)) + assertTrue(zkClient.pathExists(store.aclChangePath)) ResourceType.values.foreach(resource => assertTrue(zkClient.pathExists(store.path(resource)))) - val resource1 = new Resource(Topic, UUID.randomUUID().toString, store.nameType) - val resource2 = new Resource(Topic, UUID.randomUUID().toString, store.nameType) + val resource1 = new Resource(Topic, UUID.randomUUID().toString, store.patternType) + val resource2 = new Resource(Topic, UUID.randomUUID().toString, store.patternType) // try getting acls for non-existing resource var versionedAcls = zkClient.getVersionedAclsForResource(resource1) @@ -473,10 +471,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { assertEquals(1, versionedAcls.zkVersion) //get resource Types - assertTrue(ResourceType.values.map( rt => rt.name).toSet == zkClient.getResourceTypes(store.nameType).toSet) + assertTrue(ResourceType.values.map( rt => rt.name).toSet == zkClient.getResourceTypes(store.patternType).toSet) //get resource name - val resourceNames = zkClient.getResourceNames(store.nameType, Topic) + val resourceNames = zkClient.getResourceNames(store.patternType, Topic) assertEquals(2, resourceNames.size) assertTrue(Set(resource1.name,resource2.name) == resourceNames.toSet) @@ -489,17 +487,14 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { //delete with valid expected zk version assertTrue(zkClient.conditionalDelete(resource2, 0)) - zkClient.createAclChangeNotification(Resource(Group, "resource1", store.nameType)) - zkClient.createAclChangeNotification(Resource(Topic, "resource2", store.nameType)) - }) - - val expectedChangeEvents = ResourceNameType.values() - .count(nameType => nameType != ResourceNameType.ANY && nameType != ResourceNameType.UNKNOWN) * 2 + zkClient.createAclChangeNotification(Resource(Group, "resource1", store.patternType)) + zkClient.createAclChangeNotification(Resource(Topic, "resource2", store.patternType)) - assertEquals(expectedChangeEvents, zkClient.getChildren(AclChangeNotificationZNode.path).size) + assertEquals(2, zkClient.getChildren(store.aclChangePath).size) - zkClient.deleteAclChangeNotifications() - assertTrue(zkClient.getChildren(AclChangeNotificationZNode.path).isEmpty) + zkClient.deleteAclChangeNotifications() + assertTrue(zkClient.getChildren(store.aclChangePath).isEmpty) + }) } @Test diff --git a/docs/upgrade.html b/docs/upgrade.html index d77657af929b6..228798860b871 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -61,12 +61,6 @@

    Upgrading from 0.8.x, 0.9.x, 0.1 Similarly for the message format version.
  • If you are using Java8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguities. Hot-swapping the jar-file only might not work.
  • -
  • Do not use the 2.0 AclCommand tool, a.k.a kafka-acl script, until all brokers are upgraded to 2.0. - You can continue to use the previous version of the tool while the cluster is upgrading. - ACLs added by the 2.0 AclCommand tool will be ignored by older brokers. - (This is due to a change in the ACL change event format introduced as part of - KIP-290) -
  • ACLs should not be added to prefixed resources, (added in KIP-290), until all brokers in the cluster have been updated. From 328030376f287d1515dd411344c33309c23404ff Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Tue, 12 Jun 2018 15:57:57 +0100 Subject: [PATCH 08/10] Remove todo --- core/src/main/scala/kafka/zk/ZkData.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala index 1b4e93ec79bac..e45902d50b05d 100644 --- a/core/src/main/scala/kafka/zk/ZkData.scala +++ b/core/src/main/scala/kafka/zk/ZkData.scala @@ -447,9 +447,6 @@ object StateChangeHandlers { def zkNodeChangeListenerHandler(seqNodeRoot: String) = s"change-notification-$seqNodeRoot" } -// Todo(ac): Rename ResourceNameType to PatternType (and nameType name-Type etc). - - /** * Acls for resources are stored in ZK under two root paths: *
      From ae50f4e0427fbc0e20840415616135966bb14d9f Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Tue, 12 Jun 2018 17:19:34 +0100 Subject: [PATCH 09/10] Split ZkAclChangeStore from ZkAclStore, so that we can have two ZkAclChangeStores, (one each for '/kafka-acl-changes' and 'kafka-acl-extended-changes'), and ZkAclStores per Pattern type. --- .../security/auth/SimpleAclAuthorizer.scala | 6 +- .../main/scala/kafka/zk/KafkaZkClient.scala | 7 +- core/src/main/scala/kafka/zk/ZkData.scala | 173 +++++++++--------- ...eTest.scala => ExtendedAclStoreTest.scala} | 18 +- ...reTest.scala => LiteralAclStoreTest.scala} | 14 +- ...ZkNodeChangeNotificationListenerTest.scala | 12 +- .../auth/SimpleAclAuthorizerTest.scala | 12 +- .../unit/kafka/zk/KafkaZkClientTest.scala | 8 +- 8 files changed, 127 insertions(+), 123 deletions(-) rename core/src/test/scala/kafka/zk/{ExtendedZkAclStoreTest.scala => ExtendedAclStoreTest.scala} (76%) rename core/src/test/scala/kafka/zk/{LiteralZkAclStoreTest.scala => LiteralAclStoreTest.scala} (81%) diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index 904516dc5e22c..cecad0ed763e3 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -26,7 +26,7 @@ import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls import kafka.server.KafkaConfig import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils._ -import kafka.zk.{AclChangeNotificationHandler, AclChangeSubscription, KafkaZkClient, ZkAclStore} +import kafka.zk.{AclChangeNotificationHandler, AclChangeSubscription, KafkaZkClient, ZkAclChangeStore, ZkAclStore} import org.apache.kafka.common.errors.UnsupportedVersionException import org.apache.kafka.common.resource.ResourceNameType import org.apache.kafka.common.security.auth.KafkaPrincipal @@ -262,8 +262,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging { } private def startZkChangeListeners(): Unit = { - aclChangeListeners = ZkAclStore.stores - .map(store => store.changeNode.createListener(AclChangedNotificationHandler, zkClient)) + aclChangeListeners = ZkAclChangeStore.stores + .map(store => store.createListener(AclChangedNotificationHandler, zkClient)) } private def logAuditMessage(principal: KafkaPrincipal, authorized: Boolean, operation: Operation, resource: Resource, host: String) { diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 26a8576f04732..97ece58a22df2 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -946,9 +946,10 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean def createAclPaths(): Unit = { ZkAclStore.stores.foreach(store => { createRecursive(store.aclPath, throwIfPathExists = false) - createRecursive(store.aclChangePath, throwIfPathExists = false) ResourceType.values.foreach(resourceType => createRecursive(store.path(resourceType), throwIfPathExists = false)) }) + + ZkAclChangeStore.stores.foreach(store => createRecursive(store.aclChangePath, throwIfPathExists = false)) } /** @@ -1009,7 +1010,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean * @param resource resource pattern that has changed */ def createAclChangeNotification(resource: Resource): Unit = { - val aclChange = ZkAclStore(resource.nameType).changeNode.createChangeNode(resource) + val aclChange = ZkAclStore(resource.nameType).changeStore.createChangeNode(resource) val createRequest = CreateRequest(aclChange.path, aclChange.bytes, acls(aclChange.path), CreateMode.PERSISTENT_SEQUENTIAL) val createResponse = retryRequestUntilConnected(createRequest) createResponse.maybeThrow @@ -1033,7 +1034,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean * @throws KeeperException if there is an error while deleting Acl change notifications */ def deleteAclChangeNotifications(): Unit = { - ZkAclStore.stores.foreach(store => { + ZkAclChangeStore.stores.foreach(store => { val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(store.aclChangePath)) if (getChildrenResponse.resultCode == Code.OK) { deleteAclChangeNotifications(store.aclChangePath, getChildrenResponse.children) diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala index e45902d50b05d..c9c2e43e620a7 100644 --- a/core/src/main/scala/kafka/zk/ZkData.scala +++ b/core/src/main/scala/kafka/zk/ZkData.scala @@ -477,16 +477,15 @@ object StateChangeHandlers { * The format is JSON, as defined by [[kafka.zk.ExtendedAclChangeEvent]] *
    */ -trait ZkAclStore { +sealed trait ZkAclStore { val patternType: ResourceNameType val aclPath: String - val aclChangePath: String def path(resourceType: ResourceType): String = s"$aclPath/$resourceType" def path(resourceType: ResourceType, resourceName: String): String = s"$aclPath/$resourceType/$resourceName" - def changeNode: AclChangeNotificationSequenceZNode + def changeStore: ZkAclChangeStore } object ZkAclStore { @@ -498,7 +497,7 @@ object ZkAclStore { val stores: Iterable[ZkAclStore] = storesByType.values val securePaths: Iterable[String] = stores - .flatMap(store => List(store.aclPath, store.aclChangePath)) + .flatMap(store => Set(store.aclPath, store.changeStore.aclChangePath)) def apply(patternType: ResourceNameType): ZkAclStore = { storesByType.get(patternType) match { @@ -509,64 +508,111 @@ object ZkAclStore { private def create(patternType: ResourceNameType) = { patternType match { - case ResourceNameType.LITERAL => LiteralZkAclStore - case _ => new ExtendedZkAclStore(patternType) + case ResourceNameType.LITERAL => LiteralAclStore + case _ => new ExtendedAclStore(patternType) } } } -object LiteralZkAclStore extends ZkAclStore { +object LiteralAclStore extends ZkAclStore { val patternType: ResourceNameType = ResourceNameType.LITERAL val aclPath: String = "/kafka-acl" - val aclChangePath: String = "/kafka-acl-changes" - def changeNode: AclChangeNotificationSequenceZNode = new AclChangeNotificationSequenceZNode { - def path: String = LiteralZkAclStore.aclChangePath + def changeStore: ZkAclChangeStore = LiteralAclChangeStore +} + +class ExtendedAclStore(val patternType: ResourceNameType) extends ZkAclStore { + if (patternType == ResourceNameType.LITERAL) + throw new IllegalArgumentException("Literal pattern types are not supported") + + val aclPath: String = s"/kafka-acl-extended/${patternType.name.toLowerCase}" + + def changeStore: ZkAclChangeStore = ExtendedAclChangeStore +} + +trait AclChangeNotificationHandler { + def processNotification(resource: Resource): Unit +} + +trait AclChangeSubscription extends AutoCloseable { + def close(): Unit +} + +case class AclChangeNode(path: String, bytes: Array[Byte]) + +sealed trait ZkAclChangeStore { + val aclChangePath: String + def createPath: String = s"$aclChangePath/${ZkAclChangeStore.SequenceNumberPrefix}" - def encode(resource: Resource): Array[Byte] = { - if (resource.nameType != ResourceNameType.LITERAL) - throw new IllegalArgumentException("Only literal resource patterns can be encoded") + def decode(bytes: Array[Byte]): Resource - val legacyName = resource.resourceType + Resource.Separator + resource.name - legacyName.getBytes(UTF_8) + protected def encode(resource: Resource): Array[Byte] + + def createChangeNode(resource: Resource): AclChangeNode = AclChangeNode(createPath, encode(resource)) + + def createListener(handler: AclChangeNotificationHandler, zkClient: KafkaZkClient): AclChangeSubscription = { + val rawHandler: NotificationHandler = new NotificationHandler { + def processNotification(bytes: Array[Byte]): Unit = + handler.processNotification(decode(bytes)) } - def decode(bytes: Array[Byte]): Resource = - Resource.fromString(new String(bytes, UTF_8)) + val aclChangeListener = new ZkNodeChangeNotificationListener( + zkClient, aclChangePath, ZkAclChangeStore.SequenceNumberPrefix, rawHandler) + + aclChangeListener.init() + + new AclChangeSubscription { + def close(): Unit = aclChangeListener.close() + } } } -class ExtendedZkAclStore(val patternType: ResourceNameType) extends ZkAclStore { - if (patternType == ResourceNameType.LITERAL) - throw new IllegalArgumentException("Literal pattern types are not supported") +object ZkAclChangeStore { + val stores: Iterable[ZkAclChangeStore] = List(LiteralAclChangeStore, ExtendedAclChangeStore) - val aclPath: String = s"/kafka-acl-extended/${patternType.name.toLowerCase}" - val aclChangePath: String = s"/kafka-acl-extended-changes/${patternType.name.toLowerCase}" + def SequenceNumberPrefix = "acl_changes_" +} - def changeNode: AclChangeNotificationSequenceZNode = new AclChangeNotificationSequenceZNode { - def path: String = aclChangePath +case object LiteralAclChangeStore extends ZkAclChangeStore { + val name = "LiteralAclChangeStore" + val aclChangePath: String = "/kafka-acl-changes" - def encode(resource: Resource): Array[Byte] = { - if (resource.nameType == ResourceNameType.LITERAL) - throw new IllegalArgumentException("Literal pattern types are not supported") + def encode(resource: Resource): Array[Byte] = { + if (resource.nameType != ResourceNameType.LITERAL) + throw new IllegalArgumentException("Only literal resource patterns can be encoded") - Json.encodeAsBytes(ExtendedAclChangeEvent( - ExtendedAclChangeEvent.currentVersion, - resource.resourceType.name, - resource.name, - resource.nameType.name)) - } + val legacyName = resource.resourceType + Resource.Separator + resource.name + legacyName.getBytes(UTF_8) + } - def decode(bytes: Array[Byte]): Resource = { - val changeEvent = Json.parseBytesAs[ExtendedAclChangeEvent](bytes) match { - case Right(event) => event - case Left(e) => throw new IllegalArgumentException("Failed to parse ACL change event", e) - } + def decode(bytes: Array[Byte]): Resource = + Resource.fromString(new String(bytes, UTF_8)) +} - changeEvent.toResource match { - case Success(r) => r - case Failure(e) => throw new IllegalArgumentException("Failed to convert ACL change event to resource", e) - } +case object ExtendedAclChangeStore extends ZkAclChangeStore { + val name = "ExtendedAclChangeStore" + val aclChangePath: String = "/kafka-acl-extended-changes" + + def encode(resource: Resource): Array[Byte] = { + if (resource.nameType == ResourceNameType.LITERAL) + throw new IllegalArgumentException("Literal pattern types are not supported") + + Json.encodeAsBytes(ExtendedAclChangeEvent( + ExtendedAclChangeEvent.currentVersion, + resource.resourceType.name, + resource.name, + resource.nameType.name)) + } + + def decode(bytes: Array[Byte]): Resource = { + val changeEvent = Json.parseBytesAs[ExtendedAclChangeEvent](bytes) match { + case Right(event) => event + case Left(e) => throw new IllegalArgumentException("Failed to parse ACL change event", e) + } + + changeEvent.toResource match { + case Success(r) => r + case Failure(e) => throw new IllegalArgumentException("Failed to convert ACL change event to resource", e) } } } @@ -598,49 +644,6 @@ case class ExtendedAclChangeEvent(@BeanProperty @JsonProperty("version") version } } -trait AclChangeNotificationHandler { - def processNotification(resource: Resource): Unit -} - -trait AclChangeSubscription extends AutoCloseable { - def close(): Unit -} - -case class AclChangeNode(path: String, bytes: Array[Byte]) - -object AclChangeNotificationSequenceZNode { - def SequenceNumberPrefix = "acl_changes_" -} - -trait AclChangeNotificationSequenceZNode { - - protected def path: String - - def decode(bytes: Array[Byte]): Resource - - protected def encode(resource: Resource): Array[Byte] - - protected def createPath: String = s"$path/${AclChangeNotificationSequenceZNode.SequenceNumberPrefix}" - - def createChangeNode(resource: Resource): AclChangeNode = AclChangeNode(createPath, encode(resource)) - - def createListener(handler: AclChangeNotificationHandler, zkClient: KafkaZkClient): AclChangeSubscription = { - val rawHandler: NotificationHandler = new NotificationHandler { - def processNotification(bytes: Array[Byte]): Unit = - handler.processNotification(decode(bytes)) - } - - val aclChangeListener = new ZkNodeChangeNotificationListener( - zkClient, path, AclChangeNotificationSequenceZNode.SequenceNumberPrefix, rawHandler) - - aclChangeListener.init() - - new AclChangeSubscription { - def close(): Unit = aclChangeListener.close() - } - } -} - object ClusterZNode { def path = "/cluster" } diff --git a/core/src/test/scala/kafka/zk/ExtendedZkAclStoreTest.scala b/core/src/test/scala/kafka/zk/ExtendedAclStoreTest.scala similarity index 76% rename from core/src/test/scala/kafka/zk/ExtendedZkAclStoreTest.scala rename to core/src/test/scala/kafka/zk/ExtendedAclStoreTest.scala index 6b0c7f8fe0c68..4e8580b63e4ba 100644 --- a/core/src/test/scala/kafka/zk/ExtendedZkAclStoreTest.scala +++ b/core/src/test/scala/kafka/zk/ExtendedAclStoreTest.scala @@ -22,16 +22,16 @@ import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED} import org.junit.Assert.assertEquals import org.junit.Test -class ExtendedZkAclStoreTest { +class ExtendedAclStoreTest { private val literalResource = Resource(Topic, "some-topic", LITERAL) private val prefixedResource = Resource(Topic, "some-topic", PREFIXED) - private val store = new ExtendedZkAclStore(PREFIXED) + private val store = new ExtendedAclStore(PREFIXED) @Test def shouldHaveCorrectPaths(): Unit = { assertEquals("/kafka-acl-extended/prefixed", store.aclPath) assertEquals("/kafka-acl-extended/prefixed/Topic", store.path(Topic)) - assertEquals("/kafka-acl-extended-changes/prefixed", store.aclChangePath) + assertEquals("/kafka-acl-extended-changes", store.changeStore.aclChangePath) } @Test @@ -41,26 +41,26 @@ class ExtendedZkAclStoreTest { @Test(expected = classOf[IllegalArgumentException]) def shouldThrowIfConstructedWithLiteral(): Unit = { - new ExtendedZkAclStore(LITERAL) + new ExtendedAclStore(LITERAL) } @Test(expected = classOf[IllegalArgumentException]) def shouldThrowFromEncodeOnLiteral(): Unit = { - store.changeNode.createChangeNode(literalResource) + store.changeStore.createChangeNode(literalResource) } @Test def shouldWriteChangesToTheWritePath(): Unit = { - val changeNode = store.changeNode.createChangeNode(prefixedResource) + val changeNode = store.changeStore.createChangeNode(prefixedResource) - assertEquals("/kafka-acl-extended-changes/prefixed/acl_changes_", changeNode.path) + assertEquals("/kafka-acl-extended-changes/acl_changes_", changeNode.path) } @Test def shouldRoundTripChangeNode(): Unit = { - val changeNode = store.changeNode.createChangeNode(prefixedResource) + val changeNode = store.changeStore.createChangeNode(prefixedResource) - val actual = store.changeNode.decode(changeNode.bytes) + val actual = store.changeStore.decode(changeNode.bytes) assertEquals(prefixedResource, actual) } diff --git a/core/src/test/scala/kafka/zk/LiteralZkAclStoreTest.scala b/core/src/test/scala/kafka/zk/LiteralAclStoreTest.scala similarity index 81% rename from core/src/test/scala/kafka/zk/LiteralZkAclStoreTest.scala rename to core/src/test/scala/kafka/zk/LiteralAclStoreTest.scala index 777c216aabfc9..22d6f23547b56 100644 --- a/core/src/test/scala/kafka/zk/LiteralZkAclStoreTest.scala +++ b/core/src/test/scala/kafka/zk/LiteralAclStoreTest.scala @@ -22,16 +22,16 @@ import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED} import org.junit.Assert.assertEquals import org.junit.Test -class LiteralZkAclStoreTest { +class LiteralAclStoreTest { private val literalResource = Resource(Topic, "some-topic", LITERAL) private val prefixedResource = Resource(Topic, "some-topic", PREFIXED) - private val store = LiteralZkAclStore + private val store = LiteralAclStore @Test def shouldHaveCorrectPaths(): Unit = { assertEquals("/kafka-acl", store.aclPath) assertEquals("/kafka-acl/Topic", store.path(Topic)) - assertEquals("/kafka-acl-changes", store.aclChangePath) + assertEquals("/kafka-acl-changes", store.changeStore.aclChangePath) } @Test @@ -41,21 +41,21 @@ class LiteralZkAclStoreTest { @Test(expected = classOf[IllegalArgumentException]) def shouldThrowFromEncodeOnNoneLiteral(): Unit = { - store.changeNode.createChangeNode(prefixedResource) + store.changeStore.createChangeNode(prefixedResource) } @Test def shouldWriteChangesToTheWritePath(): Unit = { - val changeNode = store.changeNode.createChangeNode(literalResource) + val changeNode = store.changeStore.createChangeNode(literalResource) assertEquals("/kafka-acl-changes/acl_changes_", changeNode.path) } @Test def shouldRoundTripChangeNode(): Unit = { - val changeNode = store.changeNode.createChangeNode(literalResource) + val changeNode = store.changeStore.createChangeNode(literalResource) - val actual = store.changeNode.decode(changeNode.bytes) + val actual = store.changeStore.decode(changeNode.bytes) assertEquals(literalResource, actual) } diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala index 301cb3e5dce6d..58f0962b7bdec 100644 --- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala +++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala @@ -18,7 +18,7 @@ package kafka.common import kafka.security.auth.{Group, Resource} import kafka.utils.TestUtils -import kafka.zk.{AclChangeNotificationSequenceZNode, LiteralZkAclStore, ZooKeeperTestHarness} +import kafka.zk.{LiteralAclChangeStore, LiteralAclStore, ZkAclChangeStore, ZooKeeperTestHarness} import org.apache.kafka.common.resource.ResourceNameType.LITERAL import org.junit.{After, Before, Test} @@ -50,8 +50,8 @@ class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness { val notificationMessage1 = Resource(Group, "messageA", LITERAL) val notificationMessage2 = Resource(Group, "messageB", LITERAL) - notificationListener = new ZkNodeChangeNotificationListener(zkClient, LiteralZkAclStore.aclChangePath, - AclChangeNotificationSequenceZNode.SequenceNumberPrefix, notificationHandler, changeExpirationMs) + notificationListener = new ZkNodeChangeNotificationListener(zkClient, LiteralAclChangeStore.aclChangePath, + ZkAclChangeStore.SequenceNumberPrefix, notificationHandler, changeExpirationMs) notificationListener.init() zkClient.createAclChangeNotification(notificationMessage1) @@ -79,8 +79,8 @@ class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness { @Test def testSwallowsProcessorException() : Unit = { notificationHandler.setThrowSize(2) - notificationListener = new ZkNodeChangeNotificationListener(zkClient, LiteralZkAclStore.aclChangePath, - AclChangeNotificationSequenceZNode.SequenceNumberPrefix, notificationHandler, changeExpirationMs) + notificationListener = new ZkNodeChangeNotificationListener(zkClient, LiteralAclChangeStore.aclChangePath, + ZkAclChangeStore.SequenceNumberPrefix, notificationHandler, changeExpirationMs) notificationListener.init() zkClient.createAclChangeNotification(Resource(Group, "messageA", LITERAL)) @@ -96,7 +96,7 @@ class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness { @volatile private var throwSize = Option.empty[Int] override def processNotification(notificationMessage: Array[Byte]): Unit = { - messages += LiteralZkAclStore.changeNode.decode(notificationMessage) + messages += LiteralAclStore.changeStore.decode(notificationMessage) if (throwSize.contains(messages.size)) throw new RuntimeException("Oh no, my processing failed!") diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala index 6636698716052..b3012712c7a6c 100644 --- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala @@ -574,7 +574,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { def testWritesExtendedAclChangeEventIfInterBrokerProtocolNotSet(): Unit = { givenAuthorizerWithProtocolVersion(Option.empty) val resource = Resource(Topic, "z_other", PREFIXED) - val expected = new String(ZkAclStore(PREFIXED).changeNode.createChangeNode(resource).bytes, UTF_8) + val expected = new String(ZkAclStore(PREFIXED).changeStore.createChangeNode(resource).bytes, UTF_8) simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource) @@ -587,7 +587,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { def testWritesExtendedAclChangeEventWhenInterBrokerProtocolAtLeastKafkaV2(): Unit = { givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV1)) val resource = Resource(Topic, "z_other", PREFIXED) - val expected = new String(ZkAclStore(PREFIXED).changeNode.createChangeNode(resource).bytes, UTF_8) + val expected = new String(ZkAclStore(PREFIXED).changeStore.createChangeNode(resource).bytes, UTF_8) simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource) @@ -600,7 +600,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { def testWritesLiteralWritesLiteralAclChangeEventWhenInterBrokerProtocolLessThanKafkaV2eralAclChangesForOlderProtocolVersions(): Unit = { givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV0)) val resource = Resource(Topic, "z_other", LITERAL) - val expected = new String(ZkAclStore(LITERAL).changeNode.createChangeNode(resource).bytes, UTF_8) + val expected = new String(ZkAclStore(LITERAL).changeStore.createChangeNode(resource).bytes, UTF_8) simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource) @@ -613,7 +613,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { def testWritesLiteralAclChangeEventWhenInterBrokerProtocolIsKafkaV2(): Unit = { givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV1)) val resource = Resource(Topic, "z_other", LITERAL) - val expected = new String(ZkAclStore(LITERAL).changeNode.createChangeNode(resource).bytes, UTF_8) + val expected = new String(ZkAclStore(LITERAL).changeStore.createChangeNode(resource).bytes, UTF_8) simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource) @@ -636,11 +636,11 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { private def getAclChangeEventAsString(patternType: ResourceNameType) = { val store = ZkAclStore(patternType) - val children = zooKeeperClient.handleRequest(GetChildrenRequest(store.aclChangePath)) + val children = zooKeeperClient.handleRequest(GetChildrenRequest(store.changeStore.aclChangePath)) children.maybeThrow() assertEquals("Expecting 1 change event", 1, children.children.size) - val data = zooKeeperClient.handleRequest(GetDataRequest(s"${store.aclChangePath}/${children.children.head}")) + val data = zooKeeperClient.handleRequest(GetDataRequest(s"${store.changeStore.aclChangePath}/${children.children.head}")) data.maybeThrow() new String(data.data, UTF_8) diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index 021c44c5f45c3..cc67a010bc401 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -428,7 +428,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { def testAclManagementMethods() { ZkAclStore.stores.foreach(store => { assertFalse(zkClient.pathExists(store.aclPath)) - assertFalse(zkClient.pathExists(store.aclChangePath)) + assertFalse(zkClient.pathExists(store.changeStore.aclChangePath)) ResourceType.values.foreach(resource => assertFalse(zkClient.pathExists(store.path(resource)))) }) @@ -437,7 +437,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { ZkAclStore.stores.foreach(store => { assertTrue(zkClient.pathExists(store.aclPath)) - assertTrue(zkClient.pathExists(store.aclChangePath)) + assertTrue(zkClient.pathExists(store.changeStore.aclChangePath)) ResourceType.values.foreach(resource => assertTrue(zkClient.pathExists(store.path(resource)))) val resource1 = new Resource(Topic, UUID.randomUUID().toString, store.patternType) @@ -490,10 +490,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { zkClient.createAclChangeNotification(Resource(Group, "resource1", store.patternType)) zkClient.createAclChangeNotification(Resource(Topic, "resource2", store.patternType)) - assertEquals(2, zkClient.getChildren(store.aclChangePath).size) + assertEquals(2, zkClient.getChildren(store.changeStore.aclChangePath).size) zkClient.deleteAclChangeNotifications() - assertTrue(zkClient.getChildren(store.aclChangePath).isEmpty) + assertTrue(zkClient.getChildren(store.changeStore.aclChangePath).isEmpty) }) } From 23bb972dc517ee4efdc54a4703e6d89602b8241a Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Tue, 12 Jun 2018 19:40:22 +0100 Subject: [PATCH 10/10] Jun's requested changes. --- core/src/main/scala/kafka/zk/KafkaZkClient.scala | 2 +- core/src/main/scala/kafka/zk/ZkData.scala | 14 ++++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 97ece58a22df2..ad55a6f578221 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -941,7 +941,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean //Acl management methods /** - * Creates the required zk nodes for Acl storage + * Creates the required zk nodes for Acl storage and Acl change storage. */ def createAclPaths(): Unit = { ZkAclStore.stores.foreach(store => { diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala index c9c2e43e620a7..2cbdd8061d3ce 100644 --- a/core/src/main/scala/kafka/zk/ZkData.scala +++ b/core/src/main/scala/kafka/zk/ZkData.scala @@ -25,6 +25,7 @@ import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr} import kafka.cluster.{Broker, EndPoint} import kafka.common.{KafkaException, NotificationHandler, ZkNodeChangeNotificationListener} import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch} +import kafka.security.auth.Resource.Separator import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls import kafka.security.auth.{Acl, Resource, ResourceType} import kafka.server.{ConfigType, DelegationTokenManager} @@ -473,7 +474,7 @@ object StateChangeHandlers { *
      *
    • [[org.apache.kafka.common.resource.ResourceNameType#LITERAL Literal]] patterns are stored under '/kafka-acl-changes'. * The format is a UTF8 string in the form: <resource-type>:<resource-name>
    • - *
    • All other patterns are stored under '/kafka-acl-extended-changes/pattern-type' + *
    • All other patterns are stored under '/kafka-acl-extended-changes' * The format is JSON, as defined by [[kafka.zk.ExtendedAclChangeEvent]]
    • *
    */ @@ -489,7 +490,7 @@ sealed trait ZkAclStore { } object ZkAclStore { - private val storesByType = ResourceNameType.values + private val storesByType: Map[ResourceNameType, ZkAclStore] = ResourceNameType.values .filter(nameType => nameType != ResourceNameType.ANY && nameType != ResourceNameType.UNKNOWN) .map(nameType => (nameType, create(nameType))) .toMap @@ -585,8 +586,13 @@ case object LiteralAclChangeStore extends ZkAclChangeStore { legacyName.getBytes(UTF_8) } - def decode(bytes: Array[Byte]): Resource = - Resource.fromString(new String(bytes, UTF_8)) + def decode(bytes: Array[Byte]): Resource = { + val string = new String(bytes, UTF_8) + string.split(Separator, 2) match { + case Array(resourceType, resourceName, _*) => new Resource(ResourceType.fromString(resourceType), resourceName, ResourceNameType.LITERAL) + case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + string) + } + } } case object ExtendedAclChangeStore extends ZkAclChangeStore {