Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 259 additions & 0 deletions ca/reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
package ca

import (
"bytes"
"context"
"fmt"
"reflect"
"sync"
"time"

"github.com/cloudflare/cfssl/helpers"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/api/equality"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/state/store"
"github.com/pkg/errors"
)

// IssuanceStateRotateMaxBatchSize is the maximum number of nodes we'll tell to rotate their certificates in any given update.
const IssuanceStateRotateMaxBatchSize = 30

// hasIssuer reports whether the node's self-reported TLS info names the given
// issuer (matching both the issuer subject and the issuer public key).
func hasIssuer(n *api.Node, info *IssuerInfo) bool {
	desc := n.Description
	if desc == nil || desc.TLSInfo == nil {
		return false
	}
	if !bytes.Equal(info.Subject, desc.TLSInfo.CertIssuerSubject) {
		return false
	}
	return bytes.Equal(info.PublicKey, desc.TLSInfo.CertIssuerPublicKey)
}

// errRootRotationChanged is returned by finishRootRotation when the cluster's
// RootCA no longer matches the rotation this reconciler was working on.
var errRootRotationChanged = errors.New("target root rotation has changed")

// rootRotationReconciler keeps track of all the nodes in the store so that we can determine which ones need reconciliation when nodes are updated
// or the root CA is updated. This is meant to be used with watches on nodes and the cluster, and provides functions to be called when the
// cluster's RootCA has changed and when a node is added, updated, or removed.
type rootRotationReconciler struct {
	mu                  sync.Mutex // guards currentRootCA, currentIssuer, unconvergedNodes, and cancel
	clusterID           string
	batchUpdateInterval time.Duration // pause between batches of rotation requests in the reconciler loop
	ctx                 context.Context
	store               *store.MemoryStore

	currentRootCA    *api.RootCA          // last root CA seen via UpdateRootCA
	currentIssuer    IssuerInfo           // issuer derived from currentRootCA
	unconvergedNodes map[string]*api.Node // accepted nodes whose certs do not yet match currentIssuer

	wg     sync.WaitGroup // tracks the running reconciler loop goroutine
	cancel func()         // cancels the running reconciler loop, if any
}

// IssuerFromAPIRootCA returns the desired issuer given an API root CA object.
// If a root rotation is in progress, the issuer comes from the rotation's CA
// cert rather than the currently active one.
func IssuerFromAPIRootCA(rootCA *api.RootCA) (*IssuerInfo, error) {
	desiredCert := rootCA.CACert
	if rotation := rootCA.RootRotation; rotation != nil {
		desiredCert = rotation.CACert
	}
	parsed, err := helpers.ParseCertificatesPEM(desiredCert)
	if err != nil {
		return nil, errors.Wrap(err, "invalid certificate in cluster root CA object")
	}
	if len(parsed) == 0 {
		return nil, errors.New("invalid certificate in cluster root CA object")
	}
	leaf := parsed[0]
	return &IssuerInfo{
		Subject:   leaf.RawSubject,
		PublicKey: leaf.RawSubjectPublicKeyInfo,
	}, nil
}

// assumption: UpdateRootCA will never be called with a `nil` root CA because the caller will be acting in response to
// a store update event
func (r *rootRotationReconciler) UpdateRootCA(newRootCA *api.RootCA) {
issuerInfo, err := IssuerFromAPIRootCA(newRootCA)
if err != nil {
log.G(r.ctx).WithError(err).Error("unable to update process the current root CA")
return
}

var (
shouldStartNewLoop, waitForPrevLoop bool
loopCtx context.Context
)
r.mu.Lock()
defer func() {
r.mu.Unlock()
if shouldStartNewLoop {
if waitForPrevLoop {
r.wg.Wait()
}
go r.runReconcilerLoop(loopCtx, newRootCA)
}
}()

// check if the issuer has changed, first
if reflect.DeepEqual(&r.currentIssuer, issuerInfo) {
r.currentRootCA = newRootCA
return
}
// If the issuer has changed, iterate through all the nodes to figure out which ones need rotation
if newRootCA.RootRotation != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In what situation would we have an issuer mismatch, but no RootRotation?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the root rotation were abandoned, for instance. Previously the issuer would have been the new root cert, but if before the root rotation finished someone rotated the desired cert back to the original cert, the root rotation could be done.

var nodes []*api.Node
r.store.View(func(tx store.ReadTx) {
nodes, err = store.FindNodes(tx, store.ByMembership(api.NodeMembershipAccepted))
})
if err != nil {
log.G(r.ctx).WithError(err).Error("unable to list nodes, so unable to process the current root CA")
return
}

// from here on out, there will be no more errors that cause us to have to abandon updating the Root CA,
// so we can start making changes to r's fields
r.unconvergedNodes = make(map[string]*api.Node)
for _, n := range nodes {
if !hasIssuer(n, issuerInfo) {
r.unconvergedNodes[n.ID] = n
}
}
shouldStartNewLoop = true
if r.cancel != nil { // there's already a loop going, so cancel it
r.cancel()
waitForPrevLoop = true
}
loopCtx, r.cancel = context.WithCancel(r.ctx)
} else {
r.unconvergedNodes = nil
}
r.currentRootCA = newRootCA
r.currentIssuer = *issuerInfo
}

