Start on eventing support#465
Conversation
|
|
||
| import scala.collection.mutable | ||
|
|
||
| object MergeSequence { |
There was a problem hiding this comment.
Didn't this already get added to akka?
There was a problem hiding this comment.
Yes it did, but my initial aim was just to get the branch rebased and compiling, which is what this is.
👍 Sounds good to switch over to Akka projections. |
fd3f698 to
1ac498c
Compare
pvlugter
left a comment
There was a problem hiding this comment.
LGTM. There's a lot there, with some complexity in places. Examples and tests are looking good 👍
| def eventsByTagQuery(tag: String, fromOffset: Long): Source[EventEnvelope, NotUsed] = | ||
| // Technically, there's a race condition that could result in messages being dropped here, if a new message is | ||
| // persisted after this returns, but before the subscriber to the query materializes this stream, that message | ||
| // will be dropped. But this is only the in memory journal which makes no sense to use in production. |
There was a problem hiding this comment.
Fair enough that it's likely not needed. If we did want to close this gap, I guess it could retrieve the old messages more as a lazy source once materialized, or something similar.
|
Failing in the TCK. Unexpected connection type. We could have these go through different probes, so the tests don't affect each other, but it's also useful to know that something wasn't expected. |
|
The TCK failure is because I'd created an event sourced subscriber in the JS shopping cart example. |
| import scala.concurrent.Future | ||
| import scala.util.{Failure, Success} | ||
|
|
||
| final class InterceptActionService(context: InterceptorContext) { |
There was a problem hiding this comment.
Good, was just about to add the intercept for actions.
|
nice work! |
| // - The `Effect` method receives an `EffectRequest` message and must respond with a `Response` that contains the id | ||
| // from the effect message. | ||
| // - The `ProcessAnyEvent` method receives a `google.protobuf.Any`, which will contain JSON serialized according to | ||
| // the Cloudstate JSON serialization conventions, with a `type_url` of `json.cloudstate.io/JsonEvent`. The contents |
There was a problem hiding this comment.
I think the mandated value for the type (not the prefix) of json.cloudstate.io/JsonEvent here can't be enforced as different languages choose "a path determined using a language-specific mechanism" for the type serialized as per https://cloudstate.io/docs/contribute/serialization.html#json-values.
Although the TCK does not enforce the type_urldescribed here:
b722f86#diff-e685c4ba3ccc54f02fab8a427f5f505c63fc8edd56b92dab716e934ccacb2b9aR3295
There was a problem hiding this comment.
The suffix can be anything - but the proxy handles JSON in a particular way, so the TCK does indirectly enforce it.
There was a problem hiding this comment.
The suffix can be anything
Then this:
with a
type_urlofjson.cloudstate.io/JsonEvent
is inaccurate I think. The suffix is out of reach to be defined by the TCK model implemented. Instead it's what the language support has decided to be for a type of a languages types for the Cloudstate JSON serialisation implementation.
| // | ||
| // - Reply: reply with the given message in a `Response`. | ||
| // - Forward: forward to another service, in place of replying with a `Response`. | ||
| // - SideEffect: add a side effect to the current reply, forward, or failure. |
There was a problem hiding this comment.
SideEffect is missing yet in ProcessStep .
| // - Contain a single `message` property with the value of the `message` field in `JsonEvent`. | ||
| // - Be serialized according to the Cloudstate JSON serialization conventions - that is, with the JSON serialized to | ||
| // bytes, then placed into a protobuf message with a single bytes field with field number 1. | ||
| // - Have a type_url of `json.cloudstate.io/JsonEvent`. |
There was a problem hiding this comment.
The Java Support implementation does return here a JsonMessage, instead of a JsonEventas an Event.
Could we write that the type_url has to have a prefix of ``json.cloudstate.io/`?
This is the old pull request #353, rebased, and got to the point of compilation. Haven't tried running yet. My next major task is to switch to using Akka projections.