From b47f869e840b801f21be87836182dc5f703e3da6 Mon Sep 17 00:00:00 2001 From: hasbro17 Date: Mon, 19 Feb 2018 22:20:51 -0800 Subject: [PATCH 1/2] dep: add constraints --- Gopkg.lock | 102 +++++++++++++++++++++++++++++++++++++++++++++-------- Gopkg.toml | 8 +++-- 2 files changed, 93 insertions(+), 17 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 4ffba03485..a9d052f678 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1,18 +1,66 @@ # This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. +[[projects]] + name = "github.com/PuerkitoBio/purell" + packages = ["."] + revision = "0bcb03f4b4d0a9428594752bd2a3b9aa0a9d4bd4" + version = "v1.1.0" + +[[projects]] + branch = "master" + name = "github.com/PuerkitoBio/urlesc" + packages = ["."] + revision = "de5bf2ad457846296e2031421a34e2568e304e35" + [[projects]] name = "github.com/davecgh/go-spew" packages = ["spew"] revision = "346938d642f2ec3594ed81d874461961cd0faa76" version = "v1.1.0" +[[projects]] + name = "github.com/emicklei/go-restful" + packages = [".","log"] + revision = "26b41036311f2da8242db402557a0dbd09dc83da" + version = "v2.6.0" + +[[projects]] + name = "github.com/emicklei/go-restful-swagger12" + packages = ["."] + revision = "dcef7f55730566d41eae5db10e7d6981829720f6" + version = "1.0.1" + [[projects]] name = "github.com/ghodss/yaml" packages = ["."] revision = "0ca9ea5df5451ffdf184b4428c902747c2c11cd7" version = "v1.0.0" +[[projects]] + branch = "master" + name = "github.com/go-openapi/jsonpointer" + packages = ["."] + revision = "779f45308c19820f1a69e9a4cd965f496e0da10f" + +[[projects]] + branch = "master" + name = "github.com/go-openapi/jsonreference" + packages = ["."] + revision = "36d33bfe519efae5632669801b180bf1a245da3b" + +[[projects]] + branch = "master" + name = "github.com/go-openapi/spec" + packages = ["."] + revision = "1de3e0542de65ad8d75452a595886fdd0befb363" + +[[projects]] + branch = "master" + name = "github.com/go-openapi/swag" + packages = ["."] + revision = "84f4bee7c0a6db40e3166044c7983c1c32125429" + [[projects]] name = "github.com/gogo/protobuf" packages = ["proto","sortkeys"] @@ -25,6 +73,12 @@ packages = ["."] revision = "23def4e6c14b4da8ac2ed8007337bc5eb5007998" +[[projects]] + name = "github.com/golang/protobuf" + packages = ["proto","ptypes","ptypes/any","ptypes/duration","ptypes/timestamp"] + revision = "925541529c1fa6821df4e44ce2723319eb2be768" + version = "v1.0.0" + [[projects]] branch = "master" name = "github.com/google/btree" @@ -37,6 +91,12 @@ packages = ["."] revision = "24818f796faf91cd76ec7bddd72458fbced7a6c1" +[[projects]] + name = "github.com/googleapis/gnostic" + packages = ["OpenAPIv2","compiler","extensions"] + revision = "ee43cbb60db7bd22502942cccbc39059117352ab" + version = "v0.1.0" + [[projects]] branch = "master" name = "github.com/gregjones/httpcache" @@ -58,8 +118,8 @@ [[projects]] name = "github.com/json-iterator/go" packages = ["."] - revision = "28452fcdec4e44348d2af0d91d1e9e38da3a9e0a" - version = "1.0.5" + revision = "e7c7f3b33712573affdcc7a107218e7926b9a05b" + version = "1.0.6" [[projects]] name = "github.com/juju/ratelimit" @@ -67,6 +127,12 @@ revision = "59fac5042749a5afb9af70e813da1dd5474f0167" version = "1.0.1" +[[projects]] + branch = "master" + name = "github.com/mailru/easyjson" + packages = ["buffer","jlexer","jwriter"] + revision = "32fa128f234d041f196a9f3e0fea5ac9772c08e1" + [[projects]] branch = "master" name = "github.com/petar/GoLLRB" @@ -101,13 +167,13 @@ branch = "master" name = "golang.org/x/crypto" packages = ["ssh/terminal"] - revision = "650f4a345ab4e5b245a3034b110ebc7299e68186" + revision = "432090b8f568c018896cd8a0fb0345872bbac6ce" [[projects]] branch = "master" name = "golang.org/x/net" packages = ["context","http2","http2/hpack","idna","lex/httplex"] - revision = "dc948dff8834a7fe1ca525f8d04e261c2b56e70d" + revision = "cbe0f9307d0156177f9dd5dc85da1a31abc5f2fb" [[projects]] branch = "master" @@ -118,7 +184,7 @@ [[projects]] branch = "master" name = "golang.org/x/text" - packages = ["collate","collate/build","internal/colltab","internal/gen","internal/tag","internal/triegen","internal/ucd","language","secure/bidirule","transform","unicode/bidi","unicode/cldr","unicode/norm","unicode/rangetable"] + packages = ["collate","collate/build","internal/colltab","internal/gen","internal/tag","internal/triegen","internal/ucd","language","secure/bidirule","transform","unicode/bidi","unicode/cldr","unicode/norm","unicode/rangetable","width"] revision = "4e4a3210bb54bb31f6ab2cdca2edcc0b50c420c1" [[projects]] @@ -134,26 +200,32 @@ revision = "d670f9405373e636a5a2765eea47fac0c9bc91a4" [[projects]] - branch = "master" name = "k8s.io/api" - packages = ["core/v1"] - revision = "0d0b2f481328d8bae556061a08a02175054059f4" + packages = ["admissionregistration/v1alpha1","apps/v1beta1","apps/v1beta2","authentication/v1","authentication/v1beta1","authorization/v1","authorization/v1beta1","autoscaling/v1","autoscaling/v2beta1","batch/v1","batch/v1beta1","batch/v2alpha1","certificates/v1beta1","core/v1","extensions/v1beta1","networking/v1","policy/v1beta1","rbac/v1","rbac/v1alpha1","rbac/v1beta1","scheduling/v1alpha1","settings/v1alpha1","storage/v1","storage/v1beta1"] + revision = "4df58c811fe2e65feb879227b2b245e4dc26e7ad" + version = "kubernetes-1.8.2" [[projects]] - branch = "master" name = "k8s.io/apimachinery" - packages = ["pkg/api/errors","pkg/api/meta","pkg/api/resource","pkg/apis/meta/internalversion","pkg/apis/meta/v1","pkg/apis/meta/v1/unstructured","pkg/apis/meta/v1beta1","pkg/conversion","pkg/conversion/queryparams","pkg/fields","pkg/labels","pkg/runtime","pkg/runtime/schema","pkg/runtime/serializer","pkg/runtime/serializer/json","pkg/runtime/serializer/protobuf","pkg/runtime/serializer/recognizer","pkg/runtime/serializer/streaming","pkg/runtime/serializer/versioning","pkg/selection","pkg/types","pkg/util/cache","pkg/util/clock","pkg/util/diff","pkg/util/errors","pkg/util/framer","pkg/util/intstr","pkg/util/json","pkg/util/net","pkg/util/runtime","pkg/util/sets","pkg/util/validation","pkg/util/validation/field","pkg/util/wait","pkg/util/yaml","pkg/version","pkg/watch","third_party/forked/golang/reflect"] - revision = "da3b134bab57ebf23dedfec7e0004614dd4b5a24" + packages = ["pkg/api/equality","pkg/api/errors","pkg/api/meta","pkg/api/resource","pkg/apis/meta/internalversion","pkg/apis/meta/v1","pkg/apis/meta/v1/unstructured","pkg/apis/meta/v1alpha1","pkg/conversion","pkg/conversion/queryparams","pkg/conversion/unstructured","pkg/fields","pkg/labels","pkg/runtime","pkg/runtime/schema","pkg/runtime/serializer","pkg/runtime/serializer/json","pkg/runtime/serializer/protobuf","pkg/runtime/serializer/recognizer","pkg/runtime/serializer/streaming","pkg/runtime/serializer/versioning","pkg/selection","pkg/types","pkg/util/cache","pkg/util/clock","pkg/util/diff","pkg/util/errors","pkg/util/framer","pkg/util/intstr","pkg/util/json","pkg/util/net","pkg/util/runtime","pkg/util/sets","pkg/util/validation","pkg/util/validation/field","pkg/util/wait","pkg/util/yaml","pkg/version","pkg/watch","third_party/forked/golang/reflect"] + revision = "019ae5ada31de202164b118aee88ee2d14075c31" + version = "kubernetes-1.8.2" [[projects]] name = "k8s.io/client-go" - packages = ["pkg/version","rest","rest/watch","tools/cache","tools/clientcmd/api","tools/metrics","tools/pager","transport","util/buffer","util/cert","util/flowcontrol","util/integer","util/workqueue"] - revision = "78700dec6369ba22221b72770783300f143df150" - version = "v6.0.0" + packages = ["discovery","discovery/cached","dynamic","kubernetes","kubernetes/scheme","kubernetes/typed/admissionregistration/v1alpha1","kubernetes/typed/apps/v1beta1","kubernetes/typed/apps/v1beta2","kubernetes/typed/authentication/v1","kubernetes/typed/authentication/v1beta1","kubernetes/typed/authorization/v1","kubernetes/typed/authorization/v1beta1","kubernetes/typed/autoscaling/v1","kubernetes/typed/autoscaling/v2beta1","kubernetes/typed/batch/v1","kubernetes/typed/batch/v1beta1","kubernetes/typed/batch/v2alpha1","kubernetes/typed/certificates/v1beta1","kubernetes/typed/core/v1","kubernetes/typed/extensions/v1beta1","kubernetes/typed/networking/v1","kubernetes/typed/policy/v1beta1","kubernetes/typed/rbac/v1","kubernetes/typed/rbac/v1alpha1","kubernetes/typed/rbac/v1beta1","kubernetes/typed/scheduling/v1alpha1","kubernetes/typed/settings/v1alpha1","kubernetes/typed/storage/v1","kubernetes/typed/storage/v1beta1","pkg/version","rest","rest/watch","tools/cache","tools/clientcmd/api","tools/metrics","tools/pager","tools/reference","transport","util/cert","util/flowcontrol","util/integer","util/workqueue"] + revision = "35ccd4336052e7d73018b1382413534936f34eee" + version = "kubernetes-1.8.2" + +[[projects]] + branch = "master" + name = "k8s.io/kube-openapi" + packages = ["pkg/common"] + revision = "50ae88d24ede7b8bad68e23c805b5d3da5c8abaf" [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "ea8e5405c04eb9033cac9e3b2c12053ea5a964d910643aef3ca430ab915c2231" + inputs-digest = "d9b4bcb6e163b85ca0485cb41c0f19fa40f52838b597fec53cf6a8f3718d9c99" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 9317d28bf1..e17076df37 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -7,9 +7,13 @@ version = "0.0.1" [[constraint]] - branch = "master" + name = "k8s.io/api" + version = "kubernetes-1.8.2" + +[[constraint]] name = "k8s.io/apimachinery" + version = "kubernetes-1.8.2" [[constraint]] name = "k8s.io/client-go" - version = "6.0.0" + version = "kubernetes-1.8.2" From 326984e1d5a8931009253ef3bd4a448fdc1a3beb Mon Sep 17 00:00:00 2001 From: hasbro17 Date: Wed, 21 Feb 2018 10:49:03 -0800 Subject: [PATCH 2/2] *: create dynamic client for watching a resource --- pkg/k8sclient/client.go | 96 ++++++++++++++++++++++++++++++++++++ pkg/sdk/api.go | 28 +++++------ pkg/sdk/informer/informer.go | 32 +++++++----- 3 files changed, 130 insertions(+), 26 deletions(-) create mode 100644 pkg/k8sclient/client.go diff --git a/pkg/k8sclient/client.go b/pkg/k8sclient/client.go new file mode 100644 index 0000000000..0f97db340b --- /dev/null +++ b/pkg/k8sclient/client.go @@ -0,0 +1,96 @@ +package k8sclient + +import ( + "fmt" + "net" + "os" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/discovery" + "k8s.io/client-go/discovery/cached" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +var ( + restMapper *discovery.DeferredDiscoveryRESTMapper + clientPool dynamic.ClientPool +) + +// init initializes the restMapper and clientPool needed to create a resource client dynamically +func init() { + kubeClient, kubeConfig := mustNewKubeClientAndConfig() + cachedDiscoveryClient := cached.NewMemCacheClient(kubeClient.Discovery()) + restMapper = discovery.NewDeferredDiscoveryRESTMapper(cachedDiscoveryClient, meta.InterfacesForUnstructured) + restMapper.Reset() + kubeConfig.ContentConfig = dynamic.ContentConfig() + clientPool = dynamic.NewClientPool(kubeConfig, restMapper, dynamic.LegacyAPIPathResolverFunc) +} + +// GetResourceClient returns the dynamic client and pluralName for the resource specified by the apiVersion and kind +func GetResourceClient(apiVersion, kind, namespace string) (dynamic.ResourceInterface, string, error) { + gv, err := schema.ParseGroupVersion(apiVersion) + if err != nil { + return nil, "", fmt.Errorf("failed to parse apiVersion: %v", err) + } + gvk := schema.GroupVersionKind{ + Group: gv.Group, + Version: gv.Version, + Kind: kind, + } + + client, err := clientPool.ClientForGroupVersionKind(gvk) + if err != nil { + return nil, "", fmt.Errorf("failed to get client for GroupVersionKind(%s): %v", gvk.String(), err) + } + resource, err := apiResource(gvk, restMapper) + if err != nil { + return nil, "", fmt.Errorf("failed to get resource type: %v", err) + } + pluralName := resource.Name + resourceClient := client.Resource(resource, namespace) + return resourceClient, pluralName, nil +} + +// apiResource consults the REST mapper to translate an tuple to a metav1.APIResource struct. +func apiResource(gvk schema.GroupVersionKind, restMapper *discovery.DeferredDiscoveryRESTMapper) (*metav1.APIResource, error) { + mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version) + if err != nil { + return nil, fmt.Errorf("failed to get the resource REST mapping for GroupVersionKind(%s): %v", gvk.String(), err) + } + resource := &metav1.APIResource{ + Name: mapping.Resource, + Namespaced: mapping.Scope == meta.RESTScopeNamespace, + Kind: gvk.Kind, + } + return resource, nil +} + +// mustNewKubeClientAndConfig returns the in-cluster config and kubernetes client +func mustNewKubeClientAndConfig() (kubernetes.Interface, *rest.Config) { + cfg, err := inClusterConfig() + if err != nil { + panic(err) + } + return kubernetes.NewForConfigOrDie(cfg), cfg +} + +// inClusterConfig returns the in-cluster config accessible inside a pod +func inClusterConfig() (*rest.Config, error) { + // Work around https://github.com/kubernetes/kubernetes/issues/40973 + // See https://github.com/coreos/etcd-operator/issues/731#issuecomment-283804819 + if len(os.Getenv("KUBERNETES_SERVICE_HOST")) == 0 { + addrs, err := net.LookupHost("kubernetes.default.svc") + if err != nil { + panic(err) + } + os.Setenv("KUBERNETES_SERVICE_HOST", addrs[0]) + } + if len(os.Getenv("KUBERNETES_SERVICE_PORT")) == 0 { + os.Setenv("KUBERNETES_SERVICE_PORT", "443") + } + return rest.InClusterConfig() +} diff --git a/pkg/sdk/api.go b/pkg/sdk/api.go index 2743f54c77..e1f786651e 100644 --- a/pkg/sdk/api.go +++ b/pkg/sdk/api.go @@ -3,12 +3,9 @@ package sdk import ( "context" + "github.com/coreos/operator-sdk/pkg/k8sclient" sdkHandler "github.com/coreos/operator-sdk/pkg/sdk/handler" sdkInformer "github.com/coreos/operator-sdk/pkg/sdk/informer" - - "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/rest" ) var ( @@ -17,13 +14,18 @@ var ( ) // Watch watches for changes on the given resource. -// obj is an instance of the resource type, e.g. &Pod{}. -// resourcePluralName is the plural name of the resource, e.g. “pods”. -// resourceClient is the rest client for the resource, e.g. `kubeclient.CoreV1().RESTClient()`. -// opts provide more options for doing the watch. +// apiVersion for a resource is of the format "Group/Version" except for the "Core" group whose APIVersion is just "v1". For e.g: +// - Deployments have Group "apps" and Version "v1beta2" giving the APIVersion "apps/v1beta2" +// - Pods have Group "Core" and Version "v1" giving the APIVersion "v1" +// - The custom resource Memcached might have Group "cache.example.com" and Version "v1alpha1" giving the APIVersion "cache.example.com/v1alpha1" +// kind is the Kind of the resource, e.g "Pod" for pods +// Consult the API reference for the Group, Version and Kind of a resource: https://kubernetes.io/docs/reference/ +// namespace is the Namespace to watch for the resource // TODO: support opts for specifying label selector -func Watch(resourcePluralName, namespace string, obj runtime.Object, resourceClient rest.Interface) { - informer := sdkInformer.New(resourcePluralName, namespace, obj, resourceClient) +func Watch(apiVersion, kind, namespace string) { + // TODO: Error handling for watch failure + resourceClient, resourcePluralName, _ := k8sclient.GetResourceClient(apiVersion, kind, namespace) + informer := sdkInformer.New(resourcePluralName, namespace, resourceClient) informers = append(informers, informer) } @@ -36,11 +38,7 @@ func Handle(handler sdkHandler.Handler) { // Run starts the process of Watching resources, handling Events, and processing Actions func Run(ctx context.Context) { for _, informer := range informers { - err := informer.Run(ctx) - if err != nil { - logrus.Errorf("failed to run informer: %v", err) - return - } + go informer.Run(ctx) } <-ctx.Done() } diff --git a/pkg/sdk/informer/informer.go b/pkg/sdk/informer/informer.go index b73975d36a..fe97a99737 100644 --- a/pkg/sdk/informer/informer.go +++ b/pkg/sdk/informer/informer.go @@ -2,20 +2,21 @@ package informer import ( "context" - "errors" "time" "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/fields" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/rest" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ) type Informer interface { - Run(ctx context.Context) error + Run(ctx context.Context) } type informer struct { @@ -26,7 +27,7 @@ type informer struct { context context.Context } -func New(resourcePluralName, namespace string, objType runtime.Object, resourceClient rest.Interface) Informer { +func New(resourcePluralName, namespace string, resourceClient dynamic.ResourceInterface) Informer { i := &informer{ resourcePluralName: resourcePluralName, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), resourcePluralName), @@ -34,8 +35,7 @@ func New(resourcePluralName, namespace string, objType runtime.Object, resourceC } i.sharedIndexInformer = cache.NewSharedIndexInformer( - cache.NewListWatchFromClient(resourceClient, resourcePluralName, namespace, fields.Everything()), - objType, 0, cache.Indexers{}, + newListWatcherFromResourceClient(resourceClient), &unstructured.Unstructured{}, 0, cache.Indexers{}, ) i.sharedIndexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: i.handleAddResourceEvent, @@ -45,22 +45,32 @@ func New(resourcePluralName, namespace string, objType runtime.Object, resourceC return i } -func (i *informer) Run(ctx context.Context) error { +func newListWatcherFromResourceClient(resourceClient dynamic.ResourceInterface) *cache.ListWatch { + listFunc := func(options metav1.ListOptions) (runtime.Object, error) { + return resourceClient.List(options) + } + watchFunc := func(options metav1.ListOptions) (watch.Interface, error) { + return resourceClient.Watch(options) + } + return &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc} +} + +func (i *informer) Run(ctx context.Context) { i.context = ctx defer i.queue.ShutDown() - logrus.Info("starting %s controller", i.resourcePluralName) + logrus.Infof("starting %s controller", i.resourcePluralName) go i.sharedIndexInformer.Run(ctx.Done()) if !cache.WaitForCacheSync(ctx.Done(), i.sharedIndexInformer.HasSynced) { - return errors.New("Timed out waiting for caches to sync") + panic("Timed out waiting for caches to sync") } const numWorkers = 1 for n := 0; n < numWorkers; n++ { go wait.Until(i.runWorker, time.Second, ctx.Done()) } - return nil + logrus.Infof("stopping %s controller", i.resourcePluralName) } func (i *informer) handleAddResourceEvent(obj interface{}) {