WIP - Eventing support #353
# Event Acknowledgement

One of the hardest challenges with a developer-friendly event processing API is providing simple, intuitive event acknowledgement, at the moment the developer means to acknowledge the event. This page seeks to define the problem and propose solutions.

## Problem definition

The simplest event processor is one in which there is a one-to-one mapping of events in to events out. In this case, it's dead simple to know when an incoming event should be acknowledged as processed: when the outgoing event is published, the incoming event should be acknowledged.

We are going to assume that the processing guarantee we want is at-least-once. The publish of the outgoing event and the acknowledgment of the incoming event therefore don't need to be atomic: after an outgoing event is published, and the receiver has acknowledged that the publish is complete (e.g. synced to disk, or replicated to a minimum number of nodes), the incoming event can be acknowledged in a separate operation. If this acknowledgement fails, the incoming message will be retried at some later point, resulting in a duplicate outgoing event being published, and so on, until the incoming message is acknowledged. Thus, at-least-once guarantees are maintained. For now, we will not consider at-most-once semantics, or strategies for achieving effectively-exactly-once processing.
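To make the ordering concrete, here is a minimal sketch of the publish-then-acknowledge flow. All of `processAtLeastOnce`, `transform`, `publish` and `acknowledge` are hypothetical stand-ins for illustration, not part of any real Cloudstate API:

```javascript
// At-least-once processing sketch: the outgoing publish must complete
// before the incoming event is acknowledged. If the ack fails, the event
// is redelivered and the output is published again (a duplicate), which
// at-least-once semantics permit.
async function processAtLeastOnce(incomingEvent, transform, publish, acknowledge) {
  const outgoingEvent = transform(incomingEvent);
  // Resolves only once the broker confirms the publish is durable
  // (e.g. synced to disk or replicated to enough nodes).
  await publish(outgoingEvent);
  // Separate, non-atomic operation: failure here means redelivery, not loss.
  await acknowledge(incomingEvent);
}
```

The key design point is that the two awaits are ordered but not transactional; duplicates downstream are the accepted cost of never losing an event.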
Where it gets more challenging is when the processing of events is not one-to-one. One-to-many is very common. Many-to-one is not as common in microservices (though it is very common in streaming data aggregation), but we will discuss it nevertheless.

### Common use cases

We will frame our use cases in a fairly well understood problem domain. We have an event sourced entity that represents a user, used to provide a user management service. This user entity might have events such as `UserCreated`, `PasswordChanged` and `EmailChanged`. These events are how the user entity internally represents a user, and their schema should be able to change freely to accommodate the needs of the user entity without affecting any downstream services that might consume them. Migrations may be defined to evolve these events over time. For this reason, this stream is not published directly to external services; rather, an anti-corruption layer translates these events to an external representation of the same events.

Unlike the internal events from the event log, the external events cannot be changed freely: their schema forms a contract with the downstream services that consume them. When the internal events change in structure, the external events must not change in a way that breaks backwards compatibility.

Note that in the use cases below, we're not trying to build a coherent model of a user management service, nor are we trying to design the perfect user management service. Rather, we're saying "let's assume the business has requirement X", and then discussing how we might provide the mechanism to implement it. These requirements are a given, and hopefully intuitively relatable, even if that's not how you would design such a system. The point is that it's up to the business to make its own decisions, and up to Cloudstate to provide the mechanisms that make those decisions possible to implement, whatever they are.

#### Filtering

Not all events in the internal model will necessarily map to events in the external model. For example, a business decision might be that a user's password is the domain of the user management service and the user management service alone. When a `PasswordChanged` event is emitted from the event log, no event should be published externally. Effectively, the `PasswordChanged` event must be filtered from the event stream.

The challenge here is when the `PasswordChanged` event should be acknowledged, and how the user function should indicate this. Since no event is being output, the output of an event cannot be used to indicate that the `PasswordChanged` event should be acknowledged. Rather, it's the absence of an output event that should indicate it. But how do we differentiate between no output event and slow processing of the input event? Does the Cloudstate proxy say "well, you haven't output anything for 10 seconds, so I'm going to acknowledge"? That puts a 10 second latency on the processing of all `PasswordChanged` events, so that solution doesn't work. Do we have a specific ack signal, which accompanies no event?
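One possible shape for such an explicit signal, sketched purely as an illustration (the sentinel and handler names are invented, not Cloudstate API):

```javascript
// Hypothetical explicit ack-without-output signal. Returning a distinct
// sentinel lets the proxy distinguish "filtered, acknowledge now" from
// "still processing", without resorting to timeouts.
const ACK_NO_EVENT = Symbol("ack-no-event");

function translateUserEvent(internalEvent) {
  if (internalEvent.type === "PasswordChanged") {
    // Passwords stay internal: emit nothing, but signal the ack explicitly.
    return ACK_NO_EVENT;
  }
  // All other events map one-to-one to an external representation.
  return { type: internalEvent.type, data: internalEvent.data };
}
```

The sentinel makes filtering an affirmative act rather than an inferred one, which sidesteps the timeout ambiguity described above.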
#### Unfolding

Sometimes one input event needs to be unfolded into multiple output events. In Akka Streams parlance, this would be a `mapConcat`. Consider the case where, in the internal event store, your `UserCreated` event contains an email address, but a business decision is that the external event published when a user is created does not contain an email; rather, it should be followed by an email changed event. So, when we process a `UserCreated` internal event, we need to output two external events, a `UserCreated` and an `EmailChanged` event.

In this case, we do not want to acknowledge the `UserCreated` internal event until both external events are published. If just the first one is published and the second one fails, the internal event should not be acknowledged, and it should be reprocessed later.

So again, the challenge is communicating which event the internal event should be acknowledged on. The mere signal of an output event does not convey enough information; only the successful publish of the last event from the expanded list of events should trigger acknowledgement.
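The unfolding behaviour described above can be sketched as follows; `publish` and `acknowledge` are hypothetical broker operations, and the event shapes are invented:

```javascript
// Unfolding sketch (mapConcat style): one internal UserCreated becomes two
// external events, and the input is acknowledged only after the *last*
// publish succeeds.
function unfoldUserCreated(internalEvent) {
  return [
    { type: "UserCreated", userId: internalEvent.userId },
    { type: "EmailChanged", userId: internalEvent.userId, email: internalEvent.email },
  ];
}

async function processUnfolded(internalEvent, publish, acknowledge) {
  for (const externalEvent of unfoldUserCreated(internalEvent)) {
    // If any publish fails we stop here without acking, so the whole input
    // event is redelivered and all outputs are republished.
    await publish(externalEvent);
  }
  await acknowledge(internalEvent);
}
```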
#### Folding

Sometimes multiple events need to be folded into a single event. Note that we're not talking about folding an entire stream into a single value, we're talking about combining multiple events in a stream into a single value. For a use case, the converse of the unfolding scenario applies: perhaps internally you have a `UserCreated` and an `EmailChanged` event, but a business decision is that the external `UserCreated` event should combine these two.

The challenge with folding is that it is inherently stateful. You need to track events that are in the process of being folded, and delay acknowledging them until the fold is complete.

However, doing this can be dangerous and lead to deadlocks. The reason is that the number of unacknowledged events must typically be limited, because unacknowledged events require buffering to track, so that once the head of the queue gets acknowledged, the rest can be too. This does depend on the message broker: for brokers that track acknowledgements for each message independently it may be fine, but for offset based brokers like Kafka it's a problem. Consider the case where a single entity emits a `UserCreated` and then an `EmailChanged` event, but concurrently many other entities emit events, so that between those two events in the combined stream of all user events there may be tens, hundreds or thousands of events. If the number of events interleaved here is greater than the configured maximum of outstanding events, a deadlock is reached: the `UserCreated` event can't be acknowledged because the `EmailChanged` event has not yet been received, but the `EmailChanged` event is not being received because the limit of unacknowledged events has been reached, which is in turn blocked by the waiting `UserCreated` acknowledgement.

For this reason, such stateful folds are best done with a persistent save point in the middle, so that the first event can be acknowledged immediately. What we are essentially doing then is a filter with a side effect to a persistent store: when we receive the `UserCreated` event, we persist that we've received it and emit no event; then, when we receive the `EmailChanged` event, we load the state persisted for the `UserCreated` event, and emit the corresponding folded event.
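The save-point approach can be sketched as follows; `store` stands in for a hypothetical persistent key-value store (a `Map`-like interface is assumed), and all names are invented for illustration:

```javascript
// Fold-as-filter-with-side-effect: each input event is acknowledged as soon
// as its handler returns, so unacknowledged events never pile up between the
// two halves of the fold, avoiding the deadlock described above.
async function foldUserEvents(event, store) {
  if (event.type === "UserCreated") {
    // Persist the first half of the fold and emit nothing (a filter).
    await store.set(event.userId, { userId: event.userId });
    return [];
  }
  if (event.type === "EmailChanged") {
    // Load the save point and emit the combined external event.
    const saved = await store.get(event.userId);
    return [{ type: "UserCreated", userId: saved.userId, email: event.email }];
  }
  return [];
}
```

Because each handler invocation is self-contained, the broker's outstanding-event limit is never held hostage by an incomplete fold.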
**Contributor:** I think things like this quickly devolve into CEP territory—and should probably be handled using some form of temporal query language.

**Contributor:** @viktorklang I remember we talked about it a lot in the past. I think it would be possible to build this on top of that implementation and maintain both approaches in the future. Wdyt?

**Contributor:** @sleipnir It's a tricky path to walk—power & flexibility vs performance & safety (performance is possible due to making assumptions)

**Contributor:** @viktorklang Yes. I know it will not be an easy journey.

**Member (Author):** I've been thinking a fair amount about this - if a particular projection can feasibly be described using a query DSL (e.g. SQL-like, or maybe even GraphQL), then most likely that projection wouldn't be defined on the event schema, but on a state schema. That is, to define the projection, you would first need to define event handlers that translated the events to some state object, then you could write your query DSL over the schema of that state object. To explain, let's say a user has a list of email addresses, and you have a projection of email addresses to users. Your event model has

Having to do this for each projection would effectively nullify the benefits of using a high level query language, since you'd still have to write code to handle the events, and in many cases the code would be simpler if you didn't go through the intermediate state representation anyway. Now, users have already defined handlers for their events, in the entity itself.

One of the principles of CQRS is that the read side (including the projections) should not read the state model of the write side - but I'm going to challenge that. Yes, there are advantages to separating the two, but it's not all or nothing; it's not like the moment the projection reads the write side state, you lose all advantages of CQRS and there's no point in doing CQRS at all. It's a trade off. There is a cost associated with reading from the write side, and there is value in doing it too. And I think, in many cases, the value far exceeds the cost - if you can define a projection simply by specifying an SQL statement on your write side state model, that has a huge amount of value. And what's the cost? Your write side state model can't be evolved without modifying your projection? That's a small price to pay. They can't be scaled independently? Typically they'll live in the same service consuming the same database, so their independent scaling ability was already very limited. You can't swap out your write model for something else in a different database etc.? Well, you can, but to do it you'll just need to keep the event handlers from your old write model around so that your read side projections can still use them - the result being that you've just delayed writing two sets of event handlers until you actually needed two sets of event handlers, which is a good thing: don't duplicate until there's a need to.

Now there are two possible approaches to defining a projection on the write side state; they differ in which state is used to define the projection. One is that the state corresponding to the event being handled is used; the other is that the current state on the write side is used. The first approach requires each projection to store the state itself. When an event is emitted, the current state for that projection is loaded, the event is applied to it using the write side event handlers, the projection handler is run against the state, and then the state is persisted. This means storage of the state is duplicated for every single projection, which is a major downside. However, this approach also has advantages: if the state model does end up needing to diverge between the projection and the write side, that's easy to do, because the projection isn't actually sharing state with the write side, it's only reusing the event handler code to maintain its own state. Because it's the same code, the state is identical, but not shared, so at some point you can effectively fork the projection from the write side by duplicating the event handlers and evolving them separately. The second approach is, when an event is emitted, to ignore the event itself, and instead read the current state from the write side and apply that to the projection.

There are a few advantages to this:

The disadvantages:

**Contributor:** I think a really common use case is to be able to dynamically reorder a projection (think "SORT BY lastName ASC|DESC") as well as consuming parts of a projection (OFFSET + LIMIT) etc.

**Member (Author):** I wouldn't describe that as "reordering a projection"; I don't think "order" is a property of a projection. I'd rather say some projections are built using indexes that support ordering. On the topic of ordering and retrieving multiple results - this is where the Cloudstate model doesn't currently have an answer. All of our state models, both current and proposed, are modelled around commands on single entities - a command has an entity key, and the value for that key is loaded through the state model mechanism. This model allows Cloudstate to model the state in a very opinionated way, always passing it to the user code instead of the user code requesting it. But as soon as you get to multiple values, there's no longer an inherent way to identify those values; a single entity key doesn't cut it, nor do multiple entity keys, because for many queries you don't know what the entity keys are before you get the values (that's the point of projections). I don't think we can model this in any way other than having user code request it. Also, for these types of queries you will often have ad-hoc criteria for filtering over multiple indexes; this is where abstracting the database will cause us problems - we'd have to create our own query language if we wanted to do that, and we don't want to do that. That's where I think there will be use cases for user code to speak directly to databases using that database's query language.

**Contributor:** That's where I come back to the thoughts around exposing query languages for projections, which means that we'd much more cheaply be able to support different kinds of projections (graph, relational, text, etc.)
## Developer experience
**Contributor:** This space intentionally left blank? :)

**Member (Author):** Yep - we don't do developer experience.

**Contributor:** Re-think please :)
---

```javascript
/*
 * Copyright 2019 Lightbend Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * CloudEvent data.
 *
 * @interface module:cloudstate.CloudEvent
 * @property {string} specversion The CloudEvent spec version
 */
function toCloudevent(metadata) {
  return {
    get specversion() {
      return metadata["ce-specversion"];
    },
    get id() {
      return metadata["ce-id"];
    },
    set id(id) {
      metadata["ce-id"] = id;
    },
    get source() {
      return metadata["ce-source"];
    },
    set source(source) {
      metadata["ce-source"] = source;
    },
    get type() {
      return metadata["ce-type"];
    },
    set type(type) {
      metadata["ce-type"] = type;
    },
    get datacontenttype() {
      return metadata["Content-Type"];
    },
    set datacontenttype(datacontenttype) {
      metadata["Content-Type"] = datacontenttype;
    },
    get dataschema() {
      return metadata["ce-dataschema"];
    },
    set dataschema(dataschema) {
      metadata["ce-dataschema"] = dataschema;
    },
    get subject() {
      return metadata["ce-subject"];
    },
    set subject(subject) {
      metadata["ce-subject"] = subject;
    },
    get time() {
      return metadata["ce-time"];
    },
    set time(time) {
      metadata["ce-time"] = time;
    },
  };
}

module.exports = {
  toCloudevent
};
```
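A quick usage sketch of the accessor pattern above (abbreviated here to two attributes so the snippet is self-contained; the full accessor list is in the file itself): the returned object is a live view over the metadata map, so writes land back under the `ce-` keys.

```javascript
// Abbreviated restatement of the toCloudevent pattern for illustration:
// getters and setters delegate to the underlying metadata object.
function toCloudevent(metadata) {
  return {
    get id() { return metadata["ce-id"]; },
    set id(id) { metadata["ce-id"] = id; },
    get source() { return metadata["ce-source"]; },
    set source(source) { metadata["ce-source"] = source; },
  };
}

const metadata = { "ce-id": "abc" };
const event = toCloudevent(metadata);
event.source = "/users";
console.log(event.id);              // "abc": reads come from the metadata
console.log(metadata["ce-source"]); // "/users": writes go back into it
```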
---

```javascript
/*
 * Copyright 2019 Lightbend Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

module.exports = class ContextFailure extends Error {
  constructor(msg) {
    super(msg);
    if (Error.captureStackTrace) {
      Error.captureStackTrace(this, ContextFailure);
    }
    this.name = "ContextFailure";
  }
};
```
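A usage sketch (the class is restated here so the snippet is self-contained; it mirrors the module above): the distinguishing `name` lets handlers separate deliberate context failures from unexpected errors.

```javascript
class ContextFailure extends Error {
  constructor(msg) {
    super(msg);
    if (Error.captureStackTrace) {
      Error.captureStackTrace(this, ContextFailure);
    }
    this.name = "ContextFailure";
  }
}

try {
  throw new ContextFailure("command rejected");
} catch (err) {
  // A deliberate failure keeps normal Error semantics...
  console.log(err instanceof Error); // true
  // ...but can be told apart by name.
  console.log(err.name);             // "ContextFailure"
}
```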
---

```javascript
/*
 * Copyright 2019 Lightbend Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

const AnySupport = require("./protobuf-any");
const util = require("util");

module.exports = class EffectSerializer {

  constructor(allEntities) {
    this.allEntities = allEntities;
  }

  serializeEffect(method, message) {
    let serviceName, commandName;
    // We support either the grpc method, or a protobufjs method being passed
    if (typeof method.path === "string") {
      const r = new RegExp("^/([^/]+)/([^/]+)$").exec(method.path);
```
**Contributor:** Are

**Member (Author):** Yeah it should be pulled out into a constant, but not for performance reasons, v8 caches compiled regexes:

```
> console.time("1"); new RegExp("^/([^/]+)/([^/]+)$").exec("foo/bar"); console.timeEnd("1");
1: 0.108ms
undefined
> console.time("2"); new RegExp("^/([^/]+)/([^/]+)$").exec("foo/bar"); console.timeEnd("2");
2: 0.011ms
undefined
```

**Contributor:** Interesting!
```javascript
      if (r == null) {
        throw new Error(util.format("Not a valid gRPC method path '%s' on object '%o'", method.path, method));
      }
      serviceName = r[1];
      commandName = r[2];
    } else if (method.type === "rpc") {
      serviceName = this.fullName(method.parent);
      commandName = method.name;
    }

    const service = this.allEntities[serviceName];

    if (service !== undefined) {
      const command = service.methods[commandName];
      if (command !== undefined) {
        const payload = AnySupport.serialize(command.resolvedRequestType.create(message), false, false);
        return {
          serviceName: serviceName,
          commandName: commandName,
          payload: payload
        };
      } else {
        throw new Error(util.format("Command [%s] unknown on service [%s].", commandName, serviceName));
      }
    } else {
      throw new Error(util.format("Service [%s] has not been registered as an entity in this user function, and so can't be used as a side effect or forward.", serviceName));
    }
  }

  fullName(item) {
    if (item.parent && item.parent.name !== "") {
      return this.fullName(item.parent) + "." + item.name;
    } else {
      return item.name;
    }
  }

  serializeSideEffect(method, message, synchronous) {
    const msg = this.serializeEffect(method, message);
    msg.synchronous = synchronous;
    return msg;
  }

};
```
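The gRPC-path branch of `serializeEffect` splits a path of the form `/Service/Command` with a regex. A standalone check of that parsing (the example path is invented for illustration):

```javascript
// Same pattern as in serializeEffect: two non-empty, slash-free segments.
const methodPathPattern = new RegExp("^/([^/]+)/([^/]+)$");

const match = methodPathPattern.exec("/example.UserService/CreateUser");
console.log(match[1]); // "example.UserService"
console.log(match[2]); // "CreateUser"
// Paths that don't match the shape yield null, which serializeEffect
// turns into a "Not a valid gRPC method path" error.
console.log(methodPathPattern.exec("not-a-path")); // null
```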
**Comment:** Consequently, the filtering of the event could then be the indicator for an acknowledgement? Where "filtered out" means "this input event does not produce an output event". I'm not sure how to model that. Using metadata is perhaps awkward, but so is assuming a silent acknowledgment after n seconds.

Could the proxy determine, even before it sends an event to the user function, whether that event will not lead to an output event? `PasswordChanged` could be modelled so that the proxy can assume an acknowledgment on successful reception by the user function. This also leads to the question of when the proxy knows about a successful reception. An out-of-band ack signal might still be better: if traffic does not count, sent to the proxy immediately, or coalesced a bit later.

In general, what happens if an event is processed slowly, say >10 seconds? Is it assumed to be timed out at 10+n seconds? In the same way that the incoming event can be resent, as described in "Problem definition", this could lead to a retry by the proxy if the user function does not "respond" within a reasonable amount of time.

I think, without the proxy knowing that an input event for a certain user function does not lead to an output event (modelled by the event type itself), the user function might have to acknowledge the event by an out-of-band signal, or by a "not-to-be-published" output event sent explicitly.

**Comment:** If I may comment here, I think we should have an ack signal that does not produce events but that is useful for the user's knowledge of what is happening, and I think there may be ack strategies that the user can use; so regardless of the strategy used, the fact is that the user will have explicitly agreed with it and will be aware of the system's behavior. By strategy we could, for example, have strategies defined by windowing, batching or specific user timeouts.

**Member (Author):** btw... I initially wrote this document just to collect my own thoughts. I didn't finish it, and it's just a raw collection of thoughts. When I created this PR, I noticed the document was there, and thought well, maybe it'll be useful to help others understand the context.

So, the solution I ended up going with was the following:

**Comment:** @jroper I agree with this line of thinking. I personally believe that explicit acking is going to be a recipe for a lot of bugs, so steering clear of that seems like a big win.