66 changes: 50 additions & 16 deletions networkdb/cluster.go
@@ -2,6 +2,7 @@ package networkdb

import (
"bytes"
"context"
"crypto/rand"
"encoding/hex"
"fmt"
@@ -20,12 +21,14 @@ const (
// The garbage collection logic for entries leverages the presence of the network.
// For this reason the expiration time of the network is set slightly higher than the entry expiration so that
// there are at least 5 extra cycles to make sure that all the entries are properly deleted before deleting the network.
reapEntryInterval = 30 * time.Minute
reapNetworkInterval = reapEntryInterval + 5*reapPeriod
reapPeriod = 5 * time.Second
retryInterval = 1 * time.Second
nodeReapInterval = 24 * time.Hour
nodeReapPeriod = 2 * time.Hour
reapEntryInterval = 30 * time.Minute
reapNetworkInterval = reapEntryInterval + 5*reapPeriod
reapPeriod = 5 * time.Second
retryInterval = 1 * time.Second
nodeReapInterval = 24 * time.Hour
nodeReapPeriod = 2 * time.Hour
rejoinClusterDuration = 10 * time.Second
rejoinInterval = 60 * time.Second
)

type logWriter struct{}
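
Not part of the diff, but as a sanity check on the constants above, a tiny standalone Go snippet (copied values, not the real package) showing the invariant the comment describes: the network expiration exceeds the entry expiration by exactly five reap cycles.

```go
package main

import (
	"fmt"
	"time"
)

// Restatement of the networkdb constants to show the relationship:
// the network expires five reap cycles after its entries do.
const (
	reapPeriod          = 5 * time.Second
	reapEntryInterval   = 30 * time.Minute
	reapNetworkInterval = reapEntryInterval + 5*reapPeriod
)

func main() {
	fmt.Println(reapNetworkInterval - reapEntryInterval) // 25s, i.e. 5 reap cycles
}
```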
@@ -159,7 +162,7 @@ func (nDB *NetworkDB) clusterInit() error {
return fmt.Errorf("failed to create memberlist: %v", err)
}

nDB.stopCh = make(chan struct{})
nDB.ctx, nDB.cancelCtx = context.WithCancel(context.Background())
nDB.memberlist = mlist

for _, trigger := range []struct {
@@ -171,16 +174,17 @@
{config.PushPullInterval, nDB.bulkSyncTables},
{retryInterval, nDB.reconnectNode},
{nodeReapPeriod, nDB.reapDeadNode},
{rejoinInterval, nDB.rejoinClusterBootStrap},
} {
t := time.NewTicker(trigger.interval)
go nDB.triggerFunc(trigger.interval, t.C, nDB.stopCh, trigger.fn)
go nDB.triggerFunc(trigger.interval, t.C, trigger.fn)
nDB.tickers = append(nDB.tickers, t)
}

return nil
}
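
For context, here is a minimal, self-contained sketch of the shutdown pattern this hunk moves to: every periodic goroutine selects on ctx.Done() instead of a shared stop channel, so a single cancel() call stops them all. The helper names (triggerEvery, f) are illustrative, not from the PR.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// triggerEvery runs f on every tick until the shared context is
// cancelled, mirroring the ticker loop clusterInit sets up.
func triggerEvery(ctx context.Context, interval time.Duration, f func()) {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			f()
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go triggerEvery(ctx, 100*time.Millisecond, func() { fmt.Println("tick") })
	time.Sleep(350 * time.Millisecond)
	cancel() // one call stops every goroutine sharing ctx
	time.Sleep(50 * time.Millisecond)
}
```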

func (nDB *NetworkDB) retryJoin(members []string, stop <-chan struct{}) {
func (nDB *NetworkDB) retryJoin(ctx context.Context, members []string) {
t := time.NewTicker(retryInterval)
defer t.Stop()

@@ -196,7 +200,7 @@ func (nDB *NetworkDB) retryJoin(members []string, stop <-chan struct{}) {
continue
}
return
case <-stop:
case <-ctx.Done():
return
}
}
@@ -207,8 +211,8 @@ func (nDB *NetworkDB) clusterJoin(members []string) error {
mlist := nDB.memberlist

if _, err := mlist.Join(members); err != nil {
// In case of failure, keep retrying join until it succeeds or the cluster is shutdown.
go nDB.retryJoin(members, nDB.stopCh)
// In case of failure, there is no need to keep retrying here:
// rejoinClusterBootStrap, which runs every rejoinInterval (60s), will retry the join for rejoinClusterDuration (10s).
return fmt.Errorf("could not join node to memberlist: %v", err)
}

@@ -230,7 +234,8 @@ func (nDB *NetworkDB) clusterLeave() error {
return err
}

close(nDB.stopCh)
// Cancel the context to stop all goroutines running on behalf of this instance
nDB.cancelCtx()

for _, t := range nDB.tickers {
t.Stop()
@@ -239,19 +244,19 @@
return mlist.Shutdown()
}

func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) {
func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, f func()) {
// Use a random stagger to avoid synchronizing the periodic triggers across nodes
randStagger := time.Duration(uint64(rnd.Int63()) % uint64(stagger))
select {
case <-time.After(randStagger):
case <-stop:
case <-nDB.ctx.Done():
return
}
for {
select {
case <-C:
f()
case <-stop:
case <-nDB.ctx.Done():
return
}
}
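
The stagger computation above is easy to try in isolation; this small sketch (not from the PR) uses the same modulo trick to pick a uniform delay in [0, interval):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	interval := 5 * time.Second
	// Same computation as triggerFunc: a uniform random delay in
	// [0, interval) before the first tick, so nodes started together
	// do not run their periodic work in lockstep.
	stagger := time.Duration(uint64(rand.Int63()) % uint64(interval))
	fmt.Printf("first tick delayed by %v\n", stagger)
}
```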
@@ -270,6 +275,35 @@ func (nDB *NetworkDB) reapDeadNode() {
}
}

// rejoinClusterBootStrap is called periodically to check whether any of the bootStrap nodes
// is still active in the cluster. If none of them is, it calls the cluster join to merge the two
// separate clusters that form when all the managers are stopped and restarted at the same time.
func (nDB *NetworkDB) rejoinClusterBootStrap() {
nDB.RLock()
if len(nDB.bootStrapIP) == 0 {
nDB.RUnlock()
return
}

bootStrapIPs := make([]string, 0, len(nDB.bootStrapIP))
for _, bootIP := range nDB.bootStrapIP {
for _, node := range nDB.nodes {
if node.Addr.Equal(bootIP) {
// One of the bootstrap nodes is part of the cluster, return
nDB.RUnlock()
return
}
}
bootStrapIPs = append(bootStrapIPs, bootIP.String())
}
nDB.RUnlock()
// None of the bootStrap nodes are in the cluster, call memberlist join
logrus.Debugf("rejoinClusterBootStrap, calling cluster join with bootStrap %v", bootStrapIPs)
ctx, cancel := context.WithTimeout(nDB.ctx, rejoinClusterDuration)
defer cancel()
nDB.retryJoin(ctx, bootStrapIPs)
}

func (nDB *NetworkDB) reconnectNode() {
nDB.RLock()
if len(nDB.failedNodes) == 0 {
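To make the bounded retry in rejoinClusterBootStrap concrete, here is a self-contained sketch: a timeout context derived from a long-lived parent drives a retry loop that stops on success, on timeout, or on instance shutdown. retryUntilDone and the 3s window are illustrative (the PR uses rejoinClusterDuration, 10s).

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryUntilDone stands in for retryJoin: retry an attempt once per
// second until it succeeds or the context is done.
func retryUntilDone(ctx context.Context, attempt func() error) {
	t := time.NewTicker(1 * time.Second)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			if err := attempt(); err == nil {
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	parent, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Bound the retry window by deriving a timeout context from the
	// long-lived parent, as rejoinClusterBootStrap does; 3s here only
	// to keep the demo short.
	ctx, cancelTimeout := context.WithTimeout(parent, 3*time.Second)
	defer cancelTimeout()

	retryUntilDone(ctx, func() error {
		fmt.Println("join attempt failed, retrying")
		return errors.New("bootstrap nodes unreachable")
	})
	fmt.Println("retry window closed")
}
```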
6 changes: 4 additions & 2 deletions networkdb/networkdb.go
@@ -3,6 +3,7 @@ package networkdb
//go:generate protoc -I.:../vendor/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/networkdb,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. networkdb.proto

import (
"context"
"fmt"
"net"
"os"
@@ -76,9 +77,10 @@ type NetworkDB struct {
// Broadcast queue for node event gossip.
nodeBroadcasts *memberlist.TransmitLimitedQueue

// A central stop channel to stop all go routines running on
// A central context to stop all goroutines running on
// behalf of the NetworkDB instance.
stopCh chan struct{}
ctx context.Context
cancelCtx context.CancelFunc

// A central broadcaster for all local watchers watching table
// events.
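A minimal sketch (hypothetical type, not from the PR) of the struct-owned context pattern the new ctx/cancelCtx fields follow: create the pair once at init, cancel once at shutdown.

```go
package main

import (
	"context"
	"fmt"
)

// instance is a hypothetical type mirroring the NetworkDB field change:
// the struct owns both the context and its cancel function.
type instance struct {
	ctx       context.Context
	cancelCtx context.CancelFunc
}

func newInstance() *instance {
	i := &instance{}
	i.ctx, i.cancelCtx = context.WithCancel(context.Background())
	return i
}

// stop cancels the context; every goroutine selecting on i.ctx.Done() exits.
func (i *instance) stop() {
	i.cancelCtx()
}

func main() {
	i := newInstance()
	i.stop()
	<-i.ctx.Done() // returns immediately once the context is cancelled
	fmt.Println("all goroutines would have exited by now")
}
```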