Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
259f727
messaging
nachocano May 20, 2019
5f5fa9d
adding types
nachocano May 20, 2019
3d5e3db
configs
nachocano May 20, 2019
ecbff88
more stuff
nachocano May 20, 2019
e843960
compiling controller
nachocano May 20, 2019
63365eb
updating channel spec
nachocano May 20, 2019
677f02d
moving types down
nachocano May 20, 2019
01e0efc
updates to UTs
nachocano May 20, 2019
c308652
adding validation that none is empty
nachocano May 20, 2019
7023499
more updates
nachocano May 21, 2019
bb1c93e
Merge remote-tracking branch 'upstream/master' into kafka-crd
nachocano May 21, 2019
b660845
Merge remote-tracking branch 'upstream/master' into kafka-crd
nachocano May 21, 2019
9aeaa7f
more updates
nachocano May 22, 2019
7ee0863
broken tests...
nachocano May 22, 2019
db55bcd
Merge remote-tracking branch 'upstream/master' into kafka-crd
nachocano May 22, 2019
5217601
rollbacking changes
nachocano May 22, 2019
881a3fb
more rollbacks
nachocano May 22, 2019
07c383a
rollback controller main
nachocano May 22, 2019
a7cd931
moving things down
nachocano May 22, 2019
38a9b45
updating config map
nachocano May 22, 2019
f899572
setting defaults
nachocano May 22, 2019
6577838
removing unnecessary file
nachocano May 22, 2019
8088142
removing clientArgs
nachocano May 22, 2019
5edb0d9
removing params
nachocano May 22, 2019
2556368
config-kafka
nachocano May 22, 2019
affe681
patching
nachocano May 22, 2019
23e8e01
updates
nachocano May 22, 2019
5340871
to int
nachocano May 22, 2019
aaa21ec
still errors
nachocano May 22, 2019
9bf97ab
changing dispatcher name
nachocano May 22, 2019
02f8127
properly removing finalizer
nachocano May 22, 2019
a06fe96
Controller part of the Kafka CRD.
nachocano May 22, 2019
5b8d417
cosmetics
nachocano May 22, 2019
be46c2d
register as other projects
nachocano May 22, 2019
de3470b
sockpuppet
nachocano May 22, 2019
f66a616
Merge remote-tracking branch 'upstream/master' into kafka-crd-1
nachocano May 22, 2019
6e57e7a
removing channelInformer from subscription controller as it is not ne…
nachocano May 23, 2019
cf934f9
properly mark subscription as not ready
nachocano May 23, 2019
744b4bd
marking subscription as not ready...
nachocano May 23, 2019
036e714
rolling back the change of removing the channelInformer
nachocano May 23, 2019
61ebc8c
add TODO
nachocano May 23, 2019
6632fbc
moved it to controller package as I'll be creating a dispatcher one
nachocano May 23, 2019
e043ae5
wip, not compiling
nachocano May 23, 2019
c9e6da6
Merge remote-tracking branch 'upstream/master' into channelable
nachocano May 23, 2019
13d2420
removing channelInformer from subscription controller
nachocano May 23, 2019
72d192a
Merge remote-tracking branch 'upstream/master' into kafka-crd-1
nachocano May 23, 2019
65ac322
Merge branch 'kafka-crd-1' into kafka-crd-dispatcher
nachocano May 23, 2019
c486ae0
updates
nachocano May 23, 2019
d460329
cosmetics
nachocano May 23, 2019
169eeeb
sockpuppetttttttt
nachocano May 23, 2019
a929e2a
Merge branch 'kafka-crd-1' into kafka-crd-dispatcher
nachocano May 23, 2019
0bb2a23
Merge remote-tracking branch 'upstream/master' into kafka-crd-1
nachocano May 23, 2019
33cebbe
Merge branch 'kafka-crd-1' into kafka-crd-dispatcher
nachocano May 23, 2019
96033d7
adding new kafka dispatcher controller instead of the previous one
nachocano May 23, 2019
4d35909
updating tests
nachocano May 23, 2019
20fc9b9
starting dispatcher in separate thread
nachocano May 23, 2019
22b3cd1
updating subscribable validation
nachocano May 23, 2019
4eaf403
updating subscription channel pattern
nachocano May 23, 2019
ebc93f0
Merge remote-tracking branch 'upstream/master' into channelable
nachocano May 23, 2019
0f63397
Allow creating subscriptions to CRD channels.
nachocano May 23, 2019
62480f3
updating UTs
nachocano May 23, 2019
3085dfb
cosmetic
nachocano May 23, 2019
13fa52c
Merge branch 'channelable' into kafka-crd-dispatcher
nachocano May 23, 2019
1865d7e
back to replace
nachocano May 23, 2019
8f5d7e0
Merge remote-tracking branch 'upstream/master' into kafka-crd-1
nachocano May 24, 2019
6bdd0cf
rollback change
nachocano May 24, 2019
97a1dd4
rollback change
nachocano May 24, 2019
323a1cd
Merge remote-tracking branch 'upstream/master' into kafka-crd-1
nachocano May 26, 2019
0fdb8d4
updates after code review comments
nachocano May 26, 2019
575099d
unneeded TODO
nachocano May 26, 2019
ed544f0
changing e2e kafka dir folder
nachocano May 26, 2019
d366f72
Merge remote-tracking branch 'upstream/master' into kafka-crd-1
nachocano May 28, 2019
8a54ef0
adding new scheme
nachocano May 28, 2019
e2f8b31
Merge remote-tracking branch 'upstream/master' into kafka-crd-1
nachocano May 28, 2019
cfc1e65
renamed ccp to provisioner
nachocano May 28, 2019
9a91415
passing interface instead
nachocano May 28, 2019
2982f4c
Merge remote-tracking branch 'upstream/master' into kafka-crd-1
nachocano May 28, 2019
d41d5d3
new function available
nachocano May 28, 2019
2876113
removing commented UT to add it in a follow up PR
nachocano May 29, 2019
5fe4f8e
Merge remote-tracking branch 'upstream/master' into kafka-crd-1
nachocano May 29, 2019
f21fc73
update after merge
nachocano May 29, 2019
8f71f9f
adding the subscribable label
nachocano May 29, 2019
28556ab
Merge remote-tracking branch 'upstream/master' into kafka-crd-1
nachocano May 30, 2019
28b68c3
Merge remote-tracking branch 'upstream/master' into kafka-crd-1
nachocano May 31, 2019
21aa9ac
renaming in trunk
nachocano May 31, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 191 additions & 0 deletions contrib/kafka/cmd/channel_controller/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"flag"
"github.com/knative/eventing/contrib/kafka/pkg/utils"
"log"

