Create MessageReceiver and MessageDispatcher utils#175
Conversation
|
@inlined on the eventing WG call today @ultrasaurus recommend I share this work with you. It overlaps the work you have to create a CloudEvent client. The key difference is not being coupled to CloudEvents while still being able to transport them. /hold |
|
|
||
| // DispatchMessage dispatches a message to a destination over HTTP. | ||
| // | ||
| // The destination is a DNS name. For destinations with a single label, the |
There was a problem hiding this comment.
I think the code shouldn't do that, and let kube-dns take care of that, at least for namespaced buses. A single label name is revolved in the namespace of the running pod. Also, we don't want to add the svc.cluster.local I think
There was a problem hiding this comment.
The issue with not doing this is that the expectations a subscription to a channel backed by a ClusterBus breaks. A subscription needing to know how a channel is implemented is 👎.
Because of an issue we ran into with Istio VirtualServices, only using the fqdn actually works when targeting a Route. Serving hard codes DNS names with .svc.cluster.local, so I'm following their lead. Maybe I should use that utility function?
| // The destination is a DNS name. For destinations with a single label, the | ||
| // default namespace is used to expand the destination into a fully qualified | ||
| // name within the cluster. | ||
| func (d *MessageDispatcher) DispatchMessage(destination string, defaultNamespace string, message *Message) error { |
There was a problem hiding this comment.
I think we'll want to either make this an interface so that we can introduce other implementations than http, or pass in more information than just a string destination
There was a problem hiding this comment.
This will largely depend on how the event transport protocol evolves. For now, I don't think there should be an interface for multiple protocols because that would force the bus to pick an implementation. The protocol implementations and negotiation should be encapsulated inside this method. Maybe at some point the MessageDispatcher could support pluggable protocols, but that would be a different interface.
There was a problem hiding this comment.
Then that's my second proposal: the method should accept a reference to the k8s Service rather than just a string destination.
There was a problem hiding this comment.
This behavior is in flight with the new Flow CRD. I want to defer any decision here until that stabilizes.
| } | ||
| go func() { | ||
| if err := srv.ListenAndServe(); err != nil { | ||
| // probably is an intentional close |
There was a problem hiding this comment.
You can test for actual intentional close, there is a dedicated error for that. Everything else is suspicious
|
|
||
| // Headers provide metadata about the message payload. All header keys | ||
| // should be lowercase. | ||
| Headers map[string]string |
There was a problem hiding this comment.
I understand multi valued headers are more complicated to handle, but I think we should not disregard them, coming from http. I'm especially thinking about Accept in the long run
There was a problem hiding this comment.
At some point we need to normalize the headers. I'd rather isolate the multi-value nature of http at the http layer. Especially since http has a mechanism to flatten multi-value name.
If we support multi-value maps, then for example, the gcppubsub bus would need to flatten the header map. The bus shouldn't know about http semantics, or that the message originated over http.
We now have three bus implementations that each create a web server to receiver messages, do something with that message, and then dispatch the message to subscribers. The MessageReceiver and MessageDispatcher utils allow a bus to focus on the value add logic of the bus and share more of the plumbing. Note: I'm using the term message here instead of event since we don't require that all traffic on a bus be CloudEvents. The Message struct is capable of transporting a CloudEvent without limiting other cases. The stub bus has been updated to use these new helpers. The gcppubsub and kafka buses still need to be updated.
|
Updated, PTAL
|
|
/hold cancel |
|
/assign @vaikas-google |
|
/lgtm |
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: scothis, vaikas-google The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
Drop sendevent and sequencestepper test images
`eventing-istio` uses `msg` args in tests. Minimal cherry-pick of: - knative@05f6d84 - knative@f6ca59b Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
We now have three bus implementations that each create a web server to receiver messages, do something with that message, and then dispatch the message to subscribers. The MessageReceiver and MessageDispatcher utils allow a bus to focus on the value add logic of the bus and share more of the plumbing. As the event transport protocol add support for protocols beyond HTTP, we can add support to these utils and each bus will gain the capability.
Note: I'm using the term message here instead of event since we don't require that all traffic on a bus be CloudEvents. The Message struct is capable of transporting a CloudEvent without limiting other cases.
The stub bus has been updated to use these new helpers. The gcppubsub and kafka buses still need to be updated.
Proposed Changes