From da98934b1a17945deba893f46f3e0431bb3255e6 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Sat, 4 Jun 2016 16:51:13 -0700 Subject: [PATCH 1/2] agent: handle initial session message The role of the session message has expanded greatly in the last week, including node and networking information. Previously, it was sufficient to drop the first message and listen for updates. Now, there is critical bootstrapping data that must be propagated at session startup time. This change ensures this. Signed-off-by: Stephen J Day --- agent/session.go | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/agent/session.go b/agent/session.go index 9fd5388169..0574418ef7 100644 --- a/agent/session.go +++ b/agent/session.go @@ -142,15 +142,10 @@ func (s *session) start(ctx context.Context) error { return err } - if msg.Disconnect { - stream.CloseSend() - return errSessionDisconnect - } - s.sessionID = msg.SessionID s.session = stream - return nil + return s.handleSessionMessage(ctx, msg) } func (s *session) heartbeat(ctx context.Context) error { @@ -197,21 +192,28 @@ func (s *session) listen(ctx context.Context) error { defer s.session.CloseSend() log.G(ctx).Debugf("(*session).listen") for { - resp, err := s.session.Recv() + msg, err := s.session.Recv() if err != nil { return err } - select { - case s.messages <- resp: - case <-s.closed: - return errSessionClosed - case <-ctx.Done(): - return ctx.Err() + if err := s.handleSessionMessage(ctx, msg); err != nil { + return err } } } +func (s *session) handleSessionMessage(ctx context.Context, msg *api.SessionMessage) error { + select { + case s.messages <- msg: + return nil + case <-s.closed: + return errSessionClosed + case <-ctx.Done(): + return ctx.Err() + } +} + func (s *session) watch(ctx context.Context) error { log.G(ctx).Debugf("(*session).watch") client := api.NewDispatcherClient(s.agent.config.Conn) From 74c5fe92593063023a70212234488d3899a6e6b6 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Sat, 4 Jun 2016 18:06:32 -0700 Subject: [PATCH 2/2] api: remove disconnect field from session message After some close study of session handling and GRPC, we found it to be sufficient to close the Server's transport stream to force a reconnection at the agent-level. Under this realization, we have removed the `disconnect` field from `api.SessionMessage`. Connection rebalancing is not yet implemented, but we have confirmed that the current behavior is sufficient to support in the future without changes in the agent. The detected connection closer on the client-side forces GRPC to call into the picker and select a new manager node. Signed-off-by: Stephen J Day --- api/dispatcher.pb.go | 132 +++++++++----------------- api/dispatcher.proto | 6 +- manager/dispatcher/dispatcher.go | 44 +++++---- manager/dispatcher/dispatcher_test.go | 1 - 4 files changed, 73 insertions(+), 110 deletions(-) diff --git a/api/dispatcher.pb.go b/api/dispatcher.pb.go index 008fea71bd..339cbbf4b7 100644 --- a/api/dispatcher.pb.go +++ b/api/dispatcher.pb.go @@ -94,12 +94,9 @@ type SessionMessage struct { Node *Node `protobuf:"bytes,2,opt,name=node" json:"node,omitempty"` // Managers provides a weight list of alternative dispatchers Managers []*WeightedPeer `protobuf:"bytes,3,rep,name=managers" json:"managers,omitempty"` - // Disconnect instructs the agent to disconnect from the current disptacher - // and select a new one. - Disconnect bool `protobuf:"varint,4,opt,name=disconnect,proto3" json:"disconnect,omitempty"` // Symmetric encryption key distributed by the lead manager. Used by agents // for securing network bootstrapping and communication. - NetworkBootstrapKeys []*EncryptionKey `protobuf:"bytes,5,rep,name=network_bootstrap_keys,json=networkBootstrapKeys" json:"network_bootstrap_keys,omitempty"` + NetworkBootstrapKeys []*EncryptionKey `protobuf:"bytes,4,rep,name=network_bootstrap_keys,json=networkBootstrapKeys" json:"network_bootstrap_keys,omitempty"` } func (m *SessionMessage) Reset() { *m = SessionMessage{} } @@ -252,9 +249,8 @@ func (m *SessionMessage) Copy() *SessionMessage { } o := &SessionMessage{ - SessionID: m.SessionID, - Node: m.Node.Copy(), - Disconnect: m.Disconnect, + SessionID: m.SessionID, + Node: m.Node.Copy(), } if m.Managers != nil { @@ -385,7 +381,7 @@ func (this *SessionMessage) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 9) + s := make([]string, 0, 8) s = append(s, "&api.SessionMessage{") s = append(s, "SessionID: "+fmt.Sprintf("%#v", this.SessionID)+",\n") if this.Node != nil { @@ -394,7 +390,6 @@ func (this *SessionMessage) GoString() string { if this.Managers != nil { s = append(s, "Managers: "+fmt.Sprintf("%#v", this.Managers)+",\n") } - s = append(s, "Disconnect: "+fmt.Sprintf("%#v", this.Disconnect)+",\n") if this.NetworkBootstrapKeys != nil { s = append(s, "NetworkBootstrapKeys: "+fmt.Sprintf("%#v", this.NetworkBootstrapKeys)+",\n") } @@ -840,19 +835,9 @@ func (m *SessionMessage) MarshalTo(data []byte) (int, error) { i += n } } - if m.Disconnect { - data[i] = 0x20 - i++ - if m.Disconnect { - data[i] = 1 - } else { - data[i] = 0 - } - i++ - } if len(m.NetworkBootstrapKeys) > 0 { for _, msg := range m.NetworkBootstrapKeys { - data[i] = 0x2a + data[i] = 0x22 i++ i = encodeVarintDispatcher(data, i, uint64(msg.Size())) n, err := msg.MarshalTo(data[i:]) @@ -1257,9 +1242,6 @@ func (m *SessionMessage) Size() (n int) { n += 1 + l + sovDispatcher(uint64(l)) } } - if m.Disconnect { - n += 2 - } if len(m.NetworkBootstrapKeys) > 0 { for _, e := range m.NetworkBootstrapKeys { l = e.Size() @@ -1376,7 +1358,6 @@ func (this *SessionMessage) String() string { `SessionID:` + fmt.Sprintf("%v", this.SessionID) + `,`, `Node:` + strings.Replace(fmt.Sprintf("%v", this.Node), "Node", "Node", 1) + `,`, `Managers:` + strings.Replace(fmt.Sprintf("%v", this.Managers), "WeightedPeer", "WeightedPeer", 1) + `,`, - `Disconnect:` + fmt.Sprintf("%v", this.Disconnect) + `,`, `NetworkBootstrapKeys:` + strings.Replace(fmt.Sprintf("%v", this.NetworkBootstrapKeys), "EncryptionKey", "EncryptionKey", 1) + `,`, `}`, }, "") @@ -1667,26 +1648,6 @@ func (m *SessionMessage) Unmarshal(data []byte) error { } iNdEx = postIndex case 4: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Disconnect", wireType) - } - var v int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowDispatcher - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - v |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - m.Disconnect = bool(v != 0) - case 5: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field NetworkBootstrapKeys", wireType) } @@ -2435,46 +2396,45 @@ var ( ) var fileDescriptorDispatcher = []byte{ - // 643 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x9c, 0x54, 0xcd, 0x6e, 0xd3, 0x4c, - 0x14, 0xad, 0xd3, 0x34, 0x6d, 0x6e, 0x9a, 0x4f, 0xf9, 0x86, 0x8a, 0x5a, 0x56, 0x49, 0x83, 0x03, - 0x12, 0x12, 0x25, 0x41, 0x45, 0x62, 0x81, 0x22, 0x84, 0xa2, 0x54, 0xa2, 0x42, 0xfc, 0xc8, 0x05, - 0xb2, 0x8c, 0x26, 0xf6, 0x28, 0x35, 0x69, 0x3d, 0x66, 0x66, 0xac, 0x92, 0x05, 0x12, 0x4f, 0x80, - 0x10, 0x2b, 0xf6, 0xec, 0x79, 0x8e, 0x88, 0x15, 0x4b, 0x56, 0x15, 0xed, 0x03, 0x20, 0x1e, 0x81, - 0xb1, 0x3d, 0x4e, 0x42, 0xea, 0x40, 0xd3, 0xc5, 0xc8, 0x33, 0x67, 0xce, 0xb9, 0xf7, 0xe8, 0xde, - 0x3b, 0x86, 0x92, 0xe3, 0x72, 0x1f, 0x0b, 0x7b, 0x9f, 0xb0, 0x9a, 0xcf, 0xa8, 0xa0, 0x08, 0x39, - 0xd4, 0xee, 0xcb, 0x93, 0x7d, 0x10, 0x70, 0x21, 0xbf, 0xd8, 0x77, 0x8d, 0x82, 0x18, 0xf8, 0x84, - 0xc7, 0x04, 0xa3, 0x48, 0xbb, 0xaf, 0x88, 0x2d, 0x92, 0xe3, 0x5a, 0x8f, 0xf6, 0x68, 0xb4, 0xad, - 0x87, 0x3b, 0x85, 0x5e, 0xf2, 0x0f, 0x82, 0x9e, 0xeb, 0xd5, 0xe3, 0x8f, 0x02, 0xd7, 0x9d, 0x80, - 0x61, 0xe1, 0x52, 0xaf, 0x9e, 0x6c, 0xe2, 0x0b, 0xb3, 0x0d, 0xff, 0xed, 0x11, 0xce, 0x25, 0x60, - 0x91, 0xd7, 0x01, 0xe1, 0x02, 0xed, 0x40, 0xc1, 0x21, 0xdc, 0x66, 0xae, 0x1f, 0xd2, 0x74, 0xad, - 0xa2, 0xdd, 0x28, 0x6c, 0x57, 0x6b, 0x67, 0xbd, 0xd5, 0x9e, 0x50, 0x87, 0xb4, 0xc6, 0x54, 0x6b, - 0x52, 0x67, 0x7e, 0xce, 0x8c, 0x22, 0x3f, 0x96, 0x1f, 0xdc, 0x23, 0x68, 0x0b, 0x80, 0xc7, 0x48, - 0xc7, 0x75, 0xa2, 0xc0, 0xf9, 0x66, 0xf1, 0xf4, 0x78, 0x33, 0xaf, 0x78, 0xbb, 0x2d, 0x2b, 0xaf, - 0x08, 0xbb, 0x8e, 0x64, 0x67, 0x3d, 0x99, 0x40, 0xcf, 0x44, 0x06, 0xf4, 0x59, 0x06, 0xac, 0x88, - 0x85, 0x1a, 0xb0, 0x72, 0x88, 0x3d, 0x99, 0x85, 0x71, 0x7d, 0xb1, 0xb2, 0x28, 0x15, 0x95, 0x34, - 0x45, 0x9b, 0xb8, 0xbd, 0x7d, 0x41, 0x9c, 0x67, 0x84, 0x30, 0x6b, 0xa4, 0x40, 0x65, 0x00, 0xd9, - 0x0d, 0x9b, 0x7a, 0x9e, 0x2c, 0xaf, 0x9e, 0x95, 0x19, 0x57, 0xac, 0x09, 0x04, 0xb5, 0xe1, 0xb2, - 0x47, 0xc4, 0x11, 0x65, 0xfd, 0x4e, 0x97, 0x52, 0xc1, 0x05, 0xc3, 0x7e, 0xa7, 0x4f, 0x06, 0x5c, - 0x5f, 0x8a, 0x72, 0x5d, 0x4d, 0xcb, 0xb5, 0xe3, 0xd9, 0x6c, 0x10, 0x15, 0xe3, 0x11, 0x19, 0x58, - 0x6b, 0x2a, 0x40, 0x33, 0xd1, 0x4b, 0x90, 0x9b, 0x0f, 0xa0, 0xf4, 0x90, 0x60, 0x26, 0xba, 0x04, - 0x8b, 0xa4, 0x01, 0x73, 0x95, 0xc9, 0x7c, 0x0a, 0xff, 0x4f, 0x44, 0xe0, 0x3e, 0xf5, 0x38, 0x41, - 0xf7, 0x20, 0xe7, 0x13, 0xe6, 0x52, 0x47, 0xb5, 0x6f, 0x23, 0xcd, 0x5f, 0x4b, 0x4d, 0x42, 0x33, - 0x3b, 0x3c, 0xde, 0x5c, 0xb0, 0x94, 0xc2, 0xfc, 0x90, 0x81, 0xf5, 0x17, 0xbe, 0x83, 0x05, 0x79, - 0x8e, 0x79, 0x7f, 0x4f, 0x60, 0x11, 0xf0, 0x0b, 0x59, 0x43, 0x2f, 0x61, 0x39, 0x88, 0x02, 0x25, - 0x2d, 0x69, 0xa4, 0xd9, 0x98, 0x91, 0xab, 0x36, 0x46, 0x62, 0x86, 0x95, 0x04, 0x33, 0x28, 0x94, - 0xa6, 0x2f, 0x51, 0x15, 0x96, 0x85, 0xc4, 0xc6, 0xb6, 0x40, 0xda, 0xca, 0x85, 0x34, 0xe9, 0x29, - 0x17, 0x5e, 0x49, 0x43, 0x77, 0x21, 0xc7, 0x23, 0x91, 0x1a, 0xaa, 0x72, 0x9a, 0x9f, 0x09, 0x27, - 0x8a, 0x6d, 0x1a, 0xa0, 0x9f, 0x75, 0x19, 0x97, 0xda, 0x6c, 0xc0, 0x6a, 0x88, 0x5e, 0xac, 0x44, - 0xe6, 0x7d, 0xa5, 0x4e, 0x9e, 0x48, 0x0d, 0x96, 0x42, 0xaf, 0x5c, 0x0a, 0x17, 0x67, 0x4d, 0x7d, - 0x28, 0xb0, 0x62, 0xda, 0xf6, 0xfb, 0x2c, 0x40, 0x6b, 0xf4, 0x1f, 0x41, 0x6f, 0x60, 0x59, 0xa5, - 0x41, 0x66, 0x9a, 0xf4, 0xcf, 0xa7, 0x6e, 0xfc, 0x8d, 0xa3, 0x1c, 0x99, 0xd5, 0xaf, 0x5f, 0x7e, - 0x7e, 0xca, 0x5c, 0x81, 0x55, 0x7e, 0x84, 0xd9, 0xe1, 0xad, 0x70, 0x84, 0x09, 0x83, 0x62, 0x7c, - 0x52, 0x0f, 0xe8, 0xb6, 0x86, 0xde, 0x42, 0x7e, 0x34, 0x86, 0xe8, 0x5a, 0x5a, 0xdc, 0xe9, 0x39, - 0x37, 0xae, 0xff, 0x83, 0xa5, 0x0a, 0x7c, 0x1e, 0x03, 0xe8, 0xa3, 0x06, 0xa5, 0xe9, 0x16, 0xa1, - 0x9b, 0x73, 0x8c, 0x9b, 0xb1, 0x75, 0x3e, 0xf2, 0x3c, 0xa6, 0x18, 0x2c, 0x45, 0xcd, 0x45, 0x95, - 0x59, 0x6d, 0x1c, 0x65, 0x9f, 0xcd, 0x98, 0xaf, 0x0f, 0xcd, 0x8d, 0xe1, 0x49, 0x79, 0xe1, 0xbb, - 0x5c, 0xbf, 0x4e, 0xca, 0xda, 0xbb, 0xd3, 0xb2, 0x36, 0x94, 0xeb, 0x9b, 0x5c, 0x3f, 0xe4, 0xea, - 0xe6, 0xa2, 0x9f, 0xfe, 0x9d, 0xdf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x95, 0xbc, 0xad, 0xe1, 0x7c, - 0x06, 0x00, 0x00, + // 625 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x9c, 0x54, 0x4f, 0x6b, 0x13, 0x41, + 0x14, 0xef, 0xa6, 0x69, 0x6a, 0x5e, 0x1a, 0x89, 0x63, 0xb1, 0xcb, 0x52, 0xd3, 0xb8, 0x51, 0x10, + 0xac, 0x89, 0x54, 0xf0, 0x20, 0x41, 0x24, 0xa4, 0x60, 0x11, 0xff, 0xb0, 0x55, 0x73, 0x0c, 0x93, + 0xdd, 0x21, 0x5d, 0xd3, 0xee, 0xac, 0x33, 0xb3, 0xd4, 0x1c, 0x04, 0xc1, 0xbb, 0x88, 0x27, 0x3f, + 0x85, 0x9f, 0x23, 0x78, 0xf2, 0xe8, 0xa9, 0xd8, 0x7e, 0x00, 0xf1, 0x23, 0x38, 0xbb, 0x3b, 0x9b, + 0xc4, 0x74, 0xa3, 0x4d, 0x0f, 0xc3, 0xcc, 0xbc, 0xf9, 0xfd, 0x7e, 0xf3, 0xdb, 0xf7, 0xde, 0x2c, + 0x94, 0x1c, 0x97, 0xfb, 0x58, 0xd8, 0x7b, 0x84, 0xd5, 0x7c, 0x46, 0x05, 0x45, 0xc8, 0xa1, 0x76, + 0x5f, 0xee, 0xec, 0xfd, 0x80, 0x0b, 0x39, 0x63, 0xdf, 0x35, 0x0a, 0x62, 0xe0, 0x13, 0x1e, 0x03, + 0x8c, 0x22, 0xed, 0xbe, 0x26, 0xb6, 0x48, 0xb6, 0xab, 0x3d, 0xda, 0xa3, 0xd1, 0xb2, 0x1e, 0xae, + 0x54, 0xf4, 0xb2, 0xbf, 0x1f, 0xf4, 0x5c, 0xaf, 0x1e, 0x4f, 0x2a, 0xb8, 0xe6, 0x04, 0x0c, 0x0b, + 0x97, 0x7a, 0xf5, 0x64, 0x11, 0x1f, 0x98, 0x6d, 0xb8, 0xb8, 0x4b, 0x38, 0x97, 0x01, 0x8b, 0xbc, + 0x09, 0x08, 0x17, 0x68, 0x1b, 0x0a, 0x0e, 0xe1, 0x36, 0x73, 0xfd, 0x10, 0xa6, 0x6b, 0x15, 0xed, + 0x66, 0x61, 0xab, 0x5a, 0x3b, 0xed, 0xad, 0xf6, 0x94, 0x3a, 0xa4, 0x35, 0x86, 0x5a, 0x93, 0x3c, + 0xf3, 0x43, 0x66, 0xa4, 0xfc, 0x44, 0x4e, 0xb8, 0x47, 0xd0, 0x26, 0x00, 0x8f, 0x23, 0x1d, 0xd7, + 0x89, 0x84, 0xf3, 0xcd, 0xe2, 0xc9, 0xd1, 0x46, 0x5e, 0xe1, 0x76, 0x5a, 0x56, 0x5e, 0x01, 0x76, + 0x1c, 0x89, 0xce, 0x7a, 0xf2, 0x02, 0x3d, 0x13, 0x19, 0xd0, 0x67, 0x19, 0xb0, 0x22, 0x14, 0x6a, + 0xc0, 0x85, 0x03, 0xec, 0xc9, 0x5b, 0x18, 0xd7, 0x17, 0x2b, 0x8b, 0x92, 0x51, 0x49, 0x63, 0xb4, + 0x89, 0xdb, 0xdb, 0x13, 0xc4, 0x79, 0x4e, 0x08, 0xb3, 0x46, 0x0c, 0xd4, 0x86, 0x2b, 0x1e, 0x11, + 0x87, 0x94, 0xf5, 0x3b, 0x5d, 0x4a, 0x05, 0x17, 0x0c, 0xfb, 0x9d, 0x3e, 0x19, 0x70, 0x3d, 0x1b, + 0x69, 0x5d, 0x4b, 0xd3, 0xda, 0xf6, 0x6c, 0x36, 0x88, 0x3e, 0xf6, 0x31, 0x19, 0x58, 0xab, 0x4a, + 0xa0, 0x99, 0xf0, 0x65, 0x90, 0x9b, 0x0f, 0xa1, 0xf4, 0x88, 0x60, 0x26, 0xba, 0x04, 0x8b, 0x24, + 0xc1, 0x73, 0xa5, 0xc1, 0x7c, 0x06, 0x97, 0x26, 0x14, 0xb8, 0x4f, 0x3d, 0x4e, 0xd0, 0x7d, 0xc8, + 0xf9, 0x84, 0xb9, 0xd4, 0x51, 0xe5, 0x59, 0x4f, 0xf3, 0xd7, 0x52, 0x95, 0x6e, 0x66, 0x87, 0x47, + 0x1b, 0x0b, 0x96, 0x62, 0x98, 0x9f, 0x32, 0xb0, 0xf6, 0xd2, 0x77, 0xb0, 0x20, 0x2f, 0x30, 0xef, + 0xef, 0x0a, 0x2c, 0x02, 0x7e, 0x2e, 0x6b, 0xe8, 0x15, 0x2c, 0x07, 0x91, 0x50, 0x92, 0xf2, 0x46, + 0x9a, 0x8d, 0x19, 0x77, 0xd5, 0xc6, 0x91, 0x18, 0x61, 0x25, 0x62, 0x06, 0x85, 0xd2, 0xf4, 0x21, + 0xaa, 0xc2, 0xb2, 0x90, 0xb1, 0xb1, 0x2d, 0x90, 0xb6, 0x72, 0x21, 0x4c, 0x7a, 0xca, 0x85, 0x47, + 0xd2, 0xd0, 0x3d, 0xc8, 0xf1, 0x88, 0xa4, 0x9a, 0xa6, 0x9c, 0xe6, 0x67, 0xc2, 0x89, 0x42, 0x9b, + 0x06, 0xe8, 0xa7, 0x5d, 0xc6, 0xa9, 0x36, 0x1b, 0xb0, 0x12, 0x46, 0xcf, 0x97, 0x22, 0xf3, 0x81, + 0x62, 0x27, 0x4f, 0xa0, 0x06, 0x4b, 0xa1, 0x57, 0x2e, 0x89, 0x8b, 0xb3, 0xba, 0x3a, 0x24, 0x58, + 0x31, 0x6c, 0xeb, 0x63, 0x16, 0xa0, 0x35, 0xfa, 0x4f, 0xa0, 0xb7, 0xb0, 0xac, 0xae, 0x41, 0x66, + 0x1a, 0xf5, 0xef, 0xa7, 0x6c, 0xfc, 0x0b, 0xa3, 0x1c, 0x99, 0xd5, 0x6f, 0x5f, 0x7f, 0x7d, 0xc9, + 0x5c, 0x85, 0x15, 0x7e, 0x88, 0xd9, 0xc1, 0xed, 0xb0, 0x85, 0x09, 0x83, 0x62, 0xbc, 0x53, 0x0f, + 0xe4, 0x8e, 0x86, 0xde, 0x41, 0x7e, 0xd4, 0x86, 0xe8, 0x7a, 0x9a, 0xee, 0x74, 0x9f, 0x1b, 0x37, + 0xfe, 0x83, 0x52, 0x09, 0x3e, 0x8b, 0x01, 0xf4, 0x59, 0x83, 0xd2, 0x74, 0x89, 0xd0, 0xad, 0x39, + 0xda, 0xcd, 0xd8, 0x3c, 0x1b, 0x78, 0x1e, 0x53, 0x0c, 0x96, 0xa2, 0xe2, 0xa2, 0xca, 0xac, 0x32, + 0x8e, 0x6e, 0x9f, 0x8d, 0x98, 0xaf, 0x0e, 0xcd, 0xf5, 0xe1, 0x71, 0x79, 0xe1, 0x87, 0x1c, 0xbf, + 0x8f, 0xcb, 0xda, 0xfb, 0x93, 0xb2, 0x36, 0x94, 0xe3, 0xbb, 0x1c, 0x3f, 0xe5, 0xe8, 0xe6, 0xa2, + 0x9f, 0xfa, 0xdd, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x41, 0x11, 0xe4, 0x16, 0x5c, 0x06, 0x00, + 0x00, } diff --git a/api/dispatcher.proto b/api/dispatcher.proto index ca835a9664..4e03cc9272 100644 --- a/api/dispatcher.proto +++ b/api/dispatcher.proto @@ -109,13 +109,9 @@ message SessionMessage { // Managers provides a weight list of alternative dispatchers repeated WeightedPeer managers = 3; - // Disconnect instructs the agent to disconnect from the current disptacher - // and select a new one. - bool disconnect = 4; - // Symmetric encryption key distributed by the lead manager. Used by agents // for securing network bootstrapping and communication. - repeated EncryptionKey network_bootstrap_keys = 5; + repeated EncryptionKey network_bootstrap_keys = 4; } // HeartbeatRequest provides identifying properties for a single heartbeat. diff --git a/manager/dispatcher/dispatcher.go b/manager/dispatcher/dispatcher.go index dad604dd84..005f6a92b8 100644 --- a/manager/dispatcher/dispatcher.go +++ b/manager/dispatcher/dispatcher.go @@ -10,6 +10,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/transport" "github.com/Sirupsen/logrus" "github.com/docker/swarm-v2/api" @@ -699,7 +700,6 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio SessionID: sessionID, Node: nodeObj, Managers: d.getManagers(), - Disconnect: false, NetworkBootstrapKeys: d.networkBootstrapKeys, }); err != nil { return err @@ -710,6 +710,26 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio keyMgrUpdates, keyMgrCancel := d.keyMgrQueue.Watch() defer keyMgrCancel() + // disconnect is a helper forcibly shutdown connection + disconnect := func() error { + // force disconnect by shutting down the stream. + transportStream, ok := transport.StreamFromContext(stream.Context()) + if ok { + // if we have the transport stream, we can signal a disconnect + // in the client. + if err := transportStream.ServerTransport().Close(); err != nil { + log.WithError(err).Error("session end") + } + } + + nodeStatus := api.NodeStatus{State: api.NodeStatus_DISCONNECTED, Message: "node is currently trying to find new manager"} + if err := d.nodeRemove(nodeID, nodeStatus); err != nil { + log.WithError(err).Error("failed to remove node") + } + // still return an abort if the transport closure was ineffective. + return grpc.Errorf(codes.Aborted, "node must disconnect") + } + for { // After each message send, we need to check the nodes sessionID hasn't // changed. If it has, we will the stream and make the node @@ -718,10 +738,9 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio if err != nil { return err } - var ( - disconnectError error - mgrs []*api.WeightedPeer - ) + + var mgrs []*api.WeightedPeer + select { case ev := <-managerUpdates: mgrs = ev.([]*api.WeightedPeer) @@ -730,34 +749,23 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio case <-stream.Context().Done(): return stream.Context().Err() case <-node.Disconnect: - disconnectError = grpc.Errorf(codes.Aborted, "node must disconnect") + return disconnect() case <-d.ctx.Done(): - disconnectError = grpc.Errorf(codes.Aborted, "dispatcher stopped") + return disconnect() case <-keyMgrUpdates: } if mgrs == nil { mgrs = d.getManagers() } - if disconnectError != nil { - nodeStatus := api.NodeStatus{State: api.NodeStatus_DISCONNECTED, Message: "node is currently trying to find new manager"} - if err := d.nodeRemove(nodeID, nodeStatus); err != nil { - log.WithError(err).Error("failed to remove node") - } - } if err := stream.Send(&api.SessionMessage{ SessionID: sessionID, Node: nodeObj, Managers: mgrs, - Disconnect: disconnectError != nil, NetworkBootstrapKeys: d.networkBootstrapKeys, }); err != nil { return err } - if disconnectError != nil { - log.WithError(disconnectError).Error("session end") - return disconnectError - } } } diff --git a/manager/dispatcher/dispatcher_test.go b/manager/dispatcher/dispatcher_test.go index 5196e98929..3485c1c2c9 100644 --- a/manager/dispatcher/dispatcher_test.go +++ b/manager/dispatcher/dispatcher_test.go @@ -540,7 +540,6 @@ func TestSession(t *testing.T) { msg, err := stream.Recv() assert.Equal(t, 1, len(msg.Managers)) - assert.False(t, msg.Disconnect) } func TestSessionNoCert(t *testing.T) {