-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
NATS transport for go-kit #623
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| package main | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "time" | ||
|
|
||
| "github.com/go-kit/kit/metrics" | ||
| ) | ||
|
|
||
| type instrumentingMiddleware struct { | ||
| requestCount metrics.Counter | ||
| requestLatency metrics.Histogram | ||
| countResult metrics.Histogram | ||
| next StringService | ||
| } | ||
|
|
||
| func (mw instrumentingMiddleware) Uppercase(s string) (output string, err error) { | ||
| defer func(begin time.Time) { | ||
| lvs := []string{"method", "uppercase", "error", fmt.Sprint(err != nil)} | ||
| mw.requestCount.With(lvs...).Add(1) | ||
| mw.requestLatency.With(lvs...).Observe(time.Since(begin).Seconds()) | ||
| }(time.Now()) | ||
|
|
||
| output, err = mw.next.Uppercase(s) | ||
| return | ||
| } | ||
|
|
||
| func (mw instrumentingMiddleware) Count(s string) (n int) { | ||
| defer func(begin time.Time) { | ||
| lvs := []string{"method", "count", "error", "false"} | ||
| mw.requestCount.With(lvs...).Add(1) | ||
| mw.requestLatency.With(lvs...).Observe(time.Since(begin).Seconds()) | ||
| mw.countResult.Observe(float64(n)) | ||
| }(time.Now()) | ||
|
|
||
| n = mw.next.Count(s) | ||
| return | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| package main | ||
|
|
||
| import ( | ||
| "time" | ||
|
|
||
| "github.com/go-kit/kit/log" | ||
| ) | ||
|
|
||
| type loggingMiddleware struct { | ||
| logger log.Logger | ||
| next StringService | ||
| } | ||
|
|
||
| func (mw loggingMiddleware) Uppercase(s string) (output string, err error) { | ||
| defer func(begin time.Time) { | ||
| _ = mw.logger.Log( | ||
| "method", "uppercase", | ||
| "input", s, | ||
| "output", output, | ||
| "err", err, | ||
| "took", time.Since(begin), | ||
| ) | ||
| }(time.Now()) | ||
|
|
||
| output, err = mw.next.Uppercase(s) | ||
| return | ||
| } | ||
|
|
||
| func (mw loggingMiddleware) Count(s string) (n int) { | ||
| defer func(begin time.Time) { | ||
| _ = mw.logger.Log( | ||
| "method", "count", | ||
| "input", s, | ||
| "n", n, | ||
| "took", time.Since(begin), | ||
| ) | ||
| }(time.Now()) | ||
|
|
||
| n = mw.next.Count(s) | ||
| return | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,79 @@ | ||
| package main | ||
|
|
||
| import ( | ||
| "net/http" | ||
| "os" | ||
|
|
||
| stdprometheus "github.com/prometheus/client_golang/prometheus" | ||
| "github.com/prometheus/client_golang/prometheus/promhttp" | ||
|
|
||
| "github.com/go-kit/kit/log" | ||
| kitprometheus "github.com/go-kit/kit/metrics/prometheus" | ||
| httptransport "github.com/go-kit/kit/transport/http" | ||
| natstransport "github.com/go-kit/kit/transport/nats" | ||
| "fmt" | ||
| "github.com/nats-io/go-nats" | ||
|
|
||
| ) | ||
|
|
||
| func main() { | ||
|
|
||
| urls := fmt.Sprint(os.Getenv("NATS_SERVER"), ", ", fmt.Sprintf("nats://%v:%v", os.Getenv("NATS_SERVICE_HOST"), os.Getenv("NATS_SERVICE_PORT"))) | ||
|
|
||
| nc, err := nats.Connect(urls) | ||
|
|
||
| defer nc.Close() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. err should be checked before deferring close, otherwise this could fail |
||
|
|
||
| if err != nil { | ||
| logger.Error("Can't connect: %v\n", err) | ||
| } | ||
|
|
||
|
|
||
| logger := log.NewLogfmtLogger(os.Stderr) | ||
|
|
||
| fieldKeys := []string{"method", "error"} | ||
| requestCount := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{ | ||
| Namespace: "my_group", | ||
| Subsystem: "string_service", | ||
| Name: "request_count", | ||
| Help: "Number of requests received.", | ||
| }, fieldKeys) | ||
| requestLatency := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ | ||
| Namespace: "my_group", | ||
| Subsystem: "string_service", | ||
| Name: "request_latency_microseconds", | ||
| Help: "Total duration of requests in microseconds.", | ||
| }, fieldKeys) | ||
| countResult := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ | ||
| Namespace: "my_group", | ||
| Subsystem: "string_service", | ||
| Name: "count_result", | ||
| Help: "The result of each count method.", | ||
| }, []string{}) // no fields here | ||
|
|
||
| var svc StringService | ||
| svc = stringService{} | ||
| svc = loggingMiddleware{logger, svc} | ||
| svc = instrumentingMiddleware{requestCount, requestLatency, countResult, svc} | ||
|
|
||
| uppercaseHandler := natstransport.NewServer( | ||
| makeUppercaseEndpoint(svc), | ||
| decodeUppercaseRequest, | ||
| encodeResponse, | ||
| ) | ||
|
|
||
| countHandler := natstransport.NewServer( | ||
| makeCountEndpoint(svc), | ||
| decodeCountRequest, | ||
| encodeResponse, | ||
| ) | ||
|
|
||
| nc.QueueSubscribe("com.go-kit.uppercase", uppercaseHandler.MsgHandler) | ||
| nc.QueueSubscribe("com.go-kit.count", countHandler.MsgHandler) | ||
|
|
||
| http.Handle("/uppercase", uppercaseHandler) | ||
| http.Handle("/count", countHandler) | ||
| http.Handle("/metrics", promhttp.Handler()) | ||
| logger.Log("msg", "HTTP", "addr", ":8080") | ||
| logger.Log("err", http.ListenAndServe(":8080", nil)) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| package main | ||
|
|
||
| import ( | ||
| "errors" | ||
| "strings" | ||
| ) | ||
|
|
||
| // StringService provides operations on strings. | ||
| type StringService interface { | ||
| Uppercase(string) (string, error) | ||
| Count(string) int | ||
| } | ||
|
|
||
| type stringService struct{} | ||
|
|
||
| func (stringService) Uppercase(s string) (string, error) { | ||
| if s == "" { | ||
| return "", ErrEmpty | ||
| } | ||
| return strings.ToUpper(s), nil | ||
| } | ||
|
|
||
| func (stringService) Count(s string) int { | ||
| return len(s) | ||
| } | ||
|
|
||
| // ErrEmpty is returned when an input string is empty. | ||
| var ErrEmpty = errors.New("empty string") |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| package main | ||
|
|
||
| import ( | ||
| "context" | ||
| "encoding/json" | ||
|
|
||
| "github.com/go-kit/kit/endpoint" | ||
| ) | ||
|
|
||
| func makeUppercaseEndpoint(svc StringService) endpoint.Endpoint { | ||
| return func(ctx context.Context, request interface{}) (interface{}, error) { | ||
| req := request.(uppercaseRequest) | ||
| v, err := svc.Uppercase(req.S) | ||
| if err != nil { | ||
| return uppercaseResponse{v, err.Error()}, nil | ||
| } | ||
| return uppercaseResponse{v, ""}, nil | ||
| } | ||
| } | ||
|
|
||
| func makeCountEndpoint(svc StringService) endpoint.Endpoint { | ||
| return func(ctx context.Context, request interface{}) (interface{}, error) { | ||
| req := request.(countRequest) | ||
| v := svc.Count(req.S) | ||
| return countResponse{v}, nil | ||
| } | ||
| } | ||
|
|
||
| func decodeUppercaseRequest(_ context.Context, msg *nats.Msg) (interface{}, error) { | ||
| var request uppercaseRequest | ||
| if err := json.Unmarshal(msg.Data, &request); err != nil { | ||
| return nil, err | ||
| } | ||
| return request, nil | ||
| } | ||
|
|
||
| func decodeCountRequest(_ context.Context, msg *nats.Msg) (interface{}, error) { | ||
| var request countRequest | ||
| if err := json.Unmarshal(msg.Data, &request); err != nil { | ||
| return nil, err | ||
| } | ||
| return request, nil | ||
| } | ||
|
|
||
| func encodeResponse(_ context.Context, response interface{}) (r interface{}, err error) { | ||
| return r, err | ||
| } | ||
|
|
||
| type uppercaseRequest struct { | ||
| S string `json:"s"` | ||
| } | ||
|
|
||
| type uppercaseResponse struct { | ||
| V string `json:"v"` | ||
| Err string `json:"err,omitempty"` | ||
| } | ||
|
|
||
| type countRequest struct { | ||
| S string `json:"s"` | ||
| } | ||
|
|
||
| type countResponse struct { | ||
| V int `json:"v"` | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| package nats | ||
|
|
||
| import ( | ||
| "context" | ||
| "github.com/nats-io/go-nats" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Stdlib imports should be in there own block, see https://github.com/golang/go/wiki/CodeReviewComments#imports |
||
| ) | ||
|
|
||
| // DecodeRequestFunc extracts a user-domain request object from a NATS request. | ||
| // It's designed to be used in NATS servers, for server-side endpoints. One | ||
| // straightforward DecodeRequestFunc could be something that decodes from the | ||
| // NATS request message to the concrete request type. | ||
| type DecodeRequestFunc func(_ context.Context, msg *nats.Msg) (interface{}, error) | ||
|
|
||
| // EncodeRequestFunc encodes the passed request object into the NATS request | ||
| // object. It's designed to be used in NATS clients, for client-side endpoints. | ||
| // One straightforward EncodeRequestFunc could something that encodes the object | ||
| // directly to the NATS request message. | ||
| type EncodeRequestFunc func(_ context.Context, msg interface{}) ([]byte, error) | ||
|
|
||
| // EncodeResponseFunc encodes the passed response object to the NATS response | ||
| // message. It's designed to be used in NATS servers, for server-side endpoints. | ||
| // One straightforward EncodeResponseFunc could be something that encodes the | ||
| // object directly to the NATS response message. | ||
| type EncodeResponseFunc func(_ context.Context, response interface{}) ([]byte, error) | ||
|
|
||
| // DecodeResponseFunc extracts a user-domain response object from a NATS | ||
| // response object. It's designed to be used in NATS clients, for client-side | ||
| // endpoints. One straightforward DecodeResponseFunc could be something that | ||
| // decodes from the NATS response message to the concrete response type. | ||
| type DecodeResponseFunc func(_ context.Context, msg *nats.Msg) (interface{}, error) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stdlib packages should all be in the first block, see https://github.com/golang/go/wiki/CodeReviewComments#imports