Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
7ee77fe
first pass to update broker controller
nachocano Apr 19, 2019
f54fe07
starting informers properly, listening to own resources
nachocano Apr 19, 2019
68bb41d
fix and commenting out UTs
nachocano Apr 19, 2019
20d4e7d
space
nachocano Apr 19, 2019
4f66cf7
adding listers
nachocano Apr 22, 2019
8bc4c87
Merge remote-tracking branch 'upstream/master' into broker-controller
nachocano Apr 22, 2019
acf45b7
updating tests
nachocano Apr 22, 2019
fd0447a
Merge remote-tracking branch 'upstream/master' into broker-controller
nachocano Apr 22, 2019
eac14f9
more tests
nachocano Apr 23, 2019
9400105
adding deployment functional
nachocano Apr 23, 2019
4e692f0
compiling
nachocano Apr 23, 2019
56d5efa
more UTs
nachocano Apr 23, 2019
62a813a
more UTs
nachocano Apr 23, 2019
17d15ea
Merge branch 'broker-controller' of github.com:nachocano/eventing int…
nachocano Apr 23, 2019
592653e
more UTs
nachocano Apr 23, 2019
8bf83e9
more UTs
nachocano Apr 23, 2019
53ef443
more UTs
nachocano Apr 23, 2019
147c800
Merge remote-tracking branch 'upstream/master' into broker-controller
nachocano Apr 24, 2019
7ecd402
updating deps
nachocano Apr 24, 2019
f859c51
updating UTs
nachocano Apr 24, 2019
ef78bcb
updating UTs
nachocano Apr 24, 2019
b98fac2
Merge remote-tracking branch 'upstream/master' into broker-controller
nachocano Apr 24, 2019
60cd3fd
Merge remote-tracking branch 'upstream/master' into broker-controller
nachocano Apr 24, 2019
6b25869
Updating code gen
nachocano Apr 24, 2019
fe50c14
inlining method
nachocano Apr 24, 2019
da6314d
update after comment
nachocano Apr 24, 2019
d95ea3e
review comments
nachocano Apr 24, 2019
0b6f0bc
Merge branch 'broker-controller' of github.com:nachocano/eventing int…
nachocano Apr 24, 2019
da24fe0
serviceInformer and namespaceInformer
nachocano Apr 24, 2019
4e3ada7
attempt to fix
nachocano Apr 24, 2019
4c8c11a
solved
nachocano Apr 24, 2019
6f39fb1
Updates after code review
nachocano Apr 25, 2019
3dcf1a5
rebasing
nachocano Apr 25, 2019
768afe9
Updating gopkg.lock
nachocano Apr 25, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

122 changes: 17 additions & 105 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,16 @@ limitations under the License.
package main

