Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ import (
"github.com/openshift/cluster-version-operator/pkg/payload/precondition"
preconditioncv "github.com/openshift/cluster-version-operator/pkg/payload/precondition/clusterversion"
"github.com/openshift/cluster-version-operator/pkg/verify"
"github.com/openshift/cluster-version-operator/pkg/verify/verifyconfigmap"
"github.com/openshift/cluster-version-operator/pkg/verify/store"
"github.com/openshift/cluster-version-operator/pkg/verify/store/configmap"
"github.com/openshift/cluster-version-operator/pkg/verify/store/serial"
)

const (
Expand Down Expand Up @@ -306,8 +308,8 @@ func loadConfigMapVerifierDataFromUpdate(update *payload.Update, clientBuilder v

// allow the verifier to consult the cluster for signature data, and also configure
// a process that writes signatures back to that store
signatureStore := verifyconfigmap.NewStore(configMapClient, nil)
verifier = verifier.WithStores(signatureStore)
signatureStore := configmap.NewStore(configMapClient, nil)
verifier.Store = &serial.Store{Stores: []store.Store{signatureStore, verifier.Store}}
persister := verify.NewSignatureStorePersister(signatureStore, verifier)
return verifier, persister, nil
}
Expand Down
18 changes: 15 additions & 3 deletions pkg/verify/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"github.com/pkg/errors"
"golang.org/x/crypto/openpgp"
"k8s.io/klog"

"github.com/openshift/cluster-version-operator/pkg/verify/store"
"github.com/openshift/cluster-version-operator/pkg/verify/store/parallel"
)

