165 changes: 153 additions & 12 deletions agent/agent_test.go
@@ -1,11 +1,18 @@
package agent

import (
"crypto/tls"
"errors"
"fmt"
"net"
"os"
"sync"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

events "github.com/docker/go-events"
agentutils "github.com/docker/swarmkit/agent/testutils"
"github.com/docker/swarmkit/api"
@@ -14,11 +21,27 @@ import (
"github.com/docker/swarmkit/connectionbroker"
"github.com/docker/swarmkit/remotes"
"github.com/docker/swarmkit/testutils"
"github.com/docker/swarmkit/xnet"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
)

var localDispatcher = false

// TestMain runs every test in this file twice - once with a local dispatcher, and
Collaborator:

Are there separate code paths for local and remote dispatchers?

Collaborator:

I think we are forking the context twice for the same reason.

Contributor Author:

> Are there separate code paths for local and remote dispatchers?

Sort of, although it's not here. We are adding the extra context so we can cancel it, because closing the connection doesn't work on local connections: connectionbroker ignores closes on local connections, so if we just close the connection, the session can't actually be restarted. This is sort of a regression test - it fails without the context changes in the rest of this PR.
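For illustration only (this is not part of the diff): a minimal, self-contained Go sketch of the behaviour described in the comment above, using hypothetical types. Because Close is ignored for a local connection, cancelling a per-session context is the only reliable way to stop the session's run loop so a new session can be built.

package main

import (
    "context"
    "fmt"
)

// brokerConn stands in for a connection handed out by the connection broker.
// As described above, closing it is a no-op when the connection is local.
type brokerConn struct{ local bool }

func (c *brokerConn) Close() {
    if c.local {
        return // ignored: the shared local connection must stay usable
    }
    fmt.Println("remote connection closed")
}

// session forks its own context so that close() works even when conn.Close is a no-op.
type session struct {
    conn   *brokerConn
    cancel context.CancelFunc
    done   chan struct{}
}

func newSession(parent context.Context, conn *brokerConn) *session {
    ctx, cancel := context.WithCancel(parent)
    s := &session{conn: conn, cancel: cancel, done: make(chan struct{})}
    go func() {
        <-ctx.Done() // the run loop exits on cancellation, local or remote
        close(s.done)
    }()
    return s
}

func (s *session) close() {
    s.cancel()     // always stops the run loop
    s.conn.Close() // may be ignored for a local connection
    <-s.done
}

func main() {
    s := newSession(context.Background(), &brokerConn{local: true})
    s.close()
    fmt.Println("session stopped even though Close was a no-op")
}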

// once again with a remote dispatcher
func TestMain(m *testing.M) {
localDispatcher = false
dispatcherRPCTimeout = 500 * time.Millisecond
if status := m.Run(); status != 0 {
os.Exit(status)
}

localDispatcher = true
os.Exit(m.Run())
}

func TestAgent(t *testing.T) {
// TODO(stevvooe): The current agent is fairly monolithic, making it hard
// to test without implementing or mocking an entire master. We'd like to
@@ -237,6 +260,13 @@ func TestSessionRestartedOnNodeDescriptionChange(t *testing.T) {
require.Equal(t, "testAgent", gotSession.Description.Hostname)
currSession = gotSession

// If nothing changes, the session is not re-established
tlsCh <- gotSession.Description.TLSInfo
time.Sleep(1 * time.Second)
gotSession, closedSessions = tester.dispatcher.GetSessions()
require.Equal(t, currSession, gotSession)
require.Len(t, closedSessions, 1)

newTLSInfo := &api.NodeTLSInfo{
TrustRoot: cautils.ECDSA256SHA256Cert,
CertIssuerPublicKey: []byte("public key"),
@@ -259,12 +289,71 @@ func TestSessionRestartedOnNodeDescriptionChange(t *testing.T) {
require.Equal(t, newTLSInfo, gotSession.Description.TLSInfo)
}

// If the dispatcher returns an error, times out, or is unreachable, the agent
// attempts to reconnect and rebuild a new session no matter what.
func TestSessionReconnectsIfDispatcherErrors(t *testing.T) {
tlsCh := make(chan events.Event, 1)
defer close(tlsCh)

tester := agentTestEnv(t, nil, tlsCh)
defer tester.cleanup()

// create a second dispatcher we can fall back on
anotherConfig, err := tester.testCA.NewNodeConfig(ca.ManagerRole)
require.NoError(t, err)
anotherDispatcher, stop := agentutils.NewMockDispatcher(t, anotherConfig, false) // this one is not local, because the other one may be
defer stop()

var counter int
anotherDispatcher.SetSessionHandler(func(r *api.SessionRequest, stream api.Dispatcher_SessionServer) error {
if counter == 0 {
counter++
return errors.New("terminate immediately")
}
// hang forever until the other side cancels, and then set the session handler to nil so we use the default one
defer anotherDispatcher.SetSessionHandler(nil)
<-stream.Context().Done()
return stream.Context().Err()
})

// ok, the agent should have connected to the first dispatcher by now - if it has, kill the first dispatcher and ensure
// the agent connects to the second one
require.NoError(t, testutils.PollFuncWithTimeout(nil, func() error {
gotSession, closedSessions := tester.dispatcher.GetSessions()
if gotSession == nil {
return errors.New("no current session")
}
if len(closedSessions) != 0 {
return fmt.Errorf("expecting 0 closed sessions, got %d", len(closedSessions))
}
return nil
}, 2*time.Second))
tester.stopDispatcher()
tester.remotes.setPeer(api.Peer{Addr: anotherDispatcher.Addr})
tester.agent.config.ConnBroker.SetLocalConn(nil)

// It should have connected with the second dispatcher 3 times - first because the first dispatcher died,
// second because the dispatcher returned an error, third time because the session timed out. So there should
// be 2 closed sessions.
require.NoError(t, testutils.PollFuncWithTimeout(nil, func() error {
gotSession, closedSessions := anotherDispatcher.GetSessions()
if gotSession == nil {
return errors.New("no current session")
}
if len(closedSessions) != 2 {
return fmt.Errorf("expecting 2 closed sessions, got %d", len(closedSessions))
}
return nil
}, 5*time.Second))
}

type agentTester struct {
agent *Agent
dispatcher *agentutils.MockDispatcher
executor *agentutils.TestExecutor
cleanup func()
testCA *cautils.TestCA
agent *Agent
dispatcher *agentutils.MockDispatcher
executor *agentutils.TestExecutor
stopDispatcher, cleanup func()
testCA *cautils.TestCA
remotes *fakeRemotes
}

func agentTestEnv(t *testing.T, nodeChangeCh chan *NodeChanges, tlsChangeCh chan events.Event) *agentTester {
@@ -277,10 +366,28 @@ func agentTestEnv(t *testing.T, nodeChangeCh chan *NodeChanges, tlsChangeCh chan
managerSecurityConfig, err := tc.NewNodeConfig(ca.ManagerRole)
require.NoError(t, err)

mockDispatcher, mockDispatcherStop := agentutils.NewMockDispatcher(t, managerSecurityConfig)
mockDispatcher, mockDispatcherStop := agentutils.NewMockDispatcher(t, managerSecurityConfig, localDispatcher)
cleanup = append(cleanup, mockDispatcherStop)

remotes := remotes.NewRemotes(api.Peer{Addr: mockDispatcher.Addr})
fr := &fakeRemotes{}
broker := connectionbroker.New(fr)
if localDispatcher {
insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
conn, err := grpc.Dial(
mockDispatcher.Addr,
grpc.WithTransportCredentials(insecureCreds),
grpc.WithDialer(
func(addr string, timeout time.Duration) (net.Conn, error) {
return xnet.DialTimeoutLocal(addr, timeout)
}),
)
require.NoError(t, err)
cleanup = append(cleanup, func() { conn.Close() })

broker.SetLocalConn(conn)
} else {
fr.setPeer(api.Peer{Addr: mockDispatcher.Addr})
}

db, cleanupStorage := storageTestEnv(t)
cleanup = append(cleanup, func() { cleanupStorage() })
@@ -289,7 +396,7 @@ func agentTestEnv(t *testing.T, nodeChangeCh chan *NodeChanges, tlsChangeCh chan

agent, err := New(&Config{
Executor: executor,
ConnBroker: connectionbroker.New(remotes),
ConnBroker: broker,
Credentials: agentSecurityConfig.ClientTLSCreds,
DB: db,
NotifyNodeChange: nodeChangeCh,
@@ -321,15 +428,49 @@ func agentTestEnv(t *testing.T, nodeChangeCh chan *NodeChanges, tlsChangeCh chan
}

return &agentTester{
agent: agent,
dispatcher: mockDispatcher,
executor: executor,
testCA: tc,
agent: agent,
dispatcher: mockDispatcher,
stopDispatcher: mockDispatcherStop,
executor: executor,
testCA: tc,
cleanup: func() {
// go in reverse order
for i := len(cleanup) - 1; i >= 0; i-- {
cleanup[i]()
}
},
remotes: fr,
}
}

// fakeRemotes is a remotes.Remotes implementation that just always selects the current peer until
// it is switched out
type fakeRemotes struct {
mu sync.Mutex
peer api.Peer
}

func (f *fakeRemotes) Weights() map[api.Peer]int {
f.mu.Lock()
defer f.mu.Unlock()
return map[api.Peer]int{f.peer: 1}
}

func (f *fakeRemotes) Select(...string) (api.Peer, error) {
f.mu.Lock()
defer f.mu.Unlock()
return f.peer, nil
}

// do nothing
func (f *fakeRemotes) Observe(peer api.Peer, weight int) {}
func (f *fakeRemotes) ObserveIfExists(peer api.Peer, weight int) {}
func (f *fakeRemotes) Remove(addrs ...api.Peer) {}

func (f *fakeRemotes) setPeer(p api.Peer) {
f.mu.Lock()
f.peer = p
f.mu.Unlock()
}

var _ remotes.Remotes = &fakeRemotes{}
18 changes: 14 additions & 4 deletions agent/session.go
@@ -14,9 +14,8 @@ import (
"google.golang.org/grpc/codes"
)

const dispatcherRPCTimeout = 5 * time.Second

var (
dispatcherRPCTimeout = 5 * time.Second
errSessionDisconnect = errors.New("agent: session disconnect") // instructed to disconnect
errSessionClosed = errors.New("agent: session closed")
)
@@ -39,12 +38,14 @@ type session struct {
assignments chan *api.AssignmentsMessage
subscriptions chan *api.SubscriptionMessage

cancel func() // this is assumed to be never nil, and set whenever a session is created
registered chan struct{} // closed registration
closed chan struct{}
closeOnce sync.Once
}

func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionID string, description *api.NodeDescription) *session {
sessionCtx, sessionCancel := context.WithCancel(ctx)
s := &session{
agent: agent,
sessionID: sessionID,
@@ -54,6 +55,7 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI
subscriptions: make(chan *api.SubscriptionMessage),
registered: make(chan struct{}),
closed: make(chan struct{}),
cancel: sessionCancel,
Collaborator:

Maybe (*session).start should use this cancel function instead of forking the context again.

Collaborator:

Using this cancelfunc instead of the forked context's cancel func in start seems like it muddles responsibilities. Unless there's some overhead concern I'm not aware of, this seems fine to me as-is.

Contributor Author:

I think either would work. If the session fails or times out, the session needs to be closed and restarted anyway, so I think the same session + cancellation can be applicable. I'm happy to go either way.

Contributor Author:

I went with the same session+cancellation because it seems like newSession is the only thing that calls session.run, which is contingent upon session.start succeeding. Again though, happy to go either way.

Contributor Author:

Actually @dperny was correct - we do need to fork the context again. If we don't, and we use the same cancel function, then in the case where a session times out:

The select loop in run (https://github.com/docker/swarmkit/pull/2134/files#diff-15ffe95a2da45f70696ffa3c01949601R89) may select ctx.Done() first and not write the error to s.err, in which case the session is not closed and rebuilt.
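For illustration only (not part of the diff): a minimal Go sketch of that race, with hypothetical names. When the shared context is cancelled at the same time an error is produced, the select in the run loop may take the ctx.Done() case, the error is never recorded, and the caller never learns it needs to close and rebuild the session.

package main

import (
    "context"
    "errors"
    "fmt"
)

// run mimics a session run loop that is supposed to surface errors to its caller.
func run(ctx context.Context, errs <-chan error) error {
    select {
    case err := <-errs:
        return err // error recorded; the caller closes and rebuilds the session
    case <-ctx.Done():
        return ctx.Err() // if both cases are ready, this may win and the error is dropped
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    errs := make(chan error, 1)

    // Simulate a timeout that both reports an error and cancels the shared context.
    errs <- errors.New("session timed out")
    cancel()

    // With both cases ready, select chooses pseudo-randomly, so across runs this
    // sometimes prints the timeout error and sometimes "context canceled".
    fmt.Println(run(ctx, errs))
}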

Collaborator:

Is there any way that weird case can be documented in a comment somewhere? It's one of those cases where moving parts in very disparate parts of the system affect each other, and it'll be really non-obvious what's going on here in the future.

Collaborator:

@dperny there's quite a long comment in start :)

Collaborator:

Oh, I suppose I should look at the diff before I say things. I'm sorry, today is not a 10/10 day for me.

}

// TODO(stevvooe): Need to move connection management up a level or create
@@ -69,7 +71,7 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI
}
s.conn = cc

go s.run(ctx, delay, description)
go s.run(sessionCtx, delay, description)
return s
}

@@ -114,6 +116,14 @@ func (s *session) start(ctx context.Context, description *api.NodeDescription) e
// Note: we don't defer cancellation of this context, because the
// streaming RPC is used after this function has returned. We only cancel
// it in the timeout case to make sure the goroutine completes.

// We also fork this context again from the `run` context, because on
// `dispatcherRPCTimeout`, we want to cancel establishing a session and
// return an error. If we cancel the `run` context instead of forking,
// then in `run` it's possible that we just terminate the function because
// `ctx` is done and hence fail to propagate the timeout error to the agent.
// If the error is not propagated to the agent, the agent will not close
// the session or rebuild a new session.
sessionCtx, cancelSession := context.WithCancel(ctx)

// Need to run Session in a goroutine since there's no way to set a
@@ -402,10 +412,10 @@ func (s *session) sendError(err error) {
// of event loop.
func (s *session) close() error {
s.closeOnce.Do(func() {
s.cancel()
if s.conn != nil {
s.conn.Close(false)
}

close(s.closed)
})
