Skip to content
This repository was archived by the owner on Jan 20, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
0fd206e
wip
pompon0 Aug 18, 2025
5d7abc8
almost
pompon0 Aug 18, 2025
41458d9
seems fine
pompon0 Aug 18, 2025
f787ad4
should work
pompon0 Aug 18, 2025
54373ba
fix
pompon0 Aug 18, 2025
4cfdd55
tc := tc
pompon0 Aug 18, 2025
170eb67
fixed a cleanup in test
pompon0 Aug 18, 2025
f099af6
refactored baseService
pompon0 Aug 20, 2025
fe5aa98
added task spawning
pompon0 Aug 20, 2025
7cdf7f9
example refactor: ticker
pompon0 Aug 20, 2025
4ead621
Merge remote-tracking branch 'origin/main' into gprusak-refactor
pompon0 Aug 20, 2025
47e476b
Merge branch 'gprusak-refactor' into gprusak-refactor2
pompon0 Aug 20, 2025
f200d99
go vet
pompon0 Aug 20, 2025
e5321d5
Merge branch 'gprusak-refactor' into gprusak-refactor2
pompon0 Aug 20, 2025
73be0e7
Merge remote-tracking branch 'origin/main' into gprusak-refactor2
pompon0 Aug 21, 2025
7343299
Merge branch 'main' into gprusak-refactor2
pompon0 Aug 22, 2025
8ea7c2f
Merge branch 'main' into gprusak-refactor2
pompon0 Aug 25, 2025
4d0ed31
Merge branch 'main' into gprusak-refactor2
pompon0 Aug 26, 2025
f609e96
2-heap queue
pompon0 Aug 26, 2025
e58e0ee
almost done
pompon0 Aug 26, 2025
257fa17
binary compiles
pompon0 Aug 26, 2025
d42991d
tests wip
pompon0 Aug 26, 2025
910fe80
before making OpenChannel private
pompon0 Aug 26, 2025
69d692f
made the channels non-cancellable instead
pompon0 Aug 26, 2025
b68a532
p2p tests pass, need more tests for the queue
pompon0 Aug 26, 2025
f840a26
internal tests compile
pompon0 Aug 26, 2025
9d210f5
compensated for stupid descriptors
pompon0 Aug 26, 2025
0fefcc8
formatting
pompon0 Aug 27, 2025
85b8b31
Merge branch 'gprusak-refactor2' into gprusak-refactor3c
pompon0 Aug 27, 2025
dc494a7
moved size quirk outside of simple-queue
pompon0 Aug 27, 2025
1c76fb6
queue tests
pompon0 Aug 27, 2025
b8f730c
metrics for dropped messages
pompon0 Aug 27, 2025
4f3f056
fix
pompon0 Aug 27, 2025
2de60d9
Merge remote-tracking branch 'origin/main' into gprusak-refactor3c
pompon0 Aug 28, 2025
e8589df
race condition fix
pompon0 Aug 28, 2025
7592ac8
fix
pompon0 Aug 28, 2025
4f0b937
snapshot
pompon0 Aug 29, 2025
36c939f
WIP
pompon0 Aug 29, 2025
66111f8
fixed address tests
pompon0 Aug 29, 2025
2e83f6e
peermanager tests
pompon0 Aug 29, 2025
c838558
wip
pompon0 Aug 29, 2025
0d858a7
pex kinda fixed
pompon0 Sep 1, 2025
9b43f06
p2p tests pass
pompon0 Sep 1, 2025
9da3bfc
test fix
pompon0 Sep 1, 2025
19a29cb
fmt
pompon0 Sep 1, 2025
05dbdb7
tidy
pompon0 Sep 1, 2025
30272db
fixed flaky test
pompon0 Sep 1, 2025
011db9b
Merge branch 'main' into gprusak-refactor3c
pompon0 Sep 1, 2025
4dc086a
fixed panic in handshakePeer
pompon0 Sep 2, 2025
0effa48
fixed test
pompon0 Sep 2, 2025
9392ba7
removed log
pompon0 Sep 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@ require (
go.uber.org/zap v1.21.0 // indirect
golang.org/x/exp/typeparams v0.0.0-20220218215828-6cf2b201936e // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/term v0.27.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
Expand Down Expand Up @@ -254,5 +253,6 @@ require (
go.opentelemetry.io/otel v1.9.0
go.opentelemetry.io/otel/sdk v1.9.0
go.opentelemetry.io/otel/trace v1.9.0
golang.org/x/sys v0.28.0
google.golang.org/protobuf v1.28.0
)
12 changes: 1 addition & 11 deletions internal/blocksync/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func setup(
}

chDesc := &p2p.ChannelDescriptor{ID: BlockSyncChannel, MessageType: new(bcproto.Message)}
rts.blockSyncChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc)
rts.blockSyncChannels = rts.network.MakeChannelsNoCleanup(t, chDesc)

