Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,4 @@ required = [

[[constraint]]
name = "github.com/tsenart/vegeta"
version = "12.7.0"
version = "12.7.0"
30 changes: 13 additions & 17 deletions cmd/apiserver_receive_adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"knative.dev/eventing/pkg/adapter/apiserver"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/tracing"
"knative.dev/eventing/pkg/utils"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/signals"
Expand Down Expand Up @@ -67,16 +66,16 @@ type envConfig struct {
Controller []bool `required:"true"`
LabelSelector StringList `envconfig:"SELECTOR" required:"true"`
Name string `envconfig:"NAME" required:"true"`
// MetricsConfigBase64 is a base64 encoded json string of
// metrics.ExporterOptions. This is used to configure the metrics exporter
// options, the config is stored in a config map inside the controllers
// MetricsConfigJson is a json string of metrics.ExporterOptions.
// This is used to configure the metrics exporter options,
// the config is stored in a config map inside the controllers
// namespace and copied here.
MetricsConfigBase64 string `envconfig:"K_METRICS_CONFIG" required:"true"`
MetricsConfigJson string `envconfig:"K_METRICS_CONFIG" required:"true"`

// LoggingConfigBase64 is a base64 encoded json string of logging.Config.
// LoggingConfigJson is a json string of logging.Config.
// This is used to configure the logging config, the config is stored in
// a config map inside the controllers namespace and copied here.
LoggingConfigBase64 string `envconfig:"K_LOGGING_CONFIG" required:"true"`
LoggingConfigJson string `envconfig:"K_LOGGING_CONFIG" required:"true"`
}

// TODO: the controller should take the list of GVR
Expand All @@ -89,9 +88,8 @@ func main() {
if err != nil {
panic(fmt.Sprintf("Error processing env var: %s", err))
}
// TODO move this util to pkg
// Convert base64 encoded json logging.Config to logging.Config.
loggingConfig, err := utils.Base64ToLoggingConfig(env.LoggingConfigBase64)
Comment thread
nachocano marked this conversation as resolved.
// Convert json logging.Config to logging.Config.
loggingConfig, err := logging.JsonToLoggingConfig(env.LoggingConfigJson)
if err != nil {
fmt.Printf("[ERROR] failed to process logging config: %s", err.Error())
// Use default logging config.
Expand All @@ -104,10 +102,8 @@ func main() {
logger := loggerSugared.Desugar()
defer flush(loggerSugared)

// Convert base64 encoded json metrics.ExporterOptions to
// metrics.ExporterOptions.
metricsConfig, err := utils.Base64ToMetricsOptions(
env.MetricsConfigBase64)
// Convert json metrics.ExporterOptions to metrics.ExporterOptions.
metricsConfig, err := metrics.JsonToMetricsOptions(env.MetricsConfigJson)
if err != nil {
logger.Error("failed to process metrics options", zap.Error(err))
}
Expand All @@ -116,7 +112,7 @@ func main() {
logger.Error("failed to create the metrics exporter", zap.Error(err))
}

reporter, err := apiserver.NewStatsReporter()
reporter, err := metrics.NewStatsReporter()
if err != nil {
logger.Error("error building statsreporter", zap.Error(err))
}
Expand Down Expand Up @@ -173,9 +169,9 @@ func main() {
}

a := apiserver.NewAdaptor(cfg.Host, client, eventsClient, loggerSugared, opt, reporter, env.Name)
logger.Info("starting kubernetes api adapter.", zap.Any("adapter", env))
logger.Info("starting kubernetes api adapter", zap.Any("adapter", env))
if err := a.Start(stopCh); err != nil {
logger.Warn("start returned an error,", zap.Error(err))
logger.Warn("start returned an error", zap.Error(err))
}
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/adapter/apiserver/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

cloudevents "github.com/cloudevents/sdk-go"
"go.uber.org/zap"
"knative.dev/pkg/metrics"
)

type Adapter interface {
Expand All @@ -41,6 +42,8 @@ const (
RefMode = "Ref"
// ResourceMode produces payloads of ResourceEvent
ResourceMode = "Resource"

resourceGroup = "apiserversources.sources.eventing.knative.dev"
)

// Options hold the options for the Adapter.
Expand All @@ -67,13 +70,13 @@ type adapter struct {

mode string
delegate eventDelegate
reporter StatsReporter
reporter metrics.StatsReporter
name string
}

func NewAdaptor(source string, k8sClient dynamic.Interface,
ceClient cloudevents.Client, logger *zap.SugaredLogger,
opt Options, reporter StatsReporter, name string) Adapter {
opt Options, reporter metrics.StatsReporter, name string) Adapter {
mode := opt.Mode
switch mode {
case ResourceMode, RefMode:
Expand Down
16 changes: 14 additions & 2 deletions pkg/adapter/apiserver/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@ import (
dynamicfake "k8s.io/client-go/dynamic/fake"
kncetesting "knative.dev/eventing/pkg/kncloudevents/testing"
rectesting "knative.dev/eventing/pkg/reconciler/testing"
"knative.dev/pkg/metrics"
)

type mockReporter struct{}
type mockReporter struct {
eventCount int
}

func (r *mockReporter) ReportEventCount(args *ReportArgs, responseCode int) error {
func (r *mockReporter) ReportEventCount(args *metrics.ReportArgs, responseCode int) error {
r.eventCount += 1
return nil
}

Expand Down Expand Up @@ -319,3 +323,11 @@ func makeRefAndTestingClient() (*ref, *kncetesting.TestCloudEventsClient) {
reporter: r,
}, ce
}

func validateMetric(t *testing.T, reporter metrics.StatsReporter, want int) {
Comment thread
nachocano marked this conversation as resolved.
if mockReporter, ok := reporter.(*mockReporter); !ok {
t.Errorf("reporter is not a mockReporter")
} else if mockReporter.eventCount != want {
t.Errorf("Expected %d for metric, got %d", want, mockReporter.eventCount)
}
}
14 changes: 8 additions & 6 deletions pkg/adapter/apiserver/ref.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package apiserver

import (
"context"
"knative.dev/pkg/metrics"
"reflect"

"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -37,7 +38,7 @@ type ref struct {
logger *zap.SugaredLogger

controlledGVRs []schema.GroupVersionResource
reporter StatsReporter
reporter metrics.StatsReporter
namespace string
name string
}
Expand Down Expand Up @@ -105,11 +106,12 @@ func (a *ref) addControllerWatch(gvr schema.GroupVersionResource) {
}

func (a *ref) sendEvent(ctx context.Context, event *cloudevents.Event) error {
reportArgs := &ReportArgs{
ns: a.namespace,
eventSource: event.Source(),
eventType: event.Type(),
name: a.name,
reportArgs := &metrics.ReportArgs{
Namespace: a.namespace,
EventSource: event.Source(),
EventType: event.Type(),
Name: a.name,
ResourceGroup: resourceGroup,
}

rctx, _, err := a.ce.Send(ctx, *event)
Expand Down
9 changes: 9 additions & 0 deletions pkg/adapter/apiserver/ref_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,42 @@ func TestRefAddEvent(t *testing.T) {
d, ce := makeRefAndTestingClient()
d.Add(simplePod("unit", "test"))
validateSent(t, ce, sourcesv1alpha1.ApiServerSourceAddRefEventType)
validateMetric(t, d.reporter, 1)
Comment thread
nachocano marked this conversation as resolved.
}

func TestRefUpdateEvent(t *testing.T) {
d, ce := makeRefAndTestingClient()
d.Update(simplePod("unit", "test"))
validateSent(t, ce, sourcesv1alpha1.ApiServerSourceUpdateRefEventType)
validateMetric(t, d.reporter, 1)
}

func TestRefDeleteEvent(t *testing.T) {
d, ce := makeRefAndTestingClient()
d.Delete(simplePod("unit", "test"))
validateSent(t, ce, sourcesv1alpha1.ApiServerSourceDeleteRefEventType)
validateMetric(t, d.reporter, 1)
}

func TestRefAddEventNil(t *testing.T) {
d, ce := makeRefAndTestingClient()
d.Add(nil)
validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceAddRefEventType)
validateMetric(t, d.reporter, 0)
}

func TestRefUpdateEventNil(t *testing.T) {
d, ce := makeRefAndTestingClient()
d.Update(nil)
validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceUpdateRefEventType)
validateMetric(t, d.reporter, 0)
}

func TestRefDeleteEventNil(t *testing.T) {
d, ce := makeRefAndTestingClient()
d.Delete(nil)
validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceDeleteRefEventType)
validateMetric(t, d.reporter, 0)
}

func TestRefAddEventAsController(t *testing.T) {
Expand All @@ -52,6 +58,7 @@ func TestRefAddEventAsController(t *testing.T) {
})
d.Add(simpleOwnedPod("unit", "test"))
validateSent(t, ce, sourcesv1alpha1.ApiServerSourceAddRefEventType)
validateMetric(t, d.reporter, 1)
}

func TestRefUpdateEventAsController(t *testing.T) {
Expand All @@ -63,6 +70,7 @@ func TestRefUpdateEventAsController(t *testing.T) {
})
d.Update(simpleOwnedPod("unit", "test"))
validateSent(t, ce, sourcesv1alpha1.ApiServerSourceUpdateRefEventType)
validateMetric(t, d.reporter, 1)
}

func TestRefDeleteEventAsController(t *testing.T) {
Expand All @@ -74,6 +82,7 @@ func TestRefDeleteEventAsController(t *testing.T) {
})
d.Delete(simpleOwnedPod("unit", "test"))
validateSent(t, ce, sourcesv1alpha1.ApiServerSourceDeleteRefEventType)
validateMetric(t, d.reporter, 1)
}

// HACKHACKHACK For test coverage.
Expand Down
22 changes: 12 additions & 10 deletions pkg/adapter/apiserver/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/adapter/apiserver/events"
"knative.dev/pkg/metrics"
)

type resource struct {
ce cloudevents.Client
source string
eventType string
logger *zap.SugaredLogger
reporter StatsReporter
reporter metrics.StatsReporter
namespace string
name string
}
Expand All @@ -45,7 +46,7 @@ func (a *resource) Add(obj interface{}) error {
return err
}

return a.sendEvent(context.Background(), event, a.reporter)
return a.sendEvent(context.Background(), event)
}

