Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 4 additions & 203 deletions docs/development/extensions-core/avro.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,206 +24,7 @@ title: "Apache Avro"

This Apache Druid extension enables Druid to ingest and understand the Apache Avro data format. Make sure to [include](../../development/extensions.md#loading-extensions) `druid-avro-extensions` as an extension.

### Avro Stream Parser

This is for streaming/realtime ingestion.

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `avro_stream`. | no |
| avroBytesDecoder | JSON Object | Specifies how to decode bytes to Avro record. | yes |
| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be an "avro" parseSpec. | yes |

An Avro parseSpec can contain a [`flattenSpec`](../../ingestion/index.md#flattenspec) using either the "root" or "path"
field types, which can be used to read nested Avro records. The "jq" field type is not currently supported for Avro.

For example, using Avro stream parser with schema repo Avro bytes decoder:

```json
"parser" : {
"type" : "avro_stream",
"avroBytesDecoder" : {
"type" : "schema_repo",
"subjectAndIdConverter" : {
"type" : "avro_1124",
"topic" : "${YOUR_TOPIC}"
},
"schemaRepository" : {
"type" : "avro_1124_rest_client",
"url" : "${YOUR_SCHEMA_REPO_END_POINT}",
}
},
"parseSpec" : {
"format": "avro",
"timestampSpec": <standard timestampSpec>,
"dimensionsSpec": <standard dimensionsSpec>,
"flattenSpec": <optional>
}
}
```

#### Avro Bytes Decoder

If `type` is not included, the avroBytesDecoder defaults to `schema_repo`.

##### Inline Schema Based Avro Bytes Decoder

> The "schema_inline" decoder reads Avro records using a fixed schema and does not support schema migration. If you
> may need to migrate schemas in the future, consider one of the other decoders, all of which use a message header that
> allows the parser to identify the proper Avro schema for reading records.

This decoder can be used if all the input events can be read using the same schema. In that case schema can be specified in the input task JSON itself as described below.

```
...
"avroBytesDecoder": {
"type": "schema_inline",
"schema": {
//your schema goes here, for example
"namespace": "org.apache.druid.data",
"name": "User",
"type": "record",
"fields": [
{ "name": "FullName", "type": "string" },
{ "name": "Country", "type": "string" }
]
}
}
...
```

##### Multiple Inline Schemas Based Avro Bytes Decoder

This decoder can be used if different input events can have different read schema. In that case schema can be specified in the input task JSON itself as described below.

```
...
"avroBytesDecoder": {
"type": "multiple_schemas_inline",
"schemas": {
//your id -> schema map goes here, for example
"1": {
"namespace": "org.apache.druid.data",
"name": "User",
"type": "record",
"fields": [
{ "name": "FullName", "type": "string" },
{ "name": "Country", "type": "string" }
]
},
"2": {
"namespace": "org.apache.druid.otherdata",
"name": "UserIdentity",
"type": "record",
"fields": [
{ "name": "Name", "type": "string" },
{ "name": "Location", "type": "string" }
]
},
...
...
}
}
...
```

Note that it is essentially a map of integer schema ID to avro schema object. This parser assumes that record has following format.
first 1 byte is version and must always be 1.
next 4 bytes are integer schema ID serialized using big-endian byte order.
remaining bytes contain serialized avro message.

##### SchemaRepo Based Avro Bytes Decoder

This Avro bytes decoder first extract `subject` and `id` from input message bytes, then use them to lookup the Avro schema with which to decode Avro record from bytes. Details can be found in [schema repo](https://github.com/schema-repo/schema-repo) and [AVRO-1124](https://issues.apache.org/jira/browse/AVRO-1124). You will need an http service like schema repo to hold the avro schema. Towards schema registration on the message producer side, you can refer to `org.apache.druid.data.input.AvroStreamInputRowParserTest#testParse()`.

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `schema_repo`. | no |
| subjectAndIdConverter | JSON Object | Specifies the how to extract subject and id from message bytes. | yes |
| schemaRepository | JSON Object | Specifies the how to lookup Avro schema from subject and id. | yes |

###### Avro-1124 Subject And Id Converter

This section describes the format of the `subjectAndIdConverter` object for the `schema_repo` Avro bytes decoder.

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `avro_1124`. | no |
| topic | String | Specifies the topic of your Kafka stream. | yes |


###### Avro-1124 Schema Repository

This section describes the format of the `schemaRepository` object for the `schema_repo` Avro bytes decoder.

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `avro_1124_rest_client`. | no |
| url | String | Specifies the endpoint url of your Avro-1124 schema repository. | yes |

##### Confluent Schema Registry-based Avro Bytes Decoder

This Avro bytes decoder first extract unique `id` from input message bytes, then use them it lookup in the Schema Registry for the related schema, with which to decode Avro record from bytes.
Details can be found in Schema Registry [documentation](http://docs.confluent.io/current/schema-registry/docs/) and [repository](https://github.com/confluentinc/schema-registry).

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `schema_registry`. | no |
| url | String | Specifies the url endpoint of the Schema Registry. | yes |
| capacity | Integer | Specifies the max size of the cache (default == Integer.MAX_VALUE). | no |

```json
...
"avroBytesDecoder" : {
"type" : "schema_registry",
"url" : <schema-registry-url>
}
...
```

### Avro Hadoop Parser

This is for batch ingestion using the `HadoopDruidIndexer`. The `inputFormat` of `inputSpec` in `ioConfig` must be set to `"org.apache.druid.data.input.avro.AvroValueInputFormat"`. You may want to set Avro reader's schema in `jobProperties` in `tuningConfig`, e.g.: `"avro.schema.input.value.path": "/path/to/your/schema.avsc"` or `"avro.schema.input.value": "your_schema_JSON_object"`, if reader's schema is not set, the schema in Avro object container file will be used, see [Avro specification](http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution). Make sure to include "org.apache.druid.extensions:druid-avro-extensions" as an extension.

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `avro_hadoop`. | no |
| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be an "avro" parseSpec. | yes |

An Avro parseSpec can contain a [`flattenSpec`](../../ingestion/index.md#flattenspec) using either the "root" or "path"
field types, which can be used to read nested Avro records. The "jq" field type is not currently supported for Avro.

For example, using Avro Hadoop parser with custom reader's schema file:

```json
{
"type" : "index_hadoop",
"spec" : {
"dataSchema" : {
"dataSource" : "",
"parser" : {
"type" : "avro_hadoop",
"parseSpec" : {
"format": "avro",
"timestampSpec": <standard timestampSpec>,
"dimensionsSpec": <standard dimensionsSpec>,
"flattenSpec": <optional>
}
}
},
"ioConfig" : {
"type" : "hadoop",
"inputSpec" : {
"type" : "static",
"inputFormat": "org.apache.druid.data.input.avro.AvroValueInputFormat",
"paths" : ""
}
},
"tuningConfig" : {
"jobProperties" : {
"avro.schema.input.value.path" : "/path/to/my/schema.avsc"
}
}
}
}
```
The `druid-avro-extensions` provides two Avro Parsers for stream ingestion and Hadoop batch ingestion.
See [Avro Hadoop Parser](../../ingestion/data-formats.md#avro-hadoop-parser)
and [Avro Stream Parser](../../ingestion/data-formats.md#avro-stream-parser)
for details.
127 changes: 4 additions & 123 deletions docs/development/extensions-core/google.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,127 +37,8 @@ Deep storage can be written to Google Cloud Storage either via this extension or
|`druid.google.bucket`||GCS bucket name.|Must be set.|
|`druid.google.prefix`||GCS prefix.|No-prefix|

## Reading data from Google Cloud Storage

<a name="input-source"></a>

## Google cloud storage batch ingestion input source

This extension also provides an input source for Druid native batch ingestion to support reading objects directly from Google Cloud Storage. Objects can be specified as list of Google Cloud Storage URI strings. The Google Cloud Storage input source is splittable and can be used by [native parallel index tasks](../../ingestion/native-batch.md#parallel-task), where each worker task of `index_parallel` will read a single object.

```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "google",
"uris": ["gs://foo/bar/file.json", "gs://bar/foo/file2.json"]
},
"inputFormat": {
"type": "json"
},
...
},
...
```

```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "google",
"prefixes": ["gs://foo/bar", "gs://bar/foo"]
},
"inputFormat": {
"type": "json"
},
...
},
...
```


```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "google",
"objects": [
{ "bucket": "foo", "path": "bar/file1.json"},
{ "bucket": "bar", "path": "foo/file2.json"}
]
},
"inputFormat": {
"type": "json"
},
...
},
...
```

|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be `google`.|N/A|yes|
|uris|JSON array of URIs where Google Cloud Storage objects to be ingested are located.|N/A|`uris` or `prefixes` or `objects` must be set|
|prefixes|JSON array of URI prefixes for the locations of Google Cloud Storage objects to be ingested.|N/A|`uris` or `prefixes` or `objects` must be set|
|objects|JSON array of Google Cloud Storage objects to be ingested.|N/A|`uris` or `prefixes` or `objects` must be set|


Google Cloud Storage object:

|property|description|default|required?|
|--------|-----------|-------|---------|
|bucket|Name of the Google Cloud Storage bucket|N/A|yes|
|path|The path where data is located.|N/A|yes|

## Firehose

<a name="firehose"></a>

#### StaticGoogleBlobStoreFirehose

This firehose ingests events, similar to the StaticS3Firehose, but from an Google Cloud Store.

As with the S3 blobstore, it is assumed to be gzipped if the extension ends in .gz

This firehose is _splittable_ and can be used by [native parallel index tasks](../../ingestion/native-batch.md#parallel-task).
Since each split represents an object in this firehose, each worker task of `index_parallel` will read an object.

Sample spec:

```json
"firehose" : {
"type" : "static-google-blobstore",
"blobs": [
{
"bucket": "foo",
"path": "/path/to/your/file.json"
},
{
"bucket": "bar",
"path": "/another/path.json"
}
]
}
```

This firehose provides caching and prefetching features. In IndexTask, a firehose can be read twice if intervals or
shardSpecs are not specified, and, in this case, caching can be useful. Prefetching is preferred when direct scan of objects is slow.

|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be `static-google-blobstore`.|N/A|yes|
|blobs|JSON array of Google Blobs.|N/A|yes|
|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|no|
|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|no|
|prefetchTriggerBytes|Threshold to trigger prefetching Google Blobs.|maxFetchCapacityBytes / 2|no|
|fetchTimeout|Timeout for fetching a Google Blob.|60000|no|
|maxFetchRetry|Maximum retry for fetching a Google Blob.|3|no|

Google Blobs:

|property|description|default|required?|
|--------|-----------|-------|---------|
|bucket|Name of the Google Cloud bucket|N/A|yes|
|path|The path where data is located.|N/A|yes|
The [Google Cloud Storage input source](../../ingestion/native-batch.md#google-cloud-storage-input-source) is supported by the [Parallel task](../../ingestion/native-batch.md#parallel-task)
to read objects directly from Google Cloud Storage. If you use the [Hadoop task](../../ingestion/hadoop.md),
you can read data from Google Cloud Storage by specifying the paths in your [`inputSpec`](../../ingestion/hadoop.md#inputspec).
Loading