Kafka Bus Implementation#99
Conversation
|
Thanks for your pull request. It looks like this may be your first contribution to a Google open source project (if not, look below for help). Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). 📝 Please visit https://cla.developers.google.com/ to sign. Once you've signed (or fixed any issues), please reply here (e.g. What to do if you already signed the CLAIndividual signers
Corporate signers
|
|
/check-cla |
7c97c14 to
078ba14
Compare
|
CLAs look good, thanks! |
4e7e396 to
848bf9e
Compare
|
Rebased on latest master that now includes #88 |
| env: | ||
| - name: KAFKA_BROKERS | ||
| value: "kafkabroker.kafka:9092" | ||
| - name: BUS_NAME |
There was a problem hiding this comment.
BUS_NAME and BUS_NAMESPACE are set by the controller
| *out = new(InjectDelay) | ||
| **out = **in | ||
| } | ||
| *out = new(InjectDelay) |
There was a problem hiding this comment.
Why is codegen updating istio types?
There was a problem hiding this comment.
I really don't know. Maybe we don't reference a fixed version for Istio (or for codegen)?
|
|
||
| import ( | ||
| "flag" | ||
| "github.com/golang/glog" |
There was a problem hiding this comment.
No, somehow Goland did this. Fixing
|
|
||
| func main() { | ||
|
|
||
| kubeconfig := flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") |
There was a problem hiding this comment.
the convention is to define flags in the init func.
There was a problem hiding this comment.
That's only relevant if flags are not in the main package (which really shouldn't ever happen if you ask me), in case the package ends up being imported several times.
Putting flag declarations in init really has no benefit whatsoever. Worse, it encourages global vars
|
/retest |
| @@ -0,0 +1,305 @@ | |||
| /* | |||
| * Copyright 2018 the original author or authors. | |||
| } else { | ||
| return 0, fmt.Errorf("unsupported initialOffset value. Must be one of %s or %s", Oldest, Newest) | ||
| } | ||
| return initialOffset, nil |
There was a problem hiding this comment.
Good catch. This is a leftover from when I thought you could pass an actual int offset here, not just Oldest|Newest
Will fix
|
Rebased on latest master |
|
@evankanderson do you think we could add this PR to be included in the Knative launch project for demos? |
|
Rebased on latest master |
|
@vaikas-google could you please have a look at this PR, having a second non-trivial bus implementation seems valuable for launch /assign @vaikas-google |
vaikas
left a comment
There was a problem hiding this comment.
Couple of small questions, nits. Just curious about the failure model for messages that fail to be delivered.
| } | ||
| req, err := http.NewRequest(http.MethodPost, url.String(), bytes.NewReader(body)) | ||
| if err != nil { | ||
| glog.Errorf("Unable to create subscriber request %v", err) |
There was a problem hiding this comment.
Are these just dropped on the floor? How do we communicate errors from here and what happens in the error cases?
| } | ||
|
|
||
| func channelFromHost(host string) (channel string, namespace string) { | ||
| chunks := strings.Split(host, ".") |
There was a problem hiding this comment.
Should we check to make sure the host is valid form? Or check that we actually get two chunks back, otherwise L268 will panic?
There was a problem hiding this comment.
Will add code to prevent the panic. Paths that end up returning the zero string for either name of namespace will end up in HTTP 404 thanks to code further down
| hs := kafka2HttpHeaders(msg) | ||
| svc := fmt.Sprintf("%s.%s", subscription.Spec.Subscriber, subscription.Namespace) | ||
| d.dispatchEvent(svc, msg.Value, hs) | ||
| consumer.MarkOffset(msg, "") // Mark message as processed |
There was a problem hiding this comment.
Since dispatchEvent does not return error, seems like any delivery failure will result in dropped messages since we mark them as processed?
There was a problem hiding this comment.
True, (for now).
Could change signature to propagate errors, but infinite retry is not better. Would require some retry-and-giveup logic that seems out of scope of this initial PR, wdyt?
Will at least mark as TODO
| conf.ClientID = name + "-dispatcher" | ||
| kafka_client, err := sarama.NewClient(brokers, conf) | ||
| if err != nil { | ||
| glog.Fatalf("Error building kafka client: %s", err.Error()) |
There was a problem hiding this comment.
Just curious why not just log this as "...%v", err)
| subscriber. | ||
|
|
||
| To view logs: | ||
| - for the dispatcher `kail -d kafka-bus -c dispatcher` |
There was a problem hiding this comment.
We should probably add a pointer on how to install kail? Since it's used throughout the examples, wonder if this should be made part of the DEVELOPMENT.md (as optional?).
There was a problem hiding this comment.
Adding the same snippet that was just merged in other READMEs
| } | ||
|
|
||
| func topicNameFromChannel(channel *channelsv1alpha1.Channel) string { | ||
| topicName := fmt.Sprintf("%s.%s", channel.Namespace, channel.Name) |
There was a problem hiding this comment.
maybe just combine these two lines:
return fmt.Sprintf("...
Don't version IDE files Add sarama + sarama-cluster Handle initialOffset
|
Addressed review comments, rebased on latest master |
|
Created #167 to track dispatch error handling |
|
I would prefer changing the signature to return an error, and then ignroing
it with a TODO as you changed it.
…On Tue, Jul 10, 2018 at 8:41 AM Eric Bottard ***@***.***> wrote:
***@***.**** commented on this pull request.
------------------------------
In pkg/buses/kafka/dispatcher/dispatcher.go
<#99 (comment)>:
> + if err != nil {
+ return err
+ }
+
+ d.consumers[subscriptionKeyFor(subscription)] = consumer
+
+ go func() {
+ for {
+ msg, more := <-consumer.Messages()
+ if more {
+ glog.Infof("Dispatching a message for subscription %s/%s: %s -> %s", subscription.Namespace,
+ subscription.Name, subscription.Spec.Channel, subscription.Spec.Subscriber)
+ hs := kafka2HttpHeaders(msg)
+ svc := fmt.Sprintf("%s.%s", subscription.Spec.Subscriber, subscription.Namespace)
+ d.dispatchEvent(svc, msg.Value, hs)
+ consumer.MarkOffset(msg, "") // Mark message as processed
True, (for now).
Could change signature to propagate errors, but infinite retry is not
better. Would require some retry-and-giveup logic that seems out of scope
of this initial PR, wdyt?
Will at least mark as TODO
—
You are receiving this because you were assigned.
Reply to this email directly, view it on GitHub
<#99 (comment)>, or mute
the thread
<https://github.com/notifications/unsubscribe-auth/AKwedJkHZ6L_TJyuckurcC7aU8MXve62ks5uFMs0gaJpZM4UvGO1>
.
|
|
/lgtm |
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: ericbottard, vaikas-google The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
|
Awesome, thanks for you thorough review(s) @vaikas-google |
From [ROLES.MD](https://github.com/knative/docs/blob/dfc53c67c8e80d30b8863353c9e9b4ad00c41fa0/community/ROLES.md#approver): > Reviewer of the codebase for at least 3 months or 50% of project lifetime, whichever is shorter - [First Issue](knative#80). Opened 6/11 - [First PR](knative#66). Opened 5/31 - [First Review](knative#79 (review)) 6/11 > Primary reviewer for at least 10 substantial PRs to the codebase - knative#422 (review) - knative#414 (review) - knative#325 (review) - knative#225 (review) - knative#189 (review) - knative#168 (review) - knative#165 (review) - knative#99 (review) - knative#79 (review) - knative#111 (review) > Reviewed or merged at least 30 PRs to the codebase - [Reviewed 23 PRs](https://github.com/knative/eventing/pulls?utf8=✓&q=is%3Apr+reviewed-by%3Ascothis) - [Authored 34 merged PRs](https://github.com/knative/eventing/pulls?utf8=✓&q=is%3Apr+author%3Ascothis+is%3Amerged) - [Authored 5 open PRs](https://github.com/knative/eventing/pulls/scothis) > Nominated by an area lead From [WORKING_GROUPS.MD](https://github.com/knative/docs/blob/dfc53c67c8e80d30b8863353c9e9b4ad00c41fa0/community/WORKING-GROUPS.md#events) /assign @vaikas-google > With no objections from other leads 🤞 /cc @evankanderson @grantr @inlined @mattmoor
From [ROLES.MD](https://github.com/knative/docs/blob/dfc53c67c8e80d30b8863353c9e9b4ad00c41fa0/community/ROLES.md#approver): > Reviewer of the codebase for at least 3 months or 50% of project lifetime, whichever is shorter - [First Issue](#80). Opened 6/11 - [First PR](#66). Opened 5/31 - [First Review](#79 (review)) 6/11 > Primary reviewer for at least 10 substantial PRs to the codebase - #422 (review) - #414 (review) - #325 (review) - #225 (review) - #189 (review) - #168 (review) - #165 (review) - #99 (review) - #79 (review) - #111 (review) > Reviewed or merged at least 30 PRs to the codebase - [Reviewed 23 PRs](https://github.com/knative/eventing/pulls?utf8=✓&q=is%3Apr+reviewed-by%3Ascothis) - [Authored 34 merged PRs](https://github.com/knative/eventing/pulls?utf8=✓&q=is%3Apr+author%3Ascothis+is%3Amerged) - [Authored 5 open PRs](https://github.com/knative/eventing/pulls/scothis) > Nominated by an area lead From [WORKING_GROUPS.MD](https://github.com/knative/docs/blob/dfc53c67c8e80d30b8863353c9e9b4ad00c41fa0/community/WORKING-GROUPS.md#events) /assign @vaikas-google > With no objections from other leads 🤞 /cc @evankanderson @grantr @inlined @mattmoor
* Introduce bus backed by Kafka Don't version IDE files Add sarama + sarama-cluster Handle initialOffset * Address review comments * Anticipate ClusterBus * Less invasive update-deps * Rebase on latest master * Update old copyright headers * Remove unreachable code * Adapt to latest bus changes * Rebase against latest master * Use separate directory for kafka bus. Add README. * Address review comments * Add embryo of error handling while dispatching
From [ROLES.MD](https://github.com/knative/docs/blob/dfc53c67c8e80d30b8863353c9e9b4ad00c41fa0/community/ROLES.md#approver): > Reviewer of the codebase for at least 3 months or 50% of project lifetime, whichever is shorter - [First Issue](knative#80). Opened 6/11 - [First PR](knative#66). Opened 5/31 - [First Review](knative#79 (review)) 6/11 > Primary reviewer for at least 10 substantial PRs to the codebase - knative#422 (review) - knative#414 (review) - knative#325 (review) - knative#225 (review) - knative#189 (review) - knative#168 (review) - knative#165 (review) - knative#99 (review) - knative#79 (review) - knative#111 (review) > Reviewed or merged at least 30 PRs to the codebase - [Reviewed 23 PRs](https://github.com/knative/eventing/pulls?utf8=✓&q=is%3Apr+reviewed-by%3Ascothis) - [Authored 34 merged PRs](https://github.com/knative/eventing/pulls?utf8=✓&q=is%3Apr+author%3Ascothis+is%3Amerged) - [Authored 5 open PRs](https://github.com/knative/eventing/pulls/scothis) > Nominated by an area lead From [WORKING_GROUPS.MD](https://github.com/knative/docs/blob/dfc53c67c8e80d30b8863353c9e9b4ad00c41fa0/community/WORKING-GROUPS.md#events) /assign @vaikas-google > With no objections from other leads 🤞 /cc @evankanderson @grantr @inlined @mattmoor
latest, greatest filez
Fixes Issue #83
Similar to #98, this is effectively #88 with one commit on top.This introduces an implementation of Bus backed by Kafka.
The bus itself is split between provisioner (which creates/deletes topics on kafka) and dispatcher (which 1. accepts payloads over http and enqueues them in the topic and 2. consumes from topics and broadcasts to subscribers)
A 1-node Kafka broker (that can sit behind a service) setup is provided in
kafka-broker.yaml. It is expected to be applied in namespacekafka(change the broker address in the bus declaration accordingly if it is not)