i := 0
for nodeID := range rts.network.Nodes {
Expand Down Expand Up @@ -101,10 +101,7 @@ func setup(
func makeReactor(
ctx context.Context,
t *testing.T,
nodeID types.NodeID,
genDoc *types.GenesisDoc,
privVal types.PrivValidator,
channelCreator p2p.ChannelCreator,
peerEvents p2p.PeerEventSubscriber,
peerManager *p2p.PeerManager,
restartChan chan struct{},
Expand Down Expand Up @@ -188,10 +185,6 @@ func (rts *reactorTestSuite) addNode(
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])

chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
return rts.blockSyncChannels[nodeID], nil
}

peerEvents := func(ctx context.Context) *p2p.PeerUpdates { return rts.peerUpdates[nodeID] }
restartChan := make(chan struct{})
remediationConfig := config.DefaultSelfRemediationConfig()
Expand All @@ -200,10 +193,7 @@ func (rts *reactorTestSuite) addNode(
reactor := makeReactor(
ctx,
t,
nodeID,
genDoc,
privVal,
chCreator,
peerEvents,
rts.network.Nodes[nodeID].PeerManager,
restartChan,
Expand Down
11 changes: 6 additions & 5 deletions internal/consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math"
"math/rand"
"os"
"sync"
Expand Down Expand Up @@ -57,7 +58,7 @@ func chDesc(chID p2p.ChannelID, size int) *p2p.ChannelDescriptor {
return &p2p.ChannelDescriptor{
ID: chID,
MessageType: new(tmcons.Message),
RecvBufferCapacity: size,
RecvBufferCapacity: int(math.Sqrt(float64(size)) + 1),
}
}

Expand All @@ -78,10 +79,10 @@ func setup(
blocksyncSubs: make(map[types.NodeID]eventbus.Subscription, numNodes),
}

rts.stateChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(StateChannel, size))
rts.dataChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(DataChannel, size))
rts.voteChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(VoteChannel, size))
rts.voteSetBitsChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(VoteSetBitsChannel, size))
rts.stateChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(StateChannel, size))
rts.dataChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(DataChannel, size))
rts.voteChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(VoteChannel, size))
rts.voteSetBitsChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(VoteSetBitsChannel, size))

ctx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)
Expand Down
8 changes: 6 additions & 2 deletions internal/evidence/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,12 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store) *reactorTe
peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numStateStores),
}

chDesc := &p2p.ChannelDescriptor{ID: evidence.EvidenceChannel, MessageType: new(tmproto.Evidence)}
rts.evidenceChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc)
chDesc := &p2p.ChannelDescriptor{
ID: evidence.EvidenceChannel,
MessageType: new(tmproto.Evidence),
RecvBufferCapacity: 10,
}
rts.evidenceChannels = rts.network.MakeChannelsNoCleanup(t, chDesc)
require.Len(t, rts.network.RandomNode().PeerManager.Peers(), 0)

