diff --git a/config/core/resources/trigger.yaml b/config/core/resources/trigger.yaml index b0a74b5d01a..0b460e730e1 100644 --- a/config/core/resources/trigger.yaml +++ b/config/core/resources/trigger.yaml @@ -58,6 +58,7 @@ spec: delivery: description: Delivery contains the delivery spec for this specific trigger. type: object + x-kubernetes-preserve-unknown-fields: true # This is necessary to enable the experimental feature delivery-timeout properties: backoffDelay: description: 'BackoffDelay is the delay before retrying. More information on Duration format: - https://www.iso.org/iso-8601-date-and-time-format.html - https://en.wikipedia.org/wiki/ISO_8601 For linear policy, backoff delay is backoffDelay*. For exponential policy, backoff delay is backoffDelay*2^.' diff --git a/pkg/apis/sources/v1/ping_validation.go b/pkg/apis/sources/v1/ping_validation.go index d8648974700..c60f49a4f95 100644 --- a/pkg/apis/sources/v1/ping_validation.go +++ b/pkg/apis/sources/v1/ping_validation.go @@ -20,6 +20,7 @@ import ( "context" "encoding/base64" "encoding/json" + "errors" "fmt" "strings" @@ -37,8 +38,10 @@ func (c *PingSource) Validate(ctx context.Context) *apis.FieldError { func (cs *PingSourceSpec) Validate(ctx context.Context) *apis.FieldError { var errs *apis.FieldError - schedule := cs.Schedule + + errs = validateDescriptor(schedule) + if cs.Timezone != "" { schedule = "CRON_TZ=" + cs.Timezone + " " + schedule } @@ -98,3 +101,10 @@ func validateJSON(str string) error { var objmap map[string]interface{} return json.Unmarshal([]byte(str), &objmap) } + +func validateDescriptor(spec string) *apis.FieldError { + if strings.Contains(spec, "@every") { + return apis.ErrInvalidValue(errors.New("unsupported descriptor @every"), "schedule") + } + return nil +} diff --git a/pkg/apis/sources/v1/ping_validation_test.go b/pkg/apis/sources/v1/ping_validation_test.go index f06e399883e..a4ee9092985 100644 --- a/pkg/apis/sources/v1/ping_validation_test.go +++ b/pkg/apis/sources/v1/ping_validation_test.go @@ -362,6 +362,51 @@ func TestPingSourceValidation(t *testing.T) { }, }, want: nil, + }, { + name: "unsupported @every descriptor", + source: PingSource{ + Spec: PingSourceSpec{ + Schedule: "@every 2h", + SourceSpec: duckv1.SourceSpec{ + Sink: duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: "v1", + Kind: "broker", + Name: "default", + }, + }, + }, + }, + }, + want: func() *apis.FieldError { + var errs *apis.FieldError + fe := apis.ErrInvalidValue("unsupported descriptor @every", "spec.schedule") + errs = errs.Also(fe) + return errs + }(), + }, { + name: "unsupported @every descriptor with TZ", + source: PingSource{ + Spec: PingSourceSpec{ + Schedule: "@every 2h", + Timezone: "Europe/Paris", + SourceSpec: duckv1.SourceSpec{ + Sink: duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: "v1", + Kind: "broker", + Name: "default", + }, + }, + }, + }, + }, + want: func() *apis.FieldError { + var errs *apis.FieldError + fe := apis.ErrInvalidValue("unsupported descriptor @every", "spec.schedule") + errs = errs.Also(fe) + return errs + }(), }, } diff --git a/pkg/apis/sources/v1beta2/ping_validation.go b/pkg/apis/sources/v1beta2/ping_validation.go index 5bbd2d7f0ef..c8fa47a600d 100644 --- a/pkg/apis/sources/v1beta2/ping_validation.go +++ b/pkg/apis/sources/v1beta2/ping_validation.go @@ -20,6 +20,7 @@ import ( "context" "encoding/base64" "encoding/json" + "errors" "fmt" "strings" @@ -37,8 +38,10 @@ func (c *PingSource) Validate(ctx context.Context) *apis.FieldError { func (cs *PingSourceSpec) Validate(ctx context.Context) *apis.FieldError { var errs *apis.FieldError - schedule := cs.Schedule + + errs = validateDescriptor(schedule) + if cs.Timezone != "" { schedule = "CRON_TZ=" + cs.Timezone + " " + schedule } @@ -98,3 +101,10 @@ func validateJSON(str string) error { var objmap map[string]interface{} return json.Unmarshal([]byte(str), &objmap) } + +func validateDescriptor(spec string) *apis.FieldError { + if strings.Contains(spec, "@every") { + return apis.ErrInvalidValue(errors.New("unsupported descriptor @every"), "schedule") + } + return nil +} diff --git a/pkg/apis/sources/v1beta2/ping_validation_test.go b/pkg/apis/sources/v1beta2/ping_validation_test.go index fbbb0a1b01d..664feb948ed 100644 --- a/pkg/apis/sources/v1beta2/ping_validation_test.go +++ b/pkg/apis/sources/v1beta2/ping_validation_test.go @@ -362,6 +362,51 @@ func TestPingSourceValidation(t *testing.T) { }, }, want: nil, + }, { + name: "unsupported @every descriptor", + source: PingSource{ + Spec: PingSourceSpec{ + Schedule: "@every 2h", + SourceSpec: duckv1.SourceSpec{ + Sink: duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: "v1", + Kind: "broker", + Name: "default", + }, + }, + }, + }, + }, + want: func() *apis.FieldError { + var errs *apis.FieldError + fe := apis.ErrInvalidValue("unsupported descriptor @every", "spec.schedule") + errs = errs.Also(fe) + return errs + }(), + }, { + name: "unsupported @every descriptor with TZ", + source: PingSource{ + Spec: PingSourceSpec{ + Schedule: "@every 2h", + Timezone: "Europe/Paris", + SourceSpec: duckv1.SourceSpec{ + Sink: duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: "v1", + Kind: "broker", + Name: "default", + }, + }, + }, + }, + }, + want: func() *apis.FieldError { + var errs *apis.FieldError + fe := apis.ErrInvalidValue("unsupported descriptor @every", "spec.schedule") + errs = errs.Also(fe) + return errs + }(), }, } diff --git a/test/experimental/delivery_timeout_test.go b/test/experimental/delivery_timeout_test.go index 8a2d1c57187..74e66cd95e5 100644 --- a/test/experimental/delivery_timeout_test.go +++ b/test/experimental/delivery_timeout_test.go @@ -21,9 +21,14 @@ package experimental import ( "testing" + "knative.dev/eventing/pkg/apis/eventing" "knative.dev/eventing/test/experimental/features/delivery_timeout" + "knative.dev/eventing/test/rekt/features/broker" + b "knative.dev/eventing/test/rekt/resources/broker" + rt "knative.dev/eventing/test/rekt/resources/trigger" "knative.dev/pkg/system" "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/feature" "knative.dev/reconciler-test/pkg/k8s" "knative.dev/reconciler-test/pkg/knative" ) @@ -41,3 +46,21 @@ func TestDeliveryTimeout(t *testing.T) { env.Test(ctx, t, delivery_timeout.ChannelToSink()) } + +func TestBrokerTriggerWithDeliveryTimeout(t *testing.T) { + class := eventing.MTChannelBrokerClassValue + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + + brokerName := feature.MakeRandomK8sName("broker") + triggerName := feature.MakeRandomK8sName("trigger") + + env.Test(ctx, t, broker.GoesReady(brokerName, b.WithBrokerClass(class), b.WithTimeout("PT1S"))) + env.Test(ctx, t, broker.TriggerGoesReady(triggerName, brokerName, rt.WithTimeout("PT10S"))) +} diff --git a/test/rekt/features/broker/readyness.go b/test/rekt/features/broker/readyness.go index 9e5cb3a780e..e97c38adf33 100644 --- a/test/rekt/features/broker/readyness.go +++ b/test/rekt/features/broker/readyness.go @@ -29,15 +29,14 @@ import ( // TriggerGoesReady returns a feature that tests after the creation of a // Trigger, it becomes ready. This feature assumes the Broker already exists. -func TriggerGoesReady(name, brokerName string) *feature.Feature { - cfg := []manifest.CfgFn(nil) - +func TriggerGoesReady(name, brokerName string, cfg ...manifest.CfgFn) *feature.Feature { f := new(feature.Feature) // The test needs a subscriber. sub := feature.MakeRandomK8sName("sub") f.Setup("install a service", svc.Install(sub, "app", "rekt")) - cfg = append(cfg, trigger.WithSubscriber(svc.AsKReference(sub), "")) + // Append user-provided cfg to the end, in case they are providing their own subscriber. + cfg = append([]manifest.CfgFn{trigger.WithSubscriber(svc.AsKReference(sub), "")}, cfg...) // Install the trigger f.Setup(fmt.Sprintf("install trigger %q", name), trigger.Install(name, brokerName, cfg...)) diff --git a/test/rekt/resources/broker/broker.go b/test/rekt/resources/broker/broker.go index da631b61e90..6d84ae24578 100644 --- a/test/rekt/resources/broker/broker.go +++ b/test/rekt/resources/broker/broker.go @@ -93,6 +93,9 @@ var WithDeadLetterSink = delivery.WithDeadLetterSink // WithRetry adds the retry related config to a Broker spec. var WithRetry = delivery.WithRetry +// WithTimeout adds the timeout related config to the config. +var WithTimeout = delivery.WithTimeout + // Install will create a Broker resource, augmented with the config fn options. func Install(name string, opts ...manifest.CfgFn) feature.StepFn { cfg := map[string]interface{}{ diff --git a/test/rekt/resources/broker/broker.yaml b/test/rekt/resources/broker/broker.yaml index 91ec6d821f5..694cd44d376 100644 --- a/test/rekt/resources/broker/broker.yaml +++ b/test/rekt/resources/broker/broker.yaml @@ -31,6 +31,9 @@ spec: {{ end }} {{ if .delivery }} delivery: + {{ if .delivery.timeout }} + timeout: {{ .delivery.timeout }} + {{ end }} {{ if .delivery.deadLetterSink }} deadLetterSink: {{ if .delivery.deadLetterSink.ref }} diff --git a/test/rekt/resources/delivery/delivery.go b/test/rekt/resources/delivery/delivery.go index 7f38cecd7e1..05342a4132e 100644 --- a/test/rekt/resources/delivery/delivery.go +++ b/test/rekt/resources/delivery/delivery.go @@ -66,3 +66,15 @@ func WithRetry(count int32, backoffPolicy *eventingv1.BackoffPolicyType, backoff } } } + +// WithTimeout adds the timeout related config to the config. +func WithTimeout(timeout string) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if _, set := cfg["delivery"]; !set { + cfg["delivery"] = map[string]interface{}{} + } + delivery := cfg["delivery"].(map[string]interface{}) + + delivery["timeout"] = timeout + } +} diff --git a/test/rekt/resources/trigger/trigger.go b/test/rekt/resources/trigger/trigger.go index 9b883dec5b7..d5bc9c2257a 100644 --- a/test/rekt/resources/trigger/trigger.go +++ b/test/rekt/resources/trigger/trigger.go @@ -94,6 +94,9 @@ var WithDeadLetterSink = delivery.WithDeadLetterSink // WithRetry adds the retry related config to a Trigger spec. var WithRetry = delivery.WithRetry +// WithTimeout adds the timeout related config to the config. +var WithTimeout = delivery.WithTimeout + // Install will create a Trigger resource, augmented with the config fn options. func Install(name, brokerName string, opts ...manifest.CfgFn) feature.StepFn { cfg := map[string]interface{}{ diff --git a/test/rekt/resources/trigger/trigger.yaml b/test/rekt/resources/trigger/trigger.yaml index e0fd13d1985..7974149405e 100644 --- a/test/rekt/resources/trigger/trigger.yaml +++ b/test/rekt/resources/trigger/trigger.yaml @@ -49,6 +49,9 @@ spec: {{ end }} {{ if .delivery }} delivery: + {{ if .delivery.timeout }} + timeout: {{ .delivery.timeout }} + {{ end }} {{ if .delivery.deadLetterSink }} deadLetterSink: {{ if .delivery.deadLetterSink.ref }}