Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/akamai-purger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func daemon(c Config, ap *akamaiPurger, logger blog.Logger, scope prometheus.Reg
}
}()

start, err := bgrpc.NewServer(c.AkamaiPurger.GRPC).Add(
start, err := bgrpc.NewServer(c.AkamaiPurger.GRPC, logger).Add(
&akamaipb.AkamaiPurger_ServiceDesc, ap).Build(tlsConfig, scope, clk)
cmd.FailOnError(err, "Unable to setup Akamai purger gRPC server")

Expand Down
2 changes: 1 addition & 1 deletion cmd/boulder-ca/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func main() {
logger.Infof("Created a reloadable allow list, it was initialized with %d entries", entries)
}

srv := bgrpc.NewServer(c.CA.GRPCCA)
srv := bgrpc.NewServer(c.CA.GRPCCA, logger)

if !c.CA.DisableOCSPService {
ocspi, err := ca.NewOCSPImpl(
Expand Down
2 changes: 1 addition & 1 deletion cmd/boulder-publisher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func main() {

pubi := publisher.New(bundles, c.Publisher.UserAgent, logger, scope)

start, err := bgrpc.NewServer(c.Publisher.GRPC).Add(
start, err := bgrpc.NewServer(c.Publisher.GRPC, logger).Add(
&pubpb.Publisher_ServiceDesc, pubi).Build(tlsConfig, scope, clk)
cmd.FailOnError(err, "Unable to setup Publisher gRPC server")

Expand Down
2 changes: 1 addition & 1 deletion cmd/boulder-ra/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func main() {
rai.OCSP = ocspc
rai.SA = sac

start, err := bgrpc.NewServer(c.RA.GRPC).Add(
start, err := bgrpc.NewServer(c.RA.GRPC, logger).Add(
&rapb.RegistrationAuthority_ServiceDesc, rai).Build(tlsConfig, scope, clk)
cmd.FailOnError(err, "Unable to setup RA gRPC server")

Expand Down
2 changes: 1 addition & 1 deletion cmd/boulder-sa/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func main() {
sai, err := sa.NewSQLStorageAuthorityWrapping(saroi, dbMap, scope)
cmd.FailOnError(err, "Failed to create SA impl")

start, err := bgrpc.NewServer(c.SA.GRPC).Add(
start, err := bgrpc.NewServer(c.SA.GRPC, logger).WithCheckInterval(c.SA.HealthCheckInterval.Duration).Add(
&sapb.StorageAuthorityReadOnly_ServiceDesc, saroi).Add(
&sapb.StorageAuthority_ServiceDesc, sai).Build(
tls, scope, clk)
Expand Down
2 changes: 1 addition & 1 deletion cmd/boulder-va/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func main() {
c.VA.AccountURIPrefixes)
cmd.FailOnError(err, "Unable to create VA server")

start, err := bgrpc.NewServer(c.VA.GRPC).Add(
start, err := bgrpc.NewServer(c.VA.GRPC, logger).Add(
&vapb.VA_ServiceDesc, vai).Add(
&vapb.CAA_ServiceDesc, vai).Build(tlsConfig, scope, clk)
cmd.FailOnError(err, "Unable to setup VA gRPC server")
Expand Down
4 changes: 4 additions & 0 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ type ServiceConfig struct {
DebugAddr string `validate:"hostname_port"`
GRPC *GRPCServerConfig
TLS TLSConfig

// HealthCheckInterval is the duration between deep health checks of the
// service. Defaults to 5 seconds.
HealthCheckInterval config.Duration `validate:"-"`
}

// DBConfig defines how to connect to a database. The connect string is
Expand Down
2 changes: 1 addition & 1 deletion cmd/crl-storer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func main() {
csi, err := storer.New(issuers, s3client, c.CRLStorer.S3Bucket, scope, logger, clk)
cmd.FailOnError(err, "Failed to create CRLStorer impl")

start, err := bgrpc.NewServer(c.CRLStorer.GRPC).Add(
start, err := bgrpc.NewServer(c.CRLStorer.GRPC, logger).Add(
&cspb.CRLStorer_ServiceDesc, csi).Build(tlsConfig, scope, clk)
cmd.FailOnError(err, "Unable to setup CRLStorer gRPC server")

Expand Down
2 changes: 1 addition & 1 deletion cmd/nonce-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func main() {
cmd.FailOnError(err, "tlsConfig config")

nonceServer := nonce.NewServer(ns)
start, err := bgrpc.NewServer(c.NonceService.GRPC).Add(
start, err := bgrpc.NewServer(c.NonceService.GRPC, logger).Add(
&noncepb.NonceService_ServiceDesc, nonceServer).Build(tlsConfig, scope, cmd.Clock())
cmd.FailOnError(err, "Unable to setup nonce service gRPC server")

Expand Down
120 changes: 107 additions & 13 deletions grpc/server.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package grpc

import (
"context"
"crypto/tls"
"errors"
"fmt"
"net"
"strings"
"time"

grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/jmhodges/clock"
bcreds "github.com/letsencrypt/boulder/grpc/creds"
blog "github.com/letsencrypt/boulder/log"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/filters"
Expand All @@ -27,6 +30,15 @@ var CodedError = status.Errorf

var errNilTLS = errors.New("boulder/grpc: received nil tls.Config")

// checker is an interface for checking the health of a grpc service
// implementation. Build type-asserts every registered service implementation
// against this interface; those which satisfy it have Health called
// periodically, and the result is reflected in the gRPC health service under
// the service's name. Implementations which do not satisfy it are not checked.
type checker interface {
// Health returns nil if the service is healthy, or an error if it is not.
// If the passed context is canceled, it should return immediately with an
// error. The context passed by the server builder carries a timeout of 90%
// of the configured check interval.
Health(context.Context) error
}

// service represents a single gRPC service that can be registered with a gRPC
// server.
type service struct {
Expand All @@ -37,15 +49,27 @@ type service struct {
// serverBuilder implements a builder pattern for constructing new gRPC servers
// and registering gRPC services on those servers.
type serverBuilder struct {
cfg *cmd.GRPCServerConfig
services map[string]service
err error
cfg *cmd.GRPCServerConfig
services map[string]service
healthSrv *health.Server
checkInterval time.Duration
logger blog.Logger
err error
}

// NewServer returns an object which can be used to build gRPC servers. It
// takes the server's configuration to perform initialization.
func NewServer(c *cmd.GRPCServerConfig) *serverBuilder {
return &serverBuilder{cfg: c, services: make(map[string]service)}
// NewServer constructs a serverBuilder, the object used to assemble a gRPC
// server. The supplied configuration is retained for use when the server is
// built, and the supplied logger is used to report health transitions observed
// by the deep health checks. The builder starts with no registered services.
func NewServer(c *cmd.GRPCServerConfig, logger blog.Logger) *serverBuilder {
	builder := &serverBuilder{
		cfg:      c,
		services: map[string]service{},
		logger:   logger,
	}
	return builder
}

// WithCheckInterval sets the interval at which the server will check the health
// of its registered services. If this is not called, or if it is called with a
// zero or negative duration, Build falls back to a default interval of 5
// seconds. It returns the receiver so that calls can be chained.
func (sb *serverBuilder) WithCheckInterval(i time.Duration) *serverBuilder {
sb.checkInterval = i
return sb
}

// Add registers a new service (consisting of its description and its
Expand All @@ -59,8 +83,7 @@ func (sb *serverBuilder) Add(desc *grpc.ServiceDesc, impl any) *serverBuilder {
sb.err = fmt.Errorf("attempted double-registration of gRPC service %q", desc.ServiceName)
return sb
}

sb.services[desc.ServiceName] = service{desc, impl}
sb.services[desc.ServiceName] = service{desc: desc, impl: impl}
return sb
}

Expand All @@ -71,9 +94,9 @@ func (sb *serverBuilder) Add(desc *grpc.ServiceDesc, impl any) *serverBuilder {
// gracefully stop the server if one is caught, causing the start() function to
// exit.
func (sb *serverBuilder) Build(tlsConfig *tls.Config, statsRegistry prometheus.Registerer, clk clock.Clock) (func() error, error) {
// Add the health service to all servers.
healthSrv := health.NewServer()
sb = sb.Add(&healthpb.Health_ServiceDesc, healthSrv)
// Register the health service with the server.
sb.healthSrv = health.NewServer()
sb.Add(&healthpb.Health_ServiceDesc, sb.healthSrv)

// Check to see if any of the calls to .Add() resulted in an error.
if sb.err != nil {
Expand Down Expand Up @@ -169,17 +192,88 @@ func (sb *serverBuilder) Build(tlsConfig *tls.Config, statsRegistry prometheus.R
return server.Serve(listener)
}

// Initialize long-running health checks of all services which implement the
// checker interface.
if sb.checkInterval <= 0 {
sb.checkInterval = 5 * time.Second
}
healthCtx, stopHealthChecks := context.WithCancel(context.Background())
for _, s := range sb.services {
check, ok := s.impl.(checker)
if !ok {
continue
}
sb.initLongRunningCheck(healthCtx, s.desc.ServiceName, check.Health)
}

// Start a goroutine which listens for a termination signal, and then
// gracefully stops the gRPC server. This in turn causes the start() function
// to exit, allowing its caller (generally a main() function) to exit.
go cmd.CatchSignals(func() {
healthSrv.Shutdown()
stopHealthChecks()
sb.healthSrv.Shutdown()
server.GracefulStop()
})

return start, nil
}

// initLongRunningCheck initializes a goroutine which will periodically check
// the health of the provided service and update the health server accordingly.
//
// The service is first marked NOT_SERVING. checkImpl is then run once
// immediately and again every sb.checkInterval until shutdownCtx is canceled,
// at which point the goroutine exits. Each invocation of checkImpl receives a
// context which times out after 90% of sb.checkInterval. A status change is
// logged and pushed to the health server only when the result differs from the
// previous one.
func (sb *serverBuilder) initLongRunningCheck(shutdownCtx context.Context, service string, checkImpl func(context.Context) error) {
	// Set the initial health status for the service.
	sb.healthSrv.SetServingStatus(service, healthpb.HealthCheckResponse_NOT_SERVING)

	// checkAndMaybeUpdate is a helper function that checks the health of the
	// service and, if necessary, updates its status in the health server. It
	// returns the status which the next invocation should treat as "last".
	checkAndMaybeUpdate := func(checkCtx context.Context, last healthpb.HealthCheckResponse_ServingStatus) healthpb.HealthCheckResponse_ServingStatus {
		// Don't begin a check once shutdown has started. The select below
		// chooses at random when both the ticker and Done are ready, so this
		// can be reached after cancellation.
		if checkCtx.Err() != nil {
			return last
		}

		// Make a context with a timeout at 90% of the interval.
		checkImplCtx, cancel := context.WithTimeout(checkCtx, sb.checkInterval*9/10)
		defer cancel()

		err := checkImpl(checkImplCtx)
		if checkCtx.Err() != nil {
			// The check was interrupted by shutdown, so its outcome says
			// nothing about the service's health. Leave the status alone
			// rather than logging a spurious transition.
			return last
		}

		next := healthpb.HealthCheckResponse_SERVING
		if err != nil {
			next = healthpb.HealthCheckResponse_NOT_SERVING
		}

		if last == next {
			// No change in health status.
			return next
		}

		if next != healthpb.HealthCheckResponse_SERVING {
			sb.logger.Errf("transitioning health of %q from %q to %q, due to: %s", service, last, next, err)
		} else {
			sb.logger.Infof("transitioning health of %q from %q to %q", service, last, next)
		}
		sb.healthSrv.SetServingStatus(service, next)
		return next
	}

	go func() {
		ticker := time.NewTicker(sb.checkInterval)
		defer ticker.Stop()

		// Assume the service is not healthy to start.
		last := healthpb.HealthCheckResponse_NOT_SERVING

		// Check immediately, and then at the specified interval.
		last = checkAndMaybeUpdate(shutdownCtx, last)
		for {
			select {
			case <-shutdownCtx.Done():
				// The server is shutting down.
				return
			case <-ticker.C:
				last = checkAndMaybeUpdate(shutdownCtx, last)
			}
		}
	}()
}

// serverMetrics is a struct type used to return a few registered metrics from
// `newServerMetrics`
type serverMetrics struct {
Expand Down
72 changes: 72 additions & 0 deletions grpc/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package grpc

import (
"context"
"errors"
"testing"
"time"

blog "github.com/letsencrypt/boulder/log"
"github.com/letsencrypt/boulder/test"
"google.golang.org/grpc/health"
)

func Test_serverBuilder_initLongRunningCheck(t *testing.T) {
Comment thread
beautifulentropy marked this conversation as resolved.
t.Parallel()
hs := health.NewServer()
mockLogger := blog.NewMock()
sb := &serverBuilder{
healthSrv: hs,
logger: mockLogger,
checkInterval: time.Millisecond * 50,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

count := 0
failEveryThirdCheck := func(context.Context) error {
count++
if count%3 == 0 {
return errors.New("oops")
}
return nil
}
sb.initLongRunningCheck(ctx, "test", failEveryThirdCheck)
time.Sleep(time.Millisecond * 110)
cancel()

// We expect the following transition timeline:
// - ~0ms 1st check passed, NOT_SERVING to SERVING
// - ~50ms 2nd check passed, [no transition]
// - ~100ms 3rd check failed, SERVING to NOT_SERVING
serving := mockLogger.GetAllMatching(".*\"NOT_SERVING\" to \"SERVING\"")
notServing := mockLogger.GetAllMatching((".*\"SERVING\" to \"NOT_SERVING\""))
test.Assert(t, len(serving) == 1, "expected one serving log line")
test.Assert(t, len(notServing) == 1, "expected one not serving log line")

mockLogger.Clear()

ctx, cancel = context.WithCancel(context.Background())
defer cancel()

count = 0
failEveryOtherCheck := func(context.Context) error {
count++
if count%2 == 0 {
return errors.New("oops")
}
return nil
}
sb.initLongRunningCheck(ctx, "test", failEveryOtherCheck)
time.Sleep(time.Millisecond * 110)
cancel()

// We expect the following transition timeline:
// - ~0ms 1st check passed, NOT_SERVING to SERVING
// - ~50ms 2nd check failed, SERVING to NOT_SERVING
// - ~100ms 3rd check passed, NOT_SERVING to SERVING
serving = mockLogger.GetAllMatching(".*\"NOT_SERVING\" to \"SERVING\"")
notServing = mockLogger.GetAllMatching((".*\"SERVING\" to \"NOT_SERVING\""))
test.Assert(t, len(serving) == 2, "expected two serving log lines")
test.Assert(t, len(notServing) == 1, "expected one not serving log line")
}
14 changes: 14 additions & 0 deletions sa/sa.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,3 +901,17 @@ func (ssa *SQLStorageAuthority) AddBlockedKey(ctx context.Context, req *sapb.Add
}
return &emptypb.Empty{}, nil
}

// Health implements the grpc.checker interface. It runs a trivial "SELECT 1"
// through the read-write database map and then defers to the embedded
// read-only storage authority's Health. The first error encountered is
// returned unchanged; nil is returned only if both checks succeed.
func (ssa *SQLStorageAuthority) Health(ctx context.Context) error {
	var probe int
	if err := ssa.dbMap.WithContext(ctx).SelectOne(&probe, "SELECT 1"); err != nil {
		return err
	}
	return ssa.SQLStorageAuthorityRO.Health(ctx)
}
9 changes: 9 additions & 0 deletions sa/saro.go
Original file line number Diff line number Diff line change
Expand Up @@ -1353,3 +1353,12 @@ func (ssa *SQLStorageAuthorityRO) GetMaxExpiration(ctx context.Context, req *emp
func (ssa *SQLStorageAuthority) GetMaxExpiration(ctx context.Context, req *emptypb.Empty) (*timestamppb.Timestamp, error) {
return ssa.SQLStorageAuthorityRO.GetMaxExpiration(ctx, req)
}

// Health implements the grpc.checker interface. It runs a trivial "SELECT 1"
// through the read-only database map using the supplied context, and returns
// whatever error that query produces (nil on success).
func (ssa *SQLStorageAuthorityRO) Health(ctx context.Context) error {
	var probe int
	return ssa.dbReadOnlyMap.WithContext(ctx).SelectOne(&probe, "SELECT 1")
}
1 change: 1 addition & 0 deletions test/config-next/sa.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
}
}
},
"healthCheckInterval": "4s",
"features": {
"StoreRevokerInfo": true
}
Expand Down
Loading