clientset "github.com/knative/eventing/contrib/kafka/pkg/client/clientset/versioned"
eventingScheme "github.com/knative/eventing/contrib/kafka/pkg/client/clientset/versioned/scheme"
informers "github.com/knative/eventing/contrib/kafka/pkg/client/informers/externalversions"
kafkachannel "github.com/knative/eventing/contrib/kafka/pkg/reconciler/controller"
"github.com/knative/eventing/pkg/logconfig"
"github.com/knative/eventing/pkg/logging"
"github.com/knative/eventing/pkg/reconciler"
"github.com/knative/pkg/configmap"
kncontroller "github.com/knative/pkg/controller"
"github.com/knative/pkg/signals"
"github.com/knative/pkg/system"
"go.uber.org/zap"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

const (
	// dispatcherDeploymentName is the dispatcher Deployment name handed to the
	// KafkaChannel controller below; presumably it must match the name used in
	// the dispatcher's deployment manifest — confirm against the config yaml.
	dispatcherDeploymentName = "kafka-ch-dispatcher"
	// dispatcherServiceName is the dispatcher Service name handed to the
	// KafkaChannel controller below.
	dispatcherServiceName = "kafka-ch-dispatcher"
)

// Command-line flags. They are parsed in main and are only meaningful when the
// controller runs outside a Kubernetes cluster (in-cluster config is the norm).
var (
	// hardcodedLoggingConfig switches getLoggingConfigOrDie to a built-in zap
	// config instead of reading /etc/config-logging; for out-of-cluster debugging.
	hardcodedLoggingConfig = flag.Bool("hardCodedLoggingConfig", false, "If true, use the hard coded logging config. It is intended to be used only when debugging outside a Kubernetes cluster.")
	masterURL              = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
	kubeconfig             = flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
)

