Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

99 changes: 74 additions & 25 deletions cmd/apiserver_receive_adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,55 +18,96 @@ package main

import (
"flag"
"log"
"fmt"

// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
// Uncomment the following line to load the gcp plugin
// (only required to authenticate against GKE clusters).
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

"github.com/kelseyhightower/envconfig"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
"knative.dev/eventing/pkg/adapter/apiserver"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/tracing"
"knative.dev/eventing/pkg/utils"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/signals"
)

const (
component = "apiserversource"
)

var (
masterURL = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeconfig = flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
)

type envConfig struct {
Namespace string `envconfig:"SYSTEM_NAMESPACE" default:"default"`
Mode string `envconfig:"MODE"`
SinkURI string `split_words:"true" required:"true"`
ApiVersion []string `split_words:"true" required:"true"`
Kind []string `required:"true"`
Controller []bool `required:"true"`
Namespace string `envconfig:"SYSTEM_NAMESPACE" default:"default"`
Mode string `envconfig:"MODE"`
SinkURI string `split_words:"true" required:"true"`
ApiVersion []string `split_words:"true" required:"true"`
Kind []string `required:"true"`
Controller []bool `required:"true"`
ApiServerImporter string `envconfig:"APISERVERIMPORTER" required:"true"`
// MetricsConfigBase64 is a base64 encoded json string of
// metrics.ExporterOptions. This is used to configure the metrics exporter
// options, the config is stored in a config map inside the controllers
// namespace and copied here.
MetricsConfigBase64 string `envconfig:"K_METRICS_CONFIG" required:"true"`

// LoggingConfigBase64 is a base64 encoded json string of logging.Config.
// This is used to configure the logging config, the config is stored in
// a config map inside the controllers namespace and copied here.
LoggingConfigBase64 string `envconfig:"K_LOGGING_CONFIG" required:"true"`
}

// TODO: the controller should take the list of GVR

func main() {
flag.Parse()

logCfg := zap.NewProductionConfig()
logCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
dlogger, err := logCfg.Build()
var env envConfig
err := envconfig.Process("", &env)
if err != nil {
log.Fatalf("Error building logger: %v", err)
panic(fmt.Sprintf("Error processing env var: %s", err))
}
logger := dlogger.Sugar()

var env envConfig
err = envconfig.Process("", &env)
// Convert base64 encoded json logging.Config to logging.Config.
loggingConfig, err := utils.Base64ToLoggingConfig(
env.LoggingConfigBase64)
if err != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we log an error, and try to use the default one. If it fails, then panic...
As here:
https://github.com/google/knative-gcp/blob/cabd0d258db1d23246d51fac8b181fd4c363c28e/cmd/pubsub/receive_adapter/main.go#L55

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed it

fmt.Printf("[ERROR] filed to process logging config: %s", err.Error())
// Use default logging config.
if loggingConfig, err = logging.NewConfigFromMap(map[string]string{}); err != nil {
// If this fails, there is no recovering.
panic(err)
}
}
logger, _ := logging.NewLoggerFromConfig(loggingConfig, component)
defer flush(logger)

// Convert base64 encoded json metrics.ExporterOptions to
// metrics.ExporterOptions.
metricsConfig, err := utils.Base64ToMetricsOptions(
env.MetricsConfigBase64)
if err != nil {
logger.Fatalw("Error processing environment", zap.Error(err))
logger.Errorf("failed to process metrics options: %s", err.Error())
}

if err := metrics.UpdateExporter(*metricsConfig, logger); err != nil {
logger.Fatalf("Failed to create the metrics exporter: %v", err)
}

reporter, err := apiserver.NewStatsReporter()
if err != nil {
logger.Fatalw("Error building statsreporter", zap.Error(err))
}

// set up signals so we handle the first shutdown signal gracefully
Expand All @@ -77,17 +118,16 @@ func main() {
logger.Fatalw("Error building kubeconfig", zap.Error(err))
}

logger = logger.With(zap.String("controller/apiserver", "adapter"))
logger.Info("Starting the controller")

client, err := dynamic.NewForConfig(cfg)
if err != nil {
logger.Fatalw("Error building dynamic client", zap.Error(err))
}

