diff --git a/cmd/in_memory_channel_controller/main.go b/cmd/in_memory/channel_controller/main.go similarity index 98% rename from cmd/in_memory_channel_controller/main.go rename to cmd/in_memory/channel_controller/main.go index 2436fa7f77d..73ab9f9788f 100644 --- a/cmd/in_memory_channel_controller/main.go +++ b/cmd/in_memory/channel_controller/main.go @@ -26,7 +26,7 @@ import ( informers "github.com/knative/eventing/pkg/client/informers/externalversions" "github.com/knative/eventing/pkg/logconfig" "github.com/knative/eventing/pkg/reconciler" - "github.com/knative/eventing/pkg/reconciler/inmemorychannel" + inmemorychannel "github.com/knative/eventing/pkg/reconciler/inmemorychannel/controller" "github.com/knative/pkg/configmap" kncontroller "github.com/knative/pkg/controller" "github.com/knative/pkg/logging" diff --git a/cmd/in_memory/channel_dispatcher/main.go b/cmd/in_memory/channel_dispatcher/main.go new file mode 100644 index 00000000000..defada3f7d7 --- /dev/null +++ b/cmd/in_memory/channel_dispatcher/main.go @@ -0,0 +1,174 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "flag" + "log" + "time" + + informers "github.com/knative/eventing/pkg/client/informers/externalversions" + dispatcher "github.com/knative/eventing/pkg/inmemorychannel" + "github.com/knative/eventing/pkg/logconfig" + "github.com/knative/eventing/pkg/provisioners/swappable" + "github.com/knative/eventing/pkg/reconciler" + inmemorychannel "github.com/knative/eventing/pkg/reconciler/inmemorychannel/dispatcher" + "github.com/knative/pkg/configmap" + kncontroller "github.com/knative/pkg/controller" + "github.com/knative/pkg/logging" + "github.com/knative/pkg/signals" + "go.uber.org/zap" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +var ( + hardcodedLoggingConfig = flag.Bool("hardCodedLoggingConfig", false, "If true, use the hard coded logging config. It is intended to be used only when debugging outside a Kubernetes cluster.") + masterURL = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") + kubeconfig = flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") + + readTimeout = 1 * time.Minute + writeTimeout = 1 * time.Minute + port = 8080 +) + +func main() { + flag.Parse() + logger, atomicLevel := setupLogger() + defer logger.Sync() + + // set up signals so we handle the first shutdown signal gracefully + stopCh := signals.SetupSignalHandler() + + cfg, err := clientcmd.BuildConfigFromFlags(*masterURL, *kubeconfig) + if err != nil { + logger.Fatalw("Error building kubeconfig", zap.Error(err)) + } + + sh, err := swappable.NewEmptyHandler(logger.Desugar()) + if err != nil { + logger.Fatal("Error creating swappable.Handler", zap.Error(err)) + } + + args := &dispatcher.InMemoryDispatcherArgs{ + Port: port, + ReadTimeout: readTimeout, + WriteTimeout: writeTimeout, + Handler: sh, + Logger: logger.Desugar(), + } + inMemoryDispatcher := dispatcher.NewDispatcher(args) + + logger = logger.With(zap.String("controller/impl", "pkg")) + logger.Info("Starting the InMemory dispatcher") + + const numControllers = 1 + cfg.QPS = numControllers * rest.DefaultQPS + cfg.Burst = numControllers * rest.DefaultBurst + opt := reconciler.NewOptionsOrDie(cfg, logger, stopCh) + + eventingInformerFactory := informers.NewSharedInformerFactory(opt.EventingClientSet, opt.ResyncPeriod) + + // Messaging + inMemoryChannelInformer := eventingInformerFactory.Messaging().V1alpha1().InMemoryChannels() + + // Build all of our controllers, with the clients constructed above. + // Add new controllers to this array. + // You also need to modify numControllers above to match this. + controllers := [...]*kncontroller.Impl{ + inmemorychannel.NewController( + opt, + inMemoryDispatcher, + inMemoryChannelInformer, + ), + } + // This line asserts at compile time that the length of controllers is equal to numControllers. + // It is based on https://go101.org/article/tips.html#assert-at-compile-time, which notes that + // var _ [N-M]int + // asserts at compile time that N >= M, which we can use to establish equality of N and M: + // (N >= M) && (M >= N) => (N == M) + var _ [numControllers - len(controllers)][len(controllers) - numControllers]int + + // Watch the logging config map and dynamically update logging levels. + opt.ConfigMapWatcher.Watch(logconfig.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, logconfig.Controller)) + // TODO: Watch the observability config map and dynamically update metrics exporter. + //opt.ConfigMapWatcher.Watch(metrics.ObservabilityConfigName, metrics.UpdateExporterFromConfigMap(component, logger)) + if err := opt.ConfigMapWatcher.Start(stopCh); err != nil { + logger.Fatalw("failed to start configuration manager", zap.Error(err)) + } + + // Start all of the informers and wait for them to sync. + logger.Info("Starting informers.") + if err := kncontroller.StartInformers( + stopCh, + // Messaging + inMemoryChannelInformer.Informer(), + ); err != nil { + logger.Fatalf("Failed to start informers: %v", err) + } + + go inMemoryDispatcher.Start(stopCh) + + logger.Info("Starting controllers.") + kncontroller.StartAll(stopCh, controllers[:]...) + + inMemoryDispatcher.Stop() +} + +func setupLogger() (*zap.SugaredLogger, zap.AtomicLevel) { + // Set up our logger. + loggingConfigMap := getLoggingConfigOrDie() + loggingConfig, err := logging.NewConfigFromMap(loggingConfigMap) + if err != nil { + log.Fatalf("Error parsing logging configuration: %v", err) + } + return logging.NewLoggerFromConfig(loggingConfig, logconfig.Controller) +} + +func getLoggingConfigOrDie() map[string]string { + if hardcodedLoggingConfig != nil && *hardcodedLoggingConfig { + return map[string]string{ + "loglevel.controller": "info", + "zap-logger-config": ` + { + "level": "info", + "development": false, + "outputPaths": ["stdout"], + "errorOutputPaths": ["stderr"], + "encoding": "json", + "encoderConfig": { + "timeKey": "ts", + "levelKey": "level", + "nameKey": "logger", + "callerKey": "caller", + "messageKey": "msg", + "stacktraceKey": "stacktrace", + "lineEnding": "", + "levelEncoder": "", + "timeEncoder": "iso8601", + "durationEncoder": "", + "callerEncoder": "" + }`, + } + } else { + cm, err := configmap.Load("/etc/config-logging") + if err != nil { + log.Fatalf("Error loading logging configuration: %v", err) + } + return cm + } +} diff --git a/config/200-controller-clusterrole.yaml b/config/200-controller-clusterrole.yaml index cf734475777..67b99476f9a 100644 --- a/config/200-controller-clusterrole.yaml +++ b/config/200-controller-clusterrole.yaml @@ -100,3 +100,16 @@ rules: - "get" - "list" - "watch" + + # Messaging resources and statuses we care about. + - apiGroups: + - "messaging.knative.dev" + resources: + - "inmemorychannels" + - "inmemorychannels/status" + verbs: + - "get" + - "list" + - "watch" + - "update" + - "patch" diff --git a/config/channels/in-memory-channel/200-dispatcher-clusterrole.yaml b/config/channels/in-memory-channel/200-dispatcher-clusterrole.yaml index 8839ff61cf1..bcdc80115c1 100644 --- a/config/channels/in-memory-channel/200-dispatcher-clusterrole.yaml +++ b/config/channels/in-memory-channel/200-dispatcher-clusterrole.yaml @@ -25,3 +25,11 @@ rules: - get - list - watch + - apiGroups: + - "" # Core API group. + resources: + - configmaps + verbs: + - get + - list + - watch diff --git a/config/channels/in-memory-channel/400-controller.yaml b/config/channels/in-memory-channel/500-controller.yaml similarity index 95% rename from config/channels/in-memory-channel/400-controller.yaml rename to config/channels/in-memory-channel/500-controller.yaml index 9d09c44fd79..2fb08924057 100644 --- a/config/channels/in-memory-channel/400-controller.yaml +++ b/config/channels/in-memory-channel/500-controller.yaml @@ -29,7 +29,7 @@ spec: serviceAccountName: imc-controller containers: - name: controller - image: github.com/knative/eventing/cmd/in_memory_channel_controller + image: github.com/knative/eventing/cmd/in_memory/channel_controller env: - name: CONFIG_LOGGING_NAME value: config-logging diff --git a/config/channels/in-memory-channel/500-dispatcher.yaml b/config/channels/in-memory-channel/500-dispatcher.yaml index 09d3d41029a..c89eb8f8f47 100644 --- a/config/channels/in-memory-channel/500-dispatcher.yaml +++ b/config/channels/in-memory-channel/500-dispatcher.yaml @@ -29,9 +29,18 @@ spec: serviceAccountName: imc-dispatcher containers: - name: dispatcher - image: github.com/knative/eventing/cmd/in_memory/dispatcher + image: github.com/knative/eventing/cmd/in_memory/channel_dispatcher env: + - name: CONFIG_LOGGING_NAME + value: config-logging - name: SYSTEM_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespace + volumeMounts: + - name: config-logging + mountPath: /etc/config-logging + volumes: + - name: config-logging + configMap: + name: config-logging diff --git a/pkg/inmemorychannel/dispatcher.go b/pkg/inmemorychannel/dispatcher.go new file mode 100644 index 00000000000..acbd71ec932 --- /dev/null +++ b/pkg/inmemorychannel/dispatcher.go @@ -0,0 +1,84 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package inmemorychannel + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/knative/eventing/pkg/provisioners/multichannelfanout" + "github.com/knative/eventing/pkg/provisioners/swappable" + pkgtracing "github.com/knative/pkg/tracing" + "go.uber.org/zap" +) + +type Dispatcher interface { + UpdateConfig(config *multichannelfanout.Config) error +} + +type InMemoryDispatcher struct { + handler *swappable.Handler + server *http.Server + + logger *zap.Logger +} + +type InMemoryDispatcherArgs struct { + Port int + ReadTimeout time.Duration + WriteTimeout time.Duration + Handler *swappable.Handler + Logger *zap.Logger +} + +func (d *InMemoryDispatcher) UpdateConfig(config *multichannelfanout.Config) error { + return d.handler.UpdateConfig(config) +} + +// Start starts the inmemory dispatcher's message processing. +func (d *InMemoryDispatcher) Start(stopCh <-chan struct{}) error { + d.logger.Info("in memory dispatcher listening", zap.String("address", d.server.Addr)) + return d.server.ListenAndServe() +} + +func (d *InMemoryDispatcher) Stop() { + ctx, cancel := context.WithTimeout(context.Background(), d.server.WriteTimeout) + defer cancel() + if err := d.server.Shutdown(ctx); err != nil { + d.logger.Error("Shutdown returned an error", zap.Error(err)) + } +} + +func NewDispatcher(args *InMemoryDispatcherArgs) *InMemoryDispatcher { + + server := &http.Server{ + Addr: fmt.Sprintf(":%d", args.Port), + Handler: pkgtracing.HTTPSpanMiddleware(args.Handler), + ErrorLog: zap.NewStdLog(args.Logger), + ReadTimeout: args.ReadTimeout, + WriteTimeout: args.WriteTimeout, + } + + dispatcher := &InMemoryDispatcher{ + handler: args.Handler, + server: server, + logger: args.Logger, + } + + return dispatcher +} diff --git a/pkg/inmemorychannel/dispatcher_test.go b/pkg/inmemorychannel/dispatcher_test.go new file mode 100644 index 00000000000..502a19564ae --- /dev/null +++ b/pkg/inmemorychannel/dispatcher_test.go @@ -0,0 +1,50 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package inmemorychannel + +import ( + "github.com/knative/eventing/pkg/provisioners/swappable" + "testing" + "time" + + logtesting "github.com/knative/pkg/logging/testing" +) + +func TestNewDispatcher(t *testing.T) { + defer logtesting.ClearAll() + + logger := logtesting.TestLogger(t).Desugar() + sh, err := swappable.NewEmptyHandler(logger) + + if err != nil { + t.Fatalf("Failed to create handler") + } + + args := &InMemoryDispatcherArgs{ + Port: 8080, + ReadTimeout: 1 * time.Minute, + WriteTimeout: 1 * time.Minute, + Handler: sh, + Logger: logger, + } + + d := NewDispatcher(args) + + if d == nil { + t.Fatalf("Failed to create with NewDispatcher") + } +} diff --git a/pkg/reconciler/inmemorychannel/controller.go b/pkg/reconciler/inmemorychannel/controller/controller.go similarity index 99% rename from pkg/reconciler/inmemorychannel/controller.go rename to pkg/reconciler/inmemorychannel/controller/controller.go index 52f13d9ec99..0a3513074c4 100644 --- a/pkg/reconciler/inmemorychannel/controller.go +++ b/pkg/reconciler/inmemorychannel/controller/controller.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package inmemorychannel +package controller import ( messaginginformers "github.com/knative/eventing/pkg/client/informers/externalversions/messaging/v1alpha1" diff --git a/pkg/reconciler/inmemorychannel/controller_test.go b/pkg/reconciler/inmemorychannel/controller/controller_test.go similarity index 98% rename from pkg/reconciler/inmemorychannel/controller_test.go rename to pkg/reconciler/inmemorychannel/controller/controller_test.go index c4a0254a4c5..6d652ab58e1 100644 --- a/pkg/reconciler/inmemorychannel/controller_test.go +++ b/pkg/reconciler/inmemorychannel/controller/controller_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package inmemorychannel +package controller import ( "testing" diff --git a/pkg/reconciler/inmemorychannel/inmemorychannel.go b/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go similarity index 99% rename from pkg/reconciler/inmemorychannel/inmemorychannel.go rename to pkg/reconciler/inmemorychannel/controller/inmemorychannel.go index 804fa22d941..c7fd97fb996 100644 --- a/pkg/reconciler/inmemorychannel/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package inmemorychannel +package controller import ( "context" @@ -27,7 +27,7 @@ import ( listers "github.com/knative/eventing/pkg/client/listers/messaging/v1alpha1" "github.com/knative/eventing/pkg/logging" "github.com/knative/eventing/pkg/reconciler" - "github.com/knative/eventing/pkg/reconciler/inmemorychannel/resources" + "github.com/knative/eventing/pkg/reconciler/inmemorychannel/controller/resources" "github.com/knative/eventing/pkg/utils" "github.com/knative/pkg/apis" "github.com/knative/pkg/controller" diff --git a/pkg/reconciler/inmemorychannel/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go similarity index 98% rename from pkg/reconciler/inmemorychannel/inmemorychannel_test.go rename to pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go index d013e65658b..bb99c9fbcaf 100644 --- a/pkg/reconciler/inmemorychannel/inmemorychannel_test.go +++ b/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go @@ -14,10 +14,11 @@ See the License for the specific language governing permissions and limitations under the License. */ -package inmemorychannel +package controller import ( "fmt" + "github.com/knative/eventing/pkg/reconciler/inmemorychannel/controller/resources" "testing" "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" @@ -335,7 +336,7 @@ func makeChannelService(imc *v1alpha1.InMemoryChannel) *corev1.Service { Namespace: testNS, Name: fmt.Sprintf("%s-kn-channel", imcName), Labels: map[string]string{ - "eventing.knative.dev/role": "in-memory-channel", + resources.MessagingRoleLabel: resources.MessagingRole, }, OwnerReferences: []metav1.OwnerReference{ *kmeta.NewControllerRef(imc), @@ -358,7 +359,7 @@ func makeChannelServiceNotOwnedByUs(imc *v1alpha1.InMemoryChannel) *corev1.Servi Namespace: testNS, Name: fmt.Sprintf("%s-kn-channel", imcName), Labels: map[string]string{ - "eventing.knative.dev/role": "in-memory-channel", + resources.MessagingRoleLabel: resources.MessagingRole, }, }, Spec: corev1.ServiceSpec{ diff --git a/pkg/reconciler/inmemorychannel/resources/service.go b/pkg/reconciler/inmemorychannel/controller/resources/service.go similarity index 91% rename from pkg/reconciler/inmemorychannel/resources/service.go rename to pkg/reconciler/inmemorychannel/controller/resources/service.go index 3a0a34a6787..edb77c9305b 100644 --- a/pkg/reconciler/inmemorychannel/resources/service.go +++ b/pkg/reconciler/inmemorychannel/controller/resources/service.go @@ -11,10 +11,10 @@ import ( ) const ( - PortName = "http" - PortNumber = 80 - EventingRoleLabel = "eventing.knative.dev/role" - EventingRole = "in-memory-channel" + PortName = "http" + PortNumber = 80 + MessagingRoleLabel = "messaging.knative.dev/role" + MessagingRole = "in-memory-channel" ) // ServiceOption can be used to optionally modify the K8s service in CreateK8sService @@ -54,7 +54,7 @@ func NewK8sService(imc *v1alpha1.InMemoryChannel, opts ...K8sServiceOption) (*co Name: CreateChannelServiceName(imc.ObjectMeta.Name), Namespace: imc.Namespace, Labels: map[string]string{ - EventingRoleLabel: EventingRole, + MessagingRoleLabel: MessagingRole, }, OwnerReferences: []metav1.OwnerReference{ *kmeta.NewControllerRef(imc), diff --git a/pkg/reconciler/inmemorychannel/resources/service_test.go b/pkg/reconciler/inmemorychannel/controller/resources/service_test.go similarity index 97% rename from pkg/reconciler/inmemorychannel/resources/service_test.go rename to pkg/reconciler/inmemorychannel/controller/resources/service_test.go index 202042cac35..527fb65a515 100644 --- a/pkg/reconciler/inmemorychannel/resources/service_test.go +++ b/pkg/reconciler/inmemorychannel/controller/resources/service_test.go @@ -64,7 +64,7 @@ func TestNewK8sService(t *testing.T) { Name: fmt.Sprintf("%s-kn-channel", imcName), Namespace: testNS, Labels: map[string]string{ - EventingRoleLabel: EventingRole, + MessagingRoleLabel: MessagingRole, }, OwnerReferences: []metav1.OwnerReference{ *kmeta.NewControllerRef(imc), @@ -107,7 +107,7 @@ func TestNewK8sServiceWithExternal(t *testing.T) { Name: fmt.Sprintf("%s-kn-channel", imcName), Namespace: testNS, Labels: map[string]string{ - EventingRoleLabel: EventingRole, + MessagingRoleLabel: MessagingRole, }, OwnerReferences: []metav1.OwnerReference{ *kmeta.NewControllerRef(imc), diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go new file mode 100644 index 00000000000..ee6b247d27f --- /dev/null +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go @@ -0,0 +1,137 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "github.com/knative/eventing/pkg/inmemorychannel" + + "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" + messaginginformers "github.com/knative/eventing/pkg/client/informers/externalversions/messaging/v1alpha1" + listers "github.com/knative/eventing/pkg/client/listers/messaging/v1alpha1" + "github.com/knative/eventing/pkg/logging" + "github.com/knative/eventing/pkg/provisioners/fanout" + "github.com/knative/eventing/pkg/provisioners/multichannelfanout" + "github.com/knative/eventing/pkg/reconciler" + "github.com/knative/pkg/controller" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +const ( + // ReconcilerName is the name of the reconciler. + ReconcilerName = "InMemoryChannels" + + // controllerAgentName is the string used by this controller to identify + // itself when creating events. + controllerAgentName = "in-memory-channel-dispatcher" +) + +// Reconciler reconciles InMemory Channels. +type Reconciler struct { + *reconciler.Base + + dispatcher inmemorychannel.Dispatcher + inmemorychannelLister listers.InMemoryChannelLister + inmemorychannelInformer cache.SharedIndexInformer + impl *controller.Impl +} + +// Check that our Reconciler implements controller.Reconciler. +var _ controller.Reconciler = (*Reconciler)(nil) + +// NewController initializes the controller and is called by the generated code. +// Registers event handlers to enqueue events. +func NewController( + opt reconciler.Options, + dispatcher inmemorychannel.Dispatcher, + inmemorychannelinformer messaginginformers.InMemoryChannelInformer, +) *controller.Impl { + + r := &Reconciler{ + Base: reconciler.NewBase(opt, controllerAgentName), + dispatcher: dispatcher, + inmemorychannelLister: inmemorychannelinformer.Lister(), + inmemorychannelInformer: inmemorychannelinformer.Informer(), + } + r.impl = controller.NewImpl(r, r.Logger, ReconcilerName) + + r.Logger.Info("Setting up event handlers") + + // Watch for inmemory channels. + r.inmemorychannelInformer.AddEventHandler(controller.HandleAll(r.impl.Enqueue)) + + return r.impl +} + +func (r *Reconciler) Reconcile(ctx context.Context, key string) error { + // Convert the namespace/name string into a distinct namespace and name. + _, _, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + logging.FromContext(ctx).Error("invalid resource key") + return nil + } + + // This is a special Reconciler that does the following: + // 1. Lists the inmemory channels. + // 2. Creates a multi-channel-fanout-config. + // 3. Calls the inmemory channel dispatcher's updateConfig func with the new multi-channel-fanout-config. + + channels, err := r.inmemorychannelLister.List(labels.Everything()) + if err != nil { + logging.FromContext(ctx).Error("Error listing InMemory channels") + return err + } + + inmemoryChannels := make([]*v1alpha1.InMemoryChannel, 0) + for _, channel := range channels { + if channel.Status.IsReady() { + inmemoryChannels = append(inmemoryChannels, channel) + } + } + + config := r.newConfigFromInMemoryChannels(inmemoryChannels) + err = r.dispatcher.UpdateConfig(config) + if err != nil { + logging.FromContext(ctx).Error("Error updating InMemory dispatcher config") + return err + } + + return nil +} + +// newConfigFromInMemoryChannels creates a new Config from the list of inmemory channels. +func (r *Reconciler) newConfigFromInMemoryChannels(channels []*v1alpha1.InMemoryChannel) *multichannelfanout.Config { + cc := make([]multichannelfanout.ChannelConfig, 0) + for _, c := range channels { + channelConfig := multichannelfanout.ChannelConfig{ + Namespace: c.Namespace, + Name: c.Name, + HostName: c.Status.Address.Hostname, + } + if c.Spec.Subscribable != nil { + channelConfig.FanoutConfig = fanout.Config{ + AsyncHandler: true, + Subscriptions: c.Spec.Subscribable.Subscribers, + } + } + cc = append(cc, channelConfig) + } + return &multichannelfanout.Config{ + ChannelConfigs: cc, + } +} diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go new file mode 100644 index 00000000000..3642909f547 --- /dev/null +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go @@ -0,0 +1,121 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "github.com/knative/eventing/pkg/inmemorychannel" + "github.com/knative/eventing/pkg/provisioners/multichannelfanout" + "k8s.io/apimachinery/pkg/runtime" + "testing" + + "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" + fakeclientset "github.com/knative/eventing/pkg/client/clientset/versioned/fake" + informers "github.com/knative/eventing/pkg/client/informers/externalversions" + "github.com/knative/eventing/pkg/reconciler" + reconciletesting "github.com/knative/eventing/pkg/reconciler/testing" + "github.com/knative/pkg/controller" + logtesting "github.com/knative/pkg/logging/testing" + . "github.com/knative/pkg/reconciler/testing" + fakekubeclientset "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/scheme" +) + +const ( + testNS = "test-namespace" + imcName = "test-imc" + channelServiceAddress = "test-imc-kn-channel.test-namespace.svc.cluster.local" +) + +func init() { + // Add types to scheme + _ = v1alpha1.AddToScheme(scheme.Scheme) +} + +func TestNewController(t *testing.T) { + kubeClient := fakekubeclientset.NewSimpleClientset() + eventingClient := fakeclientset.NewSimpleClientset() + + // Create informer factories with fake clients. The second parameter sets the + // resync period to zero, disabling it. + eventingInformerFactory := informers.NewSharedInformerFactory(eventingClient, 0) + + // Eventing + imcInformer := eventingInformerFactory.Messaging().V1alpha1().InMemoryChannels() + + dispatcher := &inmemorychannel.InMemoryDispatcher{} + + c := NewController( + reconciler.Options{ + KubeClientSet: kubeClient, + EventingClientSet: eventingClient, + Logger: logtesting.TestLogger(t), + }, + dispatcher, + imcInformer) + + if c == nil { + t.Fatalf("Failed to create with NewController") + } +} + +func TestAllCases(t *testing.T) { + imcKey := testNS + "/" + imcName + table := TableTest{ + { + Name: "bad workqueue key", + // Make sure Reconcile handles bad keys. + Key: "too/many/parts", + }, { + Name: "updated configuration, no channels", + Key: imcKey, + WantErr: false, + }, { + Name: "updated configuration, one channel", + Key: imcKey, + Objects: []runtime.Object{ + reconciletesting.NewInMemoryChannel(imcName, testNS, + reconciletesting.WithInitInMemoryChannelConditions, + reconciletesting.WithInMemoryChannelDeploymentReady(), + reconciletesting.WithInMemoryChannelServiceReady(), + reconciletesting.WithInMemoryChannelEndpointsReady(), + reconciletesting.WithInMemoryChannelChannelServiceReady(), + reconciletesting.WithInMemoryChannelAddress(channelServiceAddress)), + }, + WantErr: false, + }, {}, + } + defer logtesting.ClearAll() + + table.Test(t, reconciletesting.MakeFactory(func(listers *reconciletesting.Listers, opt reconciler.Options) controller.Reconciler { + return &Reconciler{ + Base: reconciler.NewBase(opt, controllerAgentName), + inmemorychannelLister: listers.GetInMemoryChannelLister(), + // TODO fix + inmemorychannelInformer: nil, + dispatcher: &fakeDispatcher{}, + } + }, + false, + )) +} + +type fakeDispatcher struct{} + +func (d *fakeDispatcher) UpdateConfig(config *multichannelfanout.Config) error { + // TODO set error + return nil +}