349 changes: 198 additions & 151 deletions docs/ingestion/data-formats.md
@@ -223,6 +223,203 @@ The Parquet `inputFormat` has the following components:
|flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Parquet file. Note that only 'path' expressions are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) |
| binaryAsString | Boolean | Specifies whether Parquet byte columns that are not logically marked as string or enum types should be treated as UTF-8 encoded strings. | no (default = false) |
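
For illustration, a Parquet `inputFormat` combining the fields above might look like the following sketch (this assumes the `type` value is `parquet`; the `flattenSpec` shape matches the Avro example below):

```json
"inputFormat": {
  "type": "parquet",
  "flattenSpec": {
    "useFieldDiscovery": true,
    "fields": [
      {
        "type": "path",
        "name": "someRecord_subInt",
        "expr": "$.someRecord.subInt"
      }
    ]
  },
  "binaryAsString": false
}
```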

### Avro Stream

> You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro Stream input format.

> See the [Avro Types](../development/extensions-core/avro.md#avro-types) section for how Avro types are handled in Druid.

The `inputFormat` to load data in Avro format during stream ingestion. An example is:
```json
"ioConfig": {
"inputFormat": {
"type": "avro_stream",
"avroBytesDecoder": {
"type": "schema_inline",
"schema": {
//your schema goes here, for example
"namespace": "org.apache.druid.data",
"name": "User",
"type": "record",
"fields": [
{ "name": "FullName", "type": "string" },
{ "name": "Country", "type": "string" }
]
}
},
"flattenSpec": {
"useFieldDiscovery": true,
"fields": [
{
"type": "path",
"name": "someRecord_subInt",
"expr": "$.someRecord.subInt"
}
]
},
"binaryAsString": false
},
...
}
```

| Field | Type | Description | Required |
|-------|------|-------------|----------|
|type| String| This should be set to `avro_stream` to read Avro-serialized data. | yes |
|flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from an Avro record. Note that only 'path' expressions are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) |
| avroBytesDecoder | JSON Object | Specifies how to decode bytes to an Avro record. | yes |
| binaryAsString | Boolean | Specifies whether Avro byte columns that are not logically marked as string or enum types should be treated as UTF-8 encoded strings. | no (default = false) |

> **Reviewer comment (Member):** I think we should move https://github.com/apache/druid/blob/master/docs/ingestion/data-formats.md#avro-bytes-decoder (which currently lives with the 'parsers' documentation) up to this 'input formats' section, and have the parsers section link to the bytes decoder docs here.

##### Avro Bytes Decoder

If `type` is not included, the `avroBytesDecoder` defaults to `schema_repo`.

###### Inline Schema Based Avro Bytes Decoder

> The "schema_inline" decoder reads Avro records using a fixed schema and does not support schema migration. If you
> may need to migrate schemas in the future, consider one of the other decoders, all of which use a message header that
> allows the parser to identify the proper Avro schema for reading records.

This decoder can be used if all the input events can be read using the same schema. In this case, specify the schema in the input task JSON itself, as described below.

```
...
"avroBytesDecoder": {
"type": "schema_inline",
"schema": {
//your schema goes here, for example
"namespace": "org.apache.druid.data",
"name": "User",
"type": "record",
"fields": [
{ "name": "FullName", "type": "string" },
{ "name": "Country", "type": "string" }
]
}
}
...
```

###### Multiple Inline Schemas Based Avro Bytes Decoder

Use this decoder if different input events can have different read schemas. In this case, specify the schemas in the input task JSON itself, as described below.

```
...
"avroBytesDecoder": {
"type": "multiple_schemas_inline",
"schemas": {
//your id -> schema map goes here, for example
"1": {
"namespace": "org.apache.druid.data",
"name": "User",
"type": "record",
"fields": [
{ "name": "FullName", "type": "string" },
{ "name": "Country", "type": "string" }
]
},
"2": {
"namespace": "org.apache.druid.otherdata",
"name": "UserIdentity",
"type": "record",
"fields": [
{ "name": "Name", "type": "string" },
{ "name": "Location", "type": "string" }
]
},
...
...
}
}
...
```

Note that `schemas` is essentially a map of integer schema ID to Avro schema object. This decoder assumes that each record has the following format (a producer-side sketch of this framing follows the list):

- The first byte is the version, and must always be 1.
- The next 4 bytes are the integer schema ID, serialized using big-endian byte order.
- The remaining bytes contain the serialized Avro message.
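
As an illustration only, a producer could frame records in this format with a few lines of Java; the class and method names here are hypothetical, not part of Druid's API (`ByteBuffer` writes integers in big-endian order by default):

```java
import java.nio.ByteBuffer;

// Hypothetical producer-side helper that frames a serialized Avro message
// for the multiple_schemas_inline decoder.
public class SchemaIdFraming
{
  public static byte[] frame(int schemaId, byte[] serializedAvroMessage)
  {
    ByteBuffer buf = ByteBuffer.allocate(1 + 4 + serializedAvroMessage.length);
    buf.put((byte) 1);               // version byte: must always be 1
    buf.putInt(schemaId);            // 4-byte schema ID; big-endian by default
    buf.put(serializedAvroMessage);  // remaining bytes: the serialized Avro record
    return buf.array();
  }
}
```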

###### SchemaRepo Based Avro Bytes Decoder

This Avro bytes decoder first extracts `subject` and `id` from the input message bytes, and then uses them to look up the Avro schema used to decode the Avro record from bytes. For details, see the [schema repo](https://github.com/schema-repo/schema-repo) and [AVRO-1124](https://issues.apache.org/jira/browse/AVRO-1124). You will need an HTTP service like schema repo to hold the Avro schema. For information on registering a schema on the message producer side, see `org.apache.druid.data.input.AvroStreamInputRowParserTest#testParse()`.

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `schema_repo`. | no |
| subjectAndIdConverter | JSON Object | Specifies how to extract the subject and id from message bytes. | yes |
| schemaRepository | JSON Object | Specifies how to look up the Avro schema from subject and id. | yes |

###### Avro-1124 Subject And Id Converter

This section describes the format of the `subjectAndIdConverter` object for the `schema_repo` Avro bytes decoder.

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `avro_1124`. | no |
| topic | String | Specifies the topic of your Kafka stream. | yes |


###### Avro-1124 Schema Repository

This section describes the format of the `schemaRepository` object for the `schema_repo` Avro bytes decoder.

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `avro_1124_rest_client`. | no |
| url | String | Specifies the endpoint url of your Avro-1124 schema repository. | yes |
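
Putting the two objects together, a complete `schema_repo` decoder configuration might look like the following sketch; the topic name and repository URL are placeholders:

```json
"avroBytesDecoder": {
  "type": "schema_repo",
  "subjectAndIdConverter": {
    "type": "avro_1124",
    "topic": "your-kafka-topic"
  },
  "schemaRepository": {
    "type": "avro_1124_rest_client",
    "url": "http://your-schema-repo.example.com:8081/schema-repo"
  }
}
```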

###### Confluent Schema Registry-based Avro Bytes Decoder

This Avro bytes decoder first extracts a unique `id` from the input message bytes, and then uses it to look up the schema in the Schema Registry, which is used to decode the Avro record from bytes.
For details, see the Schema Registry [documentation](http://docs.confluent.io/current/schema-registry/docs/) and [repository](https://github.com/confluentinc/schema-registry).

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `schema_registry`. | no |
| url | String | Specifies the URL endpoint of the Schema Registry. | yes (if `urls` is not provided) |
| capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no |
| urls | `Array<String>` | Specifies the URL endpoints of multiple Schema Registry instances. | yes (if `url` is not provided) |
| config | JSON | Specifies additional configuration options for the Schema Registry client. | no |
| headers | JSON | Specifies headers to send to the Schema Registry. | no |

Use the `url` field for a single Schema Registry instance, or `urls` for multiple instances.

Single Instance:
```json
...
"avroBytesDecoder" : {
"type" : "schema_registry",
"url" : <schema-registry-url>
}
...
```

Multiple Instances:
```json
...
"avroBytesDecoder" : {
"type" : "schema_registry",
"urls" : [<schema-registry-url-1>, <schema-registry-url-2>, ...],
"config" : {
"basic.auth.credentials.source": "USER_INFO",
"basic.auth.user.info": "fred:letmein",
"schema.registry.ssl.truststore.location": "/some/secrets/kafka.client.truststore.jks",
"schema.registry.ssl.truststore.password": "<password>",
"schema.registry.ssl.keystore.location": "/some/secrets/kafka.client.keystore.jks",
"schema.registry.ssl.keystore.password": "<password>",
"schema.registry.ssl.key.password": "<password>"
...
},
"headers": {
"traceID" : "b29c5de2-0db4-490b-b421",
"timeStamp" : "1577191871865",
...
}
}
...
```

### Avro OCF

> You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro OCF input format.
@@ -876,7 +1073,7 @@ This parser is for [stream ingestion](./index.md#streaming) and reads Avro data
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `avro_stream`. | no |
| avroBytesDecoder | JSON Object | Specifies the [`avroBytesDecoder`](#avro-bytes-decoder) to decode bytes to Avro record. | yes |
| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be an "avro" parseSpec. | yes |

An Avro parseSpec can contain a [`flattenSpec`](#flattenspec) using either the "root" or "path"
@@ -907,156 +1104,6 @@ For example, using Avro stream parser with schema repo Avro bytes decoder:
}
```

### Protobuf Parser

> You need to include the [`druid-protobuf-extensions`](../development/extensions-core/protobuf.md) as an extension to use the Protobuf Parser.
5 changes: 5 additions & 0 deletions extensions-core/avro-extensions/pom.xml
@@ -115,6 +115,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.schemarepo</groupId>
<artifactId>schema-repo-api</artifactId>
@@ -55,7 +55,8 @@ public List<? extends Module> getJacksonModules()
new NamedType(AvroStreamInputRowParser.class, "avro_stream"),
new NamedType(AvroHadoopInputRowParser.class, "avro_hadoop"),
new NamedType(AvroParseSpec.class, "avro"),
new NamedType(AvroOCFInputFormat.class, "avro_ocf")
new NamedType(AvroOCFInputFormat.class, "avro_ocf"),
new NamedType(AvroStreamInputFormat.class, "avro_stream")
)
.setMixInAnnotation(Repository.class, RepositoryMixIn.class)
.setMixInAnnotation(JsonUtil.class, JsonUtilMixIn.class)