Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

136 changes: 136 additions & 0 deletions cmd/apiserver_receive_adapter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
Copyright 2019 The Knative Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"flag"
"time"

"k8s.io/client-go/rest"

// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

"github.com/knative/eventing/pkg/reconciler"

"github.com/kelseyhightower/envconfig"
"github.com/knative/eventing/pkg/adapter/apiserver"
"github.com/knative/eventing/pkg/kncloudevents"
"github.com/knative/pkg/apis/duck"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
kncontroller "github.com/knative/pkg/controller"
"github.com/knative/pkg/signals"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
)

var (
masterURL = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeconfig = flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
)

type envConfig struct {
SinkURI string `split_words:"true" required:"true"`
ApiVersion []string `split_words:"true" required:"true"`
Kind []string `required:"true"`
Controller []bool `required:"true"`
}

func main() {
flag.Parse()

logCfg := zap.NewProductionConfig()
logCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
dlogger, err := logCfg.Build()
logger := dlogger.Sugar()

var env envConfig
err = envconfig.Process("", &env)
if err != nil {
logger.Fatalw("Error processing environment", zap.Error(err))
}

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()

cfg, err := clientcmd.BuildConfigFromFlags(*masterURL, *kubeconfig)
if err != nil {
logger.Fatalw("Error building kubeconfig", zap.Error(err))
}

logger = logger.With(zap.String("controller/apiserver", "adapter"))
logger.Info("Starting the controller")

numControllers := len(env.ApiVersion)
cfg.QPS = float32(numControllers) * rest.DefaultQPS
cfg.Burst = numControllers * rest.DefaultBurst
opt := reconciler.NewOptionsOrDie(cfg, logger, stopCh)

client, err := dynamic.NewForConfig(cfg)
if err != nil {
logger.Fatalw("Error building dynamic client", zap.Error(err))
}

eventsClient, err := kncloudevents.NewDefaultClient(env.SinkURI)
if err != nil {
logger.Fatalw("Error building cloud event client", zap.Error(err))
}

controllers := []*kncontroller.Impl{}

// Create one controller per resource.
for i, apiVersion := range env.ApiVersion {
kind := env.Kind[i]
controlled := env.Controller[i]

obj := &duckv1alpha1.AddressableType{}

factory := duck.TypedInformerFactory{
Client: client,
ResyncPeriod: time.Duration(10), // TODO
StopChannel: stopCh,
Type: obj,
}

gv, err := schema.ParseGroupVersion(apiVersion)
if err != nil {
logger.Fatalw("Error parsing APIVersion", zap.Error(err))
}

gvk := schema.GroupVersionKind{Kind: kind, Group: gv.Group, Version: gv.Version}

// This is really bad.
gvr, _ := meta.UnsafeGuessKindToResource(gvk)

// Get and start the informer for gvr
logger.Infof("Starting informer for %v", gvk)
informer, lister, err := factory.Get(gvr)
if err != nil {
logger.Fatalw("Error starting informer", zap.Error(err))
}
controllers = append(controllers, apiserver.NewController(opt, informer, lister, eventsClient, controlled))
}

// Start all of the controllers.
logger.Info("Starting controllers.")
go kncontroller.StartAll(stopCh, controllers...)
<-stopCh
}
10 changes: 9 additions & 1 deletion cmd/sources-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/knative/eventing/pkg/logconfig"
"github.com/knative/eventing/pkg/logging"
"github.com/knative/eventing/pkg/reconciler"
"github.com/knative/eventing/pkg/reconciler/apiserversource"
"github.com/knative/eventing/pkg/reconciler/cronjobsource"
"github.com/knative/pkg/configmap"
kncontroller "github.com/knative/pkg/controller"
Expand Down Expand Up @@ -65,7 +66,7 @@ func main() {
logger = logger.With(zap.String("controller/impl", "pkg"))
logger.Info("Starting the controller")

const numControllers = 2
const numControllers = 3
cfg.QPS = numControllers * rest.DefaultQPS
cfg.Burst = numControllers * rest.DefaultBurst
opt := reconciler.NewOptionsOrDie(cfg, logger, stopCh)
Expand All @@ -76,6 +77,7 @@ func main() {
// Eventing
cronJobSourceInformer := eventingInformerFactory.Sources().V1alpha1().CronJobSources()
containerSourceInformer := eventingInformerFactory.Sources().V1alpha1().ContainerSources()
apiserverSourceInformer := eventingInformerFactory.Sources().V1alpha1().ApiServerSources()

// Kube
deploymentInformer := kubeInformerFactory.Apps().V1().Deployments()
Expand All @@ -94,6 +96,11 @@ func main() {
containerSourceInformer,
deploymentInformer,
),
apiserversource.NewController(
opt,
apiserverSourceInformer,
deploymentInformer,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we just pass the RA Image here instead of buried in the controller?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is a general issue applying to other sources, like cronjob

),
}
// This line asserts at compile time that the length of controllers is equal to numControllers.
// It is based on https://go101.org/article/tips.html#assert-at-compile-time, which notes that
Expand All @@ -117,6 +124,7 @@ func main() {
// Eventing
cronJobSourceInformer.Informer(),
containerSourceInformer.Informer(),
apiserverSourceInformer.Informer(),
// Kube
deploymentInformer.Informer(),
); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions config/200-controller-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,7 @@ rules:
- "containersources"
- "containersources/status"
- "containersources/finalizers"
- "apiserversources"
- "apiserversources/status"
- "apiserversources/finalizers"
verbs: *everything
88 changes: 88 additions & 0 deletions config/300-apiserversource.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Copyright 2019 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
creationTimestamp: null
labels:
eventing.knative.dev/source: "true"
knative.dev/crd-install: "true"
name: apiserversources.sources.eventing.knative.dev
spec:
group: sources.eventing.knative.dev
names:
categories:
- all
- knative
- eventing
- sources
kind: ApiServerSource
plural: apiserversources
scope: Namespaced
subresources:
status: {}
validation:
openAPIV3Schema:
properties:
apiVersion:
type: string
kind:
type: string
metadata:
type: object
spec:
properties:
serviceAccountName:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a "description" to all these fields? Most of the existing ones don't specify them, but we should so that you get a more descriptive message what these fields are. This can be done as a follow on, just jotting it down as I see them.

type: string
sink:
type: object
resources:
items:
properties:
apiVersion:
type: string
kind:
type: string
type: array
required:
- resources
- sink
type: object
status:
properties:
conditions:
items:
properties:
lastTransitionTime:
type: string
message:
type: string
reason:
type: string
severity:
type: string
status:
type: string
type:
type: string
required:
- type
- status
type: object
type: array
sinkUri:
type: string
type: object
version: v1alpha1
2 changes: 2 additions & 0 deletions config/500-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ spec:
value: eventing-broker-filter
- name: CRONJOB_RA_IMAGE
value: github.com/knative/eventing/cmd/cronjob_receive_adapter
- name: APISERVER_RA_IMAGE
value: github.com/knative/eventing/cmd/apiserver_receiver_adapter
ports:
- containerPort: 9090
name: metrics
Expand Down
2 changes: 2 additions & 0 deletions config/500-sources-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ spec:
# This is the Go import path for cron job receive adapter binary
# that is containerized and substituted here.
value: github.com/knative/eventing/cmd/cronjob_receive_adapter
- name: APISERVER_RA_IMAGE
value: github.com/knative/eventing/cmd/apiserver_receive_adapter
volumes:
- name: config-logging
configMap:
Expand Down
Loading