diff --git a/cmd/kubectl-datadog/autoscaling/cluster/common/clusterinfo/classify.go b/cmd/kubectl-datadog/autoscaling/cluster/common/clusterinfo/classify.go
index 03cefe2ac2..a5b7caa84a 100644
--- a/cmd/kubectl-datadog/autoscaling/cluster/common/clusterinfo/classify.go
+++ b/cmd/kubectl-datadog/autoscaling/cluster/common/clusterinfo/classify.go
@@ -2,10 +2,11 @@ package clusterinfo

 import (
 	"context"
+	"errors"
 	"fmt"
-	"log"
 	"maps"
 	"regexp"
+	"slices"
 	"strings"
 	"time"

@@ -15,16 +16,15 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/pager"
 )

-const (
-	nodeListChunkSize = 100
-	// describeASGInstancesMaxIDs is the documented per-call limit of
-	// autoscaling:DescribeAutoScalingInstances. Sending more triggers a
-	// ValidationError at the API.
-	describeASGInstancesMaxIDs = 50
-)
+// describeASGInstancesMaxIDs is the documented per-call limit of
+// autoscaling:DescribeAutoScalingInstances. Sending more triggers a
+// ValidationError at the API.
+const describeASGInstancesMaxIDs = 50

 // awsProviderIDRegexp matches the AWS provider ID for EC2-backed nodes.
 // Format: aws:///<az>/i-<instance-id>. Fargate nodes use a different shape and
@@ -79,38 +79,30 @@ type asgCandidate struct {

 func classifyByLabels(ctx context.Context, k8sClient kubernetes.Interface, info *ClusterInfo) ([]asgCandidate, error) {
 	var candidates []asgCandidate

-	var cont string
-	for {
-		list, err := k8sClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{
-			Limit:    nodeListChunkSize,
-			Continue: cont,
-		})
-		if err != nil {
-			return nil, fmt.Errorf("failed to list nodes: %w", err)
-		}
-
-		for _, node := range list.Items {
-			if mgr, entity, ok := classifyNodeByLabel(&node); ok {
-				addToBucket(info, mgr, entity, node.Name)
-				continue
-			}
-
-			matches := awsProviderIDRegexp.FindStringSubmatch(node.Spec.ProviderID)
-			if len(matches) == 2 {
-				candidates = append(candidates, asgCandidate{
-					instanceID: matches[1],
-					nodeName:   node.Name,
-				})
-			} else {
-				addToBucket(info, NodeManagerUnknown, node.Spec.ProviderID, node.Name)
-			}
+	p := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
+		return k8sClient.CoreV1().Nodes().List(ctx, opts)
+	})
+	if err := p.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
+		node := obj.(*corev1.Node)
+		if mgr, entity, ok := classifyNodeByLabel(node); ok {
+			addToBucket(info, mgr, entity, node.Name)
+			return nil
 		}
-		cont = list.Continue
-		if cont == "" {
-			return candidates, nil
+		matches := awsProviderIDRegexp.FindStringSubmatch(node.Spec.ProviderID)
+		if len(matches) == 2 {
+			candidates = append(candidates, asgCandidate{
+				instanceID: matches[1],
+				nodeName:   node.Name,
+			})
+		} else {
+			addToBucket(info, NodeManagerUnknown, node.Spec.ProviderID, node.Name)
 		}
+		return nil
+	}); err != nil {
+		return nil, fmt.Errorf("failed to list nodes: %w", err)
 	}
+	return candidates, nil
 }

 // classifyNodeByLabel applies steps 1-3 of the decision tree using only the
@@ -181,34 +173,39 @@ func addToBucket(info *ClusterInfo, mgr NodeManager, entity, nodeName string) {
 }

 // detectClusterAutoscaler scans Deployments cluster-wide and returns the
