-
Notifications
You must be signed in to change notification settings - Fork 746
Add xds-server implementation #176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
88e386e
First cut
youngnick 7e14b2a
Working test xds server
youngnick 6cf03da
Complete move from pkg to internal
youngnick 482d32e
Move to logr wrapper for logging
youngnick 68b1708
Renamed CacheVersion to ResourceVersionTable and added comments
youngnick a6bb239
Fix linting
youngnick 0b9c641
Fix go.mod
youngnick c57de3d
Fix go.mod and go.sum again
youngnick c1d79c6
Fix PR comments
youngnick e7f3ee6
Added mutex for snapshotcache and made nodeID handling more resilient
youngnick fbfb23b
Resolve PR comments
youngnick File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,237 @@ | ||
| package cmd | ||
|
|
||
| import ( | ||
| "net" | ||
| "time" | ||
|
|
||
| "github.com/envoyproxy/gateway/internal/ir" | ||
| "github.com/envoyproxy/gateway/internal/log" | ||
| "github.com/envoyproxy/gateway/internal/xds/cache" | ||
| "github.com/envoyproxy/gateway/internal/xds/translator" | ||
| "github.com/spf13/cobra" | ||
| "google.golang.org/grpc" | ||
| "sigs.k8s.io/controller-runtime/pkg/manager/signals" | ||
|
|
||
| controlplane_service_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3" | ||
| controlplane_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" | ||
| controlplane_service_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3" | ||
| controlplane_service_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/service/listener/v3" | ||
| controlplane_service_route_v3 "github.com/envoyproxy/go-control-plane/envoy/service/route/v3" | ||
| controlplane_service_runtime_v3 "github.com/envoyproxy/go-control-plane/envoy/service/runtime/v3" | ||
| controlplane_service_secret_v3 "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3" | ||
| controlplane_server_v3 "github.com/envoyproxy/go-control-plane/pkg/server/v3" | ||
| ) | ||
|
|
||
| // The xdstest command is intended just to show how updating the IR can produce different | ||
| // xDS output, including showing that Delta xDS works. | ||
| // You'll need an xDS probe like the `contour cli` command to check. | ||
| // | ||
| // It's also intended that this get removed once we have a full loop implemented in | ||
| // `gateway serve`. | ||
|
|
||
| // getServerCommand returns the server cobra command to be executed. | ||
| func getxDSTestCommand() *cobra.Command { | ||
| cmd := &cobra.Command{ | ||
| Use: "xdstest", | ||
| Aliases: []string{"xdstest"}, | ||
| Short: "Run a test xDS server", | ||
| RunE: func(cmd *cobra.Command, args []string) error { | ||
| return xDSTest() | ||
| }, | ||
| } | ||
| return cmd | ||
| } | ||
|
|
||
| // xDSTest implements the command. | ||
| // This is deliberately verbose and unoptimized, since the purpose | ||
| // is just to illustrate how the flow will need to work. | ||
| func xDSTest() error { | ||
|
|
||
| // Grab the logr.Logger. | ||
| logger, err := log.NewLogger() | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Set the logr Logger to debug. | ||
| // zap's logr impl requires negative levels. | ||
| logger = logger.V(-2) | ||
|
|
||
| ctx := signals.SetupSignalHandler() | ||
|
|
||
| logger.Info("Starting xDS Tester service") | ||
| defer logger.Info("Stopping xDS Tester service") | ||
|
|
||
| // Create three IR versions that we'll swap between, to | ||
| // generate xDS updates for the various methods. | ||
| ir1 := &ir.Xds{ | ||
| HTTP: []*ir.HTTPListener{ | ||
| { | ||
| Name: "first-listener", | ||
| Address: "0.0.0.0", | ||
| Port: 10080, | ||
| Hostnames: []string{ | ||
| "*", | ||
| }, | ||
| Routes: []*ir.HTTPRoute{ | ||
| { | ||
| Name: "first-route", | ||
| Destinations: []*ir.RouteDestination{ | ||
| { | ||
| Host: "1.2.3.4", | ||
| Port: 50000, | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| } | ||
|
|
||
| ir2 := &ir.Xds{ | ||
| HTTP: []*ir.HTTPListener{ | ||
| { | ||
| Name: "first-listener", | ||
| Address: "0.0.0.0", | ||
| Port: 10080, | ||
| Hostnames: []string{ | ||
| "*", | ||
| }, | ||
| Routes: []*ir.HTTPRoute{ | ||
| { | ||
| Name: "second-route", | ||
| Destinations: []*ir.RouteDestination{ | ||
| { | ||
| Host: "1.2.3.4", | ||
| Port: 50000, | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| } | ||
|
|
||
| ir3 := &ir.Xds{ | ||
| HTTP: []*ir.HTTPListener{ | ||
| { | ||
| Name: "second-listener", | ||
| Address: "0.0.0.0", | ||
| Port: 10080, | ||
| Hostnames: []string{ | ||
| "*", | ||
| }, | ||
| Routes: []*ir.HTTPRoute{ | ||
| { | ||
| Name: "second-route", | ||
| Destinations: []*ir.RouteDestination{ | ||
| { | ||
| Host: "1.2.3.4", | ||
| Port: 50000, | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| } | ||
|
|
||
| // Now, we do the translation because everything is static. | ||
| // Normally, we'd do this in response to updates on the | ||
| // message bus. | ||
| cacheVersion1, err := translator.TranslateXDSIR(ir1) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| cacheVersion2, err := translator.TranslateXDSIR(ir2) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| cacheVersion3, err := translator.TranslateXDSIR(ir3) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Set up the gRPC server and register the xDS handler. | ||
| g := grpc.NewServer() | ||
|
|
||
| snapCache := cache.NewSnapshotCache(false, logger) | ||
| RegisterServer(controlplane_server_v3.NewServer(ctx, snapCache, snapCache), g) | ||
|
|
||
| addr := net.JoinHostPort("0.0.0.0", "8001") | ||
| l, err := net.Listen("tcp", addr) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Handle the signals and stop when the signal context does. | ||
| go func() { | ||
| <-ctx.Done() | ||
|
|
||
| // We don't use GracefulStop here because envoy | ||
| // has long-lived hanging xDS requests. There's no | ||
| // mechanism to make those pending requests fail, | ||
| // so we forcibly terminate the TCP sessions. | ||
| g.Stop() | ||
| }() | ||
|
|
||
| // Loop through the various configs, updating the SnapshotCache | ||
| // each time. This will run until the process is killed by signal | ||
| // (SIGINT, SIGKILL etc). | ||
| go func() { | ||
| // This little function sleeps 10 seconds then swaps | ||
| // between various versions of the IR | ||
| logger.Info("Sleeping for a bit before updating the cache") | ||
| for { | ||
| time.Sleep(10 * time.Second) | ||
| logger.Info("Updating the cache for first-listener with first-route") | ||
| err := snapCache.GenerateNewSnapshot(cacheVersion1.GetXdsResources()) | ||
| if err != nil { | ||
| logger.Error(err, "Something went wrong with generating a snapshot") | ||
| } | ||
| time.Sleep(10 * time.Second) | ||
| logger.Info("Updating the cache for first-listener with second-route") | ||
| err = snapCache.GenerateNewSnapshot(cacheVersion2.GetXdsResources()) | ||
| if err != nil { | ||
| logger.Error(err, "Something went wrong with generating a snapshot") | ||
| } | ||
| time.Sleep(10 * time.Second) | ||
| logger.Info("Updating the cache for second-listener with second-route") | ||
| err = snapCache.GenerateNewSnapshot(cacheVersion3.GetXdsResources()) | ||
| if err != nil { | ||
| logger.Error(err, "Something went wrong with generating a snapshot") | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| return g.Serve(l) | ||
|
|
||
| } | ||
|
|
||
| // Some helper stuff that we'll need to put somewhere eventually. | ||
|
|
||
| // Server is a collection of handlers for streaming discovery requests. | ||
| type Server interface { | ||
| controlplane_service_cluster_v3.ClusterDiscoveryServiceServer | ||
| controlplane_service_endpoint_v3.EndpointDiscoveryServiceServer | ||
| controlplane_service_listener_v3.ListenerDiscoveryServiceServer | ||
| controlplane_service_route_v3.RouteDiscoveryServiceServer | ||
| controlplane_service_discovery_v3.AggregatedDiscoveryServiceServer | ||
| controlplane_service_secret_v3.SecretDiscoveryServiceServer | ||
| controlplane_service_runtime_v3.RuntimeDiscoveryServiceServer | ||
| } | ||
|
|
||
| // RegisterServer registers the given xDS protocol Server with the gRPC | ||
| // runtime. | ||
| func RegisterServer(srv Server, g *grpc.Server) { | ||
| // register services | ||
| controlplane_service_discovery_v3.RegisterAggregatedDiscoveryServiceServer(g, srv) | ||
| controlplane_service_secret_v3.RegisterSecretDiscoveryServiceServer(g, srv) | ||
| controlplane_service_cluster_v3.RegisterClusterDiscoveryServiceServer(g, srv) | ||
| controlplane_service_endpoint_v3.RegisterEndpointDiscoveryServiceServer(g, srv) | ||
| controlplane_service_listener_v3.RegisterListenerDiscoveryServiceServer(g, srv) | ||
| controlplane_service_route_v3.RegisterRouteDiscoveryServiceServer(g, srv) | ||
| controlplane_service_runtime_v3.RegisterRuntimeDiscoveryServiceServer(g, srv) | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| package cache | ||
|
|
||
| import ( | ||
| "fmt" | ||
|
|
||
| "github.com/go-logr/logr" | ||
| ) | ||
|
|
||
| // LogrWrapper is a nasty hack for turning the logr.Logger we get from NewLogger() | ||
| // into something that go-control-plane can accept. | ||
| // It seems pretty silly to take a zap logger, which is levelled, turn it into | ||
| // a V-style logr Logger, then turn it back again with this, but here we are. | ||
| // TODO(youngnick): Reopen the logging library discussion then do something about this. | ||
| type LogrWrapper struct { | ||
| logr logr.Logger | ||
| } | ||
|
|
||
| const LevelDebug int = -2 | ||
| const LevelInfo int = 0 | ||
| const LevelWarn int = -1 | ||
|
|
||
| func (l LogrWrapper) Debugf(template string, args ...interface{}) { | ||
|
|
||
| l.logr.V(LevelDebug).Info(fmt.Sprintf(template, args...)) | ||
| } | ||
|
|
||
| func (l LogrWrapper) Infof(template string, args ...interface{}) { | ||
|
|
||
| l.logr.V(LevelInfo).Info(fmt.Sprintf(template, args...)) | ||
| } | ||
|
|
||
| func (l LogrWrapper) Warnf(template string, args ...interface{}) { | ||
|
|
||
| l.logr.V(LevelWarn).Info(fmt.Sprintf(template, args...)) | ||
| } | ||
|
|
||
| func (l LogrWrapper) Errorf(template string, args ...interface{}) { | ||
|
|
||
| l.logr.Error(fmt.Errorf(template, args...), "") | ||
| } | ||
|
|
||
| func NewLogrWrapper(log logr.Logger) *LogrWrapper { | ||
|
|
||
| return &LogrWrapper{ | ||
| logr: log, | ||
| } | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since the input is fixed, this function would be could be a good candidate for a test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, but in order to test it, we'll need an xDS client, which I'll need to write. That will be a reasonable amount of work, so I wanted to do it in a followup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's roll with the test cmd and create the xds client as part of #177