From 46c52cdcb67137eeaeabe69a3143d317268a56a7 Mon Sep 17 00:00:00 2001
From: Dani Louca
Date: Fri, 6 Apr 2018 16:07:14 -0400
Subject: [PATCH] Adding a recovery mechanism for a split gossip cluster

Signed-off-by: Dani Louca
(cherry picked from commit 744334d441587278813cddbb3f79eb12a48fad1c)
---
 networkdb/cluster.go   | 66 ++++++++++++++++++++++++++++++++----------
 networkdb/networkdb.go |  6 ++--
 2 files changed, 54 insertions(+), 18 deletions(-)

diff --git a/networkdb/cluster.go b/networkdb/cluster.go
index e2bd9424fe..a081275c5e 100644
--- a/networkdb/cluster.go
+++ b/networkdb/cluster.go
@@ -2,6 +2,7 @@ package networkdb
 
 import (
 	"bytes"
+	"context"
 	"crypto/rand"
 	"encoding/hex"
 	"fmt"
@@ -20,12 +21,14 @@ const (
 	// The garbage collection logic for entries leverage the presence of the network.
 	// For this reason the expiration time of the network is put slightly higher than the entry expiration so that
 	// there is at least 5 extra cycle to make sure that all the entries are properly deleted before deleting the network.
-	reapEntryInterval   = 30 * time.Minute
-	reapNetworkInterval = reapEntryInterval + 5*reapPeriod
-	reapPeriod          = 5 * time.Second
-	retryInterval       = 1 * time.Second
-	nodeReapInterval    = 24 * time.Hour
-	nodeReapPeriod      = 2 * time.Hour
+	reapEntryInterval     = 30 * time.Minute
+	reapNetworkInterval   = reapEntryInterval + 5*reapPeriod
+	reapPeriod            = 5 * time.Second
+	retryInterval         = 1 * time.Second
+	nodeReapInterval      = 24 * time.Hour
+	nodeReapPeriod        = 2 * time.Hour
+	rejoinClusterDuration = 10 * time.Second
+	rejoinInterval        = 60 * time.Second
 )
 
 type logWriter struct{}
@@ -159,7 +162,7 @@ func (nDB *NetworkDB) clusterInit() error {
 		return fmt.Errorf("failed to create memberlist: %v", err)
 	}
 
-	nDB.stopCh = make(chan struct{})
+	nDB.ctx, nDB.cancelCtx = context.WithCancel(context.Background())
 	nDB.memberlist = mlist
 
 	for _, trigger := range []struct {
@@ -171,16 +174,17 @@ func (nDB *NetworkDB) clusterInit() error {
 		{config.PushPullInterval, nDB.bulkSyncTables},
 		{retryInterval, nDB.reconnectNode},
 		{nodeReapPeriod, nDB.reapDeadNode},
+		{rejoinInterval, nDB.rejoinClusterBootStrap},
 	} {
 		t := time.NewTicker(trigger.interval)
-		go nDB.triggerFunc(trigger.interval, t.C, nDB.stopCh, trigger.fn)
+		go nDB.triggerFunc(trigger.interval, t.C, trigger.fn)
 		nDB.tickers = append(nDB.tickers, t)
 	}
 
 	return nil
 }
 
-func (nDB *NetworkDB) retryJoin(members []string, stop <-chan struct{}) {
+func (nDB *NetworkDB) retryJoin(ctx context.Context, members []string) {
 	t := time.NewTicker(retryInterval)
 	defer t.Stop()
 
@@ -196,7 +200,7 @@ func (nDB *NetworkDB) retryJoin(members []string, stop <-chan struct{}) {
 				continue
 			}
 			return
-		case <-stop:
+		case <-ctx.Done():
 			return
 		}
 	}
@@ -207,8 +211,8 @@ func (nDB *NetworkDB) clusterJoin(members []string) error {
 	mlist := nDB.memberlist
 
 	if _, err := mlist.Join(members); err != nil {
-		// In case of failure, keep retrying join until it succeeds or the cluster is shutdown.
-		go nDB.retryJoin(members, nDB.stopCh)
+		// In case of failure, we no longer need to explicitly call retryJoin:
+		// rejoinClusterBootStrap, which runs every minute, will retry the join for 10 seconds.
 		return fmt.Errorf("could not join node to memberlist: %v", err)
 	}
 
@@ -230,7 +234,8 @@ func (nDB *NetworkDB) clusterLeave() error {
 		return err
 	}
 
-	close(nDB.stopCh)
+	// Cancel the shared context to stop all goroutines running on behalf of this instance.
+	nDB.cancelCtx()
 
 	for _, t := range nDB.tickers {
 		t.Stop()
@@ -239,19 +244,19 @@ func (nDB *NetworkDB) clusterLeave() error {
 	return mlist.Shutdown()
 }
 
-func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) {
+func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, f func()) {
 	// Use a random stagger to avoid syncronizing
 	randStagger := time.Duration(uint64(rnd.Int63()) % uint64(stagger))
 	select {
 	case <-time.After(randStagger):
-	case <-stop:
+	case <-nDB.ctx.Done():
 		return
 	}
 	for {
 		select {
 		case <-C:
 			f()
-		case <-stop:
+		case <-nDB.ctx.Done():
 			return
 		}
 	}
@@ -270,6 +275,35 @@ func (nDB *NetworkDB) reapDeadNode() {
 	}
 }
 
+// rejoinClusterBootStrap is called periodically to check whether any of the bootStrap nodes
+// are active in the cluster; if none are, it calls cluster join to merge the two separate
+// clusters that form when all the managers are stopped/started at the same time.
+func (nDB *NetworkDB) rejoinClusterBootStrap() {
+	nDB.RLock()
+	if len(nDB.bootStrapIP) == 0 {
+		nDB.RUnlock()
+		return
+	}
+
+	bootStrapIPs := make([]string, 0, len(nDB.bootStrapIP))
+	for _, bootIP := range nDB.bootStrapIP {
+		for _, node := range nDB.nodes {
+			if node.Addr.Equal(bootIP) {
+				// One of the bootstrap nodes is part of the cluster, return
+				nDB.RUnlock()
+				return
+			}
+		}
+		bootStrapIPs = append(bootStrapIPs, bootIP.String())
+	}
+	nDB.RUnlock()
+	// None of the bootStrap nodes are in the cluster, call memberlist join
+	logrus.Debugf("rejoinClusterBootStrap, calling cluster join with bootStrap %v", bootStrapIPs)
+	ctx, cancel := context.WithTimeout(nDB.ctx, rejoinClusterDuration)
+	defer cancel()
+	nDB.retryJoin(ctx, bootStrapIPs)
+}
+
 func (nDB *NetworkDB) reconnectNode() {
 	nDB.RLock()
 	if len(nDB.failedNodes) == 0 {
diff --git a/networkdb/networkdb.go b/networkdb/networkdb.go
index 834e476508..92b5fa4ff9 100644
--- a/networkdb/networkdb.go
+++ b/networkdb/networkdb.go
@@ -3,6 +3,7 @@ package networkdb
 //go:generate protoc -I.:../vendor/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/networkdb,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. networkdb.proto
 
 import (
+	"context"
 	"fmt"
 	"net"
 	"os"
@@ -76,9 +77,10 @@ type NetworkDB struct {
 	// Broadcast queue for node event gossip.
 	nodeBroadcasts *memberlist.TransmitLimitedQueue
 
-	// A central stop channel to stop all go routines running on
+	// A central context to stop all goroutines running on
 	// behalf of the NetworkDB instance.
-	stopCh chan struct{}
+	ctx       context.Context
+	cancelCtx context.CancelFunc
 
 	// A central broadcaster for all local watchers watching table
 	// events.