Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
informers "github.com/knative/eventing/pkg/client/informers/externalversions"
"github.com/knative/eventing/pkg/logconfig"
"github.com/knative/eventing/pkg/reconciler"
"github.com/knative/eventing/pkg/reconciler/inmemorychannel"
inmemorychannel "github.com/knative/eventing/pkg/reconciler/inmemorychannel/controller"
"github.com/knative/pkg/configmap"
kncontroller "github.com/knative/pkg/controller"
"github.com/knative/pkg/logging"
Expand Down
174 changes: 174 additions & 0 deletions cmd/in_memory/channel_dispatcher/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"flag"
"log"
"time"

informers "github.com/knative/eventing/pkg/client/informers/externalversions"
dispatcher "github.com/knative/eventing/pkg/inmemorychannel"
"github.com/knative/eventing/pkg/logconfig"
"github.com/knative/eventing/pkg/provisioners/swappable"
"github.com/knative/eventing/pkg/reconciler"
inmemorychannel "github.com/knative/eventing/pkg/reconciler/inmemorychannel/dispatcher"
"github.com/knative/pkg/configmap"
kncontroller "github.com/knative/pkg/controller"
"github.com/knative/pkg/logging"
"github.com/knative/pkg/signals"
"go.uber.org/zap"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

var (
hardcodedLoggingConfig = flag.Bool("hardCodedLoggingConfig", false, "If true, use the hard coded logging config. It is intended to be used only when debugging outside a Kubernetes cluster.")
masterURL = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeconfig = flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")

readTimeout = 1 * time.Minute
writeTimeout = 1 * time.Minute
port = 8080
)

func main() {
flag.Parse()
logger, atomicLevel := setupLogger()
defer logger.Sync()

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()

cfg, err := clientcmd.BuildConfigFromFlags(*masterURL, *kubeconfig)
if err != nil {
logger.Fatalw("Error building kubeconfig", zap.Error(err))
}

sh, err := swappable.NewEmptyHandler(logger.Desugar())
if err != nil {
logger.Fatal("Error creating swappable.Handler", zap.Error(err))
}

args := &dispatcher.InMemoryDispatcherArgs{
Port: port,
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
Handler: sh,
Logger: logger.Desugar(),
}
inMemoryDispatcher := dispatcher.NewDispatcher(args)

logger = logger.With(zap.String("controller/impl", "pkg"))
logger.Info("Starting the InMemory dispatcher")

const numControllers = 1
cfg.QPS = numControllers * rest.DefaultQPS
cfg.Burst = numControllers * rest.DefaultBurst
opt := reconciler.NewOptionsOrDie(cfg, logger, stopCh)

eventingInformerFactory := informers.NewSharedInformerFactory(opt.EventingClientSet, opt.ResyncPeriod)

// Messaging
inMemoryChannelInformer := eventingInformerFactory.Messaging().V1alpha1().InMemoryChannels()

// Build all of our controllers, with the clients constructed above.
// Add new controllers to this array.
// You also need to modify numControllers above to match this.
controllers := [...]*kncontroller.Impl{
inmemorychannel.NewController(
opt,
inMemoryDispatcher,
inMemoryChannelInformer,
),
}
// This line asserts at compile time that the length of controllers is equal to numControllers.
// It is based on https://go101.org/article/tips.html#assert-at-compile-time, which notes that
// var _ [N-M]int
// asserts at compile time that N >= M, which we can use to establish equality of N and M:
// (N >= M) && (M >= N) => (N == M)
var _ [numControllers - len(controllers)][len(controllers) - numControllers]int

// Watch the logging config map and dynamically update logging levels.
opt.ConfigMapWatcher.Watch(logconfig.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, logconfig.Controller))
// TODO: Watch the observability config map and dynamically update metrics exporter.
//opt.ConfigMapWatcher.Watch(metrics.ObservabilityConfigName, metrics.UpdateExporterFromConfigMap(component, logger))
if err := opt.ConfigMapWatcher.Start(stopCh); err != nil {
logger.Fatalw("failed to start configuration manager", zap.Error(err))
}

// Start all of the informers and wait for them to sync.
logger.Info("Starting informers.")
if err := kncontroller.StartInformers(
stopCh,
// Messaging
inMemoryChannelInformer.Informer(),
); err != nil {
logger.Fatalf("Failed to start informers: %v", err)
}

go inMemoryDispatcher.Start(stopCh)

logger.Info("Starting controllers.")
kncontroller.StartAll(stopCh, controllers[:]...)

inMemoryDispatcher.Stop()
}

func setupLogger() (*zap.SugaredLogger, zap.AtomicLevel) {
// Set up our logger.
loggingConfigMap := getLoggingConfigOrDie()
loggingConfig, err := logging.NewConfigFromMap(loggingConfigMap)
if err != nil {
log.Fatalf("Error parsing logging configuration: %v", err)
}
return logging.NewLoggerFromConfig(loggingConfig, logconfig.Controller)
}

