Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

84 changes: 37 additions & 47 deletions cmd/apiserver_receive_adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package main

import (
"context"
"flag"
"fmt"
"log"
Expand All @@ -34,17 +33,15 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
"knative.dev/eventing/pkg/adapter/apiserver"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/tracing"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/signals"
"knative.dev/pkg/source"
)

const (
component = "apiserversource"
"knative.dev/eventing/pkg/adapter"
"knative.dev/eventing/pkg/adapter/apiserver"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/tracing"
)

var (
Expand All @@ -63,44 +60,30 @@ func (s *StringList) Decode(value string) error {
}

type envConfig struct {
Namespace string `envconfig:"SYSTEM_NAMESPACE" default:"default"`
adapter.EnvConfig

Mode string `envconfig:"MODE"`
SinkURI string `split_words:"true" required:"true"`
ApiVersion StringList `split_words:"true" required:"true"`
Kind StringList `required:"true"`
Controller []bool `required:"true"`
LabelSelector StringList `envconfig:"SELECTOR" required:"true"`
OwnerApiVersion StringList `envconfig:"OWNER_API_VERSION" required:"true"`
OwnerKind StringList `envconfig:"OWNER_KIND" required:"true"`
Name string `envconfig:"NAME" required:"true"`
// MetricsConfigJson is a json string of metrics.ExporterOptions.
// This is used to configure the metrics exporter options,
// the config is stored in a config map inside the controllers
// namespace and copied here.
MetricsConfigJson string `envconfig:"K_METRICS_CONFIG" required:"true"`

// LoggingConfigJson is a json string of logging.Config.
// This is used to configure the logging config, the config is stored in
// a config map inside the controllers namespace and copied here.
LoggingConfigJson string `envconfig:"K_LOGGING_CONFIG" required:"true"`
}

// TODO: the controller should take the list of GVR
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we lost this TODO; it looks like there's a dynamic list in the current code. I'm not sure what the original intent of this TODO was, though. Maybe it's obsolete?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@n3wscott , who left this comment in #1175

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is being tracked here #1660 instead of the TODO. Is that alright?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that an issue is IMO better instead of a TODO, @lionelvillard

const (
component = "apiserversource"
)

func main() {
flag.Parse()

var env envConfig
err := envconfig.Process("", &env)
if err != nil {
panic(fmt.Sprintf("Error processing env var: %s", err))
}
ctx := signals.NewContext()

// Report stats on Go memory usage every 30 seconds.
msp := metrics.NewMemStatsAll()
msp.Start(context.Background(), 30*time.Second)
if err := view.Register(msp.DefaultViews()...); err != nil {
log.Fatalf("Error exporting go memstats view: %v", err)
var env envConfig
if err := envconfig.Process("", &env); err != nil {
log.Fatalf("Error processing env var: %s", err)
}

// Convert json logging.Config to logging.Config.
Expand All @@ -116,6 +99,14 @@ func main() {
loggerSugared, _ := logging.NewLoggerFromConfig(loggingConfig, component)
logger := loggerSugared.Desugar()
defer flush(loggerSugared)
ctx = logging.WithLogger(ctx, loggerSugared)

// Report stats on Go memory usage every 30 seconds.
msp := metrics.NewMemStatsAll()
msp.Start(ctx, 30*time.Second)
if err := view.Register(msp.DefaultViews()...); err != nil {
logger.Fatal("Error exporting go memstats view: %v", zap.Error(err))
}

// Convert json metrics.ExporterOptions to metrics.ExporterOptions.
metricsConfig, err := metrics.JsonToMetricsOptions(env.MetricsConfigJson)
Expand All @@ -132,31 +123,29 @@ func main() {
logger.Error("error building statsreporter", zap.Error(err))
}

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()
if err = tracing.SetupStaticPublishing(loggerSugared, "", tracing.OnePercentSampling); err != nil {
// If tracing doesn't work, we will log an error, but allow the adapter
// to continue to start.
logger.Error("Error setting up trace publishing", zap.Error(err))
}

eventsClient, err := kncloudevents.NewDefaultClient(env.SinkURI)
if err != nil {
logger.Fatal("error building cloud event client", zap.Error(err))
}

// Configuring the adapter

cfg, err := clientcmd.BuildConfigFromFlags(*masterURL, *kubeconfig)
if err != nil {
logger.Fatal("error building kubeconfig", zap.Error(err))
}

logger.Info("Starting the controller")
client, err := dynamic.NewForConfig(cfg)
if err != nil {
logger.Fatal("error building dynamic client", zap.Error(err))
}

if err = tracing.SetupStaticPublishing(loggerSugared, "", tracing.OnePercentSampling); err != nil {
// If tracing doesn't work, we will log an error, but allow the source
// to continue to start.
logger.Error("Error setting up trace publishing", zap.Error(err))
}

eventsClient, err := kncloudevents.NewDefaultClient(env.SinkURI)
if err != nil {
logger.Fatal("error building cloud event client", zap.Error(err))
}

gvrcs := []apiserver.GVRC(nil)

for i, apiVersion := range env.ApiVersion {
Expand Down Expand Up @@ -187,9 +176,10 @@ func main() {
GVRCs: gvrcs,
}

a := apiserver.NewAdaptor(cfg.Host, client, eventsClient, loggerSugared, opt, reporter, env.Name)
logger.Info("starting kubernetes api adapter", zap.Any("adapter", env))
if err := a.Start(stopCh); err != nil {
adapter := apiserver.NewAdaptor(cfg.Host, client, eventsClient, loggerSugared, opt, reporter, env.Name)
logger.Info("Starting Receive Adapter", zap.Any("adapter", adapter))

if err := adapter.Start(ctx.Done()); err != nil {
logger.Warn("start returned an error", zap.Error(err))
}
}
Expand Down
54 changes: 22 additions & 32 deletions cmd/cronjob_receive_adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,41 +25,28 @@ import (
"github.com/kelseyhightower/envconfig"
"go.opencensus.io/stats/view"
"go.uber.org/zap"
"golang.org/x/net/context"
"knative.dev/eventing/pkg/adapter/cronjobevents"
"knative.dev/eventing/pkg/tracing"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/signals"
"knative.dev/pkg/source"

"knative.dev/eventing/pkg/adapter"
"knative.dev/eventing/pkg/adapter/cronjobevents"
"knative.dev/eventing/pkg/kncloudevents"
)

type envConfig struct {
adapter.EnvConfig

// Environment variable container schedule.
Schedule string `envconfig:"SCHEDULE" required:"true"`

// Environment variable containing data.
Data string `envconfig:"DATA" required:"true"`

// Sink for messages.
Sink string `envconfig:"SINK_URI" required:"true"`

// Environment variable containing the name of the cron job.
Name string `envconfig:"NAME" required:"true"`

// Environment variable containing the namespace of the cron job.
Namespace string `envconfig:"NAMESPACE" required:"true"`

// MetricsConfigJson is a json string of metrics.ExporterOptions.
// This is used to configure the metrics exporter options,
// the config is stored in a config map inside the controllers
// namespace and copied here.
MetricsConfigJson string `envconfig:"K_METRICS_CONFIG" required:"true"`

// LoggingConfigJson is a json string of logging.Config.
// This is used to configure the logging config, the config is stored in
// a config map inside the controllers namespace and copied here.
LoggingConfigJson string `envconfig:"K_LOGGING_CONFIG" required:"true"`
}

const (
Expand All @@ -69,11 +56,11 @@ const (
func main() {
flag.Parse()

ctx := context.Background()
ctx := signals.NewContext()

var env envConfig
err := envconfig.Process("", &env)
if err != nil {
panic(fmt.Sprintf("Error processing env var: %s", err))
if err := envconfig.Process("", &env); err != nil {
log.Fatalf("Error processing env var: %s", err)
}

// Report stats on Go memory usage every 30 seconds.
Expand Down Expand Up @@ -107,33 +94,36 @@ func main() {
logger.Error("failed to create the metrics exporter", zap.Error(err))
}

if err := envconfig.Process("", &env); err != nil {
log.Fatal("Failed to process env var", zap.Error(err))
}
reporter, err := source.NewStatsReporter()
if err != nil {
logger.Error("error building statsreporter", zap.Error(err))
}

if err = tracing.SetupStaticPublishing(loggerSugared, "", tracing.OnePercentSampling); err != nil {
// If tracing doesn't work, we will log an error, but allow the importer to continue to
// start.
// If tracing doesn't work, we will log an error, but allow the adapter
// to continue to start.
logger.Error("Error setting up trace publishing", zap.Error(err))
}

eventsClient, err := kncloudevents.NewDefaultClient(env.SinkURI)
if err != nil {
logger.Fatal("error building cloud event client", zap.Error(err))
}

// Configuring the adapter

adapter := &cronjobevents.Adapter{
Schedule: env.Schedule,
Data: env.Data,
SinkURI: env.Sink,
Name: env.Name,
Namespace: env.Namespace,
Reporter: reporter,
Client: eventsClient,
}

logger.Info("Starting Receive Adapter", zap.Any("adapter", adapter))

stopCh := signals.SetupSignalHandler()

if err := adapter.Start(ctx, stopCh); err != nil {
if err := adapter.Start(ctx, ctx.Done()); err != nil {
logger.Fatal("Failed to start adapter", zap.Error(err))
}
}
Expand Down
37 changes: 37 additions & 0 deletions pkg/adapter/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package adapter

// EnvConfig is the minimal set of configuration parameters
// source adapters should support
type EnvConfig struct {
// SinkURI is the URI messages will be forwarded to.
SinkURI string `envconfig:"SINK_URI" required:"true"`

// Environment variable containing the namespace of the adapter
Namespace string `envconfig:"NAMESPACE" required:"true"`

// MetricsConfigJson is a json string of metrics.ExporterOptions.
// This is used to configure the metrics exporter options,
// the config is stored in a config map inside the controllers
// namespace and copied here.
MetricsConfigJson string `envconfig:"K_METRICS_CONFIG" required:"true"`

// LoggingConfigJson is a json string of logging.Config.
// This is used to configure the logging config, the config is stored in
// a config map inside the controllers namespace and copied here.
LoggingConfigJson string `envconfig:"K_LOGGING_CONFIG" required:"true"`
}
51 changes: 51 additions & 0 deletions pkg/adapter/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package adapter

import (
"os"
"testing"

"github.com/kelseyhightower/envconfig"
)

type myEnvConfig struct {
EnvConfig

Mode string `envconfig:"MODE"`
}

func TestEnvConfig(t *testing.T) {
os.Setenv("SINK_URI", "http://sink")
os.Setenv("NAMESPACE", "ns")
os.Setenv("K_METRICS_CONFIG", "metrics")
os.Setenv("K_LOGGING_CONFIG", "logging")
os.Setenv("MODE", "mymode")

var env myEnvConfig
err := envconfig.Process("", &env)
if err != nil {
t.Errorf("Expected no error: %v", err)
}

if env.Mode != "mymode" {
t.Errorf("Expected mode mymode, got: %s", env.Mode)
}

if env.SinkURI != "http://sink" {
t.Errorf("Expected sinkURI http://sink, got: %s", env.SinkURI)
}
}
Loading