Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func main() {
c := xfer.NewCollector(*batch)
defer c.Stop()

r := NewResolver(probes, c.AddAddress)
r := NewResolver(probes, c.Add)
defer r.Stop()

lifo := NewReportLIFO(c, *window)
Expand Down
91 changes: 49 additions & 42 deletions xfer/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,51 +17,62 @@ const (

var (
// MaxBackoff is the maximum time between connect retries.
MaxBackoff = 2 * time.Minute // externally configurable.
// It's exported so it's externally configurable.
MaxBackoff = 2 * time.Minute

// This is extracted out for mocking.
tick = time.Tick

This comment was marked as abuse.

This comment was marked as abuse.

This comment was marked as abuse.

This comment was marked as abuse.

)

// Collector connects to probes over TCP and merges reports published by those
// Collector describes anything that can have addresses added and removed, and
// which produces reports that represent aggregate reports from all collected
// addresses.
type Collector interface {
Add(string)
Remove(string)
Reports() <-chan report.Report
Stop()
}

// realCollector connects to probes over TCP and merges reports published by those
// probes into a single one.
type Collector struct {
type realCollector struct {
in chan report.Report
out chan report.Report
peekc chan chan report.Report
add chan string
remove chan string
quit chan chan struct{}
quit chan struct{}
}

// NewCollector starts the report collector.
func NewCollector(batchTime time.Duration) *Collector {
c := &Collector{
// NewCollector produces and returns a report collector.
func NewCollector(batchTime time.Duration) Collector {
c := &realCollector{
in: make(chan report.Report),
out: make(chan report.Report),
peekc: make(chan chan report.Report),
add: make(chan string),
remove: make(chan string),
quit: make(chan chan struct{}),
quit: make(chan struct{}),
}

go c.loop(batchTime)

return c
}

func (c *Collector) loop(batchTime time.Duration) {
func (c *realCollector) loop(batchTime time.Duration) {
var (
tick = time.Tick(batchTime)
tick = tick(batchTime)
current = report.NewReport()
addrs = map[string]chan struct{}{}
wg = &sync.WaitGroup{} // individual collector goroutines
wg = &sync.WaitGroup{} // per-address goroutines
)

add := func(ip string) {
if _, ok := addrs[ip]; ok {
return
}

addrs[ip] = make(chan struct{})

wg.Add(1)

go func(quit chan struct{}) {
defer wg.Done()
reportCollector(ip, c.in, quit)
Expand All @@ -73,7 +84,6 @@ func (c *Collector) loop(batchTime time.Duration) {
if !ok {
return // hmm
}

close(q)
delete(addrs, ip)
}
Expand All @@ -84,6 +94,9 @@ func (c *Collector) loop(batchTime time.Duration) {
c.out <- current
current = report.NewReport()

case pc := <-c.peekc:
pc <- current

case r := <-c.in:
current.Merge(r)

Expand All @@ -93,47 +106,41 @@ func (c *Collector) loop(batchTime time.Duration) {
case ip := <-c.remove:
remove(ip)

case q := <-c.quit:
case <-c.quit:
for _, q := range addrs {
close(q)
}
wg.Wait()
close(q)
return
}
}
}

// Stop shuts down a collector and all connections to probes.
func (c *Collector) Stop() {
q := make(chan struct{})
c.quit <- q
<-q
// Add adds an address to be collected from.
func (c *realCollector) Add(addr string) {
c.add <- addr
}

// AddAddress adds the passed IP to the collector, and starts (trying to)
// collect reports from the remote Publisher.
func (c *Collector) AddAddress(ip string) {
c.add <- ip
// Remove removes a previously-added address.
func (c *realCollector) Remove(addr string) {
c.remove <- addr
}

// AddAddresses adds the passed IPs to the collector, and starts (trying to)
// collect reports from the remote Publisher.
func (c *Collector) AddAddresses(ips []string) {
for _, addr := range ips {
c.AddAddress(addr)
}
// Reports returns the report chan. It must be consumed by the client, or the
// collector will break.
func (c *realCollector) Reports() <-chan report.Report {
return c.out
}

// RemoveAddress removes the passed IP from the collector, and stops
// collecting reports from the remote Publisher.
func (c *Collector) RemoveAddress(ip string) {
c.remove <- ip
func (c *realCollector) peek() report.Report {
pc := make(chan report.Report)
c.peekc <- pc
return <-pc
}

// Reports returns the channel where aggregate reports are sent.
func (c *Collector) Reports() <-chan report.Report {
return c.out
// Stop terminates the collector.
func (c *realCollector) Stop() {
close(c.quit)
}

// reportCollector is the loop to connect to a single Probe. It'll keep
Expand Down Expand Up @@ -188,7 +195,7 @@ func reportCollector(ip string, col chan<- report.Report, quit <-chan struct{})
log.Printf("decode error: %v", err)
break
}
//log.Printf("collector: got a report from %v", ip)
log.Printf("collector: got a report from %v", ip)

select {
case col <- report:
Expand Down
105 changes: 69 additions & 36 deletions xfer/collector_test.go
Original file line number Diff line number Diff line change
@@ -1,69 +1,102 @@
package xfer_test
package xfer

import (
"bytes"
"encoding/gob"
"io/ioutil"
"log"
"net"
"runtime"
"testing"
"time"

"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/xfer"
)

func TestCollector(t *testing.T) {
log.SetOutput(ioutil.Discard)

// Build the address
port := ":12345"
addr, err := net.ResolveTCPAddr("tcp4", "127.0.0.1"+port)
// Swap out ticker
publish := make(chan time.Time)
oldTick := tick
tick = func(time.Duration) <-chan time.Time { return publish }
defer func() { tick = oldTick }()

// Build a collector
collector := NewCollector(time.Second)
defer collector.Stop()

concreteCollector, ok := collector.(*realCollector)
if !ok {
t.Fatal("type assertion failure")
}

// Build a test publisher
reports := make(chan interface{})
ln := testPublisher(t, reports)
defer ln.Close()

// Connect the collector to the test publisher
addr := ln.Addr().String()
collector.Add(addr)
collector.Add(addr) // test duplicate case
runtime.Gosched() // make sure it connects

// Push a report through everything
reports <- report.Report{Network: report.Topology{NodeMetadatas: report.NodeMetadatas{"a": report.NodeMetadata{}}}}
poll(t, time.Millisecond, func() bool { return len(concreteCollector.peek().Network.NodeMetadatas) == 1 }, "missed the report")
go func() { publish <- time.Now() }()
if want, have := 1, len((<-collector.Reports()).Network.NodeMetadatas); want != have {
t.Errorf("want %d, have %d", want, have)
}

collector.Remove(addr)
collector.Remove(addr) // test duplicate case
}

func TestCollectorQuitWithActiveConnections(t *testing.T) {
c := NewCollector(time.Second)
c.Add("1.2.3.4:56789")
c.Stop()
}

func testPublisher(t *testing.T, input <-chan interface{}) net.Listener {
addr, err := net.ResolveTCPAddr("tcp4", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}

// Start a raw publisher
ln, err := net.ListenTCP("tcp4", addr)
if err != nil {
t.Fatal(err)
}
defer ln.Close()

// Accept one connection, write one report
data := make(chan []byte)
go func() {
conn, err := ln.Accept()
if err != nil {
t.Error(err)
t.Log(err)
return
}
defer conn.Close()

if _, err := conn.Write(<-data); err != nil {
t.Error(err)
return
for {
enc := gob.NewEncoder(conn)
for v := range input {
if err := enc.Encode(v); err != nil {
t.Error(err)
return
}
}
}
}()
return ln
}

// Start a collector
batchTime := 10 * time.Millisecond
c := xfer.NewCollector(batchTime)
c.AddAddress("127.0.0.1" + port)
gate := make(chan struct{})
go func() { <-c.Reports(); c.Stop(); close(gate) }()

// Publish a message
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(report.Report{}); err != nil {
t.Fatal(err)
}
data <- buf.Bytes()

// Check it was collected and forwarded
select {
case <-gate:
case <-time.After(2 * batchTime):
t.Errorf("timeout waiting for report")
func poll(t *testing.T, d time.Duration, condition func() bool, msg string) {
deadline := time.Now().Add(d)
for {
if time.Now().After(deadline) {
t.Fatal(msg)
}
if condition() {
return
}
time.Sleep(d / 10)
}
}
4 changes: 2 additions & 2 deletions xfer/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func TestMerge(t *testing.T) {

batchTime := 100 * time.Millisecond
c := xfer.NewCollector(batchTime)
c.AddAddress(p1Addr)
c.AddAddress(p2Addr)
c.Add(p1Addr)
c.Add(p2Addr)
defer c.Stop()
time.Sleep(batchTime / 10) // connect

Expand Down