Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 48 additions & 14 deletions networkdb/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package networkdb

import (
"bytes"
"context"
"crypto/rand"
"encoding/hex"
"fmt"
Expand All @@ -17,10 +18,12 @@ import (
)

const (
reapPeriod = 5 * time.Second
retryInterval = 1 * time.Second
nodeReapInterval = 24 * time.Hour
nodeReapPeriod = 2 * time.Hour
reapPeriod = 5 * time.Second
rejoinClusterDuration = 10 * time.Second
rejoinInterval = 60 * time.Second
retryInterval = 1 * time.Second
nodeReapInterval = 24 * time.Hour
nodeReapPeriod = 2 * time.Hour
)

type logWriter struct{}
Expand Down Expand Up @@ -154,7 +157,7 @@ func (nDB *NetworkDB) clusterInit() error {
return fmt.Errorf("failed to create memberlist: %v", err)
}

nDB.stopCh = make(chan struct{})
nDB.ctx, nDB.cancelCtx = context.WithCancel(context.Background())
nDB.memberlist = mlist

for _, trigger := range []struct {
Expand All @@ -166,16 +169,17 @@ func (nDB *NetworkDB) clusterInit() error {
{config.PushPullInterval, nDB.bulkSyncTables},
{retryInterval, nDB.reconnectNode},
{nodeReapPeriod, nDB.reapDeadNode},
{rejoinInterval, nDB.rejoinClusterBootStrap},
} {
t := time.NewTicker(trigger.interval)
go nDB.triggerFunc(trigger.interval, t.C, nDB.stopCh, trigger.fn)
go nDB.triggerFunc(trigger.interval, t.C, trigger.fn)
nDB.tickers = append(nDB.tickers, t)
}

return nil
}

func (nDB *NetworkDB) retryJoin(members []string, stop <-chan struct{}) {
func (nDB *NetworkDB) retryJoin(ctx context.Context, members []string) {
t := time.NewTicker(retryInterval)
defer t.Stop()

Expand All @@ -191,7 +195,7 @@ func (nDB *NetworkDB) retryJoin(members []string, stop <-chan struct{}) {
continue
}
return
case <-stop:
case <-ctx.Done():
return
}
}
Expand All @@ -202,8 +206,8 @@ func (nDB *NetworkDB) clusterJoin(members []string) error {
mlist := nDB.memberlist

if _, err := mlist.Join(members); err != nil {
// In case of failure, keep retrying join until it succeeds or the cluster is shutdown.
go nDB.retryJoin(members, nDB.stopCh)
// In case of failure, we no longer need to explicitly call retryJoin.
// rejoinClusterBootStrap, which runs every minute, will retryJoin for 10sec
return fmt.Errorf("could not join node to memberlist: %v", err)
}

Expand All @@ -225,7 +229,8 @@ func (nDB *NetworkDB) clusterLeave() error {
return err
}

close(nDB.stopCh)
// cancel the context
nDB.cancelCtx()

for _, t := range nDB.tickers {
t.Stop()
Expand All @@ -234,19 +239,19 @@ func (nDB *NetworkDB) clusterLeave() error {
return mlist.Shutdown()
}

func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) {
func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, f func()) {
// Use a random stagger to avoid syncronizing
randStagger := time.Duration(uint64(rnd.Int63()) % uint64(stagger))
select {
case <-time.After(randStagger):
case <-stop:
case <-nDB.ctx.Done():
return
}
for {
select {
case <-C:
f()
case <-stop:
case <-nDB.ctx.Done():
return
}
}
Expand All @@ -270,6 +275,35 @@ func (nDB *NetworkDB) reapDeadNode() {
}
}

// rejoinClusterBootStrap is called periodically to check if all bootStrap nodes are active in the cluster,
// if not, call the cluster join to merge 2 separate clusters that are formed when all managers
// stopped/started at the same time
func (nDB *NetworkDB) rejoinClusterBootStrap() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it make sense to also attempt to refresh nDB.bootStrapIP here through a call to something like GetRemoteAddressList in case the list has changed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fcrisciani and I had a discussion about other use cases that can lead us to the same "split cluster" and the possibility of re-checking/updating the bootStrap IPs (through GetRemoteAddressList)
1- Re-ip the managers .
2- Demote/Promote managers/workers .

The first one is not an issue as a re-ip to all managers will force all nodes in the cluster to restart .
As for 2), customers will only hit it if they demoted all managers in the cluster + restarting those managers without restarting the workers... We felt like this is a very edge case.

This has been said, If you guys think we should still refresh the bootStrapIP, then we can add the logic to the newly introduced rejoinClusterBootStrap

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the 2 problems can be handled separately. Memberist with this PR will honor the bootstrapIP that got passed at the beginning.
The second issue will need a periodic check of the GetRemoteAddressList and if the list change, the routine have to call the networkDB.Join with the new bootstrapIPs

nDB.RLock()
if len(nDB.bootStrapIP) == 0 {
nDB.RUnlock()
return
}

bootStrapIPs := make([]string, 0, len(nDB.bootStrapIP))
for _, bootIP := range nDB.bootStrapIP {
for _, node := range nDB.nodes {
if node.Addr.Equal(bootIP) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dani-docker thinking more about this, I guess here it's missing the check that the IP is != from current node IP else this fix won't work for the managers. Every manager will see itself in the list and won't try to reconnect

// One of the bootstrap nodes is part of the cluster, return
nDB.RUnlock()
return
}
}
bootStrapIPs = append(bootStrapIPs, bootIP.String())
}
nDB.RUnlock()
// None of the bootStrap nodes are in the cluster, call memberlist join
logrus.Debugf("rejoinClusterBootStrap, calling cluster join with bootStrap %v", bootStrapIPs)
ctx, cancel := context.WithTimeout(nDB.ctx, rejoinClusterDuration)
defer cancel()
nDB.retryJoin(ctx, bootStrapIPs)
}