func main() {
flag.Parse()
logger, atomicLevel := setupLogger()
defer logger.Sync()

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()

cfg, err := clientcmd.BuildConfigFromFlags(*masterURL, *kubeconfig)
if err != nil {
logger.Fatalw("Error building kubeconfig", zap.Error(err))
}

// TODO the underlying config map needs to be watched and the config should be reloaded if there is a change.
kafkaConfig, err := utils.GetKafkaConfig("/etc/config-kafka")
if err != nil {
logger.Fatalw("Error loading kafka config", zap.Error(err))
}

logger = logger.With(zap.String("controller/impl", "pkg"))
logger.Info("Starting the Kafka controller")

systemNS := system.Namespace()

const numControllers = 1
cfg.QPS = numControllers * rest.DefaultQPS
cfg.Burst = numControllers * rest.DefaultBurst
opt := reconciler.NewOptionsOrDie(cfg, logger, stopCh)
// Setting up our own eventingClientSet as we need the messaging API introduced with kafka.
eventingClientSet := clientset.NewForConfigOrDie(cfg)

kubeInformerFactory := kubeinformers.NewSharedInformerFactory(opt.KubeClientSet, opt.ResyncPeriod)
eventingInformerFactory := informers.NewSharedInformerFactory(eventingClientSet, opt.ResyncPeriod)

// Messaging
kafkaChannelInformer := eventingInformerFactory.Messaging().V1alpha1().KafkaChannels()

// Kube
serviceInformer := kubeInformerFactory.Core().V1().Services()
endpointsInformer := kubeInformerFactory.Core().V1().Endpoints()
deploymentInformer := kubeInformerFactory.Apps().V1().Deployments()

// Adding the scheme.
eventingScheme.AddToScheme(scheme.Scheme)

// Build all of our controllers, with the clients constructed above.
// Add new controllers to this array.
// You also need to modify numControllers above to match this.
controllers := [...]*kncontroller.Impl{
kafkachannel.NewController(
opt,
eventingClientSet,
kafkaConfig,
systemNS,
dispatcherDeploymentName,
dispatcherServiceName,
kafkaChannelInformer,
deploymentInformer,
serviceInformer,
endpointsInformer,
),
}
// This line asserts at compile time that the length of controllers is equal to numControllers.
// It is based on https://go101.org/article/tips.html#assert-at-compile-time, which notes that
// var _ [N-M]int
// asserts at compile time that N >= M, which we can use to establish equality of N and M:
// (N >= M) && (M >= N) => (N == M)
var _ [numControllers - len(controllers)][len(controllers) - numControllers]int

// Watch the logging config map and dynamically update logging levels.
opt.ConfigMapWatcher.Watch(logconfig.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, logconfig.Controller))
// TODO: Watch the observability config map and dynamically update metrics exporter.
//opt.ConfigMapWatcher.Watch(metrics.ObservabilityConfigName, metrics.UpdateExporterFromConfigMap(component, logger))
if err := opt.ConfigMapWatcher.Start(stopCh); err != nil {
logger.Fatalw("failed to start configuration manager", zap.Error(err))
}

// Start all of the informers and wait for them to sync.
logger.Info("Starting informers.")
if err := kncontroller.StartInformers(
stopCh,
// Messaging
kafkaChannelInformer.Informer(),

// Kube
serviceInformer.Informer(),
deploymentInformer.Informer(),
endpointsInformer.Informer(),
); err != nil {
logger.Fatalf("Failed to start informers: %v", err)
}

logger.Info("Starting controllers.")
kncontroller.StartAll(stopCh, controllers[:]...)
}

func setupLogger() (*zap.SugaredLogger, zap.AtomicLevel) {
// Set up our logger.
loggingConfigMap := getLoggingConfigOrDie()
loggingConfig, err := logging.NewConfigFromMap(loggingConfigMap)
if err != nil {
log.Fatalf("Error parsing logging configuration: %v", err)
}
return logging.NewLoggerFromConfig(loggingConfig, logconfig.Controller)
}

