diff --git a/dot/network/discovery.go b/dot/network/discovery.go index 35f42ad712..6c3c6b1a54 100644 --- a/dot/network/discovery.go +++ b/dot/network/discovery.go @@ -75,13 +75,15 @@ func (d *discovery) start() error { // get all currently connected peers and use them to bootstrap the DHT peers := d.h.Network().Peers() + t := time.NewTicker(startDHTTimeout) + defer t.Stop() for { if len(peers) > 0 { break } select { - case <-time.After(startDHTTimeout): + case <-t.C: logger.Debug("no peers yet, waiting to start DHT...") // wait for peers to connect before starting DHT, otherwise DHT bootstrap nodes // will be empty and we will fail to fill the routing table @@ -169,11 +171,13 @@ func (d *discovery) advertise() { } func (d *discovery) checkPeerCount() { + t := time.NewTicker(connectToPeersTimeout) + defer t.Stop() for { select { case <-d.ctx.Done(): return - case <-time.After(connectToPeersTimeout): + case <-t.C: if len(d.h.Network().Peers()) > d.minPeers { continue } diff --git a/dot/network/sync.go b/dot/network/sync.go index 855e4d6a25..36de6e405d 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -88,7 +88,9 @@ const ( badPeerThreshold int = -2 protectedPeerThreshold int = 7 - defaultSlotDuration = time.Second * 6 + defaultSlotDuration = time.Second * 6 + defaultHandleResponseQueueDuration = time.Second + defaultPrunePeersDuration = time.Second * 30 ) var ( @@ -132,26 +134,30 @@ type syncQueue struct { goal int64 // goal block number we are trying to sync to currStart, currEnd int64 // the start and end of the BlockResponse we are currently handling; 0 and 0 if we are not currently handling any - benchmarker *syncBenchmarker + benchmarker *syncBenchmarker + handleResponseQueueDuration time.Duration + prunePeersDuration time.Duration } func newSyncQueue(s *Service) *syncQueue { ctx, cancel := context.WithCancel(s.ctx) return &syncQueue{ - s: s, - slotDuration: defaultSlotDuration, - ctx: ctx, - cancel: cancel, - peerScore: new(sync.Map), - requestData: new(sync.Map), - requestDataByHash: new(sync.Map), - justificationRequestData: new(sync.Map), - requestCh: make(chan *syncRequest, blockRequestBufferSize), - responses: []*types.BlockData{}, - responseCh: make(chan []*types.BlockData, blockResponseBufferSize), - benchmarker: newSyncBenchmarker(), - buf: make([]byte, maxBlockResponseSize), + s: s, + slotDuration: defaultSlotDuration, + ctx: ctx, + cancel: cancel, + peerScore: new(sync.Map), + requestData: new(sync.Map), + requestDataByHash: new(sync.Map), + justificationRequestData: new(sync.Map), + requestCh: make(chan *syncRequest, blockRequestBufferSize), + responses: []*types.BlockData{}, + responseCh: make(chan []*types.BlockData, blockResponseBufferSize), + benchmarker: newSyncBenchmarker(), + buf: make([]byte, maxBlockResponseSize), + handleResponseQueueDuration: defaultHandleResponseQueueDuration, + prunePeersDuration: defaultPrunePeersDuration, } } @@ -176,10 +182,12 @@ func (q *syncQueue) syncAtHead() { q.s.syncer.SetSyncing(true) q.s.noGossip = true // don't gossip messages until we're at the head + t := time.NewTicker(q.slotDuration * 2) + defer t.Stop() for { select { // sleep for average block time TODO: make this configurable from slot duration - case <-time.After(q.slotDuration * 2): + case <-t.C: case <-q.ctx.Done(): return } @@ -214,9 +222,11 @@ func (q *syncQueue) syncAtHead() { } func (q *syncQueue) handleResponseQueue() { + t := time.NewTicker(q.handleResponseQueueDuration) + defer t.Stop() for { select { - case <-time.After(time.Second): + case <-t.C: case <-q.ctx.Done(): return } @@ -260,9 +270,11 @@ func (q *syncQueue) handleResponseQueue() { // prune peers with low score and connect to new peers func (q *syncQueue) prunePeers() { + t := time.NewTicker(q.prunePeersDuration) + defer t.Stop() for { select { - case <-time.After(time.Second * 30): + case <-t.C: case <-q.ctx.Done(): return } diff --git a/dot/telemetry/telemetry.go b/dot/telemetry/telemetry.go index cd11d65207..22ba7286ef 100644 --- a/dot/telemetry/telemetry.go +++ b/dot/telemetry/telemetry.go @@ -40,9 +40,10 @@ type Message struct { // Handler struct for holding telemetry related things type Handler struct { - msg chan Message - connections []*telemetryConnection - log log.Logger + msg chan Message + connections []*telemetryConnection + log log.Logger + sendMessageTimeout time.Duration } // KeyValue object to hold key value pairs used in telemetry messages @@ -56,14 +57,17 @@ var ( handlerInstance *Handler ) +const defaultMessageTimeout = time.Second + // GetInstance singleton pattern to for accessing TelemetryHandler func GetInstance() *Handler { //nolint if handlerInstance == nil { once.Do( func() { handlerInstance = &Handler{ - msg: make(chan Message, 256), - log: log.New("pkg", "telemetry"), + msg: make(chan Message, 256), + log: log.New("pkg", "telemetry"), + sendMessageTimeout: defaultMessageTimeout, } go handlerInstance.startListening() }) @@ -109,10 +113,12 @@ func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) { // SendMessage sends Message to connected telemetry listeners func (h *Handler) SendMessage(msg *Message) error { + t := time.NewTicker(h.sendMessageTimeout) + defer t.Stop() select { case h.msg <- *msg: - case <-time.After(time.Second * 1): + case <-t.C: return errors.New("timeout sending message") } return nil diff --git a/lib/grandpa/network.go b/lib/grandpa/network.go index a5c6c90556..e9f8d441e4 100644 --- a/lib/grandpa/network.go +++ b/lib/grandpa/network.go @@ -177,11 +177,13 @@ func (s *Service) handleNetworkMessage(from peer.ID, msg NotificationsMessage) ( } func (s *Service) sendNeighbourMessage() { + t := time.NewTicker(neighbourMessageInterval) + defer t.Stop() for { select { case <-s.ctx.Done(): return - case <-time.After(neighbourMessageInterval): + case <-t.C: if s.neighbourMessage == nil { continue }