diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go index 7391de4ea..76e628710 100644 --- a/internal/p2p/peermanager.go +++ b/internal/p2p/peermanager.go @@ -575,7 +575,8 @@ func (m *PeerManager) Dialed(address NodeAddress) error { return fmt.Errorf("rejecting connection to self (%v)", address.NodeID) } if m.connected[address.NodeID] { - return fmt.Errorf("peer %v is already connected", address.NodeID) + dupeConnectionErr := fmt.Errorf("cant dial, peer=%q is already connected", address.NodeID) + return dupeConnectionErr } if m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) { if upgradeFromPeer == "" || len(m.connected) >= @@ -642,7 +643,8 @@ func (m *PeerManager) Accepted(peerID types.NodeID) error { return fmt.Errorf("rejecting connection from self (%v)", peerID) } if m.connected[peerID] { - return fmt.Errorf("peer %q is already connected", peerID) + dupeConnectionErr := fmt.Errorf("can't accept, peer=%q is already connected", peerID) + return dupeConnectionErr } if m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected)+int(m.options.MaxConnectedUpgrade) { diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go index 541d395a7..77c959c21 100644 --- a/internal/p2p/pex/reactor.go +++ b/internal/p2p/pex/reactor.go @@ -179,9 +179,6 @@ func (r *Reactor) processPexCh(ctx context.Context, pexCh *p2p.Channel) { // channel just because of an error here? } - // Note we do not update the poll timer upon making a request, only - // when we receive an update that updates our priors. - case envelope, ok := <-incoming: if !ok { return // channel closed @@ -311,7 +308,7 @@ func (r *Reactor) sendRequestForPeers(ctx context.Context, pexCh *p2p.Channel) e defer r.mtx.Unlock() if len(r.availablePeers) == 0 { // no peers are available - r.logger.Debug("no available peers to send a PEX request to (retrying)") + r.logger.Error("no available peers to send a PEX request to (retrying)") return nil } diff --git a/internal/p2p/router.go b/internal/p2p/router.go index d821b1a77..2af742fa2 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -8,6 +8,7 @@ import ( "math/rand" "net" "runtime" + "strings" "sync" "time" @@ -546,6 +547,10 @@ func (r *Router) openConnection(ctx context.Context, conn Connection) { } if err := r.runWithPeerMutex(func() error { return r.peerManager.Accepted(peerInfo.NodeID) }); err != nil { + // If peer is trying to reconnect, error and let it reconnect + if strings.Contains(err.Error(), "is already connected") { + r.peerManager.Errored(peerInfo.NodeID, err) + } r.logger.Error("failed to accept connection", "op", "incoming/accepted", "peer", peerInfo.NodeID, "err", err) return @@ -637,6 +642,11 @@ func (r *Router) connectPeer(ctx context.Context, address NodeAddress) { } if err := r.runWithPeerMutex(func() error { return r.peerManager.Dialed(address) }); err != nil { + // If peer is trying to reconnect, fail it and let it reconnect + if strings.Contains(err.Error(), "is already connected") { + r.peerManager.Disconnected(ctx, address.NodeID) + } + r.logger.Error("failed to dial peer", "op", "outgoing/dialing", "peer", address.NodeID, "err", err) conn.Close()