From e1c6086525886fd1302d3b99970a1d26905cc072 Mon Sep 17 00:00:00 2001 From: nachocano Date: Fri, 31 May 2019 13:18:47 -0700 Subject: [PATCH 1/9] adding kafka webhook --- contrib/kafka/cmd/webhook/main.go | 95 +++++++++++++++++++ contrib/kafka/config/200-serviceaccount.yaml | 7 ++ .../kafka/config/200-webhook-clusterrole.yaml | 77 +++++++++++++++ .../kafka/config/201-clusterrolebinding.yaml | 15 +++ contrib/kafka/config/400-webhook-service.yaml | 27 ++++++ contrib/kafka/config/500-webhook.yaml | 56 +++++++++++ 6 files changed, 277 insertions(+) create mode 100644 contrib/kafka/cmd/webhook/main.go create mode 100644 contrib/kafka/config/200-webhook-clusterrole.yaml create mode 100644 contrib/kafka/config/400-webhook-service.yaml create mode 100644 contrib/kafka/config/500-webhook.yaml diff --git a/contrib/kafka/cmd/webhook/main.go b/contrib/kafka/cmd/webhook/main.go new file mode 100644 index 00000000000..37da5999b11 --- /dev/null +++ b/contrib/kafka/cmd/webhook/main.go @@ -0,0 +1,95 @@ +/* +Copyright 2019 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "flag" + "go.uber.org/zap" + "log" + + messagingv1alpha1 "github.com/knative/eventing/contrib/kafka/pkg/apis/messaging/v1alpha1" + "github.com/knative/eventing/pkg/logconfig" + "github.com/knative/pkg/configmap" + "github.com/knative/pkg/logging" + "github.com/knative/pkg/logging/logkey" + "github.com/knative/pkg/signals" + "github.com/knative/pkg/system" + "github.com/knative/pkg/webhook" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +func main() { + flag.Parse() + // Read the logging config and setup a logger. + cm, err := configmap.Load("/etc/config-logging") + if err != nil { + log.Fatalf("Error loading logging configuration: %v", err) + } + config, err := logging.NewConfigFromMap(cm, logconfig.WebhookName()) + if err != nil { + log.Fatalf("Error parsing logging configuration: %v", err) + } + logger, atomicLevel := logging.NewLoggerFromConfig(config, logconfig.WebhookName()) + defer logger.Sync() + logger = logger.With(zap.String(logkey.ControllerType, logconfig.WebhookName())) + + logger.Infow("Starting the Kafka Messaging Webhook") + + // set up signals so we handle the first shutdown signal gracefully + stopCh := signals.SetupSignalHandler() + + clusterConfig, err := rest.InClusterConfig() + if err != nil { + logger.Fatalw("Failed to get in cluster config", zap.Error(err)) + } + + kubeClient, err := kubernetes.NewForConfig(clusterConfig) + if err != nil { + logger.Fatalw("Failed to get the client set", zap.Error(err)) + } + + // Watch the logging config map and dynamically update logging levels. + configMapWatcher := configmap.NewInformedWatcher(kubeClient, system.Namespace()) + + configMapWatcher.Watch(logconfig.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, logconfig.WebhookName(), logconfig.WebhookName())) + + options := webhook.ControllerOptions{ + ServiceName: logconfig.WebhookName(), + DeploymentName: logconfig.WebhookName(), + Namespace: system.Namespace(), + Port: 8443, + SecretName: "messaging-webhook-certs", + WebhookName: "webhook.messaging.knative.dev", + } + controller := webhook.AdmissionController{ + Client: kubeClient, + Options: options, + Handlers: map[schema.GroupVersionKind]webhook.GenericCRD{ + // For group messaging.knative.dev, + messagingv1alpha1.SchemeGroupVersion.WithKind("KafkaChannel"): &messagingv1alpha1.KafkaChannel{}, + }, + Logger: logger, + } + if err != nil { + logger.Fatalw("Failed to create the Kafka admission controller", zap.Error(err)) + } + if err = controller.Run(stopCh); err != nil { + logger.Errorw("controller.Run() failed", zap.Error(err)) + } + logger.Infow("Kafka webhook stopping") +} diff --git a/contrib/kafka/config/200-serviceaccount.yaml b/contrib/kafka/config/200-serviceaccount.yaml index 8daa2857377..1917adca31d 100644 --- a/contrib/kafka/config/200-serviceaccount.yaml +++ b/contrib/kafka/config/200-serviceaccount.yaml @@ -24,3 +24,10 @@ kind: ServiceAccount metadata: name: kafka-ch-dispatcher namespace: knative-eventing + +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: kafka-webhook + namespace: knative-eventing diff --git a/contrib/kafka/config/200-webhook-clusterrole.yaml b/contrib/kafka/config/200-webhook-clusterrole.yaml new file mode 100644 index 00000000000..5a0f9ba21b7 --- /dev/null +++ b/contrib/kafka/config/200-webhook-clusterrole.yaml @@ -0,0 +1,77 @@ +# Copyright 2019 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: kafka-webhook +rules: + # For watching logging configuration and getting certs. + - apiGroups: + - "" + resources: + - "configmaps" + verbs: + - "get" + - "list" + - "watch" + + # For manipulating certs into secrets. + - apiGroups: + - "" + resources: + - "secrets" + verbs: + - "get" + - "create" + + # For getting our Deployment so we can decorate with ownerref. + - apiGroups: + - "apps" + resources: + - "deployments" + verbs: + - "get" + + - apiGroups: + - "apps" + resources: + - "deployments/finalizers" + verbs: + - update + + # For actually registering our webhook. + - apiGroups: + - "admissionregistration.k8s.io" + resources: + - "mutatingwebhookconfigurations" + verbs: + - "get" + - "list" + - "create" + - "update" + - "delete" + - "patch" + - "watch" + + # Our own resources and statuses we care about. + - apiGroups: + - "messaging.knative.dev" + resources: + - "kafkachannels" + - "kafkachannels/status" + verbs: + - "get" + - "list" + - "watch" diff --git a/contrib/kafka/config/201-clusterrolebinding.yaml b/contrib/kafka/config/201-clusterrolebinding.yaml index 99de053e752..a68a37deb44 100644 --- a/contrib/kafka/config/201-clusterrolebinding.yaml +++ b/contrib/kafka/config/201-clusterrolebinding.yaml @@ -41,3 +41,18 @@ roleRef: name: kafka-ch-dispatcher apiGroup: rbac.authorization.k8s.io +--- + +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: kafka-webhook + namespace: knative-eventing +subjects: + - kind: ServiceAccount + name: kafka-webhook + namespace: knative-eventing +roleRef: + kind: ClusterRole + name: kafka-webhook + apiGroup: rbac.authorization.k8s.io diff --git a/contrib/kafka/config/400-webhook-service.yaml b/contrib/kafka/config/400-webhook-service.yaml new file mode 100644 index 00000000000..9e20ae833c0 --- /dev/null +++ b/contrib/kafka/config/400-webhook-service.yaml @@ -0,0 +1,27 @@ +# Copyright 2018 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: Service +metadata: + labels: + role: kafka-webhook + name: kafka-webhook + namespace: knative-eventing +spec: + ports: + - port: 443 + targetPort: 8443 + selector: + role: kafka-webhook diff --git a/contrib/kafka/config/500-webhook.yaml b/contrib/kafka/config/500-webhook.yaml new file mode 100644 index 00000000000..f301a00868d --- /dev/null +++ b/contrib/kafka/config/500-webhook.yaml @@ -0,0 +1,56 @@ +# Copyright 2018 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: kafka-webhook + namespace: knative-eventing +spec: + replicas: 1 + selector: + matchLabels: &labels + app: kafka-webhook + role: kafka-webhook + template: + metadata: + annotations: + sidecar.istio.io/inject: "false" + labels: *labels + spec: + serviceAccountName: kafka-webhook + containers: + - name: kafka-webhook + terminationMessagePolicy: FallbackToLogsOnError + image: github.com/knative/eventing/contrib/kafka/cmd/webhook + volumeMounts: + - name: config-logging + mountPath: /etc/config-logging + env: + - name: SYSTEM_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: CONFIG_LOGGING_NAME + value: config-logging + - name: WEBHOOK_NAME + value: kafka-webhook + resources: + limits: + # TODO set a proper value. + memory: 1000Mi # An initial guess, but consistent with serving + volumes: + - name: config-logging + configMap: + name: config-logging From 0266cc79d3bb55299f521f9616e9d0a997ee9415 Mon Sep 17 00:00:00 2001 From: nachocano Date: Fri, 31 May 2019 13:19:51 -0700 Subject: [PATCH 2/9] typo --- contrib/kafka/cmd/webhook/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/kafka/cmd/webhook/main.go b/contrib/kafka/cmd/webhook/main.go index 37da5999b11..876d9efd1e4 100644 --- a/contrib/kafka/cmd/webhook/main.go +++ b/contrib/kafka/cmd/webhook/main.go @@ -80,7 +80,7 @@ func main() { Client: kubeClient, Options: options, Handlers: map[schema.GroupVersionKind]webhook.GenericCRD{ - // For group messaging.knative.dev, + // For group messaging.knative.dev messagingv1alpha1.SchemeGroupVersion.WithKind("KafkaChannel"): &messagingv1alpha1.KafkaChannel{}, }, Logger: logger, From 4db5236c99773e663eab8583519d57381b711ce9 Mon Sep 17 00:00:00 2001 From: nachocano Date: Fri, 31 May 2019 13:24:42 -0700 Subject: [PATCH 3/9] sockpuppet --- contrib/kafka/config/200-webhook-clusterrole.yaml | 2 +- contrib/kafka/config/400-webhook-service.yaml | 2 +- contrib/kafka/config/500-webhook.yaml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/contrib/kafka/config/200-webhook-clusterrole.yaml b/contrib/kafka/config/200-webhook-clusterrole.yaml index 5a0f9ba21b7..da99397df5f 100644 --- a/contrib/kafka/config/200-webhook-clusterrole.yaml +++ b/contrib/kafka/config/200-webhook-clusterrole.yaml @@ -56,7 +56,7 @@ rules: - "admissionregistration.k8s.io" resources: - "mutatingwebhookconfigurations" - verbs: + verbs: - "get" - "list" - "create" diff --git a/contrib/kafka/config/400-webhook-service.yaml b/contrib/kafka/config/400-webhook-service.yaml index 9e20ae833c0..2a009327145 100644 --- a/contrib/kafka/config/400-webhook-service.yaml +++ b/contrib/kafka/config/400-webhook-service.yaml @@ -1,4 +1,4 @@ -# Copyright 2018 The Knative Authors +# Copyright 2019 The Knative Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/contrib/kafka/config/500-webhook.yaml b/contrib/kafka/config/500-webhook.yaml index f301a00868d..2ec479fa7e4 100644 --- a/contrib/kafka/config/500-webhook.yaml +++ b/contrib/kafka/config/500-webhook.yaml @@ -1,4 +1,4 @@ -# Copyright 2018 The Knative Authors +# Copyright 2019 The Knative Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. From 47c60d76210ed164896bd34c16ce26949b2b479e Mon Sep 17 00:00:00 2001 From: nachocano Date: Fri, 31 May 2019 14:02:01 -0700 Subject: [PATCH 4/9] adding documentation --- contrib/kafka/config/README.md | 92 ++++++++++++++++++++++ contrib/kafka/config/provisioner/README.md | 2 +- 2 files changed, 93 insertions(+), 1 deletion(-) create mode 100644 contrib/kafka/config/README.md diff --git a/contrib/kafka/config/README.md b/contrib/kafka/config/README.md new file mode 100644 index 00000000000..992242c8022 --- /dev/null +++ b/contrib/kafka/config/README.md @@ -0,0 +1,92 @@ +# Apache Kafka Channels + +Kafka channels are those backed by [Apache Kafka](http://kafka.apache.org/) topics. + +## Deployment steps + +1. Setup [Knative Eventing](../../../../DEVELOPMENT.md) +1. If not done already, install an Apache Kafka cluster! + + - For Kubernetes a simple installation is done using the + [Strimzi Kafka Operator](http://strimzi.io). Its installation + [guides](http://strimzi.io/quickstarts/) provide content for Kubernetes and + Openshift. + + > Note: This _Channel_ is not limited to Apache Kafka installations on + > Kubernetes. It is also possible to use an off-cluster Apache Kafka + > installation. + +1. Now that Apache Kafka is installed, you need to configure the + `bootstrap_servers` value in the `config-kafka` ConfigMap, + located inside the `contrib/kafka/config/400-kafka-config.yaml` file: + + ```yaml + apiVersion: v1 + kind: ConfigMap + metadata: + name: config-kafka + namespace: knative-eventing + data: + # Broker URL. Replace this with the URLs for your kafka cluster, + # which is in the format of my-cluster-kafka-bootstrap.my-kafka-namespace:9092. + bootstrap_servers: REPLACE_WITH_CLUSTER_URL + + # Consumer mode to dispatch events from different partitions in parallel. + # By default(multiplex), partitions are multiplexed with a single go channel. + # `multiplex` and `partitions` are valid values. + ## consumer_mode: partitions + ``` + +1. Apply the Kafka config: + + ``` + ko apply -f contrib/kafka/config + ``` + +1. Create the kafka channel custom objects: + + ```yaml + apiVersion: messaging.knative.dev/v1alpha1 + kind: KafkaChannel + metadata: + name: my-kafka-channel + spec: + numPartitions: 1 + replicationFactor: 3 + ``` + You can configure the number of partitions with `numPartitions`, as well as the replication factor with + `replicationFactor`. If not set, both will default to `1`. + +## Components + +The major components are: + +- Kafka Channel Controller +- Kafka Channel Dispatcher +- Kafka Webhook +- Kafka Config Map + + +The Kafka Channel Controller is located in one Pod: + +```shell +kubectl get deployment -n knative-eventing kafka-ch-controller +``` + +The Kafka Channel Dispatcher receives and distributes all events to the appropriate consumers: + +```shell +kubectl get deployment -n knative-eventing kafka-ch-dispatcher +``` + +The Kafka Webhook is used to validate and set defaults to `KafkaChannel` custom objects: + +```shell +kubectl get deployment -n knative-eventing kafka-webhook +``` + +The Kafka Config Map is used to configure the `bootstrap_servers` of your Apache Kafka installation: + +```shell +kubectl get configmap -n knative-eventing config-kafka +``` diff --git a/contrib/kafka/config/provisioner/README.md b/contrib/kafka/config/provisioner/README.md index f67ea4c1e3b..4f14accd8e4 100644 --- a/contrib/kafka/config/provisioner/README.md +++ b/contrib/kafka/config/provisioner/README.md @@ -2,7 +2,7 @@ Deployment steps: -1. Setup [Knative Eventing](../../../../DEVELOPMENT.md) +1. Setup [Knative Eventing](../../../../../DEVELOPMENT.md) 1. If not done already, install an Apache Kafka cluster! - For Kubernetes a simple installation is done using the From eebfc3aab1eb86a26f194e6c2347bae6ebe4490f Mon Sep 17 00:00:00 2001 From: nachocano Date: Fri, 31 May 2019 14:05:53 -0700 Subject: [PATCH 5/9] sockpuppet --- contrib/kafka/config/README.md | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/contrib/kafka/config/README.md b/contrib/kafka/config/README.md index 992242c8022..b99a8e2cb8e 100644 --- a/contrib/kafka/config/README.md +++ b/contrib/kafka/config/README.md @@ -30,12 +30,8 @@ Kafka channels are those backed by [Apache Kafka](http://kafka.apache.org/) topi # Broker URL. Replace this with the URLs for your kafka cluster, # which is in the format of my-cluster-kafka-bootstrap.my-kafka-namespace:9092. bootstrap_servers: REPLACE_WITH_CLUSTER_URL - - # Consumer mode to dispatch events from different partitions in parallel. - # By default(multiplex), partitions are multiplexed with a single go channel. - # `multiplex` and `partitions` are valid values. - ## consumer_mode: partitions - ``` + ... + ``` 1. Apply the Kafka config: @@ -50,12 +46,11 @@ Kafka channels are those backed by [Apache Kafka](http://kafka.apache.org/) topi kind: KafkaChannel metadata: name: my-kafka-channel - spec: + spec: numPartitions: 1 replicationFactor: 3 ``` - You can configure the number of partitions with `numPartitions`, as well as the replication factor with - `replicationFactor`. If not set, both will default to `1`. + You can configure the number of partitions with `numPartitions`, as well as the replication factor with `replicationFactor`. If not set, both will default to `1`. ## Components From 9a20bb2d8c67a5791f7581e51b3b94519939b9c7 Mon Sep 17 00:00:00 2001 From: nachocano Date: Fri, 31 May 2019 14:18:00 -0700 Subject: [PATCH 6/9] proper location of README --- contrib/kafka/config/README.md | 2 +- contrib/kafka/config/provisioner/README.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/contrib/kafka/config/README.md b/contrib/kafka/config/README.md index b99a8e2cb8e..8964b2ad964 100644 --- a/contrib/kafka/config/README.md +++ b/contrib/kafka/config/README.md @@ -4,7 +4,7 @@ Kafka channels are those backed by [Apache Kafka](http://kafka.apache.org/) topi ## Deployment steps -1. Setup [Knative Eventing](../../../../DEVELOPMENT.md) +1. Setup [Knative Eventing](../../../DEVELOPMENT.md) 1. If not done already, install an Apache Kafka cluster! - For Kubernetes a simple installation is done using the diff --git a/contrib/kafka/config/provisioner/README.md b/contrib/kafka/config/provisioner/README.md index 4f14accd8e4..f67ea4c1e3b 100644 --- a/contrib/kafka/config/provisioner/README.md +++ b/contrib/kafka/config/provisioner/README.md @@ -2,7 +2,7 @@ Deployment steps: -1. Setup [Knative Eventing](../../../../../DEVELOPMENT.md) +1. Setup [Knative Eventing](../../../../DEVELOPMENT.md) 1. If not done already, install an Apache Kafka cluster! - For Kubernetes a simple installation is done using the From 7a19286c76478b3e684445d73facc8c851b929ba Mon Sep 17 00:00:00 2001 From: nachocano Date: Mon, 3 Jun 2019 09:11:12 -0700 Subject: [PATCH 7/9] removing limit value as per code review --- contrib/kafka/config/500-webhook.yaml | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/contrib/kafka/config/500-webhook.yaml b/contrib/kafka/config/500-webhook.yaml index 2ec479fa7e4..023f55c17f7 100644 --- a/contrib/kafka/config/500-webhook.yaml +++ b/contrib/kafka/config/500-webhook.yaml @@ -46,10 +46,7 @@ spec: value: config-logging - name: WEBHOOK_NAME value: kafka-webhook - resources: - limits: - # TODO set a proper value. - memory: 1000Mi # An initial guess, but consistent with serving + # TODO set proper resource limits. volumes: - name: config-logging configMap: From a52917a9398a6a043c8fbb925d67c70191da236b Mon Sep 17 00:00:00 2001 From: Nacho Cano Date: Mon, 3 Jun 2019 09:23:20 -0700 Subject: [PATCH 8/9] sockpuppet --- contrib/kafka/config/README.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/contrib/kafka/config/README.md b/contrib/kafka/config/README.md index 8964b2ad964..d08b80704a8 100644 --- a/contrib/kafka/config/README.md +++ b/contrib/kafka/config/README.md @@ -20,7 +20,7 @@ Kafka channels are those backed by [Apache Kafka](http://kafka.apache.org/) topi `bootstrap_servers` value in the `config-kafka` ConfigMap, located inside the `contrib/kafka/config/400-kafka-config.yaml` file: - ```yaml + ```yaml apiVersion: v1 kind: ConfigMap metadata: @@ -30,8 +30,7 @@ Kafka channels are those backed by [Apache Kafka](http://kafka.apache.org/) topi # Broker URL. Replace this with the URLs for your kafka cluster, # which is in the format of my-cluster-kafka-bootstrap.my-kafka-namespace:9092. bootstrap_servers: REPLACE_WITH_CLUSTER_URL - ... - ``` + ``` 1. Apply the Kafka config: From 8f5040661a2baff2f8d1d1487061e585a3a127aa Mon Sep 17 00:00:00 2001 From: Nacho Cano Date: Mon, 3 Jun 2019 09:31:59 -0700 Subject: [PATCH 9/9] sockpuppet --- contrib/kafka/config/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/kafka/config/README.md b/contrib/kafka/config/README.md index d08b80704a8..10743b350a7 100644 --- a/contrib/kafka/config/README.md +++ b/contrib/kafka/config/README.md @@ -30,7 +30,7 @@ Kafka channels are those backed by [Apache Kafka](http://kafka.apache.org/) topi # Broker URL. Replace this with the URLs for your kafka cluster, # which is in the format of my-cluster-kafka-bootstrap.my-kafka-namespace:9092. bootstrap_servers: REPLACE_WITH_CLUSTER_URL - ``` + ``` 1. Apply the Kafka config: