add support for Subsciption.replyTo property#325
Conversation
|
/assign @evankanderson |
c7be6f8 to
0d139e6
Compare
|
/test pull-knative-eventing-integration-tests |
| subscription.Name, subscription.Spec.Channel, subscription.Spec.Subscriber) | ||
| message := fromKafkaMessage(msg) | ||
| err := d.messageDispatcher.DispatchMessage(subscription.Spec.Subscriber, subscription.Namespace, message) | ||
| err := d.messageDispatcher.DispatchMessage(subscription.Spec.Subscriber, subscription.Spec.ReplyTo, subscription.Namespace, message) |
There was a problem hiding this comment.
These feels like we should wrap the message and replyTo in a struct and then DispatchMessage is (subscriber, event)?
There was a problem hiding this comment.
Should we consider whether subscriber and namespace also belong in the struct? (would be a single "envelope" message parameter if so).
WDYT @scothis? I addressed all of the other comments (so far) besides this one.
There was a problem hiding this comment.
The message and destination feel fundamental to me. I'd be in favor of something like:
func (d *MessageDispatcher) DispatchMessage(message *Message, destination string, opts *DispatchOpts) error
| if err != nil { | ||
| return fmt.Errorf("Unable to complete request %v", err) | ||
| } | ||
| if replyTo != "" && res != nil { |
There was a problem hiding this comment.
wrap this in executeReply?
There was a problem hiding this comment.
after refactoring so that executeRequest returns a Message (based on the suggestion by @scothis below), this will be reduced to a single call to executeRequest for the reply Message
scothis
left a comment
There was a problem hiding this comment.
I really like the direction, a few nits inline
| return nil | ||
| } | ||
|
|
||
| func (d *MessageDispatcher) executeRequest(destination string, defaultNamespace string, message *Message) (*http.Response, error) { |
There was a problem hiding this comment.
executeRequest should return a Message rather than an http response so that the http transport remains encapsulated.
| // The destination and replyTo are DNS names. For names with a single label, | ||
| // the default namespace is used to expand it into a fully qualified name | ||
| // within the cluster. | ||
| func (d *MessageDispatcher) DispatchMessage(destination string, replyTo string, defaultNamespace string, message *Message) error { |
There was a problem hiding this comment.
perhaps rename replyTo to defaultReplyTo to indicate in the future there may be other sources of metadata to configure replies.
| return fmt.Errorf("Unable to read response %v", err) | ||
| } | ||
| replyMessage := Message{headers, payload} | ||
| d.executeRequest(replyTo, defaultNamespace, &replyMessage) |
There was a problem hiding this comment.
should an error sending the reply be returned instead of suppressed?
|
|
||
| var forwardPrefixes = []string{ | ||
| // knative | ||
| "x-knative-", |
There was a problem hiding this comment.
maybe just knative- since the x- prefix is generally discouraged
0fb1607 to
bbe2fb6
Compare
| Subscriber string `json:"subscriber"` | ||
|
|
||
| // Target service DNS name for replies returned by the subscriber. | ||
| ReplyTo string `json:"replyTo"` |
There was a problem hiding this comment.
I think replyTo is optional, so the json annotation should be json:"replyTo,omitempty"
There was a problem hiding this comment.
indeed, good catch!
|
/lgtm |
| Payload: pubsubMessage.Data, | ||
| } | ||
| err := b.messageDispatcher.DispatchMessage(subscriber, namespace, message) | ||
| defaults := buses.DispatchDefaults{Namespace: sub.Namespace, ReplyTo: sub.Spec.ReplyTo} |
There was a problem hiding this comment.
nit: put each property on a dedicated line
| headers := d.fromHTTPHeaders(res.Header) | ||
| // TODO: add configurable whitelisting of propagated headers/prefixes (configmap?) | ||
| correlationID := message.Headers[correlationIDHeaderName] | ||
| if correlationID != "" { |
There was a problem hiding this comment.
this can be combined with the previous line:
if correlationID, ok := message.Headers[correlationIDHeaderName]; ok {
headers[correlationIDHeaderName] = correlationID
}
bbe2fb6 to
ee11b04
Compare
ee11b04 to
c450495
Compare
c450495 to
a23834d
Compare
|
/lgtm /test pull-knative-eventing-integration-tests |
|
/lgtm In the future, we should be smarter about when we create a reply message. Generally, I'd only expect a reply to be created for a 2xx status codes that also has a non-empty body. |
|
please have a look when you can @evankanderson thanks! |
|
Sorry about that, this fell off my radar. I'm okay experimenting with this approach, but I have a few concerns:
At this point, I'm willing to approve this PR as long as we all agree that we aren't too attached to this implementation yet, and that's it's more of a "spike" than a designed and accepted pattern. We should probably have a discussion over the next few weeks about how to enable this sort of experimentation to proceed without confusing the overall repo between code that we're trying to raise to production quality and experimental forays that aren't part of our roadmap at this point. Maybe annotations and/or PoC branches would be a good way to manage this? |
|
/approve but I'd really like to see us focus on raising the coverage numbers for |
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: evankanderson, markfisher, n3wscott The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
|
@evankanderson I completely agree this should be treated as just an initial spike from which we can experiment... for example I have this rough PoC for request/reply correlation (your point 2, also makes use of the correlation-id you mention in point 4) and I'll start working on something similar for the "activation claim check" pattern (what you referred to as "mailbox/cubbyhole" in point 1). The key is that this initial layer of "chaining" can be a common foundation for both approaches (and likely others). Thanks for the approval. Looking forward to the next steps! |
* add support for Subsciption.replyTo property fixes knative#216 * addressing review comments
From [ROLES.MD](https://github.com/knative/docs/blob/dfc53c67c8e80d30b8863353c9e9b4ad00c41fa0/community/ROLES.md#approver): > Reviewer of the codebase for at least 3 months or 50% of project lifetime, whichever is shorter - [First Issue](knative#80). Opened 6/11 - [First PR](knative#66). Opened 5/31 - [First Review](knative#79 (review)) 6/11 > Primary reviewer for at least 10 substantial PRs to the codebase - knative#422 (review) - knative#414 (review) - knative#325 (review) - knative#225 (review) - knative#189 (review) - knative#168 (review) - knative#165 (review) - knative#99 (review) - knative#79 (review) - knative#111 (review) > Reviewed or merged at least 30 PRs to the codebase - [Reviewed 23 PRs](https://github.com/knative/eventing/pulls?utf8=✓&q=is%3Apr+reviewed-by%3Ascothis) - [Authored 34 merged PRs](https://github.com/knative/eventing/pulls?utf8=✓&q=is%3Apr+author%3Ascothis+is%3Amerged) - [Authored 5 open PRs](https://github.com/knative/eventing/pulls/scothis) > Nominated by an area lead From [WORKING_GROUPS.MD](https://github.com/knative/docs/blob/dfc53c67c8e80d30b8863353c9e9b4ad00c41fa0/community/WORKING-GROUPS.md#events) /assign @vaikas-google > With no objections from other leads 🤞 /cc @evankanderson @grantr @inlined @mattmoor
From [ROLES.MD](https://github.com/knative/docs/blob/dfc53c67c8e80d30b8863353c9e9b4ad00c41fa0/community/ROLES.md#approver): > Reviewer of the codebase for at least 3 months or 50% of project lifetime, whichever is shorter - [First Issue](#80). Opened 6/11 - [First PR](#66). Opened 5/31 - [First Review](#79 (review)) 6/11 > Primary reviewer for at least 10 substantial PRs to the codebase - #422 (review) - #414 (review) - #325 (review) - #225 (review) - #189 (review) - #168 (review) - #165 (review) - #99 (review) - #79 (review) - #111 (review) > Reviewed or merged at least 30 PRs to the codebase - [Reviewed 23 PRs](https://github.com/knative/eventing/pulls?utf8=✓&q=is%3Apr+reviewed-by%3Ascothis) - [Authored 34 merged PRs](https://github.com/knative/eventing/pulls?utf8=✓&q=is%3Apr+author%3Ascothis+is%3Amerged) - [Authored 5 open PRs](https://github.com/knative/eventing/pulls/scothis) > Nominated by an area lead From [WORKING_GROUPS.MD](https://github.com/knative/docs/blob/dfc53c67c8e80d30b8863353c9e9b4ad00c41fa0/community/WORKING-GROUPS.md#events) /assign @vaikas-google > With no objections from other leads 🤞 /cc @evankanderson @grantr @inlined @mattmoor
From [ROLES.MD](https://github.com/knative/docs/blob/dfc53c67c8e80d30b8863353c9e9b4ad00c41fa0/community/ROLES.md#approver): > Reviewer of the codebase for at least 3 months or 50% of project lifetime, whichever is shorter - [First Issue](knative#80). Opened 6/11 - [First PR](knative#66). Opened 5/31 - [First Review](knative#79 (review)) 6/11 > Primary reviewer for at least 10 substantial PRs to the codebase - knative#422 (review) - knative#414 (review) - knative#325 (review) - knative#225 (review) - knative#189 (review) - knative#168 (review) - knative#165 (review) - knative#99 (review) - knative#79 (review) - knative#111 (review) > Reviewed or merged at least 30 PRs to the codebase - [Reviewed 23 PRs](https://github.com/knative/eventing/pulls?utf8=✓&q=is%3Apr+reviewed-by%3Ascothis) - [Authored 34 merged PRs](https://github.com/knative/eventing/pulls?utf8=✓&q=is%3Apr+author%3Ascothis+is%3Amerged) - [Authored 5 open PRs](https://github.com/knative/eventing/pulls/scothis) > Nominated by an area lead From [WORKING_GROUPS.MD](https://github.com/knative/docs/blob/dfc53c67c8e80d30b8863353c9e9b4ad00c41fa0/community/WORKING-GROUPS.md#events) /assign @vaikas-google > With no objections from other leads 🤞 /cc @evankanderson @grantr @inlined @mattmoor
Co-authored-by: pierDipi <pierDipi@users.noreply.github.com>
Fixes #216
Proposed Changes