diff --git a/app/main.go b/app/main.go index e3c4ca3ab9..49c9454c29 100644 --- a/app/main.go +++ b/app/main.go @@ -67,7 +67,7 @@ func main() { c := xfer.NewCollector(*batch) defer c.Stop() - r := NewResolver(probes, c.AddAddress) + r := NewResolver(probes, c.Add) defer r.Stop() lifo := NewReportLIFO(c, *window) diff --git a/xfer/collector.go b/xfer/collector.go index 8ac6631a9f..a1970ba2c4 100644 --- a/xfer/collector.go +++ b/xfer/collector.go @@ -17,51 +17,62 @@ const ( var ( // MaxBackoff is the maximum time between connect retries. - MaxBackoff = 2 * time.Minute // externally configurable. + // It's exported so it's externally configurable. + MaxBackoff = 2 * time.Minute + + // This is extracted out for mocking. + tick = time.Tick ) -// Collector connects to probes over TCP and merges reports published by those +// Collector describes anything that can have addresses added and removed, and +// which produces reports that represent aggregate reports from all collected +// addresses. +type Collector interface { + Add(string) + Remove(string) + Reports() <-chan report.Report + Stop() +} + +// realCollector connects to probes over TCP and merges reports published by those // probes into a single one. -type Collector struct { +type realCollector struct { in chan report.Report out chan report.Report + peekc chan chan report.Report add chan string remove chan string - quit chan chan struct{} + quit chan struct{} } -// NewCollector starts the report collector. -func NewCollector(batchTime time.Duration) *Collector { - c := &Collector{ +// NewCollector produces and returns a report collector. 
+func NewCollector(batchTime time.Duration) Collector { + c := &realCollector{ in: make(chan report.Report), out: make(chan report.Report), + peekc: make(chan chan report.Report), add: make(chan string), remove: make(chan string), - quit: make(chan chan struct{}), + quit: make(chan struct{}), } - go c.loop(batchTime) - return c } -func (c *Collector) loop(batchTime time.Duration) { +func (c *realCollector) loop(batchTime time.Duration) { var ( - tick = time.Tick(batchTime) + tick = tick(batchTime) current = report.NewReport() addrs = map[string]chan struct{}{} - wg = &sync.WaitGroup{} // individual collector goroutines + wg = &sync.WaitGroup{} // per-address goroutines ) add := func(ip string) { if _, ok := addrs[ip]; ok { return } - addrs[ip] = make(chan struct{}) - wg.Add(1) - go func(quit chan struct{}) { defer wg.Done() reportCollector(ip, c.in, quit) @@ -73,7 +84,6 @@ func (c *Collector) loop(batchTime time.Duration) { if !ok { return // hmm } - close(q) delete(addrs, ip) } @@ -84,6 +94,9 @@ func (c *Collector) loop(batchTime time.Duration) { c.out <- current current = report.NewReport() + case pc := <-c.peekc: + pc <- current + case r := <-c.in: current.Merge(r) @@ -93,47 +106,41 @@ func (c *Collector) loop(batchTime time.Duration) { case ip := <-c.remove: remove(ip) - case q := <-c.quit: + case <-c.quit: for _, q := range addrs { close(q) } wg.Wait() - close(q) return } } } -// Stop shuts down a collector and all connections to probes. -func (c *Collector) Stop() { - q := make(chan struct{}) - c.quit <- q - <-q +// Add adds an address to be collected from. +func (c *realCollector) Add(addr string) { + c.add <- addr } -// AddAddress adds the passed IP to the collector, and starts (trying to) -// collect reports from the remote Publisher. -func (c *Collector) AddAddress(ip string) { - c.add <- ip +// Remove removes a previously-added address. 
+func (c *realCollector) Remove(addr string) { + c.remove <- addr } -// AddAddresses adds the passed IPs to the collector, and starts (trying to) -// collect reports from the remote Publisher. -func (c *Collector) AddAddresses(ips []string) { - for _, addr := range ips { - c.AddAddress(addr) - } +// Reports returns the report chan. It must be consumed by the client, or the +// collector blocks when it publishes a batch and stops handling Add and Remove. +func (c *realCollector) Reports() <-chan report.Report { + return c.out } -// RemoveAddress removes the passed IP from the collector, and stops -// collecting reports from the remote Publisher. -func (c *Collector) RemoveAddress(ip string) { - c.remove <- ip +func (c *realCollector) peek() report.Report { + pc := make(chan report.Report) + c.peekc <- pc + return <-pc } -// Reports returns the channel where aggregate reports are sent. -func (c *Collector) Reports() <-chan report.Report { - return c.out +// Stop signals the collector to terminate and returns without waiting for it. +func (c *realCollector) Stop() { + close(c.quit) } // reportCollector is the loop to connect to a single Probe. 
It'll keep @@ -188,7 +195,7 @@ func reportCollector(ip string, col chan<- report.Report, quit <-chan struct{}) log.Printf("decode error: %v", err) break } - //log.Printf("collector: got a report from %v", ip) + log.Printf("collector: got a report from %v", ip) select { case col <- report: diff --git a/xfer/collector_test.go b/xfer/collector_test.go index f839471353..be56fc5860 100644 --- a/xfer/collector_test.go +++ b/xfer/collector_test.go @@ -1,69 +1,102 @@ -package xfer_test +package xfer import ( - "bytes" "encoding/gob" "io/ioutil" "log" "net" + "runtime" "testing" "time" "github.com/weaveworks/scope/report" - "github.com/weaveworks/scope/xfer" ) func TestCollector(t *testing.T) { log.SetOutput(ioutil.Discard) - // Build the address - port := ":12345" - addr, err := net.ResolveTCPAddr("tcp4", "127.0.0.1"+port) + // Swap out ticker + publish := make(chan time.Time) + oldTick := tick + tick = func(time.Duration) <-chan time.Time { return publish } + defer func() { tick = oldTick }() + + // Build a collector + collector := NewCollector(time.Second) + defer collector.Stop() + + concreteCollector, ok := collector.(*realCollector) + if !ok { + t.Fatal("type assertion failure") + } + + // Build a test publisher + reports := make(chan interface{}) + ln := testPublisher(t, reports) + defer ln.Close() + + // Connect the collector to the test publisher + addr := ln.Addr().String() + collector.Add(addr) + collector.Add(addr) // test duplicate case + runtime.Gosched() // make sure it connects + + // Push a report through everything + reports <- report.Report{Network: report.Topology{NodeMetadatas: report.NodeMetadatas{"a": report.NodeMetadata{}}}} + poll(t, time.Millisecond, func() bool { return len(concreteCollector.peek().Network.NodeMetadatas) == 1 }, "missed the report") + go func() { publish <- time.Now() }() + if want, have := 1, len((<-collector.Reports()).Network.NodeMetadatas); want != have { + t.Errorf("want %d, have %d", want, have) + } + + 
collector.Remove(addr) + collector.Remove(addr) // test duplicate case +} + +func TestCollectorQuitWithActiveConnections(t *testing.T) { + c := NewCollector(time.Second) + c.Add("1.2.3.4:56789") + c.Stop() +} + +func testPublisher(t *testing.T, input <-chan interface{}) net.Listener { + addr, err := net.ResolveTCPAddr("tcp4", "127.0.0.1:0") if err != nil { t.Fatal(err) } - - // Start a raw publisher ln, err := net.ListenTCP("tcp4", addr) if err != nil { t.Fatal(err) } - defer ln.Close() - - // Accept one connection, write one report - data := make(chan []byte) go func() { conn, err := ln.Accept() if err != nil { - t.Error(err) + t.Log(err) return } defer conn.Close() - - if _, err := conn.Write(<-data); err != nil { - t.Error(err) - return + for { + enc := gob.NewEncoder(conn) + for v := range input { + if err := enc.Encode(v); err != nil { + t.Error(err) + return + } + } } }() + return ln +} - // Start a collector - batchTime := 10 * time.Millisecond - c := xfer.NewCollector(batchTime) - c.AddAddress("127.0.0.1" + port) - gate := make(chan struct{}) - go func() { <-c.Reports(); c.Stop(); close(gate) }() - - // Publish a message - var buf bytes.Buffer - if err := gob.NewEncoder(&buf).Encode(report.Report{}); err != nil { - t.Fatal(err) - } - data <- buf.Bytes() - - // Check it was collected and forwarded - select { - case <-gate: - case <-time.After(2 * batchTime): - t.Errorf("timeout waiting for report") +func poll(t *testing.T, d time.Duration, condition func() bool, msg string) { + deadline := time.Now().Add(d) + for { + if time.Now().After(deadline) { + t.Fatal(msg) + } + if condition() { + return + } + time.Sleep(d / 10) } } diff --git a/xfer/merge_test.go b/xfer/merge_test.go index 4fd8c90f08..531fbbd1c4 100644 --- a/xfer/merge_test.go +++ b/xfer/merge_test.go @@ -32,8 +32,8 @@ func TestMerge(t *testing.T) { batchTime := 100 * time.Millisecond c := xfer.NewCollector(batchTime) - c.AddAddress(p1Addr) - c.AddAddress(p2Addr) + c.Add(p1Addr) + c.Add(p2Addr) defer 
c.Stop() time.Sleep(batchTime / 10) // connect