idx := 0
Expand Down
4 changes: 2 additions & 2 deletions internal/mempool/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNode
}

chDesc := GetChannelDescriptor(cfg.Mempool)
rts.mempoolChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc)
rts.mempoolChannels = rts.network.MakeChannelsNoCleanup(t, chDesc)

for nodeID := range rts.network.Nodes {
rts.kvstores[nodeID] = kvstore.NewApplication()
Expand Down Expand Up @@ -174,7 +174,7 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) {
go primaryReactor.broadcastTxRoutine(ctx, secondary, rts.mempoolChannels[primary])

wg := &sync.WaitGroup{}
for i := 0; i < 50; i++ {
for range 50 {
next := &WrappedTx{}
wg.Add(1)
go func() {
Expand Down
16 changes: 10 additions & 6 deletions internal/p2p/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net"
"net/netip"
"net/url"
"regexp"
"strconv"
Expand Down Expand Up @@ -97,7 +98,7 @@ func ParseNodeAddress(urlString string) (NodeAddress, error) {

// Resolve resolves a NodeAddress into a set of Endpoints, by expanding
// out a DNS hostname to IP addresses.
func (a NodeAddress) Resolve(ctx context.Context) ([]*Endpoint, error) {
func (a NodeAddress) Resolve(ctx context.Context) ([]Endpoint, error) {
if a.Protocol == "" {
return nil, errors.New("address has no protocol")
}
Expand All @@ -109,7 +110,7 @@ func (a NodeAddress) Resolve(ctx context.Context) ([]*Endpoint, error) {
if a.NodeID == "" {
return nil, errors.New("local address has no node ID")
}
return []*Endpoint{{
return []Endpoint{{
Protocol: a.Protocol,
Path: string(a.NodeID),
}}, nil
Expand All @@ -119,12 +120,15 @@ func (a NodeAddress) Resolve(ctx context.Context) ([]*Endpoint, error) {
if err != nil {
return nil, err
}
endpoints := make([]*Endpoint, len(ips))
endpoints := make([]Endpoint, len(ips))
for i, ip := range ips {
endpoints[i] = &Endpoint{
ip, ok := netip.AddrFromSlice(ip)
if !ok {
return nil, fmt.Errorf("LookupIP returned invalid IP %q", ip)
}
endpoints[i] = Endpoint{
Protocol: a.Protocol,
IP: ip,
Port: a.Port,
Addr: netip.AddrPortFrom(ip, a.Port),
Path: a.Path,
}
}
Expand Down
71 changes: 29 additions & 42 deletions internal/p2p/address_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package p2p_test

import (
"net"
"net/netip"
"strings"
"testing"

"github.com/stretchr/testify/require"

"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/libs/utils/require"
"github.com/tendermint/tendermint/libs/utils/tcp"
"github.com/tendermint/tendermint/types"
)

Expand Down Expand Up @@ -202,61 +202,61 @@ func TestNodeAddress_Resolve(t *testing.T) {

testcases := []struct {
address p2p.NodeAddress
expect *p2p.Endpoint
expect p2p.Endpoint
ok bool
}{
// Valid networked addresses (with hostname).
{
p2p.NodeAddress{Protocol: "tcp", Hostname: "127.0.0.1", Port: 80, Path: "/path"},
&p2p.Endpoint{Protocol: "tcp", IP: net.IPv4(127, 0, 0, 1), Port: 80, Path: "/path"},
p2p.Endpoint{Protocol: "tcp", Addr: netip.AddrPortFrom(tcp.IPv4Loopback(), 80), Path: "/path"},
true,
},
{
p2p.NodeAddress{Protocol: "tcp", Hostname: "127.0.0.1"},
&p2p.Endpoint{Protocol: "tcp", IP: net.IPv4(127, 0, 0, 1)},
p2p.Endpoint{Protocol: "tcp", Addr: netip.AddrPortFrom(tcp.IPv4Loopback(), 0)},
true,
},
{
p2p.NodeAddress{Protocol: "tcp", Hostname: "::1"},
&p2p.Endpoint{Protocol: "tcp", IP: net.IPv6loopback},
p2p.Endpoint{Protocol: "tcp", Addr: netip.AddrPortFrom(netip.IPv6Loopback(), 0)},
true,
},
{
p2p.NodeAddress{Protocol: "tcp", Hostname: "8.8.8.8"},
&p2p.Endpoint{Protocol: "tcp", IP: net.IPv4(8, 8, 8, 8)},
p2p.Endpoint{Protocol: "tcp", Addr: netip.AddrPortFrom(netip.AddrFrom4([4]byte{8, 8, 8, 8}), 0)},
true,
},
{
p2p.NodeAddress{Protocol: "tcp", Hostname: "2001:0db8::ff00:0042:8329"},
&p2p.Endpoint{Protocol: "tcp", IP: []byte{
0x20, 0x01, 0x0d, 0xb8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0x00, 0x00, 0x42, 0x83, 0x29}},
p2p.Endpoint{Protocol: "tcp", Addr: netip.AddrPortFrom(netip.AddrFrom16([16]byte{
0x20, 0x01, 0x0d, 0xb8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0x00, 0x00, 0x42, 0x83, 0x29}), 0)},
true,
},
{
p2p.NodeAddress{Protocol: "tcp", Hostname: "some.missing.host.tendermint.com"},
&p2p.Endpoint{},
p2p.Endpoint{},
false,
},

// Valid non-networked addresses.
{
p2p.NodeAddress{Protocol: "memory", NodeID: id},
&p2p.Endpoint{Protocol: "memory", Path: string(id)},
p2p.Endpoint{Protocol: "memory", Path: string(id)},
true,
},
{
p2p.NodeAddress{Protocol: "memory", NodeID: id, Path: string(id)},
&p2p.Endpoint{Protocol: "memory", Path: string(id)},
p2p.Endpoint{Protocol: "memory", Path: string(id)},
true,
},

// Invalid addresses.
{p2p.NodeAddress{}, &p2p.Endpoint{}, false},
{p2p.NodeAddress{Hostname: "127.0.0.1"}, &p2p.Endpoint{}, false},
{p2p.NodeAddress{Protocol: "tcp", Hostname: "127.0.0.1:80"}, &p2p.Endpoint{}, false},
{p2p.NodeAddress{Protocol: "memory"}, &p2p.Endpoint{}, false},
{p2p.NodeAddress{Protocol: "memory", Path: string(id)}, &p2p.Endpoint{}, false},
{p2p.NodeAddress{Protocol: "tcp", Hostname: "💥"}, &p2p.Endpoint{}, false},
{p2p.NodeAddress{}, p2p.Endpoint{}, false},
{p2p.NodeAddress{Hostname: "127.0.0.1"}, p2p.Endpoint{}, false},
{p2p.NodeAddress{Protocol: "tcp", Hostname: "127.0.0.1:80"}, p2p.Endpoint{}, false},
{p2p.NodeAddress{Protocol: "memory"}, p2p.Endpoint{}, false},
{p2p.NodeAddress{Protocol: "memory", Path: string(id)}, p2p.Endpoint{}, false},
{p2p.NodeAddress{Protocol: "tcp", Hostname: "💥"}, p2p.Endpoint{}, false},
}
for _, tc := range testcases {
t.Run(tc.address.String(), func(t *testing.T) {
Expand All @@ -265,39 +265,26 @@ func TestNodeAddress_Resolve(t *testing.T) {
require.Error(t, err)
return
}

// Special handling for localhost tests - accept either IPv4 or IPv6
if tc.address.Hostname == "localhost" && tc.address.Port == 80 && tc.address.Path == "/path" {
hasIPv4 := false
hasIPv6 := false
for _, ep := range endpoints {
if ep.Protocol == "tcp" && ep.Port == 80 && ep.Path == "/path" {
if ep.IP.Equal(net.IPv4(127, 0, 0, 1)) {
hasIPv4 = true
}
if ep.IP.Equal(net.IPv6loopback) {
hasIPv6 = true
}
}
}
require.True(t, hasIPv4 || hasIPv6, "localhost should resolve to either IPv4 or IPv6")
return
ok := false
tc.expect.Addr = tcp.Norm(tc.expect.Addr)
for _, e := range endpoints {
e.Addr = tcp.Norm(e.Addr)
ok = ok || e == tc.expect
}
if !ok {
t.Fatalf("%v not in %v", tc.expect, endpoints)
}

require.Contains(t, endpoints, tc.expect)
})
}
t.Run("Resolve localhost", func(t *testing.T) {
addr := p2p.NodeAddress{Protocol: "tcp", Hostname: "localhost", Port: 80, Path: "/path"}
endpoints, err := addr.Resolve(t.Context())
require.NoError(t, err)

want := &p2p.Endpoint{Protocol: "tcp", Port: 80, Path: "/path"}
require.True(t, len(endpoints) > 0)
for _, got := range endpoints {
require.True(t, got.IP.IsLoopback())
require.True(t, got.Addr.Addr().IsLoopback())
// Any loopback address is acceptable, so ignore it in comparison.
want.IP = got.IP
want := p2p.Endpoint{Protocol: "tcp", Addr: netip.AddrPortFrom(got.Addr.Addr(), 80), Path: "/path"}
require.Equal(t, want, got)
}
})
Expand Down
20 changes: 10 additions & 10 deletions internal/p2p/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/gogo/protobuf/proto"

"github.com/tendermint/tendermint/libs/utils"
"github.com/tendermint/tendermint/types"
)

Expand Down Expand Up @@ -60,7 +61,7 @@ func (pe PeerError) Unwrap() error { return pe.Err }
// Each message is wrapped in an Envelope to specify its sender and receiver.
type Channel struct {
ID ChannelID
inCh <-chan Envelope // inbound messages (peers to reactors)
inCh *Queue // inbound messages (peers to reactors)
outCh chan<- Envelope // outbound messages (reactors to peers)
errCh chan<- PeerError // peer error reporting

Expand All @@ -69,7 +70,7 @@ type Channel struct {

// NewChannel creates a new channel. It is primarily for internal and test
// use, reactors should use Router.OpenChannel().
func NewChannel(id ChannelID, inCh <-chan Envelope, outCh chan<- Envelope, errCh chan<- PeerError) *Channel {
func NewChannel(id ChannelID, inCh *Queue, outCh chan<- Envelope, errCh chan<- PeerError) *Channel {
return &Channel{
ID: id,
inCh: inCh,
Expand Down Expand Up @@ -102,6 +103,8 @@ func (ch *Channel) SendError(ctx context.Context, pe PeerError) error {

func (ch *Channel) String() string { return fmt.Sprintf("p2p.Channel<%d:%s>", ch.ID, ch.name) }

func (ch *Channel) ReceiveLen() int { return ch.inCh.Len() }

// Receive returns a new unbuffered iterator to receive messages from ch.
// The iterator runs until ctx ends.
func (ch *Channel) Receive(ctx context.Context) *ChannelIterator {
Expand All @@ -128,15 +131,12 @@ type ChannelIterator struct {

func iteratorWorker(ctx context.Context, ch *Channel, pipe chan Envelope) {
for {
select {
case <-ctx.Done():
e, err := ch.inCh.Recv(ctx)
if err != nil {
return
}
if err := utils.Send(ctx, pipe, e); err != nil {
return
case envelope := <-ch.inCh:
select {
case <-ctx.Done():
return
case pipe <- envelope:
}
}
}
}
Expand Down
Loading