From 426eb826c679a86aab9e4c1167756686917efeff Mon Sep 17 00:00:00 2001 From: Gaurav Ghildiyal Date: Sun, 12 Oct 2025 01:41:11 +0000 Subject: [PATCH 1/2] feat(inventory): Add configurable discovery rate limiting This commit introduces configurable rate limiting for the inventory discovery process. Previously, a fixed 5-second rate limit with a burst of 1 could delay processing of netlink updates, leading to failures during high pod churn scenarios. Command-line flags have been added to control the inventory discovery rate limit and burst size. The default values have been adjusted to be more responsive to rapid pod lifecycle events, ensuring that device state is updated promptly. --- cmd/dranet/app.go | 14 ++++++++++++ pkg/driver/driver.go | 11 ++++++++- pkg/inventory/db.go | 54 ++++++++++++++++++++++++++++++++------------ tests/e2e.bats | 6 +++++ 4 files changed, 70 insertions(+), 15 deletions(-) diff --git a/cmd/dranet/app.go b/cmd/dranet/app.go index d6dc5555..9b1874d9 100644 --- a/cmd/dranet/app.go +++ b/cmd/dranet/app.go @@ -27,12 +27,15 @@ import ( "runtime/debug" "sync/atomic" "syscall" + "time" "github.com/google/cel-go/cel" "github.com/google/cel-go/ext" "github.com/google/dranet/pkg/driver" + "github.com/google/dranet/pkg/inventory" "github.com/google/dranet/pkg/pcidb" "github.com/prometheus/client_golang/prometheus/promhttp" + "golang.org/x/time/rate" resourcev1 "k8s.io/api/resource/v1" "k8s.io/client-go/kubernetes" @@ -51,6 +54,9 @@ var ( kubeconfig string bindAddress string celExpression string + minPollInterval time.Duration + maxPollInterval time.Duration + pollBurst int ready atomic.Bool ) @@ -60,6 +66,9 @@ func init() { flag.StringVar(&bindAddress, "bind-address", ":9177", "The IP address and port for the metrics and healthz server to serve on") flag.StringVar(&hostnameOverride, "hostname-override", "", "If non-empty, will be used as the name of the Node that kube-network-policies is running on. 
If unset, the node name is assumed to be the same as the node's hostname.") flag.StringVar(&celExpression, "filter", `!("dra.net/type" in attributes) || attributes["dra.net/type"].StringValue != "veth"`, "CEL expression to filter network interface attributes (v1.DeviceAttribute).") + flag.DurationVar(&minPollInterval, "inventory-min-poll-interval", 2*time.Second, "The minimum interval between two consecutive polls of the inventory.") + flag.DurationVar(&maxPollInterval, "inventory-max-poll-interval", 1*time.Minute, "The maximum interval between two consecutive polls of the inventory.") + flag.IntVar(&pollBurst, "inventory-poll-burst", 5, "The number of polls that can be run in a burst.") flag.Usage = func() { fmt.Fprint(os.Stderr, "Usage: dranet [options]\n\n") @@ -150,6 +159,11 @@ func main() { } opts = append(opts, driver.WithFilter(prg)) } + db := inventory.New( + inventory.WithRateLimiter(rate.NewLimiter(rate.Every(minPollInterval), pollBurst)), + inventory.WithMaxPollInterval(maxPollInterval), + ) + opts = append(opts, driver.WithInventory(db)) dranet, err := driver.Start(ctx, driverName, clientset, nodeName, opts...) if err != nil { klog.Fatalf("driver failed to start: %v", err) diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index a8f8bba7..e95bd72d 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -73,6 +73,13 @@ func WithFilter(filter cel.Program) Option { } } +// WithInventory sets the inventory database for the driver. 
+func WithInventory(db inventoryDB) Option { + return func(o *NetworkDriver) { + o.netdb = db + } +} + type NetworkDriver struct { driverName string nodeName string @@ -174,7 +181,9 @@ func Start(ctx context.Context, driverName string, kubeClient kubernetes.Interfa }() // register the host network interfaces - plugin.netdb = inventory.New() + if plugin.netdb == nil { + plugin.netdb = inventory.New() + } go func() { for i := 0; i < maxAttempts; i++ { err = plugin.netdb.Run(ctx) diff --git a/pkg/inventory/db.go b/pkg/inventory/db.go index 810496be..a7aadf82 100644 --- a/pkg/inventory/db.go +++ b/pkg/inventory/db.go @@ -42,9 +42,15 @@ import ( ) const ( - // database poll period - minInterval = 5 * time.Second - maxInterval = 1 * time.Minute + // defaultMinPollInterval is the default minimum interval between two + // consecutive polls of the inventory. + defaultMinPollInterval = 2 * time.Second + // defaultMaxPollInterval is the default maximum interval between two + // consecutive polls of the inventory. + defaultMaxPollInterval = 1 * time.Minute + // defaultPollBurst is the default number of polls that can be run in a + // burst. + defaultPollBurst = 5 ) var ( @@ -66,18 +72,38 @@ type DB struct { // The deviceStore is periodically updated by the Run method. 
deviceStore map[string]resourceapi.Device - rateLimiter *rate.Limiter - notifications chan []resourceapi.Device - hasDevices bool + rateLimiter *rate.Limiter + maxPollInterval time.Duration + notifications chan []resourceapi.Device + hasDevices bool } -func New() *DB { - return &DB{ - podNetNsStore: map[string]string{}, - deviceStore: map[string]resourceapi.Device{}, - rateLimiter: rate.NewLimiter(rate.Every(minInterval), 1), - notifications: make(chan []resourceapi.Device), +type Option func(*DB) + +func WithRateLimiter(limiter *rate.Limiter) Option { + return func(db *DB) { + db.rateLimiter = limiter + } +} + +func WithMaxPollInterval(d time.Duration) Option { + return func(db *DB) { + db.maxPollInterval = d + } +} + +func New(opts ...Option) *DB { + db := &DB{ + podNetNsStore: map[string]string{}, + deviceStore: map[string]resourceapi.Device{}, + rateLimiter: rate.NewLimiter(rate.Every(defaultMinPollInterval), defaultPollBurst), + notifications: make(chan []resourceapi.Device), + maxPollInterval: defaultMaxPollInterval, + } + for _, o := range opts { + o(db) } + return db } func (db *DB) AddPodNetNs(pod string, netNsPath string) { @@ -114,7 +140,7 @@ func (db *DB) Run(ctx context.Context) error { doneCh := make(chan struct{}) defer close(doneCh) if err := netlink.LinkSubscribe(nlChannel, doneCh); err != nil { - klog.Error(err, "error subscribing to netlink interfaces, only syncing periodically", "interval", maxInterval.String()) + klog.Error(err, "error subscribing to netlink interfaces, only syncing periodically", "interval", db.maxPollInterval.String()) } // Obtain data that will not change after the startup @@ -159,7 +185,7 @@ func (db *DB) Run(ctx context.Context) error { for len(nlChannel) > 0 { <-nlChannel } - case <-time.After(maxInterval): + case <-time.After(db.maxPollInterval): case <-ctx.Done(): return ctx.Err() } diff --git a/tests/e2e.bats b/tests/e2e.bats index c62785b5..85402fa8 100644 --- a/tests/e2e.bats +++ b/tests/e2e.bats @@ -85,6 +85,12 @@ 
setup_tcx_filter() { docker exec "$CLUSTER_NAME"-worker2 bash -c "chmod +x bpftool" docker exec "$CLUSTER_NAME"-worker2 bash -c "./bpftool prog load dummy_bpf_tcx.o /sys/fs/bpf/dummy_prog_tcx" docker exec "$CLUSTER_NAME"-worker2 bash -c "./bpftool net attach tcx_ingress pinned /sys/fs/bpf/dummy_prog_tcx dev dummy0" + # We update the interface to trigger a DRANET driver notification, which + # speeds up the test. Otherwise, DRANET would need to wait for a full resync + # (1 minute) to detect changes to attached BPF programs, as netlink + # notifications don't cover them. + docker exec "$CLUSTER_NAME"-worker2 bash -c "ip link set down dev dummy0" + docker exec "$CLUSTER_NAME"-worker2 bash -c "ip link set up dev dummy0" } # ---- TESTS ---- From c1adaaf853a38981073be69b500a41d4d93a4c9e Mon Sep 17 00:00:00 2001 From: Gaurav Ghildiyal Date: Sun, 12 Oct 2025 19:10:50 +0000 Subject: [PATCH 2/2] feat(inventory): Rescan devices if not found in store Previously, if a device was released by a pod and immediately claimed by another, the inventory might not have had a chance to update. Now, if a device is not found in the local store, a new scan is triggered to ensure that newly available devices are discovered before failing. This improves the reliability of device allocation during high pod churn. --- pkg/inventory/db.go | 74 +++++++++++++++++++++++++++++++-------------- 1 file changed, 52 insertions(+), 22 deletions(-) diff --git a/pkg/inventory/db.go b/pkg/inventory/db.go index a7aadf82..e4bd84d5 100644 --- a/pkg/inventory/db.go +++ b/pkg/inventory/db.go @@ -61,6 +61,10 @@ var ( type DB struct { instance *cloudprovider.CloudInstance + // TODO: it is not common but may happen in edge cases that the default + // gateway changes; revisit once we have more evidence this can be a + // potential problem or break some use cases.
+ gwInterfaces sets.Set[string] mu sync.RWMutex // podNetNsStore gives the network namespace for a pod, indexed by the pods @@ -145,10 +149,7 @@ func (db *DB) Run(ctx context.Context) error { // Obtain data that will not change after the startup db.instance = getInstanceProperties(ctx) - // TODO: it is not common but may happen in edge cases that the default gateway changes - // revisit once we have more evidence this can be a potential problem or break some use - // cases. - gwInterfaces := getDefaultGwInterfaces() + db.gwInterfaces = getDefaultGwInterfaces() for { err := db.rateLimiter.Wait(ctx) @@ -156,28 +157,12 @@ func (db *DB) Run(ctx context.Context) error { klog.Error(err, "unexpected rate limited error trying to get system interfaces") } - devices := db.discoverPCIDevices() - devices = db.discoverNetworkInterfaces(devices) - devices = db.discoverRDMADevices(devices) - devices = db.addCloudAttributes(devices) - - // Remove default interface. - filteredDevices := []resourceapi.Device{} - for _, device := range devices { - ifName := device.Attributes[apis.AttrInterfaceName].StringValue - if ifName != nil && gwInterfaces.Has(string(*ifName)) { - klog.V(4).Infof("Ignoring interface %s from discovery since it is an uplink interface", *ifName) - continue - } - filteredDevices = append(filteredDevices, device) - } - - klog.V(4).Infof("Found %d devices", len(filteredDevices)) + filteredDevices := db.scan() if len(filteredDevices) > 0 || db.hasDevices { db.hasDevices = len(filteredDevices) > 0 - db.updateDeviceStore(filteredDevices) db.notifications <- filteredDevices } + select { // trigger a reconcile case <-nlChannel: @@ -192,6 +177,31 @@ func (db *DB) Run(ctx context.Context) error { } } +// scan discovers the available devices on the node. +// It discovers PCI, network, and RDMA devices, adds cloud attributes, +// filters out default interfaces, and updates the device store. 
+func (db *DB) scan() []resourceapi.Device { + devices := db.discoverPCIDevices() + devices = db.discoverNetworkInterfaces(devices) + devices = db.discoverRDMADevices(devices) + devices = db.addCloudAttributes(devices) + + // Remove default interface. + filteredDevices := []resourceapi.Device{} + for _, device := range devices { + ifName := device.Attributes[apis.AttrInterfaceName].StringValue + if ifName != nil && db.gwInterfaces.Has(string(*ifName)) { + klog.V(4).Infof("Ignoring interface %s from discovery since it is an uplink interface", *ifName) + continue + } + filteredDevices = append(filteredDevices, device) + } + + klog.V(4).Infof("Found %d devices", len(filteredDevices)) + db.updateDeviceStore(filteredDevices) + return filteredDevices +} + func (db *DB) GetResources(ctx context.Context) <-chan []resourceapi.Device { return db.notifications } @@ -415,7 +425,27 @@ func (db *DB) GetDevice(deviceName string) (resourceapi.Device, bool) { return device, exists } +// GetNetInterfaceName returns the network interface name for a given device. It +// first attempts to retrieve the name from the local device store. If the +// device is not found, it triggers a rescan of the system's devices and retries +// the lookup. This can happen when a device was recently released by a previous +// pod and a scan had not happened yet. This ensures that the function can find +// newly added devices that were not present in the store at the time of the +// initial call. func (db *DB) GetNetInterfaceName(deviceName string) (string, error) { + name, err := db.getNetInterfaceNameWithoutRescan(deviceName) + if err != nil { + klog.V(3).Infof("Device %q not found in local store, rescanning.", deviceName) + db.scan() + name, err = db.getNetInterfaceNameWithoutRescan(deviceName) + } + return name, err +} + +// getNetInterfaceNameWithoutRescan returns the network interface name for a +// given device from the local device store without triggering a rescan if the +// device is not found. 
+func (db *DB) getNetInterfaceNameWithoutRescan(deviceName string) (string, error) { device, exists := db.GetDevice(deviceName) if !exists { return "", fmt.Errorf("device %s not found in store", deviceName)