KAFKA-7007: All ACL changes should use single /kafka-acl-changes path #5161
junrao merged 12 commits into apache:trunk
junrao
left a comment
@big-andy-coates : Thanks for the patch. A few comments below.
}

@deprecated("There are now multiple roots for ACLs within ZK. Use ZkAclStore", "2.0")
object ResourceTypeZNode {
AclZNode and ResourceTypeZNode are not part of the public API. We can just remove them.
def decode(bytes: Array[Byte]): Resource = {
  val changeEvent = Json.parseBytesAs[AclChangeEvent](bytes) match {
    case Right(event) => event
    case Left(e) => throw new IllegalArgumentException("Failed to parse ACL change event", e)
During the upgrade phase, a new broker may see an acl_change node in the old format. So, we want to fall back to parsing the value of the node using the old format if it's not JSON.
<li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1, 1.2).</li>
</ul>
</li>
<li> IMPORTANT: Do not change any ACLs during the upgrade. (The format of the ACL change event has changed and is not compatible with old brokers).</li>
This is a bit too restrictive. To support adding ACLs during a rolling upgrade, we probably only want to write the acl_change path with the new JSON format if inter.broker.protocol is >= 2.0. Otherwise, if the new ACL doesn't use the new prefix feature, we switch to using the old format; if it does, we fail the addAcl request.
Since the AclCommand still writes to ZK directly, we probably want to recommend "Not use the 2.0 AclCommand tool until all brokers are upgraded to 2.0."
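The three-way rule in the comment above can be sketched roughly as follows. This is only an illustration: `ChangeFormatSketch`, `decide` and the `Decision` types are hypothetical names, not part of the Kafka code base, and the protocol-version comparison is reduced to a boolean.

```scala
object ChangeFormatSketch {
  // Hypothetical outcome of handling an addAcl request during a rolling upgrade.
  sealed trait Decision
  case object WriteNewFormat extends Decision
  case object WriteLegacyFormat extends Decision
  final case class Reject(reason: String) extends Decision

  // protocolIs2OrLater stands in for "inter.broker.protocol is >= 2.0";
  // usesPrefix stands in for "the new ACL uses the new prefix feature".
  def decide(protocolIs2OrLater: Boolean, usesPrefix: Boolean): Decision =
    if (protocolIs2OrLater) WriteNewFormat
    else if (!usesPrefix) WriteLegacyFormat
    else Reject("prefixed ACLs need all brokers on inter.broker.protocol >= 2.0")
}
```

Under this rule only the combination "old protocol version plus prefixed ACL" is refused; every literal ACL can still be added while the upgrade is in progress.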
Hi @junrao, my bad - we did discuss this. Sorry I missed this. Done now.
<li>If you are using Java8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguities.
Hot-swapping the jar-file only might not work.</li>
<li>ACLs should not be added to prefixed resources,
<li>ACLs should not be added during the upgrade process.
Also, if we have upgraded the whole cluster and started using the new acl_change format in ZK, downgrading the cluster will cause the processing of acl_change sequence node to fail and the sequence node not to be deleted. This means the downgraded broker can't process ACL changes forever. So, your original idea of using a new acl_change path such as /kafka-acl-changes-v2 seems better.
Actually, this has the other problem. Since the AclCommand tool still writes to ZK directly, if we switch to a new acl-change path, the old version of AclCommand won't work once the brokers are upgraded.
Given the complexity in dealing with upgrades/downgrades, maybe the current approach of using 2 separate acl-change paths is not that bad. So, perhaps we can just keep the current approach, but (1) ensure that a new ACL with a prefix is only added if inter.broker.protocol is new; (2) in ZkNodeChangeNotificationListener.processNotifications(), catch the decoding error of an individual ACL change value, log it and let it go. This may help future format changes.
Sorry for going back and forth on this one.
Hummm.... Good points @junrao.
I have another suggestion...
pre-2.0 behaviour uses a colon-separated string to write the change notification in the format <resource-type>:<name>, where the name can have embedded colons, e.g. a group of my:weird:group:name.
With my recent changes the toString representation of Resource is <name-type>:<resource-type>:<name>, and I've already enhanced the Resource.fromString() method to handle both old and new formats. For the solution below I'll need to swap the first two parts in the new format so that it is <resource-type>:<name-type>:<name>.
Given this new format, rather than switching to JSON we could just write change notifications out as strings still, with either two or three parts, depending on inter.broker.protocol.version.
During a rolling upgrade change events would be written using two parts, which work with older brokers and are interpreted as 'literal' resources by the new brokers. (Attempts to add Prefixed would result in UVE).
Once upgraded, brokers start to write change events using three parts.
On a downgrade older brokers would treat the 'Topic:Prefixed:Foo' as a change on a Topic called 'Prefixed:Foo', which wouldn't match anything. I'd have to check, but there's every possibility that the broker would process this without error and move on. The fact that the old broker effectively ignored the event shouldn't matter as the event must be old to be in this format.
Thoughts?
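A minimal sketch of the two-part and three-part encodings described in this proposal, to make the downgrade behaviour concrete. The types and names here (`AclChangeSketch`, `ChangedResource`, `decodeLegacy`) are illustrative stand-ins rather than Kafka's `Resource` API, and the name types are modelled as plain strings.

```scala
object AclChangeSketch {
  val Separator = ":"
  // Assumed spelling of the name types as they appear in the encoded string.
  val nameTypes: Set[String] = Set("Literal", "Prefixed")

  final case class ChangedResource(resourceType: String, nameType: String, name: String)

  // Two-part form, <resource-type>:<name>; it can only express literal resources.
  def encodeLegacy(r: ChangedResource): String = {
    require(r.nameType == "Literal", "legacy format cannot express " + r.nameType)
    r.resourceType + Separator + r.name
  }

  // Three-part form, <resource-type>:<name-type>:<name>.
  def encode(r: ChangedResource): String =
    Seq(r.resourceType, r.nameType, r.name).mkString(Separator)

  // How an old broker would read any value: split on the first colon only,
  // so the name keeps any embedded colons.
  def decodeLegacy(s: String): ChangedResource = s.split(Separator, 2) match {
    case Array(resourceType, name) => ChangedResource(resourceType, "Literal", name)
    case _ => throw new IllegalArgumentException("expected <resource-type>:<name> but got: " + s)
  }

  // How a new broker could read both forms: treat the second part as a name type
  // only when it is a known one, otherwise fall back to the two-part reading.
  def decode(s: String): ChangedResource = s.split(Separator, 3) match {
    case Array(resourceType, nameType, name) if nameTypes.contains(nameType) =>
      ChangedResource(resourceType, nameType, name)
    case _ => decodeLegacy(s)
  }
}
```

With these definitions `decodeLegacy("Topic:Prefixed:Foo")` yields a literal Topic named `Prefixed:Foo`, which is the "matches nothing" case on downgrade, while `decode("Group:my:weird:group:name")` still yields the literal group `my:weird:group:name`.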
Yes, that seems to be a good idea. The only confusion is that if someone has a group named "Literal:weirdgroup" or "Prefixed:weirdgroup". Since that's unlikely, I think what you suggested is probably the best approach given what we have now.
Great. Thinking on this more, I think we should consider not adding the backwards compatibility logic, i.e. the new brokers should always write using the three-part name.
If we do add this logic it will need to be in the code base for a long, long time, as there could always be some user upgrading. If we don't add it, then the edge case this affects is very small / unlikely and the impact minimal: the old broker would only have incorrect ACLs until it was itself upgraded, which should be a small window.
What do you think @junrao?
# Conflicts: # core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala # core/src/main/scala/kafka/zk/KafkaZkClient.scala
# Conflicts: # core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala # core/src/main/scala/kafka/zk/ZkData.scala # core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
… part, (`<resource-type>:<name>`), or 3 part, (`<resource-type>:<name-type>:<name>`), depending on if broker api version is less than 2.0V1 or not, respectively.
@junrao, so I've switched this back to colon-separated, rather than JSON. It currently still does an api version check, but I really wonder if this is worth it. It might be sufficient to say that 'ACLs added during the upgrade process may not be set on older brokers, but will be picked up once the broker is upgraded or restarted', and this would only be if using the Admin client. If you still really want the version check, (and having that in the code base going forward), then I think this is good to merge. If you'd rather drop the version check, just let me know and I'll update.
What's the behavior when someone with a new copy of kafka-acls.sh interacts with a cluster that's still using the old two-part encoding? That script still talks directly to ZooKeeper, so there's no compatibility shim that we could possibly add, right? So this will break older clusters when used, right? Overall, I agree that we should have used JSON when creating the ACL metadata. But I'm a bit concerned that fixing it now creates a lot of compatibility problems, and doesn't really solve any problem that users have.
junrao
left a comment
@big-andy-coates : Thanks for the updated patch. A few more minor comments below.
About not adding the backwards compatibility logic: upgrading a large cluster may take some time, so being able to write ACL changes in a compatible format during this process could still be useful. The additional logic for supporting both formats doesn't seem too bad.
Finally, while you are here, in ZkNodeChangeNotificationListener.processNotifications(), could we catch the decoding error of an individual ACL change value, log it and let it go? This way, a single bad ACL change event won't halt future ACL changes.
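The "catch, log and let it go" request could look roughly like the sketch below. It is a simplified stand-in, not the real `ZkNodeChangeNotificationListener`: `NotificationSketch`, `processAll`, and the `handle` and `logError` parameters are hypothetical, and notifications are modelled as (znode path, payload) pairs.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.{Failure, Success, Try}

object NotificationSketch {
  // Runs the handler on every notification and returns how many succeeded.
  // A failure in one notification is logged and does not stop the others.
  def processAll(notifications: Seq[(String, Array[Byte])],
                 handle: Array[Byte] => Unit,
                 logError: (String, Throwable) => Unit): Int =
    notifications.count { case (znode, data) =>
      Try(handle(data)) match {
        case Success(_) => true
        case Failure(e) =>
          // Include the raw payload so a bad event can be diagnosed later.
          logError(s"error processing change notification ${new String(data, UTF_8)} from $znode", e)
          false
      }
    }
}
```

Note that `Try` only captures non-fatal exceptions, so fatal errors such as `OutOfMemoryError` would still propagate out of the loop.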
def path = "/kafka-acl-changes"
}

case class AclChangeEvent(@BeanProperty @JsonProperty("version") version: Int,
This class is no longer used.
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Seq, breakOut}
import scala.util.{Failure, Success, Try}
unused imports Failure, Success
}

object AclChangeEvent {
  val currentVersion: Int = 1
No, and done.
 * @param resource resource name
 * Creates colon separated Acl change notification message.
 *
 * Kafka 2.0 saw the format of the ACL change event change from a 'colon separated' string, to a JSON message.
The comment needs to be adjusted since we are no longer using JSON.
Good catch - done.
 *
 * @param resource resource pattern that has changed
 */
def createLegacyAclChangeNotification(resource: Resource): Unit = {
Would it be better to have a single createAclChangeNotification() method and pass in a isLegacy flag?
6 = 2x3? ;)
I like it being more explicit in the function name than some boolean flag.
If you feel strongly I can change.
}

@Test
def shouldRoundTripJSON(): Unit = {
We are no longer storing JSON.
As discussed with @junrao, the release notes document that the new kafka-acls.sh script should not be used until the cluster is upgraded. If they do, it won't break the cluster, but it will be ignored by brokers. It would be possible for users to pass in the Do we think this is acceptable?
The JSON work has been reverted, (see reasons in the history of the PR). Current impl uses either two or three part, colon-separated, ACL change notification.
Hi @junrao, thanks for the review. All changes done, except #5161 (comment). Let me know on that one. Thanks, Andy
junrao
left a comment
@big-andy-coates : Thanks for the latest patch. LGTM. Just one more minor comment below.
val (data, _) = zkClient.getDataAndStat(changeZnode)
data match {
  case Some(d) => Try(notificationHandler.processNotification(d)) match {
    case Failure(e) => error(s"error processing change notification from $changeZnode when processing notification $notification", e)
Could we include d in the error message?
- Literal ACLs remain on the old paths, using the old formats.
- Prefixed, and any later types, go on new paths, using JSON.
- Check remains in place to reject PREFIXED ACLs until inter.broker.protocol.version is bumped.
@cmccabe @junrao @ijuma in line with discussion from last night...
 * <li>[[org.apache.kafka.common.resource.ResourceNameType#LITERAL Literal]] patterns are stored under '/kafka-acl'.
 * The format is JSON. See [[kafka.zk.ResourceZNode]] for details.</li>
 * <li>All other patterns are stored under '/kafka-acl-extended/<i>pattern-type</i>'.
 * The format is JSON. See [[kafka.zk.ResourceZNode]] for details.</li>
We could store literal patterns under the new path, right?
It's more like, the old path can only have LITERAL patterns, but the new path can have that and other stuff.
If we stored any literals under the new path, old brokers wouldn't pick them up. So for now all literals go under the old path.
 * /kafka-prefixed-acl/Group/group-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
 *
 * // Prefixed patterns:
 * /kafka-acl-extended/PREFIXED/Group/group-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
Why not have a JSON field describing prefixed vs. literal vs. whatever? That would cut down on the number of ZK paths we have to listen to.
As discussed, not possible, as we'd get ZK path clashes between pattern types.
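One way to picture the clash mentioned here is the sketch below. The path roots are the ones quoted in the diff; the `<resource-type>/<name>` layout under `/kafka-acl` is assumed by analogy with the extended example, and `AclPathSketch`, `path` and `singleRootPath` are hypothetical names for illustration only.

```scala
object AclPathSketch {
  // Layout from the diff: literals under /kafka-acl, every other pattern type
  // under /kafka-acl-extended/<pattern-type>.
  def path(patternType: String, resourceType: String, name: String): String =
    patternType match {
      case "LITERAL" => s"/kafka-acl/$resourceType/$name"
      case other     => s"/kafka-acl-extended/$other/$resourceType/$name"
    }

  // Hypothetical alternative: one root, with the pattern type kept only in the JSON value.
  def singleRootPath(resourceType: String, name: String): String =
    s"/kafka-acl/$resourceType/$name"
}
```

With the single root, a literal ACL on group `group-1` and a prefixed ACL on `group-1` would both map to `/kafka-acl/Group/group-1`, so one znode would have to hold both; with the pattern type in the path they land on separate znodes.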
Looks good overall. What's the rationale for making PREFIXED part of the ZK path, rather than a type field in the JSON structure?
…ChangeStores, (one each for '/kafka-acl-changes' and 'kafka-acl-extended-changes'), and ZkAclStores per Pattern type.
junrao
left a comment
@big-andy-coates : Thanks for the new patch. Just a few more minor comments.
}

object ZkAclStore {
  private val storesByType = ResourceNameType.values
Could we explicitly define the type of storesByType?
On a private field? Cor blimey gov. OK, consider it done.
:D
 * <ul>
 * <li>[[org.apache.kafka.common.resource.ResourceNameType#LITERAL Literal]] patterns are stored under '/kafka-acl-changes'.
 * The format is a UTF8 string in the form: <resource-type>:<resource-name></li>
 * <li>All other patterns are stored under '/kafka-acl-extended-changes/<i>pattern-type</i>'
This should be stored under /kafka-acl-extended-changes?
val securePaths: Seq[String] = stores
  .flatMap(store => List(store.aclPath, store.aclChangePath))
trait AclChangeSubscription extends AutoCloseable {
Do we need to extend from AutoCloseable since close() doesn't throw exception?
ResourceType.values.foreach(resourceType => createRecursive(store.path(resourceType), throwIfPathExists = false))
})

ZkAclChangeStore.stores.foreach(store => createRecursive(store.aclChangePath, throwIfPathExists = false))
The comment of the method should be adjusted to "Creates the required zk nodes for Acl storage and Acl change storage".
case Some(resourceType) =>
  val remaining = str.substring(resourceType.name.length + 1)

  ResourceNameType.values.find(nameType => remaining.startsWith(nameType.name + Separator)) match {
Hmm, this is called by LiteralAclChangeStore.decode(), which should only decode a 2-part name.
Good spot. I've changed LiteralAclChangeStore.decode() to have its own impl (basically the old contents of Resource.fromString), to maintain v1.1 behaviour.
@jun - thanks! - changes made as requested.
junrao
left a comment
@big-andy-coates : Your latest PR LGTM. I will merge it when the test is done. A couple other things. (1) Could you update the KIP to reflect the changes that you made and also post an update to the voting thread? (2) It seems useful to include the value of d in the logging message below. Perhaps you can include it in your other PR.
case Some(d) => Try(notificationHandler.processNotification(d)) match {
case Failure(e) => error(s"error processing change notification from $changeZnode", e)
Keep Literal ACLs on the old paths, using the old formats, to maintain backwards compatibility.
Have Prefixed, and any later types, go on new paths, using JSON (old brokers are not aware of them).
Add checks to reject any adminClient requests to add prefixed ACLs before the cluster is fully upgraded.
Colin Patrick McCabe <colin@cmccabe.xyz>, Jun Rao <junrao@gmail.com>
Fix for KAFKA-7007.
The initial PR for KIP-290 #5117 added a new path in ZK for ACL change events to come through for the new Prefixed resource pattern. @jun requested that this be reverted, and in its place we should use JSON values for the change events.
This PR looks to make these changes. Where possible, I've put code back to being the same as pre PR 5117.
cc @cmccabe, @junrao
UPDATE: After implementing the switch to JSON it was decided that this causes too many potential problems. Instead, the code now looks to encode a 3 part ACL change event, (<resource-type>:<name-type>:<name>), where the old brokers used a 2 part event, (<resource-type>:<name>). Older brokers will effectively ignore the new style events, without any errors or warnings.
UPDATE: After more discussions a new hybrid approach is being adopted. The PR now looks to:
Note: