-
Notifications
You must be signed in to change notification settings - Fork 630
Moved the internal subscriptions delivery configuration to a config map #4832
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
5f81887
d0faeb3
100a8e4
ff3a2b2
c08416b
e330be3
933c048
9d6583a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| # Copyright 2021 The Knative Authors | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # https://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| apiVersion: v1 | ||
| kind: ConfigMap | ||
| metadata: | ||
| name: mt-broker-internal-delivery | ||
| namespace: knative-eventing | ||
| labels: | ||
| eventing.knative.dev/release: devel | ||
| data: | ||
| _example: | | ||
| # Configure the internal channels subscription delivery spec | ||
| delivery: | | ||
| retry: 10 | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| /* | ||
| Copyright 2021 The Knative Authors | ||
|
|
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package mtbroker | ||
|
|
||
| import ( | ||
| corev1 "k8s.io/api/core/v1" | ||
| "knative.dev/pkg/configmap" | ||
| "sigs.k8s.io/yaml" | ||
|
|
||
| eventingduck "knative.dev/eventing/pkg/apis/duck/v1" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Don't we use |
||
| ) | ||
|
|
||
| const ( | ||
| InternalDeliveryConfigMapName = "mt-broker-internal-delivery" | ||
| internalDeliveryConfigMapDeliveryKey = "delivery" | ||
|
Comment on lines
+28
to
+29
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any reason for making the name public but the key private? Can't they both be private? |
||
| ) | ||
|
|
||
| // NewInternalDeliveryConfigFromConfigMap parses the config map into a DeliverySpec | ||
| func NewInternalDeliveryConfigFromConfigMap(cm *corev1.ConfigMap) (*eventingduck.DeliverySpec, error) { | ||
| if cm == nil || len(cm.Data) == 0 { | ||
| return nil, nil | ||
| } | ||
| d, ok := cm.Data[internalDeliveryConfigMapDeliveryKey] | ||
| if !ok { | ||
| return nil, nil | ||
| } | ||
|
|
||
| var delivery eventingduck.DeliverySpec | ||
| err := yaml.Unmarshal([]byte(d), &delivery) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return &delivery, nil | ||
| } | ||
|
|
||
| // InternalDeliveryConfigStore is a typed wrapper around configmap.Untyped store to handle our configmaps. | ||
| // +k8s:deepcopy-gen=false | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is that annotation needed? As far as I can see we don't run deepcopy-gen on that package. |
||
| type InternalDeliveryConfigStore struct { | ||
| *configmap.UntypedStore | ||
| } | ||
|
|
||
| // NewInternalDeliveryConfigStore creates a new store of Configs and optionally calls functions when ConfigMaps are updated. | ||
| func NewInternalDeliveryConfigStore(logger configmap.Logger, onAfterStore ...func(name string, value interface{})) *InternalDeliveryConfigStore { | ||
| store := &InternalDeliveryConfigStore{ | ||
| UntypedStore: configmap.NewUntypedStore( | ||
| "channeldefaults", | ||
| logger, | ||
| configmap.Constructors{ | ||
| InternalDeliveryConfigMapName: NewInternalDeliveryConfigFromConfigMap, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see that |
||
| }, | ||
| onAfterStore..., | ||
| ), | ||
| } | ||
|
|
||
| return store | ||
| } | ||
|
|
||
| // Load creates a InternalDeliveryConfigStore from the current config state of the InternalDeliveryConfigStore. | ||
| func (s *InternalDeliveryConfigStore) Load() *eventingduck.DeliverySpec { | ||
| return s.UntypedLoad(InternalDeliveryConfigMapName).(*eventingduck.DeliverySpec) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| /* | ||
| Copyright 2021 The Knative Authors | ||
|
|
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package mtbroker | ||
|
|
||
| import ( | ||
| "testing" | ||
|
|
||
| "github.com/stretchr/testify/require" | ||
| corev1 "k8s.io/api/core/v1" | ||
| "k8s.io/utils/pointer" | ||
|
|
||
| eventingduck "knative.dev/eventing/pkg/apis/duck/v1" | ||
|
|
||
| . "knative.dev/pkg/configmap/testing" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wherever possible it would be nice to avoid |
||
| ) | ||
|
|
||
| func TestMtBrokerInternalConfig(t *testing.T) { | ||
| _, example := ConfigMapsFromTestFile(t, "config-mtbroker-delivery") | ||
|
|
||
| for _, tt := range []struct { | ||
| name string | ||
| fail bool | ||
| want *eventingduck.DeliverySpec | ||
| data *corev1.ConfigMap | ||
| }{{ | ||
| name: "Nil config", | ||
| fail: false, | ||
| want: nil, | ||
| data: nil, | ||
| }, { | ||
| name: "Empty config", | ||
| fail: false, | ||
| want: nil, | ||
| data: &corev1.ConfigMap{}, | ||
| }, { | ||
| name: "With values", | ||
| fail: false, | ||
| want: &eventingduck.DeliverySpec{Retry: pointer.Int32Ptr(10)}, | ||
| data: example, | ||
| }} { | ||
| t.Run(tt.name, func(t *testing.T) { | ||
| testConfig, err := NewInternalDeliveryConfigFromConfigMap(tt.data) | ||
| if tt.fail != (err != nil) { | ||
| t.Fatal("Unexpected error value:", err) | ||
| } | ||
|
|
||
| require.Equal(t, tt.want, testConfig) | ||
| }) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| # Copyright 2021 The Knative Authors | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # https://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| apiVersion: v1 | ||
| kind: ConfigMap | ||
| metadata: | ||
| name: mt-broker-internal-delivery | ||
| namespace: default | ||
|
|
||
| data: | ||
| _example: | | ||
| delivery: | | ||
| retry: 10 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,21 +19,33 @@ package mttrigger | |
| import ( | ||
| "testing" | ||
|
|
||
| corev1 "k8s.io/api/core/v1" | ||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
| "knative.dev/pkg/configmap" | ||
| . "knative.dev/pkg/reconciler/testing" | ||
| "knative.dev/pkg/system" | ||
|
|
||
| // Fake injection informers | ||
| _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake" | ||
| _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake" | ||
| _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake" | ||
| "knative.dev/eventing/pkg/reconciler/mtbroker" | ||
|
Comment on lines
28
to
+32
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This import should be relocated, it is not a "Fake injection informer" so future readers will be confused. |
||
|
|
||
| _ "knative.dev/pkg/client/injection/ducks/duck/v1/source/fake" | ||
| _ "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake" | ||
| ) | ||
|
|
||
| func TestNew(t *testing.T) { | ||
| ctx, _ := SetupFakeContext(t) | ||
|
|
||
| c := NewController(ctx, configmap.NewStaticWatcher()) | ||
| cmw := configmap.NewStaticWatcher(&corev1.ConfigMap{ | ||
| ObjectMeta: metav1.ObjectMeta{ | ||
| Name: mtbroker.InternalDeliveryConfigMapName, | ||
| Namespace: system.Namespace(), | ||
| }, | ||
|
Comment on lines
+41
to
+45
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good candidate for |
||
| }) | ||
|
|
||
| c := NewController(ctx, cmw) | ||
|
|
||
| if c == nil { | ||
| t.Fatal("Expected NewController to return a non-nil value") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,23 +29,25 @@ import ( | |
| "k8s.io/client-go/dynamic" | ||
| corev1listers "k8s.io/client-go/listers/core/v1" | ||
|
|
||
| "knative.dev/pkg/apis" | ||
| duckv1 "knative.dev/pkg/apis/duck/v1" | ||
| "knative.dev/pkg/controller" | ||
| "knative.dev/pkg/logging" | ||
| "knative.dev/pkg/network" | ||
| pkgreconciler "knative.dev/pkg/reconciler" | ||
| "knative.dev/pkg/resolver" | ||
| "knative.dev/pkg/system" | ||
|
|
||
| "knative.dev/eventing/pkg/apis/eventing" | ||
| eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" | ||
| messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" | ||
| clientset "knative.dev/eventing/pkg/client/clientset/versioned" | ||
| eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1" | ||
| messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1" | ||
| "knative.dev/eventing/pkg/duck" | ||
| "knative.dev/eventing/pkg/reconciler/mtbroker" | ||
| "knative.dev/eventing/pkg/reconciler/mtbroker/resources" | ||
| "knative.dev/eventing/pkg/reconciler/sugar/trigger/path" | ||
| "knative.dev/pkg/apis" | ||
| duckv1 "knative.dev/pkg/apis/duck/v1" | ||
| "knative.dev/pkg/controller" | ||
| "knative.dev/pkg/logging" | ||
| "knative.dev/pkg/network" | ||
| pkgreconciler "knative.dev/pkg/reconciler" | ||
| "knative.dev/pkg/resolver" | ||
| "knative.dev/pkg/system" | ||
| ) | ||
|
|
||
| var brokerGVK = eventingv1.SchemeGroupVersion.WithKind("Broker") | ||
|
|
@@ -67,6 +69,9 @@ type Reconciler struct { | |
| triggerLister eventinglisters.TriggerLister | ||
| configmapLister corev1listers.ConfigMapLister | ||
|
|
||
| // Internal Delivery store | ||
| internalDeliveryConfigStore *mtbroker.InternalDeliveryConfigStore | ||
|
|
||
| // Dynamic tracker to track Sources. In particular, it tracks the dependency between Triggers and Sources. | ||
| sourceTracker duck.ListableTracker | ||
|
|
||
|
|
@@ -163,7 +168,7 @@ func (r *Reconciler) subscribeToBrokerChannel(ctx context.Context, b *eventingv1 | |
| Name: b.Name, | ||
| Namespace: b.Namespace, | ||
| } | ||
| expected := resources.NewSubscription(t, brokerTrigger, brokerObjRef, uri, b.Spec.Delivery) | ||
| expected := resources.NewSubscription(t, brokerTrigger, brokerObjRef, uri, r.internalDeliveryConfigStore.Load()) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm little confused why we need to add the ConfigMap here. As discussed in the spec PR, I thought the logic here would simply be (something like this): if (t.Spec.Delivery != nil
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This subscription is an internal subscription between broker components, the actual dispatch to the service is done by the filter handler. Now, with #4654 we're saying I've added a config map for that to allow the user to continue to modify this internal delivery spec, because in another PR i'll actually implement in mtbroker the delivery spec as stated by #4654
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Look at the description of this issue: #4515 |
||
|
|
||
| sub, err := r.subscriptionLister.Subscriptions(t.Namespace).Get(expected.Name) | ||
| // If the resource doesn't exist, we'll create it. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, I think we should document or warn users what this means.
If I set this retry here
retry: 10and my trigger hasretry: 10we're actually retrying at most10 * 10 = 100times since each time the channel implementation retries due to the delivery in this ConfigMap, the filter deployment (once implemented) will retry 10 times.Is my understanding correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exactly... Maybe we should use this "internal" configuration just for the ingress channel and then for the other channel we use the
BrokerSpec.Delivery/TriggerSpec.Delivery?Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem with that approach is that retries (in
trigger.spec.delivery.retryorbroker.spec.delivery.retry) are ephemeral since the filter deployment might crash while it was retrying and in that case, we're not reaching the specified retries intrigger.spec.delivery.retryorbroker.spec.delivery.retrybefore giving up.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see your point... I guess it makes sense, so maybe we need to just use this internal map for the first channel (the ingress one) and then use BrokerSpec.Delivery/TriggerSpec.Delivery for the filter channel?