From a25b686526c9cb9de180dbe95d52f176c90c8dc0 Mon Sep 17 00:00:00 2001 From: Andrew McDermott Date: Fri, 24 Jul 2020 18:04:04 +0100 Subject: [PATCH] Handle EndpointSlice resource --- pkg/cmd/infra/router/router.go | 7 +- .../controller/endpointsubset/sort_address.go | 97 +++++++++++++++ .../controller/endpointsubset/sort_port.go | 104 ++++++++++++++++ pkg/router/controller/factory/factory.go | 111 ++++++++++++++++-- pkg/router/controller/router_controller.go | 83 ++++++++++++- pkg/router/template/plugin.go | 17 ++- 6 files changed, 406 insertions(+), 13 deletions(-) create mode 100644 pkg/router/controller/endpointsubset/sort_address.go create mode 100644 pkg/router/controller/endpointsubset/sort_port.go diff --git a/pkg/cmd/infra/router/router.go b/pkg/cmd/infra/router/router.go index f11a0d7f3..a37502d26 100644 --- a/pkg/cmd/infra/router/router.go +++ b/pkg/cmd/infra/router/router.go @@ -68,6 +68,10 @@ type RouterSelection struct { ExtendedValidation bool ListenAddr string + + // WatchEndpoints when true will watch Endpoints instead of + // EndpointSlices. + WatchEndpoints bool } // Bind sets the appropriate labels @@ -92,6 +96,7 @@ func (o *RouterSelection) Bind(flag *pflag.FlagSet) { flag.Bool("enable-ingress", false, "Enable configuration via ingress resources.") flag.MarkDeprecated("enable-ingress", "Ingress resources are now synchronized to routes automatically.") flag.StringVar(&o.ListenAddr, "listen-addr", env("ROUTER_LISTEN_ADDR", ""), "The name of an interface to listen on to expose metrics and health checking. If not specified, will not listen. Overrides stats port.") + flag.BoolVar(&o.WatchEndpoints, "watch-endpoints", isTrue(env("ROUTER_WATCH_ENDPOINTS", "")), "Watch Endpoints instead of the EndpointSlice resource.") } // RouteUpdate updates the route before it is seen by the cache. @@ -239,7 +244,7 @@ func (o *RouterSelection) Complete() error { // NewFactory initializes a factory that will watch the requested routes func (o *RouterSelection) NewFactory(routeclient routeclientset.Interface, projectclient projectclient.ProjectInterface, kc kclientset.Interface) *controllerfactory.RouterControllerFactory { - factory := controllerfactory.NewDefaultRouterControllerFactory(routeclient, projectclient, kc) + factory := controllerfactory.NewDefaultRouterControllerFactory(routeclient, projectclient, kc, o.WatchEndpoints) factory.LabelSelector = o.LabelSelector factory.FieldSelector = o.FieldSelector factory.Namespace = o.Namespace diff --git a/pkg/router/controller/endpointsubset/sort_address.go b/pkg/router/controller/endpointsubset/sort_address.go new file mode 100644 index 000000000..4f541ab4a --- /dev/null +++ b/pkg/router/controller/endpointsubset/sort_address.go @@ -0,0 +1,97 @@ +package endpointsubset + +import ( + "bytes" + "net" + "sort" + "strings" + + kapi "k8s.io/api/core/v1" +) + +type EndpointAddressCmpFunc func(x, y *kapi.EndpointAddress) int +type EndpointAddressLessFunc func(x, y *kapi.EndpointAddress) bool + +type endpointAddressMultiSorter struct { + addresses []kapi.EndpointAddress + less []EndpointAddressLessFunc +} + +var ( + EndpointAddressHostnameCmpFn = func(x, y *kapi.EndpointAddress) int { + return strings.Compare(x.Hostname, y.Hostname) + } + + EndpointAddressIPCmpFn = func(x, y *kapi.EndpointAddress) int { + return bytes.Compare(net.ParseIP(x.IP), net.ParseIP(y.IP)) + } + + EndpointAddressHostnameLessFn = func(x, y *kapi.EndpointAddress) bool { + return EndpointAddressHostnameCmpFn(x, y) < 0 + } + + EndpointAddressIPLessFn = func(x, y *kapi.EndpointAddress) bool { + return EndpointAddressIPCmpFn(x, y) < 0 + } +) + +var _ sort.Interface = &endpointAddressMultiSorter{} + +// Sort sorts the argument slice according to the comparator functions +// passed to orderBy. +func (s *endpointAddressMultiSorter) Sort(addresses []kapi.EndpointAddress) { + s.addresses = addresses + sort.Sort(s) +} + +// newEndpointAddressOrderBy returns a Sorter that sorts using a number +// of comparator functions. +func newEndpointAddressOrderBy(less ...EndpointAddressLessFunc) *endpointAddressMultiSorter { + return &endpointAddressMultiSorter{ + less: less, + } +} + +// Len is part of sort.Interface. +func (s *endpointAddressMultiSorter) Len() int { + return len(s.addresses) +} + +// Swap is part of sort.Interface. +func (s *endpointAddressMultiSorter) Swap(i, j int) { + s.addresses[i], s.addresses[j] = s.addresses[j], s.addresses[i] +} + +// Less is part of sort.Interface. +func (s *endpointAddressMultiSorter) Less(i, j int) bool { + p, q := s.addresses[i], s.addresses[j] + + // Try all but the last comparison. + var k int + for k = 0; k < len(s.less)-1; k++ { + less := s.less[k] + switch { + case less(&p, &q): + return true + case less(&q, &p): + return false + } + // p == q; try the next comparison. + } + + return s.less[k](&p, &q) +} + +func DefaultEndpointAddressOrderByFuncs() []EndpointAddressLessFunc { + return []EndpointAddressLessFunc{ + EndpointAddressIPLessFn, + EndpointAddressHostnameLessFn, + } +} + +func SortAddresses(addresses []kapi.EndpointAddress, orderByFuncs ...EndpointAddressLessFunc) { + if len(orderByFuncs) == 0 { + orderByFuncs = DefaultEndpointAddressOrderByFuncs() + } + newEndpointAddressOrderBy(orderByFuncs...).Sort(addresses) +} diff --git a/pkg/router/controller/endpointsubset/sort_port.go b/pkg/router/controller/endpointsubset/sort_port.go new file mode 100644 index 000000000..dd1d070b7 --- /dev/null +++ b/pkg/router/controller/endpointsubset/sort_port.go @@ -0,0 +1,104 @@ +package endpointsubset + +import ( + "sort" + "strings" + + kapi "k8s.io/api/core/v1" +) + +type EndpointPortCmpFunc func(x, y *kapi.EndpointPort) int +type EndpointPortLessFunc func(x, y *kapi.EndpointPort) bool + +type endpointPortMultiSorter struct { + ports []kapi.EndpointPort + less []EndpointPortLessFunc +} + +var _ sort.Interface = &endpointPortMultiSorter{} + +var ( + EndpointPortNameCmpFn = func(x, y *kapi.EndpointPort) int { + return strings.Compare(x.Name, y.Name) + } + + EndpointPortNameLessFn = func(x, y *kapi.EndpointPort) bool { + return EndpointPortNameCmpFn(x, y) < 0 + } + + EndpointPortNumberCmpFn = func(x, y *kapi.EndpointPort) int { + return int(x.Port - y.Port) + } + + EndpointPortPortNumberLessFn = func(x, y *kapi.EndpointPort) bool { + return EndpointPortNumberCmpFn(x, y) < 0 + } + + EndpointPortProtocolCmpFn = func(x, y *kapi.EndpointPort) int { + return strings.Compare(string(x.Protocol), string(y.Protocol)) + } + + EndpointPortProtocolLessFn = func(x, y *kapi.EndpointPort) bool { + return EndpointPortProtocolCmpFn(x, y) < 0 + } +) + +// Sort sorts the argument slice according to the comparator functions +// passed to orderBy. +func (s *endpointPortMultiSorter) Sort(ports []kapi.EndpointPort) { + s.ports = ports + sort.Sort(s) +} + +// endpointPortOrderBy returns a Sorter that sorts using a number +// of comparator functions. +func endpointPortOrderBy(less ...EndpointPortLessFunc) *endpointPortMultiSorter { + return &endpointPortMultiSorter{ + less: less, + } +} + +// Len is part of sort.Interface. +func (s *endpointPortMultiSorter) Len() int { + return len(s.ports) +} + +// Swap is part of sort.Interface. +func (s *endpointPortMultiSorter) Swap(i, j int) { + s.ports[i], s.ports[j] = s.ports[j], s.ports[i] +} + +// Less is part of sort.Interface. +func (s *endpointPortMultiSorter) Less(i, j int) bool { + p, q := s.ports[i], s.ports[j] + + // Try all but the last comparison. + var k int + for k = 0; k < len(s.less)-1; k++ { + less := s.less[k] + switch { + case less(&p, &q): + return true + case less(&q, &p): + return false + } + // p == q; try the next comparison. + } + + return s.less[k](&p, &q) +} + +func DefaultEndpointPortOrderByFuncs() []EndpointPortLessFunc { + return []EndpointPortLessFunc{ + EndpointPortPortNumberLessFn, + EndpointPortProtocolLessFn, + EndpointPortNameLessFn, + } +} + +func SortPorts(ports []kapi.EndpointPort, orderByFuncs ...EndpointPortLessFunc) { + if len(orderByFuncs) == 0 { + orderByFuncs = DefaultEndpointPortOrderByFuncs() + } + endpointPortOrderBy(orderByFuncs...).Sort(ports) +} diff --git a/pkg/router/controller/factory/factory.go b/pkg/router/controller/factory/factory.go index f2a7d7b9e..93b945139 100644 --- a/pkg/router/controller/factory/factory.go +++ b/pkg/router/controller/factory/factory.go @@ -3,11 +3,13 @@ package factory import ( "context" "fmt" + "path" "reflect" "sort" "time" kapi "k8s.io/api/core/v1" + discoveryv1beta1 "k8s.io/api/discovery/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" @@ -32,6 +34,8 @@ import ( const ( DefaultResyncInterval = 30 * time.Minute + ServiceNameLabel = "kubernetes.io/service-name" + ServiceNameIndex = "service-name" ) var log = logf.Logger.WithName("controller_factory") @@ -52,19 +56,21 @@ type RouterControllerFactory struct { ProjectLabels labels.Selector RouteModifierFn func(route *routev1.Route) - informers map[reflect.Type]kcache.SharedIndexInformer + informers map[reflect.Type]kcache.SharedIndexInformer + watchEndpoints bool } // NewDefaultRouterControllerFactory initializes a default router controller factory. -func NewDefaultRouterControllerFactory(rc routeclientset.Interface, pc projectclient.ProjectInterface, kc kclientset.Interface) *RouterControllerFactory { +func NewDefaultRouterControllerFactory(rc routeclientset.Interface, pc projectclient.ProjectInterface, kc kclientset.Interface, watchEndpoints bool) *RouterControllerFactory { return &RouterControllerFactory{ KClient: kc, RClient: rc, ProjectClient: pc, ResyncInterval: DefaultResyncInterval, - Namespace: metav1.NamespaceAll, - informers: map[reflect.Type]kcache.SharedIndexInformer{}, + Namespace: metav1.NamespaceAll, + informers: map[reflect.Type]kcache.SharedIndexInformer{}, + watchEndpoints: watchEndpoints, } } @@ -104,7 +110,11 @@ func (f *RouterControllerFactory) initInformers(rc *routercontroller.RouterContr if f.NamespaceLabels != nil { f.createNamespacesSharedInformer() } - f.createEndpointsSharedInformer() + if f.watchEndpoints { + f.createEndpointsSharedInformer() + } else { + f.createEndpointSliceSharedInformer() + } f.CreateRoutesSharedInformer() if rc.WatchNodes { @@ -115,6 +125,7 @@ func (f *RouterControllerFactory) initInformers(rc *routercontroller.RouterContr for _, informer := range f.informers { go informer.Run(utilwait.NeverStop) } + // Wait for informers cache to be synced for objType, informer := range f.informers { if !kcache.WaitForCacheSync(utilwait.NeverStop, informer.HasSynced) { @@ -127,13 +138,40 @@ func (f *RouterControllerFactory) registerInformerEventHandlers(rc *routercontro if f.NamespaceLabels != nil { f.registerSharedInformerEventHandlers(&kapi.Namespace{}, rc.HandleNamespace) } - f.registerSharedInformerEventHandlers(&kapi.Endpoints{}, rc.HandleEndpoints) + if f.watchEndpoints { + f.registerSharedInformerEventHandlers(&kapi.Endpoints{}, rc.HandleEndpoints) + } else { + f.registerSharedInformerEventHandlers(&discoveryv1beta1.EndpointSlice{}, func(eventType watch.EventType, obj interface{}) { + eps := obj.(*discoveryv1beta1.EndpointSlice) + if serviceName := endpointSliceServiceName(eps); len(serviceName) == 0 { + log.V(4).Info("EndpointSlice has no service name", "namespace", eps.Namespace, "name", eps.Name, "label", ServiceNameLabel) + } else { + objMeta := eps.ObjectMeta.DeepCopy() + objMeta.Name = serviceName + rc.HandleEndpointSlice(eventType, *objMeta, f.aggregateEndpointSlice(eps.Namespace, serviceName)) + } + }) + } f.registerSharedInformerEventHandlers(&routev1.Route{}, rc.HandleRoute) if rc.WatchNodes { f.registerSharedInformerEventHandlers(&kapi.Node{}, rc.HandleNode) } + +} + +func (f *RouterControllerFactory) aggregateEndpointSlice(namespace, name string) []discoveryv1beta1.EndpointSlice { + objType := reflect.TypeOf(&discoveryv1beta1.EndpointSlice{}) + objs, _ := f.informers[objType].GetIndexer().ByIndex(ServiceNameIndex, path.Join(namespace, name)) + fullSet := make([]discoveryv1beta1.EndpointSlice, len(objs), len(objs)) + + for i := range objs { + eps := objs[i].(*discoveryv1beta1.EndpointSlice) + fullSet[i] = *eps.DeepCopy() + } + + return fullSet } func (f *RouterControllerFactory) informerStoreList(obj runtime.Object) []interface{} { @@ -168,8 +206,30 @@ func (f *RouterControllerFactory) processExistingItems(rc *routercontroller.Rout } } - for _, item := range f.informerStoreList(&kapi.Endpoints{}) { - rc.HandleEndpoints(watch.Added, item.(*kapi.Endpoints)) + if f.watchEndpoints { + for _, item := range f.informerStoreList(&kapi.Endpoints{}) { + rc.HandleEndpoints(watch.Added, item.(*kapi.Endpoints)) + } + } else { + processedServices := map[string]bool{} + + for _, item := range f.informerStoreList(&discoveryv1beta1.EndpointSlice{}) { + eps := item.(*discoveryv1beta1.EndpointSlice) + + serviceName := endpointSliceServiceName(eps) + if len(serviceName) == 0 { + continue + } + + serviceKey := path.Join(eps.Namespace, serviceName) + if !processedServices[serviceKey] { + log.V(4).Info("processing existing items", "namespace", eps.Namespace, "serviceName", serviceName) + objMeta := eps.ObjectMeta.DeepCopy() + objMeta.Name = serviceName + rc.HandleEndpointSlice(watch.Added, *objMeta, f.aggregateEndpointSlice(eps.Namespace, serviceName)) + processedServices[serviceKey] = true + } + } } items := []routev1.Route{} @@ -326,3 +386,38 @@ func (r routeAge) Swap(i, j int) { r[i], r[j] = r[j], r[i] } func (r routeAge) Less(i, j int) bool { return routeapihelpers.RouteLessThan(&r[i], &r[j]) } + +func endpointSliceServiceName(eps *discoveryv1beta1.EndpointSlice) string { + if name, ok := eps.Labels[ServiceNameLabel]; ok && name != "" { + return name + } + return "" +} + +func endpointSliceByServiceLabelIndexFunc(obj interface{}) ([]string, error) { + if eps, ok := obj.(*discoveryv1beta1.EndpointSlice); ok { + if name := endpointSliceServiceName(eps); name != "" { + return []string{path.Join(eps.Namespace, name)}, nil + } + } + return []string{}, nil +} + +func (f *RouterControllerFactory) createEndpointSliceSharedInformer() { + // we do not scope endpointSlice by labels or fields because the route labels != endpoints labels + lw := &kcache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return f.KClient.DiscoveryV1beta1().EndpointSlices(f.Namespace).List(context.TODO(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return f.KClient.DiscoveryV1beta1().EndpointSlices(f.Namespace).Watch(context.TODO(), options) + }, + } + eps := &discoveryv1beta1.EndpointSlice{} + objType := reflect.TypeOf(eps) + informer := kcache.NewSharedIndexInformer(lw, eps, f.ResyncInterval, kcache.Indexers{ + kcache.NamespaceIndex: kcache.MetaNamespaceIndexFunc, + ServiceNameIndex: endpointSliceByServiceLabelIndexFunc, + }) + f.informers[objType] = informer +} diff --git a/pkg/router/controller/router_controller.go b/pkg/router/controller/router_controller.go index 85f3f2665..099ae6e08 100644 --- a/pkg/router/controller/router_controller.go +++ b/pkg/router/controller/router_controller.go @@ -6,7 +6,10 @@ import ( "sync" "time" + routev1 "github.com/openshift/api/route/v1" + projectclient "github.com/openshift/client-go/project/clientset/versioned/typed/project/v1" kapi "k8s.io/api/core/v1" + discoveryv1beta1 "k8s.io/api/discovery/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -14,10 +17,9 @@ import ( utilwait "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" - routev1 "github.com/openshift/api/route/v1" - projectclient "github.com/openshift/client-go/project/clientset/versioned/typed/project/v1" logf "github.com/openshift/router/log" "github.com/openshift/router/pkg/router" + "github.com/openshift/router/pkg/router/controller/endpointsubset" ) var log = logf.Logger.WithName("controller") @@ -227,6 +229,83 @@ func (c *RouterController) HandleEndpoints(eventType watch.EventType, obj interf c.Commit() } +// HandleEndpointSlice handles a single EndpointSlice event and refreshes the router backend. +func (c *RouterController) HandleEndpointSlice(eventType watch.EventType, objMeta metav1.ObjectMeta, items []discoveryv1beta1.EndpointSlice) { + c.lock.Lock() + defer c.lock.Unlock() + + var subsets []kapi.EndpointSubset + for i := range items { + var ports []kapi.EndpointPort + var addresses []kapi.EndpointAddress + + for j := range items[i].Endpoints { + for k := range items[i].Endpoints[j].Addresses { + epa := kapi.EndpointAddress{ + IP: items[i].Endpoints[j].Addresses[k], + TargetRef: items[i].Endpoints[j].TargetRef, + } + if items[i].Endpoints[j].Hostname != nil { + epa.Hostname = *items[i].Endpoints[j].Hostname + } + addresses = append(addresses, epa) + } + } + + for j := range items[i].Ports { + endpointPort := kapi.EndpointPort{ + AppProtocol: items[i].Ports[j].AppProtocol, + } + if items[i].Ports[j].Name != nil { + endpointPort.Name = *items[i].Ports[j].Name + } + if items[i].Ports[j].Port != nil { + endpointPort.Port = *items[i].Ports[j].Port + } + if items[i].Ports[j].Protocol != nil { + endpointPort.Protocol = *items[i].Ports[j].Protocol + } + ports = append(ports, endpointPort) + } + + endpointsubset.SortAddresses(addresses) + endpointsubset.SortPorts(ports) + + subsets = append(subsets, kapi.EndpointSubset{ + Addresses: addresses, + Ports: ports, + }) + } + + endpoints := &kapi.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: objMeta.Name, + Namespace: objMeta.Namespace, + Labels: objMeta.Labels, + Annotations: objMeta.Annotations, + OwnerReferences: objMeta.OwnerReferences, + ClusterName: objMeta.ClusterName, + }, + Subsets: subsets, + } + + // RecordNamespaceEndpoints and all HandleEndpoints + // implementations treat watch.Modified and watch.Added the + // same, so we can conflate watch.Modified and watch.Added + // here + if len(items) == 0 { + eventType = watch.Deleted + } else { + eventType = watch.Modified + } + + c.RecordNamespaceEndpoints(eventType, endpoints) + if err := c.Plugin.HandleEndpoints(eventType, endpoints); err != nil { + utilruntime.HandleError(err) + } + c.Commit() +} + // Commit notifies the plugin that it is safe to commit state. func (c *RouterController) Commit() { if c.firstSyncDone { diff --git a/pkg/router/template/plugin.go b/pkg/router/template/plugin.go index de1844637..67b313ca7 100644 --- a/pkg/router/template/plugin.go +++ b/pkg/router/template/plugin.go @@ -3,6 +3,7 @@ package templaterouter import ( "crypto/md5" "fmt" + "net" "path/filepath" "strconv" "strings" @@ -291,6 +292,17 @@ func createRouterEndpoints(endpoints *kapi.Endpoints, excludeUDP bool, lookupSvc out := make([]Endpoint, 0, len(endpoints.Subsets)*4) + // Return address as "[
]" if an IPv6 address, + // otherwise address is returned unadorned. + formatIPAddr := func(address string) string { + if ip := net.ParseIP(address); ip != nil { + if ip.To4() == nil && strings.Count(address, ":") >= 2 { + return "[" + address + "]" + } + } + return address + } + // Now build the actual endpoints we pass to the template for _, s := range subsets { for _, p := range s.Ports { @@ -299,13 +311,14 @@ func createRouterEndpoints(endpoints *kapi.Endpoints, excludeUDP bool, lookupSvc } for _, a := range s.Addresses { ep := Endpoint{ - IP: a.IP, + IP: formatIPAddr(a.IP), Port: strconv.Itoa(int(p.Port)), PortName: p.Name, NoHealthCheck: wasIdled, } + if a.TargetRef != nil { ep.TargetName = a.TargetRef.Name if a.TargetRef.Kind == "Pod" { @@ -314,7 +327,7 @@ func createRouterEndpoints(endpoints *kapi.Endpoints, excludeUDP bool, lookupSvc ep.ID = fmt.Sprintf("ept:%s:%s:%d", endpoints.Name, a.IP, p.Port) } } else { - ep.TargetName = ep.IP + ep.TargetName = a.IP ep.ID = fmt.Sprintf("ept:%s:%s:%d", endpoints.Name, a.IP, p.Port) }