From 6932d6089ce6ee84ad0368a4c51e2839611fa7cf Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Wed, 7 Oct 2020 10:42:50 -0400 Subject: [PATCH 1/2] fix crash when pingsource adapter is scaled up without any sources --- config/core/deployments/pingsource-mt-adapter.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/core/deployments/pingsource-mt-adapter.yaml b/config/core/deployments/pingsource-mt-adapter.yaml index 710b6c92b0a..63723bb12cc 100644 --- a/config/core/deployments/pingsource-mt-adapter.yaml +++ b/config/core/deployments/pingsource-mt-adapter.yaml @@ -54,7 +54,7 @@ spec: - name: K_NO_SHUTDOWN_AFTER value: '' - name: K_SINK_TIMEOUT - value: '' + value: '0' - name: POD_NAME valueFrom: fieldRef: From 7530718ddbf7116845813eff05d6889d24e27f53 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Thu, 8 Oct 2020 13:30:48 -0400 Subject: [PATCH 2/2] K_SINK_TIMEOUT now accepts any string value --- .../deployments/pingsource-mt-adapter.yaml | 2 +- pkg/adapter/v2/config.go | 35 ++++++++++++------- pkg/adapter/v2/config_test.go | 23 ++++++++++-- 3 files changed, 43 insertions(+), 17 deletions(-) diff --git a/config/core/deployments/pingsource-mt-adapter.yaml b/config/core/deployments/pingsource-mt-adapter.yaml index 63723bb12cc..db3cccd85f9 100644 --- a/config/core/deployments/pingsource-mt-adapter.yaml +++ b/config/core/deployments/pingsource-mt-adapter.yaml @@ -54,7 +54,7 @@ spec: - name: K_NO_SHUTDOWN_AFTER value: '' - name: K_SINK_TIMEOUT - value: '0' + value: '-1' - name: POD_NAME valueFrom: fieldRef: diff --git a/pkg/adapter/v2/config.go b/pkg/adapter/v2/config.go index f0cffcca53d..ed3e59d1990 100644 --- a/pkg/adapter/v2/config.go +++ b/pkg/adapter/v2/config.go @@ -90,7 +90,10 @@ type EnvConfig struct { LeaderElectionConfigJson string `envconfig:"K_LEADER_ELECTION_CONFIG"` // Time in seconds to wait for sink to respond - EnvSinkTimeout int `envconfig:"K_SINK_TIMEOUT"` + EnvSinkTimeout string `envconfig:"K_SINK_TIMEOUT"` + + // cached zap logger + logger *zap.SugaredLogger } // EnvConfigAccessor defines accessors for the minimal @@ -121,7 +124,7 @@ type EnvConfigAccessor interface { // GetLeaderElectionConfig returns leader election configuration. GetLeaderElectionConfig() (*kle.ComponentConfig, error) - // Get the name of the adapter. + // Get the timeout to apply on a request to a sink GetSinktimeout() int } @@ -141,18 +144,20 @@ func (e *EnvConfig) GetMetricsConfig() (*metrics.ExporterOptions, error) { } func (e *EnvConfig) GetLogger() *zap.SugaredLogger { - loggingConfig, err := logging.JsonToLoggingConfig(e.LoggingConfigJson) - if err != nil { - // Use default logging config. - if loggingConfig, err = logging.NewConfigFromMap(map[string]string{}); err != nil { - // If this fails, there is no recovering. - panic(err) + if e.logger == nil { + loggingConfig, err := logging.JsonToLoggingConfig(e.LoggingConfigJson) + if err != nil { + // Use default logging config. + if loggingConfig, err = logging.NewConfigFromMap(map[string]string{}); err != nil { + // If this fails, there is no recovering. + panic(err) + } } - } - logger, _ := logging.NewLoggerFromConfig(loggingConfig, e.Component) - - return logger + logger, _ := logging.NewLoggerFromConfig(loggingConfig, e.Component) + e.logger = logger + } + return e.logger } func (e *EnvConfig) GetSink() string { @@ -168,7 +173,11 @@ func (e *EnvConfig) GetName() string { } func (e *EnvConfig) GetSinktimeout() int { - return e.EnvSinkTimeout + if duration, err := strconv.Atoi(e.EnvSinkTimeout); err == nil { + return duration + } + e.GetLogger().Warn("Sink timeout configuration is invalid, default to -1 (no timeout)") + return -1 } func (e *EnvConfig) SetupTracing(logger *zap.SugaredLogger) error { diff --git a/pkg/adapter/v2/config_test.go b/pkg/adapter/v2/config_test.go index e29022d7b4b..acfd76dd863 100644 --- a/pkg/adapter/v2/config_test.go +++ b/pkg/adapter/v2/config_test.go @@ -35,8 +35,8 @@ func TestEnvConfig(t *testing.T) { os.Setenv("K_LOGGING_CONFIG", "logging") os.Setenv("K_TRACING_CONFIG", "tracing") os.Setenv("K_LEADER_ELECTION_CONFIG", "leaderelection") - os.Setenv("MODE", "mymode") // note: custom to this test impl - os.Setenv("K_SINK_TIMEOUT", "999") // note: custom to this test impl + os.Setenv("K_SINK_TIMEOUT", "999") + os.Setenv("MODE", "mymode") // note: custom to this test impl defer func() { os.Unsetenv("K_SINK") @@ -70,8 +70,25 @@ func TestEnvConfig(t *testing.T) { if sinkTimeout := GetSinkTimeout(nil); sinkTimeout != 999 { t.Error("Expected GetSinkTimeout to be 999, got:", sinkTimeout) } - if env.EnvSinkTimeout != 999 { + + if env.EnvSinkTimeout != "999" { t.Error("Expected env.EnvSinkTimeout to be 999, got:", env.EnvSinkTimeout) } +} +func TestEmptySinkTimeout(t *testing.T) { + os.Setenv("K_SINK_TIMEOUT", "") + defer func() { + os.Unsetenv("K_SINK_TIMEOUT") + }() + + var env myEnvConfig + err := envconfig.Process("", &env) + if err != nil { + t.Error("Expected no error:", err) + } + + if env.GetSinktimeout() != -1 { + t.Error("Expected env.EnvSinkTimeout to be -1, got:", env.GetSinktimeout()) + } }