Subscription Controller basics.#437
Conversation
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: vaikas-google The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
| // target. | ||
| // +optional | ||
| Result *ResultStrategy `json:"result,omitempty"` | ||
| Sinkable string `json:"sinkable"` |
There was a problem hiding this comment.
I thought the dispatcher acts upon the channel's resolved subscriptions to determine the target DNS [1] to send an event and optionally handle replies depending on whether the target was a Callable. With this change to a single Sinkable, I am failing to understand how the reply logic will be handled by the dispatcher?
There was a problem hiding this comment.
Yes, that's exactly right. But this is really for connecting Subscriptions together, well, really Sources to Channels. Say I have the following:
Source -> Call -> Result
Result is really a channel and if another subscription wants to connect to that channel, the model will look more like this:
Source -> Call -> Result (Now becomes a Source2) -> Call -> Result2
But, we need to be able to support multiple subscriptions to the channel, so it really looks more like this:
Source -> Call -> Result -> SourceChannel -> Call -> Result2
-> SourceChannel2 -> Call -> Result3
In this model, the "Result" Channel will only get SInkable, since they represent "input (or Source)" channels fronting Subscriptions and will allow for Fanout. We do not want the Dispatcher to be handling failures in fanouts.
Does that make more sense?
There was a problem hiding this comment.
I've added both entries back there, but in the form of a fully resolved string. I still don't think that Channel should be in the business of resolving refs, etc. but I understand why we probably need both of them here from control plane side of things. I've added that to the pkg/apis repo.
|
|
||
| // ChannelSubscriberSpec defines a single subscriber to a Channel. At least one | ||
| // of Call or Result must be present. | ||
| // ChannelSubscriberSpec defines a single subscriber to a Channel. |
There was a problem hiding this comment.
You should add:
var _ = duck.VerifyType(&Channel{}, &duckv1alpha1.Channelable{})For all implemented interfaces.
a194864 to
48774f4
Compare
|
@neosab I'd like to get enough in so that we can then keep working in parallel in smaller chunks. |
@vaikas-google 👍 Sounds good. |
| return apis.ErrMissingField("call", "result").ViaField(fmt.Sprintf("subscriber[%d]", i)) | ||
| if cs.Channelable != nil { | ||
| for i, subscriber := range cs.Channelable.Subscribers { | ||
| if subscriber.SinkableDomain == "" { |
There was a problem hiding this comment.
We should error if both CallableDomain and SinkableDomain are empty
| for i, subscriber := range cs.Channelable.Subscribers { | ||
| if subscriber.SinkableDomain == "" && subscriber.CallableDomain == "" { | ||
| fe := apis.ErrMissingField("sinkableDomain", "callableDomain") | ||
| fe.Details = "expected at least one of, got none" |
There was a problem hiding this comment.
I couldn't find. I thought I found it but the name was a bit misleading (IMHO). I created one but didn't want to block this on it and instead would prefer to clean it up later.
There was a problem hiding this comment.
What about this:
apis.ErrMissingField("sinkableDomain", "callableDomain").ViaField(fmt.Sprintf("subscribers[%d]", i))There was a problem hiding this comment.
There is no helper for adding details to a Err* type method, there is a helper for setting the index.
it should be:
errs = errs.Also(apis.ErrMissingField("sinkableDomain", "callableDomain").ViaFieldIndex("subscribers", i))
grantr
left a comment
There was a problem hiding this comment.
/lgtm
Couple minor issues, but I'm 👍 to get this in to unblock experiments. Thanks @vaikas-google! The dynamic client stuff seems elegant to me.
| for i, subscriber := range cs.Channelable.Subscribers { | ||
| if subscriber.SinkableDomain == "" && subscriber.CallableDomain == "" { | ||
| fe := apis.ErrMissingField("sinkableDomain", "callableDomain") | ||
| fe.Details = "expected at least one of, got none" |
There was a problem hiding this comment.
What about this:
apis.ErrMissingField("sinkableDomain", "callableDomain").ViaField(fmt.Sprintf("subscribers[%d]", i))|
|
||
| // Subscription might be Subscribable. This depends if there's a Result channel | ||
| // In that case, this points to that resource. | ||
| Subscribable duckv1alpha1.Subscribable `json:"subscribable,omitempty"` |
There was a problem hiding this comment.
IIUC, if a Subscription has a Result Channel, then chained Subscriptions are allowed. It would be possible to create another Subscription that looks like this:
from:
ref:
kind: Subscription
name: some-sub
call:
// ...
result:
// ...Is this the intent?
There was a problem hiding this comment.
YES! :) That's exactly right.
There was a problem hiding this comment.
YES! That's exactly right, and only if a Subscription has a Result. Just need to fill the status up for that and Result.Status would then point to the resolved ResultChannel
| } | ||
|
|
||
| const ( | ||
| // SubscriptionConditionReady has status True when all subcondtions below have been set to True. |
There was a problem hiding this comment.
nonblocking typo nit: subconditions
|
|
||
| // SubscriptionConditionFromReady has status True when controller has successfully added a subscription to From | ||
| // resource. | ||
| SubscriptionConditionFromReady duckv1alpha1.ConditionType = "Subscribed" |
There was a problem hiding this comment.
Why is the name of this FromReady internally but Subscribed externally?
There was a problem hiding this comment.
haha, when I was writing code it seemed like a better name. Forgot to change here, will do...
| // converge the two. It then updates the Status block of the Subscription resource | ||
| // with the current status of the resource. | ||
| func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) { | ||
| glog.Infof("Reconciling subscription %v", request) |
There was a problem hiding this comment.
It can be a future PR, but this should probably use zap.
There was a problem hiding this comment.
ack. Let's do in a followup so others can start experimenting / partying on this code as well :)
| return nil | ||
| } | ||
|
|
||
| func (r *reconciler) CreateResourceInterface(namespace string, ref *corev1.ObjectReference) (dynamic.ResourceInterface, error) { |
There was a problem hiding this comment.
Not sure if this is useful, or even how it works, but controller-runtime just gained the ability to work with unstructured objects: kubernetes-sigs/controller-runtime#101
There was a problem hiding this comment.
Awesome! Let's investigate on a followup pr, ok?
| // Add our subscription to the object | ||
| after := c.DeepCopyObject().(*duckv1alpha1.Channel) | ||
| after.Spec.Channelable = &duckv1alpha1.Channelable{ | ||
| Subscribers: []duckv1alpha1.ChannelSubscriberSpec{{CallableDomain: "foobar"}}, |
There was a problem hiding this comment.
I think this should probably use the given callableDomain and resultDomain
There was a problem hiding this comment.
haha, yes :) Didn't get a push out before I had to go get to the plane and I lost wifi.
| return resourceClient.Get(ref.Name, metav1.GetOptions{}) | ||
| } | ||
|
|
||
| func (r *reconciler) reconcileFromChannel(namespace string, subscribable corev1.ObjectReference, callDomain string, resultDomain string, deleted bool) error { |
There was a problem hiding this comment.
How is deleted used in this method? Will that be a future enhancement?
| return jsonpatch.CreatePatch(rawBefore, rawAfter) | ||
| } | ||
|
|
||
| const ( |
There was a problem hiding this comment.
I think all of these constants could be deleted - they don't seem to be used anywhere.
| duckv1alpha1.AddToScheme(scheme.Scheme) | ||
| } | ||
|
|
||
| var injectDomainInternalMocks = controllertesting.Mocks{ |
There was a problem hiding this comment.
Thanks @adamharwayne for writing the mock client for this!
|
Thanks, sorry for the duplicate comments, sometimes I make a comment, then they seem to disappear just to come back. Hopefully the dupes don't contradict each other at least. |
|
Ok, pushed one more version and added more unit tests and set some rudimentary conditions. If it's not horribly wrong, please merge it and we'll continue after the merge. I'm turning into a pumpkin until Wednesday, but would be nice to see it land before then. |
|
The following is the coverage report on pkg/.
|
| Subscribable duckv1alpha1.Subscribable `json:"subscribable,omitempty"` | ||
| } | ||
|
|
||
| const ( |
There was a problem hiding this comment.
You need to update line 173 with:
var subCondSet = duckv1alpha1.NewLivingConditionSet(SubscriptionConditionReferencesResolved, SubscriptionConditionFromReady)
| ) | ||
|
|
||
| // GetSpecJSON returns spec as json | ||
| func (s *Subscription) GetSpecJSON() ([]byte, error) { |
There was a problem hiding this comment.
you can delete GetSpecJSON now that the duck work has gone in.
|
|
||
| func isValidResultStrategy(r *ResultStrategy) *apis.FieldError { | ||
| return isValidObjectReference(*r.Target).ViaField("target") | ||
| func isValidResultStrategy(r ResultStrategy) *apis.FieldError { |
There was a problem hiding this comment.
FYI, I am taking some of these and moving it to a common place to reuse the logic:
https://github.com/knative/eventing/pull/457/files#diff-96c67291a6d32860d990069d9d9d5cd3
There was a problem hiding this comment.
@n3wscott if this PR goes in first, can you fix up the duplication?
|
|
||
| // SubscriptionConditionFromReady has status True when controller has successfully added a subscription to From | ||
| // resource. | ||
| SubscriptionConditionFromReady duckv1alpha1.ConditionType = "FromReady" |
There was a problem hiding this comment.
After you add these, it is helpful to expose an IsReady method using the Conditions helper to be the judge.
https://github.com/knative/eventing/pull/457/files#diff-67a9c730f0d592911dfa317323fea9e5R131
Also a InitializeConditions
https://github.com/knative/eventing/pull/457/files#diff-67a9c730f0d592911dfa317323fea9e5R137
And then Mark* style methods as you need them.
|
|
||
| func (r *reconciler) reconcile(subscription *v1alpha1.Subscription) error { | ||
| // TODO: Should this just also set up a defer call for subscription.SetConditions. | ||
| // No time right now as I'm turning into a pumpking but seems reasonable. |
There was a problem hiding this comment.
I'll switch these to using Mark* methods in a followup PR.
| } | ||
| // Everything went well, set the fact that subscriptions have been modified | ||
| conditions = append(conditions, duckv1alpha1.Condition{ | ||
| Type: duckv1alpha1.ConditionReady, |
There was a problem hiding this comment.
Was this supposed to be setting ConditionFromReady?
…pagation removing patch, b/c merged upstream
…t. This allows us to mount in custom ca bundles via ConfigMap/VolumeSource. (knative#437) OCP documentation is here: https://docs.openshift.com/container-platform/4.14/networking/configuring-a-custom-pki.html#certificate-injection-using-operators_configuring-a-custom-pki Signed-off-by: Matthias Wessendorf <mwessend@redhat.com>
Fixes #438
Proposed Changes
Add the barebones for getting a Subscription Controller wiring things into source channels, and call / sinkables using the duck types.
Release Note