From 779ccea9a4b5a9febcf02356e429e5819368ddf7 Mon Sep 17 00:00:00 2001 From: nemesisesq Date: Tue, 17 Oct 2017 11:00:58 -0400 Subject: [PATCH 1/6] n --- .idea/kit.iml | 9 ++ .idea/modules.xml | 8 ++ .idea/vcs.xml | 6 ++ .idea/workspace.xml | 202 ++++++++++++++++++++++++++++++++++++++++++++ transport/nats | 1 + 5 files changed, 226 insertions(+) create mode 100644 .idea/kit.iml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml create mode 100644 .idea/workspace.xml create mode 160000 transport/nats diff --git a/.idea/kit.iml b/.idea/kit.iml new file mode 100644 index 000000000..5e764c4f0 --- /dev/null +++ b/.idea/kit.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 000000000..b1e2bf24a --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 000000000..94a25f7f4 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/workspace.xml b/.idea/workspace.xml new file mode 100644 index 000000000..2787d54ba --- /dev/null +++ b/.idea/workspace.xml @@ -0,0 +1,202 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + true + DEFINITION_ORDER + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/] b/] new file mode 100644 index 000000000..ac37e7518 --- /dev/null +++ b/] @@ -0,0 +1,14 @@ +Added NATS transport +# Please enter the commit message for your changes. Lines starting +# with '#' will be ignored, and an empty message aborts the commit. +# Explicit paths specified without -i or -o; assuming --only paths... +# On branch master +# Your branch is up-to-date with 'origin/master'. +# +# Changes to be committed: +# modified: .gitignore +# modified: .idea/workspace.xml +# +# Changes not staged for commit: +# modified: transport/nats (modified content) +# From 3475a02bb775327a286d5dbaf19f9eddf3deb4b6 Mon Sep 17 00:00:00 2001 From: nemesisesq Date: Tue, 17 Oct 2017 11:15:43 -0400 Subject: [PATCH 4/6] adding nats as a folder not as a sub module --- transport/nats | 1 - transport/nats/client.go | 116 +++++++++++++++++++++++++++++ transport/nats/encode_decode.go | 30 ++++++++ transport/nats/request_response.go | 75 +++++++++++++++++++ transport/nats/server.go | 105 ++++++++++++++++++++++++++ 5 files changed, 326 insertions(+), 1 deletion(-) delete mode 160000 transport/nats create mode 100644 transport/nats/client.go create mode 100644 transport/nats/encode_decode.go create mode 100644 transport/nats/request_response.go create mode 100644 transport/nats/server.go diff --git a/transport/nats b/transport/nats deleted file mode 160000 index 29afeeae3..000000000 --- a/transport/nats +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 29afeeae398a43300d94d762d9b1ddd912f75761 diff --git a/transport/nats/client.go b/transport/nats/client.go new file mode 100644 index 000000000..a562b080d --- /dev/null +++ b/transport/nats/client.go @@ -0,0 +1,116 @@ +package nats + +import ( + "context" + "fmt" + "reflect" + + "github.com/go-kit/kit/endpoint" + "github.com/nats-io/go-nats" + "time" + "os" + + log "github.com/sirupsen/logrus" + +) + +//Client wraps a nats connection and provides a subject that implements it +type Client struct { + + serviceName string + subject string + enc EncodeRequestFunc + dec DecodeRequestFunc + before []ClientRequestFunc + after []ClientResponseFunc + natsReply reflect.Type + logger log.Logger +} + +// NewClient constructs a usable Client for a single remote endpoint. + +func NewClient( + serviceName string, + subject string, + enc EncodeRequestFunc, + dec DecodeRequestFunc, + natsReply interface{}, + options ...ClientOption, +) *Client { + c := &Client{ + subject: fmt.Sprintf("/%s/%s", serviceName, subject), + enc: enc, + dec: dec, + // We are using reflect.Indirect here to allow both reply structs and + // pointers to these reply structs. New consumers of the client should + // use structs directly, while existing consumers will not break if they + // remain to use pointers to structs. + natsReply: reflect.TypeOf( + reflect.Indirect( + reflect.ValueOf(natsReply), + ).Interface(), + ), + before: []ClientRequestFunc{}, + after: []ClientResponseFunc{}, + } + for _, option := range options { + option(c) + } + return c +} + +// ClientOption sets an optional parameter for clients. +type ClientOption func(*Client) + +// ClientBefore sets the RequestFuncs that are applied to the outgoing gRPC +// request before it's invoked. +func ClientBefore(before ...ClientRequestFunc) ClientOption { + return func(c *Client) { c.before = append(c.before, before...) } +} + +// ClientAfter sets the ClientResponseFuncs that are applied to the incoming +// gRPC response prior to it being decoded. This is useful for obtaining +// response metadata and adding onto the context prior to decoding. +func ClientAfter(after ...ClientResponseFunc) ClientOption { + return func(c *Client) { c.after = append(c.after, after...) } +} + +// Endpoint returns a usable endpoint that will invoke the nats specified by the +// client. +func (c Client) Endpoint() endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + + urls := fmt.Sprint(os.Getenv("NATS_SERVER"), ", ", fmt.Sprintf("nats://%v:%v", os.Getenv("NATS_SERVICE_HOST"), os.Getenv("NATS_SERVICE_PORT"))) + + c.logger.Info(urls) + + nc, err := nats.Connect(urls) + if err != nil { + c.logger.Error("Can't connect: %v\n", err) + } + defer nc.Close() + + var msg *nats.Msg + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + req, err := c.enc(ctx,request) + if err != nil { + return nil, err + } + + if msg, err = nc.Request(c.subject, req, time.Second * 10); err != nil { + return nil, err + } + + //for _, f := range c.after { + // ctx = f(ctx, header, trailer) + //} + + response, err := c.dec(ctx, msg) + if err != nil { + return nil, err + } + return response, nil + } +} diff --git a/transport/nats/encode_decode.go b/transport/nats/encode_decode.go new file mode 100644 index 000000000..0800946f6 --- /dev/null +++ b/transport/nats/encode_decode.go @@ -0,0 +1,30 @@ +package nats + +import ( + "context" + "github.com/nats-io/go-nats" +) + +// DecodeRequestFunc extracts a user-domain request object from a NATS request. +// It's designed to be used in NATS servers, for server-side endpoints. One +// straightforward DecodeRequestFunc could be something that decodes from the +// NATS request message to the concrete request type. +type DecodeRequestFunc func(_ context.Context, msg *nats.Msg) (interface{}, error) + +// EncodeRequestFunc encodes the passed request object into the NATS request +// object. It's designed to be used in NATS clients, for client-side endpoints. +// One straightforward EncodeRequestFunc could something that encodes the object +// directly to the NATS request message. +type EncodeRequestFunc func(_ context.Context, msg interface{}) ([]byte, error) + +// EncodeResponseFunc encodes the passed response object to the NATS response +// message. It's designed to be used in NATS servers, for server-side endpoints. +// One straightforward EncodeResponseFunc could be something that encodes the +// object directly to the NATS response message. +type EncodeResponseFunc func(_ context.Context, response interface{}) ([]byte, error) + +// DecodeResponseFunc extracts a user-domain response object from a NATS +// response object. It's designed to be used in NATS clients, for client-side +// endpoints. One straightforward DecodeResponseFunc could be something that +// decodes from the NATS response message to the concrete response type. +type DecodeResponseFunc func(_ context.Context, msg *nats.Msg) (interface{}, error) diff --git a/transport/nats/request_response.go b/transport/nats/request_response.go new file mode 100644 index 000000000..aa1418171 --- /dev/null +++ b/transport/nats/request_response.go @@ -0,0 +1,75 @@ +package nats +import ( + "context" + "encoding/base64" + "strings" + + "google.golang.org/grpc/metadata" +) + +const ( + binHdrSuffix = "-bin" +) + +// ClientRequestFunc may take information from context and use it to construct +// metadata headers to be transported to the server. ClientRequestFuncs are +// executed after creating the request but prior to sending the gRPC request to +// the server. +type ClientRequestFunc func(context.Context, *metadata.MD) context.Context + +// ServerRequestFunc may take information from the received metadata header and +// use it to place items in the request scoped context. ServerRequestFuncs are +// executed prior to invoking the endpoint. +type ServerRequestFunc func(context.Context, metadata.MD) context.Context + +// ServerResponseFunc may take information from a request context and use it to +// manipulate the gRPC response metadata headers and trailers. ResponseFuncs are +// only executed in servers, after invoking the endpoint but prior to writing a +// response. +type ServerResponseFunc func(ctx context.Context, header *metadata.MD, trailer *metadata.MD) context.Context + +// ClientResponseFunc may take information from a gRPC metadata header and/or +// trailer and make the responses available for consumption. ClientResponseFuncs +// are only executed in clients, after a request has been made, but prior to it +// being decoded. +type ClientResponseFunc func(ctx context.Context, header metadata.MD, trailer metadata.MD) context.Context + +// SetRequestHeader returns a ClientRequestFunc that sets the specified metadata +// key-value pair. +func SetRequestHeader(key, val string) ClientRequestFunc { + return func(ctx context.Context, md *metadata.MD) context.Context { + key, val := EncodeKeyValue(key, val) + (*md)[key] = append((*md)[key], val) + return ctx + } +} + +// SetResponseHeader returns a ResponseFunc that sets the specified metadata +// key-value pair. +func SetResponseHeader(key, val string) ServerResponseFunc { + return func(ctx context.Context, md *metadata.MD, _ *metadata.MD) context.Context { + key, val := EncodeKeyValue(key, val) + (*md)[key] = append((*md)[key], val) + return ctx + } +} + +// SetResponseTrailer returns a ResponseFunc that sets the specified metadata +// key-value pair. +func SetResponseTrailer(key, val string) ServerResponseFunc { + return func(ctx context.Context, _ *metadata.MD, md *metadata.MD) context.Context { + key, val := EncodeKeyValue(key, val) + (*md)[key] = append((*md)[key], val) + return ctx + } +} + +// EncodeKeyValue sanitizes a key-value pair for use in gRPC metadata headers. +func EncodeKeyValue(key, val string) (string, string) { + key = strings.ToLower(key) + if strings.HasSuffix(key, binHdrSuffix) { + v := base64.StdEncoding.EncodeToString([]byte(val)) + val = string(v) + } + return key, val +} diff --git a/transport/nats/server.go b/transport/nats/server.go new file mode 100644 index 000000000..9efce4a90 --- /dev/null +++ b/transport/nats/server.go @@ -0,0 +1,105 @@ +package nats + +import ( + "context" + + "fmt" + "os" + + "github.com/go-kit/kit/endpoint" + "github.com/nats-io/go-nats" + log "github.com/sirupsen/logrus" + +) + +// Server wraps an endpoint and implements grpc.Handler. +type Server struct { + e endpoint.Endpoint + dec DecodeRequestFunc + enc EncodeResponseFunc + before []ServerRequestFunc + after []ServerResponseFunc + logger log.Logger +} + +// NewServer constructs a new server, which implements wraps the provided +// endpoint and implements the Handler interface. Consumers should write +// bindings that adapt the concrete gRPC methods from their compiled protobuf +// definitions to individual handlers. Request and response objects are from the +// caller business domain, not gRPC request and reply types. +func NewServer( + e endpoint.Endpoint, + dec DecodeRequestFunc, + enc EncodeResponseFunc, + options ...ServerOption, +) *Server { + s := &Server{ + e: e, + dec: dec, + enc: enc, + logger: *log.New(), + } + for _, option := range options { + option(s) + } + return s +} + +// ServerOption sets an optional parameter for servers. +type ServerOption func(*Server) + +// ServerBefore functions are executed on the HTTP request object before the +// request is decoded. +func ServerBefore(before ...ServerRequestFunc) ServerOption { + return func(s *Server) { s.before = append(s.before, before...) } +} + +// ServerAfter functions are executed on the HTTP response writer after the +// endpoint is invoked, but before anything is written to the client. +func ServerAfter(after ...ServerResponseFunc) ServerOption { + return func(s *Server) { s.after = append(s.after, after...) } +} + +// ServerErrorLogger is used to log non-terminal errors. By default, no errors +// are logged. +func ServerErrorLogger(logger log.Logger) ServerOption { + return func(s *Server) { s.logger = logger } +} + +// MsgHandler implements the MsgHandler type. +func (s Server) MsgHandler(msg *nats.Msg) { + + urls := fmt.Sprint(os.Getenv("NATS_SERVER"), ", ", fmt.Sprintf("nats://%v:%v", os.Getenv("NATS_SERVICE_HOST"), os.Getenv("NATS_SERVICE_PORT"))) + + s.logger.Info(urls) + + nc, err := nats.Connect(urls) + if err != nil { + s.logger.Error("Can't connect: %v\n", err) + } + + defer nc.Close() + + // Non-nil non empty context to take the place of the first context in th chain of handling. + ctx := context.TODO() + + request, err := s.dec(ctx, msg) + if err != nil { + s.logger.Error("err", err) + return + } + + response, err := s.e(ctx, request) + if err != nil { + s.logger.Error("err", err) + return + } + + payload, err := s.enc(ctx, response) + if err != nil { + s.logger.Error("err", err) + return + } + + nc.Publish(msg.Reply, payload) +} From 2f8d5bf0ef03b5a0f12bf9439f3c4dc962069317 Mon Sep 17 00:00:00 2001 From: nemesisesq Date: Tue, 17 Oct 2017 12:17:57 -0400 Subject: [PATCH 5/6] added example --- ] | 14 ---- examples/stringsvc4/instrumenting.go | 38 +++++++++ examples/stringsvc4/logging.go | 41 ++++++++++ examples/stringsvc4/main.go | 79 ++++++++++++++++++ examples/stringsvc4/service.go | 28 +++++++ examples/stringsvc4/transport.go | 65 +++++++++++++++ transport/nats/client.go | 116 --------------------------- 7 files changed, 251 insertions(+), 130 deletions(-) delete mode 100644 ] create mode 100644 examples/stringsvc4/instrumenting.go create mode 100644 examples/stringsvc4/logging.go create mode 100644 examples/stringsvc4/main.go create mode 100644 examples/stringsvc4/service.go create mode 100644 examples/stringsvc4/transport.go delete mode 100644 transport/nats/client.go diff --git a/] b/] deleted file mode 100644 index ac37e7518..000000000 --- a/] +++ /dev/null @@ -1,14 +0,0 @@ -Added NATS transport -# Please enter the commit message for your changes. Lines starting -# with '#' will be ignored, and an empty message aborts the commit. -# Explicit paths specified without -i or -o; assuming --only paths... -# On branch master -# Your branch is up-to-date with 'origin/master'. -# -# Changes to be committed: -# modified: .gitignore -# modified: .idea/workspace.xml -# -# Changes not staged for commit: -# modified: transport/nats (modified content) -# diff --git a/examples/stringsvc4/instrumenting.go b/examples/stringsvc4/instrumenting.go new file mode 100644 index 000000000..675617d9c --- /dev/null +++ b/examples/stringsvc4/instrumenting.go @@ -0,0 +1,38 @@ +package main + +import ( + "fmt" + "time" + + "github.com/go-kit/kit/metrics" +) + +type instrumentingMiddleware struct { + requestCount metrics.Counter + requestLatency metrics.Histogram + countResult metrics.Histogram + next StringService +} + +func (mw instrumentingMiddleware) Uppercase(s string) (output string, err error) { + defer func(begin time.Time) { + lvs := []string{"method", "uppercase", "error", fmt.Sprint(err != nil)} + mw.requestCount.With(lvs...).Add(1) + mw.requestLatency.With(lvs...).Observe(time.Since(begin).Seconds()) + }(time.Now()) + + output, err = mw.next.Uppercase(s) + return +} + +func (mw instrumentingMiddleware) Count(s string) (n int) { + defer func(begin time.Time) { + lvs := []string{"method", "count", "error", "false"} + mw.requestCount.With(lvs...).Add(1) + mw.requestLatency.With(lvs...).Observe(time.Since(begin).Seconds()) + mw.countResult.Observe(float64(n)) + }(time.Now()) + + n = mw.next.Count(s) + return +} diff --git a/examples/stringsvc4/logging.go b/examples/stringsvc4/logging.go new file mode 100644 index 000000000..b958f3b6f --- /dev/null +++ b/examples/stringsvc4/logging.go @@ -0,0 +1,41 @@ +package main + +import ( + "time" + + "github.com/go-kit/kit/log" +) + +type loggingMiddleware struct { + logger log.Logger + next StringService +} + +func (mw loggingMiddleware) Uppercase(s string) (output string, err error) { + defer func(begin time.Time) { + _ = mw.logger.Log( + "method", "uppercase", + "input", s, + "output", output, + "err", err, + "took", time.Since(begin), + ) + }(time.Now()) + + output, err = mw.next.Uppercase(s) + return +} + +func (mw loggingMiddleware) Count(s string) (n int) { + defer func(begin time.Time) { + _ = mw.logger.Log( + "method", "count", + "input", s, + "n", n, + "took", time.Since(begin), + ) + }(time.Now()) + + n = mw.next.Count(s) + return +} diff --git a/examples/stringsvc4/main.go b/examples/stringsvc4/main.go new file mode 100644 index 000000000..36a7897da --- /dev/null +++ b/examples/stringsvc4/main.go @@ -0,0 +1,79 @@ +package main + +import ( + "net/http" + "os" + + stdprometheus "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + + "github.com/go-kit/kit/log" + kitprometheus "github.com/go-kit/kit/metrics/prometheus" + httptransport "github.com/go-kit/kit/transport/http" + natstransport "github.com/go-kit/kit/transport/nats" + "fmt" + "github.com/nats-io/go-nats" + +) + +func main() { + + urls := fmt.Sprint(os.Getenv("NATS_SERVER"), ", ", fmt.Sprintf("nats://%v:%v", os.Getenv("NATS_SERVICE_HOST"), os.Getenv("NATS_SERVICE_PORT"))) + + nc, err := nats.Connect(urls) + + defer nc.Close() + + if err != nil { + logger.Error("Can't connect: %v\n", err) + } + + + logger := log.NewLogfmtLogger(os.Stderr) + + fieldKeys := []string{"method", "error"} + requestCount := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: "my_group", + Subsystem: "string_service", + Name: "request_count", + Help: "Number of requests received.", + }, fieldKeys) + requestLatency := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ + Namespace: "my_group", + Subsystem: "string_service", + Name: "request_latency_microseconds", + Help: "Total duration of requests in microseconds.", + }, fieldKeys) + countResult := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ + Namespace: "my_group", + Subsystem: "string_service", + Name: "count_result", + Help: "The result of each count method.", + }, []string{}) // no fields here + + var svc StringService + svc = stringService{} + svc = loggingMiddleware{logger, svc} + svc = instrumentingMiddleware{requestCount, requestLatency, countResult, svc} + + uppercaseHandler := natstransport.NewServer( + makeUppercaseEndpoint(svc), + decodeUppercaseRequest, + encodeResponse, + ) + + countHandler := natstransport.NewServer( + makeCountEndpoint(svc), + decodeCountRequest, + encodeResponse, + ) + + nc.QueueSubscribe("com.go-kit.uppercase", uppercaseHandler.MsgHandler) + nc.QueueSubscribe("com.go-kit.count", countHandler.MsgHandler) + + http.Handle("/uppercase", uppercaseHandler) + http.Handle("/count", countHandler) + http.Handle("/metrics", promhttp.Handler()) + logger.Log("msg", "HTTP", "addr", ":8080") + logger.Log("err", http.ListenAndServe(":8080", nil)) +} diff --git a/examples/stringsvc4/service.go b/examples/stringsvc4/service.go new file mode 100644 index 000000000..1da2f3ebb --- /dev/null +++ b/examples/stringsvc4/service.go @@ -0,0 +1,28 @@ +package main + +import ( + "errors" + "strings" +) + +// StringService provides operations on strings. +type StringService interface { + Uppercase(string) (string, error) + Count(string) int +} + +type stringService struct{} + +func (stringService) Uppercase(s string) (string, error) { + if s == "" { + return "", ErrEmpty + } + return strings.ToUpper(s), nil +} + +func (stringService) Count(s string) int { + return len(s) +} + +// ErrEmpty is returned when an input string is empty. +var ErrEmpty = errors.New("empty string") diff --git a/examples/stringsvc4/transport.go b/examples/stringsvc4/transport.go new file mode 100644 index 000000000..d18d439d7 --- /dev/null +++ b/examples/stringsvc4/transport.go @@ -0,0 +1,65 @@ +package main + +import ( + "context" + "encoding/json" + "net/http" + + "github.com/go-kit/kit/endpoint" +) + +func makeUppercaseEndpoint(svc StringService) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(uppercaseRequest) + v, err := svc.Uppercase(req.S) + if err != nil { + return uppercaseResponse{v, err.Error()}, nil + } + return uppercaseResponse{v, ""}, nil + } +} + +func makeCountEndpoint(svc StringService) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(countRequest) + v := svc.Count(req.S) + return countResponse{v}, nil + } +} + +func decodeUppercaseRequest(_ context.Context, msg *nats.Msg) (interface{}, error) { + var request uppercaseRequest + if err := json.Unmarshal(msg.Data, &request); err != nil { + return nil, err + } + return request, nil +} + +func decodeCountRequest(_ context.Context, msg *nats.Msg) (interface{}, error) { + var request countRequest + if err := json.Unmarshal(msg.Data, &request); err != nil { + return nil, err + } + return request, nil +} + +func encodeResponse(_ context.Context, response interface{}) (r interface, err error) { + return r, err +} + +type uppercaseRequest struct { + S string `json:"s"` +} + +type uppercaseResponse struct { + V string `json:"v"` + Err string `json:"err,omitempty"` +} + +type countRequest struct { + S string `json:"s"` +} + +type countResponse struct { + V int `json:"v"` +} diff --git a/transport/nats/client.go b/transport/nats/client.go deleted file mode 100644 index a562b080d..000000000 --- a/transport/nats/client.go +++ /dev/null @@ -1,116 +0,0 @@ -package nats - -import ( - "context" - "fmt" - "reflect" - - "github.com/go-kit/kit/endpoint" - "github.com/nats-io/go-nats" - "time" - "os" - - log "github.com/sirupsen/logrus" - -) - -//Client wraps a nats connection and provides a subject that implements it -type Client struct { - - serviceName string - subject string - enc EncodeRequestFunc - dec DecodeRequestFunc - before []ClientRequestFunc - after []ClientResponseFunc - natsReply reflect.Type - logger log.Logger -} - -// NewClient constructs a usable Client for a single remote endpoint. - -func NewClient( - serviceName string, - subject string, - enc EncodeRequestFunc, - dec DecodeRequestFunc, - natsReply interface{}, - options ...ClientOption, -) *Client { - c := &Client{ - subject: fmt.Sprintf("/%s/%s", serviceName, subject), - enc: enc, - dec: dec, - // We are using reflect.Indirect here to allow both reply structs and - // pointers to these reply structs. New consumers of the client should - // use structs directly, while existing consumers will not break if they - // remain to use pointers to structs. - natsReply: reflect.TypeOf( - reflect.Indirect( - reflect.ValueOf(natsReply), - ).Interface(), - ), - before: []ClientRequestFunc{}, - after: []ClientResponseFunc{}, - } - for _, option := range options { - option(c) - } - return c -} - -// ClientOption sets an optional parameter for clients. -type ClientOption func(*Client) - -// ClientBefore sets the RequestFuncs that are applied to the outgoing gRPC -// request before it's invoked. -func ClientBefore(before ...ClientRequestFunc) ClientOption { - return func(c *Client) { c.before = append(c.before, before...) } -} - -// ClientAfter sets the ClientResponseFuncs that are applied to the incoming -// gRPC response prior to it being decoded. This is useful for obtaining -// response metadata and adding onto the context prior to decoding. -func ClientAfter(after ...ClientResponseFunc) ClientOption { - return func(c *Client) { c.after = append(c.after, after...) } -} - -// Endpoint returns a usable endpoint that will invoke the nats specified by the -// client. -func (c Client) Endpoint() endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (interface{}, error) { - - urls := fmt.Sprint(os.Getenv("NATS_SERVER"), ", ", fmt.Sprintf("nats://%v:%v", os.Getenv("NATS_SERVICE_HOST"), os.Getenv("NATS_SERVICE_PORT"))) - - c.logger.Info(urls) - - nc, err := nats.Connect(urls) - if err != nil { - c.logger.Error("Can't connect: %v\n", err) - } - defer nc.Close() - - var msg *nats.Msg - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - req, err := c.enc(ctx,request) - if err != nil { - return nil, err - } - - if msg, err = nc.Request(c.subject, req, time.Second * 10); err != nil { - return nil, err - } - - //for _, f := range c.after { - // ctx = f(ctx, header, trailer) - //} - - response, err := c.dec(ctx, msg) - if err != nil { - return nil, err - } - return response, nil - } -} From f59001a8240736f59a390667df46033e749b289c Mon Sep 17 00:00:00 2001 From: nemesisesq Date: Tue, 17 Oct 2017 14:27:00 -0400 Subject: [PATCH 6/6] add example --- examples/stringsvc4/transport.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/stringsvc4/transport.go b/examples/stringsvc4/transport.go index d18d439d7..20c430837 100644 --- a/examples/stringsvc4/transport.go +++ b/examples/stringsvc4/transport.go @@ -3,7 +3,6 @@ package main import ( "context" "encoding/json" - "net/http" "github.com/go-kit/kit/endpoint" ) @@ -43,7 +42,7 @@ func decodeCountRequest(_ context.Context, msg *nats.Msg) (interface{}, error) { return request, nil } -func encodeResponse(_ context.Context, response interface{}) (r interface, err error) { +func encodeResponse(_ context.Context, response interface{}) (r interface{}, err error) { return r, err }