diff --git a/go.mod b/go.mod index d686763ad7..3bc7ad4875 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/spf13/cobra v1.4.0 github.com/stretchr/testify v1.7.1 go.uber.org/zap v1.19.1 + google.golang.org/grpc v1.45.0 k8s.io/api v0.24.2 k8s.io/apimachinery v0.24.2 k8s.io/client-go v0.24.2 @@ -72,6 +73,7 @@ require ( golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect google.golang.org/appengine v1.6.7 // indirect + google.golang.org/genproto v0.0.0-20220329172620-7be39ac1afc7 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect diff --git a/go.sum b/go.sum index 0b72c938e2..9bf546d211 100644 --- a/go.sum +++ b/go.sum @@ -869,6 +869,7 @@ google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxH google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20220107163113-42d7afdf6368/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20220329172620-7be39ac1afc7 h1:HOL66YCI20JvN2hVk6o2YIp9i/3RvzVUz82PqNr7fXw= google.golang.org/genproto v0.0.0-20220329172620-7be39ac1afc7/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= @@ -893,6 +894,7 @@ google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= +google.golang.org/grpc v1.45.0 h1:NEpgUqV3Z+ZjkqMsxMg11IaDrXY4RY6CQukSGK0uI1M= google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= diff --git a/internal/cmd/root.go b/internal/cmd/root.go index 274dbac4a9..6012e2ff42 100644 --- a/internal/cmd/root.go +++ b/internal/cmd/root.go @@ -27,6 +27,7 @@ func GetRootCommand() *cobra.Command { } cmd.AddCommand(getServerCommand()) + cmd.AddCommand(getxDSTestCommand()) return cmd } diff --git a/internal/cmd/xdstest.go b/internal/cmd/xdstest.go new file mode 100644 index 0000000000..211874520c --- /dev/null +++ b/internal/cmd/xdstest.go @@ -0,0 +1,237 @@ +package cmd + +import ( + "net" + "time" + + "github.com/envoyproxy/gateway/internal/ir" + "github.com/envoyproxy/gateway/internal/log" + "github.com/envoyproxy/gateway/internal/xds/cache" + "github.com/envoyproxy/gateway/internal/xds/translator" + "github.com/spf13/cobra" + "google.golang.org/grpc" + "sigs.k8s.io/controller-runtime/pkg/manager/signals" + + controlplane_service_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3" + controlplane_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + controlplane_service_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3" + controlplane_service_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/service/listener/v3" + controlplane_service_route_v3 "github.com/envoyproxy/go-control-plane/envoy/service/route/v3" + controlplane_service_runtime_v3 "github.com/envoyproxy/go-control-plane/envoy/service/runtime/v3" + controlplane_service_secret_v3 "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3" + controlplane_server_v3 "github.com/envoyproxy/go-control-plane/pkg/server/v3" +) + +// The xdstest command is intended just to show how updating the IR can produce different +// xDS output, including showing that Delta xDS works. +// You'll need an xDS probe like the `contour cli` command to check. +// +// It's also intended that this get removed once we have a full loop implemented in +// `gateway serve`. + +// getServerCommand returns the server cobra command to be executed. +func getxDSTestCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "xdstest", + Aliases: []string{"xdstest"}, + Short: "Run a test xDS server", + RunE: func(cmd *cobra.Command, args []string) error { + return xDSTest() + }, + } + return cmd +} + +// xDSTest implements the command. +// This is deliberately verbose and unoptimized, since the purpose +// is just to illustrate how the flow will need to work. +func xDSTest() error { + + // Grab the logr.Logger. + logger, err := log.NewLogger() + if err != nil { + return err + } + + // Set the logr Logger to debug. + // zap's logr impl requires negative levels. + logger = logger.V(-2) + + ctx := signals.SetupSignalHandler() + + logger.Info("Starting xDS Tester service") + defer logger.Info("Stopping xDS Tester service") + + // Create three IR versions that we'll swap between, to + // generate xDS updates for the various methods. + ir1 := &ir.Xds{ + HTTP: []*ir.HTTPListener{ + { + Name: "first-listener", + Address: "0.0.0.0", + Port: 10080, + Hostnames: []string{ + "*", + }, + Routes: []*ir.HTTPRoute{ + { + Name: "first-route", + Destinations: []*ir.RouteDestination{ + { + Host: "1.2.3.4", + Port: 50000, + }, + }, + }, + }, + }, + }, + } + + ir2 := &ir.Xds{ + HTTP: []*ir.HTTPListener{ + { + Name: "first-listener", + Address: "0.0.0.0", + Port: 10080, + Hostnames: []string{ + "*", + }, + Routes: []*ir.HTTPRoute{ + { + Name: "second-route", + Destinations: []*ir.RouteDestination{ + { + Host: "1.2.3.4", + Port: 50000, + }, + }, + }, + }, + }, + }, + } + + ir3 := &ir.Xds{ + HTTP: []*ir.HTTPListener{ + { + Name: "second-listener", + Address: "0.0.0.0", + Port: 10080, + Hostnames: []string{ + "*", + }, + Routes: []*ir.HTTPRoute{ + { + Name: "second-route", + Destinations: []*ir.RouteDestination{ + { + Host: "1.2.3.4", + Port: 50000, + }, + }, + }, + }, + }, + }, + } + + // Now, we do the translation because everything is static. + // Normally, we'd do this in response to updates on the + // message bus. + cacheVersion1, err := translator.TranslateXDSIR(ir1) + if err != nil { + return err + } + + cacheVersion2, err := translator.TranslateXDSIR(ir2) + if err != nil { + return err + } + + cacheVersion3, err := translator.TranslateXDSIR(ir3) + if err != nil { + return err + } + + // Set up the gRPC server and register the xDS handler. + g := grpc.NewServer() + + snapCache := cache.NewSnapshotCache(false, logger) + RegisterServer(controlplane_server_v3.NewServer(ctx, snapCache, snapCache), g) + + addr := net.JoinHostPort("0.0.0.0", "8001") + l, err := net.Listen("tcp", addr) + if err != nil { + return err + } + + // Handle the signals and stop when the signal context does. + go func() { + <-ctx.Done() + + // We don't use GracefulStop here because envoy + // has long-lived hanging xDS requests. There's no + // mechanism to make those pending requests fail, + // so we forcibly terminate the TCP sessions. + g.Stop() + }() + + // Loop through the various configs, updating the SnapshotCache + // each time. This will run until the process is killed by signal + // (SIGINT, SIGKILL etc). + go func() { + // This little function sleeps 10 seconds then swaps + // between various versions of the IR + logger.Info("Sleeping for a bit before updating the cache") + for { + time.Sleep(10 * time.Second) + logger.Info("Updating the cache for first-listener with first-route") + err := snapCache.GenerateNewSnapshot(cacheVersion1.GetXdsResources()) + if err != nil { + logger.Error(err, "Something went wrong with generating a snapshot") + } + time.Sleep(10 * time.Second) + logger.Info("Updating the cache for first-listener with second-route") + err = snapCache.GenerateNewSnapshot(cacheVersion2.GetXdsResources()) + if err != nil { + logger.Error(err, "Something went wrong with generating a snapshot") + } + time.Sleep(10 * time.Second) + logger.Info("Updating the cache for second-listener with second-route") + err = snapCache.GenerateNewSnapshot(cacheVersion3.GetXdsResources()) + if err != nil { + logger.Error(err, "Something went wrong with generating a snapshot") + } + } + }() + + return g.Serve(l) + +} + +// Some helper stuff that we'll need to put somewhere eventually. + +// Server is a collection of handlers for streaming discovery requests. +type Server interface { + controlplane_service_cluster_v3.ClusterDiscoveryServiceServer + controlplane_service_endpoint_v3.EndpointDiscoveryServiceServer + controlplane_service_listener_v3.ListenerDiscoveryServiceServer + controlplane_service_route_v3.RouteDiscoveryServiceServer + controlplane_service_discovery_v3.AggregatedDiscoveryServiceServer + controlplane_service_secret_v3.SecretDiscoveryServiceServer + controlplane_service_runtime_v3.RuntimeDiscoveryServiceServer +} + +// RegisterServer registers the given xDS protocol Server with the gRPC +// runtime. +func RegisterServer(srv Server, g *grpc.Server) { + // register services + controlplane_service_discovery_v3.RegisterAggregatedDiscoveryServiceServer(g, srv) + controlplane_service_secret_v3.RegisterSecretDiscoveryServiceServer(g, srv) + controlplane_service_cluster_v3.RegisterClusterDiscoveryServiceServer(g, srv) + controlplane_service_endpoint_v3.RegisterEndpointDiscoveryServiceServer(g, srv) + controlplane_service_listener_v3.RegisterListenerDiscoveryServiceServer(g, srv) + controlplane_service_route_v3.RegisterRouteDiscoveryServiceServer(g, srv) + controlplane_service_runtime_v3.RegisterRuntimeDiscoveryServiceServer(g, srv) +} diff --git a/internal/xds/cache/logrwrapper.go b/internal/xds/cache/logrwrapper.go new file mode 100644 index 0000000000..5c1f073284 --- /dev/null +++ b/internal/xds/cache/logrwrapper.go @@ -0,0 +1,47 @@ +package cache + +import ( + "fmt" + + "github.com/go-logr/logr" +) + +// LogrWrapper is a nasty hack for turning the logr.Logger we get from NewLogger() +// into something that go-control-plane can accept. +// It seems pretty silly to take a zap logger, which is levelled, turn it into +// a V-style logr Logger, then turn it back again with this, but here we are. +// TODO(youngnick): Reopen the logging library discussion then do something about this. +type LogrWrapper struct { + logr logr.Logger +} + +const LevelDebug int = -2 +const LevelInfo int = 0 +const LevelWarn int = -1 + +func (l LogrWrapper) Debugf(template string, args ...interface{}) { + + l.logr.V(LevelDebug).Info(fmt.Sprintf(template, args...)) +} + +func (l LogrWrapper) Infof(template string, args ...interface{}) { + + l.logr.V(LevelInfo).Info(fmt.Sprintf(template, args...)) +} + +func (l LogrWrapper) Warnf(template string, args ...interface{}) { + + l.logr.V(LevelWarn).Info(fmt.Sprintf(template, args...)) +} + +func (l LogrWrapper) Errorf(template string, args ...interface{}) { + + l.logr.Error(fmt.Errorf(template, args...), "") +} + +func NewLogrWrapper(log logr.Logger) *LogrWrapper { + + return &LogrWrapper{ + logr: log, + } +} diff --git a/internal/xds/cache/snapshotcache.go b/internal/xds/cache/snapshotcache.go new file mode 100644 index 0000000000..5b6997cd98 --- /dev/null +++ b/internal/xds/cache/snapshotcache.go @@ -0,0 +1,331 @@ +// Portions of this code are based on code from Contour, available at: +// https://github.com/projectcontour/contour/blob/main/internal/xds/v3/snapshotter.go + +package cache + +import ( + "context" + "fmt" + "math" + "strconv" + "sync" + + envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + envoy_cache_v3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + envoy_server_v3 "github.com/envoyproxy/go-control-plane/pkg/server/v3" + "github.com/go-logr/logr" + + "github.com/envoyproxy/gateway/internal/xds/types" +) + +var Hash = envoy_cache_v3.IDHash{} + +// SnapshotCacheWithCallbacks uses the go-control-plane SimpleCache to store snapshots of +// Envoy resources, sliced by Node ID so that we can do incremental xDS properly. +// It does this by also implementing callbacks to make sure that the cache is kept +// up to date for each new node. +// +// Having the cache also implement the callbacks is a little bit hacky, but it makes sure +// that all the required bookkeeping happens. +// TODO(youngnick): Talk to the go-control-plane maintainers and see if we can upstream +// this in a better way. +type SnapshotCacheWithCallbacks interface { + envoy_cache_v3.SnapshotCache + envoy_server_v3.Callbacks + GenerateNewSnapshot(types.XdsResources) error +} + +type snapshotcache struct { + envoy_cache_v3.SnapshotCache + + lastSnapshot *envoy_cache_v3.Snapshot + + log *LogrWrapper + + streamIDNodeID map[int64]string + + snapshotVersion int64 + + mu sync.Mutex +} + +// GenerateNewSnapshot takes a table of resources (the output from the IR->xDS +// translator) and updates the snapshot version. +func (s *snapshotcache) GenerateNewSnapshot(resources types.XdsResources) error { + + s.mu.Lock() + defer s.mu.Unlock() + + version := s.newSnapshotVersion() + + // Create a snapshot with all xDS resources. + snapshot, err := envoy_cache_v3.NewSnapshot( + version, + resources, + ) + if err != nil { + return err + } + + s.lastSnapshot = snapshot + + for _, node := range s.getNodeIDs() { + s.log.Debugf("Generating a snapshot with Node %s", node) + err := s.SetSnapshot(context.TODO(), node, snapshot) + if err != nil { + return err + } + } + + return nil + +} + +// newSnapshotVersion increments the current snapshotVersion +// and returns as a string. +func (s *snapshotcache) newSnapshotVersion() string { + + // Reset the snapshotVersion if it ever hits max size. + if s.snapshotVersion == math.MaxInt64 { + s.snapshotVersion = 0 + } + + // Increment the snapshot version & return as string. + s.snapshotVersion++ + return strconv.FormatInt(s.snapshotVersion, 10) +} + +// NewSnapshotCache gives you a fresh SnapshotCache. +// It needs a logger that supports the go-control-plane +// required interface (Debugf, Infof, Warnf, and Errorf). +func NewSnapshotCache(ads bool, logger logr.Logger) SnapshotCacheWithCallbacks { + // Set up the nasty wrapper hack. + wrappedLogger := NewLogrWrapper(logger) + return &snapshotcache{ + SnapshotCache: envoy_cache_v3.NewSnapshotCache(ads, &Hash, wrappedLogger), + log: wrappedLogger, + streamIDNodeID: make(map[int64]string), + } +} + +func (s *snapshotcache) getNodeIDs() []string { + + var nodeIDs []string + + for _, node := range s.streamIDNodeID { + + nodeIDs = append(nodeIDs, node) + + } + + return nodeIDs + +} + +// OnStreamOpen and the other OnStream* functions implement the callbacks for the +// state-of-the-world stream types. +func (s *snapshotcache) OnStreamOpen(ctx context.Context, streamID int64, typeURL string) error { + + s.mu.Lock() + defer s.mu.Unlock() + + s.streamIDNodeID[streamID] = "" + + return nil +} + +func (s *snapshotcache) OnStreamClosed(streamID int64) { + + s.mu.Lock() + defer s.mu.Unlock() + + delete(s.streamIDNodeID, streamID) + +} + +func (s *snapshotcache) OnStreamRequest(streamID int64, req *envoy_service_discovery_v3.DiscoveryRequest) error { + + s.mu.Lock() + // We could do this a little earlier than the defer, since the last half of this func is only logging + // but that seemed like a premature optimization. + defer s.mu.Unlock() + + // It's possible that only the first discovery request will have a node ID set. + // We also need to save the node ID to the node list anyway. + // So check if we have a nodeID for this stream already, then set it if not. + nodeID := s.streamIDNodeID[streamID] + if nodeID == "" { + nodeID = Hash.ID(req.Node) + if nodeID == "" { + return fmt.Errorf("couldn't hash the node ID from the first discovery request on stream %d", streamID) + } + s.log.Debugf("First discovery request on stream %d, got nodeID %s", streamID, nodeID) + s.streamIDNodeID[streamID] = nodeID + } + + var nodeVersion string + + var errorCode int32 + var errorMessage string + + // If no snapshot has been generated yet, we can't do anything, so don't mess with this request. + // go-control-plane will respond with an empty response, then send an update when a snapshot is generated. + if s.lastSnapshot == nil { + return nil + } + + _, err := s.GetSnapshot(nodeID) + if err != nil { + err = s.SetSnapshot(context.TODO(), nodeID, s.lastSnapshot) + if err != nil { + return err + } + } + + if req.Node != nil { + if bv := req.Node.GetUserAgentBuildVersion(); bv != nil && bv.Version != nil { + nodeVersion = fmt.Sprintf("v%d.%d.%d", bv.Version.MajorNumber, bv.Version.MinorNumber, bv.Version.Patch) + } + } + + s.log.Debugf("Got a new request, version_info %s, response_nonce %s, nodeID %s, node_version %s", req.VersionInfo, req.ResponseNonce, nodeID, nodeVersion) + + if status := req.ErrorDetail; status != nil { + // if Envoy rejected the last update log the details here. + // TODO(youngnick): Handle NACK properly + errorCode = status.Code + errorMessage = status.Message + } + + s.log.Debugf("handling v3 xDS resource request, version_info %s, response_nonce %s, nodeID %s, node_version %s, resource_names %v, type_url %s, errorCode %d, errorMessage %s", + req.VersionInfo, req.ResponseNonce, + nodeID, nodeVersion, req.ResourceNames, req.GetTypeUrl(), + errorCode, errorMessage) + + return nil +} + +func (s *snapshotcache) OnStreamResponse(ctx context.Context, streamID int64, req *envoy_service_discovery_v3.DiscoveryRequest, resp *envoy_service_discovery_v3.DiscoveryResponse) { + + // No mutex lock required here because no writing to the cache. + + nodeID := s.streamIDNodeID[streamID] + if nodeID == "" { + s.log.Errorf("Tried to send a response to a node we haven't seen yet on stream %d", streamID) + } + + s.log.Debugf("Sending Response on stream %d to node %s", streamID, nodeID) +} + +// OnDeltaStreamOpen and the other OnDeltaStream*/OnStreamDelta* functions implement +// the callbacks for the incremental xDS versions. +// Yes, the different ordering in the name is part of the go-control-plane interface. +func (s *snapshotcache) OnDeltaStreamOpen(ctx context.Context, streamID int64, typeURL string) error { + + s.mu.Lock() + defer s.mu.Unlock() + + // Ensure that we're adding the streamID to the Node ID list. + s.streamIDNodeID[streamID] = "" + + return nil +} + +func (s *snapshotcache) OnDeltaStreamClosed(streamID int64) { + + s.mu.Lock() + defer s.mu.Unlock() + + delete(s.streamIDNodeID, streamID) + +} + +func (s *snapshotcache) OnStreamDeltaRequest(streamID int64, req *envoy_service_discovery_v3.DeltaDiscoveryRequest) error { + + s.mu.Lock() + // We could do this a little earlier than with a defer, since the last half of this func is logging + // but that seemed like a premature optimization. + defer s.mu.Unlock() + + var nodeVersion string + + var errorCode int32 + var errorMessage string + + // It's possible that only the first incremental discovery request will have a node ID set. + // We also need to save the node ID to the node list anyway. + // So check if we have a nodeID for this stream already, then set it if not. + nodeID := s.streamIDNodeID[streamID] + if nodeID == "" { + nodeID = Hash.ID(req.Node) + if nodeID == "" { + return fmt.Errorf("couldn't hash the node ID from the first incremental discovery request on stream %d", streamID) + } + s.log.Debugf("First incremental discovery request on stream %d, got nodeID %s", streamID, nodeID) + s.streamIDNodeID[streamID] = nodeID + } + + // If no snapshot has been written into the snapshotcache yet, we can't do anything, so don't mess with + // this request. go-control-plane will respond with an empty response, then send an update when a + // snapshot is generated. + if s.lastSnapshot == nil { + return nil + } + + _, err := s.GetSnapshot(nodeID) + if err != nil { + err = s.SetSnapshot(context.TODO(), nodeID, s.lastSnapshot) + if err != nil { + return err + } + + } + + if req.Node != nil { + + if bv := req.Node.GetUserAgentBuildVersion(); bv != nil && bv.Version != nil { + nodeVersion = fmt.Sprintf("v%d.%d.%d", bv.Version.MajorNumber, bv.Version.MinorNumber, bv.Version.Patch) + } + } + + s.log.Debugf("Got a new request, response_nonce %s, nodeID %s, node_version %s", + req.ResponseNonce, nodeID, nodeVersion) + + if status := req.ErrorDetail; status != nil { + // if Envoy rejected the last update log the details here. + // TODO(youngnick): Handle NACK properly + errorCode = status.Code + errorMessage = status.Message + } + + s.log.Debugf("handling v3 xDS resource request, response_nonce %s, nodeID %s, node_version %s, resource_names_subscribe %v, resource_names_unsubscribe %v, type_url %s, errorCode %d, errorMessage %s", + req.ResponseNonce, + nodeID, nodeVersion, + req.ResourceNamesSubscribe, req.ResourceNamesUnsubscribe, + req.GetTypeUrl(), + errorCode, errorMessage) + + return nil +} + +func (s *snapshotcache) OnStreamDeltaResponse(streamID int64, req *envoy_service_discovery_v3.DeltaDiscoveryRequest, resp *envoy_service_discovery_v3.DeltaDiscoveryResponse) { + + // No mutex lock required here because no writing to the cache. + + nodeID := s.streamIDNodeID[streamID] + if nodeID == "" { + s.log.Errorf("Tried to send a response to a node we haven't seen yet on stream %d", streamID) + } + + s.log.Debugf("Sending Incremental Response on stream %d to node %s", streamID, nodeID) + +} + +func (s *snapshotcache) OnFetchRequest(ctx context.Context, req *envoy_service_discovery_v3.DiscoveryRequest) error { + + return nil +} + +func (s *snapshotcache) OnFetchResponse(req *envoy_service_discovery_v3.DiscoveryRequest, resp *envoy_service_discovery_v3.DiscoveryResponse) { + +} diff --git a/internal/xds/translator/context.go b/internal/xds/translator/context.go deleted file mode 100644 index 074bf033c9..0000000000 --- a/internal/xds/translator/context.go +++ /dev/null @@ -1,30 +0,0 @@ -package translator - -import ( - "github.com/envoyproxy/go-control-plane/pkg/cache/types" - resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3" -) - -// XdsResources represents all the xds resources -type XdsResources map[resource.Type][]types.Resource - -// Context holds all the translated xds resources -type Context struct { - xdsResources XdsResources -} - -// GetXdsResources retrieves the translated xds resources saved in the translator context. -func (t *Context) GetXdsResources() XdsResources { - return t.xdsResources -} - -func (t *Context) addXdsResource(rType resource.Type, xdsResource types.Resource) { - if t.xdsResources == nil { - t.xdsResources = make(XdsResources) - } - if t.xdsResources[rType] == nil { - t.xdsResources[rType] = make([]types.Resource, 0, 1) - } - - t.xdsResources[rType] = append(t.xdsResources[rType], xdsResource) -} diff --git a/internal/xds/translator/in/xds-ir/http-route.yaml b/internal/xds/translator/in/xds-ir/http-route.yaml deleted file mode 100644 index 0f32e85d0f..0000000000 --- a/internal/xds/translator/in/xds-ir/http-route.yaml +++ /dev/null @@ -1,10 +0,0 @@ -name: "http-route" -http: -- name: "listener-1" - port: 10080 - hostnames: - - "*" - routes: - - destinations: - - host: "1.2.3.4" - port: 50000 diff --git a/internal/xds/translator/translator.go b/internal/xds/translator/translator.go index 218e8cdd81..9e45a55931 100644 --- a/internal/xds/translator/translator.go +++ b/internal/xds/translator/translator.go @@ -10,15 +10,16 @@ import ( "github.com/tetratelabs/multierror" "github.com/envoyproxy/gateway/internal/ir" + "github.com/envoyproxy/gateway/internal/xds/types" ) -// TranslateXdsIR translates the XDS IR into xDS resources -func TranslateXdsIR(ir *ir.Xds) (*Context, error) { +// TranslateXDSIR translates the XDS IR into xDS resources +func TranslateXDSIR(ir *ir.Xds) (*types.ResourceVersionTable, error) { if ir == nil { return nil, errors.New("ir is nil") } - tCtx := new(Context) + tCtx := new(types.ResourceVersionTable) for _, httpListener := range ir.HTTP { // 1:1 between IR HTTPListener and xDS Listener @@ -29,7 +30,7 @@ func TranslateXdsIR(ir *ir.Xds) (*Context, error) { // 1:1 between IR TLSListenerConfig and xDS Secret if httpListener.TLS != nil { - // Build downstream TLS context. + // Build downstream TLS details. tSocket, err := buildXdsDownstreamTLSSocket(httpListener.Name, httpListener.TLS) if err != nil { return nil, multierror.Append(err, errors.New("error building xds listener tls socket")) @@ -40,7 +41,7 @@ func TranslateXdsIR(ir *ir.Xds) (*Context, error) { if err != nil { return nil, multierror.Append(err, errors.New("error building xds listener tls secret")) } - tCtx.addXdsResource(resource.SecretType, secret) + tCtx.AddXdsResource(resource.SecretType, secret) } // Allocate virtual host for this httpListener. @@ -64,7 +65,7 @@ func TranslateXdsIR(ir *ir.Xds) (*Context, error) { if err != nil { return nil, multierror.Append(err, errors.New("error building xds cluster")) } - tCtx.addXdsResource(resource.ClusterType, xdsCluster) + tCtx.AddXdsResource(resource.ClusterType, xdsCluster) } @@ -73,8 +74,8 @@ func TranslateXdsIR(ir *ir.Xds) (*Context, error) { } xdsRouteCfg.VirtualHosts = append(xdsRouteCfg.VirtualHosts, vHost) - tCtx.addXdsResource(resource.ListenerType, xdsListener) - tCtx.addXdsResource(resource.RouteType, xdsRouteCfg) + tCtx.AddXdsResource(resource.ListenerType, xdsListener) + tCtx.AddXdsResource(resource.RouteType, xdsRouteCfg) } return tCtx, nil diff --git a/internal/xds/translator/translator_test.go b/internal/xds/translator/translator_test.go index 07ed92793d..a414461c04 100644 --- a/internal/xds/translator/translator_test.go +++ b/internal/xds/translator/translator_test.go @@ -35,11 +35,11 @@ func TestTranslateXdsIR(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { ir := requireXdsIRFromInputTestData(t, "xds-ir", tc.name+".yaml") - tCtx, err := TranslateXdsIR(ir) + tCtx, err := TranslateXDSIR(ir) require.NoError(t, err) - listeners := tCtx.xdsResources[resource.ListenerType] - routes := tCtx.xdsResources[resource.RouteType] - clusters := tCtx.xdsResources[resource.ClusterType] + listeners := tCtx.XdsResources[resource.ListenerType] + routes := tCtx.XdsResources[resource.RouteType] + clusters := tCtx.XdsResources[resource.ClusterType] require.Equal(t, requireTestDataOutFile(t, "xds-ir", tc.name+".listeners.yaml"), requireResourcesToYAMLString(t, listeners)) require.Equal(t, requireTestDataOutFile(t, "xds-ir", tc.name+".routes.yaml"), requireResourcesToYAMLString(t, routes)) require.Equal(t, requireTestDataOutFile(t, "xds-ir", tc.name+".clusters.yaml"), requireResourcesToYAMLString(t, clusters)) diff --git a/internal/xds/types/resourceversiontable.go b/internal/xds/types/resourceversiontable.go new file mode 100644 index 0000000000..91153df22a --- /dev/null +++ b/internal/xds/types/resourceversiontable.go @@ -0,0 +1,30 @@ +package types + +import ( + "github.com/envoyproxy/go-control-plane/pkg/cache/types" + resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3" +) + +// XdsResources represents all the xds resources +type XdsResources map[resource.Type][]types.Resource + +// ResourceVersionTable holds all the translated xds resources +type ResourceVersionTable struct { + XdsResources XdsResources +} + +// GetXdsResources retrieves the translated xds resources saved in the translator context. +func (t *ResourceVersionTable) GetXdsResources() XdsResources { + return t.XdsResources +} + +func (t *ResourceVersionTable) AddXdsResource(rType resource.Type, xdsResource types.Resource) { + if t.XdsResources == nil { + t.XdsResources = make(XdsResources) + } + if t.XdsResources[rType] == nil { + t.XdsResources[rType] = make([]types.Resource, 0, 1) + } + + t.XdsResources[rType] = append(t.XdsResources[rType], xdsResource) +}