From b55cb6e69219b7e8840226a98b9fe728a0e59e6f Mon Sep 17 00:00:00 2001 From: Ying Li Date: Thu, 27 Jul 2017 15:51:52 -0700 Subject: [PATCH 1/3] Store a local signing root CA on the CA server - do not use the SecurityConfig's. Signed-off-by: Ying Li --- ca/server.go | 42 +++++++++++++++++++++++------------------ ca/server_test.go | 34 +++++++++++++-------------------- ca/testutils/cautils.go | 2 +- 3 files changed, 38 insertions(+), 40 deletions(-) diff --git a/ca/server.go b/ca/server.go index 681eb355f6..e0190b2523 100644 --- a/ca/server.go +++ b/ca/server.go @@ -43,6 +43,8 @@ type Server struct { cancel func() store *store.MemoryStore securityConfig *SecurityConfig + clusterID string + localRootCA *RootCA externalCA *ExternalCA externalCAPool *x509.CertPool joinTokens *api.JoinTokens @@ -69,9 +71,6 @@ type Server struct { // of the SecurityConfig signingMu sync.Mutex - // before we update the security config with the new root CA, we need to be able to save the root certs - rootPaths CertPaths - // lets us monitor and finish root rotations rootReconciler *rootRotationReconciler rootReconciliationRetryInterval time.Duration @@ -85,16 +84,17 @@ func DefaultCAConfig() api.CAConfig { } // NewServer creates a CA API server. 
-func NewServer(store *store.MemoryStore, securityConfig *SecurityConfig, rootCAPaths CertPaths) *Server { +func NewServer(store *store.MemoryStore, securityConfig *SecurityConfig) *Server { return &Server{ store: store, securityConfig: securityConfig, + clusterID: securityConfig.ClientTLSCreds.Organization(), + localRootCA: securityConfig.RootCA(), externalCA: NewExternalCA(nil, nil), pending: make(map[string]*api.Node), started: make(chan struct{}), reconciliationRetryInterval: defaultReconciliationRetryInterval, rootReconciliationRetryInterval: defaultRootReconciliationInterval, - rootPaths: rootCAPaths, } } @@ -106,6 +106,14 @@ func (s *Server) ExternalCA() *ExternalCA { return s.externalCA } +// RootCA returns the current local root CA - this is exposed to support unit testing only, and the root CA +// should really be a private field +func (s *Server) RootCA() *RootCA { + s.signingMu.Lock() + defer s.signingMu.Unlock() + return s.localRootCA +} + // SetReconciliationRetryInterval changes the time interval between // reconciliation attempts. This function must be called before Run. func (s *Server) SetReconciliationRetryInterval(reconciliationRetryInterval time.Duration) { @@ -130,7 +138,7 @@ func (s *Server) GetUnlockKey(ctx context.Context, request *api.GetUnlockKeyRequ // a cached value. 
resp := api.GetUnlockKeyResponse{} s.store.View(func(tx store.ReadTx) { - cluster := store.GetCluster(tx, s.securityConfig.ClientTLSCreds.Organization()) + cluster := store.GetCluster(tx, s.clusterID) resp.Version = cluster.Meta.Version if cluster.Spec.EncryptionConfig.AutoLockManagers { for _, encryptionKey := range cluster.UnlockKeys { @@ -270,14 +278,14 @@ func (s *Server) IssueNodeCertificate(ctx context.Context, request *api.IssueNod // If the remote node is a worker (either forwarded by a manager, or calling directly), // issue a renew worker certificate entry with the correct ID - nodeID, err := AuthorizeForwardedRoleAndOrg(ctx, []string{WorkerRole}, []string{ManagerRole}, s.securityConfig.ClientTLSCreds.Organization(), blacklistedCerts) + nodeID, err := AuthorizeForwardedRoleAndOrg(ctx, []string{WorkerRole}, []string{ManagerRole}, s.clusterID, blacklistedCerts) if err == nil { return s.issueRenewCertificate(ctx, nodeID, request.CSR) } // If the remote node is a manager (either forwarded by another manager, or calling directly), // issue a renew certificate entry with the correct ID - nodeID, err = AuthorizeForwardedRoleAndOrg(ctx, []string{ManagerRole}, []string{ManagerRole}, s.securityConfig.ClientTLSCreds.Organization(), blacklistedCerts) + nodeID, err = AuthorizeForwardedRoleAndOrg(ctx, []string{ManagerRole}, []string{ManagerRole}, s.clusterID, blacklistedCerts) if err == nil { return s.issueRenewCertificate(ctx, nodeID, request.CSR) } @@ -409,8 +417,11 @@ func (s *Server) GetRootCACertificate(ctx context.Context, request *api.GetRootC "method": "GetRootCACertificate", }) + s.signingMu.Lock() + defer s.signingMu.Unlock() + return &api.GetRootCACertificateResponse{ - Certificate: s.securityConfig.RootCA().Certs, + Certificate: s.localRootCA.Certs, }, nil } @@ -428,7 +439,7 @@ func (s *Server) Run(ctx context.Context) error { // we need to set it on the server, because `Server.UpdateRootCA` can be called from outside the Run function s.rootReconciler = 
&rootRotationReconciler{ ctx: log.WithField(ctx, "method", "(*Server).rootRotationReconciler"), - clusterID: s.securityConfig.ClientTLSCreds.Organization(), + clusterID: s.clusterID, store: s.store, batchUpdateInterval: s.rootReconciliationRetryInterval, } @@ -694,13 +705,8 @@ func (s *Server) UpdateRootCA(ctx context.Context, cluster *api.Cluster) error { if err != nil { return errors.Wrap(err, "invalid Root CA object in cluster") } - if err := s.securityConfig.UpdateRootCA(&updatedRootCA); err != nil { - return errors.Wrap(err, "updating Root CA failed") - } - if err := SaveRootCA(updatedRootCA, s.rootPaths); err != nil { - return errors.Wrap(err, "unable to save new root CA certificates") - } + s.localRootCA = &updatedRootCA s.externalCAPool = updatedRootCA.Pool if rCA.RootRotation != nil { // the external CA has to trust the new CA cert @@ -763,7 +769,7 @@ func (s *Server) evaluateAndSignNodeCert(ctx context.Context, node *api.Node) er // signNodeCert does the bulk of the work for signing a certificate func (s *Server) signNodeCert(ctx context.Context, node *api.Node) error { s.signingMu.Lock() - rootCA := s.securityConfig.RootCA() + rootCA := s.localRootCA externalCA := s.externalCA s.signingMu.Unlock() @@ -786,7 +792,7 @@ func (s *Server) signNodeCert(ctx context.Context, node *api.Node) error { rawCSR = node.Certificate.CSR cn = node.Certificate.CN ou = role - org = s.securityConfig.ClientTLSCreds.Organization() + org = s.clusterID ) // Try using the external CA first. 
diff --git a/ca/server_test.go b/ca/server_test.go index 8c5b6628b3..94bc984be7 100644 --- a/ca/server_test.go +++ b/ca/server_test.go @@ -575,7 +575,7 @@ func TestCAServerUpdateRootCA(t *testing.T) { } { require.NoError(t, tc.CAServer.UpdateRootCA(tc.Context, testCase.clusterObj)) - rootCA := tc.ServingSecurityConfig.RootCA() + rootCA := tc.CAServer.RootCA() require.Equal(t, testCase.rootCARoots, rootCA.Certs) var signingCert, signingKey []byte if s, err := rootCA.Signer(); err == nil { @@ -1030,12 +1030,12 @@ func TestRootRotationReconciliationWithChanges(t *testing.T) { if testcase.expectedRootCA.RootRotation != nil { expectedKey = testcase.expectedRootCA.RootRotation.CAKey } - s, err := rt.tc.ServingSecurityConfig.RootCA().Signer() + s, err := rt.tc.CAServer.RootCA().Signer() if err != nil { return err } if !bytes.Equal(s.Key, expectedKey) { - return fmt.Errorf("the security config has not been updated correctly") + return fmt.Errorf("the CA Server's root CA has not been updated correctly") } } return nil @@ -1178,12 +1178,12 @@ func TestRootRotationReconciliationNoChanges(t *testing.T) { require.Equal(t, expected.Certificate.Status, node.Certificate.Status, "node %s: %s", node.ID, testcase.descr) } - // ensure that the security config's root CA object has the same expected key + // ensure that the server's root CA object has the same expected key expectedKey := testcase.rootCA.CAKey if testcase.rootCA.RootRotation != nil { expectedKey = testcase.rootCA.RootRotation.CAKey } - s, err := rt.tc.ServingSecurityConfig.RootCA().Signer() + s, err := rt.tc.CAServer.RootCA().Signer() require.NoError(t, err, testcase.descr) require.Equal(t, s.Key, expectedKey, testcase.descr) } @@ -1212,19 +1212,18 @@ func TestRootRotationReconciliationRace(t *testing.T) { var ( otherServers = make([]*ca.Server, 5) - secConfigs = make([]*ca.SecurityConfig, 5) serverContexts = make([]context.Context, 5) paths = make([]*ca.SecurityConfigPaths, 5) ) for i := 0; i < 5; i++ { // to make sure 
we get some collision // start a competing CA server - secConfigs[i], err = tc.NewNodeConfig(ca.ManagerRole) - require.NoError(t, err) - paths[i] = ca.NewConfigPaths(filepath.Join(tempDir, fmt.Sprintf("%d", i))) - otherServers[i] = ca.NewServer(tc.MemoryStore, secConfigs[i], paths[i].RootCA) + // the sec config is only used to get the organization, the initial root CA copy, and any updates to + // TLS certificates, so all the servers can share the same one + otherServers[i] = ca.NewServer(tc.MemoryStore, tc.ServingSecurityConfig) + // offset each server's reconciliation interval somewhat so that some will // pre-empt others otherServers[i].SetRootReconciliationInterval(time.Millisecond * time.Duration((i+1)*10)) @@ -1254,15 +1253,8 @@ func TestRootRotationReconciliationRace(t *testing.T) { select { case event := <-clusterWatch: clusterEvent := event.(api.EventUpdateCluster) - for i, s := range otherServers { // the security config of each + for _, s := range otherServers { s.UpdateRootCA(tc.Context, clusterEvent.Cluster) - // also update the TLS configs with a new TLS creds, otherwise we won't be able to update the - // root CA the second time around - tlsKeyPair, issuerInfo, err := secConfigs[i].RootCA().IssueAndSaveNewCertificates( - ca.NewKeyReadWriter(paths[i].Node, nil, nil), "cn", "ou", "org") - if err == nil { - secConfigs[i].UpdateTLSCredentials(tlsKeyPair, issuerInfo) - } } case <-done: return @@ -1334,8 +1326,8 @@ func TestRootRotationReconciliationRace(t *testing.T) { if !bytes.Equal(cluster.RootCA.CAKey, rotationKey) { return errors.New("expected root key is wrong") } - for _, secConfig := range secConfigs { - s, err := secConfig.RootCA().Signer() + for _, server := range append(otherServers, tc.CAServer) { + s, err := server.RootCA().Signer() if err != nil { return err } @@ -1362,7 +1354,7 @@ func TestRootRotationReconciliationThrottled(t *testing.T) { // immediately stop the CA server - we want to run our own tc.CAServer.Stop() - caServer := 
ca.NewServer(tc.MemoryStore, tc.ServingSecurityConfig, tc.Paths.RootCA) + caServer := ca.NewServer(tc.MemoryStore, tc.ServingSecurityConfig) // set the reconciliation interval to something ridiculous, so we can make sure the first // batch does update all of them caServer.SetRootReconciliationInterval(time.Hour) diff --git a/ca/testutils/cautils.go b/ca/testutils/cautils.go index b176245da3..c7976bdc2d 100644 --- a/ca/testutils/cautils.go +++ b/ca/testutils/cautils.go @@ -211,7 +211,7 @@ func NewTestCAFromAPIRootCA(t *testing.T, tempBaseDir string, apiRootCA api.Root clusterObj := createClusterObject(t, s, organization, apiRootCA, &rootCA, externalCAs...) - caServer := ca.NewServer(s, managerConfig, paths.RootCA) + caServer := ca.NewServer(s, managerConfig) caServer.SetReconciliationRetryInterval(50 * time.Millisecond) caServer.SetRootReconciliationInterval(50 * time.Millisecond) api.RegisterCAServer(grpcServer, caServer) From 32522512dd0a579346569a7751935e5d3d8a3daa Mon Sep 17 00:00:00 2001 From: Ying Li Date: Fri, 28 Jul 2017 11:05:47 -0700 Subject: [PATCH 2/3] Add a cluster watch back to the CA server, so it no longer requires some external force to call `UpdateRootCA`. Signed-off-by: Ying Li --- ca/server.go | 38 ++++++-------- ca/server_test.go | 111 +++++----------------------------------- ca/testutils/cautils.go | 31 +---------- 3 files changed, 32 insertions(+), 148 deletions(-) diff --git a/ca/server.go b/ca/server.go index e0190b2523..806681b6ce 100644 --- a/ca/server.go +++ b/ca/server.go @@ -25,13 +25,6 @@ const ( defaultRootReconciliationInterval = 3 * time.Second ) -// APISecurityConfigUpdater knows how to update a SecurityConfig from an api.Cluster object -type APISecurityConfigUpdater interface { - UpdateRootCA(ctx context.Context, cluster *api.Cluster) error -} - -var _ APISecurityConfigUpdater = &Server{} - // Server is the CA and NodeCA API gRPC server. 
// TODO(aaronl): At some point we may want to have separate implementations of // CA, NodeCA, and other hypothetical future CA services. At the moment, @@ -88,13 +81,13 @@ func NewServer(store *store.MemoryStore, securityConfig *SecurityConfig) *Server return &Server{ store: store, securityConfig: securityConfig, - clusterID: securityConfig.ClientTLSCreds.Organization(), localRootCA: securityConfig.RootCA(), externalCA: NewExternalCA(nil, nil), pending: make(map[string]*api.Node), started: make(chan struct{}), reconciliationRetryInterval: defaultReconciliationRetryInterval, rootReconciliationRetryInterval: defaultRootReconciliationInterval, + // clusterID will be set on every call to Run } } @@ -436,19 +429,10 @@ func (s *Server) Run(ctx context.Context) error { s.wg.Add(1) s.ctx, s.cancel = context.WithCancel(log.WithModule(ctx, "ca")) ctx = s.ctx - // we need to set it on the server, because `Server.UpdateRootCA` can be called from outside the Run function - s.rootReconciler = &rootRotationReconciler{ - ctx: log.WithField(ctx, "method", "(*Server).rootRotationReconciler"), - clusterID: s.clusterID, - store: s.store, - batchUpdateInterval: s.rootReconciliationRetryInterval, - } - rootReconciler := s.rootReconciler s.mu.Unlock() defer s.wg.Done() defer func() { s.mu.Lock() - s.rootReconciler = nil s.mu.Unlock() }() @@ -475,10 +459,19 @@ func (s *Server) Run(ctx context.Context) error { api.EventCreateNode{}, api.EventUpdateNode{}, api.EventDeleteNode{}, + api.EventUpdateCluster{}, ) // call once to ensure that the join tokens and local/external CA signer are always set - s.UpdateRootCA(ctx, cluster) + s.clusterID = cluster.ID + rootReconciler := &rootRotationReconciler{ + ctx: log.WithField(ctx, "method", "(*Server).rootRotationReconciler"), + clusterID: s.clusterID, + store: s.store, + batchUpdateInterval: s.rootReconciliationRetryInterval, + } + + s.UpdateRootCA(ctx, cluster, rootReconciler) // Do this after updateCluster has been called, so isRunning never 
returns true without // the join tokens and external CA/security config's root CA being set correctly @@ -534,6 +527,10 @@ func (s *Server) Run(ctx context.Context) error { rootReconciler.UpdateNode(v.Node) case api.EventDeleteNode: rootReconciler.DeleteNode(v.Node) + case api.EventUpdateCluster: + if v.Cluster.ID == s.clusterID { + s.UpdateRootCA(ctx, v.Cluster, rootReconciler) + } } case <-externalTLSCredsChange: // The TLS certificates can rotate independently of the root CA (and hence which roots the @@ -647,10 +644,9 @@ func filterExternalCAURLS(ctx context.Context, desiredCert, defaultCert []byte, // UpdateRootCA is called when there are cluster changes, and it ensures that the local RootCA is // always aware of changes in clusterExpiry and the Root CA key material - this can be called by // anything to update the root CA material -func (s *Server) UpdateRootCA(ctx context.Context, cluster *api.Cluster) error { +func (s *Server) UpdateRootCA(ctx context.Context, cluster *api.Cluster, reconciler *rootRotationReconciler) error { s.mu.Lock() s.joinTokens = cluster.RootCA.JoinTokens.Copy() - reconciler := s.rootReconciler s.mu.Unlock() rCA := cluster.RootCA.Copy() if reconciler != nil { @@ -670,7 +666,7 @@ func (s *Server) UpdateRootCA(ctx context.Context, cluster *api.Cluster) error { if rootCAChanged { setOrUpdate := "set" if !firstSeenCluster { - log.G(ctx).Debug("Updating security config and external CA due to change in cluster Root CA") + log.G(ctx).Debug("Updating signing root CA and external CA due to change in cluster Root CA") setOrUpdate = "updated" } expiry := DefaultNodeCertExpiration diff --git a/ca/server_test.go b/ca/server_test.go index 94bc984be7..04db11f7af 100644 --- a/ca/server_test.go +++ b/ca/server_test.go @@ -573,7 +573,7 @@ func TestCAServerUpdateRootCA(t *testing.T) { externalCertSignedBy: cert, }, } { - require.NoError(t, tc.CAServer.UpdateRootCA(tc.Context, testCase.clusterObj)) + require.NoError(t, tc.CAServer.UpdateRootCA(tc.Context, 
testCase.clusterObj, nil)) rootCA := tc.CAServer.RootCA() require.Equal(t, testCase.rootCARoots, rootCA.Certs) @@ -610,12 +610,6 @@ func TestCAServerUpdateRootCA(t *testing.T) { require.Equal(t, ca.ErrNoExternalCAURLs, err) } } - - // If we can't save the root cert, we can't update the root CA even if it's completely valid - require.NoError(t, os.RemoveAll(tc.TempDir)) - require.NoError(t, ioutil.WriteFile(tc.TempDir, []byte("cant create directory if this is file"), 0700)) - tc.CAServer.UpdateRootCA(tc.Context, fakeClusterSpec(cautils.ECDSA256SHA256Cert, cautils.ECDSA256Key, nil, nil)) - require.Equal(t, tc.RootCA.Certs, tc.ServingSecurityConfig.RootCA().Certs) } type rootRotationTester struct { @@ -1044,7 +1038,7 @@ func TestRootRotationReconciliationWithChanges(t *testing.T) { } // These are the root rotation test cases where we expect there to be no changes made to either -// the nodes or the root CA object +// the nodes or the root CA object, although the server's signing root CA may change. 
func TestRootRotationReconciliationNoChanges(t *testing.T) { t.Parallel() if cautils.External { @@ -1076,35 +1070,10 @@ func TestRootRotationReconciliationNoChanges(t *testing.T) { require.NotNil(t, startCluster) testcases := []struct { - nodes map[string]*api.Node // what nodes we should start with - rootCA *api.RootCA // what root CA we should start with - descr string - caServerStopped bool // if the server is running, only then will a reconciliation loop happen + nodes map[string]*api.Node // what nodes we should start with + rootCA *api.RootCA // what root CA we should start with + descr string }{ - { - descr: ("If the CA server is not running no reconciliation happens even if a root rotation " + - "is in progress"), - caServerStopped: true, - nodes: map[string]*api.Node{ - "0": getFakeAPINode(t, "0", api.IssuanceStatePending, nil, false), - "1": getFakeAPINode(t, "1", api.IssuanceStateIssued, oldNodeTLSInfo, true), - "2": getFakeAPINode(t, "2", api.IssuanceStateRenew, nil, true), - "3": getFakeAPINode(t, "3", api.IssuanceStateRotate, nil, true), - "4": getFakeAPINode(t, "4", api.IssuanceStatePending, nil, true), - "5": getFakeAPINode(t, "5", api.IssuanceStateFailed, nil, true), - "6": getFakeAPINode(t, "6", api.IssuanceStateIssued, oldNodeTLSInfo, false), - }, - rootCA: &api.RootCA{ - CACert: startCluster.RootCA.CACert, - CAKey: startCluster.RootCA.CAKey, - CACertHash: startCluster.RootCA.CACertHash, - RootRotation: &api.RootRotation{ - CACert: rotationCert, - CAKey: rotationKey, - CrossSignedCACert: rotationCrossSigned, - }, - }, - }, { descr: ("If all nodes have the right TLS info or are already rotated, rotating, or pending, " + "there will be no changes needed"), @@ -1149,10 +1118,7 @@ func TestRootRotationReconciliationNoChanges(t *testing.T) { rt.tc.CAServer.Stop() rt.convergeWantedNodes(testcase.nodes, testcase.descr) rt.convergeRootCA(&startCluster.RootCA, testcase.descr) // no root rotation - - if !testcase.caServerStopped { - 
startCAServer(rt.tc.Context, rt.tc.CAServer) - } + startCAServer(rt.tc.Context, rt.tc.CAServer) rt.convergeRootCA(testcase.rootCA, testcase.descr) time.Sleep(500 * time.Millisecond) @@ -1233,34 +1199,6 @@ func TestRootRotationReconciliationRace(t *testing.T) { startCAServer(serverContexts[i], otherServers[i]) defer otherServers[i].Stop() } - clusterWatch, clusterWatchCancel, err := store.ViewAndWatch( - tc.MemoryStore, func(tx store.ReadTx) error { - // don't bother getting the cluster - the CA servers have already done that when first running - return nil - }, - api.EventUpdateCluster{ - Cluster: &api.Cluster{ID: tc.Organization}, - Checks: []api.ClusterCheckFunc{api.ClusterCheckID}, - }, - ) - require.NoError(t, err) - defer clusterWatchCancel() - - done := make(chan struct{}) - defer close(done) - go func() { - for { - select { - case event := <-clusterWatch: - clusterEvent := event.(api.EventUpdateCluster) - for _, s := range otherServers { - s.UpdateRootCA(tc.Context, clusterEvent.Cluster) - } - case <-done: - return - } - } - }() oldNodeTLSInfo := &api.NodeTLSInfo{ TrustRoot: tc.RootCA.Certs, @@ -1326,13 +1264,13 @@ func TestRootRotationReconciliationRace(t *testing.T) { if !bytes.Equal(cluster.RootCA.CAKey, rotationKey) { return errors.New("expected root key is wrong") } - for _, server := range append(otherServers, tc.CAServer) { + for i, server := range append(otherServers) { s, err := server.RootCA().Signer() if err != nil { return err } if !bytes.Equal(s.Key, rotationKey) { - return errors.New("all the sec configs haven't been updated yet") + return errors.Errorf("all the servers' root CAs haven't been updated yet: server %d", i) } } return nil @@ -1361,35 +1299,14 @@ func TestRootRotationReconciliationThrottled(t *testing.T) { startCAServer(tc.Context, caServer) defer caServer.Stop() - var nodes []*api.Node - clusterWatch, clusterWatchCancel, err := store.ViewAndWatch( - tc.MemoryStore, func(tx store.ReadTx) error { - // don't bother getting the cluster
- the CA server has already done that when first running - var err error - nodes, err = store.FindNodes(tx, store.All) - return err - }, - api.EventUpdateCluster{ - Cluster: &api.Cluster{ID: tc.Organization}, - Checks: []api.ClusterCheckFunc{api.ClusterCheckID}, - }, + var ( + nodes []*api.Node + err error ) + tc.MemoryStore.View(func(tx store.ReadTx) { + nodes, err = store.FindNodes(tx, store.All) + }) require.NoError(t, err) - defer clusterWatchCancel() - - done := make(chan struct{}) - defer close(done) - go func() { - for { - select { - case event := <-clusterWatch: - clusterEvent := event.(api.EventUpdateCluster) - caServer.UpdateRootCA(tc.Context, clusterEvent.Cluster) - case <-done: - return - } - } - }() // create twice the batch size of nodes err = tc.MemoryStore.Batch(func(batch *store.Batch) error { diff --git a/ca/testutils/cautils.go b/ca/testutils/cautils.go index c7976bdc2d..e2ee5a4f3b 100644 --- a/ca/testutils/cautils.go +++ b/ca/testutils/cautils.go @@ -50,13 +50,12 @@ type TestCA struct { ManagerToken string ConnBroker *connectionbroker.Broker KeyReadWriter *ca.KeyReadWriter - ctxCancel, watchCancel func() + ctxCancel func() securityConfigCleanups []func() error } // Stop cleans up after TestCA func (tc *TestCA) Stop() { - tc.watchCancel() tc.ctxCancel() for _, qClose := range tc.securityConfigCleanups { qClose() @@ -223,33 +222,6 @@ func NewTestCAFromAPIRootCA(t *testing.T, tempBaseDir string, apiRootCA api.Root } ctx, ctxCancel := context.WithCancel(log.WithLogger(context.Background(), log.L.WithFields(fields))) - clusterWatch, clusterWatchCancel, err := store.ViewAndWatch( - s, func(tx store.ReadTx) error { - cluster := store.GetCluster(tx, organization) - caServer.UpdateRootCA(ctx, cluster) - return nil - }, - api.EventUpdateCluster{ - Cluster: &api.Cluster{ID: organization}, - Checks: []api.ClusterCheckFunc{api.ClusterCheckID}, - }, - ) - assert.NoError(t, err) - go func() { - for { - select { - case event := <-clusterWatch: - clusterEvent := 
event.(api.EventUpdateCluster) - if err := caServer.UpdateRootCA(ctx, clusterEvent.Cluster); err != nil { - log.G(ctx).WithError(err).Error("ca utils CA server could not update root CA") - } - case <-ctx.Done(): - clusterWatchCancel() - return - } - } - }() - go grpcServer.Serve(l) go caServer.Run(ctx) @@ -280,7 +252,6 @@ func NewTestCAFromAPIRootCA(t *testing.T, tempBaseDir string, apiRootCA api.Root ManagerToken: clusterObj.RootCA.JoinTokens.Manager, ConnBroker: connectionbroker.New(remotes), KeyReadWriter: krw, - watchCancel: clusterWatchCancel, ctxCancel: ctxCancel, securityConfigCleanups: []func() error{qClose1, qClose2, qClose3}, } From 82c36fe16ec5a4400fa8b4c36b883d509500360a Mon Sep 17 00:00:00 2001 From: Ying Li Date: Fri, 28 Jul 2017 18:08:05 -0700 Subject: [PATCH 3/3] Use a single codepath for updating the root CA in the security config for both manager and worker nodes. Signed-off-by: Ying Li --- ca/server.go | 53 +++++++----- ca/server_test.go | 4 +- manager/controlapi/ca_rotation_test.go | 3 - manager/controlapi/cluster.go | 19 ++--- manager/controlapi/server.go | 5 +- manager/controlapi/server_test.go | 2 +- manager/manager.go | 19 +++-- manager/manager_test.go | 108 ------------------------- node/node.go | 7 +- node/node_test.go | 27 +++++-- 10 files changed, 76 insertions(+), 171 deletions(-) diff --git a/ca/server.go b/ca/server.go index 806681b6ce..a5feb179ce 100644 --- a/ca/server.go +++ b/ca/server.go @@ -239,7 +239,7 @@ func (s *Server) IssueNodeCertificate(ctx context.Context, request *api.IssueNod return nil, grpc.Errorf(codes.InvalidArgument, codes.InvalidArgument.String()) } - if _, err := s.isRunningLocked(); err != nil { + if err := s.isReadyLocked(); err != nil { return nil, err } @@ -250,8 +250,7 @@ func (s *Server) IssueNodeCertificate(ctx context.Context, request *api.IssueNod ) s.store.View(func(readTx store.ReadTx) { - clusters, err = store.FindClusters(readTx, store.ByName("default")) - + clusters, err = store.FindClusters(readTx, 
store.ByName(store.DefaultClusterName)) }) // Not having a cluster object yet means we can't check @@ -473,7 +472,7 @@ func (s *Server) Run(ctx context.Context) error { s.UpdateRootCA(ctx, cluster, rootReconciler) - // Do this after updateCluster has been called, so isRunning never returns true without + // Do this after updateCluster has been called, so Ready() and isRunning never returns true without // the join tokens and external CA/security config's root CA being set correctly s.mu.Lock() close(s.started) @@ -576,6 +575,7 @@ func (s *Server) Stop() error { } s.cancel() s.started = make(chan struct{}) + s.joinTokens = nil s.mu.Unlock() // Wait for Run to complete @@ -602,6 +602,18 @@ func (s *Server) isRunningLocked() (context.Context, error) { return ctx, nil } +func (s *Server) isReadyLocked() error { + s.mu.Lock() + defer s.mu.Unlock() + if !s.isRunning() { + return grpc.Errorf(codes.Aborted, "CA signer is stopped") + } + if s.joinTokens == nil { + return grpc.Errorf(codes.Aborted, "CA signer is still starting") + } + return nil +} + func (s *Server) isRunning() bool { if s.ctx == nil { return false @@ -684,27 +696,16 @@ func (s *Server) UpdateRootCA(ctx context.Context, cluster *api.Cluster, reconci log.G(ctx).Warn("no certificate expiration specified, using default") } // Attempt to update our local RootCA with the new parameters - var intermediates []byte - signingCert := rCA.CACert - signingKey := rCA.CAKey - externalCACert := rCA.CACert - if rCA.RootRotation != nil { - signingCert = rCA.RootRotation.CrossSignedCACert - signingKey = rCA.RootRotation.CAKey - intermediates = rCA.RootRotation.CrossSignedCACert - externalCACert = rCA.RootRotation.CACert - } - if signingKey == nil { - signingCert = nil - } - updatedRootCA, err := NewRootCA(rCA.CACert, signingCert, signingKey, expiry, intermediates) + updatedRootCA, err := RootCAFromAPI(ctx, rCA, expiry) if err != nil { return errors.Wrap(err, "invalid Root CA object in cluster") } s.localRootCA = 
&updatedRootCA s.externalCAPool = updatedRootCA.Pool + externalCACert := rCA.CACert if rCA.RootRotation != nil { + externalCACert = rCA.RootRotation.CACert // the external CA has to trust the new CA cert s.externalCAPool = x509.NewCertPool() s.externalCAPool.AppendCertsFromPEM(rCA.CACert) @@ -899,3 +900,19 @@ func isFinalState(status api.IssuanceStatus) bool { return false } + +// RootCAFromAPI creates a RootCA object from an api.RootCA object +func RootCAFromAPI(ctx context.Context, apiRootCA *api.RootCA, expiry time.Duration) (RootCA, error) { + var intermediates []byte + signingCert := apiRootCA.CACert + signingKey := apiRootCA.CAKey + if apiRootCA.RootRotation != nil { + signingCert = apiRootCA.RootRotation.CrossSignedCACert + signingKey = apiRootCA.RootRotation.CAKey + intermediates = apiRootCA.RootRotation.CrossSignedCACert + } + if signingKey == nil { + signingCert = nil + } + return NewRootCA(apiRootCA.CACert, signingCert, signingKey, expiry, intermediates) +} diff --git a/ca/server_test.go b/ca/server_test.go index 04db11f7af..a3b5b891ba 100644 --- a/ca/server_test.go +++ b/ca/server_test.go @@ -1264,13 +1264,13 @@ func TestRootRotationReconciliationRace(t *testing.T) { if !bytes.Equal(cluster.RootCA.CAKey, rotationKey) { return errors.New("expected root key is wrong") } - for i, server := range append(otherServers) { + for i, server := range otherServers { s, err := server.RootCA().Signer() if err != nil { return err } if !bytes.Equal(s.Key, rotationKey) { - return errors.Errorf("all the servers' root CAs haven't been updated yet: server %d", i) + return errors.Errorf("server %d's root CAs hasn't been updated yet", i) } } return nil diff --git a/manager/controlapi/ca_rotation_test.go b/manager/controlapi/ca_rotation_test.go index 6b029074c5..1fccd14385 100644 --- a/manager/controlapi/ca_rotation_test.go +++ b/manager/controlapi/ca_rotation_test.go @@ -15,7 +15,6 @@ import ( "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/ca" 
"github.com/docker/swarmkit/ca/testutils" - "github.com/docker/swarmkit/manager/state/store" "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -66,8 +65,6 @@ func getSecurityConfig(t *testing.T, localRootCA *ca.RootCA, cluster *api.Cluste secConfig, cancel, err := localRootCA.CreateSecurityConfig(context.Background(), ca.NewKeyReadWriter(paths.Node, nil, nil), ca.CertificateRequestConfig{}) require.NoError(t, err) cancel() - - require.NoError(t, ca.NewServer(store.NewMemoryStore(nil), secConfig, paths.RootCA).UpdateRootCA(context.Background(), cluster)) return secConfig } diff --git a/manager/controlapi/cluster.go b/manager/controlapi/cluster.go index 7e9dea2ce5..329313a950 100644 --- a/manager/controlapi/cluster.go +++ b/manager/controlapi/cluster.go @@ -104,17 +104,14 @@ func (s *Server) UpdateCluster(ctx context.Context, request *api.UpdateClusterRe if cluster == nil { return grpc.Errorf(codes.NotFound, "cluster %s not found", request.ClusterID) } - // This ensures that we always have the latest security config, so our ca.SecurityConfig.RootCA and - // ca.SecurityConfig.externalCA objects are up-to-date with the current api.Cluster.RootCA and - // api.Cluster.Spec.ExternalCA objects, respectively. Note that if, during this update, the cluster gets - // updated again with different CA info and the security config gets changed under us, that's still fine because - // this cluster update would fail anyway due to its version being too low on write. 
- if err := s.scu.UpdateRootCA(ctx, cluster); err != nil { + // This ensures that we have the current rootCA with which to generate tokens (expiration doesn't matter + // for generating the tokens) + rootCA, err := ca.RootCAFromAPI(ctx, &cluster.RootCA, ca.DefaultNodeCertExpiration) + if err != nil { log.G(ctx).WithField( - "method", "(*controlapi.Server).UpdateCluster").WithError(err).Error("could not update security config") - return grpc.Errorf(codes.Internal, "could not update security config") + "method", "(*controlapi.Server).UpdateCluster").WithError(err).Error("invalid cluster root CA") + return grpc.Errorf(codes.Internal, "error loading cluster rootCA for update") } - rootCA := s.securityConfig.RootCA() cluster.Meta.Version = *request.ClusterVersion cluster.Spec = *request.Spec.Copy() @@ -122,10 +119,10 @@ func (s *Server) UpdateCluster(ctx context.Context, request *api.UpdateClusterRe expireBlacklistedCerts(cluster) if request.Rotation.WorkerJoinToken { - cluster.RootCA.JoinTokens.Worker = ca.GenerateJoinToken(rootCA) + cluster.RootCA.JoinTokens.Worker = ca.GenerateJoinToken(&rootCA) } if request.Rotation.ManagerJoinToken { - cluster.RootCA.JoinTokens.Manager = ca.GenerateJoinToken(rootCA) + cluster.RootCA.JoinTokens.Manager = ca.GenerateJoinToken(&rootCA) } updatedRootCA, err := validateCAConfig(ctx, s.securityConfig, cluster) diff --git a/manager/controlapi/server.go b/manager/controlapi/server.go index c2490ba002..a049a55e89 100644 --- a/manager/controlapi/server.go +++ b/manager/controlapi/server.go @@ -18,18 +18,15 @@ type Server struct { store *store.MemoryStore raft *raft.Node securityConfig *ca.SecurityConfig - scu ca.APISecurityConfigUpdater pg plugingetter.PluginGetter } // NewServer creates a Cluster API server. 
-func NewServer(store *store.MemoryStore, raft *raft.Node, securityConfig *ca.SecurityConfig, - scu ca.APISecurityConfigUpdater, pg plugingetter.PluginGetter) *Server { +func NewServer(store *store.MemoryStore, raft *raft.Node, securityConfig *ca.SecurityConfig, pg plugingetter.PluginGetter) *Server { return &Server{ store: store, raft: raft, securityConfig: securityConfig, - scu: scu, pg: pg, } } diff --git a/manager/controlapi/server_test.go b/manager/controlapi/server_test.go index 934bd6cb29..8cfdad6a90 100644 --- a/manager/controlapi/server_test.go +++ b/manager/controlapi/server_test.go @@ -48,7 +48,7 @@ func newTestServer(t *testing.T) *testServer { ts.Store = store.NewMemoryStore(&stateutils.MockProposer{}) assert.NotNil(t, ts.Store) - ts.Server = NewServer(ts.Store, nil, securityConfig, ca.NewServer(ts.Store, securityConfig, tc.Paths.RootCA), nil) + ts.Server = NewServer(ts.Store, nil, securityConfig, nil) assert.NotNil(t, ts.Server) temp, err := ioutil.TempFile("", "test-socket") diff --git a/manager/manager.go b/manager/manager.go index a9bda3be35..33f938e193 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -224,7 +224,7 @@ func New(config *Config) (*Manager, error) { m := &Manager{ config: *config, - caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig, config.RootCAPaths), + caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig), dispatcher: dispatcher.New(raftNode, dispatcher.DefaultConfig(), drivers.New(config.PluginGetter), config.SecurityConfig), logbroker: logbroker.New(raftNode.MemoryStore()), watchServer: watchapi.NewServer(raftNode.MemoryStore()), @@ -404,7 +404,7 @@ func (m *Manager) Run(parent context.Context) error { return err } - baseControlAPI := controlapi.NewServer(m.raftNode.MemoryStore(), m.raftNode, m.config.SecurityConfig, m.caserver, m.config.PluginGetter) + baseControlAPI := controlapi.NewServer(m.raftNode.MemoryStore(), m.raftNode, m.config.SecurityConfig, m.config.PluginGetter) 
baseResourceAPI := resourceapi.New(m.raftNode.MemoryStore()) healthServer := health.NewHealthServer() localHealthServer := health.NewHealthServer() @@ -720,16 +720,14 @@ func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error { func (m *Manager) watchForClusterChanges(ctx context.Context) error { clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization() + var cluster *api.Cluster clusterWatch, clusterWatchCancel, err := store.ViewAndWatch(m.raftNode.MemoryStore(), func(tx store.ReadTx) error { - cluster := store.GetCluster(tx, clusterID) + cluster = store.GetCluster(tx, clusterID) if cluster == nil { return fmt.Errorf("unable to get current cluster") } - if err := m.caserver.UpdateRootCA(ctx, cluster); err != nil { - log.G(ctx).WithError(err).Error("could not update security config") - } - return m.updateKEK(ctx, cluster) + return nil }, api.EventUpdateCluster{ Cluster: &api.Cluster{ID: clusterID}, @@ -739,14 +737,15 @@ func (m *Manager) watchForClusterChanges(ctx context.Context) error { if err != nil { return err } + if err := m.updateKEK(ctx, cluster); err != nil { + return err + } + go func() { for { select { case event := <-clusterWatch: clusterEvent := event.(api.EventUpdateCluster) - if err := m.caserver.UpdateRootCA(ctx, clusterEvent.Cluster); err != nil { - log.G(ctx).WithError(err).Error("could not update security config") - } m.updateKEK(ctx, clusterEvent.Cluster) case <-ctx.Done(): clusterWatchCancel() diff --git a/manager/manager_test.go b/manager/manager_test.go index b6b8820331..bf72ef1b30 100644 --- a/manager/manager_test.go +++ b/manager/manager_test.go @@ -424,114 +424,6 @@ func TestManagerLockUnlock(t *testing.T) { <-done } -// If the root CA material is updated in the memory store, a manager will update its own -// security configs even if it's "not the leader" (which we will fake by calling `becomeFollower`) -func TestManagerUpdatesSecurityConfig(t *testing.T) { - temp, err := ioutil.TempFile("", 
"test-manager-update-security-config") - require.NoError(t, err) - require.NoError(t, temp.Close()) - require.NoError(t, os.Remove(temp.Name())) - - defer os.RemoveAll(temp.Name()) - - stateDir, err := ioutil.TempDir("", "test-raft") - require.NoError(t, err) - defer os.RemoveAll(stateDir) - - tc := cautils.NewTestCA(t) - defer tc.Stop() - - managerSecurityConfig, err := tc.NewNodeConfig(ca.ManagerRole) - require.NoError(t, err) - - _, _, err = managerSecurityConfig.KeyReader().Read() - require.NoError(t, err) - - m, err := New(&Config{ - RemoteAPI: &RemoteAddrs{ListenAddr: "127.0.0.1:0"}, - ControlAPI: temp.Name(), - StateDir: stateDir, - SecurityConfig: managerSecurityConfig, - RootCAPaths: tc.Paths.RootCA, - }) - require.NoError(t, err) - require.NotNil(t, m) - - done := make(chan error) - defer close(done) - go func() { - done <- m.Run(tc.Context) - }() - - // wait until the CA server is running - opts := []grpc.DialOption{ - grpc.WithTimeout(10 * time.Second), - grpc.WithTransportCredentials(managerSecurityConfig.ClientTLSCreds), - } - - conn, err := grpc.Dial(m.Addr(), opts...) 
- require.NoError(t, err) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := api.NewCAClient(conn) - - require.NoError(t, testutils.PollFuncWithTimeout(nil, func() error { - ctx, _ := context.WithTimeout(tc.Context, 500*time.Millisecond) - _, err := client.GetRootCACertificate(ctx, &api.GetRootCACertificateRequest{}) - return err - }, time.Second)) - - // wait until the cluster is up - var clusters []*api.Cluster - - require.NoError(t, testutils.PollFuncWithTimeout(nil, func() error { - var err error - m.raftNode.MemoryStore().View(func(tx store.ReadTx) { - clusters, err = store.FindClusters(tx, store.ByName(store.DefaultClusterName)) - }) - if err != nil { - return err - } - if len(clusters) == 0 { - return fmt.Errorf("cluster not ready yet") - } - return nil - }, 1*time.Second)) - - // stop running CA server and other leader functions - m.mu.Lock() - m.becomeFollower() - m.mu.Unlock() - - newRootCert, _, err := cautils.CreateRootCertAndKey("rootOther") - require.NoError(t, err) - updatedCA := append(tc.RootCA.Certs, newRootCert...) - - // Update the RootCA to have a bundle - require.NoError(t, m.raftNode.MemoryStore().Update(func(tx store.Tx) error { - cluster := store.GetCluster(tx, clusters[0].ID) - cluster.RootCA.CACert = updatedCA - return store.UpdateCluster(tx, cluster) - })) - - // wait for the manager's security config to be updated - require.NoError(t, testutils.PollFuncWithTimeout(nil, func() error { - if !bytes.Equal(managerSecurityConfig.RootCA().Certs, updatedCA) { - return fmt.Errorf("root CA not updated yet") - } - return nil - }, 1*time.Second)) - - m.Stop(tc.Context, false) - - // After stopping we should MAY receive an error from ListenAndServe if - // all this happened before WaitForLeader completed, so don't check the - // error. 
- <-done -} - // Tests manager rotates encryption of root key data in the raft store func TestManagerEncryptsDecryptsRootKeyMaterial(t *testing.T) { tc := cautils.NewTestCA(t) diff --git a/node/node.go b/node/node.go index d919b49a21..1d7b975965 100644 --- a/node/node.go +++ b/node/node.go @@ -308,8 +308,6 @@ func (n *Node) run(ctx context.Context) (err error) { case <-agentDone: return case nodeChanges := <-n.notifyNodeChange: - currentRole := n.currentRole() - if nodeChanges.Node != nil { // This is a bit complex to be backward compatible with older CAs that // don't support the Node.Role field. They only use what's presently @@ -335,10 +333,7 @@ func (n *Node) run(ctx context.Context) (err error) { } if nodeChanges.RootCert != nil { - // We only want to update the root CA if this is a worker node. Manager nodes directly watch the raft - // store and update the root CA, with the necessary signer, from the raft store (since the managers - // need the CA key as well to potentially issue new TLS certificates). 
- if currentRole == api.NodeRoleManager || bytes.Equal(nodeChanges.RootCert, securityConfig.RootCA().Certs) { + if bytes.Equal(nodeChanges.RootCert, securityConfig.RootCA().Certs) { continue } newRootCA, err := ca.NewRootCA(nodeChanges.RootCert, nil, nil, ca.DefaultNodeCertExpiration, nil) diff --git a/node/node_test.go b/node/node_test.go index 8f10f14eea..709b0d3d98 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -254,7 +254,7 @@ func TestLoadSecurityConfigDownloadAllCerts(t *testing.T) { require.NoError(t, err) } -func TestManagerIgnoresDispatcherRootCAUpdate(t *testing.T) { +func TestManagerRespectsDispatcherRootCAUpdate(t *testing.T) { tmpDir, err := ioutil.TempDir("", "manager-root-ca-update") require.NoError(t, err) defer os.RemoveAll(tmpDir) @@ -278,23 +278,34 @@ func TestManagerIgnoresDispatcherRootCAUpdate(t *testing.T) { require.FailNow(t, "node did not ready in time") } - certPath := filepath.Join(tmpDir, certDirectory, "swarm-root-ca.crt") - currentCACerts, err := ioutil.ReadFile(certPath) + // ensure that we have a second dispatcher that we can connect to when we shut down ours + paths := ca.NewConfigPaths(filepath.Join(tmpDir, certDirectory)) + rootCA, err := ca.GetLocalRootCA(paths.RootCA) require.NoError(t, err) - parsedCerts, err := helpers.ParseCertificatesPEM(currentCACerts) + managerSecConfig, cancel, err := ca.LoadSecurityConfig(context.Background(), rootCA, ca.NewKeyReadWriter(paths.Node, nil, nil), false) require.NoError(t, err) - require.Len(t, parsedCerts, 1) + defer cancel() - // fake an update from the dispatcher, because the dispatcher is actually the local manager + mockDispatcher, cleanup := agentutils.NewMockDispatcher(t, managerSecConfig, false) + defer cleanup() + node.remotes.Observe(api.Peer{Addr: mockDispatcher.Addr}, 1) + + currentCACerts := rootCA.Certs + + // shut down our current manager so that when the root CA changes, the manager doesn't "fix" it. 
+ node.manager.Stop(context.Background(), false) + + // fake an update from a remote dispatcher node.notifyNodeChange <- &agent.NodeChanges{ RootCert: append(currentCACerts, cautils.ECDSA256SHA256Cert...), } - // the node root CA certificates do not change + // the node root CA certificates have changed now time.Sleep(250 * time.Millisecond) + certPath := filepath.Join(tmpDir, certDirectory, "swarm-root-ca.crt") caCerts, err := ioutil.ReadFile(certPath) require.NoError(t, err) - require.Equal(t, currentCACerts, caCerts) + require.NotEqual(t, currentCACerts, caCerts) require.NoError(t, node.Stop(context.Background())) }