diff --git a/cmd/dranet/app.go b/cmd/dranet/app.go
index d6dc5555..9b1874d9 100644
--- a/cmd/dranet/app.go
+++ b/cmd/dranet/app.go
@@ -27,12 +27,15 @@ import (
 	"runtime/debug"
 	"sync/atomic"
 	"syscall"
+	"time"
 
 	"github.com/google/cel-go/cel"
 	"github.com/google/cel-go/ext"
 	"github.com/google/dranet/pkg/driver"
+	"github.com/google/dranet/pkg/inventory"
 	"github.com/google/dranet/pkg/pcidb"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
+	"golang.org/x/time/rate"
 
 	resourcev1 "k8s.io/api/resource/v1"
 	"k8s.io/client-go/kubernetes"
@@ -51,6 +54,9 @@ var (
 	kubeconfig    string
 	bindAddress   string
 	celExpression string
+	minPollInterval time.Duration
+	maxPollInterval time.Duration
+	pollBurst       int
 
 	ready atomic.Bool
 )
@@ -60,6 +66,9 @@ func init() {
 	flag.StringVar(&bindAddress, "bind-address", ":9177", "The IP address and port for the metrics and healthz server to serve on")
 	flag.StringVar(&hostnameOverride, "hostname-override", "", "If non-empty, will be used as the name of the Node that kube-network-policies is running on. If unset, the node name is assumed to be the same as the node's hostname.")
 	flag.StringVar(&celExpression, "filter", `!("dra.net/type" in attributes) || attributes["dra.net/type"].StringValue != "veth"`, "CEL expression to filter network interface attributes (v1.DeviceAttribute).")
+	flag.DurationVar(&minPollInterval, "inventory-min-poll-interval", 2*time.Second, "The minimum interval between two consecutive polls of the inventory.")
+	flag.DurationVar(&maxPollInterval, "inventory-max-poll-interval", 1*time.Minute, "The maximum interval between two consecutive polls of the inventory.")
+	flag.IntVar(&pollBurst, "inventory-poll-burst", 5, "The number of polls that can be run in a burst.")
 
 	flag.Usage = func() {
 		fmt.Fprint(os.Stderr, "Usage: dranet [options]\n\n")
@@ -150,6 +159,11 @@ func main() {
 		}
 		opts = append(opts, driver.WithFilter(prg))
 	}
+	db := inventory.New(
+		inventory.WithRateLimiter(rate.NewLimiter(rate.Every(minPollInterval), pollBurst)),
+		inventory.WithMaxPollInterval(maxPollInterval),
+	)
+	opts = append(opts, driver.WithInventory(db))
 	dranet, err := driver.Start(ctx, driverName, clientset, nodeName, opts...)
 	if err != nil {
 		klog.Fatalf("driver failed to start: %v", err)
diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go
index a8f8bba7..e95bd72d 100644
--- a/pkg/driver/driver.go
+++ b/pkg/driver/driver.go
@@ -73,6 +73,13 @@ func WithFilter(filter cel.Program) Option {
 	}
 }
 
+// WithInventory sets the inventory database for the driver.
+func WithInventory(db inventoryDB) Option {
+	return func(o *NetworkDriver) {
+		o.netdb = db
+	}
+}
+
 type NetworkDriver struct {
 	driverName string
 	nodeName   string
@@ -174,7 +181,9 @@ func Start(ctx context.Context, driverName string, kubeClient kubernetes.Interfa
 	}()
 
 	// register the host network interfaces
-	plugin.netdb = inventory.New()
+	if plugin.netdb == nil {
+		plugin.netdb = inventory.New()
+	}
 	go func() {
 		for i := 0; i < maxAttempts; i++ {
 			err = plugin.netdb.Run(ctx)
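Note on the flag semantics in the app.go change above: the three knobs configure a single token bucket. `rate.Every(minPollInterval)` sets the refill rate and `pollBurst` the bucket size, so up to five scans may run back to back (for example after a flurry of netlink events) before polling is paced to one per `minPollInterval`. A self-contained sketch of those semantics using the default values; the loop and printed timings are illustrative only, not part of the patch:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Mirrors the defaults: one token refilled every 2s, bucket of 5.
	lim := rate.NewLimiter(rate.Every(2*time.Second), 5)
	start := time.Now()
	for i := 0; i < 7; i++ {
		// The first 5 Waits return immediately (the burst); the 6th and
		// 7th are paced at one every 2 seconds.
		if err := lim.Wait(context.Background()); err != nil {
			panic(err)
		}
		fmt.Printf("poll %d at %v\n", i, time.Since(start).Round(100*time.Millisecond))
	}
}
```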
diff --git a/pkg/inventory/db.go b/pkg/inventory/db.go
index 810496be..e4bd84d5 100644
--- a/pkg/inventory/db.go
+++ b/pkg/inventory/db.go
@@ -42,9 +42,15 @@ import (
 )
 
 const (
-	// database poll period
-	minInterval = 5 * time.Second
-	maxInterval = 1 * time.Minute
+	// defaultMinPollInterval is the default minimum interval between two
+	// consecutive polls of the inventory.
+	defaultMinPollInterval = 2 * time.Second
+	// defaultMaxPollInterval is the default maximum interval between two
+	// consecutive polls of the inventory.
+	defaultMaxPollInterval = 1 * time.Minute
+	// defaultPollBurst is the default number of polls that can be run in a
+	// burst.
+	defaultPollBurst = 5
 )
 
 var (
@@ -55,6 +61,10 @@ var (
 
 type DB struct {
 	instance *cloudprovider.CloudInstance
+	// TODO: it is not common, but in edge cases the default gateway may
+	// change; revisit once we have more evidence that this can be a
+	// potential problem or break some use cases.
+	gwInterfaces sets.Set[string]
 
 	mu sync.RWMutex
 	// podNetNsStore gives the network namespace for a pod, indexed by the pods
@@ -66,18 +76,38 @@ type DB struct {
 	// The deviceStore is periodically updated by the Run method.
 	deviceStore map[string]resourceapi.Device
 
-	rateLimiter   *rate.Limiter
-	notifications chan []resourceapi.Device
-	hasDevices    bool
+	rateLimiter     *rate.Limiter
+	maxPollInterval time.Duration
+	notifications   chan []resourceapi.Device
+	hasDevices      bool
 }
 
-func New() *DB {
-	return &DB{
-		podNetNsStore: map[string]string{},
-		deviceStore:   map[string]resourceapi.Device{},
-		rateLimiter:   rate.NewLimiter(rate.Every(minInterval), 1),
-		notifications: make(chan []resourceapi.Device),
+type Option func(*DB)
+
+func WithRateLimiter(limiter *rate.Limiter) Option {
+	return func(db *DB) {
+		db.rateLimiter = limiter
+	}
+}
+
+func WithMaxPollInterval(d time.Duration) Option {
+	return func(db *DB) {
+		db.maxPollInterval = d
+	}
+}
+
+func New(opts ...Option) *DB {
+	db := &DB{
+		podNetNsStore:   map[string]string{},
+		deviceStore:     map[string]resourceapi.Device{},
+		rateLimiter:     rate.NewLimiter(rate.Every(defaultMinPollInterval), defaultPollBurst),
+		notifications:   make(chan []resourceapi.Device),
+		maxPollInterval: defaultMaxPollInterval,
+	}
+	for _, o := range opts {
+		o(db)
 	}
+	return db
 }
 
 func (db *DB) AddPodNetNs(pod string, netNsPath string) {
@@ -114,15 +144,12 @@ func (db *DB) Run(ctx context.Context) error {
 	doneCh := make(chan struct{})
 	defer close(doneCh)
 	if err := netlink.LinkSubscribe(nlChannel, doneCh); err != nil {
-		klog.Error(err, "error subscribing to netlink interfaces, only syncing periodically", "interval", maxInterval.String())
+		klog.Error(err, "error subscribing to netlink interfaces, only syncing periodically", "interval", db.maxPollInterval.String())
 	}
 
 	// Obtain data that will not change after the startup
 	db.instance = getInstanceProperties(ctx)
-	// TODO: it is not common but may happen in edge cases that the default gateway changes
-	// revisit once we have more evidence this can be a potential problem or break some use
-	// cases.
-	gwInterfaces := getDefaultGwInterfaces()
+	db.gwInterfaces = getDefaultGwInterfaces()
 
 	for {
 		err := db.rateLimiter.Wait(ctx)
@@ -130,43 +157,52 @@ func (db *DB) Run(ctx context.Context) error {
 		if err != nil {
 			klog.Error(err, "unexpected rate limited error trying to get system interfaces")
 		}
 
-		devices := db.discoverPCIDevices()
-		devices = db.discoverNetworkInterfaces(devices)
-		devices = db.discoverRDMADevices(devices)
-		devices = db.addCloudAttributes(devices)
-
-		// Remove default interface.
-		filteredDevices := []resourceapi.Device{}
-		for _, device := range devices {
-			ifName := device.Attributes[apis.AttrInterfaceName].StringValue
-			if ifName != nil && gwInterfaces.Has(string(*ifName)) {
-				klog.V(4).Infof("Ignoring interface %s from discovery since it is an uplink interface", *ifName)
-				continue
-			}
-			filteredDevices = append(filteredDevices, device)
-		}
-
-		klog.V(4).Infof("Found %d devices", len(filteredDevices))
+		filteredDevices := db.scan()
 		if len(filteredDevices) > 0 || db.hasDevices {
 			db.hasDevices = len(filteredDevices) > 0
-			db.updateDeviceStore(filteredDevices)
 			db.notifications <- filteredDevices
 		}
+
 		select {
 		// trigger a reconcile
 		case <-nlChannel:
			klog.V(4).Infof("interface changes")
 			for len(nlChannel) > 0 {
 				<-nlChannel
 			}
-		case <-time.After(maxInterval):
+		case <-time.After(db.maxPollInterval):
 		case <-ctx.Done():
 			return ctx.Err()
 		}
 	}
 }
 
+// scan discovers the available devices on the node.
+// It discovers PCI, network, and RDMA devices, adds cloud attributes,
+// filters out the default gateway interfaces, and updates the device store.
+func (db *DB) scan() []resourceapi.Device {
+	devices := db.discoverPCIDevices()
+	devices = db.discoverNetworkInterfaces(devices)
+	devices = db.discoverRDMADevices(devices)
+	devices = db.addCloudAttributes(devices)
+
+	// Remove the default gateway (uplink) interfaces.
+	filteredDevices := []resourceapi.Device{}
+	for _, device := range devices {
+		ifName := device.Attributes[apis.AttrInterfaceName].StringValue
+		if ifName != nil && db.gwInterfaces.Has(string(*ifName)) {
+			klog.V(4).Infof("Ignoring interface %s from discovery since it is an uplink interface", *ifName)
+			continue
+		}
+		filteredDevices = append(filteredDevices, device)
+	}
+
+	klog.V(4).Infof("Found %d devices", len(filteredDevices))
+	db.updateDeviceStore(filteredDevices)
+	return filteredDevices
+}
+
 func (db *DB) GetResources(ctx context.Context) <-chan []resourceapi.Device {
 	return db.notifications
 }
@@ -389,7 +425,27 @@ func (db *DB) GetDevice(deviceName string) (resourceapi.Device, bool) {
 	return device, exists
 }
 
+// GetNetInterfaceName returns the network interface name for a given device.
+// It first attempts to retrieve the name from the local device store. If the
+// device is not found, it triggers a rescan of the system's devices and
+// retries the lookup. This can happen when a device was recently released by
+// a previous pod and a periodic scan has not run yet; the rescan ensures the
+// function can find devices that were not present in the store at the time
+// of the initial call.
 func (db *DB) GetNetInterfaceName(deviceName string) (string, error) {
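`WithInventory` lets callers (and tests) inject a pre-configured database, while `Start` keeps its old behavior by falling back to `inventory.New()` when nothing is injected. A minimal sketch of the wiring, mirroring `main()` above; the 5s/10/2m values are hypothetical, chosen only to show where each knob lands:

```go
// Sketch: inject a custom-configured inventory into the driver.
db := inventory.New(
	inventory.WithRateLimiter(rate.NewLimiter(rate.Every(5*time.Second), 10)),
	inventory.WithMaxPollInterval(2*time.Minute),
)
dranet, err := driver.Start(ctx, driverName, clientset, nodeName, driver.WithInventory(db))
if err != nil {
	klog.Fatalf("driver failed to start: %v", err)
}
```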
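+	name, err := db.getNetInterfaceNameWithoutRescan(deviceName)
+	if err != nil {
+		klog.V(3).Infof("Device %q not found in local store, rescanning.", deviceName)
+		db.scan()
+		name, err = db.getNetInterfaceNameWithoutRescan(deviceName)
+	}
+	return name, err
+}
+
+// getNetInterfaceNameWithoutRescan returns the network interface name for a
+// given device from the local device store, without triggering a rescan if
+// the device is not found.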
+func (db *DB) getNetInterfaceNameWithoutRescan(deviceName string) (string, error) {
 	device, exists := db.GetDevice(deviceName)
 	if !exists {
 		return "", fmt.Errorf("device %s not found in store", deviceName)
diff --git a/tests/e2e.bats b/tests/e2e.bats
index c62785b5..85402fa8 100644
--- a/tests/e2e.bats
+++ b/tests/e2e.bats
@@ -85,6 +85,12 @@ setup_tcx_filter() {
   docker exec "$CLUSTER_NAME"-worker2 bash -c "chmod +x bpftool"
   docker exec "$CLUSTER_NAME"-worker2 bash -c "./bpftool prog load dummy_bpf_tcx.o /sys/fs/bpf/dummy_prog_tcx"
   docker exec "$CLUSTER_NAME"-worker2 bash -c "./bpftool net attach tcx_ingress pinned /sys/fs/bpf/dummy_prog_tcx dev dummy0"
+  # We bounce the interface to trigger a DRANET driver notification, which
+  # speeds up the test. Otherwise, DRANET would need to wait for a full
+  # resync (one minute by default) to detect changes to attached BPF
+  # programs, as netlink notifications don't cover them.
+  docker exec "$CLUSTER_NAME"-worker2 bash -c "ip link set down dev dummy0"
+  docker exec "$CLUSTER_NAME"-worker2 bash -c "ip link set up dev dummy0"
 }
 
 # ---- TESTS ----
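For context on why the bounce works: `ip link set down/up` generates link updates on the netlink socket the driver subscribes to via `netlink.LinkSubscribe` in `Run`, waking the loop immediately instead of after the max poll interval. A small observer sketch, assuming the vishvananda/netlink package the driver already uses; Linux only, run as root while bouncing an interface to see the events:

```go
package main

import (
	"fmt"

	"github.com/vishvananda/netlink"
)

func main() {
	updates := make(chan netlink.LinkUpdate)
	done := make(chan struct{})
	defer close(done)
	if err := netlink.LinkSubscribe(updates, done); err != nil {
		panic(err)
	}
	// `ip link set down dev dummy0 && ip link set up dev dummy0` elsewhere
	// produces an update per flag change; this is what wakes the driver's
	// Run loop without waiting for the resync timer.
	for u := range updates {
		fmt.Printf("link update: %s state=%s\n", u.Attrs().Name, u.Attrs().OperState)
	}
}
```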