Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions agent/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,10 @@ func (s *session) start(ctx context.Context) error {
return err
}

if msg.Disconnect {
stream.CloseSend()
return errSessionDisconnect
}

s.sessionID = msg.SessionID
s.session = stream

return nil
return s.handleSessionMessage(ctx, msg)
}

func (s *session) heartbeat(ctx context.Context) error {
Expand Down Expand Up @@ -197,21 +192,28 @@ func (s *session) listen(ctx context.Context) error {
defer s.session.CloseSend()
log.G(ctx).Debugf("(*session).listen")
for {
resp, err := s.session.Recv()
msg, err := s.session.Recv()
if err != nil {
return err
}

select {
case s.messages <- resp:
case <-s.closed:
return errSessionClosed
case <-ctx.Done():
return ctx.Err()
if err := s.handleSessionMessage(ctx, msg); err != nil {
return err
}
}
}

func (s *session) handleSessionMessage(ctx context.Context, msg *api.SessionMessage) error {
select {
case s.messages <- msg:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You still need to handle Disconnect message somewhere here don't you?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Odd. That should have been in (*Agent).handleSessionMessage. Looks like it was removed here: 1e193df#diff-6ea70e0a4de93ebcb83000a028edc163L290.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what I remembered seeing from before but went back and looked there and wasn't there.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out. We are removing this field and found a much better way to force disconnect.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. SGTM

return nil
case <-s.closed:
return errSessionClosed
case <-ctx.Done():
return ctx.Err()
}
}

func (s *session) watch(ctx context.Context) error {
log.G(ctx).Debugf("(*session).watch")
client := api.NewDispatcherClient(s.agent.config.Conn)
Expand Down
132 changes: 46 additions & 86 deletions api/dispatcher.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 1 addition & 5 deletions api/dispatcher.proto
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,9 @@ message SessionMessage {
// Managers provides a weight list of alternative dispatchers
repeated WeightedPeer managers = 3;

// Disconnect instructs the agent to disconnect from the current disptacher
// and select a new one.
bool disconnect = 4;

// Symmetric encryption key distributed by the lead manager. Used by agents
// for securing network bootstrapping and communication.
repeated EncryptionKey network_bootstrap_keys = 5;
repeated EncryptionKey network_bootstrap_keys = 4;
}

// HeartbeatRequest provides identifying properties for a single heartbeat.
Expand Down
44 changes: 26 additions & 18 deletions manager/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/transport"

"github.com/Sirupsen/logrus"
"github.com/docker/swarm-v2/api"
Expand Down Expand Up @@ -699,7 +700,6 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
SessionID: sessionID,
Node: nodeObj,
Managers: d.getManagers(),
Disconnect: false,
NetworkBootstrapKeys: d.networkBootstrapKeys,
}); err != nil {
return err
Expand All @@ -710,6 +710,26 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
keyMgrUpdates, keyMgrCancel := d.keyMgrQueue.Watch()
defer keyMgrCancel()

// disconnect is a helper forcibly shutdown connection
disconnect := func() error {
// force disconnect by shutting down the stream.
transportStream, ok := transport.StreamFromContext(stream.Context())
if ok {
// if we have the transport stream, we can signal a disconnect
// in the client.
if err := transportStream.ServerTransport().Close(); err != nil {
log.WithError(err).Error("session end")
}
}

nodeStatus := api.NodeStatus{State: api.NodeStatus_DISCONNECTED, Message: "node is currently trying to find new manager"}
if err := d.nodeRemove(nodeID, nodeStatus); err != nil {
log.WithError(err).Error("failed to remove node")
}
// still return an abort if the transport closure was ineffective.
return grpc.Errorf(codes.Aborted, "node must disconnect")
}

for {
// After each message send, we need to check the nodes sessionID hasn't
// changed. If it has, we will the stream and make the node
Expand All @@ -718,10 +738,9 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
if err != nil {
return err
}
var (
disconnectError error
mgrs []*api.WeightedPeer
)

var mgrs []*api.WeightedPeer

select {
case ev := <-managerUpdates:
mgrs = ev.([]*api.WeightedPeer)
Expand All @@ -730,34 +749,23 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
case <-stream.Context().Done():
return stream.Context().Err()
case <-node.Disconnect:
disconnectError = grpc.Errorf(codes.Aborted, "node must disconnect")
return disconnect()
case <-d.ctx.Done():
disconnectError = grpc.Errorf(codes.Aborted, "dispatcher stopped")
return disconnect()
case <-keyMgrUpdates:
}
if mgrs == nil {
mgrs = d.getManagers()
}
if disconnectError != nil {
nodeStatus := api.NodeStatus{State: api.NodeStatus_DISCONNECTED, Message: "node is currently trying to find new manager"}
if err := d.nodeRemove(nodeID, nodeStatus); err != nil {
log.WithError(err).Error("failed to remove node")
}
}

if err := stream.Send(&api.SessionMessage{
SessionID: sessionID,
Node: nodeObj,
Managers: mgrs,
Disconnect: disconnectError != nil,
NetworkBootstrapKeys: d.networkBootstrapKeys,
}); err != nil {
return err
}
if disconnectError != nil {
log.WithError(disconnectError).Error("session end")
return disconnectError
}
}
}

Expand Down
1 change: 0 additions & 1 deletion manager/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,6 @@ func TestSession(t *testing.T) {

msg, err := stream.Recv()
assert.Equal(t, 1, len(msg.Managers))
assert.False(t, msg.Disconnect)
}

func TestSessionNoCert(t *testing.T) {
Expand Down