import (
"context"
"flag"
"log"
"net/http"
"os"
"time"

"k8s.io/apimachinery/pkg/runtime"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
informers "github.com/knative/eventing/pkg/client/informers/externalversions"
"github.com/knative/eventing/pkg/logconfig"
"github.com/knative/eventing/pkg/logging"
Expand All @@ -42,35 +36,19 @@ import (
"github.com/knative/eventing/pkg/reconciler/subscription"
"github.com/knative/eventing/pkg/reconciler/trigger"
"github.com/knative/eventing/pkg/reconciler/v1alpha1/broker"
istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"
"github.com/knative/pkg/configmap"
kncontroller "github.com/knative/pkg/controller"
"github.com/knative/pkg/logging/logkey"
"github.com/knative/pkg/signals"
"github.com/knative/pkg/system"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
controllerruntime "sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
)

const (
metricsScrapeAddr = ":9090"
metricsScrapePath = "/metrics"
)

var (
hardcodedLoggingConfig bool
)

// SchemeFunc adds types to a Scheme.
type SchemeFunc func(*runtime.Scheme) error

// ProvideFunc adds a controller to a Manager.
type ProvideFunc func(manager.Manager, *zap.Logger) (controller.Controller, error)

func main() {
flag.Parse()
logf.SetLogger(logf.ZapLogger(false))
Expand All @@ -87,16 +65,9 @@ func main() {
logger.Fatalf("Error building kubeconfig: %v", err)
}

go startPkgController(stopCh, cfg, logger, atomicLevel)
go startControllerRuntime(stopCh, cfg, logger, atomicLevel)
<-stopCh
}

func startPkgController(stopCh <-chan struct{}, cfg *rest.Config, logger *zap.SugaredLogger, atomicLevel zap.AtomicLevel) {
logger = logger.With(zap.String("controller/impl", "pkg"))
logger.Info("Starting the controller")

const numControllers = 4
const numControllers = 5
cfg.QPS = numControllers * rest.DefaultQPS
cfg.Burst = numControllers * rest.DefaultBurst
opt := reconciler.NewOptionsOrDie(cfg, logger, stopCh)
Expand All @@ -114,6 +85,7 @@ func startPkgController(stopCh <-chan struct{}, cfg *rest.Config, logger *zap.Su
serviceInformer := kubeInformerFactory.Core().V1().Services()
Comment thread
nachocano marked this conversation as resolved.
namespaceInformer := kubeInformerFactory.Core().V1().Namespaces()
configMapInformer := kubeInformerFactory.Core().V1().ConfigMaps()
deploymentInformer := kubeInformerFactory.Apps().V1().Deployments()

// Build all of our controllers, with the clients constructed above.
// Add new controllers to this array.
Expand All @@ -139,6 +111,20 @@ func startPkgController(stopCh <-chan struct{}, cfg *rest.Config, logger *zap.Su
brokerInformer,
serviceInformer,
),
broker.NewController(
opt,
brokerInformer,
subscriptionInformer,
channelInformer,
serviceInformer,
deploymentInformer,
broker.ReconcilerArgs{
IngressImage: getRequiredEnv("BROKER_INGRESS_IMAGE"),
IngressServiceAccountName: getRequiredEnv("BROKER_INGRESS_SERVICE_ACCOUNT"),
FilterImage: getRequiredEnv("BROKER_FILTER_IMAGE"),
FilterServiceAccountName: getRequiredEnv("BROKER_FILTER_SERVICE_ACCOUNT"),
},
),
}
if len(controllers) != numControllers {
logger.Fatalf("Number of controllers and QPS settings mismatch: %d != %d", len(controllers), numControllers)
Expand All @@ -165,89 +151,15 @@ func startPkgController(stopCh <-chan struct{}, cfg *rest.Config, logger *zap.Su
configMapInformer.Informer(),
serviceInformer.Informer(),
namespaceInformer.Informer(),
deploymentInformer.Informer(),
); err != nil {
logger.Fatalf("Failed to start informers: %v", err)
}

// Start all of the controllers.
logger.Info("Starting controllers.")
go kncontroller.StartAll(stopCh, controllers...)
}

// TODO: remove after done integrating all controllers.
func startControllerRuntime(stopCh <-chan struct{}, cfg *rest.Config, logger *zap.SugaredLogger, atomicLevel zap.AtomicLevel) {
logger = logger.With(zap.String("controller/impl", "cr"))
logger.Info("Starting the controller")

kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
logger.Fatalf("Error building kubernetes clientset: %v", err)
}

// Watch the logging config map and dynamically update logging levels.
configMapWatcher := configmap.NewInformedWatcher(kubeClient, system.Namespace())
configMapWatcher.Watch(logconfig.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, logconfig.Controller))
if err = configMapWatcher.Start(stopCh); err != nil {
logger.Fatalf("Failed to start controller config map watcher: %v", err)
}

// Setup a Manager
mgr, err := manager.New(cfg, manager.Options{})
if err != nil {
logger.Fatalf("Failed to create manager: %v", err)
}

// Add custom types to this array to get them into the manager's scheme.
schemeFuncs := []SchemeFunc{
istiov1alpha3.AddToScheme,
eventingv1alpha1.AddToScheme,
}
for _, schemeFunc := range schemeFuncs {
if err = schemeFunc(mgr.GetScheme()); err != nil {
logger.Fatalf("Error adding type to manager's scheme: %v", err)
}
}

// Add each controller's ProvideController func to this list to have the
// manager run it.
providers := []ProvideFunc{
broker.ProvideController(
broker.ReconcilerArgs{
IngressImage: getRequiredEnv("BROKER_INGRESS_IMAGE"),
IngressServiceAccountName: getRequiredEnv("BROKER_INGRESS_SERVICE_ACCOUNT"),
FilterImage: getRequiredEnv("BROKER_FILTER_IMAGE"),
FilterServiceAccountName: getRequiredEnv("BROKER_FILTER_SERVICE_ACCOUNT"),
}),
}
for _, provider := range providers {
if _, err = provider(mgr, logger.Desugar()); err != nil {
logger.Fatalf("Error adding controller to manager: %v", err)
}
}

// Start the Manager
go func() {
if localErr := mgr.Start(stopCh); localErr != nil {
logger.Fatalf("Error starting manager: %v", localErr)
}
}()

// Start the endpoint that Prometheus scraper talks to
srv := &http.Server{Addr: metricsScrapeAddr}
http.Handle(metricsScrapePath, promhttp.Handler())
go func() {
logger.Infof("Starting metrics listener at %s", metricsScrapeAddr)
if localErr := srv.ListenAndServe(); localErr != nil {
logger.Infof("HTTPserver: ListenAndServe() finished with error: %s", localErr)
}
}()

<-stopCh

// Close the http server gracefully
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
srv.Shutdown(ctx)
}

func init() {
Expand Down
Loading