-// first match. A match is any Deployment with name "cluster-autoscaler", a
-// well-known label, or a container image referencing "cluster-autoscaler".
-// Multiple matches yield a warning but only the first is recorded.
+// first matching one encountered. A match is any Deployment with name
+// "cluster-autoscaler", a well-known label, or a container image referencing
+// "cluster-autoscaler". The cluster-wide enumeration order is unspecified, so
+// when several matches coexist (a configuration we don't expect in practice),
+// which one wins is non-deterministic — we accept that to keep the scan
+// short-circuited and avoid materialising every Deployment in memory.
 func detectClusterAutoscaler(ctx context.Context, k8sClient kubernetes.Interface) (ClusterAutoscaler, error) {
-	list, err := k8sClient.AppsV1().Deployments(corev1.NamespaceAll).List(ctx, metav1.ListOptions{})
-	if err != nil {
-		return ClusterAutoscaler{}, err
-	}
+	errFound := errors.New("cluster-autoscaler found")

-	matches := lo.Filter(list.Items, isClusterAutoscaler)
-	if len(matches) == 0 {
-		return ClusterAutoscaler{}, nil
-	}
-	if len(matches) > 1 {
-		log.Printf("Warning: %d Deployments match cluster-autoscaler heuristics; recording the first one (%s/%s).",
-			len(matches), matches[0].Namespace, matches[0].Name)
+	var result ClusterAutoscaler
+	p := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
+		return k8sClient.AppsV1().Deployments(corev1.NamespaceAll).List(ctx, opts)
+	})
+	err := p.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
+		dep := obj.(*appsv1.Deployment)
+		if !isClusterAutoscaler(*dep) {
+			return nil
+		}
+		result = ClusterAutoscaler{
+			Present:   true,
+			Namespace: dep.Namespace,
+			Name:      dep.Name,
+			Version:   extractClusterAutoscalerVersion(*dep),
+		}
+		return errFound
+	})
+	if err != nil && !errors.Is(err, errFound) {
+		return ClusterAutoscaler{}, fmt.Errorf("failed to list Deployments: %w", err)
 	}
-	return ClusterAutoscaler{
-		Present:   true,
-		Namespace: matches[0].Namespace,
-		Name:      matches[0].Name,
-		Version:   extractClusterAutoscalerVersion(matches[0]),
-	}, nil
+	return result, nil
 }

