-
Notifications
You must be signed in to change notification settings - Fork 10
Go support for CloudState User Functions #5
Changes from all commits
52dd4dd
ea5217b
a69533c
cb84071
82e554f
83b5294
5365861
a84b2ee
a8fdc37
d234659
801f67c
14373da
accb0f3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| language: go | ||
| os: | ||
| - linux | ||
| - osx | ||
| - windows | ||
| go: | ||
| - 1.13.x | ||
| before_install: | ||
| - go get -t -v ./... | ||
| script: | ||
| - go test -v -race -coverprofile=coverage.txt -covermode=atomic -bench=. ./... | ||
|
marcellanz marked this conversation as resolved.
|
||
| after_success: | ||
| - if [ "$TRAVIS_OS_NAME" = "linux" ]; then bash <(curl -s https://codecov.io/bash); fi | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,2 +1,8 @@ | ||
| # go-support | ||
| GoLang support for [CloudState](https://github.com/cloudstateio/cloudstate) | ||
| # CloudState stateful service support in Go | ||
| [](https://travis-ci.com/marcellanz/go-support) | ||
| [](https://codecov.io/gh/marcellanz/go-support) | ||
| [](https://godoc.org/github.com/marcellanz/go-support) | ||
|
|
||
| This package provides support for writing [CloudState](https://github.com/cloudstateio/cloudstate) stateful functions in Go. | ||
|
|
||
| For more information see https://cloudstate.io. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| FROM golang:1.13.1-alpine3.10 | ||
|
|
||
| RUN apk --no-cache add git | ||
|
|
||
| WORKDIR /go/src/app | ||
| COPY . . | ||
|
|
||
| # -race and therefore CGO needs gcc, we don't want it to have in our build | ||
| RUN CGO_ENABLED=0 go build -v -o tck_shoppingcart ./tck/cmd/tck_shoppingcart | ||
| RUN go install -v ./... | ||
|
|
||
| # multistage – copy over the binary | ||
| FROM alpine:latest | ||
| RUN apk --no-cache add ca-certificates | ||
|
|
||
| WORKDIR /root/ | ||
| COPY --from=0 /go/bin/tck_shoppingcart . | ||
|
|
||
| EXPOSE 8080 | ||
| ENV HOST 0.0.0.0 | ||
| ENV PORT 8080 | ||
|
|
||
| CMD ["./tck_shoppingcart"] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| #!/usr/bin/env bash | ||
|
|
||
| set -o nounset | ||
| set -o errexit | ||
| set -o pipefail | ||
|
|
||
| docker build -t gcr.io/mrcllnz/cloudstate-go-tck:latest -f ./build/TCK.Dockerfile . | ||
| docker push gcr.io/mrcllnz/cloudstate-go-tck:latest |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| #!/usr/bin/env bash | ||
|
|
||
| set -o nounset | ||
| set -o errexit | ||
| set -o pipefail | ||
|
|
||
| # CloudState protocol | ||
| protoc --go_out=plugins=grpc,paths=source_relative:. --proto_path=protobuf/frontend/ protobuf/frontend/cloudstate/entity_key.proto | ||
| protoc --go_out=plugins=grpc:. --proto_path=protobuf/protocol/ protobuf/protocol/cloudstate/entity.proto | ||
| protoc --go_out=plugins=grpc:. --proto_path=protobuf/protocol/ protobuf/protocol/cloudstate/event_sourced.proto | ||
| protoc --go_out=plugins=grpc:. --proto_path=protobuf/protocol/ protobuf/protocol/cloudstate/function.proto | ||
|
|
||
| # TCK shopping cart sample | ||
| protoc --go_out=plugins=grpc:. --proto_path=protobuf/protocol/ --proto_path=protobuf/frontend/ --proto_path=protobuf/proxy/ --proto_path=protobuf/example/ protobuf/example/shoppingcart/shoppingcart.proto | ||
| protoc --go_out=plugins=grpc,paths=source_relative:tck/shoppingcart/persistence --proto_path=protobuf/protocol/ --proto_path=protobuf/frontend/ --proto_path=protobuf/proxy/ --proto_path=protobuf/example/shoppingcart/persistence/ protobuf/example/shoppingcart/persistence/domain.proto |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| #!/usr/bin/env bash | ||
|
|
||
| set -o nounset | ||
| set -o errexit | ||
| set -o pipefail | ||
|
|
||
| function fetch() { | ||
| local path=$1 | ||
| local tag=$2 | ||
| mkdir -p "protobuf/$(dirname $path)" | ||
| curl -o "protobuf/${path}" "https://raw.githubusercontent.com/cloudstateio/cloudstate/${tag}/protocols/${path}" | ||
| } | ||
|
|
||
| tag=$1 | ||
|
|
||
| # CloudState protocol | ||
| fetch "protocol/cloudstate/entity.proto" "${tag}" | ||
| fetch "protocol/cloudstate/event_sourced.proto" "${tag}" | ||
| fetch "protocol/cloudstate/function.proto" "${tag}" | ||
| fetch "protocol/cloudstate/crdt.proto" "${tag}" | ||
|
|
||
| # TCK shopping cart example | ||
| fetch "example/shoppingcart/shoppingcart.proto" "${tag}" | ||
| fetch "example/shoppingcart/persistence/domain.proto" "${tag}" | ||
|
|
||
| # CloudState frontend | ||
| fetch "frontend/cloudstate/entity_key.proto" "${tag}" | ||
|
|
||
| # dependencies | ||
| fetch "proxy/grpc/reflection/v1alpha/reflection.proto" "${tag}" | ||
| fetch "frontend/google/api/annotations.proto" "${tag}" | ||
| fetch "frontend/google/api/http.proto" "${tag}" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,245 @@ | ||
| // | ||
| // Copyright 2019 Lightbend Inc. | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| package cloudstate | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "log" | ||
| "net" | ||
| "os" | ||
| "runtime" | ||
|
|
||
| "github.com/cloudstateio/go-support/cloudstate/protocol" | ||
| "github.com/golang/protobuf/descriptor" | ||
| "github.com/golang/protobuf/proto" | ||
| filedescr "github.com/golang/protobuf/protoc-gen-go/descriptor" | ||
| "github.com/golang/protobuf/ptypes/empty" | ||
| "google.golang.org/grpc" | ||
| ) | ||
|
marcellanz marked this conversation as resolved.
|
||
|
|
||
| const ( | ||
| SupportLibraryVersion = "0.1.0" | ||
| SupportLibraryName = "cloudstate-go-support" | ||
| ) | ||
|
|
||
| // CloudState is an instance of a CloudState User Function | ||
| type CloudState struct { | ||
| server *grpc.Server | ||
| entityDiscoveryServer *EntityDiscoveryServer | ||
| eventSourcedServer *EventSourcedServer | ||
| } | ||
|
|
||
| // New returns a new CloudState instance. | ||
| func New(options Options) (*CloudState, error) { | ||
| cs := &CloudState{ | ||
| server: grpc.NewServer(), | ||
| entityDiscoveryServer: newEntityDiscoveryResponder(options), | ||
| eventSourcedServer: newEventSourcedServer(), | ||
| } | ||
| protocol.RegisterEntityDiscoveryServer(cs.server, cs.entityDiscoveryServer) | ||
| protocol.RegisterEventSourcedServer(cs.server, cs.eventSourcedServer) | ||
| return cs, nil | ||
| } | ||
|
|
||
| // Options go get a CloudState instance configured. | ||
| type Options struct { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's more typical to call this struct
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Agreed.
Not according to the protocol defined in cloudstate/entity.proto. I followed the requirements there. But even if some of them are optional and some mandatory, naming it Config makes sense, right?
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah interesting. I thought the service_name was required, at least. Since this is used by the proxy to forward commands/events to other services, right? |
||
| ServiceName string | ||
| ServiceVersion string | ||
| } | ||
|
|
||
| // DescriptorConfig configures service and dependent descriptors. | ||
| type DescriptorConfig struct { | ||
| Service string | ||
| ServiceMsg descriptor.Message | ||
| Domain []string | ||
| DomainMessages []descriptor.Message | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you comment these fields? It's not clear to me at first reading what they should be set to. In the example it only sets There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we replace this struct and implementation with invoking
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Good point and also one I iterated for some time. This type is used with For the Service, like in the Java Implementation, I would preferably use a type safe reference to a (Service)Descriptor. The Go gRPC implementation does not export them unfortunately. So to get the Service and any other dependent descriptors the user either provides their names or any of the messages used for the service or those dependent messages. For the shopping cart, this would be one of the messages used like The I repeat here how that would look like: Cart Do you see any other ways to configure a gRPC Service and their dependent descriptors @jsravn ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I found https://github.com/jhump/protoreflect, which seems to support retrieving the descriptors from the linked in pb.go files. Although I haven't tested it myself. It also has logic for loading directly from proto files, which we could use instead of implementing it ourselves. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have seen protoreflect and I had the impression I could not get the descriptor this way. It also seems quite a big dependency to depend on tho. |
||
|
|
||
| func (dc DescriptorConfig) AddDomainMessage(m descriptor.Message) DescriptorConfig { | ||
| dc.DomainMessages = append(dc.DomainMessages, m) | ||
| return dc | ||
| } | ||
|
|
||
| func (dc DescriptorConfig) AddDomainDescriptor(filename string) DescriptorConfig { | ||
| dc.Domain = append(dc.Domain, filename) | ||
| return dc | ||
| } | ||
|
|
||
| // RegisterEventSourcedEntity registers an event sourced entity for CloudState. | ||
| func (cs *CloudState) RegisterEventSourcedEntity(ese *EventSourcedEntity, config DescriptorConfig) (err error) { | ||
| ese.registerOnce.Do(func() { | ||
| if err = ese.initZeroValue(); err != nil { | ||
| return | ||
| } | ||
| if err = cs.eventSourcedServer.registerEntity(ese); err != nil { | ||
| return | ||
| } | ||
| if err = cs.entityDiscoveryServer.registerEntity(ese, config); err != nil { | ||
| return | ||
| } | ||
| }) | ||
| return | ||
| } | ||
|
|
||
| // Run runs the CloudState instance. | ||
| func (cs *CloudState) Run() error { | ||
| host, ok := os.LookupEnv("HOST") | ||
| if !ok { | ||
| return fmt.Errorf("unable to get environment variable \"HOST\"") | ||
| } | ||
| port, ok := os.LookupEnv("PORT") | ||
| if !ok { | ||
| return fmt.Errorf("unable to get environment variable \"PORT\"") | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Host and port should be set when creating the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was guided here by
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a good question. I raised this on gitter as well just now. I think a library should just have a single well defined API for applications to interact with. If we have an implicit API via something like environment variables, it can get quite confusing what's going on. I think only the application (whatever owns There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This might also be a golang distinction from Java? I think it's more typical to handle all your command line options/env inside main.go, versus loading these environment things elsewhere in the codebase.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Whereas the user function support library acts as a library or framework, I had the impression to be more on the side of a framework where the API surface is quite limited or reduced so to say and user functions can be implemented without too much ceremony. Having it mentioned in the chat, James or Viktor might pick up on this too as it was not yet discussed yet explicitly. As we have now these questions where it could be a distinction between Java and Go or any other implementation, it seems it raises perhaps a question that is not answered by the expectations CloudState would have off of a support library and should be defined by it. Otherwise, I totally agree with your point of having the owner of main.go the sole authority to decide how environment or any other configuration that lets bootstrap the binary. |
||
| lis, err := net.Listen("tcp", fmt.Sprintf("%s:%s", host, port)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. see comment above for line 111 |
||
| if err != nil { | ||
| return fmt.Errorf("failed to listen: %v", err) | ||
| } | ||
| if e := cs.server.Serve(lis); e != nil { | ||
| return fmt.Errorf("failed to grpcServer.Serve for: %v", lis) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // EntityDiscoveryServer implements the CloudState discovery protocol. | ||
| type EntityDiscoveryServer struct { | ||
| fileDescriptorSet *filedescr.FileDescriptorSet | ||
| entitySpec *protocol.EntitySpec | ||
| message *descriptor.Message | ||
| } | ||
|
|
||
| // newEntityDiscoveryResponder returns a new and initialized EntityDiscoveryServer. | ||
| func newEntityDiscoveryResponder(options Options) *EntityDiscoveryServer { | ||
| responder := &EntityDiscoveryServer{} | ||
| responder.entitySpec = &protocol.EntitySpec{ | ||
| Entities: make([]*protocol.Entity, 0), | ||
| ServiceInfo: &protocol.ServiceInfo{ | ||
| ServiceName: options.ServiceName, | ||
| ServiceVersion: options.ServiceVersion, | ||
| ServiceRuntime: fmt.Sprintf("%s %s/%s", runtime.Version(), runtime.GOOS, runtime.GOARCH), | ||
| SupportLibraryName: SupportLibraryName, | ||
| SupportLibraryVersion: SupportLibraryVersion, | ||
| }, | ||
| } | ||
| responder.fileDescriptorSet = &filedescr.FileDescriptorSet{ | ||
| File: make([]*filedescr.FileDescriptorProto, 0), | ||
| } | ||
| return responder | ||
| } | ||
|
|
||
| // Discover returns an entity spec for | ||
| func (r *EntityDiscoveryServer) Discover(c context.Context, pi *protocol.ProxyInfo) (*protocol.EntitySpec, error) { | ||
| log.Printf("Received discovery call from sidecar [%s w%s] supporting CloudState %v.%v\n", | ||
| pi.ProxyName, | ||
| pi.ProxyVersion, | ||
| pi.ProtocolMajorVersion, | ||
| pi.ProtocolMinorVersion, | ||
| ) | ||
| for _, filename := range []string{ | ||
| "google/protobuf/empty.proto", | ||
| "google/protobuf/any.proto", | ||
| "google/protobuf/descriptor.proto", | ||
| "google/api/annotations.proto", | ||
| "google/api/http.proto", | ||
| "cloudstate/event_sourced.proto", | ||
| "cloudstate/entity.proto", | ||
| "cloudstate/entity_key.proto", | ||
| } { | ||
| if err := r.registerFileDescriptorProto(filename); err != nil { | ||
| return nil, err | ||
| } | ||
|
marcellanz marked this conversation as resolved.
|
||
| } | ||
| log.Printf("Responding with: %v\n", r.entitySpec.GetServiceInfo()) | ||
| return r.entitySpec, nil | ||
| } | ||
|
|
||
| // ReportError logs any user function error reported by the CloudState proxy. | ||
| func (r *EntityDiscoveryServer) ReportError(c context.Context, fe *protocol.UserFunctionError) (*empty.Empty, error) { | ||
| log.Printf("ReportError: %v\n", fe) | ||
|
marcellanz marked this conversation as resolved.
|
||
| return &empty.Empty{}, nil | ||
| } | ||
|
|
||
| func (r *EntityDiscoveryServer) updateSpec() (err error) { | ||
| protoBytes, err := proto.Marshal(r.fileDescriptorSet) | ||
| if err != nil { | ||
| return errors.New("unable to Marshal FileDescriptorSet") | ||
| } | ||
| r.entitySpec.Proto = protoBytes | ||
| return nil | ||
| } | ||
|
|
||
| func (r *EntityDiscoveryServer) resolveFileDescriptors(dc DescriptorConfig) error { | ||
| // service | ||
| if dc.Service != "" { | ||
| if err := r.registerFileDescriptorProto(dc.Service); err != nil { | ||
| return err | ||
| } | ||
| } else { | ||
| if dc.ServiceMsg != nil { | ||
| if err := r.registerFileDescriptor(dc.ServiceMsg); err != nil { | ||
| return err | ||
| } | ||
| } | ||
| } | ||
| // and dependent domain descriptors | ||
| for _, dp := range dc.Domain { | ||
| if err := r.registerFileDescriptorProto(dp); err != nil { | ||
| return err | ||
| } | ||
| } | ||
| for _, dm := range dc.DomainMessages { | ||
| if err := r.registerFileDescriptor(dm); err != nil { | ||
| return err | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func (r *EntityDiscoveryServer) registerEntity(e *EventSourcedEntity, config DescriptorConfig) error { | ||
| if err := r.resolveFileDescriptors(config); err != nil { | ||
| return fmt.Errorf("failed to resolveFileDescriptor for DescriptorConfig: %+v: %w", config, err) | ||
| } | ||
| persistenceID := e.entityName | ||
|
marcellanz marked this conversation as resolved.
|
||
| if e.PersistenceID != "" { | ||
| persistenceID = e.PersistenceID | ||
| } | ||
| r.entitySpec.Entities = append(r.entitySpec.Entities, &protocol.Entity{ | ||
| EntityType: EventSourced, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any way to get the value of
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can't. Like for the other comments about a type safe way to get the service descriptor, we can't (see below the example for _EventSourced_serviceDesc). I explained this also in the then Draft - PR: cloudstateio/cloudstate#103 (comment) on how to access this information. The comment also mentions a few partly open Issues on the gRPC project for Go. event_sourced.pb.go @jsravn do you see any other way to do that? Tho, I've seen the https://github.com/gogo/protobuf project and that it is more open in this regard, but I have not tried it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think gogo is only protobuf 2? It's been a while since I looked at it. It does seem like golang protobuf is fairly anemic, unfortunately. But I think we can do this with protoreflect: |
||
| ServiceName: e.ServiceName, | ||
| PersistenceId: persistenceID, | ||
| }) | ||
| return r.updateSpec() | ||
| } | ||
|
|
||
| func (r *EntityDiscoveryServer) registerFileDescriptorProto(filename string) error { | ||
| descriptorProto, err := unpackFile(proto.FileDescriptor(filename)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See my prior message regarding
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we don't. Generated .pb.go files register their FileDescriptors globally through See above my comment about how to resolve dependent descriptors tho. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, didn't realize that - I thought it was loading from the filesystem. I think https://github.com/jhump/protoreflect could simplify things a bit though. |
||
| if err != nil { | ||
| return fmt.Errorf("failed to registerFileDescriptorProto for filename: %s: %w", filename, err) | ||
| } | ||
| r.fileDescriptorSet.File = append(r.fileDescriptorSet.File, descriptorProto) | ||
| return r.updateSpec() | ||
| } | ||
|
|
||
| func (r *EntityDiscoveryServer) registerFileDescriptor(msg descriptor.Message) error { | ||
| fd, _ := descriptor.ForMessage(msg) // this can panic | ||
| if r := recover(); r != nil { | ||
| return fmt.Errorf("descriptor.ForMessage panicked (%v) for: %+v", r, msg) | ||
| } | ||
| r.fileDescriptorSet.File = append(r.fileDescriptorSet.File, fd) | ||
| return nil | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.