func (nDB *NetworkDB) reconnectNode() {
nDB.RLock()
if len(nDB.failedNodes) == 0 {
Expand Down
15 changes: 5 additions & 10 deletions networkdb/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,11 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
// If we are here means that the event is fresher and the node is known. Update the laport time
n.ltime = nEvent.LTime

// If it is a node leave event for a manager and this is the only manager we
// know of we want the reconnect logic to kick in. In a single manager
// cluster manager's gossip can't be bootstrapped unless some other node
// connects to it.
if len(nDB.bootStrapIP) == 1 && nEvent.Type == NodeEventTypeLeave {
for _, ip := range nDB.bootStrapIP {
if ip.Equal(n.Addr) {
return true
}
}
// If the node is not known to memberlist we cannot safely store any state for it: if it
// actually dies we would never receive a notification and would remain stuck with it
if _, ok := nDB.nodes[nEvent.NodeName]; !ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the nDB.findNode check necessary above [L28] given this check?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only to filter based on the lamport clock

logrus.Error("node: %s is unknown to memberlist", nEvent.NodeName)
return false
}

switch nEvent.Type {
Expand Down
6 changes: 4 additions & 2 deletions networkdb/networkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package networkdb
//go:generate protoc -I.:../vendor/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/networkdb,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. networkdb.proto

import (
"context"
"fmt"
"net"
"os"
Expand Down Expand Up @@ -77,9 +78,10 @@ type NetworkDB struct {
// Broadcast queue for node event gossip.
nodeBroadcasts *memberlist.TransmitLimitedQueue

// A central stop channel to stop all go routines running on
// A central context to stop all go routines running on
// behalf of the NetworkDB instance.
stopCh chan struct{}
ctx context.Context
cancelCtx context.CancelFunc

// A central broadcaster for all local watchers watching table
// events.
Expand Down