@@ -18,6 +18,7 @@ package discoverymanager
1818import (
1919 "fmt"
2020 "sync"
21+ "time"
2122
2223 "github.com/arduino/arduino-cli/arduino/discovery"
2324 "github.com/arduino/arduino-cli/i18n"
@@ -83,7 +84,12 @@ func (dm *DiscoveryManager) Start() {
8384 return
8485 }
8586
86- go dm .feeder ()
87+ go func () {
88+ // Feed all watchers with data coming from the discoveries
89+ for ev := range dm .feed {
90+ dm .feedEvent (ev )
91+ }
92+ }()
8793
8894 var wg sync.WaitGroup
8995 for _ , d := range dm .discoveries {
@@ -136,13 +142,13 @@ func (dm *DiscoveryManager) Watch() (*PortWatcher, error) {
136142 dm .Start ()
137143
138144 watcher := & PortWatcher {
139- feed : make (chan * discovery.Event ),
145+ feed : make (chan * discovery.Event , 10 ),
140146 }
141147 watcher .closeCB = func () {
142148 dm .watchersMutex .Lock ()
143149 delete (dm .watchers , watcher )
144- dm .watchersMutex .Unlock ()
145150 close (watcher .feed )
151+ dm .watchersMutex .Unlock ()
146152 }
147153 go func () {
148154 dm .watchersMutex .Lock ()
@@ -180,44 +186,43 @@ func (dm *DiscoveryManager) startDiscovery(d *discovery.PluggableDiscovery) (dis
180186 return nil
181187}
182188
183- func (dm * DiscoveryManager ) feeder () {
184- // Feed all watchers with data coming from the discoveries
185- for ev := range dm .feed {
186- dm .watchersMutex .Lock ()
187- for watcher := range dm .watchers {
188- select {
189- case watcher .feed <- ev :
190- // OK
191- default :
192- // If the watcher is not able to process event fast enough
193- // remove the watcher from the list of watchers
194- go watcher .Close ()
195- }
189+ func (dm * DiscoveryManager ) feedEvent (ev * discovery.Event ) {
190+ dm .watchersMutex .Lock ()
191+ defer dm .watchersMutex .Unlock ()
192+
193+ if ev .Type == "stop" {
194+ // Remove all the cached events for the terminating discovery
195+ delete (dm .watchersCache , ev .DiscoveryID )
196+ return
197+ }
198+
199+ // Send the event to all watchers
200+ for watcher := range dm .watchers {
201+ select {
202+ case watcher .feed <- ev :
203+ // OK
204+ case <- time .After (time .Millisecond * 500 ):
205+ // If the watcher is not able to process event fast enough
206+ // remove the watcher from the list of watchers
207+ logrus .Info ("Watcher is not able to process events fast enough, removing it from the list of watchers" )
208+ delete (dm .watchers , watcher )
196209 }
197- dm .cacheEvent (ev )
198- dm .watchersMutex .Unlock ()
199210 }
200- }
201211
202- func ( dm * DiscoveryManager ) cacheEvent ( ev * discovery. Event ) {
212+ // Cache the event for the discovery
203213 cache := dm .watchersCache [ev .DiscoveryID ]
204214 if cache == nil {
205215 cache = map [string ]* discovery.Event {}
206216 dm .watchersCache [ev .DiscoveryID ] = cache
207217 }
208-
209218 eventID := ev .Port .Address + "|" + ev .Port .Protocol
210219 switch ev .Type {
211220 case "add" :
212221 cache [eventID ] = ev
213222 case "remove" :
214223 delete (cache , eventID )
215- case "quit" :
216- // Remove all the events for this discovery
217- delete (dm .watchersCache , ev .DiscoveryID )
218224 default :
219225 logrus .Errorf ("Unhandled event from discovery: %s" , ev .Type )
220- return
221226 }
222227}
223228
0 commit comments