274 changes: 274 additions & 0 deletions pkg/admin/migration/migration.go
@@ -0,0 +1,274 @@
package migration

import (
"context"
"fmt"
"strings"
"time"

crdclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
v1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"
apiserviceclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"

"k8s.io/klog/v2"
)

// Skip resources that might churn quickly or cause a lot of overhead to migrate
var excludeResources = sets.NewString(
"events",
)

var metadataAccessor = meta.NewAccessor()

type migrator struct {
client dynamic.Interface
discoveryClient discovery.ServerResourcesInterface
crdClient v1.CustomResourceDefinitionInterface
apiserviceClient apiregistrationv1.APIServiceInterface
}

func NewMigrator(kubeConfigPath string) (*migrator, error) {
restConfig, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
if err != nil {
return nil, fmt.Errorf("failed to build rest config: %w", err)
}
clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, fmt.Errorf("failed to build kubernetes clientset config: %w", err)
}
crd, err := crdclient.NewForConfig(restConfig)
if err != nil {
return nil, fmt.Errorf("failed to build crd clientset config: %w", err)
}
apiservice, err := apiserviceclient.NewForConfig(restConfig)
if err != nil {
return nil, fmt.Errorf("failed to build apiservice client config: %w", err)
}
dynamic, err := dynamic.NewForConfig(restConfig)
Contributor:
Naming the variable the same as the package caught my eye here. If there's another reason to update the PR, you could consider renaming the variable to something like dynamicClient.
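
A hedged illustration of the suggested rename; `dynamicClient` is the reviewer's proposed name, not code in this PR:

```go
// Rename so the local variable no longer shadows the imported "dynamic" package.
dynamicClient, err := dynamic.NewForConfig(restConfig)
if err != nil {
	return nil, fmt.Errorf("failed to build dynamic client config: %w", err)
}
```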

if err != nil {
return nil, fmt.Errorf("failed to build dynamic client config: %w", err)
}

return &migrator{
client: dynamic,
discoveryClient: clientset.Discovery(),
crdClient: crd.ApiextensionsV1().CustomResourceDefinitions(),
apiserviceClient: apiservice.ApiregistrationV1().APIServices(),
}, nil
}

func (d *migrator) Start(ctx context.Context) (*MigrationResultList, error) {
return d.start(ctx)
Contributor:
What's the extra level of indirection for here?

}

func (d *migrator) start(ctx context.Context) (*MigrationResultList, error) {
schemas, err := d.findMigratableResources(ctx)
if err != nil {
return nil, err
}
results := &MigrationResultList{
Status: MigrationSuccess,
}
errorOccurred := false
start := time.Now()
klog.Info("schema migration started")

// Currently we migrate items sequentially; we will need to revisit this if performance becomes a problem
for _, sch := range schemas {
// A list of objects might be very large, so results are chunked with a continue token;
// we loop for as long as we have a continue token or until an error occurs
continueToken := ""
for {
objectList := &unstructured.UnstructuredList{}
migrationErr := retry.OnError(retry.DefaultBackoff, canRetry, func() error {
var err error
objectList, err = d.client.Resource(sch).Namespace(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
Limit: 500, // request chunked lists; without a limit the server never returns a continue token
Continue: continueToken,
})
if err != nil {
return err
}
return nil
})

// If the continue token expired, fetch a fresh one and retry the list
if migrationErr != nil && errors.IsResourceExpired(migrationErr) {
token, err := inconsistentContinueToken(migrationErr)
if err != nil {
err = fmt.Errorf("failed to get continue token: %w", err)
results.Items = append(results.Items, MigrationResult{
Error: err,
GroupVersionResource: sch,
Timestamp: time.Now()})
break
}
continueToken = token
continue
}

if migrationErr != nil {
errorOccurred = true
Member:
I'm wondering if, at this point, we shouldn't just call off the whole thing. We care about "all or nothing".
Do you foresee another component that would handle failures in results?

Contributor Author:
The failure state is something we need to think about in terms of our resources and the customers. Typically, if migration fails it means there's a compatibility error with the resource versions; we should catch that in our testing for our CRs, but if the user has applied other CRDs that fail migration, they should be notified to fix them manually. I was thinking of this function distinguishing a migrator failure (i.e. can't reach the server, can't list resources) from a resource migration error, which would be recoverable.

Contributor:
Data errors are recoverable, and it may make sense to collect all of them unless it makes the logic significantly more complicated.
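
A minimal sketch of the split described in this thread, separating fatal migrator failures from recoverable per-resource data errors; the type names here are hypothetical, not part of this PR:

```go
// MigratorError marks an infrastructure failure (e.g. the API server is
// unreachable or resources cannot be listed); callers should abort the run.
type MigratorError struct{ Err error }

func (e MigratorError) Error() string { return "migrator failure: " + e.Err.Error() }

// ResourceError marks a per-resource data failure (e.g. an incompatible CRD
// version); callers can collect these and ask the user to fix them manually.
type ResourceError struct {
	GVR schema.GroupVersionResource
	Err error
}

func (e ResourceError) Error() string {
	return fmt.Sprintf("resource %s failed migration: %v", e.GVR, e.Err)
}
```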

migrationErr = fmt.Errorf("could not list resources: %w", migrationErr)
results.Items = append(results.Items, MigrationResult{
Error: migrationErr,
GroupVersionResource: sch,
Timestamp: time.Now()})
break
}

status := MigrationSuccess
var itemErr error
for _, object := range objectList.Items {
ref := object
// Record per-item failures; declaring a new migrationErr with ":=" inside
// this loop would shadow the outer variable, so the result appended below
// would always carry a nil error.
if err := d.migrateOneItem(ctx, sch, &ref); err != nil {
itemErr = err
errorOccurred = true
status = MigrationFailure
}
}

results.Items = append(results.Items, MigrationResult{
Error: itemErr,
Status: status,
GroupVersionResource: sch,
Timestamp: time.Now()})

// Check if the list contains a continue token
token, err := metadataAccessor.Continue(objectList)
if err != nil {
err = fmt.Errorf("failed to get continue token: %w", err)
results.Items = append(results.Items, MigrationResult{
Error: err,
GroupVersionResource: sch,
Timestamp: time.Now()})
break
}
if len(token) == 0 {
break
}
continueToken = token
}
}
if errorOccurred {
results.Status = MigrationFailure
}
klog.InfoS("schema migration finished", "duration", time.Since(start).String())
return results, nil
}

// findMigratableResources finds all the resources that potentially need
// migration. Although every migratable resource is accessible via multiple
// versions, the returned list only includes one version per resource.
//
// It builds the list in these steps:
// 1. build a map from resource name to its groupVersions, excluding subresources, custom resources, and aggregated resources.
// 2. exclude all resources that are only available from one groupVersion.
// 3. exclude resources that do not support "list" and "update" (and are thus not migratable).
//
// More information can be found here:
// https://github.com/kubernetes-sigs/kube-storage-version-migrator/blob/acdee30ced218b79e39c6a701985e8cd8bd33824/pkg/initializer/discover.go#L55-L125
func (d *migrator) findMigratableResources(ctx context.Context) ([]schema.GroupVersionResource, error) {
aggregatedGroups, err := d.findAggregatedGroups(ctx)
if err != nil {
return nil, err
}
ret := []schema.GroupVersionResource{}
resourceLists, err := d.discoveryClient.ServerPreferredResources()
if err != nil {
return nil, err
}
for _, resourceList := range resourceLists {
gv, err := schema.ParseGroupVersion(resourceList.GroupVersion)
if err != nil {
klog.ErrorS(err, "cannot parse group version, ignored", "version", resourceList.GroupVersion)
continue
}

if aggregatedGroups.Has(gv.Group) {
klog.InfoS("ignored because it's an aggregated group", "group", gv.Group)
continue
}
for _, r := range resourceList.APIResources {
// ignore subresources
if strings.Contains(r.Name, "/") {
klog.InfoS("ignored subresource", "group", gv.Group, "name", r.Name, "version", gv.Version)
continue
}
// ignore excluded resources
if excludeResources.Has(r.Name) {
klog.InfoS("ignored excluded resource", "group", gv.Group, "name", r.Name, "version", gv.Version)
continue
}
// ignore resources that cannot be listed and updated
if !sets.NewString(r.Verbs...).HasAll("list", "update") {
klog.InfoS("ignored because verb does not contain list or update", "group", gv.Group, "name", r.Name, "version", gv.Version)
continue
}
ret = append(ret, gv.WithResource(r.Name))
}
}

return ret, nil
}

func (m *migrator) migrateOneItem(ctx context.Context, resource schema.GroupVersionResource, item *unstructured.Unstructured) error {
namespace, err := metadataAccessor.Namespace(item)
if err != nil {
return err
}
name, err := metadataAccessor.Name(item)
if err != nil {
return err
}

for {
err = m.try(ctx, resource, namespace, item)
if err == nil || errors.IsNotFound(err) {
klog.InfoS("successfully migrated object", "name", name, "namespace", namespace, "resource", resource.String())
return nil
}
if canRetry(err) {
seconds, delay := errors.SuggestsClientDelay(err)
klog.ErrorS(err, "migration of an object will be retried", "name", name, "namespace", namespace, "delay", seconds)
if delay {
time.Sleep(time.Duration(seconds) * time.Second)
}
continue
}
// error is not retriable
return fmt.Errorf("cannot retry: %w", err)
}
}

func (m *migrator) try(ctx context.Context, resource schema.GroupVersionResource, namespace string, item *unstructured.Unstructured) error {
_, err := m.client.
Resource(resource).
Namespace(namespace).
Update(ctx, item, metav1.UpdateOptions{})
return err
}

func (d *migrator) findAggregatedGroups(ctx context.Context) (sets.Set[string], error) {
ret := sets.New[string]()
l, err := d.apiserviceClient.List(ctx, metav1.ListOptions{})
if err != nil {
return ret, err
}
for _, apiservice := range l.Items {
if apiservice.Spec.Service != nil {
ret.Insert(apiservice.Spec.Group)
}
}
return ret, nil
}
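
Note that canRetry and inconsistentContinueToken are referenced above but are not part of this diff. A minimal sketch of what canRetry might look like, assuming it builds on the TemporaryError interface from types.go and the standard apimachinery error helpers (the actual implementation may differ):

```go
import (
	goerrors "errors"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// canRetry reports whether an error is worth retrying: errors that declare
// themselves temporary, plus the usual transient API server conditions.
func canRetry(err error) bool {
	var temp TemporaryError
	if goerrors.As(err, &temp) {
		return temp.Temporary()
	}
	return apierrors.IsTooManyRequests(err) ||
		apierrors.IsServerTimeout(err) ||
		apierrors.IsTimeout(err)
}
```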
63 changes: 63 additions & 0 deletions pkg/admin/migration/types.go
@@ -0,0 +1,63 @@
package migration

import (
"encoding/json"
"fmt"
"os"
"time"

"k8s.io/apimachinery/pkg/runtime/schema"
)

type MigrationStatus string

const (
MigrationSuccess MigrationStatus = "success"
MigrationFailure MigrationStatus = "failure"
MigrationRunning MigrationStatus = "running"
)

// MigrationResult is a container for an individual migration attempt
type MigrationResult struct {
Error error `json:"Error,omitempty"`
Timestamp time.Time
Status MigrationStatus
schema.GroupVersionResource
}

type MigrationResultList struct {
Member:
Not sure if we need that. Is there a way to log that some resource was migrated from version A to B? We could log that in "real time", rather than packing all results into a collection and then iterating over it to log the items.

Contributor Author:
Sure thing, we can do that. The primary reason it's here is just to give us raw access to write the information in any format we want if we need to store the results somewhere easily.

Contributor:
Is the pre-run phase logging to the standard MicroShift log, or do we see these errors in the greenboot health check log? It would be nice if the greenboot log could at least report "there was a data migration error, check the MicroShift logs for details".

Contributor Author:
It's currently just using klog; I'm not sure how we send logs to greenboot, but I love the idea. We should make it do that.

Contributor:
The klog output is going to go to stdout/stderr. I guess that's going to the systemd unit where pre-run is running, rather than a greenboot-specific log. Can we put these messages (and any others) somewhere for our health-check script to collect and report?
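
One possible approach using code already in this PR: have pre-run persist the summary via WriteStatusFile (defined below) to a well-known path that the greenboot health-check script reads. The path here is an assumption for illustration:

```go
// Hypothetical pre-run wiring; the path is illustrative, not defined by this PR.
const migrationStatusPath = "/var/lib/microshift/migration-status.json"

if err := results.WriteStatusFile(migrationStatusPath); err != nil {
	klog.ErrorS(err, "failed to write migration status file", "path", migrationStatusPath)
}
```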


Status MigrationStatus
Items []MigrationResult
}

func (m *MigrationResultList) WriteStatusFile(filePath string) error {
data := fmt.Sprintf(`{"Status": "%s"}`, m.Status)
return os.WriteFile(filePath, []byte(data), 0600)
}

func (m *MigrationResultList) WriteDataFile(filePath string) error {
fileData, err := json.Marshal(m.Items)
if err != nil {
return err
}
return os.WriteFile(filePath, fileData, 0600)
}

type ErrRetriable struct {
error
}

func (ErrRetriable) Temporary() bool { return true }

type ErrNotRetriable struct {
error
}

func (ErrNotRetriable) Temporary() bool { return false }

// TemporaryError is a wrapper interface that is used to determine if an error can be retried.
type TemporaryError interface {
error
// Temporary should return true if this is a temporary error
Temporary() bool
}
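
For reference, a minimal end-to-end sketch of how a caller might wire these pieces together; the import path, kubeconfig location, and output paths are assumptions for illustration:

```go
package main

import (
	"context"
	"log"

	"github.com/openshift/microshift/pkg/admin/migration" // import path assumed
)

func main() {
	m, err := migration.NewMigrator("/var/lib/microshift/kubeconfig") // path assumed
	if err != nil {
		log.Fatalf("failed to create migrator: %v", err)
	}
	results, err := m.Start(context.Background())
	if err != nil {
		log.Fatalf("migration run failed: %v", err)
	}
	// Persist the summary and per-resource details for later inspection.
	if err := results.WriteStatusFile("/tmp/migration-status.json"); err != nil {
		log.Printf("writing status file: %v", err)
	}
	if err := results.WriteDataFile("/tmp/migration-results.json"); err != nil {
		log.Printf("writing data file: %v", err)
	}
}
```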