Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 10 additions & 51 deletions cmd/broker/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,17 @@ package main

import (
"flag"
"fmt"
"log"
"net/http"
"sync"
"time"

"contrib.go.opencensus.io/exporter/prometheus"
"github.com/kelseyhightower/envconfig"
"go.opencensus.io/stats/view"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"k8s.io/client-go/kubernetes"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/broker"
"knative.dev/eventing/pkg/provisioners"
"knative.dev/eventing/pkg/tracing"
"knative.dev/eventing/pkg/utils"
"knative.dev/pkg/configmap"
"knative.dev/pkg/metrics"
"knative.dev/pkg/signals"
"knative.dev/pkg/system"
"sigs.k8s.io/controller-runtime/pkg/client/config"
Expand All @@ -47,21 +40,11 @@ type envConfig struct {
Namespace string `envconfig:"NAMESPACE" required:"true"`
}

var (
metricsPort = 9090

writeTimeout = 1 * time.Minute
shutdownTimeout = 1 * time.Minute

wg sync.WaitGroup
)

func main() {
logConfig := provisioners.NewLoggingConfig()
logConfig.LoggingLevel["provisioner"] = zapcore.DebugLevel
logger := provisioners.NewProvisionerLoggerFromConfig(logConfig).Desugar()
defer logger.Sync()

defer flush(logger)
flag.Parse()

logger.Info("Starting...")
Expand Down Expand Up @@ -101,32 +84,13 @@ func main() {
}
err = mgr.Add(receiver)
if err != nil {
logger.Fatal("Unable to start the receiver", zap.Error(err), zap.Any("receiver", receiver))
logger.Fatal("Unable to start the receiver", zap.Error(err), zap.Any("broker_receiver", receiver))
}

// Metrics
e, err := prometheus.NewExporter(prometheus.Options{})
if err != nil {
logger.Fatal("Unable to create Prometheus exporter", zap.Error(err))
}
view.RegisterExporter(e)
sm := http.NewServeMux()
sm.Handle("/metrics", e)
metricsSrv := &http.Server{
Addr: fmt.Sprintf(":%d", metricsPort),
Handler: e,
ErrorLog: zap.NewStdLog(logger),
WriteTimeout: writeTimeout,
}
// TODO watch logging config map.

err = mgr.Add(&utils.RunnableServer{
Server: metricsSrv,
ShutdownTimeout: shutdownTimeout,
WaitGroup: &wg,
})
if err != nil {
logger.Fatal("Unable to add metrics runnableServer", zap.Error(err))
}
// Watch the observability config map and dynamically update metrics exporter.
configMapWatcher.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_receiver", logger.Sugar()))

// Set up signals so we handle the first shutdown signal gracefully.
stopCh := signals.SetupSignalHandler()
Expand All @@ -143,14 +107,9 @@ func main() {
logger.Fatal("Manager.Start() returned an error", zap.Error(err))
}
logger.Info("Exiting...")
}

go func() {
<-time.After(shutdownTimeout)
log.Fatalf("Shutdown took longer than %v", shutdownTimeout)
}()

// Wait for runnables to stop. This blocks indefinitely, but the above
// goroutine will exit the process if it takes longer than shutdownTimeout.
wg.Wait()
logger.Info("Done.")
func flush(logger *zap.Logger) {
logger.Sync()
metrics.FlushExporter()
}
59 changes: 9 additions & 50 deletions cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,23 @@ package main

import (
"flag"
"fmt"
"log"
"net/http"
"net/url"
"sync"
"time"

// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

"contrib.go.opencensus.io/exporter/prometheus"
cloudevents "github.com/cloudevents/sdk-go"
cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http"
"github.com/kelseyhightower/envconfig"
"go.opencensus.io/stats/view"
"go.uber.org/zap"
"k8s.io/client-go/kubernetes"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/broker/ingress"
"knative.dev/eventing/pkg/provisioners"
"knative.dev/eventing/pkg/tracing"
"knative.dev/eventing/pkg/utils"
"knative.dev/pkg/configmap"
"knative.dev/pkg/metrics"
"knative.dev/pkg/signals"
"knative.dev/pkg/system"
pkgtracing "knative.dev/pkg/tracing"
Expand All @@ -55,19 +49,10 @@ type envConfig struct {
Namespace string `envconfig:"NAMESPACE" required:"true"`
}

var (
metricsPort = 9090

writeTimeout = 1 * time.Minute
shutdownTimeout = 1 * time.Minute

wg sync.WaitGroup
)

func main() {
logConfig := provisioners.NewLoggingConfig()
logger := provisioners.NewProvisionerLoggerFromConfig(logConfig).Desugar()
defer logger.Sync()
defer flush(logger)
flag.Parse()
crlog.SetLogger(crlog.ZapLogger(false))

Expand Down Expand Up @@ -132,29 +117,10 @@ func main() {
logger.Fatal("Unable to add handler", zap.Error(err))
}

// Metrics
e, err := prometheus.NewExporter(prometheus.Options{})
if err != nil {
logger.Fatal("Unable to create Prometheus exporter", zap.Error(err))
}
view.RegisterExporter(e)
sm := http.NewServeMux()
sm.Handle("/metrics", e)
metricsSrv := &http.Server{
Addr: fmt.Sprintf(":%d", metricsPort),
Handler: e,
ErrorLog: zap.NewStdLog(logger),
WriteTimeout: writeTimeout,
}
// TODO watch logging config map.

err = mgr.Add(&utils.RunnableServer{
Server: metricsSrv,
ShutdownTimeout: shutdownTimeout,
WaitGroup: &wg,
})
if err != nil {
logger.Fatal("Unable to add metrics runnableServer", zap.Error(err))
}
// Watch the observability config map and dynamically update metrics exporter.
configMapWatcher.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_ingress", logger.Sugar()))

// Set up signals so we handle the first shutdown signal gracefully.
stopCh := signals.SetupSignalHandler()
Expand All @@ -169,16 +135,9 @@ func main() {
logger.Error("manager.Start() returned an error", zap.Error(err))
}
logger.Info("Exiting...")
}

// TODO Gracefully shutdown the ingress server. CloudEvents SDK doesn't seem
// to let us do that today.
go func() {
<-time.After(shutdownTimeout)
log.Fatalf("Shutdown took longer than %v", shutdownTimeout)
}()

// Wait for runnables to stop. This blocks indefinitely, but the above
// goroutine will exit the process if it takes longer than shutdownTimeout.
wg.Wait()
logger.Info("Done.")
func flush(logger *zap.Logger) {
logger.Sync()
metrics.FlushExporter()
}
8 changes: 8 additions & 0 deletions pkg/reconciler/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,10 @@ func envVars(containerName string) []corev1.EnvVar {
Name: "BROKER",
Value: brokerName,
},
{
Name: "METRICS_DOMAIN",
Value: "knative.dev/eventing",
},
}
case ingressContainerName:
return []corev1.EnvVar{
Expand Down Expand Up @@ -911,6 +915,10 @@ func envVars(containerName string) []corev1.EnvVar {
Name: "BROKER",
Value: brokerName,
},
{
Name: "METRICS_DOMAIN",
Value: "knative.dev/eventing",
},
}
}
return []corev1.EnvVar{}
Expand Down
4 changes: 4 additions & 0 deletions pkg/reconciler/broker/resources/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ func MakeFilterDeployment(args *FilterArgs) *appsv1.Deployment {
Name: "BROKER",
Value: args.Broker.Name,
},
{
Name: "METRICS_DOMAIN",
Value: "knative.dev/eventing",
},
},
Ports: []corev1.ContainerPort{
{
Expand Down
4 changes: 4 additions & 0 deletions pkg/reconciler/broker/resources/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ func MakeIngress(args *IngressArgs) *appsv1.Deployment {
Name: "BROKER",
Value: args.Broker.Name,
},
{
Name: "METRICS_DOMAIN",
Value: "knative.dev/eventing",
},
},
Ports: []corev1.ContainerPort{
{
Expand Down