Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 7 additions & 23 deletions manager/state/raft/transport/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,18 +229,10 @@ func (p *peer) sendProcessMessage(ctx context.Context, m raftpb.Message) error {
//
// By doing it as this watchdog-type structure, we can time out individual
// operations by canceling the context on our own terms.
bump := make(chan struct{})
go func() {
for {
select {
case <-bump:
case <-time.After(p.tr.config.SendTimeout):
cancel()
case <-ctx.Done():
return
}
}
}()
t := time.AfterFunc(p.tr.config.SendTimeout, cancel)
defer t.Stop()

bump := func() { t.Reset(p.tr.config.SendTimeout) }

var err error
var stream api.Raft_StreamRaftMessageClient
Expand All @@ -260,22 +252,14 @@ func (p *peer) sendProcessMessage(ctx context.Context, m raftpb.Message) error {
// Stream
for _, msg := range msgs {
err = stream.Send(&msg)

// If the send succeeds, bump the watchdog timer.
//
// We cannot just do a naked send to the bump channel. If we try to
// send, for example, and the timer has elapsed, then the context
// will have been canceled, the watchdog loop will have exited, and
// there would be no receiver. We'd block here forever.
select {
case bump <- struct{}{}:
case <-ctx.Done():
}
if err != nil {
log.G(ctx).WithError(err).Error("error streaming message to peer")
stream.CloseAndRecv()
break
}

// If the send succeeds, bump the watchdog timer.
bump()
}

// Finished sending all the messages.
Expand Down