This repository was archived by the owner on May 6, 2026. It is now read-only.
Merged
14 changes: 14 additions & 0 deletions cmd/dranet/app.go
@@ -27,12 +27,15 @@ import (
"runtime/debug"
"sync/atomic"
"syscall"
"time"

"github.com/google/cel-go/cel"
"github.com/google/cel-go/ext"
"github.com/google/dranet/pkg/driver"
"github.com/google/dranet/pkg/inventory"
"github.com/google/dranet/pkg/pcidb"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/time/rate"

resourcev1 "k8s.io/api/resource/v1"
"k8s.io/client-go/kubernetes"
@@ -51,6 +54,9 @@ var (
kubeconfig string
bindAddress string
celExpression string
minPollInterval time.Duration
maxPollInterval time.Duration
pollBurst int

ready atomic.Bool
)
@@ -60,6 +66,9 @@ func init() {
flag.StringVar(&bindAddress, "bind-address", ":9177", "The IP address and port for the metrics and healthz server to serve on")
flag.StringVar(&hostnameOverride, "hostname-override", "", "If non-empty, will be used as the name of the Node that dranet is running on. If unset, the node name is assumed to be the same as the node's hostname.")
flag.StringVar(&celExpression, "filter", `!("dra.net/type" in attributes) || attributes["dra.net/type"].StringValue != "veth"`, "CEL expression to filter network interface attributes (v1.DeviceAttribute).")
flag.DurationVar(&minPollInterval, "inventory-min-poll-interval", 2*time.Second, "The minimum interval between two consecutive polls of the inventory.")
flag.DurationVar(&maxPollInterval, "inventory-max-poll-interval", 1*time.Minute, "The maximum interval between two consecutive polls of the inventory.")
flag.IntVar(&pollBurst, "inventory-poll-burst", 5, "The number of polls that can be run in a burst.")

flag.Usage = func() {
fmt.Fprint(os.Stderr, "Usage: dranet [options]\n\n")
@@ -150,6 +159,11 @@ func main() {
}
opts = append(opts, driver.WithFilter(prg))
}
db := inventory.New(
inventory.WithRateLimiter(rate.NewLimiter(rate.Every(minPollInterval), pollBurst)),
inventory.WithMaxPollInterval(maxPollInterval),
)
opts = append(opts, driver.WithInventory(db))
dranet, err := driver.Start(ctx, driverName, clientset, nodeName, opts...)
if err != nil {
klog.Fatalf("driver failed to start: %v", err)
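As a rough illustration of how the two new knobs interact (not part of this PR; the 2s/5 values simply mirror the flag defaults above): `rate.Every(minPollInterval)` sets how fast poll tokens refill, while the burst value caps how many polls can run back-to-back before `Wait` starts blocking.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// One token every 2 seconds, with up to 5 tokens banked (the flag defaults above).
	limiter := rate.NewLimiter(rate.Every(2*time.Second), 5)

	start := time.Now()
	for i := 0; i < 7; i++ {
		// The first 5 calls return immediately from the burst allowance;
		// the remaining calls are paced out roughly 2 seconds apart.
		if err := limiter.Wait(context.Background()); err != nil {
			fmt.Println("wait:", err)
			return
		}
		fmt.Printf("poll %d at %s\n", i, time.Since(start).Round(time.Second))
	}
}
```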
11 changes: 10 additions & 1 deletion pkg/driver/driver.go
@@ -73,6 +73,13 @@ func WithFilter(filter cel.Program) Option {
}
}

// WithInventory sets the inventory database for the driver.
func WithInventory(db inventoryDB) Option {
return func(o *NetworkDriver) {
o.netdb = db
}
}

type NetworkDriver struct {
driverName string
nodeName string
@@ -174,7 +181,9 @@ func Start(ctx context.Context, driverName string, kubeClient kubernetes.Interfa
}()

// register the host network interfaces
plugin.netdb = inventory.New()
if plugin.netdb == nil {
plugin.netdb = inventory.New()
}
go func() {
for i := 0; i < maxAttempts; i++ {
err = plugin.netdb.Run(ctx)
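For readers unfamiliar with this injection style, a hedged, self-contained sketch of the pattern (the `store`/`withStore` names are made up for illustration and are not the dranet API): an option injects a preconfigured dependency, and the constructor only builds a default when nothing was injected, mirroring the `if plugin.netdb == nil` fallback above.

```go
package main

import "fmt"

// store stands in for the inventory DB; illustrative only.
type store struct{ label string }

type driver struct{ netdb *store }

type option func(*driver)

// withStore plays the role that WithInventory plays in this PR.
func withStore(s *store) option {
	return func(d *driver) { d.netdb = s }
}

func start(opts ...option) *driver {
	d := &driver{}
	for _, o := range opts {
		o(d)
	}
	// Build a default only if the caller did not inject one.
	if d.netdb == nil {
		d.netdb = &store{label: "default"}
	}
	return d
}

func main() {
	fmt.Println(start().netdb.label)                                    // "default"
	fmt.Println(start(withStore(&store{label: "custom"})).netdb.label)  // "custom"
}
```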
128 changes: 92 additions & 36 deletions pkg/inventory/db.go
@@ -42,9 +42,15 @@ import (
)

const (
// database poll period
minInterval = 5 * time.Second
maxInterval = 1 * time.Minute
// defaultMinPollInterval is the default minimum interval between two
// consecutive polls of the inventory.
defaultMinPollInterval = 2 * time.Second
// defaultMaxPollInterval is the default maximum interval between two
// consecutive polls of the inventory.
defaultMaxPollInterval = 1 * time.Minute
// defaultPollBurst is the default number of polls that can be run in a
// burst.
defaultPollBurst = 5
)

var (
@@ -55,6 +61,10 @@ var (

type DB struct {
instance *cloudprovider.CloudInstance
// TODO: it is uncommon, but in edge cases the default gateway may change.
// Revisit once we have more evidence that this can be a real problem or
// break some use cases.
gwInterfaces sets.Set[string]

mu sync.RWMutex
// podNetNsStore gives the network namespace for a pod, indexed by the pods
@@ -66,18 +76,38 @@ type DB struct {
// The deviceStore is periodically updated by the Run method.
deviceStore map[string]resourceapi.Device

rateLimiter *rate.Limiter
notifications chan []resourceapi.Device
hasDevices bool
rateLimiter *rate.Limiter
maxPollInterval time.Duration
notifications chan []resourceapi.Device
hasDevices bool
}

func New() *DB {
return &DB{
podNetNsStore: map[string]string{},
deviceStore: map[string]resourceapi.Device{},
rateLimiter: rate.NewLimiter(rate.Every(minInterval), 1),
notifications: make(chan []resourceapi.Device),
type Option func(*DB)

func WithRateLimiter(limiter *rate.Limiter) Option {
return func(db *DB) {
db.rateLimiter = limiter
}
}

func WithMaxPollInterval(d time.Duration) Option {
return func(db *DB) {
db.maxPollInterval = d
}
}

func New(opts ...Option) *DB {
db := &DB{
podNetNsStore: map[string]string{},
deviceStore: map[string]resourceapi.Device{},
rateLimiter: rate.NewLimiter(rate.Every(defaultMinPollInterval), defaultPollBurst),
notifications: make(chan []resourceapi.Device),
maxPollInterval: defaultMaxPollInterval,
}
for _, o := range opts {
o(db)
}
return db
}

func (db *DB) AddPodNetNs(pod string, netNsPath string) {
@@ -114,58 +144,64 @@ func (db *DB) Run(ctx context.Context) error {
doneCh := make(chan struct{})
defer close(doneCh)
if err := netlink.LinkSubscribe(nlChannel, doneCh); err != nil {
klog.Error(err, "error subscribing to netlink interfaces, only syncing periodically", "interval", maxInterval.String())
klog.Error(err, "error subscribing to netlink interfaces, only syncing periodically", "interval", db.maxPollInterval.String())
}

// Obtain data that will not change after the startup
db.instance = getInstanceProperties(ctx)
// TODO: it is not common but may happen in edge cases that the default gateway changes
// revisit once we have more evidence this can be a potential problem or break some use
// cases.
gwInterfaces := getDefaultGwInterfaces()
db.gwInterfaces = getDefaultGwInterfaces()

for {
err := db.rateLimiter.Wait(ctx)
if err != nil {
klog.Error(err, "unexpected rate limited error trying to get system interfaces")
}

devices := db.discoverPCIDevices()
devices = db.discoverNetworkInterfaces(devices)
devices = db.discoverRDMADevices(devices)
devices = db.addCloudAttributes(devices)

// Remove default interface.
filteredDevices := []resourceapi.Device{}
for _, device := range devices {
ifName := device.Attributes[apis.AttrInterfaceName].StringValue
if ifName != nil && gwInterfaces.Has(string(*ifName)) {
klog.V(4).Infof("Ignoring interface %s from discovery since it is an uplink interface", *ifName)
continue
}
filteredDevices = append(filteredDevices, device)
}

klog.V(4).Infof("Found %d devices", len(filteredDevices))
filteredDevices := db.scan()
if len(filteredDevices) > 0 || db.hasDevices {
db.hasDevices = len(filteredDevices) > 0
db.updateDeviceStore(filteredDevices)
db.notifications <- filteredDevices
}

select {
// trigger a reconcile
case <-nlChannel:
// drain the channel so we only sync once
for len(nlChannel) > 0 {
<-nlChannel
}
case <-time.After(maxInterval):
case <-time.After(db.maxPollInterval):
case <-ctx.Done():
return ctx.Err()
}
}
}
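
The loop above follows the usual "event-driven with a periodic fallback" shape: reconcile, then wait on either a netlink event (draining the channel so a burst of events coalesces into a single extra pass) or the max poll interval. A simplified sketch of that shape, illustrative only and not dranet code:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func watch(ctx context.Context, events <-chan string, resync time.Duration) error {
	for {
		// ... one scan/reconcile pass goes here ...
		fmt.Println("reconcile")

		select {
		case <-events:
			// Drain whatever queued up so a burst of events
			// triggers only one additional reconcile.
			for len(events) > 0 {
				<-events
			}
		case <-time.After(resync):
			// Periodic fallback for changes that produce no event.
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	events := make(chan string, 8)
	events <- "link-up"
	_ = watch(ctx, events, time.Second)
}
```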

// scan discovers the available devices on the node.
// It discovers PCI, network, and RDMA devices, adds cloud attributes,
// filters out default interfaces, and updates the device store.
func (db *DB) scan() []resourceapi.Device {
devices := db.discoverPCIDevices()
devices = db.discoverNetworkInterfaces(devices)
devices = db.discoverRDMADevices(devices)
devices = db.addCloudAttributes(devices)

// Remove default interface.
filteredDevices := []resourceapi.Device{}
for _, device := range devices {
ifName := device.Attributes[apis.AttrInterfaceName].StringValue
if ifName != nil && db.gwInterfaces.Has(string(*ifName)) {
klog.V(4).Infof("Ignoring interface %s from discovery since it is an uplink interface", *ifName)
continue
}
filteredDevices = append(filteredDevices, device)
}

klog.V(4).Infof("Found %d devices", len(filteredDevices))
db.updateDeviceStore(filteredDevices)
return filteredDevices
}

func (db *DB) GetResources(ctx context.Context) <-chan []resourceapi.Device {
return db.notifications
}
@@ -389,7 +425,27 @@ func (db *DB) GetDevice(deviceName string) (resourceapi.Device, bool) {
return device, exists
}

// GetNetInterfaceName returns the network interface name for a given device. It
// first attempts to retrieve the name from the local device store. If the
// device is not found, it triggers a rescan of the system's devices and retries
// the lookup. This can happen when a device was recently released by a previous
// pod and a scan had not happened yet. This ensures that the function can find
// newly added devices that were not present in the store at the time of the
// initial call.
func (db *DB) GetNetInterfaceName(deviceName string) (string, error) {
name, err := db.getNetInterfaceNameWithoutRescan(deviceName)
if err != nil {
klog.V(3).Infof("Device %q not found in local store, rescanning.", deviceName)
db.scan()
name, err = db.getNetInterfaceNameWithoutRescan(deviceName)
}
return name, err
}
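
The rescan-and-retry logic above is essentially a read-through cache that refreshes once on a miss. A generic sketch under that reading (names are illustrative, not the dranet types):

```go
package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("not found")

type cache struct {
	entries map[string]string
	refresh func() map[string]string // stands in for db.scan()
}

func (c *cache) lookup(key string) (string, error) {
	if v, ok := c.entries[key]; ok {
		return v, nil
	}
	return "", errNotFound
}

// get retries the lookup once after refreshing, mirroring
// GetNetInterfaceName's rescan-on-miss behaviour.
func (c *cache) get(key string) (string, error) {
	v, err := c.lookup(key)
	if err != nil {
		c.entries = c.refresh()
		v, err = c.lookup(key)
	}
	return v, err
}

func main() {
	c := &cache{
		entries: map[string]string{},
		refresh: func() map[string]string { return map[string]string{"gpu0nic": "eth1"} },
	}
	fmt.Println(c.get("gpu0nic")) // found only after the refresh
}
```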

// getNetInterfaceNameWithoutRescan returns the network interface name for a
// given device from the local device store without triggering a rescan if the
// device is not found.
func (db *DB) getNetInterfaceNameWithoutRescan(deviceName string) (string, error) {
device, exists := db.GetDevice(deviceName)
if !exists {
return "", fmt.Errorf("device %s not found in store", deviceName)
6 changes: 6 additions & 0 deletions tests/e2e.bats
@@ -85,6 +85,12 @@ setup_tcx_filter() {
docker exec "$CLUSTER_NAME"-worker2 bash -c "chmod +x bpftool"
docker exec "$CLUSTER_NAME"-worker2 bash -c "./bpftool prog load dummy_bpf_tcx.o /sys/fs/bpf/dummy_prog_tcx"
docker exec "$CLUSTER_NAME"-worker2 bash -c "./bpftool net attach tcx_ingress pinned /sys/fs/bpf/dummy_prog_tcx dev dummy0"
# We update the interface to trigger a DRANET driver notification, which
# speeds up the test. Otherwise, DRANET would need to wait for a full resync
# (1 minute) to detect changes to attached BPF programs, as netlink
# notifications don't cover them.
docker exec "$CLUSTER_NAME"-worker2 bash -c "ip link set down dev dummy0"
docker exec "$CLUSTER_NAME"-worker2 bash -c "ip link set up dev dummy0"
}

# ---- TESTS ----