diff --git a/.gitignore b/.gitignore index 6062401c1..801a033b0 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,55 @@ Session.vim # auto-generated tag files tags +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff: +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/dictionaries + +# Sensitive or high-churn files: +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.xml +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml + +# Gradle: +.idea/**/gradle.xml +.idea/**/libraries + +# CMake +cmake-build-debug/ + +# Mongo Explorer plugin: +.idea/**/mongoSettings.xml + +## File-based project format: +*.iws + +## Plugin-specific files: + +# IntelliJ +out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +.idea/replstate.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +.idea/ \ No newline at end of file diff --git a/examples/stringsvc4/instrumenting.go b/examples/stringsvc4/instrumenting.go new file mode 100644 index 000000000..675617d9c --- /dev/null +++ b/examples/stringsvc4/instrumenting.go @@ -0,0 +1,38 @@ +package main + +import ( + "fmt" + "time" + + "github.com/go-kit/kit/metrics" +) + +type instrumentingMiddleware struct { + requestCount metrics.Counter + requestLatency metrics.Histogram + countResult metrics.Histogram + next StringService +} + +func (mw instrumentingMiddleware) Uppercase(s string) (output string, err error) { + defer func(begin time.Time) { + lvs := []string{"method", "uppercase", "error", fmt.Sprint(err != nil)} + mw.requestCount.With(lvs...).Add(1) + mw.requestLatency.With(lvs...).Observe(time.Since(begin).Seconds()) + }(time.Now()) + + output, err = mw.next.Uppercase(s) + return +} + +func (mw instrumentingMiddleware) Count(s string) (n int) { + defer func(begin time.Time) { + lvs := []string{"method", "count", "error", "false"} + mw.requestCount.With(lvs...).Add(1) + mw.requestLatency.With(lvs...).Observe(time.Since(begin).Seconds()) + mw.countResult.Observe(float64(n)) + }(time.Now()) + + n = mw.next.Count(s) + return +} diff --git a/examples/stringsvc4/logging.go b/examples/stringsvc4/logging.go new file mode 100644 index 000000000..b958f3b6f --- /dev/null +++ b/examples/stringsvc4/logging.go @@ -0,0 +1,41 @@ +package main + +import ( + "time" + + "github.com/go-kit/kit/log" +) + +type loggingMiddleware struct { + logger log.Logger + next StringService +} + +func (mw loggingMiddleware) Uppercase(s string) (output string, err error) { + defer func(begin time.Time) { + _ = mw.logger.Log( + "method", "uppercase", + "input", s, + "output", output, + "err", err, + "took", time.Since(begin), + ) + }(time.Now()) + + output, err = mw.next.Uppercase(s) + return +} + +func (mw loggingMiddleware) Count(s string) (n int) { + defer func(begin time.Time) { + _ = mw.logger.Log( + "method", "count", + "input", s, + "n", n, + "took", time.Since(begin), + ) + }(time.Now()) + + n = mw.next.Count(s) + return +} diff --git a/examples/stringsvc4/main.go b/examples/stringsvc4/main.go new file mode 100644 index 000000000..36a7897da --- /dev/null +++ b/examples/stringsvc4/main.go @@ -0,0 +1,79 @@ +package main + +import ( + "net/http" + "os" + + stdprometheus "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + + "github.com/go-kit/kit/log" + kitprometheus "github.com/go-kit/kit/metrics/prometheus" + httptransport "github.com/go-kit/kit/transport/http" + natstransport "github.com/go-kit/kit/transport/nats" + "fmt" + "github.com/nats-io/go-nats" + +) + +func main() { + + urls := fmt.Sprint(os.Getenv("NATS_SERVER"), ", ", fmt.Sprintf("nats://%v:%v", os.Getenv("NATS_SERVICE_HOST"), os.Getenv("NATS_SERVICE_PORT"))) + + nc, err := nats.Connect(urls) + + defer nc.Close() + + if err != nil { + logger.Error("Can't connect: %v\n", err) + } + + + logger := log.NewLogfmtLogger(os.Stderr) + + fieldKeys := []string{"method", "error"} + requestCount := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: "my_group", + Subsystem: "string_service", + Name: "request_count", + Help: "Number of requests received.", + }, fieldKeys) + requestLatency := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ + Namespace: "my_group", + Subsystem: "string_service", + Name: "request_latency_microseconds", + Help: "Total duration of requests in microseconds.", + }, fieldKeys) + countResult := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ + Namespace: "my_group", + Subsystem: "string_service", + Name: "count_result", + Help: "The result of each count method.", + }, []string{}) // no fields here + + var svc StringService + svc = stringService{} + svc = loggingMiddleware{logger, svc} + svc = instrumentingMiddleware{requestCount, requestLatency, countResult, svc} + + uppercaseHandler := natstransport.NewServer( + makeUppercaseEndpoint(svc), + decodeUppercaseRequest, + encodeResponse, + ) + + countHandler := natstransport.NewServer( + makeCountEndpoint(svc), + decodeCountRequest, + encodeResponse, + ) + + nc.QueueSubscribe("com.go-kit.uppercase", uppercaseHandler.MsgHandler) + nc.QueueSubscribe("com.go-kit.count", countHandler.MsgHandler) + + http.Handle("/uppercase", uppercaseHandler) + http.Handle("/count", countHandler) + http.Handle("/metrics", promhttp.Handler()) + logger.Log("msg", "HTTP", "addr", ":8080") + logger.Log("err", http.ListenAndServe(":8080", nil)) +} diff --git a/examples/stringsvc4/service.go b/examples/stringsvc4/service.go new file mode 100644 index 000000000..1da2f3ebb --- /dev/null +++ b/examples/stringsvc4/service.go @@ -0,0 +1,28 @@ +package main + +import ( + "errors" + "strings" +) + +// StringService provides operations on strings. +type StringService interface { + Uppercase(string) (string, error) + Count(string) int +} + +type stringService struct{} + +func (stringService) Uppercase(s string) (string, error) { + if s == "" { + return "", ErrEmpty + } + return strings.ToUpper(s), nil +} + +func (stringService) Count(s string) int { + return len(s) +} + +// ErrEmpty is returned when an input string is empty. +var ErrEmpty = errors.New("empty string") diff --git a/examples/stringsvc4/transport.go b/examples/stringsvc4/transport.go new file mode 100644 index 000000000..20c430837 --- /dev/null +++ b/examples/stringsvc4/transport.go @@ -0,0 +1,64 @@ +package main + +import ( + "context" + "encoding/json" + + "github.com/go-kit/kit/endpoint" +) + +func makeUppercaseEndpoint(svc StringService) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(uppercaseRequest) + v, err := svc.Uppercase(req.S) + if err != nil { + return uppercaseResponse{v, err.Error()}, nil + } + return uppercaseResponse{v, ""}, nil + } +} + +func makeCountEndpoint(svc StringService) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(countRequest) + v := svc.Count(req.S) + return countResponse{v}, nil + } +} + +func decodeUppercaseRequest(_ context.Context, msg *nats.Msg) (interface{}, error) { + var request uppercaseRequest + if err := json.Unmarshal(msg.Data, &request); err != nil { + return nil, err + } + return request, nil +} + +func decodeCountRequest(_ context.Context, msg *nats.Msg) (interface{}, error) { + var request countRequest + if err := json.Unmarshal(msg.Data, &request); err != nil { + return nil, err + } + return request, nil +} + +func encodeResponse(_ context.Context, response interface{}) (r interface{}, err error) { + return r, err +} + +type uppercaseRequest struct { + S string `json:"s"` +} + +type uppercaseResponse struct { + V string `json:"v"` + Err string `json:"err,omitempty"` +} + +type countRequest struct { + S string `json:"s"` +} + +type countResponse struct { + V int `json:"v"` +} diff --git a/transport/nats/encode_decode.go b/transport/nats/encode_decode.go new file mode 100644 index 000000000..0800946f6 --- /dev/null +++ b/transport/nats/encode_decode.go @@ -0,0 +1,30 @@ +package nats + +import ( + "context" + "github.com/nats-io/go-nats" +) + +// DecodeRequestFunc extracts a user-domain request object from a NATS request. +// It's designed to be used in NATS servers, for server-side endpoints. One +// straightforward DecodeRequestFunc could be something that decodes from the +// NATS request message to the concrete request type. +type DecodeRequestFunc func(_ context.Context, msg *nats.Msg) (interface{}, error) + +// EncodeRequestFunc encodes the passed request object into the NATS request +// object. It's designed to be used in NATS clients, for client-side endpoints. +// One straightforward EncodeRequestFunc could something that encodes the object +// directly to the NATS request message. +type EncodeRequestFunc func(_ context.Context, msg interface{}) ([]byte, error) + +// EncodeResponseFunc encodes the passed response object to the NATS response +// message. It's designed to be used in NATS servers, for server-side endpoints. +// One straightforward EncodeResponseFunc could be something that encodes the +// object directly to the NATS response message. +type EncodeResponseFunc func(_ context.Context, response interface{}) ([]byte, error) + +// DecodeResponseFunc extracts a user-domain response object from a NATS +// response object. It's designed to be used in NATS clients, for client-side +// endpoints. One straightforward DecodeResponseFunc could be something that +// decodes from the NATS response message to the concrete response type. +type DecodeResponseFunc func(_ context.Context, msg *nats.Msg) (interface{}, error) diff --git a/transport/nats/request_response.go b/transport/nats/request_response.go new file mode 100644 index 000000000..aa1418171 --- /dev/null +++ b/transport/nats/request_response.go @@ -0,0 +1,75 @@ +package nats +import ( + "context" + "encoding/base64" + "strings" + + "google.golang.org/grpc/metadata" +) + +const ( + binHdrSuffix = "-bin" +) + +// ClientRequestFunc may take information from context and use it to construct +// metadata headers to be transported to the server. ClientRequestFuncs are +// executed after creating the request but prior to sending the gRPC request to +// the server. +type ClientRequestFunc func(context.Context, *metadata.MD) context.Context + +// ServerRequestFunc may take information from the received metadata header and +// use it to place items in the request scoped context. ServerRequestFuncs are +// executed prior to invoking the endpoint. +type ServerRequestFunc func(context.Context, metadata.MD) context.Context + +// ServerResponseFunc may take information from a request context and use it to +// manipulate the gRPC response metadata headers and trailers. ResponseFuncs are +// only executed in servers, after invoking the endpoint but prior to writing a +// response. +type ServerResponseFunc func(ctx context.Context, header *metadata.MD, trailer *metadata.MD) context.Context + +// ClientResponseFunc may take information from a gRPC metadata header and/or +// trailer and make the responses available for consumption. ClientResponseFuncs +// are only executed in clients, after a request has been made, but prior to it +// being decoded. +type ClientResponseFunc func(ctx context.Context, header metadata.MD, trailer metadata.MD) context.Context + +// SetRequestHeader returns a ClientRequestFunc that sets the specified metadata +// key-value pair. +func SetRequestHeader(key, val string) ClientRequestFunc { + return func(ctx context.Context, md *metadata.MD) context.Context { + key, val := EncodeKeyValue(key, val) + (*md)[key] = append((*md)[key], val) + return ctx + } +} + +// SetResponseHeader returns a ResponseFunc that sets the specified metadata +// key-value pair. +func SetResponseHeader(key, val string) ServerResponseFunc { + return func(ctx context.Context, md *metadata.MD, _ *metadata.MD) context.Context { + key, val := EncodeKeyValue(key, val) + (*md)[key] = append((*md)[key], val) + return ctx + } +} + +// SetResponseTrailer returns a ResponseFunc that sets the specified metadata +// key-value pair. +func SetResponseTrailer(key, val string) ServerResponseFunc { + return func(ctx context.Context, _ *metadata.MD, md *metadata.MD) context.Context { + key, val := EncodeKeyValue(key, val) + (*md)[key] = append((*md)[key], val) + return ctx + } +} + +// EncodeKeyValue sanitizes a key-value pair for use in gRPC metadata headers. +func EncodeKeyValue(key, val string) (string, string) { + key = strings.ToLower(key) + if strings.HasSuffix(key, binHdrSuffix) { + v := base64.StdEncoding.EncodeToString([]byte(val)) + val = string(v) + } + return key, val +} diff --git a/transport/nats/server.go b/transport/nats/server.go new file mode 100644 index 000000000..9efce4a90 --- /dev/null +++ b/transport/nats/server.go @@ -0,0 +1,105 @@ +package nats + +import ( + "context" + + "fmt" + "os" + + "github.com/go-kit/kit/endpoint" + "github.com/nats-io/go-nats" + log "github.com/sirupsen/logrus" + +) + +// Server wraps an endpoint and implements grpc.Handler. +type Server struct { + e endpoint.Endpoint + dec DecodeRequestFunc + enc EncodeResponseFunc + before []ServerRequestFunc + after []ServerResponseFunc + logger log.Logger +} + +// NewServer constructs a new server, which implements wraps the provided +// endpoint and implements the Handler interface. Consumers should write +// bindings that adapt the concrete gRPC methods from their compiled protobuf +// definitions to individual handlers. Request and response objects are from the +// caller business domain, not gRPC request and reply types. +func NewServer( + e endpoint.Endpoint, + dec DecodeRequestFunc, + enc EncodeResponseFunc, + options ...ServerOption, +) *Server { + s := &Server{ + e: e, + dec: dec, + enc: enc, + logger: *log.New(), + } + for _, option := range options { + option(s) + } + return s +} + +// ServerOption sets an optional parameter for servers. +type ServerOption func(*Server) + +// ServerBefore functions are executed on the HTTP request object before the +// request is decoded. +func ServerBefore(before ...ServerRequestFunc) ServerOption { + return func(s *Server) { s.before = append(s.before, before...) } +} + +// ServerAfter functions are executed on the HTTP response writer after the +// endpoint is invoked, but before anything is written to the client. +func ServerAfter(after ...ServerResponseFunc) ServerOption { + return func(s *Server) { s.after = append(s.after, after...) } +} + +// ServerErrorLogger is used to log non-terminal errors. By default, no errors +// are logged. +func ServerErrorLogger(logger log.Logger) ServerOption { + return func(s *Server) { s.logger = logger } +} + +// MsgHandler implements the MsgHandler type. +func (s Server) MsgHandler(msg *nats.Msg) { + + urls := fmt.Sprint(os.Getenv("NATS_SERVER"), ", ", fmt.Sprintf("nats://%v:%v", os.Getenv("NATS_SERVICE_HOST"), os.Getenv("NATS_SERVICE_PORT"))) + + s.logger.Info(urls) + + nc, err := nats.Connect(urls) + if err != nil { + s.logger.Error("Can't connect: %v\n", err) + } + + defer nc.Close() + + // Non-nil non empty context to take the place of the first context in th chain of handling. + ctx := context.TODO() + + request, err := s.dec(ctx, msg) + if err != nil { + s.logger.Error("err", err) + return + } + + response, err := s.e(ctx, request) + if err != nil { + s.logger.Error("err", err) + return + } + + payload, err := s.enc(ctx, response) + if err != nil { + s.logger.Error("err", err) + return + } + + nc.Publish(msg.Reply, payload) +}