// assumption: UpdateNode will never be called with a `nil` node because the caller will be acting in response to
// a store update event
func (r *rootRotationReconciler) UpdateNode(node *api.Node) {
r.mu.Lock()
defer r.mu.Unlock()
// if we're not in the middle of a root rotation, or if this node does not have membership, ignore it
if r.currentRootCA == nil || r.currentRootCA.RootRotation == nil || node.Spec.Membership != api.NodeMembershipAccepted {
return
}
if hasIssuer(node, &r.currentIssuer) {
delete(r.unconvergedNodes, node.ID)
} else {
r.unconvergedNodes[node.ID] = node
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In wich situation would a node not be in this r.unconvergedNodes map but UpdateNode gets called for it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If everything is working correctly, then it's unlikely, but if you are rotating the root cert back and forth really quickly, it's possible that the root ca in the reconciler finishes updating before the root CA in the signer finishes updating, and the node could get a new TLS cert signed with the previous root.

Previously there was a bug where this update took too long, and blocked the signer root CA update, and the nodes got a few TLS cert updates that were signed with the wrong key, and it eventually recovered.

}
}

// DeleteNode drops a removed node from the unconverged set so the reconciler
// stops waiting on it.
//
// assumption: DeleteNode will never be called with a `nil` node because the caller will be acting in response to
// a store update event
func (r *rootRotationReconciler) DeleteNode(node *api.Node) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.unconvergedNodes, node.ID)
}

// runReconcilerLoop periodically drives the root rotation forward until it
// completes or ctx is cancelled. On each tick it either (a) finishes the
// rotation when no unconverged nodes remain, or (b) asks up to
// IssuanceStateRotateMaxBatchSize not-yet-rotating nodes to rotate their
// certificates by setting their issuance state to Rotate.
//
// NOTE(review): r.wg.Add(1) runs inside the goroutine itself rather than before
// the `go` statement in UpdateRootCA; if this goroutine has not yet been
// scheduled when UpdateRootCA runs again, its wg.Wait() could return early —
// confirm this window is acceptable.
func (r *rootRotationReconciler) runReconcilerLoop(ctx context.Context, loopRootCA *api.RootCA) {
	r.wg.Add(1)
	defer r.wg.Done()
	for {
		r.mu.Lock()
		if len(r.unconvergedNodes) == 0 {
			r.mu.Unlock()

			// Every accepted node has a cert from the new issuer — promote the
			// rotation cert to be the cluster's root CA.
			err := r.store.Update(func(tx store.Tx) error {
				return r.finishRootRotation(tx, loopRootCA)
			})
			if err == nil {
				log.G(r.ctx).Info("completed root rotation")
				return
			}
			log.G(r.ctx).WithError(err).Error("could not complete root rotation")
			if err == errRootRotationChanged {
				// if the root rotation has changed, this loop will be cancelled anyway, so may as well abort early
				return
			}
		} else {
			var toUpdate []*api.Node
			for _, n := range r.unconvergedNodes {
				iState := n.Certificate.Status.State
				// Skip nodes already renewing/pending/rotating; mark the rest for
				// rotation, up to the per-tick batch limit.
				if iState != api.IssuanceStateRenew && iState != api.IssuanceStatePending && iState != api.IssuanceStateRotate {
					n = n.Copy()
					n.Certificate.Status.State = api.IssuanceStateRotate
					toUpdate = append(toUpdate, n)
					if len(toUpdate) >= IssuanceStateRotateMaxBatchSize {
						break
					}
				}
			}
			r.mu.Unlock()

			if err := r.batchUpdateNodes(toUpdate); err != nil {
				log.G(r.ctx).WithError(err).Errorf("store error when trying to batch update %d nodes to request certificate rotation", len(toUpdate))
			}
		}

		// Sleep until the next tick, or bail out if this loop has been cancelled
		// (a new rotation started, or the reconciler is shutting down).
		select {
		case <-ctx.Done():
			return
		case <-time.After(r.batchUpdateInterval):
		}
	}
}

