diff --git a/cmd/webhook/main.go b/cmd/webhook/main.go index 8dacc0311f8..34313ee444c 100644 --- a/cmd/webhook/main.go +++ b/cmd/webhook/main.go @@ -53,6 +53,7 @@ import ( messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1" "knative.dev/eventing/pkg/apis/sources" + pingdefaultconfig "knative.dev/eventing/pkg/apis/sources/config" sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" sourcesv1alpha1 "knative.dev/eventing/pkg/apis/sources/v1alpha1" sourcesv1alpha2 "knative.dev/eventing/pkg/apis/sources/v1alpha2" @@ -156,9 +157,11 @@ func NewValidationAdmissionController(ctx context.Context, cmw configmap.Watcher channelStore := channeldefaultconfig.NewStore(logging.FromContext(ctx).Named("channel-config-store")) channelStore.WatchConfigs(cmw) + pingstore := pingdefaultconfig.NewStore(logging.FromContext(ctx).Named("ping-config-store")) + pingstore.WatchConfigs(cmw) // Decorate contexts with the current state of the config. ctxFunc := func(ctx context.Context) context.Context { - return channelStore.ToContext(store.ToContext(ctx)) + return channelStore.ToContext(pingstore.ToContext(store.ToContext(ctx))) } return validation.NewAdmissionController(ctx, diff --git a/config/400-config-ping-defaults.yaml b/config/400-config-ping-defaults.yaml new file mode 120000 index 00000000000..380f85f9342 --- /dev/null +++ b/config/400-config-ping-defaults.yaml @@ -0,0 +1 @@ +core/configmaps/default-pingsource.yaml \ No newline at end of file diff --git a/config/core/configmaps/default-pingsource.yaml b/config/core/configmaps/default-pingsource.yaml new file mode 100644 index 00000000000..ef294239927 --- /dev/null +++ b/config/core/configmaps/default-pingsource.yaml @@ -0,0 +1,27 @@ +# Copyright 2020 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: ConfigMap +metadata: + name: config-ping-webhook + namespace: knative-eventing + labels: + eventing.knative.dev/release: devel + annotations: + knative.dev/example-checksum: "6eaeecba" +data: + ping-config: | + # dataMaxSize: 10 # Max number of bytes allowed to be sent for message excluding any base64 decoding. + # Default is no limit set for data diff --git a/pkg/apis/sources/config/ping_defaults.go b/pkg/apis/sources/config/ping_defaults.go new file mode 100644 index 00000000000..77aa40ca87f --- /dev/null +++ b/pkg/apis/sources/config/ping_defaults.go @@ -0,0 +1,87 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "encoding/json" + "fmt" + + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/yaml" +) + +const ( + // PingDefaultsConfigName is the name of config map for the default + // configs that pings should use. + PingDefaultsConfigName = "config-ping-webhook" + + // PingDefaulterKey is the key in the ConfigMap to get the name of the default + // Ping CRD. + PingDefaulterKey = "ping-config" + + PingDataMaxSize = -1 +) + +// NewPingDefaultsConfigFromMap creates a Defaults from the supplied Map +func NewPingDefaultsConfigFromMap(data map[string]string) (*PingDefaults, error) { + nc := &PingDefaults{DataMaxSize: PingDataMaxSize} + + // Parse out the Broker Configuration Cluster default section + value, present := data[PingDefaulterKey] + if !present || value == "" { + return nil, fmt.Errorf("ConfigMap is missing (or empty) key: %q : %v", PingDefaulterKey, data) + } + if err := parseEntry(value, nc); err != nil { + return nil, fmt.Errorf("Failed to parse the entry: %s", err) + } + return nc, nil +} + +func parseEntry(entry string, out interface{}) error { + j, err := yaml.YAMLToJSON([]byte(entry)) + if err != nil { + return fmt.Errorf("ConfigMap's value could not be converted to JSON: %s : %v", err, entry) + } + return json.Unmarshal(j, &out) +} + +// NewPingDefaultsConfigFromConfigMap creates a PingDefaults from the supplied configMap +func NewPingDefaultsConfigFromConfigMap(config *corev1.ConfigMap) (*PingDefaults, error) { + return NewPingDefaultsConfigFromMap(config.Data) +} + +// PingDefaults includes the default values to be populated by the webhook. +type PingDefaults struct { + DataMaxSize int `json:"dataMaxSize"` +} + +func (d *PingDefaults) GetPingConfig() *PingDefaults { + if d.DataMaxSize < 0 { + d.DataMaxSize = PingDataMaxSize + } + return d + +} + +func (d *PingDefaults) DeepCopy() *PingDefaults { + if d == nil { + return nil + } + out := new(PingDefaults) + *out = *d + return out +} diff --git a/pkg/apis/sources/config/store.go b/pkg/apis/sources/config/store.go new file mode 100644 index 00000000000..3b5a567d6c9 --- /dev/null +++ b/pkg/apis/sources/config/store.go @@ -0,0 +1,97 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "context" + + "knative.dev/pkg/configmap" +) + +type pingCfgKey struct{} + +// Config holds the collection of configurations that we attach to contexts. +// +k8s:deepcopy-gen=false +type Config struct { + PingDefaults *PingDefaults +} + +// FromContext extracts a Config from the provided context. +func FromContext(ctx context.Context) *Config { + x, ok := ctx.Value(pingCfgKey{}).(*Config) + if ok { + return x + } + return nil +} + +// FromContextOrDefaults is like FromContext, but when no Config is attached it +// returns a Config populated with the defaults for each of the Config fields. +func FromContextOrDefaults(ctx context.Context) *Config { + if cfg := FromContext(ctx); cfg != nil { + return cfg + } + pingDefaults, err := NewPingDefaultsConfigFromMap(map[string]string{}) + if err != nil || pingDefaults == nil { + pingDefaults = &PingDefaults{DataMaxSize: PingDataMaxSize} + pingDefaults.GetPingConfig() + } + + return &Config{ + PingDefaults: pingDefaults, + } +} + +// ToContext attaches the provided Config to the provided context, returning the +// new context with the Config attached. +func ToContext(ctx context.Context, c *Config) context.Context { + return context.WithValue(ctx, pingCfgKey{}, c) +} + +// Store is a typed wrapper around configmap.Untyped store to handle our configmaps. +// +k8s:deepcopy-gen=false +type Store struct { + *configmap.UntypedStore +} + +// NewStore creates a new store of Configs and optionally calls functions when ConfigMaps are updated. +func NewStore(logger configmap.Logger, onAfterStore ...func(name string, value interface{})) *Store { + store := &Store{ + UntypedStore: configmap.NewUntypedStore( + "pingdefaults", + logger, + configmap.Constructors{ + PingDefaultsConfigName: NewPingDefaultsConfigFromConfigMap, + }, + onAfterStore..., + ), + } + + return store +} + +// ToContext attaches the current Config state to the provided context. +func (s *Store) ToContext(ctx context.Context) context.Context { + return ToContext(ctx, s.Load()) +} + +// Load creates a Config from the current config state of the Store. +func (s *Store) Load() *Config { + return &Config{ + PingDefaults: s.UntypedLoad(PingDefaultsConfigName).(*PingDefaults).DeepCopy(), + } +} diff --git a/pkg/apis/sources/v1alpha2/ping_validation.go b/pkg/apis/sources/v1alpha2/ping_validation.go index ae7808482e0..8d048949461 100644 --- a/pkg/apis/sources/v1alpha2/ping_validation.go +++ b/pkg/apis/sources/v1alpha2/ping_validation.go @@ -18,11 +18,13 @@ package v1alpha2 import ( "context" + "fmt" "github.com/robfig/cron/v3" "knative.dev/pkg/apis" "knative.dev/eventing/pkg/apis/eventing" + "knative.dev/eventing/pkg/apis/sources/config" ) func (c *PingSource) Validate(ctx context.Context) *apis.FieldError { @@ -38,6 +40,14 @@ func (cs *PingSourceSpec) Validate(ctx context.Context) *apis.FieldError { errs = errs.Also(fe) } + pingConfig := config.FromContextOrDefaults(ctx) + pingDefaults := pingConfig.PingDefaults.GetPingConfig() + + if bsize := len(cs.JsonData); pingDefaults.DataMaxSize > -1 && bsize > pingDefaults.DataMaxSize { + fe := apis.ErrInvalidValue(fmt.Sprintf("the jsonData length of %d bytes exceeds limit set at %d.", bsize, pingDefaults.DataMaxSize), "jsonData") + errs = errs.Also(fe) + } + if fe := cs.Sink.Validate(ctx); fe != nil { errs = errs.Also(fe.ViaField("sink")) } diff --git a/pkg/apis/sources/v1alpha2/ping_validation_test.go b/pkg/apis/sources/v1alpha2/ping_validation_test.go index b3d9fd7bc11..381b30035ec 100644 --- a/pkg/apis/sources/v1alpha2/ping_validation_test.go +++ b/pkg/apis/sources/v1alpha2/ping_validation_test.go @@ -18,10 +18,12 @@ package v1alpha2 import ( "context" + "strings" "testing" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/eventing/pkg/apis/eventing" + "knative.dev/eventing/pkg/apis/sources/config" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -33,6 +35,7 @@ func TestPingSourceValidation(t *testing.T) { tests := []struct { name string source PingSource + ctx func(ctx context.Context) context.Context want *apis.FieldError }{{ name: "valid spec", @@ -111,14 +114,74 @@ func TestPingSourceValidation(t *testing.T) { errs = errs.Also(fe) return errs }(), + }, { + name: "too big json", + source: PingSource{ + Spec: PingSourceSpec{ + Schedule: "*/2 * * * *", + JsonData: bigString(), + SourceSpec: duckv1.SourceSpec{ + Sink: duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: "v1alpha1", + Kind: "broker", + Name: "default", + }, + }, + }, + }, + }, + ctx: func(ctx context.Context) context.Context { + + return config.ToContext(ctx, &config.Config{PingDefaults: &config.PingDefaults{DataMaxSize: 4096}}) + }, + want: func() *apis.FieldError { + var errs *apis.FieldError + fe := apis.ErrInvalidValue("the jsonData length of 5000 bytes exceeds limit set at 4096.", "spec.jsonData") + errs = errs.Also(fe) + return errs + }(), + }, { + name: "too big json not checked ok", + source: PingSource{ + Spec: PingSourceSpec{ + Schedule: "*/2 * * * *", + JsonData: bigString(), + SourceSpec: duckv1.SourceSpec{ + Sink: duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: "v1alpha1", + Kind: "broker", + Name: "default", + }, + }, + }, + }, + }, + want: nil, }} for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got := test.source.Validate(context.TODO()) + ctx := context.TODO() + if test.ctx != nil { + ctx = test.ctx(ctx) + } + got := test.source.Validate(ctx) if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" { t.Error("ContainerSourceSpec.Validate (-want, +got) =", diff) } }) } } + +func bigString() string { + var b strings.Builder + b.Grow(5000) + b.WriteString("\"") + for i := 0; i < 4998; i++ { + b.WriteString("a") + } + b.WriteString("\"") + return b.String() +} diff --git a/pkg/apis/sources/v1beta1/ping_validation.go b/pkg/apis/sources/v1beta1/ping_validation.go index e65ceacbac6..df07d799b46 100644 --- a/pkg/apis/sources/v1beta1/ping_validation.go +++ b/pkg/apis/sources/v1beta1/ping_validation.go @@ -18,10 +18,13 @@ package v1beta1 import ( "context" + "fmt" "strings" "github.com/robfig/cron/v3" "knative.dev/pkg/apis" + + "knative.dev/eventing/pkg/apis/sources/config" ) func (c *PingSource) Validate(ctx context.Context) *apis.FieldError { @@ -46,6 +49,15 @@ func (cs *PingSourceSpec) Validate(ctx context.Context) *apis.FieldError { } } + pingConfig := config.FromContextOrDefaults(ctx) + + pingDefaults := pingConfig.PingDefaults.GetPingConfig() + + if bsize := len(cs.JsonData); pingDefaults.DataMaxSize > -1 && bsize > pingDefaults.DataMaxSize { + fe := apis.ErrInvalidValue(fmt.Sprintf("the jsonData length of %d bytes exceeds limit set at %d.", bsize, pingDefaults.DataMaxSize), "jsonData") + errs = errs.Also(fe) + } + if fe := cs.Sink.Validate(ctx); fe != nil { errs = errs.Also(fe.ViaField("sink")) } diff --git a/pkg/apis/sources/v1beta1/ping_validation_test.go b/pkg/apis/sources/v1beta1/ping_validation_test.go index 33d4c25fe19..b751d000716 100644 --- a/pkg/apis/sources/v1beta1/ping_validation_test.go +++ b/pkg/apis/sources/v1beta1/ping_validation_test.go @@ -18,18 +18,22 @@ package v1beta1 import ( "context" + "strings" "testing" duckv1 "knative.dev/pkg/apis/duck/v1" "github.com/google/go-cmp/cmp" "knative.dev/pkg/apis" + + "knative.dev/eventing/pkg/apis/sources/config" ) func TestPingSourceValidation(t *testing.T) { tests := []struct { name string source PingSource + ctx func(ctx context.Context) context.Context want *apis.FieldError }{{ name: "valid spec", @@ -121,14 +125,73 @@ func TestPingSourceValidation(t *testing.T) { errs = errs.Also(fe) return errs }(), + }, { + name: "too big json", + source: PingSource{ + Spec: PingSourceSpec{ + Schedule: "*/2 * * * *", + JsonData: bigString(), + SourceSpec: duckv1.SourceSpec{ + Sink: duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: "v1alpha1", + Kind: "broker", + Name: "default", + }, + }, + }, + }, + }, + ctx: func(ctx context.Context) context.Context { + return config.ToContext(ctx, &config.Config{PingDefaults: &config.PingDefaults{DataMaxSize: 4096}}) + }, + want: func() *apis.FieldError { + var errs *apis.FieldError + fe := apis.ErrInvalidValue("the jsonData length of 5000 bytes exceeds limit set at 4096.", "spec.jsonData") + errs = errs.Also(fe) + return errs + }(), + }, { + name: "too big json but ok", + source: PingSource{ + Spec: PingSourceSpec{ + Schedule: "*/2 * * * *", + JsonData: bigString(), + SourceSpec: duckv1.SourceSpec{ + Sink: duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: "v1alpha1", + Kind: "broker", + Name: "default", + }, + }, + }, + }, + }, + want: nil, }} for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got := test.source.Validate(context.TODO()) + ctx := context.TODO() + if test.ctx != nil { + ctx = test.ctx(ctx) + } + got := test.source.Validate(ctx) if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" { t.Error("PingSourceSpec.Validate (-want, +got) =", diff) } }) } } + +func bigString() string { + var b strings.Builder + b.Grow(5000) + b.WriteString("\"") + for i := 0; i < 4998; i++ { + b.WriteString("a") + } + b.WriteString("\"") + return b.String() +} diff --git a/pkg/apis/sources/v1beta2/ping_validation.go b/pkg/apis/sources/v1beta2/ping_validation.go index 325fe9136e2..189a38ccbdb 100644 --- a/pkg/apis/sources/v1beta2/ping_validation.go +++ b/pkg/apis/sources/v1beta2/ping_validation.go @@ -20,12 +20,15 @@ import ( "context" "encoding/base64" "encoding/json" + "fmt" "strings" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/robfig/cron/v3" "knative.dev/pkg/apis" + + "knative.dev/eventing/pkg/apis/sources/config" ) func (c *PingSource) Validate(ctx context.Context) *apis.FieldError { @@ -50,6 +53,9 @@ func (cs *PingSourceSpec) Validate(ctx context.Context) *apis.FieldError { } } + pingConfig := config.FromContextOrDefaults(ctx) + pingDefaults := pingConfig.PingDefaults.GetPingConfig() + if fe := cs.Sink.Validate(ctx); fe != nil { errs = errs.Also(fe.ViaField("sink")) } @@ -57,6 +63,10 @@ func (cs *PingSourceSpec) Validate(ctx context.Context) *apis.FieldError { if cs.Data != "" && cs.DataBase64 != "" { errs = errs.Also(apis.ErrMultipleOneOf("data", "dataBase64")) } else if cs.DataBase64 != "" { + if bsize := len(cs.DataBase64); pingDefaults.DataMaxSize > -1 && bsize > pingDefaults.DataMaxSize { + fe := apis.ErrInvalidValue(fmt.Sprintf("the dataBase64 length of %d bytes exceeds limit set at %d.", bsize, pingDefaults.DataMaxSize), "dataBase64") + errs = errs.Also(fe) + } decoded, err := base64.StdEncoding.DecodeString(cs.DataBase64) // invalid base64 string if err != nil { @@ -69,13 +79,18 @@ func (cs *PingSourceSpec) Validate(ctx context.Context) *apis.FieldError { } } } - } else if cs.Data != "" && cs.ContentType == cloudevents.ApplicationJSON { - // validate if data is valid JSON - if err := validateJSON(cs.Data); err != nil { - errs = errs.Also(apis.ErrInvalidValue(err, "data")) + } else if cs.Data != "" { + if bsize := len(cs.Data); pingDefaults.DataMaxSize > -1 && bsize > pingDefaults.DataMaxSize { + fe := apis.ErrInvalidValue(fmt.Sprintf("the data length of %d bytes exceeds limit set at %d.", bsize, pingDefaults.DataMaxSize), "data") + errs = errs.Also(fe) + } + if cs.ContentType == cloudevents.ApplicationJSON { + // validate if data is valid JSON + if err := validateJSON(cs.Data); err != nil { + errs = errs.Also(apis.ErrInvalidValue(err, "data")) + } } } - return errs } diff --git a/pkg/apis/sources/v1beta2/ping_validation_test.go b/pkg/apis/sources/v1beta2/ping_validation_test.go index 96b2c8cac3f..fbbb0a1b01d 100644 --- a/pkg/apis/sources/v1beta2/ping_validation_test.go +++ b/pkg/apis/sources/v1beta2/ping_validation_test.go @@ -19,6 +19,7 @@ package v1beta2 import ( "context" "encoding/base64" + "strings" "testing" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -27,12 +28,15 @@ import ( "github.com/google/go-cmp/cmp" "knative.dev/pkg/apis" + + "knative.dev/eventing/pkg/apis/sources/config" ) func TestPingSourceValidation(t *testing.T) { tests := []struct { name string source PingSource + ctx func(ctx context.Context) context.Context want *apis.FieldError }{ { @@ -260,15 +264,127 @@ func TestPingSourceValidation(t *testing.T) { errs = errs.Also(fe) return errs }(), + }, { + name: "invalid DataBase64 is to big ", + source: PingSource{ + Spec: PingSourceSpec{ + Schedule: "*/2 * * * *", + ContentType: cloudevents.TextPlain, + DataBase64: base64.StdEncoding.EncodeToString([]byte(bigString())), + SourceSpec: duckv1.SourceSpec{ + Sink: duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: "v1", + Kind: "broker", + Name: "default", + }, + }, + }, + }, + }, + ctx: func(ctx context.Context) context.Context { + return config.ToContext(ctx, &config.Config{PingDefaults: &config.PingDefaults{DataMaxSize: 4096}}) + }, + want: func() *apis.FieldError { + var errs *apis.FieldError + fe := apis.ErrInvalidValue("the dataBase64 length of 6668 bytes exceeds limit set at 4096.", "spec.dataBase64") + errs = errs.Also(fe) + return errs + }(), + }, { + name: "invalid Data is to big ", + source: PingSource{ + Spec: PingSourceSpec{ + Schedule: "*/2 * * * *", + ContentType: cloudevents.TextPlain, + Data: bigString(), + SourceSpec: duckv1.SourceSpec{ + Sink: duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: "v1", + Kind: "broker", + Name: "default", + }, + }, + }, + }, + }, + ctx: func(ctx context.Context) context.Context { + + return config.ToContext(ctx, &config.Config{PingDefaults: &config.PingDefaults{DataMaxSize: 4096}}) + }, + want: func() *apis.FieldError { + var errs *apis.FieldError + fe := apis.ErrInvalidValue("the data length of 5000 bytes exceeds limit set at 4096.", "spec.data") + errs = errs.Also(fe) + return errs + }(), + }, { + name: "big data ok ", + source: PingSource{ + + Spec: PingSourceSpec{ + Schedule: "*/2 * * * *", + ContentType: cloudevents.TextPlain, + Data: bigString(), + SourceSpec: duckv1.SourceSpec{ + Sink: duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: "v1", + Kind: "broker", + Name: "default", + }, + }, + }, + }, + }, + ctx: func(ctx context.Context) context.Context { + return config.ToContext(ctx, &config.Config{PingDefaults: &config.PingDefaults{DataMaxSize: -1}}) + }, + want: nil, + }, { + name: "big data still ok ", + source: PingSource{ + + Spec: PingSourceSpec{ + Schedule: "*/2 * * * *", + ContentType: cloudevents.TextPlain, + Data: bigString(), + SourceSpec: duckv1.SourceSpec{ + Sink: duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: "v1", + Kind: "broker", + Name: "default", + }, + }, + }, + }, + }, + want: nil, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got := test.source.Validate(context.TODO()) + ctx := context.TODO() + if test.ctx != nil { + ctx = test.ctx(ctx) + } + got := test.source.Validate(ctx) if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" { t.Error("PingSourceSpec.Validate (-want, +got) =", diff) } }) } } +func bigString() string { + var b strings.Builder + b.Grow(5000) + b.WriteString("\"") + for i := 0; i < 4998; i++ { + b.WriteString("a") + } + b.WriteString("\"") + return b.String() +}