Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 87 additions & 15 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@
version = "0.0.1"

[[constraint]]
branch = "master"
name = "k8s.io/api"
version = "kubernetes-1.8.2"

[[constraint]]
name = "k8s.io/apimachinery"
version = "kubernetes-1.8.2"

[[constraint]]
name = "k8s.io/client-go"
version = "6.0.0"
version = "kubernetes-1.8.2"
96 changes: 96 additions & 0 deletions pkg/k8sclient/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package k8sclient

import (
"fmt"
"net"
"os"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

var (
restMapper *discovery.DeferredDiscoveryRESTMapper
clientPool dynamic.ClientPool
)

// init initializes the restMapper and clientPool needed to create a resource client dynamically
func init() {
kubeClient, kubeConfig := mustNewKubeClientAndConfig()
cachedDiscoveryClient := cached.NewMemCacheClient(kubeClient.Discovery())
restMapper = discovery.NewDeferredDiscoveryRESTMapper(cachedDiscoveryClient, meta.InterfacesForUnstructured)
restMapper.Reset()
kubeConfig.ContentConfig = dynamic.ContentConfig()
clientPool = dynamic.NewClientPool(kubeConfig, restMapper, dynamic.LegacyAPIPathResolverFunc)
}

// GetResourceClient returns the dynamic client and pluralName for the resource specified by the apiVersion and kind
func GetResourceClient(apiVersion, kind, namespace string) (dynamic.ResourceInterface, string, error) {
gv, err := schema.ParseGroupVersion(apiVersion)
if err != nil {
return nil, "", fmt.Errorf("failed to parse apiVersion: %v", err)
}
gvk := schema.GroupVersionKind{
Group: gv.Group,
Version: gv.Version,
Kind: kind,
}

client, err := clientPool.ClientForGroupVersionKind(gvk)
if err != nil {
return nil, "", fmt.Errorf("failed to get client for GroupVersionKind(%s): %v", gvk.String(), err)
}
resource, err := apiResource(gvk, restMapper)
if err != nil {
return nil, "", fmt.Errorf("failed to get resource type: %v", err)
}
pluralName := resource.Name
resourceClient := client.Resource(resource, namespace)
return resourceClient, pluralName, nil
}

// apiResource consults the REST mapper to translate an <apiVersion, kind, namespace> tuple to a metav1.APIResource struct.
func apiResource(gvk schema.GroupVersionKind, restMapper *discovery.DeferredDiscoveryRESTMapper) (*metav1.APIResource, error) {
mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, fmt.Errorf("failed to get the resource REST mapping for GroupVersionKind(%s): %v", gvk.String(), err)
}
resource := &metav1.APIResource{
Name: mapping.Resource,
Namespaced: mapping.Scope == meta.RESTScopeNamespace,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just curious when will an APIResource not be in a Namespace?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it's not common but if you watch cluster-wide resources like StorageClass or ClusterRole then those are non-namespaced.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Thanks for the clarification!

Kind: gvk.Kind,
}
return resource, nil
}

// mustNewKubeClientAndConfig returns the in-cluster config and kubernetes client
func mustNewKubeClientAndConfig() (kubernetes.Interface, *rest.Config) {
cfg, err := inClusterConfig()
if err != nil {
panic(err)
}
return kubernetes.NewForConfigOrDie(cfg), cfg
}

// inClusterConfig returns the in-cluster config accessible inside a pod
func inClusterConfig() (*rest.Config, error) {
// Work around https://github.com/kubernetes/kubernetes/issues/40973
// See https://github.com/coreos/etcd-operator/issues/731#issuecomment-283804819
if len(os.Getenv("KUBERNETES_SERVICE_HOST")) == 0 {
addrs, err := net.LookupHost("kubernetes.default.svc")
if err != nil {
panic(err)
}
os.Setenv("KUBERNETES_SERVICE_HOST", addrs[0])
}
if len(os.Getenv("KUBERNETES_SERVICE_PORT")) == 0 {
os.Setenv("KUBERNETES_SERVICE_PORT", "443")
}
return rest.InClusterConfig()
}
28 changes: 13 additions & 15 deletions pkg/sdk/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@ package sdk
import (
"context"

"github.com/coreos/operator-sdk/pkg/k8sclient"
sdkHandler "github.com/coreos/operator-sdk/pkg/sdk/handler"
sdkInformer "github.com/coreos/operator-sdk/pkg/sdk/informer"

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
)

var (
Expand All @@ -17,13 +14,18 @@ var (
)

// Watch watches for changes on the given resource.
// obj is an instance of the resource type, e.g. &Pod{}.
// resourcePluralName is the plural name of the resource, e.g. “pods”.
// resourceClient is the rest client for the resource, e.g. `kubeclient.CoreV1().RESTClient()`.
// opts provide more options for doing the watch.
// apiVersion for a resource is of the format "Group/Version" except for the "Core" group whose APIVersion is just "v1". For e.g:
// - Deployments have Group "apps" and Version "v1beta2" giving the APIVersion "apps/v1beta2"
// - Pods have Group "Core" and Version "v1" giving the APIVersion "v1"
// - The custom resource Memcached might have Group "cache.example.com" and Version "v1alpha1" giving the APIVersion "cache.example.com/v1alpha1"
// kind is the Kind of the resource, e.g "Pod" for pods
// Consult the API reference for the Group, Version and Kind of a resource: https://kubernetes.io/docs/reference/
// namespace is the Namespace to watch for the resource
// TODO: support opts for specifying label selector
func Watch(resourcePluralName, namespace string, obj runtime.Object, resourceClient rest.Interface) {
informer := sdkInformer.New(resourcePluralName, namespace, obj, resourceClient)
func Watch(apiVersion, kind, namespace string) {
// TODO: Error handling for watch failure
resourceClient, resourcePluralName, _ := k8sclient.GetResourceClient(apiVersion, kind, namespace)
informer := sdkInformer.New(resourcePluralName, namespace, resourceClient)
informers = append(informers, informer)
}

Expand All @@ -36,11 +38,7 @@ func Handle(handler sdkHandler.Handler) {
// Run starts the process of Watching resources, handling Events, and processing Actions
func Run(ctx context.Context) {
for _, informer := range informers {
err := informer.Run(ctx)
if err != nil {
logrus.Errorf("failed to run informer: %v", err)
return
}
go informer.Run(ctx)
}
<-ctx.Done()
}
Loading