Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion cmd/autoscaler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@ go_library(
importpath = "github.com/knative/serving/cmd/autoscaler",
visibility = ["//visibility:private"],
deps = [
"//cmd/util:go_default_library",
"//pkg/apis/serving/v1alpha1:go_default_library",
"//pkg/autoscaler:go_default_library",
"//pkg/client/clientset/versioned:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//pkg/logging:go_default_library",
"//pkg/logging/logkey:go_default_library",
"//vendor/github.com/gorilla/websocket:go_default_library",
"//vendor/github.com/josephburnett/k8sflag/pkg/k8sflag:go_default_library",
"//vendor/go.opencensus.io/exporter/prometheus:go_default_library",
"//vendor/go.opencensus.io/stats/view:go_default_library",
"//vendor/go.uber.org/zap:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
Expand Down
112 changes: 51 additions & 61 deletions cmd/autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,24 @@ package main

import (
"bytes"
"context"
"encoding/gob"
"flag"
"log"
"net/http"
"os"
"time"

"go.opencensus.io/exporter/prometheus"
"go.opencensus.io/stats/view"
"go.uber.org/zap"

"github.com/josephburnett/k8sflag/pkg/k8sflag"
"github.com/knative/serving/cmd/util"
"github.com/knative/serving/pkg/apis/serving/v1alpha1"
ela_autoscaler "github.com/knative/serving/pkg/autoscaler"
clientset "github.com/knative/serving/pkg/client/clientset/versioned"
"github.com/knative/serving/pkg/logging"
"github.com/knative/serving/pkg/logging/logkey"
"github.com/josephburnett/k8sflag/pkg/k8sflag"

"github.com/golang/glog"
"github.com/gorilla/websocket"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -64,46 +66,24 @@ var (
elaConfig string
elaRevision string
elaAutoscalerPort string
logger *zap.SugaredLogger

// Revision-level configuration
concurrencyModel = flag.String("concurrencyModel", string(v1alpha1.RevisionRequestConcurrencyModelMulti), "")

// Cluster-level configuration
enableScaleToZero = k8sflag.Bool("autoscale.enable-scale-to-zero", false)
multiConcurrencyTarget = k8sflag.Float64("autoscale.multi-concurrency-target", 0.0, k8sflag.Required)
singleConcurrencyTarget = k8sflag.Float64("autoscale.single-concurrency-target", 0.0, k8sflag.Required)
autoscaleFlagSet = k8sflag.NewFlagSet("/etc/config-autoscaler")
enableScaleToZero = autoscaleFlagSet.Bool("enable-scale-to-zero", false)
multiConcurrencyTarget = autoscaleFlagSet.Float64("multi-concurrency-target", 0.0, k8sflag.Required)
singleConcurrencyTarget = autoscaleFlagSet.Float64("single-concurrency-target", 0.0, k8sflag.Required)
)

func init() {
elaNamespace = os.Getenv("ELA_NAMESPACE")
if elaNamespace == "" {
glog.Fatal("No ELA_NAMESPACE provided.")
}
glog.Infof("ELA_NAMESPACE=%v", elaNamespace)

elaDeployment = os.Getenv("ELA_DEPLOYMENT")
if elaDeployment == "" {
glog.Fatal("No ELA_DEPLOYMENT provided.")
}
glog.Infof("ELA_DEPLOYMENT=%v", elaDeployment)

elaConfig = os.Getenv("ELA_CONFIGURATION")
if elaConfig == "" {
glog.Fatal("No ELA_CONFIGURATION provided.")
}
glog.Infof("ELA_CONFIGURATION=%v", elaConfig)

elaRevision = os.Getenv("ELA_REVISION")
if elaRevision == "" {
glog.Fatal("No ELA_REVISION provided.")
}
glog.Infof("ELA_REVISION=%v", elaRevision)

elaAutoscalerPort = os.Getenv("ELA_AUTOSCALER_PORT")
if elaAutoscalerPort == "" {
glog.Fatal("No ELA_AUTOSCALER_PORT provided.")
}
glog.Infof("ELA_AUTOSCALER_PORT=%v", elaAutoscalerPort)
func initEnv() {
elaNamespace = util.GetRequiredEnvOrFatal("ELA_NAMESPACE", logger)
elaDeployment = util.GetRequiredEnvOrFatal("ELA_DEPLOYMENT", logger)
elaConfig = util.GetRequiredEnvOrFatal("ELA_CONFIGURATION", logger)
elaRevision = util.GetRequiredEnvOrFatal("ELA_REVISION", logger)
elaAutoscalerPort = util.GetRequiredEnvOrFatal("ELA_AUTOSCALER_PORT", logger)
}

func autoscaler() {
Expand All @@ -114,22 +94,23 @@ func autoscaler() {
case string(v1alpha1.RevisionRequestConcurrencyModelMulti):
targetConcurrency = multiConcurrencyTarget
default:
log.Fatalf("Unrecognized concurrency model: " + *concurrencyModel)
logger.Fatalf("Unrecognized concurrency model: " + *concurrencyModel)
}
config := ela_autoscaler.Config{
TargetConcurrency: targetConcurrency,
MaxScaleUpRate: k8sflag.Float64("autoscale.max-scale-up-rate", 0.0, k8sflag.Required),
StableWindow: k8sflag.Duration("autoscale.stable-window", nil, k8sflag.Required),
PanicWindow: k8sflag.Duration("autoscale.panic-window", nil, k8sflag.Required),
ScaleToZeroThreshold: k8sflag.Duration("autoscale.scale-to-zero-threshold", nil, k8sflag.Required, k8sflag.Dynamic),
MaxScaleUpRate: autoscaleFlagSet.Float64("max-scale-up-rate", 0.0, k8sflag.Required),
StableWindow: autoscaleFlagSet.Duration("stable-window", nil, k8sflag.Required),
PanicWindow: autoscaleFlagSet.Duration("panic-window", nil, k8sflag.Required),
ScaleToZeroThreshold: autoscaleFlagSet.Duration("scale-to-zero-threshold", nil, k8sflag.Required, k8sflag.Dynamic),
}
a := ela_autoscaler.NewAutoscaler(config, statsReporter)
ticker := time.NewTicker(2 * time.Second)
ctx := logging.WithLogger(context.TODO(), logger)

for {
select {
case <-ticker.C:
scale, ok := a.Scale(time.Now())
scale, ok := a.Scale(ctx, time.Now())
if ok {
// Flag guard scale to zero.
if !enableScaleToZero.Get() && scale == 0 {
Expand All @@ -144,7 +125,7 @@ func autoscaler() {
}
}
case s := <-statChan:
a.Record(s)
a.Record(ctx, s)
}
}
}
Expand All @@ -161,7 +142,7 @@ func scaleSerializer() {
for {
select {
case p := <-scaleChan:
glog.Warning("Scaling is not keeping up with autoscaling requests.")
logger.Info("Scaling is not keeping up with autoscaling requests.")
desiredPodCount = p
default:
break FastForward
Expand All @@ -176,10 +157,10 @@ func scaleTo(podCount int32) {
dc := kubeClient.ExtensionsV1beta1().Deployments(elaNamespace)
deployment, err := dc.Get(elaDeployment, metav1.GetOptions{})
if err != nil {
glog.Error("Error getting Deployment %q: %s", elaDeployment, err)
logger.Error("Error getting Deployment %q: %s", elaDeployment, zap.Error(err))
return
}
glog.Infof("===SCALE=== %v %v %v %v %v",
logger.Debugf("===SCALE=== %v %v %v %v %v",
time.Now().Unix(),
podCount,
deployment.Status.Replicas,
Expand All @@ -193,33 +174,33 @@ func scaleTo(podCount int32) {
return
}

glog.Infof("Scaling to %v", podCount)
logger.Infof("Scaling to %v", podCount)
if podCount == 0 {
revisionClient := elaClient.ServingV1alpha1().Revisions(elaNamespace)
revision, err := revisionClient.Get(elaRevision, metav1.GetOptions{})

if err != nil {
glog.Errorf("Error getting Revision %q: %s", elaRevision, err)
logger.Errorf("Error getting Revision %q: %s", elaRevision, zap.Error(err))
}
revision.Spec.ServingState = v1alpha1.RevisionServingStateReserve
revision, err = revisionClient.Update(revision)
if err != nil {
glog.Errorf("Error updating Revision %q: %s", elaRevision, err)
logger.Errorf("Error updating Revision %q: %s", elaRevision, zap.Error(err))
}

}
deployment.Spec.Replicas = &podCount
_, err = dc.Update(deployment)
if err != nil {
glog.Errorf("Error updating Deployment %q: %s", elaDeployment, err)
logger.Errorf("Error updating Deployment %q: %s", elaDeployment, err)
}
glog.Info("Successfully scaled.")
logger.Info("Successfully scaled.")
}

func handler(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
glog.Error(err)
logger.Error("Failed to upgrade http connection to websocket", zap.Error(err))
return
}
for {
Expand All @@ -228,14 +209,14 @@ func handler(w http.ResponseWriter, r *http.Request) {
return
}
if messageType != websocket.BinaryMessage {
glog.Error("Dropping non-binary message.")
logger.Error("Dropping non-binary message.")
continue
}
dec := gob.NewDecoder(bytes.NewBuffer(msg))
var stat ela_autoscaler.Stat
err = dec.Decode(&stat)
if err != nil {
glog.Error(err)
logger.Error("Failed to decode stats", zap.Error(err))
continue
}
statChan <- stat
Expand All @@ -244,33 +225,42 @@ func handler(w http.ResponseWriter, r *http.Request) {

func main() {
flag.Parse()
glog.Info("Autoscaler up")
logger = logging.NewLoggerFromDefaultConfigMap("loglevel.autoscaler").Named("ela-autoscaler")
defer logger.Sync()

initEnv()
logger = logger.With(
zap.String(logkey.Namespace, elaNamespace),
zap.String(logkey.Configuration, elaConfig),
zap.String(logkey.Revision, elaRevision))

logger.Info("Starting autoscaler")
config, err := rest.InClusterConfig()
if err != nil {
glog.Fatal(err)
logger.Fatal("Failed to get in cluster configuration", zap.Error(err))
}
config.Timeout = time.Duration(5 * time.Second)
kc, err := kubernetes.NewForConfig(config)
if err != nil {
glog.Fatal(err)
logger.Fatal("Failed to create a new clientset", zap.Error(err))
}
kubeClient = kc
ec, err := clientset.NewForConfig(config)
if err != nil {
glog.Fatal(err)
logger.Fatal("Failed to create a new clientset", zap.Error(err))
}
elaClient = ec

exporter, err := prometheus.NewExporter(prometheus.Options{Namespace: "autoscaler"})
if err != nil {
glog.Fatal(err)
logger.Fatal("Failed to create prometheus exporter", zap.Error(err))
}
view.RegisterExporter(exporter)
view.SetReportingPeriod(1 * time.Second)

reporter, err := ela_autoscaler.NewStatsReporter(elaNamespace, elaConfig, elaRevision)
if err != nil {
glog.Fatal(err)
logger.Fatal("Failed to create stats reporter", zap.Error(err))
}
statsReporter = reporter

Expand Down
28 changes: 18 additions & 10 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,25 @@ var (
queueSidecarImage string
autoscalerImage string

autoscaleConcurrencyQuantumOfTime = k8sflag.Duration("autoscale.concurrency-quantum-of-time", nil, k8sflag.Required)
autoscaleEnableScaleToZero = k8sflag.Bool("autoscale.enable-scale-to-zero", false)
autoscaleEnableSingleConcurrency = k8sflag.Bool("autoscale.enable-single-concurrency", false)

loggingEnableVarLogCollection = k8sflag.Bool("logging.enable-var-log-collection", false)
loggingFluentSidecarImage = k8sflag.String("logging.fluentd-sidecar-image", "")
loggingFluentSidecarOutputConfig = k8sflag.String("logging.fluentd-sidecar-output-config", "")
loggingURLTemplate = k8sflag.String("logging.revision-url-template", "")
loggingZapCfg = k8sflag.String("logging.ela-controller.zap-config", "")
autoscaleFlagSet = k8sflag.NewFlagSet("/etc/config-autoscaler")
autoscaleConcurrencyQuantumOfTime = autoscaleFlagSet.Duration("concurrency-quantum-of-time", nil, k8sflag.Required)
autoscaleEnableScaleToZero = autoscaleFlagSet.Bool("enable-scale-to-zero", false)
autoscaleEnableSingleConcurrency = autoscaleFlagSet.Bool("enable-single-concurrency", false)

observabilityFlagSet = k8sflag.NewFlagSet("/etc/config-observability")
loggingEnableVarLogCollection = observabilityFlagSet.Bool("logging.enable-var-log-collection", false)
loggingFluentSidecarImage = observabilityFlagSet.String("logging.fluentd-sidecar-image", "")
loggingFluentSidecarOutputConfig = observabilityFlagSet.String("logging.fluentd-sidecar-output-config", "")
loggingURLTemplate = observabilityFlagSet.String("logging.revision-url-template", "")

loggingFlagSet = k8sflag.NewFlagSet("/etc/config-logging")
zapConfig = loggingFlagSet.String("zap-logger-config", "")
queueProxyLoggingLevel = loggingFlagSet.String("loglevel.queueproxy", "")
)

func main() {
flag.Parse()
logger := logging.NewLogger(loggingZapCfg.Get()).Named("ela-controller")
logger := logging.NewLoggerFromDefaultConfigMap("loglevel.controller").Named("ela-controller")
defer logger.Sync()

if loggingEnableVarLogCollection.Get() {
Expand Down Expand Up @@ -139,6 +144,9 @@ func main() {
FluentdSidecarImage: loggingFluentSidecarImage.Get(),
FluentdSidecarOutputConfig: loggingFluentSidecarOutputConfig.Get(),
LoggingURLTemplate: loggingURLTemplate.Get(),

QueueProxyLoggingConfig: zapConfig.Get(),
QueueProxyLoggingLevel: queueProxyLoggingLevel.Get(),
}

// Build all of our controllers, with the clients constructed above.
Expand Down
5 changes: 4 additions & 1 deletion cmd/queue/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ go_library(
importpath = "github.com/knative/serving/cmd/queue",
visibility = ["//visibility:private"],
deps = [
"//cmd/util:go_default_library",
"//pkg/apis/serving/v1alpha1:go_default_library",
"//pkg/autoscaler:go_default_library",
"//pkg/logging:go_default_library",
"//pkg/logging/logkey:go_default_library",
"//pkg/queue:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/gorilla/websocket:go_default_library",
"//vendor/go.uber.org/zap:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
],
Expand Down
Loading