diff --git a/sei-tendermint/config/toml.go b/sei-tendermint/config/toml.go index 9a98b4f21c..b1b3e121f9 100644 --- a/sei-tendermint/config/toml.go +++ b/sei-tendermint/config/toml.go @@ -3,6 +3,7 @@ package config import ( "bytes" "fmt" + "github.com/tendermint/tendermint/libs/utils/tcp" "os" "path/filepath" "strings" @@ -709,6 +710,7 @@ func ResetTestRootWithChainID(dir, testName string, chainID string) (*Config, er } config := TestConfig().SetRoot(rootDir) + config.P2P.ListenAddress = tcp.TestReserveAddr().String() config.Instrumentation.Namespace = fmt.Sprintf("%s_%s_%s", testName, chainID, tmrand.Str(16)) return config, nil } diff --git a/sei-tendermint/internal/hashable/hashable.go b/sei-tendermint/internal/hashable/hashable.go index d6c603c114..0b26d6ae18 100644 --- a/sei-tendermint/internal/hashable/hashable.go +++ b/sei-tendermint/internal/hashable/hashable.go @@ -27,8 +27,8 @@ func ParseHash[T Hashable](raw []byte) (Hash[T], error) { return Hash[T](raw), nil } -// ProtoHash hashes a Hashable proto object. -func ProtoHash[T Hashable](a T) Hash[T] { +// ToHash hashes a Hashable proto object. +func ToHash[T Hashable](a T) Hash[T] { return sha256.Sum256(MarshalCanonical(a)) } diff --git a/sei-tendermint/internal/mux/mux.go b/sei-tendermint/internal/mux/mux.go new file mode 100644 index 0000000000..2e7eb25016 --- /dev/null +++ b/sei-tendermint/internal/mux/mux.go @@ -0,0 +1,487 @@ +// mux package provides a TCP connection multiplexer - it allows to run +// multiple reliable independent bidirectional streams over a single TCP connection: +// The data is sent in frames of bounded size in round robin fashion over all the streams +// (fairness). There is no head-of-line blocking: a sender is not allowed to send bytes, +// until peer allows it - a TCP-like buffer window is maintained: peer declares the +// maximal size of message it is willing to consume, and the number of messages it currently +// can buffer locally. +// +// Each mux stream has its own Kind number. 
Kind numbers are supposed to identify the stream-level communication +// protocol (for example, if you implement an RPC server on top of this multiplexer, each RPC will have its own Kind number). +// +// # LOW LEVEL PROTOCOL +// +// Multiplexer traffic consists of frames. Frame looks as follows: +// [header size (1B)] [header] [payload] +// Header is a binary protobuf message with size up to 255B (because size is sent as a single byte). +// This message is intentionally flat with a small number of fields (see mux.proto), so that it is encoded efficiently. +// In particular header contains PayloadSize field which indicates the size of the payload of the frame which is sent after the header. +// There are multiple frame types: +// * OPEN (opens a stream) +// * RESIZE (extends the window of the stream, allowing peer to send more messages) +// * MSG (actual payload of the stream) +// * CLOSE (closes the stream - this is the last frame of the stream) +package mux + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "github.com/tendermint/tendermint/internal/mux/pb" + "github.com/tendermint/tendermint/internal/p2p/conn" + "github.com/tendermint/tendermint/internal/protoutils" + "github.com/tendermint/tendermint/libs/utils" + "github.com/tendermint/tendermint/libs/utils/scope" + "google.golang.org/protobuf/proto" + "io" +) + +const handshakeMaxSize = 10 * 1024 // 10kB + +var errUnknownStream = errors.New("frame for an unknown stream") +var errTooManyAccepts = errors.New("too many concurrent accepted streams") +var errFrameAfterClose = errors.New("received frame after CLOSE frame") +var errTooManyMsgs = errors.New("too many messages") +var errTooLargeMsg = errors.New("message too large") +var errUnknownKind = errors.New("unknown kind") + +type errConn struct{ error } + +type Config struct { + // Maximal number of bytes in a frame (excluding header). + FrameSize uint64 + // Limits on the number of concurrent streams of each kind. 
+ Kinds map[StreamKind]*StreamKindConfig +} + +type StreamKindConfig struct { + // Maximal number of concurrent outbound streams. + MaxConnects uint64 + // Maximal number of concurrent inbound streams. + MaxAccepts uint64 +} + +type handshake struct { + Kinds map[StreamKind]*StreamKindConfig +} + +var handshakeConv = protoutils.Conv[*handshake, *pb.Handshake]{ + Encode: func(h *handshake) *pb.Handshake { + var kinds []*pb.StreamKindConfig + for kind, c := range h.Kinds { + kinds = append(kinds, &pb.StreamKindConfig{ + Kind: uint64(kind), + MaxConnects: c.MaxConnects, + MaxAccepts: c.MaxAccepts, + }) + } + return &pb.Handshake{Kinds: kinds} + }, + Decode: func(x *pb.Handshake) (*handshake, error) { + kinds := map[StreamKind]*StreamKindConfig{} + for _, pc := range x.Kinds { + kinds[StreamKind(pc.Kind)] = &StreamKindConfig{ + MaxConnects: pc.MaxConnects, + MaxAccepts: pc.MaxAccepts, + } + } + return &handshake{Kinds: kinds}, nil + }, +} + +type frame struct { + Header *pb.Header + Payload []byte +} + +type kindState struct { + connectsQueue chan *streamState + acceptsQueue chan *streamState +} + +type runnerInner struct { + nextID streamID + streams map[streamID]*streamState + acceptsSem map[StreamKind]uint64 +} + +// State of the running multiplexer. +type runner struct { + mux *Mux + inner utils.RWMutex[*runnerInner] +} + +func newRunner(mux *Mux) *runner { + return &runner{ + mux: mux, + inner: utils.NewRWMutex(&runnerInner{ + nextID: 0, + streams: map[streamID]*streamState{}, + acceptsSem: map[StreamKind]uint64{}, + }), + } +} + +// getOrAccept() gets the current state of the stream with the given id (kind is ignored). +// If the stream does not exist yet, it tries to create it as an accept (inbound) stream. +// In that case the inbound stream limit for the given kind is checked. 
+func (r *runner) getOrAccept(id streamID, kind StreamKind) (*streamState, error) { + fmt.Printf("getOrAccept(%v)\n", id) + for inner := range r.inner.RLock() { + s, ok := inner.streams[id] + if ok { + return s, nil + } + } + for inner := range r.inner.Lock() { + fmt.Printf("accepting stream %v\n", id) + if id.isConnect() { + return nil, errUnknownStream + } + if inner.acceptsSem[kind] == 0 { + return nil, errTooManyAccepts + } + inner.acceptsSem[kind] -= 1 + s := newStreamState(id, kind) + inner.streams[id] = s + return s, nil + } + panic("unreachable") +} + +func (i *runnerInner) newConnectStream(kind StreamKind) *streamState { + // Non-blocking since we just closed a connect Stream. + s := newStreamState(i.nextID, kind) + i.streams[s.id] = s + i.nextID += 2 + return s +} + +func (r *runner) tryPrune(id streamID) { + for inner := range r.inner.Lock() { + // Check if the stream is fully closed. + s, ok := inner.streams[id] + if !ok { + return + } + for sInner := range s.inner.Lock() { + if !sInner.closed.remote || !sInner.closed.local { + return + } + } + // Delete stream state. + delete(inner.streams, id) + // Free the stream capacity. + if id.isConnect() { + r.mux.kinds[s.kind].connectsQueue <- inner.newConnectStream(s.kind) + } else { + inner.acceptsSem[s.kind] += 1 + } + } +} + +// runSend handles the frame queue. +// The frames from all streams are interleaved in a round robin fashion. +// frames have bounded size to make sure that large messages do not slow down smaller ones. +// Stream priorities are not implemented (not needed). +// WARNING: it respects ctx only partially, because conn does not. +func (r *runner) runSend(ctx context.Context, conn conn.Conn) error { + for { + // Collect frames in round robin over streams. 
+ var frames []*frame + flush := false + for queue, ctrl := range r.mux.queue.Lock() { + if err := ctrl.WaitUntil(ctx, func() bool { return len(queue) > 0 }); err != nil { + return err + } + frames = make([]*frame, 0, len(queue)) + for id := range queue { + frames = append(frames, queue.Pop(id, r.mux.cfg.FrameSize)) + } + flush = len(queue) == 0 + } + // Send the frames + for _, f := range frames { + id := streamID(f.Header.Id) + if f.Header.GetMsgEnd() { + // Notify sender about local buffer capacity. + for inner := range r.inner.RLock() { + for sInner, ctrl := range inner.streams[id].inner.Lock() { + sInner.send.bufBegin += 1 + ctrl.Updated() + } + } + } + if f.Header.GetClose() { + r.tryPrune(id) + } + headerRaw, err := proto.Marshal(f.Header) + if err != nil { + panic(err) + } + if _, err := conn.Write([]byte{byte(len(headerRaw))}); err != nil { + return errConn{err} + } + if _, err := conn.Write(headerRaw); err != nil { + return errConn{err} + } + if _, err := conn.Write(f.Payload); err != nil { + return errConn{err} + } + } + if flush { + if err := conn.Flush(); err != nil { + return errConn{err} + } + } + } +} + +// runRecv receives and processes the incoming frames sequentially. +func (r *runner) runRecv(conn conn.Conn) error { + for { + // frame size is hard capped here at 255B. 
+ // Currently we have 7 varint fields (up to 77B) + var headerSize [1]byte + if _, err := conn.Read(headerSize[:]); err != nil { + return errConn{err} + } + headerRaw := make([]byte, headerSize[0]) + if _, err := io.ReadFull(conn, headerRaw[:]); err != nil { + return errConn{err} + } + var h pb.Header + if err := proto.Unmarshal(headerRaw, &h); err != nil { + return err + } + id := streamIDFromRemote(h.Id) + kind := StreamKind(h.GetKind()) + + s, err := r.getOrAccept(id, kind) + if err != nil { + return err + } + for sInner := range s.inner.Lock() { + fmt.Printf("sInner.closed.remote[%v] = %v, %v\n", s.id, sInner.closed.remote, &h) + if sInner.closed.remote { + return errFrameAfterClose + } + } + // Process the frame content in order: OPEN, RESIZE, MSG, CLOSE + if h.Kind != nil { + if err := s.RemoteOpen(h.GetMaxMsgSize()); err != nil { + return err + } + if !s.id.isConnect() { + r.mux.kinds[kind].acceptsQueue <- s + } + } + if we := h.GetWindowEnd(); we > 0 { + s.RemoteWindowEnd(we) + } + if ps := h.GetPayloadSize(); ps > 0 { + if err := s.RemotePayloadSize(ps); err != nil { + return err + } + // Read the payload. + payload := make([]byte, ps) + if _, err := io.ReadFull(conn, payload[:]); err != nil { + return err + } + s.RemotePayload(payload) + } + if h.GetMsgEnd() { + if err := s.RemoteMsgEnd(); err != nil { + return err + } + } + if h.GetClose() { + if err := s.RemoteClose(); err != nil { + return err + } + r.tryPrune(s.id) + } + } +} + +// Run runs the multiplexer for the given connection. +// It closes the connection before return. +func (m *Mux) Run(ctx context.Context, conn conn.Conn) error { + return scope.Run(ctx, func(ctx context.Context, s scope.Scope) error { + // Close on cancel. + s.Spawn(func() error { + <-ctx.Done() + s.Cancel(ctx.Err()) + conn.Close() + return nil + }) + + // Handshake exchange. 
+ handshake, err := scope.Run1(ctx, func(ctx context.Context, s scope.Scope) (*handshake, error) { + s.Spawn(func() error { + handshakeRaw, err := proto.Marshal(handshakeConv.Encode(&handshake{Kinds: m.cfg.Kinds})) + if err != nil { + panic(err) + } + sizeRaw := binary.LittleEndian.AppendUint32(nil, uint32(len(handshakeRaw))) + if _, err := conn.Write(sizeRaw); err != nil { + return errConn{err} + } + if _, err := conn.Write(handshakeRaw); err != nil { + return errConn{err} + } + if err := conn.Flush(); err != nil { + return errConn{err} + } + return nil + }) + var sizeRaw [4]byte + if _, err := io.ReadFull(conn, sizeRaw[:]); err != nil { + return nil, errConn{err} + } + size := binary.LittleEndian.Uint32(sizeRaw[:]) + if size > handshakeMaxSize { + return nil, fmt.Errorf("handshake too large") + } + handshakeRaw := make([]byte, size) + if _, err := io.ReadFull(conn, handshakeRaw[:]); err != nil { + return nil, errConn{err} + } + var handshakeProto pb.Handshake + if err := proto.Unmarshal(handshakeRaw, &handshakeProto); err != nil { + return nil, err + } + return handshakeConv.Decode(&handshakeProto) + }) + if err != nil { + return err + } + + // Initialize runner with handshake data. + r := newRunner(m) + for inner := range r.inner.Lock() { + for kind, cfg := range m.cfg.Kinds { + remCfg, ok := handshake.Kinds[kind] + if !ok { + remCfg = &StreamKindConfig{} + } + inner.acceptsSem[kind] = min(cfg.MaxAccepts, remCfg.MaxConnects) + for range min(cfg.MaxConnects, remCfg.MaxAccepts) { + m.kinds[kind].connectsQueue <- inner.newConnectStream(kind) + } + } + } + // Run the tasks. + s.Spawn(func() error { return r.runSend(ctx, conn) }) + s.Spawn(func() error { return r.runRecv(conn) }) + return nil + }) +} + +// queue is a queue of frames to send, consumed by runSend. +type queue map[streamID]*frame + +// Get returns the frame corresponding to the given stream id. +// If it doesn't exist, it initializes the frame first. 
func (q queue) Get(id streamID) *frame {
	f, ok := q[id]
	if ok {
		return f
	}
	// Lazily initialize an empty frame carrying only the stream id.
	q[id] = &frame{Header: &pb.Header{Id: uint64(id)}}
	return q[id]
}

// Pop removes a frame of the given stream from the queue.
// Panics if there is no frame for this id.
// If a frame is too large (payload larger than maxPayload) it splits
// the frame into 2 smaller ones, returns the first one and leaves the
// remainder in the queue.
func (q queue) Pop(id streamID, maxPayload uint64) *frame {
	f, ok := q[id]
	if !ok {
		panic(fmt.Errorf("missing frame"))
	}
	// Small enough to send whole: remove from the queue and return.
	if uint64(len(f.Payload)) <= maxPayload {
		delete(q, id)
		return f
	}
	// Split the frame: the OPEN/RESIZE metadata travels with the first part.
	first := &frame{
		Header: &pb.Header{
			Id:          f.Header.Id,
			Kind:        f.Header.Kind,
			MaxMsgSize:  f.Header.MaxMsgSize,
			WindowEnd:   f.Header.WindowEnd,
			PayloadSize: &maxPayload,
			// Close and MsgEnd fields are left in the second frame.
		},
		Payload: f.Payload[:maxPayload],
	}
	// Clear from the remaining frame the fields that moved into the first one,
	// and shrink its payload accordingly.
	f.Header.Kind = nil
	f.Header.MaxMsgSize = nil
	f.Header.WindowEnd = nil
	f.Payload = f.Payload[maxPayload:]
	f.Header.PayloadSize = utils.Alloc(uint64(len(f.Payload)))
	return first
}

// Mux is a TCP connection multiplexer; see the package comment for the protocol.
type Mux struct {
	cfg   *Config                   // immutable configuration
	kinds map[StreamKind]*kindState // per-kind connect/accept queues
	queue *utils.Watch[queue]       // pending outbound frames, drained by runSend
}

// NewMux constructs a new multiplexer.
// Remember to spawn Mux.Run() afterwards.
func NewMux(cfg *Config) *Mux {
	kinds := map[StreamKind]*kindState{}
	for kind, c := range cfg.Kinds {
		kinds[kind] = &kindState{
			acceptsQueue:  make(chan *streamState, c.MaxAccepts),
			connectsQueue: make(chan *streamState, c.MaxConnects),
		}
	}
	queue := utils.NewWatch(queue{})
	return &Mux{cfg: cfg, kinds: kinds, queue: &queue}
}

// Connect establishes a new stream of the given kind.
// Blocks until the number of concurrent connects falls below the allowed limit.
// Then it waits until peer accepts the connection.
// Remember to Close() the stream after use.
+func (m *Mux) Connect(ctx context.Context, kind StreamKind, maxMsgSize uint64, window uint64) (*Stream, error) { + ks, ok := m.kinds[kind] + if !ok { + return nil, fmt.Errorf("%w %v", errUnknownKind, kind) + } + state, err := utils.Recv(ctx, ks.connectsQueue) + if err != nil { + return nil, err + } + s := &Stream{state, m.queue} + if err := s.open(ctx, maxMsgSize, window); err != nil { + return nil, err + } + return s, nil +} + +// Accept accepts an incoming stream of the given kind. +// Blocks until peer opens a connect stream. +// Remember to Close() the stream after use. +func (m *Mux) Accept(ctx context.Context, kind StreamKind, maxMsgSize uint64, window uint64) (*Stream, error) { + ks, ok := m.kinds[kind] + if !ok { + return nil, fmt.Errorf("%w %v", errUnknownKind, kind) + } + state, err := utils.Recv(ctx, ks.acceptsQueue) + if err != nil { + return nil, err + } + s := &Stream{state, m.queue} + if err := s.open(ctx, maxMsgSize, window); err != nil { + return nil, err + } + return s, nil +} diff --git a/sei-tendermint/internal/mux/mux.proto b/sei-tendermint/internal/mux/mux.proto new file mode 100644 index 0000000000..fe15162f0b --- /dev/null +++ b/sei-tendermint/internal/mux/mux.proto @@ -0,0 +1,34 @@ +syntax = "proto3"; + +package mux; + +option go_package = "github.com/tendermint/tendermint/internal/mux/pb"; + +message StreamKindConfig { + uint64 kind = 1; + uint64 max_connects = 2; + uint64 max_accepts = 3; +} + +message Handshake { + repeated StreamKindConfig kinds = 1; +} + +// Flat small message, representing a unit of multiplexer information. +// The frames for the same id can be sent merged, +// since they have disjoint field sets. 
+// +// OPEN = id,kind,max_msg_size +// RESIZE = id,window_end +// MSG = id,payload_size,[msg_end] +// CLOSE = id,close +message Header { + // TOTAL: 59B + uint64 id = 1; // 11B + optional uint64 kind = 2; //11B + optional uint64 max_msg_size = 3; // 11B + optional uint64 window_end = 4; // 11B + optional uint64 payload_size = 5; // 11B + optional bool msg_end = 6; // 2B + optional bool close = 7; // 2B +} diff --git a/sei-tendermint/internal/mux/mux_test.go b/sei-tendermint/internal/mux/mux_test.go new file mode 100644 index 0000000000..0b7c28e632 --- /dev/null +++ b/sei-tendermint/internal/mux/mux_test.go @@ -0,0 +1,662 @@ +package mux + +import ( + "context" + "errors" + "fmt" + "github.com/tendermint/tendermint/crypto/ed25519" + "github.com/tendermint/tendermint/internal/p2p/conn" + "github.com/tendermint/tendermint/libs/utils" + "github.com/tendermint/tendermint/libs/utils/require" + "github.com/tendermint/tendermint/libs/utils/scope" + "github.com/tendermint/tendermint/libs/utils/tcp" + "sync/atomic" + "testing" +) + +// Ignores cancellation and connection errors. +// Wrap Mux.Run() calls in this - in tests we manage both ends of the connection, +// so there is a race condition between disconnects and cancellation, when test +// is shutting down. 
+func ignoreDisconnect(err error) error { + if utils.ErrorAs[errConn](err).IsPresent() { + return nil + } + return utils.IgnoreCancel(err) +} + +func testConn(t *testing.T) (*conn.SecretConnection, *conn.SecretConnection) { + c1, c2 := tcp.TestPipe() + var scs [2]*conn.SecretConnection + utils.OrPanic(scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error { + for i, c := range utils.Slice(c1, c2) { + t.Cleanup(func() { c.Close() }) + s.Spawn(func() error { + scs[i] = utils.OrPanic1(conn.MakeSecretConnection(ctx, c, ed25519.GenerateSecretKey())) + return nil + }) + } + return nil + })) + return scs[0], scs[1] +} + +// Arbitrary nontrivial transformation to make sure that +// server actually does something. +func transform(msg []byte) []byte { + out := make([]byte, len(msg)) + copy(out, msg) + for i := range out { + out[i] = out[i]*9 + 5 + } + return out +} + +func runServer(ctx context.Context, rng utils.Rng, mux *Mux) error { + return scope.Run(ctx, func(ctx context.Context, s scope.Scope) error { + for kind := range mux.cfg.Kinds { + _ = rng.Split() + s.Spawn(func() error { + var count atomic.Int64 + for { + // Accept a stream. + maxMsgSize := uint64(rng.Intn(10000) + 100) + window := uint64(rng.Intn(10) + 1) + stream, err := mux.Accept(ctx, kind, maxMsgSize, window) + if err != nil { + return utils.IgnoreCancel(err) + } + // Assert that concurrent stream limit is respected. + if got, wantMax := uint64(count.Add(1)), mux.cfg.Kinds[kind].MaxAccepts; got > wantMax { + return fmt.Errorf("got %v concurrent accepts, want <= %v", got, wantMax) + } + s.Spawn(func() error { + defer stream.Close() + defer count.Add(-1) + // Handle the stream. 
+ for { + msg, err := stream.Recv(ctx, true) + if err != nil { + if errors.Is(err, errRemoteClosed) || errors.Is(err, context.Canceled) { + return nil + } + return fmt.Errorf("stream.Recv(): %w", err) + } + if err := stream.Send(ctx, transform(msg)); err != nil { + if errors.Is(err, errRemoteClosed) { + return nil + } + return fmt.Errorf("stream.Send(): %w", err) + } + } + }) + } + }) + } + return nil + }) +} + +type clientSet struct { + mux *Mux + kind StreamKind + count atomic.Int64 +} + +func (cs *clientSet) StreamingClient() *client { + return &client{clientSet: cs, streaming: true} +} + +func (cs *clientSet) SynchronousClient() *client { + return &client{clientSet: cs, streaming: false} +} + +func (cs *clientSet) BlockedClient() *client { + return &client{clientSet: cs, streaming: true, blocked: true} +} + +type client struct { + *clientSet + streaming bool + blocked bool +} + +func (c *client) Run(ctx context.Context, rng utils.Rng) error { + // Connect to server. + maxMsgSize := uint64(rng.Intn(10000) + 100) + window := uint64(rng.Intn(10) + 1) + stream, err := c.mux.Connect(ctx, c.kind, maxMsgSize, window) + if err != nil { + return fmt.Errorf("mux.Connect(): %w", err) + } + // Assert that concurrent stream limit is respected. + if got, wantMax := uint64(c.count.Add(1)), c.mux.cfg.Kinds[c.kind].MaxConnects; got > wantMax { + return fmt.Errorf("got %v concurrent connects, want <= %v", got, wantMax) + } + defer stream.Close() + defer c.count.Add(-1) + + // Prepare requests. + maxReqSize := int(min(maxMsgSize, stream.maxSendMsgSize())) + var reqs [][]byte + for range rng.Intn(10) { + size := rng.Intn(maxReqSize) + reqs = append(reqs, utils.GenBytes(rng, size)) + } + return scope.Run(ctx, func(ctx context.Context, s scope.Scope) error { + if c.streaming { + s.Spawn(func() error { + // Stream the requests. 
+ for _, req := range reqs { + if err := stream.Send(ctx, req); err != nil { + return fmt.Errorf("stream.Send(): %w", err) + } + } + return nil + }) + if c.blocked { + <-ctx.Done() + return nil + } + // Verify the responses. + for _, req := range reqs { + resp, err := stream.Recv(ctx, true) + if err != nil { + return fmt.Errorf("stream.Recv(): %w", err) + } + if err := utils.TestDiff(transform(req), resp); err != nil { + return err + } + } + } else { + for _, req := range reqs { + if err := stream.Send(ctx, req); err != nil { + return fmt.Errorf("stream.Send(): %w", err) + } + resp, err := stream.Recv(ctx, true) + if err != nil { + return fmt.Errorf("stream.Recv(): %w", err) + } + if err := utils.TestDiff(transform(req), resp); err != nil { + return err + } + } + } + return nil + }) +} + +func makeMux(rng utils.Rng, kindCount int) *Mux { + kinds := map[StreamKind]*StreamKindConfig{} + for kind := range StreamKind(kindCount) { + kinds[kind] = &StreamKindConfig{ + // > 1, so that blocked client doesn't hog all the streams + MaxAccepts: uint64(rng.Intn(5) + 2), + MaxConnects: uint64(rng.Intn(5) + 2), + } + } + return NewMux(&Config{FrameSize: 10 * 1024, Kinds: kinds}) +} + +func runClients(ctx context.Context, rng utils.Rng, mux *Mux) error { + return scope.Run(ctx, func(ctx context.Context, s scope.Scope) error { + for kind := range mux.cfg.Kinds { + cs := &clientSet{mux: mux, kind: kind} + // Client which is blocked and doesn't receive responses. + clientRng := rng.Split() + s.SpawnBgNamed("blocked", func() error { + return utils.IgnoreCancel(cs.BlockedClient().Run(ctx, clientRng)) + }) + // Clients which send requests sequentially. + for range 5 { + clientRng := rng.Split() + s.SpawnNamed("sync", func() error { return cs.SynchronousClient().Run(ctx, clientRng) }) + } + // Clients which send requests concurrently. 
+ for range 20 { + clientRng := rng.Split() + s.Spawn(func() error { return cs.StreamingClient().Run(ctx, clientRng) }) + } + } + return nil + }) +} + +// Happy path test. +// * Uses SecretConnection for transport. +// * Tests both streaming and sequential stream communication. +// * Checks if concurrent streams limits are respected. +// * Checks that there is no head of line blocking. +func TestHappyPath(t *testing.T) { + rng := utils.TestRng() + kindCount := 5 + c1, c2 := testConn(t) + err := scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error { + for _, c := range utils.Slice(c1, c2) { + mux := makeMux(rng, kindCount) + serverRng := rng.Split() + s.SpawnBgNamed("mux", func() error { return ignoreDisconnect(mux.Run(ctx, c)) }) + s.SpawnBgNamed("server", func() error { return utils.IgnoreCancel(runServer(ctx, serverRng, mux)) }) + clientRng := rng.Split() + s.SpawnNamed("client", func() error { return runClients(ctx, clientRng, mux) }) + } + return nil + }) + if err != nil { + t.Fatal(err) + } +} + +func genStreamKind(rng utils.Rng) StreamKind { + return StreamKind(rng.Uint64()) +} + +func genStreamKindConfig(rng utils.Rng) *StreamKindConfig { + return &StreamKindConfig{ + MaxAccepts: rng.Uint64(), + MaxConnects: rng.Uint64(), + } +} + +func genHandshake(rng utils.Rng) *handshake { + return &handshake{ + Kinds: utils.GenMap(rng, genStreamKind, genStreamKindConfig), + } +} + +func TestConv(t *testing.T) { + rng := utils.TestRng() + require.NoError(t, handshakeConv.Test(genHandshake(rng))) +} + +func makeConfig(kinds ...StreamKind) *Config { + cfg := &Config{ + FrameSize: 1024, + Kinds: map[StreamKind]*StreamKindConfig{}, + } + for _, kind := range kinds { + cfg.Kinds[kind] = &StreamKindConfig{MaxAccepts: 1, MaxConnects: 1} + } + return cfg +} + +func TestStreamKindsMismatch(t *testing.T) { + c1, c2 := testConn(t) + err := scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error { + var k0, k1, k2 StreamKind = 0, 1, 2 + muxs := 
utils.Slice( + NewMux(makeConfig(k0, k1)), + NewMux(makeConfig(k1, k2)), + ) + for i, c := range utils.Slice(c1, c2) { + s.SpawnBg(func() error { return ignoreDisconnect(muxs[i].Run(ctx, c)) }) + } + // Connecting/accepting of unconfigured kind should error. + if _, err := muxs[0].Connect(ctx, k2, 10, 10); !errors.Is(err, errUnknownKind) { + return fmt.Errorf("got %v, want %v", err, errUnknownKind) + } + if _, err := muxs[1].Accept(ctx, k0, 10, 10); !errors.Is(err, errUnknownKind) { + return fmt.Errorf("got %v, want %v", err, errUnknownKind) + } + + // Connecting/accepting, when other end does not support given kind, should block. + s.SpawnBg(func() error { + if _, err := muxs[0].Connect(ctx, k0, 10, 10); !errors.Is(err, context.Canceled) { + return fmt.Errorf("got %v, want canceled", err) + } + return nil + }) + s.SpawnBg(func() error { + if _, err := muxs[1].Accept(ctx, k2, 10, 10); !errors.Is(err, context.Canceled) { + return fmt.Errorf("got %v, want canceled", err) + } + return nil + }) + + // Stream of the shared kind should work. + msg := []byte("hello") + s.Spawn(func() error { + stream, err := muxs[0].Connect(ctx, k1, 0, 0) + if err != nil { + return fmt.Errorf("mux1.Connect(): %w", err) + } + if err := stream.Send(ctx, msg); err != nil { + return fmt.Errorf("stream.Send(): %w", err) + } + return nil + }) + s.Spawn(func() error { + stream, err := muxs[1].Accept(ctx, k1, uint64(len(msg)), 1) + if err != nil { + return fmt.Errorf("mux2.Accept(): %w", err) + } + got, err := stream.Recv(ctx, false) + if err != nil { + return fmt.Errorf("stream.Recv(): %w", err) + } + return utils.TestDiff(msg, got) + }) + return nil + }) + if err != nil { + t.Fatal(err) + } +} + +// Test checking that closing a stream does not drop messages on the floor: +// sending and receiving still works as long as messages fit into a window. 
+func TestClosedStream(t *testing.T) { + c1, c2 := testConn(t) + err := scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error { + kind := StreamKind(0) + window := uint64(4) + msg := []byte("hello") + muxs := utils.Slice( + NewMux(makeConfig(kind)), + NewMux(makeConfig(kind)), + ) + for i, c := range utils.Slice(c1, c2) { + s.SpawnBg(func() error { return ignoreDisconnect(muxs[i].Run(ctx, c)) }) + } + s.Spawn(func() error { + // Just accept a single stream and close immediately. + stream, err := muxs[0].Accept(ctx, kind, uint64(len(msg)), window) + if err != nil { + return fmt.Errorf("mux1.Accept(): %w", err) + } + stream.Close() + // Receive the messages anyway. + // Window will not be updated (freeBuf flag is ignored). + for range window { + if _, err := stream.Recv(ctx, true); err != nil { + return fmt.Errorf("stream.Recv(): %w", err) + } + } + // Try to receive with empty window - should block until remote closes stream. + if _, err := stream.Recv(ctx, true); !errors.Is(err, errRemoteClosed) { + return fmt.Errorf("stream.Recv(): %v, want %v", err, errRemoteClosed) + } + return nil + }) + // Open a stream. + stream, err := muxs[1].Connect(ctx, kind, 0, 0) + if err != nil { + return fmt.Errorf("mux2.Connect(): %w", err) + } + defer stream.Close() + // Fill the available window. + for range window { + if err := stream.Send(ctx, msg); err != nil { + return fmt.Errorf("stream.Send(): %w", err) + } + } + // Try to send after window is full. + if err := stream.Send(ctx, msg); !errors.Is(err, errRemoteClosed) { + return fmt.Errorf("stream.Send(): %v, want %v", err, errRemoteClosed) + } + // Try to send after local close. 
+ stream.Close() + if err := stream.Send(ctx, msg); !errors.Is(err, errClosed) { + return fmt.Errorf("stream.Send(): %v, want %v", err, errRemoteClosed) + } + return nil + }) + if err != nil { + t.Fatal(err) + } +} + +func TestProtocol_TooLargeMsg(t *testing.T) { + c1, c2 := testConn(t) + err := scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error { + kind := StreamKind(0) + maxMsgSize := uint64(10) + + // Bad mux. + badMux := NewMux(makeConfig(kind)) + s.SpawnBg(func() error { return ignoreDisconnect(badMux.Run(ctx, c1)) }) + s.SpawnBg(func() error { + t.Log("Connect stream.") + stream, err := badMux.Connect(ctx, kind, 0, 0) + if err != nil { + return fmt.Errorf("mux2.Connect(): %w", err) + } + defer stream.Close() + rng := utils.TestRng() + badMsg := utils.GenBytes(rng, int(maxMsgSize)+1) + t.Log("Malform stream state to allow too large messages.") + for inner := range stream.state.inner.Lock() { + inner.send.maxMsgSize = uint64(len(badMsg)) + } + t.Log("Send too large message") + return utils.IgnoreCancel(stream.Send(ctx, badMsg)) + }) + + mux := NewMux(makeConfig(kind)) + s.SpawnBg(func() error { + t.Log("Accept stream") + stream, err := mux.Accept(ctx, kind, maxMsgSize, 10) + if err != nil { + return fmt.Errorf("mux.Accept(): %w", err) + } + stream.Close() + return utils.IgnoreCancel(err) + }) + err := mux.Run(ctx, c2) + t.Logf("mux terminated: %v", err) + if !errors.Is(err, errTooLargeMsg) { + return fmt.Errorf("err = %v, want %v", err, errTooLargeMsg) + } + return nil + }) + if err != nil { + t.Fatal(err) + } +} + +func TestProtocol_TooManyMsgs(t *testing.T) { + c1, c2 := testConn(t) + err := scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error { + kind := StreamKind(0) + maxMsgSize := uint64(10) + window := uint64(3) + + // Bad mux. 
+ badMux := NewMux(makeConfig(kind)) + s.SpawnBg(func() error { return ignoreDisconnect(badMux.Run(ctx, c1)) }) + s.SpawnBg(func() error { + t.Log("Connect stream.") + stream, err := badMux.Connect(ctx, kind, 0, 0) + if err != nil { + return fmt.Errorf("mux2.Connect(): %w", err) + } + defer stream.Close() + t.Log("Malform stream state to allow too many messages.") + for inner := range stream.state.inner.Lock() { + inner.send.end = window + 1 + } + t.Log("Send too many messages") + rng := utils.TestRng() + for range window { + if err := stream.Send(ctx, utils.GenBytes(rng, int(maxMsgSize))); err != nil { + return err + } + } + return utils.IgnoreCancel(stream.Send(ctx, utils.GenBytes(rng, int(maxMsgSize)))) + }) + + mux := NewMux(makeConfig(kind)) + s.SpawnBg(func() error { + t.Log("Accept stream") + stream, err := mux.Accept(ctx, kind, maxMsgSize, window) + if err != nil { + return fmt.Errorf("mux.Accept(): %w", err) + } + stream.Close() + return nil + }) + err := mux.Run(ctx, c2) + t.Logf("mux terminated: %v", err) + if !errors.Is(err, errTooManyMsgs) { + return fmt.Errorf("err = %v, want %v", err, errTooManyMsgs) + } + return nil + }) + if err != nil { + t.Fatal(err) + } +} + +func TestProtocol_FrameAfterClose(t *testing.T) { + c1, c2 := testConn(t) + err := scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error { + kind := StreamKind(0) + + // Bad mux. 
+ badMux := NewMux(makeConfig(kind)) + maxMsgSize := uint64(10) + s.SpawnBg(func() error { return ignoreDisconnect(badMux.Run(ctx, c1)) }) + s.SpawnBg(func() error { + t.Log("Connect stream.") + stream, err := badMux.Connect(ctx, kind, 0, 0) + if err != nil { + return fmt.Errorf("mux2.Connect(): %w", err) + } + defer stream.Close() + t.Log("Trigger sending CLOSED frame without actually closing.") + for inner := range stream.state.inner.Lock() { + stream.close(inner) + inner.closed.local = false + } + t.Log("Send 2 messages.") + // We need to send 2 messages, because CLOSE might get delivered together with the 1st message + // in which case peer would not realize that they were supposed to be sent out of order. + rng := utils.TestRng() + for range 2 { + if err := stream.Send(ctx, utils.GenBytes(rng, int(maxMsgSize))); err != nil { + return utils.IgnoreCancel(err) + } + } + return nil + + }) + + mux := NewMux(makeConfig(kind)) + s.SpawnBg(func() error { + t.Log("Accept stream") + stream, err := mux.Accept(ctx, kind, maxMsgSize, 10) + if err != nil { + return fmt.Errorf("mux.Accept(): %w", err) + } + defer stream.Close() + t.Log("Keep stream open") + <-ctx.Done() + return nil + }) + err := mux.Run(ctx, c2) + t.Logf("mux terminated: %v", err) + if !errors.Is(err, errFrameAfterClose) { + return fmt.Errorf("err = %v, want %v", err, errFrameAfterClose) + } + return nil + }) + if err != nil { + t.Fatal(err) + } +} + +func TestProtocol_TooManyAccepts(t *testing.T) { + c1, c2 := testConn(t) + err := scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error { + kind := StreamKind(0) + + // Bad mux. + badMux := NewMux(makeConfig(kind)) + s.SpawnBg(func() error { return ignoreDisconnect(badMux.Run(ctx, c1)) }) + s.SpawnBg(func() error { + // Artificially connect too many streams. 
+ for queue, ctrl := range badMux.queue.Lock() { + for i := range badMux.cfg.Kinds[kind].MaxConnects + 1 { + f := queue.Get(streamID(2 * i)) + f.Header.Kind = utils.Alloc(uint64(kind)) + } + ctrl.Updated() + } + return nil + }) + + mux := NewMux(makeConfig(kind)) + err := mux.Run(ctx, c2) + t.Logf("mux terminated: %v", err) + if !errors.Is(err, errTooManyAccepts) { + return fmt.Errorf("err = %v, want %v", err, errTooManyAccepts) + } + return nil + }) + if err != nil { + t.Fatal(err) + } +} + +func TestProtocol_UnknownStream(t *testing.T) { + c1, c2 := testConn(t) + err := scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error { + kind := StreamKind(0) + + // Bad mux. + badMux := NewMux(makeConfig(kind)) + s.SpawnBg(func() error { return ignoreDisconnect(badMux.Run(ctx, c1)) }) + s.SpawnBg(func() error { + // Artificially accept a stream without connect. + for queue, ctrl := range badMux.queue.Lock() { + f := queue.Get(streamID(1)) + f.Header.Kind = utils.Alloc(uint64(kind)) + ctrl.Updated() + } + return nil + }) + + mux := NewMux(makeConfig(kind)) + err := mux.Run(ctx, c2) + t.Logf("mux terminated: %v", err) + if !errors.Is(err, errUnknownStream) { + return fmt.Errorf("err = %v, want %v", err, errUnknownStream) + } + return nil + }) + if err != nil { + t.Fatal(err) + } +} + +func TestProtocol_UnknownKind(t *testing.T) { + c1, c2 := testConn(t) + err := scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error { + kind := StreamKind(0) + badKind := StreamKind(1) + + // Bad mux. + badMux := NewMux(makeConfig(kind)) + s.SpawnBg(func() error { return ignoreDisconnect(badMux.Run(ctx, c1)) }) + s.SpawnBg(func() error { + // Artificially connect a stream of unknown kind. 
+			for queue, ctrl := range badMux.queue.Lock() {
+				f := queue.Get(streamID(0))
+				f.Header.Kind = utils.Alloc(uint64(badKind))
+				ctrl.Updated()
+			}
+			return nil
+		})
+		mux := NewMux(makeConfig(kind))
+		err := mux.Run(ctx, c2)
+		t.Logf("mux terminated: %v", err)
+		// Unknown kinds are treated as if they have 0 allowed accepts.
+		if !errors.Is(err, errTooManyAccepts) {
+			return fmt.Errorf("err = %v, want %v", err, errTooManyAccepts)
+		}
+		return nil
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+}
diff --git a/sei-tendermint/internal/mux/pb/mux.pb.go b/sei-tendermint/internal/mux/pb/mux.pb.go
new file mode 100644
index 0000000000..5adab78b41
--- /dev/null
+++ b/sei-tendermint/internal/mux/pb/mux.pb.go
@@ -0,0 +1,309 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// 	protoc-gen-go v1.36.10
+// 	protoc        (unknown)
+// source: mux/mux.proto
+
+package pb
+
+import (
+	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+	reflect "reflect"
+	sync "sync"
+	unsafe "unsafe"
+)
+
+const (
+	// Verify that this generated code is sufficiently up-to-date.
+	_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+	// Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type StreamKindConfig struct { + state protoimpl.MessageState `protogen:"open.v1"` + Kind uint64 `protobuf:"varint,1,opt,name=kind,proto3" json:"kind,omitempty"` + MaxConnects uint64 `protobuf:"varint,2,opt,name=max_connects,json=maxConnects,proto3" json:"max_connects,omitempty"` + MaxAccepts uint64 `protobuf:"varint,3,opt,name=max_accepts,json=maxAccepts,proto3" json:"max_accepts,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *StreamKindConfig) Reset() { + *x = StreamKindConfig{} + mi := &file_mux_mux_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StreamKindConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamKindConfig) ProtoMessage() {} + +func (x *StreamKindConfig) ProtoReflect() protoreflect.Message { + mi := &file_mux_mux_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamKindConfig.ProtoReflect.Descriptor instead. 
+func (*StreamKindConfig) Descriptor() ([]byte, []int) { + return file_mux_mux_proto_rawDescGZIP(), []int{0} +} + +func (x *StreamKindConfig) GetKind() uint64 { + if x != nil { + return x.Kind + } + return 0 +} + +func (x *StreamKindConfig) GetMaxConnects() uint64 { + if x != nil { + return x.MaxConnects + } + return 0 +} + +func (x *StreamKindConfig) GetMaxAccepts() uint64 { + if x != nil { + return x.MaxAccepts + } + return 0 +} + +type Handshake struct { + state protoimpl.MessageState `protogen:"open.v1"` + Kinds []*StreamKindConfig `protobuf:"bytes,1,rep,name=kinds,proto3" json:"kinds,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Handshake) Reset() { + *x = Handshake{} + mi := &file_mux_mux_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Handshake) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Handshake) ProtoMessage() {} + +func (x *Handshake) ProtoReflect() protoreflect.Message { + mi := &file_mux_mux_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Handshake.ProtoReflect.Descriptor instead. +func (*Handshake) Descriptor() ([]byte, []int) { + return file_mux_mux_proto_rawDescGZIP(), []int{1} +} + +func (x *Handshake) GetKinds() []*StreamKindConfig { + if x != nil { + return x.Kinds + } + return nil +} + +// Flat small message, representing a unit of multiplexer information. +// The frames for the same id can be sent merged, +// since they have disjoint field sets. 
+// +// OPEN = id,kind,max_msg_size +// RESIZE = id,window_end +// MSG = id,payload_size,[msg_end] +// CLOSE = id,close +type Header struct { + state protoimpl.MessageState `protogen:"open.v1"` + // TOTAL: 59B + Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` // 11B + Kind *uint64 `protobuf:"varint,2,opt,name=kind,proto3,oneof" json:"kind,omitempty"` //11B + MaxMsgSize *uint64 `protobuf:"varint,3,opt,name=max_msg_size,json=maxMsgSize,proto3,oneof" json:"max_msg_size,omitempty"` // 11B + WindowEnd *uint64 `protobuf:"varint,4,opt,name=window_end,json=windowEnd,proto3,oneof" json:"window_end,omitempty"` // 11B + PayloadSize *uint64 `protobuf:"varint,5,opt,name=payload_size,json=payloadSize,proto3,oneof" json:"payload_size,omitempty"` // 11B + MsgEnd *bool `protobuf:"varint,6,opt,name=msg_end,json=msgEnd,proto3,oneof" json:"msg_end,omitempty"` // 2B + Close *bool `protobuf:"varint,7,opt,name=close,proto3,oneof" json:"close,omitempty"` // 2B + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Header) Reset() { + *x = Header{} + mi := &file_mux_mux_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Header) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Header) ProtoMessage() {} + +func (x *Header) ProtoReflect() protoreflect.Message { + mi := &file_mux_mux_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Header.ProtoReflect.Descriptor instead. 
+func (*Header) Descriptor() ([]byte, []int) { + return file_mux_mux_proto_rawDescGZIP(), []int{2} +} + +func (x *Header) GetId() uint64 { + if x != nil { + return x.Id + } + return 0 +} + +func (x *Header) GetKind() uint64 { + if x != nil && x.Kind != nil { + return *x.Kind + } + return 0 +} + +func (x *Header) GetMaxMsgSize() uint64 { + if x != nil && x.MaxMsgSize != nil { + return *x.MaxMsgSize + } + return 0 +} + +func (x *Header) GetWindowEnd() uint64 { + if x != nil && x.WindowEnd != nil { + return *x.WindowEnd + } + return 0 +} + +func (x *Header) GetPayloadSize() uint64 { + if x != nil && x.PayloadSize != nil { + return *x.PayloadSize + } + return 0 +} + +func (x *Header) GetMsgEnd() bool { + if x != nil && x.MsgEnd != nil { + return *x.MsgEnd + } + return false +} + +func (x *Header) GetClose() bool { + if x != nil && x.Close != nil { + return *x.Close + } + return false +} + +var File_mux_mux_proto protoreflect.FileDescriptor + +const file_mux_mux_proto_rawDesc = "" + + "\n" + + "\rmux/mux.proto\x12\x03mux\"j\n" + + "\x10StreamKindConfig\x12\x12\n" + + "\x04kind\x18\x01 \x01(\x04R\x04kind\x12!\n" + + "\fmax_connects\x18\x02 \x01(\x04R\vmaxConnects\x12\x1f\n" + + "\vmax_accepts\x18\x03 \x01(\x04R\n" + + "maxAccepts\"8\n" + + "\tHandshake\x12+\n" + + "\x05kinds\x18\x01 \x03(\v2\x15.mux.StreamKindConfigR\x05kinds\"\xad\x02\n" + + "\x06Header\x12\x0e\n" + + "\x02id\x18\x01 \x01(\x04R\x02id\x12\x17\n" + + "\x04kind\x18\x02 \x01(\x04H\x00R\x04kind\x88\x01\x01\x12%\n" + + "\fmax_msg_size\x18\x03 \x01(\x04H\x01R\n" + + "maxMsgSize\x88\x01\x01\x12\"\n" + + "\n" + + "window_end\x18\x04 \x01(\x04H\x02R\twindowEnd\x88\x01\x01\x12&\n" + + "\fpayload_size\x18\x05 \x01(\x04H\x03R\vpayloadSize\x88\x01\x01\x12\x1c\n" + + "\amsg_end\x18\x06 \x01(\bH\x04R\x06msgEnd\x88\x01\x01\x12\x19\n" + + "\x05close\x18\a \x01(\bH\x05R\x05close\x88\x01\x01B\a\n" + + "\x05_kindB\x0f\n" + + "\r_max_msg_sizeB\r\n" + + "\v_window_endB\x0f\n" + + "\r_payload_sizeB\n" + + "\n" + + 
"\b_msg_endB\b\n" + + "\x06_closeB2Z0github.com/tendermint/tendermint/internal/mux/pbb\x06proto3" + +var ( + file_mux_mux_proto_rawDescOnce sync.Once + file_mux_mux_proto_rawDescData []byte +) + +func file_mux_mux_proto_rawDescGZIP() []byte { + file_mux_mux_proto_rawDescOnce.Do(func() { + file_mux_mux_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_mux_mux_proto_rawDesc), len(file_mux_mux_proto_rawDesc))) + }) + return file_mux_mux_proto_rawDescData +} + +var file_mux_mux_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_mux_mux_proto_goTypes = []any{ + (*StreamKindConfig)(nil), // 0: mux.StreamKindConfig + (*Handshake)(nil), // 1: mux.Handshake + (*Header)(nil), // 2: mux.Header +} +var file_mux_mux_proto_depIdxs = []int32{ + 0, // 0: mux.Handshake.kinds:type_name -> mux.StreamKindConfig + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_mux_mux_proto_init() } +func file_mux_mux_proto_init() { + if File_mux_mux_proto != nil { + return + } + file_mux_mux_proto_msgTypes[2].OneofWrappers = []any{} + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_mux_mux_proto_rawDesc), len(file_mux_mux_proto_rawDesc)), + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_mux_mux_proto_goTypes, + DependencyIndexes: file_mux_mux_proto_depIdxs, + MessageInfos: file_mux_mux_proto_msgTypes, + }.Build() + File_mux_mux_proto = out.File + file_mux_mux_proto_goTypes = nil + file_mux_mux_proto_depIdxs = nil +} diff --git a/sei-tendermint/internal/mux/stream.go b/sei-tendermint/internal/mux/stream.go new file mode 100644 index 0000000000..dd8c3d74e9 
--- /dev/null +++ b/sei-tendermint/internal/mux/stream.go @@ -0,0 +1,155 @@ +package mux + +import ( + "context" + "errors" + "fmt" + "github.com/tendermint/tendermint/libs/utils" +) + +var errRemoteClosed = errors.New("remote closed") +var errClosed = errors.New("closed") + +type Stream struct { + state *streamState + queue *utils.Watch[queue] +} + +func (s *Stream) maxSendMsgSize() uint64 { + for inner := range s.state.inner.Lock() { + return inner.send.maxMsgSize + } + panic("unreachable") +} + +// open() opens the recv end of the Stream. Permits the peer to send "window" messages, up to maxMsgSize bytes each. +// Up to maxMsgSize*window bytes will be cached locally during the life of this Stream. +// Whenever you call Recv, you specify whether window should grow (i.e. whether to report that the messages +// have been consumed and he can send more). +func (s *Stream) open(ctx context.Context, maxMsgSize uint64, window uint64) error { + for inner, ctrl := range s.state.inner.Lock() { + if inner.recv.opened { + return fmt.Errorf("already opened") + } + inner.recv.opened = true + inner.recv.maxMsgSize = maxMsgSize + inner.recv.end = window + inner.recv.msgs = make([][]byte, window) + for queue, ctrl := range s.queue.Lock() { + if len(queue) == 0 { + ctrl.Updated() + } + f := queue.Get(s.state.id) + f.Header.Kind = utils.Alloc(uint64(s.state.kind)) + f.Header.MaxMsgSize = utils.Alloc(maxMsgSize) + f.Header.WindowEnd = utils.Alloc(window) + } + if err := ctrl.WaitUntil(ctx, func() bool { return inner.send.remoteOpened }); err != nil { + s.close(inner) + ctrl.Updated() + return err + } + } + return nil +} + +// Send sends a message to peer. Blocks until: +// * peer has permitted to send them a message (i.e. there is space in their local buffer) +// * the previous message has been sent by the multiplexer (at most 1 message per Stream is cached at all times) +// Returns an error if Close() was called already. 
+// Returns an error if the message is too large (exceeds maxMsgSize declared by the peer). +func (s *Stream) Send(ctx context.Context, msg []byte) error { + for inner, ctrl := range s.state.inner.Lock() { + // Wait until the local buffer is empty && remote buffer has capacity. + if err := ctrl.WaitUntil(ctx, func() bool { + // Will we never be able to send... + never := inner.closed.local || (inner.closed.remote && inner.send.begin == inner.send.end) + // ...or we can send now. + return never || (inner.send.bufBegin == inner.send.begin && inner.send.begin < inner.send.end) + }); err != nil { + return err + } + if inner.closed.local { + return errClosed + } + if inner.send.begin == inner.send.end { + return errRemoteClosed + } + // We check msg size AFTER waiting because maxMsgSize could be set AFTER we wait. + if uint64(len(msg)) > inner.send.maxMsgSize { + return errTooLargeMsg + } + inner.send.begin += 1 + // Push msg to the queue. + for queue, ctrl := range s.queue.Lock() { + if len(queue) == 0 { + ctrl.Updated() + } + f := queue.Get(s.state.id) + f.Payload = msg + f.Header.PayloadSize = utils.Alloc(uint64(len(msg))) + f.Header.MsgEnd = utils.Alloc(true) + } + } + return nil +} + +func (s *Stream) close(inner *streamStateInner) { + if inner.closed.local { + return + } + inner.closed.local = true + for queue, ctrl := range s.queue.Lock() { + if len(queue) == 0 { + ctrl.Updated() + } + f := queue.Get(s.state.id) + f.Header.Close = utils.Alloc(true) + } +} + +// Close sends a final CLOSE flag to the peer. +// All subsequent Send calls will fail. +// Recv calls will no longer be able to free buffer space. +// NOTE: we may consider separating Close into SendClose and RecvClose, +// to make send and recv parts of the stream entirely independent. +func (s *Stream) Close() { + for inner, ctrl := range s.state.inner.Lock() { + s.close(inner) + ctrl.Updated() + } +} + +// Recv receives a message from peer. 
Blocks until message is available OR +// until peer has closed their end of the Stream. +// If freeBuffer is set, it permits the peer to send more messages (since local buffer was freed). +func (s *Stream) Recv(ctx context.Context, freeBuffer bool) ([]byte, error) { + for inner, ctrl := range s.state.inner.Lock() { + if err := ctrl.WaitUntil(ctx, func() bool { + // A message is available or peer closed the Stream. + return inner.recv.begin < inner.recv.used || inner.closed.remote + }); err != nil { + return nil, err + } + if inner.recv.begin == inner.recv.used { + return nil, errRemoteClosed + } + i := inner.recv.begin % uint64(len(inner.recv.msgs)) + msg := inner.recv.msgs[i] + inner.recv.msgs[i] = nil + inner.recv.begin += 1 + // Free buffer if requested AND the stream was not closed locally. + if freeBuffer && !inner.closed.local { + inner.recv.end = inner.recv.begin + uint64(len(inner.recv.msgs)) + for queue, ctrl := range s.queue.Lock() { + if len(queue) == 0 { + ctrl.Updated() + } + f := queue.Get(s.state.id) + f.Header.WindowEnd = utils.Alloc(inner.recv.end) + } + } + return msg, nil + } + panic("unreachable") +} diff --git a/sei-tendermint/internal/mux/stream_state.go b/sei-tendermint/internal/mux/stream_state.go new file mode 100644 index 0000000000..0187cc4a7d --- /dev/null +++ b/sei-tendermint/internal/mux/stream_state.go @@ -0,0 +1,127 @@ +package mux + +import ( + "fmt" + "github.com/tendermint/tendermint/libs/utils" +) + +type streamID uint64 +type StreamKind uint64 + +func (id streamID) isConnect() bool { return id&1 == 0 } + +// The least significant bit of streamID decides whether the stream is +// outbound (connect) or inbound (accept). When receiving streamID from peer +// we need to convert it to local streamID. 
+func streamIDFromRemote(x uint64) streamID { return streamID(x ^ 1) }
+
+type closeState struct {
+	local  bool
+	remote bool
+}
+
+type sendState struct {
+	remoteOpened bool
+	maxMsgSize   uint64
+	bufBegin     uint64
+	begin        uint64
+	end          uint64
+}
+
+type recvState struct {
+	opened     bool
+	maxMsgSize uint64
+	begin      uint64
+	used       uint64
+	end        uint64
+	msgs       [][]byte
+}
+
+type streamStateInner struct {
+	send   sendState
+	recv   recvState
+	closed closeState
+}
+
+type streamState struct {
+	id    streamID
+	kind  StreamKind
+	inner utils.Watch[*streamStateInner]
+}
+
+func newStreamState(id streamID, kind StreamKind) *streamState {
+	return &streamState{
+		id:    id,
+		kind:  kind,
+		inner: utils.NewWatch(&streamStateInner{}),
+	}
+}
+
+func (s *streamState) RemoteOpen(maxMsgSize uint64) error {
+	for inner, ctrl := range s.inner.Lock() {
+		if inner.send.remoteOpened {
+			return fmt.Errorf("already opened")
+		}
+		// Do not allow remote open before we connect.
+		if s.id.isConnect() && !inner.recv.opened {
+			return errUnknownStream
+		}
+		inner.send.remoteOpened = true
+		inner.send.maxMsgSize = maxMsgSize
+		ctrl.Updated()
+	}
+	return nil
+}
+
+// RemoteClose handles a CLOSE frame from the peer.
+func (s *streamState) RemoteClose() error {
+	for inner, ctrl := range s.inner.Lock() {
+		if inner.closed.remote {
+			return fmt.Errorf("already closed")
+		}
+		inner.closed.remote = true
+		ctrl.Updated()
+	}
+	return nil
+}
+
+func (s *streamState) RemoteWindowEnd(windowEnd uint64) {
+	for inner, ctrl := range s.inner.Lock() {
+		if inner.send.end < windowEnd {
+			inner.send.end = windowEnd
+			ctrl.Updated()
+		}
+	}
+}
+
+// RemotePayloadSize checks if there is place for the payload.
+func (s *streamState) RemotePayloadSize(payloadSize uint64) error {
+	for inner := range s.inner.Lock() {
+		if inner.recv.used == inner.recv.end {
+			return errTooManyMsgs
+		}
+		i := int(inner.recv.used) % len(inner.recv.msgs)
+		if inner.recv.maxMsgSize-uint64(len(inner.recv.msgs[i])) < payloadSize {
+			return errTooLargeMsg
+		}
+	}
+	return nil
+}
+
+func (s *streamState) RemotePayload(payload []byte) {
+	for inner := range s.inner.Lock() {
+		i := int(inner.recv.used) % len(inner.recv.msgs)
+		inner.recv.msgs[i] = append(inner.recv.msgs[i], payload...)
+	}
+}
+
+func (s *streamState) RemoteMsgEnd() error {
+	for inner, ctrl := range s.inner.Lock() {
+		// Same protocol violation as in RemotePayloadSize: report it with the
+		// errTooManyMsgs sentinel so callers matching errors.Is() catch it.
+		if inner.recv.used == inner.recv.end {
+			return errTooManyMsgs
+		}
+		inner.recv.used += 1
+		ctrl.Updated()
+	}
+	return nil
+}
diff --git a/sei-tendermint/internal/protoutils/conv.go b/sei-tendermint/internal/protoutils/conv.go
new file mode 100644
index 0000000000..3bbe83e8b1
--- /dev/null
+++ b/sei-tendermint/internal/protoutils/conv.go
@@ -0,0 +1,63 @@
+package protoutils
+
+import (
+	"errors"
+	"fmt"
+	"github.com/tendermint/tendermint/libs/utils"
+)
+
+// Conv is a pair of functions to encode and decode between a type and a Message.
+type Conv[T any, P Message] struct {
+	Encode func(T) P
+	Decode func(P) (T, error)
+}
+
+// EncodeSlice encodes a slice of T into a slice of P.
+func (c Conv[T, P]) EncodeSlice(t []T) []P {
+	p := make([]P, len(t))
+	for i := range t {
+		p[i] = c.Encode(t[i])
+	}
+	return p
+}
+
+// DecodeSlice decodes a slice of P into a slice of T.
+func (c Conv[T, P]) DecodeSlice(p []P) ([]T, error) {
+	t := make([]T, len(p))
+	var err error
+	for i := range p {
+		if t[i], err = c.Decode(p[i]); err != nil {
+			return nil, fmt.Errorf("[%d]: %w", i, err)
+		}
+	}
+	return t, nil
+}
+
+// EncodeOpt encodes utils.Option[T], mapping utils.None to utils.Zero[P]().
+func (c Conv[T, P]) EncodeOpt(mv utils.Option[T]) P { + v, ok := mv.Get() + if !ok { + return utils.Zero[P]() + } + return c.Encode(v) +} + +// DecodeReq decodes a ProtoMessage into a T, returning an error if p is nil. +func (c Conv[T, P]) DecodeReq(p P) (T, error) { + if p == utils.Zero[P]() { + return utils.Zero[T](), errors.New("missing") + } + return c.Decode(p) +} + +// DecodeOpt decodes a ProtoMessage into a T, returning nil if p is nil. +func (c Conv[T, P]) DecodeOpt(p P) (utils.Option[T], error) { + if p == utils.Zero[P]() { + return utils.None[T](), nil + } + t, err := c.DecodeReq(p) + if err != nil { + return utils.None[T](), err + } + return utils.Some(t), nil +} diff --git a/sei-tendermint/internal/protoutils/msg.go b/sei-tendermint/internal/protoutils/msg.go new file mode 100644 index 0000000000..b1a8e7eec3 --- /dev/null +++ b/sei-tendermint/internal/protoutils/msg.go @@ -0,0 +1,17 @@ +package protoutils + +import ( + "google.golang.org/protobuf/proto" +) + +// Message is comparable proto.Message. +type Message interface { + comparable + proto.Message +} + +// Clone clones a proto.Message object. +func Clone[T Message](item T) T { return proto.Clone(item).(T) } + +// Equal compares two Message objects. +func Equal[T Message](a, b T) bool { return proto.Equal(a, b) } diff --git a/sei-tendermint/internal/protoutils/testonly.go b/sei-tendermint/internal/protoutils/testonly.go new file mode 100644 index 0000000000..5069737832 --- /dev/null +++ b/sei-tendermint/internal/protoutils/testonly.go @@ -0,0 +1,24 @@ +package protoutils + +import ( + "fmt" + "github.com/tendermint/tendermint/libs/utils" + "google.golang.org/protobuf/proto" +) + +// Test tests whether reencoding a value is an identity operation. 
+func (c *Conv[T, P]) Test(want T) error { + p := c.Encode(want) + raw, err := proto.Marshal(p) + if err != nil { + return fmt.Errorf("Marshal(): %w", err) + } + if err := proto.Unmarshal(raw, p); err != nil { + return fmt.Errorf("Unmarshal(): %w", err) + } + got, err := c.Decode(p) + if err != nil { + return fmt.Errorf("Decode(Encode()): %w", err) + } + return utils.TestDiff(want, got) +} diff --git a/sei-tendermint/libs/utils/testonly.go b/sei-tendermint/libs/utils/testonly.go index aab2ac4f68..97953cdf3c 100644 --- a/sei-tendermint/libs/utils/testonly.go +++ b/sei-tendermint/libs/utils/testonly.go @@ -19,7 +19,7 @@ type ReadOnly struct{} // isReadOnly returns true if t embeds ReadOnly. func isReadOnly(t reflect.Type) bool { - want := reflect.TypeOf(ReadOnly{}) + want := reflect.TypeFor[ReadOnly]() if t.Kind() != reflect.Struct { return false } @@ -89,6 +89,13 @@ func (rng Rng) Int63() int64 { panic("unreachable") } +func (rng Rng) Uint64() uint64 { + for inner := range rng.inner.Lock() { + return inner.Uint64() + } + panic("unreachable") +} + func (rng Rng) Int() int { for inner := range rng.inner.Lock() { return inner.Int() @@ -116,7 +123,8 @@ func (rng Rng) Shuffle(n int, swap func(i, j int)) { } } -// TestRngSplit returns a new random number splitted from the given one. +// Split returns a new random number splitted from the given one. +// It should be used to provide deterministic rngs to independent goroutines. // This is a very primitive splitting, known to result with dependent randomness. // If that ever causes a problem, we can switch to SplitMix. func (rng Rng) Split() Rng { diff --git a/sei-tendermint/node/node_test.go b/sei-tendermint/node/node_test.go index 1699d726a3..98bd15f76d 100644 --- a/sei-tendermint/node/node_test.go +++ b/sei-tendermint/node/node_test.go @@ -58,8 +58,7 @@ func TestNodeStartStop(t *testing.T) { t.Cleanup(func() { n.Wait() }) - // TODO: Cannot guarantee no leaks, because go-cache leaks goroutines by design. 
- // t.Cleanup(leaktest.CheckTimeout(t, time.Second)) + t.Cleanup(leaktest.CheckTimeout(t, time.Second)) require.NoError(t, n.Start(ctx)) // wait for the node to produce a block