From dfa04bb8434dc90a46e9e5093dcd2e46cbd52aeb Mon Sep 17 00:00:00 2001 From: cyli Date: Thu, 23 Feb 2017 21:15:42 -0800 Subject: [PATCH 1/2] All managers, not just the current leader running a CA, will update their SecurityConfig's RootCA upon a change in the cluster's RootCA configuration. Signed-off-by: cyli --- ca/server.go | 37 ++++++++++---- ca/testutils/cautils.go | 2 +- manager/manager.go | 11 +++-- manager/manager_test.go | 107 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 143 insertions(+), 14 deletions(-) diff --git a/ca/server.go b/ca/server.go index 4057edd31f..53e9ce2793 100644 --- a/ca/server.go +++ b/ca/server.go @@ -36,6 +36,10 @@ type Server struct { joinTokens *api.JoinTokens reconciliationRetryInterval time.Duration + // skipAutoUpdateRootCA, if set to true, causes the CA server to skip automatically updating + // the signing credentials based on changes in the memory store + skipAutoUpdateRootCA bool + // pending is a map of nodes with pending certificates issuance or // renewal. They are indexed by node ID. pending map[string]*api.Node @@ -53,13 +57,14 @@ func DefaultCAConfig() api.CAConfig { } // NewServer creates a CA API server. -func NewServer(store *store.MemoryStore, securityConfig *SecurityConfig) *Server { +func NewServer(store *store.MemoryStore, securityConfig *SecurityConfig, skipAutoUpdateRootCA bool) *Server { return &Server{ store: store, securityConfig: securityConfig, pending: make(map[string]*api.Node), started: make(chan struct{}), reconciliationRetryInterval: defaultReconciliationRetryInterval, + skipAutoUpdateRootCA: skipAutoUpdateRootCA, } } @@ -382,6 +387,13 @@ func (s *Server) Run(ctx context.Context) error { // Retrieve the channels to keep track of changes in the cluster // Retrieve all the currently registered nodes var nodes []*api.Node + specifiers := []state.Event{ + state.EventCreateNode{}, + state.EventUpdateNode{}, + } + if !s.skipAutoUpdateRootCA { + specifiers = append(specifiers, state.EventUpdateCluster{}) + } updates, cancel, err := store.ViewAndWatch( s.store, func(readTx store.ReadTx) error { @@ -392,14 +404,14 @@ func (s *Server) Run(ctx context.Context) error { if len(clusters) != 1 { return errors.New("could not find cluster object") } - s.updateCluster(ctx, clusters[0]) + if !s.skipAutoUpdateRootCA { + s.UpdateRootCA(ctx, clusters[0]) + } nodes, err = store.FindNodes(readTx, store.All) return err }, - state.EventCreateNode{}, - state.EventUpdateNode{}, - state.EventUpdateCluster{}, + specifiers..., ) // Do this after updateCluster has been called, so isRunning never @@ -434,6 +446,12 @@ func (s *Server) Run(ctx context.Context) error { // Watch for new nodes being created, new nodes being updated, and changes // to the cluster for { + select { + case <-ctx.Done(): + return nil + default: + } + select { case event := <-updates: switch v := event.(type) { @@ -446,7 +464,7 @@ func (s *Server) Run(ctx context.Context) error { s.evaluateAndSignNodeCert(ctx, v.Node) } case state.EventUpdateCluster: - s.updateCluster(ctx, v.Cluster) + s.UpdateRootCA(ctx, v.Cluster) } case <-ticker.C: for _, node := range s.pending { @@ -512,9 +530,10 @@ func (s *Server) isRunning() bool { return true } -// updateCluster is called when there are cluster changes, and it ensures that the local RootCA is -// always aware of changes in clusterExpiry and the Root CA key material -func (s *Server) updateCluster(ctx context.Context, cluster *api.Cluster) { +// UpdateRootCA is called when there are cluster changes, and it ensures that the local RootCA is +// always aware of changes in clusterExpiry and the Root CA key material - this can be called by +// anything to update the root CA material +func (s *Server) UpdateRootCA(ctx context.Context, cluster *api.Cluster) { s.mu.Lock() s.joinTokens = cluster.RootCA.JoinTokens.Copy() s.mu.Unlock() diff --git a/ca/testutils/cautils.go b/ca/testutils/cautils.go index d0f155a157..40e1a23e86 100644 --- a/ca/testutils/cautils.go +++ b/ca/testutils/cautils.go @@ -167,7 +167,7 @@ func NewTestCA(t *testing.T, krwGenerators ...func(ca.CertPaths) *ca.KeyReadWrit workerToken := ca.GenerateJoinToken(&rootCA) createClusterObject(t, s, organization, workerToken, managerToken, externalCAs...) - caServer := ca.NewServer(s, managerConfig) + caServer := ca.NewServer(s, managerConfig, false) caServer.SetReconciliationRetryInterval(50 * time.Millisecond) api.RegisterCAServer(grpcServer, caServer) api.RegisterNodeCAServer(grpcServer, caServer) diff --git a/manager/manager.go b/manager/manager.go index 27496df7d0..ac4247e682 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -208,8 +208,9 @@ func New(config *Config) (*Manager, error) { } m := &Manager{ - config: *config, - caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig), + config: *config, + // manager will tell the CA server to update the RootCA, so skip having the CA server autoupdate the root ca material + caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig, true), dispatcher: dispatcher.New(raftNode, dispatcher.DefaultConfig()), logbroker: logbroker.New(raftNode.MemoryStore()), server: grpc.NewServer(opts...), @@ -514,7 +515,7 @@ func (m *Manager) Run(parent context.Context) error { } raftConfig := c.Spec.Raft - if err := m.watchForKEKChanges(ctx); err != nil { + if err := m.watchForClusterChanges(ctx); err != nil { return err } @@ -679,7 +680,7 @@ func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error { return nil } -func (m *Manager) watchForKEKChanges(ctx context.Context) error { +func (m *Manager) watchForClusterChanges(ctx context.Context) error { clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization() clusterWatch, clusterWatchCancel, err := store.ViewAndWatch(m.raftNode.MemoryStore(), func(tx store.ReadTx) error { @@ -687,6 +688,7 @@ func (m *Manager) watchForKEKChanges(ctx context.Context) error { if cluster == nil { return fmt.Errorf("unable to get current cluster") } + m.caserver.UpdateRootCA(ctx, cluster) return m.updateKEK(ctx, cluster) }, state.EventUpdateCluster{ @@ -702,6 +704,7 @@ func (m *Manager) watchForKEKChanges(ctx context.Context) error { select { case event := <-clusterWatch: clusterEvent := event.(state.EventUpdateCluster) + m.caserver.UpdateRootCA(ctx, clusterEvent.Cluster) m.updateKEK(ctx, clusterEvent.Cluster) case <-ctx.Done(): clusterWatchCancel() diff --git a/manager/manager_test.go b/manager/manager_test.go index 1d0bd43d9d..2406dfc484 100644 --- a/manager/manager_test.go +++ b/manager/manager_test.go @@ -400,3 +400,110 @@ func TestManagerLockUnlock(t *testing.T) { // error. <-done } + +// If the root CA material is updated in the memory store, a manager will update its own +// security configs even if it's "not the leader" (which we will fake by calling `becomeFollower`) +func TestManagerUpdatesSecurityConfig(t *testing.T) { + ctx := context.Background() + + temp, err := ioutil.TempFile("", "test-manager-update-security-config") + require.NoError(t, err) + require.NoError(t, temp.Close()) + require.NoError(t, os.Remove(temp.Name())) + + defer os.RemoveAll(temp.Name()) + + stateDir, err := ioutil.TempDir("", "test-raft") + require.NoError(t, err) + defer os.RemoveAll(stateDir) + + tc := testutils.NewTestCA(t) + defer tc.Stop() + + managerSecurityConfig, err := tc.NewNodeConfig(ca.ManagerRole) + require.NoError(t, err) + + _, _, err = managerSecurityConfig.KeyReader().Read() + require.NoError(t, err) + + m, err := New(&Config{ + RemoteAPI: &RemoteAddrs{ListenAddr: "127.0.0.1:0"}, + ControlAPI: temp.Name(), + StateDir: stateDir, + SecurityConfig: managerSecurityConfig, + }) + require.NoError(t, err) + require.NotNil(t, m) + + done := make(chan error) + defer close(done) + go func() { + done <- m.Run(ctx) + }() + + // wait until the CA server is running + opts := []grpc.DialOption{ + grpc.WithTimeout(10 * time.Second), + grpc.WithTransportCredentials(managerSecurityConfig.ClientTLSCreds), + } + + conn, err := grpc.Dial(m.Addr(), opts...) + require.NoError(t, err) + defer func() { + require.NoError(t, conn.Close()) + }() + + client := api.NewCAClient(conn) + + require.NoError(t, raftutils.PollFuncWithTimeout(nil, func() error { + ctx, _ := context.WithTimeout(context.Background(), 500*time.Millisecond) + _, err := client.GetRootCACertificate(ctx, &api.GetRootCACertificateRequest{}) + return err + }, time.Second)) + + // wait until the cluster is up + var clusters []*api.Cluster + + require.NoError(t, raftutils.PollFuncWithTimeout(nil, func() error { + var err error + m.raftNode.MemoryStore().View(func(tx store.ReadTx) { + clusters, err = store.FindClusters(tx, store.ByName(store.DefaultClusterName)) + }) + if err != nil { + return err + } + if len(clusters) == 0 { + return fmt.Errorf("cluster not ready yet") + } + return nil + }, 1*time.Second)) + + // stop running CA server and other leader functions + m.becomeFollower() + + newRootCert, _, err := testutils.CreateRootCertAndKey("rootOther") + require.NoError(t, err) + updatedCA := append(tc.RootCA.Cert, newRootCert...) + + // Update the RootCA to have a bundle + require.NoError(t, m.raftNode.MemoryStore().Update(func(tx store.Tx) error { + cluster := store.GetCluster(tx, clusters[0].ID) + cluster.RootCA.CACert = updatedCA + return store.UpdateCluster(tx, cluster) + })) + + // wait for the manager's security config to be updated + require.NoError(t, raftutils.PollFuncWithTimeout(nil, func() error { + if !bytes.Equal(managerSecurityConfig.RootCA().Cert, updatedCA) { + return fmt.Errorf("root CA not updated yet") + } + return nil + }, 1*time.Second)) + + m.Stop(ctx, false) + + // After stopping we should MAY receive an error from ListenAndServe if + // all this happened before WaitForLeader completed, so don't check the + // error. + <-done +} From 4d0ba73730f7ba060b3bcd1fa7ba1564f22d1af3 Mon Sep 17 00:00:00 2001 From: cyli Date: Mon, 27 Feb 2017 16:59:12 -0800 Subject: [PATCH 2/2] Move the cluster watch that updates the root CA out of the CA server entirely Signed-off-by: cyli --- ca/server.go | 24 ++++-------------------- ca/testutils/cautils.go | 31 ++++++++++++++++++++++++++++++- manager/manager.go | 5 ++--- 3 files changed, 36 insertions(+), 24 deletions(-) diff --git a/ca/server.go b/ca/server.go index 53e9ce2793..562fcdd0e4 100644 --- a/ca/server.go +++ b/ca/server.go @@ -36,10 +36,6 @@ type Server struct { joinTokens *api.JoinTokens reconciliationRetryInterval time.Duration - // skipAutoUpdateRootCA, if set to true, causes the CA server to skip automatically updating - // the signing credentials based on changes in the memory store - skipAutoUpdateRootCA bool - // pending is a map of nodes with pending certificates issuance or // renewal. They are indexed by node ID. pending map[string]*api.Node @@ -57,14 +53,13 @@ func DefaultCAConfig() api.CAConfig { } // NewServer creates a CA API server. -func NewServer(store *store.MemoryStore, securityConfig *SecurityConfig, skipAutoUpdateRootCA bool) *Server { +func NewServer(store *store.MemoryStore, securityConfig *SecurityConfig) *Server { return &Server{ store: store, securityConfig: securityConfig, pending: make(map[string]*api.Node), started: make(chan struct{}), reconciliationRetryInterval: defaultReconciliationRetryInterval, - skipAutoUpdateRootCA: skipAutoUpdateRootCA, } } @@ -387,13 +382,6 @@ func (s *Server) Run(ctx context.Context) error { // Retrieve the channels to keep track of changes in the cluster // Retrieve all the currently registered nodes var nodes []*api.Node - specifiers := []state.Event{ - state.EventCreateNode{}, - state.EventUpdateNode{}, - } - if !s.skipAutoUpdateRootCA { - specifiers = append(specifiers, state.EventUpdateCluster{}) - } updates, cancel, err := store.ViewAndWatch( s.store, func(readTx store.ReadTx) error { @@ -404,14 +392,12 @@ func (s *Server) Run(ctx context.Context) error { if len(clusters) != 1 { return errors.New("could not find cluster object") } - if !s.skipAutoUpdateRootCA { - s.UpdateRootCA(ctx, clusters[0]) - } - + s.UpdateRootCA(ctx, clusters[0]) // call once to ensure that the join tokens are always set nodes, err = store.FindNodes(readTx, store.All) return err }, - specifiers..., + state.EventCreateNode{}, + state.EventUpdateNode{}, ) // Do this after updateCluster has been called, so isRunning never @@ -463,8 +449,6 @@ func (s *Server) Run(ctx context.Context) error { if !isFinalState(v.Node.Certificate.Status) { s.evaluateAndSignNodeCert(ctx, v.Node) } - case state.EventUpdateCluster: - s.UpdateRootCA(ctx, v.Cluster) } case <-ticker.C: for _, node := range s.pending { diff --git a/ca/testutils/cautils.go b/ca/testutils/cautils.go index 40e1a23e86..881fce6e6a 100644 --- a/ca/testutils/cautils.go +++ b/ca/testutils/cautils.go @@ -21,6 +21,7 @@ import ( "github.com/docker/swarmkit/connectionbroker" "github.com/docker/swarmkit/identity" "github.com/docker/swarmkit/ioutils" + "github.com/docker/swarmkit/manager/state" "github.com/docker/swarmkit/manager/state/store" "github.com/docker/swarmkit/remotes" "github.com/opencontainers/go-digest" @@ -48,10 +49,12 @@ type TestCA struct { ManagerToken string ConnBroker *connectionbroker.Broker KeyReadWriter *ca.KeyReadWriter + watchCancel func() } // Stop cleans up after TestCA func (tc *TestCA) Stop() { + tc.watchCancel() os.RemoveAll(tc.TempDir) for _, conn := range tc.Conns { conn.Close() @@ -167,13 +170,38 @@ func NewTestCA(t *testing.T, krwGenerators ...func(ca.CertPaths) *ca.KeyReadWrit workerToken := ca.GenerateJoinToken(&rootCA) createClusterObject(t, s, organization, workerToken, managerToken, externalCAs...) - caServer := ca.NewServer(s, managerConfig, false) + caServer := ca.NewServer(s, managerConfig) caServer.SetReconciliationRetryInterval(50 * time.Millisecond) api.RegisterCAServer(grpcServer, caServer) api.RegisterNodeCAServer(grpcServer, caServer) ctx := context.Background() + clusterWatch, clusterWatchCancel, err := store.ViewAndWatch( + s, func(tx store.ReadTx) error { + cluster := store.GetCluster(tx, organization) + caServer.UpdateRootCA(ctx, cluster) + return nil + }, + state.EventUpdateCluster{ + Cluster: &api.Cluster{ID: organization}, + Checks: []state.ClusterCheckFunc{state.ClusterCheckID}, + }, + ) + assert.NoError(t, err) + go func() { + for { + select { + case event := <-clusterWatch: + clusterEvent := event.(state.EventUpdateCluster) + caServer.UpdateRootCA(ctx, clusterEvent.Cluster) + case <-ctx.Done(): + clusterWatchCancel() + return + } + } + }() + go grpcServer.Serve(l) go caServer.Run(ctx) @@ -202,6 +230,7 @@ func NewTestCA(t *testing.T, krwGenerators ...func(ca.CertPaths) *ca.KeyReadWrit ManagerToken: managerToken, ConnBroker: connectionbroker.New(remotes), KeyReadWriter: krw, + watchCancel: clusterWatchCancel, } } diff --git a/manager/manager.go b/manager/manager.go index ac4247e682..7d5fcd0983 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -208,9 +208,8 @@ func New(config *Config) (*Manager, error) { } m := &Manager{ - config: *config, - // manager will tell the CA server to update the RootCA, so skip having the CA server autoupdate the root ca material - caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig, true), + config: *config, + caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig), dispatcher: dispatcher.New(raftNode, dispatcher.DefaultConfig()), logbroker: logbroker.New(raftNode.MemoryStore()), server: grpc.NewServer(opts...),