Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 26 additions & 35 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/go-events"
"github.com/docker/libnetwork/cluster"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/discoverapi"
"github.com/docker/libnetwork/driverapi"
Expand Down Expand Up @@ -40,7 +41,7 @@ type agent struct {
bindAddr string
advertiseAddr string
dataPathAddr string
epTblCancel func()
coreCancelFuncs []func()
driverCancelFuncs map[string][]func()
sync.Mutex
}
Expand Down Expand Up @@ -192,16 +193,12 @@ func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
return nil
}

func (c *controller) agentSetup() error {
c.Lock()
clusterProvider := c.cfg.Daemon.ClusterProvider
agent := c.agent
c.Unlock()
func (c *controller) agentSetup(clusterProvider cluster.Provider) error {
agent := c.getAgent()

if clusterProvider == nil {
msg := "Aborting initialization of Libnetwork Agent because cluster provider is now unset"
logrus.Errorf(msg)
return fmt.Errorf(msg)
// If the agent is already present there is no need to try to initilize it again
if agent != nil {
return nil
}

bindAddr := clusterProvider.GetLocalAddress()
Expand All @@ -221,15 +218,15 @@ func (c *controller) agentSetup() error {
listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList)
if advAddr != "" && agent == nil {
if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
logrus.Errorf("Error in agentInit : %v", err)
} else {
c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
if capability.DataScope == datastore.GlobalScope {
c.agentDriverNotify(driver)
}
return false
})
logrus.Errorf("error in agentInit: %v", err)
return err
}
c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
if capability.DataScope == datastore.GlobalScope {
c.agentDriverNotify(driver)
}
return false
})
}

if len(remoteAddrList) > 0 {
Expand All @@ -238,14 +235,6 @@ func (c *controller) agentSetup() error {
}
}

c.Lock()
if c.agent != nil && c.agentInitDone != nil {
close(c.agentInitDone)
c.agentInitDone = nil
c.agentStopDone = make(chan struct{})
}
c.Unlock()

return nil
}

Expand Down Expand Up @@ -287,16 +276,12 @@ func (c *controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) {
}

func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, dataPathAddr string) error {
if !c.isAgent() {
return nil
}

bindAddr, err := resolveAddr(bindAddrOrInterface)
if err != nil {
return err
}

keys, tags := c.getKeys(subsysGossip)
keys, _ := c.getKeys(subsysGossip)
hostname, _ := os.Hostname()
nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
logrus.Info("Gossip cluster hostname ", nodeName)
Expand All @@ -312,16 +297,19 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
return err
}

var cancelList []func()
ch, cancel := nDB.Watch(libnetworkEPTable, "", "")
cancelList = append(cancelList, cancel)
nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "", "")
cancelList = append(cancelList, cancel)

c.Lock()
c.agent = &agent{
networkDB: nDB,
bindAddr: bindAddr,
advertiseAddr: advertiseAddr,
dataPathAddr: dataPathAddr,
epTblCancel: cancel,
coreCancelFuncs: cancelList,
driverCancelFuncs: make(map[string][]func()),
}
c.Unlock()
Expand All @@ -330,7 +318,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)

drvEnc := discoverapi.DriverEncryptionConfig{}
keys, tags = c.getKeys(subsysIPSec)
keys, tags := c.getKeys(subsysIPSec)
drvEnc.Keys = keys
drvEnc.Tags = tags

Expand Down Expand Up @@ -399,14 +387,17 @@ func (c *controller) agentClose() {
cancelList = append(cancelList, cancel)
}
}

// Add also the cancel functions for the network db
for _, cancel := range agent.coreCancelFuncs {
cancelList = append(cancelList, cancel)
}
agent.Unlock()

for _, cancel := range cancelList {
cancel()
}

agent.epTblCancel()

agent.networkDB.Close()
}

Expand Down
16 changes: 15 additions & 1 deletion cluster/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,20 @@ import (
"golang.org/x/net/context"
)

const (
// EventSocketChange control socket changed
EventSocketChange = iota
// EventNodeReady cluster node in ready state
EventNodeReady
// EventNodeLeave node is leaving the cluster
EventNodeLeave
// EventNetworkKeysAvailable network keys correctly configured in the networking layer
EventNetworkKeysAvailable
)

// ConfigEventType type of the event produced by the cluster
type ConfigEventType uint8

// Provider provides clustering config details
type Provider interface {
IsManager() bool
Expand All @@ -14,7 +28,7 @@ type Provider interface {
GetAdvertiseAddress() string
GetDataPathAddress() string
GetRemoteAddressList() []string
ListenClusterEvents() <-chan struct{}
ListenClusterEvents() <-chan ConfigEventType
AttachNetwork(string, string, []string) (*network.NetworkingConfig, error)
DetachNetwork(string, string) error
UpdateAttachment(string, string, *network.NetworkingConfig) error
Expand Down
9 changes: 5 additions & 4 deletions cmd/dnet/dnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/docker/docker/pkg/term"
"github.com/docker/libnetwork"
"github.com/docker/libnetwork/api"
"github.com/docker/libnetwork/cluster"
"github.com/docker/libnetwork/config"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/driverapi"
Expand Down Expand Up @@ -234,7 +235,7 @@ type dnetConnection struct {
// addr holds the client address.
addr string
Orchestration *NetworkOrchestration
configEvent chan struct{}
configEvent chan cluster.ConfigEventType
}

// NetworkOrchestration exported
Expand Down Expand Up @@ -275,7 +276,7 @@ func (d *dnetConnection) dnetDaemon(cfgFile string) error {
controller.SetClusterProvider(d)

if d.Orchestration.Agent || d.Orchestration.Manager {
d.configEvent <- struct{}{}
d.configEvent <- cluster.EventNodeReady
}

createDefaultNetwork(controller)
Expand Down Expand Up @@ -335,7 +336,7 @@ func (d *dnetConnection) GetNetworkKeys() []*types.EncryptionKey {
func (d *dnetConnection) SetNetworkKeys([]*types.EncryptionKey) {
}

func (d *dnetConnection) ListenClusterEvents() <-chan struct{} {
func (d *dnetConnection) ListenClusterEvents() <-chan cluster.ConfigEventType {
return d.configEvent
}

Expand Down Expand Up @@ -438,7 +439,7 @@ func newDnetConnection(val string) (*dnetConnection, error) {
return nil, errors.New("dnet currently only supports tcp transport")
}

return &dnetConnection{protoAddrParts[0], protoAddrParts[1], &NetworkOrchestration{}, make(chan struct{}, 10)}, nil
return &dnetConnection{protoAddrParts[0], protoAddrParts[1], &NetworkOrchestration{}, make(chan cluster.ConfigEventType, 10)}, nil
}

func (d *dnetConnection) httpCall(method, path string, data interface{}, headers map[string][]string) (io.ReadCloser, http.Header, int, error) {
Expand Down
4 changes: 1 addition & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ type DaemonCfg struct {
Labels []string
DriverCfg map[string]interface{}
ClusterProvider cluster.Provider
DisableProvider chan struct{}
}

// ClusterCfg represents cluster configuration
Expand Down Expand Up @@ -74,8 +73,7 @@ func ParseConfig(tomlCfgFile string) (*Config, error) {
func ParseConfigOptions(cfgOptions ...Option) *Config {
cfg := &Config{
Daemon: DaemonCfg{
DriverCfg: make(map[string]interface{}),
DisableProvider: make(chan struct{}, 10),
DriverCfg: make(map[string]interface{}),
},
Scopes: make(map[string]*datastore.ScopeCfg),
}
Expand Down
Loading