// finishRootRotation completes a root rotation inside the given transaction:
// it replaces the cluster's root CA cert/key with the rotation cert/key,
// recomputes the cert digest, and regenerates the join tokens. It returns
// errRootRotationChanged if the cluster's RootCA no longer matches
// expectedRootCA.
//
// This function assumes that the expected root CA has root rotation. This is intended to be used by
// `reconcileNodeRootsAndCerts`, which uses the root CA from the `lastSeenClusterRootCA`, and checks
// that it has a root rotation before calling this function.
func (r *rootRotationReconciler) finishRootRotation(tx store.Tx, expectedRootCA *api.RootCA) error {
	cluster := store.GetCluster(tx, r.clusterID)
	if cluster == nil {
		return fmt.Errorf("unable to get cluster %s", r.clusterID)
	}

	// If the RootCA object has changed (because another root rotation was started or because some other node
	// had finished the root rotation), we cannot finish the root rotation that we were working on.
	if !equality.RootCAEqualStable(expectedRootCA, &cluster.RootCA) {
		return errRootRotationChanged
	}

	// Dereferencing RootRotation below is safe: equality with expectedRootCA (which
	// the caller guarantees has a non-nil RootRotation) was just verified above.
	var signerCert []byte
	if len(cluster.RootCA.RootRotation.CAKey) > 0 {
		signerCert = cluster.RootCA.RootRotation.CACert
	}
	// we don't actually have to parse out the default node expiration from the cluster - we are just using
	// the ca.RootCA object to generate new tokens and the digest
	updatedRootCA, err := NewRootCA(cluster.RootCA.RootRotation.CACert, signerCert, cluster.RootCA.RootRotation.CAKey,
		DefaultNodeCertExpiration, nil)
	if err != nil {
		return errors.Wrap(err, "invalid cluster root rotation object")
	}
	cluster.RootCA = api.RootCA{
		CACert:     cluster.RootCA.RootRotation.CACert,
		CAKey:      cluster.RootCA.RootRotation.CAKey,
		CACertHash: updatedRootCA.Digest.String(),
		JoinTokens: api.JoinTokens{
			Worker:  GenerateJoinToken(&updatedRootCA),
			Manager: GenerateJoinToken(&updatedRootCA),
		},
		LastForcedRotation: cluster.RootCA.LastForcedRotation,
	}
	return store.UpdateCluster(tx, cluster)
}

// batchUpdateNodes writes the given nodes back to the store in one batch,
// logging (but otherwise ignoring) per-node update failures.
func (r *rootRotationReconciler) batchUpdateNodes(toUpdate []*api.Node) error {
	if len(toUpdate) == 0 {
		return nil
	}
	_, err := r.store.Batch(func(batch *store.Batch) error {
		// Directly update the nodes rather than get + update, and ignore version errors. Since
		// `rootRotationReconciler` should be hooked up to all node update/delete/create events, we should have
		// close to the latest versions of all the nodes. If not, the node will be updated later and the
		// next batch of updates should catch it.
		for _, node := range toUpdate {
			updateErr := batch.Update(func(tx store.Tx) error {
				return store.UpdateNode(tx, node)
			})
			if updateErr == nil || updateErr == store.ErrSequenceConflict {
				continue
			}
			log.G(r.ctx).WithError(updateErr).Errorf("unable to update node %s to request a certificate rotation", node.ID)
		}
		return nil
	})
	return err
}
Loading