diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index 96f5d924b757..b65a1db1deb1 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -223,6 +223,203 @@ The Parquet `inputFormat` has the following components: |flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Parquet file. Note that only 'path' expression are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) | | binaryAsString | Boolean | Specifies if the bytes parquet column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) | +### Avro Stream + +> You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro Stream input format. + +> See the [Avro Types](../development/extensions-core/avro.md#avro-types) section for how Avro types are handled in Druid. + +Use this `inputFormat` to load Avro-encoded data in stream ingestion. An example is: +```json +"ioConfig": { + "inputFormat": { + "type": "avro_stream", + "avroBytesDecoder": { + "type": "schema_inline", + "schema": { + //your schema goes here, for example + "namespace": "org.apache.druid.data", + "name": "User", + "type": "record", + "fields": [ + { "name": "FullName", "type": "string" }, + { "name": "Country", "type": "string" } + ] + } + }, + "flattenSpec": { + "useFieldDiscovery": true, + "fields": [ + { + "type": "path", + "name": "someRecord_subInt", + "expr": "$.someRecord.subInt" + } + ] + }, + "binaryAsString": false + }, + ... +} +``` + +| Field | Type | Description | Required | +|-------|------|-------------|----------| +|type| String| This should be set to `avro_stream` to read Avro serialized data| yes | +|flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from an Avro record. 
Note that only 'path' expressions are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) | +|`avroBytesDecoder`| JSON Object |Specifies how to decode bytes to an Avro record. | yes | +| binaryAsString | Boolean | Specifies whether a bytes Avro column that is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) | + +##### Avro Bytes Decoder + +If `type` is not included, the avroBytesDecoder defaults to `schema_repo`. + +###### Inline Schema Based Avro Bytes Decoder + +> The "schema_inline" decoder reads Avro records using a fixed schema and does not support schema migration. If you +> may need to migrate schemas in the future, consider one of the other decoders, all of which use a message header that +> allows the parser to identify the proper Avro schema for reading records. + +This decoder can be used if all the input events can be read using the same schema. In this case, specify the schema in the input task JSON itself, as described below. + +``` +... +"avroBytesDecoder": { + "type": "schema_inline", + "schema": { + //your schema goes here, for example + "namespace": "org.apache.druid.data", + "name": "User", + "type": "record", + "fields": [ + { "name": "FullName", "type": "string" }, + { "name": "Country", "type": "string" } + ] + } +} +... +``` + +###### Multiple Inline Schemas Based Avro Bytes Decoder + +Use this decoder if different input events can have different read schemas. In this case, specify the schema in the input task JSON itself, as described below. + +``` +... 
+"avroBytesDecoder": { + "type": "multiple_schemas_inline", + "schemas": { + //your id -> schema map goes here, for example + "1": { + "namespace": "org.apache.druid.data", + "name": "User", + "type": "record", + "fields": [ + { "name": "FullName", "type": "string" }, + { "name": "Country", "type": "string" } + ] + }, + "2": { + "namespace": "org.apache.druid.otherdata", + "name": "UserIdentity", + "type": "record", + "fields": [ + { "name": "Name", "type": "string" }, + { "name": "Location", "type": "string" } + ] + }, + ... + ... + } +} +... +``` + +Note that it is essentially a map of integer schema ID to avro schema object. This parser assumes that record has following format. + first 1 byte is version and must always be 1. + next 4 bytes are integer schema ID serialized using big-endian byte order. + remaining bytes contain serialized avro message. + +##### SchemaRepo Based Avro Bytes Decoder + +This Avro bytes decoder first extracts `subject` and `id` from the input message bytes, and then uses them to look up the Avro schema used to decode the Avro record from bytes. For details, see the [schema repo](https://github.com/schema-repo/schema-repo) and [AVRO-1124](https://issues.apache.org/jira/browse/AVRO-1124). You will need an http service like schema repo to hold the avro schema. For information on registering a schema on the message producer side, see `org.apache.druid.data.input.AvroStreamInputRowParserTest#testParse()`. + +| Field | Type | Description | Required | +|-------|------|-------------|----------| +| type | String | This should say `schema_repo`. | no | +| subjectAndIdConverter | JSON Object | Specifies how to extract the subject and id from message bytes. | yes | +| schemaRepository | JSON Object | Specifies how to look up the Avro schema from subject and id. | yes | + +###### Avro-1124 Subject And Id Converter + +This section describes the format of the `subjectAndIdConverter` object for the `schema_repo` Avro bytes decoder. 
+ +| Field | Type | Description | Required | +|-------|------|-------------|----------| +| type | String | This should say `avro_1124`. | no | +| topic | String | Specifies the topic of your Kafka stream. | yes | + + +###### Avro-1124 Schema Repository + +This section describes the format of the `schemaRepository` object for the `schema_repo` Avro bytes decoder. + +| Field | Type | Description | Required | +|-------|------|-------------|----------| +| type | String | This should say `avro_1124_rest_client`. | no | +| url | String | Specifies the endpoint url of your Avro-1124 schema repository. | yes | + +###### Confluent Schema Registry-based Avro Bytes Decoder + +This Avro bytes decoder first extracts a unique `id` from input message bytes, and then uses it to look up the schema in the Schema Registry used to decode the Avro record from bytes. +For details, see the Schema Registry [documentation](http://docs.confluent.io/current/schema-registry/docs/) and [repository](https://github.com/confluentinc/schema-registry). + +| Field | Type | Description | Required | +|-------|------|-------------|----------| +| type | String | This should say `schema_registry`. | no | +| url | String | Specifies the url endpoint of the Schema Registry. | yes (if `urls` is not provided) | +| capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no | +| urls | Array | Specifies the url endpoints of the multiple Schema Registry instances. | yes (if `url` is not provided) | +| config | Json | To send additional configurations, configured for Schema Registry | no | +| headers | Json | To send headers to the Schema Registry | no | + +For a single Schema Registry instance, use the `url` field; for multiple instances, use the `urls` field. + +Single Instance: +```json +... +"avroBytesDecoder" : { + "type" : "schema_registry", + "url" : <schema-registry-url> +} +... +``` + +Multiple Instances: +```json +... 
+"avroBytesDecoder" : { + "type" : "schema_registry", + "urls" : [, , ...], + "config" : { + "basic.auth.credentials.source": "USER_INFO", + "basic.auth.user.info": "fred:letmein", + "schema.registry.ssl.truststore.location": "/some/secrets/kafka.client.truststore.jks", + "schema.registry.ssl.truststore.password": "", + "schema.registry.ssl.keystore.location": "/some/secrets/kafka.client.keystore.jks", + "schema.registry.ssl.keystore.password": "", + "schema.registry.ssl.key.password": "" + ... + }, + "headers": { + "traceID" : "b29c5de2-0db4-490b-b421", + "timeStamp" : "1577191871865", + ... + } +} +... +``` + ### Avro OCF > You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro OCF input format. @@ -876,7 +1073,7 @@ This parser is for [stream ingestion](./index.md#streaming) and reads Avro data | Field | Type | Description | Required | |-------|------|-------------|----------| | type | String | This should say `avro_stream`. | no | -| avroBytesDecoder | JSON Object | Specifies how to decode bytes to Avro record. | yes | +| avroBytesDecoder | JSON Object | Specifies [`avroBytesDecoder`](#Avro Bytes Decoder) to decode bytes to Avro record. | yes | | parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be an "avro" parseSpec. | yes | An Avro parseSpec can contain a [`flattenSpec`](#flattenspec) using either the "root" or "path" @@ -907,156 +1104,6 @@ For example, using Avro stream parser with schema repo Avro bytes decoder: } ``` -#### Avro Bytes Decoder - -If `type` is not included, the avroBytesDecoder defaults to `schema_repo`. - -##### Inline Schema Based Avro Bytes Decoder - -> The "schema_inline" decoder reads Avro records using a fixed schema and does not support schema migration. 
If you -> may need to migrate schemas in the future, consider one of the other decoders, all of which use a message header that -> allows the parser to identify the proper Avro schema for reading records. - -This decoder can be used if all the input events can be read using the same schema. In this case, specify the schema in the input task JSON itself, as described below. - -``` -... -"avroBytesDecoder": { - "type": "schema_inline", - "schema": { - //your schema goes here, for example - "namespace": "org.apache.druid.data", - "name": "User", - "type": "record", - "fields": [ - { "name": "FullName", "type": "string" }, - { "name": "Country", "type": "string" } - ] - } -} -... -``` - -##### Multiple Inline Schemas Based Avro Bytes Decoder - -Use this decoder if different input events can have different read schemas. In this case, specify the schema in the input task JSON itself, as described below. - -``` -... -"avroBytesDecoder": { - "type": "multiple_schemas_inline", - "schemas": { - //your id -> schema map goes here, for example - "1": { - "namespace": "org.apache.druid.data", - "name": "User", - "type": "record", - "fields": [ - { "name": "FullName", "type": "string" }, - { "name": "Country", "type": "string" } - ] - }, - "2": { - "namespace": "org.apache.druid.otherdata", - "name": "UserIdentity", - "type": "record", - "fields": [ - { "name": "Name", "type": "string" }, - { "name": "Location", "type": "string" } - ] - }, - ... - ... - } -} -... -``` - -Note that it is essentially a map of integer schema ID to avro schema object. This parser assumes that record has following format. - first 1 byte is version and must always be 1. - next 4 bytes are integer schema ID serialized using big-endian byte order. - remaining bytes contain serialized avro message. 
- -##### SchemaRepo Based Avro Bytes Decoder - -This Avro bytes decoder first extracts `subject` and `id` from the input message bytes, and then uses them to look up the Avro schema used to decode the Avro record from bytes. For details, see the [schema repo](https://github.com/schema-repo/schema-repo) and [AVRO-1124](https://issues.apache.org/jira/browse/AVRO-1124). You will need an http service like schema repo to hold the avro schema. For information on registering a schema on the message producer side, see `org.apache.druid.data.input.AvroStreamInputRowParserTest#testParse()`. - -| Field | Type | Description | Required | -|-------|------|-------------|----------| -| type | String | This should say `schema_repo`. | no | -| subjectAndIdConverter | JSON Object | Specifies how to extract the subject and id from message bytes. | yes | -| schemaRepository | JSON Object | Specifies how to look up the Avro schema from subject and id. | yes | - -###### Avro-1124 Subject And Id Converter - -This section describes the format of the `subjectAndIdConverter` object for the `schema_repo` Avro bytes decoder. - -| Field | Type | Description | Required | -|-------|------|-------------|----------| -| type | String | This should say `avro_1124`. | no | -| topic | String | Specifies the topic of your Kafka stream. | yes | - - -###### Avro-1124 Schema Repository - -This section describes the format of the `schemaRepository` object for the `schema_repo` Avro bytes decoder. - -| Field | Type | Description | Required | -|-------|------|-------------|----------| -| type | String | This should say `avro_1124_rest_client`. | no | -| url | String | Specifies the endpoint url of your Avro-1124 schema repository. | yes | - -##### Confluent Schema Registry-based Avro Bytes Decoder - -This Avro bytes decoder first extracts a unique `id` from input message bytes, and then uses it to look up the schema in the Schema Registry used to decode the Avro record from bytes. 
-For details, see the Schema Registry [documentation](http://docs.confluent.io/current/schema-registry/docs/) and [repository](https://github.com/confluentinc/schema-registry). - -| Field | Type | Description | Required | -|-------|------|-------------|----------| -| type | String | This should say `schema_registry`. | no | -| url | String | Specifies the url endpoint of the Schema Registry. | yes | -| capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no | -| urls | Array | Specifies the url endpoints of the multiple Schema Registry instances. | yes(if `url` is not provided) | -| config | Json | To send additional configurations, configured for Schema Registry | no | -| headers | Json | To send headers to the Schema Registry | no | - -For a single schema registry instance, use Field `url` or `urls` for multi instances. - -Single Instance: -```json -... -"avroBytesDecoder" : { - "type" : "schema_registry", - "url" : -} -... -``` - -Multiple Instances: -```json -... -"avroBytesDecoder" : { - "type" : "schema_registry", - "urls" : [, , ...], - "config" : { - "basic.auth.credentials.source": "USER_INFO", - "basic.auth.user.info": "fred:letmein", - "schema.registry.ssl.truststore.location": "/some/secrets/kafka.client.truststore.jks", - "schema.registry.ssl.truststore.password": "", - "schema.registry.ssl.keystore.location": "/some/secrets/kafka.client.keystore.jks", - "schema.registry.ssl.keystore.password": "", - "schema.registry.ssl.key.password": "" - ... - }, - "headers": { - "traceID" : "b29c5de2-0db4-490b-b421", - "timeStamp" : "1577191871865", - ... - } -} -... -``` - ### Protobuf Parser > You need to include the [`druid-protobuf-extensions`](../development/extensions-core/protobuf.md) as an extension to use the Protobuf Parser. 
diff --git a/extensions-core/avro-extensions/pom.xml b/extensions-core/avro-extensions/pom.xml index b07c4f4252cb..adb64bfa5f4a 100644 --- a/extensions-core/avro-extensions/pom.xml +++ b/extensions-core/avro-extensions/pom.xml @@ -115,6 +115,11 @@ + + commons-io + commons-io + provided + org.schemarepo schema-repo-api diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroExtensionsModule.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroExtensionsModule.java index 29f7ea0e14eb..296c6dab44b6 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroExtensionsModule.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroExtensionsModule.java @@ -55,7 +55,8 @@ public List getJacksonModules() new NamedType(AvroStreamInputRowParser.class, "avro_stream"), new NamedType(AvroHadoopInputRowParser.class, "avro_hadoop"), new NamedType(AvroParseSpec.class, "avro"), - new NamedType(AvroOCFInputFormat.class, "avro_ocf") + new NamedType(AvroOCFInputFormat.class, "avro_ocf"), + new NamedType(AvroStreamInputFormat.class, "avro_stream") ) .setMixInAnnotation(Repository.class, RepositoryMixIn.class) .setMixInAnnotation(JsonUtil.class, JsonUtilMixIn.class) diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java new file mode 100644 index 000000000000..3b59b368f0bf --- /dev/null +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.avro; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.NestedInputFormat; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; + +import javax.annotation.Nullable; + +import java.io.File; +import java.util.Objects; + +public class AvroStreamInputFormat extends NestedInputFormat +{ + private final boolean binaryAsString; + + private final AvroBytesDecoder avroBytesDecoder; + + @JsonCreator + public AvroStreamInputFormat( + @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec, + @JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder, + @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString + ) + { + super(flattenSpec); + this.avroBytesDecoder = avroBytesDecoder; + this.binaryAsString = binaryAsString == null ? 
false : binaryAsString; + } + + @Override + public boolean isSplittable() + { + return false; + } + + @JsonProperty + public AvroBytesDecoder getAvroBytesDecoder() + { + return avroBytesDecoder; + } + + @JsonProperty + public Boolean getBinaryAsString() + { + return binaryAsString; + } + + @Override + public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) + { + return new AvroStreamReader( + inputRowSchema, + source, + avroBytesDecoder, + getFlattenSpec(), + binaryAsString + ); + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final AvroStreamInputFormat that = (AvroStreamInputFormat) o; + return Objects.equals(getFlattenSpec(), that.getFlattenSpec()) && + Objects.equals(avroBytesDecoder, that.avroBytesDecoder) && + Objects.equals(binaryAsString, that.binaryAsString); + } + + @Override + public int hashCode() + { + return Objects.hash(getFlattenSpec(), avroBytesDecoder, binaryAsString); + } +}
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
new file mode 100644
index 000000000000..d8d92eb5a044
--- /dev/null
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.avro;
+
+import com.google.common.collect.Iterators;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.IntermediateRowParsingReader;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.common.parsers.ObjectFlattener;
+import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
+import org.apache.druid.java.util.common.parsers.ParseException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Reads one {@link InputEntity} as a single Avro-encoded message: the entity's bytes are decoded by the
+ * configured {@link AvroBytesDecoder} and the resulting record is flattened into one {@link InputRow}.
+ */
+public class AvroStreamReader extends IntermediateRowParsingReader<GenericRecord>
+{
+  private final InputRowSchema inputRowSchema;
+  private final InputEntity source;
+  private final AvroBytesDecoder avroBytesDecoder;
+  private final ObjectFlattener<GenericRecord> recordFlattener;
+
+  AvroStreamReader(
+      InputRowSchema inputRowSchema,
+      InputEntity source,
+      AvroBytesDecoder avroBytesDecoder,
+      JSONPathSpec flattenSpec,
+      boolean binaryAsString
+  )
+  {
+    this.inputRowSchema = inputRowSchema;
+    this.source = source;
+    this.avroBytesDecoder = avroBytesDecoder;
+    this.recordFlattener = ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(false, binaryAsString));
+  }
+
+  /**
+   * Buffers the whole entity, decodes it as one Avro record and returns an iterator over that single record.
+   *
+   * @throws IOException if the entity cannot be opened, read or closed
+   */
+  @Override
+  protected CloseableIterator<GenericRecord> intermediateRowIterator() throws IOException
+  {
+    final byte[] bytes;
+    // IOUtils.toByteArray() leaves the stream open, so close it here once its content is buffered.
+    try (InputStream in = source.open()) {
+      bytes = IOUtils.toByteArray(in);
+    }
+    return CloseableIterators.withEmptyBaggage(
+        Iterators.singletonIterator(avroBytesDecoder.parse(ByteBuffer.wrap(bytes)))
+    );
+  }
+
+  /**
+   * Flattens the decoded record and parses it against the input row schema, yielding exactly one row.
+   */
+  @Override
+  protected List<InputRow> parseInputRows(GenericRecord intermediateRow) throws ParseException
+  {
+    return Collections.singletonList(
+        MapInputRowParser.parse(
+            inputRowSchema,
+            recordFlattener.flatten(intermediateRow)
+        )
+    );
+  }
+
+  /**
+   * Converts the decoded record to a map through the flattener and returns it as a one-element list.
+   */
+  @Override
+  protected List<Map<String, Object>> toMap(GenericRecord intermediateRow)
+  {
+    return Collections.singletonList(recordFlattener.toMap(intermediateRow));
+  }
+}
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index 123f8fae89bf..4b3da38c49c9 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -45,6 +45,11 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder { private final SchemaRegistryClient registry; + private final String url; + private final int capacity; + private final List urls; + private final Map config; + private final Map headers; @JsonCreator public SchemaRegistryBasedAvroBytesDecoder( @@ -55,18 +60,57 @@ public SchemaRegistryBasedAvroBytesDecoder( @JsonProperty("headers") @Nullable Map headers ) { - int identityMapCapacity = capacity == null ? Integer.MAX_VALUE : capacity; + this.url = url; + this.capacity = capacity == null ? 
Integer.MAX_VALUE : capacity; + this.urls = urls; + this.config = config; + this.headers = headers; if (url != null && !url.isEmpty()) { - this.registry = new CachedSchemaRegistryClient(url, identityMapCapacity, config, headers); + this.registry = new CachedSchemaRegistryClient(this.url, this.capacity, this.config, this.headers); } else { - this.registry = new CachedSchemaRegistryClient(urls, identityMapCapacity, config, headers); + this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, this.config, this.headers); } } + @JsonProperty + public String getUrl() + { + return url; + } + + @JsonProperty + public int getCapacity() + { + return capacity; + } + + @JsonProperty + public List getUrls() + { + return urls; + } + + @JsonProperty + public Map getConfig() + { + return config; + } + + @JsonProperty + public Map getHeaders() + { + return headers; + } + //For UT only @VisibleForTesting SchemaRegistryBasedAvroBytesDecoder(SchemaRegistryClient registry) { + this.url = null; + this.capacity = Integer.MAX_VALUE; + this.urls = null; + this.config = null; + this.headers = null; this.registry = registry; } @@ -114,12 +158,21 @@ public boolean equals(Object o) SchemaRegistryBasedAvroBytesDecoder that = (SchemaRegistryBasedAvroBytesDecoder) o; - return Objects.equals(registry, that.registry); + return Objects.equals(url, that.url) && + Objects.equals(capacity, that.capacity) && + Objects.equals(urls, that.urls) && + Objects.equals(config, that.config) && + Objects.equals(headers, that.headers); } @Override public int hashCode() { - return registry != null ? registry.hashCode() : 0; + int result = url != null ? url.hashCode() : 0; + result = 31 * result + capacity; + result = 31 * result + (urls != null ? urls.hashCode() : 0); + result = 31 * result + (config != null ? config.hashCode() : 0); + result = 31 * result + (headers != null ? 
headers.hashCode() : 0); + return result; } } diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java new file mode 100644 index 000000000000..139616bfdbfa --- /dev/null +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.druid.data.input.avro.AvroExtensionsModule; +import org.apache.druid.data.input.avro.AvroStreamInputFormat; +import org.apache.druid.data.input.avro.SchemaRegistryBasedAvroBytesDecoder; +import org.apache.druid.data.input.avro.SchemaRepoBasedAvroBytesDecoder; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.NestedInputFormat; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.data.input.schemarepo.Avro1124RESTRepositoryClientWrapper; +import org.apache.druid.data.input.schemarepo.Avro1124SubjectAndIdConverter; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.schemarepo.InMemoryRepository; +import org.schemarepo.Repository; +import org.schemarepo.SchemaValidationException; +import org.schemarepo.api.TypedSchemaRepository; +import org.schemarepo.api.converter.AvroSchemaConverter; +import org.schemarepo.api.converter.IdentityConverter; +import org.schemarepo.api.converter.IntegerConverter; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static 
org.apache.druid.data.input.AvroStreamInputRowParserTest.assertInputRowCorrect; +import static org.apache.druid.data.input.AvroStreamInputRowParserTest.buildSomeAvroDatum; + +public class AvroStreamInputFormatTest +{ + + private static final String EVENT_TYPE = "eventType"; + private static final String ID = "id"; + private static final String SOME_OTHER_ID = "someOtherId"; + private static final String IS_VALID = "isValid"; + private static final String TOPIC = "aTopic"; + static final List DIMENSIONS = Arrays.asList(EVENT_TYPE, ID, SOME_OTHER_ID, IS_VALID); + private static final List DIMENSIONS_SCHEMALESS = Arrays.asList( + "nested", + SOME_OTHER_ID, + "someStringArray", + "someIntArray", + "someFloat", + EVENT_TYPE, + "someFixed", + "someBytes", + "someUnion", + ID, + "someEnum", + "someLong", + "someInt", + "timestamp" + ); + + + private final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + private TimestampSpec timestampSpec; + private DimensionsSpec dimensionsSpec; + private JSONPathSpec flattenSpec; + + @Before + public void before() + { + timestampSpec = new TimestampSpec("nested", "millis", null); + dimensionsSpec = new DimensionsSpec(DimensionsSpec.getDefaultSchemas(DIMENSIONS), Collections.emptyList(), null); + flattenSpec = new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.PATH, "nested", "someRecord.subLong") + ) + ); + for (Module jacksonModule : new AvroExtensionsModule().getJacksonModules()) { + jsonMapper.registerModule(jacksonModule); + } + } + + @Test + public void testSerde() throws IOException + { + Repository repository = new Avro1124RESTRepositoryClientWrapper("http://github.io"); + AvroStreamInputFormat inputFormat = new AvroStreamInputFormat( + flattenSpec, + new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository), + false + ); + NestedInputFormat inputFormat2 = jsonMapper.readValue( + jsonMapper.writeValueAsString(inputFormat), + NestedInputFormat.class + 
); + + Assert.assertEquals(inputFormat, inputFormat2); + } + + @Test + public void testSerdeForSchemaRegistry() throws IOException + { + AvroStreamInputFormat inputFormat = new AvroStreamInputFormat( + flattenSpec, + new SchemaRegistryBasedAvroBytesDecoder("http://test:8081", 100, null, null, null), + false + ); + NestedInputFormat inputFormat2 = jsonMapper.readValue( + jsonMapper.writeValueAsString(inputFormat), + NestedInputFormat.class + ); + Assert.assertEquals(inputFormat, inputFormat2); + } + + @Test + public void testParse() throws SchemaValidationException, IOException + { + Repository repository = new InMemoryRepository(null); + AvroStreamInputFormat inputFormat = new AvroStreamInputFormat( + flattenSpec, + new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository), + false + ); + NestedInputFormat inputFormat2 = jsonMapper.readValue( + jsonMapper.writeValueAsString(inputFormat), + NestedInputFormat.class + ); + repository = ((SchemaRepoBasedAvroBytesDecoder) ((AvroStreamInputFormat) inputFormat2).getAvroBytesDecoder()).getSchemaRepository(); + + // prepare data + GenericRecord someAvroDatum = buildSomeAvroDatum(); + + // encode schema id + Avro1124SubjectAndIdConverter converter = new Avro1124SubjectAndIdConverter(TOPIC); + TypedSchemaRepository repositoryClient = new TypedSchemaRepository<>( + repository, + new IntegerConverter(), + new AvroSchemaConverter(), + new IdentityConverter() + ); + Integer id = repositoryClient.registerSchema(TOPIC, SomeAvroDatum.getClassSchema()); + ByteBuffer byteBuffer = ByteBuffer.allocate(4); + converter.putSubjectAndId(id, byteBuffer); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + out.write(byteBuffer.array()); + // encode data + DatumWriter writer = new SpecificDatumWriter<>(someAvroDatum.getSchema()); + // write avro datum to bytes + writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null)); + + final ByteEntity entity = new 
ByteEntity(ByteBuffer.wrap(out.toByteArray())); + + InputRow inputRow = inputFormat2.createReader(new InputRowSchema(timestampSpec, dimensionsSpec, null), entity, null).read().next(); + + assertInputRowCorrect(inputRow, DIMENSIONS, false); + } + + @Test + public void testParseSchemaless() throws SchemaValidationException, IOException + { + Repository repository = new InMemoryRepository(null); + AvroStreamInputFormat inputFormat = new AvroStreamInputFormat( + flattenSpec, + new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository), + false + ); + NestedInputFormat inputFormat2 = jsonMapper.readValue( + jsonMapper.writeValueAsString(inputFormat), + NestedInputFormat.class + ); + repository = ((SchemaRepoBasedAvroBytesDecoder) ((AvroStreamInputFormat) inputFormat2).getAvroBytesDecoder()).getSchemaRepository(); + + // prepare data + GenericRecord someAvroDatum = buildSomeAvroDatum(); + + // encode schema id + Avro1124SubjectAndIdConverter converter = new Avro1124SubjectAndIdConverter(TOPIC); + TypedSchemaRepository repositoryClient = new TypedSchemaRepository<>( + repository, + new IntegerConverter(), + new AvroSchemaConverter(), + new IdentityConverter() + ); + Integer id = repositoryClient.registerSchema(TOPIC, SomeAvroDatum.getClassSchema()); + ByteBuffer byteBuffer = ByteBuffer.allocate(4); + converter.putSubjectAndId(id, byteBuffer); + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + out.write(byteBuffer.array()); + // encode data + DatumWriter writer = new SpecificDatumWriter<>(someAvroDatum.getSchema()); + // write avro datum to bytes + writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null)); + + final ByteEntity entity = new ByteEntity(ByteBuffer.wrap(out.toByteArray())); + + InputRow inputRow = inputFormat2.createReader(new InputRowSchema(timestampSpec, new DimensionsSpec(null, null, null), null), entity, null).read().next(); + + assertInputRowCorrect(inputRow, DIMENSIONS_SCHEMALESS, 
false); + } + } +} diff --git a/integration-tests/src/test/resources/stream/data/avro/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/avro/input_format/input_format.json new file mode 100644 index 000000000000..b6b9c61b68f7 --- /dev/null +++ b/integration-tests/src/test/resources/stream/data/avro/input_format/input_format.json @@ -0,0 +1,81 @@ +{ + "type": "avro_stream", + "avroBytesDecoder": { + "type": "schema_inline", + "schema": { + "namespace": "org.apache.druid", + "name": "wikipedia", + "type": "record", + "fields": [ + { + "name": "timestamp", + "type": "string" + }, + { + "name": "page", + "type": "string" + }, + { + "name": "language", + "type": "string" + }, + { + "name": "user", + "type": "string" + }, + { + "name": "unpatrolled", + "type": "string" + }, + { + "name": "newPage", + "type": "string" + }, + { + "name": "robot", + "type": "string" + }, + { + "name": "anonymous", + "type": "string" + }, + { + "name": "namespace", + "type": "string" + }, + { + "name": "continent", + "type": "string" + }, + { + "name": "country", + "type": "string" + }, + { + "name": "region", + "type": "string" + }, + { + "name": "city", + "type": "string" + }, + { + "name": "added", + "type": "long" + }, + { + "name": "deleted", + "type": "long" + }, + { + "name": "delta", + "type": "long" + } + ] + } + }, + "flattenSpec": { + "useFieldDiscovery": true + }, + "binaryAsString": false +} \ No newline at end of file diff --git a/integration-tests/src/test/resources/stream/data/avro_schema_registry/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/avro_schema_registry/input_format/input_format.json new file mode 100644 index 000000000000..c9c8ce7a8276 --- /dev/null +++ b/integration-tests/src/test/resources/stream/data/avro_schema_registry/input_format/input_format.json @@ -0,0 +1,15 @@ +{ + "type": "avro_stream", + "avroBytesDecoder": { + "type": "schema_registry", + "url": "%%SCHEMA_REGISTRY_HOST%%", + "config": 
{ + "basic.auth.credentials.source": "USER_INFO", + "basic.auth.user.info": "druid:diurd" + } + }, + "flattenSpec": { + "useFieldDiscovery": true + }, + "binaryAsString": false +} \ No newline at end of file diff --git a/web-console/src/druid-models/input-format.tsx b/web-console/src/druid-models/input-format.tsx index 7907498b3bd7..4bd9702fcef5 100644 --- a/web-console/src/druid-models/input-format.tsx +++ b/web-console/src/druid-models/input-format.tsx @@ -42,7 +42,7 @@ export const INPUT_FORMAT_FIELDS: Field[] = [ name: 'type', label: 'Input format', type: 'string', - suggestions: ['json', 'csv', 'tsv', 'regex', 'parquet', 'orc', 'avro_ocf'], + suggestions: ['json', 'csv', 'tsv', 'regex', 'parquet', 'orc', 'avro_ocf', 'avro_stream'], required: true, info: ( <> @@ -127,7 +127,7 @@ export const INPUT_FORMAT_FIELDS: Field[] = [ name: 'binaryAsString', type: 'boolean', defaultValue: false, - defined: (p: InputFormat) => oneOf(p.type, 'parquet', 'orc', 'avro_ocf'), + defined: (p: InputFormat) => oneOf(p.type, 'parquet', 'orc', 'avro_ocf', 'avro_stream'), info: ( <> Specifies if the binary column which is not logically marked as a string should be treated @@ -142,5 +142,5 @@ export function issueWithInputFormat(inputFormat: InputFormat | undefined): stri } export function inputFormatCanFlatten(inputFormat: InputFormat): boolean { - return oneOf(inputFormat.type, 'json', 'parquet', 'orc', 'avro_ocf'); + return oneOf(inputFormat.type, 'json', 'parquet', 'orc', 'avro_ocf', 'avro_stream'); }