Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/apis/config/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/apis/duck/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/apis/duck/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/apis/eventing/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/apis/eventing/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/apis/flows/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/apis/messaging/config/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/apis/messaging/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/apis/sources/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/apis/sources/v1beta2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 13 additions & 8 deletions pkg/reconciler/broker/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ import (
"k8s.io/client-go/dynamic"
corev1listers "k8s.io/client-go/listers/core/v1"

"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
"knative.dev/pkg/network"
pkgreconciler "knative.dev/pkg/reconciler"
"knative.dev/pkg/resolver"
"knative.dev/pkg/system"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
Expand All @@ -39,14 +48,6 @@ import (
"knative.dev/eventing/pkg/duck"
"knative.dev/eventing/pkg/reconciler/broker/resources"
"knative.dev/eventing/pkg/reconciler/sugar/trigger/path"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
"knative.dev/pkg/network"
pkgreconciler "knative.dev/pkg/reconciler"
"knative.dev/pkg/resolver"
"knative.dev/pkg/system"
)

var brokerGVK = eventingv1.SchemeGroupVersion.WithKind("Broker")
Expand Down Expand Up @@ -155,6 +156,10 @@ func (r *Reconciler) resolveDeadLetterSink(ctx context.Context, b *eventingv1.Br
// resolve the trigger's dlq first, fall back to the broker's
for _, delivery := range []*eventingduckv1.DeliverySpec{t.Spec.Delivery, b.Spec.Delivery} {
if delivery != nil && delivery.DeadLetterSink != nil {
if delivery.DeadLetterSink.Ref != nil && delivery.DeadLetterSink.Ref.Namespace == "" {
delivery = delivery.DeepCopy() // Do not mutate informer copy.
delivery.DeadLetterSink.Ref.Namespace = t.GetNamespace()
}
dlqURI, err := r.uriResolver.URIFromDestinationV1(ctx, *delivery.DeadLetterSink, b)
if err != nil {
logging.FromContext(ctx).Errorw("Unable to get the dead letter sink's URI", zap.Error(err))
Expand Down
101 changes: 86 additions & 15 deletions pkg/reconciler/broker/trigger/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
clientgotesting "k8s.io/client-go/testing"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
v1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/apis/sources/v1beta2"
fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable"
"knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger"
"knative.dev/eventing/pkg/duck"
"knative.dev/eventing/pkg/reconciler/broker/resources"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
v1addr "knative.dev/pkg/client/injection/ducks/duck/v1/addressable"
Expand All @@ -53,11 +42,24 @@ import (
"knative.dev/pkg/ptr"
"knative.dev/pkg/resolver"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
v1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/apis/sources/v1beta2"
fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable"
"knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger"
"knative.dev/eventing/pkg/duck"
"knative.dev/eventing/pkg/reconciler/broker/resources"

_ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake"
. "knative.dev/pkg/reconciler/testing"

_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake"
. "knative.dev/eventing/pkg/reconciler/testing/v1"
rtv1beta2 "knative.dev/eventing/pkg/reconciler/testing/v1beta2"
_ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake"
. "knative.dev/pkg/reconciler/testing"
)

const (
Expand Down Expand Up @@ -306,6 +308,71 @@ func TestReconcile(t *testing.T) {
WithTriggerStatusDeadLetterSinkURI("http://example.com"),
WithTriggerDeadLetterSinkResolvedSucceeded()),
}},
}, {
Name: "Trigger with Broker DLQ",
Key: testKey,
Objects: []runtime.Object{
NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
WithBrokerConfig(config()),
WithInitBrokerConditions,
WithBrokerReady,
WithBrokerResourceVersion(""),
WithBrokerAddressURI(brokerAddress),
WithChannelAddressAnnotation(triggerChannelURL),
WithChannelAPIVersionAnnotation(triggerChannelAPIVersion),
WithChannelKindAnnotation(triggerChannelKind),
WithChannelNameAnnotation(triggerChannelName),
WithDeadLeaderSink(&duckv1.KReference{
Kind: "Broker",
Name: brokerName,
APIVersion: "eventing.knative.dev/v1",
}, ""),
),
createChannel(testNS, true),
imcConfigMap(),
NewEndpoints(filterServiceName, systemNS,
WithEndpointsLabels(FilterLabels()),
WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})),
NewEndpoints(ingressServiceName, systemNS,
WithEndpointsLabels(IngressLabels()),
WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})),
makeSubscriberAddressableAsUnstructured(testNS),
NewTrigger(triggerName, testNS, brokerName,
WithTriggerUID(triggerUID),
WithTriggerSubscriberRefAndURIReference(subscriberGVK, subscriberName, testNS, subscriberURIReference),
WithInitTriggerConditions,
),
},
WantErr: false,
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewTrigger(triggerName, testNS, brokerName,
WithTriggerUID(triggerUID),
WithTriggerSubscriberRefAndURIReference(subscriberGVK, subscriberName, testNS, subscriberURIReference),
// The first reconciliation will initialize the status conditions.
WithInitTriggerConditions,
WithTriggerBrokerReady(),
WithTriggerSubscriptionNotConfigured(),
WithTriggerStatusSubscriberURI(subscriberResolvedTargetURI),
WithTriggerSubscriberResolvedSucceeded(),
WithTriggerDependencyReady(),
WithTriggerStatusDeadLetterSinkURI("http://broker-ingress.knative-testing.svc.cluster.local/test-namespace/test-broker"),
WithTriggerDeadLetterSinkResolvedSucceeded(),
),
}},
WantCreates: []runtime.Object{
makeFilterSubscription(testNS, func(subscription *messagingv1.Subscription) {
subscription.Spec.Delivery = &eventingduckv1.DeliverySpec{
DeadLetterSink: &duckv1.Destination{
Ref: &duckv1.KReference{
Kind: "Broker",
Name: brokerName,
APIVersion: "eventing.knative.dev/v1",
},
},
}
}),
},
}, {
Name: "Subscription Create fails",
Key: testKey,
Expand Down Expand Up @@ -1054,8 +1121,12 @@ func makeServiceURI() *apis.URL {
Path: fmt.Sprintf("/triggers/%s/%s/%s", testNS, triggerName, triggerUID),
}
}
func makeFilterSubscription(subscriberNamespace string) *messagingv1.Subscription {
return resources.NewSubscription(makeTrigger(subscriberNamespace), createTriggerChannelRef(), makeBrokerRef(), makeServiceURI(), makeEmptyDelivery())
func makeFilterSubscription(subscriberNamespace string, opts ...func(subscription *messagingv1.Subscription)) *messagingv1.Subscription {
s := resources.NewSubscription(makeTrigger(subscriberNamespace), createTriggerChannelRef(), makeBrokerRef(), makeServiceURI(), makeEmptyDelivery())
for _, opt := range opts {
opt(s)
}
return s
}

func makeTrigger(subscriberNamespace string) *eventingv1.Trigger {
Expand Down