
Kafka Channel Dispatcher #589

Merged
knative-prow-robot merged 4 commits into knative:master from neosab:kafka_delivery on Nov 15, 2018

Conversation

@neosab
Contributor

@neosab neosab commented Nov 7, 2018

Fixes #441

Proposed Changes

  • StatefulSet with the Kafka dispatcher
  • Kafka Channel controller creates a config map with subscribers (uses the same multichannelfanout config structure for uniformity)
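
To make the second bullet concrete: below is an illustrative sketch of the kind of per-channel subscriber configuration the controller writes into the ConfigMap. The type and field names here are placeholders chosen for this example, not the actual multichannelfanout types in the repository.

package main

import (
  "encoding/json"
  "fmt"
)

// Illustrative placeholder types only; the real multichannelfanout config
// structure may use different names and fields.
type Subscription struct {
  SubscriberURI string `json:"subscriberURI,omitempty"`
  ReplyURI      string `json:"replyURI,omitempty"`
}

type ChannelConfig struct {
  Namespace     string         `json:"namespace"`
  Name          string         `json:"name"`
  Subscriptions []Subscription `json:"subscriptions,omitempty"`
}

type Config struct {
  ChannelConfigs []ChannelConfig `json:"channelConfigs"`
}

func main() {
  // One entry per Kafka-backed Channel, each carrying its subscribers.
  cfg := Config{ChannelConfigs: []ChannelConfig{{
    Namespace: "default",
    Name:      "testchannel",
    Subscriptions: []Subscription{
      {SubscriberURI: "http://message-dumper.default.svc.cluster.local/"},
    },
  }}}
  b, _ := json.MarshalIndent(cfg, "", "  ")
  fmt.Println(string(b)) // roughly the shape the dispatcher would read back from the ConfigMap
}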

TODO (in a subsequent PR)

Release Note

Adds a new Kafka Channel Provisioner 

@knative-prow-robot knative-prow-robot added the do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. label Nov 7, 2018
@knative-prow-robot knative-prow-robot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Nov 7, 2018
@googlebot

So there's good news and bad news.

👍 The good news is that everyone who needs to sign a CLA (the pull request submitter and all commit authors) has done so. Everything is all good there.

😕 The bad news is that it appears that one or more commits were authored or co-authored by someone other than the pull request submitter. We need to confirm that all authors are ok with their commits being contributed to this project. Please have them confirm that here in the pull request.

Note to project maintainer: This is a terminal state, meaning the cla/google commit status will not change from this state. It's up to you to confirm consent of all the commit author(s), set the cla label to yes (if enabled on your project), and then merge this pull request when appropriate.

@googlebot googlebot added the cla: no Indicates the PR's author has not signed the CLA. label Nov 7, 2018
@neosab changed the title from "WIP: Kafka delivery" to "WIP: Kafka Channel Dispatcher" on Nov 7, 2018
@neosab
Contributor Author

neosab commented Nov 7, 2018

cc @matzew

This is still missing unit tests, which I am happy to add once the dispatcher design is validated by others.

@neosab
Contributor Author

neosab commented Nov 8, 2018

/test pull-knative-eventing-build-tests

Comment thread pkg/provisioners/kafka/dispatcher/dispatcher.go
@matzew
Member

matzew commented Nov 8, 2018

@neosab Is that based on the latest master?

@matzew
Member

matzew commented Nov 8, 2018

/assign matzew

will take a look at this PR

@matzew
Member

matzew commented Nov 8, 2018

/lgtm
/hold

As mentioned, I got this working end-to-end; it's also nice to see the messages in the Kafka topics using kafkacat.

NOTE: other mentioned PRs need to go in first

@knative-prow-robot knative-prow-robot added lgtm Indicates that a PR is ready to be merged. do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. labels Nov 8, 2018
@knative-prow-robot knative-prow-robot removed the lgtm Indicates that a PR is ready to be merged. label Nov 9, 2018
Comment thread config/provisioners/kafka/kafka-provisioner.yaml Outdated
@neosab changed the title from "WIP: Kafka Channel Dispatcher" to "Kafka Channel Dispatcher" on Nov 9, 2018
@knative-prow-robot knative-prow-robot removed the do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. label Nov 9, 2018
@neosab
Contributor Author

neosab commented Nov 9, 2018

/assign @evankanderson

Tested that this works for me. I will work on moving the common code to helpers in a subsequent PR.

@matzew
Member

matzew commented Nov 9, 2018

I will work on moving the common code to helpers in a subsequent PR.

👍 @neosab let's create an issue on GitHub for that

@googlebot

CLAs look good, thanks!

@googlebot googlebot added cla: yes Indicates the PR's author has signed the CLA. and removed cla: no Indicates the PR's author has not signed the CLA. labels Nov 12, 2018
@matzew
Member

matzew commented Nov 12, 2018

/hold cancel

@knative-prow-robot knative-prow-robot removed the do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. label Nov 12, 2018
@neosab
Contributor Author

neosab commented Nov 13, 2018

/cc @adamharwayne

@knative-prow-robot
Contributor

@neosab: GitHub didn't allow me to request PR reviews from the following users: adamharwayne.

Note that only knative members and repo collaborators can review this PR, and authors cannot review their own PRs.


In response to this:

/cc @adamharwayne

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@neosab
Contributor Author

neosab commented Nov 13, 2018

/cc @Harwayne

Didn't realize your GitHub username changed :)

Member

@evankanderson left a comment

This looks fairly good, but I had a few comments on Kubernetes interactions, including with the ConfigMaps.

Comment thread config/provisioners/kafka/kafka-provisioner.yaml Outdated
- apiGroups:
  - "" # Core API group.
  resources:
  - configmaps
Member

Should this be all configmaps, or just the one?

Contributor Author

The dispatcher uses the github.com/knative/pkg/configmap watcher, which needs these cluster-scoped privileges.
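
For context, that watcher is typically wired up along the following lines, which is why it needs get/list/watch access to ConfigMaps. This is only a sketch: the ConfigMap name and namespace are placeholders, and the exact knative/pkg signatures in the vendored version may differ slightly.

package main

import (
  "log"

  corev1 "k8s.io/api/core/v1"
  "k8s.io/client-go/kubernetes"
  "k8s.io/client-go/rest"

  "github.com/knative/pkg/configmap"
)

func main() {
  cfg, err := rest.InClusterConfig()
  if err != nil {
    log.Fatal(err)
  }
  kc := kubernetes.NewForConfigOrDie(cfg)

  // The informed watcher lists and watches ConfigMaps in a namespace,
  // which is what requires the cluster-scoped RBAC rule above.
  watcher := configmap.NewInformedWatcher(kc, "knative-eventing")
  watcher.Watch("kafka-channel-dispatcher-config-map", func(cm *corev1.ConfigMap) { // placeholder name
    log.Printf("config map updated: %v", cm.Data)
  })

  stopCh := make(chan struct{})
  if err := watcher.Start(stopCh); err != nil {
    log.Fatal(err)
  }
  <-stopCh
}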

Comment thread config/provisioners/kafka/kafka-provisioner.yaml Outdated
Comment thread config/provisioners/kafka/kafka-provisioner.yaml Outdated
volumes:
  - name: kafka-channel-controller-config
    configMap:
      name: kafka-channel-controller-config
Member

If you don't mark this optional and the ConfigMap isn't defined above, I think the pod will fail until the configmap is defined.

Contributor Author

kafka-channel-controller-config is required as it contains the broker info, and we check that it is defined at startup: https://github.com/knative/eventing/pull/589/files#diff-8634243f0697f44c13a150873e6cc457R72. Do you want to change this behavior?

Member

I was wondering about crash-looping vs sitting and waiting for configuration, vs shipping a default configuration that the user might update later. I guess the current ClusterChannelProvisioner only supports a single set of bootstrap servers, so it may be that crash looping is the right choice for now. At some point in the future, it seems like we might want to support multiple Kafka clusters, at which point 0 clusters would be a valid configuration.

For now, this may be fine, but I have a sort of knee-jerk reaction to configs which fail unless the user has followed a precondition. Maybe add a comment here, or near the top of the file, noting that creating this ConfigMap is a precondition to applying this YAML?

Contributor Author

The YAML actually creates the kafka-channel-controller-config ConfigMap at L78. It defaults to the broker URL one would get by creating the Kafka cluster using the sample in ./broker/kafka-broker.yaml. I will add a comment for the end user to update this ConfigMap when they use a different Kafka cluster.


if value, ok := configMap[BrokerConfigMapKey]; ok {
  bootstrapServers := strings.Split(value, ",")
  if len(bootstrapServers) == 0 {
Member

strings.Split will never return a zero-length array (it always returns at least one entry). I think you want to validate that all entries are non-empty (and add a test)?
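
(For reference, a minimal snippet demonstrating the strings.Split behavior described above:)

package main

import (
  "fmt"
  "strings"
)

func main() {
  // strings.Split always returns at least one element, so a
  // zero-length check after splitting can never trigger.
  parts := strings.Split("", ",")
  fmt.Printf("%d %q\n", len(parts), parts) // 1 [""]

  // A trailing comma produces an empty entry, which is why validating
  // that every entry is non-empty is the useful check.
  parts = strings.Split("broker-0:9092,", ",")
  fmt.Printf("%d %q\n", len(parts), parts) // 2 ["broker-0:9092" ""]
}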

Contributor Author

A test would have caught it :). I added it now.
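
(Illustration only: a small table-driven test of the kind being discussed, written against a hypothetical parseBrokers helper rather than the actual function added in this PR.)

package kafka

import (
  "fmt"
  "strings"
  "testing"
)

// parseBrokers is a hypothetical stand-in for the real config parsing:
// split the comma-separated broker list and reject empty entries.
func parseBrokers(value string) ([]string, error) {
  brokers := strings.Split(value, ",")
  for _, b := range brokers {
    if len(b) == 0 {
      return nil, fmt.Errorf("empty broker in bootstrap_servers: %q", value)
    }
  }
  return brokers, nil
}

func TestParseBrokers(t *testing.T) {
  tests := []struct {
    value   string
    wantErr bool
  }{
    {"broker-0:9092", false},
    {"broker-0:9092,broker-1:9092", false},
    {"", true},               // splits to a single empty entry
    {"broker-0:9092,", true}, // trailing comma leaves an empty entry
  }
  for _, tc := range tests {
    if _, err := parseBrokers(tc.value); (err != nil) != tc.wantErr {
      t.Errorf("parseBrokers(%q) error = %v, wantErr %v", tc.value, err, tc.wantErr)
    }
  }
}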


config := &KafkaProvisionerConfig{}

if value, ok := configMap[BrokerConfigMapKey]; ok {
Member

Access to a non-existent value in a map will return a default value, so you could write this as simply:

config.Brokers = strings.Split(configMap[BrokerConfigMapKey], ",")
for _, s := range config.Brokers {
  if len(s) == 0 {
    return nil, fmt.Errorf("Empty bootstrap_servers in configuration %s: %q", path, configMap[BrokerConfigMapKey])
  }
}
return config, nil

Contributor Author

Ah, I misread it. Thanks! I am fixing it.

if _, ok := d.kafkaConsumers[channelRef][sub]; ok {
  // subscribe can be called multiple times for the same subscription,
  // unsubscribe before we resubscribe
  err := d.unsubscribe(channelRef, sub)
Member

If we unsubscribe and resubscribe, will we potentially drop messages?

Again, this may be another TODO at the moment.

Contributor Author

Actually, I am not sure why we need to unsubscribe and re-subscribe in the case of an existing subscription. I retained this from the original Kafka bus implementation. cc @markfisher @scothis @matzew in case they know.

Member

Feel free to TODO and file an issue for this. At some point, we'll want to actually write some tests to stress this, I expect.

Contributor Author

Done, filed #620

d.updateLock.Lock()
defer d.updateLock.Unlock()

if diff := d.ConfigDiff(config); diff != "" {
Member

Presumably if we end up running more than one replica, we'll need to change this to filter the subscriptions by some sort of hash of the replica ID? (Fine to leave as a TODO.)

Contributor Author

Similar to the comment on duplicate subscribers: if we run more replicas, each replica's Kafka consumer for a subscription is still added to the same consumer group, which ensures that exactly one of them consumes a given partition. Wouldn't this eliminate the need to filter subscriptions per replica?

Member

Hmm, if you have multiple replicas, won't they each attempt to consume from the same consumer group? Will all but one block? (I'm not familiar enough with Kafka and this library to be sure; it's possible that this is simply a competing-consumer model.)

Obviously, with only 1 partition, the benefit of replicas will be limited, but I'm guessing that we'll support wider fanout soon.

Contributor Author

The sarama-cluster library provides a balanced consumer group implementation on top of sarama. Even with multiple replicas, since consumer instances are part of the same consumer group, they coordinate so that at most one consumer will have access to a partition in a topic subscribed by the group.

Obviously, with only 1 partition, the benefit of replicas will be limited, but I'm guessing that we'll support wider fanout soon.

We already support NumPartitions as a channel argument.
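
For readers unfamiliar with that library, the balanced consumer-group pattern looks roughly like the sketch below; the broker address, group ID, and topic name are placeholders, not the values this dispatcher actually uses.

package main

import (
  "log"

  cluster "github.com/bsm/sarama-cluster"
)

func main() {
  config := cluster.NewConfig()
  config.Consumer.Return.Errors = true

  // Every replica creates a consumer with the same group ID, so Kafka's
  // group coordination assigns each partition to at most one of them.
  consumer, err := cluster.NewConsumer(
    []string{"kafkabroker.kafka:9092"}, // bootstrap server (placeholder)
    "my-subscription-group",            // one group per subscription (placeholder)
    []string{"my-channel-topic"},       // channel topic (placeholder)
    config,
  )
  if err != nil {
    log.Fatal(err)
  }
  defer consumer.Close()

  go func() {
    for err := range consumer.Errors() {
      log.Println("consumer error:", err)
    }
  }()

  for msg := range consumer.Messages() {
    log.Printf("partition=%d offset=%d value=%s", msg.Partition, msg.Offset, msg.Value)
    consumer.MarkOffset(msg, "") // commit so another replica won't re-deliver after a rebalance
  }
}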

Comment thread pkg/provisioners/kafka/dispatcher/dispatcher.go
@neosab
Contributor Author

neosab commented Nov 14, 2018

@evankanderson I pushed an update and responded to your comments. Please take a look when you get a chance.

@evankanderson
Member

/approve

I'm going to leave it to @matzew or @scothis to LGTM this, since I'm not a Kafka expert, but I'm okay with this going in with a few TODOs for later.

@knative-prow-robot
Contributor

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: evankanderson, neosab

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@knative-prow-robot knative-prow-robot added the approved Indicates a PR has been approved by an approver from all required OWNERS files. label Nov 15, 2018
@knative-metrics-robot

The following is the coverage report on pkg/.
Say /test pull-knative-eventing-go-coverage to re-run this coverage report

File | Old Coverage | New Coverage | Delta
pkg/provisioners/kafka/cmd/controller/main.go | Do not exist | 0.0% |
pkg/provisioners/kafka/cmd/dispatcher/main.go | Do not exist | 0.0% |
pkg/provisioners/kafka/controller/channel/reconcile.go | 79.4% | 76.1% | -3.3
pkg/provisioners/kafka/controller/util.go | Do not exist | 100.0% |
pkg/provisioners/kafka/dispatcher/dispatcher.go | Do not exist | 69.4% |

@matzew
Member

matzew commented Nov 15, 2018

/lgtm

There are a few minor things; there are issues filed for those...

I tested (on Minishift) the k8sevent source demo, and used this channel:

apiVersion: eventing.knative.dev/v1alpha1
kind: Channel
metadata:
  name: testchannel
spec:
  provisioner:
    apiVersion: eventing.knative.dev/v1alpha1
    kind: ClusterChannelProvisioner
    name: kafka-channel

Receiving messages on the dumper, like:

2018/11/15 11:41:47 Message Dumper received a message: POST / HTTP/1.1
Host: message-dumper.myproject.svc.cluster.local
Accept-Encoding: gzip
Ce-Cloudeventsversion: 0.1
Ce-Eventid: 4a60a086-e8b2-11e8-8a83-52540070a16c
Ce-Eventtime: 2018-11-15T08:41:48Z
Ce-Eventtype: dev.knative.k8s.event
Ce-Source: /apis/v1/namespaces/myproject/services/my-cluster-kafka-0
Content-Length: 725
Content-Type: application/json
User-Agent: Go-http-client/1.1
X-B3-Parentspanid: a23806bdb563af6a
X-B3-Sampled: 1
X-B3-Spanid: eb1c6949dcf5174c
X-B3-Traceid: a23806bdb563af6a
X-Forwarded-For: 127.0.0.1
X-Forwarded-Proto: http
X-Request-Id: 4cc00841-3752-91ef-9d8a-a86552d465a7

{"metadata":{"name":"my-cluster-kafka-0.15673f956e57f476","namespace":"myproject","selfLink":"/api/v1/namespaces/myproject/events/my-cluster-kafka-0.15673f956e57f476","uid":"4a60a086-e8b2-11e8-8a83-52540070a16c","resourceVersion":"55770","creationTimestamp":"2018-11-15T08:41:48Z"},"involvedObject":{"kind":"Service","namespace":"myproject","name":"my-cluster-kafka-0","uid":"17a1fb19-e8b2-11e8-8a83-52540070a16c","apiVersion":"v1","resourceVersion":"7942"},"reason":"ExternalIP","message":"Count: 1 -\u003e 0","source":{"component":"service-controller"},"firstTimestamp":"2018-11-15T08:41:48Z","lastTimestamp":"2018-11-15T11:41:47Z","count":92,"type":"Normal","eventTime":null,"reportingComponent":"","reportingInstance":""}


@knative-prow-robot knative-prow-robot added the lgtm Indicates that a PR is ready to be merged. label Nov 15, 2018
@knative-prow-robot knative-prow-robot merged commit ac7ecfe into knative:master Nov 15, 2018
@grantr grantr added this to the v0.2.1 milestone Nov 28, 2018