// getLoggingConfigOrDie returns the logging configuration map: a hard coded
// zap config when the -hardCodedLoggingConfig flag is set (out-of-cluster
// debugging), otherwise the contents of the /etc/config-logging config map.
// It exits the process if the config map cannot be loaded.
func getLoggingConfigOrDie() map[string]string {
	// flag.Bool never returns nil, but keep the nil guard for safety.
	if hardcodedLoggingConfig != nil && *hardcodedLoggingConfig {
		return map[string]string{
			"loglevel.controller": "info",
			// BUG FIX: the outer JSON object was missing its closing brace,
			// making the hard coded config invalid JSON.
			"zap-logger-config": `
{
	"level": "info",
	"development": false,
	"outputPaths": ["stdout"],
	"errorOutputPaths": ["stderr"],
	"encoding": "json",
	"encoderConfig": {
		"timeKey": "ts",
		"levelKey": "level",
		"nameKey": "logger",
		"callerKey": "caller",
		"messageKey": "msg",
		"stacktraceKey": "stacktrace",
		"lineEnding": "",
		"levelEncoder": "",
		"timeEncoder": "iso8601",
		"durationEncoder": "",
		"callerEncoder": ""
	}
}`,
		}
	}
	cm, err := configmap.Load("/etc/config-logging")
	if err != nil {
		log.Fatalf("Error loading logging configuration: %v", err)
	}
	return cm
}
179 changes: 179 additions & 0 deletions contrib/kafka/cmd/channel_dispatcher/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"flag"
"github.com/knative/eventing/contrib/kafka/pkg/utils"
"log"

clientset "github.com/knative/eventing/contrib/kafka/pkg/client/clientset/versioned"
eventingScheme "github.com/knative/eventing/contrib/kafka/pkg/client/clientset/versioned/scheme"
informers "github.com/knative/eventing/contrib/kafka/pkg/client/informers/externalversions"
"github.com/knative/eventing/contrib/kafka/pkg/dispatcher"
kafkachannel "github.com/knative/eventing/contrib/kafka/pkg/reconciler/dispatcher"
"github.com/knative/eventing/pkg/logconfig"
"github.com/knative/eventing/pkg/logging"
"github.com/knative/eventing/pkg/reconciler"
"github.com/knative/pkg/configmap"
kncontroller "github.com/knative/pkg/controller"
"github.com/knative/pkg/signals"
"go.uber.org/zap"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

// Command-line flags. They are parsed in main and are only meaningful when the
// dispatcher runs outside a Kubernetes cluster (in-cluster config is the norm).
var (
	// hardcodedLoggingConfig switches getLoggingConfigOrDie to a built-in zap
	// config instead of reading /etc/config-logging; for out-of-cluster debugging.
	hardcodedLoggingConfig = flag.Bool("hardCodedLoggingConfig", false, "If true, use the hard coded logging config. It is intended to be used only when debugging outside a Kubernetes cluster.")
	masterURL              = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
	kubeconfig             = flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
)