func getLoggingConfigOrDie() map[string]string {
if hardcodedLoggingConfig != nil && *hardcodedLoggingConfig {
return map[string]string{
"loglevel.controller": "info",
"zap-logger-config": `
{
"level": "info",
"development": false,
"outputPaths": ["stdout"],
"errorOutputPaths": ["stderr"],
"encoding": "json",
"encoderConfig": {
"timeKey": "ts",
"levelKey": "level",
"nameKey": "logger",
"callerKey": "caller",
"messageKey": "msg",
"stacktraceKey": "stacktrace",
"lineEnding": "",
"levelEncoder": "",
"timeEncoder": "iso8601",
"durationEncoder": "",
"callerEncoder": ""
}`,
}
} else {
cm, err := configmap.Load("/etc/config-logging")
if err != nil {
log.Fatalf("Error loading logging configuration: %v", err)
}
return cm
}
}
13 changes: 13 additions & 0 deletions config/200-controller-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,16 @@ rules:
- "get"
- "list"
- "watch"

# Messaging resources and statuses we care about.
- apiGroups:
- "messaging.knative.dev"
resources:
- "inmemorychannels"
- "inmemorychannels/status"
verbs:
- "get"
- "list"
- "watch"
- "update"
- "patch"
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,11 @@ rules:
- get
- list
- watch
- apiGroups:
- "" # Core API group.
resources:
- configmaps
verbs:
- get
- list
- watch
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ spec:
serviceAccountName: imc-controller
containers:
- name: controller
image: github.com/knative/eventing/cmd/in_memory_channel_controller
image: github.com/knative/eventing/cmd/in_memory/channel_controller
env:
- name: CONFIG_LOGGING_NAME
value: config-logging
Expand Down
11 changes: 10 additions & 1 deletion config/channels/in-memory-channel/500-dispatcher.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,18 @@ spec:
serviceAccountName: imc-dispatcher
containers:
- name: dispatcher
image: github.com/knative/eventing/cmd/in_memory/dispatcher
image: github.com/knative/eventing/cmd/in_memory/channel_dispatcher
env:
- name: CONFIG_LOGGING_NAME
value: config-logging
- name: SYSTEM_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
volumeMounts:
- name: config-logging
mountPath: /etc/config-logging
volumes:
- name: config-logging
configMap:
name: config-logging
84 changes: 84 additions & 0 deletions pkg/inmemorychannel/dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package inmemorychannel

import (
"context"
"fmt"
"net/http"
"time"

"github.com/knative/eventing/pkg/provisioners/multichannelfanout"
"github.com/knative/eventing/pkg/provisioners/swappable"
pkgtracing "github.com/knative/pkg/tracing"
"go.uber.org/zap"
)

type Dispatcher interface {
UpdateConfig(config *multichannelfanout.Config) error
}

type InMemoryDispatcher struct {
handler *swappable.Handler
server *http.Server

logger *zap.Logger
}

type InMemoryDispatcherArgs struct {
Port int
ReadTimeout time.Duration
WriteTimeout time.Duration
Handler *swappable.Handler
Logger *zap.Logger
}

func (d *InMemoryDispatcher) UpdateConfig(config *multichannelfanout.Config) error {
return d.handler.UpdateConfig(config)
}

// Start starts the inmemory dispatcher's message processing.
func (d *InMemoryDispatcher) Start(stopCh <-chan struct{}) error {
d.logger.Info("in memory dispatcher listening", zap.String("address", d.server.Addr))
return d.server.ListenAndServe()
}

func (d *InMemoryDispatcher) Stop() {
ctx, cancel := context.WithTimeout(context.Background(), d.server.WriteTimeout)
defer cancel()
if err := d.server.Shutdown(ctx); err != nil {
d.logger.Error("Shutdown returned an error", zap.Error(err))
}
}

func NewDispatcher(args *InMemoryDispatcherArgs) *InMemoryDispatcher {

server := &http.Server{
Addr: fmt.Sprintf(":%d", args.Port),
Handler: pkgtracing.HTTPSpanMiddleware(args.Handler),
ErrorLog: zap.NewStdLog(args.Logger),
ReadTimeout: args.ReadTimeout,
WriteTimeout: args.WriteTimeout,
}

dispatcher := &InMemoryDispatcher{
handler: args.Handler,
server: server,
logger: args.Logger,
}

return dispatcher
}
50 changes: 50 additions & 0 deletions pkg/inmemorychannel/dispatcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package inmemorychannel

import (
"github.com/knative/eventing/pkg/provisioners/swappable"
"testing"
"time"

logtesting "github.com/knative/pkg/logging/testing"
)

func TestNewDispatcher(t *testing.T) {
defer logtesting.ClearAll()

logger := logtesting.TestLogger(t).Desugar()
sh, err := swappable.NewEmptyHandler(logger)

if err != nil {
t.Fatalf("Failed to create handler")
}

args := &InMemoryDispatcherArgs{
Port: 8080,
ReadTimeout: 1 * time.Minute,
WriteTimeout: 1 * time.Minute,
Handler: sh,
Logger: logger,
}

d := NewDispatcher(args)

if d == nil {
t.Fatalf("Failed to create with NewDispatcher")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package inmemorychannel
package controller

import (
messaginginformers "github.com/knative/eventing/pkg/client/informers/externalversions/messaging/v1alpha1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package inmemorychannel
package controller

import (
"testing"
Expand Down
Loading