Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 58 additions & 11 deletions cmd/cronjob_receive_adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@ package main

import (
"flag"
"fmt"
"log"

"github.com/kelseyhightower/envconfig"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/net/context"
"knative.dev/eventing/pkg/adapter/cronjobevents"
"knative.dev/eventing/pkg/tracing"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/signals"
"knative.dev/pkg/source"
)

type envConfig struct {
Expand All @@ -44,29 +47,67 @@ type envConfig struct {

// Environment variable containing the namespace of the cron job.
Namespace string `envconfig:"NAMESPACE" required:"true"`

// MetricsConfigJson is a json string of metrics.ExporterOptions.
// This is used to configure the metrics exporter options,
// the config is stored in a config map inside the controllers
// namespace and copied here.
MetricsConfigJson string `envconfig:"K_METRICS_CONFIG" required:"true"`

// LoggingConfigJson is a json string of logging.Config.
// This is used to configure the logging config, the config is stored in
// a config map inside the controllers namespace and copied here.
LoggingConfigJson string `envconfig:"K_LOGGING_CONFIG" required:"true"`
}

const (
component = "cronjobsource"
)

func main() {
flag.Parse()

ctx := context.Background()
logCfg := zap.NewProductionConfig()
logCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
dlogger, err := logCfg.Build()
var env envConfig
err := envconfig.Process("", &env)
if err != nil {
log.Fatalf("Error building logger: %v", err)
panic(fmt.Sprintf("Error processing env var: %s", err))
}
// Convert json logging.Config to logging.Config.
loggingConfig, err := logging.JsonToLoggingConfig(env.LoggingConfigJson)
if err != nil {
fmt.Printf("[ERROR] failed to process logging config: %s", err)
// Use default logging config.
if loggingConfig, err = logging.NewConfigFromMap(map[string]string{}); err != nil {
// If this fails, there is no recovering.
panic(err)
}
}
loggerSugared, _ := logging.NewLoggerFromConfig(loggingConfig, component)
logger := loggerSugared.Desugar()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you 😁

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anytime

defer flush(loggerSugared)

// Convert json metrics.ExporterOptions to metrics.ExporterOptions.
metricsConfig, err := metrics.JsonToMetricsOptions(env.MetricsConfigJson)
if err != nil {
logger.Fatal("failed to process metrics options", zap.Error(err))
}

if err := metrics.UpdateExporter(*metricsConfig, loggerSugared); err != nil {
logger.Error("failed to create the metrics exporter", zap.Error(err))
}
logger := dlogger.Sugar()

var env envConfig
if err := envconfig.Process("", &env); err != nil {
log.Fatal("Failed to process env var", zap.Error(err))
}

if err = tracing.SetupStaticPublishing(logger, "cronjobsource", tracing.OnePercentSampling); err != nil {
// If tracing doesn't work, we will log an error, but allow the source to continue to
reporter, err := source.NewStatsReporter()
if err != nil {
logger.Error("error building statsreporter", zap.Error(err))
}
if err = tracing.SetupStaticPublishing(loggerSugared, "cronjobsource", tracing.OnePercentSampling); err != nil {
// If tracing doesn't work, we will log an error, but allow the importer to continue to
// start.
logger.Errorw("Error setting up trace publishing", err)
logger.Error("Error setting up trace publishing", zap.Error(err))
}

adapter := &cronjobevents.Adapter{
Expand All @@ -75,6 +116,7 @@ func main() {
SinkURI: env.Sink,
Name: env.Name,
Namespace: env.Namespace,
Reporter: reporter,
}

logger.Info("Starting Receive Adapter", zap.Any("adapter", adapter))
Expand All @@ -85,3 +127,8 @@ func main() {
logger.Fatal("Failed to start adapter", zap.Error(err))
}
}

func flush(logger *zap.SugaredLogger) {
_ = logger.Sync()
metrics.FlushExporter()
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,22 @@
target_label: pod
- source_labels: [__meta_kubernetes_service_name]
target_label: service

# cronjob-source
- job_name: cronjob-source
scrape_interval: 3s
scrape_timeout: 3s
kubernetes_sd_configs:
- role: pod
relabel_configs:
# Scrape only the the targets matching the following metadata
- source_labels: [ __meta_kubernetes_pod_label_eventing_knative_dev_source, __meta_kubernetes_pod_container_port_name]
action: keep
regex: cronjob-source-controller;metrics
# Rename metadata labels to be reader friendly
- source_labels: [__meta_kubernetes_namespace]
target_label: namespace
- source_labels: [__meta_kubernetes_pod_name]
target_label: pod
- source_labels: [__meta_kubernetes_service_name]
target_label: service
21 changes: 19 additions & 2 deletions pkg/adapter/cronjobevents/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
sourcesv1alpha1 "knative.dev/eventing/pkg/apis/sources/v1alpha1"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/pkg/logging"
"knative.dev/pkg/source"
)

// TODO: this should be a k8s cron.
Expand All @@ -49,8 +50,14 @@ type Adapter struct {

// client sends cloudevents.
client cloudevents.Client

Reporter source.StatsReporter
}

const (
resourceGroup = "cronjobsources.sources.eventing.knative.dev"
)

// Initialize cloudevent client
func (a *Adapter) initClient() error {
if a.client == nil {
Expand Down Expand Up @@ -92,10 +99,20 @@ func (a *Adapter) cronTick() {
event.SetType(sourcesv1alpha1.CronJobEventType)
event.SetSource(sourcesv1alpha1.CronJobEventSource(a.Namespace, a.Name))
event.SetData(message(a.Data))
reportArgs := &source.ReportArgs{
Namespace: a.Namespace,
EventSource: event.Source(),
EventType: event.Type(),
Name: a.Name,
ResourceGroup: resourceGroup,
}

if _, _, err := a.client.Send(context.TODO(), event); err != nil {
logger.Error("failed to send cloudevent", err)
rctx, _, err := a.client.Send(context.TODO(), event)
rtctx := cloudevents.HTTPTransportContextFrom(rctx)
if err != nil {
logger.Error("failed to send cloudevent", zap.Error(err))
}
a.Reporter.ReportEventCount(reportArgs, rtctx.StatusCode)
}

type Message struct {
Expand Down
32 changes: 30 additions & 2 deletions pkg/adapter/cronjobevents/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,18 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
"knative.dev/pkg/source"
)

type mockReporter struct {
eventCount int
}

func (r *mockReporter) ReportEventCount(args *source.ReportArgs, responseCode int) error {
r.eventCount++
return nil
}

func TestStart_ServeHTTP(t *testing.T) {
testCases := map[string]struct {
schedule string
Expand Down Expand Up @@ -55,10 +65,12 @@ func TestStart_ServeHTTP(t *testing.T) {
sinkServer := httptest.NewServer(h)
defer sinkServer.Close()

r := &mockReporter{}
a := &Adapter{
Schedule: tc.schedule,
Data: "data",
SinkURI: sinkServer.URL,
Reporter: r,
}

if err := a.initClient(); err != nil {
Expand All @@ -77,6 +89,7 @@ func TestStart_ServeHTTP(t *testing.T) {
}()

a.cronTick() // force a tick.
validateMetric(t, a.Reporter, 1)

if tc.reqBody != string(h.body) {
t.Errorf("expected request body %q, but got %q", tc.reqBody, h.body)
Expand All @@ -89,8 +102,10 @@ func TestStart_ServeHTTP(t *testing.T) {
func TestStartBadCron(t *testing.T) {
schedule := "bad"

r := &mockReporter{}
a := &Adapter{
Schedule: schedule,
Reporter: r,
}

stop := make(chan struct{})
Expand All @@ -99,6 +114,8 @@ func TestStartBadCron(t *testing.T) {
t.Errorf("failed to fail, %v", err)

}

validateMetric(t, a.Reporter, 0)
}

func TestPostMessage_ServeHTTP(t *testing.T) {
Expand All @@ -125,9 +142,11 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
sinkServer := httptest.NewServer(h)
defer sinkServer.Close()

r := &mockReporter{}
a := &Adapter{
Data: "data",
SinkURI: sinkServer.URL,
Data: "data",
SinkURI: sinkServer.URL,
Reporter: r,
}

if err := a.initClient(); err != nil {
Expand All @@ -139,6 +158,7 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
if tc.reqBody != string(h.body) {
t.Errorf("expected request body %q, but got %q", tc.reqBody, h.body)
}
validateMetric(t, a.Reporter, 1)
})
}
}
Expand Down Expand Up @@ -206,3 +226,11 @@ func sinkAccepted(writer http.ResponseWriter, req *http.Request) {
func sinkRejected(writer http.ResponseWriter, _ *http.Request) {
writer.WriteHeader(http.StatusRequestTimeout)
}

func validateMetric(t *testing.T, reporter source.StatsReporter, want int) {
if mockReporter, ok := reporter.(*mockReporter); !ok {
t.Errorf("reporter is not a mockReporter")
} else if mockReporter.eventCount != want {
t.Errorf("Expected %d for metric, got %d", want, mockReporter.eventCount)
}
}
5 changes: 5 additions & 0 deletions pkg/reconciler/cronjobsource/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
)

const (
Expand Down Expand Up @@ -69,6 +71,7 @@ func NewController(
deploymentLister: deploymentInformer.Lister(),
eventTypeLister: eventTypeInformer.Lister(),
env: *env,
loggingContext: ctx,
}
impl := controller.NewImpl(r, r.Logger, ReconcilerName)
r.sinkReconciler = duck.NewSinkReconciler(ctx, impl.EnqueueKey)
Expand All @@ -85,6 +88,8 @@ func NewController(
FilterFunc: controller.Filter(v1alpha1.SchemeGroupVersion.WithKind("CronJobSource")),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})
cmw.Watch(logging.ConfigMapName(), r.UpdateFromLoggingConfigMap)
cmw.Watch(metrics.ConfigMapName(), r.UpdateFromMetricsConfigMap)

return impl
}
34 changes: 32 additions & 2 deletions pkg/reconciler/cronjobsource/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import (
"os"
"testing"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/configmap"
logtesting "knative.dev/pkg/logging/testing"
. "knative.dev/pkg/reconciler/testing"

// Fake injection informers
_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventtype/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/sources/v1alpha1/cronjobsource/fake"
Expand Down Expand Up @@ -51,6 +52,15 @@ func TestNew(t *testing.T) {
t.Fatalf("Failed to unset env var: %v", err)
}
}()

if err := os.Setenv("METRICS_DOMAIN", "knative.dev/eventing"); err != nil {
t.Fatalf("Failed to set env var: %v", err)
}
defer func() {
if err := os.Unsetenv("METRICS_DOMAIN"); err != nil {
t.Fatalf("Failed to unset env var: %v", err)
}
}()
} else {
defer func() {
r := recover()
Expand All @@ -61,7 +71,27 @@ func TestNew(t *testing.T) {
}

ctx, _ := SetupFakeContext(t)
c := NewController(ctx, configmap.NewFixedWatcher())
c := NewController(ctx, configmap.NewStaticWatcher(
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-observability",
Namespace: "knative-eventing",
},
Data: map[string]string{
"_example": "test-config",
},
}, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-logging",
Namespace: "knative-eventing",
},
Data: map[string]string{
"zap-logger-config": "test-config",
"loglevel.controller": "info",
"loglevel.webhook": "info",
},
},
))

if c == nil {
t.Fatal("Expected NewController to return a non-nil value")
Expand Down
Loading