diff --git a/docs/delivery/README.md b/docs/delivery/README.md new file mode 100644 index 00000000000..f4c5dd0a058 --- /dev/null +++ b/docs/delivery/README.md @@ -0,0 +1,124 @@ +# Delivery Design + +This document synthetizes the [error handling design document](https://docs.google.com/document/d/1qRrzGoHJQO-oc5p-yRK8IRfugd-FM_PXyM7lN5kcqks). + +## Problem + +Sending events can fail for a variety of reasons (downstream system is down, application logic rejects the invalid event, a runtime exception occurs, etc...) but right now there is no way to control or define the expected behavior in these situations. + +## Requirements + +* Be able to handle events that failed to be delivered + * to channel subscribers. + * to source sink. + * to broker/triggers. +* Be able to identify events that could not be delivered (Observability) +* Be able to leverage existing error handling mechanisms provided by the +underlying platform (eg. RabbitMQ dead letter exchange, Amazon SQS dead letter queue, Azure Service Bus dead letter queue, etc...). +* Be able to redirect of error'ed events from a channel. + +### Out of Scope + +* Security: No security model has been defined yet at the Knative eventing level. Native brokers might define their own security model and Knative eventing should not prevent using it. + +## Day 1 Proposal + +### Dead Letter Sink + +Channels are responsible for forwarding received events to subscribers. When they fail to do so, they are responsible for sending the failing events to an **dead letter sink**, potentially one per subscriber. + +Similarly Event Sources are responsible for sending events to a sink and when they fail to do so, they are responsible for sending the failing events to an **dead letter sink**. + +The dead letter sink can be a channel but it does not have to. + +### Dead-Letter Channel + +Knative Channel implementations may leverage existing platform native error handling support they might provide, like [_Dead Letter Channel_](https://www.enterpriseintegrationpatterns.com/patterns/messaging/DeadLetterChannel.html), to forward failed events from their _Dead Letter Channel_ to the configured error sink(s). + +### Retry + +Channel implementations, event sources and brokers should retry sending events before redirecting them to the dead letter sink. +While there are many different ways to implement the retry logic +(immediate retry, retry queue, etc...), implementations should +rely on a common set of configuration parameters, such as +the number of retries and the interval between retries. + +### Delivery Specification + +The goal of this delivery specification is to formally define the vocabulary related to capabilities defined above (dead letter sink, dead-letter queues and retry) to provide consistency across all Knative event sources, channel implementations and brokers. + +The minimal delivery specification looks like this: + +```go + +// DeliverySpec contains the delivery options for event senders, +// such as channelable and source. +type DeliverySpec struct { + // DeadLetterSink is the sink receiving events that couldn't be sent to + // a destination. + // +optional + DeadLetterSink *apisv1alpha1.Destination `json:"deadLetterSink,omitempty"` + + // Retry is the minimum number of retries the sender should attempt when + // sending a event before moving it to the dead letter sink + // +optional + Retry *int32 `json:"retry,omitempty"` + + // BackoffPolicy is the retry backoff policy (linear, exponential) + // +optional + BackoffPolicy *BackoffPolicyType `json:"backoffPolicy,omitempty"` + + // BackoffDelay is the delay before retrying. + // More information on Duration format: https://www.ietf.org/rfc/rfc3339.txt + // + // For linear policy, backoff delay is the time interval between retries. + // For exponential policy , backoff delay is backoffDelay*2^ + // +optional + BackoffDelay *string +} + +// BackoffPolicyType is the type for backoff policies +type BackoffPolicyType string + +const ( + // Linear backoff policy + BackoffPolicyLinear BackoffPolicyType = "linear" + + // Exponential backoff policy + BackoffPolicyExponential BackoffPolicyType = "exponential" +) +``` + +Channel, brokers and event sources are not required to support all these capabilities and are free to add more delivery options. + +### Exposing underlying DLC + +Channel implementation supporting dead letter channel should advertise it in their status. + +```go + +// DeliveryStatus contains the Status of an object supporting delivery options. +type DeliveryStatus struct { + // DeadLetterChannel is the reference to the native, platform specific channel where failed events are sent to. + // +optional + DeadLetterChannel *corev1.ObjectReference `json:"deadLetterChannel,omitempty"` +} +``` + +### Error events + +The error event is the original events annotated with various CloudEvents attributes, eg. to be able to tell why the event could not be delivered. + +Note that multiple copies of the same event can be sent to the error sink due to multiple subscription failures. + +Brokers might decide to change the event type before reposting the failed event into the broker. This could be done by having a special error sink specific to broker. + +### CloudEvent extensions + +(might move to the CloudEvent spec repository) + +Here a possible set of CloudEvent extensions: + +* deadlettersubscriberuri: The URI of the subscriber +* deadletterreason: The reason for dead lettering the event +* deadletterretry: How many times the channel tried to send the event diff --git a/docs/delivery/imc-one-per-subcriber.yaml b/docs/delivery/imc-one-per-subcriber.yaml new file mode 100644 index 00000000000..e1b0fc33dfd --- /dev/null +++ b/docs/delivery/imc-one-per-subcriber.yaml @@ -0,0 +1,16 @@ +apiVersion: messaging.knative.dev/v1alpha1 +kind: InMemoryChannel +metadata: + name: imc +spec: + # Channel delivery configuration + delivery: + retry: 5 + backoffPolicy: exponential + backoffDelay: 3s + subscribable: # populated by Subscription + subscribers: + - subscriberURI: mysub.default.svc.cluster.local + delivery: + deadLetterSinkURI: mydls.default.svc.cluster.local + diff --git a/docs/delivery/imc.yaml b/docs/delivery/imc.yaml new file mode 100644 index 00000000000..f5e0a72c51e --- /dev/null +++ b/docs/delivery/imc.yaml @@ -0,0 +1,17 @@ +apiVersion: messaging.knative.dev/v1alpha1 +kind: InMemoryChannel +metadata: + name: imc +spec: + delivery: + deadLetterSink: + apiVersion: serving.knative.dev/v1beta1 + kind: Service + name: handle-error + retry: 5 + backoffPolicy: exponential + backoffDelay: 3s + subscribable: # populated by Subscription + subscribers: + - subscriberURI: mysub.default.svc.cluster.local + diff --git a/pkg/apis/duck/v1alpha1/channel_template_types.go b/pkg/apis/duck/v1alpha1/channel_template_types.go index 819127d7048..cb80d7d3d59 100644 --- a/pkg/apis/duck/v1alpha1/channel_template_types.go +++ b/pkg/apis/duck/v1alpha1/channel_template_types.go @@ -16,8 +16,10 @@ limitations under the License. package v1alpha1 -import "k8s.io/apimachinery/pkg/runtime" -import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" +) // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object type ChannelTemplateSpec struct { diff --git a/pkg/apis/duck/v1alpha1/channelable_types.go b/pkg/apis/duck/v1alpha1/channelable_types.go index 57fc0093056..778ec808d95 100644 --- a/pkg/apis/duck/v1alpha1/channelable_types.go +++ b/pkg/apis/duck/v1alpha1/channelable_types.go @@ -25,6 +25,7 @@ import ( duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/apis/duck/v1alpha1" duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1" + apisv1alpha1 "knative.dev/pkg/apis/v1alpha1" ) // +genclient @@ -46,6 +47,10 @@ type Channelable struct { // ChannelableSpec contains Spec of the Channelable object type ChannelableSpec struct { SubscribableTypeSpec `json:",inline"` + + // DeliverySpec contains options controlling the event delivery + // +optional + Delivery *DeliverySpec `json:"delivery,omitempty"` } // ChannelableStatus contains the Status of a Channelable object. @@ -58,6 +63,9 @@ type ChannelableStatus struct { v1alpha1.AddressStatus `json:",inline"` // Subscribers is populated with the statuses of each of the Channelable's subscribers. SubscribableTypeStatus `json:",inline"` + // ErrorChannel is set by the channel when it supports native error handling via a channel + // +optional + ErrorChannel *corev1.ObjectReference `json:"errorChannel,omitempty"` } var ( @@ -82,6 +90,23 @@ func (c *Channelable) Populate() { ReplyURI: "sink2", }}, } + retry := int32(5) + linear := BackoffPolicyLinear + delay := "5s" + c.Spec.Delivery = &DeliverySpec{ + DeadLetterSink: &apisv1alpha1.Destination{ + Ref: &corev1.ObjectReference{ + Name: "aname", + }, + URI: &apis.URL{ + Scheme: "http", + Host: "test-error-domain", + }, + }, + Retry: &retry, + BackoffPolicy: &linear, + BackoffDelay: &delay, + } c.Status = ChannelableStatus{ AddressStatus: v1alpha1.AddressStatus{ Address: &v1alpha1.Addressable{ diff --git a/pkg/apis/duck/v1alpha1/channelable_types_test.go b/pkg/apis/duck/v1alpha1/channelable_types_test.go index 06a0b3af7c5..2e22b40fe53 100644 --- a/pkg/apis/duck/v1alpha1/channelable_types_test.go +++ b/pkg/apis/duck/v1alpha1/channelable_types_test.go @@ -23,6 +23,7 @@ import ( "knative.dev/pkg/apis" "knative.dev/pkg/apis/duck/v1alpha1" duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1" + apisv1alpha1 "knative.dev/pkg/apis/v1alpha1" "github.com/google/go-cmp/cmp" ) @@ -40,6 +41,9 @@ func TestChannelableGetListType(t *testing.T) { func TestChannelablePopulate(t *testing.T) { got := &Channelable{} + retry := int32(5) + linear := BackoffPolicyLinear + delay := "5s" want := &Channelable{ Spec: ChannelableSpec{ SubscribableTypeSpec: SubscribableTypeSpec{ @@ -57,7 +61,22 @@ func TestChannelablePopulate(t *testing.T) { }}, }, }, + Delivery: &DeliverySpec{ + DeadLetterSink: &apisv1alpha1.Destination{ + Ref: &corev1.ObjectReference{ + Name: "aname", + }, + URI: &apis.URL{ + Scheme: "http", + Host: "test-error-domain", + }, + }, + Retry: &retry, + BackoffPolicy: &linear, + BackoffDelay: &delay, + }, }, + Status: ChannelableStatus{ AddressStatus: v1alpha1.AddressStatus{ Address: &v1alpha1.Addressable{ diff --git a/pkg/apis/duck/v1alpha1/delivery_types.go b/pkg/apis/duck/v1alpha1/delivery_types.go new file mode 100644 index 00000000000..8a829be7ab1 --- /dev/null +++ b/pkg/apis/duck/v1alpha1/delivery_types.go @@ -0,0 +1,67 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + corev1 "k8s.io/api/core/v1" + apisv1alpha1 "knative.dev/pkg/apis/v1alpha1" +) + +// DeliverySpec contains the delivery options for event senders, +// such as channelable and source. +type DeliverySpec struct { + // DeadLetterSink is the sink receiving event that couldn't be sent to + // a destination. + // +optional + DeadLetterSink *apisv1alpha1.Destination `json:"deadLetterSink,omitempty"` + + // Retry is the minimum number of retries the sender should attempt when + // sending an event before moving it to the dead letter sink. + // +optional + Retry *int32 `json:"retry,omitempty"` + + // BackoffPolicy is the retry backoff policy (linear, exponential) + // +optional + BackoffPolicy *BackoffPolicyType `json:"backoffPolicy,omitempty"` + + // BackoffDelay is the delay before retrying. + // More information on Duration format: https://www.ietf.org/rfc/rfc3339.txt + // + // For linear policy, backoff delay is the time interval between retries. + // For exponential policy , backoff delay is backoffDelay*2^ + // +optional + BackoffDelay *string +} + +// BackoffPolicyType is the type for backoff policies +type BackoffPolicyType string + +const ( + // Linear backoff policy + BackoffPolicyLinear BackoffPolicyType = "linear" + + // Exponential backoff policy + BackoffPolicyExponential BackoffPolicyType = "exponential" +) + +// DeliveryStatus contains the Status of an object supporting delivery options. +type DeliveryStatus struct { + // DeadLetterChannel is the reference to the native, platform specific channel + // where failed events are sent to. + // +optional + DeadLetterChannel *corev1.ObjectReference `json:"deadLetterChannel,omitempty"` +} diff --git a/pkg/apis/duck/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/duck/v1alpha1/zz_generated.deepcopy.go index e6976a3f758..93dcab3ce34 100644 --- a/pkg/apis/duck/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/duck/v1alpha1/zz_generated.deepcopy.go @@ -23,6 +23,7 @@ package v1alpha1 import ( v1 "k8s.io/api/core/v1" runtime "k8s.io/apimachinery/pkg/runtime" + apisv1alpha1 "knative.dev/pkg/apis/v1alpha1" ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. @@ -151,6 +152,11 @@ func (in *ChannelableList) DeepCopyObject() runtime.Object { func (in *ChannelableSpec) DeepCopyInto(out *ChannelableSpec) { *out = *in in.SubscribableTypeSpec.DeepCopyInto(&out.SubscribableTypeSpec) + if in.Delivery != nil { + in, out := &in.Delivery, &out.Delivery + *out = new(DeliverySpec) + (*in).DeepCopyInto(*out) + } return } @@ -170,6 +176,11 @@ func (in *ChannelableStatus) DeepCopyInto(out *ChannelableStatus) { in.Status.DeepCopyInto(&out.Status) in.AddressStatus.DeepCopyInto(&out.AddressStatus) in.SubscribableTypeStatus.DeepCopyInto(&out.SubscribableTypeStatus) + if in.ErrorChannel != nil { + in, out := &in.ErrorChannel, &out.ErrorChannel + *out = new(v1.ObjectReference) + **out = **in + } return } @@ -183,6 +194,63 @@ func (in *ChannelableStatus) DeepCopy() *ChannelableStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DeliverySpec) DeepCopyInto(out *DeliverySpec) { + *out = *in + if in.DeadLetterSink != nil { + in, out := &in.DeadLetterSink, &out.DeadLetterSink + *out = new(apisv1alpha1.Destination) + (*in).DeepCopyInto(*out) + } + if in.Retry != nil { + in, out := &in.Retry, &out.Retry + *out = new(int32) + **out = **in + } + if in.BackoffPolicy != nil { + in, out := &in.BackoffPolicy, &out.BackoffPolicy + *out = new(BackoffPolicyType) + **out = **in + } + if in.BackoffDelay != nil { + in, out := &in.BackoffDelay, &out.BackoffDelay + *out = new(string) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeliverySpec. +func (in *DeliverySpec) DeepCopy() *DeliverySpec { + if in == nil { + return nil + } + out := new(DeliverySpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DeliveryStatus) DeepCopyInto(out *DeliveryStatus) { + *out = *in + if in.DeadLetterChannel != nil { + in, out := &in.DeadLetterChannel, &out.DeadLetterChannel + *out = new(v1.ObjectReference) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeliveryStatus. +func (in *DeliveryStatus) DeepCopy() *DeliveryStatus { + if in == nil { + return nil + } + out := new(DeliveryStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Resource) DeepCopyInto(out *Resource) { *out = *in