diff --git a/manager/manager.go b/manager/manager.go index 856554d290..e903dcf9fe 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -227,10 +227,44 @@ func New(config *Config) (*Manager, error) { } raftNode := raft.NewNode(newNodeOpts) + // the interceptorWrappers are functions that wrap the prometheus grpc + // interceptor, and add some of code to log errors locally. one for stream + // and one for unary. this is needed because the grpc unary interceptor + // doesn't natively do chaining, you have to implement it in the caller. + // note that even though these are logging errors, we're still using + // debug level. returning errors from GRPC methods is common and expected, + // and logging an ERROR every time a user mistypes a service name would + // pollute the logs really fast. + // + // NOTE(dperny): Because of the fact that these functions are very simple + // in their operation and have no side effects other than the log output, + // they are not automatically tested. If you modify them later, make _sure_ + // that they are correct. If you add substantial side effects, abstract + // these out and test them! + unaryInterceptorWrapper := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + // pass the call down into the grpc_prometheus interceptor + resp, err := grpc_prometheus.UnaryServerInterceptor(ctx, req, info, handler) + if err != nil { + log.G(ctx).WithField("rpc", info.FullMethod).WithError(err).Debug("error handling rpc") + } + return resp, err + } + + streamInterceptorWrapper := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + // we can't re-write a stream context, so don't bother creating a + // sub-context like in unary methods + // pass the call down into the grpc_prometheus interceptor + err := grpc_prometheus.StreamServerInterceptor(srv, ss, info, handler) + if err != nil { + log.G(ss.Context()).WithField("rpc", info.FullMethod).WithError(err).Debug("error handling streaming rpc") + } + return err + } + opts := []grpc.ServerOption{ grpc.Creds(config.SecurityConfig.ServerTLSCreds), - grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), - grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor), + grpc.StreamInterceptor(streamInterceptorWrapper), + grpc.UnaryInterceptor(unaryInterceptorWrapper), grpc.MaxMsgSize(transport.GRPCMaxMsgSize), }