Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
4589c3a
Added metrics endpoint
sayanh Aug 29, 2019
18e4586
Added event type and event source
sayanh Aug 29, 2019
418e3e5
Added config test
sayanh Aug 30, 2019
0982be1
Changes based on PR review
sayanh Aug 30, 2019
ec0bd10
Updated knative.dev/pkg
sayanh Aug 30, 2019
311778a
More refactorings based on PR review
sayanh Sep 1, 2019
6e85c31
Fixed result
sayanh Sep 1, 2019
416d5f1
Moved config to utils
sayanh Sep 1, 2019
8d3fd3b
Fix name
sayanh Sep 1, 2019
b5c7652
Stats reporter refactor
nachocano Aug 26, 2019
502e355
updating lock
nachocano Aug 26, 2019
3aeb383
updates after code review
nachocano Aug 26, 2019
32f723e
go imports
nachocano Aug 26, 2019
a089b2d
not working
nachocano Aug 31, 2019
d044b0d
updates
nachocano Sep 1, 2019
4951406
temp
sayanh Sep 2, 2019
01ec663
Added HTTP response codes and classes to metrics
sayanh Sep 2, 2019
07230eb
Cleaned
sayanh Sep 2, 2019
0e065b5
Cleaned
sayanh Sep 2, 2019
d37e614
trying out stuff§
sayanh Sep 3, 2019
180f5d6
Rebasing stuff
sayanh Sep 3, 2019
2efd308
Fixing adapter tests
sayanh Sep 3, 2019
0c14a00
Fix gen code
sayanh Sep 3, 2019
e6b8962
Changes based on PR review
sayanh Sep 3, 2019
f73ddce
Changes based on PR review
sayanh Sep 3, 2019
48fcc64
Fixed rebase issues
sayanh Sep 5, 2019
ab0d043
Not passing t anymore
sayanh Sep 6, 2019
d18d1dd
Rename context to loggingContext
sayanh Sep 6, 2019
b4c4059
Remove unnecessary comments
sayanh Sep 6, 2019
0ebae80
More fixes based on review
sayanh Sep 6, 2019
9426433
Change metricsconfig signature
sayanh Sep 6, 2019
6af5ef9
Fix logging to desugared
sayanh Sep 6, 2019
bc8e6e2
Fix logging to desugared
sayanh Sep 6, 2019
891a1f1
Added logs for error
sayanh Sep 6, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

89 changes: 66 additions & 23 deletions cmd/apiserver_receive_adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,31 @@ package main

import (
"flag"
"log"
"fmt"
"strings"

// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
// Uncomment the following line to load the gcp plugin
// (only required to authenticate against GKE clusters).
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

"github.com/kelseyhightower/envconfig"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
"knative.dev/eventing/pkg/adapter/apiserver"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/tracing"
"knative.dev/eventing/pkg/utils"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/signals"
)

const (
component = "apiserversource"
)

