Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 83 additions & 51 deletions agent/agent.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package agent

import (
"bytes"
"fmt"
"math/rand"
"reflect"
Expand Down Expand Up @@ -44,6 +45,8 @@ type Agent struct {
stopOnce sync.Once // only allow stop to be called once
closed chan struct{} // only closed in run
err error // read only after closed is closed

nodeUpdatePeriod time.Duration
}

// New returns a new agent, ready for task dispatch.
Expand All @@ -53,14 +56,15 @@ func New(config *Config) (*Agent, error) {
}

a := &Agent{
config: config,
sessionq: make(chan sessionOperation),
started: make(chan struct{}),
leaving: make(chan struct{}),
left: make(chan struct{}),
stopped: make(chan struct{}),
closed: make(chan struct{}),
ready: make(chan struct{}),
config: config,
sessionq: make(chan sessionOperation),
started: make(chan struct{}),
leaving: make(chan struct{}),
left: make(chan struct{}),
stopped: make(chan struct{}),
closed: make(chan struct{}),
ready: make(chan struct{}),
nodeUpdatePeriod: nodeUpdatePeriod,
}

a.worker = newWorker(config.DB, config.Executor, a)
Expand Down Expand Up @@ -182,13 +186,15 @@ func (a *Agent) run(ctx context.Context) {
log.G(ctx).Debug("(*Agent).run")
defer log.G(ctx).Debug("(*Agent).run exited")

nodeTLSInfo := a.config.NodeTLSInfo

// get the node description
nodeDescription, err := a.nodeDescriptionWithHostname(ctx)
nodeDescription, err := a.nodeDescriptionWithHostname(ctx, nodeTLSInfo)
if err != nil {
log.G(ctx).WithError(err).WithField("agent", a.config.Executor).Error("agent: node description unavailable")
}
// nodeUpdateTicker is used to periodically check for updates to node description
nodeUpdateTicker := time.NewTicker(nodeUpdatePeriod)
nodeUpdateTicker := time.NewTicker(a.nodeUpdatePeriod)
defer nodeUpdateTicker.Stop()

var (
Expand All @@ -214,6 +220,35 @@ func (a *Agent) run(ctx context.Context) {

a.worker.Listen(ctx, reporter)

updateNode := func() {
// skip updating if the registration isn't finished
if registered != nil {
return
}
// get the current node description
newNodeDescription, err := a.nodeDescriptionWithHostname(ctx, nodeTLSInfo)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So `a.nodeDescriptionWithHostname` can return `nil, nil`? Shouldn't we simply return inside the `if err != nil` block?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'm not entirely sure about this one...

if err != nil {
log.G(ctx).WithError(err).WithField("agent", a.config.Executor).Error("agent: updated node description unavailable")
}

// if newNodeDescription is nil, it will cause a panic when
// trying to create a session. Typically this can happen
// if the engine goes down
if newNodeDescription == nil {
return
}

// if the node description has changed, update it to the new one
// and close the session. The old session will be stopped and a
// new one will be created with the updated description
if !reflect.DeepEqual(nodeDescription, newNodeDescription) {
nodeDescription = newNodeDescription
// close the session
log.G(ctx).Info("agent: found node update")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"found" sounds weird here. Maybe "received"?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's actually sending the node update

session.sendError(nil)
}
}

for {
select {
case operation := <-sessionq:
Expand Down Expand Up @@ -247,7 +282,7 @@ func (a *Agent) run(ctx context.Context) {
}
}
case msg := <-session.messages:
if err := a.handleSessionMessage(ctx, msg); err != nil {
if err := a.handleSessionMessage(ctx, msg, nodeTLSInfo); err != nil {
log.G(ctx).WithError(err).Error("session message handler failed")
}
case sub := <-session.subscriptions:
Expand Down Expand Up @@ -305,33 +340,17 @@ func (a *Agent) run(ctx context.Context) {
}
session = newSession(ctx, a, delay, session.sessionID, nodeDescription)
registered = session.registered
case <-nodeUpdateTicker.C:
// skip this case if the registration isn't finished
if registered != nil {
continue
}
// get the current node description
newNodeDescription, err := a.nodeDescriptionWithHostname(ctx)
if err != nil {
log.G(ctx).WithError(err).WithField("agent", a.config.Executor).Error("agent: updated node description unavailable")
}

// if newNodeDescription is nil, it will cause a panic when
// trying to create a session. Typically this can happen
// if the engine goes down
if newNodeDescription == nil {
continue
}

// if the node description has changed, update it to the new one
// and close the session. The old session will be stopped and a
// new one will be created with the updated description
if !reflect.DeepEqual(nodeDescription, newNodeDescription) {
nodeDescription = newNodeDescription
// close the session
log.G(ctx).Info("agent: found node update")
session.sendError(nil)
case ev := <-a.config.NotifyTLSChange:
// the TLS info has changed, so force a check to see if we need to restart the session
if tlsInfo, ok := ev.(*api.NodeTLSInfo); ok {
nodeTLSInfo = tlsInfo
updateNode()
nodeUpdateTicker.Stop()
nodeUpdateTicker = time.NewTicker(a.nodeUpdatePeriod)
}
case <-nodeUpdateTicker.C:
// periodically check to see whether the node information has changed, and if so, restart the session
updateNode()
case <-a.stopped:
// TODO(stevvooe): Wait on shutdown and cleanup. May need to pump
// this loop a few times.
Expand All @@ -347,7 +366,7 @@ func (a *Agent) run(ctx context.Context) {
}
}

func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMessage) error {
func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMessage, nti *api.NodeTLSInfo) error {
seen := map[api.Peer]struct{}{}
for _, manager := range message.Managers {
if manager.Peer.Addr == "" {
Expand All @@ -358,18 +377,28 @@ func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMe
seen[*manager.Peer] = struct{}{}
}

if message.Node != nil {
if a.node == nil || !nodesEqual(a.node, message.Node) {
if a.config.NotifyNodeChange != nil {
a.config.NotifyNodeChange <- message.Node.Copy()
}
a.node = message.Node.Copy()
if err := a.config.Executor.Configure(ctx, a.node); err != nil {
log.G(ctx).WithError(err).Error("node configure failed")
}
var changes *NodeChanges
if message.Node != nil && (a.node == nil || !nodesEqual(a.node, message.Node)) {
if a.config.NotifyNodeChange != nil {
changes = &NodeChanges{Node: message.Node.Copy()}
}
a.node = message.Node.Copy()
if err := a.config.Executor.Configure(ctx, a.node); err != nil {
log.G(ctx).WithError(err).Error("node configure failed")
}
}
if len(message.RootCA) > 0 && !bytes.Equal(message.RootCA, nti.TrustRoot) {
if changes == nil {
changes = &NodeChanges{RootCert: message.RootCA}
} else {
changes.RootCert = message.RootCA
}
}

if changes != nil {
a.config.NotifyNodeChange <- changes
}

// prune managers not in list.
for peer := range a.config.ConnBroker.Remotes().Weights() {
if _, ok := seen[peer]; !ok {
Expand Down Expand Up @@ -517,12 +546,15 @@ func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogP
}

// nodeDescriptionWithHostname retrieves node description, and overrides hostname if available
func (a *Agent) nodeDescriptionWithHostname(ctx context.Context) (*api.NodeDescription, error) {
func (a *Agent) nodeDescriptionWithHostname(ctx context.Context, tlsInfo *api.NodeTLSInfo) (*api.NodeDescription, error) {
desc, err := a.config.Executor.Describe(ctx)

// Override hostname
if a.config.Hostname != "" && desc != nil {
desc.Hostname = a.config.Hostname
// Override hostname and TLS info
if desc != nil {
if a.config.Hostname != "" && desc != nil {
desc.Hostname = a.config.Hostname
}
desc.TLSInfo = tlsInfo
}
return desc, err
}
Expand Down
Loading