From 32ba562b07dd1b524ff52f4aad2aee5d237ec1b5 Mon Sep 17 00:00:00 2001 From: nachocano Date: Wed, 5 Jun 2019 14:41:02 -0700 Subject: [PATCH 01/10] initial push for NATSS unit tests --- contrib/natss/cmd/channel_controller/main.go | 15 +- .../pkg/reconciler/controller/natsschannel.go | 8 +- contrib/natss/pkg/reconciler/reconciler.go | 168 ++++++++++++++++++ .../natss/pkg/reconciler/stats_reporter.go | 164 +++++++++++++++++ .../natss/pkg/reconciler/testing/factory.go | 96 ++++++++++ .../natss/pkg/reconciler/testing/listers.go | 93 ++++++++++ .../pkg/reconciler/testing/natsschannel.go | 114 ++++++++++++ pkg/reconciler/testing/listers.go | 1 - 8 files changed, 640 insertions(+), 19 deletions(-) create mode 100644 contrib/natss/pkg/reconciler/reconciler.go create mode 100644 contrib/natss/pkg/reconciler/stats_reporter.go create mode 100644 contrib/natss/pkg/reconciler/testing/factory.go create mode 100644 contrib/natss/pkg/reconciler/testing/listers.go create mode 100644 contrib/natss/pkg/reconciler/testing/natsschannel.go diff --git a/contrib/natss/cmd/channel_controller/main.go b/contrib/natss/cmd/channel_controller/main.go index ac2daa369f6..5b80fdb42fd 100644 --- a/contrib/natss/cmd/channel_controller/main.go +++ b/contrib/natss/cmd/channel_controller/main.go @@ -23,12 +23,10 @@ import ( "github.com/knative/eventing/contrib/natss/pkg/stanutil" "github.com/knative/eventing/contrib/natss/pkg/util" - clientset "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned" - eventingScheme "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned/scheme" informers "github.com/knative/eventing/contrib/natss/pkg/client/informers/externalversions" + "github.com/knative/eventing/contrib/natss/pkg/reconciler" natsschannel "github.com/knative/eventing/contrib/natss/pkg/reconciler/controller" "github.com/knative/eventing/pkg/logconfig" - "github.com/knative/eventing/pkg/reconciler" "github.com/knative/pkg/configmap" kncontroller "github.com/knative/pkg/controller" "github.com/knative/pkg/logging" @@ -36,7 +34,6 @@ import ( "github.com/knative/pkg/system" "go.uber.org/zap" kubeinformers "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ) @@ -80,30 +77,24 @@ func main() { cfg.QPS = numControllers * rest.DefaultQPS cfg.Burst = numControllers * rest.DefaultBurst opt := reconciler.NewOptionsOrDie(cfg, logger, stopCh) - // Setting up our own eventingClientSet as we need the messaging API introduced with natss. - eventingClientSet := clientset.NewForConfigOrDie(cfg) kubeInformerFactory := kubeinformers.NewSharedInformerFactory(opt.KubeClientSet, opt.ResyncPeriod) - eventingInformerFactory := informers.NewSharedInformerFactory(eventingClientSet, opt.ResyncPeriod) + messagingInformerFactory := informers.NewSharedInformerFactory(opt.NatssClientSet, opt.ResyncPeriod) // Messaging - natssChannelInformer := eventingInformerFactory.Messaging().V1alpha1().NatssChannels() + natssChannelInformer := messagingInformerFactory.Messaging().V1alpha1().NatssChannels() // Kube serviceInformer := kubeInformerFactory.Core().V1().Services() endpointsInformer := kubeInformerFactory.Core().V1().Endpoints() deploymentInformer := kubeInformerFactory.Apps().V1().Deployments() - // Adding the scheme. - eventingScheme.AddToScheme(scheme.Scheme) - // Build all of our controllers, with the clients constructed above. // Add new controllers to this array. // You also need to modify numControllers above to match this. controllers := [...]*kncontroller.Impl{ natsschannel.NewController( opt, - eventingClientSet, systemNS, dispatcherDeploymentName, dispatcherServiceName, diff --git a/contrib/natss/pkg/reconciler/controller/natsschannel.go b/contrib/natss/pkg/reconciler/controller/natsschannel.go index 46704840497..b58a9d4d717 100644 --- a/contrib/natss/pkg/reconciler/controller/natsschannel.go +++ b/contrib/natss/pkg/reconciler/controller/natsschannel.go @@ -26,12 +26,11 @@ import ( "github.com/knative/pkg/apis" "github.com/knative/eventing/contrib/natss/pkg/apis/messaging/v1alpha1" - clientset "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned" messaginginformers "github.com/knative/eventing/contrib/natss/pkg/client/informers/externalversions/messaging/v1alpha1" listers "github.com/knative/eventing/contrib/natss/pkg/client/listers/messaging/v1alpha1" + "github.com/knative/eventing/contrib/natss/pkg/reconciler" "github.com/knative/eventing/contrib/natss/pkg/reconciler/controller/resources" "github.com/knative/eventing/pkg/logging" - "github.com/knative/eventing/pkg/reconciler" "github.com/knative/pkg/controller" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" @@ -69,7 +68,6 @@ type Reconciler struct { dispatcherDeploymentName string dispatcherServiceName string - eventingClientSet clientset.Interface natsschannelLister listers.NatssChannelLister natsschannelInformer cache.SharedIndexInformer deploymentLister appsv1listers.DeploymentLister @@ -93,7 +91,6 @@ var _ cache.ResourceEventHandler = (*Reconciler)(nil) // Registers event handlers to enqueue events. func NewController( opt reconciler.Options, - eventingClientSet clientset.Interface, dispatcherNamespace string, dispatcherDeploymentName string, dispatcherServiceName string, @@ -108,7 +105,6 @@ func NewController( dispatcherNamespace: dispatcherNamespace, dispatcherDeploymentName: dispatcherDeploymentName, dispatcherServiceName: dispatcherServiceName, - eventingClientSet: eventingClientSet, natsschannelLister: natsschannelInformer.Lister(), natsschannelInformer: natsschannelInformer.Informer(), deploymentLister: deploymentInformer.Lister(), @@ -333,7 +329,7 @@ func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.NatssCh existing := kc.DeepCopy() existing.Status = desired.Status - new, err := r.eventingClientSet.MessagingV1alpha1().NatssChannels(desired.Namespace).UpdateStatus(existing) + new, err := r.NatssClientSet.MessagingV1alpha1().NatssChannels(desired.Namespace).UpdateStatus(existing) if err == nil && becomesReady { duration := time.Since(new.ObjectMeta.CreationTimestamp.Time) r.Logger.Infof("NatssChannel %q became ready after %v", kc.Name, duration) diff --git a/contrib/natss/pkg/reconciler/reconciler.go b/contrib/natss/pkg/reconciler/reconciler.go new file mode 100644 index 00000000000..a1986fd66a4 --- /dev/null +++ b/contrib/natss/pkg/reconciler/reconciler.go @@ -0,0 +1,168 @@ +/* +Copyright 2019 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reconciler + +import ( + "time" + + clientset "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned" + natssScheme "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned/scheme" + "github.com/knative/pkg/configmap" + "github.com/knative/pkg/logging/logkey" + "github.com/knative/pkg/system" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" +) + +// Options defines the common reconciler options. +// We define this to reduce the boilerplate argument list when +// creating our controllers. +type Options struct { + KubeClientSet kubernetes.Interface + DynamicClientSet dynamic.Interface + + NatssClientSet clientset.Interface + + Recorder record.EventRecorder + StatsReporter StatsReporter + + ConfigMapWatcher configmap.Watcher + Logger *zap.SugaredLogger + + ResyncPeriod time.Duration + StopChannel <-chan struct{} +} + +// This is mutable for testing. +var resetPeriod = 30 * time.Second + +func NewOptionsOrDie(cfg *rest.Config, logger *zap.SugaredLogger, stopCh <-chan struct{}) Options { + kubeClient := kubernetes.NewForConfigOrDie(cfg) + dynamicClient := dynamic.NewForConfigOrDie(cfg) + + natssClient := clientset.NewForConfigOrDie(cfg) + + configMapWatcher := configmap.NewInformedWatcher(kubeClient, system.Namespace()) + + return Options{ + KubeClientSet: kubeClient, + DynamicClientSet: dynamicClient, + ConfigMapWatcher: configMapWatcher, + NatssClientSet: natssClient, + Logger: logger, + ResyncPeriod: 10 * time.Hour, // Based on controller-runtime default. + StopChannel: stopCh, + } +} + +// GetTrackerLease returns a multiple of the resync period to use as the +// duration for tracker leases. This attempts to ensure that resyncs happen to +// refresh leases frequently enough that we don't miss updates to tracked +// objects. +func (o Options) GetTrackerLease() time.Duration { + return o.ResyncPeriod * 3 +} + +// Base implements the core controller logic, given a Reconciler. +type Base struct { + // KubeClientSet allows us to talk to the k8s for core APIs + KubeClientSet kubernetes.Interface + + // DynamicClientSet allows us to configure pluggable Build objects + DynamicClientSet dynamic.Interface + + NatssClientSet clientset.Interface + + // ConfigMapWatcher allows us to watch for ConfigMap changes. + ConfigMapWatcher configmap.Watcher + + // Recorder is an event recorder for recording Event resources to the + // Kubernetes API. + Recorder record.EventRecorder + + // StatsReporter reports reconciler's metrics. + StatsReporter StatsReporter + + // Sugared logger is easier to use but is not as performant as the + // raw logger. In performance critical paths, call logger.Desugar() + // and use the returned raw logger instead. In addition to the + // performance benefits, raw logger also preserves type-safety at + // the expense of slightly greater verbosity. + Logger *zap.SugaredLogger +} + +// NewBase instantiates a new instance of Base implementing +// the common & boilerplate code between our reconcilers. +func NewBase(opt Options, controllerAgentName string) *Base { + // Enrich the logs with controller name + logger := opt.Logger.Named(controllerAgentName).With(zap.String(logkey.ControllerType, controllerAgentName)) + + recorder := opt.Recorder + if recorder == nil { + // Create event broadcaster + logger.Debug("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + watches := []watch.Interface{ + eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof), + eventBroadcaster.StartRecordingToSink( + &typedcorev1.EventSinkImpl{Interface: opt.KubeClientSet.CoreV1().Events("")}), + } + recorder = eventBroadcaster.NewRecorder( + scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + go func() { + <-opt.StopChannel + for _, w := range watches { + w.Stop() + } + }() + } + + statsReporter := opt.StatsReporter + if statsReporter == nil { + logger.Debug("Creating stats reporter") + var err error + statsReporter, err = NewStatsReporter(controllerAgentName) + if err != nil { + logger.Fatal(err) + } + } + + base := &Base{ + KubeClientSet: opt.KubeClientSet, + DynamicClientSet: opt.DynamicClientSet, + NatssClientSet: opt.NatssClientSet, + ConfigMapWatcher: opt.ConfigMapWatcher, + Recorder: recorder, + StatsReporter: statsReporter, + Logger: logger, + } + + return base +} + +func init() { + // Add run types to the default Kubernetes Scheme so Events can be + // logged for run types. + _ = natssScheme.AddToScheme(scheme.Scheme) +} diff --git a/contrib/natss/pkg/reconciler/stats_reporter.go b/contrib/natss/pkg/reconciler/stats_reporter.go new file mode 100644 index 00000000000..fdce855f7b8 --- /dev/null +++ b/contrib/natss/pkg/reconciler/stats_reporter.go @@ -0,0 +1,164 @@ +/* +Copyright 2019 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reconciler + +import ( + "context" + "fmt" + "time" + + "github.com/knative/pkg/metrics" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" +) + +type Measurement int + +const ( + // NatssChannelReadyCountN is the number of natss channels that have become ready. + NatssChannelReadyCountN = "natsschannel_ready_count" + // NatssChannelReadyLatencyN is the time it takes for a natss channel to become ready since the resource is created. + NatssChannelReadyLatencyN = "natsschannel_ready_latency" +) + +var ( + KindToStatKeys = map[string]StatKey{ + "NatssChannel": { + ReadyCountKey: NatssChannelReadyCountN, + ReadyLatencyKey: NatssChannelReadyLatencyN, + }, + } + + KindToMeasurements map[string]Measurements + + reconcilerTagKey tag.Key + keyTagKey tag.Key +) + +type Measurements struct { + ReadyLatencyStat *stats.Int64Measure + ReadyCountStat *stats.Int64Measure +} + +type StatKey struct { + ReadyLatencyKey string + ReadyCountKey string +} + +func init() { + var err error + // Create the tag keys that will be used to add tags to our measurements. + // Tag keys must conform to the restrictions described in + // go.opencensus.io/tag/validate.go. Currently those restrictions are: + // - length between 1 and 255 inclusive + // - characters are printable US-ASCII + reconcilerTagKey = mustNewTagKey("reconciler") + keyTagKey = mustNewTagKey("key") + + KindToMeasurements = make(map[string]Measurements, len(KindToStatKeys)) + + for kind, keys := range KindToStatKeys { + + readyLatencyStat := stats.Int64( + keys.ReadyLatencyKey, + fmt.Sprintf("Time it takes for a %s to become ready since created", kind), + stats.UnitMilliseconds) + + readyCountStat := stats.Int64( + keys.ReadyCountKey, + fmt.Sprintf("Number of %s that became ready", kind), + stats.UnitDimensionless) + + // Save the measurements for later marks. + KindToMeasurements[kind] = Measurements{ + ReadyCountStat: readyCountStat, + ReadyLatencyStat: readyLatencyStat, + } + + // Create views to see our measurements. This can return an error if + // a previously-registered view has the same name with a different value. + // View name defaults to the measure name if unspecified. + err = view.Register( + &view.View{ + Description: readyLatencyStat.Description(), + Measure: readyLatencyStat, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{reconcilerTagKey, keyTagKey}, + }, + &view.View{ + Description: readyCountStat.Description(), + Measure: readyCountStat, + Aggregation: view.Count(), + TagKeys: []tag.Key{reconcilerTagKey, keyTagKey}, + }, + ) + if err != nil { + panic(err) + } + } +} + +// StatsReporter reports reconcilers' metrics. +type StatsReporter interface { + // ReportReady reports the time it took a resource to become Ready. + ReportReady(kind, namespace, service string, d time.Duration) error +} + +type reporter struct { + ctx context.Context +} + +// NewStatsReporter creates a reporter for reconcilers' metrics +func NewStatsReporter(reconciler string) (StatsReporter, error) { + ctx, err := tag.New( + context.Background(), + tag.Insert(reconcilerTagKey, reconciler)) + if err != nil { + return nil, err + } + return &reporter{ctx: ctx}, nil +} + +// ReportServiceReady reports the time it took a service to become Ready +func (r *reporter) ReportReady(kind, namespace, service string, d time.Duration) error { + key := fmt.Sprintf("%s/%s", namespace, service) + v := int64(d / time.Millisecond) + ctx, err := tag.New( + r.ctx, + tag.Insert(keyTagKey, key)) + if err != nil { + return err + } + + m, ok := KindToMeasurements[kind] + if !ok { + return fmt.Errorf("unknown kind attempted to report ready, %q", kind) + } + + metrics.Record(ctx, m.ReadyCountStat.M(1)) + metrics.Record(ctx, m.ReadyLatencyStat.M(v)) + return nil +} + +func mustNewTagKey(s string) tag.Key { + tagKey, err := tag.NewKey(s) + if err != nil { + panic(err) + } + return tagKey +} diff --git a/contrib/natss/pkg/reconciler/testing/factory.go b/contrib/natss/pkg/reconciler/testing/factory.go new file mode 100644 index 00000000000..6bce22cfa1d --- /dev/null +++ b/contrib/natss/pkg/reconciler/testing/factory.go @@ -0,0 +1,96 @@ +/* +Copyright 2018 The Knative Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "context" + "testing" + + "k8s.io/apimachinery/pkg/runtime" + + fakedynamicclientset "k8s.io/client-go/dynamic/fake" + fakekubeclientset "k8s.io/client-go/kubernetes/fake" + clientgotesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/record" + + "github.com/GoogleCloudPlatform/cloud-run-events/pkg/reconciler" + fakeclientset "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned/fake" + "github.com/knative/pkg/controller" + logtesting "github.com/knative/pkg/logging/testing" + + . "github.com/knative/pkg/reconciler/testing" +) + +const ( + // maxEventBufferSize is the estimated max number of event notifications that + // can be buffered during reconciliation. + maxEventBufferSize = 10 +) + +// Ctor functions create a k8s controller with given params. +type Ctor func(*Listers, reconciler.Options) controller.Reconciler + +// MakeFactory creates a reconciler factory with fake clients and controller created by `ctor`. +func MakeFactory(ctor Ctor) Factory { + return func(t *testing.T, r *TableRow) (controller.Reconciler, ActionRecorderList, EventList, *FakeStatsReporter) { + ls := NewListers(r.Objects) + + kubeClient := fakekubeclientset.NewSimpleClientset(ls.GetKubeObjects()...) + client := fakeclientset.NewSimpleClientset(ls.GetEventsObjects()...) + + dynamicScheme := runtime.NewScheme() + for _, addTo := range clientSetSchemes { + addTo(dynamicScheme) + } + + dynamicClient := fakedynamicclientset.NewSimpleDynamicClient(dynamicScheme, ls.GetAllObjects()...) + eventRecorder := record.NewFakeRecorder(maxEventBufferSize) + statsReporter := &FakeStatsReporter{} + + PrependGenerateNameReactor(&client.Fake) + PrependGenerateNameReactor(&dynamicClient.Fake) + + // Set up our Controller from the fakes. + c := ctor(&ls, reconciler.Options{ + KubeClientSet: kubeClient, + DynamicClientSet: dynamicClient, + RunClientSet: client, + Recorder: eventRecorder, + //StatsReporter: statsReporter, + Logger: logtesting.TestLogger(t), + }) + + for _, reactor := range r.WithReactors { + kubeClient.PrependReactor("*", "*", reactor) + client.PrependReactor("*", "*", reactor) + dynamicClient.PrependReactor("*", "*", reactor) + } + + // Validate all Create operations through the eventing client. + client.PrependReactor("create", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) { + return ValidateCreates(context.Background(), action) + }) + client.PrependReactor("update", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) { + return ValidateUpdates(context.Background(), action) + }) + + actionRecorderList := ActionRecorderList{dynamicClient, client, kubeClient} + eventList := EventList{Recorder: eventRecorder} + + return c, actionRecorderList, eventList, statsReporter + } +} diff --git a/contrib/natss/pkg/reconciler/testing/listers.go b/contrib/natss/pkg/reconciler/testing/listers.go new file mode 100644 index 00000000000..23354fa2b6f --- /dev/null +++ b/contrib/natss/pkg/reconciler/testing/listers.go @@ -0,0 +1,93 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + messagingv1alpha1 "github.com/knative/eventing/contrib/natss/pkg/apis/messaging/v1alpha1" + fakemessagingclientset "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned/fake" + messaginglisters "github.com/knative/eventing/contrib/natss/pkg/client/listers/messaging/v1alpha1" + fakeeventsclientset "github.com/knative/eventing/pkg/client/clientset/versioned/fake" + fakesharedclientset "github.com/knative/pkg/client/clientset/versioned/fake" + "github.com/knative/pkg/reconciler/testing" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + fakekubeclientset "k8s.io/client-go/kubernetes/fake" + corev1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" +) + +var clientSetSchemes = []func(*runtime.Scheme) error{ + fakekubeclientset.AddToScheme, + fakesharedclientset.AddToScheme, + fakeeventsclientset.AddToScheme, + fakemessagingclientset.AddToScheme, +} + +type Listers struct { + sorter testing.ObjectSorter +} + +func NewListers(objs []runtime.Object) Listers { + scheme := runtime.NewScheme() + + for _, addTo := range clientSetSchemes { + addTo(scheme) + } + + ls := Listers{ + sorter: testing.NewObjectSorter(scheme), + } + + ls.sorter.AddObjects(objs...) + + return ls +} + +func (l *Listers) indexerFor(obj runtime.Object) cache.Indexer { + return l.sorter.IndexerForObjectType(obj) +} + +func (l *Listers) GetKubeObjects() []runtime.Object { + return l.sorter.ObjectsForSchemeFunc(fakekubeclientset.AddToScheme) +} + +func (l *Listers) GetEventsObjects() []runtime.Object { + return l.sorter.ObjectsForSchemeFunc(fakeeventsclientset.AddToScheme) +} + +func (l *Listers) GetMessagingObjects() []runtime.Object { + return l.sorter.ObjectsForSchemeFunc(fakemessagingclientset.AddToScheme) +} + +func (l *Listers) GetAllObjects() []runtime.Object { + all := l.GetMessagingObjects() + all = append(all, l.GetEventsObjects()...) + all = append(all, l.GetKubeObjects()...) + return all +} + +func (l *Listers) GetSharedObjects() []runtime.Object { + return l.sorter.ObjectsForSchemeFunc(fakesharedclientset.AddToScheme) +} + +func (l *Listers) GetEndpointsLister() corev1listers.EndpointsLister { + return corev1listers.NewEndpointsLister(l.indexerFor(&corev1.Endpoints{})) +} + +func (l *Listers) GetInMemoryChannelLister() messaginglisters.NatssChannelLister { + return messaginglisters.NewNatssChannelLister(l.indexerFor(&messagingv1alpha1.NatssChannel{})) +} diff --git a/contrib/natss/pkg/reconciler/testing/natsschannel.go b/contrib/natss/pkg/reconciler/testing/natsschannel.go new file mode 100644 index 00000000000..b3ff6373fc8 --- /dev/null +++ b/contrib/natss/pkg/reconciler/testing/natsschannel.go @@ -0,0 +1,114 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "context" + "time" + + "github.com/knative/eventing/contrib/natss/pkg/apis/messaging/v1alpha1" + "github.com/knative/pkg/apis" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + // "k8s.io/apimachinery/pkg/types" +) + +// NatssChannelOption enables further configuration of a NatssChannel. +type NatssChannelOption func(*v1alpha1.NatssChannel) + +// NewNatssChannel creates an NatssChannel with NatssChannelOptions. +func NewNatssChannel(name, namespace string, imcopt ...NatssChannelOption) *v1alpha1.NatssChannel { + imc := &v1alpha1.NatssChannel{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: v1alpha1.NatssChannelSpec{}, + } + for _, opt := range imcopt { + opt(imc) + } + imc.SetDefaults(context.Background()) + return imc +} + +func WithInitNatssChannelConditions(imc *v1alpha1.NatssChannel) { + imc.Status.InitializeConditions() +} + +func WithNatssChannelDeleted(imc *v1alpha1.NatssChannel) { + deleteTime := metav1.NewTime(time.Unix(1e9, 0)) + imc.ObjectMeta.SetDeletionTimestamp(&deleteTime) +} + +func WithNatssChannelDeploymentNotReady(reason, message string) NatssChannelOption { + return func(imc *v1alpha1.NatssChannel) { + imc.Status.MarkDispatcherFailed(reason, message) + } +} + +func WithNatssChannelDeploymentReady() NatssChannelOption { + return func(imc *v1alpha1.NatssChannel) { + imc.Status.PropagateDispatcherStatus(&appsv1.DeploymentStatus{Conditions: []appsv1.DeploymentCondition{{Type: appsv1.DeploymentAvailable, Status: corev1.ConditionTrue}}}) + } +} + +func WithNatssChannelServicetNotReady(reason, message string) NatssChannelOption { + return func(imc *v1alpha1.NatssChannel) { + imc.Status.MarkServiceFailed(reason, message) + } +} + +func WithNatssChannelServiceReady() NatssChannelOption { + return func(imc *v1alpha1.NatssChannel) { + imc.Status.MarkServiceTrue() + } +} + +func WithNatssChannelChannelServicetNotReady(reason, message string) NatssChannelOption { + return func(imc *v1alpha1.NatssChannel) { + imc.Status.MarkChannelServiceFailed(reason, message) + } +} + +func WithNatssChannelChannelServiceReady() NatssChannelOption { + return func(imc *v1alpha1.NatssChannel) { + imc.Status.MarkChannelServiceTrue() + } +} + +func WithNatssChannelEndpointsNotReady(reason, message string) NatssChannelOption { + return func(imc *v1alpha1.NatssChannel) { + imc.Status.MarkEndpointsFailed(reason, message) + } +} + +func WithNatssChannelEndpointsReady() NatssChannelOption { + return func(imc *v1alpha1.NatssChannel) { + imc.Status.MarkEndpointsTrue() + } +} + +func WithNatssChannelAddress(a string) NatssChannelOption { + return func(imc *v1alpha1.NatssChannel) { + imc.Status.SetAddress(&apis.URL{ + Scheme: "http", + Host: a, + }) + } +} diff --git a/pkg/reconciler/testing/listers.go b/pkg/reconciler/testing/listers.go index 70a2588997b..73f61e44d4a 100644 --- a/pkg/reconciler/testing/listers.go +++ b/pkg/reconciler/testing/listers.go @@ -51,7 +51,6 @@ var clientSetSchemes = []func(*runtime.Scheme) error{ fakekubeclientset.AddToScheme, fakesharedclientset.AddToScheme, fakeeventingclientset.AddToScheme, - fakeeventingclientset.AddToScheme, fakeapiextensionsclientset.AddToScheme, subscriberAddToScheme, } From 9fe11d443a94e25ed6756a306edd3a4c35c1ccfe Mon Sep 17 00:00:00 2001 From: nachocano Date: Wed, 5 Jun 2019 16:21:17 -0700 Subject: [PATCH 02/10] adding UTs --- .../controller/natsschannel_test.go | 413 ++++++++++++++++++ .../natss/pkg/reconciler/testing/factory.go | 4 +- .../natss/pkg/reconciler/testing/listers.go | 12 +- 3 files changed, 426 insertions(+), 3 deletions(-) create mode 100644 contrib/natss/pkg/reconciler/controller/natsschannel_test.go diff --git a/contrib/natss/pkg/reconciler/controller/natsschannel_test.go b/contrib/natss/pkg/reconciler/controller/natsschannel_test.go new file mode 100644 index 00000000000..5e0bb9aa113 --- /dev/null +++ b/contrib/natss/pkg/reconciler/controller/natsschannel_test.go @@ -0,0 +1,413 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + "github.com/knative/eventing/pkg/utils" + "github.com/knative/pkg/kmeta" + "k8s.io/apimachinery/pkg/runtime" + "testing" + + "github.com/knative/eventing/contrib/natss/pkg/apis/messaging/v1alpha1" + fakeclientset "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned/fake" + informers "github.com/knative/eventing/contrib/natss/pkg/client/informers/externalversions" + "github.com/knative/eventing/contrib/natss/pkg/reconciler" + "github.com/knative/eventing/contrib/natss/pkg/reconciler/controller/resources" + reconciletesting "github.com/knative/eventing/contrib/natss/pkg/reconciler/testing" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "github.com/knative/pkg/controller" + logtesting "github.com/knative/pkg/logging/testing" + . "github.com/knative/pkg/reconciler/testing" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubeinformers "k8s.io/client-go/informers" + fakekubeclientset "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/scheme" + clientgotesting "k8s.io/client-go/testing" +) + +const ( + systemNS = "knative-eventing" + testNS = "test-namespace" + ncName = "test-nc" + dispatcherDeploymentName = "test-deployment" + dispatcherServiceName = "test-service" + channelServiceAddress = "test-nc-kn-channel.test-namespace.svc.cluster.local" +) + +var ( + trueVal = true + // deletionTime is used when objects are marked as deleted. Rfc3339Copy() + // truncates to seconds to match the loss of precision during serialization. + deletionTime = metav1.Now().Rfc3339Copy() +) + +func init() { + // Add types to scheme + _ = v1alpha1.AddToScheme(scheme.Scheme) + _ = duckv1alpha1.AddToScheme(scheme.Scheme) +} + +func TestNewController(t *testing.T) { + kubeClient := fakekubeclientset.NewSimpleClientset() + messagingClient := fakeclientset.NewSimpleClientset() + + // Create informer factories with fake clients. The second parameter sets the + // resync period to zero, disabling it. + kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, 0) + messagingInformerFactory := informers.NewSharedInformerFactory(messagingClient, 0) + + // Eventing + ncInformer := messagingInformerFactory.Messaging().V1alpha1().NatssChannels() + + // Kube + serviceInformer := kubeInformerFactory.Core().V1().Services() + endpointsInformer := kubeInformerFactory.Core().V1().Endpoints() + deploymentInformer := kubeInformerFactory.Apps().V1().Deployments() + + c := NewController( + reconciler.Options{ + KubeClientSet: kubeClient, + NatssClientSet: messagingClient, + Logger: logtesting.TestLogger(t), + }, + systemNS, + dispatcherDeploymentName, + dispatcherServiceName, + ncInformer, + deploymentInformer, + serviceInformer, + endpointsInformer) + + if c == nil { + t.Fatalf("Failed to create with NewController") + } +} + +func TestAllCases(t *testing.T) { + ncKey := testNS + "/" + ncName + table := TableTest{ + { + Name: "bad workqueue key", + // Make sure Reconcile handles bad keys. + Key: "too/many/parts", + }, { + Name: "key not found", + // Make sure Reconcile handles good keys that don't exist. + Key: "foo/not-found", + }, { + Name: "deleting", + Key: ncKey, + Objects: []runtime.Object{ + reconciletesting.NewNatssChannel(ncName, testNS, + reconciletesting.WithInitNatssChannelConditions, + reconciletesting.WithNatssChannelDeleted)}, + WantErr: false, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, channelReconciled, "NatssChannel reconciled"), + }, + }, { + Name: "deployment does not exist", + Key: ncKey, + Objects: []runtime.Object{ + reconciletesting.NewNatssChannel(ncName, testNS), + }, + WantErr: true, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewNatssChannel(ncName, testNS, + reconciletesting.WithInitNatssChannelConditions, + reconciletesting.WithNatssChannelDeploymentNotReady("DispatcherDeploymentDoesNotExist", "Dispatcher Deployment does not exist")), + }}, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, channelReconcileFailed, "NatssChannel reconciliation failed: deployment.apps \"test-deployment\" not found"), + }, + }, { + Name: "Service does not exist", + Key: ncKey, + Objects: []runtime.Object{ + makeReadyDeployment(), + reconciletesting.NewNatssChannel(ncName, testNS), + }, + WantErr: true, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewNatssChannel(ncName, testNS, + reconciletesting.WithInitNatssChannelConditions, + reconciletesting.WithNatssChannelDeploymentReady(), + reconciletesting.WithNatssChannelServicetNotReady("DispatcherServiceDoesNotExist", "Dispatcher Service does not exist")), + }}, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, channelReconcileFailed, "NatssChannel reconciliation failed: service \"test-service\" not found"), + }, + }, { + Name: "Endpoints does not exist", + Key: ncKey, + Objects: []runtime.Object{ + makeReadyDeployment(), + makeService(), + reconciletesting.NewNatssChannel(ncName, testNS), + }, + WantErr: true, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewNatssChannel(ncName, testNS, + reconciletesting.WithInitNatssChannelConditions, + reconciletesting.WithNatssChannelDeploymentReady(), + reconciletesting.WithNatssChannelServiceReady(), + reconciletesting.WithNatssChannelEndpointsNotReady("DispatcherEndpointsDoesNotExist", "Dispatcher Endpoints does not exist"), + ), + }}, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, channelReconcileFailed, "NatssChannel reconciliation failed: endpoints \"test-service\" not found"), + }, + }, { + Name: "Endpoints not ready", + Key: ncKey, + Objects: []runtime.Object{ + makeReadyDeployment(), + makeService(), + makeEmptyEndpoints(), + reconciletesting.NewNatssChannel(ncName, testNS), + }, + WantErr: true, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewNatssChannel(ncName, testNS, + reconciletesting.WithInitNatssChannelConditions, + reconciletesting.WithNatssChannelDeploymentReady(), + reconciletesting.WithNatssChannelServiceReady(), + reconciletesting.WithNatssChannelEndpointsNotReady("DispatcherEndpointsNotReady", "There are no endpoints ready for Dispatcher service"), + ), + }}, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, channelReconcileFailed, "NatssChannel reconciliation failed: there are no endpoints ready for Dispatcher service"), + }, + }, { + Name: "Works, creates new channel", + Key: ncKey, + Objects: []runtime.Object{ + makeReadyDeployment(), + makeService(), + makeReadyEndpoints(), + reconciletesting.NewNatssChannel(ncName, testNS), + }, + WantErr: false, + WantCreates: []runtime.Object{ + makeChannelService(reconciletesting.NewNatssChannel(ncName, testNS)), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewNatssChannel(ncName, testNS, + reconciletesting.WithInitNatssChannelConditions, + reconciletesting.WithNatssChannelDeploymentReady(), + reconciletesting.WithNatssChannelServiceReady(), + reconciletesting.WithNatssChannelEndpointsReady(), + reconciletesting.WithNatssChannelChannelServiceReady(), + reconciletesting.WithNatssChannelAddress(channelServiceAddress), + ), + }}, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, channelReconciled, "NatssChannel reconciled"), + }, + }, { + Name: "Works, channel exists", + Key: ncKey, + Objects: []runtime.Object{ + makeReadyDeployment(), + makeService(), + makeReadyEndpoints(), + reconciletesting.NewNatssChannel(ncName, testNS), + makeChannelService(reconciletesting.NewNatssChannel(ncName, testNS)), + }, + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewNatssChannel(ncName, testNS, + reconciletesting.WithInitNatssChannelConditions, + reconciletesting.WithNatssChannelDeploymentReady(), + reconciletesting.WithNatssChannelServiceReady(), + reconciletesting.WithNatssChannelEndpointsReady(), + reconciletesting.WithNatssChannelChannelServiceReady(), + reconciletesting.WithNatssChannelAddress(channelServiceAddress), + ), + }}, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, channelReconciled, "NatssChannel reconciled"), + }, + }, { + Name: "channel exists, not owned by us", + Key: ncKey, + Objects: []runtime.Object{ + makeReadyDeployment(), + makeService(), + makeReadyEndpoints(), + reconciletesting.NewNatssChannel(ncName, testNS), + makeChannelServiceNotOwnedByUs(reconciletesting.NewNatssChannel(ncName, testNS)), + }, + WantErr: true, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewNatssChannel(ncName, testNS, + reconciletesting.WithInitNatssChannelConditions, + reconciletesting.WithNatssChannelDeploymentReady(), + reconciletesting.WithNatssChannelServiceReady(), + reconciletesting.WithNatssChannelEndpointsReady(), + reconciletesting.WithNatssChannelChannelServicetNotReady("ChannelServiceFailed", "Channel Service failed: NatssChannel: test-namespace/test-nc does not own Service: \"test-nc-kn-channel\""), + ), + }}, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, channelReconcileFailed, "NatssChannel reconciliation failed: NatssChannel: test-namespace/test-nc does not own Service: \"test-nc-kn-channel\""), + }, + }, { + Name: "channel does not exist, fails to create", + Key: ncKey, + Objects: []runtime.Object{ + makeReadyDeployment(), + makeService(), + makeReadyEndpoints(), + reconciletesting.NewNatssChannel(ncName, testNS), + }, + WantErr: true, + WithReactors: []clientgotesting.ReactionFunc{ + InduceFailure("create", "Services"), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewNatssChannel(ncName, testNS, + reconciletesting.WithInitNatssChannelConditions, + reconciletesting.WithNatssChannelDeploymentReady(), + reconciletesting.WithNatssChannelServiceReady(), + reconciletesting.WithNatssChannelEndpointsReady(), + reconciletesting.WithNatssChannelChannelServicetNotReady("ChannelServiceFailed", "Channel Service failed: inducing failure for create services"), + ), + }}, + WantCreates: []runtime.Object{ + makeChannelService(reconciletesting.NewNatssChannel(ncName, testNS)), + }, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, channelReconcileFailed, "NatssChannel reconciliation failed: inducing failure for create services"), + }, + }, {}, + } + defer logtesting.ClearAll() + + table.Test(t, reconciletesting.MakeFactory(func(listers *reconciletesting.Listers, opt reconciler.Options) controller.Reconciler { + return &Reconciler{ + Base: reconciler.NewBase(opt, controllerAgentName), + dispatcherNamespace: testNS, + dispatcherDeploymentName: dispatcherDeploymentName, + dispatcherServiceName: dispatcherServiceName, + natsschannelLister: listers.GetNatssChannelLister(), + // TODO fix + natsschannelInformer: nil, + deploymentLister: listers.GetDeploymentLister(), + serviceLister: listers.GetServiceLister(), + endpointsLister: listers.GetEndpointsLister(), + } + }, + )) +} + +func makeDeployment() *appsv1.Deployment { + return &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNS, + Name: dispatcherDeploymentName, + }, + Status: appsv1.DeploymentStatus{}, + } +} + +func makeReadyDeployment() *appsv1.Deployment { + d := makeDeployment() + d.Status.Conditions = []appsv1.DeploymentCondition{{Type: appsv1.DeploymentAvailable, Status: corev1.ConditionTrue}} + return d +} + +func makeService() *corev1.Service { + return &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNS, + Name: dispatcherServiceName, + }, + } +} + +func makeChannelService(nc *v1alpha1.NatssChannel) *corev1.Service { + return &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNS, + Name: fmt.Sprintf("%s-kn-channel", ncName), + Labels: map[string]string{ + resources.MessagingRoleLabel: resources.MessagingRole, + }, + OwnerReferences: []metav1.OwnerReference{ + *kmeta.NewControllerRef(nc), + }, + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeExternalName, + ExternalName: fmt.Sprintf("%s.%s.svc.%s", dispatcherServiceName, testNS, utils.GetClusterDomainName()), + }, + } +} + +func makeChannelServiceNotOwnedByUs(nc *v1alpha1.NatssChannel) *corev1.Service { + return &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNS, + Name: fmt.Sprintf("%s-kn-channel", ncName), + Labels: map[string]string{ + resources.MessagingRoleLabel: resources.MessagingRole, + }, + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeExternalName, + ExternalName: fmt.Sprintf("%s.%s.svc.%s", dispatcherServiceName, testNS, utils.GetClusterDomainName()), + }, + } +} + +func makeEmptyEndpoints() *corev1.Endpoints { + return &corev1.Endpoints{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Endpoints", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNS, + Name: dispatcherServiceName, + }, + } +} + +func makeReadyEndpoints() *corev1.Endpoints { + e := makeEmptyEndpoints() + e.Subsets = []corev1.EndpointSubset{{Addresses: []corev1.EndpointAddress{{IP: "1.1.1.1"}}}} + return e +} diff --git a/contrib/natss/pkg/reconciler/testing/factory.go b/contrib/natss/pkg/reconciler/testing/factory.go index 6bce22cfa1d..5bf687a0de3 100644 --- a/contrib/natss/pkg/reconciler/testing/factory.go +++ b/contrib/natss/pkg/reconciler/testing/factory.go @@ -27,8 +27,8 @@ import ( clientgotesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" - "github.com/GoogleCloudPlatform/cloud-run-events/pkg/reconciler" fakeclientset "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned/fake" + "github.com/knative/eventing/contrib/natss/pkg/reconciler" "github.com/knative/pkg/controller" logtesting "github.com/knative/pkg/logging/testing" @@ -68,7 +68,7 @@ func MakeFactory(ctor Ctor) Factory { c := ctor(&ls, reconciler.Options{ KubeClientSet: kubeClient, DynamicClientSet: dynamicClient, - RunClientSet: client, + NatssClientSet: client, Recorder: eventRecorder, //StatsReporter: statsReporter, Logger: logtesting.TestLogger(t), diff --git a/contrib/natss/pkg/reconciler/testing/listers.go b/contrib/natss/pkg/reconciler/testing/listers.go index 23354fa2b6f..950dcdbdab5 100644 --- a/contrib/natss/pkg/reconciler/testing/listers.go +++ b/contrib/natss/pkg/reconciler/testing/listers.go @@ -23,9 +23,11 @@ import ( fakeeventsclientset "github.com/knative/eventing/pkg/client/clientset/versioned/fake" fakesharedclientset "github.com/knative/pkg/client/clientset/versioned/fake" "github.com/knative/pkg/reconciler/testing" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" fakekubeclientset "k8s.io/client-go/kubernetes/fake" + appsv1listers "k8s.io/client-go/listers/apps/v1" corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" ) @@ -84,10 +86,18 @@ func (l *Listers) GetSharedObjects() []runtime.Object { return l.sorter.ObjectsForSchemeFunc(fakesharedclientset.AddToScheme) } +func (l *Listers) GetServiceLister() corev1listers.ServiceLister { + return corev1listers.NewServiceLister(l.indexerFor(&corev1.Service{})) +} + func (l *Listers) GetEndpointsLister() corev1listers.EndpointsLister { return corev1listers.NewEndpointsLister(l.indexerFor(&corev1.Endpoints{})) } -func (l *Listers) GetInMemoryChannelLister() messaginglisters.NatssChannelLister { +func (l *Listers) GetNatssChannelLister() messaginglisters.NatssChannelLister { return messaginglisters.NewNatssChannelLister(l.indexerFor(&messagingv1alpha1.NatssChannel{})) } + +func (l *Listers) GetDeploymentLister() appsv1listers.DeploymentLister { + return appsv1listers.NewDeploymentLister(l.indexerFor(&appsv1.Deployment{})) +} From 910549834c06e512229f2d7bc5a2508decdaf490 Mon Sep 17 00:00:00 2001 From: nachocano Date: Wed, 5 Jun 2019 16:51:38 -0700 Subject: [PATCH 03/10] cosmetics --- .../controller/natsschannel_test.go | 39 ++++++------- .../pkg/reconciler/testing/natsschannel.go | 56 +++++++++---------- 2 files changed, 48 insertions(+), 47 deletions(-) diff --git a/contrib/natss/pkg/reconciler/controller/natsschannel_test.go b/contrib/natss/pkg/reconciler/controller/natsschannel_test.go index 5e0bb9aa113..1951ac2d31e 100644 --- a/contrib/natss/pkg/reconciler/controller/natsschannel_test.go +++ b/contrib/natss/pkg/reconciler/controller/natsschannel_test.go @@ -103,26 +103,27 @@ func TestNewController(t *testing.T) { func TestAllCases(t *testing.T) { ncKey := testNS + "/" + ncName table := TableTest{ + //{ + // Name: "bad workqueue key", + // // Make sure Reconcile handles bad keys. + // Key: "too/many/parts", + //}, { + // Name: "key not found", + // // Make sure Reconcile handles good keys that don't exist. + // Key: "foo/not-found", + //}, { + // Name: "deleting", + // Key: ncKey, + // Objects: []runtime.Object{ + // reconciletesting.NewNatssChannel(ncName, testNS, + // reconciletesting.WithInitNatssChannelConditions, + // reconciletesting.WithNatssChannelDeleted)}, + // WantErr: false, + // WantEvents: []string{ + // Eventf(corev1.EventTypeNormal, channelReconciled, "NatssChannel reconciled"), + // }, + //}, { { - Name: "bad workqueue key", - // Make sure Reconcile handles bad keys. - Key: "too/many/parts", - }, { - Name: "key not found", - // Make sure Reconcile handles good keys that don't exist. - Key: "foo/not-found", - }, { - Name: "deleting", - Key: ncKey, - Objects: []runtime.Object{ - reconciletesting.NewNatssChannel(ncName, testNS, - reconciletesting.WithInitNatssChannelConditions, - reconciletesting.WithNatssChannelDeleted)}, - WantErr: false, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, channelReconciled, "NatssChannel reconciled"), - }, - }, { Name: "deployment does not exist", Key: ncKey, Objects: []runtime.Object{ diff --git a/contrib/natss/pkg/reconciler/testing/natsschannel.go b/contrib/natss/pkg/reconciler/testing/natsschannel.go index b3ff6373fc8..4936cca12f3 100644 --- a/contrib/natss/pkg/reconciler/testing/natsschannel.go +++ b/contrib/natss/pkg/reconciler/testing/natsschannel.go @@ -32,81 +32,81 @@ import ( type NatssChannelOption func(*v1alpha1.NatssChannel) // NewNatssChannel creates an NatssChannel with NatssChannelOptions. -func NewNatssChannel(name, namespace string, imcopt ...NatssChannelOption) *v1alpha1.NatssChannel { - imc := &v1alpha1.NatssChannel{ +func NewNatssChannel(name, namespace string, ncopt ...NatssChannelOption) *v1alpha1.NatssChannel { + nc := &v1alpha1.NatssChannel{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, }, Spec: v1alpha1.NatssChannelSpec{}, } - for _, opt := range imcopt { - opt(imc) + for _, opt := range ncopt { + opt(nc) } - imc.SetDefaults(context.Background()) - return imc + nc.SetDefaults(context.Background()) + return nc } -func WithInitNatssChannelConditions(imc *v1alpha1.NatssChannel) { - imc.Status.InitializeConditions() +func WithInitNatssChannelConditions(nc *v1alpha1.NatssChannel) { + nc.Status.InitializeConditions() } -func WithNatssChannelDeleted(imc *v1alpha1.NatssChannel) { +func WithNatssChannelDeleted(nc *v1alpha1.NatssChannel) { deleteTime := metav1.NewTime(time.Unix(1e9, 0)) - imc.ObjectMeta.SetDeletionTimestamp(&deleteTime) + nc.ObjectMeta.SetDeletionTimestamp(&deleteTime) } func WithNatssChannelDeploymentNotReady(reason, message string) NatssChannelOption { - return func(imc *v1alpha1.NatssChannel) { - imc.Status.MarkDispatcherFailed(reason, message) + return func(nc *v1alpha1.NatssChannel) { + nc.Status.MarkDispatcherFailed(reason, message) } } func WithNatssChannelDeploymentReady() NatssChannelOption { - return func(imc *v1alpha1.NatssChannel) { - imc.Status.PropagateDispatcherStatus(&appsv1.DeploymentStatus{Conditions: []appsv1.DeploymentCondition{{Type: appsv1.DeploymentAvailable, Status: corev1.ConditionTrue}}}) + return func(nc *v1alpha1.NatssChannel) { + nc.Status.PropagateDispatcherStatus(&appsv1.DeploymentStatus{Conditions: []appsv1.DeploymentCondition{{Type: appsv1.DeploymentAvailable, Status: corev1.ConditionTrue}}}) } } func WithNatssChannelServicetNotReady(reason, message string) NatssChannelOption { - return func(imc *v1alpha1.NatssChannel) { - imc.Status.MarkServiceFailed(reason, message) + return func(nc *v1alpha1.NatssChannel) { + nc.Status.MarkServiceFailed(reason, message) } } func WithNatssChannelServiceReady() NatssChannelOption { - return func(imc *v1alpha1.NatssChannel) { - imc.Status.MarkServiceTrue() + return func(nc *v1alpha1.NatssChannel) { + nc.Status.MarkServiceTrue() } } func WithNatssChannelChannelServicetNotReady(reason, message string) NatssChannelOption { - return func(imc *v1alpha1.NatssChannel) { - imc.Status.MarkChannelServiceFailed(reason, message) + return func(nc *v1alpha1.NatssChannel) { + nc.Status.MarkChannelServiceFailed(reason, message) } } func WithNatssChannelChannelServiceReady() NatssChannelOption { - return func(imc *v1alpha1.NatssChannel) { - imc.Status.MarkChannelServiceTrue() + return func(nc *v1alpha1.NatssChannel) { + nc.Status.MarkChannelServiceTrue() } } func WithNatssChannelEndpointsNotReady(reason, message string) NatssChannelOption { - return func(imc *v1alpha1.NatssChannel) { - imc.Status.MarkEndpointsFailed(reason, message) + return func(nc *v1alpha1.NatssChannel) { + nc.Status.MarkEndpointsFailed(reason, message) } } func WithNatssChannelEndpointsReady() NatssChannelOption { - return func(imc *v1alpha1.NatssChannel) { - imc.Status.MarkEndpointsTrue() + return func(nc *v1alpha1.NatssChannel) { + nc.Status.MarkEndpointsTrue() } } func WithNatssChannelAddress(a string) NatssChannelOption { - return func(imc *v1alpha1.NatssChannel) { - imc.Status.SetAddress(&apis.URL{ + return func(nc *v1alpha1.NatssChannel) { + nc.Status.SetAddress(&apis.URL{ Scheme: "http", Host: a, }) From 69bfae57edfdd0b100c3e40b161569fbfd502a79 Mon Sep 17 00:00:00 2001 From: nachocano Date: Thu, 6 Jun 2019 10:11:19 -0700 Subject: [PATCH 04/10] fixing UTs --- .../controller/natsschannel_test.go | 47 +++++++++---------- .../natss/pkg/reconciler/testing/factory.go | 2 +- 2 files changed, 24 insertions(+), 25 deletions(-) diff --git a/contrib/natss/pkg/reconciler/controller/natsschannel_test.go b/contrib/natss/pkg/reconciler/controller/natsschannel_test.go index 1951ac2d31e..699678d438e 100644 --- a/contrib/natss/pkg/reconciler/controller/natsschannel_test.go +++ b/contrib/natss/pkg/reconciler/controller/natsschannel_test.go @@ -103,27 +103,26 @@ func TestNewController(t *testing.T) { func TestAllCases(t *testing.T) { ncKey := testNS + "/" + ncName table := TableTest{ - //{ - // Name: "bad workqueue key", - // // Make sure Reconcile handles bad keys. - // Key: "too/many/parts", - //}, { - // Name: "key not found", - // // Make sure Reconcile handles good keys that don't exist. - // Key: "foo/not-found", - //}, { - // Name: "deleting", - // Key: ncKey, - // Objects: []runtime.Object{ - // reconciletesting.NewNatssChannel(ncName, testNS, - // reconciletesting.WithInitNatssChannelConditions, - // reconciletesting.WithNatssChannelDeleted)}, - // WantErr: false, - // WantEvents: []string{ - // Eventf(corev1.EventTypeNormal, channelReconciled, "NatssChannel reconciled"), - // }, - //}, { { + Name: "bad workqueue key", + // Make sure Reconcile handles bad keys. + Key: "too/many/parts", + }, { + Name: "key not found", + // Make sure Reconcile handles good keys that don't exist. + Key: "foo/not-found", + }, { + Name: "deleting", + Key: ncKey, + Objects: []runtime.Object{ + reconciletesting.NewNatssChannel(ncName, testNS, + reconciletesting.WithInitNatssChannelConditions, + reconciletesting.WithNatssChannelDeleted)}, + WantErr: false, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, channelReconciled, "NatssChannel reconciled"), + }, + }, { Name: "deployment does not exist", Key: ncKey, Objects: []runtime.Object{ @@ -194,7 +193,7 @@ func TestAllCases(t *testing.T) { ), }}, WantEvents: []string{ - Eventf(corev1.EventTypeWarning, channelReconcileFailed, "NatssChannel reconciliation failed: there are no endpoints ready for Dispatcher service"), + Eventf(corev1.EventTypeWarning, channelReconcileFailed, "NatssChannel reconciliation failed: there are no endpoints ready for Dispatcher service test-service"), }, }, { Name: "Works, creates new channel", @@ -263,11 +262,11 @@ func TestAllCases(t *testing.T) { reconciletesting.WithNatssChannelDeploymentReady(), reconciletesting.WithNatssChannelServiceReady(), reconciletesting.WithNatssChannelEndpointsReady(), - reconciletesting.WithNatssChannelChannelServicetNotReady("ChannelServiceFailed", "Channel Service failed: NatssChannel: test-namespace/test-nc does not own Service: \"test-nc-kn-channel\""), + reconciletesting.WithNatssChannelChannelServicetNotReady("ChannelServiceFailed", "Channel Service failed: natsschannel: test-namespace/test-nc does not own Service: \"test-nc-kn-channel\""), ), }}, WantEvents: []string{ - Eventf(corev1.EventTypeWarning, channelReconcileFailed, "NatssChannel reconciliation failed: NatssChannel: test-namespace/test-nc does not own Service: \"test-nc-kn-channel\""), + Eventf(corev1.EventTypeWarning, channelReconcileFailed, "NatssChannel reconciliation failed: natsschannel: test-namespace/test-nc does not own Service: \"test-nc-kn-channel\""), }, }, { Name: "channel does not exist, fails to create", @@ -297,7 +296,7 @@ func TestAllCases(t *testing.T) { WantEvents: []string{ Eventf(corev1.EventTypeWarning, channelReconcileFailed, "NatssChannel reconciliation failed: inducing failure for create services"), }, - }, {}, + }, } defer logtesting.ClearAll() diff --git a/contrib/natss/pkg/reconciler/testing/factory.go b/contrib/natss/pkg/reconciler/testing/factory.go index 5bf687a0de3..14d2162c7cb 100644 --- a/contrib/natss/pkg/reconciler/testing/factory.go +++ b/contrib/natss/pkg/reconciler/testing/factory.go @@ -50,7 +50,7 @@ func MakeFactory(ctor Ctor) Factory { ls := NewListers(r.Objects) kubeClient := fakekubeclientset.NewSimpleClientset(ls.GetKubeObjects()...) - client := fakeclientset.NewSimpleClientset(ls.GetEventsObjects()...) + client := fakeclientset.NewSimpleClientset(ls.GetMessagingObjects()...) dynamicScheme := runtime.NewScheme() for _, addTo := range clientSetSchemes { From 08337fd83e05565c8707e7e7a26249a5963cf016 Mon Sep 17 00:00:00 2001 From: nachocano Date: Thu, 6 Jun 2019 10:27:34 -0700 Subject: [PATCH 05/10] sockpuppet --- contrib/natss/pkg/reconciler/testing/factory.go | 2 +- contrib/natss/pkg/reconciler/testing/listers.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/contrib/natss/pkg/reconciler/testing/factory.go b/contrib/natss/pkg/reconciler/testing/factory.go index 14d2162c7cb..a6b40280db6 100644 --- a/contrib/natss/pkg/reconciler/testing/factory.go +++ b/contrib/natss/pkg/reconciler/testing/factory.go @@ -1,5 +1,5 @@ /* -Copyright 2018 The Knative Authors. +Copyright 2019 The Knative Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/contrib/natss/pkg/reconciler/testing/listers.go b/contrib/natss/pkg/reconciler/testing/listers.go index 950dcdbdab5..176aedbb1c9 100644 --- a/contrib/natss/pkg/reconciler/testing/listers.go +++ b/contrib/natss/pkg/reconciler/testing/listers.go @@ -1,5 +1,5 @@ /* -Copyright 2018 The Knative Authors +Copyright 2019 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. From 313fafddedd55dffb314de0054f08be5c1af0139 Mon Sep 17 00:00:00 2001 From: nachocano Date: Thu, 6 Jun 2019 10:34:06 -0700 Subject: [PATCH 06/10] update to use the new client --- contrib/natss/cmd/channel_dispatcher/main.go | 15 ++++----------- .../pkg/reconciler/dispatcher/natsschannel.go | 10 +++------- 2 files changed, 7 insertions(+), 18 deletions(-) diff --git a/contrib/natss/cmd/channel_dispatcher/main.go b/contrib/natss/cmd/channel_dispatcher/main.go index 9866405979a..64def2e63c9 100644 --- a/contrib/natss/cmd/channel_dispatcher/main.go +++ b/contrib/natss/cmd/channel_dispatcher/main.go @@ -23,18 +23,16 @@ import ( "github.com/knative/eventing/contrib/natss/pkg/util" clientset "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned" - eventingScheme "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned/scheme" informers "github.com/knative/eventing/contrib/natss/pkg/client/informers/externalversions" "github.com/knative/eventing/contrib/natss/pkg/dispatcher" + "github.com/knative/eventing/contrib/natss/pkg/reconciler" natsschannel "github.com/knative/eventing/contrib/natss/pkg/reconciler/dispatcher" "github.com/knative/eventing/pkg/logconfig" - "github.com/knative/eventing/pkg/reconciler" "github.com/knative/pkg/configmap" kncontroller "github.com/knative/pkg/controller" "github.com/knative/pkg/logging" "github.com/knative/pkg/signals" "go.uber.org/zap" - "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ) @@ -74,15 +72,11 @@ func main() { cfg.QPS = numControllers * rest.DefaultQPS cfg.Burst = numControllers * rest.DefaultBurst opt := reconciler.NewOptionsOrDie(cfg, logger, stopCh) - // Setting up our own eventingClientSet as we need the messaging API introduced with natss. - eventingClientSet := clientset.NewForConfigOrDie(cfg) - eventingInformerFactory := informers.NewSharedInformerFactory(eventingClientSet, opt.ResyncPeriod) + messagingClientSet := clientset.NewForConfigOrDie(cfg) + messagingInformerFactory := informers.NewSharedInformerFactory(messagingClientSet, opt.ResyncPeriod) // Messaging - natssChannelInformer := eventingInformerFactory.Messaging().V1alpha1().NatssChannels() - - // Adding the scheme. - eventingScheme.AddToScheme(scheme.Scheme) + natssChannelInformer := messagingInformerFactory.Messaging().V1alpha1().NatssChannels() // Build all of our controllers, with the clients constructed above. // Add new controllers to this array. @@ -90,7 +84,6 @@ func main() { controllers := [...]*kncontroller.Impl{ natsschannel.NewController( opt, - eventingClientSet, natssDispatcher, natssChannelInformer, ), diff --git a/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go b/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go index 15cfe801fa4..9a938e5b597 100644 --- a/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go +++ b/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go @@ -26,14 +26,13 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "github.com/knative/eventing/contrib/natss/pkg/apis/messaging/v1alpha1" - clientset "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned" messaginginformers "github.com/knative/eventing/contrib/natss/pkg/client/informers/externalversions/messaging/v1alpha1" listers "github.com/knative/eventing/contrib/natss/pkg/client/listers/messaging/v1alpha1" "github.com/knative/eventing/contrib/natss/pkg/dispatcher" + "github.com/knative/eventing/contrib/natss/pkg/reconciler" "github.com/knative/eventing/pkg/logging" "github.com/knative/eventing/pkg/provisioners/fanout" "github.com/knative/eventing/pkg/provisioners/multichannelfanout" - "github.com/knative/eventing/pkg/reconciler" "github.com/knative/pkg/controller" apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" @@ -58,7 +57,6 @@ type Reconciler struct { natssDispatcher *dispatcher.SubscriptionsSupervisor - eventingClientSet clientset.Interface natsschannelLister listers.NatssChannelLister natsschannelInformer cache.SharedIndexInformer impl *controller.Impl @@ -71,14 +69,12 @@ var _ controller.Reconciler = (*Reconciler)(nil) // Registers event handlers to enqueue events. func NewController( opt reconciler.Options, - eventingClientSet clientset.Interface, natssDispatcher *dispatcher.SubscriptionsSupervisor, natsschannelInformer messaginginformers.NatssChannelInformer, ) *controller.Impl { r := &Reconciler{ Base: reconciler.NewBase(opt, controllerAgentName), - eventingClientSet: eventingClientSet, natssDispatcher: natssDispatcher, natsschannelLister: natsschannelInformer.Lister(), natsschannelInformer: natsschannelInformer.Informer(), @@ -124,7 +120,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error { return err } removeFinalizer(natssChannel) - _, err := r.eventingClientSet.MessagingV1alpha1().NatssChannels(natssChannel.Namespace).Update(natssChannel) + _, err := r.NatssClientSet.MessagingV1alpha1().NatssChannels(natssChannel.Namespace).Update(natssChannel) return err } @@ -201,7 +197,7 @@ func (r *Reconciler) ensureFinalizer(channel *v1alpha1.NatssChannel) error { return err } - _, err = r.eventingClientSet.MessagingV1alpha1().NatssChannels(channel.Namespace).Patch(channel.Name, types.MergePatchType, patch) + _, err = r.NatssClientSet.MessagingV1alpha1().NatssChannels(channel.Namespace).Patch(channel.Name, types.MergePatchType, patch) return err } From 54b164b33a407ab08d03c2b3a83851dc23e747f3 Mon Sep 17 00:00:00 2001 From: nachocano Date: Thu, 6 Jun 2019 14:54:17 -0700 Subject: [PATCH 07/10] reporting natss channel --- contrib/natss/pkg/reconciler/controller/natsschannel.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/natss/pkg/reconciler/controller/natsschannel.go b/contrib/natss/pkg/reconciler/controller/natsschannel.go index b58a9d4d717..083e99b2a00 100644 --- a/contrib/natss/pkg/reconciler/controller/natsschannel.go +++ b/contrib/natss/pkg/reconciler/controller/natsschannel.go @@ -333,7 +333,7 @@ func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.NatssCh if err == nil && becomesReady { duration := time.Since(new.ObjectMeta.CreationTimestamp.Time) r.Logger.Infof("NatssChannel %q became ready after %v", kc.Name, duration) - if err := r.StatsReporter.ReportReady("Channel", kc.Namespace, kc.Name, duration); err != nil { + if err := r.StatsReporter.ReportReady("NatssChannel", kc.Namespace, kc.Name, duration); err != nil { r.Logger.Infof("Failed to record ready for NatssChannel %q: %v", kc.Name, err) } } From 9a0119c59a883d044d997eea9f9a460f61b435d5 Mon Sep 17 00:00:00 2001 From: nachocano Date: Thu, 6 Jun 2019 16:38:41 -0700 Subject: [PATCH 08/10] adding zipkin tracing --- contrib/natss/cmd/channel_dispatcher/main.go | 7 +++++++ contrib/natss/pkg/reconciler/reconciler.go | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/contrib/natss/cmd/channel_dispatcher/main.go b/contrib/natss/cmd/channel_dispatcher/main.go index 64def2e63c9..5da4076146c 100644 --- a/contrib/natss/cmd/channel_dispatcher/main.go +++ b/contrib/natss/cmd/channel_dispatcher/main.go @@ -18,6 +18,7 @@ package main import ( "flag" + "github.com/knative/eventing/pkg/tracing" "log" "github.com/knative/eventing/contrib/natss/pkg/util" @@ -99,6 +100,12 @@ func main() { opt.ConfigMapWatcher.Watch(logconfig.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, logconfig.Controller)) // TODO: Watch the observability config map and dynamically update metrics exporter. //opt.ConfigMapWatcher.Watch(metrics.ObservabilityConfigName, metrics.UpdateExporterFromConfigMap(component, logger)) + + // Setup zipkin tracing. + if err = tracing.SetupDynamicZipkinPublishing(logger, opt.ConfigMapWatcher, "natss-ch-dispatcher"); err != nil { + logger.Fatalw("Error setting up Zipkin publishing", zap.Error(err)) + } + if err := opt.ConfigMapWatcher.Start(stopCh); err != nil { logger.Fatalw("failed to start configuration manager", zap.Error(err)) } diff --git a/contrib/natss/pkg/reconciler/reconciler.go b/contrib/natss/pkg/reconciler/reconciler.go index a1986fd66a4..18290919219 100644 --- a/contrib/natss/pkg/reconciler/reconciler.go +++ b/contrib/natss/pkg/reconciler/reconciler.go @@ -47,7 +47,7 @@ type Options struct { Recorder record.EventRecorder StatsReporter StatsReporter - ConfigMapWatcher configmap.Watcher + ConfigMapWatcher *configmap.InformedWatcher Logger *zap.SugaredLogger ResyncPeriod time.Duration From a626a99fcddeca6422cc9757853a290b3f44b9af Mon Sep 17 00:00:00 2001 From: Nacho Cano Date: Thu, 6 Jun 2019 21:29:40 -0700 Subject: [PATCH 09/10] copy/paste error --- contrib/natss/pkg/reconciler/reconciler.go | 2 +- contrib/natss/pkg/reconciler/stats_reporter.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/contrib/natss/pkg/reconciler/reconciler.go b/contrib/natss/pkg/reconciler/reconciler.go index 18290919219..d6699f3773a 100644 --- a/contrib/natss/pkg/reconciler/reconciler.go +++ b/contrib/natss/pkg/reconciler/reconciler.go @@ -1,5 +1,5 @@ /* -Copyright 2019 Google LLC +Copyright 2019 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/contrib/natss/pkg/reconciler/stats_reporter.go b/contrib/natss/pkg/reconciler/stats_reporter.go index fdce855f7b8..a1845fba024 100644 --- a/contrib/natss/pkg/reconciler/stats_reporter.go +++ b/contrib/natss/pkg/reconciler/stats_reporter.go @@ -1,5 +1,5 @@ /* -Copyright 2019 Google LLC +Copyright 2019 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. From 90b49f1bae2967eb809d494bc2330d483712821b Mon Sep 17 00:00:00 2001 From: Nacho Cano Date: Thu, 6 Jun 2019 21:35:03 -0700 Subject: [PATCH 10/10] changes --- .../reconciler/controller/natsschannel_test.go | 18 +++++++++--------- .../pkg/reconciler/testing/natsschannel.go | 3 +-- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/contrib/natss/pkg/reconciler/controller/natsschannel_test.go b/contrib/natss/pkg/reconciler/controller/natsschannel_test.go index 699678d438e..8c8509e88c6 100644 --- a/contrib/natss/pkg/reconciler/controller/natsschannel_test.go +++ b/contrib/natss/pkg/reconciler/controller/natsschannel_test.go @@ -116,7 +116,7 @@ func TestAllCases(t *testing.T) { Key: ncKey, Objects: []runtime.Object{ reconciletesting.NewNatssChannel(ncName, testNS, - reconciletesting.WithInitNatssChannelConditions, + reconciletesting.WithNatssInitChannelConditions, reconciletesting.WithNatssChannelDeleted)}, WantErr: false, WantEvents: []string{ @@ -131,7 +131,7 @@ func TestAllCases(t *testing.T) { WantErr: true, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconciletesting.NewNatssChannel(ncName, testNS, - reconciletesting.WithInitNatssChannelConditions, + reconciletesting.WithNatssInitChannelConditions, reconciletesting.WithNatssChannelDeploymentNotReady("DispatcherDeploymentDoesNotExist", "Dispatcher Deployment does not exist")), }}, WantEvents: []string{ @@ -147,7 +147,7 @@ func TestAllCases(t *testing.T) { WantErr: true, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconciletesting.NewNatssChannel(ncName, testNS, - reconciletesting.WithInitNatssChannelConditions, + reconciletesting.WithNatssInitChannelConditions, reconciletesting.WithNatssChannelDeploymentReady(), reconciletesting.WithNatssChannelServicetNotReady("DispatcherServiceDoesNotExist", "Dispatcher Service does not exist")), }}, @@ -165,7 +165,7 @@ func TestAllCases(t *testing.T) { WantErr: true, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconciletesting.NewNatssChannel(ncName, testNS, - reconciletesting.WithInitNatssChannelConditions, + reconciletesting.WithNatssInitChannelConditions, reconciletesting.WithNatssChannelDeploymentReady(), reconciletesting.WithNatssChannelServiceReady(), reconciletesting.WithNatssChannelEndpointsNotReady("DispatcherEndpointsDoesNotExist", "Dispatcher Endpoints does not exist"), @@ -186,7 +186,7 @@ func TestAllCases(t *testing.T) { WantErr: true, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconciletesting.NewNatssChannel(ncName, testNS, - reconciletesting.WithInitNatssChannelConditions, + reconciletesting.WithNatssInitChannelConditions, reconciletesting.WithNatssChannelDeploymentReady(), reconciletesting.WithNatssChannelServiceReady(), reconciletesting.WithNatssChannelEndpointsNotReady("DispatcherEndpointsNotReady", "There are no endpoints ready for Dispatcher service"), @@ -210,7 +210,7 @@ func TestAllCases(t *testing.T) { }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconciletesting.NewNatssChannel(ncName, testNS, - reconciletesting.WithInitNatssChannelConditions, + reconciletesting.WithNatssInitChannelConditions, reconciletesting.WithNatssChannelDeploymentReady(), reconciletesting.WithNatssChannelServiceReady(), reconciletesting.WithNatssChannelEndpointsReady(), @@ -234,7 +234,7 @@ func TestAllCases(t *testing.T) { WantErr: false, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconciletesting.NewNatssChannel(ncName, testNS, - reconciletesting.WithInitNatssChannelConditions, + reconciletesting.WithNatssInitChannelConditions, reconciletesting.WithNatssChannelDeploymentReady(), reconciletesting.WithNatssChannelServiceReady(), reconciletesting.WithNatssChannelEndpointsReady(), @@ -258,7 +258,7 @@ func TestAllCases(t *testing.T) { WantErr: true, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconciletesting.NewNatssChannel(ncName, testNS, - reconciletesting.WithInitNatssChannelConditions, + reconciletesting.WithNatssInitChannelConditions, reconciletesting.WithNatssChannelDeploymentReady(), reconciletesting.WithNatssChannelServiceReady(), reconciletesting.WithNatssChannelEndpointsReady(), @@ -283,7 +283,7 @@ func TestAllCases(t *testing.T) { }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconciletesting.NewNatssChannel(ncName, testNS, - reconciletesting.WithInitNatssChannelConditions, + reconciletesting.WithNatssInitChannelConditions, reconciletesting.WithNatssChannelDeploymentReady(), reconciletesting.WithNatssChannelServiceReady(), reconciletesting.WithNatssChannelEndpointsReady(), diff --git a/contrib/natss/pkg/reconciler/testing/natsschannel.go b/contrib/natss/pkg/reconciler/testing/natsschannel.go index 4936cca12f3..5c61d769da6 100644 --- a/contrib/natss/pkg/reconciler/testing/natsschannel.go +++ b/contrib/natss/pkg/reconciler/testing/natsschannel.go @@ -25,7 +25,6 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - // "k8s.io/apimachinery/pkg/types" ) // NatssChannelOption enables further configuration of a NatssChannel. @@ -47,7 +46,7 @@ func NewNatssChannel(name, namespace string, ncopt ...NatssChannelOption) *v1alp return nc } -func WithInitNatssChannelConditions(nc *v1alpha1.NatssChannel) { +func WithNatssInitChannelConditions(nc *v1alpha1.NatssChannel) { nc.Status.InitializeConditions() }