@@ -34,16 +34,18 @@ type DiscoveryManager struct {
3434 feed chan * discovery.Event
3535 watchersMutex sync.Mutex
3636 watchers map [* PortWatcher ]bool
37+ watchersCache map [string ]map [string ]* discovery.Event
3738}
3839
3940var tr = i18n .Tr
4041
4142// New creates a new DiscoveryManager
4243func New () * DiscoveryManager {
4344 return & DiscoveryManager {
44- discoveries : map [string ]* discovery.PluggableDiscovery {},
45- watchers : map [* PortWatcher ]bool {},
46- feed : make (chan * discovery.Event , 50 ),
45+ discoveries : map [string ]* discovery.PluggableDiscovery {},
46+ watchers : map [* PortWatcher ]bool {},
47+ feed : make (chan * discovery.Event , 50 ),
48+ watchersCache : map [string ]map [string ]* discovery.Event {},
4749 }
4850}
4951
@@ -139,9 +141,16 @@ func (dm *DiscoveryManager) Watch() (*PortWatcher, error) {
139141 dm .watchersMutex .Unlock ()
140142 close (watcher .feed )
141143 }
142- dm .watchersMutex .Lock ()
143- dm .watchers [watcher ] = true
144- dm .watchersMutex .Unlock ()
144+ go func () {
145+ dm .watchersMutex .Lock ()
146+ for _ , cache := range dm .watchersCache {
147+ for _ , ev := range cache {
148+ watcher .feed <- ev
149+ }
150+ }
151+ dm .watchers [watcher ] = true
152+ dm .watchersMutex .Unlock ()
153+ }()
145154 return watcher , nil
146155}
147156
@@ -188,19 +197,35 @@ func (dm *DiscoveryManager) feeder() {
188197}
189198
190199func (dm * DiscoveryManager ) cacheEvent (ev * discovery.Event ) {
191- // XXX: TODO
200+ cache := dm .watchersCache [ev .DiscoveryID ]
201+ if cache == nil {
202+ cache = map [string ]* discovery.Event {}
203+ dm .watchersCache [ev .DiscoveryID ] = cache
204+ }
205+
206+ eventID := ev .Port .Address + "|" + ev .Port .Protocol
207+ switch ev .Type {
208+ case "add" :
209+ cache [eventID ] = ev
210+ case "remove" :
211+ delete (cache , eventID )
212+ default :
213+ logrus .Errorf ("Unhandled event from discovery: %s" , ev .Type )
214+ return
215+ }
192216}
193217
194218// List return the current list of ports detected from all discoveries
195219func (dm * DiscoveryManager ) List () []* discovery.Port {
196220 dm .Start ()
197221
198- // XXX: Cache ports and return them
199- dm .discoveriesMutex .Lock ()
200- defer dm .discoveriesMutex .Unlock ()
201222 res := []* discovery.Port {}
202- for _ , d := range dm .discoveries {
203- res = append (res , d .ListCachedPorts ()... )
223+ dm .watchersMutex .Lock ()
224+ defer dm .watchersMutex .Unlock ()
225+ for _ , cache := range dm .watchersCache {
226+ for _ , ev := range cache {
227+ res = append (res , ev .Port )
228+ }
204229 }
205230 return res
206231}
0 commit comments