func main() {
flag.Parse()
logger, atomicLevel := setupLogger()
defer logger.Sync()

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()

cfg, err := clientcmd.BuildConfigFromFlags(*masterURL, *kubeconfig)
if err != nil {
logger.Fatalw("Error building kubeconfig", zap.Error(err))
}

kafkaConfig, err := utils.GetKafkaConfig("/etc/config-kafka")
if err != nil {
logger.Fatalw("Error loading kafka config", zap.Error(err))
}

args := &dispatcher.KafkaDispatcherArgs{
ClientID: "kafka-ch-dispatcher",
Brokers: kafkaConfig.Brokers,
ConsumerMode: kafkaConfig.ConsumerMode,
TopicFunc: utils.TopicName,
Logger: logger.Desugar(),
}
kafkaDispatcher, err := dispatcher.NewDispatcher(args)
if err != nil {
logger.Fatalw("Unable to create kafka dispatcher", zap.Error(err))
}

logger = logger.With(zap.String("controller/impl", "pkg"))
logger.Info("Starting the Kafka dispatcher")

const numControllers = 1
cfg.QPS = numControllers * rest.DefaultQPS
cfg.Burst = numControllers * rest.DefaultBurst
opt := reconciler.NewOptionsOrDie(cfg, logger, stopCh)
// Setting up our own eventingClientSet as we need the messaging API introduced with kafka.
eventingClientSet := clientset.NewForConfigOrDie(cfg)
eventingInformerFactory := informers.NewSharedInformerFactory(eventingClientSet, opt.ResyncPeriod)

// Messaging
kafkaChannelInformer := eventingInformerFactory.Messaging().V1alpha1().KafkaChannels()

// Adding the scheme.
eventingScheme.AddToScheme(scheme.Scheme)

// Build all of our controllers, with the clients constructed above.
// Add new controllers to this array.
// You also need to modify numControllers above to match this.
controllers := [...]*kncontroller.Impl{
kafkachannel.NewController(
opt,
eventingClientSet,
kafkaDispatcher,
kafkaChannelInformer,
),
}
// This line asserts at compile time that the length of controllers is equal to numControllers.
// It is based on https://go101.org/article/tips.html#assert-at-compile-time, which notes that
// var _ [N-M]int
// asserts at compile time that N >= M, which we can use to establish equality of N and M:
// (N >= M) && (M >= N) => (N == M)
var _ [numControllers - len(controllers)][len(controllers) - numControllers]int

// Watch the logging config map and dynamically update logging levels.
opt.ConfigMapWatcher.Watch(logconfig.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, logconfig.Controller))
// TODO: Watch the observability config map and dynamically update metrics exporter.
//opt.ConfigMapWatcher.Watch(metrics.ObservabilityConfigName, metrics.UpdateExporterFromConfigMap(component, logger))
if err := opt.ConfigMapWatcher.Start(stopCh); err != nil {
logger.Fatalw("failed to start configuration manager", zap.Error(err))
}

// Start all of the informers and wait for them to sync.
logger.Info("Starting informers.")
if err := kncontroller.StartInformers(
stopCh,
// Messaging
kafkaChannelInformer.Informer(),
); err != nil {
logger.Fatalf("Failed to start informers: %v", err)
}

logger.Info("Starting dispatcher.")
go kafkaDispatcher.Start(stopCh)

logger.Info("Starting controllers.")
kncontroller.StartAll(stopCh, controllers[:]...)
}

func setupLogger() (*zap.SugaredLogger, zap.AtomicLevel) {
// Set up our logger.
loggingConfigMap := getLoggingConfigOrDie()
loggingConfig, err := logging.NewConfigFromMap(loggingConfigMap)
if err != nil {
log.Fatalf("Error parsing logging configuration: %v", err)
}
return logging.NewLoggerFromConfig(loggingConfig, logconfig.Controller)
}

// getLoggingConfigOrDie returns the logging configuration map: a hard coded
// zap config when the -hardCodedLoggingConfig flag is set (out-of-cluster
// debugging), otherwise the contents of the /etc/config-logging config map.
// It exits the process if the config map cannot be loaded.
func getLoggingConfigOrDie() map[string]string {
	// flag.Bool never returns nil, but keep the nil guard for safety.
	if hardcodedLoggingConfig != nil && *hardcodedLoggingConfig {
		return map[string]string{
			"loglevel.controller": "info",
			// BUG FIX: the outer JSON object was missing its closing brace,
			// making the hard coded config invalid JSON.
			"zap-logger-config": `
{
	"level": "info",
	"development": false,
	"outputPaths": ["stdout"],
	"errorOutputPaths": ["stderr"],
	"encoding": "json",
	"encoderConfig": {
		"timeKey": "ts",
		"levelKey": "level",
		"nameKey": "logger",
		"callerKey": "caller",
		"messageKey": "msg",
		"stacktraceKey": "stacktrace",
		"lineEnding": "",
		"levelEncoder": "",
		"timeEncoder": "iso8601",
		"durationEncoder": "",
		"callerEncoder": ""
	}
}`,
		}
	}
	cm, err := configmap.Load("/etc/config-logging")
	if err != nil {
		log.Fatalf("Error loading logging configuration: %v", err)
	}
	return cm
}
Loading