Create a Sidecar for fanning out HTTP requests #435
Conversation
Since it is a WIP, some general feedback: won't it be better to make this fanout sidecar stateless and pass the Channel metadata with the Subscriptions info in the request payload? What are your thoughts?
I don't understand the rationale for this. Other than the stub bus, I wouldn't expect any production-worthy bus implementation to be doing fanout this way. In cases where actual messaging systems are involved (Kafka, GCP PubSub, AWS Kinesis, Azure EventHub, RabbitMQ, Redis Streams, etc.) there would be a dedicated consumer/subscription/binding per "call" at the middleware level, so that dispatching, retries, and so on are handled independently. We had discussed moving the actual dispatcher responsibility to a sidecar (such as what's currently here: https://github.com/knative/eventing/blob/master/pkg/buses/message_dispatcher.go), but the fanout itself shouldn't be part of that, in my opinion.
Short answer: a Controller watches Subscription changes and pushes the config down to the Pods via ConfigMap, so that individual Pods don't need broad API-server permissions.

Long answer: we need something that will notice Subscription changes and do something to make sure the data plane is updated. I prefer a Controller rather than the Pods themselves, as the entity noticing Subscription changes will require fairly broad permissions to read from the K8s API server. I'm OK with Controllers having those permissions, but I don't want every Pod (that wants fanout) to require this broad a permission. Now that something has noticed a Subscription change, how does it update the data plane? If the Pod itself notices the change, then we are done. If a Controller notices, then it can send the updated configuration down to the Pods via:

- command-line args
- environment variables
- RPC to the Pod
- a ConfigMap (or Secret)
Both args and environment variables cannot be updated on a running Pod, so the Pod would need to be restarted. This seems suboptimal. RPC to the Pod seems fraught with danger: we would need to ensure the Pods are always called with the updated configuration, and if a Pod is restarted, it needs to be called again. Updating a ConfigMap/Secret is useful because its value can be read dynamically by containers in the Pod itself. The big downside is that there is a delay from when the ConfigMap is updated in the API server to when it is updated in the Pod (around 30 seconds IIRC). For now this seems OK, but we may need to rethink the design if this becomes a problem. So now that the config is being sent in a ConfigMap, who within the Pod notices the changes and enacts them?

- the main (bus) container itself
- the fanout sidecar
The big downside of the main container doing it is that each bus then needs to know how to notice that the ConfigMap has changed, which means multiple, subtly different implementations. Using the fanout sidecar means we can have a single implementation that is used by multiple buses. It also makes the buses themselves easier to write by removing the responsibility of noticing Subscription changes.

So the overall contract this PR (currently) presents is: if the user wants to use this sidecar, then they must also add a ConfigMap volume to the Pod. They are responsible for ensuring that the ConfigMap is updated (I will likely write a controller for this soon).

```yaml
spec:
  containers:
    - name: main-container
      ...
    - name: fanout-sidecar
      image: github.com/knative/eventing/cmd/fanoutsidecar
      ports:
        - containerPort: 11235
      volumeMounts:
        - name: config-map
          readOnly: true
          mountPath: /etc/config/fanout_sidecar
  volumes:
    - name: config-map
      configMap:
        name: fanout-sidecar-config
```

All this having been said, the code is structured as a nested set of components. The ConfigMap-reading portion is the outermost piece, so we could freely reuse the inner pieces in other ways.

@markfisher This sidecar is meant to have a few pieces of common functionality. I agree that most production-grade buses won't use the fanout portion. However, even the fanout code might be useful in places where the underlying messaging system doesn't scale past N consumers and downstream event duplication is OK. In the fullness of time, we will likely want to split Buses into control-plane Controllers and data-plane Delivery Adapters. At that point, the Delivery Adapters will need to be able to update Subscription info. Whether or not fanout is used, we could share the Subscription update code.

Another common piece will be Subscription filtering. Once filtering on Subscriptions is designed, a sidecar implementing it will be an ideal place for it, so that all buses use the same implementation and are not responsible for writing it themselves. This PR has a stub for filtering, which can be used to start experimentation.
@adamharwayne sorry for the delayed follow-up, been prepping for S1P...
Do you have an actual example where this would be the case (an underlying messaging system that doesn't scale past N consumers)? I agree with the intent behind control/data-plane separation, and I think that will be the case even now for the Kafka "bus" moving to the provisioner-based model.
I actually don't see that as ideal, since any "bus" (underlying middleware) that supports filtering directly should be used at the native level when that's sufficient for the type of filtering being applied, e.g. Bindings between TopicExchanges and Queues on RabbitMQ, where hierarchical paths and wildcards can be provided so that only matched events even get pushed to the Queue from which our data-plane adapter would be consuming. I also think anything beyond what can be done natively within the middleware would be better handled as an explicit Function (processing logic) rather than being embedded within the sidecar (transport logic). But I have a feeling @evankanderson agrees with you rather than me, at least currently ;-)
@adamharwayne Thanks for the long explanation, it does clarify many things.
I was thinking that the pods would watch the subscription changes themselves, and hence was wondering about the need for the ConfigMap. But it makes sense for the controller to do so and update the pods.
That was also one of my concerns about using a ConfigMap. Looks like the default kubelet sync period is the source of that delay.
Hmm, we could improve this approach with, say, the controller periodically syncing the configuration with the pod. But I wouldn't venture into it because of the complexity involved.
But I agree that the ConfigMap approach seems reasonable for now. This K8s PR kubernetes/kubernetes#64752 looks promising, and in the future work may be done to reduce the latency.
@markfisher I've heard that Amazon Kinesis doesn't scale past ~5 consumers well, as it limits to 5 reads per second.
I think you are right, it is better to let the 'bus' itself filter the events if it can. I think the big open question is how much filtering will be built into Subscriptions.
As Subscriptions are identical across underlying Channels (e.g. a Subscription to a Kafka Channel looks the same as a Subscription to a Stub Bus Channel), does this imply that Subscriptions only have filtering that is common to all Channels? Which likely means no filtering at all. Do you think that we should have very basic filtering, something along the lines of 'only send Events of type X or Y'? Or should that be a Function? If we have filtering at the Subscription level, I think a sidecar that implements it is useful; see the sketch below. Buses that already handle the filtering don't use the sidecar, but all the others can use it without needing to duplicate the work.

@neosab My guess is that we will later introduce multiple ways to get the updated subscription info. For now, ConfigMap seems acceptable. Later, we will probably make that replaceable with an implementation that has a Watch on the API server, for much faster turnaround time. Then the user can choose the latency vs. permissions trade-off.
@adamharwayne the 5-reads-per-second limit is per-shard on Kinesis, and there are 2 particularly relevant points from https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html:

Also, it's important to consider that in a shard/partition-based model, any fanout of events that belong to the same shard/partition may introduce concurrency and thus violate ordering guarantees that the actual subscribing Functions might be relying upon.
@markfisher Yes, it is five reads per second per shard. Please correct me if I am wrong, but my understanding is that each shard is distinct, such that if I send an event into a Kinesis stream, it will end up in exactly one shard. So if I want twenty different Functions to respond to the one event I just sent, then all twenty Functions need to listen on the one shard where the event landed, so on average each Function can read only once every four seconds. I don't think I understand:

Is that saying that with fanout, a subscribing Function may get events out of order? In the sense that there are two events that should be sent in a single shard/partition, e1 and e2 (in that order), and two subscribing Functions, f3 and f4. The Channel sends e1, which fans out to f3 and f4. f3 ACKs, but f4 times out. In the time after f3 ACKs but before f4 times out, the Channel sends e2, which fans out to f3 and f4. Then the Channel resends e1, which fans out to f3 and f4. So f3 saw e1, e2, e1, the last two being out of order. Is that the problem you foresee? If so, why not just disallow the Channel from sending e2 until e1 is completed? It will slow things down (as f3 needs to wait for f4 to time out and eventually ACK), but it can keep the ordering guarantee.
@adamharwayne okay, yes I see your point when considering multiple functions subscribing to the same channel, where each is a distinct application/process with its own read operation... your example of 20 functions having to share the limited 5 reads-per-second makes sense. That said, I feel like this is a very Kinesis-specific situation for which users might want to make different tradeoffs, and the options should be made available in a Kinesis-specific way as well, meaning that the configuration options could be exposed on the Kinesis provisioner implementation.

Not only that, but Kinesis will continue to evolve more options. In fact, there is already an "enhanced fan out" option: https://docs.aws.amazon.com/streams/latest/dev/introduction-to-enhanced-consumers.html - and as with many other limits for AWS services, it mentions that "If you need to register more than five consumers, you can request a limit increase". Many of those who really need it would be willing to pay for it (not to mention N<=5 Functions is probably the 80% case).

My biggest concern is related to the "chain of responsibility" that we've discussed a bit in the Knative Channels doc. If a Sidecar is acting as a Fanout component, then it assumes responsibility, since it would be updating the actual read position for the given shard on behalf of all of the Functions to which it fans out. That means if reliable delivery is important for any of those Functions, then the retry logic within a Fanout Sidecar must itself be reliable, i.e. backed by a persistent store. And regarding the ordering discussion, if the retries are going to be issued in order to potentially multiple subscribers that had failures, then they would need to be queued up in some way where each Function continues to receive its retries in order, i.e. backed by a message queue as the persistent store.

That leads to another option for handling this limitation, which is to create a single "bridge" Subscription that effectively mirrors the Kinesis stream on another messaging system where read limits are not an issue (e.g. Kafka running on the cluster). Otherwise, the Fanout Sidecar is starting down the slippery slope of implementing a persistent, reliably ordered messaging queue itself.

I believe Functions should ideally be independent (one Function's failure should not prevent another Function from continuing to process the event stream in order), able to consume at their own rate, or even read events that were written to the stream before the Function registered an interest - and that alone is a significant tradeoff to be considered on a case-by-case basis. If this off-bus Fanout logic is implemented, I think it should be confined to the Kinesis "delivery adapter" implementation with an opt-in configuration parameter that clearly documents the tradeoffs with regard to read-limits/latency vs. reliability/ordering/function-independence. (Sorry this turned out to be such a lengthy reply!)
@markfisher Thank you for sharing your concerns, they lead to a better product for our users.
The design of the fanout functionality is explicit: the sidecar will not maintain individual read positions for each downstream function. If any downstream function fails to ACK an event, then the sidecar NACKs to the upstream persistent store, so on the next attempt all downstream functions will see the same event again. The fanout functionality is meant to provide one possible trade-off between infrastructure usage and reliability. It is explicit: use it if you want to lower the requirements on your persistent store and are OK with repeated events.

Ideally Functions should be independent. But actually making that happen can require a trade-off in some other way (e.g. stop using Kinesis). The fanout functionality is intended to provide one possible trade-off; no Channel will be required to use it. Channels like Kafka have no need for the functionality and therefore won't use it. Perhaps the Kafka Channel will eventually use other portions of the sidecar (e.g. filtering), but even then it will always be the Provisioner author's choice to use it or not. We want to lower the barrier to entry for new Channels, and the sidecar is meant to help.

I completely agree. The Kinesis Provisioner author will choose the trade-offs and document them appropriately. If they want to support multiple options, then they should expose the functionality to change them via parameters to the Provisioner.
```go
// ServeHTTP takes the request, fans it out to each subscription in f.config. If all the fanned out
// requests return successfully, then return successfully. Else, return failure.
```
This comment seems wrong; it doesn't return anything. Or if by "failure" you mean it modifies the HTTP response, perhaps say that instead.
```go
func addTrackingHeader(r *http.Request) string {
	// Use two random 63 bit ints, as a poor approximation of a UUID.
	// ...
}
```
no need to change anything, but just curious why not just use UUID?
I reviewed the sidecar code ... here: #484 (review) cuz... reading is hard
grantr left a comment
Thanks for blazing this trail @adamharwayne! I didn't finish reviewing the whole thing, but I'll submit what I've got for today. Hopefully my refactor suggestions are as easy to implement as I think they'll be. 😄
```go
package parse
```
Why is this a separate package from multichannelfanout? I'm not arguing for or against, just curious why you chose to make it separate.
My (flimsy) reasoning is that this takes in a map[string]string, which is unique to ConfigMaps.
I've moved the string->Config part into multichannelfanout. And moved the ConfigMap->String logic up one package, directly in configmap.
```go
// unmarshallJsonDisallowUnknownFields unmarshalls JSON, but unlike json.Unmarshall, will fail if
// given an unknown field (rather than json.Unmarshall's ignoring the unknown field).
func unmarshallJsonDisallowUnknownFields(jb []byte, v interface{}) error {
	// ...
}
```
Only one l in Unmarshal. I'd suggest folding this method into the ConfigMapData function above - it might be more readable.
Renamed. I prefer to leave it separate, as I think it makes the calling function clearer. If you feel strongly, I'll inline it.
grantr left a comment
Finished! A summary of the changes I'd like to see before approving:
- call Shutdown before exiting
- remove test skip
- fix dispatch handler bottleneck (possibly already fixed according to @adamharwayne)
I have a few nits but they're optional.
| "sync/atomic" | ||
| ) | ||
|
|
||
| // http.Handler that atomically swaps between underlying handlers. |
There was a problem hiding this comment.
The structure of this package is similar to the examples given in the atomic.Value docs. I'm fine with either style as long as there's no read lock involved.
grantr left a comment
/lgtm
/approve
/hold
Holding for optional suggestion; cancel hold when ready.
[APPROVALNOTIFIER] This PR is APPROVED. This pull request has been approved by: adamharwayne, grantr.
/test pull-knative-eventing-integration-tests
/lgtm
Add a simple fanout sidecar. It will take a single incoming HTTP request and fan it out to N other endpoints, configured via ConfigMap. It is intended to make creation of future buses easier by handling the fanout to multiple subscribers.
Release Note