Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/core/deployments/pingsource-mt-adapter.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ spec:
- name: K_NO_SHUTDOWN_AFTER
value: ''
- name: K_SINK_TIMEOUT
value: ''
value: '-1'
- name: POD_NAME
valueFrom:
fieldRef:
Expand Down
35 changes: 22 additions & 13 deletions pkg/adapter/v2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ type EnvConfig struct {
LeaderElectionConfigJson string `envconfig:"K_LEADER_ELECTION_CONFIG"`

// Time in seconds to wait for sink to respond
EnvSinkTimeout int `envconfig:"K_SINK_TIMEOUT"`
EnvSinkTimeout string `envconfig:"K_SINK_TIMEOUT"`

// cached zap logger
logger *zap.SugaredLogger
}

// EnvConfigAccessor defines accessors for the minimal
Expand Down Expand Up @@ -121,7 +124,7 @@ type EnvConfigAccessor interface {
// GetLeaderElectionConfig returns leader election configuration.
GetLeaderElectionConfig() (*kle.ComponentConfig, error)

// Get the name of the adapter.
// Get the timeout to apply on a request to a sink
GetSinktimeout() int
}

Expand All @@ -141,18 +144,20 @@ func (e *EnvConfig) GetMetricsConfig() (*metrics.ExporterOptions, error) {
}

func (e *EnvConfig) GetLogger() *zap.SugaredLogger {
loggingConfig, err := logging.JsonToLoggingConfig(e.LoggingConfigJson)
if err != nil {
// Use default logging config.
if loggingConfig, err = logging.NewConfigFromMap(map[string]string{}); err != nil {
// If this fails, there is no recovering.
panic(err)
if e.logger == nil {
loggingConfig, err := logging.JsonToLoggingConfig(e.LoggingConfigJson)
if err != nil {
// Use default logging config.
if loggingConfig, err = logging.NewConfigFromMap(map[string]string{}); err != nil {
// If this fails, there is no recovering.
panic(err)
}
}
}

logger, _ := logging.NewLoggerFromConfig(loggingConfig, e.Component)

return logger
logger, _ := logging.NewLoggerFromConfig(loggingConfig, e.Component)
e.logger = logger
}
return e.logger
}

func (e *EnvConfig) GetSink() string {
Expand All @@ -168,7 +173,11 @@ func (e *EnvConfig) GetName() string {
}

func (e *EnvConfig) GetSinktimeout() int {
return e.EnvSinkTimeout
if duration, err := strconv.Atoi(e.EnvSinkTimeout); err == nil {
return duration
}
e.GetLogger().Warn("Sink timeout configuration is invalid, default to -1 (no timeout)")
return -1
}

func (e *EnvConfig) SetupTracing(logger *zap.SugaredLogger) error {
Expand Down
23 changes: 20 additions & 3 deletions pkg/adapter/v2/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func TestEnvConfig(t *testing.T) {
os.Setenv("K_LOGGING_CONFIG", "logging")
os.Setenv("K_TRACING_CONFIG", "tracing")
os.Setenv("K_LEADER_ELECTION_CONFIG", "leaderelection")
os.Setenv("MODE", "mymode") // note: custom to this test impl
os.Setenv("K_SINK_TIMEOUT", "999") // note: custom to this test impl
os.Setenv("K_SINK_TIMEOUT", "999")
os.Setenv("MODE", "mymode") // note: custom to this test impl

defer func() {
os.Unsetenv("K_SINK")
Expand Down Expand Up @@ -70,8 +70,25 @@ func TestEnvConfig(t *testing.T) {
if sinkTimeout := GetSinkTimeout(nil); sinkTimeout != 999 {
t.Error("Expected GetSinkTimeout to be 999, got:", sinkTimeout)
}
if env.EnvSinkTimeout != 999 {

if env.EnvSinkTimeout != "999" {
t.Error("Expected env.EnvSinkTimeout to be 999, got:", env.EnvSinkTimeout)
}
}

func TestEmptySinkTimeout(t *testing.T) {
os.Setenv("K_SINK_TIMEOUT", "")
defer func() {
os.Unsetenv("K_SINK_TIMEOUT")
}()

var env myEnvConfig
err := envconfig.Process("", &env)
if err != nil {
t.Error("Expected no error:", err)
}

if env.GetSinktimeout() != -1 {
t.Error("Expected env.EnvSinkTimeout to be -1, got:", env.GetSinktimeout())
}
}