Skip to content
Merged
15 changes: 3 additions & 12 deletions contrib/natss/cmd/channel_controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,17 @@ import (
"github.com/knative/eventing/contrib/natss/pkg/stanutil"
"github.com/knative/eventing/contrib/natss/pkg/util"

clientset "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned"
eventingScheme "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned/scheme"
informers "github.com/knative/eventing/contrib/natss/pkg/client/informers/externalversions"
"github.com/knative/eventing/contrib/natss/pkg/reconciler"
natsschannel "github.com/knative/eventing/contrib/natss/pkg/reconciler/controller"
"github.com/knative/eventing/pkg/logconfig"
"github.com/knative/eventing/pkg/reconciler"
"github.com/knative/pkg/configmap"
kncontroller "github.com/knative/pkg/controller"
"github.com/knative/pkg/logging"
"github.com/knative/pkg/signals"
"github.com/knative/pkg/system"
"go.uber.org/zap"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
Expand Down Expand Up @@ -80,30 +77,24 @@ func main() {
cfg.QPS = numControllers * rest.DefaultQPS
cfg.Burst = numControllers * rest.DefaultBurst
opt := reconciler.NewOptionsOrDie(cfg, logger, stopCh)
// Setting up our own eventingClientSet as we need the messaging API introduced with natss.
eventingClientSet := clientset.NewForConfigOrDie(cfg)

kubeInformerFactory := kubeinformers.NewSharedInformerFactory(opt.KubeClientSet, opt.ResyncPeriod)
eventingInformerFactory := informers.NewSharedInformerFactory(eventingClientSet, opt.ResyncPeriod)
messagingInformerFactory := informers.NewSharedInformerFactory(opt.NatssClientSet, opt.ResyncPeriod)

// Messaging
natssChannelInformer := eventingInformerFactory.Messaging().V1alpha1().NatssChannels()
natssChannelInformer := messagingInformerFactory.Messaging().V1alpha1().NatssChannels()

// Kube
serviceInformer := kubeInformerFactory.Core().V1().Services()
endpointsInformer := kubeInformerFactory.Core().V1().Endpoints()
deploymentInformer := kubeInformerFactory.Apps().V1().Deployments()

// Adding the scheme.
eventingScheme.AddToScheme(scheme.Scheme)

// Build all of our controllers, with the clients constructed above.
// Add new controllers to this array.
// You also need to modify numControllers above to match this.
controllers := [...]*kncontroller.Impl{
natsschannel.NewController(
opt,
eventingClientSet,
systemNS,
dispatcherDeploymentName,
dispatcherServiceName,
Expand Down
22 changes: 11 additions & 11 deletions contrib/natss/cmd/channel_dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,22 @@ package main

import (
"flag"
"github.com/knative/eventing/pkg/tracing"
"log"

"github.com/knative/eventing/contrib/natss/pkg/util"

clientset "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned"
eventingScheme "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned/scheme"
informers "github.com/knative/eventing/contrib/natss/pkg/client/informers/externalversions"
"github.com/knative/eventing/contrib/natss/pkg/dispatcher"
"github.com/knative/eventing/contrib/natss/pkg/reconciler"
natsschannel "github.com/knative/eventing/contrib/natss/pkg/reconciler/dispatcher"
"github.com/knative/eventing/pkg/logconfig"
"github.com/knative/eventing/pkg/reconciler"
"github.com/knative/pkg/configmap"
kncontroller "github.com/knative/pkg/controller"
"github.com/knative/pkg/logging"
"github.com/knative/pkg/signals"
"go.uber.org/zap"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
Expand Down Expand Up @@ -74,23 +73,18 @@ func main() {
cfg.QPS = numControllers * rest.DefaultQPS
cfg.Burst = numControllers * rest.DefaultBurst
opt := reconciler.NewOptionsOrDie(cfg, logger, stopCh)
// Setting up our own eventingClientSet as we need the messaging API introduced with natss.
eventingClientSet := clientset.NewForConfigOrDie(cfg)
eventingInformerFactory := informers.NewSharedInformerFactory(eventingClientSet, opt.ResyncPeriod)
messagingClientSet := clientset.NewForConfigOrDie(cfg)
messagingInformerFactory := informers.NewSharedInformerFactory(messagingClientSet, opt.ResyncPeriod)

// Messaging
natssChannelInformer := eventingInformerFactory.Messaging().V1alpha1().NatssChannels()

// Adding the scheme.
eventingScheme.AddToScheme(scheme.Scheme)
natssChannelInformer := messagingInformerFactory.Messaging().V1alpha1().NatssChannels()

// Build all of our controllers, with the clients constructed above.
// Add new controllers to this array.
// You also need to modify numControllers above to match this.
controllers := [...]*kncontroller.Impl{
natsschannel.NewController(
opt,
eventingClientSet,
natssDispatcher,
natssChannelInformer,
),
Expand All @@ -106,6 +100,12 @@ func main() {
opt.ConfigMapWatcher.Watch(logconfig.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, logconfig.Controller))
// TODO: Watch the observability config map and dynamically update metrics exporter.
//opt.ConfigMapWatcher.Watch(metrics.ObservabilityConfigName, metrics.UpdateExporterFromConfigMap(component, logger))

// Setup zipkin tracing.
if err = tracing.SetupDynamicZipkinPublishing(logger, opt.ConfigMapWatcher, "natss-ch-dispatcher"); err != nil {
logger.Fatalw("Error setting up Zipkin publishing", zap.Error(err))
}

if err := opt.ConfigMapWatcher.Start(stopCh); err != nil {
logger.Fatalw("failed to start configuration manager", zap.Error(err))
}
Expand Down
10 changes: 3 additions & 7 deletions contrib/natss/pkg/reconciler/controller/natsschannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ import (
"github.com/knative/pkg/apis"

"github.com/knative/eventing/contrib/natss/pkg/apis/messaging/v1alpha1"
clientset "github.com/knative/eventing/contrib/natss/pkg/client/clientset/versioned"
messaginginformers "github.com/knative/eventing/contrib/natss/pkg/client/informers/externalversions/messaging/v1alpha1"
listers "github.com/knative/eventing/contrib/natss/pkg/client/listers/messaging/v1alpha1"
"github.com/knative/eventing/contrib/natss/pkg/reconciler"
"github.com/knative/eventing/contrib/natss/pkg/reconciler/controller/resources"
"github.com/knative/eventing/pkg/logging"
"github.com/knative/eventing/pkg/reconciler"
"github.com/knative/pkg/controller"
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -69,7 +68,6 @@ type Reconciler struct {
dispatcherDeploymentName string
dispatcherServiceName string

eventingClientSet clientset.Interface
natsschannelLister listers.NatssChannelLister
natsschannelInformer cache.SharedIndexInformer
deploymentLister appsv1listers.DeploymentLister
Expand All @@ -93,7 +91,6 @@ var _ cache.ResourceEventHandler = (*Reconciler)(nil)
// Registers event handlers to enqueue events.
func NewController(
opt reconciler.Options,
eventingClientSet clientset.Interface,
dispatcherNamespace string,
dispatcherDeploymentName string,
dispatcherServiceName string,
Expand All @@ -108,7 +105,6 @@ func NewController(
dispatcherNamespace: dispatcherNamespace,
dispatcherDeploymentName: dispatcherDeploymentName,
dispatcherServiceName: dispatcherServiceName,
eventingClientSet: eventingClientSet,
natsschannelLister: natsschannelInformer.Lister(),
natsschannelInformer: natsschannelInformer.Informer(),
deploymentLister: deploymentInformer.Lister(),
Expand Down Expand Up @@ -333,11 +329,11 @@ func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.NatssCh
existing := kc.DeepCopy()
existing.Status = desired.Status

new, err := r.eventingClientSet.MessagingV1alpha1().NatssChannels(desired.Namespace).UpdateStatus(existing)
new, err := r.NatssClientSet.MessagingV1alpha1().NatssChannels(desired.Namespace).UpdateStatus(existing)
if err == nil && becomesReady {
duration := time.Since(new.ObjectMeta.CreationTimestamp.Time)
r.Logger.Infof("NatssChannel %q became ready after %v", kc.Name, duration)
if err := r.StatsReporter.ReportReady("Channel", kc.Namespace, kc.Name, duration); err != nil {
if err := r.StatsReporter.ReportReady("NatssChannel", kc.Namespace, kc.Name, duration); err != nil {
r.Logger.Infof("Failed to record ready for NatssChannel %q: %v", kc.Name, err)
}
}
Expand Down
Loading