diff --git a/daemon/cc_info_timer.go b/daemon/cc_info_timer.go index 31c0f13..43d7c3f 100644 --- a/daemon/cc_info_timer.go +++ b/daemon/cc_info_timer.go @@ -45,6 +45,9 @@ type CCInfoTimerService struct { stopChan chan struct{} wg sync.WaitGroup + // Guards concurrent fetchRateLimit goroutines + rateLimitFetchMu sync.Mutex + // Git info cache (per working directory) gitCache map[string]*GitCacheEntry @@ -160,7 +163,15 @@ func (s *CCInfoTimerService) timerLoop() { // Fetch immediately on start s.fetchActiveRanges(context.Background()) s.fetchGitInfo() - go s.fetchRateLimit(context.Background()) + go func() { + if !s.rateLimitFetchMu.TryLock() { + return + } + defer s.rateLimitFetchMu.Unlock() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + s.fetchRateLimit(ctx) + }() go s.fetchUserProfile(context.Background()) for { @@ -175,7 +186,15 @@ func (s *CCInfoTimerService) timerLoop() { } s.fetchActiveRanges(context.Background()) s.fetchGitInfo() - go s.fetchRateLimit(context.Background()) + go func() { + if !s.rateLimitFetchMu.TryLock() { + return + } + defer s.rateLimitFetchMu.Unlock() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + s.fetchRateLimit(ctx) + }() case <-s.stopChan: return diff --git a/daemon/chan.go b/daemon/chan.go index f6a1f67..2a89874 100644 --- a/daemon/chan.go +++ b/daemon/chan.go @@ -2,7 +2,9 @@ package daemon import ( "context" + "fmt" "sync" + "time" "github.com/lithammer/shortuuid/v3" "github.com/pkg/errors" @@ -344,6 +346,9 @@ func (s *subscriber) sendMessageToSubscriber(msg *message.Message, logFields wat ctx, cancelCtx := context.WithCancel(s.ctx) defer cancelCtx() + const maxRetries = 3 + retryCount := 0 + SendToSubscriber: for { // copy the message to prevent ack/nack propagation to other consumers @@ -371,8 +376,20 @@ SendToSubscriber: s.logger.Trace("Message acked", logFields) return case <-msgToSend.Nacked(): - s.logger.Trace("Nack received, resending message", logFields) - continue SendToSubscriber + retryCount++ + if retryCount > maxRetries { + s.logger.Error("Max retries reached, dropping message", errors.New("max retries reached"), logFields) + return + } + backoff := time.Duration(100< 0 } return info diff --git a/daemon/socket.go b/daemon/socket.go index e7cfde1..25874f8 100644 --- a/daemon/socket.go +++ b/daemon/socket.go @@ -127,21 +127,22 @@ func (p *SocketHandler) Stop() { func (p *SocketHandler) acceptConnections() { for { - select { - case <-p.stopChan: - return - default: - conn, err := p.listener.Accept() - if err != nil { - continue + conn, err := p.listener.Accept() + if err != nil { + select { + case <-p.stopChan: + return + default: } - go p.handleConnection(conn) + continue } + go p.handleConnection(conn) } } func (p *SocketHandler) handleConnection(conn net.Conn) { defer conn.Close() + conn.SetDeadline(time.Now().Add(5 * time.Second)) decoder := json.NewDecoder(conn) var msg SocketMessage if err := decoder.Decode(&msg); err != nil {