var (
masterURL = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeconfig = flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
Expand All @@ -60,52 +66,84 @@ type envConfig struct {
Kind StringList `required:"true"`
Controller []bool `required:"true"`
LabelSelector StringList `envconfig:"SELECTOR" required:"true"`
Name string `envconfig:"NAME" required:"true"`
// MetricsConfigBase64 is a base64 encoded json string of
// metrics.ExporterOptions. This is used to configure the metrics exporter
// options, the config is stored in a config map inside the controllers
// namespace and copied here.
MetricsConfigBase64 string `envconfig:"K_METRICS_CONFIG" required:"true"`
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably a foolish question, but why are we base64 encoding these? Flags tend to make sense, as it can be very annoying to get the escaping just right, but these are environment variables, where we don't have that problem.

Copy link
Copy Markdown
Contributor Author

@sayanh sayanh Sep 6, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My rationale was to be consistent with knative-gcp in here. @n3wscott What's your
take on this?


// LoggingConfigBase64 is a base64 encoded json string of logging.Config.
// This is used to configure the logging config, the config is stored in
// a config map inside the controllers namespace and copied here.
LoggingConfigBase64 string `envconfig:"K_LOGGING_CONFIG" required:"true"`
}

// TODO: the controller should take the list of GVR

func main() {
flag.Parse()

logCfg := zap.NewProductionConfig()
logCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
dlogger, err := logCfg.Build()
var env envConfig
err := envconfig.Process("", &env)
if err != nil {
log.Fatalf("Error building logger: %v", err)
panic(fmt.Sprintf("Error processing env var: %s", err))
}
// TODO move this util to pkg
// Convert base64 encoded json logging.Config to logging.Config.
loggingConfig, err := utils.Base64ToLoggingConfig(env.LoggingConfigBase64)
if err != nil {
fmt.Printf("[ERROR] failed to process logging config: %s", err.Error())
// Use default logging config.
if loggingConfig, err = logging.NewConfigFromMap(map[string]string{}); err != nil {
// If this fails, there is no recovering.
panic(err)
}
}
loggerSugared, _ := logging.NewLoggerFromConfig(loggingConfig, component)
logger := loggerSugared.Desugar()
defer flush(loggerSugared)

// Convert base64 encoded json metrics.ExporterOptions to
// metrics.ExporterOptions.
metricsConfig, err := utils.Base64ToMetricsOptions(
Comment thread
sayanh marked this conversation as resolved.
env.MetricsConfigBase64)
if err != nil {
logger.Error("failed to process metrics options ", zap.Error(err))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger.Error("failed to process metrics options ", zap.Error(err))
logger.Error("failed to process metrics options", zap.Error(err))

Don't need the extra space when using the desugared logger. Not here or the other lines.

Once, again, I'll let it submit as-is, please send a follow up PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do, thanks @Harwayne!

}
logger := dlogger.Sugar()

var env envConfig
err = envconfig.Process("", &env)
if err := metrics.UpdateExporter(*metricsConfig, loggerSugared); err != nil {
logger.Error("failed to create the metrics exporter ", zap.Error(err))
}

reporter, err := apiserver.NewStatsReporter()
if err != nil {
logger.Fatalw("Error processing environment", zap.Error(err))
logger.Error("error building statsreporter", zap.Error(err))
}

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()

cfg, err := clientcmd.BuildConfigFromFlags(*masterURL, *kubeconfig)
if err != nil {
logger.Fatalw("Error building kubeconfig", zap.Error(err))
logger.Fatal("error building kubeconfig", zap.Error(err))
}

logger = logger.With(zap.String("controller/apiserver", "adapter"))
logger.Info("Starting the controller")

client, err := dynamic.NewForConfig(cfg)
if err != nil {
logger.Fatalw("Error building dynamic client", zap.Error(err))
logger.Fatal("error building dynamic client", zap.Error(err))
}

if err = tracing.SetupStaticPublishing(logger, "apiserversource", tracing.OnePercentSampling); err != nil {
// If tracing doesn't work, we will log an error, but allow the importer to continue to
// start.
if err = tracing.SetupStaticPublishing(loggerSugared, "apiserversource", tracing.OnePercentSampling); err != nil {
// If tracing doesn't work, we will log an error, but allow the importer
// to continue to start.
logger.Error("Error setting up trace publishing", zap.Error(err))
}

eventsClient, err := kncloudevents.NewDefaultClient(env.SinkURI)
if err != nil {
logger.Fatalw("Error building cloud event client", zap.Error(err))
logger.Fatal("error building cloud event client", zap.Error(err))
}

gvrcs := []apiserver.GVRC(nil)
Expand All @@ -117,7 +155,7 @@ func main() {

gv, err := schema.ParseGroupVersion(apiVersion)
if err != nil {
logger.Fatalw("Error parsing APIVersion", zap.Error(err))
logger.Fatal("error parsing APIVersion", zap.Error(err))
}
// TODO: pass down the resource and the kind so we do not have to guess.
gvr, _ := meta.UnsafeGuessKindToResource(schema.GroupVersionKind{Kind: kind, Group: gv.Group, Version: gv.Version})
Expand All @@ -134,9 +172,14 @@ func main() {
GVRCs: gvrcs,
}

a := apiserver.NewAdaptor(cfg.Host, client, eventsClient, logger, opt)
logger.Info("starting kubernetes api adapter")
a := apiserver.NewAdaptor(cfg.Host, client, eventsClient, loggerSugared, opt, reporter, env.Name)
logger.Info("starting kubernetes api adapter.", zap.Any("adapter", env))
if err := a.Start(stopCh); err != nil {
logger.Warn("start returned an error,", zap.Error(err))
}
}

func flush(logger *zap.SugaredLogger) {
_ = logger.Sync()
metrics.FlushExporter()
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,21 @@
- source_labels: [__meta_kubernetes_service_name]
target_label: service

# apiserver-source
- job_name: apiserver-source
scrape_interval: 3s
scrape_timeout: 3s
kubernetes_sd_configs:
- role: pod
relabel_configs:
# Scrape only the the targets matching the following metadata
- source_labels: [ __meta_kubernetes_pod_label_eventing_knative_dev_source, __meta_kubernetes_pod_container_port_name]
action: keep
regex: apiserver-source-controller;metrics
# Rename metadata labels to be reader friendly
- source_labels: [__meta_kubernetes_namespace]
target_label: namespace
- source_labels: [__meta_kubernetes_pod_name]
target_label: pod
- source_labels: [__meta_kubernetes_service_name]
target_label: service
27 changes: 19 additions & 8 deletions pkg/adapter/apiserver/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,13 @@ type adapter struct {

mode string
delegate eventDelegate
reporter StatsReporter
name string
}

func NewAdaptor(source string, k8sClient dynamic.Interface, ceClient cloudevents.Client, logger *zap.SugaredLogger, opt Options) Adapter {
func NewAdaptor(source string, k8sClient dynamic.Interface,
ceClient cloudevents.Client, logger *zap.SugaredLogger,
opt Options, reporter StatsReporter, name string) Adapter {
mode := opt.Mode
switch mode {
case ResourceMode, RefMode:
Expand All @@ -88,6 +92,8 @@ func NewAdaptor(source string, k8sClient dynamic.Interface, ceClient cloudevents
gvrcs: opt.GVRCs,
namespace: opt.Namespace,
mode: mode,
reporter: reporter,
name: name,
}
return a
}
Expand All @@ -102,21 +108,26 @@ func (a *adapter) Start(stopCh <-chan struct{}) error {
stop := make(chan struct{})

resyncPeriod := time.Duration(10 * time.Hour)

var d eventDelegate
switch a.mode {
case ResourceMode:
d = &resource{
ce: a.ce,
source: a.source,
logger: a.logger,
ce: a.ce,
source: a.source,
logger: a.logger,
reporter: a.reporter,
namespace: a.namespace,
name: a.name,
}

case RefMode:
d = &ref{
ce: a.ce,
source: a.source,
logger: a.logger,
ce: a.ce,
source: a.source,
logger: a.logger,
reporter: a.reporter,
namespace: a.namespace,
name: a.name,
}

default:
Expand Down
36 changes: 22 additions & 14 deletions pkg/adapter/apiserver/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ import (
rectesting "knative.dev/eventing/pkg/reconciler/testing"
)

type mockReporter struct{}

func (r *mockReporter) ReportEventCount(args *ReportArgs, responseCode int) error {
return nil
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would still prefer if we asserted the metrics were incremented in the test, but I won't hold up this PR for it.

}

func TestNewAdaptor(t *testing.T) {
ce := kncetesting.NewTestClient()
logger := zap.NewExample().Sugar()
Expand Down Expand Up @@ -132,8 +138,8 @@ func TestNewAdaptor(t *testing.T) {
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {

a := NewAdaptor(tc.source, k8s, ce, logger, tc.opt)
r := &mockReporter{}
a := NewAdaptor(tc.source, k8s, ce, logger, tc.opt, r, "test-importer")

got, ok := a.(*adapter)
if !ok {
Expand Down Expand Up @@ -171,8 +177,8 @@ func TestAdapter_StartRef(t *testing.T) {
},
}},
}

a := NewAdaptor(source, k8s, ce, logger, opt)
r := &mockReporter{}
a := NewAdaptor(source, k8s, ce, logger, opt, r, "test-importer")

err := errors.New("test never ran")
stopCh := make(chan struct{})
Expand Down Expand Up @@ -206,8 +212,8 @@ func TestAdapter_StartResource(t *testing.T) {
},
}},
}

a := NewAdaptor(source, k8s, ce, logger, opt)
r := &mockReporter{}
a := NewAdaptor(source, k8s, ce, logger, opt, r, "test-importer")

err := errors.New("test never ran")
stopCh := make(chan struct{})
Expand Down Expand Up @@ -292,22 +298,24 @@ func makeResourceAndTestingClient() (*resource, *kncetesting.TestCloudEventsClie
ce := kncetesting.NewTestClient()
source := "unit-test"
logger := zap.NewExample().Sugar()

r := &mockReporter{}
return &resource{
ce: ce,
source: source,
logger: logger,
ce: ce,
source: source,
logger: logger,
reporter: r,
}, ce
}

func makeRefAndTestingClient() (*ref, *kncetesting.TestCloudEventsClient) {
ce := kncetesting.NewTestClient()
source := "unit-test"
logger := zap.NewExample().Sugar()

r := &mockReporter{}
return &ref{
ce: ce,
source: source,
logger: logger,
ce: ce,
source: source,
logger: logger,
reporter: r,
}, ce
}
Loading