diff --git a/contrib/natss/cmd/channel_controller/main.go b/contrib/natss/cmd/channel_controller/main.go index ac2daa369f6..5b80fdb42fd 100644 --- a/contrib/natss/cmd/channel_controller/main.go +++ b/contrib/natss/cmd/channel_controller/main.go @@ -23,12 +23,10 @@ import ( "github.com/knative/eventing/contrib/natss/pkg/stanutil" "github.com/knative/eventing/contrib/natss/pkg/util" - clientset "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned" - eventingScheme "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned/scheme" informers "github.com/knative/eventing/contrib/natss/pkg/client/informers/externalversions" + "github.com/knative/eventing/contrib/natss/pkg/reconciler" natsschannel "github.com/knative/eventing/contrib/natss/pkg/reconciler/controller" "github.com/knative/eventing/pkg/logconfig" - "github.com/knative/eventing/pkg/reconciler" "github.com/knative/pkg/configmap" kncontroller "github.com/knative/pkg/controller" "github.com/knative/pkg/logging" @@ -36,7 +34,6 @@ import ( "github.com/knative/pkg/system" "go.uber.org/zap" kubeinformers "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ) @@ -80,30 +77,24 @@ func main() { cfg.QPS = numControllers * rest.DefaultQPS cfg.Burst = numControllers * rest.DefaultBurst opt := reconciler.NewOptionsOrDie(cfg, logger, stopCh) - // Setting up our own eventingClientSet as we need the messaging API introduced with natss. - eventingClientSet := clientset.NewForConfigOrDie(cfg) kubeInformerFactory := kubeinformers.NewSharedInformerFactory(opt.KubeClientSet, opt.ResyncPeriod) - eventingInformerFactory := informers.NewSharedInformerFactory(eventingClientSet, opt.ResyncPeriod) + messagingInformerFactory := informers.NewSharedInformerFactory(opt.NatssClientSet, opt.ResyncPeriod) // Messaging - natssChannelInformer := eventingInformerFactory.Messaging().V1alpha1().NatssChannels() + natssChannelInformer := messagingInformerFactory.Messaging().V1alpha1().NatssChannels() // Kube serviceInformer := kubeInformerFactory.Core().V1().Services() endpointsInformer := kubeInformerFactory.Core().V1().Endpoints() deploymentInformer := kubeInformerFactory.Apps().V1().Deployments() - // Adding the scheme. - eventingScheme.AddToScheme(scheme.Scheme) - // Build all of our controllers, with the clients constructed above. // Add new controllers to this array. // You also need to modify numControllers above to match this. controllers := [...]*kncontroller.Impl{ natsschannel.NewController( opt, - eventingClientSet, systemNS, dispatcherDeploymentName, dispatcherServiceName, diff --git a/contrib/natss/cmd/channel_dispatcher/main.go b/contrib/natss/cmd/channel_dispatcher/main.go index 9866405979a..5da4076146c 100644 --- a/contrib/natss/cmd/channel_dispatcher/main.go +++ b/contrib/natss/cmd/channel_dispatcher/main.go @@ -18,23 +18,22 @@ package main import ( "flag" + "github.com/knative/eventing/pkg/tracing" "log" "github.com/knative/eventing/contrib/natss/pkg/util" clientset "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned" - eventingScheme "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned/scheme" informers "github.com/knative/eventing/contrib/natss/pkg/client/informers/externalversions" "github.com/knative/eventing/contrib/natss/pkg/dispatcher" + "github.com/knative/eventing/contrib/natss/pkg/reconciler" natsschannel "github.com/knative/eventing/contrib/natss/pkg/reconciler/dispatcher" "github.com/knative/eventing/pkg/logconfig" - "github.com/knative/eventing/pkg/reconciler" "github.com/knative/pkg/configmap" kncontroller "github.com/knative/pkg/controller" "github.com/knative/pkg/logging" "github.com/knative/pkg/signals" "go.uber.org/zap" - "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ) @@ -74,15 +73,11 @@ func main() { cfg.QPS = numControllers * rest.DefaultQPS cfg.Burst = numControllers * rest.DefaultBurst opt := reconciler.NewOptionsOrDie(cfg, logger, stopCh) - // Setting up our own eventingClientSet as we need the messaging API introduced with natss. - eventingClientSet := clientset.NewForConfigOrDie(cfg) - eventingInformerFactory := informers.NewSharedInformerFactory(eventingClientSet, opt.ResyncPeriod) + messagingClientSet := clientset.NewForConfigOrDie(cfg) + messagingInformerFactory := informers.NewSharedInformerFactory(messagingClientSet, opt.ResyncPeriod) // Messaging - natssChannelInformer := eventingInformerFactory.Messaging().V1alpha1().NatssChannels() - - // Adding the scheme. - eventingScheme.AddToScheme(scheme.Scheme) + natssChannelInformer := messagingInformerFactory.Messaging().V1alpha1().NatssChannels() // Build all of our controllers, with the clients constructed above. // Add new controllers to this array. @@ -90,7 +85,6 @@ func main() { controllers := [...]*kncontroller.Impl{ natsschannel.NewController( opt, - eventingClientSet, natssDispatcher, natssChannelInformer, ), @@ -106,6 +100,12 @@ func main() { opt.ConfigMapWatcher.Watch(logconfig.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, logconfig.Controller)) // TODO: Watch the observability config map and dynamically update metrics exporter. //opt.ConfigMapWatcher.Watch(metrics.ObservabilityConfigName, metrics.UpdateExporterFromConfigMap(component, logger)) + + // Setup zipkin tracing. + if err = tracing.SetupDynamicZipkinPublishing(logger, opt.ConfigMapWatcher, "natss-ch-dispatcher"); err != nil { + logger.Fatalw("Error setting up Zipkin publishing", zap.Error(err)) + } + if err := opt.ConfigMapWatcher.Start(stopCh); err != nil { logger.Fatalw("failed to start configuration manager", zap.Error(err)) } diff --git a/contrib/natss/pkg/reconciler/controller/natsschannel.go b/contrib/natss/pkg/reconciler/controller/natsschannel.go index 46704840497..083e99b2a00 100644 --- a/contrib/natss/pkg/reconciler/controller/natsschannel.go +++ b/contrib/natss/pkg/reconciler/controller/natsschannel.go @@ -26,12 +26,11 @@ import ( "github.com/knative/pkg/apis" "github.com/knative/eventing/contrib/natss/pkg/apis/messaging/v1alpha1" - clientset "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned" messaginginformers "github.com/knative/eventing/contrib/natss/pkg/client/informers/externalversions/messaging/v1alpha1" listers "github.com/knative/eventing/contrib/natss/pkg/client/listers/messaging/v1alpha1" + "github.com/knative/eventing/contrib/natss/pkg/reconciler" "github.com/knative/eventing/contrib/natss/pkg/reconciler/controller/resources" "github.com/knative/eventing/pkg/logging" - "github.com/knative/eventing/pkg/reconciler" "github.com/knative/pkg/controller" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" @@ -69,7 +68,6 @@ type Reconciler struct { dispatcherDeploymentName string dispatcherServiceName string - eventingClientSet clientset.Interface natsschannelLister listers.NatssChannelLister natsschannelInformer cache.SharedIndexInformer deploymentLister appsv1listers.DeploymentLister @@ -93,7 +91,6 @@ var _ cache.ResourceEventHandler = (*Reconciler)(nil) // Registers event handlers to enqueue events. func NewController( opt reconciler.Options, - eventingClientSet clientset.Interface, dispatcherNamespace string, dispatcherDeploymentName string, dispatcherServiceName string, @@ -108,7 +105,6 @@ func NewController( dispatcherNamespace: dispatcherNamespace, dispatcherDeploymentName: dispatcherDeploymentName, dispatcherServiceName: dispatcherServiceName, - eventingClientSet: eventingClientSet, natsschannelLister: natsschannelInformer.Lister(), natsschannelInformer: natsschannelInformer.Informer(), deploymentLister: deploymentInformer.Lister(), @@ -333,11 +329,11 @@ func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.NatssCh existing := kc.DeepCopy() existing.Status = desired.Status - new, err := r.eventingClientSet.MessagingV1alpha1().NatssChannels(desired.Namespace).UpdateStatus(existing) + new, err := r.NatssClientSet.MessagingV1alpha1().NatssChannels(desired.Namespace).UpdateStatus(existing) if err == nil && becomesReady { duration := time.Since(new.ObjectMeta.CreationTimestamp.Time) r.Logger.Infof("NatssChannel %q became ready after %v", kc.Name, duration) - if err := r.StatsReporter.ReportReady("Channel", kc.Namespace, kc.Name, duration); err != nil { + if err := r.StatsReporter.ReportReady("NatssChannel", kc.Namespace, kc.Name, duration); err != nil { r.Logger.Infof("Failed to record ready for NatssChannel %q: %v", kc.Name, err) } } diff --git a/contrib/natss/pkg/reconciler/controller/natsschannel_test.go b/contrib/natss/pkg/reconciler/controller/natsschannel_test.go new file mode 100644 index 00000000000..8c8509e88c6 --- /dev/null +++ b/contrib/natss/pkg/reconciler/controller/natsschannel_test.go @@ -0,0 +1,413 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + "github.com/knative/eventing/pkg/utils" + "github.com/knative/pkg/kmeta" + "k8s.io/apimachinery/pkg/runtime" + "testing" + + "github.com/knative/eventing/contrib/natss/pkg/apis/messaging/v1alpha1" + fakeclientset "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned/fake" + informers "github.com/knative/eventing/contrib/natss/pkg/client/informers/externalversions" + "github.com/knative/eventing/contrib/natss/pkg/reconciler" + "github.com/knative/eventing/contrib/natss/pkg/reconciler/controller/resources" + reconciletesting "github.com/knative/eventing/contrib/natss/pkg/reconciler/testing" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "github.com/knative/pkg/controller" + logtesting "github.com/knative/pkg/logging/testing" + . "github.com/knative/pkg/reconciler/testing" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubeinformers "k8s.io/client-go/informers" + fakekubeclientset "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/scheme" + clientgotesting "k8s.io/client-go/testing" +) + +const ( + systemNS = "knative-eventing" + testNS = "test-namespace" + ncName = "test-nc" + dispatcherDeploymentName = "test-deployment" + dispatcherServiceName = "test-service" + channelServiceAddress = "test-nc-kn-channel.test-namespace.svc.cluster.local" +) + +var ( + trueVal = true + // deletionTime is used when objects are marked as deleted. Rfc3339Copy() + // truncates to seconds to match the loss of precision during serialization. + deletionTime = metav1.Now().Rfc3339Copy() +) + +func init() { + // Add types to scheme + _ = v1alpha1.AddToScheme(scheme.Scheme) + _ = duckv1alpha1.AddToScheme(scheme.Scheme) +} + +func TestNewController(t *testing.T) { + kubeClient := fakekubeclientset.NewSimpleClientset() + messagingClient := fakeclientset.NewSimpleClientset() + + // Create informer factories with fake clients. The second parameter sets the + // resync period to zero, disabling it. + kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, 0) + messagingInformerFactory := informers.NewSharedInformerFactory(messagingClient, 0) + + // Eventing + ncInformer := messagingInformerFactory.Messaging().V1alpha1().NatssChannels() + + // Kube + serviceInformer := kubeInformerFactory.Core().V1().Services() + endpointsInformer := kubeInformerFactory.Core().V1().Endpoints() + deploymentInformer := kubeInformerFactory.Apps().V1().Deployments() + + c := NewController( + reconciler.Options{ + KubeClientSet: kubeClient, + NatssClientSet: messagingClient, + Logger: logtesting.TestLogger(t), + }, + systemNS, + dispatcherDeploymentName, + dispatcherServiceName, + ncInformer, + deploymentInformer, + serviceInformer, + endpointsInformer) + + if c == nil { + t.Fatalf("Failed to create with NewController") + } +} + +func TestAllCases(t *testing.T) { + ncKey := testNS + "/" + ncName + table := TableTest{ + { + Name: "bad workqueue key", + // Make sure Reconcile handles bad keys. + Key: "too/many/parts", + }, { + Name: "key not found", + // Make sure Reconcile handles good keys that don't exist. + Key: "foo/not-found", + }, { + Name: "deleting", + Key: ncKey, + Objects: []runtime.Object{ + reconciletesting.NewNatssChannel(ncName, testNS, + reconciletesting.WithNatssInitChannelConditions, + reconciletesting.WithNatssChannelDeleted)}, + WantErr: false, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, channelReconciled, "NatssChannel reconciled"), + }, + }, { + Name: "deployment does not exist", + Key: ncKey, + Objects: []runtime.Object{ + reconciletesting.NewNatssChannel(ncName, testNS), + }, + WantErr: true, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewNatssChannel(ncName, testNS, + reconciletesting.WithNatssInitChannelConditions, + reconciletesting.WithNatssChannelDeploymentNotReady("DispatcherDeploymentDoesNotExist", "Dispatcher Deployment does not exist")), + }}, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, channelReconcileFailed, "NatssChannel reconciliation failed: deployment.apps \"test-deployment\" not found"), + }, + }, { + Name: "Service does not exist", + Key: ncKey, + Objects: []runtime.Object{ + makeReadyDeployment(), + reconciletesting.NewNatssChannel(ncName, testNS), + }, + WantErr: true, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewNatssChannel(ncName, testNS, + reconciletesting.WithNatssInitChannelConditions, + reconciletesting.WithNatssChannelDeploymentReady(), + reconciletesting.WithNatssChannelServicetNotReady("DispatcherServiceDoesNotExist", "Dispatcher Service does not exist")), + }}, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, channelReconcileFailed, "NatssChannel reconciliation failed: service \"test-service\" not found"), + }, + }, { + Name: "Endpoints does not exist", + Key: ncKey, + Objects: []runtime.Object{ + makeReadyDeployment(), + makeService(), + reconciletesting.NewNatssChannel(ncName, testNS), + }, + WantErr: true, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewNatssChannel(ncName, testNS, + reconciletesting.WithNatssInitChannelConditions, + reconciletesting.WithNatssChannelDeploymentReady(), + reconciletesting.WithNatssChannelServiceReady(), + reconciletesting.WithNatssChannelEndpointsNotReady("DispatcherEndpointsDoesNotExist", "Dispatcher Endpoints does not exist"), + ), + }}, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, channelReconcileFailed, "NatssChannel reconciliation failed: endpoints \"test-service\" not found"), + }, + }, { + Name: "Endpoints not ready", + Key: ncKey, + Objects: []runtime.Object{ + makeReadyDeployment(), + makeService(), + makeEmptyEndpoints(), + reconciletesting.NewNatssChannel(ncName, testNS), + }, + WantErr: true, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewNatssChannel(ncName, testNS, + reconciletesting.WithNatssInitChannelConditions, + reconciletesting.WithNatssChannelDeploymentReady(), + reconciletesting.WithNatssChannelServiceReady(), + reconciletesting.WithNatssChannelEndpointsNotReady("DispatcherEndpointsNotReady", "There are no endpoints ready for Dispatcher service"), + ), + }}, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, channelReconcileFailed, "NatssChannel reconciliation failed: there are no endpoints ready for Dispatcher service test-service"), + }, + }, { + Name: "Works, creates new channel", + Key: ncKey, + Objects: []runtime.Object{ + makeReadyDeployment(), + makeService(), + makeReadyEndpoints(), + reconciletesting.NewNatssChannel(ncName, testNS), + }, + WantErr: false, + WantCreates: []runtime.Object{ + makeChannelService(reconciletesting.NewNatssChannel(ncName, testNS)), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewNatssChannel(ncName, testNS, + reconciletesting.WithNatssInitChannelConditions, + reconciletesting.WithNatssChannelDeploymentReady(), + reconciletesting.WithNatssChannelServiceReady(), + reconciletesting.WithNatssChannelEndpointsReady(), + reconciletesting.WithNatssChannelChannelServiceReady(), + reconciletesting.WithNatssChannelAddress(channelServiceAddress), + ), + }}, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, channelReconciled, "NatssChannel reconciled"), + }, + }, { + Name: "Works, channel exists", + Key: ncKey, + Objects: []runtime.Object{ + makeReadyDeployment(), + makeService(), + makeReadyEndpoints(), + reconciletesting.NewNatssChannel(ncName, testNS), + makeChannelService(reconciletesting.NewNatssChannel(ncName, testNS)), + }, + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewNatssChannel(ncName, testNS, + reconciletesting.WithNatssInitChannelConditions, + reconciletesting.WithNatssChannelDeploymentReady(), + reconciletesting.WithNatssChannelServiceReady(), + reconciletesting.WithNatssChannelEndpointsReady(), + reconciletesting.WithNatssChannelChannelServiceReady(), + reconciletesting.WithNatssChannelAddress(channelServiceAddress), + ), + }}, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, channelReconciled, "NatssChannel reconciled"), + }, + }, { + Name: "channel exists, not owned by us", + Key: ncKey, + Objects: []runtime.Object{ + makeReadyDeployment(), + makeService(), + makeReadyEndpoints(), + reconciletesting.NewNatssChannel(ncName, testNS), + makeChannelServiceNotOwnedByUs(reconciletesting.NewNatssChannel(ncName, testNS)), + }, + WantErr: true, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewNatssChannel(ncName, testNS, + reconciletesting.WithNatssInitChannelConditions, + reconciletesting.WithNatssChannelDeploymentReady(), + reconciletesting.WithNatssChannelServiceReady(), + reconciletesting.WithNatssChannelEndpointsReady(), + reconciletesting.WithNatssChannelChannelServicetNotReady("ChannelServiceFailed", "Channel Service failed: natsschannel: test-namespace/test-nc does not own Service: \"test-nc-kn-channel\""), + ), + }}, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, channelReconcileFailed, "NatssChannel reconciliation failed: natsschannel: test-namespace/test-nc does not own Service: \"test-nc-kn-channel\""), + }, + }, { + Name: "channel does not exist, fails to create", + Key: ncKey, + Objects: []runtime.Object{ + makeReadyDeployment(), + makeService(), + makeReadyEndpoints(), + reconciletesting.NewNatssChannel(ncName, testNS), + }, + WantErr: true, + WithReactors: []clientgotesting.ReactionFunc{ + InduceFailure("create", "Services"), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewNatssChannel(ncName, testNS, + reconciletesting.WithNatssInitChannelConditions, + reconciletesting.WithNatssChannelDeploymentReady(), + reconciletesting.WithNatssChannelServiceReady(), + reconciletesting.WithNatssChannelEndpointsReady(), + reconciletesting.WithNatssChannelChannelServicetNotReady("ChannelServiceFailed", "Channel Service failed: inducing failure for create services"), + ), + }}, + WantCreates: []runtime.Object{ + makeChannelService(reconciletesting.NewNatssChannel(ncName, testNS)), + }, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, channelReconcileFailed, "NatssChannel reconciliation failed: inducing failure for create services"), + }, + }, + } + defer logtesting.ClearAll() + + table.Test(t, reconciletesting.MakeFactory(func(listers *reconciletesting.Listers, opt reconciler.Options) controller.Reconciler { + return &Reconciler{ + Base: reconciler.NewBase(opt, controllerAgentName), + dispatcherNamespace: testNS, + dispatcherDeploymentName: dispatcherDeploymentName, + dispatcherServiceName: dispatcherServiceName, + natsschannelLister: listers.GetNatssChannelLister(), + // TODO fix + natsschannelInformer: nil, + deploymentLister: listers.GetDeploymentLister(), + serviceLister: listers.GetServiceLister(), + endpointsLister: listers.GetEndpointsLister(), + } + }, + )) +} + +func makeDeployment() *appsv1.Deployment { + return &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNS, + Name: dispatcherDeploymentName, + }, + Status: appsv1.DeploymentStatus{}, + } +} + +func makeReadyDeployment() *appsv1.Deployment { + d := makeDeployment() + d.Status.Conditions = []appsv1.DeploymentCondition{{Type: appsv1.DeploymentAvailable, Status: corev1.ConditionTrue}} + return d +} + +func makeService() *corev1.Service { + return &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNS, + Name: dispatcherServiceName, + }, + } +} + +func makeChannelService(nc *v1alpha1.NatssChannel) *corev1.Service { + return &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNS, + Name: fmt.Sprintf("%s-kn-channel", ncName), + Labels: map[string]string{ + resources.MessagingRoleLabel: resources.MessagingRole, + }, + OwnerReferences: []metav1.OwnerReference{ + *kmeta.NewControllerRef(nc), + }, + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeExternalName, + ExternalName: fmt.Sprintf("%s.%s.svc.%s", dispatcherServiceName, testNS, utils.GetClusterDomainName()), + }, + } +} + +func makeChannelServiceNotOwnedByUs(nc *v1alpha1.NatssChannel) *corev1.Service { + return &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNS, + Name: fmt.Sprintf("%s-kn-channel", ncName), + Labels: map[string]string{ + resources.MessagingRoleLabel: resources.MessagingRole, + }, + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeExternalName, + ExternalName: fmt.Sprintf("%s.%s.svc.%s", dispatcherServiceName, testNS, utils.GetClusterDomainName()), + }, + } +} + +func makeEmptyEndpoints() *corev1.Endpoints { + return &corev1.Endpoints{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Endpoints", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNS, + Name: dispatcherServiceName, + }, + } +} + +func makeReadyEndpoints() *corev1.Endpoints { + e := makeEmptyEndpoints() + e.Subsets = []corev1.EndpointSubset{{Addresses: []corev1.EndpointAddress{{IP: "1.1.1.1"}}}} + return e +} diff --git a/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go b/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go index 15cfe801fa4..9a938e5b597 100644 --- a/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go +++ b/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go @@ -26,14 +26,13 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "github.com/knative/eventing/contrib/natss/pkg/apis/messaging/v1alpha1" - clientset "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned" messaginginformers "github.com/knative/eventing/contrib/natss/pkg/client/informers/externalversions/messaging/v1alpha1" listers "github.com/knative/eventing/contrib/natss/pkg/client/listers/messaging/v1alpha1" "github.com/knative/eventing/contrib/natss/pkg/dispatcher" + "github.com/knative/eventing/contrib/natss/pkg/reconciler" "github.com/knative/eventing/pkg/logging" "github.com/knative/eventing/pkg/provisioners/fanout" "github.com/knative/eventing/pkg/provisioners/multichannelfanout" - "github.com/knative/eventing/pkg/reconciler" "github.com/knative/pkg/controller" apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" @@ -58,7 +57,6 @@ type Reconciler struct { natssDispatcher *dispatcher.SubscriptionsSupervisor - eventingClientSet clientset.Interface natsschannelLister listers.NatssChannelLister natsschannelInformer cache.SharedIndexInformer impl *controller.Impl @@ -71,14 +69,12 @@ var _ controller.Reconciler = (*Reconciler)(nil) // Registers event handlers to enqueue events. func NewController( opt reconciler.Options, - eventingClientSet clientset.Interface, natssDispatcher *dispatcher.SubscriptionsSupervisor, natsschannelInformer messaginginformers.NatssChannelInformer, ) *controller.Impl { r := &Reconciler{ Base: reconciler.NewBase(opt, controllerAgentName), - eventingClientSet: eventingClientSet, natssDispatcher: natssDispatcher, natsschannelLister: natsschannelInformer.Lister(), natsschannelInformer: natsschannelInformer.Informer(), @@ -124,7 +120,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error { return err } removeFinalizer(natssChannel) - _, err := r.eventingClientSet.MessagingV1alpha1().NatssChannels(natssChannel.Namespace).Update(natssChannel) + _, err := r.NatssClientSet.MessagingV1alpha1().NatssChannels(natssChannel.Namespace).Update(natssChannel) return err } @@ -201,7 +197,7 @@ func (r *Reconciler) ensureFinalizer(channel *v1alpha1.NatssChannel) error { return err } - _, err = r.eventingClientSet.MessagingV1alpha1().NatssChannels(channel.Namespace).Patch(channel.Name, types.MergePatchType, patch) + _, err = r.NatssClientSet.MessagingV1alpha1().NatssChannels(channel.Namespace).Patch(channel.Name, types.MergePatchType, patch) return err } diff --git a/contrib/natss/pkg/reconciler/reconciler.go b/contrib/natss/pkg/reconciler/reconciler.go new file mode 100644 index 00000000000..d6699f3773a --- /dev/null +++ b/contrib/natss/pkg/reconciler/reconciler.go @@ -0,0 +1,168 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reconciler + +import ( + "time" + + clientset "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned" + natssScheme "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned/scheme" + "github.com/knative/pkg/configmap" + "github.com/knative/pkg/logging/logkey" + "github.com/knative/pkg/system" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" +) + +// Options defines the common reconciler options. +// We define this to reduce the boilerplate argument list when +// creating our controllers. +type Options struct { + KubeClientSet kubernetes.Interface + DynamicClientSet dynamic.Interface + + NatssClientSet clientset.Interface + + Recorder record.EventRecorder + StatsReporter StatsReporter + + ConfigMapWatcher *configmap.InformedWatcher + Logger *zap.SugaredLogger + + ResyncPeriod time.Duration + StopChannel <-chan struct{} +} + +// This is mutable for testing. +var resetPeriod = 30 * time.Second + +func NewOptionsOrDie(cfg *rest.Config, logger *zap.SugaredLogger, stopCh <-chan struct{}) Options { + kubeClient := kubernetes.NewForConfigOrDie(cfg) + dynamicClient := dynamic.NewForConfigOrDie(cfg) + + natssClient := clientset.NewForConfigOrDie(cfg) + + configMapWatcher := configmap.NewInformedWatcher(kubeClient, system.Namespace()) + + return Options{ + KubeClientSet: kubeClient, + DynamicClientSet: dynamicClient, + ConfigMapWatcher: configMapWatcher, + NatssClientSet: natssClient, + Logger: logger, + ResyncPeriod: 10 * time.Hour, // Based on controller-runtime default. + StopChannel: stopCh, + } +} + +// GetTrackerLease returns a multiple of the resync period to use as the +// duration for tracker leases. This attempts to ensure that resyncs happen to +// refresh leases frequently enough that we don't miss updates to tracked +// objects. +func (o Options) GetTrackerLease() time.Duration { + return o.ResyncPeriod * 3 +} + +// Base implements the core controller logic, given a Reconciler. +type Base struct { + // KubeClientSet allows us to talk to the k8s for core APIs + KubeClientSet kubernetes.Interface + + // DynamicClientSet allows us to configure pluggable Build objects + DynamicClientSet dynamic.Interface + + NatssClientSet clientset.Interface + + // ConfigMapWatcher allows us to watch for ConfigMap changes. + ConfigMapWatcher configmap.Watcher + + // Recorder is an event recorder for recording Event resources to the + // Kubernetes API. + Recorder record.EventRecorder + + // StatsReporter reports reconciler's metrics. + StatsReporter StatsReporter + + // Sugared logger is easier to use but is not as performant as the + // raw logger. In performance critical paths, call logger.Desugar() + // and use the returned raw logger instead. In addition to the + // performance benefits, raw logger also preserves type-safety at + // the expense of slightly greater verbosity. + Logger *zap.SugaredLogger +} + +// NewBase instantiates a new instance of Base implementing +// the common & boilerplate code between our reconcilers. +func NewBase(opt Options, controllerAgentName string) *Base { + // Enrich the logs with controller name + logger := opt.Logger.Named(controllerAgentName).With(zap.String(logkey.ControllerType, controllerAgentName)) + + recorder := opt.Recorder + if recorder == nil { + // Create event broadcaster + logger.Debug("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + watches := []watch.Interface{ + eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof), + eventBroadcaster.StartRecordingToSink( + &typedcorev1.EventSinkImpl{Interface: opt.KubeClientSet.CoreV1().Events("")}), + } + recorder = eventBroadcaster.NewRecorder( + scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + go func() { + <-opt.StopChannel + for _, w := range watches { + w.Stop() + } + }() + } + + statsReporter := opt.StatsReporter + if statsReporter == nil { + logger.Debug("Creating stats reporter") + var err error + statsReporter, err = NewStatsReporter(controllerAgentName) + if err != nil { + logger.Fatal(err) + } + } + + base := &Base{ + KubeClientSet: opt.KubeClientSet, + DynamicClientSet: opt.DynamicClientSet, + NatssClientSet: opt.NatssClientSet, + ConfigMapWatcher: opt.ConfigMapWatcher, + Recorder: recorder, + StatsReporter: statsReporter, + Logger: logger, + } + + return base +} + +func init() { + // Add run types to the default Kubernetes Scheme so Events can be + // logged for run types. + _ = natssScheme.AddToScheme(scheme.Scheme) +} diff --git a/contrib/natss/pkg/reconciler/stats_reporter.go b/contrib/natss/pkg/reconciler/stats_reporter.go new file mode 100644 index 00000000000..a1845fba024 --- /dev/null +++ b/contrib/natss/pkg/reconciler/stats_reporter.go @@ -0,0 +1,164 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reconciler + +import ( + "context" + "fmt" + "time" + + "github.com/knative/pkg/metrics" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" +) + +type Measurement int + +const ( + // NatssChannelReadyCountN is the number of natss channels that have become ready. + NatssChannelReadyCountN = "natsschannel_ready_count" + // NatssChannelReadyLatencyN is the time it takes for a natss channel to become ready since the resource is created. + NatssChannelReadyLatencyN = "natsschannel_ready_latency" +) + +var ( + KindToStatKeys = map[string]StatKey{ + "NatssChannel": { + ReadyCountKey: NatssChannelReadyCountN, + ReadyLatencyKey: NatssChannelReadyLatencyN, + }, + } + + KindToMeasurements map[string]Measurements + + reconcilerTagKey tag.Key + keyTagKey tag.Key +) + +type Measurements struct { + ReadyLatencyStat *stats.Int64Measure + ReadyCountStat *stats.Int64Measure +} + +type StatKey struct { + ReadyLatencyKey string + ReadyCountKey string +} + +func init() { + var err error + // Create the tag keys that will be used to add tags to our measurements. + // Tag keys must conform to the restrictions described in + // go.opencensus.io/tag/validate.go. Currently those restrictions are: + // - length between 1 and 255 inclusive + // - characters are printable US-ASCII + reconcilerTagKey = mustNewTagKey("reconciler") + keyTagKey = mustNewTagKey("key") + + KindToMeasurements = make(map[string]Measurements, len(KindToStatKeys)) + + for kind, keys := range KindToStatKeys { + + readyLatencyStat := stats.Int64( + keys.ReadyLatencyKey, + fmt.Sprintf("Time it takes for a %s to become ready since created", kind), + stats.UnitMilliseconds) + + readyCountStat := stats.Int64( + keys.ReadyCountKey, + fmt.Sprintf("Number of %s that became ready", kind), + stats.UnitDimensionless) + + // Save the measurements for later marks. + KindToMeasurements[kind] = Measurements{ + ReadyCountStat: readyCountStat, + ReadyLatencyStat: readyLatencyStat, + } + + // Create views to see our measurements. This can return an error if + // a previously-registered view has the same name with a different value. + // View name defaults to the measure name if unspecified. + err = view.Register( + &view.View{ + Description: readyLatencyStat.Description(), + Measure: readyLatencyStat, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{reconcilerTagKey, keyTagKey}, + }, + &view.View{ + Description: readyCountStat.Description(), + Measure: readyCountStat, + Aggregation: view.Count(), + TagKeys: []tag.Key{reconcilerTagKey, keyTagKey}, + }, + ) + if err != nil { + panic(err) + } + } +} + +// StatsReporter reports reconcilers' metrics. +type StatsReporter interface { + // ReportReady reports the time it took a resource to become Ready. + ReportReady(kind, namespace, service string, d time.Duration) error +} + +type reporter struct { + ctx context.Context +} + +// NewStatsReporter creates a reporter for reconcilers' metrics +func NewStatsReporter(reconciler string) (StatsReporter, error) { + ctx, err := tag.New( + context.Background(), + tag.Insert(reconcilerTagKey, reconciler)) + if err != nil { + return nil, err + } + return &reporter{ctx: ctx}, nil +} + +// ReportServiceReady reports the time it took a service to become Ready +func (r *reporter) ReportReady(kind, namespace, service string, d time.Duration) error { + key := fmt.Sprintf("%s/%s", namespace, service) + v := int64(d / time.Millisecond) + ctx, err := tag.New( + r.ctx, + tag.Insert(keyTagKey, key)) + if err != nil { + return err + } + + m, ok := KindToMeasurements[kind] + if !ok { + return fmt.Errorf("unknown kind attempted to report ready, %q", kind) + } + + metrics.Record(ctx, m.ReadyCountStat.M(1)) + metrics.Record(ctx, m.ReadyLatencyStat.M(v)) + return nil +} + +func mustNewTagKey(s string) tag.Key { + tagKey, err := tag.NewKey(s) + if err != nil { + panic(err) + } + return tagKey +} diff --git a/contrib/natss/pkg/reconciler/testing/factory.go b/contrib/natss/pkg/reconciler/testing/factory.go new file mode 100644 index 00000000000..a6b40280db6 --- /dev/null +++ b/contrib/natss/pkg/reconciler/testing/factory.go @@ -0,0 +1,96 @@ +/* +Copyright 2019 The Knative Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "context" + "testing" + + "k8s.io/apimachinery/pkg/runtime" + + fakedynamicclientset "k8s.io/client-go/dynamic/fake" + fakekubeclientset "k8s.io/client-go/kubernetes/fake" + clientgotesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/record" + + fakeclientset "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned/fake" + "github.com/knative/eventing/contrib/natss/pkg/reconciler" + "github.com/knative/pkg/controller" + logtesting "github.com/knative/pkg/logging/testing" + + . "github.com/knative/pkg/reconciler/testing" +) + +const ( + // maxEventBufferSize is the estimated max number of event notifications that + // can be buffered during reconciliation. + maxEventBufferSize = 10 +) + +// Ctor functions create a k8s controller with given params. +type Ctor func(*Listers, reconciler.Options) controller.Reconciler + +// MakeFactory creates a reconciler factory with fake clients and controller created by `ctor`. +func MakeFactory(ctor Ctor) Factory { + return func(t *testing.T, r *TableRow) (controller.Reconciler, ActionRecorderList, EventList, *FakeStatsReporter) { + ls := NewListers(r.Objects) + + kubeClient := fakekubeclientset.NewSimpleClientset(ls.GetKubeObjects()...) + client := fakeclientset.NewSimpleClientset(ls.GetMessagingObjects()...) + + dynamicScheme := runtime.NewScheme() + for _, addTo := range clientSetSchemes { + addTo(dynamicScheme) + } + + dynamicClient := fakedynamicclientset.NewSimpleDynamicClient(dynamicScheme, ls.GetAllObjects()...) + eventRecorder := record.NewFakeRecorder(maxEventBufferSize) + statsReporter := &FakeStatsReporter{} + + PrependGenerateNameReactor(&client.Fake) + PrependGenerateNameReactor(&dynamicClient.Fake) + + // Set up our Controller from the fakes. + c := ctor(&ls, reconciler.Options{ + KubeClientSet: kubeClient, + DynamicClientSet: dynamicClient, + NatssClientSet: client, + Recorder: eventRecorder, + //StatsReporter: statsReporter, + Logger: logtesting.TestLogger(t), + }) + + for _, reactor := range r.WithReactors { + kubeClient.PrependReactor("*", "*", reactor) + client.PrependReactor("*", "*", reactor) + dynamicClient.PrependReactor("*", "*", reactor) + } + + // Validate all Create operations through the eventing client. + client.PrependReactor("create", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) { + return ValidateCreates(context.Background(), action) + }) + client.PrependReactor("update", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) { + return ValidateUpdates(context.Background(), action) + }) + + actionRecorderList := ActionRecorderList{dynamicClient, client, kubeClient} + eventList := EventList{Recorder: eventRecorder} + + return c, actionRecorderList, eventList, statsReporter + } +} diff --git a/contrib/natss/pkg/reconciler/testing/listers.go b/contrib/natss/pkg/reconciler/testing/listers.go new file mode 100644 index 00000000000..176aedbb1c9 --- /dev/null +++ b/contrib/natss/pkg/reconciler/testing/listers.go @@ -0,0 +1,103 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + messagingv1alpha1 "github.com/knative/eventing/contrib/natss/pkg/apis/messaging/v1alpha1" + fakemessagingclientset "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned/fake" + messaginglisters "github.com/knative/eventing/contrib/natss/pkg/client/listers/messaging/v1alpha1" + fakeeventsclientset "github.com/knative/eventing/pkg/client/clientset/versioned/fake" + fakesharedclientset "github.com/knative/pkg/client/clientset/versioned/fake" + "github.com/knative/pkg/reconciler/testing" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + fakekubeclientset "k8s.io/client-go/kubernetes/fake" + appsv1listers "k8s.io/client-go/listers/apps/v1" + corev1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" +) + +var clientSetSchemes = []func(*runtime.Scheme) error{ + fakekubeclientset.AddToScheme, + fakesharedclientset.AddToScheme, + fakeeventsclientset.AddToScheme, + fakemessagingclientset.AddToScheme, +} + +type Listers struct { + sorter testing.ObjectSorter +} + +func NewListers(objs []runtime.Object) Listers { + scheme := runtime.NewScheme() + + for _, addTo := range clientSetSchemes { + addTo(scheme) + } + + ls := Listers{ + sorter: testing.NewObjectSorter(scheme), + } + + ls.sorter.AddObjects(objs...) + + return ls +} + +func (l *Listers) indexerFor(obj runtime.Object) cache.Indexer { + return l.sorter.IndexerForObjectType(obj) +} + +func (l *Listers) GetKubeObjects() []runtime.Object { + return l.sorter.ObjectsForSchemeFunc(fakekubeclientset.AddToScheme) +} + +func (l *Listers) GetEventsObjects() []runtime.Object { + return l.sorter.ObjectsForSchemeFunc(fakeeventsclientset.AddToScheme) +} + +func (l *Listers) GetMessagingObjects() []runtime.Object { + return l.sorter.ObjectsForSchemeFunc(fakemessagingclientset.AddToScheme) +} + +func (l *Listers) GetAllObjects() []runtime.Object { + all := l.GetMessagingObjects() + all = append(all, l.GetEventsObjects()...) + all = append(all, l.GetKubeObjects()...) + return all +} + +func (l *Listers) GetSharedObjects() []runtime.Object { + return l.sorter.ObjectsForSchemeFunc(fakesharedclientset.AddToScheme) +} + +func (l *Listers) GetServiceLister() corev1listers.ServiceLister { + return corev1listers.NewServiceLister(l.indexerFor(&corev1.Service{})) +} + +func (l *Listers) GetEndpointsLister() corev1listers.EndpointsLister { + return corev1listers.NewEndpointsLister(l.indexerFor(&corev1.Endpoints{})) +} + +func (l *Listers) GetNatssChannelLister() messaginglisters.NatssChannelLister { + return messaginglisters.NewNatssChannelLister(l.indexerFor(&messagingv1alpha1.NatssChannel{})) +} + +func (l *Listers) GetDeploymentLister() appsv1listers.DeploymentLister { + return appsv1listers.NewDeploymentLister(l.indexerFor(&appsv1.Deployment{})) +} diff --git a/contrib/natss/pkg/reconciler/testing/natsschannel.go b/contrib/natss/pkg/reconciler/testing/natsschannel.go new file mode 100644 index 00000000000..5c61d769da6 --- /dev/null +++ b/contrib/natss/pkg/reconciler/testing/natsschannel.go @@ -0,0 +1,113 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "context" + "time" + + "github.com/knative/eventing/contrib/natss/pkg/apis/messaging/v1alpha1" + "github.com/knative/pkg/apis" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// NatssChannelOption enables further configuration of a NatssChannel. +type NatssChannelOption func(*v1alpha1.NatssChannel) + +// NewNatssChannel creates an NatssChannel with NatssChannelOptions. +func NewNatssChannel(name, namespace string, ncopt ...NatssChannelOption) *v1alpha1.NatssChannel { + nc := &v1alpha1.NatssChannel{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: v1alpha1.NatssChannelSpec{}, + } + for _, opt := range ncopt { + opt(nc) + } + nc.SetDefaults(context.Background()) + return nc +} + +func WithNatssInitChannelConditions(nc *v1alpha1.NatssChannel) { + nc.Status.InitializeConditions() +} + +func WithNatssChannelDeleted(nc *v1alpha1.NatssChannel) { + deleteTime := metav1.NewTime(time.Unix(1e9, 0)) + nc.ObjectMeta.SetDeletionTimestamp(&deleteTime) +} + +func WithNatssChannelDeploymentNotReady(reason, message string) NatssChannelOption { + return func(nc *v1alpha1.NatssChannel) { + nc.Status.MarkDispatcherFailed(reason, message) + } +} + +func WithNatssChannelDeploymentReady() NatssChannelOption { + return func(nc *v1alpha1.NatssChannel) { + nc.Status.PropagateDispatcherStatus(&appsv1.DeploymentStatus{Conditions: []appsv1.DeploymentCondition{{Type: appsv1.DeploymentAvailable, Status: corev1.ConditionTrue}}}) + } +} + +func WithNatssChannelServicetNotReady(reason, message string) NatssChannelOption { + return func(nc *v1alpha1.NatssChannel) { + nc.Status.MarkServiceFailed(reason, message) + } +} + +func WithNatssChannelServiceReady() NatssChannelOption { + return func(nc *v1alpha1.NatssChannel) { + nc.Status.MarkServiceTrue() + } +} + +func WithNatssChannelChannelServicetNotReady(reason, message string) NatssChannelOption { + return func(nc *v1alpha1.NatssChannel) { + nc.Status.MarkChannelServiceFailed(reason, message) + } +} + +func WithNatssChannelChannelServiceReady() NatssChannelOption { + return func(nc *v1alpha1.NatssChannel) { + nc.Status.MarkChannelServiceTrue() + } +} + +func WithNatssChannelEndpointsNotReady(reason, message string) NatssChannelOption { + return func(nc *v1alpha1.NatssChannel) { + nc.Status.MarkEndpointsFailed(reason, message) + } +} + +func WithNatssChannelEndpointsReady() NatssChannelOption { + return func(nc *v1alpha1.NatssChannel) { + nc.Status.MarkEndpointsTrue() + } +} + +func WithNatssChannelAddress(a string) NatssChannelOption { + return func(nc *v1alpha1.NatssChannel) { + nc.Status.SetAddress(&apis.URL{ + Scheme: "http", + Host: a, + }) + } +} diff --git a/pkg/reconciler/testing/listers.go b/pkg/reconciler/testing/listers.go index 185e80b2429..15edb114d00 100644 --- a/pkg/reconciler/testing/listers.go +++ b/pkg/reconciler/testing/listers.go @@ -51,7 +51,6 @@ var clientSetSchemes = []func(*runtime.Scheme) error{ fakekubeclientset.AddToScheme, fakesharedclientset.AddToScheme, fakeeventingclientset.AddToScheme, - fakeeventingclientset.AddToScheme, fakeapiextensionsclientset.AddToScheme, subscriberAddToScheme, }