From 1d68e6146f6d6a62f98cfe78b6ad924609367751 Mon Sep 17 00:00:00 2001 From: cyli Date: Fri, 9 Mar 2018 13:48:51 -0800 Subject: [PATCH 1/3] Add tests for the roleManager that it successfully removes demoted nodes both on startup and while a node is demoted while it's running. Also add a failing test, where the roleManager should remove deleted nodes from the cluster membership on startup. These tests involve injecting a fake clock source into roleManager. Signed-off-by: cyli --- manager/manager.go | 2 +- manager/role_manager.go | 38 +++++-- manager/role_manager_test.go | 213 +++++++++++++++++++++++++++++++++++ 3 files changed, 240 insertions(+), 13 deletions(-) create mode 100644 manager/role_manager_test.go diff --git a/manager/manager.go b/manager/manager.go index 0e897bcd14..667c6ab137 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -1078,7 +1078,7 @@ func (m *Manager) becomeLeader(ctx context.Context) { m.taskReaper = taskreaper.New(s) m.scheduler = scheduler.New(s) m.keyManager = keymanager.New(s, keymanager.DefaultConfig()) - m.roleManager = newRoleManager(s, m.raftNode) + m.roleManager = newRoleManager(s, m.raftNode, nil) // TODO(stevvooe): Allocate a context that can be used to // shutdown underlying manager processes when leadership is diff --git a/manager/role_manager.go b/manager/role_manager.go index c2e4f1a04d..32070edb16 100644 --- a/manager/role_manager.go +++ b/manager/role_manager.go @@ -7,6 +7,7 @@ import ( "github.com/docker/swarmkit/log" "github.com/docker/swarmkit/manager/state/raft" "github.com/docker/swarmkit/manager/state/store" + "github.com/pivotal-golang/clock" "golang.org/x/net/context" ) @@ -24,21 +25,34 @@ type roleManager struct { // pending contains changed nodes that have not yet been reconciled in // the raft member list. pending map[string]*api.Node + + // leave this nil except for tests which need to inject a fake time source + clocksource clock.Clock } // newRoleManager creates a new roleManager. 
-func newRoleManager(store *store.MemoryStore, raftNode *raft.Node) *roleManager { +func newRoleManager(store *store.MemoryStore, raftNode *raft.Node, clocksource clock.Clock) *roleManager { ctx, cancel := context.WithCancel(context.Background()) return &roleManager{ - ctx: ctx, - cancel: cancel, - store: store, - raft: raftNode, - doneChan: make(chan struct{}), - pending: make(map[string]*api.Node), + ctx: ctx, + cancel: cancel, + store: store, + raft: raftNode, + doneChan: make(chan struct{}), + pending: make(map[string]*api.Node), + clocksource: clocksource, } } +// getTicker returns a ticker based on the configured clock source +func (rm *roleManager) getTicker(interval time.Duration) clock.Ticker { + if rm.clocksource == nil { + return clock.NewClock().NewTicker(interval) + } + return rm.clocksource.NewTicker(interval) + +} + // Run is roleManager's main loop. // ctx is only used for logging. func (rm *roleManager) Run(ctx context.Context) { @@ -46,7 +60,7 @@ func (rm *roleManager) Run(ctx context.Context) { var ( nodes []*api.Node - ticker *time.Ticker + ticker clock.Ticker tickerCh <-chan time.Time ) @@ -67,8 +81,8 @@ func (rm *roleManager) Run(ctx context.Context) { rm.reconcileRole(ctx, node) } if len(rm.pending) != 0 { - ticker = time.NewTicker(roleReconcileInterval) - tickerCh = ticker.C + ticker = rm.getTicker(roleReconcileInterval) + tickerCh = ticker.C() } } @@ -79,8 +93,8 @@ func (rm *roleManager) Run(ctx context.Context) { rm.pending[node.ID] = node rm.reconcileRole(ctx, node) if len(rm.pending) != 0 && ticker == nil { - ticker = time.NewTicker(roleReconcileInterval) - tickerCh = ticker.C + ticker = rm.getTicker(roleReconcileInterval) + tickerCh = ticker.C() } case <-tickerCh: for _, node := range rm.pending { diff --git a/manager/role_manager_test.go b/manager/role_manager_test.go new file mode 100644 index 0000000000..2c812e83c2 --- /dev/null +++ b/manager/role_manager_test.go @@ -0,0 +1,213 @@ +package manager + +import ( + "errors" + "io/ioutil" 
+ "log" + "testing" + + "github.com/pivotal-golang/clock/fakeclock" + + "google.golang.org/grpc/grpclog" + + "github.com/docker/swarmkit/api" + cautils "github.com/docker/swarmkit/ca/testutils" + raftutils "github.com/docker/swarmkit/manager/state/raft/testutils" + "github.com/docker/swarmkit/manager/state/store" + "github.com/docker/swarmkit/testutils" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" +) + +func getRaftCluster(t *testing.T, tc *cautils.TestCA) (map[uint64]*raftutils.TestNode, *fakeclock.FakeClock) { + grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags)) + logrus.SetOutput(ioutil.Discard) + + nodes, fc := raftutils.NewRaftCluster(t, tc) + raftutils.WaitForCluster(t, fc, nodes) + return nodes, fc +} + +// While roleManager is running, if a node is demoted, it is removed from the raft cluster. If a node is +// promoted, it is not added to the cluster but its observed role will change to manager. +func TestRoleManagerRemovesDemotedNodesAndAddsPromotedNodes(t *testing.T) { + t.Parallel() + + tc := cautils.NewTestCA(nil) + defer tc.Stop() + + nodes, fc := raftutils.NewRaftCluster(t, tc) + defer raftutils.TeardownCluster(nodes) + + // nodes is not a list, but a map. 
The IDs are 1, 2, 3 + require.Len(t, nodes[1].GetMemberlist(), 3) + + // create node objects in the memory store + for _, node := range nodes { + s := raftutils.Leader(nodes).MemoryStore() + // Create a new node object + require.NoError(t, s.Update(func(tx store.Tx) error { + return store.CreateNode(tx, &api.Node{ + Role: api.NodeRoleManager, + ID: node.SecurityConfig.ClientTLSCreds.NodeID(), + Spec: api.NodeSpec{ + DesiredRole: api.NodeRoleManager, + Membership: api.NodeMembershipAccepted, + Availability: api.NodeAvailabilityActive, + }, + }) + })) + } + + lead := raftutils.Leader(nodes) + var nonLead *raftutils.TestNode + for _, n := range nodes { + if n != lead { + nonLead = n + break + } + } + rm := newRoleManager(lead.MemoryStore(), lead.Node, fc) + go rm.Run(tc.Context) + defer rm.Stop() + + // demote the node + require.NoError(t, lead.MemoryStore().Update(func(tx store.Tx) error { + n := store.GetNode(tx, nonLead.SecurityConfig.ClientTLSCreds.NodeID()) + n.Spec.DesiredRole = api.NodeRoleWorker + return store.UpdateNode(tx, n) + })) + require.NoError(t, testutils.PollFuncWithTimeout(fc, func() error { + if len(lead.GetMemberlist()) != 2 { + return errors.New("raft node hasn't been removed yet") + } + // use Update just because it returns an error + return lead.MemoryStore().Update(func(tx store.Tx) error { + if n := store.GetNode(tx, nonLead.SecurityConfig.ClientTLSCreds.NodeID()); n.Role != api.NodeRoleWorker { + return errors.New("raft node hasn't been marked as a worker yet") + } + return nil + }) + }, roleReconcileInterval/2)) + + // now promote the node + require.NoError(t, lead.MemoryStore().Update(func(tx store.Tx) error { + n := store.GetNode(tx, nonLead.SecurityConfig.ClientTLSCreds.NodeID()) + n.Spec.DesiredRole = api.NodeRoleManager + return store.UpdateNode(tx, n) + })) + require.NoError(t, testutils.PollFuncWithTimeout(fc, func() error { + if len(lead.GetMemberlist()) != 2 { + return errors.New("raft nodes in membership should not have changed") 
+ } + // use Update just because it returns an error + return lead.MemoryStore().Update(func(tx store.Tx) error { + if n := store.GetNode(tx, nonLead.SecurityConfig.ClientTLSCreds.NodeID()); n.Role != api.NodeRoleManager { + return errors.New("raft node hasn't been marked as a manager yet") + } + return nil + }) + }, roleReconcileInterval/2)) +} + +// If a node was demoted before the roleManager starts up, roleManager will remove +// the node from the cluster membership. +func TestRoleManagerRemovesDemotedNodesOnStartup(t *testing.T) { + t.Parallel() + + tc := cautils.NewTestCA(nil) + defer tc.Stop() + + nodes, fc := raftutils.NewRaftCluster(t, tc) + defer raftutils.TeardownCluster(nodes) + + // nodes is not a list, but a map. The IDs are 1, 2, 3 + require.Len(t, nodes[1].GetMemberlist(), 3) + + // create node objects in the memory store + for i, node := range nodes { + s := raftutils.Leader(nodes).MemoryStore() + desired := api.NodeRoleManager + if i == 3 { + desired = api.NodeRoleWorker + } + // Create a new node object + require.NoError(t, s.Update(func(tx store.Tx) error { + return store.CreateNode(tx, &api.Node{ + Role: api.NodeRoleManager, + ID: node.SecurityConfig.ClientTLSCreds.NodeID(), + Spec: api.NodeSpec{ + DesiredRole: desired, + Membership: api.NodeMembershipAccepted, + Availability: api.NodeAvailabilityActive, + }, + }) + })) + } + demoted := nodes[3] + + lead := raftutils.Leader(nodes) + rm := newRoleManager(lead.MemoryStore(), lead.Node, fc) + go rm.Run(tc.Context) + defer rm.Stop() + + require.NoError(t, testutils.PollFuncWithTimeout(fc, func() error { + if len(lead.GetMemberlist()) != 2 { + return errors.New("raft node hasn't been removed yet") + } + // use Update just because it returns an error + return lead.MemoryStore().Update(func(tx store.Tx) error { + if n := store.GetNode(tx, demoted.SecurityConfig.ClientTLSCreds.NodeID()); n.Role != api.NodeRoleWorker { + return errors.New("raft node hasn't been marked as a worker yet") + } + return nil + 
}) + }, roleReconcileInterval/2)) +} + +// If a node was removed before the roleManager starts up, roleManger will remove +// the node from the cluster membership. +func TestRoleManagerRemovesDeletedNodesOnStartup(t *testing.T) { + t.Parallel() + + tc := cautils.NewTestCA(nil) + defer tc.Stop() + + nodes, fc := raftutils.NewRaftCluster(t, tc) + defer raftutils.TeardownCluster(nodes) + + // nodes is not a list, but a map. The IDs are 1, 2, 3 + require.Len(t, nodes[1].GetMemberlist(), 3) + + // create node objects in the memory store + for i, node := range nodes { + s := raftutils.Leader(nodes).MemoryStore() + if i == 3 { + continue + } + // Create a new node object + require.NoError(t, s.Update(func(tx store.Tx) error { + return store.CreateNode(tx, &api.Node{ + Role: api.NodeRoleManager, + ID: node.SecurityConfig.ClientTLSCreds.NodeID(), + Spec: api.NodeSpec{ + DesiredRole: api.NodeRoleManager, + Membership: api.NodeMembershipAccepted, + Availability: api.NodeAvailabilityActive, + }, + }) + })) + } + + lead := raftutils.Leader(nodes) + rm := newRoleManager(lead.MemoryStore(), lead.Node, fc) + go rm.Run(tc.Context) + defer rm.Stop() + + require.NoError(t, testutils.PollFuncWithTimeout(fc, func() error { + if len(lead.GetMemberlist()) != 2 { + return errors.New("raft node hasn't been removed yet") + } + return nil + }, roleReconcileInterval/2)) +} From 5322ff568524cdd18631ab14623f25e47aa18122 Mon Sep 17 00:00:00 2001 From: cyli Date: Fri, 9 Mar 2018 16:22:21 -0800 Subject: [PATCH 2/3] On startup, remove any nodes in the cluster membership that are no longer in the raft store. 
Signed-off-by: cyli --- manager/manager.go | 2 +- manager/role_manager.go | 178 ++++++++++++++++++++++++++--------- manager/role_manager_test.go | 9 +- 3 files changed, 138 insertions(+), 51 deletions(-) diff --git a/manager/manager.go b/manager/manager.go index 667c6ab137..0e897bcd14 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -1078,7 +1078,7 @@ func (m *Manager) becomeLeader(ctx context.Context) { m.taskReaper = taskreaper.New(s) m.scheduler = scheduler.New(s) m.keyManager = keymanager.New(s, keymanager.DefaultConfig()) - m.roleManager = newRoleManager(s, m.raftNode, nil) + m.roleManager = newRoleManager(s, m.raftNode) // TODO(stevvooe): Allocate a context that can be used to // shutdown underlying manager processes when leadership is diff --git a/manager/role_manager.go b/manager/role_manager.go index 32070edb16..ea002fdf00 100644 --- a/manager/role_manager.go +++ b/manager/role_manager.go @@ -6,12 +6,21 @@ import ( "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/log" "github.com/docker/swarmkit/manager/state/raft" + "github.com/docker/swarmkit/manager/state/raft/membership" "github.com/docker/swarmkit/manager/state/store" "github.com/pivotal-golang/clock" "golang.org/x/net/context" ) -const roleReconcileInterval = 5 * time.Second +const ( + // roleReconcileInterval is how often to retry removing a node, if a reconciliation or + // removal failed + roleReconcileInterval = 5 * time.Second + + // removalTimeout is how long to wait before a raft member removal fails to be applied + // to the store + removalTimeout = 5 * time.Second +) // roleManager reconciles the raft member list with desired role changes. type roleManager struct { @@ -22,25 +31,29 @@ type roleManager struct { raft *raft.Node doneChan chan struct{} - // pending contains changed nodes that have not yet been reconciled in + // pendingReconciliation contains changed nodes that have not yet been reconciled in // the raft member list. 
- pending map[string]*api.Node + pendingReconciliation map[string]*api.Node + + // pendingRemoval contains raft members, indexed by node ID, whose nodes have since + // been deleted - these members need to be removed from raft + pendingRemoval map[string]*api.RaftMember // leave this nil except for tests which need to inject a fake time source clocksource clock.Clock } // newRoleManager creates a new roleManager. -func newRoleManager(store *store.MemoryStore, raftNode *raft.Node, clocksource clock.Clock) *roleManager { +func newRoleManager(store *store.MemoryStore, raftNode *raft.Node) *roleManager { ctx, cancel := context.WithCancel(context.Background()) return &roleManager{ - ctx: ctx, - cancel: cancel, - store: store, - raft: raftNode, - doneChan: make(chan struct{}), - pending: make(map[string]*api.Node), - clocksource: clocksource, + ctx: ctx, + cancel: cancel, + store: store, + raft: raftNode, + doneChan: make(chan struct{}), + pendingReconciliation: make(map[string]*api.Node), + pendingRemoval: make(map[string]*api.RaftMember), } } @@ -53,13 +66,30 @@ func (rm *roleManager) getTicker(interval time.Duration) clock.Ticker { } -// Run is roleManager's main loop. -// ctx is only used for logging. +// Run is roleManager's main loop. On startup, it looks at every node object in the cluster and +// attempts to reconcile the raft member list with all the nodes' desired roles. If any nodes +// need to be demoted or promoted, it will add them to a reconciliation queue, and if any raft +// members' node have been deleted, it will add them to a removal queue. + +// These queues are processed immediately, and any nodes that failed to be processed are +// processed again in the next reconciliation interval, so that nodes will hopefully eventually +// be reconciled. As node updates come in, any promotions or demotions are also added to the +// reconciliation queue and reconciled. 
+ +// The removal queue is never added to beyond the startup of the loop, because +// before a node can be removed, it must be demoted, so the demotion should be sufficient to +// get it removed from the cluster membership. + +// The ctx param is only used for logging. func (rm *roleManager) Run(ctx context.Context) { defer close(rm.doneChan) var ( - nodes []*api.Node + nodes []*api.Node + + // ticker and tickerCh are used to time the reconciliation interval, which will + // periodically attempt to re-reconcile nodes that failed to reconcile the first + // time through ticker clock.Ticker tickerCh <-chan time.Time ) @@ -76,11 +106,35 @@ func (rm *roleManager) Run(ctx context.Context) { if err != nil { log.G(ctx).WithError(err).Error("failed to check nodes for role changes") } else { + // Assume all raft members have been deleted from the cluster, until the node list + // tells us otherwise. We can make this assumption because the node object must + // exist first before the raft member object. + + // Background life-cycle for a manager: it joins the cluster, getting a new TLS + // certificate. To get a TLS certificate, it makes an RPC call to the CA server, + // which on successful join adds its information to the cluster node list and + // eventually generates a TLS certificate for it. Once it has a TLS certificate, + // it can contact the other nodes, and makes an RPC call to request to join the + // raft cluster. The node it contacts will add the node to the raft membership. 
+ for _, member := range rm.raft.GetMemberlist() { + rm.pendingRemoval[member.NodeID] = member + } for _, node := range nodes { - rm.pending[node.ID] = node + // if the node exists, we don't want it removed from the raft membership cluster + // necessarily + delete(rm.pendingRemoval, node.ID) + + // reconcile each existing node + rm.pendingReconciliation[node.ID] = node rm.reconcileRole(ctx, node) } - if len(rm.pending) != 0 { + for _, removed := range rm.pendingRemoval { + rm.evictRemovedNode(ctx, removed) + } + // If any reconciliations or member removals failed, we want to try again, so + // make sure that we start the ticker so we can try again and again every + // roleReconciliationInterval seconds until the queues are both empty. + if len(rm.pendingReconciliation) != 0 || len(rm.pendingRemoval) != 0 { ticker = rm.getTicker(roleReconcileInterval) tickerCh = ticker.C() } @@ -90,17 +144,20 @@ func (rm *roleManager) Run(ctx context.Context) { select { case event := <-watcher: node := event.(api.EventUpdateNode).Node - rm.pending[node.ID] = node + rm.pendingReconciliation[node.ID] = node rm.reconcileRole(ctx, node) - if len(rm.pending) != 0 && ticker == nil { + if len(rm.pendingReconciliation) != 0 && ticker == nil { ticker = rm.getTicker(roleReconcileInterval) tickerCh = ticker.C() } case <-tickerCh: - for _, node := range rm.pending { + for _, node := range rm.pendingReconciliation { rm.reconcileRole(ctx, node) } - if len(rm.pending) == 0 { + for _, node := range rm.pendingRemoval { + rm.evictRemovedNode(ctx, node) + } + if len(rm.pendingReconciliation) == 0 && len(rm.pendingRemoval) == 0 { ticker.Stop() ticker = nil tickerCh = nil @@ -114,10 +171,57 @@ func (rm *roleManager) Run(ctx context.Context) { } } +// evictRemovedNode evicts a removed node from the raft cluster membership. 
This is to cover an edge case in which +// a node might have been removed, but somehow the role was not reconciled (possibly a demotion and a removal happened +// in rapid succession before the raft membership configuration went through). +func (rm *roleManager) evictRemovedNode(ctx context.Context, raftMember *api.RaftMember) { + // Check if the member still exists in the membership + member := rm.raft.GetMemberByNodeID(raftMember.NodeID) + if member != nil { + // We first try to remove the raft node from the raft cluster. On the next tick, if the node + // has been removed from the cluster membership, we then delete it from the removed list + rm.removeMember(ctx, member) + return + } + delete(rm.pendingRemoval, raftMember.NodeID) +} + +// removeMember removes a member from the raft cluster membership +func (rm *roleManager) removeMember(ctx context.Context, member *membership.Member) { + // Quorum safeguard - quorum should have been checked before a node was allowed to be demoted, but if in the + // intervening time some other node disconnected, removing this node would result in a loss of cluster quorum. + // We leave the member in the raft membership in that case, and retry on the next reconciliation tick. + if !rm.raft.CanRemoveMember(member.RaftID) { + // TODO(aaronl): Retry later + log.G(ctx).Debugf("can't demote node %s at this time: removing member from raft would result in a loss of quorum", member.NodeID) + return + } + + rmCtx, rmCancel := context.WithTimeout(rm.ctx, removalTimeout) + defer rmCancel() + + if member.RaftID == rm.raft.Config.ID { + // Don't use rmCtx, because we expect to lose + // leadership, which will cancel this context.
+ log.G(ctx).Info("demoted; transferring leadership") + err := rm.raft.TransferLeadership(context.Background()) + if err == nil { + return + } + log.G(ctx).WithError(err).Info("failed to transfer leadership") + } + if err := rm.raft.RemoveMember(rmCtx, member.RaftID); err != nil { + // TODO(aaronl): Retry later + log.G(ctx).WithError(err).Debugf("can't demote node %s at this time", member.NodeID) + } +} + +// reconcileRole looks at the desired role for a node, and if it is being demoted or promoted, updates the +// node role accordingly. If the node is being demoted, it also removes the node from the raft cluster membership. func (rm *roleManager) reconcileRole(ctx context.Context, node *api.Node) { if node.Role == node.Spec.DesiredRole { // Nothing to do. - delete(rm.pending, node.ID) + delete(rm.pendingReconciliation, node.ID) return } @@ -134,36 +238,16 @@ func (rm *roleManager) reconcileRole(ctx context.Context, node *api.Node) { if err != nil { log.G(ctx).WithError(err).Errorf("failed to promote node %s", node.ID) } else { - delete(rm.pending, node.ID) + delete(rm.pendingReconciliation, node.ID) } } else if node.Spec.DesiredRole == api.NodeRoleWorker && node.Role == api.NodeRoleManager { // Check for node in memberlist member := rm.raft.GetMemberByNodeID(node.ID) if member != nil { - // Quorum safeguard - if !rm.raft.CanRemoveMember(member.RaftID) { - // TODO(aaronl): Retry later - log.G(ctx).Debugf("can't demote node %s at this time: removing member from raft would result in a loss of quorum", node.ID) - return - } - - rmCtx, rmCancel := context.WithTimeout(rm.ctx, 5*time.Second) - defer rmCancel() - - if member.RaftID == rm.raft.Config.ID { - // Don't use rmCtx, because we expect to lose - // leadership, which will cancel this context. 
- log.G(ctx).Info("demoted; transferring leadership") - err := rm.raft.TransferLeadership(context.Background()) - if err == nil { - return - } - log.G(ctx).WithError(err).Info("failed to transfer leadership") - } - if err := rm.raft.RemoveMember(rmCtx, member.RaftID); err != nil { - // TODO(aaronl): Retry later - log.G(ctx).WithError(err).Debugf("can't demote node %s at this time", node.ID) - } + // We first try to remove the raft node from the raft cluster. On the next tick, if the node + // has been removed from the cluster membership, we then update the store to reflect the fact + // that it has been successfully demoted, and if that works, remove it from the pending list. + rm.removeMember(ctx, member) return } @@ -179,7 +263,7 @@ func (rm *roleManager) reconcileRole(ctx context.Context, node *api.Node) { if err != nil { log.G(ctx).WithError(err).Errorf("failed to demote node %s", node.ID) } else { - delete(rm.pending, node.ID) + delete(rm.pendingReconciliation, node.ID) } } } diff --git a/manager/role_manager_test.go b/manager/role_manager_test.go index 2c812e83c2..2b1b8afcf7 100644 --- a/manager/role_manager_test.go +++ b/manager/role_manager_test.go @@ -67,7 +67,8 @@ func TestRoleManagerRemovesDemotedNodesAndAddsPromotedNodes(t *testing.T) { break } } - rm := newRoleManager(lead.MemoryStore(), lead.Node, fc) + rm := newRoleManager(lead.MemoryStore(), lead.Node) + rm.clocksource = fc go rm.Run(tc.Context) defer rm.Stop() @@ -147,7 +148,8 @@ func TestRoleManagerRemovesDemotedNodesOnStartup(t *testing.T) { demoted := nodes[3] lead := raftutils.Leader(nodes) - rm := newRoleManager(lead.MemoryStore(), lead.Node, fc) + rm := newRoleManager(lead.MemoryStore(), lead.Node) + rm.clocksource = fc go rm.Run(tc.Context) defer rm.Stop() @@ -200,7 +202,8 @@ func TestRoleManagerRemovesDeletedNodesOnStartup(t *testing.T) { } lead := raftutils.Leader(nodes) - rm := newRoleManager(lead.MemoryStore(), lead.Node, fc) + rm := newRoleManager(lead.MemoryStore(), lead.Node) + 
rm.clocksource = fc go rm.Run(tc.Context) defer rm.Stop() From 36954b5dc780db3ba4b494b61e016ed6f1c0f9fa Mon Sep 17 00:00:00 2001 From: Ying Li Date: Wed, 21 Mar 2018 15:59:15 -0700 Subject: [PATCH 3/3] Also delete nodes from the raft cluster if deletions come in, since a node that is a manager can be removed if it hasn't joined the raft cluster yet. However, perhaps the raft conf change happens right after the check. Signed-off-by: Ying Li --- manager/role_manager.go | 52 +++++++++++++--------- manager/role_manager_test.go | 86 ++++++++++++++++++++++++++++++++++-- 2 files changed, 114 insertions(+), 24 deletions(-) diff --git a/manager/role_manager.go b/manager/role_manager.go index ea002fdf00..e5cf27b62f 100644 --- a/manager/role_manager.go +++ b/manager/role_manager.go @@ -35,9 +35,9 @@ type roleManager struct { // the raft member list. pendingReconciliation map[string]*api.Node - // pendingRemoval contains raft members, indexed by node ID, whose nodes have since - // been deleted - these members need to be removed from raft - pendingRemoval map[string]*api.RaftMember + // pendingRemoval contains the IDs of nodes that have been deleted - if these correspond + // to members in the raft cluster, those members need to be removed from raft + pendingRemoval map[string]struct{} // leave this nil except for tests which need to inject a fake time source clocksource clock.Clock @@ -53,7 +53,7 @@ func newRoleManager(store *store.MemoryStore, raftNode *raft.Node) *roleManager raft: raftNode, doneChan: make(chan struct{}), pendingReconciliation: make(map[string]*api.Node), - pendingRemoval: make(map[string]*api.RaftMember), + pendingRemoval: make(map[string]struct{}), } } @@ -74,11 +74,12 @@ func (rm *roleManager) getTicker(interval time.Duration) clock.Ticker { // These queues are processed immediately, and any nodes that failed to be processed are // processed again in the next reconciliation interval, so that nodes will hopefully eventually // be reconciled. 
As node updates come in, any promotions or demotions are also added to the -// reconciliation queue and reconciled. +// reconciliation queue and reconciled. As node removals come in, they are added to the removal +// queue to be removed from the raft cluster. -// The removal queue is never added to beyond the startup of the loop, because -// before a node can be removed, it must be demoted, so the demotion should be sufficient to -// get it removed from the cluster membership. +// Removal from a raft cluster is idempotent (and it's the only raft cluster change that will occur +// during reconciliation or removal), so it's fine if a node is in both the removal and reconciliation +// queues. // The ctx param is only used for logging. func (rm *roleManager) Run(ctx context.Context) { @@ -100,7 +101,8 @@ func (rm *roleManager) Run(ctx context.Context) { nodes, err = store.FindNodes(readTx, store.All) return err }, - api.EventUpdateNode{}) + api.EventUpdateNode{}, + api.EventDeleteNode{}) defer cancelWatch() if err != nil { @@ -117,7 +119,7 @@ func (rm *roleManager) Run(ctx context.Context) { // it can contact the other nodes, and makes an RPC call to request to join the // raft cluster. The node it contacts will add the node to the raft membership. 
for _, member := range rm.raft.GetMemberlist() { - rm.pendingRemoval[member.NodeID] = member + rm.pendingRemoval[member.NodeID] = struct{}{} } for _, node := range nodes { // if the node exists, we don't want it removed from the raft membership cluster @@ -128,8 +130,8 @@ func (rm *roleManager) Run(ctx context.Context) { rm.pendingReconciliation[node.ID] = node rm.reconcileRole(ctx, node) } - for _, removed := range rm.pendingRemoval { - rm.evictRemovedNode(ctx, removed) + for nodeID := range rm.pendingRemoval { + rm.evictRemovedNode(ctx, nodeID) } // If any reconciliations or member removals failed, we want to try again, so // make sure that we start the ticker so we can try again and again every @@ -143,10 +145,18 @@ func (rm *roleManager) Run(ctx context.Context) { for { select { case event := <-watcher: - node := event.(api.EventUpdateNode).Node - rm.pendingReconciliation[node.ID] = node - rm.reconcileRole(ctx, node) - if len(rm.pendingReconciliation) != 0 && ticker == nil { + switch ev := event.(type) { + case api.EventUpdateNode: + rm.pendingReconciliation[ev.Node.ID] = ev.Node + rm.reconcileRole(ctx, ev.Node) + case api.EventDeleteNode: + rm.pendingRemoval[ev.Node.ID] = struct{}{} + rm.evictRemovedNode(ctx, ev.Node.ID) + } + // If any reconciliations or member removals failed, we want to try again, so + // make sure that we start the ticker so we can try again and again every + // roleReconciliationInterval seconds until the queues are both empty. 
+ if (len(rm.pendingReconciliation) != 0 || len(rm.pendingRemoval) != 0) && ticker == nil { ticker = rm.getTicker(roleReconcileInterval) tickerCh = ticker.C() } @@ -154,8 +164,8 @@ func (rm *roleManager) Run(ctx context.Context) { for _, node := range rm.pendingReconciliation { rm.reconcileRole(ctx, node) } - for _, node := range rm.pendingRemoval { - rm.evictRemovedNode(ctx, node) + for nodeID := range rm.pendingRemoval { + rm.evictRemovedNode(ctx, nodeID) } if len(rm.pendingReconciliation) == 0 && len(rm.pendingRemoval) == 0 { ticker.Stop() @@ -174,16 +184,16 @@ func (rm *roleManager) Run(ctx context.Context) { // evictRemovedNode evicts a removed node from the raft cluster membership. This is to cover an edge case in which // a node might have been removed, but somehow the role was not reconciled (possibly a demotion and a removal happened // in rapid succession before the raft membership configuration went through). -func (rm *roleManager) evictRemovedNode(ctx context.Context, raftMember *api.RaftMember) { +func (rm *roleManager) evictRemovedNode(ctx context.Context, nodeID string) { // Check if the member still exists in the membership - member := rm.raft.GetMemberByNodeID(raftMember.NodeID) + member := rm.raft.GetMemberByNodeID(nodeID) if member != nil { // We first try to remove the raft node from the raft cluster. 
On the next tick, if the node // has been removed from the cluster membership, we then delete it from the removed list rm.removeMember(ctx, member) return } - delete(rm.pendingRemoval, raftMember.NodeID) + delete(rm.pendingRemoval, nodeID) } // removeMember removes a member from the raft cluster membership diff --git a/manager/role_manager_test.go b/manager/role_manager_test.go index 2b1b8afcf7..c8f51916cb 100644 --- a/manager/role_manager_test.go +++ b/manager/role_manager_test.go @@ -79,9 +79,15 @@ func TestRoleManagerRemovesDemotedNodesAndAddsPromotedNodes(t *testing.T) { return store.UpdateNode(tx, n) })) require.NoError(t, testutils.PollFuncWithTimeout(fc, func() error { - if len(lead.GetMemberlist()) != 2 { + memberlist := lead.GetMemberlist() + if len(memberlist) != 2 { return errors.New("raft node hasn't been removed yet") } + for _, m := range memberlist { + if m.NodeID == nonLead.SecurityConfig.ClientTLSCreds.NodeID() { + return errors.New("wrong member was removed") + } + } // use Update just because it returns an error return lead.MemoryStore().Update(func(tx store.Tx) error { if n := store.GetNode(tx, nonLead.SecurityConfig.ClientTLSCreds.NodeID()); n.Role != api.NodeRoleWorker { @@ -154,9 +160,15 @@ func TestRoleManagerRemovesDemotedNodesOnStartup(t *testing.T) { defer rm.Stop() require.NoError(t, testutils.PollFuncWithTimeout(fc, func() error { - if len(lead.GetMemberlist()) != 2 { + memberlist := lead.GetMemberlist() + if len(memberlist) != 2 { return errors.New("raft node hasn't been removed yet") } + for _, m := range memberlist { + if m.NodeID == demoted.SecurityConfig.ClientTLSCreds.NodeID() { + return errors.New("wrong member was removed") + } + } // use Update just because it returns an error return lead.MemoryStore().Update(func(tx store.Tx) error { if n := store.GetNode(tx, demoted.SecurityConfig.ClientTLSCreds.NodeID()); n.Role != api.NodeRoleWorker { @@ -167,6 +179,68 @@ func TestRoleManagerRemovesDemotedNodesOnStartup(t *testing.T) { }, 
roleReconcileInterval/2)) } +// While roleManager is running, if a node is deleted, it is removed from the raft cluster. +func TestRoleManagerRemovesDeletedNodes(t *testing.T) { + t.Parallel() + + tc := cautils.NewTestCA(nil) + defer tc.Stop() + + nodes, fc := raftutils.NewRaftCluster(t, tc) + defer raftutils.TeardownCluster(nodes) + + // nodes is not a list, but a map. The IDs are 1, 2, 3 + require.Len(t, nodes[1].GetMemberlist(), 3) + + // create node objects in the memory store + for _, node := range nodes { + s := raftutils.Leader(nodes).MemoryStore() + // Create a new node object + require.NoError(t, s.Update(func(tx store.Tx) error { + return store.CreateNode(tx, &api.Node{ + Role: api.NodeRoleManager, + ID: node.SecurityConfig.ClientTLSCreds.NodeID(), + Spec: api.NodeSpec{ + DesiredRole: api.NodeRoleManager, + Membership: api.NodeMembershipAccepted, + Availability: api.NodeAvailabilityActive, + }, + }) + })) + } + + lead := raftutils.Leader(nodes) + var nonLead *raftutils.TestNode + for _, n := range nodes { + if n != lead { + nonLead = n + break + } + } + rm := newRoleManager(lead.MemoryStore(), lead.Node) + rm.clocksource = fc + go rm.Run(tc.Context) + defer rm.Stop() + + // delete the node + require.NoError(t, lead.MemoryStore().Update(func(tx store.Tx) error { + return store.DeleteNode(tx, nonLead.SecurityConfig.ClientTLSCreds.NodeID()) + })) + require.NoError(t, testutils.PollFuncWithTimeout(fc, func() error { + memberlist := lead.GetMemberlist() + if len(memberlist) != 2 { + return errors.New("raft node hasn't been removed yet") + } + for _, m := range memberlist { + if m.NodeID == nonLead.SecurityConfig.ClientTLSCreds.NodeID() { + return errors.New("wrong member was removed") + } + } + return nil + }, roleReconcileInterval/2)) + +} + // If a node was removed before the roleManager starts up, roleManger will remove // the node from the cluster membership. 
func TestRoleManagerRemovesDeletedNodesOnStartup(t *testing.T) { @@ -208,9 +282,15 @@ func TestRoleManagerRemovesDeletedNodesOnStartup(t *testing.T) { defer rm.Stop() require.NoError(t, testutils.PollFuncWithTimeout(fc, func() error { - if len(lead.GetMemberlist()) != 2 { + memberlist := lead.GetMemberlist() + if len(memberlist) != 2 { return errors.New("raft node hasn't been removed yet") } + for _, m := range memberlist { + if m.NodeID == nodes[3].SecurityConfig.ClientTLSCreds.NodeID() { + return errors.New("wrong member was removed") + } + } return nil }, roleReconcileInterval/2)) }