From edbe20a84a34ff5260b16dd3d5cf9c5043da3093 Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Fri, 23 Jun 2023 13:11:24 +0100 Subject: [PATCH 1/2] server: add secure in-process Resource Service client --- agent/consul/server.go | 72 ++++++++++++++++++++++++++++++++---------- 1 file changed, 56 insertions(+), 16 deletions(-) diff --git a/agent/consul/server.go b/agent/consul/server.go index 6bb424c6753..ab2d88ddb7b 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -442,10 +442,20 @@ type Server struct { // typeRegistry contains Consul's registered resource types. typeRegistry resource.Registry - // internalResourceServiceClient is a client that can be used to communicate - // with the Resource Service in-process (i.e. not via the network) without auth. - // It should only be used for purely-internal workloads, such as controllers. - internalResourceServiceClient pbresource.ResourceServiceClient + // resourceServiceServer implements the Resource Service. + resourceServiceServer *resourcegrpc.Server + + // insecureResourceServiceClient is a client that can be used to communicate + // with the Resource Service in-process (i.e. not via the network) *without* + // auth. It should only be used for purely-internal workloads, such as + // controllers. + insecureResourceServiceClient pbresource.ResourceServiceClient + + // insecureResourceServiceClient is a client that can be used to communicate + // with the Resource Service in-process (i.e. not via the network) *with* auth. + // It can be used to make requests to the Resource Service on behalf of the user + // (e.g. from the HTTP API). + secureResourceServiceClient pbresource.ResourceServiceClient // controllerManager schedules the execution of controllers. controllerManager *controller.Manager @@ -803,11 +813,15 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom s.grpcHandler = newGRPCHandlerFromConfig(flat, config, s) s.grpcLeaderForwarder = flat.LeaderForwarder - if err := s.setupInternalResourceService(logger); err != nil { + if err := s.setupSecureResourceServiceClient(); err != nil { + return nil, err + } + if err := s.setupInsecureResourceServiceClient(logger); err != nil { return nil, err } + s.controllerManager = controller.NewManager( - s.internalResourceServiceClient, + s.insecureResourceServiceClient, logger.Named(logging.ControllerRuntime), ) s.registerResources(flat) @@ -1334,23 +1348,50 @@ func (s *Server) setupExternalGRPC(config *Config, logger hclog.Logger) { }) s.peerStreamServer.Register(s.externalGRPCServer) - resourcegrpc.NewServer(resourcegrpc.Config{ + s.resourceServiceServer = resourcegrpc.NewServer(resourcegrpc.Config{ Registry: s.typeRegistry, Backend: s.raftStorageBackend, ACLResolver: s.ACLResolver, Logger: logger.Named("grpc-api.resource"), - }).Register(s.externalGRPCServer) + }) + s.resourceServiceServer.Register(s.externalGRPCServer) } -func (s *Server) setupInternalResourceService(logger hclog.Logger) error { - server := grpc.NewServer() - - resourcegrpc.NewServer(resourcegrpc.Config{ +func (s *Server) setupInsecureResourceServiceClient(logger hclog.Logger) error { + server := resourcegrpc.NewServer(resourcegrpc.Config{ Registry: s.typeRegistry, Backend: s.raftStorageBackend, ACLResolver: resolver.DANGER_NO_AUTH{}, Logger: logger.Named("grpc-api.resource"), - }).Register(server) + }) + + conn, err := s.runInProcessGRPCServer(server.Register) + if err != nil { + return err + } + s.insecureResourceServiceClient = pbresource.NewResourceServiceClient(conn) + + return nil +} + +func (s *Server) setupSecureResourceServiceClient() error { + conn, err := s.runInProcessGRPCServer(s.resourceServiceServer.Register) + if err != nil { + return err + } + s.secureResourceServiceClient = pbresource.NewResourceServiceClient(conn) + + return nil +} + +// runInProcessGRPCServer runs a gRPC server that can only be accessed in the +// same process, rather than over the network, using a pipe listener. +func (s *Server) runInProcessGRPCServer(registerFn ...func(*grpc.Server)) (*grpc.ClientConn, error) { + server := grpc.NewServer() + + for _, fn := range registerFn { + fn(server) + } pipe := agentgrpc.NewPipeListener() go server.Serve(pipe) @@ -1367,15 +1408,14 @@ func (s *Server) setupInternalResourceService(logger hclog.Logger) error { ) if err != nil { server.Stop() - return err + return nil, err } go func() { <-s.shutdownCh conn.Close() }() - s.internalResourceServiceClient = pbresource.NewResourceServiceClient(conn) - return nil + return conn, nil } // Shutdown is used to shutdown the server From d63885c738e6a53daa341252fa4336da0a18535c Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Fri, 23 Jun 2023 13:44:02 +0100 Subject: [PATCH 2/2] agent: add ResourceServiceClient method to delegate --- agent/agent.go | 4 ++++ agent/consul/client.go | 15 +++++++++++++++ agent/consul/server.go | 5 +++++ 3 files changed, 24 insertions(+) diff --git a/agent/agent.go b/agent/agent.go index 90bfffc1afe..56f84d5805f 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -70,6 +70,7 @@ import ( "github.com/hashicorp/consul/lib/mutex" "github.com/hashicorp/consul/lib/routine" "github.com/hashicorp/consul/logging" + "github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/proto/private/pboperator" "github.com/hashicorp/consul/proto/private/pbpeering" "github.com/hashicorp/consul/tlsutil" @@ -197,6 +198,9 @@ type delegate interface { RPC(ctx context.Context, method string, args interface{}, reply interface{}) error + // ResourceServiceClient is a client for the gRPC Resource Service. + ResourceServiceClient() pbresource.ResourceServiceClient + SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn structs.SnapshotReplyFn) error Shutdown() error Stats() map[string]map[string]string diff --git a/agent/consul/client.go b/agent/consul/client.go index e4a3f83324a..256e0e58e37 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -25,6 +25,7 @@ import ( "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/logging" + "github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/types" ) @@ -93,6 +94,9 @@ type Client struct { EnterpriseClient tlsConfigurator *tlsutil.Configurator + + // resourceServiceClient is a client for the gRPC Resource Service. + resourceServiceClient pbresource.ResourceServiceClient } // NewClient creates and returns a Client @@ -151,6 +155,13 @@ func NewClient(config *Config, deps Deps) (*Client, error) { } c.router = deps.Router + conn, err := deps.GRPCConnPool.ClientConn(deps.ConnPool.Datacenter) + if err != nil { + c.Shutdown() + return nil, fmt.Errorf("Failed to get gRPC client connection: %w", err) + } + c.resourceServiceClient = pbresource.NewResourceServiceClient(conn) + // Start LAN event handlers after the router is complete since the event // handlers depend on the router and the router depends on Serf. go c.lanEventHandler() @@ -451,3 +462,7 @@ func (c *Client) AgentEnterpriseMeta() *acl.EnterpriseMeta { func (c *Client) agentSegmentName() string { return c.config.Segment } + +func (c *Client) ResourceServiceClient() pbresource.ResourceServiceClient { + return c.resourceServiceClient +} diff --git a/agent/consul/server.go b/agent/consul/server.go index ab2d88ddb7b..68b3681e7d1 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -943,6 +943,7 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler s.peerStreamServer.Register(srv) s.externalACLServer.Register(srv) s.externalConnectCAServer.Register(srv) + s.resourceServiceServer.Register(srv) } return agentgrpc.NewHandler(deps.Logger, config.RPCAddr, register, nil, s.incomingRPCLimiter) @@ -2135,6 +2136,10 @@ func (s *Server) hcpServerStatus(deps Deps) hcp.StatusCallback { } } +func (s *Server) ResourceServiceClient() pbresource.ResourceServiceClient { + return s.secureResourceServiceClient +} + func fileExists(name string) (bool, error) { _, err := os.Stat(name) if err == nil {