diff --git a/contrib/kafka/cmd/webhook/main.go b/contrib/kafka/cmd/webhook/main.go new file mode 100644 index 00000000000..876d9efd1e4 --- /dev/null +++ b/contrib/kafka/cmd/webhook/main.go @@ -0,0 +1,95 @@ +/* +Copyright 2019 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "flag" + "go.uber.org/zap" + "log" + + messagingv1alpha1 "github.com/knative/eventing/contrib/kafka/pkg/apis/messaging/v1alpha1" + "github.com/knative/eventing/pkg/logconfig" + "github.com/knative/pkg/configmap" + "github.com/knative/pkg/logging" + "github.com/knative/pkg/logging/logkey" + "github.com/knative/pkg/signals" + "github.com/knative/pkg/system" + "github.com/knative/pkg/webhook" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +func main() { + flag.Parse() + // Read the logging config and setup a logger. + cm, err := configmap.Load("/etc/config-logging") + if err != nil { + log.Fatalf("Error loading logging configuration: %v", err) + } + config, err := logging.NewConfigFromMap(cm, logconfig.WebhookName()) + if err != nil { + log.Fatalf("Error parsing logging configuration: %v", err) + } + logger, atomicLevel := logging.NewLoggerFromConfig(config, logconfig.WebhookName()) + defer logger.Sync() + logger = logger.With(zap.String(logkey.ControllerType, logconfig.WebhookName())) + + logger.Infow("Starting the Kafka Messaging Webhook") + + // set up signals so we handle the first shutdown signal gracefully + stopCh := signals.SetupSignalHandler() + + clusterConfig, err := rest.InClusterConfig() + if err != nil { + logger.Fatalw("Failed to get in cluster config", zap.Error(err)) + } + + kubeClient, err := kubernetes.NewForConfig(clusterConfig) + if err != nil { + logger.Fatalw("Failed to get the client set", zap.Error(err)) + } + + // Watch the logging config map and dynamically update logging levels. + configMapWatcher := configmap.NewInformedWatcher(kubeClient, system.Namespace()) + + configMapWatcher.Watch(logconfig.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, logconfig.WebhookName(), logconfig.WebhookName())) + + options := webhook.ControllerOptions{ + ServiceName: logconfig.WebhookName(), + DeploymentName: logconfig.WebhookName(), + Namespace: system.Namespace(), + Port: 8443, + SecretName: "messaging-webhook-certs", + WebhookName: "webhook.messaging.knative.dev", + } + controller := webhook.AdmissionController{ + Client: kubeClient, + Options: options, + Handlers: map[schema.GroupVersionKind]webhook.GenericCRD{ + // For group messaging.knative.dev + messagingv1alpha1.SchemeGroupVersion.WithKind("KafkaChannel"): &messagingv1alpha1.KafkaChannel{}, + }, + Logger: logger, + } + if err != nil { + logger.Fatalw("Failed to create the Kafka admission controller", zap.Error(err)) + } + if err = controller.Run(stopCh); err != nil { + logger.Errorw("controller.Run() failed", zap.Error(err)) + } + logger.Infow("Kafka webhook stopping") +} diff --git a/contrib/kafka/config/200-serviceaccount.yaml b/contrib/kafka/config/200-serviceaccount.yaml index 8daa2857377..1917adca31d 100644 --- a/contrib/kafka/config/200-serviceaccount.yaml +++ b/contrib/kafka/config/200-serviceaccount.yaml @@ -24,3 +24,10 @@ kind: ServiceAccount metadata: name: kafka-ch-dispatcher namespace: knative-eventing + +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: kafka-webhook + namespace: knative-eventing diff --git a/contrib/kafka/config/200-webhook-clusterrole.yaml b/contrib/kafka/config/200-webhook-clusterrole.yaml new file mode 100644 index 00000000000..da99397df5f --- /dev/null +++ b/contrib/kafka/config/200-webhook-clusterrole.yaml @@ -0,0 +1,77 @@ +# Copyright 2019 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: kafka-webhook +rules: + # For watching logging configuration and getting certs. + - apiGroups: + - "" + resources: + - "configmaps" + verbs: + - "get" + - "list" + - "watch" + + # For manipulating certs into secrets. + - apiGroups: + - "" + resources: + - "secrets" + verbs: + - "get" + - "create" + + # For getting our Deployment so we can decorate with ownerref. + - apiGroups: + - "apps" + resources: + - "deployments" + verbs: + - "get" + + - apiGroups: + - "apps" + resources: + - "deployments/finalizers" + verbs: + - update + + # For actually registering our webhook. + - apiGroups: + - "admissionregistration.k8s.io" + resources: + - "mutatingwebhookconfigurations" + verbs: + - "get" + - "list" + - "create" + - "update" + - "delete" + - "patch" + - "watch" + + # Our own resources and statuses we care about. + - apiGroups: + - "messaging.knative.dev" + resources: + - "kafkachannels" + - "kafkachannels/status" + verbs: + - "get" + - "list" + - "watch" diff --git a/contrib/kafka/config/201-clusterrolebinding.yaml b/contrib/kafka/config/201-clusterrolebinding.yaml index 99de053e752..a68a37deb44 100644 --- a/contrib/kafka/config/201-clusterrolebinding.yaml +++ b/contrib/kafka/config/201-clusterrolebinding.yaml @@ -41,3 +41,18 @@ roleRef: name: kafka-ch-dispatcher apiGroup: rbac.authorization.k8s.io +--- + +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: kafka-webhook + namespace: knative-eventing +subjects: + - kind: ServiceAccount + name: kafka-webhook + namespace: knative-eventing +roleRef: + kind: ClusterRole + name: kafka-webhook + apiGroup: rbac.authorization.k8s.io diff --git a/contrib/kafka/config/400-webhook-service.yaml b/contrib/kafka/config/400-webhook-service.yaml new file mode 100644 index 00000000000..2a009327145 --- /dev/null +++ b/contrib/kafka/config/400-webhook-service.yaml @@ -0,0 +1,27 @@ +# Copyright 2019 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: Service +metadata: + labels: + role: kafka-webhook + name: kafka-webhook + namespace: knative-eventing +spec: + ports: + - port: 443 + targetPort: 8443 + selector: + role: kafka-webhook diff --git a/contrib/kafka/config/500-webhook.yaml b/contrib/kafka/config/500-webhook.yaml new file mode 100644 index 00000000000..023f55c17f7 --- /dev/null +++ b/contrib/kafka/config/500-webhook.yaml @@ -0,0 +1,53 @@ +# Copyright 2019 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: kafka-webhook + namespace: knative-eventing +spec: + replicas: 1 + selector: + matchLabels: &labels + app: kafka-webhook + role: kafka-webhook + template: + metadata: + annotations: + sidecar.istio.io/inject: "false" + labels: *labels + spec: + serviceAccountName: kafka-webhook + containers: + - name: kafka-webhook + terminationMessagePolicy: FallbackToLogsOnError + image: github.com/knative/eventing/contrib/kafka/cmd/webhook + volumeMounts: + - name: config-logging + mountPath: /etc/config-logging + env: + - name: SYSTEM_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: CONFIG_LOGGING_NAME + value: config-logging + - name: WEBHOOK_NAME + value: kafka-webhook + # TODO set proper resource limits. + volumes: + - name: config-logging + configMap: + name: config-logging diff --git a/contrib/kafka/config/README.md b/contrib/kafka/config/README.md new file mode 100644 index 00000000000..10743b350a7 --- /dev/null +++ b/contrib/kafka/config/README.md @@ -0,0 +1,86 @@ +# Apache Kafka Channels + +Kafka channels are those backed by [Apache Kafka](http://kafka.apache.org/) topics. + +## Deployment steps + +1. Setup [Knative Eventing](../../../DEVELOPMENT.md) +1. If not done already, install an Apache Kafka cluster! + + - For Kubernetes a simple installation is done using the + [Strimzi Kafka Operator](http://strimzi.io). Its installation + [guides](http://strimzi.io/quickstarts/) provide content for Kubernetes and + Openshift. + + > Note: This _Channel_ is not limited to Apache Kafka installations on + > Kubernetes. It is also possible to use an off-cluster Apache Kafka + > installation. + +1. Now that Apache Kafka is installed, you need to configure the + `bootstrap_servers` value in the `config-kafka` ConfigMap, + located inside the `contrib/kafka/config/400-kafka-config.yaml` file: + + ```yaml + apiVersion: v1 + kind: ConfigMap + metadata: + name: config-kafka + namespace: knative-eventing + data: + # Broker URL. Replace this with the URLs for your kafka cluster, + # which is in the format of my-cluster-kafka-bootstrap.my-kafka-namespace:9092. + bootstrap_servers: REPLACE_WITH_CLUSTER_URL + ``` + +1. Apply the Kafka config: + + ``` + ko apply -f contrib/kafka/config + ``` + +1. Create the kafka channel custom objects: + + ```yaml + apiVersion: messaging.knative.dev/v1alpha1 + kind: KafkaChannel + metadata: + name: my-kafka-channel + spec: + numPartitions: 1 + replicationFactor: 3 + ``` + You can configure the number of partitions with `numPartitions`, as well as the replication factor with `replicationFactor`. If not set, both will default to `1`. + +## Components + +The major components are: + +- Kafka Channel Controller +- Kafka Channel Dispatcher +- Kafka Webhook +- Kafka Config Map + + +The Kafka Channel Controller is located in one Pod: + +```shell +kubectl get deployment -n knative-eventing kafka-ch-controller +``` + +The Kafka Channel Dispatcher receives and distributes all events to the appropriate consumers: + +```shell +kubectl get deployment -n knative-eventing kafka-ch-dispatcher +``` + +The Kafka Webhook is used to validate and set defaults to `KafkaChannel` custom objects: + +```shell +kubectl get deployment -n knative-eventing kafka-webhook +``` + +The Kafka Config Map is used to configure the `bootstrap_servers` of your Apache Kafka installation: + +```shell +kubectl get configmap -n knative-eventing config-kafka +```