From 5f2389ddc9ba4ca4bed8fe782cad31e7abf82591 Mon Sep 17 00:00:00 2001 From: Marcel Lanz Date: Sat, 12 Oct 2019 00:35:07 +0200 Subject: [PATCH 1/7] [go-support] as discussed in weekly call 2019-10-08: go_packages added to the main repo. --- protocols/example/shoppingcart/persistence/domain.proto | 2 ++ protocols/example/shoppingcart/shoppingcart.proto | 2 ++ protocols/frontend/cloudstate/entity_key.proto | 1 + protocols/protocol/cloudstate/crdt.proto | 2 +- protocols/protocol/cloudstate/entity.proto | 1 + protocols/protocol/cloudstate/event_sourced.proto | 1 + protocols/protocol/cloudstate/function.proto | 1 + 7 files changed, 9 insertions(+), 1 deletion(-) diff --git a/protocols/example/shoppingcart/persistence/domain.proto b/protocols/example/shoppingcart/persistence/domain.proto index e92a791d2..c827b802e 100644 --- a/protocols/example/shoppingcart/persistence/domain.proto +++ b/protocols/example/shoppingcart/persistence/domain.proto @@ -3,6 +3,8 @@ syntax = "proto3"; package com.example.shoppingcart.persistence; +option go_package = "persistence"; + message LineItem { string productId = 1; string name = 2; diff --git a/protocols/example/shoppingcart/shoppingcart.proto b/protocols/example/shoppingcart/shoppingcart.proto index a944bc59a..f7b3cf63b 100644 --- a/protocols/example/shoppingcart/shoppingcart.proto +++ b/protocols/example/shoppingcart/shoppingcart.proto @@ -8,6 +8,8 @@ import "google/api/http.proto"; package com.example.shoppingcart; +option go_package = "tck/shoppingcart"; + message AddLineItem { string user_id = 1 [(.cloudstate.entity_key) = true]; string product_id = 2; diff --git a/protocols/frontend/cloudstate/entity_key.proto b/protocols/frontend/cloudstate/entity_key.proto index 9cd044a54..d2081212e 100644 --- a/protocols/frontend/cloudstate/entity_key.proto +++ b/protocols/frontend/cloudstate/entity_key.proto @@ -23,6 +23,7 @@ import "google/protobuf/descriptor.proto"; package cloudstate; option java_package = "io.cloudstate"; +option go_package = "github.com/cloudstateio/go-support/cloudstate/;cloudstate"; extend google.protobuf.FieldOptions { bool entity_key = 50002; diff --git a/protocols/protocol/cloudstate/crdt.proto b/protocols/protocol/cloudstate/crdt.proto index 2c2bf3541..691242efa 100644 --- a/protocols/protocol/cloudstate/crdt.proto +++ b/protocols/protocol/cloudstate/crdt.proto @@ -24,7 +24,7 @@ import "google/protobuf/any.proto"; import "cloudstate/entity.proto"; option java_package = "io.cloudstate.protocol"; - +option go_package = "cloudstate/protocol"; // CRDT Protocol // diff --git a/protocols/protocol/cloudstate/entity.proto b/protocols/protocol/cloudstate/entity.proto index fa9a3296c..05cd7bef2 100644 --- a/protocols/protocol/cloudstate/entity.proto +++ b/protocols/protocol/cloudstate/entity.proto @@ -25,6 +25,7 @@ import "google/protobuf/empty.proto"; import "google/protobuf/descriptor.proto"; option java_package = "io.cloudstate.protocol"; +option go_package = "cloudstate/protocol"; // A reply to the sender. message Reply { diff --git a/protocols/protocol/cloudstate/event_sourced.proto b/protocols/protocol/cloudstate/event_sourced.proto index 2259ea02a..0417f140f 100644 --- a/protocols/protocol/cloudstate/event_sourced.proto +++ b/protocols/protocol/cloudstate/event_sourced.proto @@ -24,6 +24,7 @@ import "google/protobuf/any.proto"; import "cloudstate/entity.proto"; option java_package = "io.cloudstate.protocol"; +option go_package = "cloudstate/protocol"; // The init message. This will always be the first message sent to the entity when // it is loaded. diff --git a/protocols/protocol/cloudstate/function.proto b/protocols/protocol/cloudstate/function.proto index dedd7a4d9..3ca581b0b 100644 --- a/protocols/protocol/cloudstate/function.proto +++ b/protocols/protocol/cloudstate/function.proto @@ -24,6 +24,7 @@ import "google/protobuf/any.proto"; import "cloudstate/entity.proto"; option java_package = "io.cloudstate.protocol"; +option go_package = "cloudstate/protocol"; message FunctionCommand { // The name of the service this function is on. From e2bbd562090c7b4706023543a8687b9503e74249 Mon Sep 17 00:00:00 2001 From: Marcel Lanz Date: Sat, 12 Oct 2019 00:36:42 +0200 Subject: [PATCH 2/7] [go-support] paradox documentation for go-support. --- build.sbt | 4 +- .../developer/language-support/index.md | 1 + docs/src/main/paradox/user/lang/go/api.md | 3 + docs/src/main/paradox/user/lang/go/crdt.md | 6 + docs/src/main/paradox/user/lang/go/effects.md | 5 + .../main/paradox/user/lang/go/eventsourced.md | 93 +++ .../paradox/user/lang/go/gettingstarted.md | 78 +++ docs/src/main/paradox/user/lang/go/index.md | 16 + .../paradox/user/lang/go/serialization.md | 36 ++ .../main/paradox/user/lang/go/src/event.go | 98 +++ .../paradox/user/lang/go/src/eventsourced.go | 565 ++++++++++++++++++ .../paradox/user/lang/go/src/shoppingcart.go | 212 +++++++ docs/src/main/paradox/user/lang/index.md | 1 + 13 files changed, 1117 insertions(+), 1 deletion(-) create mode 100644 docs/src/main/paradox/user/lang/go/api.md create mode 100644 docs/src/main/paradox/user/lang/go/crdt.md create mode 100644 docs/src/main/paradox/user/lang/go/effects.md create mode 100644 docs/src/main/paradox/user/lang/go/eventsourced.md create mode 100644 docs/src/main/paradox/user/lang/go/gettingstarted.md create mode 100644 docs/src/main/paradox/user/lang/go/index.md create mode 100644 docs/src/main/paradox/user/lang/go/serialization.md create mode 100644 docs/src/main/paradox/user/lang/go/src/event.go create mode 100644 docs/src/main/paradox/user/lang/go/src/eventsourced.go create mode 100644 docs/src/main/paradox/user/lang/go/src/shoppingcart.go diff --git a/build.sbt b/build.sbt index 6e35bead3..1063588c4 100644 --- a/build.sbt +++ b/build.sbt @@ -131,7 +131,9 @@ lazy val docs = (project in file("docs")) "extref.jsdoc.base_url" -> ".../user/lang/javascript/api/module-cloudstate.%s", "cloudstate.version" -> "0.4.3", // hardcode, otherwise we'll end up with the wrong version in the docs "cloudstate.java-support.version" -> "0.4.3", - "cloudstate.node-support.version" -> "0.0.1" + "cloudstate.node-support.version" -> "0.0.1", + "cloudstate.go-support.version" -> "0.1.0", + "cloudstate.go.version" -> "1.13" ), paradoxNavigationDepth := 3, inConfig(Test)( diff --git a/docs/src/main/paradox/developer/language-support/index.md b/docs/src/main/paradox/developer/language-support/index.md index 565a1d4af..d3b9c72b1 100644 --- a/docs/src/main/paradox/developer/language-support/index.md +++ b/docs/src/main/paradox/developer/language-support/index.md @@ -7,6 +7,7 @@ This is achieved by having a gRPC based protocol between the Proxy and the User * Java * JavaScript +* Go ## Creating language support libraries diff --git a/docs/src/main/paradox/user/lang/go/api.md b/docs/src/main/paradox/user/lang/go/api.md new file mode 100644 index 000000000..0022a9cea --- /dev/null +++ b/docs/src/main/paradox/user/lang/go/api.md @@ -0,0 +1,3 @@ +# Go API docs + +The Go API docs can be found [here](https://godoc.org/github.com/cloudstateio/go-support). \ No newline at end of file diff --git a/docs/src/main/paradox/user/lang/go/crdt.md b/docs/src/main/paradox/user/lang/go/crdt.md new file mode 100644 index 000000000..5af6c3d60 --- /dev/null +++ b/docs/src/main/paradox/user/lang/go/crdt.md @@ -0,0 +1,6 @@ +# Conflict-free Replicated Data Types + +* Explain how to use the CRDT API +* Explain how to use CrdtFactory and where it comes from +* Explain how to handle streamed calls +* Explain the APIs for each different CRDT diff --git a/docs/src/main/paradox/user/lang/go/effects.md b/docs/src/main/paradox/user/lang/go/effects.md new file mode 100644 index 000000000..5600c8a0c --- /dev/null +++ b/docs/src/main/paradox/user/lang/go/effects.md @@ -0,0 +1,5 @@ +# Forwarding and effects + +* Explain the ServiceCallFactory interface +* Explain how to forward replies to another service. +* Explain how to emit effects. diff --git a/docs/src/main/paradox/user/lang/go/eventsourced.md b/docs/src/main/paradox/user/lang/go/eventsourced.md new file mode 100644 index 000000000..ca80964bf --- /dev/null +++ b/docs/src/main/paradox/user/lang/go/eventsourced.md @@ -0,0 +1,93 @@ +# Event sourcing + +This page documents how to implement CloudState event sourced entities in Go. For information on what CloudState event sourced entities are, please read the general @ref[Event sourcing](../../features/eventsourced.md) documentation first. + +An event sourced entity can be created by embedding the `cloudstate.EventEmitter` type and also implementing the `cloudstate.EntityInitializer` interface. + +@@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/shoppingcart.go) { #entity-type } + +Then by composing the CloudState entity with an `cloudstate.EventSourcedEntity` and register it with `cloudState.Register()`, your entity gets configured to be an event sourced entity and handled by the CloudState instance for now on. + +@@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/eventsourced.go) { #event-sourced-entity-type } + +The `PersistenceID` is used to namespace events in the journal, useful for when you share the same database between multiple entities. It defaults to the simple name for the entity type (in this case, `ShoppingCart`), it's good practice to select one explicitly, this means your database isn't depend on type names in your code. + +The `SnapshotEvery` parameter controls how often snapshots are taken, so that the entity doesn't need to be recovered from the whole journal each time it's loaded. If left unset, it defaults to 100. Setting it to a negative number will result in snapshots never being taken. + +## Persistence types and serialization + +Event sourced entities persist events and snapshots, and these need to be serialized when persisted. The most straight forward way to persist events and snapshots is to use protobufs. CloudState will automatically detect if an emitted event is a protobuf, and serialize it as such. For other serialization options, including JSON, see @ref:[Serialization](serialization.md). + +While protobufs are the recommended format for persisting events, it is recommended that you do not persist your services protobuf messages, rather, you should create new messages, even if they are identical to the services. While this may introduce some overhead in needing to convert from one type to the other, the reason for doing this is that it will allow the services public interface to evolve independently from its data storage format, which should be private. + +For our shopping cart example, we'll create a new file called `domain.proto`, the name domain is selected to indicate that these are my applications domain objects: + +@@snip [domain.proto](/docs/src/test/proto/domain.proto) + +## State + +Each entity should store its state locally in a mutable variable, either a mutable field or a multiple structure such as an array type or slice. For our shopping cart, the state is a slice of products, so we'll create a slice of LineItems to contain that: + +@@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/shoppingcart.go) { #entity-state } + +## Constructing + +The CloudState Go Support Library needs to know how to construct and initialize entities. For this, an entity has to implement the `cloudstate.EntityInitializer` interface. + +(TODO: provide: The constructor below shows having the entity id injected) + +@@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/shoppingcart.go) { #constructing } + +## Handling commands + +Command handlers are declared by implementing the gRPC ShoppingCartServer interface which is generated from the protobuf definitions. The CloudState Go Support library together with the registered ServiceName in the `cloudstate.EventSourcedEntity` is then able to dispatch commands it gets from the CloudState proxy. + +The return type of the command handler is by definition of the service interface, the output type for the gRPC service call, this will be sent as the reply. + +The following shows the implementation of the `GetCart` command handler. This command handler is a read-only command handler, it doesn't emit any events, it just returns some state: + +@@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/shoppingcart.go) { #get-cart } + +### Emitting events + +Commands that modify the state may do so by emitting events. + +@@@ warning +The **only** way a command handler may modify its state is by emitting an event. Any modifications made directly to the state from the command handler will not be persisted, and when the entity is passivated and next reloaded, those modifications will not be present. +@@@ + +A command handler may emit an event by using the embedded `cloudstate.EventEmitter` and invoking the `Emit` method on it. Calling `Emit` will immediately invoke the associated event handler for that event - this both validates that the event can be applied to the current state, as well as updates the state so that subsequent processing in the command handler can use it. + +Here's an example of a command handler that emits an event: + +@@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/shoppingcart.go) { #add-item } + +This command handler also validates the command, ensuring the quantity items added is greater than zero. Returning a `error` fails the command and the support library takes care of signaling that back to the requesting proxy as a `Failure` reply. + +## Handling events + +Event handlers are invoked at two points, when restoring entities from the journal, before any commands are handled, and each time a new event is emitted. An event handlers responsibility is to update the state of the entity according to the event. Event handlers are the only place where its safe to mutate the state of the entity at all. + +Event handlers are declared by either implementing the `cloudstate.EventHandler` interface + +@@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/event.go) { #event-handler } + +or implementing an unary method that matches the type of the event to be handled. Event handlers are differentiated by the type of event they handle. By default, the type of event an event handler handles will be determined by looking for a single argument that the event handler takes. If for any reason this needs to be overridden, or if the event handler method doesn't exists at all, the event is handed over to the `cloudstate.EventHandler` `Handle` method when the entity implements that interface. The by implementing the `HandleEvent(event interface{}) (handled bool, err error)` method, a event handler indicates if he handled the event or if any occurred, returns an error. The returned error has precedent and the handled flag would not be considered. + +Here's an example event handler for the `ItemAdded` event. + +@@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/shoppingcart.go) { #item-added } + +## Producing and handling snapshots + +## Multiple behaviors + +Multiple behaviors are not supported yet by the Go support library. + +## Registering the entity + +Once you've created your entity, you can register it with the `cloudstate.CloudState` server, by invoking the `Register` method of an CloudState instance. In addition to passing your entity type and service name, you also need to pass any descriptors that you use for persisting events, for example, the `domain.proto` descriptor. + +During registration the oprtional ServiceName and the ServiceVersion can be configured as Options. + +@@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/shoppingcart.go) { #register } \ No newline at end of file diff --git a/docs/src/main/paradox/user/lang/go/gettingstarted.md b/docs/src/main/paradox/user/lang/go/gettingstarted.md new file mode 100644 index 000000000..6e90e0324 --- /dev/null +++ b/docs/src/main/paradox/user/lang/go/gettingstarted.md @@ -0,0 +1,78 @@ +# Getting started with stateful services in Go + +## Prerequisites + +Go version +: CloudState Go support requires at least Go $cloudstate.go.version$ + +Build tool +: CloudState does not require any particular build tool, you can select your own. + +protoc +: Since CloudState is based on gRPC, you need a protoc compiler to compile gRPC protobuf descriptors. This can be done manually through the [Protocol Buffer Compiler project](https://github.com/protocolbuffers/protobuf#protocol-compiler-installation). + +docker +: CloudState runs in Kubernetes with [Docker](https://www.docker.com), hence you will need Docker to build a container that you can deploy to Kubernetes. Most popular build tools have plugins that assist in building Docker images. + +In addition to the above, you will need to install the CloudState Go support library by issuing `go get -u github.com/cloudstateio/go-support` or with Go module support let the dependency be downloaded by `go [build|run|test]`. + +By using the Go module support your go.mod file will reference the latest version of the support library or you can define which version you like to use. + +go get +: @@@vars +```text +go get -u github.com/cloudstateio/go-support +``` +@@@ + +import path +: @@@vars +```text +import "github.com/cloudstateio/go-support" +``` +@@@ + +go.mod +: @@@vars +``` +module example.com/yourpackage + require ( + github.com/cloudstateio/go-support $cloudstate.go-support.version$ + ) +go $cloudstate.go.version$ +``` +@@@ + +## Protobuf files + +The CloudState Go Support Library provides no dedicated tool beside the protoc compiler to build your protobuf files. The CloudState protocol protobuf files as well as the shopping cart example application protobuf files are provided by the CloudState Repository. + +In addition to the protoc compiler, the gRPC Go plugin is needed to compile the protobuf file to *.pb.go files. Please follow the instructions at the [Go support for Protocol Buffers](https://github.com/golang/protobuf) project page to install the protoc compiler as well as the `protoc-gen-go` plugin which also includes the Google standard protobuf types. + +To build the example shopping cart application shown earlier in @ref:[gRPC descriptors](../../features/grpc.md), you could simply paste that protobuf into `protos/shoppingcart.proto`. You may wish to also define the Go package using the `go_package` proto option, to ensure the package name used conforms to Go package naming conventions + +```proto +option go_package = "example/shoppingcart"; +``` + +Now if you place your protobuf files under protobuf/ and run `protoc --go_out=. --proto_path=protobuf ./protobuf/*.proto`, you'll find your generated protobuf files in `example/shoppingcart`. + +## Creating and starting a server + +## Creating a main package + +Your main package will be responsible for creating the CloudState gRPC server, registering the entities for it to serve, and starting it. To do this, you can use the CloudState server type, for example: + +@@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/shoppingcart.go) { #shopping-cart-main } + +We will see more details on registering entities in the coming pages. + +## Interfaces to be implemented + +CloudState entities in Go work by implementing interfaces and composing types. + +To get support for the CloudState event emission the CloudState entity should embed the `cloudstate.EventEmitter` type. The EventEmitter allows the entity to emit events during the handling of commands. + +Second, by implementing the `cloudstate.EntityInitializer` interface with its `New()` method, a CloudState instance gets to know how to create and initialize an event sourced entity. + +@@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/shoppingcart.go) { #compose-entity } diff --git a/docs/src/main/paradox/user/lang/go/index.md b/docs/src/main/paradox/user/lang/go/index.md new file mode 100644 index 000000000..1142b7766 --- /dev/null +++ b/docs/src/main/paradox/user/lang/go/index.md @@ -0,0 +1,16 @@ +# Go + +CloudState offers an idiomatic, annotation based Go support library for writing stateful services. + +@@toc { depth=1 } + +@@@ index + +* [Getting started](gettingstarted.md) +* [Event sourcing](eventsourced.md) +* [Conflict-free Replicated Data Types](crdt.md) +* [Forwarding and effects](effects.md) +* [Serialization](serialization.md) +* [API docs](api.md) + +@@@ diff --git a/docs/src/main/paradox/user/lang/go/serialization.md b/docs/src/main/paradox/user/lang/go/serialization.md new file mode 100644 index 000000000..456e4397d --- /dev/null +++ b/docs/src/main/paradox/user/lang/go/serialization.md @@ -0,0 +1,36 @@ +# Serialization + +CloudState functions serve gRPC interfaces, and naturally the input messages and output messages are protobuf messages that get serialized to the protobuf wire format. However, in addition to these messages, there are a number of places where CloudState needs to serialize other objects, for persistence and replication. This includes: + +* Event sourced @ref[events and snapshots](eventsourced.md#persistence-types-and-serialization). +* CRDT @ref[map keys and set elements](crdt.md), and @ref[LWWRegister values](crdt.md). + +CloudState supports a number of types and serialization options for these values. + +## Primitive types + +CloudState supports serializing the following primitive types: + +| Protobuf type | Go type | +|---------------|-------------| +| string | string | +| bytes | []byte | +| int32 | int32 | +| int64 | int64 | +| float | float32 | +| double | float64 | +| bool | bool | + +The details of how these are serialized can be found @ref[here](../../../developer/language-support/serialization.md#primitive-values). + +@@@ note { title=Important } +Go has a set of [predeclared numeric](https://golang.org/ref/spec#Numeric_types) types with implementation-specific sizes. One of them is `int` which would be an int64 on 64-bit systems CPU architectures. CloudState does not support implicit conversion between an `int` and the corresponding `int64` as an input type for the serialization. The main reason not to support it is, that an `int` is not the same type as an `int64` and therefore a de-serialized value would have to be converted back to an `int` as it is of type `int64` during its serialized state. +@@@ + +## JSON + +CloudState uses the standard library package [`encoding/json`](https://golang.org/pkg/encoding/json/) to serialize JSON. Any type that has a field declared with a string literal tag ``json:"fieldname"`` will be serialized to and from JSON using the [Marshaller and Unmarshaller](https://golang.org/pkg/encoding/json/#Marshal) from the Go standard library package `encoding/json`. + +The details of how these are serialized can be found @ref[here](../../../developer/language-support/serialization.md#json-values). + +Note that if you are using JSON values in CRDT sets or maps, the serialization of these values **must** be stable. This means you must not use maps or sets in your value, and you should define an explicit ordering for the fields in your objects. **(TODO: mention the ordering of fields here by the Go standard library implementation).** diff --git a/docs/src/main/paradox/user/lang/go/src/event.go b/docs/src/main/paradox/user/lang/go/src/event.go new file mode 100644 index 000000000..af7bf745f --- /dev/null +++ b/docs/src/main/paradox/user/lang/go/src/event.go @@ -0,0 +1,98 @@ +// +// Copyright 2019 Lightbend Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cloudstate + +import "fmt" + +type OnNext func(event interface{}) error +type OnErr func(err error) +type Subscription struct { + OnNext + OnErr + active bool +} + +func (s *Subscription) Unsubscribe() { + s.active = false +} + +type EventEmitter interface { + Emit(event interface{}) + Subscribe(subs *Subscription) *Subscription + Events() []interface{} + Clear() +} + +func NewEmitter() *eventEmitter { + return &eventEmitter{ + events: make([]interface{}, 0), + subscriptions: make([]*Subscription, 0), + } +} + +type eventEmitter struct { + events []interface{} + subscriptions []*Subscription +} + +// Emit will immediately invoke the associated event handler for that event. +// This both validates that the event can be applied to the current state, as well as +// updates the state so that subsequent processing in the command handler can use it. +func (e *eventEmitter) Emit(event interface{}) { + for _, subs := range e.subscriptions { + if !subs.active { + continue + } + err := subs.OnNext(event) + if r := recover(); r != nil { + subs.OnErr(fmt.Errorf("panicked with: %v", r)) + continue + } + if err != nil && subs.OnErr != nil { + subs.OnErr(err) + // TODO: we have no context here to fail to the proxy + } + } + e.events = append(e.events, event) +} + +func (e *eventEmitter) Events() []interface{} { + return e.events +} + +func (e *eventEmitter) Subscribe(subs *Subscription) *Subscription { + subs.active = true + e.subscriptions = append(e.subscriptions, subs) + return subs +} + +func (e *eventEmitter) Clear() { + e.events = make([]interface{}, 0) +} + +//#event-handler +type EventHandler interface { + HandleEvent(event interface{}) (handled bool, err error) +} +//#event-handler + +type Snapshotter interface { + Snapshot() (snapshot interface{}, err error) +} + +type SnapshotHandler interface { + HandleSnapshot(snapshot interface{}) (handled bool, err error) +} \ No newline at end of file diff --git a/docs/src/main/paradox/user/lang/go/src/eventsourced.go b/docs/src/main/paradox/user/lang/go/src/eventsourced.go new file mode 100644 index 000000000..616b2a49b --- /dev/null +++ b/docs/src/main/paradox/user/lang/go/src/eventsourced.go @@ -0,0 +1,565 @@ +// +// Copyright 2019 Lightbend Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cloudstate + +import ( + "context" + "errors" + "fmt" + "github.com/cloudstateio/go-support/cloudstate/encoding" + "github.com/cloudstateio/go-support/cloudstate/protocol" + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes/any" + "io" + "net/url" + "reflect" + "strings" + "sync" +) + +// Entity +type Entity interface { + EntityInitializer +} + +// An EntityInitializer knows how to initialize an Entity +type EntityInitializer interface { + New() interface{} +} + +const snapshotEveryDefault = 100 + +// EventSourcedEntity captures an Entity, its ServiceName and PersistenceID. +// It is used to be registered as an event sourced entity on a CloudState instance. +//#event-sourced-entity-type +type EventSourcedEntity struct { + // Entity is a nil or Zero-Initialized reference + // to the entity to be event sourced. It has to + // implement the EntityInitializer interface + // so that CloudState can create new entity instances. + Entity Entity + // ServiceName is used to… + // Setting it is optional. + ServiceName string + // PersistenceID is used to namespace events in the journal, useful for + // when you share the same database between multiple entities. It defaults to + // the simple name for the entity type. + // It’s good practice to select one explicitly, this means your database + // isn’t depend on type names in your code. + // Setting it is optional. + PersistenceID string + // The snapshotEvery parameter controls how often snapshots are taken, + // so that the entity doesn't need to be recovered from the whole journal + // each time it’s loaded. If left unset, it defaults to 100. + // Setting it to a negative number will result in snapshots never being taken. + SnapshotEvery int64 + + // internal + entityName string + registerOnce sync.Once +} +//#event-sourced-entity-type + +// initZeroValue get its Entity type and Zero-Value it to +// something we can use as an initializer. +func (e *EventSourcedEntity) initZeroValue() error { + if reflect.ValueOf(e.Entity).IsNil() { + t := reflect.TypeOf(e.Entity) + if t.Kind() == reflect.Ptr { // TODO: how deep can that go? + t = t.Elem() + } + value := reflect.New(t).Interface() + if ei, ok := value.(EntityInitializer); ok { + e.Entity = ei + } else { + return errors.New("the Entity does not implement EntityInitializer") + } + e.entityName = t.Name() + e.SnapshotEvery = snapshotEveryDefault + } + return nil +} + +// The EntityInstance represents a concrete instance of +// a event sourced entity +type EntityInstance struct { + // Instance is an instance of the EventSourcedEntity.Entity + Instance interface{} + // EventSourcedEntity describes the instance + EventSourcedEntity *EventSourcedEntity + + eventSequence int64 +} + +func (e *EntityInstance) shouldSnapshot() bool { + return e.eventSequence >= e.EventSourcedEntity.SnapshotEvery +} + +func (e *EntityInstance) resetSnapshotEvery() { + e.eventSequence = 0 +} + +// A EntityInstanceContext represents a event sourced entity together with its +// associated service. +// Commands are dispatched through this context. +type EntityInstanceContext struct { // TODO: EntityInstanceContext might be actually a EntityInstance + // EntityInstance is the entity instance of this context + EntityInstance *EntityInstance + // active indicates if this context is active + active bool // TODO: inactivate a context in case of errors +} + +// ServiceName returns the contexts service name. +func (c EntityInstanceContext) ServiceName() string { + return c.EntityInstance.EventSourcedEntity.ServiceName +} + +// EventSourcedHandler is the implementation of the EventSourcedHandler server API for EventSourced service. +type EventSourcedHandler struct { + // entities are indexed by their service name + entities map[string]*EventSourcedEntity + // contexts are entity instance contexts indexed by their entity ids + contexts map[string]*EntityInstanceContext + // cmdMethodCache is the command handler method cache + cmdMethodCache map[string]reflect.Method +} + +// NewEventSourcedHandler returns an initialized EventSourcedHandler +func NewEventSourcedHandler() *EventSourcedHandler { + return &EventSourcedHandler{ + entities: make(map[string]*EventSourcedEntity), + contexts: make(map[string]*EntityInstanceContext), + cmdMethodCache: make(map[string]reflect.Method), + } +} + +func (esh *EventSourcedHandler) registerEntity(ese *EventSourcedEntity) error { + esh.entities[ese.ServiceName] = ese + return nil +} + +// Handle +// from EventSourcedServer.Handle +// The stream. One stream will be established per active entity. +// Once established, the first message sent will be Init, which contains the entity ID, and, +// if the entity has previously persisted a snapshot, it will contain that snapshot. It will +// then send zero to many event messages, one for each event previously persisted. The entity +// is expected to apply these to its state in a deterministic fashion. Once all the events +// are sent, one to many commands are sent, with new commands being sent as new requests for +// the entity come in. The entity is expected to reply to each command with exactly one reply +// message. The entity should reply in order, and any events that the entity requests to be +// persisted the entity should handle itself, applying them to its own state, as if they had +// arrived as events when the event stream was being replayed on load. +func (esh *EventSourcedHandler) Handle(server protocol.EventSourced_HandleServer) error { + var entityId string + var failed error + for { + if failed != nil { + return failed + } + msg, recvErr := server.Recv() + if recvErr == io.EOF { + return nil + } + if recvErr != nil { + return recvErr + } + if cmd := msg.GetCommand(); cmd != nil { + if err := esh.handleCommand(cmd, server); err != nil { + // TODO: in general, what happens with the stream here if an error happens? + failed = handleFailure(err, server, cmd.GetId()) + } + continue + } + if event := msg.GetEvent(); event != nil { + // TODO spec: Why does command carry the entityId and an event not? + if err := esh.handleEvent(entityId, event); err != nil { + failed = handleFailure(err, server, 0) + } + continue + } + if init := msg.GetInit(); init != nil { + if err := esh.handleInit(init, server); err != nil { + failed = handleFailure(err, server, 0) + } + entityId = init.GetEntityId() + continue + } + } +} + +func (esh *EventSourcedHandler) handleInit(init *protocol.EventSourcedInit, server protocol.EventSourced_HandleServer) error { + eid := init.GetEntityId() + if _, present := esh.contexts[eid]; present { + return NewFailureError("unable to server.Send") + } + entity := esh.entities[init.GetServiceName()] + if initializer, ok := entity.Entity.(EntityInitializer); ok { + instance := initializer.New() + esh.contexts[eid] = &EntityInstanceContext{ + EntityInstance: &EntityInstance{ + Instance: instance, + EventSourcedEntity: entity, + }, + active: true, + } + } else { + return fmt.Errorf("unable to handle init entity.Entity does not implement EntityInitializer") + } + + if err := esh.handleInitSnapshot(init); err != nil { + return NewFailureError("unable to server.Send. %w", err) + } + esh.subscribeEvents(esh.contexts[eid].EntityInstance) + return nil +} + +func (esh *EventSourcedHandler) handleInitSnapshot(init *protocol.EventSourcedInit) error { + if init.Snapshot == nil { + return nil + } + entityId := init.GetEntityId() + if snapshotHandler, ok := esh.contexts[entityId].EntityInstance.Instance.(SnapshotHandler); ok { + snapshot, err := esh.unmarshalSnapshot(init) + if snapshot == nil || err != nil { + return NewFailureError("handling snapshot failed with: %v", err) + } + handled, err := snapshotHandler.HandleSnapshot(snapshot) + if err != nil { + return NewFailureError("handling snapshot failed with: %v", err) + } + if handled { + esh.contexts[entityId].EntityInstance.eventSequence = init.GetSnapshot().SnapshotSequence + } + return nil + } + return nil +} + +func (EventSourcedHandler) unmarshalSnapshot(init *protocol.EventSourcedInit) (interface{}, error) { + // see: https://developers.google.com/protocol-buffers/docs/reference/csharp/class/google/protobuf/well-known-types/any#typeurl + typeUrl := init.Snapshot.Snapshot.GetTypeUrl() + if !strings.Contains(typeUrl, "://") { + typeUrl = "https://" + typeUrl + } + typeURL, err := url.Parse(typeUrl) + if err != nil { + return nil, err + } + switch typeURL.Host { + case encoding.PrimitiveTypeURLPrefix: + snapshot, err := encoding.UnmarshalPrimitive(init.Snapshot.Snapshot) + if err != nil { + return nil, fmt.Errorf("unmarshalling snapshot failed with: %v", err) + } + return snapshot, nil + case protoAnyBase: + msgName := strings.TrimPrefix(init.Snapshot.Snapshot.GetTypeUrl(), protoAnyBase+"/") // TODO: this might be something else than a proto message + messageType := proto.MessageType(msgName) + if messageType.Kind() == reflect.Ptr { + if message, ok := reflect.New(messageType.Elem()).Interface().(proto.Message); ok { + err := proto.Unmarshal(init.Snapshot.Snapshot.Value, message) + if err != nil { + return nil, fmt.Errorf("unmarshalling snapshot failed with: %v", err) + } + return message, nil + } + } + } + return nil, fmt.Errorf("unmarshalling snapshot failed with: no snapshot unmarshaller found for: %v", typeURL.String()) +} + +func (esh *EventSourcedHandler) subscribeEvents(instance *EntityInstance) { + if emitter, ok := instance.Instance.(EventEmitter); ok { + emitter.Subscribe(&Subscription{ + OnNext: func(event interface{}) error { + err := esh.applyEvent(instance, event) + if err == nil { + instance.eventSequence++ + } + return err + }, + OnErr: func(err error) { + }, // TODO: investigate what to report to the proxy + }) + } +} + +func (esh *EventSourcedHandler) handleEvent(entityId string, event *protocol.EventSourcedEvent) error { + if entityId == "" { + return NewFailureError("no entityId was found from a previous init message for event sequence: %v", event.Sequence) + } + entityContext := esh.contexts[entityId] + if entityContext == nil { + return NewFailureError("no entity with entityId registered: %v", entityId) + } + err := esh.handleEvents(entityContext.EntityInstance, event) + if err != nil { + return NewFailureError("handle event failed: %v", err) + } + return err +} + +// handleCommand handles a command received from the CloudState proxy. +// +// TODO: remove these following lines of comment +// "Unary RPCs where the client sends a single request to the server and +// gets a single response back, just like a normal function call." are supported right now. +// +// to handle a command we need +// - the entity id, which identifies the entity (its instance) uniquely(?) for this user function instance +// - the service name, like "com.example.shoppingcart.ShoppingCart" +// - a command id +// - a command name, which is one of the gRPC service rpcs defined by this entities service +// - the command payload, which is the message sent for the command as a protobuf.Any blob +// - a streamed flag, (TODO: for what?) +// +// together, these properties allow to call a method of the entities registered service and +// return its response as a reply to the CloudState proxy. +// +// Events: +// Beside calling the service method, we have to collect "events" the service might emit. +// These events afterwards have to be handled by a EventHandler to update the state of the +// entity. The CloudState proxy can re-play these events at any time +func (esh *EventSourcedHandler) handleCommand(cmd *protocol.Command, server protocol.EventSourced_HandleServer) error { + // method to call + method, err := esh.methodToCall(cmd) + if err != nil { + return NewProtocolFailure(protocol.Failure{ + CommandId: cmd.GetId(), + Description: err.Error(), + }) + } + entityContext := esh.contexts[cmd.GetEntityId()] + // build the input arguments for the method we're about to call + inputs, err := esh.buildInputs(entityContext, method, cmd, server.Context()) + if err != nil { + return NewProtocolFailure(protocol.Failure{ + CommandId: cmd.GetId(), + Description: err.Error(), + }) + } + // call it + called := method.Func.Call(inputs) + // The gRPC implementation returns the rpc return method + // and an error as a second return value. + errReturned := called[1] + if errReturned.CanInterface() && errReturned.Interface() != nil && errReturned.Type().Name() == "error" { + // TCK says: TODO Expects entity.Failure, but gets lientAction.Action.Failure(Failure(commandId, msg))) + return NewProtocolFailure(protocol.Failure{ + CommandId: cmd.GetId(), + Description: errReturned.Interface().(error).Error(), + }) + } + // the reply + callReply, err := marshalAny(called[0].Interface()) + if err != nil { // this should never happen + return NewProtocolFailure(protocol.Failure{ + CommandId: cmd.GetId(), + Description: fmt.Errorf("called return value at index 0 is no proto.Message. %w", err).Error(), + }) + } + // emitted events + events, err := marshalEventsAny(entityContext) + if err != nil { + return NewProtocolFailure(protocol.Failure{ + CommandId: cmd.GetId(), + Description: err.Error(), + }) + } + // snapshot + snapshot, err := esh.handleSnapshots(entityContext) + if err != nil { + return NewProtocolFailure(protocol.Failure{ + CommandId: cmd.GetId(), + Description: err.Error(), + }) + } + return sendEventSourcedReply(&protocol.EventSourcedReply{ + CommandId: cmd.GetId(), + ClientAction: &protocol.ClientAction{ + Action: &protocol.ClientAction_Reply{ + Reply: &protocol.Reply{ + Payload: callReply, + }, + }, + }, + Events: events, + Snapshot: snapshot, + }, server) +} + +func (*EventSourcedHandler) buildInputs(entityContext *EntityInstanceContext, method reflect.Method, cmd *protocol.Command, ctx context.Context) ([]reflect.Value, error) { + inputs := make([]reflect.Value, method.Type.NumIn()) + inputs[0] = reflect.ValueOf(entityContext.EntityInstance.Instance) + inputs[1] = reflect.ValueOf(ctx) + // create a zero-value for the type of the message we call the method with + arg1 := method.Type.In(2) + ptr := false + for arg1.Kind() == reflect.Ptr { + ptr = true + arg1 = arg1.Elem() + } + var msg proto.Message + if ptr { + msg = reflect.New(arg1).Interface().(proto.Message) + } else { + msg = reflect.Zero(arg1).Interface().(proto.Message) + } + if err := proto.Unmarshal(cmd.GetPayload().GetValue(), msg); err != nil { + return nil, fmt.Errorf("failed to unmarshal: %w", err) + } + inputs[2] = reflect.ValueOf(msg) + return inputs, nil +} + +func (esh *EventSourcedHandler) methodToCall(cmd *protocol.Command) (reflect.Method, error) { + entityContext := esh.contexts[cmd.GetEntityId()] + cacheKey := entityContext.ServiceName() + cmd.Name + method, hit := esh.cmdMethodCache[cacheKey] + // as measured this cache saves us about 75% of a call + // to be prepared with 4.4µs vs. 17.6µs where a typical + // call by reflection like GetCart() with Func.Call() + // takes ~10µs and to get return values processed somewhere 0.7µs. + if !hit { + entityValue := reflect.ValueOf(entityContext.EntityInstance.Instance) + // entities implement the proxied grpc service + // we try to find the method we're called by name with the + // received command. + methodByName := entityValue.MethodByName(cmd.Name) + if !methodByName.IsValid() { + entity := esh.entities[entityContext.ServiceName()] + return reflect.Method{}, fmt.Errorf("no method named: %s found for: %v", cmd.Name, entity) + } + // gRPC services are unary rpc methods, always. + // They have one message in and one message out. + if err := checkUnary(methodByName); err != nil { + return reflect.Method{}, err + } + // The first argument in the gRPC implementation + // is always a context.Context. + methodArg0Type := methodByName.Type().In(0) + contextType := reflect.TypeOf(context.Background()) + if !contextType.Implements(methodArg0Type) { + return reflect.Method{}, fmt.Errorf( + "first argument for method: %s is not of type: %s", + methodByName.String(), contextType.Name(), + ) + } + // we'll find one for sure as we found one on the entityValue + method, _ = reflect.TypeOf(entityContext.EntityInstance.Instance).MethodByName(cmd.Name) + esh.cmdMethodCache[cacheKey] = method + } + return method, nil +} + +func (*EventSourcedHandler) handleSnapshots(entityContext *EntityInstanceContext) (*any.Any, error) { + if !entityContext.EntityInstance.shouldSnapshot() { + return nil, nil + } + if snapshotter, canSnapshot := entityContext.EntityInstance.Instance.(Snapshotter); canSnapshot { + snap, err := snapshotter.Snapshot() + if err != nil { + return nil, fmt.Errorf("getting a snapshot has failed: %v. %w", err, ErrFailure) + } + // TODO: we expect a proto.Message but should support other formats + snapshot, err := marshalAny(snap) + if err != nil { + return nil, err + } + entityContext.EntityInstance.resetSnapshotEvery() + return snapshot, nil + } else { + // TODO: every entity should implement snapshotting, right? + } + return nil, nil +} + +func checkUnary(methodByName reflect.Value) error { + if methodByName.Type().NumIn() != 2 { + return NewFailureError("method: %s is no unary method", methodByName.String()) + } + return nil +} + +// applyEvent applies an event to a local entity +func (esh EventSourcedHandler) applyEvent(entityInstance *EntityInstance, event interface{}) error { + payload, err := marshalAny(event) + if err != nil { + return err + } + return esh.handleEvents(entityInstance, &protocol.EventSourcedEvent{Payload: payload}) +} + +// handleEvents handles a list of events encoded as protobuf Any messages. +// +// Event sourced entities persist events and snapshots, and these need to be +// serialized when persisted. The most straight forward way to persist events +// and snapshots is to use protobufs. CloudState will automatically detect if +// an emitted event is a protobuf, and serialize it as such. For other +// serialization options, including JSON, see Serialization. +func (EventSourcedHandler) handleEvents(entityInstance *EntityInstance, events ...*protocol.EventSourcedEvent) error { + eventHandler, implementsEventHandler := entityInstance.Instance.(EventHandler) + for _, event := range events { + // TODO: here's the point where events can be protobufs, serialized as json or other formats + msgName := strings.TrimPrefix(event.Payload.GetTypeUrl(), protoAnyBase+"/") + messageType := proto.MessageType(msgName) + + if messageType.Kind() == reflect.Ptr { + // get a zero-ed message of this type + if message, ok := reflect.New(messageType.Elem()).Interface().(proto.Message); ok { + // and marshal onto it what we got as an any.Any onto it + err := proto.Unmarshal(event.Payload.Value, message) + if err != nil { + return fmt.Errorf("%s, %w", err, ErrMarshal) + } else { + // we're ready to handle the proto message + // and we might have a handler + handled := false + if implementsEventHandler { + handled, err = eventHandler.HandleEvent(message) + if err != nil { + return err // FIXME/TODO: is this correct? if we fail here, nothing is safe afterwards. + } + } + // if not, we try to find one + // currently we support a method that has one argument that equals + // to the type of the message received. + if !handled { + // find a concrete handling method + entityValue := reflect.ValueOf(entityInstance.Instance) + entityType := entityValue.Type() + for n := 0; n < entityType.NumMethod(); n++ { + method := entityType.Method(n) + // we expect one argument for now, the domain message + // the first argument is the receiver itself + if method.Func.Type().NumIn() == 2 { + argumentType := method.Func.Type().In(1) + if argumentType.AssignableTo(messageType) { + entityValue.MethodByName(method.Name).Call([]reflect.Value{reflect.ValueOf(message)}) + } + } else { + // we have not found a one-argument method matching the events type as an argument + // TODO: what to do here? we might support more variations of possible handlers we can detect + } + } + } + } + } + } // TODO: what do we do if we haven't handled the events? + } + return nil +} diff --git a/docs/src/main/paradox/user/lang/go/src/shoppingcart.go b/docs/src/main/paradox/user/lang/go/src/shoppingcart.go new file mode 100644 index 000000000..d22a7c153 --- /dev/null +++ b/docs/src/main/paradox/user/lang/go/src/shoppingcart.go @@ -0,0 +1,212 @@ +// +// Copyright 2019 Lightbend Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package main implements an event sourced entity shopping cart example +package main + +import ( + "context" + "errors" + "fmt" + "github.com/cloudstateio/go-support/cloudstate" + "github.com/cloudstateio/go-support/tck/shoppingcart" + domain "github.com/cloudstateio/go-support/tck/shoppingcart/persistence" + "github.com/golang/protobuf/ptypes/empty" + "log" +) + +// main creates a CloudState instance and registers the ShoppingCart +// as a event sourced entity. +//#shopping-cart-main +func main() { + cloudState := cloudstate.NewCloudState(&cloudstate.Options{ + ServiceName: "shopping-cart", + ServiceVersion: "0.1.0", + }) +//#register + err := cloudState.Register( + &cloudstate.EventSourcedEntity{ + Entity: (*ShoppingCart)(nil), + ServiceName: "com.example.shoppingcart.ShoppingCart", + }, + cloudstate.DescriptorConfig{ + Service: "shoppingcart/shoppingcart.proto", + }.AddDomainDescriptor("domain.proto"), + ) +//#register + if err != nil { + log.Fatalf("CloudState failed to register entity: %v", err) + } + err = cloudState.Run() + if err != nil { + log.Fatalf("CloudState failed to run: %v", err) + } +} +//#shopping-cart-main + +// A CloudState event sourced entity. +//#entity-type +//#compose-entity +type ShoppingCart struct { + // our domain object + //#entity-state + cart []*domain.LineItem + //#entity-state + // as an Emitter we can emit events + cloudstate.EventEmitter +} +//#entity-type + +// New implements EntityInitializer and returns a new +// and initialized instance of the ShoppingCart entity. +//#constructing +func (sc ShoppingCart) New() interface{} { + return NewShoppingCart() +} +//#constructing + +// NewShoppingCart returns a new and initialized +// instance of the ShoppingCart entity. +func NewShoppingCart() *ShoppingCart { + return &ShoppingCart{ + cart: make([]*domain.LineItem, 0), + EventEmitter: cloudstate.NewEmitter(), // TODO: the EventEmitter could be provided by the event sourced handler + } +} +//#compose-entity + +// ItemAdded is a event handler function for the ItemAdded event. +//#item-added +func (sc *ShoppingCart) ItemAdded(added *domain.ItemAdded) error { // TODO: enable handling for values + if item, _ := sc.find(added.Item.ProductId); item != nil { + item.Quantity += added.Item.Quantity + } else { + sc.cart = append(sc.cart, &domain.LineItem{ + ProductId: added.Item.ProductId, + Name: added.Item.Name, + Quantity: added.Item.Quantity, + }) + } + return nil +} +//#item-added + +// ItemRemoved is a event handler function for the ItemRemoved event. +func (sc *ShoppingCart) ItemRemoved(removed *domain.ItemRemoved) error { + if !sc.remove(removed.ProductId) { + // this should never happen + return errors.New("unable to remove product") + } + return nil +} + +// Handle lets us handle events by ourselves. +// +// returns handle set to true if we have handled the event +// and any error that happened during the handling +func (sc *ShoppingCart) HandleEvent(event interface{}) (handled bool, err error) { + switch e := event.(type) { + case *domain.ItemAdded: + return true, sc.ItemAdded(e) + //case *domain.ItemRemoved: + // *domain.ItemRemoved is handled by reflection + default: + return false, nil + } +} + +// AddItem implements the AddItem command handling of the shopping cart service. +//#add-item +func (sc *ShoppingCart) AddItem(c context.Context, li *shoppingcart.AddLineItem) (*empty.Empty, error) { + if li.GetQuantity() <= 0 { + return nil, fmt.Errorf("cannot add negative quantity of to item %s", li.GetProductId()) + } + sc.Emit(&domain.ItemAdded{ + Item: &domain.LineItem{ + ProductId: li.ProductId, + Name: li.Name, + Quantity: li.Quantity, + }}) + return &empty.Empty{}, nil +} +//#add-item + +// RemoveItem implements the RemoveItem command handling of the shopping cart service. +func (sc *ShoppingCart) RemoveItem(c context.Context, li *shoppingcart.RemoveLineItem) (*empty.Empty, error) { + if item, _ := sc.find(li.GetProductId()); item == nil { + return nil, fmt.Errorf("cannot remove item %s because it is not in the cart", li.GetProductId()) + } + sc.Emit(&domain.ItemRemoved{ProductId: li.ProductId}) + return &empty.Empty{}, nil +} + +// GetCart implements the GetCart command handling of the shopping cart service. +//#get-cart +func (sc *ShoppingCart) GetCart(c context.Context, _ *shoppingcart.GetShoppingCart) (*shoppingcart.Cart, error) { + cart := &shoppingcart.Cart{} + for _, item := range sc.cart { + cart.Items = append(cart.Items, &shoppingcart.LineItem{ + ProductId: item.ProductId, + Name: item.Name, + Quantity: item.Quantity, + }) + } + return cart, nil +} +//#get-cart + +func (sc *ShoppingCart) Snapshot() (snapshot interface{}, err error) { + return domain.Cart{ + Items: append(make([]*domain.LineItem, len(sc.cart)), sc.cart...), + }, nil +} + +func (sc *ShoppingCart) HandleSnapshot(snapshot interface{}) (handled bool, err error) { + switch value := snapshot.(type) { + case domain.Cart: + sc.cart = append(sc.cart[:0], value.Items...) + return true, nil + default: + return false, nil + } +} + +// find finds a product in the shopping cart by productId and returns it as a LineItem. +func (sc *ShoppingCart) find(productId string) (item *domain.LineItem, index int) { + for i, item := range sc.cart { + if productId == item.ProductId { + return item, i + } + } + return nil, 0 +} + +// remove removes a product from the shopping cart. +// +// A ok flag is returned to indicate that the product was present and removed. +func (sc *ShoppingCart) remove(productId string) (ok bool) { + if item, i := sc.find(productId); item != nil { + // remove and re-slice + copy(sc.cart[i:], sc.cart[i+1:]) + sc.cart = sc.cart[:len(sc.cart)-1] + return true + } else { + return false + } +} + +func init() { + log.SetFlags(log.LstdFlags | log.Lmicroseconds) +} diff --git a/docs/src/main/paradox/user/lang/index.md b/docs/src/main/paradox/user/lang/index.md index 9a75638f4..d51437242 100644 --- a/docs/src/main/paradox/user/lang/index.md +++ b/docs/src/main/paradox/user/lang/index.md @@ -8,5 +8,6 @@ CloudState user functions can be implemented in any language that supports gRPC. * [JavaScript](javascript/index.md) * [Java](java/index.md) +* [Go](go/index.md) @@@ From bfec1c36f9d49e3b717b43210285e2a58f59c80a Mon Sep 17 00:00:00 2001 From: Marcel Lanz Date: Sat, 12 Oct 2019 00:38:13 +0200 Subject: [PATCH 3/7] [go-support] enable the TCK to run the shopping cart example for Go. --- tck/src/it/resources/application.conf | 26 +++++++++++++++++++ .../io/cloudstate/tck/CloudStateTCK.scala | 2 +- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/tck/src/it/resources/application.conf b/tck/src/it/resources/application.conf index 94fa08441..394cdff9a 100644 --- a/tck/src/it/resources/application.conf +++ b/tck/src/it/resources/application.conf @@ -87,4 +87,30 @@ cloudstate-tck.combinations = [{ PORT = "8088" } } +},{ + name = "Akka + Go" + tck { + hostname = "0.0.0.0" + port = 8090 + } + proxy { + hostname = "127.0.0.1" + port = 9000 + directory = ${user.dir} + command = ["java","-Xmx512M", "-Xms128M", "-Dconfig.resource=in-memory.conf", "-Dcloudstate.proxy.dev-mode-enabled=true", "-jar", "proxy/core/target/scala-2.12/akka-proxy.jar"] + env-vars { + USER_FUNCTION_PORT = "8090" + } + } + + frontend { + hostname = "127.0.0.1" + port = 8080 + directory = ${user.dir} + command = ["docker", "run", "--rm", "-p", "127.0.0.1:8080:8080", "gcr.io/mrcllnz/cloudstate-go-tck:latest"] + env-vars { + HOST = "127.0.0.1" + PORT = "8080" + } + } }] \ No newline at end of file diff --git a/tck/src/main/scala/io/cloudstate/tck/CloudStateTCK.scala b/tck/src/main/scala/io/cloudstate/tck/CloudStateTCK.scala index ae1fc6ad0..5a0461972 100644 --- a/tck/src/main/scala/io/cloudstate/tck/CloudStateTCK.scala +++ b/tck/src/main/scala/io/cloudstate/tck/CloudStateTCK.scala @@ -352,7 +352,7 @@ class CloudStateTCK(private[this] final val config: CloudStateTCK.Configuration) cmd.id must not be commandId } - ("The TCK for" + config.name) must { + ("The TCK for " + config.name) must { implicit val scheduler = system.scheduler "verify that the user function process responds" in { From f2423469dd87a6daf39acc1624a7e7ed1620048d Mon Sep 17 00:00:00 2001 From: Marcel Lanz Date: Wed, 16 Oct 2019 02:23:12 +0200 Subject: [PATCH 4/7] [PR Review Feedback] first batch of changes after review of https://github.com/cloudstateio/go-support/pull/5 --- docs/src/main/paradox/user/lang/go/src/shoppingcart.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/src/main/paradox/user/lang/go/src/shoppingcart.go b/docs/src/main/paradox/user/lang/go/src/shoppingcart.go index d22a7c153..b12ced57f 100644 --- a/docs/src/main/paradox/user/lang/go/src/shoppingcart.go +++ b/docs/src/main/paradox/user/lang/go/src/shoppingcart.go @@ -31,7 +31,7 @@ import ( // as a event sourced entity. //#shopping-cart-main func main() { - cloudState := cloudstate.NewCloudState(&cloudstate.Options{ + cloudState := cloudstate.New(cloudstate.Options{ ServiceName: "shopping-cart", ServiceVersion: "0.1.0", }) From 63cdb4f65a85d2a04a6c08c57e15db43081ac4cf Mon Sep 17 00:00:00 2001 From: Marcel Lanz Date: Tue, 22 Oct 2019 07:34:25 +0200 Subject: [PATCH 5/7] [go-support] Update doc and config to Cloudstate instead of CloudState --- .../main/paradox/user/lang/go/eventsourced.md | 12 +++++------ .../paradox/user/lang/go/gettingstarted.md | 20 +++++++++---------- docs/src/main/paradox/user/lang/go/index.md | 2 +- .../paradox/user/lang/go/serialization.md | 10 +++++----- .../paradox/user/lang/go/src/eventsourced.go | 12 +++++------ .../paradox/user/lang/go/src/shoppingcart.go | 8 ++++---- 6 files changed, 32 insertions(+), 32 deletions(-) diff --git a/docs/src/main/paradox/user/lang/go/eventsourced.md b/docs/src/main/paradox/user/lang/go/eventsourced.md index ca80964bf..fc0454115 100644 --- a/docs/src/main/paradox/user/lang/go/eventsourced.md +++ b/docs/src/main/paradox/user/lang/go/eventsourced.md @@ -1,12 +1,12 @@ # Event sourcing -This page documents how to implement CloudState event sourced entities in Go. For information on what CloudState event sourced entities are, please read the general @ref[Event sourcing](../../features/eventsourced.md) documentation first. +This page documents how to implement Cloudstate event sourced entities in Go. For information on what Cloudstate event sourced entities are, please read the general @ref[Event sourcing](../../features/eventsourced.md) documentation first. An event sourced entity can be created by embedding the `cloudstate.EventEmitter` type and also implementing the `cloudstate.EntityInitializer` interface. @@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/shoppingcart.go) { #entity-type } -Then by composing the CloudState entity with an `cloudstate.EventSourcedEntity` and register it with `cloudState.Register()`, your entity gets configured to be an event sourced entity and handled by the CloudState instance for now on. +Then by composing the Cloudstate entity with an `cloudstate.EventSourcedEntity` and register it with `cloudState.Register()`, your entity gets configured to be an event sourced entity and handled by the Cloudstate instance for now on. @@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/eventsourced.go) { #event-sourced-entity-type } @@ -16,7 +16,7 @@ The `SnapshotEvery` parameter controls how often snapshots are taken, so that th ## Persistence types and serialization -Event sourced entities persist events and snapshots, and these need to be serialized when persisted. The most straight forward way to persist events and snapshots is to use protobufs. CloudState will automatically detect if an emitted event is a protobuf, and serialize it as such. For other serialization options, including JSON, see @ref:[Serialization](serialization.md). +Event sourced entities persist events and snapshots, and these need to be serialized when persisted. The most straight forward way to persist events and snapshots is to use protobufs. Cloudstate will automatically detect if an emitted event is a protobuf, and serialize it as such. For other serialization options, including JSON, see @ref:[Serialization](serialization.md). While protobufs are the recommended format for persisting events, it is recommended that you do not persist your services protobuf messages, rather, you should create new messages, even if they are identical to the services. While this may introduce some overhead in needing to convert from one type to the other, the reason for doing this is that it will allow the services public interface to evolve independently from its data storage format, which should be private. @@ -32,7 +32,7 @@ Each entity should store its state locally in a mutable variable, either a mutab ## Constructing -The CloudState Go Support Library needs to know how to construct and initialize entities. For this, an entity has to implement the `cloudstate.EntityInitializer` interface. +The Cloudstate Go Support Library needs to know how to construct and initialize entities. For this, an entity has to implement the `cloudstate.EntityInitializer` interface. (TODO: provide: The constructor below shows having the entity id injected) @@ -40,7 +40,7 @@ The CloudState Go Support Library needs to know how to construct and initialize ## Handling commands -Command handlers are declared by implementing the gRPC ShoppingCartServer interface which is generated from the protobuf definitions. The CloudState Go Support library together with the registered ServiceName in the `cloudstate.EventSourcedEntity` is then able to dispatch commands it gets from the CloudState proxy. +Command handlers are declared by implementing the gRPC ShoppingCartServer interface which is generated from the protobuf definitions. The Cloudstate Go Support library together with the registered ServiceName in the `cloudstate.EventSourcedEntity` is then able to dispatch commands it gets from the Cloudstate proxy. The return type of the command handler is by definition of the service interface, the output type for the gRPC service call, this will be sent as the reply. @@ -86,7 +86,7 @@ Multiple behaviors are not supported yet by the Go support library. ## Registering the entity -Once you've created your entity, you can register it with the `cloudstate.CloudState` server, by invoking the `Register` method of an CloudState instance. In addition to passing your entity type and service name, you also need to pass any descriptors that you use for persisting events, for example, the `domain.proto` descriptor. +Once you've created your entity, you can register it with the `cloudstate.Cloudstate` server, by invoking the `Register` method of an Cloudstate instance. In addition to passing your entity type and service name, you also need to pass any descriptors that you use for persisting events, for example, the `domain.proto` descriptor. During registration the oprtional ServiceName and the ServiceVersion can be configured as Options. diff --git a/docs/src/main/paradox/user/lang/go/gettingstarted.md b/docs/src/main/paradox/user/lang/go/gettingstarted.md index 6e90e0324..231b67fad 100644 --- a/docs/src/main/paradox/user/lang/go/gettingstarted.md +++ b/docs/src/main/paradox/user/lang/go/gettingstarted.md @@ -3,18 +3,18 @@ ## Prerequisites Go version -: CloudState Go support requires at least Go $cloudstate.go.version$ +: Cloudstate Go support requires at least Go $cloudstate.go.version$ Build tool -: CloudState does not require any particular build tool, you can select your own. +: Cloudstate does not require any particular build tool, you can select your own. protoc -: Since CloudState is based on gRPC, you need a protoc compiler to compile gRPC protobuf descriptors. This can be done manually through the [Protocol Buffer Compiler project](https://github.com/protocolbuffers/protobuf#protocol-compiler-installation). +: Since Cloudstate is based on gRPC, you need a protoc compiler to compile gRPC protobuf descriptors. This can be done manually through the [Protocol Buffer Compiler project](https://github.com/protocolbuffers/protobuf#protocol-compiler-installation). docker -: CloudState runs in Kubernetes with [Docker](https://www.docker.com), hence you will need Docker to build a container that you can deploy to Kubernetes. Most popular build tools have plugins that assist in building Docker images. +: Cloudstate runs in Kubernetes with [Docker](https://www.docker.com), hence you will need Docker to build a container that you can deploy to Kubernetes. Most popular build tools have plugins that assist in building Docker images. -In addition to the above, you will need to install the CloudState Go support library by issuing `go get -u github.com/cloudstateio/go-support` or with Go module support let the dependency be downloaded by `go [build|run|test]`. +In addition to the above, you will need to install the Cloudstate Go support library by issuing `go get -u github.com/cloudstateio/go-support` or with Go module support let the dependency be downloaded by `go [build|run|test]`. By using the Go module support your go.mod file will reference the latest version of the support library or you can define which version you like to use. @@ -45,7 +45,7 @@ go $cloudstate.go.version$ ## Protobuf files -The CloudState Go Support Library provides no dedicated tool beside the protoc compiler to build your protobuf files. The CloudState protocol protobuf files as well as the shopping cart example application protobuf files are provided by the CloudState Repository. +The Cloudstate Go Support Library provides no dedicated tool beside the protoc compiler to build your protobuf files. The Cloudstate protocol protobuf files as well as the shopping cart example application protobuf files are provided by the Cloudstate Repository. In addition to the protoc compiler, the gRPC Go plugin is needed to compile the protobuf file to *.pb.go files. Please follow the instructions at the [Go support for Protocol Buffers](https://github.com/golang/protobuf) project page to install the protoc compiler as well as the `protoc-gen-go` plugin which also includes the Google standard protobuf types. @@ -61,7 +61,7 @@ Now if you place your protobuf files under protobuf/ and run `protoc --go_out=. ## Creating a main package -Your main package will be responsible for creating the CloudState gRPC server, registering the entities for it to serve, and starting it. To do this, you can use the CloudState server type, for example: +Your main package will be responsible for creating the Cloudstate gRPC server, registering the entities for it to serve, and starting it. To do this, you can use the Cloudstate server type, for example: @@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/shoppingcart.go) { #shopping-cart-main } @@ -69,10 +69,10 @@ We will see more details on registering entities in the coming pages. ## Interfaces to be implemented -CloudState entities in Go work by implementing interfaces and composing types. +Cloudstate entities in Go work by implementing interfaces and composing types. -To get support for the CloudState event emission the CloudState entity should embed the `cloudstate.EventEmitter` type. The EventEmitter allows the entity to emit events during the handling of commands. +To get support for the Cloudstate event emission the Cloudstate entity should embed the `cloudstate.EventEmitter` type. The EventEmitter allows the entity to emit events during the handling of commands. -Second, by implementing the `cloudstate.EntityInitializer` interface with its `New()` method, a CloudState instance gets to know how to create and initialize an event sourced entity. +Second, by implementing the `cloudstate.EntityInitializer` interface with its `New()` method, a Cloudstate instance gets to know how to create and initialize an event sourced entity. @@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/shoppingcart.go) { #compose-entity } diff --git a/docs/src/main/paradox/user/lang/go/index.md b/docs/src/main/paradox/user/lang/go/index.md index 1142b7766..d2c578ed3 100644 --- a/docs/src/main/paradox/user/lang/go/index.md +++ b/docs/src/main/paradox/user/lang/go/index.md @@ -1,6 +1,6 @@ # Go -CloudState offers an idiomatic, annotation based Go support library for writing stateful services. +Cloudstate offers an idiomatic, annotation based Go support library for writing stateful services. @@toc { depth=1 } diff --git a/docs/src/main/paradox/user/lang/go/serialization.md b/docs/src/main/paradox/user/lang/go/serialization.md index 456e4397d..1d3abb586 100644 --- a/docs/src/main/paradox/user/lang/go/serialization.md +++ b/docs/src/main/paradox/user/lang/go/serialization.md @@ -1,15 +1,15 @@ # Serialization -CloudState functions serve gRPC interfaces, and naturally the input messages and output messages are protobuf messages that get serialized to the protobuf wire format. However, in addition to these messages, there are a number of places where CloudState needs to serialize other objects, for persistence and replication. This includes: +Cloudstate functions serve gRPC interfaces, and naturally the input messages and output messages are protobuf messages that get serialized to the protobuf wire format. However, in addition to these messages, there are a number of places where Cloudstate needs to serialize other objects, for persistence and replication. This includes: * Event sourced @ref[events and snapshots](eventsourced.md#persistence-types-and-serialization). * CRDT @ref[map keys and set elements](crdt.md), and @ref[LWWRegister values](crdt.md). -CloudState supports a number of types and serialization options for these values. +Cloudstate supports a number of types and serialization options for these values. ## Primitive types -CloudState supports serializing the following primitive types: +Cloudstate supports serializing the following primitive types: | Protobuf type | Go type | |---------------|-------------| @@ -24,12 +24,12 @@ CloudState supports serializing the following primitive types: The details of how these are serialized can be found @ref[here](../../../developer/language-support/serialization.md#primitive-values). @@@ note { title=Important } -Go has a set of [predeclared numeric](https://golang.org/ref/spec#Numeric_types) types with implementation-specific sizes. One of them is `int` which would be an int64 on 64-bit systems CPU architectures. CloudState does not support implicit conversion between an `int` and the corresponding `int64` as an input type for the serialization. The main reason not to support it is, that an `int` is not the same type as an `int64` and therefore a de-serialized value would have to be converted back to an `int` as it is of type `int64` during its serialized state. +Go has a set of [predeclared numeric](https://golang.org/ref/spec#Numeric_types) types with implementation-specific sizes. One of them is `int` which would be an int64 on 64-bit systems CPU architectures. Cloudstate does not support implicit conversion between an `int` and the corresponding `int64` as an input type for the serialization. The main reason not to support it is, that an `int` is not the same type as an `int64` and therefore a de-serialized value would have to be converted back to an `int` as it is of type `int64` during its serialized state. @@@ ## JSON -CloudState uses the standard library package [`encoding/json`](https://golang.org/pkg/encoding/json/) to serialize JSON. Any type that has a field declared with a string literal tag ``json:"fieldname"`` will be serialized to and from JSON using the [Marshaller and Unmarshaller](https://golang.org/pkg/encoding/json/#Marshal) from the Go standard library package `encoding/json`. +Cloudstate uses the standard library package [`encoding/json`](https://golang.org/pkg/encoding/json/) to serialize JSON. Any type that has a field declared with a string literal tag ``json:"fieldname"`` will be serialized to and from JSON using the [Marshaller and Unmarshaller](https://golang.org/pkg/encoding/json/#Marshal) from the Go standard library package `encoding/json`. The details of how these are serialized can be found @ref[here](../../../developer/language-support/serialization.md#json-values). diff --git a/docs/src/main/paradox/user/lang/go/src/eventsourced.go b/docs/src/main/paradox/user/lang/go/src/eventsourced.go index 616b2a49b..401a67a8a 100644 --- a/docs/src/main/paradox/user/lang/go/src/eventsourced.go +++ b/docs/src/main/paradox/user/lang/go/src/eventsourced.go @@ -43,13 +43,13 @@ type EntityInitializer interface { const snapshotEveryDefault = 100 // EventSourcedEntity captures an Entity, its ServiceName and PersistenceID. -// It is used to be registered as an event sourced entity on a CloudState instance. +// It is used to be registered as an event sourced entity on a Cloudstate instance. //#event-sourced-entity-type type EventSourcedEntity struct { // Entity is a nil or Zero-Initialized reference // to the entity to be event sourced. It has to // implement the EntityInitializer interface - // so that CloudState can create new entity instances. + // so that Cloudstate can create new entity instances. Entity Entity // ServiceName is used to… // Setting it is optional. @@ -313,7 +313,7 @@ func (esh *EventSourcedHandler) handleEvent(entityId string, event *protocol.Eve return err } -// handleCommand handles a command received from the CloudState proxy. +// handleCommand handles a command received from the Cloudstate proxy. // // TODO: remove these following lines of comment // "Unary RPCs where the client sends a single request to the server and @@ -328,12 +328,12 @@ func (esh *EventSourcedHandler) handleEvent(entityId string, event *protocol.Eve // - a streamed flag, (TODO: for what?) // // together, these properties allow to call a method of the entities registered service and -// return its response as a reply to the CloudState proxy. +// return its response as a reply to the Cloudstate proxy. // // Events: // Beside calling the service method, we have to collect "events" the service might emit. // These events afterwards have to be handled by a EventHandler to update the state of the -// entity. The CloudState proxy can re-play these events at any time +// entity. The Cloudstate proxy can re-play these events at any time func (esh *EventSourcedHandler) handleCommand(cmd *protocol.Command, server protocol.EventSourced_HandleServer) error { // method to call method, err := esh.methodToCall(cmd) @@ -508,7 +508,7 @@ func (esh EventSourcedHandler) applyEvent(entityInstance *EntityInstance, event // // Event sourced entities persist events and snapshots, and these need to be // serialized when persisted. The most straight forward way to persist events -// and snapshots is to use protobufs. CloudState will automatically detect if +// and snapshots is to use protobufs. Cloudstate will automatically detect if // an emitted event is a protobuf, and serialize it as such. For other // serialization options, including JSON, see Serialization. func (EventSourcedHandler) handleEvents(entityInstance *EntityInstance, events ...*protocol.EventSourcedEvent) error { diff --git a/docs/src/main/paradox/user/lang/go/src/shoppingcart.go b/docs/src/main/paradox/user/lang/go/src/shoppingcart.go index b12ced57f..a8a64a907 100644 --- a/docs/src/main/paradox/user/lang/go/src/shoppingcart.go +++ b/docs/src/main/paradox/user/lang/go/src/shoppingcart.go @@ -27,7 +27,7 @@ import ( "log" ) -// main creates a CloudState instance and registers the ShoppingCart +// main creates a Cloudstate instance and registers the ShoppingCart // as a event sourced entity. //#shopping-cart-main func main() { @@ -47,16 +47,16 @@ func main() { ) //#register if err != nil { - log.Fatalf("CloudState failed to register entity: %v", err) + log.Fatalf("Cloudstate failed to register entity: %v", err) } err = cloudState.Run() if err != nil { - log.Fatalf("CloudState failed to run: %v", err) + log.Fatalf("Cloudstate failed to run: %v", err) } } //#shopping-cart-main -// A CloudState event sourced entity. +// A Cloudstate event sourced entity. //#entity-type //#compose-entity type ShoppingCart struct { From fd409c0797e4ea976c99ee02ee41c6508e45758e Mon Sep 17 00:00:00 2001 From: Marcel Lanz Date: Thu, 24 Oct 2019 12:09:19 +0200 Subject: [PATCH 6/7] [PR-review] fixes and issues addressed from the review feedback as well as API changes reflected from the latest updates to the go-support repo. --- docs/src/main/paradox/user/lang/go/api.md | 2 +- .../main/paradox/user/lang/go/eventsourced.md | 49 ++- .../paradox/user/lang/go/gettingstarted.md | 16 +- docs/src/main/paradox/user/lang/go/index.md | 2 +- .../paradox/user/lang/go/serialization.md | 3 +- .../main/paradox/user/lang/go/src/event.go | 17 +- .../paradox/user/lang/go/src/eventsourced.go | 360 ++++++------------ .../paradox/user/lang/go/src/shoppingcart.go | 75 ++-- 8 files changed, 227 insertions(+), 297 deletions(-) diff --git a/docs/src/main/paradox/user/lang/go/api.md b/docs/src/main/paradox/user/lang/go/api.md index 0022a9cea..b0a8051a6 100644 --- a/docs/src/main/paradox/user/lang/go/api.md +++ b/docs/src/main/paradox/user/lang/go/api.md @@ -1,3 +1,3 @@ # Go API docs -The Go API docs can be found [here](https://godoc.org/github.com/cloudstateio/go-support). \ No newline at end of file +The Go API docs can be found [here](https://godoc.org/github.com/cloudstateio/go-support/cloudstate). \ No newline at end of file diff --git a/docs/src/main/paradox/user/lang/go/eventsourced.md b/docs/src/main/paradox/user/lang/go/eventsourced.md index fc0454115..9a237371c 100644 --- a/docs/src/main/paradox/user/lang/go/eventsourced.md +++ b/docs/src/main/paradox/user/lang/go/eventsourced.md @@ -2,18 +2,22 @@ This page documents how to implement Cloudstate event sourced entities in Go. For information on what Cloudstate event sourced entities are, please read the general @ref[Event sourcing](../../features/eventsourced.md) documentation first. -An event sourced entity can be created by embedding the `cloudstate.EventEmitter` type and also implementing the `cloudstate.EntityInitializer` interface. +An event sourced entity can be created by embedding the `cloudstate.EventEmitter` type and also implementing the `cloudstate.Entity` interface. @@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/shoppingcart.go) { #entity-type } -Then by composing the Cloudstate entity with an `cloudstate.EventSourcedEntity` and register it with `cloudState.Register()`, your entity gets configured to be an event sourced entity and handled by the Cloudstate instance for now on. +Then by composing the Cloudstate entity with an `cloudstate.EventSourcedEntity` and register it with `CloudState.Register()`, your entity gets configured to be an event sourced entity and handled by the Cloudstate instance for now on. @@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/eventsourced.go) { #event-sourced-entity-type } -The `PersistenceID` is used to namespace events in the journal, useful for when you share the same database between multiple entities. It defaults to the simple name for the entity type (in this case, `ShoppingCart`), it's good practice to select one explicitly, this means your database isn't depend on type names in your code. +The `ServiceName` is the fully qualified name of the gRPC service that implements this entities interface. Setting it is mandatory. + +The `PersistenceID` is used to namespace events in the journal, useful for when you share the same database between multiple entities. It is recommended to be the name for the entity type (in this case, `ShoppingCart`) and is set to be mandatory. The `SnapshotEvery` parameter controls how often snapshots are taken, so that the entity doesn't need to be recovered from the whole journal each time it's loaded. If left unset, it defaults to 100. Setting it to a negative number will result in snapshots never being taken. +The `EntityFunc` is a factory method which generates a new Entity whenever Cloudstate has to initialize a new entity. + ## Persistence types and serialization Event sourced entities persist events and snapshots, and these need to be serialized when persisted. The most straight forward way to persist events and snapshots is to use protobufs. Cloudstate will automatically detect if an emitted event is a protobuf, and serialize it as such. For other serialization options, including JSON, see @ref:[Serialization](serialization.md). @@ -32,15 +36,21 @@ Each entity should store its state locally in a mutable variable, either a mutab ## Constructing -The Cloudstate Go Support Library needs to know how to construct and initialize entities. For this, an entity has to implement the `cloudstate.EntityInitializer` interface. +The Cloudstate Go Support Library needs to know how to construct and initialize entities. For this, an entity has to provide a factory function, `EntityFunc`, which is set during registration of the event sourced entity. + +@@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/shoppingcart.go) { #register } -(TODO: provide: The constructor below shows having the entity id injected) +The entity factory function returns a `cloudstate.Entity` which is composed of two interfaces to handle commands and events. @@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/shoppingcart.go) { #constructing } ## Handling commands -Command handlers are declared by implementing the gRPC ShoppingCartServer interface which is generated from the protobuf definitions. The Cloudstate Go Support library together with the registered ServiceName in the `cloudstate.EventSourcedEntity` is then able to dispatch commands it gets from the Cloudstate proxy. +An event sourced entity implements the composed `cloudstate.Entity` interface. `cloudstate.Entity` embeds the `cloudstate.EventHandler` interface and therefore entities implementing it get commands from Cloudstate through the event handlers `HandleCommand` method. + +The command types received by an event sourced entity are declared by the gRPC Server interface which is generated from the protobuf definitions. The Cloudstate Go Support library together with the registered `cloudstate.EventSourcedEntity` is then able to dispatch commands it gets from the Cloudstate proxy to the event sourced entity. + +@@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/shoppingcart.go) { #handle-command } The return type of the command handler is by definition of the service interface, the output type for the gRPC service call, this will be sent as the reply. @@ -68,26 +78,41 @@ This command handler also validates the command, ensuring the quantity items add Event handlers are invoked at two points, when restoring entities from the journal, before any commands are handled, and each time a new event is emitted. An event handlers responsibility is to update the state of the entity according to the event. Event handlers are the only place where its safe to mutate the state of the entity at all. -Event handlers are declared by either implementing the `cloudstate.EventHandler` interface +Event handlers are declared by implementing the `cloudstate.EventHandler` interface. @@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/event.go) { #event-handler } -or implementing an unary method that matches the type of the event to be handled. Event handlers are differentiated by the type of event they handle. By default, the type of event an event handler handles will be determined by looking for a single argument that the event handler takes. If for any reason this needs to be overridden, or if the event handler method doesn't exists at all, the event is handed over to the `cloudstate.EventHandler` `Handle` method when the entity implements that interface. The by implementing the `HandleEvent(event interface{}) (handled bool, err error)` method, a event handler indicates if he handled the event or if any occurred, returns an error. The returned error has precedent and the handled flag would not be considered. +Emitted events by command handlers get dispatched to the implemented event handler which then decides how to proceed with the event. + +@@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/shoppingcart.go) { #handle-event } -Here's an example event handler for the `ItemAdded` event. +Here's an example of a concrete event handler for the `ItemAdded` event. @@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/shoppingcart.go) { #item-added } ## Producing and handling snapshots -## Multiple behaviors +Snapshots are an important optimisation for event sourced entities that may contain many events, to ensure that they can be loaded quickly even when they have very long journals. To produce a snapshot, the `cloudstate.Snapshotter` interface has to be implemented that must return a snapshot of the current state in serializable form. + +@@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/event.go) { #snapshotter } + +Here is an example of the TCK shopping cart example creating snapshots for the current `domain.Cart` state of the shopping cart. + +@@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/shoppingcart.go) { #snapshotter } + +When the entity is loaded again, the snapshot will first be loaded before any other events are received, and passed to a snapshot handler. Snapshot handlers are declared by implementing the `cloudstate.SnapshotHandler` interface. + +@@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/event.go) { #snapshot-handler } + +A snapshot handler then can type-switch over types the corresponding `cloudstate.Snapshotter` interface has implemented. -Multiple behaviors are not supported yet by the Go support library. +@@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/shoppingcart.go) { #handle-snapshot } ## Registering the entity Once you've created your entity, you can register it with the `cloudstate.Cloudstate` server, by invoking the `Register` method of an Cloudstate instance. In addition to passing your entity type and service name, you also need to pass any descriptors that you use for persisting events, for example, the `domain.proto` descriptor. -During registration the oprtional ServiceName and the ServiceVersion can be configured as Options. +During registration the optional ServiceName and the ServiceVersion can be configured. +(TODO: give an example on how to pick values for these after the spec defines semantics ) @@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/shoppingcart.go) { #register } \ No newline at end of file diff --git a/docs/src/main/paradox/user/lang/go/gettingstarted.md b/docs/src/main/paradox/user/lang/go/gettingstarted.md index 231b67fad..4dd298f47 100644 --- a/docs/src/main/paradox/user/lang/go/gettingstarted.md +++ b/docs/src/main/paradox/user/lang/go/gettingstarted.md @@ -14,21 +14,21 @@ protoc docker : Cloudstate runs in Kubernetes with [Docker](https://www.docker.com), hence you will need Docker to build a container that you can deploy to Kubernetes. Most popular build tools have plugins that assist in building Docker images. -In addition to the above, you will need to install the Cloudstate Go support library by issuing `go get -u github.com/cloudstateio/go-support` or with Go module support let the dependency be downloaded by `go [build|run|test]`. +In addition to the above, you will need to install the Cloudstate Go support library by issuing `go get -u github.com/cloudstateio/go-support/cloudstate` or with Go module support let the dependency be downloaded by `go [build|run|test]`. By using the Go module support your go.mod file will reference the latest version of the support library or you can define which version you like to use. go get : @@@vars ```text -go get -u github.com/cloudstateio/go-support +go get -u github.com/cloudstateio/go-support/cloudstate ``` @@@ import path : @@@vars ```text -import "github.com/cloudstateio/go-support" +import "github.com/cloudstateio/go-support/cloudstate" ``` @@@ @@ -57,11 +57,9 @@ option go_package = "example/shoppingcart"; Now if you place your protobuf files under protobuf/ and run `protoc --go_out=. --proto_path=protobuf ./protobuf/*.proto`, you'll find your generated protobuf files in `example/shoppingcart`. -## Creating and starting a server - ## Creating a main package -Your main package will be responsible for creating the Cloudstate gRPC server, registering the entities for it to serve, and starting it. To do this, you can use the Cloudstate server type, for example: +Your main package will be responsible for creating the Cloudstate gRPC server, registering the entities for it to serve, and starting it. To do this, you can use the CloudState server type, for example: @@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/shoppingcart.go) { #shopping-cart-main } @@ -73,6 +71,8 @@ Cloudstate entities in Go work by implementing interfaces and composing types. To get support for the Cloudstate event emission the Cloudstate entity should embed the `cloudstate.EventEmitter` type. The EventEmitter allows the entity to emit events during the handling of commands. -Second, by implementing the `cloudstate.EntityInitializer` interface with its `New()` method, a Cloudstate instance gets to know how to create and initialize an event sourced entity. +Second, during registration of the entity, an entity factory function has to be provided so Cloudstate gets to know how to create and initialize an event sourced entity. + +@@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/eventsourced.go) { #event-sourced-entity-func } -@@snip [shoppingcart.go](/docs/src/main/paradox/user/lang/go/src/shoppingcart.go) { #compose-entity } +This entity factory function returns an `cloudstate.Entity` which itself is a composite interface of a `cloudstate.CommandHandler` and a `cloudstate.EventHandler`. Every event sourced entity has to implement these two interfaces. diff --git a/docs/src/main/paradox/user/lang/go/index.md b/docs/src/main/paradox/user/lang/go/index.md index d2c578ed3..07cb416f3 100644 --- a/docs/src/main/paradox/user/lang/go/index.md +++ b/docs/src/main/paradox/user/lang/go/index.md @@ -1,6 +1,6 @@ # Go -Cloudstate offers an idiomatic, annotation based Go support library for writing stateful services. +Cloudstate offers an idiomatic Go support library for writing stateful services. @@toc { depth=1 } diff --git a/docs/src/main/paradox/user/lang/go/serialization.md b/docs/src/main/paradox/user/lang/go/serialization.md index 1d3abb586..78118c5dc 100644 --- a/docs/src/main/paradox/user/lang/go/serialization.md +++ b/docs/src/main/paradox/user/lang/go/serialization.md @@ -33,4 +33,5 @@ Cloudstate uses the standard library package [`encoding/json`](https://golang.or The details of how these are serialized can be found @ref[here](../../../developer/language-support/serialization.md#json-values). -Note that if you are using JSON values in CRDT sets or maps, the serialization of these values **must** be stable. This means you must not use maps or sets in your value, and you should define an explicit ordering for the fields in your objects. **(TODO: mention the ordering of fields here by the Go standard library implementation).** +Note that if you are using JSON values in CRDT sets or maps, the serialization of these values **must** be stable. This means you must not use maps or sets in your value, and you should define an explicit ordering for the fields in your objects. +**(TODO: mention the ordering of fields here by the Go standard library implementation).** diff --git a/docs/src/main/paradox/user/lang/go/src/event.go b/docs/src/main/paradox/user/lang/go/src/event.go index af7bf745f..efc486ac4 100644 --- a/docs/src/main/paradox/user/lang/go/src/event.go +++ b/docs/src/main/paradox/user/lang/go/src/event.go @@ -15,7 +15,10 @@ package cloudstate -import "fmt" +import ( + "context" + "fmt" +) type OnNext func(event interface{}) error type OnErr func(err error) @@ -89,10 +92,20 @@ type EventHandler interface { } //#event-handler +//#command-handler +type CommandHandler interface { + HandleCommand(ctx context.Context, command interface{}) (handled bool, reply interface{}, err error) +} +//#command-handler + +//#snapshotter type Snapshotter interface { Snapshot() (snapshot interface{}, err error) } +//#snapshotter +//#snapshot-handler type SnapshotHandler interface { HandleSnapshot(snapshot interface{}) (handled bool, err error) -} \ No newline at end of file +} +//#snapshot-handler diff --git a/docs/src/main/paradox/user/lang/go/src/eventsourced.go b/docs/src/main/paradox/user/lang/go/src/eventsourced.go index 401a67a8a..51b65aa20 100644 --- a/docs/src/main/paradox/user/lang/go/src/eventsourced.go +++ b/docs/src/main/paradox/user/lang/go/src/eventsourced.go @@ -17,49 +17,40 @@ package cloudstate import ( "context" - "errors" "fmt" - "github.com/cloudstateio/go-support/cloudstate/encoding" - "github.com/cloudstateio/go-support/cloudstate/protocol" - "github.com/golang/protobuf/proto" - "github.com/golang/protobuf/ptypes/any" "io" "net/url" "reflect" "strings" "sync" + + "github.com/cloudstateio/go-support/cloudstate/encoding" + "github.com/cloudstateio/go-support/cloudstate/protocol" + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes/any" ) // Entity type Entity interface { - EntityInitializer -} - -// An EntityInitializer knows how to initialize an Entity -type EntityInitializer interface { - New() interface{} + CommandHandler + EventHandler } const snapshotEveryDefault = 100 // EventSourcedEntity captures an Entity, its ServiceName and PersistenceID. -// It is used to be registered as an event sourced entity on a Cloudstate instance. +// It is used to be registered as an event sourced entity on a CloudState instance. //#event-sourced-entity-type type EventSourcedEntity struct { - // Entity is a nil or Zero-Initialized reference - // to the entity to be event sourced. It has to - // implement the EntityInitializer interface - // so that Cloudstate can create new entity instances. - Entity Entity - // ServiceName is used to… - // Setting it is optional. + // ServiceName is the fully qualified name of the gRPC service that implements this entities interface. + // Setting it is mandatory. ServiceName string // PersistenceID is used to namespace events in the journal, useful for // when you share the same database between multiple entities. It defaults to // the simple name for the entity type. // It’s good practice to select one explicitly, this means your database // isn’t depend on type names in your code. - // Setting it is optional. + // Setting it is mandatory. PersistenceID string // The snapshotEvery parameter controls how often snapshots are taken, // so that the entity doesn't need to be recovered from the whole journal @@ -67,29 +58,17 @@ type EventSourcedEntity struct { // Setting it to a negative number will result in snapshots never being taken. SnapshotEvery int64 - // internal - entityName string - registerOnce sync.Once +//#event-sourced-entity-func + // EntityFactory is a factory method which generates a new Entity. + EntityFunc func() Entity +//#event-sourced-entity-func } //#event-sourced-entity-type -// initZeroValue get its Entity type and Zero-Value it to +// init get its Entity type and Zero-Value it to // something we can use as an initializer. -func (e *EventSourcedEntity) initZeroValue() error { - if reflect.ValueOf(e.Entity).IsNil() { - t := reflect.TypeOf(e.Entity) - if t.Kind() == reflect.Ptr { // TODO: how deep can that go? - t = t.Elem() - } - value := reflect.New(t).Interface() - if ei, ok := value.(EntityInitializer); ok { - e.Entity = ei - } else { - return errors.New("the Entity does not implement EntityInitializer") - } - e.entityName = t.Name() - e.SnapshotEvery = snapshotEveryDefault - } +func (e *EventSourcedEntity) init() error { + e.SnapshotEvery = snapshotEveryDefault return nil } @@ -127,33 +106,31 @@ func (c EntityInstanceContext) ServiceName() string { return c.EntityInstance.EventSourcedEntity.ServiceName } -// EventSourcedHandler is the implementation of the EventSourcedHandler server API for EventSourced service. -type EventSourcedHandler struct { +// EventSourcedServer is the implementation of the EventSourcedServer server API for EventSourced service. +type EventSourcedServer struct { // entities are indexed by their service name entities map[string]*EventSourcedEntity // contexts are entity instance contexts indexed by their entity ids contexts map[string]*EntityInstanceContext - // cmdMethodCache is the command handler method cache - cmdMethodCache map[string]reflect.Method } -// NewEventSourcedHandler returns an initialized EventSourcedHandler -func NewEventSourcedHandler() *EventSourcedHandler { - return &EventSourcedHandler{ - entities: make(map[string]*EventSourcedEntity), - contexts: make(map[string]*EntityInstanceContext), - cmdMethodCache: make(map[string]reflect.Method), +// newEventSourcedServer returns an initialized EventSourcedServer +func newEventSourcedServer() *EventSourcedServer { + return &EventSourcedServer{ + entities: make(map[string]*EventSourcedEntity), + contexts: make(map[string]*EntityInstanceContext), } } -func (esh *EventSourcedHandler) registerEntity(ese *EventSourcedEntity) error { +func (esh *EventSourcedServer) registerEntity(ese *EventSourcedEntity) error { + if _, exists := esh.entities[ese.ServiceName]; exists { + return fmt.Errorf("EventSourcedEntity with service name: %s is already registered", ese.ServiceName) + } esh.entities[ese.ServiceName] = ese return nil } -// Handle -// from EventSourcedServer.Handle -// The stream. One stream will be established per active entity. +// Handle handles the stream. One stream will be established per active entity. // Once established, the first message sent will be Init, which contains the entity ID, and, // if the entity has previously persisted a snapshot, it will contain that snapshot. It will // then send zero to many event messages, one for each event previously persisted. The entity @@ -163,14 +140,14 @@ func (esh *EventSourcedHandler) registerEntity(ese *EventSourcedEntity) error { // message. The entity should reply in order, and any events that the entity requests to be // persisted the entity should handle itself, applying them to its own state, as if they had // arrived as events when the event stream was being replayed on load. -func (esh *EventSourcedHandler) Handle(server protocol.EventSourced_HandleServer) error { +func (esh *EventSourcedServer) Handle(stream protocol.EventSourced_HandleServer) error { var entityId string var failed error for { if failed != nil { return failed } - msg, recvErr := server.Recv() + msg, recvErr := stream.Recv() if recvErr == io.EOF { return nil } @@ -178,22 +155,22 @@ func (esh *EventSourcedHandler) Handle(server protocol.EventSourced_HandleServer return recvErr } if cmd := msg.GetCommand(); cmd != nil { - if err := esh.handleCommand(cmd, server); err != nil { + if err := esh.handleCommand(cmd, stream); err != nil { // TODO: in general, what happens with the stream here if an error happens? - failed = handleFailure(err, server, cmd.GetId()) + failed = handleFailure(err, stream, cmd.GetId()) } continue } if event := msg.GetEvent(); event != nil { // TODO spec: Why does command carry the entityId and an event not? if err := esh.handleEvent(entityId, event); err != nil { - failed = handleFailure(err, server, 0) + failed = handleFailure(err, stream, 0) } continue } if init := msg.GetInit(); init != nil { - if err := esh.handleInit(init, server); err != nil { - failed = handleFailure(err, server, 0) + if err := esh.handleInit(init); err != nil { + failed = handleFailure(err, stream, 0) } entityId = init.GetEntityId() continue @@ -201,23 +178,18 @@ func (esh *EventSourcedHandler) Handle(server protocol.EventSourced_HandleServer } } -func (esh *EventSourcedHandler) handleInit(init *protocol.EventSourcedInit, server protocol.EventSourced_HandleServer) error { +func (esh *EventSourcedServer) handleInit(init *protocol.EventSourcedInit) error { eid := init.GetEntityId() if _, present := esh.contexts[eid]; present { return NewFailureError("unable to server.Send") } entity := esh.entities[init.GetServiceName()] - if initializer, ok := entity.Entity.(EntityInitializer); ok { - instance := initializer.New() - esh.contexts[eid] = &EntityInstanceContext{ - EntityInstance: &EntityInstance{ - Instance: instance, - EventSourcedEntity: entity, - }, - active: true, - } - } else { - return fmt.Errorf("unable to handle init entity.Entity does not implement EntityInitializer") + esh.contexts[eid] = &EntityInstanceContext{ + EntityInstance: &EntityInstance{ + Instance: entity.EntityFunc(), + EventSourcedEntity: entity, + }, + active: true, } if err := esh.handleInitSnapshot(init); err != nil { @@ -227,7 +199,7 @@ func (esh *EventSourcedHandler) handleInit(init *protocol.EventSourcedInit, serv return nil } -func (esh *EventSourcedHandler) handleInitSnapshot(init *protocol.EventSourcedInit) error { +func (esh *EventSourcedServer) handleInitSnapshot(init *protocol.EventSourcedInit) error { if init.Snapshot == nil { return nil } @@ -249,7 +221,7 @@ func (esh *EventSourcedHandler) handleInitSnapshot(init *protocol.EventSourcedIn return nil } -func (EventSourcedHandler) unmarshalSnapshot(init *protocol.EventSourcedInit) (interface{}, error) { +func (EventSourcedServer) unmarshalSnapshot(init *protocol.EventSourcedInit) (interface{}, error) { // see: https://developers.google.com/protocol-buffers/docs/reference/csharp/class/google/protobuf/well-known-types/any#typeurl typeUrl := init.Snapshot.Snapshot.GetTypeUrl() if !strings.Contains(typeUrl, "://") { @@ -282,7 +254,7 @@ func (EventSourcedHandler) unmarshalSnapshot(init *protocol.EventSourcedInit) (i return nil, fmt.Errorf("unmarshalling snapshot failed with: no snapshot unmarshaller found for: %v", typeURL.String()) } -func (esh *EventSourcedHandler) subscribeEvents(instance *EntityInstance) { +func (esh *EventSourcedServer) subscribeEvents(instance *EntityInstance) { if emitter, ok := instance.Instance.(EventEmitter); ok { emitter.Subscribe(&Subscription{ OnNext: func(event interface{}) error { @@ -298,7 +270,7 @@ func (esh *EventSourcedHandler) subscribeEvents(instance *EntityInstance) { } } -func (esh *EventSourcedHandler) handleEvent(entityId string, event *protocol.EventSourcedEvent) error { +func (esh *EventSourcedServer) handleEvent(entityId string, event *protocol.EventSourcedEvent) error { if entityId == "" { return NewFailureError("no entityId was found from a previous init message for event sequence: %v", event.Sequence) } @@ -334,139 +306,76 @@ func (esh *EventSourcedHandler) handleEvent(entityId string, event *protocol.Eve // Beside calling the service method, we have to collect "events" the service might emit. // These events afterwards have to be handled by a EventHandler to update the state of the // entity. The Cloudstate proxy can re-play these events at any time -func (esh *EventSourcedHandler) handleCommand(cmd *protocol.Command, server protocol.EventSourced_HandleServer) error { - // method to call - method, err := esh.methodToCall(cmd) - if err != nil { - return NewProtocolFailure(protocol.Failure{ - CommandId: cmd.GetId(), - Description: err.Error(), - }) - } - entityContext := esh.contexts[cmd.GetEntityId()] - // build the input arguments for the method we're about to call - inputs, err := esh.buildInputs(entityContext, method, cmd, server.Context()) - if err != nil { - return NewProtocolFailure(protocol.Failure{ - CommandId: cmd.GetId(), - Description: err.Error(), - }) - } - // call it - called := method.Func.Call(inputs) - // The gRPC implementation returns the rpc return method - // and an error as a second return value. - errReturned := called[1] - if errReturned.CanInterface() && errReturned.Interface() != nil && errReturned.Type().Name() == "error" { - // TCK says: TODO Expects entity.Failure, but gets lientAction.Action.Failure(Failure(commandId, msg))) - return NewProtocolFailure(protocol.Failure{ - CommandId: cmd.GetId(), - Description: errReturned.Interface().(error).Error(), - }) - } - // the reply - callReply, err := marshalAny(called[0].Interface()) - if err != nil { // this should never happen - return NewProtocolFailure(protocol.Failure{ - CommandId: cmd.GetId(), - Description: fmt.Errorf("called return value at index 0 is no proto.Message. %w", err).Error(), - }) - } - // emitted events - events, err := marshalEventsAny(entityContext) - if err != nil { - return NewProtocolFailure(protocol.Failure{ - CommandId: cmd.GetId(), - Description: err.Error(), - }) +func (esh *EventSourcedServer) handleCommand(cmd *protocol.Command, server protocol.EventSourced_HandleServer) error { + msgName := strings.TrimPrefix(cmd.Payload.GetTypeUrl(), protoAnyBase+"/") + messageType := proto.MessageType(msgName) + if messageType.Kind() != reflect.Ptr { + return fmt.Errorf("messageType: %s is of non Ptr kind", messageType) } - // snapshot - snapshot, err := esh.handleSnapshots(entityContext) - if err != nil { - return NewProtocolFailure(protocol.Failure{ - CommandId: cmd.GetId(), - Description: err.Error(), - }) - } - return sendEventSourcedReply(&protocol.EventSourcedReply{ - CommandId: cmd.GetId(), - ClientAction: &protocol.ClientAction{ - Action: &protocol.ClientAction_Reply{ - Reply: &protocol.Reply{ - Payload: callReply, - }, - }, - }, - Events: events, - Snapshot: snapshot, - }, server) -} - -func (*EventSourcedHandler) buildInputs(entityContext *EntityInstanceContext, method reflect.Method, cmd *protocol.Command, ctx context.Context) ([]reflect.Value, error) { - inputs := make([]reflect.Value, method.Type.NumIn()) - inputs[0] = reflect.ValueOf(entityContext.EntityInstance.Instance) - inputs[1] = reflect.ValueOf(ctx) - // create a zero-value for the type of the message we call the method with - arg1 := method.Type.In(2) - ptr := false - for arg1.Kind() == reflect.Ptr { - ptr = true - arg1 = arg1.Elem() - } - var msg proto.Message - if ptr { - msg = reflect.New(arg1).Interface().(proto.Message) - } else { - msg = reflect.Zero(arg1).Interface().(proto.Message) - } - if err := proto.Unmarshal(cmd.GetPayload().GetValue(), msg); err != nil { - return nil, fmt.Errorf("failed to unmarshal: %w", err) - } - inputs[2] = reflect.ValueOf(msg) - return inputs, nil -} - -func (esh *EventSourcedHandler) methodToCall(cmd *protocol.Command) (reflect.Method, error) { - entityContext := esh.contexts[cmd.GetEntityId()] - cacheKey := entityContext.ServiceName() + cmd.Name - method, hit := esh.cmdMethodCache[cacheKey] - // as measured this cache saves us about 75% of a call - // to be prepared with 4.4µs vs. 17.6µs where a typical - // call by reflection like GetCart() with Func.Call() - // takes ~10µs and to get return values processed somewhere 0.7µs. - if !hit { - entityValue := reflect.ValueOf(entityContext.EntityInstance.Instance) - // entities implement the proxied grpc service - // we try to find the method we're called by name with the - // received command. - methodByName := entityValue.MethodByName(cmd.Name) - if !methodByName.IsValid() { - entity := esh.entities[entityContext.ServiceName()] - return reflect.Method{}, fmt.Errorf("no method named: %s found for: %v", cmd.Name, entity) - } - // gRPC services are unary rpc methods, always. - // They have one message in and one message out. - if err := checkUnary(methodByName); err != nil { - return reflect.Method{}, err - } - // The first argument in the gRPC implementation - // is always a context.Context. - methodArg0Type := methodByName.Type().In(0) - contextType := reflect.TypeOf(context.Background()) - if !contextType.Implements(methodArg0Type) { - return reflect.Method{}, fmt.Errorf( - "first argument for method: %s is not of type: %s", - methodByName.String(), contextType.Name(), - ) + // get a zero-ed message of this type + if message, ok := reflect.New(messageType.Elem()).Interface().(proto.Message); ok { + // and marshal onto it what we got as an any.Any onto it + err := proto.Unmarshal(cmd.Payload.Value, message) + if err != nil { + return fmt.Errorf("%s, %w", err, ErrMarshal) + } else { + // we're ready to handle the proto message + entityContext := esh.contexts[cmd.GetEntityId()] + if commandHandler, ok := entityContext.EntityInstance.Instance.(CommandHandler); ok { + // The gRPC implementation returns the rpc return method + // and an error as a second return value. + _, reply, errReturned := commandHandler.HandleCommand(server.Context(), message) + // the error + if errReturned != nil { + // TCK says: TODO Expects entity.Failure, but gets lientAction.Action.Failure(Failure(commandId, msg))) + return NewProtocolFailure(protocol.Failure{ + CommandId: cmd.GetId(), + Description: errReturned.Error(), + }) + } + // the reply + callReply, err := marshalAny(reply) + if err != nil { // this should never happen + return NewProtocolFailure(protocol.Failure{ + CommandId: cmd.GetId(), + Description: fmt.Errorf("called return value at index 0 is no proto.Message. %w", err).Error(), + }) + } + // emitted events + events, err := marshalEventsAny(entityContext) + if err != nil { + return NewProtocolFailure(protocol.Failure{ + CommandId: cmd.GetId(), + Description: err.Error(), + }) + } + // snapshot + snapshot, err := esh.handleSnapshots(entityContext) + if err != nil { + return NewProtocolFailure(protocol.Failure{ + CommandId: cmd.GetId(), + Description: err.Error(), + }) + } + return sendEventSourcedReply(&protocol.EventSourcedReply{ + CommandId: cmd.GetId(), + ClientAction: &protocol.ClientAction{ + Action: &protocol.ClientAction_Reply{ + Reply: &protocol.Reply{ + Payload: callReply, + }, + }, + }, + Events: events, + Snapshot: snapshot, + }, server) + } } - // we'll find one for sure as we found one on the entityValue - method, _ = reflect.TypeOf(entityContext.EntityInstance.Instance).MethodByName(cmd.Name) - esh.cmdMethodCache[cacheKey] = method } - return method, nil + return nil } -func (*EventSourcedHandler) handleSnapshots(entityContext *EntityInstanceContext) (*any.Any, error) { +func (*EventSourcedServer) handleSnapshots(entityContext *EntityInstanceContext) (*any.Any, error) { if !entityContext.EntityInstance.shouldSnapshot() { return nil, nil } @@ -488,15 +397,8 @@ func (*EventSourcedHandler) handleSnapshots(entityContext *EntityInstanceContext return nil, nil } -func checkUnary(methodByName reflect.Value) error { - if methodByName.Type().NumIn() != 2 { - return NewFailureError("method: %s is no unary method", methodByName.String()) - } - return nil -} - // applyEvent applies an event to a local entity -func (esh EventSourcedHandler) applyEvent(entityInstance *EntityInstance, event interface{}) error { +func (esh EventSourcedServer) applyEvent(entityInstance *EntityInstance, event interface{}) error { payload, err := marshalAny(event) if err != nil { return err @@ -504,14 +406,7 @@ func (esh EventSourcedHandler) applyEvent(entityInstance *EntityInstance, event return esh.handleEvents(entityInstance, &protocol.EventSourcedEvent{Payload: payload}) } -// handleEvents handles a list of events encoded as protobuf Any messages. -// -// Event sourced entities persist events and snapshots, and these need to be -// serialized when persisted. The most straight forward way to persist events -// and snapshots is to use protobufs. Cloudstate will automatically detect if -// an emitted event is a protobuf, and serialize it as such. For other -// serialization options, including JSON, see Serialization. -func (EventSourcedHandler) handleEvents(entityInstance *EntityInstance, events ...*protocol.EventSourcedEvent) error { +func (EventSourcedServer) handleEvents(entityInstance *EntityInstance, events ...*protocol.EventSourcedEvent) error { eventHandler, implementsEventHandler := entityInstance.Instance.(EventHandler) for _, event := range events { // TODO: here's the point where events can be protobufs, serialized as json or other formats @@ -528,35 +423,12 @@ func (EventSourcedHandler) handleEvents(entityInstance *EntityInstance, events . } else { // we're ready to handle the proto message // and we might have a handler - handled := false if implementsEventHandler { - handled, err = eventHandler.HandleEvent(message) + _, err = eventHandler.HandleEvent(context.Background(), message) // TODO: propagate ctx from callee if err != nil { return err // FIXME/TODO: is this correct? if we fail here, nothing is safe afterwards. } } - // if not, we try to find one - // currently we support a method that has one argument that equals - // to the type of the message received. - if !handled { - // find a concrete handling method - entityValue := reflect.ValueOf(entityInstance.Instance) - entityType := entityValue.Type() - for n := 0; n < entityType.NumMethod(); n++ { - method := entityType.Method(n) - // we expect one argument for now, the domain message - // the first argument is the receiver itself - if method.Func.Type().NumIn() == 2 { - argumentType := method.Func.Type().In(1) - if argumentType.AssignableTo(messageType) { - entityValue.MethodByName(method.Name).Call([]reflect.Value{reflect.ValueOf(message)}) - } - } else { - // we have not found a one-argument method matching the events type as an argument - // TODO: what to do here? we might support more variations of possible handlers we can detect - } - } - } } } } // TODO: what do we do if we haven't handled the events? diff --git a/docs/src/main/paradox/user/lang/go/src/shoppingcart.go b/docs/src/main/paradox/user/lang/go/src/shoppingcart.go index a8a64a907..5f3183cee 100644 --- a/docs/src/main/paradox/user/lang/go/src/shoppingcart.go +++ b/docs/src/main/paradox/user/lang/go/src/shoppingcart.go @@ -20,26 +20,28 @@ import ( "context" "errors" "fmt" + "log" + "github.com/cloudstateio/go-support/cloudstate" "github.com/cloudstateio/go-support/tck/shoppingcart" domain "github.com/cloudstateio/go-support/tck/shoppingcart/persistence" "github.com/golang/protobuf/ptypes/empty" - "log" ) -// main creates a Cloudstate instance and registers the ShoppingCart +// main creates a CloudState instance and registers the ShoppingCart // as a event sourced entity. //#shopping-cart-main func main() { - cloudState := cloudstate.New(cloudstate.Options{ + server, err := cloudstate.New(cloudstate.Options{ ServiceName: "shopping-cart", ServiceVersion: "0.1.0", }) //#register - err := cloudState.Register( + err := server.Register( &cloudstate.EventSourcedEntity{ - Entity: (*ShoppingCart)(nil), - ServiceName: "com.example.shoppingcart.ShoppingCart", + ServiceName: "com.example.shoppingcart.ShoppingCart", + PersistenceID: "ShoppingCart", + EntityFunc: NewShoppingCart, }, cloudstate.DescriptorConfig{ Service: "shoppingcart/shoppingcart.proto", @@ -47,11 +49,11 @@ func main() { ) //#register if err != nil { - log.Fatalf("Cloudstate failed to register entity: %v", err) + log.Fatalf("CloudState failed to register entity: %v", err) } - err = cloudState.Run() + err = server.Run() if err != nil { - log.Fatalf("Cloudstate failed to run: %v", err) + log.Fatalf("CloudState failed to run: %v", err) } } //#shopping-cart-main @@ -67,29 +69,22 @@ type ShoppingCart struct { // as an Emitter we can emit events cloudstate.EventEmitter } +//#compose-entity //#entity-type -// New implements EntityInitializer and returns a new -// and initialized instance of the ShoppingCart entity. +// NewShoppingCart returns a new and initialized instance of the ShoppingCart entity. //#constructing -func (sc ShoppingCart) New() interface{} { - return NewShoppingCart() -} -//#constructing - -// NewShoppingCart returns a new and initialized -// instance of the ShoppingCart entity. -func NewShoppingCart() *ShoppingCart { +func NewShoppingCart() cloudstate.Entity { return &ShoppingCart{ cart: make([]*domain.LineItem, 0), - EventEmitter: cloudstate.NewEmitter(), // TODO: the EventEmitter could be provided by the event sourced handler + EventEmitter: cloudstate.NewEmitter(), } } -//#compose-entity +//#constructing // ItemAdded is a event handler function for the ItemAdded event. //#item-added -func (sc *ShoppingCart) ItemAdded(added *domain.ItemAdded) error { // TODO: enable handling for values +func (sc *ShoppingCart) ItemAdded(added *domain.ItemAdded) error { if item, _ := sc.find(added.Item.ProductId); item != nil { item.Quantity += added.Item.Quantity } else { @@ -116,20 +111,22 @@ func (sc *ShoppingCart) ItemRemoved(removed *domain.ItemRemoved) error { // // returns handle set to true if we have handled the event // and any error that happened during the handling -func (sc *ShoppingCart) HandleEvent(event interface{}) (handled bool, err error) { +//#handle-event +func (sc *ShoppingCart) HandleEvent(_ context.Context, event interface{}) (handled bool, err error) { switch e := event.(type) { case *domain.ItemAdded: return true, sc.ItemAdded(e) - //case *domain.ItemRemoved: - // *domain.ItemRemoved is handled by reflection + case *domain.ItemRemoved: + return true, sc.ItemRemoved(e) default: return false, nil } } +//#handle-event // AddItem implements the AddItem command handling of the shopping cart service. //#add-item -func (sc *ShoppingCart) AddItem(c context.Context, li *shoppingcart.AddLineItem) (*empty.Empty, error) { +func (sc *ShoppingCart) AddItem(_ context.Context, li *shoppingcart.AddLineItem) (*empty.Empty, error) { if li.GetQuantity() <= 0 { return nil, fmt.Errorf("cannot add negative quantity of to item %s", li.GetProductId()) } @@ -144,7 +141,7 @@ func (sc *ShoppingCart) AddItem(c context.Context, li *shoppingcart.AddLineItem) //#add-item // RemoveItem implements the RemoveItem command handling of the shopping cart service. -func (sc *ShoppingCart) RemoveItem(c context.Context, li *shoppingcart.RemoveLineItem) (*empty.Empty, error) { +func (sc *ShoppingCart) RemoveItem(_ context.Context, li *shoppingcart.RemoveLineItem) (*empty.Empty, error) { if item, _ := sc.find(li.GetProductId()); item == nil { return nil, fmt.Errorf("cannot remove item %s because it is not in the cart", li.GetProductId()) } @@ -154,7 +151,7 @@ func (sc *ShoppingCart) RemoveItem(c context.Context, li *shoppingcart.RemoveLin // GetCart implements the GetCart command handling of the shopping cart service. //#get-cart -func (sc *ShoppingCart) GetCart(c context.Context, _ *shoppingcart.GetShoppingCart) (*shoppingcart.Cart, error) { +func (sc *ShoppingCart) GetCart(_ context.Context, _ *shoppingcart.GetShoppingCart) (*shoppingcart.Cart, error) { cart := &shoppingcart.Cart{} for _, item := range sc.cart { cart.Items = append(cart.Items, &shoppingcart.LineItem{ @@ -167,12 +164,33 @@ func (sc *ShoppingCart) GetCart(c context.Context, _ *shoppingcart.GetShoppingCa } //#get-cart +//#handle-command +func (sc *ShoppingCart) HandleCommand(ctx context.Context, command interface{}) (handled bool, reply interface{}, err error) { + switch cmd := command.(type) { + case *shoppingcart.GetShoppingCart: + reply, err := sc.GetCart(ctx, cmd) + return true, reply, err + case *shoppingcart.RemoveLineItem: + reply, err := sc.RemoveItem(ctx, cmd) + return true, reply, err + case *shoppingcart.AddLineItem: + reply, err := sc.AddItem(ctx, cmd) + return true, reply, err + default: + return false, reply, err + } +} +//#handle-command + +//#snapshotter func (sc *ShoppingCart) Snapshot() (snapshot interface{}, err error) { return domain.Cart{ Items: append(make([]*domain.LineItem, len(sc.cart)), sc.cart...), }, nil } +//#snapshotter +//#handle-snapshot func (sc *ShoppingCart) HandleSnapshot(snapshot interface{}) (handled bool, err error) { switch value := snapshot.(type) { case domain.Cart: @@ -182,6 +200,7 @@ func (sc *ShoppingCart) HandleSnapshot(snapshot interface{}) (handled bool, err return false, nil } } +//#handle-snapshot // find finds a product in the shopping cart by productId and returns it as a LineItem. func (sc *ShoppingCart) find(productId string) (item *domain.LineItem, index int) { From cf5eea5be527dfc652d4d11f4537f3b8c98c8946 Mon Sep 17 00:00:00 2001 From: Marcel Lanz Date: Thu, 24 Oct 2019 15:07:00 +0200 Subject: [PATCH 7/7] [PR-review] cross-PR review changes from go-support PR #10 --- docs/src/main/paradox/user/lang/go/src/shoppingcart.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/src/main/paradox/user/lang/go/src/shoppingcart.go b/docs/src/main/paradox/user/lang/go/src/shoppingcart.go index 5f3183cee..4db46c1bd 100644 --- a/docs/src/main/paradox/user/lang/go/src/shoppingcart.go +++ b/docs/src/main/paradox/user/lang/go/src/shoppingcart.go @@ -32,7 +32,7 @@ import ( // as a event sourced entity. //#shopping-cart-main func main() { - server, err := cloudstate.New(cloudstate.Options{ + server, err := cloudstate.New(cloudstate.Config{ ServiceName: "shopping-cart", ServiceVersion: "0.1.0", })