-// isClusterAutoscaler matches lo.Filter's predicate signature so it can be
-// passed directly without a wrapper.
-func isClusterAutoscaler(d appsv1.Deployment, _ int) bool {
+func isClusterAutoscaler(d appsv1.Deployment) bool {
 	// A Deployment scaled to zero is effectively disabled; ignoring it lets
 	// users who already stopped CA (per the Karpenter migration guide) get
 	// `Present: false` in the snapshot. A nil Replicas defaults to 1 per the
@@ -221,7 +218,7 @@ func isClusterAutoscaler(d appsv1.Deployment, _ int) bool {
 		d.Labels["k8s-app"] == "cluster-autoscaler" {
 		return true
 	}
-	return lo.SomeBy(d.Spec.Template.Spec.Containers, func(c corev1.Container) bool {
+	return slices.ContainsFunc(d.Spec.Template.Spec.Containers, func(c corev1.Container) bool {
 		return strings.Contains(c.Image, "cluster-autoscaler")
 	})
 }
diff --git a/cmd/kubectl-datadog/autoscaling/cluster/install/guess/eksautomode.go b/cmd/kubectl-datadog/autoscaling/cluster/install/guess/eksautomode.go
index de938ba52b..c3da708116 100644
--- a/cmd/kubectl-datadog/autoscaling/cluster/install/guess/eksautomode.go
+++ b/cmd/kubectl-datadog/autoscaling/cluster/install/guess/eksautomode.go
@@ -2,8 +2,8 @@ package guess

 import (
 	"fmt"
+	"slices"

-	"github.com/samber/lo"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/discovery"
@@ -20,7 +20,7 @@ func IsEKSAutoModeEnabled(discoveryClient discovery.DiscoveryInterface) (bool, e
 		return false, fmt.Errorf("failed to query eks.amazonaws.com/v1 API group: %w", err)
 	}

-	return lo.ContainsBy(resources.APIResources, func(r metav1.APIResource) bool {
+	return slices.ContainsFunc(resources.APIResources, func(r metav1.APIResource) bool {
 		return r.Name == "nodeclasses"
 	}), nil
 }
diff --git a/cmd/kubectl-datadog/autoscaling/cluster/install/guess/foreignkarpenter.go b/cmd/kubectl-datadog/autoscaling/cluster/install/guess/foreignkarpenter.go
index 6142cab890..68072ad4f0 100644
--- a/cmd/kubectl-datadog/autoscaling/cluster/install/guess/foreignkarpenter.go
+++ b/cmd/kubectl-datadog/autoscaling/cluster/install/guess/foreignkarpenter.go
@@ -2,15 +2,18 @@ package guess

 import (
 	"context"
+	"errors"
 	"fmt"
 	"log"
 	"slices"
 	"strings"

-	"github.com/samber/lo"
+	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/pager"
 )

 // Datadog-namespaced labels written via the Karpenter chart's additionalLabels.
@@ -39,11 +42,6 @@ const karpenterServiceEnvName = "KARPENTER_SERVICE"
 // `someone/karpenter/controller-something` do not false-positive.
 const karpenterControllerImageRepoSuffix = "karpenter/controller"

-// deploymentListChunkSize bounds the size of a single List response so we
-// don't pull thousands of Deployments into memory at once on dense clusters.
-// Matches the chunk size used by GetNodesProperties.
-const deploymentListChunkSize = 100
-
 // ForeignKarpenter is the location of a Karpenter controller Deployment
 // that conflicts with the install we're about to perform — either a
 // third-party install or a previous kubectl-datadog install in a different
@@ -77,32 +75,28 @@ type ForeignKarpenter struct {
 // materialised in memory just to answer "is there at least one foreign
 // Karpenter Deployment".
 func FindForeignKarpenterInstallation(ctx context.Context, clientset kubernetes.Interface, targetNamespace string) (*ForeignKarpenter, error) {
-	var cont string
-	for {
-		deps, err := clientset.AppsV1().Deployments(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
-			Limit:    deploymentListChunkSize,
-			Continue: cont,
-		})
-		if err != nil {
-			return nil, fmt.Errorf("failed to list Deployments: %w", err)
-		}
+	errFound := errors.New("foreign Karpenter found")

-		for _, dep := range deps.Items {
-			if !hasKarpenterControllerContainer(dep.Spec.Template.Spec.Containers) {
-				continue
-			}
-			if dep.Namespace == targetNamespace && dep.Labels[InstalledByLabel] == InstalledByValue {
-				continue
-			}
-			log.Printf("Detected foreign Karpenter Deployment %s/%s", dep.Namespace, dep.Name)
-			return &ForeignKarpenter{Namespace: dep.Namespace, Name: dep.Name}, nil
+	var result *ForeignKarpenter
+	p := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
+		return clientset.AppsV1().Deployments(metav1.NamespaceAll).List(ctx, opts)
+	})
+	err := p.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
+		dep := obj.(*appsv1.Deployment)
+		if !hasKarpenterControllerContainer(dep.Spec.Template.Spec.Containers) {
+			return nil
 		}
-
-		cont = deps.Continue
-		if cont == "" {
-			return nil, nil
+		if dep.Namespace == targetNamespace && dep.Labels[InstalledByLabel] == InstalledByValue {
+			return nil
 		}
+		log.Printf("Detected foreign Karpenter Deployment %s/%s", dep.Namespace, dep.Name)
+		result = &ForeignKarpenter{Namespace: dep.Namespace, Name: dep.Name}
+		return errFound
+	})
+	if err != nil && !errors.Is(err, errFound) {
+		return nil, fmt.Errorf("failed to list Deployments: %w", err)
 	}
+	return result, nil
 }

 // hasKarpenterControllerContainer reports whether any container in the pod
@@ -110,11 +104,11 @@ func FindForeignKarpenterInstallation(ctx context.Context, clientset kubernetes.
 // chart-unconditional KARPENTER_SERVICE env var; secondary is the canonical
 // `karpenter/controller` image repository tail.
 func hasKarpenterControllerContainer(containers []corev1.Container) bool {
-	return lo.ContainsBy(containers, isKarpenterControllerContainer)
+	return slices.ContainsFunc(containers, isKarpenterControllerContainer)
 }

 func isKarpenterControllerContainer(c corev1.Container) bool {
-	if lo.ContainsBy(c.Env, func(e corev1.EnvVar) bool { return e.Name == karpenterServiceEnvName }) {
+	if slices.ContainsFunc(c.Env, func(e corev1.EnvVar) bool { return e.Name == karpenterServiceEnvName }) {
 		return true
 	}
 	return imageRepoEndsWith(c.Image, karpenterControllerImageRepoSuffix)
diff --git a/cmd/kubectl-datadog/autoscaling/cluster/install/guess/foreignkarpenter_test.go b/cmd/kubectl-datadog/autoscaling/cluster/install/guess/foreignkarpenter_test.go
index 937e490e4a..88bc64bb91 100644
--- a/cmd/kubectl-datadog/autoscaling/cluster/install/guess/foreignkarpenter_test.go
+++ b/cmd/kubectl-datadog/autoscaling/cluster/install/guess/foreignkarpenter_test.go
@@ -230,10 +230,9 @@ func TestFindForeignKarpenterInstallation(t *testing.T) {
 		assert.Contains(t, err.Error(), "failed to list Deployments")
 	})

-	t.Run("pagination follows Continue token across pages and short-circuits on first foreign match", func(t *testing.T) {
-		// Three pages: an empty page with a non-empty Continue token, a page
-		// with only our own Deployment, and a page where the foreign install
-		// lives. Page 4 must never be requested.
+	t.Run("pagination forwards Continue tokens across pages", func(t *testing.T) {
+		// Three pages with the foreign install on the last one, exercising
+		// cross-page Continue token forwarding.
 		pages := []*appsv1.DeploymentList{
 			{
 				ListMeta: metav1.ListMeta{Continue: "page2"},
@@ -249,16 +248,10 @@
 				},
 			},
 			{
-				ListMeta: metav1.ListMeta{Continue: "page4"},
 				Items: []appsv1.Deployment{
 					*deployment("their-ns", "their-karpenter", nil, karpenterControllerImage),
 				},
 			},
-			{
-				Items: []appsv1.Deployment{
-					*deployment("never", "fetched", nil, karpenterControllerImage),
-				},
-			},
 		}

 		clientset := fake.NewSimpleClientset()
@@ -266,9 +259,9 @@
 		clientset.PrependReactor("list", "deployments", func(action k8stesting.Action) (bool, runtime.Object, error) {
 			opts := action.(k8stesting.ListActionImpl).GetListOptions()
 			calls = append(calls, opts.Continue)
-			assert.EqualValues(t, deploymentListChunkSize, opts.Limit, "Limit must be set so the API server can chunk")
+			assert.NotZero(t, opts.Limit, "Limit must be set so the API server can chunk")
 			require.Less(t, len(calls)-1, len(pages),
-				"reactor would over-fetch beyond the synthetic pages — early-exit broken")
+				"reactor would over-fetch beyond the synthetic pages")
 			return true, pages[len(calls)-1], nil
 		})

@@ -277,7 +270,7 @@

 		require.NoError(t, err)
 		assert.Equal(t, &ForeignKarpenter{Namespace: "their-ns", Name: "their-karpenter"}, result)
 		assert.Equal(t, []string{"", "page2", "page3"}, calls,
-			"each call must forward the previous page's Continue token, and page 4 must never be requested")
+			"each call must forward the previous page's Continue token")
 	})
 }
diff --git a/cmd/kubectl-datadog/autoscaling/cluster/install/guess/nodesproperties.go b/cmd/kubectl-datadog/autoscaling/cluster/install/guess/nodesproperties.go
index 3b9342af51..0c87366640 100644
--- a/cmd/kubectl-datadog/autoscaling/cluster/install/guess/nodesproperties.go
+++ b/cmd/kubectl-datadog/autoscaling/cluster/install/guess/nodesproperties.go
@@ -14,109 +14,135 @@ import (
 	"github.com/samber/lo"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/pager"
 )

-const nodeListChunkSize = 100
-
 // awsProviderIDRegexp matches the AWS provider ID format for EC2 instances.
 // Format: aws:///ZONE/INSTANCE_ID (e.g., aws:///us-east-1a/i-0abc123def456789)
 var awsProviderIDRegexp = regexp.MustCompile(`^aws:///[^/]+/(i-[0-9a-f]+)$`)

+// ec2DescribeBatchSize bounds the number of instance IDs we hand to a single
+// ec2:DescribeInstances / ec2:DescribeImages call. The K8s pager streams
+// nodes individually, so without an explicit cap we'd send every cluster
+// node in one EC2 request — risking request-size, throttling, or response
+// pagination issues on dense clusters.
+const ec2DescribeBatchSize = 100
+
+// pendingNode captures the subset of corev1.Node that processNodeBatch needs.
+// Storing only Labels and Taints avoids retaining the pager page's Items
+// backing array (which keeping a *corev1.Node pointing into that slice would
+// pin in memory) and avoids copying the whole Node struct.
+type pendingNode struct {
+	labels map[string]string
+	taints []corev1.Taint
+}
+
 func GetNodesProperties(ctx context.Context, clientset *kubernetes.Clientset, ec2Client *ec2.Client) (*NodePoolsSet, error) {
 	nps := NewNodePoolsSet()

-	var cont string
-	for {
-		nodesList, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{
-			Limit:    nodeListChunkSize,
-			Continue: cont,
-		})
-		if err != nil {
-			return nil, fmt.Errorf("failed to list nodes: %w", err)
+	pending := map[string]pendingNode{}
+	flush := func() error {
+		if len(pending) == 0 {
+			return nil
+		}
+		if err := processNodeBatch(ctx, ec2Client, nps, pending); err != nil {
+			return err
 		}
+		clear(pending)
+		return nil
+	}

-		instanceToNode := lo.FilterSliceToMap(nodesList.Items, func(node corev1.Node) (string, corev1.Node, bool) {
-			// Filter out Karpenter-managed nodes
-			if _, isKarpenter := node.Labels["karpenter.k8s.aws/ec2nodeclass"]; isKarpenter {
-				return "", node, false
-			}
+	p := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
+		return clientset.CoreV1().Nodes().List(ctx, opts)
+	})
+	if err := p.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
+		node := obj.(*corev1.Node)
+		if _, isKarpenter := node.Labels["karpenter.k8s.aws/ec2nodeclass"]; isKarpenter {
+			return nil
+		}
+		matches := awsProviderIDRegexp.FindStringSubmatch(node.Spec.ProviderID)
+		if len(matches) != 2 {
+			log.Printf("Skipping node %s with unexpected provider ID: %s", node.Name, node.Spec.ProviderID)
+			return nil
+		}
+		pending[matches[1]] = pendingNode{labels: node.Labels, taints: node.Spec.Taints}
+		if len(pending) >= ec2DescribeBatchSize {
+			return flush()
+		}
+		return nil
+	}); err != nil {
+		return nil, fmt.Errorf("failed to list nodes: %w", err)
+	}

-			matches := awsProviderIDRegexp.FindStringSubmatch(node.Spec.ProviderID)
-			if len(matches) != 2 {
-				log.Printf("Skipping node %s with unexpected provider ID: %s", node.Name, node.Spec.ProviderID)
-				return "", node, false
-			}
-			return matches[1], node, true
+	if err := flush(); err != nil {
+		return nil, err
+	}
+	return nps, nil
+}
+
+func processNodeBatch(ctx context.Context, ec2Client *ec2.Client, nps *NodePoolsSet, instanceToNode map[string]pendingNode) error {
+	instances, err := ec2Client.DescribeInstances(ctx, &ec2.DescribeInstancesInput{
+		InstanceIds: slices.Collect(maps.Keys(instanceToNode)),
+	})
+	if err != nil {
+		return fmt.Errorf("failed to describe instances: %w", err)
+	}
+
+	imageIds := lo.Uniq(lo.FlatMap(instances.Reservations, func(reservation ec2types.Reservation, _ int) []string {
+		return lo.Map(reservation.Instances, func(instance ec2types.Instance, _ int) string {
+			return *instance.ImageId
 		})
 	}))

-		if len(instanceToNode) != 0 {
-			instances, err := ec2Client.DescribeInstances(ctx, &ec2.DescribeInstancesInput{
-				InstanceIds: slices.Collect(maps.Keys(instanceToNode)),
-			})
-			if err != nil {
-				return nil, fmt.Errorf("failed to describe instances: %w", err)
-			}
+	images, err := ec2Client.DescribeImages(ctx, &ec2.DescribeImagesInput{
+		ImageIds: imageIds,
+	})
+	if err != nil {
+		return fmt.Errorf("failed to describe images: %w", err)
+	}
+	amiIDsToFamily := lo.Associate(images.Images, func(image ec2types.Image) (string, string) {
+		return *image.ImageId, detectAMIFamilyFromImage(*image.Name)
+	})

-			imageIds := lo.Uniq(lo.FlatMap(instances.Reservations, func(reservation ec2types.Reservation, _ int) []string {
-				return lo.Map(reservation.Instances, func(instance ec2types.Instance, _ int) string {
-					return *instance.ImageId
-				})
-			}))
+	for _, reservation := range instances.Reservations {
+		for _, instance := range reservation.Instances {
+			node := instanceToNode[*instance.InstanceId]

-			images, err := ec2Client.DescribeImages(ctx, &ec2.DescribeImagesInput{
-				ImageIds: imageIds,
-			})
-			if err != nil {
-				return nil, fmt.Errorf("failed to describe images: %w", err)
+			amiFamily := "Custom"
+			if family, ok := amiIDsToFamily[*instance.ImageId]; ok {
+				amiFamily = family
 			}

-			amiIDsToFamily := lo.Associate(images.Images, func(image ec2types.Image) (string, string) {
-				return *image.ImageId, detectAMIFamilyFromImage(*image.Name)
-			})
-			for _, reservation := range instances.Reservations {
-				for _, instance := range reservation.Instances {
-					node := instanceToNode[*instance.InstanceId]
-
-					amiFamily := "Custom"
-					if family, ok := amiIDsToFamily[*instance.ImageId]; ok {
-						amiFamily = family
-					}
-
-					blockDeviceMappings, err := extractBlockDeviceMappingsWithVolumeDetails(ctx, ec2Client, instance.BlockDeviceMappings)
-					if err != nil {
-						log.Printf("Failed to get volume details for instance %s: %v", *instance.InstanceId, err)
-						blockDeviceMappings = extractBasicBlockDeviceMappings(instance.BlockDeviceMappings)
-					}
-
-					nps.Add(NodePoolsSetAddParams{
-						AMIFamily:           amiFamily,
-						AMIID:               *instance.ImageId,
-						SubnetIDs:           []string{*instance.SubnetId},
-						SecurityGroupIDs:    lo.Map(instance.SecurityGroups, func(sg ec2types.GroupIdentifier, _ int) string { return *sg.GroupId }),
-						MetadataOptions:     extractMetadataOptions(instance.MetadataOptions),
-						BlockDeviceMappings: blockDeviceMappings,
-						Labels:              node.Labels,
-						Taints:              node.Spec.Taints,
-						Architecture:        convertArchitecture(instance.Architecture),
-						Zones:               extractZones(instance.Placement),
-						InstanceTypes:       []string{string(instance.InstanceType)},
-						CapacityType:        convertInstanceLifecycleType(instance.InstanceLifecycle),
-					})
-				}
+			blockDeviceMappings, err := extractBlockDeviceMappingsWithVolumeDetails(ctx, ec2Client, instance.BlockDeviceMappings)
+			if err != nil {
+				log.Printf("Failed to get volume details for instance %s: %v", *instance.InstanceId, err)
+				blockDeviceMappings = extractBasicBlockDeviceMappings(instance.BlockDeviceMappings)
 			}
-		}
-		cont = nodesList.Continue
-		if cont == "" {
-			return nps, nil
+			nps.Add(NodePoolsSetAddParams{
+				AMIFamily:           amiFamily,
+				AMIID:               *instance.ImageId,
+				SubnetIDs:           []string{*instance.SubnetId},
+				SecurityGroupIDs:    lo.Map(instance.SecurityGroups, func(sg ec2types.GroupIdentifier, _ int) string { return *sg.GroupId }),
+				MetadataOptions:     extractMetadataOptions(instance.MetadataOptions),
+				BlockDeviceMappings: blockDeviceMappings,
+				Labels:              node.labels,
+				Taints:              node.taints,
+				Architecture:        convertArchitecture(instance.Architecture),
+				Zones:               extractZones(instance.Placement),
+				InstanceTypes:       []string{string(instance.InstanceType)},
+				CapacityType:        convertInstanceLifecycleType(instance.InstanceLifecycle),
+			})
 		}
 	}
+	return nil
 }

 func detectAMIFamilyFromImage(imageName string) string {
 	containsAny := func(s string, patterns ...string) bool {
-		return lo.SomeBy(patterns, func(pattern string) bool {
+		return slices.ContainsFunc(patterns, func(pattern string) bool {
 			return strings.Contains(s, pattern)
 		})
 	}
diff --git a/cmd/kubectl-datadog/autoscaling/cluster/uninstall/uninstall.go b/cmd/kubectl-datadog/autoscaling/cluster/uninstall/uninstall.go
index 7384f89d69..650bc617ca 100644
--- a/cmd/kubectl-datadog/autoscaling/cluster/uninstall/uninstall.go
+++ b/cmd/kubectl-datadog/autoscaling/cluster/uninstall/uninstall.go
@@ -25,8 +25,10 @@ import (
 	corev1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/cli-runtime/pkg/genericclioptions" + "k8s.io/client-go/tools/pager" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log/zap" @@ -394,24 +396,24 @@ func listKarpenterNodes(ctx context.Context, cli *clients.Clients, ec2NodeClassN return nil, nil // No EC2NodeClasses to match } - // List all Karpenter-managed nodes - nodesList, err := cli.K8sClientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{ - LabelSelector: "karpenter.k8s.aws/ec2nodeclass", - }) - if err != nil { - return nil, err - } - - // Filter nodes that belong to our EC2NodeClasses nodeClassSet := lo.SliceToMap(ec2NodeClassNames, func(name string) (string, struct{}) { return name, struct{}{} }) - return lo.FilterMap(nodesList.Items, func(node corev1.Node, _ int) (string, bool) { - nodeClass := node.Labels["karpenter.k8s.aws/ec2nodeclass"] - _, matches := nodeClassSet[nodeClass] - return node.Name, matches - }), nil + var names []string + p := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { + return cli.K8sClientset.CoreV1().Nodes().List(ctx, opts) + }) + if err := p.EachListItem(ctx, metav1.ListOptions{LabelSelector: "karpenter.k8s.aws/ec2nodeclass"}, func(obj runtime.Object) error { + node := obj.(*corev1.Node) + if _, matches := nodeClassSet[node.Labels["karpenter.k8s.aws/ec2nodeclass"]]; matches { + names = append(names, node.Name) + } + return nil + }); err != nil { + return nil, fmt.Errorf("failed to list Karpenter nodes: %w", err) + } + return names, nil } func waitForKarpenterNodesToTerminate(ctx context.Context, cli *clients.Clients, clusterName string, nodePoolNames []string) error {