diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md
index c9c23896a286..96d8597e7f34 100644
--- a/docs/ingestion/data-formats.md
+++ b/docs/ingestion/data-formats.md
@@ -731,6 +731,148 @@ This query returns:
 |--------------------|-----------|---------------|---------------|
 | `development` | `wiki-edit` | `1680795276351` | `wiki-edits` |
 
+### Kinesis
+
+The `kinesis` input format lets you parse the Kinesis metadata fields in addition to the Kinesis payload value contents.
+It should only be used when ingesting from Kinesis.
+
+The `kinesis` input format wraps around the payload parsing input format and augments the data it outputs with the Kinesis event timestamp and partition key, the `ApproximateArrivalTimestamp` and `PartitionKey` fields in the Kinesis record.
+
+If there are conflicts between column names in the payload and those created from the metadata, the payload takes precedence.
+This ensures that upgrading a Kinesis ingestion to use the Kinesis input format (by taking its existing input format and setting it as the `valueFormat`) can be done without losing any of the payload data.
+
+Configure the Kinesis `inputFormat` as follows:
+
+| Field | Type | Description | Required | Default |
+|-------|------|-------------|----------|---------|
+| `type` | String | Set value to `kinesis`. | yes || 
+| `valueFormat` | [InputFormat](#input-format) | The [input format](#input-format) to parse the Kinesis value payload. | yes ||
+| `partitionKeyColumnName` | String | The name of the column for the Kinesis partition key. This field is useful when ingesting data from multiple partitions into the same datasource. | no | `kinesis.partitionKey` |
+| `timestampColumnName` | String | The name of the column for the Kinesis timestamp. | no | `kinesis.timestamp` |
+
+#### Example
+
+Using `{ "type": "json" }` as the input format would only parse the payload value.
+To parse the Kinesis metadata in addition to the payload, use the `kinesis` input format.
+
+For example, consider the following structure for a Kinesis record that represents an edit in a development environment:
+
+- **Kinesis timestamp**: `1680795276351`
+- **Kinesis partition key**: `partition-1`
+- **Kinesis payload value**: `{"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","page":"Salo Toraut","delta":31,"namespace":"Main"}`
+
+You would configure it as follows:
+
+```json
+{
+  "ioConfig": {
+    "inputFormat": {
+      "type": "kinesis",
+      "valueFormat": {
+        "type": "json"
+      },
+      "timestampColumnName": "kinesis.timestamp",
+      "partitionKeyColumnName": "kinesis.partitionKey"
+    }
+  }
+}
+```
+
+You would parse the example record as follows:
+
+```json
+{
+  "channel": "#sv.wikipedia",
+  "timestamp": "2016-06-27T00:00:11.080Z",
+  "page": "Salo Toraut",
+  "delta": 31,
+  "namespace": "Main",
+  "kinesis.timestamp": 1680795276351,
+  "kinesis.partitionKey": "partition-1"
+}
+```
+
+If you want to use `kinesis.timestamp` as Druid's primary timestamp (`__time`), specify it as the value for `column` in the `timestampSpec`:
+
+```json
+"timestampSpec": {
+  "column": "kinesis.timestamp",
+  "format": "millis"
+}
+```
+
+Finally, add these Kinesis metadata columns to the `dimensionsSpec` or set your `dimensionsSpec` to automatically detect columns.
+
+The following supervisor spec demonstrates how to ingest the Kinesis timestamp and partition key into Druid dimensions:
+
+
+Click to view the example
+
+```json
+{
+  "type": "kinesis",
+  "spec": {
+    "ioConfig": {
+      "type": "kinesis",
+      "stream": "wiki-edits",
+      "inputFormat": {
+        "type": "kinesis",
+        "valueFormat": {
+          "type": "json"
+        },
+        "timestampColumnName": "kinesis.timestamp",
+        "partitionKeyColumnName": "kinesis.partitionKey"
+      },
+      "useEarliestSequenceNumber": true
+    },
+    "dataSchema": {
+      "dataSource": "wikiticker",
+      "timestampSpec": {
+        "column": "timestamp",
+        "format": "iso"
+      },
+      "dimensionsSpec": {
+        "useSchemaDiscovery": true,
+        "includeAllDimensions": true
+      },
+      "granularitySpec": {
+        "queryGranularity": "none",
+        "rollup": false,
+        "segmentGranularity": "day"
+      }
+    },
+    "tuningConfig": {
+      "type": "kinesis"
+    }
+  }
+}
+```
+
+
+After Druid ingests the data, you can query the Kinesis metadata columns as follows:
+
+```sql
+SELECT
+  "kinesis.timestamp",
+  "kinesis.partitionKey"
+FROM "wikiticker"
+```
+
+This query returns:
+
+| `kinesis.timestamp` | `kinesis.partitionKey` |
+|---------------------|------------------------|
+| `1680795276351` | `partition-1` |
+
 ## FlattenSpec
 
 You can use the `flattenSpec` object to flatten nested data, as an alternative to the Druid [nested columns](../querying/nested-columns.md) feature, and for nested input formats unsupported by the feature. It is an object within the `inputFormat` object.
diff --git a/docs/ingestion/kinesis-ingestion.md b/docs/ingestion/kinesis-ingestion.md
index 3b4c5de8548d..8ea2cfd4af35 100644
--- a/docs/ingestion/kinesis-ingestion.md
+++ b/docs/ingestion/kinesis-ingestion.md
@@ -139,6 +139,7 @@ The Kinesis indexing service supports both [`inputFormat`](data-formats.md#input
 
 The Kinesis indexing service supports the following values for `inputFormat`:
 
+* `kinesis`
 * `csv`
 * `tvs`
 * `json`