Merged

32 commits
8ea7c13
Kafka Channel Provisioner Controllers
neosab Sep 27, 2018
6637554
Remove controllerRuntimeStart and address PR comments
neosab Sep 28, 2018
0dd7028
Merge remote-tracking branch 'upstream/master' into try_kafka_provisi…
neosab Sep 28, 2018
dd502b0
Merge remote-tracking branch 'upstream/master' into try_kafka_provisi…
neosab Oct 8, 2018
c25ce1b
Remove fetching configmap in controller
neosab Oct 8, 2018
2ec153b
Add more tests, improv coverage
neosab Oct 10, 2018
4c8232d
Add ChannelStatus.IsReady
neosab Oct 10, 2018
72a219a
Merge remote-tracking branch 'upstream/master' into try_kafka_provisi…
neosab Oct 11, 2018
00ab78e
Address PR comments
neosab Oct 15, 2018
5dd3164
Merge remote-tracking branch 'upstream/master' into try_kafka_provisi…
neosab Oct 15, 2018
514773c
Remove unfinished README
neosab Oct 15, 2018
8f83d7c
Switch from logr to zap
neosab Oct 15, 2018
356da32
Provision Channel as Kafka Topic
neosab Oct 10, 2018
bc5515d
Deprovision Channel
neosab Oct 16, 2018
53cb8e4
Merge remote-tracking branch 'upstream/master' into kafka_provision_t…
neosab Oct 18, 2018
9097c21
Merge remote-tracking branch 'upstream/master' into kafka_provision_t…
neosab Oct 24, 2018
dddeeec
Merge remote-tracking branch 'upstream/master' into try_kafka_provisi…
neosab Oct 29, 2018
86dd3f8
Merge matzew/try_kafka_provisioner into try_kafka_provisioner
matzew Oct 29, 2018
b66ec8f
Fix few more after pr 562
neosab Oct 29, 2018
e4131f4
Fix tests and imports
neosab Oct 29, 2018
712fbc4
PR feedback for removing ClusterChannelProvisioner name from configmap
neosab Oct 31, 2018
7577094
Merge remote-tracking branch 'upstream/master' into try_kafka_provisi…
neosab Oct 31, 2018
c5dc969
Adding instructions for Channel provisioner
matzew Nov 5, 2018
9934290
short cut code for missing ...
matzew Nov 5, 2018
8a54f24
Updating to latest Kafka client, and fixing idle bug
matzew Nov 5, 2018
de54ab9
Merge pull request #2 from matzew/try_kafka_provisioner
neosab Nov 6, 2018
99f2132
Merge remote-tracking branch 'upstream/master' into HEAD
neosab Nov 6, 2018
240c125
Fix conflicts and unit tests
neosab Nov 6, 2018
32aa096
Merge remote-tracking branch 'upstream/master' into try_kafka_provisi…
neosab Nov 8, 2018
e9dad0d
Address PR feedback
neosab Nov 8, 2018
0190820
Updating docs, based on feedback
matzew Nov 8, 2018
c301c95
Merge pull request #4 from matzew/doc_feedback
neosab Nov 8, 2018
60 changes: 60 additions & 0 deletions Gopkg.lock


4 changes: 4 additions & 0 deletions Gopkg.toml
@@ -79,6 +79,10 @@ required = [
name = "github.com/knative/serving"
version = "v0.1.1"

[[override]]
name = "github.com/Shopify/sarama"
version = "1.19.0"

[[constraint]]
name = "sigs.k8s.io/controller-runtime"
# HEAD as of 2018-09-19
78 changes: 78 additions & 0 deletions config/provisioners/kafka/README.md
@@ -0,0 +1,78 @@
# Apache Kafka Channels

Deployment steps:
1. Set up [Knative Eventing](../../../DEVELOPMENT.md)
1. If not done already, install an Apache Kafka cluster. There are two choices:
   * Simple installation of [Apache Kafka](broker).
   * A production-grade installation using the [Strimzi Kafka Operator](strimzi).
     Installation [guides](http://strimzi.io/quickstarts/) are provided for
     Kubernetes and OpenShift.

1. Now that Apache Kafka is installed, you need to configure the
   `bootstrap_servers` value in the `kafka-channel-controller-config` ConfigMap,
   located inside the `config/provisioners/kafka/kafka-provisioner.yaml` file:

   ```yaml
   ...
   apiVersion: v1
   kind: ConfigMap
   metadata:
     name: kafka-channel-controller-config
     namespace: knative-eventing
   data:
     # Broker URLs for the provisioner
     bootstrap_servers: kafkabroker.kafka:9092
   ...
   ```

   > Note: The `bootstrap_servers` value needs to contain the address of at
   > least one broker of your Apache Kafka cluster. If you are using Strimzi,
   > you need to update the `bootstrap_servers` value to
   > `my-cluster-kafka-bootstrap.mynamespace:9092`.
1. Apply the 'Kafka' ClusterChannelProvisioner, Controller, and Dispatcher:

   ```shell
   ko apply -f config/provisioners/kafka/kafka-provisioner.yaml
   ```
1. Create Channels that reference the 'kafka-channel' provisioner:

   ```yaml
   apiVersion: eventing.knative.dev/v1alpha1
   kind: Channel
   metadata:
     name: my-kafka-channel
   spec:
     provisioner:
       apiVersion: eventing.knative.dev/v1alpha1
       kind: ClusterChannelProvisioner
       name: kafka-channel
   ```

## Components

The major components are:
* ClusterChannelProvisioner Controller
* Channel Controller
* Channel Controller Config Map
* Channel Dispatcher
* Channel Dispatcher Config Map

The ClusterChannelProvisioner Controller and the Channel Controller are colocated
in one Pod:
```shell
kubectl get deployment -n knative-eventing kafka-channel-controller
```

The Channel Controller Config Map is used to configure the `bootstrap_servers`
of your Apache Kafka installation:
```shell
kubectl get configmap -n knative-eventing kafka-channel-controller-config
```

The Channel Dispatcher receives and distributes all events:
```shell
kubectl get statefulset -n knative-eventing kafka-channel-dispatcher
```

The Channel Dispatcher Config Map is used to send information about Channels and
Subscriptions from the Channel Controller to the Channel Dispatcher:
```shell
kubectl get configmap -n knative-eventing kafka-channel-dispatcher-config-map
```
13 changes: 13 additions & 0 deletions config/provisioners/kafka/broker/README.md
@@ -0,0 +1,13 @@
# Apache Kafka - simple installation

1. For an installation of a simple (**non-production**) Apache Kafka cluster, a setup is provided:

   ```shell
   kubectl create namespace kafka
   kubectl apply -n kafka -f kafka-broker.yaml
   ```

   > Note: If you are running Knative on OpenShift, you will need to run the following command first to allow the Kafka broker to run as root:

   ```shell
   oc adm policy add-scc-to-user anyuid -z default -n kafka
   ```

Continue the configuration of Knative Eventing with [step `3`](../).
87 changes: 87 additions & 0 deletions config/provisioners/kafka/broker/kafka-broker.yaml
@@ -0,0 +1,87 @@
########################################## KAFKA BROKER ######################################
# The following does not need to live in the same namespace as the bus.
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: kafka-broker
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: kafka-broker
    spec:
      containers:
      - name: kafka-broker
        image: wurstmeister/kafka:1.1.0
        ports:
        - containerPort: 9092
        env:
        - name: MY_POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: KAFKA_BROKER_ID
          value: "0"
        - name: KAFKA_LISTENERS
          value: "INTERNAL://:9093,EXTERNAL://:9092"
        - name: KAFKA_ADVERTISED_LISTENERS
          value: "INTERNAL://:9093,EXTERNAL://kafkabroker.$(MY_POD_NAMESPACE):9092"
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: "INTERNAL"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: "zookeeper.$(MY_POD_NAMESPACE):2181"
        - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
          value: "false"
---
apiVersion: v1
kind: Service
metadata:
  name: kafkabroker
spec:
  type: NodePort
  selector:
    app: kafka-broker
  ports:
  - port: 9092
    name: kafka
    protocol: TCP
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: zookeeper
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
      - name: zookeeper
        image: wurstmeister/zookeeper:3.4.6
        ports:
        - containerPort: 2181
        env:
        - name: ZOOKEEPER_ID
          value: "1"
        - name: ZOOKEEPER_SERVER_1
          value: zookeeper
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper
spec:
  selector:
    app: zookeeper
  ports:
  - port: 2181
    name: zookeeper
    protocol: TCP
85 changes: 85 additions & 0 deletions config/provisioners/kafka/kafka-provisioner.yaml
@@ -0,0 +1,85 @@
# Copyright 2018 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: eventing.knative.dev/v1alpha1
kind: ClusterChannelProvisioner
metadata:
  name: kafka-channel
spec: {}
---

apiVersion: v1
kind: ServiceAccount
metadata:
  name: kafka-channel-controller
  namespace: knative-eventing
---

kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: kafka-channel-controller
rules:
- apiGroups: ["eventing.knative.dev"]
  resources: ["clusterchannelprovisioners", "channels"]
  verbs: ["get", "watch", "list", "update"]
---

apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: kafka-channel-controller-manage
subjects:
- kind: ServiceAccount
  name: kafka-channel-controller
  namespace: knative-eventing
roleRef:
  kind: ClusterRole
  name: kafka-channel-controller
  apiGroup: rbac.authorization.k8s.io
---

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-channel-controller-config
  namespace: knative-eventing
data:
  # Broker URLs for the provisioner
  bootstrap_servers: kafkabroker.kafka:9092
---

apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: kafka-channel-controller
  namespace: knative-eventing
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: kafka-channel-controller
    spec:
      serviceAccountName: kafka-channel-controller
      containers:
      - name: kafka-channel-controller-controller
        image: github.com/knative/eventing/pkg/provisioners/kafka
        volumeMounts:
        - name: kafka-channel-controller-config
          mountPath: /etc/config-provisioner
      volumes:
**Member:**

This would tie the ClusterProvisioner to exactly this broker, right?

Would it make sense to have a more generic approach, so that the Kafka channel ClusterProvisioner could use the broker coordinates from the arguments of the Channel, like:

```yaml
apiVersion: eventing.knative.dev/v1alpha1
kind: Channel
metadata:
  name: my-kafka-channel
spec:
  provisioner:
    ref:
      apiVersion: eventing.knative.dev/v1alpha1
      kind: ClusterProvisioner
      name: kafka
  arguments:
    args:
      bootstrapservers: "something.somewhere.com:9092"
      ...
```

**Contributor Author:**

You are right, the broker info is read from the ConfigMap. I tried to retain the old Kafka bus behavior for this initial work.

I am fine with having the broker as an argument that overrides a default in the ConfigMap. I am just thinking aloud about whether this would have any negative impact on the dispatcher. I hope not.

**Member:**

It sounds like this is resolved, but I'd want to be able to use a "default Kafka" as a developer without needing to carry around credentials & endpoint addresses on each object. One possible middle ground would be to optionally reference a profile in the Channel, and then have the ConfigMap define the acceptable profiles.

Let's start with the simple one-Kafka-per-cluster approach, and then see what customer scenarios actually apply. (For example, the default might be a per-namespace one rather than a global one.)
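A purely illustrative sketch of the profile idea discussed here — the `profiles` key and `profile` argument are hypothetical names, not part of this PR or any implemented API:

```yaml
# Hypothetical sketch only -- not implemented.
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-channel-controller-config
  namespace: knative-eventing
data:
  profiles: |
    default:
      bootstrap_servers: kafkabroker.kafka:9092
    staging:
      bootstrap_servers: my-cluster-kafka-bootstrap.staging:9092
---
apiVersion: eventing.knative.dev/v1alpha1
kind: Channel
metadata:
  name: my-kafka-channel
spec:
  provisioner:
    apiVersion: eventing.knative.dev/v1alpha1
    kind: ClusterChannelProvisioner
    name: kafka-channel
  arguments:
    profile: staging   # hypothetical argument selecting a profile
```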

**Contributor Author:**

Sounds good. For this initial implementation, we can keep the endpoint address in the ConfigMap.
      - name: kafka-channel-controller-config
        configMap:
          name: kafka-channel-controller-config
5 changes: 5 additions & 0 deletions pkg/apis/eventing/v1alpha1/channel_types.go
@@ -136,6 +136,11 @@ func (cs *ChannelStatus) MarkProvisioned() {
chanCondSet.Manage(cs).MarkTrue(ChannelConditionProvisioned)
}

// MarkNotProvisioned sets ChannelConditionProvisioned condition to False state.
func (cs *ChannelStatus) MarkNotProvisioned(reason, messageFormat string, messageA ...interface{}) {
chanCondSet.Manage(cs).MarkFalse(ChannelConditionProvisioned, reason, messageFormat, messageA...)
}

// SetAddress makes this Channel addressable by setting the hostname. It also
// sets the ChannelConditionAddressable to true.
func (cs *ChannelStatus) SetAddress(hostname string) {
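The new `MarkNotProvisioned` mirrors `MarkProvisioned`, delegating to the condition-set machinery from knative.dev/pkg. To show the pattern in isolation, here is a self-contained toy version — an assumed simplification for illustration; the real `chanCondSet` implementation differs in detail:

```go
package main

import "fmt"

// Toy stand-ins for the real knative.dev condition machinery; the names
// below mirror the PR's methods but the implementation is illustrative.
type Condition struct {
	Type    string
	Status  string // "True", "False", or "Unknown"
	Reason  string
	Message string
}

type ChannelStatus struct {
	Conditions []Condition
}

// setCondition inserts or updates the condition with the given type.
func (cs *ChannelStatus) setCondition(c Condition) {
	for i, existing := range cs.Conditions {
		if existing.Type == c.Type {
			cs.Conditions[i] = c
			return
		}
	}
	cs.Conditions = append(cs.Conditions, c)
}

// MarkProvisioned sets the Provisioned condition to True.
func (cs *ChannelStatus) MarkProvisioned() {
	cs.setCondition(Condition{Type: "Provisioned", Status: "True"})
}

// MarkNotProvisioned sets the Provisioned condition to False with a
// reason and formatted message, mirroring the method added in this PR.
func (cs *ChannelStatus) MarkNotProvisioned(reason, messageFormat string, messageA ...interface{}) {
	cs.setCondition(Condition{
		Type:    "Provisioned",
		Status:  "False",
		Reason:  reason,
		Message: fmt.Sprintf(messageFormat, messageA...),
	})
}

// IsReady reports whether every recorded condition is True, loosely
// echoing the ChannelStatus.IsReady added earlier in this PR.
func (cs *ChannelStatus) IsReady() bool {
	if len(cs.Conditions) == 0 {
		return false
	}
	for _, c := range cs.Conditions {
		if c.Status != "True" {
			return false
		}
	}
	return true
}

func main() {
	cs := &ChannelStatus{}
	cs.MarkNotProvisioned("NotProvisioned", "topic %q not created", "my-topic")
	fmt.Println(cs.IsReady()) // prints false
	cs.MarkProvisioned()
	fmt.Println(cs.IsReady()) // prints true
}
```

This is why the Kafka controller can flip a Channel between ready and not-ready by marking a single condition: readiness is derived from the condition set rather than stored as a separate flag.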