// ReleaseAnnotationConfigMapVerifier is an annotation set on a config map in the
Expand Down Expand Up @@ -48,7 +51,7 @@ const ReleaseAnnotationConfigMapVerifier = "release.openshift.io/verification-co
// store and the lookup order is internally defined.
func NewFromConfigMapData(src string, data map[string]string, clientBuilder ClientBuilder) (*ReleaseVerifier, error) {
verifiers := make(map[string]openpgp.EntityList)
var stores []*url.URL
var stores []store.Store
for k, v := range data {
switch {
case strings.HasPrefix(k, "verifier-public-key-"):
Expand All @@ -63,7 +66,16 @@ func NewFromConfigMapData(src string, data map[string]string, clientBuilder Clie
if err != nil || (u.Scheme != "http" && u.Scheme != "https" && u.Scheme != "file") {
return nil, fmt.Errorf("%s has an invalid key %q: must be a valid URL with scheme file://, http://, or https://", src, k)
}
stores = append(stores, u)
if u.Scheme == "file" {
stores = append(stores, &fileStore{
directory: u.Path,
})
} else {
stores = append(stores, &httpStore{
uri: u,
httpClient: clientBuilder.HTTPClient,
})
}
default:
klog.Warningf("An unexpected key was found in %s and will be ignored (expected store-* or verifier-public-key-*): %s", src, k)
}
Expand All @@ -75,7 +87,7 @@ func NewFromConfigMapData(src string, data map[string]string, clientBuilder Clie
return nil, fmt.Errorf("%s did not provide any GPG public keys to verify signatures from and cannot be used", src)
}

return NewReleaseVerifier(verifiers, stores, clientBuilder), nil
return NewReleaseVerifier(verifiers, &parallel.Store{Stores: stores}), nil
}

func loadArmoredOrUnarmoredGPGKeyRing(data []byte) (openpgp.EntityList, error) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package verifyconfigmap
package configmap

import (
"context"
Expand All @@ -16,6 +16,8 @@ import (
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/retry"
"k8s.io/klog"

"github.com/openshift/cluster-version-operator/pkg/verify/store"
)

// ReleaseLabelConfigMap is a label applied to a configmap inside the
Expand Down Expand Up @@ -87,10 +89,10 @@ func digestToKeyPrefix(digest string) (string, error) {
return fmt.Sprintf("%s-%s", algo, hash), nil
}

// DigestSignatures returns a list of signatures that match the request
// Signatures returns a list of signatures that match the request
// digest out of config maps labelled with ReleaseLabelConfigMap in the
// openshift-config-managed namespace.
func (s *Store) DigestSignatures(ctx context.Context, digest string) ([][]byte, error) {
func (s *Store) Signatures(ctx context.Context, name string, digest string, fn store.Callback) error {
// avoid repeatedly reloading config maps
items := s.mostRecentConfigMaps()
r := s.limiter.Reserve()
Expand All @@ -100,31 +102,36 @@ func (s *Store) DigestSignatures(ctx context.Context, digest string) ([][]byte,
})
if err != nil {
s.rememberMostRecentConfigMaps([]corev1.ConfigMap{})
return nil, err
return err
}
items = configMaps.Items
s.rememberMostRecentConfigMaps(configMaps.Items)
}

prefix, err := digestToKeyPrefix(digest)
if err != nil {
return nil, err
return err
}

var signatures [][]byte
for _, cm := range items {
klog.V(4).Infof("searching for %s in signature config map %s", prefix, cm.ObjectMeta.Name)
for k, v := range cm.BinaryData {
if strings.HasPrefix(k, prefix) {
klog.V(4).Infof("key %s from signature config map %s matches %s", k, cm.ObjectMeta.Name, digest)
signatures = append(signatures, v)
done, err := fn(ctx, v, nil)
if err != nil || done {
return err
}
if err := ctx.Err(); err != nil {
return err
}
}
}
}
return signatures, nil
return nil
}

// Store attempts to persist the provided signatures into a form DigestSignatures will
// Store attempts to persist the provided signatures into a form Signatures will
// retrieve.
func (s *Store) Store(ctx context.Context, signaturesByDigest map[string][][]byte) error {
cm := &corev1.ConfigMap{
Expand Down
36 changes: 36 additions & 0 deletions pkg/verify/store/memory/memory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Package memory implements an in-memory signature store. This is
// mostly useful for testing.
package memory

import (
"context"

"github.com/openshift/cluster-version-operator/pkg/verify/store"
)

// Store provides access to signatures stored in memory.
type Store struct {
// Data maps digests to slices of signatures.
Data map[string][][]byte
}

// Signatures fetches signatures for the provided digest.
func (s *Store) Signatures(ctx context.Context, name string, digest string, fn store.Callback) error {
for _, signature := range s.Data[digest] {
done, err := fn(ctx, signature, nil)
if err != nil || done {
return err
}
if err := ctx.Err(); err != nil {
return err
}
}

return nil
}

// String returns a description of where this store finds
// signatures.
func (s *Store) String() string {
return "in-memory signature store"
}
92 changes: 92 additions & 0 deletions pkg/verify/store/parallel/parallel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Package parallel combines several signature stores in a single store.
// Signatures are searched in each substore simultaneously until a
// match is found.
package parallel

import (
"context"
"fmt"
"strings"

"github.com/openshift/cluster-version-operator/pkg/verify/store"
)

type signatureResponse struct {
signature []byte
errIn error
}

// Store provides access to signatures stored in sub-stores.
type Store struct {
Stores []store.Store
}

// Signatures fetches signatures for the provided digest.
func (s *Store) Signatures(ctx context.Context, name string, digest string, fn store.Callback) error {
nestedCtx, cancel := context.WithCancel(ctx)
defer cancel()
responses := make(chan signatureResponse, len(s.Stores))
errorChannelCount := 0
errorChannel := make(chan error, 1)

for i := range s.Stores {
errorChannelCount++
go func(ctx context.Context, wrappedStore store.Store, name string, digest string, responses chan signatureResponse, errorChannel chan error) {
errorChannel <- wrappedStore.Signatures(ctx, name, digest, func(ctx context.Context, signature []byte, errIn error) (done bool, err error) {
select {
case <-ctx.Done():
return true, nil
case responses <- signatureResponse{signature: signature, errIn: errIn}:
}
return false, nil
})
}(nestedCtx, s.Stores[i], name, digest, responses, errorChannel)
}

allDone := false
var loopError error
for errorChannelCount > 0 {
if allDone {
err := <-errorChannel
errorChannelCount--
if loopError == nil && err != nil && err != context.Canceled && err != context.DeadlineExceeded {
loopError = err
}
} else {
select {
case response := <-responses:
done, err := fn(ctx, response.signature, response.errIn)
if done || err != nil {
allDone = true
loopError = err
cancel()
}
case err := <-errorChannel:
errorChannelCount--
if loopError == nil && err != nil && err != context.Canceled && err != context.DeadlineExceeded {
loopError = err
}
}
}
}
close(responses)
close(errorChannel)
if loopError != nil {
return loopError
}
return ctx.Err() // because we discard context errors from the wrapped stores
}

// String returns a description of where this store finds
// signatures.
func (s *Store) String() string {
wrapped := "no stores"
if len(s.Stores) > 0 {
names := make([]string, 0, len(s.Stores))
for _, store := range s.Stores {
names = append(names, store.String())
}
wrapped = strings.Join(names, ", ")
}
return fmt.Sprintf("parallel signature store wrapping %s", wrapped)
}
Loading