Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package clusterinfo

import (
"context"
"errors"
"fmt"
"log"
"maps"
"regexp"
"slices"
"strings"
"time"

Expand All @@ -15,16 +16,15 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/pager"
)

const (
	// nodeListChunkSize bounds the size of a single node List page so we
	// don't pull every node into memory at once on large clusters.
	nodeListChunkSize = 100

	// describeASGInstancesMaxIDs is the documented per-call limit of
	// autoscaling:DescribeAutoScalingInstances. Sending more triggers a
	// ValidationError at the API.
	describeASGInstancesMaxIDs = 50
)

// awsProviderIDRegexp matches the AWS provider ID for EC2-backed nodes.
// Format: aws:///<az>/i-<hex>. Fargate nodes use a different shape and
Expand Down Expand Up @@ -79,38 +79,30 @@ type asgCandidate struct {
func classifyByLabels(ctx context.Context, k8sClient kubernetes.Interface, info *ClusterInfo) ([]asgCandidate, error) {
var candidates []asgCandidate

var cont string
for {
list, err := k8sClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{
Limit: nodeListChunkSize,
Continue: cont,
})
if err != nil {
return nil, fmt.Errorf("failed to list nodes: %w", err)
}

for _, node := range list.Items {
if mgr, entity, ok := classifyNodeByLabel(&node); ok {
addToBucket(info, mgr, entity, node.Name)
continue
}

matches := awsProviderIDRegexp.FindStringSubmatch(node.Spec.ProviderID)
if len(matches) == 2 {
candidates = append(candidates, asgCandidate{
instanceID: matches[1],
nodeName: node.Name,
})
} else {
addToBucket(info, NodeManagerUnknown, node.Spec.ProviderID, node.Name)
}
p := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
return k8sClient.CoreV1().Nodes().List(ctx, opts)
})
if err := p.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
node := obj.(*corev1.Node)
if mgr, entity, ok := classifyNodeByLabel(node); ok {
addToBucket(info, mgr, entity, node.Name)
return nil
}

cont = list.Continue
if cont == "" {
return candidates, nil
matches := awsProviderIDRegexp.FindStringSubmatch(node.Spec.ProviderID)
if len(matches) == 2 {
candidates = append(candidates, asgCandidate{
instanceID: matches[1],
nodeName: node.Name,
})
} else {
addToBucket(info, NodeManagerUnknown, node.Spec.ProviderID, node.Name)
}
return nil
}); err != nil {
return nil, fmt.Errorf("failed to list nodes: %w", err)
}
return candidates, nil
}

// classifyNodeByLabel applies steps 1-3 of the decision tree using only the
Expand Down Expand Up @@ -181,34 +173,39 @@ func addToBucket(info *ClusterInfo, mgr NodeManager, entity, nodeName string) {
}

// detectClusterAutoscaler scans Deployments cluster-wide and returns the
// first match. A match is any Deployment with name "cluster-autoscaler", a
// well-known label, or a container image referencing "cluster-autoscaler".
// Multiple matches yield a warning but only the first is recorded.
// first matching one encountered. A match is any Deployment with name
// "cluster-autoscaler", a well-known label, or a container image referencing
// "cluster-autoscaler". The cluster-wide enumeration order is unspecified, so
// when several matches coexist (a configuration we don't expect in practice),
// which one wins is non-deterministic — we accept that to keep the scan
// short-circuited and avoid materialising every Deployment in memory.
func detectClusterAutoscaler(ctx context.Context, k8sClient kubernetes.Interface) (ClusterAutoscaler, error) {
list, err := k8sClient.AppsV1().Deployments(corev1.NamespaceAll).List(ctx, metav1.ListOptions{})
if err != nil {
return ClusterAutoscaler{}, err
}
errFound := errors.New("cluster-autoscaler found")

matches := lo.Filter(list.Items, isClusterAutoscaler)
if len(matches) == 0 {
return ClusterAutoscaler{}, nil
}
if len(matches) > 1 {
log.Printf("Warning: %d Deployments match cluster-autoscaler heuristics; recording the first one (%s/%s).",
len(matches), matches[0].Namespace, matches[0].Name)
var result ClusterAutoscaler
p := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
return k8sClient.AppsV1().Deployments(corev1.NamespaceAll).List(ctx, opts)
})
err := p.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
dep := obj.(*appsv1.Deployment)
if !isClusterAutoscaler(*dep) {
return nil
}
result = ClusterAutoscaler{
Present: true,
Namespace: dep.Namespace,
Name: dep.Name,
Version: extractClusterAutoscalerVersion(*dep),
}
return errFound
})
if err != nil && !errors.Is(err, errFound) {
return ClusterAutoscaler{}, fmt.Errorf("failed to list Deployments: %w", err)
}
return ClusterAutoscaler{
Present: true,
Namespace: matches[0].Namespace,
Name: matches[0].Name,
Version: extractClusterAutoscalerVersion(matches[0]),
}, nil
return result, nil
}

