Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions networkdb/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,18 +285,35 @@ func (nDB *NetworkDB) rejoinClusterBootStrap() {
return
}

myself, _ := nDB.nodes[nDB.config.NodeID]
bootStrapIPs := make([]string, 0, len(nDB.bootStrapIP))
for _, bootIP := range nDB.bootStrapIP {
for _, node := range nDB.nodes {
if node.Addr.Equal(bootIP) {
// One of the bootstrap nodes is part of the cluster, return
nDB.RUnlock()
return
// botostrap IPs are usually IP:port from the Join
var bootstrapIP net.IP
ipStr, _, err := net.SplitHostPort(bootIP)
if err != nil {
// try to parse it as an IP with port
// Note this seems to be the case for swarm that do not specify any port
ipStr = bootIP
}
bootstrapIP = net.ParseIP(ipStr)
if bootstrapIP != nil {
for _, node := range nDB.nodes {
if node.Addr.Equal(bootstrapIP) && !node.Addr.Equal(myself.Addr) {
// One of the bootstrap nodes (and not myself) is part of the cluster, return
nDB.RUnlock()
return
}
}
bootStrapIPs = append(bootStrapIPs, bootIP)
}
bootStrapIPs = append(bootStrapIPs, bootIP.String())
}
nDB.RUnlock()
if len(bootStrapIPs) == 0 {
// this will also avoid to call the Join with an empty list erasing the current bootstrap ip list
logrus.Debug("rejoinClusterBootStrap did not find any valid IP")
return
}
// None of the bootStrap nodes are in the cluster, call memberlist join
logrus.Debugf("rejoinClusterBootStrap, calling cluster join with bootStrap %v", bootStrapIPs)
ctx, cancel := context.WithTimeout(nDB.ctx, rejoinClusterDuration)
Expand Down
9 changes: 3 additions & 6 deletions networkdb/networkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package networkdb
import (
"context"
"fmt"
"net"
"os"
"strings"
"sync"
Expand Down Expand Up @@ -96,7 +95,7 @@ type NetworkDB struct {

// bootStrapIP is the list of IPs that can be used to bootstrap
// the gossip.
bootStrapIP []net.IP
bootStrapIP []string

// lastStatsTimestamp is the last timestamp when the stats got printed
lastStatsTimestamp time.Time
Expand Down Expand Up @@ -268,10 +267,8 @@ func New(c *Config) (*NetworkDB, error) {
// instances passed by the caller in the form of addr:port
func (nDB *NetworkDB) Join(members []string) error {
nDB.Lock()
nDB.bootStrapIP = make([]net.IP, 0, len(members))
for _, m := range members {
nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m))
}
nDB.bootStrapIP = append([]string(nil), members...)
logrus.Infof("The new bootstrap node list is:%v", nDB.bootStrapIP)
nDB.Unlock()
return nDB.clusterJoin(members)
}
Expand Down
70 changes: 65 additions & 5 deletions networkdb/networkdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,22 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}

func launchNode(t *testing.T, conf Config) *NetworkDB {
db, err := New(&conf)
require.NoError(t, err)
return db
}

func createNetworkDBInstances(t *testing.T, num int, namePrefix string, conf *Config) []*NetworkDB {
var dbs []*NetworkDB
for i := 0; i < num; i++ {
localConfig := *conf
localConfig.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1)
localConfig.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
localConfig.BindPort = int(atomic.AddInt32(&dbPort, 1))
db, err := New(&localConfig)
require.NoError(t, err)

db := launchNode(t, localConfig)
if i != 0 {
err = db.Join([]string{fmt.Sprintf("localhost:%d", db.config.BindPort-1)})
assert.NoError(t, err)
assert.NoError(t, db.Join([]string{fmt.Sprintf("localhost:%d", db.config.BindPort-1)}))
}

dbs = append(dbs, db)
Expand Down Expand Up @@ -803,3 +806,60 @@ func TestParallelDelete(t *testing.T) {

closeNetworkDBInstances(dbs)
}

func TestNetworkDBIslands(t *testing.T) {
logrus.SetLevel(logrus.DebugLevel)
dbs := createNetworkDBInstances(t, 5, "node", DefaultConfig())

// Get the node IP used currently
node, _ := dbs[0].nodes[dbs[0].config.NodeID]
baseIPStr := node.Addr.String()
// Node 0,1,2 are going to be the 3 bootstrap nodes
members := []string{fmt.Sprintf("%s:%d", baseIPStr, dbs[0].config.BindPort),
fmt.Sprintf("%s:%d", baseIPStr, dbs[1].config.BindPort),
fmt.Sprintf("%s:%d", baseIPStr, dbs[2].config.BindPort)}
// Rejoining will update the list of the bootstrap members
for i := 3; i < 5; i++ {
assert.NoError(t, dbs[i].Join(members))
}

// Now the 3 bootstrap nodes will cleanly leave, and will be properly removed from the other 2 nodes
for i := 0; i < 3; i++ {
logrus.Infof("node %d leaving", i)
dbs[i].Close()
time.Sleep(2 * time.Second)
}

// Give some time to let the system propagate the messages and free up the ports
time.Sleep(10 * time.Second)

// Verify that the nodes are actually all gone and marked appropiately
for i := 3; i < 5; i++ {
assert.Len(t, dbs[i].leftNodes, 3)
assert.Len(t, dbs[i].failedNodes, 0)
}

// Spawn again the first 3 nodes with different names but same IP:port
for i := 0; i < 3; i++ {
logrus.Infof("node %d coming back", i)
dbs[i].config.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
dbs[i] = launchNode(t, *dbs[i].config)
time.Sleep(2 * time.Second)
}

// Give some time for the reconnect routine to run, it runs every 60s
time.Sleep(50 * time.Second)

// Verify that the cluster is again all connected. Note that the 3 previous node did not do any join
for i := 0; i < 5; i++ {
assert.Len(t, dbs[i].nodes, 5)
assert.Len(t, dbs[i].failedNodes, 0)
if i < 3 {
// nodes from 0 to 3 has no left nodes
assert.Len(t, dbs[i].leftNodes, 0)
} else {
// nodes from 4 to 5 has the 3 previous left nodes
assert.Len(t, dbs[i].leftNodes, 3)
}
}
}