Adds Kafka Channel Provisioner Controllers #468
Conversation
grantr
left a comment
I think this is the first PR for a Channel implementation in the new model! Thanks @neosab, awesome work.
I haven't reviewed the whole thing because it's WIP, but I have a few comments that might be interesting to you. Feel free to ignore.
```go
}

// Skip channel provisioners that we don't manage
if provisioner.Name != config.Name || provisioner.Namespace != config.Namespace {
```
I think this filtering should be done in the Watch call in the provider. That way the informer cache doesn't even store the other provisioners locally.
Unfortunately this feature doesn't exist in controller-runtime, though it doesn't seem too hard to add. We need a variant of source.Kind that allows filtering by field values. In the meantime we can prototype using source.Informer given a filtered informer.
This will be a common pattern in these controllers so IMO it makes sense to invest in making this easy.
Thanks for the pointer. I took a quick glance and do see the generated NewFilteredClusterProvisionerInformer; I will give this a try soon.
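The filtering grantr describes can be illustrated in isolation; the sketch below is a minimal, self-contained stand-in for the filter a predicate would apply (the `objectMeta` type and function names are hypothetical, not the real controller-runtime API):

```go
package main

import "fmt"

// objectMeta is a hypothetical stand-in for the metadata a
// controller-runtime predicate would inspect (metav1.Object in the real API).
type objectMeta struct {
	Name      string
	Namespace string
}

// managedOnly returns a filter that admits only the single provisioner this
// controller is configured to manage, so unrelated provisioners are dropped
// before they ever reach Reconcile (or, ideally, the informer cache).
func managedOnly(cfgName, cfgNamespace string) func(objectMeta) bool {
	return func(m objectMeta) bool {
		return m.Name == cfgName && m.Namespace == cfgNamespace
	}
}

func main() {
	filter := managedOnly("kafka-channel", "knative-eventing")
	fmt.Println(filter(objectMeta{Name: "kafka-channel", Namespace: "knative-eventing"})) // true
	fmt.Println(filter(objectMeta{Name: "other", Namespace: "knative-eventing"}))         // false
}
```

In the real controller this check would live in the Watch setup (or a filtered informer) rather than inside Reconcile, which is the point of grantr's suggestion.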
@grantr @adamharwayne Thanks for your quick reviews! 👍 Sorry about the late response though.

Sorry about the delay with this PR. I was on vacation 🛫 last week and am just getting back to this!

I removed the logic to fetch the provisioner configmap in the controller and instead read it from a mounted volume. This way the controller does not need the cluster-wide privilege to read all config maps.
/assign grantr
n3wscott
left a comment
Nice start. I would still call it WIP because you don't do any work on the channels yet.
@grantr and I have been talking about the ups and downs of using controller-runtime vs metacontroller, and this PR is exactly what I was talking about with the boilerplate required to get controller-runtime up and running. It is not super fun as the dev to implement all of this, and we think we have an idea to refactor some of it closer in line with how metacontroller does things while still using controller-runtime. You can see in #513 that I did a lot of the same stuff (but I did not add tests yet :D)
```go
// Skip channel not managed by this provisioner
provisionerRef := channel.Spec.Provisioner.Ref
clusterProvisioner, err := r.getClusterProvisioner()
```
This is a cool idea! I wonder if the flexibility of setting the name of the provisioner should be an argument, though. What does it mean for the controller to change which provisioner it is servicing while it is running?
The idea was to support multiple provisioners, not just the one in kafka.yaml; the configmap can be updated when new provisioners are added. You are right that it is not that useful with just one provisioner. I can remove this and make the name a const if there is a strong opinion.
I removed the flexibility of setting the provisioner name with the latest commit. I don't see a use case for it as of now, and this will be in line with the in-memory channel provisioner.
```go
}

// Skip channel not managed by this provisioner
provisionerRef := channel.Spec.Provisioner.Ref
```
Perhaps all of this provisioner-validation logic can be moved to a helper function called before the deep copy has to happen.
I see that #484 also has similar logic (as would most channel provisioners) and I would like to move it into a common util method once we have both PRs in.
When I posted this a couple of weeks back there was no reference implementation for cluster provisioners, so I just wanted to test the waters by addressing Task 1 in #442. If you wish, I can implement kafka channel provisioning in this PR itself.
I agree it's a lot of boilerplate code (w/ tests) when implementing provisioners. I am not too familiar with the metacontroller implementation; let me take a look at it to see what you intend to do.
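One possible shape for such a helper, using simplified stand-in types (the real `Channel` and provisioner reference types live in eventing's API packages; the field layout here is illustrative):

```go
package main

import "fmt"

// provisionerRef is a simplified stand-in for the Channel's
// Spec.Provisioner.Ref; it is not the real eventing API type.
type provisionerRef struct {
	Name string
	Kind string
}

type channel struct {
	Provisioner *provisionerRef
}

// shouldReconcile reports whether this controller owns the channel. Calling
// it before DeepCopy (and before reconcile proper) keeps the skip logic in
// one place, as suggested above.
func shouldReconcile(c *channel, provisionerName string) bool {
	ref := c.Provisioner
	return ref != nil && ref.Kind == "ClusterProvisioner" && ref.Name == provisionerName
}

func main() {
	owned := &channel{Provisioner: &provisionerRef{Name: "kafka-channel", Kind: "ClusterProvisioner"}}
	other := &channel{Provisioner: &provisionerRef{Name: "in-memory", Kind: "ClusterProvisioner"}}
	fmt.Println(shouldReconcile(owned, "kafka-channel"), shouldReconcile(other, "kafka-channel"))
}
```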
```go
type reconciler struct {
	client     client.Client
	restConfig *rest.Config
```
```go
	return nil
}

// Skip Channel as it is not targeting any provisioner
```
I think the decision of whether or not to reconcile should happen earlier. For example, when delete does something, we might not remember that it does not check whether this controller should be controlling this channel.
My preference is to check before calling reconcile(), at least checking everything but the status of the ClusterProvisioner itself.
Makes sense, I refactored this.
evankanderson
left a comment
A few comments, but this looks like it's close to ready.
```
ko apply -f config/provisioners/kafka/kafka-provisioner.yaml
```
> Note: If you are using Strimzi, you need to update the `KAFKA_BOOTSTRAP_SERVERS` value in the `kafka-channel-controller-config` ConfigMap to `my-cluster-kafka-bootstrap.kafka:9092`.
Can this be done after the ko apply using kubectl edit?
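For reference, a hedged sketch of what the edited ConfigMap might look like after `kubectl edit configmap kafka-channel-controller-config`. The data key name and namespace below are assumptions, not taken from this PR; check the provisioner's actual config:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-channel-controller-config
  namespace: knative-eventing        # namespace is an assumption
data:
  # key name is illustrative; it feeds the KAFKA_BOOTSTRAP_SERVERS value
  bootstrap_servers: my-cluster-kafka-bootstrap.kafka:9092
```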
```yaml
kind: ClusterChannelProvisioner
name: kafka-channel
```
1. (Optional) Install [Kail](https://github.com/boz/kail) - Kubernetes tail
@Harwayne is working on allowing a default provisioner if not specified. It might be worth a commented item here to suggest being able to set this as default.
```
ko apply -f config/provisioners/kafka/kafka-provisioner.yaml
```
> Note: If you are using Strimzi, you need to update the `KAFKA_BOOTSTRAP_SERVERS` value in the `kafka-channel-controller-config` ConfigMap to `my-cluster-kafka-bootstrap.kafka:9092`.
For non-Strimzi Kafka, are there other parameters (servers and/or credentials) that need to be configured?
Just the bootstrap server, in both cases: Strimzi and non-Strimzi.
* ClusterChannelProvisioner Controller
* Channel Controller Config Map

The ClusterChannelProvisioner Controller and the Channel Controller are colocated in one Pod.
+1, I think this is a good colocation.
```go
}

err := kafkaClusterAdmin.CreateTopic(topicName, &sarama.TopicDetail{
	ReplicationFactor: 1,
```
Should this be another argument?
It could be, but it wasn't an argument in the previous ClusterBus implementation, hence I didn't make it one here. Maybe something that can be done later?
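If the replication factor were lifted into an argument later, it could look like the sketch below. The `topicDetail` struct is a self-contained stand-in mirroring the two `sarama.TopicDetail` fields used in the diff; the real code would still pass `&sarama.TopicDetail{...}` to `kafkaClusterAdmin.CreateTopic`:

```go
package main

import "fmt"

// topicDetail mirrors the sarama.TopicDetail fields used above; it is a
// stand-in so this sketch compiles without the sarama dependency.
type topicDetail struct {
	NumPartitions     int32
	ReplicationFactor int16
}

// newTopicDetail makes the replication factor a caller-supplied argument,
// defaulting to 1 to preserve the current (ClusterBus-era) behavior.
func newTopicDetail(partitions int32, replication int16) topicDetail {
	if replication <= 0 {
		replication = 1
	}
	return topicDetail{NumPartitions: partitions, ReplicationFactor: replication}
}

func main() {
	fmt.Println(newTopicDetail(1, 0)) // {1 1}
	fmt.Println(newTopicDetail(4, 3)) // {4 3}
}
```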
```go
// If we didn't change anything then don't call updateStatus.
// This is important because the copy we loaded from the informer's
// cache may be stale and we don't want to overwrite a prior update
// to status with this stale state.
```
It looks like the when-to-update logic here is different than for Channel. Am I mis-reading?
You are right. Actually, I added a util method to update provisioner status in #560 and missed using it here. I fixed it.
Actually, my bad: I do use the util method, but in #573. I also went ahead and changed it here.
So there's good news and bad news. 👍 The good news is that everyone who needs to sign a CLA (the pull request submitter and all commit authors) has done so. Everything is all good there. 😕 The bad news is that it appears that one or more commits were authored or co-authored by someone other than the pull request submitter. We need to confirm that all authors are ok with their commits being contributed to this project. Please have them confirm that here in the pull request. Note to project maintainer: This is a terminal state, meaning the

The following is the coverage report on pkg/.

Updating docs, based on feedback

The following is the coverage report on pkg/.

@evankanderson the CLA bot needs, again, some help.

/lgtm

Please jump through the hoops and both add a comment stating that you are ok with these changes, to appease the CLA manual approval process.

I am 💯 OK w/ the changes! /lgtm

I am ok with the change too

A Googler has manually verified that the CLAs look good. (Googler, please make sure the reason for overriding the CLA status is clearly documented in these comments.)

ok, I'm leaving this for Evan to approve; flipped the CLA bit.

we have two PRs depending on this, so we can - and will - address issues one way or the other 😹
```go
)

type channelArgs struct {
	NumPartitions int32 `json:"NumPartitions,omitempty"`
```
(FYI) You don't need to supply a name parameter here; the name defaults to the public field name.
```go
if errors.IsNotFound(err) {
	r.logger.Info("could not find channel", zap.Any("request", request))
	return reconcile.Result{}, nil
```
I'm not sure why the dependent resources (e.g. topic in Kafka) would have been deleted when the apiserver resource was deleted. Wouldn't that be the job of this controller? (In particular, the code on line 122.)
(You can fix this in #573 if it is a bug)
```go
err = r.reconcile(newChannel)
} else {
	newChannel.Status.MarkNotProvisioned("NotProvisioned", "ClusterChannelProvisioner %s is not ready", clusterChannelProvisioner.Name)
	err = fmt.Errorf("ClusterChannelProvisioner %s is not ready", clusterChannelProvisioner.Name)
```
Ah, I'd missed that err below was scoped to the if block. Thanks for changing the name.
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: evankanderson, neosab
The full list of commands accepted by this bot can be found here. The pull request process is described here.

Details: Needs approval from an approver in each of these files.
Approvers can indicate their approval by writing
Related to #442
PR adds basic code for Kafka Channel Provisioner.
Note: This is based on my interpretation of the spec. Please correct me if I am wrong.
Proposed Changes
* Adds a controller for `ClusterProvisioner`
* Adds a controller for `Channel`

Limitations
* How to support multiple `ClusterProvisioner`s that target different kafka clusters is unclear. For now the controller updates status for a configured `ClusterProvisioner` that targets a single kafka cluster. I am open to suggestions.
* The controller handles `ClusterProvisioner` but can be improved to be generic and handle `Provisioner` in the future.

Release Note