From 58864d0ef9ab762b79660a9d83e545235e325db9 Mon Sep 17 00:00:00 2001 From: Brandon Weng <18161326+BrandonWeng@users.noreply.github.com> Date: Tue, 13 Dec 2022 11:15:26 -0500 Subject: [PATCH 1/6] Error if there's dupe connections --- internal/p2p/peermanager.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go index 7391de4ea..df01c7a32 100644 --- a/internal/p2p/peermanager.go +++ b/internal/p2p/peermanager.go @@ -575,6 +575,8 @@ func (m *PeerManager) Dialed(address NodeAddress) error { return fmt.Errorf("rejecting connection to self (%v)", address.NodeID) } if m.connected[address.NodeID] { + dupeConnectionErr := fmt.Errorf("peer %q is already connected", address.NodeID) + m.Errored(address.NodeID, dupeConnectionErr) return fmt.Errorf("peer %v is already connected", address.NodeID) } if m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) { @@ -642,7 +644,9 @@ func (m *PeerManager) Accepted(peerID types.NodeID) error { return fmt.Errorf("rejecting connection from self (%v)", peerID) } if m.connected[peerID] { - return fmt.Errorf("peer %q is already connected", peerID) + dupeConnectionErr := fmt.Errorf("peer %q is already connected", peerID) + m.Errored(peerID, dupeConnectionErr) + return dupeConnectionErr } if m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected)+int(m.options.MaxConnectedUpgrade) { From 470a68cccd7d74376f9afa7be08515568e918194 Mon Sep 17 00:00:00 2001 From: Brandon Weng <18161326+BrandonWeng@users.noreply.github.com> Date: Tue, 13 Dec 2022 12:58:35 -0500 Subject: [PATCH 2/6] Go Routine --- internal/p2p/peermanager.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go index df01c7a32..b68d3bbf6 100644 --- a/internal/p2p/peermanager.go +++ b/internal/p2p/peermanager.go @@ -576,8 +576,8 @@ func (m *PeerManager) Dialed(address NodeAddress) error { } if m.connected[address.NodeID] { dupeConnectionErr := fmt.Errorf("peer %q is already connected", address.NodeID) - m.Errored(address.NodeID, dupeConnectionErr) - return fmt.Errorf("peer %v is already connected", address.NodeID) + go m.Errored(address.NodeID, dupeConnectionErr) + return dupeConnectionErr } if m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) { if upgradeFromPeer == "" || len(m.connected) >= @@ -645,7 +645,7 @@ func (m *PeerManager) Accepted(peerID types.NodeID) error { } if m.connected[peerID] { dupeConnectionErr := fmt.Errorf("peer %q is already connected", peerID) - m.Errored(peerID, dupeConnectionErr) + go m.Errored(peerID, dupeConnectionErr) return dupeConnectionErr } if m.options.MaxConnected > 0 && From 40a876c67d1bfff3481e9f01563c9de35ded90af Mon Sep 17 00:00:00 2001 From: Brandon Weng <18161326+BrandonWeng@users.noreply.github.com> Date: Tue, 13 Dec 2022 15:01:15 -0500 Subject: [PATCH 3/6] evict --- internal/p2p/peermanager.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go index b68d3bbf6..32a2dd7d6 100644 --- a/internal/p2p/peermanager.go +++ b/internal/p2p/peermanager.go @@ -575,8 +575,8 @@ func (m *PeerManager) Dialed(address NodeAddress) error { return fmt.Errorf("rejecting connection to self (%v)", address.NodeID) } if m.connected[address.NodeID] { - dupeConnectionErr := fmt.Errorf("peer %q is already connected", address.NodeID) - go m.Errored(address.NodeID, dupeConnectionErr) + dupeConnectionErr := fmt.Errorf("cant dial, peer=%q is already connected", address.NodeID) + m.evictPeer(address.NodeID, dupeConnectionErr) return dupeConnectionErr } if m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) { @@ -644,8 +644,8 @@ func (m *PeerManager) Accepted(peerID types.NodeID) error { return fmt.Errorf("rejecting connection from self (%v)", peerID) } if m.connected[peerID] { - dupeConnectionErr := fmt.Errorf("peer %q is already connected", peerID) - go m.Errored(peerID, dupeConnectionErr) + dupeConnectionErr := fmt.Errorf("can't accept, peer=%q is already connected", peerID) + m.evictPeer(peerID, dupeConnectionErr) return dupeConnectionErr } if m.options.MaxConnected > 0 && @@ -796,6 +796,10 @@ func (m *PeerManager) Errored(peerID types.NodeID, err error) { m.mtx.Lock() defer m.mtx.Unlock() + m.evictPeer(peerID, err) +} + +func (m *PeerManager) evictPeer(peerID types.NodeID, err error) { if m.connected[peerID] { m.evict[peerID] = true } From d4beba2dfc8be2fea80918f7ab5e69d0252eb600 Mon Sep 17 00:00:00 2001 From: Brandon Weng <18161326+BrandonWeng@users.noreply.github.com> Date: Tue, 13 Dec 2022 17:15:21 -0500 Subject: [PATCH 4/6] Disconnect and error instead --- internal/p2p/peermanager.go | 6 ------ internal/p2p/pex/reactor.go | 2 +- internal/p2p/router.go | 10 ++++++++++ 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go index 32a2dd7d6..76e628710 100644 --- a/internal/p2p/peermanager.go +++ b/internal/p2p/peermanager.go @@ -576,7 +576,6 @@ func (m *PeerManager) Dialed(address NodeAddress) error { } if m.connected[address.NodeID] { dupeConnectionErr := fmt.Errorf("cant dial, peer=%q is already connected", address.NodeID) - m.evictPeer(address.NodeID, dupeConnectionErr) return dupeConnectionErr } if m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) { @@ -645,7 +644,6 @@ func (m *PeerManager) Accepted(peerID types.NodeID) error { } if m.connected[peerID] { dupeConnectionErr := fmt.Errorf("can't accept, peer=%q is already connected", peerID) - m.evictPeer(peerID, dupeConnectionErr) return dupeConnectionErr } if m.options.MaxConnected > 0 && @@ -796,10 +794,6 @@ func (m *PeerManager) Errored(peerID types.NodeID, err error) { m.mtx.Lock() defer m.mtx.Unlock() - m.evictPeer(peerID, err) -} - -func (m *PeerManager) evictPeer(peerID types.NodeID, err error) { if m.connected[peerID] { m.evict[peerID] = true } diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go index 541d395a7..a701bec29 100644 --- a/internal/p2p/pex/reactor.go +++ b/internal/p2p/pex/reactor.go @@ -311,7 +311,7 @@ func (r *Reactor) sendRequestForPeers(ctx context.Context, pexCh *p2p.Channel) e defer r.mtx.Unlock() if len(r.availablePeers) == 0 { // no peers are available - r.logger.Debug("no available peers to send a PEX request to (retrying)") + r.logger.Error("no available peers to send a PEX request to (retrying)") return nil } diff --git a/internal/p2p/router.go b/internal/p2p/router.go index d821b1a77..2e645d1d4 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -8,6 +8,7 @@ import ( "math/rand" "net" "runtime" + "strings" "sync" "time" @@ -546,6 +547,10 @@ func (r *Router) openConnection(ctx context.Context, conn Connection) { } if err := r.runWithPeerMutex(func() error { return r.peerManager.Accepted(peerInfo.NodeID) }); err != nil { + // If peer is trying to reconnect, fail it and let it reconnect + if strings.Contains(err.Error(), "is already connected") { + r.peerManager.Errored(peerInfo.NodeID, err) + } r.logger.Error("failed to accept connection", "op", "incoming/accepted", "peer", peerInfo.NodeID, "err", err) return @@ -637,6 +642,11 @@ func (r *Router) connectPeer(ctx context.Context, address NodeAddress) { } if err := r.runWithPeerMutex(func() error { return r.peerManager.Dialed(address) }); err != nil { + // If peer is trying to reconnect, fail it and let it reconnect + if strings.Contains(err.Error(), "is already connected") { + r.peerManager.Disconnected(ctx, address.NodeID) + } + r.logger.Error("failed to dial peer", "op", "outgoing/dialing", "peer", address.NodeID, "err", err) conn.Close() From d90d6fa21dfcd8e76bc6f8e670528032596ab1b6 Mon Sep 17 00:00:00 2001 From: Brandon Weng <18161326+BrandonWeng@users.noreply.github.com> Date: Tue, 13 Dec 2022 17:56:13 -0500 Subject: [PATCH 5/6] panic instead --- internal/p2p/pex/reactor.go | 6 +----- internal/p2p/router.go | 2 +- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go index a701bec29..63e1d8752 100644 --- a/internal/p2p/pex/reactor.go +++ b/internal/p2p/pex/reactor.go @@ -179,9 +179,6 @@ func (r *Reactor) processPexCh(ctx context.Context, pexCh *p2p.Channel) { // channel just because of an error here? } - // Note we do not update the poll timer upon making a request, only - // when we receive an update that updates our priors. - case envelope, ok := <-incoming: if !ok { return // channel closed @@ -311,8 +308,7 @@ func (r *Reactor) sendRequestForPeers(ctx context.Context, pexCh *p2p.Channel) e defer r.mtx.Unlock() if len(r.availablePeers) == 0 { // no peers are available - r.logger.Error("no available peers to send a PEX request to (retrying)") - return nil + panic("no available peers to send a PEX request to") } // Select an arbitrary peer from the available set. diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 2e645d1d4..2af742fa2 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -547,7 +547,7 @@ func (r *Router) openConnection(ctx context.Context, conn Connection) { } if err := r.runWithPeerMutex(func() error { return r.peerManager.Accepted(peerInfo.NodeID) }); err != nil { - // If peer is trying to reconnect, fail it and let it reconnect + // If peer is trying to reconnect, error and let it reconnect if strings.Contains(err.Error(), "is already connected") { r.peerManager.Errored(peerInfo.NodeID, err) } From 846e8b9e6e4bfb29d65d51100d4c3a39c3374d72 Mon Sep 17 00:00:00 2001 From: Brandon Weng <18161326+BrandonWeng@users.noreply.github.com> Date: Tue, 13 Dec 2022 23:08:01 -0500 Subject: [PATCH 6/6] revert panic --- internal/p2p/pex/reactor.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go index 63e1d8752..77c959c21 100644 --- a/internal/p2p/pex/reactor.go +++ b/internal/p2p/pex/reactor.go @@ -308,7 +308,8 @@ func (r *Reactor) sendRequestForPeers(ctx context.Context, pexCh *p2p.Channel) e defer r.mtx.Unlock() if len(r.availablePeers) == 0 { // no peers are available - panic("no available peers to send a PEX request to") + r.logger.Error("no available peers to send a PEX request to (retrying)") + return nil } // Select an arbitrary peer from the available set.