diff --git a/install.yaml b/install.yaml index 863e3615..95a149a4 100644 --- a/install.yaml +++ b/install.yaml @@ -129,8 +129,7 @@ spec: cpu: "100m" memory: "50Mi" securityContext: - capabilities: - add: ["NET_ADMIN", "SYS_ADMIN"] + privileged: true readinessProbe: httpGet: path: /healthz @@ -148,6 +147,9 @@ spec: - name: infiniband mountPath: /dev/infiniband mountPropagation: HostToContainer + - name: bpf-programs + mountPath: /sys/fs/bpf + mountPropagation: HostToContainer volumes: - name: device-plugin hostPath: @@ -167,4 +169,7 @@ spec: - name: etc hostPath: path: /etc + - name: bpf-programs + hostPath: + path: /sys/fs/bpf --- diff --git a/pkg/driver/dra_hooks.go b/pkg/driver/dra_hooks.go index a653983e..8f368706 100644 --- a/pkg/driver/dra_hooks.go +++ b/pkg/driver/dra_hooks.go @@ -278,6 +278,17 @@ func (np *NetworkDriver) prepareResourceClaim(ctx context.Context, claim *resour } } + // Remove the pinned programs before the NRI hooks since it + // has to walk the entire bpf virtual filesystem and is slow + // TODO: check if there is some other way to do this + if podCfg.Network.Interface.DisableEBPFPrograms != nil && + *podCfg.Network.Interface.DisableEBPFPrograms { + err := unpinBPFPrograms(ifName) + if err != nil { + klog.Infof("error unpinning ebpf programs for %s : %v", ifName, err) + } + } + device := kubeletplugin.Device{ Requests: []string{result.Request}, PoolName: result.Pool, diff --git a/pkg/driver/ebpf.go b/pkg/driver/ebpf.go new file mode 100644 index 00000000..87618e85 --- /dev/null +++ b/pkg/driver/ebpf.go @@ -0,0 +1,198 @@ +/* +Copyright 2025 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package driver + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "runtime" + + "github.com/cilium/ebpf" + "github.com/cilium/ebpf/link" + "github.com/vishvananda/netlink" + "github.com/vishvananda/netns" + "k8s.io/klog/v2" +) + +// unpinBPFPrograms runs in the host namespace to delete all the pinned bpf programs +func unpinBPFPrograms(ifName string) error { + device, err := netlink.LinkByName(ifName) + if err != nil && !errors.Is(err, netlink.ErrDumpInterrupted) { + return err + } + ifIndex := uint32(device.Attrs().Index) + + klog.V(2).Infof("Attempting to unpin eBPF programs from interface %s", ifName) + return filepath.Walk("/sys/fs/bpf", func(pinPath string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + if info.IsDir() { + return nil + } + + l, err := link.LoadPinnedLink(pinPath, &ebpf.LoadPinOptions{}) + if err != nil { + klog.V(4).Infof("error getting link %s: %v", pinPath, err) + return nil + } + + linkInfo, err := l.Info() + if err != nil { + klog.Infof("error link info: %v", err) + return nil + } + + var linkIfIndex uint32 + switch linkInfo.Type { + case link.TCXType: + extra := linkInfo.TCX() + if extra != nil { + linkIfIndex = extra.Ifindex + } + case link.NetkitType: + extra := linkInfo.Netkit() + if extra != nil { + linkIfIndex = extra.Ifindex + } + case link.XDPType: + extra := linkInfo.XDP() + if extra != nil { + linkIfIndex = extra.Ifindex + } + default: + return nil + } + if linkIfIndex != ifIndex { + return nil + } + err = l.Unpin() + if err != nil { + klog.Infof("fail to unpin bpf link %v", err) + } else { + 
klog.V(2).Infof("succesfully unpin bpf from link %d", linkInfo.ID) + } + return nil + }) + +} + +// detachEBPFPrograms detaches all eBPF programs (TC and TCX) from a given network interface. +// It attempts to remove both classic TC filters and newer TCX programs. +// It runs inside the network namespace to avoid programs on the root namespace +// to cause issues detaching the programs. +func detachEBPFPrograms(containerNsPAth string, ifName string) error { + origns, err := netns.Get() + if err != nil { + return fmt.Errorf("unexpected error trying to get namespace: %v", err) + } + defer origns.Close() + containerNs, err := netns.GetFromPath(containerNsPAth) + if err != nil { + return fmt.Errorf("could not get network namespace from path %s for network device %s : %w", containerNsPAth, ifName, err) + } + defer containerNs.Close() + + runtime.LockOSThread() + defer runtime.UnlockOSThread() + err = netns.Set(containerNs) + if err != nil { + return fmt.Errorf("failt to join network namespace %s : %v", containerNsPAth, err) + } + // Switch back to the original namespace + defer netns.Set(origns) // nolint:errcheck + + var errs []error + device, err := netlink.LinkByName(ifName) + if err != nil && !errors.Is(err, netlink.ErrDumpInterrupted) { + return err + } + + // Detach TC filters (legacy) + klog.V(2).Infof("Attempting to detach TC filters from interface %s", device.Attrs().Name) + for _, parent := range []uint32{netlink.HANDLE_MIN_INGRESS, netlink.HANDLE_MIN_EGRESS} { + filters, err := netlink.FilterList(device, parent) + if err != nil { + klog.V(4).Infof("Could not list TC filters for interface %s (parent %d): %v", device.Attrs().Name, parent, err) + continue + } + for _, f := range filters { + if bpfFilter, ok := f.(*netlink.BpfFilter); ok { + klog.V(4).Infof("Deleting TC filter %s from interface %s (parent %d)", bpfFilter.Name, device.Attrs().Name, parent) + if err := netlink.FilterDel(f); err != nil { + klog.V(2).Infof("failed to delete TC filter %s on %s: %v", 
bpfFilter.Name, device.Attrs().Name, err) + } + } + } + } + + // Detach TCX programs + klog.V(2).Infof("Attempting to detach TCX programs from interface %s", device.Attrs().Name) + for _, attach := range []ebpf.AttachType{ebpf.AttachTCXIngress, ebpf.AttachTCXEgress} { + klog.V(2).Infof("Attempting to detach programs from attachment %s interface %s", attach.String(), device.Attrs().Name) + result, err := link.QueryPrograms(link.QueryOptions{ + Target: int(device.Attrs().Index), + Attach: attach, + }) + if err != nil { + errs = append(errs, err) + continue + } + for _, p := range result.Programs { + klog.V(2).Infof("Attempting to detach program %d from interface %s", p.ID, device.Attrs().Name) + err = tryDetach(p.ID, device.Attrs().Index, attach) + if err != nil { + klog.V(2).Infof("Failed to detach program %d from interface %s", p.ID, device.Attrs().Name) + errs = append(errs, err) + } + } + } + + return errors.Join(errs...) +} + +func tryDetach(id ebpf.ProgramID, deviceIdx int, attach ebpf.AttachType) error { + prog, err := ebpf.NewProgramFromID(id) + if err != nil { + klog.V(2).Infof("failed to get eBPF program with ID %d: %v", id, err) + return err + } + + if err := prog.Unpin(); err != nil { + klog.Infof("failed to unpin eBPF program %s: %v", prog.String(), err) + return err + } + + err = link.RawDetachProgram(link.RawDetachProgramOptions{ + Target: deviceIdx, + Program: prog, + Attach: attach, + }) + if err != nil { + klog.V(2).Infof("failed to detach eBPF program with ID %d: %v", id, err) + } + + err = prog.Close() + if err != nil { + klog.Infof("failed to close eBPF program %s: %v", prog.String(), err) + return err + } + return nil +} diff --git a/pkg/driver/hostdevice.go b/pkg/driver/hostdevice.go index 3236a405..cb6b7b18 100644 --- a/pkg/driver/hostdevice.go +++ b/pkg/driver/hostdevice.go @@ -20,10 +20,7 @@ import ( "errors" "fmt" "net" - "runtime" - "github.com/cilium/ebpf" - "github.com/cilium/ebpf/link" "github.com/google/dranet/pkg/apis" 
"github.com/vishvananda/netlink" @@ -247,91 +244,3 @@ func nsDetachNetdev(containerNsPAth string, devName string, outName string) erro } return nil } - -// detachEBPFPrograms detaches all eBPF programs (TC and TCX) from a given network interface. -// It attempts to remove both classic TC filters and newer TCX programs. -// Returns an aggregated error if any detachment fails. -func detachEBPFPrograms(containerNsPAth string, ifName string) error { - origns, err := netns.Get() - if err != nil { - return fmt.Errorf("unexpected error trying to get namespace: %v", err) - } - defer origns.Close() - - containerNs, err := netns.GetFromPath(containerNsPAth) - if err != nil { - return fmt.Errorf("could not get network namespace from path %s for network device %s : %w", containerNsPAth, ifName, err) - } - defer containerNs.Close() - - runtime.LockOSThread() - defer runtime.UnlockOSThread() - err = netns.Set(containerNs) - if err != nil { - return fmt.Errorf("failt to join network namespace %s : %v", containerNsPAth, err) - } - - // Switch back to the original namespace - defer netns.Set(origns) // nolint:errcheck - - var errs []error - - device, err := netlink.LinkByName(ifName) - if err != nil { - return err - } - - // Detach TC filters (legacy) - klog.V(2).Infof("Attempting to detach TC filters from interface %s", device.Attrs().Name) - for _, parent := range []uint32{netlink.HANDLE_MIN_INGRESS, netlink.HANDLE_MIN_EGRESS} { - filters, err := netlink.FilterList(device, parent) - if err != nil { - klog.V(4).Infof("Could not list TC filters for interface %s (parent %d): %v", device.Attrs().Name, parent, err) - continue - } - for _, f := range filters { - if bpfFilter, ok := f.(*netlink.BpfFilter); ok { - klog.V(4).Infof("Deleting TC filter %s from interface %s (parent %d)", bpfFilter.Name, device.Attrs().Name, parent) - if err := netlink.FilterDel(f); err != nil { - klog.V(2).Infof("failed to delete TC filter %s on %s: %v", bpfFilter.Name, device.Attrs().Name, err) - } - } - } 
- } - // Detach TCX programs - klog.V(2).Infof("Attempting to detach TCX programs from interface %s", device.Attrs().Name) - for _, attach := range []ebpf.AttachType{ebpf.AttachTCXIngress, ebpf.AttachTCXEgress} { - klog.V(2).Infof("Attempting to detach programs from interface %s", device.Attrs().Name) - result, err := link.QueryPrograms(link.QueryOptions{ - Target: int(device.Attrs().Index), - Attach: attach, - }) - if err != nil { - errs = append(errs, err) - continue - } - for _, p := range result.Programs { - klog.V(2).Infof("Attempting to detach program %d from interface %s", p.ID, device.Attrs().Name) - prog, err := ebpf.NewProgramFromID(p.ID) - if err != nil { - klog.V(2).Infof("failed to get eBPF program with ID %d: %v", p.ID, err) - errs = append(errs, err) - continue - } - - err = link.RawDetachProgram(link.RawDetachProgramOptions{ - Target: device.Attrs().Index, - Program: prog, - Attach: attach, - }) - if err != nil { - klog.V(2).Infof("failed to get eBPF program with ID %d: %v", p.ID, err) - errs = append(errs, err) - continue - } - prog.Close() // nolint:errcheck - } - } - - return errors.Join(errs...) 
-} diff --git a/pkg/driver/nri_hooks.go b/pkg/driver/nri_hooks.go index 451793d6..2cc32073 100644 --- a/pkg/driver/nri_hooks.go +++ b/pkg/driver/nri_hooks.go @@ -157,7 +157,7 @@ func (np *NetworkDriver) RunPodSandbox(ctx context.Context, pod *api.PodSandbox) // Check if the ebpf programs should be disabled if config.Network.Interface.DisableEBPFPrograms != nil && *config.Network.Interface.DisableEBPFPrograms { - err = detachEBPFPrograms(ns, ifNameInNs) + err := detachEBPFPrograms(ns, ifNameInNs) if err != nil { klog.Infof("error disabling ebpf programs for %s in ns %s: %v", ifNameInNs, ns, err) return fmt.Errorf("error disabling ebpf programs for %s in ns %s: %v", ifNameInNs, ns, err) diff --git a/site/content/docs/user/gke-tpu-performance.md b/site/content/docs/user/gke-tpu-performance.md new file mode 100644 index 00000000..2ed37991 --- /dev/null +++ b/site/content/docs/user/gke-tpu-performance.md @@ -0,0 +1,229 @@ +--- +title: "GKE and Cloud TPU v6e (Trillium)" +date: 2025-05-27T11:30:40Z +--- + +If you use TPU Trillium and you want to improve the network performance of your Pods you can balance your network traffic over the VM NICs. + +The `ct6e-standard-4t` machine type is backed by two physical NICs, since the main interface of the VM is used for all the applications and Pods on the host, you can create two additional vNICs on the VM that will be attached to each of the physical NICs, and pass them to the Pod directly, so you can multiplex your traffic to consume the total capacity of the physical NICs. 
+ +```sh +# Create two additional VPC networks +gcloud compute --project=${PROJECT?} \ + networks create \ + tpu-net-1 \ + --mtu=8896 \ + --subnet-mode=custom + +gcloud compute --project=${PROJECT?} \ + networks subnets create \ + tpu-net-1-sub \ + --network=tpu-net-1 \ + --region=${REGION?} \ + --range=192.168.0.0/24 + +gcloud compute --project=${PROJECT?} \ + networks create \ + tpu-net-2 \ + --mtu=8896 \ + --subnet-mode=custom + +gcloud compute --project=${PROJECT?} \ + networks subnets create \ + tpu-net-2-sub \ + --network=tpu-net-2 \ + --region=${REGION?} \ + --range=192.168.1.0/24 + +gcloud container node-pools create POOL_NAME \ + --location=${LOCATION} \ + --cluster=${CLUSTER_NAME} \ + --node-locations=${NODE_ZONES} \ + --machine-type=${MACHINE_TYPE} \ + --tpu-topology=${TPU_TOPOLOGY} \ + --additional-node-network network=tpu-net-1,subnetwork=tpu-net-1-sub \ + --additional-node-network network=tpu-net-2,subnetwork=tpu-net-2-sub \ + --enable-gvnic +``` + +Apply the following manifest to install DraNet: + +```sh +kubectl apply -f https://raw.githubusercontent.com/google/dranet/refs/heads/main/install.yaml +``` + +Once DraNet is running you'll be able to obtain the network resources exposed by the dranet Pods. In order to avoid noise, DraNet has a flag that allows setting a client-side filter to control the exposed resources; in this case, we can set the flag to ignore network devices that are `virtual`. The manifest will look like: + +```yaml + containers: + - args: + - /dranet + - --v=4 + - --filter=attributes["dra.net/virtual"].BoolValue == false + image: ghcr.io/google/dranet:stable +``` + +First, we tell DraNet what kind of NICs we're interested in and how Pods can claim them. In order to simplify our workloads we can create a `DeviceClass` that matches only the resources exposed by DraNet. + +**DeviceClass (dranet):** This selects NICs managed by DraNet.
+ +```yaml +apiVersion: resource.k8s.io/v1beta1 +kind: DeviceClass +metadata: + name: dranet +spec: + selectors: + - cel: + expression: device.driver == "dra.net" +``` + +**ResourceClaimTemplate (worker-rdma-nic-template):** This will request the two additional NICs; since we created the additional networks with the prefix `tpu-net` we can leverage the powerful CEL expressions to match on that prefix. + +Another important factor is the capacity of DraNet to pass Interface configuration options that allow tuning the interfaces for maximum performance, for example, [Big TCP](https://lwn.net/Articles/884104/). + +In addition, if you have GVNIC enabled you can use some private ethtool flags that improve the performance for TCP like `enable-max-rx-buffer-size`. + +```yaml +apiVersion: resource.k8s.io/v1beta1 +kind: ResourceClaimTemplate +metadata: + name: tpu-net-interfaces +spec: + spec: + devices: + requests: + - name: tpu-net-interface + deviceClassName: dranet + count: 2 + selectors: + - cel: + expression: device.attributes["gce.dra.net"].networkName.startsWith("tpu-net") + config: + - opaque: + driver: dra.net + parameters: + interface: + mtu: 8896 + gsoMaxSize: 65536 + groMaxSize: 65536 + gsoIPv4MaxSize: 65536 + groIPv4MaxSize: 65536 + disableEbpfPrograms: true + ethtool: + privateFlags: + enable-max-rx-buffer-size: true +``` + +To test the network performance we'll use [neper](https://github.com/google/neper), a tool created by the Google kernel teams to test network performance.
+ +```yaml +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: neper +spec: + selector: + matchLabels: + app: neper + serviceName: neper + replicas: 2 + template: + metadata: + labels: + app: neper + spec: + nodeSelector: + cloud.google.com/gke-tpu-accelerator: tpu-v6e-slice + cloud.google.com/gke-tpu-topology: 4x4 + initContainers: + - name: "network-optimization-sysctls" + image: "busybox" + securityContext: + privileged: true + command: + - sh + - -c + - | + echo 5000 > /proc/sys/net/ipv4/tcp_rto_min_us + echo 1 > /proc/sys/net/ipv4/tcp_no_metrics_save + echo 0 > /proc/sys/net/ipv4/tcp_slow_start_after_idle + echo 131072 > /proc/sys/net/core/optmem_max + echo "4096 41943040 314572800" > /proc/sys/net/ipv4/tcp_rmem + containers: + - name: neper + image: ghcr.io/google/neper:stable + securityContext: + privileged: true + resources: + requests: + google.com/tpu: 4 + limits: + google.com/tpu: 4 + resourceClaims: + - name: tpu-net-interface + resourceClaimTemplateName: tpu-net-interfaces +``` + +We'll get two pods running: + +```sh +$ kubectl get pods +NAME READY STATUS RESTARTS AGE +neper-0 1/1 Running 0 10m +neper-1 1/1 Running 0 22s +``` + +Using neper-1 as a server `kubectl exec -it neper-1 -- sh`, checks first the additional IPs assigned, in this case these IPs are 10.9.9.11 and 10.10.0.11 + +```sh +1: lo: mtu 65536 qdisc noqueue state UNKNOWN qlen 1000 + link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00 + inet 127.0.0.1/8 scope host lo + valid_lft forever preferred_lft forever +2: eth0@if13: mtu 1460 qdisc noqueue state UP qlen 1000 + link/ether 16:41:72:68:11:67 brd ff:ff:ff:ff:ff:ff + inet 10.68.2.12/24 brd 10.68.2.255 scope global eth0 + valid_lft forever preferred_lft forever +3: eth1: mtu 8896 qdisc mq state UP qlen 1000 + link/ether 42:01:0a:09:09:0b brd ff:ff:ff:ff:ff:ff + inet 10.9.9.11/32 scope global eth1 + valid_lft forever preferred_lft forever +4: eth2: mtu 8896 qdisc mq state UP qlen 1000 + link/ether 42:01:0a:0a:00:0b brd 
ff:ff:ff:ff:ff:ff + inet 10.10.0.11/32 scope global eth2 + valid_lft forever preferred_lft forever +``` + +then run one TCP stream server per NIC: + +```sh +for i in 0 1; do + tcp_stream -C$((52279 + i)) --port=$((38339 + i)) --skip-rx-copy -rw -Z -B16384 --test-length=60 --suicide-length=120 -F100 --num-threads=16 --num-flows=32 -D0 --logtostderr &> test$i.log & +done +``` + +and neper-0 as a client `kubectl exec -it neper-0 -- sh` to connect to each TCP server: + +```sh +tcp_stream -C52279 --port=38339 --skip-rx-copy -rw -Z -B16384 --test-length=60 --suicide-length=70 -F100 --num-threads=16 --num-flows=32 --client -H 10.9.9.11 -D0 --logtostderr &> test0.log & +tcp_stream -C52280 --port=38340 --skip-rx-copy -rw -Z -B16384 --test-length=60 --suicide-length=70 -F100 --num-threads=16 --num-flows=32 --client -H 10.10.0.11 -D0 --logtostderr &> test1.log & +``` + +The first test instance recorded a throughput of ~180.17 Gbps, and the second instance simultaneously achieved ~174.73 Gbps. + +```sh +grep throughput test* +test0.log:throughput_opt=Mb +test0.log:throughput=180165.51 +test0.log:throughput_units=Mbit/s +test0.log:local_throughput=180165511242 +test0.log:remote_throughput=177503231653 +test1.log:throughput_opt=Mb +test1.log:throughput=174727.08 +test1.log:throughput_units=Mbit/s +test1.log:local_throughput=174727081480 +test1.log:remote_throughput=175469311719 +``` + +The sum of these two independent tests gives the total aggregated throughput of 354.9 Gbps.