From f250f7e581dfaffffa50b1575a3cb258a2b97856 Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Thu, 24 Nov 2022 13:18:23 +0100 Subject: [PATCH 1/8] Wait until APIExport virtual workspace URLs are ready --- main.go | 81 ++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 49 insertions(+), 32 deletions(-) diff --git a/main.go b/main.go index 2144da6..d58a544 100644 --- a/main.go +++ b/main.go @@ -22,30 +22,31 @@ import ( "fmt" "os" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/discovery" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/klog/v2" - "sigs.k8s.io/controller-runtime/pkg/client" + "k8s.io/utils/strings/slices" - // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) - // to ensure that exec-entrypoint and run can make use of them. _ "k8s.io/client-go/plugin/pkg/client/auth" ctrl "sigs.k8s.io/controller-runtime" + // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) + // to ensure that exec-entrypoint and run can make use of them. + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/kcp" "sigs.k8s.io/controller-runtime/pkg/log/zap" - datav1alpha1 "github.com/kcp-dev/controller-runtime-example/api/v1alpha1" + apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1" + "github.com/kcp-dev/kcp/pkg/apis/third_party/conditions/util/conditions" // +kubebuilder:scaffold:imports + datav1alpha1 "github.com/kcp-dev/controller-runtime-example/api/v1alpha1" "github.com/kcp-dev/controller-runtime-example/controllers" - - apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1" ) var ( @@ -168,41 +169,57 @@ func restConfigForAPIExport(ctx context.Context, cfg *rest.Config, apiExportName return nil, fmt.Errorf("error adding apis.kcp.dev/v1alpha1 to scheme: %w", err) } - apiExportClient, err := client.New(cfg, client.Options{Scheme: scheme}) + apiExportClient, err := client.NewWithWatch(cfg, client.Options{Scheme: scheme}) if err != nil { return nil, fmt.Errorf("error creating APIExport client: %w", err) } - var apiExport apisv1alpha1.APIExport - + var opts []client.ListOption if apiExportName != "" { - if err := apiExportClient.Get(ctx, types.NamespacedName{Name: apiExportName}, &apiExport); err != nil { - return nil, fmt.Errorf("error getting APIExport %q: %w", apiExportName, err) - } - } else { - setupLog.Info("api-export-name is empty - listing") - exports := &apisv1alpha1.APIExportList{} - if err := apiExportClient.List(ctx, exports); err != nil { - return nil, fmt.Errorf("error listing APIExports: %w", err) - } - if len(exports.Items) == 0 { - return nil, fmt.Errorf("no APIExport found") - } - if len(exports.Items) > 1 { - return nil, fmt.Errorf("more than one APIExport found") - } - apiExport = exports.Items[0] + opts = append(opts, client.MatchingFieldsSelector{ + Selector: fields.OneTermEqualSelector("metadata.name", apiExportName), + }) } - if len(apiExport.Status.VirtualWorkspaces) < 1 { - return nil, fmt.Errorf("APIExport %q status.virtualWorkspaces is empty", apiExportName) + watch, err := apiExportClient.Watch(ctx, &apisv1alpha1.APIExportList{}, opts...) + if err != nil { + return nil, fmt.Errorf("error watching for APIExport: %w", err) } - cfg = rest.CopyConfig(cfg) - // TODO(ncdc): sharding support - cfg.Host = apiExport.Status.VirtualWorkspaces[0].URL + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case e := <-watch.ResultChan(): + apiExport, ok := e.Object.(*apisv1alpha1.APIExport) + if !ok { + continue + } + + if !slices.Contains(apiExport.Spec.LatestResourceSchemas, "today.widgets.data.my.domain") { + // This is not this controller APIExport + continue + } + + setupLog.Info("APIExport event received", "name", apiExport.Name, "event", e.Type) + + if !conditions.IsTrue(apiExport, apisv1alpha1.APIExportVirtualWorkspaceURLsReady) { + setupLog.Info("APIExport virtual workspace URLs are not ready", "APIExport", apiExport.Name) + continue + } - return cfg, nil + if len(apiExport.Status.VirtualWorkspaces) == 0 { + setupLog.Info("APIExport does not have any virtual workspace URLs", "APIExport", apiExport.Name) + continue + } + + setupLog.Info("Using APIExport to configure client", "APIExport", apiExport.Name) + cfg = rest.CopyConfig(cfg) + // TODO(ncdc): sharding support + cfg.Host = apiExport.Status.VirtualWorkspaces[0].URL + return cfg, nil + } + } } func kcpAPIsGroupPresent(restConfig *rest.Config) bool { From 48e00a73def1b467c624b75e4e8cd4ebef4111d4 Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Wed, 30 Nov 2022 14:24:57 +0100 Subject: [PATCH 2/8] Only match resource schemas when APIExport name is not provided --- main.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/main.go b/main.go index d58a544..6070e37 100644 --- a/main.go +++ b/main.go @@ -21,6 +21,7 @@ import ( "flag" "fmt" "os" + "strings" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" @@ -29,12 +30,12 @@ import ( clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/klog/v2" - "k8s.io/utils/strings/slices" - _ "k8s.io/client-go/plugin/pkg/client/auth" - ctrl "sigs.k8s.io/controller-runtime" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. + _ "k8s.io/client-go/plugin/pkg/client/auth" + + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/kcp" @@ -196,13 +197,14 @@ func restConfigForAPIExport(ctx context.Context, cfg *rest.Config, apiExportName continue } - if !slices.Contains(apiExport.Spec.LatestResourceSchemas, "today.widgets.data.my.domain") { + setupLog.Info("APIExport event received", "name", apiExport.Name, "event", e.Type) + + if resources := apiExport.Spec.LatestResourceSchemas; apiExportName == "" && + (len(resources) == 0 || !strings.HasSuffix(resources[0], datav1alpha1.GroupVersion.Group)) { // This is not this controller APIExport continue } - setupLog.Info("APIExport event received", "name", apiExport.Name, "event", e.Type) - if !conditions.IsTrue(apiExport, apisv1alpha1.APIExportVirtualWorkspaceURLsReady) { setupLog.Info("APIExport virtual workspace URLs are not ready", "APIExport", apiExport.Name) continue From 7973a1339c8620882a99ee10b6c0d46b1f5df457 Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Wed, 30 Nov 2022 14:31:07 +0100 Subject: [PATCH 3/8] Retry APIExports watch when channel got closed on idle --- main.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index 6070e37..ca930fa 100644 --- a/main.go +++ b/main.go @@ -191,7 +191,16 @@ func restConfigForAPIExport(ctx context.Context, cfg *rest.Config, apiExportName select { case <-ctx.Done(): return nil, ctx.Err() - case e := <-watch.ResultChan(): + case e, ok := <-watch.ResultChan(): + if !ok { + // The channel has been closed. Let's retry watching in case it timed out on idle, + // or fail in case connection to the server cannot be re-established. + watch, err = apiExportClient.Watch(ctx, &apisv1alpha1.APIExportList{}, opts...) + if err != nil { + return nil, fmt.Errorf("error watching for APIExport: %w", err) + } + } + apiExport, ok := e.Object.(*apisv1alpha1.APIExport) if !ok { continue From f5d99541c66991c961b9a7af00032d40f3a559fc Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Wed, 30 Nov 2022 14:31:48 +0100 Subject: [PATCH 4/8] Close watch channel when context is done --- main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/main.go b/main.go index ca930fa..97085ff 100644 --- a/main.go +++ b/main.go @@ -190,6 +190,7 @@ func restConfigForAPIExport(ctx context.Context, cfg *rest.Config, apiExportName for { select { case <-ctx.Done(): + watch.Stop() return nil, ctx.Err() case e, ok := <-watch.ResultChan(): if !ok { From 5892587a7f03806dcf16826f88174be52aa857a1 Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Wed, 30 Nov 2022 14:34:11 +0100 Subject: [PATCH 5/8] Document blocking behavior of restConfigForAPIExport --- main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index 97085ff..2c37b54 100644 --- a/main.go +++ b/main.go @@ -163,7 +163,8 @@ func main() { // +kubebuilder:rbac:groups="apis.kcp.dev",resources=apiexports,verbs=get;list;watch // restConfigForAPIExport returns a *rest.Config properly configured to communicate with the endpoint for the -// APIExport's virtual workspace. +// APIExport's virtual workspace. It blocks until the controller APIExport VirtualWorkspaceURLsReady condition +// becomes truthy, which happens when the APIExport is bound for the first time. func restConfigForAPIExport(ctx context.Context, cfg *rest.Config, apiExportName string) (*rest.Config, error) { scheme := runtime.NewScheme() if err := apisv1alpha1.AddToScheme(scheme); err != nil { From 754a2c11b934fc9cec829790a3fd10c3222c7a41 Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Thu, 1 Dec 2022 10:10:59 +0100 Subject: [PATCH 6/8] Default APIExport name to avoid watching for multiple APIExports --- main.go | 21 +++++---------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/main.go b/main.go index 2c37b54..75e660e 100644 --- a/main.go +++ b/main.go @@ -21,7 +21,6 @@ import ( "flag" "fmt" "os" - "strings" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" @@ -69,7 +68,7 @@ func main() { var enableLeaderElection bool var probeAddr string var apiExportName string - flag.StringVar(&apiExportName, "api-export-name", "", "The name of the APIExport.") + flag.StringVar(&apiExportName, "api-export-name", "data.my.domain", "The name of the APIExport.") flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") flag.BoolVar(&enableLeaderElection, "leader-elect", false, @@ -176,14 +175,10 @@ func restConfigForAPIExport(ctx context.Context, cfg *rest.Config, apiExportName return nil, fmt.Errorf("error creating APIExport client: %w", err) } - var opts []client.ListOption - if apiExportName != "" { - opts = append(opts, client.MatchingFieldsSelector{ - Selector: fields.OneTermEqualSelector("metadata.name", apiExportName), - }) + selector := client.MatchingFieldsSelector{ + Selector: fields.OneTermEqualSelector("metadata.name", apiExportName), } - - watch, err := apiExportClient.Watch(ctx, &apisv1alpha1.APIExportList{}, opts...) + watch, err := apiExportClient.Watch(ctx, &apisv1alpha1.APIExportList{}, selector) if err != nil { return nil, fmt.Errorf("error watching for APIExport: %w", err) } @@ -197,7 +192,7 @@ func restConfigForAPIExport(ctx context.Context, cfg *rest.Config, apiExportName if !ok { // The channel has been closed. Let's retry watching in case it timed out on idle, // or fail in case connection to the server cannot be re-established. - watch, err = apiExportClient.Watch(ctx, &apisv1alpha1.APIExportList{}, opts...) + watch, err = apiExportClient.Watch(ctx, &apisv1alpha1.APIExportList{}, selector) if err != nil { return nil, fmt.Errorf("error watching for APIExport: %w", err) } @@ -210,12 +205,6 @@ func restConfigForAPIExport(ctx context.Context, cfg *rest.Config, apiExportName setupLog.Info("APIExport event received", "name", apiExport.Name, "event", e.Type) - if resources := apiExport.Spec.LatestResourceSchemas; apiExportName == "" && - (len(resources) == 0 || !strings.HasSuffix(resources[0], datav1alpha1.GroupVersion.Group)) { - // This is not this controller APIExport - continue - } - if !conditions.IsTrue(apiExport, apisv1alpha1.APIExportVirtualWorkspaceURLsReady) { setupLog.Info("APIExport virtual workspace URLs are not ready", "APIExport", apiExport.Name) continue From 51a87fc6f04f81bdca74b99cff1b1ac345fa17d9 Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Thu, 1 Dec 2022 10:11:24 +0100 Subject: [PATCH 7/8] Re-use global schema for APIExport watch client --- main.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/main.go b/main.go index 75e660e..60ad6a6 100644 --- a/main.go +++ b/main.go @@ -165,11 +165,6 @@ func main() { // APIExport's virtual workspace. It blocks until the controller APIExport VirtualWorkspaceURLsReady condition // becomes truthy, which happens when the APIExport is bound for the first time. func restConfigForAPIExport(ctx context.Context, cfg *rest.Config, apiExportName string) (*rest.Config, error) { - scheme := runtime.NewScheme() - if err := apisv1alpha1.AddToScheme(scheme); err != nil { - return nil, fmt.Errorf("error adding apis.kcp.dev/v1alpha1 to scheme: %w", err) - } - apiExportClient, err := client.NewWithWatch(cfg, client.Options{Scheme: scheme}) if err != nil { return nil, fmt.Errorf("error creating APIExport client: %w", err) From 1899c144c3a857d823e31ea101ce41a1f014061b Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Fri, 2 Dec 2022 12:20:39 +0100 Subject: [PATCH 8/8] Use client-go retry watcher to watch for APIExport --- main.go | 98 ++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 63 insertions(+), 35 deletions(-) diff --git a/main.go b/main.go index 60ad6a6..c5a220a 100644 --- a/main.go +++ b/main.go @@ -22,12 +22,16 @@ import ( "fmt" "os" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/discovery" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" + retrywatch "k8s.io/client-go/tools/watch" "k8s.io/klog/v2" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) @@ -170,51 +174,49 @@ func restConfigForAPIExport(ctx context.Context, cfg *rest.Config, apiExportName return nil, fmt.Errorf("error creating APIExport client: %w", err) } - selector := client.MatchingFieldsSelector{ - Selector: fields.OneTermEqualSelector("metadata.name", apiExportName), - } - watch, err := apiExportClient.Watch(ctx, &apisv1alpha1.APIExportList{}, selector) + list := &apisv1alpha1.APIExportList{} + selector := fields.OneTermEqualSelector("metadata.name", apiExportName) + err = apiExportClient.List(ctx, list, client.MatchingFieldsSelector{Selector: selector}) if err != nil { return nil, fmt.Errorf("error watching for APIExport: %w", err) } + if len(list.Items) > 0 && isAPIExportReady(&list.Items[0]) { + cfg = rest.CopyConfig(cfg) + // TODO: sharding support + cfg.Host = list.Items[0].Status.VirtualWorkspaces[0].URL + return cfg, nil + } + + setupLog.Info("Watching for APIExport to become ready", "name", apiExportName) + + rw, err := retrywatch.NewRetryWatcher(list.ResourceVersion, watcher(apiExportClient.Watch).FilteredBy(selector)) + if err != nil { + return nil, fmt.Errorf("error creating retry watcher for APIExport: %w", err) + } + defer rw.Stop() for { select { case <-ctx.Done(): - watch.Stop() return nil, ctx.Err() - case e, ok := <-watch.ResultChan(): - if !ok { - // The channel has been closed. Let's retry watching in case it timed out on idle, - // or fail in case connection to the server cannot be re-established. - watch, err = apiExportClient.Watch(ctx, &apisv1alpha1.APIExportList{}, selector) - if err != nil { - return nil, fmt.Errorf("error watching for APIExport: %w", err) + case e := <-rw.ResultChan(): + switch e.Type { + case watch.Error: + return nil, fmt.Errorf("error watching for APIExport: %w", apierrors.FromObject(e.Object)) + + case watch.Added, watch.Modified: + apiExport, ok := e.Object.(*apisv1alpha1.APIExport) + if !ok { + return nil, fmt.Errorf("unexpected event object: %v", e.Object) } + if !isAPIExportReady(apiExport) { + continue + } + cfg = rest.CopyConfig(cfg) + // TODO: sharding support + cfg.Host = apiExport.Status.VirtualWorkspaces[0].URL + return cfg, nil } - - apiExport, ok := e.Object.(*apisv1alpha1.APIExport) - if !ok { - continue - } - - setupLog.Info("APIExport event received", "name", apiExport.Name, "event", e.Type) - - if !conditions.IsTrue(apiExport, apisv1alpha1.APIExportVirtualWorkspaceURLsReady) { - setupLog.Info("APIExport virtual workspace URLs are not ready", "APIExport", apiExport.Name) - continue - } - - if len(apiExport.Status.VirtualWorkspaces) == 0 { - setupLog.Info("APIExport does not have any virtual workspace URLs", "APIExport", apiExport.Name) - continue - } - - setupLog.Info("Using APIExport to configure client", "APIExport", apiExport.Name) - cfg = rest.CopyConfig(cfg) - // TODO(ncdc): sharding support - cfg.Host = apiExport.Status.VirtualWorkspaces[0].URL - return cfg, nil } } } @@ -242,3 +244,29 @@ func kcpAPIsGroupPresent(restConfig *rest.Config) bool { } return false } + +func isAPIExportReady(apiExport *apisv1alpha1.APIExport) bool { + if !conditions.IsTrue(apiExport, apisv1alpha1.APIExportVirtualWorkspaceURLsReady) { + setupLog.Info("APIExport virtual workspace URLs are not ready", "APIExport", apiExport.Name) + return false + } + + if len(apiExport.Status.VirtualWorkspaces) == 0 { + setupLog.Info("APIExport does not have any virtual workspace URLs", "APIExport", apiExport.Name) + return false + } + + return true +} + +type watcher func(ctx context.Context, obj client.ObjectList, opts ...client.ListOption) (watch.Interface, error) + +func (w watcher) Watch(options metav1.ListOptions) (watch.Interface, error) { + return w(context.TODO(), &apisv1alpha1.APIExportList{}, &client.ListOptions{Raw: &options}) +} + +func (w watcher) FilteredBy(selector fields.Selector) watcher { + return func(ctx context.Context, obj client.ObjectList, opts ...client.ListOption) (watch.Interface, error) { + return w(ctx, obj, append(opts, client.MatchingFieldsSelector{Selector: selector})...) + } +}