Skip to content
5 changes: 1 addition & 4 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 21 additions & 41 deletions cmd/apiserver_receive_adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,12 @@ package main

import (
"flag"
"time"

"k8s.io/client-go/rest"

// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

"github.com/knative/eventing/pkg/reconciler"

"github.com/kelseyhightower/envconfig"
"github.com/knative/eventing/pkg/adapter/apiserver"
"github.com/knative/eventing/pkg/kncloudevents"
"github.com/knative/pkg/apis/duck"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
kncontroller "github.com/knative/pkg/controller"
"github.com/knative/pkg/signals"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -48,12 +39,16 @@ var (
)

type envConfig struct {
Namespace string `envconfig:"SYSTEM_NAMESPACE" default:"default"`
Mode string `envconfig:"MODE"`
SinkURI string `split_words:"true" required:"true"`
ApiVersion []string `split_words:"true" required:"true"`
Kind []string `required:"true"`
Controller []bool `required:"true"`
}

// TODO: the controller should take the list of GVR

func main() {
flag.Parse()

Expand All @@ -79,11 +74,6 @@ func main() {
logger = logger.With(zap.String("controller/apiserver", "adapter"))
logger.Info("Starting the controller")

numControllers := len(env.ApiVersion)
cfg.QPS = float32(numControllers) * rest.DefaultQPS
cfg.Burst = numControllers * rest.DefaultBurst
opt := reconciler.NewOptionsOrDie(cfg, logger, stopCh)

client, err := dynamic.NewForConfig(cfg)
if err != nil {
logger.Fatalw("Error building dynamic client", zap.Error(err))
Expand All @@ -94,43 +84,33 @@ func main() {
logger.Fatalw("Error building cloud event client", zap.Error(err))
}

controllers := []*kncontroller.Impl{}
gvrcs := []apiserver.GVRC(nil)

// Create one controller per resource.
for i, apiVersion := range env.ApiVersion {
kind := env.Kind[i]
controlled := env.Controller[i]

obj := &duckv1alpha1.AddressableType{}

factory := duck.TypedInformerFactory{
Client: client,
ResyncPeriod: time.Duration(10), // TODO
StopChannel: stopCh,
Type: obj,
}

gv, err := schema.ParseGroupVersion(apiVersion)
if err != nil {
logger.Fatalw("Error parsing APIVersion", zap.Error(err))
}
// TODO: pass down the resource and the kind so we do not have to guess.
gvr, _ := meta.UnsafeGuessKindToResource(schema.GroupVersionKind{Kind: kind, Group: gv.Group, Version: gv.Version})
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re, the comment above, is there a TODO here?

gvrcs = append(gvrcs, apiserver.GVRC{
GVR: gvr,
Controller: controlled,
})
}

gvk := schema.GroupVersionKind{Kind: kind, Group: gv.Group, Version: gv.Version}

// This is really bad.
gvr, _ := meta.UnsafeGuessKindToResource(gvk)

// Get and start the informer for gvr
logger.Infof("Starting informer for %v", gvk)
informer, lister, err := factory.Get(gvr)
if err != nil {
logger.Fatalw("Error starting informer", zap.Error(err))
}
controllers = append(controllers, apiserver.NewController(opt, informer, lister, eventsClient, controlled))
opt := apiserver.Options{
Namespace: env.Namespace,
Comment thread
vaikas marked this conversation as resolved.
Mode: env.Mode,
GVRCs: gvrcs,
}

// Start all of the controllers.
logger.Info("Starting controllers.")
go kncontroller.StartAll(stopCh, controllers...)
<-stopCh
a := apiserver.NewAdaptor(cfg.Host, client, eventsClient, logger, opt)
logger.Info("starting kubernetes api adapter")
if err := a.Start(stopCh); err != nil {
logger.Warn("start returned an error,", zap.Error(err))
}
}
5 changes: 4 additions & 1 deletion config/300-apiserversource.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ spec:
sink:
type: object
description: "A reference to the object that should receive events."
mode:
type: string
description: "Mode controls the content of the event payload. One of: 'Ref' (only references of resources), 'Resource' (full resource)."
resources:
items:
properties:
Expand All @@ -61,7 +64,7 @@ spec:
description: "Kind of the objects to watch."
controller:
type: boolean
description: "If true, watch the managing controller. More info: https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/"
description: "If true, emits the managing controller ref. Only supported for mode=Ref. More info: https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/"
type: array
required:
- resources
Expand Down
199 changes: 115 additions & 84 deletions pkg/adapter/apiserver/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,113 +17,144 @@ limitations under the License.
package apiserver

import (
"context"

"github.com/cloudevents/sdk-go/pkg/cloudevents"
eventsclient "github.com/cloudevents/sdk-go/pkg/cloudevents/client"
"github.com/cloudevents/sdk-go/pkg/cloudevents/types"
"github.com/knative/eventing/pkg/reconciler"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
"github.com/knative/pkg/controller"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"fmt"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"

cloudevents "github.com/cloudevents/sdk-go"
"go.uber.org/zap"
)

const (
// ReconcilerName is the name of the reconciler
ReconcilerName = "ApiServerSource"
type Adapter interface {
Start(stopCh <-chan struct{}) error
}

controllerAgentName = "apiserver-source-adapter-controller"
updateEventType = "dev.knative.apiserver.object.update"
deleteEventType = "dev.knative.apiserver.object.delete"
const (
// RefMode produces payloads of ObjectReference
RefMode = "Ref"
// ResourceMode produces payloads of ResourceEvent
ResourceMode = "Resource"
)

// NewController initializes the controller and is called by the generated code
// Registers event handlers to enqueue events
func NewController(
opt reconciler.Options,
informer cache.SharedInformer,
lister cache.GenericLister,
eventsclient eventsclient.Client,
controlled bool) *controller.Impl {

r := &Reconciler{
Base: reconciler.NewBase(opt, controllerAgentName),
lister: lister,
eventsClient: eventsclient,
}
impl := controller.NewImpl(r, r.Logger, ReconcilerName, reconciler.MustNewStatsReporter(ReconcilerName, r.Logger))

r.Logger.Info("Setting up event handlers")
// Options hold the options for the Adapter.
type Options struct {
Mode string
Namespace string
GVRCs []GVRC
}

if controlled {
informer.AddEventHandler(reconciler.Handler(impl.EnqueueControllerOf))
} else {
informer.AddEventHandler(reconciler.Handler(impl.Enqueue))
}
return impl
// GVRC is a pairing of GroupVersionResource and Controller flag.
type GVRC struct {
GVR schema.GroupVersionResource
Controller bool
}

// Reconciler reconciles an ApiServerSource object
type Reconciler struct {
*reconciler.Base
type adapter struct {
gvrcs []GVRC
k8s dynamic.Interface
ce cloudevents.Client
source string
namespace string
logger *zap.SugaredLogger

eventsClient eventsclient.Client
lister cache.GenericLister
mode string
delegate eventDelegate
}

// Reconcile sends a cloud event corresponding to the given key
func (r *Reconciler) Reconcile(ctx context.Context, key string) error {
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
r.Logger.Errorf("invalid resource key: %s", key)
return nil
func NewAdaptor(source string, k8sClient dynamic.Interface, ceClient cloudevents.Client, logger *zap.SugaredLogger, opt Options) Adapter {
mode := opt.Mode
switch mode {
case ResourceMode, RefMode:
// ok
default:
logger.Warn("unknown mode ", mode)
mode = RefMode
logger.Warn("defaulting mode to ", mode)
}

// Get the resource with this namespace/name
original, err := r.lister.ByNamespace(namespace).Get(name)
if apierrs.IsNotFound(err) {
// The resource may no longer exist, in which case we stop processing.
r.Logger.Error("resource key in work queue no longer exists", zap.Any("key", key))
return nil
} else if err != nil {
return err
a := &adapter{
k8s: k8sClient,
ce: ceClient,
source: source,
logger: logger,
gvrcs: opt.GVRCs,
namespace: opt.Namespace,
mode: mode,
}
return a
}

object := original.(*duckv1alpha1.AddressableType)
type eventDelegate interface {
cache.Store
addControllerWatch(gvr schema.GroupVersionResource)
}

eventType := updateEventType
timestamp := object.GetCreationTimestamp()
if object.GetDeletionTimestamp() != nil {
eventType = deleteEventType
timestamp = *object.GetDeletionTimestamp()
func (a *adapter) Start(stopCh <-chan struct{}) error {
// Local stop channel.
stop := make(chan struct{})

resyncPeriod := time.Duration(10 * time.Hour)

var d eventDelegate
switch a.mode {
case ResourceMode:
d = &resource{
ce: a.ce,
source: a.source,
logger: a.logger,
}

case RefMode:
d = &ref{
ce: a.ce,
source: a.source,
logger: a.logger,
}

default:
return fmt.Errorf("mode %q not understood", a.mode)
}

objectRef := corev1.ObjectReference{
APIVersion: object.APIVersion,
Kind: object.Kind,
Name: object.GetName(),
Namespace: object.GetNamespace(),
}
for _, gvrc := range a.gvrcs {
lw := &cache.ListWatch{
ListFunc: asUnstructuredLister(a.k8s.Resource(gvrc.GVR).Namespace(a.namespace).List),
WatchFunc: asUnstructuredWatcher(a.k8s.Resource(gvrc.GVR).Namespace(a.namespace).Watch),
}

event := cloudevents.Event{
Context: cloudevents.EventContextV02{
ID: string(object.GetUID()),
Type: eventType,
Source: *types.ParseURLRef(object.GetSelfLink()),
Time: &types.Timestamp{Time: timestamp.Time},
}.AsV02(),
Data: objectRef,
if gvrc.Controller {
d.addControllerWatch(gvrc.GVR)
}

reflector := cache.NewReflector(lw, &unstructured.Unstructured{}, d, resyncPeriod)
go reflector.Run(stop)
}

if _, err := r.eventsClient.Send(ctx, event); err != nil {
r.Logger.Error("failed to send cloudevent (retrying)", err)
<-stopCh
stop <- struct{}{}
return nil
}

type unstructuredLister func(metav1.ListOptions) (*unstructured.UnstructuredList, error)

return err
func asUnstructuredLister(ulist unstructuredLister) cache.ListFunc {
return func(opts metav1.ListOptions) (runtime.Object, error) {
ul, err := ulist(opts)
if err != nil {
return nil, err
}
return ul, nil
}
}

return nil
func asUnstructuredWatcher(wf cache.WatchFunc) cache.WatchFunc {
return func(lo metav1.ListOptions) (watch.Interface, error) {
return wf(lo)
}
}
Loading