Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions contrib/kafka/cmd/webhook/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
	"flag"
	"log"

	messagingv1alpha1 "github.com/knative/eventing/contrib/kafka/pkg/apis/messaging/v1alpha1"
	"github.com/knative/eventing/pkg/logconfig"
	"github.com/knative/pkg/configmap"
	"github.com/knative/pkg/logging"
	"github.com/knative/pkg/logging/logkey"
	"github.com/knative/pkg/signals"
	"github.com/knative/pkg/system"
	"github.com/knative/pkg/webhook"
	"go.uber.org/zap"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// main starts the Kafka messaging webhook: it configures logging from the
// config-logging ConfigMap, registers an admission controller that validates
// and defaults KafkaChannel custom objects, and runs it until a shutdown
// signal arrives.
func main() {
	flag.Parse()

	// Read the logging config and setup a logger.
	cm, err := configmap.Load("/etc/config-logging")
	if err != nil {
		log.Fatalf("Error loading logging configuration: %v", err)
	}
	config, err := logging.NewConfigFromMap(cm, logconfig.WebhookName())
	if err != nil {
		log.Fatalf("Error parsing logging configuration: %v", err)
	}
	logger, atomicLevel := logging.NewLoggerFromConfig(config, logconfig.WebhookName())
	defer logger.Sync()
	logger = logger.With(zap.String(logkey.ControllerType, logconfig.WebhookName()))

	logger.Infow("Starting the Kafka Messaging Webhook")

	// set up signals so we handle the first shutdown signal gracefully
	stopCh := signals.SetupSignalHandler()

	clusterConfig, err := rest.InClusterConfig()
	if err != nil {
		logger.Fatalw("Failed to get in cluster config", zap.Error(err))
	}

	kubeClient, err := kubernetes.NewForConfig(clusterConfig)
	if err != nil {
		logger.Fatalw("Failed to get the client set", zap.Error(err))
	}

	// Watch the logging config map and dynamically update logging levels.
	configMapWatcher := configmap.NewInformedWatcher(kubeClient, system.Namespace())
	configMapWatcher.Watch(logconfig.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, logconfig.WebhookName(), logconfig.WebhookName()))

	// FIX: the watcher was previously never started, so log-level changes in
	// the ConfigMap were silently ignored. Start it before running the
	// controller. (TODO confirm against other knative webhook mains.)
	if err = configMapWatcher.Start(stopCh); err != nil {
		logger.Fatalw("Failed to start the ConfigMap watcher", zap.Error(err))
	}

	// These names/ports must line up with the Service, Secret, and
	// MutatingWebhookConfiguration manifests in contrib/kafka/config.
	options := webhook.ControllerOptions{
		ServiceName:    logconfig.WebhookName(),
		DeploymentName: logconfig.WebhookName(),
		Namespace:      system.Namespace(),
		Port:           8443,
		SecretName:     "messaging-webhook-certs",
		WebhookName:    "webhook.messaging.knative.dev",
	}
	controller := webhook.AdmissionController{
		Client:  kubeClient,
		Options: options,
		Handlers: map[schema.GroupVersionKind]webhook.GenericCRD{
			// For group messaging.knative.dev
			messagingv1alpha1.SchemeGroupVersion.WithKind("KafkaChannel"): &messagingv1alpha1.KafkaChannel{},
		},
		Logger: logger,
	}
	// NOTE: the stale `if err != nil` check that sat here was removed — a
	// struct literal cannot set err, so it was re-testing the already-handled
	// error from kubernetes.NewForConfig and could never fire.
	if err = controller.Run(stopCh); err != nil {
		logger.Errorw("controller.Run() failed", zap.Error(err))
	}
	logger.Infow("Kafka webhook stopping")
}
7 changes: 7 additions & 0 deletions contrib/kafka/config/200-serviceaccount.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,10 @@ kind: ServiceAccount
metadata:
name: kafka-ch-dispatcher
namespace: knative-eventing

---
# Identity under which the kafka-webhook Deployment (500-webhook.yaml) runs;
# granted permissions via the kafka-webhook ClusterRole binding.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: kafka-webhook
  namespace: knative-eventing
77 changes: 77 additions & 0 deletions contrib/kafka/config/200-webhook-clusterrole.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Copyright 2019 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: kafka-webhook
rules:
  # For watching logging configuration and getting certs.
  - apiGroups:
      - ""
    resources:
      - "configmaps"
    verbs:
      - "get"
      - "list"
      - "watch"

  # For manipulating certs into secrets.
  - apiGroups:
      - ""
    resources:
      - "secrets"
    verbs:
      - "get"
      - "create"

  # For getting our Deployment so we can decorate with ownerref.
  - apiGroups:
      - "apps"
    resources:
      - "deployments"
    verbs:
      - "get"

  # For setting ownership metadata on the Deployment.
  - apiGroups:
      - "apps"
    resources:
      - "deployments/finalizers"
    verbs:
      # Quoted for consistency with every other verb in this file.
      - "update"

  # For actually registering our webhook.
  - apiGroups:
      - "admissionregistration.k8s.io"
    resources:
      - "mutatingwebhookconfigurations"
    verbs:
      - "get"
      - "list"
      - "create"
      - "update"
      - "delete"
      - "patch"
      - "watch"

  # Our own resources and statuses we care about.
  - apiGroups:
      - "messaging.knative.dev"
    resources:
      - "kafkachannels"
      - "kafkachannels/status"
    verbs:
      - "get"
      - "list"
      - "watch"
