From 98d772dac540e67dc147930ec98c18b25f5f38d5 Mon Sep 17 00:00:00 2001 From: Conor Branagan Date: Fri, 7 Jul 2017 17:45:23 -0400 Subject: [PATCH] kubernetes: Collect local pod and container state from kubelet In an earlier change we cached the Kubernetes metadata state to avoid additional load on a server. But this had the trade-off that we would have out-of-date information which is definitely not great for high-churn containers. So as a solution this change will make it so we _always_ query the local kubelet (which should be a very light, local query) for pod and container state. This local data is merged with the global data to provide both an up-to-date view as well as data that only comes from the master (deployments, services). The final trade-off is that we could have pods/containers that aren't initially attached to their deployment or service until the next global fetch. --- agent/collector.go | 2 +- agent/main.go | 3 +- checks/containers.go | 45 ------ checks/process.go | 39 +++-- config/config.go | 25 +++ util/docker/docker.go | 20 +++ util/kubernetes/kubernetes.go | 287 ++++++++++++++++++++++++++++++++++ 7 files changed, 365 insertions(+), 56 deletions(-) delete mode 100644 checks/containers.go create mode 100644 util/kubernetes/kubernetes.go diff --git a/agent/collector.go b/agent/collector.go index 7b6c060ae..6678c5f00 100644 --- a/agent/collector.go +++ b/agent/collector.go @@ -76,7 +76,7 @@ func NewCollector(cfg *config.AgentConfig) Collector { // Each check should handle a empty state initialization. 
checks: collectorChecks{ - process: &checks.ProcessCheck{}, + process: checks.NewProcessCheck(cfg), realTime: &checks.RealTimeCheck{}, connections: &checks.ConnectionsCheck{}, }, diff --git a/agent/main.go b/agent/main.go index 7b0a3509f..f97f0deec 100644 --- a/agent/main.go +++ b/agent/main.go @@ -150,7 +150,8 @@ func handleSignals(exit chan bool) { func debugCheckResults(cfg *config.AgentConfig, check string) error { switch check { case "process": - return printResults(cfg, &checks.ProcessCheck{}, check) + p := checks.NewProcessCheck(cfg) + return printResults(cfg, p, check) case "connections": return printResults(cfg, &checks.ConnectionsCheck{}, check) case "realtime": diff --git a/checks/containers.go b/checks/containers.go deleted file mode 100644 index b502f84f6..000000000 --- a/checks/containers.go +++ /dev/null @@ -1,45 +0,0 @@ -package checks - -import ( - log "github.com/cihub/seelog" - "os" - "time" - - agentpayload "github.com/DataDog/agent-payload/gogen" - "github.com/DataDog/datadog-agent/pkg/metadata/kubernetes" - "github.com/DataDog/datadog-process-agent/util/cache" -) - -const ( - kubernetesMetaTTL = 3 * time.Minute -) - -var lastKubeErr string - -func GetKubernetesMeta() *agentpayload.KubeMetadataPayload { - if os.Getenv("KUBERNETES_SERVICE_HOST") == "" { - // If this is not defined then we're not running in a k8s cluster. - return nil - } - - var kubeMeta *agentpayload.KubeMetadataPayload - cacheKey := "kubernetes_meta" - payload, ok := cache.Get(cacheKey) - if !ok { - payload, err := kubernetes.GetPayload() - if err != nil { - if err.Error() != lastKubeErr { - // Swallowing this error for now with an error as it shouldn't block collection. - log.Errorf("Unable to get kubernetes metadata: %s", err) - // Only log the same error once to prevent noisy logs. 
- lastKubeErr = err.Error() - } - return nil - } - kubeMeta = payload.(*agentpayload.KubeMetadataPayload) - cache.SetWithTTL(cacheKey, kubeMeta, kubernetesMetaTTL) - } else if payload != nil { - kubeMeta = payload.(*agentpayload.KubeMetadataPayload) - } - return kubeMeta -} diff --git a/checks/process.go b/checks/process.go index 40d4141be..b89c273b0 100644 --- a/checks/process.go +++ b/checks/process.go @@ -1,11 +1,13 @@ package checks import ( + "os" "os/user" "runtime" "strconv" "time" + agentpayload "github.com/DataDog/agent-payload/gogen" "github.com/DataDog/gopsutil/cpu" "github.com/DataDog/gopsutil/process" log "github.com/cihub/seelog" @@ -13,6 +15,7 @@ import ( "github.com/DataDog/datadog-process-agent/config" "github.com/DataDog/datadog-process-agent/model" "github.com/DataDog/datadog-process-agent/util/docker" + "github.com/DataDog/datadog-process-agent/util/kubernetes" ) var lastDockerErr string @@ -20,6 +23,20 @@ var lastDockerErr string type ProcessCheck struct { lastCPUTime cpu.TimesStat lastProcs map[int32]*process.FilledProcess + kubeUtil *kubernetes.KubeUtil +} + +func NewProcessCheck(cfg *config.AgentConfig) *ProcessCheck { + var err error + var kubeUtil *kubernetes.KubeUtil + if os.Getenv("KUBERNETES_SERVICE_HOST") != "" && cfg.CollectKubernetesMetadata { + kubeUtil, err = kubernetes.NewKubeUtil(cfg) + if err != nil { + log.Errorf("error initializing kubernetes check, metadata won't be collected: %s", err) + } + } + + return &ProcessCheck{kubeUtil: kubeUtil} } func (p *ProcessCheck) Run(cfg *config.AgentConfig, groupID int32) ([]model.MessageBody, error) { @@ -32,7 +49,6 @@ func (p *ProcessCheck) Run(cfg *config.AgentConfig, groupID int32) ([]model.Mess if err != nil { return nil, err } - // End check early if this is our first run. if p.lastProcs == nil { p.lastProcs = fps @@ -40,6 +56,7 @@ func (p *ProcessCheck) Run(cfg *config.AgentConfig, groupID int32) ([]model.Mess return nil, nil } + // Pull in container metadata, where available. 
pids := make([]int32, 0, len(fps)) for _, fp := range fps { pids = append(pids, fp.Pid) @@ -51,13 +68,17 @@ func (p *ProcessCheck) Run(cfg *config.AgentConfig, groupID int32) ([]model.Mess log.Warnf("unable to get docker stats: %s", err) lastDockerErr = err.Error() } + var kubeMeta *agentpayload.KubeMetadataPayload + if p.kubeUtil != nil { + kubeMeta = p.kubeUtil.GetKubernetesMeta(cfg) + } info, err := collectSystemInfo(cfg) if err != nil { return nil, err } - // Pre-filter the list to get an accurate grou psize. + // Pre-filter the list to get an accurate group size. filteredFps := make([]*process.FilledProcess, 0, len(fps)) for _, fp := range fps { if !p.skipProcess(cfg, fp) { @@ -73,14 +94,14 @@ func (p *ProcessCheck) Run(cfg *config.AgentConfig, groupID int32) ([]model.Mess procs := make([]*model.Process, 0, cfg.ProcLimit) for _, fp := range filteredFps { container, _ := containerByPID[fp.Pid] - if len(procs) >= cfg.ProcLimit { messages = append(messages, &model.CollectorProc{ - HostName: cfg.HostName, - Processes: procs, - Info: info, - GroupId: groupID, - GroupSize: int32(groupSize), + HostName: cfg.HostName, + Processes: procs, + Info: info, + GroupId: groupID, + GroupSize: int32(groupSize), + Kubernetes: kubeMeta, }) procs = make([]*model.Process, 0, cfg.ProcLimit) } @@ -105,7 +126,7 @@ func (p *ProcessCheck) Run(cfg *config.AgentConfig, groupID int32) ([]model.Mess GroupSize: int32(groupSize), // FIXME: We should not send this in every payload. Long-term the container // ID should be enough context to resolve this metadata on the backend. - Kubernetes: GetKubernetesMeta(), + Kubernetes: kubeMeta, }) // Store the last state for comparison on the next run. 
diff --git a/config/config.go b/config/config.go index e98cf4758..a5ae38e66 100644 --- a/config/config.go +++ b/config/config.go @@ -35,6 +35,12 @@ type AgentConfig struct { Proxy *url.URL Timers *CheckTimers Logger *LoggerConfig + + // Kubernetes + CollectKubernetesMetadata bool + KubernetesKubeletHost string + KubernetesHTTPKubeletPort int + KubernetesHTTPSKubeletPort int } const ( @@ -67,6 +73,11 @@ func NewDefaultAgentConfig() *AgentConfig { Connections: time.NewTicker(3 * 60 * time.Minute), RealTime: time.NewTicker(2 * time.Second), }, + + // Kubernetes + CollectKubernetesMetadata: true, + KubernetesHTTPKubeletPort: 10255, + KubernetesHTTPSKubeletPort: 10250, } return ac @@ -225,6 +236,20 @@ func mergeEnv(c *AgentConfig) *AgentConfig { } } + // Kubernetes config is set via environment only (for now). + if v := os.Getenv("DD_COLLECT_KUBERNETES_METADATA"); v == "false" { + c.CollectKubernetesMetadata = false + } + if v := os.Getenv("DD_KUBERNETES_KUBELET_HOST"); v != "" { + c.KubernetesKubeletHost = v + } + if v := os.Getenv("DD_KUBERNETES_KUBELET_HTTP_PORT"); v != "" { + c.KubernetesHTTPKubeletPort, _ = strconv.Atoi(v) + } + if v := os.Getenv("DD_KUBERNETES_KUBELET_HTTPS_PORT"); v != "" { + c.KubernetesHTTPSKubeletPort, _ = strconv.Atoi(v) + } + return c } diff --git a/util/docker/docker.go b/util/docker/docker.go index 619442e97..3c88ed1d1 100644 --- a/util/docker/docker.go +++ b/util/docker/docker.go @@ -3,6 +3,7 @@ package docker import ( "context" "errors" + "fmt" "os" "github.com/docker/docker/api/types" @@ -119,3 +120,22 @@ func ContainersByPID(pids []int32) (map[int32]*Container, error) { } return containerMap, nil } + +func GetHostname() (string, error) { + if os.Getenv("DOCKER_API_VERSION") == "" { + version, err := detectServerAPIVersion() + if err != nil { + return "", err + } + os.Setenv("DOCKER_API_VERSION", version) + } + client, err := client.NewEnvClient() + if err != nil { + return "", err + } + info, err := 
client.Info(context.Background()) + if err != nil { + return "", fmt.Errorf("unable to get Docker info: %s", err) + } + return info.Name, nil +} diff --git a/util/kubernetes/kubernetes.go b/util/kubernetes/kubernetes.go new file mode 100644 index 000000000..7867a358d --- /dev/null +++ b/util/kubernetes/kubernetes.go @@ -0,0 +1,287 @@ +package kubernetes + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strings" + "time" + + agentpayload "github.com/DataDog/agent-payload/gogen" + agentkubernetes "github.com/DataDog/datadog-agent/pkg/metadata/kubernetes" + log "github.com/cihub/seelog" + + "github.com/DataDog/datadog-process-agent/config" + "github.com/DataDog/datadog-process-agent/util/cache" + "github.com/DataDog/datadog-process-agent/util/docker" +) + +// Kubelet constants +const ( + authTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token" + kubernetesMetaTTL = 3 * time.Minute + + // Kube creator types, from owner reference. + kindDaemonSet = "DaemonSet" + kindReplicaSet = "ReplicaSet" + kindReplicationController = "ReplicationController" + kindDeployment = "Deployment" + kindJob = "Job" +) + +// Pod contains fields for unmarshalling a Pod +type Pod struct { + Metadata ObjectMeta `json:"metadata,omitempty"` + Spec PodSpec `json:"spec,omitempty"` + Status PodStatus `json:"status,omitempty"` +} + +// PodList contains fields for unmarshalling a PodList +type PodList struct { + Items []*Pod `json:"items,omitempty"` +} + +// PodSpec contains fields for unmarshalling a PodSpec +type PodSpec struct { + HostNetwork bool `json:"hostNetwork,omitempty"` + Hostname string `json:"hostname,omitempty"` +} + +// PodStatus contains fields for unmarshalling a PodStatus +type PodStatus struct { + HostIP string `json:"hostIP,omitempty"` + PodIP string `json:"podIP,omitempty"` + ContainerStatuses []*ContainerStatus `json:"containerStatuses,omitempty"` +} + +// ContainerStatus contains fields for unmarshaling a ContainerStatus +type ContainerStatus struct 
{ + Name string `json:"name,omitempty"` + ContainerID string `json:"containerID,omitempty"` + Image string `json:"image,omitempty"` + ImageID string `json:"imageID,omitempty"` +} + +// ObjectMeta contains the fields for unmarshalling Kubernetes resource metadata +// limited to just those fields we use in our metadata collection. +type ObjectMeta struct { + Name string `json:"name,omitempty"` + Namespace string `json:"namespace,omitempty"` + Uid string `json:"uid,omitempty"` + Labels map[string]string `json:"labels,omitempty"` + OwnerReferences []*OwnerReference `json:"ownerReferences,omitempty"` +} + +// OwnerReference contains information to identify an owning object limited to +// what we need for metadata collection. +type OwnerReference struct { + Kind string `json:"kind,omitempty"` + Name string `json:"name,omitempty"` +} + +// KubeUtil is a struct to hold Kubernetes config state. +type KubeUtil struct { + kubeletAPIURL string + lastKubeErr string +} + +// NewKubeUtil returns a new instance of KubeUtil. +func NewKubeUtil(cfg *config.AgentConfig) (*KubeUtil, error) { + kubeletURL, err := locateKubelet(cfg) + if err != nil { + return nil, err + } + + return &KubeUtil{kubeletAPIURL: kubeletURL}, nil +} + +// GetKubernetesMeta returns a Kubernetes metadata payload using a mix of state from the +// Kube master and local kubelet. +func (ku *KubeUtil) GetKubernetesMeta(cfg *config.AgentConfig) *agentpayload.KubeMetadataPayload { + // The whole metadata payload is cached to limit load on the master server. + var kubeMeta *agentpayload.KubeMetadataPayload + cacheKey := "kubernetes_meta" + payload, ok := cache.Get(cacheKey) + if !ok { + payload, err := agentkubernetes.GetPayload() + if err != nil { + if err.Error() != ku.lastKubeErr { + // Swallowing this error for now with an error as it shouldn't block collection. + log.Errorf("Unable to get kubernetes metadata: %s", err) + // Only log the same error once to prevent noisy logs. 
+ ku.lastKubeErr = err.Error() + } + return nil + } + kubeMeta = payload.(*agentpayload.KubeMetadataPayload) + cache.SetWithTTL(cacheKey, kubeMeta, kubernetesMetaTTL) + } else if payload != nil { + kubeMeta = payload.(*agentpayload.KubeMetadataPayload) + } + + if kubeMeta != nil { + // But we can fetch local state from the kubelet and merge. + localPods, err := ku.GetLocalPodList() + if err != nil { + log.Errorf("Unable to get local pods from kubelet: %s", err) + } + pods, containers := parseLocalPods(localPods, kubeMeta.Services) + kubeMeta.Pods = pods + kubeMeta.Containers = containers + } + + return kubeMeta +} + +// GetLocalPodList returns the list of pods running on the node where this pod is running +func (ku *KubeUtil) GetLocalPodList() ([]*Pod, error) { + data, err := performKubeletQuery(fmt.Sprintf("%s/pods", ku.kubeletAPIURL)) + if err != nil { + return nil, fmt.Errorf("Error performing kubelet query: %s", err) + } + + var v PodList + if err := json.Unmarshal(data, &v); err != nil { + return nil, fmt.Errorf("Error unmarshalling json: %s", err) + } + + return v.Items, nil +} + +// parseLocalPods will parse pods returned from a local kubelet query. Note that much of this +// is duplication of logic in the datadog-agent Kubernetes metadata provider but with varying +// types. We may want to consolidate at some point. 
+func parseLocalPods( + localPods []*Pod, + services []*agentpayload.KubeMetadataPayload_Service, +) ([]*agentpayload.KubeMetadataPayload_Pod, []*agentpayload.KubeMetadataPayload_Container) { + pods := make([]*agentpayload.KubeMetadataPayload_Pod, 0, len(localPods)) + containers := make([]*agentpayload.KubeMetadataPayload_Container, 0) + for _, p := range localPods { + cids := make([]string, 0, len(p.Status.ContainerStatuses)) + for _, c := range p.Status.ContainerStatuses { + containers = append(containers, &agentpayload.KubeMetadataPayload_Container{ + Name: c.Name, + Id: c.ContainerID, + Image: c.Image, + ImageId: c.ImageID, + }) + cids = append(cids, c.ContainerID) + } + + pod := &agentpayload.KubeMetadataPayload_Pod{ + Uid: p.Metadata.Uid, + Name: p.Metadata.Name, + Namespace: p.Metadata.Namespace, + HostIp: p.Status.HostIP, + PodIp: p.Status.PodIP, + Labels: p.Metadata.Labels, + ServiceUids: findPodServices(p.Metadata.Namespace, p.Metadata.Labels, services), + ContainerIds: cids, + } + setPodCreator(pod, p.Metadata.OwnerReferences) + pods = append(pods, pod) + } + return pods, containers +} + +func findPodServices( + namespace string, + labels map[string]string, + services []*agentpayload.KubeMetadataPayload_Service, +) []string { + uids := make([]string, 0) + for _, s := range services { + if s.Namespace != namespace { + continue + } + match := true + for k, search := range s.Selector { + if v, ok := labels[k]; !ok || v != search { + match = false + break + } + } + if match { + uids = append(uids, s.Uid) + } + } + return uids +} + +func setPodCreator(pod *agentpayload.KubeMetadataPayload_Pod, ownerRefs []*OwnerReference) { + for _, o := range ownerRefs { + switch o.Kind { + case kindDaemonSet: + pod.DaemonSet = o.Name + case kindReplicaSet: + pod.ReplicaSet = o.Name + case kindReplicationController: + pod.ReplicationController = o.Name + case kindJob: + pod.Job = o.Name + } + } +} + +// Try and find the hostname to query the kubelet +func locateKubelet(cfg 
*config.AgentConfig) (string, error) { + hostname := cfg.KubernetesKubeletHost + var err error + if hostname == "" { + hostname, err = docker.GetHostname() + if err != nil { + return "", fmt.Errorf("Unable to get hostname from docker: %s", err) + } + } + + port := cfg.KubernetesHTTPKubeletPort + url := fmt.Sprintf("http://%s:%d", hostname, port) + if _, err := performKubeletQuery(url); err == nil { + return url, nil + } + log.Debugf("Couldn't query kubelet over HTTP, assuming it's not in no_auth mode.") + + port = cfg.KubernetesHTTPSKubeletPort + url = fmt.Sprintf("https://%s:%d", hostname, port) + if _, err := performKubeletQuery(url); err == nil { + return url, nil + } + + return "", fmt.Errorf("Could not find a method to connect to kubelet") +} + +// performKubeletQuery performs a GET query against kubelet and return the response body +func performKubeletQuery(url string) ([]byte, error) { + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, fmt.Errorf("Could not create request: %s", err) + } + + if strings.HasPrefix(url, "https") { + req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", getAuthToken())) + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("Error executing request to %s: %s", url, err) + } + defer res.Body.Close() + + body, err := ioutil.ReadAll(res.Body) + if err != nil { + return nil, fmt.Errorf("Error reading response from %s: %s", url, err) + } + return body, nil +} + +// Read the kubelet token +func getAuthToken() string { + token, err := ioutil.ReadFile(authTokenPath) + if err != nil { + log.Errorf("Could not read token from %s: %s", authTokenPath, err) + return "" + } + return string(token) +}