Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ verify:
@echo Running the spec phrase checker:
@tools/verify-specs.sh -v spec.md documented-extensions.md json-format.md \
http-transport-binding.md http-webhook.md mqtt-transport-binding.md \
nats-transport-binding.md protobuf-format.md
nats-transport-binding.md protobuf-format.md \
kafka-transport-binding.md
@echo Running the doc phrase checker:
@tools/verify-docs.sh -v .
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ The following documents are available:
| AMQP Transport Binding | [v0.3](https://github.com/cloudevents/spec/blob/v0.3/amqp-transport-binding.md) | [master](https://github.com/cloudevents/spec/blob/master/amqp-transport-binding.md) |
| HTTP Transport Binding | [v0.3](https://github.com/cloudevents/spec/blob/v0.3/http-transport-binding.md) | [master](https://github.com/cloudevents/spec/blob/master/http-transport-binding.md) |
| JSON Event Format | [v0.3](https://github.com/cloudevents/spec/blob/v0.3/json-format.md) | [master](https://github.com/cloudevents/spec/blob/master/json-format.md) |
| Kafka Transport Binding | - | [master](https://github.com/cloudevents/spec/blob/master/kafka-transport-binding.md) |
| MQTT Transport Binding | [v0.3](https://github.com/cloudevents/spec/blob/v0.3/mqtt-transport-binding.md) | [master](https://github.com/cloudevents/spec/blob/master/mqtt-transport-binding.md) |
| NATS Transport Binding | [v0.3](https://github.com/cloudevents/spec/blob/v0.3/nats-transport-binding.md) | [master](https://github.com/cloudevents/spec/blob/master/nats-transport-binding.md) |
| Protocol Buffers Event Format | [v0.3](https://github.com/cloudevents/spec/blob/v0.3/protobuf-format.md) | [master](https://github.com/cloudevents/spec/blob/master/protobuf-format.md) |
Expand Down
281 changes: 281 additions & 0 deletions kafka-transport-binding.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
# Kafka Transport Binding for CloudEvents - Version 0.4-wip

## Abstract

The [Kafka][Kafka] Transport Binding for CloudEvents defines how events are
mapped to [Kafka messages][Kafka-Message-Format].

## Status of this document

This document is a working draft.

## Table of Contents

1. [Introduction](#1-introduction)
- 1.1. [Conformance](#11-conformance)
- 1.2. [Relation to Kafka](#12-relation-to-kafka)
- 1.3. [Content Modes](#13-content-modes)
- 1.4. [Event Formats](#14-event-formats)
- 1.5. [Security](#15-security)
2. [Use of CloudEvents Attributes](#2-use-of-cloudevents-attributes)
- 2.1. [data Attribute](#21-data-attribute)
3. [Kafka Message Mapping](#3-kafka-message-mapping)
- 3.1. [Key Attribute](#31-key-attribute)
- 3.2. [Binary Content Mode](#32-binary-content-mode)
- 3.3. [Structured Content Mode](#33-structured-content-mode)
4. [References](#4-references)

## 1. Introduction

[CloudEvents][CE] is a standardized and transport-neutral definition of the
structure and metadata description of events. This specification defines how
the elements defined in the CloudEvents specification are to be used in the
Kafka protocol as [Kafka messages][Kafka-Message-Format] (aka Kafka records).

### 1.1. Conformance

The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL NOT", "SHOULD",
"SHOULD NOT", "RECOMMENDED", "MAY", and "OPTIONAL" in this document are to be
interpreted as described in [RFC2119][RFC2119].

### 1.2. Relation to Kafka

This specification does not prescribe rules constraining transfer or settlement
of event messages with Kafka; it solely defines how CloudEvents are expressed
in the Kafka protocol as [Kafka messages][Kafka-Message-Format].

### 1.3. Content Modes

The specification defines two content modes for transferring events:
*structured* and *binary*.

The *binary* mode *only* applies to Kafka 0.11.0.0 and above, because Kafka
0.10.x.x and below lack support for message level headers.

In the *binary* content mode, the value of the event `data` attribute MUST be
placed into the Kafka message's value section as-is, with the
`ce_datacontenttype` header value declaring its media type; all other
event attributes MUST be mapped to the Kafka message's
[header section][Kafka-Message-Header].

In the *structured* content mode, event metadata attributes and event data are
placed into the Kafka message value section
using an [event format](#14-event-formats).



### 1.4. Event Formats

Event formats, used with the *structured* content mode, define how an event is
expressed in a particular data format. All implementations of this
specification MUST support the [JSON event format][JSON-format].

### 1.5. Security

This specification does not introduce any new security features for Kafka, or
mandate specific existing features to be used.

## 2. Use of CloudEvents Attributes

This specification does not further define any of the [CloudEvents][CE] event
attributes.

### 2.1. data Attribute

The `data` attribute is assumed to contain opaque application data that is
encoded as declared by the `datacontenttype` attribute.

An application is free to hold the information in any in-memory representation
of its choosing, but as the value is transposed into Kafka as defined in this
specification, core Kafka provides data available as a sequence of bytes.

For instance, if the declared `datacontenttype` is
`application/json;charset=utf-8`, the expectation is that the `data` attribute
value is made available as [UTF-8][RFC3629] encoded JSON text.

## 3. Kafka Message Mapping

With Kafka 0.11.0.0 and above, the content mode is chosen by the sender of the
event. Protocol usage patterns that might allow solicitation of events using a
particular content mode might be defined by an application, but are not defined
here.

The receiver of the event can distinguish between the two content modes by
inspecting the `ce_datacontenttype` [Header][Kafka-Message-Header] of the
Kafka message. If the value is prefixed with the CloudEvents media type
`application/cloudevents`, indicating the use of a known
[event format](#14-event-formats), the receiver uses *structured* mode, otherwise
it defaults to *binary* mode.

If a receiver finds a CloudEvents media type as per the above rule, but with an
event format that it cannot handle, for instance
`application/cloudevents+avro`, it MAY still treat the event as binary and
forward it to another party as-is.

### 3.1. Key Attribute
The 'key' attribute is populated by a partitionKeyExtractor function. The
partitionKeyExtractor is a transport specific function that contains bespoke logic
to extract and populate the value. A default implementation of the extractor will
use the [Partitioning](extensions/partitioning.md) extension value.

### 3.2. Binary Content Mode

The *binary* content mode accommodates any shape of event data, and allows for
efficient transfer and without transcoding effort.

#### 3.2.1. Content Type

For the *binary* mode, the header `ce_datacontenttype` property MUST be
mapped directly to the CloudEvents `datacontenttype` attribute.


#### 3.2.2. Event Data Encoding

The [`data` attribute](#21-data-attribute) byte-sequence MUST be used as the
value of the Kafka message.

#### 3.2.3. Metadata Headers

All [CloudEvents][CE] attributes and
[CloudEvent Attributes Extensions](primer.md#cloudevent-attribute-extensions)
with exception of `data` MUST be individually mapped to and from the Header
fields in the Kafka message.

##### 3.2.3.1 Property Names

CloudEvent attributes are prefixed with "ce_" for use in the
[message-headers][Kafka-Message-Header] section.

Examples:

* `time` maps to `ce_time`
* `id` maps to `ce_id`
* `specversion` maps to `ce_specversion`

##### 3.2.4.2 Property Values

The value for each Kafka header is constructed from the respective
header's Kafka representation, compliant with the [Kafka message
format][Kafka-Message-Format] specification.


#### 3.2.5 Example

This example shows the *binary* mode mapping of an event into the
Kafka message. All other CloudEvents attributes
are mapped to Kafka Header fields with prefix `ce_`.

Mind that `ce_` here does refer to the event `data`
content carried in the payload.

``` text
------------------ Message -------------------

Topic Name: mytopic

------------------- key ----------------------

Key: mykey
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Kafka message key is where things are still a bit blurry to me. How will its value be obtained from the CloudEvents event? Have you foreseen some extractor function or similar?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking more about that one, perhaps a header key should be defined whose value, if present, will be propagated as key of the Kafka message, e.g. cloudEvents_messageKey. Alternatively, the binding, wherever it's running, could be configured with some kind of path expression which gets applied to the data value to extract the value, e.g. /customer/id.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, there is a PR for this: #218

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, thanks. I've commented over there, too. Seems that'd need resolution first.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The conflict we have here is that the partition key is a transport specific concern (and the requirements may vary across different transport options) and that the publisher ought not to care about what transport the event route gets bound to. We may also have the situation that an event gets first published via MQTT from a little device and then gets put on Kafka by a device gateway.

I think this binding needs to define a rule by which a key is constructed from the event rather than expecting that the event brings it along.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that the natural way to get a key for a Kafka message is to use the source.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if source is the right place - I'd expect all events produced by same application to have identical source, why partitioning typically applies to events produced by same source.

I like the proposal from @clemensv in #218 :
" instead of putting the burden of producing an event key on the client and then only having one, any transport that requires particular constructs such as keys such define a mechanism by which you can harvest/synthesize a key from an incoming CloudEvent as some sort of transform. The spec doesn't need to prescribe how -- it just needs to say that that's how the key materializes. A transform that just cooks up a random key might also be valid if that's what you want."

KafkaConnect already has "key extraction" transformation, exactly because external records require mapping to Kafka keys, and the logic for doing so varies between use-cases.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gwenshap @clemensv that would mean that the kafka transport spec would leave the determination of the "key" as an exercise for the implementer (or admin configuring the transport) ?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would hope a reasonable implementation of transport would allow plugging in the "key selection" logic, since we can't know in advance what it will be. (Although I'm still quite new to CloudEvents, so take my opinions with a bit of salt)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just added a comment on this over at #218. The idea being that instead of having fully pluggable logic for retrieving the key instead there could be a mechanism to just select the key from a given event field when sending a cloud event to a Kafka Topic.


------------------ headers -------------------

ce_specversion: "0.4-wip"
ce_type: "com.example.someevent"
ce_source: "/mycontext/subcontext"
ce_id: "1234-1234-1234"
ce_time: "2018-04-05T03:56:24Z"
ce_datacontenttype: application/avro
.... further attributes ...

------------------- value --------------------

... application data ...

-----------------------------------------------
```

### 3.3. Structured Content Mode

The *structured* content mode keeps event metadata and data together in the
payload, allowing simple forwarding of the same event across multiple routing
hops, and across multiple transports.

#### 3.3.1. Kafka Content-Type

The [Kafka `content-type`] property field MUST be set to the media
type of an [event format](#14-event-formats).

Example for the [JSON format][JSON-format]:

``` text
content-type: application/cloudevents+json; charset=UTF-8
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shows no prefix

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this missing a ce_ or is the sentence in the first paragraph of this section incorrect?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the intro paragraph is wrong

```

#### 3.3.2. Event Data Encoding

The chosen [event format](#14-event-formats) defines how all attributes,
including the `data` attribute, are represented.

The event metadata and data are then rendered in accordance with the [event
format](#14-event-formats) specification and the resulting data becomes the
Kafka application [data][data] section.

#### 3.3.3. Metadata Headers

Implementations MAY include the same Kafka headers as defined for the
[binary mode](#32-binary-content-mode).

#### 3.3.4 Example

This example shows a JSON event format encoded event:

``` text
------------------ Message -------------------

Topic Name: mytopic

------------------- key ----------------------

Key: mykey

------------------ headers -------------------

content-type: application/cloudevents+json; charset=UTF-8

------------------- value --------------------

{
"specversion" : "0.4-wip",
"datacontenttype" : "com.example.someevent",

... further attributes omitted ...

"data" : {
... application data ...
}
}

-----------------------------------------------
```

## 4. References

- [Kafka][Kafka] The distributed stream platform
- [Kafka-Message-Format][Kafka-Message-Format] The Kafka format message
- [RFC2046][RFC2046] Multipurpose Internet Mail Extensions (MIME) Part Two:
Media Types
- [RFC2119][RFC2119] Key words for use in RFCs to Indicate Requirement Levels
- [RFC3629][RFC3629] UTF-8, a transformation format of ISO 10646
- [RFC7159][RFC7159] The JavaScript Object Notation (JSON) Data Interchange
Format

[CE]: ./spec.md
[JSON-format]: ./json-format.md
[Kafka]: https://kafka.apache.org
[Kafka-Message-Format]: https://kafka.apache.org/documentation/#messageformat
[Kafka-Message-Header]: https://kafka.apache.org/documentation/#recordheader
[JSON-Value]: https://tools.ietf.org/html/rfc7159#section-3
[RFC2046]: https://tools.ietf.org/html/rfc2046
[RFC2119]: https://tools.ietf.org/html/rfc2119
[RFC3629]: https://tools.ietf.org/html/rfc3629
[RFC7159]: https://tools.ietf.org/html/rfc7159