15 changes: 15 additions & 0 deletions contrib/kafka/config/201-clusterrolebinding.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,18 @@ roleRef:
name: kafka-ch-dispatcher
apiGroup: rbac.authorization.k8s.io

---

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  # ClusterRoleBinding is cluster-scoped: the `namespace` field previously set
  # here was meaningless and is dropped (the API server ignores it).
  name: kafka-webhook
subjects:
  - kind: ServiceAccount
    name: kafka-webhook
    namespace: knative-eventing
roleRef:
  kind: ClusterRole
  name: kafka-webhook
  apiGroup: rbac.authorization.k8s.io
27 changes: 27 additions & 0 deletions contrib/kafka/config/400-webhook-service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright 2019 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Fronts the kafka-webhook Deployment; the admission controller listens on
# container port 8443 (see webhook.ControllerOptions in cmd/webhook/main.go).
apiVersion: v1
kind: Service
metadata:
  labels:
    role: kafka-webhook
  name: kafka-webhook
  namespace: knative-eventing
spec:
  ports:
    # API server connects over 443; traffic is forwarded to the webhook's 8443.
    - port: 443
      targetPort: 8443
  selector:
    role: kafka-webhook
53 changes: 53 additions & 0 deletions contrib/kafka/config/500-webhook.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright 2019 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Runs the Kafka messaging webhook (cmd/webhook), which validates and defaults
# KafkaChannel custom objects. (PR-page extraction residue that had been
# embedded after the image line was removed.)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-webhook
  namespace: knative-eventing
spec:
  replicas: 1
  selector:
    matchLabels: &labels
      app: kafka-webhook
      role: kafka-webhook
  template:
    metadata:
      annotations:
        # The webhook terminates its own TLS; skip the Istio sidecar.
        sidecar.istio.io/inject: "false"
      labels: *labels
    spec:
      serviceAccountName: kafka-webhook
      containers:
        - name: kafka-webhook
          terminationMessagePolicy: FallbackToLogsOnError
          # Go import path, resolved to an image reference by `ko apply`.
          image: github.com/knative/eventing/contrib/kafka/cmd/webhook
          volumeMounts:
            # Logging config read at startup by cmd/webhook/main.go.
            - name: config-logging
              mountPath: /etc/config-logging
          env:
            - name: SYSTEM_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
            - name: CONFIG_LOGGING_NAME
              value: config-logging
            - name: WEBHOOK_NAME
              value: kafka-webhook
          # TODO set proper resource limits.
      volumes:
        - name: config-logging
          configMap:
            name: config-logging
86 changes: 86 additions & 0 deletions contrib/kafka/config/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Apache Kafka Channels

Kafka channels are those backed by [Apache Kafka](http://kafka.apache.org/) topics.

## Deployment steps

1. Setup [Knative Eventing](../../../DEVELOPMENT.md)
1. If not done already, install an Apache Kafka cluster!

- For Kubernetes a simple installation is done using the
[Strimzi Kafka Operator](http://strimzi.io). Its installation
[guides](http://strimzi.io/quickstarts/) provide content for Kubernetes and
Openshift.

> Note: This _Channel_ is not limited to Apache Kafka installations on
> Kubernetes. It is also possible to use an off-cluster Apache Kafka
> installation.

1. Now that Apache Kafka is installed, you need to configure the
`bootstrap_servers` value in the `config-kafka` ConfigMap,
located inside the `contrib/kafka/config/400-kafka-config.yaml` file:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: config-kafka
namespace: knative-eventing
data:
# Broker URL. Replace this with the URLs for your kafka cluster,
# which is in the format of my-cluster-kafka-bootstrap.my-kafka-namespace:9092.
bootstrap_servers: REPLACE_WITH_CLUSTER_URL
```

1. Apply the Kafka config:

```
ko apply -f contrib/kafka/config
```

1. Create the kafka channel custom objects:

```yaml
apiVersion: messaging.knative.dev/v1alpha1
kind: KafkaChannel
metadata:
name: my-kafka-channel
spec:
numPartitions: 1
replicationFactor: 3
```
You can configure the number of partitions with `numPartitions`, as well as the replication factor with `replicationFactor`. If not set, both will default to `1`.

## Components

The major components are:

- Kafka Channel Controller
- Kafka Channel Dispatcher
- Kafka Webhook
- Kafka Config Map


The Kafka Channel Controller is located in one Pod:

```shell
kubectl get deployment -n knative-eventing kafka-ch-controller
```

The Kafka Channel Dispatcher receives and distributes all events to the appropriate consumers:

```shell
kubectl get deployment -n knative-eventing kafka-ch-dispatcher
```

The Kafka Webhook is used to validate and set defaults to `KafkaChannel` custom objects:

```shell
kubectl get deployment -n knative-eventing kafka-webhook
```

The Kafka Config Map is used to configure the `bootstrap_servers` of your Apache Kafka installation:

```shell
kubectl get configmap -n knative-eventing config-kafka
```