// isClusterAutoscaler matches lo.Filter's predicate signature so it can be
// passed directly without a wrapper.
func isClusterAutoscaler(d appsv1.Deployment, _ int) bool {
func isClusterAutoscaler(d appsv1.Deployment) bool {
// A Deployment scaled to zero is effectively disabled; ignoring it lets
// users who already stopped CA (per the Karpenter migration guide) get
// `Present: false` in the snapshot. A nil Replicas defaults to 1 per the
Expand All @@ -221,7 +218,7 @@ func isClusterAutoscaler(d appsv1.Deployment, _ int) bool {
d.Labels["k8s-app"] == "cluster-autoscaler" {
return true
}
return lo.SomeBy(d.Spec.Template.Spec.Containers, func(c corev1.Container) bool {
return slices.ContainsFunc(d.Spec.Template.Spec.Containers, func(c corev1.Container) bool {
return strings.Contains(c.Image, "cluster-autoscaler")
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package guess

import (
"fmt"
"slices"

"github.com/samber/lo"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/discovery"
Expand All @@ -20,7 +20,7 @@ func IsEKSAutoModeEnabled(discoveryClient discovery.DiscoveryInterface) (bool, e
return false, fmt.Errorf("failed to query eks.amazonaws.com/v1 API group: %w", err)
}

return lo.ContainsBy(resources.APIResources, func(r metav1.APIResource) bool {
return slices.ContainsFunc(resources.APIResources, func(r metav1.APIResource) bool {
return r.Name == "nodeclasses"
}), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@ package guess

import (
"context"
"errors"
"fmt"
"log"
"slices"
"strings"

"github.com/samber/lo"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/pager"
)

// Datadog-namespaced labels written via the Karpenter chart's additionalLabels.
Expand Down Expand Up @@ -39,11 +42,6 @@ const karpenterServiceEnvName = "KARPENTER_SERVICE"
// `someone/karpenter/controller-something` do not false-positive.
const karpenterControllerImageRepoSuffix = "karpenter/controller"

// deploymentListChunkSize bounds the size of a single List response so we
// don't pull thousands of Deployments into memory at once on dense clusters.
// Matches the chunk size used by GetNodesProperties.
const deploymentListChunkSize = 100

// ForeignKarpenter is the location of a Karpenter controller Deployment
// that conflicts with the install we're about to perform — either a
// third-party install or a previous kubectl-datadog install in a different
Expand Down Expand Up @@ -77,44 +75,40 @@ type ForeignKarpenter struct {
// materialised in memory just to answer "is there at least one foreign
// Karpenter Deployment".
func FindForeignKarpenterInstallation(ctx context.Context, clientset kubernetes.Interface, targetNamespace string) (*ForeignKarpenter, error) {
var cont string
for {
deps, err := clientset.AppsV1().Deployments(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
Limit: deploymentListChunkSize,
Continue: cont,
})
if err != nil {
return nil, fmt.Errorf("failed to list Deployments: %w", err)
}
errFound := errors.New("foreign Karpenter found")

for _, dep := range deps.Items {
if !hasKarpenterControllerContainer(dep.Spec.Template.Spec.Containers) {
continue
}
if dep.Namespace == targetNamespace && dep.Labels[InstalledByLabel] == InstalledByValue {
continue
}
log.Printf("Detected foreign Karpenter Deployment %s/%s", dep.Namespace, dep.Name)
return &ForeignKarpenter{Namespace: dep.Namespace, Name: dep.Name}, nil
var result *ForeignKarpenter
p := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
return clientset.AppsV1().Deployments(metav1.NamespaceAll).List(ctx, opts)
})
err := p.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
dep := obj.(*appsv1.Deployment)
if !hasKarpenterControllerContainer(dep.Spec.Template.Spec.Containers) {
return nil
}

cont = deps.Continue
if cont == "" {
return nil, nil
if dep.Namespace == targetNamespace && dep.Labels[InstalledByLabel] == InstalledByValue {
return nil
}
log.Printf("Detected foreign Karpenter Deployment %s/%s", dep.Namespace, dep.Name)
result = &ForeignKarpenter{Namespace: dep.Namespace, Name: dep.Name}
return errFound
})
if err != nil && !errors.Is(err, errFound) {
return nil, fmt.Errorf("failed to list Deployments: %w", err)
}
return result, nil
}

// hasKarpenterControllerContainer reports whether any container in the pod
// spec is the Karpenter controller — primary signal is the
// chart-unconditional KARPENTER_SERVICE env var; secondary is the canonical
// `karpenter/controller` image repository tail.
func hasKarpenterControllerContainer(containers []corev1.Container) bool {
	return slices.ContainsFunc(containers, isKarpenterControllerContainer)
}

func isKarpenterControllerContainer(c corev1.Container) bool {
if lo.ContainsBy(c.Env, func(e corev1.EnvVar) bool { return e.Name == karpenterServiceEnvName }) {
if slices.ContainsFunc(c.Env, func(e corev1.EnvVar) bool { return e.Name == karpenterServiceEnvName }) {
return true
}
return imageRepoEndsWith(c.Image, karpenterControllerImageRepoSuffix)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,9 @@ func TestFindForeignKarpenterInstallation(t *testing.T) {
assert.Contains(t, err.Error(), "failed to list Deployments")
})

t.Run("pagination follows Continue token across pages and short-circuits on first foreign match", func(t *testing.T) {
// Three pages: an empty page with a non-empty Continue token, a page
// with only our own Deployment, and a page where the foreign install
// lives. Page 4 must never be requested.
t.Run("pagination forwards Continue tokens across pages", func(t *testing.T) {
// Three pages with the foreign install on the last one, exercising
// cross-page Continue token forwarding.
pages := []*appsv1.DeploymentList{
{
ListMeta: metav1.ListMeta{Continue: "page2"},
Expand All @@ -249,26 +248,20 @@ func TestFindForeignKarpenterInstallation(t *testing.T) {
},
},
{
ListMeta: metav1.ListMeta{Continue: "page4"},
Items: []appsv1.Deployment{
*deployment("their-ns", "their-karpenter", nil, karpenterControllerImage),
},
},
{
Items: []appsv1.Deployment{
*deployment("never", "fetched", nil, karpenterControllerImage),
},
},
}

clientset := fake.NewSimpleClientset()
var calls []string
clientset.PrependReactor("list", "deployments", func(action k8stesting.Action) (bool, runtime.Object, error) {
opts := action.(k8stesting.ListActionImpl).GetListOptions()
calls = append(calls, opts.Continue)
assert.EqualValues(t, deploymentListChunkSize, opts.Limit, "Limit must be set so the API server can chunk")
assert.NotZero(t, opts.Limit, "Limit must be set so the API server can chunk")
require.Less(t, len(calls)-1, len(pages),
"reactor would over-fetch beyond the synthetic pages — early-exit broken")
"reactor would over-fetch beyond the synthetic pages")
return true, pages[len(calls)-1], nil
})

Expand All @@ -277,7 +270,7 @@ func TestFindForeignKarpenterInstallation(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, &ForeignKarpenter{Namespace: "their-ns", Name: "their-karpenter"}, result)
assert.Equal(t, []string{"", "page2", "page3"}, calls,
"each call must forward the previous page's Continue token, and page 4 must never be requested")
"each call must forward the previous page's Continue token")
})
}

Expand Down
Loading
Loading