Skip to content
142 changes: 142 additions & 0 deletions docs/ingestion/data-formats.md
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,148 @@ This query returns:
|--------------------|-----------|---------------|---------------|
| `development` | `wiki-edit` | `1680795276351` | `wiki-edits` |

### Kinesis

The `kinesis` input format lets you parse the Kinesis metadata fields in addition to the Kinesis payload value contents.
It should only be used when ingesting from Kinesis.

The `kinesis` input format wraps around the payload parsing input format and augments the data it outputs with the Kinesis event timestamp and partition key, the `ApproximateArrivalTimestamp ` and `PartitionKey` fields in the Kinesis record.

If there are conflicts between column names in the payload and those created from the metadata, the payload takes precedence.
This ensures that upgrading a Kinesis ingestion to use the Kinesis input format (by taking its existing input format and setting it as the `valueFormat`) can be done without losing any of the payload data.

Configure the Kinesis `inputFormat` as follows:

| Field | Type | Description | Required | Default |
|-------|------|---------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------------|
| `type` | String | Set value to `kinesis`. | yes ||
| `valueFormat` | [InputFormat](#input-format) | The [input format](#input-format) to parse the Kinesis value payload. | yes ||
| `partitionKeyColumnName` | String | The name of the column for the Kinesis partition key. This field is useful when ingesting data from multiple partitions into the same datasource. | no | `kinesis.partitionKey` |
| `timestampColumnName` | String | The name of the column for the Kinesis timestamp. | no | `kinesis.timestamp` |

#### Example

Using `{ "type": "json" }` as the input format would only parse the payload value.
To parse the Kinesis metadata in addition to the payload, use the `kinesis` input format.

For example, consider the following structure for a Kinesis record that represents an edit in a development environment:

- **Kinesis timestamp**: `1680795276351`
- **Kinesis partition key**: `partition-1`
- **Kinesis payload value**: `{"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","page":"Salo Toraut","delta":31,"namespace":"Main"}`

You would configure it as follows:

```json
{
"ioConfig": {
"inputFormat": {
"type": "kinesis",
"valueFormat": {
"type": "json"
},
"timestampColumnName": "kinesis.timestamp",
"partitionKeyColumnName": "kinesis.partitionKey"
}
}
}
```

You would parse the example record as follows:

```json
{
"channel": "#sv.wikipedia",
"timestamp": "2016-06-27T00:00:11.080Z",
"page": "Salo Toraut",
"delta": 31,
"namespace": "Main",
"kinesis.timestamp": 1680795276351,
"kinesis.partitionKey": "partition-1"
}
```

If you want to use `kinesis.timestamp` as Druid's primary timestamp (`__time`), specify it as the value for `column` in the `timestampSpec`:

```json
"timestampSpec": {
"column": "kinesis.timestamp",
"format": "millis"
}
```

Finally, add these Kinesis metadata columns to the `dimensionsSpec` or set your `dimensionsSpec` to automatically detect columns.

The following supervisor spec demonstrates how to ingest the Kinesis timestamp, and partition key into Druid dimensions:

<details>
<summary>Click to view the example</summary>

```json
{
"type": "kinesis",
"spec": {
"ioConfig": {
"type": "kinesis",
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
},
"topic": "wiki-edits",
"inputFormat": {
"type": "kinesis",
"valueFormat": {
"type": "json"
},
"headerFormat": {
"type": "string"
},
"keyFormat": {
"type": "tsv",
"findColumnsFromHeader": false,
"columns": ["x"]
}
},
"useEarliestOffset": true
},
"dataSchema": {
"dataSource": "wikiticker",
"timestampSpec": {
"column": "timestamp",
"format": "posix"
},
"dimensionsSpec": {
"useSchemaDiscovery": true,
"includeAllDimensions": true
},
"granularitySpec": {
"queryGranularity": "none",
"rollup": false,
"segmentGranularity": "day"
}
},
"tuningConfig": {
"type": "kinesis"
}
}
}
```
</details>

After Druid ingests the data, you can query the Kinesis metadata columns as follows:

```sql
SELECT
"kinesis.timestamp",
"kinesis.partitionKey"
FROM "wikiticker"
```

This query returns:

| `kinesis.timestamp` | `kinesis.topic` |
|---------------------|-----------------|
| `1680795276351` | `partition-1` |

## FlattenSpec

You can use the `flattenSpec` object to flatten nested data, as an alternative to the Druid [nested columns](../querying/nested-columns.md) feature, and for nested input formats unsupported by the feature. It is an object within the `inputFormat` object.
Expand Down
1 change: 1 addition & 0 deletions docs/ingestion/kinesis-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ The Kinesis indexing service supports both [`inputFormat`](data-formats.md#input

The Kinesis indexing service supports the following values for `inputFormat`:

* `kinesis`
* `csv`
* `tvs`
* `json`
Expand Down