Skip to content
This repository was archived by the owner on Jul 27, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewCollector(cfg *config.AgentConfig) Collector {

// Each check should handle a empty state initialization.
checks: collectorChecks{
process: &checks.ProcessCheck{},
process: checks.NewProcessCheck(cfg),
realTime: &checks.RealTimeCheck{},
connections: &checks.ConnectionsCheck{},
},
Expand Down
3 changes: 2 additions & 1 deletion agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ func handleSignals(exit chan bool) {
func debugCheckResults(cfg *config.AgentConfig, check string) error {
switch check {
case "process":
return printResults(cfg, &checks.ProcessCheck{}, check)
p := checks.NewProcessCheck(cfg)
return printResults(cfg, p, check)
case "connections":
return printResults(cfg, &checks.ConnectionsCheck{}, check)
case "realtime":
Expand Down
45 changes: 0 additions & 45 deletions checks/containers.go

This file was deleted.

39 changes: 30 additions & 9 deletions checks/process.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,42 @@
package checks

import (
"os"
"os/user"
"runtime"
"strconv"
"time"

agentpayload "github.com/DataDog/agent-payload/gogen"
"github.com/DataDog/gopsutil/cpu"
"github.com/DataDog/gopsutil/process"
log "github.com/cihub/seelog"

"github.com/DataDog/datadog-process-agent/config"
"github.com/DataDog/datadog-process-agent/model"
"github.com/DataDog/datadog-process-agent/util/docker"
"github.com/DataDog/datadog-process-agent/util/kubernetes"
)

var lastDockerErr string

type ProcessCheck struct {
lastCPUTime cpu.TimesStat
lastProcs map[int32]*process.FilledProcess
kubeUtil *kubernetes.KubeUtil
}

func NewProcessCheck(cfg *config.AgentConfig) *ProcessCheck {
var err error
var kubeUtil *kubernetes.KubeUtil
if os.Getenv("KUBERNETES_SERVICE_HOST") != "" && cfg.CollectKubernetesMetadata {
kubeUtil, err = kubernetes.NewKubeUtil(cfg)
if err != nil {
log.Errorf("error initializing kubernetes check, metadata won't be collected: %s", err)
}
}

return &ProcessCheck{kubeUtil: kubeUtil}
}

func (p *ProcessCheck) Run(cfg *config.AgentConfig, groupID int32) ([]model.MessageBody, error) {
Expand All @@ -32,14 +49,14 @@ func (p *ProcessCheck) Run(cfg *config.AgentConfig, groupID int32) ([]model.Mess
if err != nil {
return nil, err
}

// End check early if this is our first run.
if p.lastProcs == nil {
p.lastProcs = fps
p.lastCPUTime = cpuTimes[0]
return nil, nil
}

// Pull in container metadata, where available.
pids := make([]int32, 0, len(fps))
for _, fp := range fps {
pids = append(pids, fp.Pid)
Expand All @@ -51,13 +68,17 @@ func (p *ProcessCheck) Run(cfg *config.AgentConfig, groupID int32) ([]model.Mess
log.Warnf("unable to get docker stats: %s", err)
lastDockerErr = err.Error()
}
var kubeMeta *agentpayload.KubeMetadataPayload
if p.kubeUtil != nil {
kubeMeta = p.kubeUtil.GetKubernetesMeta(cfg)
}

info, err := collectSystemInfo(cfg)
if err != nil {
return nil, err
}

// Pre-filter the list to get an accurate grou psize.
// Pre-filter the list to get an accurate group size.
filteredFps := make([]*process.FilledProcess, 0, len(fps))
for _, fp := range fps {
if !p.skipProcess(cfg, fp) {
Expand All @@ -73,14 +94,14 @@ func (p *ProcessCheck) Run(cfg *config.AgentConfig, groupID int32) ([]model.Mess
procs := make([]*model.Process, 0, cfg.ProcLimit)
for _, fp := range filteredFps {
container, _ := containerByPID[fp.Pid]

if len(procs) >= cfg.ProcLimit {
messages = append(messages, &model.CollectorProc{
HostName: cfg.HostName,
Processes: procs,
Info: info,
GroupId: groupID,
GroupSize: int32(groupSize),
HostName: cfg.HostName,
Processes: procs,
Info: info,
GroupId: groupID,
GroupSize: int32(groupSize),
Kubernetes: kubeMeta,
})
procs = make([]*model.Process, 0, cfg.ProcLimit)
}
Expand All @@ -105,7 +126,7 @@ func (p *ProcessCheck) Run(cfg *config.AgentConfig, groupID int32) ([]model.Mess
GroupSize: int32(groupSize),
// FIXME: We should not send this in every payload. Long-term the container
// ID should be enough context to resolve this metadata on the backend.
Kubernetes: GetKubernetesMeta(),
Kubernetes: kubeMeta,
})

// Store the last state for comparison on the next run.
Expand Down
25 changes: 25 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ type AgentConfig struct {
Proxy *url.URL
Timers *CheckTimers
Logger *LoggerConfig

// Kubernetes
CollectKubernetesMetadata bool
KubernetesKubeletHost string
KubernetesHTTPKubeletPort int
KubernetesHTTPSKubeletPort int
}

const (
Expand Down Expand Up @@ -67,6 +73,11 @@ func NewDefaultAgentConfig() *AgentConfig {
Connections: time.NewTicker(3 * 60 * time.Minute),
RealTime: time.NewTicker(2 * time.Second),
},

// Kubernetes
CollectKubernetesMetadata: true,
KubernetesHTTPKubeletPort: 10255,
KubernetesHTTPSKubeletPort: 10250,
}

return ac
Expand Down Expand Up @@ -225,6 +236,20 @@ func mergeEnv(c *AgentConfig) *AgentConfig {
}
}

// Kubernetes config is set via environment only (for now).
if v := os.Getenv("DD_COLLECT_KUBERNETES_METADATA"); v == "false" {
c.CollectKubernetesMetadata = false
}
if v := os.Getenv("DD_KUBERNETES_KUBELET_HOST"); v != "" {
c.KubernetesKubeletHost = v
}
if v := os.Getenv("DD_KUBERNETES_KUBELET_HTTP_PORT"); v != "" {
c.KubernetesHTTPKubeletPort, _ = strconv.Atoi(v)
}
if v := os.Getenv("DD_KUBERNETES_KUBELET_HTTPS_PORT"); v == "false" {
c.KubernetesHTTPSKubeletPort, _ = strconv.Atoi(v)
}

return c
}

Expand Down
20 changes: 20 additions & 0 deletions util/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package docker
import (
"context"
"errors"
"fmt"
"os"

"github.com/docker/docker/api/types"
Expand Down Expand Up @@ -119,3 +120,22 @@ func ContainersByPID(pids []int32) (map[int32]*Container, error) {
}
return containerMap, nil
}

func GetHostname() (string, error) {
if os.Getenv("DOCKER_API_VERSION") == "" {
version, err := detectServerAPIVersion()
if err != nil {
return "", err
}
os.Setenv("DOCKER_API_VERSION", version)
}
client, err := client.NewEnvClient()
if err != nil {
return "", err
}
info, err := client.Info(context.Background())
if err != nil {
return "", fmt.Errorf("unable to get Docker info: %s", err)
}
return info.Name, nil
}
Loading