diff --git a/manager/role_manager.go b/manager/role_manager.go index c2e4f1a04d..e5cf27b62f 100644 --- a/manager/role_manager.go +++ b/manager/role_manager.go @@ -6,11 +6,21 @@ import ( "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/log" "github.com/docker/swarmkit/manager/state/raft" + "github.com/docker/swarmkit/manager/state/raft/membership" "github.com/docker/swarmkit/manager/state/store" + "github.com/pivotal-golang/clock" "golang.org/x/net/context" ) -const roleReconcileInterval = 5 * time.Second +const ( + // roleReconcileInterval is how often to retry removing a node, if a reconciliation or + // removal failed + roleReconcileInterval = 5 * time.Second + + // removalTimeout is how long to wait before a raft member removal fails to be applied + // to the store + removalTimeout = 5 * time.Second +) // roleManager reconciles the raft member list with desired role changes. type roleManager struct { @@ -21,32 +31,67 @@ type roleManager struct { raft *raft.Node doneChan chan struct{} - // pending contains changed nodes that have not yet been reconciled in + // pendingReconciliation contains changed nodes that have not yet been reconciled in // the raft member list. - pending map[string]*api.Node + pendingReconciliation map[string]*api.Node + + // pendingRemoval contains the IDs of nodes that have been deleted - if these correspond + // to members in the raft cluster, those members need to be removed from raft + pendingRemoval map[string]struct{} + + // leave this nil except for tests which need to inject a fake time source + clocksource clock.Clock } // newRoleManager creates a new roleManager. func newRoleManager(store *store.MemoryStore, raftNode *raft.Node) *roleManager { ctx, cancel := context.WithCancel(context.Background()) return &roleManager{ - ctx: ctx, - cancel: cancel, - store: store, - raft: raftNode, - doneChan: make(chan struct{}), - pending: make(map[string]*api.Node), + ctx: ctx, + cancel: cancel, + store: store, + raft: raftNode, + doneChan: make(chan struct{}), + pendingReconciliation: make(map[string]*api.Node), + pendingRemoval: make(map[string]struct{}), + } +} + +// getTicker returns a ticker based on the configured clock source +func (rm *roleManager) getTicker(interval time.Duration) clock.Ticker { + if rm.clocksource == nil { + return clock.NewClock().NewTicker(interval) } + return rm.clocksource.NewTicker(interval) + } -// Run is roleManager's main loop. -// ctx is only used for logging. +// Run is roleManager's main loop. On startup, it looks at every node object in the cluster and +// attempts to reconcile the raft member list with all the nodes' desired roles. If any nodes +// need to be demoted or promoted, it will add them to a reconciliation queue, and if any raft +// members' node have been deleted, it will add them to a removal queue. + +// These queues are processed immediately, and any nodes that failed to be processed are +// processed again in the next reconciliation interval, so that nodes will hopefully eventually +// be reconciled. As node updates come in, any promotions or demotions are also added to the +// reconciliation queue and reconciled. As node removals come in, they are added to the removal +// queue to be removed from the raft cluster. + +// Removal from a raft cluster is idempotent (and it's the only raft cluster change that will occur +// during reconciliation or removal), so it's fine if a node is in both the removal and reconciliation +// queues. + +// The ctx param is only used for logging. func (rm *roleManager) Run(ctx context.Context) { defer close(rm.doneChan) var ( - nodes []*api.Node - ticker *time.Ticker + nodes []*api.Node + + // ticker and tickerCh are used to time the reconciliation interval, which will + // periodically attempt to re-reconcile nodes that failed to reconcile the first + // time through + ticker clock.Ticker tickerCh <-chan time.Time ) @@ -56,37 +101,73 @@ func (rm *roleManager) Run(ctx context.Context) { nodes, err = store.FindNodes(readTx, store.All) return err }, - api.EventUpdateNode{}) + api.EventUpdateNode{}, + api.EventDeleteNode{}) defer cancelWatch() if err != nil { log.G(ctx).WithError(err).Error("failed to check nodes for role changes") } else { + // Assume all raft members have been deleted from the cluster, until the node list + // tells us otherwise. We can make this assumption because the node object must + // exist first before the raft member object. + + // Background life-cycle for a manager: it joins the cluster, getting a new TLS + // certificate. To get a TLS certificate, it makes an RPC call to the CA server, + // which on successful join adds its information to the cluster node list and + // eventually generates a TLS certificate for it. Once it has a TLS certificate, + // it can contact the other nodes, and makes an RPC call to request to join the + // raft cluster. The node it contacts will add the node to the raft membership. + for _, member := range rm.raft.GetMemberlist() { + rm.pendingRemoval[member.NodeID] = struct{}{} + } for _, node := range nodes { - rm.pending[node.ID] = node + // if the node exists, we don't want it removed from the raft membership cluster + // necessarily + delete(rm.pendingRemoval, node.ID) + + // reconcile each existing node + rm.pendingReconciliation[node.ID] = node rm.reconcileRole(ctx, node) } - if len(rm.pending) != 0 { - ticker = time.NewTicker(roleReconcileInterval) - tickerCh = ticker.C + for nodeID := range rm.pendingRemoval { + rm.evictRemovedNode(ctx, nodeID) + } + // If any reconciliations or member removals failed, we want to try again, so + // make sure that we start the ticker so we can try again and again every + // roleReconciliationInterval seconds until the queues are both empty. + if len(rm.pendingReconciliation) != 0 || len(rm.pendingRemoval) != 0 { + ticker = rm.getTicker(roleReconcileInterval) + tickerCh = ticker.C() } } for { select { case event := <-watcher: - node := event.(api.EventUpdateNode).Node - rm.pending[node.ID] = node - rm.reconcileRole(ctx, node) - if len(rm.pending) != 0 && ticker == nil { - ticker = time.NewTicker(roleReconcileInterval) - tickerCh = ticker.C + switch ev := event.(type) { + case api.EventUpdateNode: + rm.pendingReconciliation[ev.Node.ID] = ev.Node + rm.reconcileRole(ctx, ev.Node) + case api.EventDeleteNode: + rm.pendingRemoval[ev.Node.ID] = struct{}{} + rm.evictRemovedNode(ctx, ev.Node.ID) + } + // If any reconciliations or member removals failed, we want to try again, so + // make sure that we start the ticker so we can try again and again every + // roleReconciliationInterval seconds until the queues are both empty. + if (len(rm.pendingReconciliation) != 0 || len(rm.pendingRemoval) != 0) && ticker == nil { + ticker = rm.getTicker(roleReconcileInterval) + tickerCh = ticker.C() } case <-tickerCh: - for _, node := range rm.pending { + for _, node := range rm.pendingReconciliation { rm.reconcileRole(ctx, node) } - if len(rm.pending) == 0 { + for nodeID := range rm.pendingRemoval { + rm.evictRemovedNode(ctx, nodeID) + } + if len(rm.pendingReconciliation) == 0 && len(rm.pendingRemoval) == 0 { ticker.Stop() ticker = nil tickerCh = nil @@ -100,10 +181,57 @@ func (rm *roleManager) Run(ctx context.Context) { } } +// evictRemovedNode evicts a removed node from the raft cluster membership. This is to cover an edge case in which +// a node might have been removed, but somehow the role was not reconciled (possibly a demotion and a removal happened +// in rapid succession before the raft membership configuration went through). +func (rm *roleManager) evictRemovedNode(ctx context.Context, nodeID string) { + // Check if the member still exists in the membership + member := rm.raft.GetMemberByNodeID(nodeID) + if member != nil { + // We first try to remove the raft node from the raft cluster. On the next tick, if the node + // has been removed from the cluster membership, we then delete it from the removed list + rm.removeMember(ctx, member) + return + } + delete(rm.pendingRemoval, nodeID) +} + +// removeMember removes a member from the raft cluster membership +func (rm *roleManager) removeMember(ctx context.Context, member *membership.Member) { + // Quorum safeguard - quorum should have been checked before a node was allowed to be demoted, but if in the + // intervening time some other node disconnected, removing this node would result in a loss of cluster quorum. + // We leave it + if !rm.raft.CanRemoveMember(member.RaftID) { + // TODO(aaronl): Retry later + log.G(ctx).Debugf("can't demote node %s at this time: removing member from raft would result in a loss of quorum", member.NodeID) + return + } + + rmCtx, rmCancel := context.WithTimeout(rm.ctx, removalTimeout) + defer rmCancel() + + if member.RaftID == rm.raft.Config.ID { + // Don't use rmCtx, because we expect to lose + // leadership, which will cancel this context. + log.G(ctx).Info("demoted; transferring leadership") + err := rm.raft.TransferLeadership(context.Background()) + if err == nil { + return + } + log.G(ctx).WithError(err).Info("failed to transfer leadership") + } + if err := rm.raft.RemoveMember(rmCtx, member.RaftID); err != nil { + // TODO(aaronl): Retry later + log.G(ctx).WithError(err).Debugf("can't demote node %s at this time", member.NodeID) + } +} + +// reconcileRole looks at the desired role for a node, and if it is being demoted or promoted, updates the +// node role accordingly. If the node is being demoted, it also removes the node from the raft cluster membership. func (rm *roleManager) reconcileRole(ctx context.Context, node *api.Node) { if node.Role == node.Spec.DesiredRole { // Nothing to do. - delete(rm.pending, node.ID) + delete(rm.pendingReconciliation, node.ID) return } @@ -120,36 +248,16 @@ func (rm *roleManager) reconcileRole(ctx context.Context, node *api.Node) { if err != nil { log.G(ctx).WithError(err).Errorf("failed to promote node %s", node.ID) } else { - delete(rm.pending, node.ID) + delete(rm.pendingReconciliation, node.ID) } } else if node.Spec.DesiredRole == api.NodeRoleWorker && node.Role == api.NodeRoleManager { // Check for node in memberlist member := rm.raft.GetMemberByNodeID(node.ID) if member != nil { - // Quorum safeguard - if !rm.raft.CanRemoveMember(member.RaftID) { - // TODO(aaronl): Retry later - log.G(ctx).Debugf("can't demote node %s at this time: removing member from raft would result in a loss of quorum", node.ID) - return - } - - rmCtx, rmCancel := context.WithTimeout(rm.ctx, 5*time.Second) - defer rmCancel() - - if member.RaftID == rm.raft.Config.ID { - // Don't use rmCtx, because we expect to lose - // leadership, which will cancel this context. - log.G(ctx).Info("demoted; transferring leadership") - err := rm.raft.TransferLeadership(context.Background()) - if err == nil { - return - } - log.G(ctx).WithError(err).Info("failed to transfer leadership") - } - if err := rm.raft.RemoveMember(rmCtx, member.RaftID); err != nil { - // TODO(aaronl): Retry later - log.G(ctx).WithError(err).Debugf("can't demote node %s at this time", node.ID) - } + // We first try to remove the raft node from the raft cluster. On the next tick, if the node + // has been removed from the cluster membership, we then update the store to reflect the fact + // that it has been successfully demoted, and if that works, remove it from the pending list. + rm.removeMember(ctx, member) return } @@ -165,7 +273,7 @@ func (rm *roleManager) reconcileRole(ctx context.Context, node *api.Node) { if err != nil { log.G(ctx).WithError(err).Errorf("failed to demote node %s", node.ID) } else { - delete(rm.pending, node.ID) + delete(rm.pendingReconciliation, node.ID) } } } diff --git a/manager/role_manager_test.go b/manager/role_manager_test.go new file mode 100644 index 0000000000..c8f51916cb --- /dev/null +++ b/manager/role_manager_test.go @@ -0,0 +1,296 @@ +package manager + +import ( + "errors" + "io/ioutil" + "log" + "testing" + + "github.com/pivotal-golang/clock/fakeclock" + + "google.golang.org/grpc/grpclog" + + "github.com/docker/swarmkit/api" + cautils "github.com/docker/swarmkit/ca/testutils" + raftutils "github.com/docker/swarmkit/manager/state/raft/testutils" + "github.com/docker/swarmkit/manager/state/store" + "github.com/docker/swarmkit/testutils" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" +) + +func getRaftCluster(t *testing.T, tc *cautils.TestCA) (map[uint64]*raftutils.TestNode, *fakeclock.FakeClock) { + grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags)) + logrus.SetOutput(ioutil.Discard) + + nodes, fc := raftutils.NewRaftCluster(t, tc) + raftutils.WaitForCluster(t, fc, nodes) + return nodes, fc +} + +// While roleManager is running, if a node is demoted, it is removed from the raft cluster. If a node is +// promoted, it is not added to the cluster but its observed role will change to manager. +func TestRoleManagerRemovesDemotedNodesAndAddsPromotedNodes(t *testing.T) { + t.Parallel() + + tc := cautils.NewTestCA(nil) + defer tc.Stop() + + nodes, fc := raftutils.NewRaftCluster(t, tc) + defer raftutils.TeardownCluster(nodes) + + // nodes is not a list, but a map. The IDs are 1, 2, 3 + require.Len(t, nodes[1].GetMemberlist(), 3) + + // create node objects in the memory store + for _, node := range nodes { + s := raftutils.Leader(nodes).MemoryStore() + // Create a new node object + require.NoError(t, s.Update(func(tx store.Tx) error { + return store.CreateNode(tx, &api.Node{ + Role: api.NodeRoleManager, + ID: node.SecurityConfig.ClientTLSCreds.NodeID(), + Spec: api.NodeSpec{ + DesiredRole: api.NodeRoleManager, + Membership: api.NodeMembershipAccepted, + Availability: api.NodeAvailabilityActive, + }, + }) + })) + } + + lead := raftutils.Leader(nodes) + var nonLead *raftutils.TestNode + for _, n := range nodes { + if n != lead { + nonLead = n + break + } + } + rm := newRoleManager(lead.MemoryStore(), lead.Node) + rm.clocksource = fc + go rm.Run(tc.Context) + defer rm.Stop() + + // demote the node + require.NoError(t, lead.MemoryStore().Update(func(tx store.Tx) error { + n := store.GetNode(tx, nonLead.SecurityConfig.ClientTLSCreds.NodeID()) + n.Spec.DesiredRole = api.NodeRoleWorker + return store.UpdateNode(tx, n) + })) + require.NoError(t, testutils.PollFuncWithTimeout(fc, func() error { + memberlist := lead.GetMemberlist() + if len(memberlist) != 2 { + return errors.New("raft node hasn't been removed yet") + } + for _, m := range memberlist { + if m.NodeID == nonLead.SecurityConfig.ClientTLSCreds.NodeID() { + return errors.New("wrong member was removed") + } + } + // use Update just because it returns an error + return lead.MemoryStore().Update(func(tx store.Tx) error { + if n := store.GetNode(tx, nonLead.SecurityConfig.ClientTLSCreds.NodeID()); n.Role != api.NodeRoleWorker { + return errors.New("raft node hasn't been marked as a worker yet") + } + return nil + }) + }, roleReconcileInterval/2)) + + // now promote the node + require.NoError(t, lead.MemoryStore().Update(func(tx store.Tx) error { + n := store.GetNode(tx, nonLead.SecurityConfig.ClientTLSCreds.NodeID()) + n.Spec.DesiredRole = api.NodeRoleManager + return store.UpdateNode(tx, n) + })) + require.NoError(t, testutils.PollFuncWithTimeout(fc, func() error { + if len(lead.GetMemberlist()) != 2 { + return errors.New("raft nodes in membership should not have changed") + } + // use Update just because it returns an error + return lead.MemoryStore().Update(func(tx store.Tx) error { + if n := store.GetNode(tx, nonLead.SecurityConfig.ClientTLSCreds.NodeID()); n.Role != api.NodeRoleManager { + return errors.New("raft node hasn't been marked as a manager yet") + } + return nil + }) + }, roleReconcileInterval/2)) +} + +// If a node was demoted before the roleManager starts up, roleManger will remove +// the node from the cluster membership. +func TestRoleManagerRemovesDemotedNodesOnStartup(t *testing.T) { + t.Parallel() + + tc := cautils.NewTestCA(nil) + defer tc.Stop() + + nodes, fc := raftutils.NewRaftCluster(t, tc) + defer raftutils.TeardownCluster(nodes) + + // nodes is not a list, but a map. The IDs are 1, 2, 3 + require.Len(t, nodes[1].GetMemberlist(), 3) + + // create node objects in the memory store + for i, node := range nodes { + s := raftutils.Leader(nodes).MemoryStore() + desired := api.NodeRoleManager + if i == 3 { + desired = api.NodeRoleWorker + } + // Create a new node object + require.NoError(t, s.Update(func(tx store.Tx) error { + return store.CreateNode(tx, &api.Node{ + Role: api.NodeRoleManager, + ID: node.SecurityConfig.ClientTLSCreds.NodeID(), + Spec: api.NodeSpec{ + DesiredRole: desired, + Membership: api.NodeMembershipAccepted, + Availability: api.NodeAvailabilityActive, + }, + }) + })) + } + demoted := nodes[3] + + lead := raftutils.Leader(nodes) + rm := newRoleManager(lead.MemoryStore(), lead.Node) + rm.clocksource = fc + go rm.Run(tc.Context) + defer rm.Stop() + + require.NoError(t, testutils.PollFuncWithTimeout(fc, func() error { + memberlist := lead.GetMemberlist() + if len(memberlist) != 2 { + return errors.New("raft node hasn't been removed yet") + } + for _, m := range memberlist { + if m.NodeID == demoted.SecurityConfig.ClientTLSCreds.NodeID() { + return errors.New("wrong member was removed") + } + } + // use Update just because it returns an error + return lead.MemoryStore().Update(func(tx store.Tx) error { + if n := store.GetNode(tx, demoted.SecurityConfig.ClientTLSCreds.NodeID()); n.Role != api.NodeRoleWorker { + return errors.New("raft node hasn't been marked as a worker yet") + } + return nil + }) + }, roleReconcileInterval/2)) +} + +// While roleManager is running, if a node is deleted, it is removed from the raft cluster. +func TestRoleManagerRemovesDeletedNodes(t *testing.T) { + t.Parallel() + + tc := cautils.NewTestCA(nil) + defer tc.Stop() + + nodes, fc := raftutils.NewRaftCluster(t, tc) + defer raftutils.TeardownCluster(nodes) + + // nodes is not a list, but a map. The IDs are 1, 2, 3 + require.Len(t, nodes[1].GetMemberlist(), 3) + + // create node objects in the memory store + for _, node := range nodes { + s := raftutils.Leader(nodes).MemoryStore() + // Create a new node object + require.NoError(t, s.Update(func(tx store.Tx) error { + return store.CreateNode(tx, &api.Node{ + Role: api.NodeRoleManager, + ID: node.SecurityConfig.ClientTLSCreds.NodeID(), + Spec: api.NodeSpec{ + DesiredRole: api.NodeRoleManager, + Membership: api.NodeMembershipAccepted, + Availability: api.NodeAvailabilityActive, + }, + }) + })) + } + + lead := raftutils.Leader(nodes) + var nonLead *raftutils.TestNode + for _, n := range nodes { + if n != lead { + nonLead = n + break + } + } + rm := newRoleManager(lead.MemoryStore(), lead.Node) + rm.clocksource = fc + go rm.Run(tc.Context) + defer rm.Stop() + + // delete the node + require.NoError(t, lead.MemoryStore().Update(func(tx store.Tx) error { + return store.DeleteNode(tx, nonLead.SecurityConfig.ClientTLSCreds.NodeID()) + })) + require.NoError(t, testutils.PollFuncWithTimeout(fc, func() error { + memberlist := lead.GetMemberlist() + if len(memberlist) != 2 { + return errors.New("raft node hasn't been removed yet") + } + for _, m := range memberlist { + if m.NodeID == nonLead.SecurityConfig.ClientTLSCreds.NodeID() { + return errors.New("wrong member was removed") + } + } + return nil + }, roleReconcileInterval/2)) + +} + +// If a node was removed before the roleManager starts up, roleManger will remove +// the node from the cluster membership. +func TestRoleManagerRemovesDeletedNodesOnStartup(t *testing.T) { + t.Parallel() + + tc := cautils.NewTestCA(nil) + defer tc.Stop() + + nodes, fc := raftutils.NewRaftCluster(t, tc) + defer raftutils.TeardownCluster(nodes) + + // nodes is not a list, but a map. The IDs are 1, 2, 3 + require.Len(t, nodes[1].GetMemberlist(), 3) + + // create node objects in the memory store + for i, node := range nodes { + s := raftutils.Leader(nodes).MemoryStore() + if i == 3 { + continue + } + // Create a new node object + require.NoError(t, s.Update(func(tx store.Tx) error { + return store.CreateNode(tx, &api.Node{ + Role: api.NodeRoleManager, + ID: node.SecurityConfig.ClientTLSCreds.NodeID(), + Spec: api.NodeSpec{ + DesiredRole: api.NodeRoleManager, + Membership: api.NodeMembershipAccepted, + Availability: api.NodeAvailabilityActive, + }, + }) + })) + } + + lead := raftutils.Leader(nodes) + rm := newRoleManager(lead.MemoryStore(), lead.Node) + rm.clocksource = fc + go rm.Run(tc.Context) + defer rm.Stop() + + require.NoError(t, testutils.PollFuncWithTimeout(fc, func() error { + memberlist := lead.GetMemberlist() + if len(memberlist) != 2 { + return errors.New("raft node hasn't been removed yet") + } + for _, m := range memberlist { + if m.NodeID == nodes[3].SecurityConfig.ClientTLSCreds.NodeID() { + return errors.New("wrong member was removed") + } + } + return nil + }, roleReconcileInterval/2)) +}