func (a *resource) Update(obj interface{}) error {
Expand All @@ -55,7 +56,7 @@ func (a *resource) Update(obj interface{}) error {
return err
}

return a.sendEvent(context.Background(), event, a.reporter)
return a.sendEvent(context.Background(), event)

return nil
}
Expand All @@ -67,15 +68,16 @@ func (a *resource) Delete(obj interface{}) error {
return err
}

return a.sendEvent(context.Background(), event, a.reporter)
return a.sendEvent(context.Background(), event)
}

func (a *resource) sendEvent(ctx context.Context, event *cloudevents.Event, reporter StatsReporter) error {
reportArgs := &ReportArgs{
ns: a.namespace,
eventSource: event.Source(),
eventType: event.Type(),
name: a.name,
func (a *resource) sendEvent(ctx context.Context, event *cloudevents.Event) error {
reportArgs := &metrics.ReportArgs{
Namespace: a.namespace,
EventSource: event.Source(),
EventType: event.Type(),
Name: a.name,
ResourceGroup: resourceGroup,
}

rctx, _, err := a.ce.Send(ctx, *event)
Expand Down
6 changes: 6 additions & 0 deletions pkg/adapter/apiserver/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,42 @@ func TestResourceAddEvent(t *testing.T) {
d, ce := makeResourceAndTestingClient()
d.Add(simplePod("unit", "test"))
validateSent(t, ce, sourcesv1alpha1.ApiServerSourceAddEventType)
validateMetric(t, d.reporter, 1)
}

func TestResourceUpdateEvent(t *testing.T) {
d, ce := makeResourceAndTestingClient()
d.Update(simplePod("unit", "test"))
validateSent(t, ce, sourcesv1alpha1.ApiServerSourceUpdateEventType)
validateMetric(t, d.reporter, 1)
}

func TestResourceDeleteEvent(t *testing.T) {
d, ce := makeResourceAndTestingClient()
d.Delete(simplePod("unit", "test"))
validateSent(t, ce, sourcesv1alpha1.ApiServerSourceDeleteEventType)
validateMetric(t, d.reporter, 1)
}

func TestResourceAddEventNil(t *testing.T) {
d, ce := makeResourceAndTestingClient()
d.Add(nil)
validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceAddEventType)
validateMetric(t, d.reporter, 0)
}

func TestResourceUpdateEventNil(t *testing.T) {
d, ce := makeResourceAndTestingClient()
d.Update(nil)
validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceUpdateEventType)
validateMetric(t, d.reporter, 0)
}

func TestResourceDeleteEventNil(t *testing.T) {
d, ce := makeResourceAndTestingClient()
d.Delete(nil)
validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceDeleteEventType)
validateMetric(t, d.reporter, 0)
}

func TestResourceCoverageHacks(t *testing.T) {
Expand Down
Loading