diff --git a/Makefile b/Makefile index c313d5dba1..4b4947241d 100644 --- a/Makefile +++ b/Makefile @@ -23,7 +23,7 @@ $(SCOPE_EXPORT): $(APP_EXE) $(PROBE_EXE) docker/* $(APP_EXE): app/*.go report/*.go xfer/*.go -$(PROBE_EXE): probe/*.go report/*.go xfer/*.go +$(PROBE_EXE): probe/*.go probe/tag/*.go report/*.go xfer/*.go $(APP_EXE) $(PROBE_EXE): go get -tags netgo ./$(@D) diff --git a/app/api_topology.go b/app/api_topology.go index 3fb1d1a38a..0f8af047bf 100644 --- a/app/api_topology.go +++ b/app/api_topology.go @@ -30,17 +30,6 @@ type APIEdge struct { Metadata report.AggregateMetadata `json:"metadata"` } -// topologySelecter selects a single topology from a report. -type topologySelecter func(r report.Report) report.Topology - -func selectProcess(r report.Report) report.Topology { - return r.Process -} - -func selectNetwork(r report.Report) report.Topology { - return r.Network -} - // Full topology. func handleTopology(rep Reporter, t topologyView, w http.ResponseWriter, r *http.Request) { respondWith(w, http.StatusOK, APITopology{ diff --git a/app/origin_node.go b/app/origin_node.go index 0cd96fcf79..fa6b7e432e 100644 --- a/app/origin_node.go +++ b/app/origin_node.go @@ -34,8 +34,8 @@ func originNodeForProcess(node report.NodeMetadata) OriginNode { {Key: "Process name", ValueMajor: node["name"], ValueMinor: ""}, } for _, tuple := range []struct{ key, human string }{ - {"docker_id", "Container ID"}, - {"docker_name", "Container name"}, + {"docker_container_id", "Container ID"}, + {"docker_container_name", "Container name"}, {"docker_image_id", "Container image ID"}, {"docker_image_name", "Container image name"}, {"cgroup", "cgroup"}, diff --git a/app/router.go b/app/router.go index 931f7c2bc9..d6a9dca9a6 100644 --- a/app/router.go +++ b/app/router.go @@ -47,16 +47,16 @@ func apiHandler(w http.ResponseWriter, r *http.Request) { type topologyView struct { human string - selector topologySelecter + selector report.TopologySelector mapper report.MapFunc pseudo report.PseudoFunc groupedTopology string } var topologyRegistry = map[string]topologyView{ - "applications": {"Applications", selectProcess, report.ProcessPID, report.GenericPseudoNode, "applications-grouped"}, - "applications-grouped": {"Applications", selectProcess, report.ProcessName, report.GenericGroupedPseudoNode, ""}, - "containers": {"Containers", selectProcess, report.ProcessContainer, report.InternetOnlyPseudoNode, "containers-grouped"}, - "containers-grouped": {"Containers", selectProcess, report.ProcessContainerImage, report.InternetOnlyPseudoNode, ""}, - "hosts": {"Hosts", selectNetwork, report.NetworkHostname, report.GenericPseudoNode, ""}, + "applications": {"Applications", report.SelectProcess, report.ProcessPID, report.GenericPseudoNode, "applications-grouped"}, + "applications-grouped": {"Applications", report.SelectProcess, report.ProcessName, report.GenericGroupedPseudoNode, ""}, + "containers": {"Containers", report.SelectProcess, report.ProcessContainer, report.InternetOnlyPseudoNode, "containers-grouped"}, + "containers-grouped": {"Containers", report.SelectProcess, report.ProcessContainerImage, report.InternetOnlyPseudoNode, ""}, + "hosts": {"Hosts", report.SelectNetwork, report.NetworkHostname, report.GenericPseudoNode, ""}, } diff --git a/probe/docker_process_mapper_test.go b/probe/docker_process_mapper_test.go deleted file mode 100644 index aaba6f8d61..0000000000 --- a/probe/docker_process_mapper_test.go +++ /dev/null @@ -1,100 +0,0 @@ -package main - -import ( - "runtime" - "testing" - "time" - - docker "github.com/fsouza/go-dockerclient" -) - -type mockDockerClient struct { - apiContainers []docker.APIContainers - containers map[string]*docker.Container - apiImages []docker.APIImages -} - -func (m mockDockerClient) ListContainers(options docker.ListContainersOptions) ([]docker.APIContainers, error) { - return m.apiContainers, nil -} - -func (m mockDockerClient) InspectContainer(id string) (*docker.Container, error) { - return m.containers[id], nil -} - -func (m mockDockerClient) ListImages(options docker.ListImagesOptions) ([]docker.APIImages, error) { - return m.apiImages, nil -} - -func (m mockDockerClient) AddEventListener(events chan<- *docker.APIEvents) error { - return nil -} - -func (m mockDockerClient) RemoveEventListener(events chan *docker.APIEvents) error { - return nil -} - -func TestDockerProcessMapper(t *testing.T) { - oldPIDTreeStub, oldDockerClientStub := newPIDTreeStub, newDockerClient - defer func() { - newPIDTreeStub = oldPIDTreeStub - newDockerClient = oldDockerClientStub - }() - - newPIDTreeStub = func(procRoot string) (*pidTree, error) { - pid1 := &process{pid: 1} - pid2 := &process{pid: 2, ppid: 1, parent: pid1} - pid1.children = []*process{pid2} - - return &pidTree{ - processes: map[int]*process{ - 1: pid1, 2: pid2, - }, - }, nil - } - - newDockerClient = func(endpoint string) (dockerClient, error) { - return mockDockerClient{ - apiContainers: []docker.APIContainers{{ID: "foo"}}, - containers: map[string]*docker.Container{ - "foo": { - ID: "foo", - Name: "bar", - Image: "baz", - State: docker.State{Pid: 1, Running: true}, - }, - }, - apiImages: []docker.APIImages{{ID: "baz", RepoTags: []string{"tag"}}}, - }, nil - } - - dockerMapper, _ := newDockerMapper("/proc", 10*time.Second) - dockerIDMapper := dockerMapper.idMapper() - dockerNameMapper := dockerMapper.nameMapper() - dockerImageIDMapper := dockerMapper.imageIDMapper() - dockerImageNameMapper := dockerMapper.imageNameMapper() - - runtime.Gosched() - - for pid, want := range map[uint]struct{ id, name, imageID, imageName string }{ - 1: {"foo", "bar", "baz", "tag"}, - 2: {"foo", "bar", "baz", "tag"}, - } { - haveID, err := dockerIDMapper.Map(pid) - if err != nil || want.id != haveID { - t.Errorf("%d: want %q, have %q (%v)", pid, want.id, haveID, err) - } - haveName, err := dockerNameMapper.Map(pid) - if err != nil || want.name != haveName { - t.Errorf("%d: want %q, have %q (%v)", pid, want.name, haveName, err) - } - haveImageID, err := dockerImageIDMapper.Map(pid) - if err != nil || want.imageID != haveImageID { - t.Errorf("%d: want %q, have %q (%v)", pid, want.imageID, haveImageID, err) - } - haveImageName, err := dockerImageNameMapper.Map(pid) - if err != nil || want.imageName != haveImageName { - t.Errorf("%d: want %q, have %q (%v)", pid, want.imageName, haveImageName, err) - } - } -} diff --git a/probe/main.go b/probe/main.go index de49fd130a..58b8afc3f9 100644 --- a/probe/main.go +++ b/probe/main.go @@ -5,7 +5,6 @@ import ( "log" "net" "net/http" - _ "net/http/pprof" "os" "os/signal" "runtime" @@ -14,12 +13,12 @@ import ( "time" "github.com/weaveworks/procspy" + "github.com/weaveworks/scope/probe/tag" "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/xfer" ) -// Set during buildtime. -var version = "unknown" +var version = "dev" // set at build time func main() { var ( @@ -29,9 +28,7 @@ func main() { listen = flag.String("listen", ":"+strconv.Itoa(xfer.ProbePort), "listen address") prometheusEndpoint = flag.String("prometheus.endpoint", "/metrics", "Prometheus metrics exposition endpoint (requires -http.listen)") spyProcs = flag.Bool("processes", true, "report processes (needs root)") - cgroupsRoot = flag.String("cgroups.root", "", "if provided, enrich -processes with cgroup names from this root (e.g. /mnt/cgroups)") - cgroupsInterval = flag.Duration("cgroups.interval", 10*time.Second, "how often to update cgroup names") - dockerMapper = flag.Bool("docker", true, "collect Docker-related attributes for processes") + dockerTagger = flag.Bool("docker", true, "collect Docker-related attributes for processes") dockerInterval = flag.Duration("docker.interval", 10*time.Second, "how often to update Docker attributes") procRoot = flag.String("proc.root", "/proc", "location of the proc filesystem") ) @@ -42,7 +39,7 @@ func main() { os.Exit(1) } - log.Printf("probe starting, version %s", version) + log.Printf("probe version %s", version) procspy.SetProcRoot(*procRoot) @@ -66,32 +63,14 @@ func main() { } defer publisher.Close() - pms := []processMapper{identityMapper{}} - - if *cgroupsRoot != "" { - if fi, err := os.Stat(*cgroupsRoot); err == nil && fi.IsDir() { - log.Printf("enriching -processes with cgroup names from %s", *cgroupsRoot) - cgroupMapper := newCgroupMapper(*cgroupsRoot, *cgroupsInterval) - defer cgroupMapper.Stop() - pms = append(pms, cgroupMapper) - } else { - log.Printf("-cgroups.root=%s: %v", *cgroupsRoot, err) - } - } - - if *dockerMapper && runtime.GOOS == "Linux" { - docker, err := newDockerMapper(*procRoot, *dockerInterval) + taggers := []tag.Tagger{tag.NewTopologyTagger()} + if *dockerTagger { + t, err := tag.NewDockerTagger(*procRoot, *dockerInterval) if err != nil { - log.Fatal(err) + log.Fatalf("failed to start docker tagger: %v", err) } - defer docker.Stop() - - pms = append(pms, - docker.idMapper(), - docker.nameMapper(), - docker.imageIDMapper(), - docker.imageNameMapper(), - ) + defer t.Stop() + taggers = append(taggers, t) } log.Printf("listening on %s", *listen) @@ -116,7 +95,8 @@ func main() { r = report.MakeReport() case <-spyTick: - r.Merge(spy(hostname, hostname, *spyProcs, pms)) + r.Merge(spy(hostname, hostname, *spyProcs)) + r = tag.Apply(r, taggers) // log.Printf("merged report:\n%#v\n", r) case <-quit: diff --git a/probe/process_mapper.go b/probe/process_mapper.go deleted file mode 100644 index cfb59307d7..0000000000 --- a/probe/process_mapper.go +++ /dev/null @@ -1,143 +0,0 @@ -package main - -import ( - "bufio" - "fmt" - "log" - "os" - "path/filepath" - "regexp" - "strconv" - "strings" - "sync" - "time" -) - -type processMapper interface { - Key() string - Map(pid uint) (string, error) -} - -type identityMapper struct{} - -func (m identityMapper) Key() string { return "identity" } -func (m identityMapper) Map(pid uint) (string, error) { return strconv.FormatUint(uint64(pid), 10), nil } - -// cgroupMapper is a cgroup task mapper. -type cgroupMapper struct { - sync.RWMutex - root string - d map[uint]string - quit chan struct{} -} - -func newCgroupMapper(root string, interval time.Duration) *cgroupMapper { - m := cgroupMapper{ - root: root, - d: map[uint]string{}, - quit: make(chan struct{}), - } - m.update() - go m.loop(interval) - return &m -} - -func (m *cgroupMapper) Stop() { - close(m.quit) -} - -func (m *cgroupMapper) Key() string { return "cgroup" } - -// Map uses the cache to find the process name for pid. It is safe for -// concurrent use. -func (m *cgroupMapper) Map(pid uint) (string, error) { - m.RLock() - p, ok := m.d[pid] - m.RUnlock() - - if !ok { - return "", fmt.Errorf("no cgroup for PID %d", pid) - } - - return p, nil -} - -func (m *cgroupMapper) loop(d time.Duration) { - ticker := time.Tick(d) - for { - select { - case <-ticker: - m.update() - case <-m.quit: - return - } - } -} - -func (m *cgroupMapper) update() { - // We want to read "//tasks" files. - fh, err := os.Open(m.root) - if err != nil { - log.Printf("cgroup mapper: %s", err) - return - } - - dirNames, err := fh.Readdirnames(-1) - fh.Close() - if err != nil { - log.Printf("cgroup mapper: %s", err) - return - } - - pmap := map[uint]string{} - for _, d := range dirNames { - cg := normalizeCgroup(d) - dirFilename := filepath.Join(m.root, d) - - s, err := os.Stat(dirFilename) - if err != nil || !s.IsDir() { - continue - } - - taskFilename := filepath.Join(dirFilename, "tasks") - - f, err := os.Open(taskFilename) - if err != nil { - continue - } - - r := bufio.NewReader(f) - for { - line, _, err := r.ReadLine() - if err != nil { - break // we expect an EOF - } - - pid, err := strconv.ParseUint(string(line), 10, 64) - if err != nil { - log.Printf("continue mapper: %s", err) - continue - } - - pmap[uint(pid)] = cg - } - - f.Close() - } - - m.Lock() - m.d = pmap - m.Unlock() -} - -var lxcRe = regexp.MustCompile(`^([^-]+)-([^-]+)-([A-Fa-f0-9]+)-([0-9]+)$`) - -func normalizeCgroup(s string) string { - // Format is currently "primarykey-secondarykey-revision-instance". We - // want to collapse all instances (and maybe all revisions, in the future) - // to the same node. So we remove the instance. - if m := lxcRe.FindStringSubmatch(s); len(m) > 0 { - return strings.Join([]string{m[1], m[2], m[3]}, "-") - } - return s -} diff --git a/probe/process_mapper_test.go b/probe/process_mapper_test.go deleted file mode 100644 index 99421a9fc5..0000000000 --- a/probe/process_mapper_test.go +++ /dev/null @@ -1,63 +0,0 @@ -package main - -import ( - "io/ioutil" - "os" - "path" - "path/filepath" - "testing" - "time" -) - -func TestCgroupMapper(t *testing.T) { - tmp := setupTmpFS(t, map[string]string{ - "/systemd/tasks": "1\n2\n4911\n1000\n25156\n", - "/systemd/notify_on_release": "0\n", - "/netscape/tasks": "666\n4242\n", - "/netscape/notify_on_release": "0\n", - "/weirdfile": "", - }) - defer removeAll(t, tmp) - - m := newCgroupMapper(tmp, 1*time.Second) - for pid, want := range map[uint]string{ - 111: "", - 999: "", - 4911: "systemd", - 1: "systemd", // first one in the file - 25156: "systemd", // last one in the tasks file - 4242: "netscape", - } { - if have, _ := m.Map(pid); want != have { - t.Errorf("%d: want %q, have %q", pid, want, have) - } - } -} - -func setupTmpFS(t *testing.T, fs map[string]string) string { - tmp, err := ioutil.TempDir(os.TempDir(), "scope-probe-test-cgroup-mapper") - if err != nil { - t.Fatal(err) - } - //t.Logf("using TempDir %s", tmp) - - for file, content := range fs { - dir := path.Dir(file) - if err := os.MkdirAll(filepath.Join(tmp, dir), 0777); err != nil { - removeAll(t, tmp) - t.Fatalf("MkdirAll: %v", err) - } - - if err := ioutil.WriteFile(filepath.Join(tmp, file), []byte(content), 0655); err != nil { - removeAll(t, tmp) - t.Fatalf("WriteFile: %v", err) - } - } - return tmp -} - -func removeAll(t *testing.T, path string) { - if err := os.RemoveAll(path); err != nil { - t.Error(err) - } -} diff --git a/probe/spy.go b/probe/spy.go index 5d72464210..5d40a2bd48 100644 --- a/probe/spy.go +++ b/probe/spy.go @@ -18,7 +18,6 @@ import ( func spy( hostID, hostName string, includeProcesses bool, - pms []processMapper, ) report.Report { defer func(begin time.Time) { spyDuration.WithLabelValues().Observe(float64(time.Since(begin))) @@ -33,7 +32,7 @@ func spy( } for conn := conns.Next(); conn != nil; conn = conns.Next() { - addConnection(&r, conn, hostID, hostName, pms) + addConnection(&r, conn, hostID, hostName) } return r @@ -43,7 +42,6 @@ func addConnection( r *report.Report, c *procspy.Connection, hostID, hostName string, - pms []processMapper, ) { var ( scopedLocal = scopedIP(hostID, c.LocalAddress) @@ -84,14 +82,6 @@ func addConnection( "domain": hostID, } - for _, pm := range pms { - v, err := pm.Map(c.PID) - if err != nil { - continue - } - md[pm.Key()] = v - } - r.Process.NodeMetadatas[scopedLocal] = md } // Count the TCP connection. diff --git a/probe/spy_test.go b/probe/spy_test.go index a7e80df1c1..04f5319e86 100644 --- a/probe/spy_test.go +++ b/probe/spy_test.go @@ -87,7 +87,7 @@ func TestSpyNetwork(t *testing.T) { nodeName = "frenchs-since-1904" ) - r := spy(nodeID, nodeName, false, []processMapper{}) + r := spy(nodeID, nodeName, false) //buf, _ := json.MarshalIndent(r, "", " ") //t.Logf("\n%s\n", buf) @@ -123,7 +123,7 @@ func TestSpyProcess(t *testing.T) { nodeName = "fishermans-friend" ) - r := spy(nodeID, nodeName, true, []processMapper{}) + r := spy(nodeID, nodeName, true) // buf, _ := json.MarshalIndent(r, "", " ") ; t.Logf("\n%s\n", buf) var ( @@ -150,26 +150,3 @@ func TestSpyProcess(t *testing.T) { } } } - -func TestSpyProcessDataSource(t *testing.T) { - procspy.SetFixtures(fixConnectionsWithProcesses) - - const ( - nodeID = "chianti" - nodeName = "harmonisch" - ) - - m := identityMapper{} - r := spy(nodeID, nodeName, true, []processMapper{m}) - scopedLocal := scopedIPPort(nodeID, fixLocalAddress, fixLocalPort) - - k := m.Key() - v, err := m.Map(fixProcessPID) - if err != nil { - t.Fatal(err) - } - - if want, have := v, r.Process.NodeMetadatas[scopedLocal][k]; want != have { - t.Fatalf("%s: want %q, have %q", k, want, have) - } -} diff --git a/probe/docker_process_mapper.go b/probe/tag/docker_tagger.go similarity index 50% rename from probe/docker_process_mapper.go rename to probe/tag/docker_tagger.go index 6687932323..e4b9b34e97 100644 --- a/probe/docker_process_mapper.go +++ b/probe/tag/docker_tagger.go @@ -1,13 +1,14 @@ -package main +package tag import ( - "fmt" "log" + "strconv" "strings" "sync" "time" docker "github.com/fsouza/go-dockerclient" + "github.com/weaveworks/scope/report" ) const ( @@ -15,7 +16,14 @@ const ( start = "start" ) -type dockerMapper struct { +var ( + newDockerClientStub = newDockerClient + newPIDTreeStub = newPIDTree +) + +// DockerTagger is a tagger that tags Docker container information to process +// nodes that have a PID. +type DockerTagger struct { sync.RWMutex quit chan struct{} interval time.Duration @@ -28,13 +36,14 @@ type dockerMapper struct { pidTree *pidTree } -func newDockerMapper(procRoot string, interval time.Duration) (*dockerMapper, error) { +// NewDockerTagger returns a usable DockerTagger. Don't forget to Stop it. +func NewDockerTagger(procRoot string, interval time.Duration) (*DockerTagger, error) { pidTree, err := newPIDTreeStub(procRoot) if err != nil { return nil, err } - m := dockerMapper{ + t := DockerTagger{ containers: map[string]*docker.Container{}, containersByPID: map[int]*docker.Container{}, images: map[string]*docker.APIImages{}, @@ -46,34 +55,35 @@ func newDockerMapper(procRoot string, interval time.Duration) (*dockerMapper, er quit: make(chan struct{}), } - go m.loop() - return &m, nil + go t.loop() + return &t, nil } -func (m *dockerMapper) Stop() { - close(m.quit) +// Stop stops the Docker tagger's event subscriber. +func (t *DockerTagger) Stop() { + close(t.quit) } -func (m *dockerMapper) loop() { - if !m.update() { +func (t *DockerTagger) loop() { + if !t.update() { return } - ticker := time.Tick(m.interval) + ticker := time.Tick(t.interval) for { select { case <-ticker: - if !m.update() { + if !t.update() { return } - case <-m.quit: + case <-t.quit: return } } } -// for mocking +// Sub-interface for mocking. type dockerClient interface { ListContainers(docker.ListContainersOptions) ([]docker.APIContainers, error) InspectContainer(string) (*docker.Container, error) @@ -82,19 +92,13 @@ type dockerClient interface { RemoveEventListener(chan *docker.APIEvents) error } -func newRealDockerClient(endpoint string) (dockerClient, error) { +func newDockerClient(endpoint string) (dockerClient, error) { return docker.NewClient(endpoint) } -var ( - newDockerClient = newRealDockerClient - newPIDTreeStub = newPIDTree -) - -// returns false when stopping. -func (m *dockerMapper) update() bool { +func (t *DockerTagger) update() bool { endpoint := "unix:///var/run/docker.sock" - client, err := newDockerClient(endpoint) + client, err := newDockerClientStub(endpoint) if err != nil { log.Printf("docker mapper: %s", err) return true @@ -111,40 +115,40 @@ func (m *dockerMapper) update() bool { } }() - if err := m.updateContainers(client); err != nil { + if err := t.updateContainers(client); err != nil { log.Printf("docker mapper: %s", err) return true } - if err := m.updateImages(client); err != nil { + if err := t.updateImages(client); err != nil { log.Printf("docker mapper: %s", err) return true } - otherUpdates := time.Tick(m.interval) + otherUpdates := time.Tick(t.interval) for { select { case event := <-events: - m.handleEvent(event, client) + t.handleEvent(event, client) case <-otherUpdates: - if err := m.updatePIDTree(); err != nil { + if err := t.updatePIDTree(); err != nil { log.Printf("docker mapper: %s", err) continue } - if err := m.updateImages(client); err != nil { + if err := t.updateImages(client); err != nil { log.Printf("docker mapper: %s", err) continue } - case <-m.quit: + case <-t.quit: return false } } } -func (m *dockerMapper) updateContainers(client dockerClient) error { +func (t *DockerTagger) updateContainers(client dockerClient) error { apiContainers, err := client.ListContainers(docker.ListContainersOptions{All: true}) if err != nil { return err @@ -165,44 +169,44 @@ func (m *dockerMapper) updateContainers(client dockerClient) error { containers = append(containers, container) } - m.Lock() + t.Lock() for _, container := range containers { - m.containers[container.ID] = container - m.containersByPID[container.State.Pid] = container + t.containers[container.ID] = container + t.containersByPID[container.State.Pid] = container } - m.Unlock() + t.Unlock() return nil } -func (m *dockerMapper) updateImages(client dockerClient) error { +func (t *DockerTagger) updateImages(client dockerClient) error { images, err := client.ListImages(docker.ListImagesOptions{}) if err != nil { return err } - m.Lock() + t.Lock() for i := range images { image := &images[i] - m.images[image.ID] = image + t.images[image.ID] = image } - m.Unlock() + t.Unlock() return nil } -func (m *dockerMapper) handleEvent(event *docker.APIEvents, client dockerClient) { +func (t *DockerTagger) handleEvent(event *docker.APIEvents, client dockerClient) { switch event.Status { case stop: containerID := event.ID - m.Lock() - if container, ok := m.containers[containerID]; ok { - delete(m.containers, containerID) - delete(m.containersByPID, container.State.Pid) + t.Lock() + if container, ok := t.containers[containerID]; ok { + delete(t.containers, containerID) + delete(t.containersByPID, container.State.Pid) } else { log.Printf("docker mapper: container %s not found", containerID) } - m.Unlock() + t.Unlock() case start: containerID := event.ID @@ -217,88 +221,77 @@ func (m *dockerMapper) handleEvent(event *docker.APIEvents, client dockerClient) return } - m.Lock() - m.containers[containerID] = container - m.containersByPID[container.State.Pid] = container - m.Unlock() + t.Lock() + t.containers[containerID] = container + t.containersByPID[container.State.Pid] = container + t.Unlock() } } -func (m *dockerMapper) updatePIDTree() error { - pidTree, err := newPIDTreeStub(m.procRoot) +func (t *DockerTagger) updatePIDTree() error { + pidTree, err := newPIDTreeStub(t.procRoot) if err != nil { return err } - m.Lock() - m.pidTree = pidTree - m.Unlock() + t.Lock() + t.pidTree = pidTree + t.Unlock() return nil } -type dockerProcessMapper struct { - *dockerMapper - key string - f func(*docker.Container) string -} - -func (m *dockerProcessMapper) Key() string { return m.key } -func (m *dockerProcessMapper) Map(pid uint) (string, error) { - var ( - container *docker.Container - ok bool - err error - candidate = int(pid) - ) - - m.RLock() - for { - container, ok = m.containersByPID[candidate] - if ok { - break +// Tag implements Tagger. +func (t *DockerTagger) Tag(r report.Report) report.Report { + for nodeID, nodeMetadata := range r.Process.NodeMetadatas { + pidStr, ok := nodeMetadata["pid"] + if !ok { + //log.Printf("dockerTagger: %q: no process node ID", id) + continue } - candidate, err = m.pidTree.getParent(candidate) + pid, err := strconv.ParseUint(pidStr, 10, 64) if err != nil { - break + //log.Printf("dockerTagger: %q: bad process node PID (%v)", id, err) + continue } - } - m.RUnlock() - - if err != nil { - return "", fmt.Errorf("no container found for PID %d", pid) - } - return m.f(container), nil -} + var ( + container *docker.Container + candidate = int(pid) + ) -func (m *dockerMapper) idMapper() processMapper { - return &dockerProcessMapper{m, "docker_id", func(c *docker.Container) string { - return c.ID - }} -} + t.RLock() + for { + container, ok = t.containersByPID[candidate] + if ok { + break + } + candidate, err = t.pidTree.getParent(candidate) + if err != nil { + break + } + } + t.RUnlock() -func (m *dockerMapper) nameMapper() processMapper { - return &dockerProcessMapper{m, "docker_name", func(c *docker.Container) string { - return strings.TrimPrefix(c.Name, "/") - }} -} + if !ok { + continue + } -func (m *dockerMapper) imageIDMapper() processMapper { - return &dockerProcessMapper{m, "docker_image_id", func(c *docker.Container) string { - return c.Image - }} -} + md := report.NodeMetadata{ + "docker_container_id": container.ID, + "docker_container_name": strings.TrimPrefix(container.Name, "/"), + "docker_image_id": container.Image, + } -func (m *dockerMapper) imageNameMapper() processMapper { - return &dockerProcessMapper{m, "docker_image_name", func(c *docker.Container) string { - m.RLock() - image, ok := m.images[c.Image] - m.RUnlock() + t.RLock() + image, ok := t.images[container.Image] + t.RUnlock() - if !ok || len(image.RepoTags) == 0 { - return "" + if ok && len(image.RepoTags) > 0 { + md["docker_image_name"] = image.RepoTags[0] } - return image.RepoTags[0] - }} + r.Process.NodeMetadatas[nodeID].Merge(md) + } + + return r } diff --git a/probe/tag/docker_tagger_test.go b/probe/tag/docker_tagger_test.go new file mode 100644 index 0000000000..682e78a62d --- /dev/null +++ b/probe/tag/docker_tagger_test.go @@ -0,0 +1,94 @@ +package tag + +import ( + "reflect" + "runtime" + "testing" + "time" + + docker "github.com/fsouza/go-dockerclient" + "github.com/weaveworks/scope/report" +) + +type mockDockerClient struct { + apiContainers []docker.APIContainers + containers map[string]*docker.Container + apiImages []docker.APIImages +} + +func (m mockDockerClient) ListContainers(docker.ListContainersOptions) ([]docker.APIContainers, error) { + return m.apiContainers, nil +} + +func (m mockDockerClient) InspectContainer(id string) (*docker.Container, error) { + return m.containers[id], nil +} + +func (m mockDockerClient) ListImages(docker.ListImagesOptions) ([]docker.APIImages, error) { + return m.apiImages, nil +} + +func (m mockDockerClient) AddEventListener(events chan<- *docker.APIEvents) error { + return nil +} + +func (m mockDockerClient) RemoveEventListener(events chan *docker.APIEvents) error { + return nil +} + +func TestDockerTagger(t *testing.T) { + oldPIDTree, oldDockerClient := newPIDTreeStub, newDockerClientStub + defer func() { newPIDTreeStub, newDockerClientStub = oldPIDTree, oldDockerClient }() + + newPIDTreeStub = func(procRoot string) (*pidTree, error) { + pid1 := &process{pid: 1} + pid2 := &process{pid: 2, ppid: 1, parent: pid1} + pid1.children = []*process{pid2} + return &pidTree{ + processes: map[int]*process{ + 1: pid1, 2: pid2, + }, + }, nil + } + + newDockerClientStub = func(endpoint string) (dockerClient, error) { + return mockDockerClient{ + apiContainers: []docker.APIContainers{{ID: "foo"}}, + containers: map[string]*docker.Container{ + "foo": { + ID: "foo", + Name: "bar", + Image: "baz", + State: docker.State{Pid: 1, Running: true}, + }, + }, + apiImages: []docker.APIImages{{ID: "baz", RepoTags: []string{"bang", "not-chosen"}}}, + }, nil + } + + var ( + endpoint1NodeID = "somehost.com;192.168.1.1;12345" + endpoint2NodeID = "somehost.com;192.168.1.1;67890" + processNodeMetadata = report.NodeMetadata{ + "docker_container_id": "foo", + "docker_container_name": "bar", + "docker_image_id": "baz", + "docker_image_name": "bang", + } + ) + + r := report.MakeReport() + r.Process.NodeMetadatas[endpoint1NodeID] = report.NodeMetadata{"pid": "1"} + r.Process.NodeMetadatas[endpoint2NodeID] = report.NodeMetadata{"pid": "2"} + + dockerTagger, _ := NewDockerTagger("/irrelevant", 10*time.Second) + runtime.Gosched() + for _, endpointNodeID := range []string{endpoint1NodeID, endpoint2NodeID} { + want := processNodeMetadata.Copy() + have := dockerTagger.Tag(r).Process.NodeMetadatas[endpointNodeID].Copy() + delete(have, "pid") + if !reflect.DeepEqual(want, have) { + t.Errorf("%q: want %+v, have %+v", endpointNodeID, want, have) + } + } +} diff --git a/probe/pidtree.go b/probe/tag/pidtree.go similarity index 99% rename from probe/pidtree.go rename to probe/tag/pidtree.go index 65d0f2d61e..fe665be9ae 100644 --- a/probe/pidtree.go +++ b/probe/tag/pidtree.go @@ -1,4 +1,4 @@ -package main +package tag import ( "fmt" diff --git a/probe/pidtree_test.go b/probe/tag/pidtree_test.go similarity index 96% rename from probe/pidtree_test.go rename to probe/tag/pidtree_test.go index f83fbd4971..fe4805f07f 100644 --- a/probe/pidtree_test.go +++ b/probe/tag/pidtree_test.go @@ -1,4 +1,4 @@ -package main +package tag import ( "fmt" @@ -48,7 +48,7 @@ func TestPIDTree(t *testing.T) { return []byte(fmt.Sprintf("%d na R %d", pid, parent)), nil } - pidtree, err := newPIDTree("/proc") + pidtree, err := newPIDTreeStub("/proc") if err != nil { t.Fatalf("newPIDTree error: %v", err) } diff --git a/probe/tag/tagger.go b/probe/tag/tagger.go new file mode 100644 index 0000000000..fb1afa4edb --- /dev/null +++ b/probe/tag/tagger.go @@ -0,0 +1,16 @@ +package tag + +import "github.com/weaveworks/scope/report" + +// Tagger tags nodes with value-add node metadata. +type Tagger interface { + Tag(r report.Report) report.Report +} + +// Apply tags the report with all the taggers. +func Apply(r report.Report, taggers []Tagger) report.Report { + for _, tagger := range taggers { + r = tagger.Tag(r) + } + return r +} diff --git a/probe/tag/tagger_test.go b/probe/tag/tagger_test.go new file mode 100644 index 0000000000..6f6caa906d --- /dev/null +++ b/probe/tag/tagger_test.go @@ -0,0 +1,44 @@ +package tag_test + +import ( + "reflect" + "testing" + + "github.com/weaveworks/scope/probe/tag" + "github.com/weaveworks/scope/report" +) + +func TestApply(t *testing.T) { + var ( + processNodeID = "c" + networkNodeID = "d" + processNodeMetadata = report.NodeMetadata{"5": "6"} + networkNodeMetadata = report.NodeMetadata{"7": "8"} + ) + + r := report.MakeReport() + r.Process.NodeMetadatas[processNodeID] = processNodeMetadata + r.Network.NodeMetadatas[networkNodeID] = networkNodeMetadata + r = tag.Apply(r, []tag.Tagger{tag.NewTopologyTagger()}) + + for _, tuple := range []struct { + want report.NodeMetadata + from report.Topology + via string + }{ + {copy(processNodeMetadata).Merge(report.NodeMetadata{"topology": "process"}), r.Process, processNodeID}, + {copy(networkNodeMetadata).Merge(report.NodeMetadata{"topology": "network"}), r.Network, networkNodeID}, + } { + if want, have := tuple.want, tuple.from.NodeMetadatas[tuple.via]; !reflect.DeepEqual(want, have) { + t.Errorf("want %+v, have %+v", want, have) + } + } +} + +func copy(input report.NodeMetadata) report.NodeMetadata { + output := make(report.NodeMetadata, len(input)) + for k, v := range input { + output[k] = v + } + return output +} diff --git a/probe/tag/topology_tagger.go b/probe/tag/topology_tagger.go new file mode 100644 index 0000000000..309fd52837 --- /dev/null +++ b/probe/tag/topology_tagger.go @@ -0,0 +1,25 @@ +package tag + +import ( + "github.com/weaveworks/scope/report" +) + +type topologyTagger struct{} + +// NewTopologyTagger tags each node with the topology that it comes from. +func NewTopologyTagger() Tagger { + return &topologyTagger{} +} + +func (topologyTagger) Tag(r report.Report) report.Report { + for val, topology := range map[string]*report.Topology{ + "process": &(r.Process), + "network": &(r.Network), + } { + md := report.NodeMetadata{"topology": val} + for nodeID := range topology.NodeMetadatas { + (*topology).NodeMetadatas[nodeID].Merge(md) + } + } + return r +} diff --git a/probe/tag/topology_tagger_test.go b/probe/tag/topology_tagger_test.go new file mode 100644 index 0000000000..00c628ace5 --- /dev/null +++ b/probe/tag/topology_tagger_test.go @@ -0,0 +1,19 @@ +package tag_test + +import ( + "reflect" + "testing" + + "github.com/weaveworks/scope/probe/tag" + "github.com/weaveworks/scope/report" +) + +func TestTagMissingID(t *testing.T) { + const nodeID = "not-found" + r := report.MakeReport() + want := report.NodeMetadata{} + have := tag.NewTopologyTagger().Tag(r).Process.NodeMetadatas[nodeID].Copy() + if !reflect.DeepEqual(want, have) { + t.Error("TopologyTagger erroneously tagged a missing node ID") + } +} diff --git a/report/mapping_functions.go b/report/mapping_functions.go index f739edcba3..e04024ebc9 100644 --- a/report/mapping_functions.go +++ b/report/mapping_functions.go @@ -33,6 +33,19 @@ type MapFunc func(string, NodeMetadata) (MappedNode, bool) // node IDs prior to mapping. type PseudoFunc func(srcNodeID string, srcNode RenderableNode, dstNodeID string) (MappedNode, bool) +// TopologySelector selects a single topology from a report. +type TopologySelector func(r Report) Topology + +// SelectProcess selects the process topology. +func SelectProcess(r Report) Topology { + return r.Process +} + +// SelectNetwork selects the network topology. +func SelectNetwork(r Report) Topology { + return r.Network +} + // ProcessPID takes a node NodeMetadata from a Process topology, and returns a // representation with the ID based on the process PID and the labels based // on the process name. @@ -70,10 +83,10 @@ func ProcessName(_ string, m NodeMetadata) (MappedNode, bool) { // are grouped into the Uncontained node. func ProcessContainer(_ string, m NodeMetadata) (MappedNode, bool) { var id, major, minor, rank string - if m["docker_id"] == "" { + if m["docker_container_id"] == "" { id, major, minor, rank = "uncontained", "Uncontained", "", "uncontained" } else { - id, major, minor, rank = m["docker_id"], m["docker_name"], m["domain"], m["docker_image_id"] + id, major, minor, rank = m["docker_container_id"], m["docker_container_name"], m["domain"], m["docker_image_id"] } return MappedNode{ diff --git a/report/mapping_test.go b/report/mapping_test.go index 79a1eabbee..c413fa62fa 100644 --- a/report/mapping_test.go +++ b/report/mapping_test.go @@ -69,13 +69,13 @@ func TestUngroupedMapping(t *testing.T) { f: ProcessContainer, id: "bar-id", meta: NodeMetadata{ - "pid": "42", - "name": "curl", - "domain": "hosta", - "docker_id": "d321fe0", - "docker_name": "walking_sparrow", - "docker_image_id": "1101fff", - "docker_image_name": "org/app:latest", + "pid": "42", + "name": "curl", + "domain": "hosta", + "docker_container_id": "d321fe0", + "docker_container_name": "walking_sparrow", + "docker_image_id": "1101fff", + "docker_image_name": "org/app:latest", }, wantOK: true, wantID: "d321fe0", diff --git a/report/topology.go b/report/topology.go index d9b4541551..0836e02928 100644 --- a/report/topology.go +++ b/report/topology.go @@ -54,6 +54,26 @@ type EdgeMetadata struct { // which should probably change (see comment on type MapFunc). type NodeMetadata map[string]string +// Copy returns a value copy, useful for tests. +func (nm NodeMetadata) Copy() NodeMetadata { + cp := make(NodeMetadata, len(nm)) + for k, v := range nm { + cp[k] = v + } + return cp +} + +// Merge merges two node metadata maps together. In case of conflict, the +// other (right-hand) side wins. Always reassign the result of merge to the +// destination. Merge is defined on the value-type, but node metadata map is +// itself a reference type, so if you want to maintain immutability, use copy. +func (nm NodeMetadata) Merge(other NodeMetadata) NodeMetadata { + for k, v := range other { + nm[k] = v // other takes precedence + } + return nm +} + // NewTopology gives you a Topology. func NewTopology() Topology { return Topology{