diff --git a/manager/state/raft/raft.go b/manager/state/raft/raft.go index afdf2ca4eb..28c7cfa47e 100644 --- a/manager/state/raft/raft.go +++ b/manager/state/raft/raft.go @@ -180,9 +180,12 @@ type NodeOptions struct { ClockSource clock.Clock // SendTimeout is the timeout on the sending messages to other raft // nodes. Leave this as 0 to get the default value. - SendTimeout time.Duration - TLSCredentials credentials.TransportCredentials - KeyRotator EncryptionKeyRotator + SendTimeout time.Duration + // LargeSendTimeout is the timeout on the sending snapshots to other raft + // nodes. Leave this as 0 to get the default value. + LargeSendTimeout time.Duration + TLSCredentials credentials.TransportCredentials + KeyRotator EncryptionKeyRotator // DisableStackDump prevents Run from dumping goroutine stacks when the // store becomes stuck. DisableStackDump bool @@ -204,6 +207,11 @@ func NewNode(opts NodeOptions) *Node { if opts.SendTimeout == 0 { opts.SendTimeout = 2 * time.Second } + if opts.LargeSendTimeout == 0 { + // a "slow" 100Mbps connection can send over 240MB data in 20 seconds + // which is well over the gRPC message limit of 128MB allowed by SwarmKit + opts.LargeSendTimeout = 20 * time.Second + } raftStore := raft.NewMemoryStorage() @@ -349,6 +357,7 @@ func (n *Node) initTransport() { transportConfig := &transport.Config{ HeartbeatInterval: time.Duration(n.Config.ElectionTick) * n.opts.TickInterval, SendTimeout: n.opts.SendTimeout, + LargeSendTimeout: n.opts.LargeSendTimeout, Credentials: n.opts.TLSCredentials, Raft: n, } diff --git a/manager/state/raft/transport/mock_raft_test.go b/manager/state/raft/transport/mock_raft_test.go index 7e7256de6a..703570fb84 100644 --- a/manager/state/raft/transport/mock_raft_test.go +++ b/manager/state/raft/transport/mock_raft_test.go @@ -60,6 +60,7 @@ func newMockRaft() (*mockRaft, error) { cfg := &Config{ HeartbeatInterval: 3 * time.Second, SendTimeout: 2 * time.Second, + LargeSendTimeout: 20 * time.Second, Raft: mr, } tr := New(cfg) diff --git a/manager/state/raft/transport/peer.go b/manager/state/raft/transport/peer.go index 55639af13f..8c7ca75458 100644 --- a/manager/state/raft/transport/peer.go +++ b/manager/state/raft/transport/peer.go @@ -133,7 +133,14 @@ func (p *peer) resolveAddr(ctx context.Context, id uint64) (string, error) { } func (p *peer) sendProcessMessage(ctx context.Context, m raftpb.Message) error { - ctx, cancel := context.WithTimeout(ctx, p.tr.config.SendTimeout) + timeout := p.tr.config.SendTimeout + // if a snapshot is being sent, set timeout to LargeSendTimeout because + // sending snapshots can take more time than other messages sent between peers. + // The same applies to AppendEntries as well, where messages can get large. + if m.Type == raftpb.MsgSnap || m.Type == raftpb.MsgApp { + timeout = p.tr.config.LargeSendTimeout + } + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() _, err := api.NewRaftClient(p.conn()).ProcessRaftMessage(ctx, &api.ProcessRaftMessageRequest{Message: &m}) if grpc.Code(err) == codes.NotFound && grpc.ErrorDesc(err) == membership.ErrMemberRemoved.Error() { diff --git a/manager/state/raft/transport/transport.go b/manager/state/raft/transport/transport.go index b259013d8a..bd5a04eec7 100644 --- a/manager/state/raft/transport/transport.go +++ b/manager/state/raft/transport/transport.go @@ -35,6 +35,7 @@ type Raft interface { type Config struct { HeartbeatInterval time.Duration SendTimeout time.Duration + LargeSendTimeout time.Duration Credentials credentials.TransportCredentials RaftID string