if err = tracing.SetupStaticPublishing(logger, "apiserversource", tracing.OnePercentSampling); err != nil {
// If tracing doesn't work, we will log an error, but allow the importer to continue to
// start.
if err = tracing.SetupStaticPublishing(logger, "apiserversource",
tracing.OnePercentSampling); err != nil {
// If tracing doesn't work, we will log an error, but allow the importer
// to continue to start.
logger.Error("Error setting up trace publishing", zap.Error(err))
}

Expand All @@ -107,7 +147,10 @@ func main() {
logger.Fatalw("Error parsing APIVersion", zap.Error(err))
}
// TODO: pass down the resource and the kind so we do not have to guess.
gvr, _ := meta.UnsafeGuessKindToResource(schema.GroupVersionKind{Kind: kind, Group: gv.Group, Version: gv.Version})
gvr, _ := meta.UnsafeGuessKindToResource(schema.GroupVersionKind{
Kind: kind,
Group: gv.Group,
Version: gv.Version})
gvrcs = append(gvrcs, apiserver.GVRC{
GVR: gvr,
Controller: controlled,
Expand All @@ -120,9 +163,15 @@ func main() {
GVRCs: gvrcs,
}

a := apiserver.NewAdaptor(cfg.Host, client, eventsClient, logger, opt)
logger.Info("starting kubernetes api adapter")
a := apiserver.NewAdaptor(cfg.Host, client, eventsClient, logger, opt,
reporter, env.ApiServerImporter)
logger.Info("starting kubernetes api adapter.", zap.Any("adapter", env))
if err := a.Start(stopCh); err != nil {
logger.Warn("start returned an error,", zap.Error(err))
}
}

func flush(logger *zap.SugaredLogger) {
_ = logger.Sync()
metrics.FlushExporter()
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,21 @@
- source_labels: [__meta_kubernetes_service_name]
target_label: service

# apiserver-source
- job_name: apiserver-source
scrape_interval: 3s
scrape_timeout: 3s
kubernetes_sd_configs:
- role: pod
relabel_configs:
# Scrape only the the targets matching the following metadata
- source_labels: [ __meta_kubernetes_pod_label_eventing_knative_dev_source, __meta_kubernetes_pod_container_port_name]
action: keep
regex: apiserver-source-controller;metrics
# Rename metadata labels to be reader friendly
- source_labels: [__meta_kubernetes_namespace]
target_label: namespace
- source_labels: [__meta_kubernetes_pod_name]
target_label: pod
- source_labels: [__meta_kubernetes_service_name]
target_label: service
45 changes: 28 additions & 17 deletions pkg/adapter/apiserver/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,15 @@ type adapter struct {
namespace string
logger *zap.SugaredLogger

mode string
delegate eventDelegate
mode string
delegate eventDelegate
reporter StatsReporter
apiServerImporter string
}

func NewAdaptor(source string, k8sClient dynamic.Interface, ceClient cloudevents.Client, logger *zap.SugaredLogger, opt Options) Adapter {
func NewAdaptor(source string, k8sClient dynamic.Interface,
ceClient cloudevents.Client, logger *zap.SugaredLogger,
opt Options, reporter StatsReporter, apiServerImporter string) Adapter {
mode := opt.Mode
switch mode {
case ResourceMode, RefMode:
Expand All @@ -80,13 +84,15 @@ func NewAdaptor(source string, k8sClient dynamic.Interface, ceClient cloudevents
}

a := &adapter{
k8s: k8sClient,
ce: ceClient,
source: source,
logger: logger,
gvrcs: opt.GVRCs,
namespace: opt.Namespace,
mode: mode,
k8s: k8sClient,
ce: ceClient,
source: source,
logger: logger,
gvrcs: opt.GVRCs,
namespace: opt.Namespace,
mode: mode,
reporter: reporter,
apiServerImporter: apiServerImporter,
}
return a
}
Expand All @@ -101,21 +107,26 @@ func (a *adapter) Start(stopCh <-chan struct{}) error {
stop := make(chan struct{})

resyncPeriod := time.Duration(10 * time.Hour)

var d eventDelegate
switch a.mode {
case ResourceMode:
d = &resource{
ce: a.ce,
source: a.source,
logger: a.logger,
ce: a.ce,
source: a.source,
logger: a.logger,
reporter: a.reporter,
namespace: a.namespace,
apiServerImporter: a.apiServerImporter,
}

case RefMode:
d = &ref{
ce: a.ce,
source: a.source,
logger: a.logger,
ce: a.ce,
source: a.source,
logger: a.logger,
reporter: a.reporter,
namespace: a.namespace,
apiServerImporter: a.apiServerImporter,
}

default:
Expand Down
52 changes: 35 additions & 17 deletions pkg/adapter/apiserver/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,11 @@ func TestNewAdaptor(t *testing.T) {
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {

a := NewAdaptor(tc.source, k8s, ce, logger, tc.opt)
r, err := NewStatsReporter()
if err != nil {
t.Fatalf("Failed to create a new reporter: %v", err)
}
a := NewAdaptor(tc.source, k8s, ce, logger, tc.opt, r, "test-importer")

got, ok := a.(*adapter)
if !ok {
Expand Down Expand Up @@ -147,10 +150,14 @@ func TestAdapter_StartRef(t *testing.T) {
},
}},
}
r, err := NewStatsReporter()
if err != nil {
t.Fatalf("Failed to create a new reporter: %v", err)
}

a := NewAdaptor(source, k8s, ce, logger, opt)
a := NewAdaptor(source, k8s, ce, logger, opt, r, "test-importer")

err := errors.New("test never ran")
err = errors.New("test never ran")
stopCh := make(chan struct{})
done := make(chan struct{})
go func() {
Expand Down Expand Up @@ -182,10 +189,13 @@ func TestAdapter_StartResource(t *testing.T) {
},
}},
}
r, err := NewStatsReporter()
if err != nil {
t.Fatalf("Failed to create a new reporter: %v", err)
}
a := NewAdaptor(source, k8s, ce, logger, opt, r, "test-importer")

a := NewAdaptor(source, k8s, ce, logger, opt)

err := errors.New("test never ran")
err = errors.New("test never ran")
stopCh := make(chan struct{})
done := make(chan struct{})
go func() {
Expand Down Expand Up @@ -264,26 +274,34 @@ func validateNotSent(t *testing.T, ce *kncetesting.TestCloudEventsClient, want s
}
}

func makeResourceAndTestingClient() (*resource, *kncetesting.TestCloudEventsClient) {
func makeResourceAndTestingClient(t *testing.T) (*resource, *kncetesting.TestCloudEventsClient) {
ce := kncetesting.NewTestClient()
source := "unit-test"
logger := zap.NewExample().Sugar()

r, err := NewStatsReporter()
if err != nil {
t.Fatalf("Failed to create a new reporter: %v", err)
}
return &resource{
ce: ce,
source: source,
logger: logger,
ce: ce,
source: source,
logger: logger,
reporter: r,
}, ce
}

func makeRefAndTestingClient() (*ref, *kncetesting.TestCloudEventsClient) {
func makeRefAndTestingClient(t *testing.T) (*ref, *kncetesting.TestCloudEventsClient) {
ce := kncetesting.NewTestClient()
source := "unit-test"
logger := zap.NewExample().Sugar()

r, err := NewStatsReporter()
if err != nil {
t.Fatalf("Failed to create a new reporter: %v", err)
}
return &ref{
ce: ce,
source: source,
logger: logger,
ce: ce,
source: source,
logger: logger,
reporter: r,
}, ce
}
6 changes: 4 additions & 2 deletions pkg/adapter/apiserver/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ func MakeAddEvent(source string, obj interface{}) (*cloudevents.Event, error) {
}
object := obj.(*unstructured.Unstructured)

return makeEvent(source, sourcesv1alpha1.ApiServerSourceAddEventType, object, object)
return makeEvent(source, sourcesv1alpha1.ApiServerSourceAddEventType,
object, object)
}

func MakeUpdateEvent(source string, obj interface{}) (*cloudevents.Event, error) {
Expand All @@ -43,7 +44,8 @@ func MakeUpdateEvent(source string, obj interface{}) (*cloudevents.Event, error)
}
object := obj.(*unstructured.Unstructured)

return makeEvent(source, sourcesv1alpha1.ApiServerSourceUpdateEventType, object, object)
return makeEvent(source, sourcesv1alpha1.ApiServerSourceUpdateEventType,
object, object)
}

func MakeDeleteEvent(source string, obj interface{}) (*cloudevents.Event, error) {
Expand Down
Loading