124 changes: 124 additions & 0 deletions docs/delivery/README.md
@@ -0,0 +1,124 @@
# Delivery Design

This document summarizes the [error handling design document](https://docs.google.com/document/d/1qRrzGoHJQO-oc5p-yRK8IRfugd-FM_PXyM7lN5kcqks).

## Problem

Sending events can fail for a variety of reasons (the downstream system is down, application logic rejects an invalid event, a runtime exception occurs, etc.), but right now there is no way to control or define the expected behavior in these situations.

## Requirements

* Be able to handle events that failed to be delivered
  * to channel subscribers.
  * to a source sink.
  * to brokers/triggers.
* Be able to identify events that could not be delivered (observability).
* Be able to leverage existing error handling mechanisms provided by the
  underlying platform (e.g. RabbitMQ dead letter exchange, Amazon SQS dead letter queue, Azure Service Bus dead letter queue, etc.).
* Be able to redirect failed events from a channel.
Member

events? or messages? let's be consistent, and I think event is the right term, over message(s) /cc @nachocano

Contributor Author

I'm fine with using event everywhere.


### Out of Scope

* Security: No security model has been defined yet at the Knative eventing level. Native brokers might define their own security model and Knative eventing should not prevent using it.

## Day 1 Proposal

### Dead Letter Sink

Channels are responsible for forwarding received events to subscribers. When they fail to do so, they are responsible for sending the failing events to a **dead letter sink**, potentially one per subscriber.

Similarly, event sources are responsible for sending events to a sink, and when they fail to do so, they are responsible for sending the failing events to a **dead letter sink**.

The dead letter sink can be a channel, but it does not have to be.
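
As an illustration of "potentially one per subscriber", the sketch below picks the dead letter sink for a failed delivery. The `subscriber` type, the `resolveDeadLetterSink` helper, and the precedence rule (a subscriber-level sink wins over a channel-level one) are assumptions made for this example; the proposal does not specify them.

```go
package main

import "fmt"

// subscriber is a hypothetical, simplified view of a channel subscriber.
type subscriber struct {
	URI               string
	DeadLetterSinkURI string // optional per-subscriber dead letter sink
}

// resolveDeadLetterSink returns the sink that should receive events that
// could not be delivered to s. Assumption: a subscriber-level sink takes
// precedence over the channel-level one. The boolean is false when no
// dead letter sink is configured at either level.
func resolveDeadLetterSink(channelSinkURI string, s subscriber) (string, bool) {
	if s.DeadLetterSinkURI != "" {
		return s.DeadLetterSinkURI, true
	}
	if channelSinkURI != "" {
		return channelSinkURI, true
	}
	return "", false
}

func main() {
	sub := subscriber{
		URI:               "mysub.default.svc.cluster.local",
		DeadLetterSinkURI: "mydls.default.svc.cluster.local",
	}
	sink, ok := resolveDeadLetterSink("handle-error.default.svc.cluster.local", sub)
	fmt.Println(sink, ok)
}
```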

### Dead-Letter Channel

Knative Channel implementations may leverage the error handling support that their underlying platform provides natively, such as a [_Dead Letter Channel_](https://www.enterpriseintegrationpatterns.com/patterns/messaging/DeadLetterChannel.html), and forward failed events from that _Dead Letter Channel_ to the configured dead letter sink(s).

### Retry

Channel implementations, event sources and brokers should retry sending events before redirecting them to the dead letter sink.
While there are many different ways to implement the retry logic
(immediate retry, retry queue, etc...), implementations should
rely on a common set of configuration parameters, such as
the number of retries and the interval between retries.

### Delivery Specification

The goal of this delivery specification is to formally define the vocabulary for the capabilities described above (dead letter sink, dead letter channel, and retry) in order to provide consistency across all Knative event sources, channel implementations, and brokers.

The minimal delivery specification looks like this:

```go
// DeliverySpec contains the delivery options for event senders,
// such as channelable and source.
type DeliverySpec struct {
	// DeadLetterSink is the sink receiving events that couldn't be sent to
	// a destination.
	// +optional
	DeadLetterSink *apisv1alpha1.Destination `json:"deadLetterSink,omitempty"`

	// Retry is the minimum number of retries the sender should attempt when
	// sending an event before moving it to the dead letter sink.
	// +optional
	Retry *int32 `json:"retry,omitempty"`

	// BackoffPolicy is the retry backoff policy (linear, exponential).
	// +optional
	BackoffPolicy *BackoffPolicyType `json:"backoffPolicy,omitempty"`

	// BackoffDelay is the delay before retrying.
	// More information on Duration format: https://www.ietf.org/rfc/rfc3339.txt
	//
	// For linear policy, backoff delay is the time interval between retries.
	// For exponential policy, backoff delay is backoffDelay*2^<numberOfRetries>.
	// +optional
	BackoffDelay *string `json:"backoffDelay,omitempty"`
}

// BackoffPolicyType is the type for backoff policies.
type BackoffPolicyType string

const (
	// BackoffPolicyLinear is the linear backoff policy.
	BackoffPolicyLinear BackoffPolicyType = "linear"

	// BackoffPolicyExponential is the exponential backoff policy.
	BackoffPolicyExponential BackoffPolicyType = "exponential"
)
```

Channels, brokers, and event sources are not required to support all these capabilities and are free to add more delivery options.

### Exposing underlying DLC

Channel implementations that support a dead letter channel should advertise it in their status.

```go
// DeliveryStatus contains the Status of an object supporting delivery options.
type DeliveryStatus struct {
	// DeadLetterChannel is the reference to the native, platform specific channel where failed events are sent to.
	// +optional
	DeadLetterChannel *corev1.ObjectReference `json:"deadLetterChannel,omitempty"`
}
```

### Error events

An error event is the original event annotated with additional CloudEvents attributes, e.g. to indicate why the event could not be delivered.

Note that multiple copies of the same event can be sent to the error sink due to multiple subscription failures.

Brokers might decide to change the event type before reposting the failed event into the broker. This could be done by having a special error sink specific to the broker.
Contributor

@sayanh sayanh Oct 18, 2019

Did you mean brokers will aggregate the events(in case of multiple copies) based on id? Why brokers will change the event type? If done, the subscribers will be impacted. Can you give an example where it makes sense to change the event type?


### CloudEvent extensions

(might move to the CloudEvent spec repository)

Here is a possible set of CloudEvent extensions:

* deadlettersubscriberuri: The URI of the subscriber
* deadletterreason: The reason for dead lettering the event
* deadletterretry: How many times the channel tried to send the event
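
As a rough illustration, the sketch below attaches these three extensions to a failed event before it is sent to the dead letter sink. The `event` type is a simplified, hypothetical stand-in for a CloudEvent (it is not the CloudEvents SDK type), and `annotateDeadLetter` is a name made up for this example.

```go
package main

import (
	"fmt"
	"strconv"
)

// event is a simplified, hypothetical stand-in for a CloudEvent: only the
// fields needed for this sketch are modeled.
type event struct {
	ID         string
	Type       string
	Extensions map[string]string
}

// annotateDeadLetter returns a copy of e carrying the proposed dead letter
// extensions. The extension map is copied so that the original event stays
// untouched, which matters when the same event fails for several
// subscribers and each failure produces its own annotated copy.
func annotateDeadLetter(e event, subscriberURI, reason string, retries int) event {
	ext := make(map[string]string, len(e.Extensions)+3)
	for k, v := range e.Extensions {
		ext[k] = v
	}
	ext["deadlettersubscriberuri"] = subscriberURI
	ext["deadletterreason"] = reason
	ext["deadletterretry"] = strconv.Itoa(retries)
	e.Extensions = ext
	return e
}

func main() {
	original := event{ID: "1234", Type: "com.example.created"}
	failed := annotateDeadLetter(original, "http://mysub.default.svc.cluster.local", "connection refused", 5)
	fmt.Println(failed.Extensions["deadletterreason"], failed.Extensions["deadletterretry"])
}
```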
16 changes: 16 additions & 0 deletions docs/delivery/imc-one-per-subcriber.yaml
@@ -0,0 +1,16 @@
apiVersion: messaging.knative.dev/v1alpha1
kind: InMemoryChannel
metadata:
  name: imc
spec:
  # Channel delivery configuration
  delivery:
    retry: 5
    backoffPolicy: exponential
    backoffDelay: 3s
  subscribable: # populated by Subscription
    subscribers:
    - subscriberURI: mysub.default.svc.cluster.local
      delivery:
        deadLetterSinkURI: mydls.default.svc.cluster.local

17 changes: 17 additions & 0 deletions docs/delivery/imc.yaml
@@ -0,0 +1,17 @@
apiVersion: messaging.knative.dev/v1alpha1
kind: InMemoryChannel
metadata:
  name: imc
spec:
  delivery:
    deadLetterSink:
      apiVersion: serving.knative.dev/v1beta1
      kind: Service
      name: handle-error
    retry: 5
    backoffPolicy: exponential
    backoffDelay: 3s
  subscribable: # populated by Subscription
    subscribers:
    - subscriberURI: mysub.default.svc.cluster.local

6 changes: 4 additions & 2 deletions pkg/apis/duck/v1alpha1/channel_template_types.go
@@ -16,8 +16,10 @@ limitations under the License.

package v1alpha1

-import "k8s.io/apimachinery/pkg/runtime"
-import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+import (
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+)

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type ChannelTemplateSpec struct {
25 changes: 25 additions & 0 deletions pkg/apis/duck/v1alpha1/channelable_types.go
@@ -25,6 +25,7 @@ import (
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/apis/duck/v1alpha1"
duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1"
apisv1alpha1 "knative.dev/pkg/apis/v1alpha1"
)

// +genclient
@@ -46,6 +47,10 @@ type Channelable struct {
// ChannelableSpec contains Spec of the Channelable object
type ChannelableSpec struct {
SubscribableTypeSpec `json:",inline"`

// DeliverySpec contains options controlling the event delivery
// +optional
Delivery *DeliverySpec `json:"delivery,omitempty"`
}

// ChannelableStatus contains the Status of a Channelable object.
@@ -58,6 +63,9 @@ type ChannelableStatus struct {
v1alpha1.AddressStatus `json:",inline"`
// Subscribers is populated with the statuses of each of the Channelable's subscribers.
SubscribableTypeStatus `json:",inline"`
// ErrorChannel is set by the channel when it supports native error handling via a channel
// +optional
ErrorChannel *corev1.ObjectReference `json:"errorChannel,omitempty"`
}

var (
@@ -82,6 +90,23 @@ func (c *Channelable) Populate() {
ReplyURI: "sink2",
}},
}
retry := int32(5)
linear := BackoffPolicyLinear
delay := "5s"
c.Spec.Delivery = &DeliverySpec{
DeadLetterSink: &apisv1alpha1.Destination{
Ref: &corev1.ObjectReference{
Name: "aname",
},
URI: &apis.URL{
Scheme: "http",
Host: "test-error-domain",
},
},
Retry: &retry,
BackoffPolicy: &linear,
BackoffDelay: &delay,
}
c.Status = ChannelableStatus{
AddressStatus: v1alpha1.AddressStatus{
Address: &v1alpha1.Addressable{
19 changes: 19 additions & 0 deletions pkg/apis/duck/v1alpha1/channelable_types_test.go
@@ -23,6 +23,7 @@ import (
"knative.dev/pkg/apis"
"knative.dev/pkg/apis/duck/v1alpha1"
duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1"
apisv1alpha1 "knative.dev/pkg/apis/v1alpha1"

"github.com/google/go-cmp/cmp"
)
@@ -40,6 +41,9 @@ func TestChannelableGetListType(t *testing.T) {
func TestChannelablePopulate(t *testing.T) {
got := &Channelable{}

retry := int32(5)
linear := BackoffPolicyLinear
delay := "5s"
want := &Channelable{
Spec: ChannelableSpec{
SubscribableTypeSpec: SubscribableTypeSpec{
@@ -57,7 +61,22 @@ func TestChannelablePopulate(t *testing.T) {
}},
},
},
Delivery: &DeliverySpec{
DeadLetterSink: &apisv1alpha1.Destination{
Ref: &corev1.ObjectReference{
Name: "aname",
},
URI: &apis.URL{
Scheme: "http",
Host: "test-error-domain",
},
},
Retry: &retry,
BackoffPolicy: &linear,
BackoffDelay: &delay,
},
},

Status: ChannelableStatus{
AddressStatus: v1alpha1.AddressStatus{
Address: &v1alpha1.Addressable{
67 changes: 67 additions & 0 deletions pkg/apis/duck/v1alpha1/delivery_types.go
@@ -0,0 +1,67 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import (
corev1 "k8s.io/api/core/v1"
apisv1alpha1 "knative.dev/pkg/apis/v1alpha1"
)

// DeliverySpec contains the delivery options for event senders,
// such as channelable and source.
type DeliverySpec struct {
	// DeadLetterSink is the sink receiving events that couldn't be sent to
	// a destination.
	// +optional
	DeadLetterSink *apisv1alpha1.Destination `json:"deadLetterSink,omitempty"`

	// Retry is the minimum number of retries the sender should attempt when
	// sending an event before moving it to the dead letter sink.
	// +optional
	Retry *int32 `json:"retry,omitempty"`

	// BackoffPolicy is the retry backoff policy (linear, exponential).
	// +optional
	BackoffPolicy *BackoffPolicyType `json:"backoffPolicy,omitempty"`

	// BackoffDelay is the delay before retrying.
	// More information on Duration format: https://www.ietf.org/rfc/rfc3339.txt
	//
	// For linear policy, backoff delay is the time interval between retries.
	// For exponential policy, backoff delay is backoffDelay*2^<numberOfRetries>.
	// +optional
	BackoffDelay *string `json:"backoffDelay,omitempty"`
}

// BackoffPolicyType is the type for backoff policies.
type BackoffPolicyType string

const (
	// BackoffPolicyLinear is the linear backoff policy.
	BackoffPolicyLinear BackoffPolicyType = "linear"

	// BackoffPolicyExponential is the exponential backoff policy.
	BackoffPolicyExponential BackoffPolicyType = "exponential"
)

// DeliveryStatus contains the Status of an object supporting delivery options.
type DeliveryStatus struct {
	// DeadLetterChannel is the reference to the native, platform specific channel
	// where failed events are sent to.
	// +optional
	DeadLetterChannel *corev1.ObjectReference `json:"deadLetterChannel,omitempty"`
}