Merged
9 changes: 9 additions & 0 deletions Gopkg.lock


92 changes: 90 additions & 2 deletions config/provisioners/kafka/kafka-provisioner.yaml
@@ -45,11 +45,20 @@ rules:
- "" # Core API group.
resources:
- services
- configmaps
verbs:
- get
- list
- watch
- create
- apiGroups:
- "" # Core API Group.
resources:
- configmaps
resourceNames:
- kafka-channel-dispatcher
verbs:
- update
- apiGroups:
- networking.istio.io
resources:
@@ -81,7 +90,7 @@ metadata:
name: kafka-channel-controller-config
namespace: knative-eventing
data:
# Broker URL's for the provisioner
# Broker URL's for the provisioner. Replace this with the URL's for your kafka cluster.
bootstrap_servers: kafkabroker.kafka:9092
---
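The bootstrap_servers value above is conventionally a comma-separated list of host:port broker addresses. Purely as an illustrative sketch (the helper name parseBootstrapServers is hypothetical; the PR's actual parsing lives in GetProvisionerConfig), splitting such a value might look like:

```go
package main

import (
	"fmt"
	"strings"
)

// parseBootstrapServers splits a comma-separated bootstrap_servers value
// into individual broker addresses, trimming stray whitespace and dropping
// empty entries. (Hypothetical helper, for illustration only.)
func parseBootstrapServers(value string) []string {
	var brokers []string
	for _, b := range strings.Split(value, ",") {
		if b = strings.TrimSpace(b); b != "" {
			brokers = append(brokers, b)
		}
	}
	return brokers
}

func main() {
	fmt.Println(parseBootstrapServers("kafkabroker.kafka:9092, other.kafka:9092"))
}
```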

@@ -100,11 +109,90 @@ spec:
serviceAccountName: kafka-channel-controller
containers:
- name: kafka-channel-controller-controller
image: github.com/knative/eventing/pkg/provisioners/kafka
image: github.com/knative/eventing/pkg/provisioners/kafka/cmd/controller
volumeMounts:
- name: kafka-channel-controller-config
mountPath: /etc/config-provisioner
volumes:
- name: kafka-channel-controller-config
configMap:
name: kafka-channel-controller-config
---

apiVersion: v1
kind: ServiceAccount
metadata:
name: kafka-channel-dispatcher
namespace: knative-eventing

---

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: kafka-channel-dispatcher
namespace: knative-eventing
rules:
- apiGroups:
- "" # Core API group.
resources:
- configmaps
Member: Should this be all configmaps, or just the one?

Contributor Author: The dispatcher uses the github.com/knative/pkg/configmap watcher, which needs these cluster-scoped privileges.

verbs:
- get
- list
- watch

---

apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
name: kafka-channel-dispatcher
namespace: knative-eventing
subjects:
- kind: ServiceAccount
name: kafka-channel-dispatcher
namespace: knative-eventing
roleRef:
kind: ClusterRole
name: kafka-channel-dispatcher
apiGroup: rbac.authorization.k8s.io

---

apiVersion: apps/v1
kind: StatefulSet
metadata:
name: kafka-channel-dispatcher
namespace: knative-eventing
spec:
replicas: 1
selector:
matchLabels: &labels
clusterChannelProvisioner: kafka-channel
role: dispatcher
serviceName: kafka-channel-dispatcher-service
template:
metadata:
annotations:
sidecar.istio.io/inject: "true"
labels: *labels
spec:
serviceAccountName: kafka-channel-dispatcher
containers:
- name: dispatcher
image: github.com/knative/eventing/pkg/provisioners/kafka/cmd/dispatcher
env:
- name: DISPATCHER_CONFIGMAP_NAME
value: kafka-channel-dispatcher
- name: DISPATCHER_CONFIGMAP_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
volumeMounts:
- name: kafka-channel-controller-config
mountPath: /etc/config-provisioner
volumes:
- name: kafka-channel-controller-config
configMap:
name: kafka-channel-controller-config
Member: If you don't mark this optional and the ConfigMap isn't defined above, I think the pod will fail until the configmap is defined.

Contributor Author: kafka-channel-controller-config is required, as it contains the broker info, and we check that it is defined during startup: https://github.com/knative/eventing/pull/589/files#diff-8634243f0697f44c13a150873e6cc457R72. Do you want to change this behavior?

Member: I was wondering about crash-looping vs. sitting and waiting for configuration, vs. shipping a default configuration that the user might update later. I guess the current ClusterChannelProvisioner only supports a single set of bootstrap servers, so it may be that crash-looping is the right choice for now. At some point in the future, it seems like we might want to support multiple Kafka clusters, at which point 0 clusters would be a valid configuration.

For now, this may be fine, but I have a knee-jerk reaction to configs which fail unless the user has followed a precondition. Maybe add a comment here, or near the top of the file, that creating this ConfigMap is a precondition to applying this yaml?

Contributor Author: The yaml actually creates the kafka-channel-controller-config configmap in L78. It defaults to the broker URL that one would get by creating the kafka cluster using the sample in ./broker/kafka-broker.yaml. I will add a comment for the end user to update this config map in cases where they use a different kafka cluster.

79 changes: 79 additions & 0 deletions pkg/provisioners/kafka/cmd/controller/main.go
@@ -0,0 +1,79 @@
package main

import (
	"flag"
	"os"

	istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"
	"go.uber.org/zap"
	"k8s.io/apimachinery/pkg/runtime"
	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
	"sigs.k8s.io/controller-runtime/pkg/client/config"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
	"sigs.k8s.io/controller-runtime/pkg/runtime/signals"

	eventingv1alpha "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
	"github.com/knative/eventing/pkg/provisioners"
	provisionerController "github.com/knative/eventing/pkg/provisioners/kafka/controller"
	"github.com/knative/eventing/pkg/provisioners/kafka/controller/channel"
)

// SchemeFunc adds types to a Scheme.
type SchemeFunc func(*runtime.Scheme) error

// ProvideFunc adds a controller to a Manager.
type ProvideFunc func(mgr manager.Manager, config *provisionerController.KafkaProvisionerConfig, logger *zap.Logger) (controller.Controller, error)

func main() {
	flag.Parse()
	logf.SetLogger(logf.ZapLogger(false))

	logger := provisioners.NewProvisionerLoggerFromConfig(provisioners.NewLoggingConfig())
	defer logger.Sync()

	// Set up a Manager.
	mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
	if err != nil {
		logger.Error(err, "unable to create controller manager")
		os.Exit(1)
	}

	// Add custom types to this array to get them into the manager's scheme.
	schemeFuncs := []SchemeFunc{
		eventingv1alpha.AddToScheme,
		istiov1alpha3.AddToScheme,
	}
	for _, schemeFunc := range schemeFuncs {
		if err := schemeFunc(mgr.GetScheme()); err != nil {
			logger.Error(err, "unable to add types to the manager's scheme")
			os.Exit(1)
		}
	}

	// Add each controller's ProvideController func to this list to have the
	// manager run it.
	providers := []ProvideFunc{
		provisionerController.ProvideController,
		channel.ProvideController,
	}

	// TODO: the underlying config map needs to be watched and the config should
	// be reloaded if there is a change.
	provisionerConfig, err := provisionerController.GetProvisionerConfig("/etc/config-provisioner")
	if err != nil {
		logger.Error(err, "unable to load the provisioner config")
		os.Exit(1)
	}

	for _, provider := range providers {
		if _, err := provider(mgr, provisionerConfig, logger.Desugar()); err != nil {
			logger.Error(err, "unable to provide a controller")
			os.Exit(1)
		}
	}

	// Start blocks until a shutdown signal is received.
	err = mgr.Start(signals.SetupSignalHandler())
	if err != nil {
		logger.Fatal("Manager.Start() returned an error", zap.Error(err))
	}
}
99 changes: 99 additions & 0 deletions pkg/provisioners/kafka/cmd/dispatcher/main.go
@@ -0,0 +1,99 @@
/*
Copyright 2018 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
	"fmt"
	"log"
	"os"

	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
	"k8s.io/client-go/kubernetes"
	"sigs.k8s.io/controller-runtime/pkg/client/config"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/runtime/signals"

	provisionerController "github.com/knative/eventing/pkg/provisioners/kafka/controller"
	"github.com/knative/eventing/pkg/provisioners/kafka/dispatcher"
	"github.com/knative/eventing/pkg/sidecar/configmap/watcher"
	"github.com/knative/eventing/pkg/system"
)

func main() {
	configMapName := os.Getenv("DISPATCHER_CONFIGMAP_NAME")
	if configMapName == "" {
		configMapName = provisionerController.DispatcherConfigMapName
	}
	configMapNamespace := os.Getenv("DISPATCHER_CONFIGMAP_NAMESPACE")
	if configMapNamespace == "" {
		configMapNamespace = system.Namespace
	}

	logger, err := zap.NewProduction()
	if err != nil {
		log.Fatalf("unable to create logger: %v", err)
	}

	provisionerConfig, err := provisionerController.GetProvisionerConfig("/etc/config-provisioner")
	if err != nil {
		logger.Fatal("unable to load provisioner config", zap.Error(err))
	}

	mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
	if err != nil {
		logger.Fatal("unable to create manager", zap.Error(err))
	}

	kafkaDispatcher, err := dispatcher.NewDispatcher(provisionerConfig.Brokers, logger)
	if err != nil {
		logger.Fatal("unable to create kafka dispatcher", zap.Error(err))
	}

	kc, err := kubernetes.NewForConfig(mgr.GetConfig())
	if err != nil {
		logger.Fatal("unable to create kubernetes client", zap.Error(err))
	}

	cmw, err := watcher.NewWatcher(logger, kc, configMapNamespace, configMapName, kafkaDispatcher.UpdateConfig)
	if err != nil {
		logger.Fatal("unable to create configmap watcher",
			zap.Error(err), zap.String("configmap", fmt.Sprintf("%s/%s", configMapNamespace, configMapName)))
	}
	if err := mgr.Add(cmw); err != nil {
		logger.Fatal("unable to add the configmap watcher to the manager", zap.Error(err))
	}

	// Set up signals so we handle the first shutdown signal gracefully.
	stopCh := signals.SetupSignalHandler()

	// Start both the manager (which notices ConfigMap changes) and the Kafka dispatcher.
	var g errgroup.Group
	g.Go(func() error {
		// Start blocks until stopCh closes, so run it in its own goroutine.
		return mgr.Start(stopCh)
	})
	g.Go(func() error {
		// Sets up the message receiver and blocks until stopCh closes.
		return kafkaDispatcher.Start(stopCh)
	})

	if err := g.Wait(); err != nil {
		logger.Error("either the kafka dispatcher or the configmap watcher failed", zap.Error(err))
	}
}
25 changes: 18 additions & 7 deletions pkg/provisioners/kafka/controller/channel/provider.go
@@ -21,6 +21,7 @@ import (
istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -31,6 +32,7 @@ import (

eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
common "github.com/knative/eventing/pkg/provisioners/kafka/controller"
"github.com/knative/eventing/pkg/system"
)

const (
@@ -39,11 +41,19 @@ const (
controllerAgentName = "kafka-provisioner-channel-controller"
)

var (
defaultConfigMapKey = types.NamespacedName{
Namespace: system.Namespace,
Name: common.DispatcherConfigMapName,
}
)

type reconciler struct {
client client.Client
recorder record.EventRecorder
logger *zap.Logger
config *common.KafkaProvisionerConfig
client client.Client
recorder record.EventRecorder
logger *zap.Logger
config *common.KafkaProvisionerConfig
configMapKey client.ObjectKey
// Using a shared kafkaClusterAdmin does not work currently because of an issue with
// Shopify/sarama, see https://github.com/Shopify/sarama/issues/1162.
kafkaClusterAdmin sarama.ClusterAdmin
@@ -57,9 +67,10 @@ func ProvideController(mgr manager.Manager, config *common.KafkaProvisionerConfi
// Setup a new controller to Reconcile Channel.
c, err := controller.New(controllerAgentName, mgr, controller.Options{
Reconciler: &reconciler{
recorder: mgr.GetRecorder(controllerAgentName),
logger: logger,
config: config,
recorder: mgr.GetRecorder(controllerAgentName),
logger: logger,
config: config,
configMapKey: defaultConfigMapKey,
},
})
if err != nil {