diff --git a/main.go b/main.go index 2144da6..c5a220a 100644 --- a/main.go +++ b/main.go @@ -22,30 +22,35 @@ import ( "fmt" "os" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/discovery" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" + retrywatch "k8s.io/client-go/tools/watch" "k8s.io/klog/v2" - "sigs.k8s.io/controller-runtime/pkg/client" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. _ "k8s.io/client-go/plugin/pkg/client/auth" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/kcp" "sigs.k8s.io/controller-runtime/pkg/log/zap" - datav1alpha1 "github.com/kcp-dev/controller-runtime-example/api/v1alpha1" + apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1" + "github.com/kcp-dev/kcp/pkg/apis/third_party/conditions/util/conditions" // +kubebuilder:scaffold:imports + datav1alpha1 "github.com/kcp-dev/controller-runtime-example/api/v1alpha1" "github.com/kcp-dev/controller-runtime-example/controllers" - - apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1" ) var ( @@ -67,7 +72,7 @@ func main() { var enableLeaderElection bool var probeAddr string var apiExportName string - flag.StringVar(&apiExportName, "api-export-name", "", "The name of the APIExport.") + flag.StringVar(&apiExportName, "api-export-name", "data.my.domain", "The name of the APIExport.") flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") flag.BoolVar(&enableLeaderElection, "leader-elect", false, @@ -161,48 +166,59 @@ func main() { // +kubebuilder:rbac:groups="apis.kcp.dev",resources=apiexports,verbs=get;list;watch // restConfigForAPIExport returns a *rest.Config properly configured to communicate with the endpoint for the -// APIExport's virtual workspace. +// APIExport's virtual workspace. It blocks until the controller APIExport VirtualWorkspaceURLsReady condition +// becomes truthy, which happens when the APIExport is bound for the first time. func restConfigForAPIExport(ctx context.Context, cfg *rest.Config, apiExportName string) (*rest.Config, error) { - scheme := runtime.NewScheme() - if err := apisv1alpha1.AddToScheme(scheme); err != nil { - return nil, fmt.Errorf("error adding apis.kcp.dev/v1alpha1 to scheme: %w", err) - } - - apiExportClient, err := client.New(cfg, client.Options{Scheme: scheme}) + apiExportClient, err := client.NewWithWatch(cfg, client.Options{Scheme: scheme}) if err != nil { return nil, fmt.Errorf("error creating APIExport client: %w", err) } - var apiExport apisv1alpha1.APIExport - - if apiExportName != "" { - if err := apiExportClient.Get(ctx, types.NamespacedName{Name: apiExportName}, &apiExport); err != nil { - return nil, fmt.Errorf("error getting APIExport %q: %w", apiExportName, err) - } - } else { - setupLog.Info("api-export-name is empty - listing") - exports := &apisv1alpha1.APIExportList{} - if err := apiExportClient.List(ctx, exports); err != nil { - return nil, fmt.Errorf("error listing APIExports: %w", err) - } - if len(exports.Items) == 0 { - return nil, fmt.Errorf("no APIExport found") - } - if len(exports.Items) > 1 { - return nil, fmt.Errorf("more than one APIExport found") - } - apiExport = exports.Items[0] + list := &apisv1alpha1.APIExportList{} + selector := fields.OneTermEqualSelector("metadata.name", apiExportName) + err = apiExportClient.List(ctx, list, client.MatchingFieldsSelector{Selector: selector}) + if err != nil { + return nil, fmt.Errorf("error watching for APIExport: %w", err) } - - if len(apiExport.Status.VirtualWorkspaces) < 1 { - return nil, fmt.Errorf("APIExport %q status.virtualWorkspaces is empty", apiExportName) + if len(list.Items) > 0 && isAPIExportReady(&list.Items[0]) { + cfg = rest.CopyConfig(cfg) + // TODO: sharding support + cfg.Host = list.Items[0].Status.VirtualWorkspaces[0].URL + return cfg, nil } - cfg = rest.CopyConfig(cfg) - // TODO(ncdc): sharding support - cfg.Host = apiExport.Status.VirtualWorkspaces[0].URL + setupLog.Info("Watching for APIExport to become ready", "name", apiExportName) - return cfg, nil + rw, err := retrywatch.NewRetryWatcher(list.ResourceVersion, watcher(apiExportClient.Watch).FilteredBy(selector)) + if err != nil { + return nil, fmt.Errorf("error creating retry watcher for APIExport: %w", err) + } + defer rw.Stop() + + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case e := <-rw.ResultChan(): + switch e.Type { + case watch.Error: + return nil, fmt.Errorf("error watching for APIExport: %w", apierrors.FromObject(e.Object)) + + case watch.Added, watch.Modified: + apiExport, ok := e.Object.(*apisv1alpha1.APIExport) + if !ok { + return nil, fmt.Errorf("unexpected event object: %v", e.Object) + } + if !isAPIExportReady(apiExport) { + continue + } + cfg = rest.CopyConfig(cfg) + // TODO: sharding support + cfg.Host = apiExport.Status.VirtualWorkspaces[0].URL + return cfg, nil + } + } + } } func kcpAPIsGroupPresent(restConfig *rest.Config) bool { @@ -228,3 +244,29 @@ func kcpAPIsGroupPresent(restConfig *rest.Config) bool { } return false } + +func isAPIExportReady(apiExport *apisv1alpha1.APIExport) bool { + if !conditions.IsTrue(apiExport, apisv1alpha1.APIExportVirtualWorkspaceURLsReady) { + setupLog.Info("APIExport virtual workspace URLs are not ready", "APIExport", apiExport.Name) + return false + } + + if len(apiExport.Status.VirtualWorkspaces) == 0 { + setupLog.Info("APIExport does not have any virtual workspace URLs", "APIExport", apiExport.Name) + return false + } + + return true +} + +type watcher func(ctx context.Context, obj client.ObjectList, opts ...client.ListOption) (watch.Interface, error) + +func (w watcher) Watch(options metav1.ListOptions) (watch.Interface, error) { + return w(context.TODO(), &apisv1alpha1.APIExportList{}, &client.ListOptions{Raw: &options}) +} + +func (w watcher) FilteredBy(selector fields.Selector) watcher { + return func(ctx context.Context, obj client.ObjectList, opts ...client.ListOption) (watch.Interface, error) { + return w(ctx, obj, append(opts, client.MatchingFieldsSelector{Selector: selector})...) + } +}