Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/core/resources/trigger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ spec:
delivery:
description: Delivery contains the delivery spec for this specific trigger.
type: object
x-kubernetes-preserve-unknown-fields: true # This is necessary to enable the experimental feature delivery-timeout
properties:
backoffDelay:
description: 'BackoffDelay is the delay before retrying. More information on Duration format: - https://www.iso.org/iso-8601-date-and-time-format.html - https://en.wikipedia.org/wiki/ISO_8601 For linear policy, backoff delay is backoffDelay*<numberOfRetries>. For exponential policy, backoff delay is backoffDelay*2^<numberOfRetries>.'
Expand Down
23 changes: 23 additions & 0 deletions test/experimental/delivery_timeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@ package experimental
import (
"testing"

"knative.dev/eventing/pkg/apis/eventing"
"knative.dev/eventing/test/experimental/features/delivery_timeout"
"knative.dev/eventing/test/rekt/features/broker"
b "knative.dev/eventing/test/rekt/resources/broker"
rt "knative.dev/eventing/test/rekt/resources/trigger"
"knative.dev/pkg/system"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/k8s"
"knative.dev/reconciler-test/pkg/knative"
)
Expand All @@ -41,3 +46,21 @@ func TestDeliveryTimeout(t *testing.T) {

env.Test(ctx, t, delivery_timeout.ChannelToSink())
}

func TestBrokerTriggerWithDeliveryTimeout(t *testing.T) {
class := eventing.MTChannelBrokerClassValue

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
)

brokerName := feature.MakeRandomK8sName("broker")
triggerName := feature.MakeRandomK8sName("trigger")

env.Test(ctx, t, broker.GoesReady(brokerName, b.WithBrokerClass(class), b.WithTimeout("PT1S")))
env.Test(ctx, t, broker.TriggerGoesReady(triggerName, brokerName, rt.WithTimeout("PT10S")))
}
7 changes: 3 additions & 4 deletions test/rekt/features/broker/readyness.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,14 @@ import (

// TriggerGoesReady returns a feature that tests after the creation of a
// Trigger, it becomes ready. This feature assumes the Broker already exists.
func TriggerGoesReady(name, brokerName string) *feature.Feature {
cfg := []manifest.CfgFn(nil)

func TriggerGoesReady(name, brokerName string, cfg ...manifest.CfgFn) *feature.Feature {
f := new(feature.Feature)

// The test needs a subscriber.
sub := feature.MakeRandomK8sName("sub")
f.Setup("install a service", svc.Install(sub, "app", "rekt"))
cfg = append(cfg, trigger.WithSubscriber(svc.AsKReference(sub), ""))
// Append user-provided cfg to the end, in case they are providing their own subscriber.
cfg = append([]manifest.CfgFn{trigger.WithSubscriber(svc.AsKReference(sub), "")}, cfg...)

// Install the trigger
f.Setup(fmt.Sprintf("install trigger %q", name), trigger.Install(name, brokerName, cfg...))
Expand Down
3 changes: 3 additions & 0 deletions test/rekt/resources/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ var WithDeadLetterSink = delivery.WithDeadLetterSink
// WithRetry adds the retry related config to a Broker spec.
var WithRetry = delivery.WithRetry

// WithTimeout adds the timeout related config to the config.
var WithTimeout = delivery.WithTimeout

// Install will create a Broker resource, augmented with the config fn options.
func Install(name string, opts ...manifest.CfgFn) feature.StepFn {
cfg := map[string]interface{}{
Expand Down
3 changes: 3 additions & 0 deletions test/rekt/resources/broker/broker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ spec:
{{ end }}
{{ if .delivery }}
delivery:
{{ if .delivery.timeout }}
timeout: {{ .delivery.timeout }}
{{ end }}
{{ if .delivery.deadLetterSink }}
deadLetterSink:
{{ if .delivery.deadLetterSink.ref }}
Expand Down
12 changes: 12 additions & 0 deletions test/rekt/resources/delivery/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,15 @@ func WithRetry(count int32, backoffPolicy *eventingv1.BackoffPolicyType, backoff
}
}
}

// WithTimeout adds the timeout related config to the config.
func WithTimeout(timeout string) manifest.CfgFn {
return func(cfg map[string]interface{}) {
if _, set := cfg["delivery"]; !set {
cfg["delivery"] = map[string]interface{}{}
}
delivery := cfg["delivery"].(map[string]interface{})

delivery["timeout"] = timeout
}
}
3 changes: 3 additions & 0 deletions test/rekt/resources/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ var WithDeadLetterSink = delivery.WithDeadLetterSink
// WithRetry adds the retry related config to a Trigger spec.
var WithRetry = delivery.WithRetry

// WithTimeout adds the timeout related config to the config.
var WithTimeout = delivery.WithTimeout

// Install will create a Trigger resource, augmented with the config fn options.
func Install(name, brokerName string, opts ...manifest.CfgFn) feature.StepFn {
cfg := map[string]interface{}{
Expand Down
3 changes: 3 additions & 0 deletions test/rekt/resources/trigger/trigger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ spec:
{{ end }}
{{ if .delivery }}
delivery:
{{ if .delivery.timeout }}
timeout: {{ .delivery.timeout }}
{{ end }}
{{ if .delivery.deadLetterSink }}
deadLetterSink:
{{ if .delivery.deadLetterSink.ref }}
Expand Down