From 8c8213c032e717fc2e5f11032143af71947fb6cd Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sun, 28 Mar 2021 16:29:58 +0800 Subject: [PATCH 1/9] add avro stream input format --- .../data/input/avro/AvroExtensionsModule.java | 3 +- .../input/avro/AvroStreamInputFormat.java | 96 ++++++++ .../data/input/avro/AvroStreamReader.java | 88 +++++++ .../data/input/AvroStreamInputFormatTest.java | 216 ++++++++++++++++++ 4 files changed, 402 insertions(+), 1 deletion(-) create mode 100644 extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java create mode 100644 extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java create mode 100644 extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroExtensionsModule.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroExtensionsModule.java index 29f7ea0e14eb..296c6dab44b6 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroExtensionsModule.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroExtensionsModule.java @@ -55,7 +55,8 @@ public List getJacksonModules() new NamedType(AvroStreamInputRowParser.class, "avro_stream"), new NamedType(AvroHadoopInputRowParser.class, "avro_hadoop"), new NamedType(AvroParseSpec.class, "avro"), - new NamedType(AvroOCFInputFormat.class, "avro_ocf") + new NamedType(AvroOCFInputFormat.class, "avro_ocf"), + new NamedType(AvroStreamInputFormat.class, "avro_stream") ) .setMixInAnnotation(Repository.class, RepositoryMixIn.class) .setMixInAnnotation(JsonUtil.class, JsonUtilMixIn.class) diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java new file mode 100644 index 000000000000..b8f5ce8ee352 --- /dev/null +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.avro; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.NestedInputFormat; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; + +import javax.annotation.Nullable; + +import java.io.File; +import java.util.Objects; + +public class AvroStreamInputFormat extends NestedInputFormat +{ + private final boolean binaryAsString; + + private final AvroBytesDecoder avroBytesDecoder; + + @JsonCreator + public AvroStreamInputFormat( + @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec, + @JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder, + @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString + ) + { + super(flattenSpec); + this.avroBytesDecoder = avroBytesDecoder; + this.binaryAsString = binaryAsString == null ? false : binaryAsString; + } + + @Override + public boolean isSplittable() + { + return false; + } + + @JsonProperty + public AvroBytesDecoder getAvroBytesDecoder() + { + return avroBytesDecoder; + } + + @Override + public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) + { + return new AvroStreamReader( + inputRowSchema, + source, + avroBytesDecoder, + getFlattenSpec(), + binaryAsString + ); + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final AvroStreamInputFormat that = (AvroStreamInputFormat) o; + return Objects.equals(getFlattenSpec(), that.getFlattenSpec()) && + Objects.equals(avroBytesDecoder, that.avroBytesDecoder); + } + + @Override + public int hashCode() + { + return Objects.hash(getFlattenSpec(), avroBytesDecoder); + } +} diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java new file mode 100644 index 000000000000..24b6ff3877c3 --- /dev/null +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.avro; + +import com.google.common.collect.Iterators; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.io.IOUtils; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.IntermediateRowParsingReader; +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.java.util.common.CloseableIterators; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.common.parsers.ObjectFlattener; +import org.apache.druid.java.util.common.parsers.ObjectFlatteners; +import org.apache.druid.java.util.common.parsers.ParseException; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class AvroStreamReader extends IntermediateRowParsingReader +{ + private final InputRowSchema inputRowSchema; + private final InputEntity source; + private final AvroBytesDecoder avroBytesDecoder; + private final ObjectFlattener recordFlattener; + + AvroStreamReader( + InputRowSchema inputRowSchema, + InputEntity source, + AvroBytesDecoder avroBytesDecoder, + JSONPathSpec flattenSpec, + boolean binaryAsString + ) + { + this.inputRowSchema = inputRowSchema; + this.source = source; + this.avroBytesDecoder = avroBytesDecoder; + this.recordFlattener = ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(false, binaryAsString)); + } + + @Override + protected CloseableIterator intermediateRowIterator() throws IOException + { + return CloseableIterators.withEmptyBaggage( + Iterators.singletonIterator(avroBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open()) + )))); + } + + @Override + protected List parseInputRows(GenericRecord intermediateRow) throws ParseException + { + return Collections.singletonList( + MapInputRowParser.parse( + inputRowSchema, + recordFlattener.flatten(intermediateRow) + ) + ); + } + + @Override + protected List> toMap(GenericRecord intermediateRow) + { + return Collections.singletonList(recordFlattener.toMap(intermediateRow)); + } +} diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java new file mode 100644 index 000000000000..a2199098508e --- /dev/null +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.druid.data.input.avro.AvroExtensionsModule; +import org.apache.druid.data.input.avro.AvroStreamInputFormat; +import org.apache.druid.data.input.avro.SchemaRepoBasedAvroBytesDecoder; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.NestedInputFormat; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.data.input.schemarepo.Avro1124RESTRepositoryClientWrapper; +import org.apache.druid.data.input.schemarepo.Avro1124SubjectAndIdConverter; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.schemarepo.InMemoryRepository; +import org.schemarepo.Repository; +import org.schemarepo.SchemaValidationException; +import org.schemarepo.api.TypedSchemaRepository; +import org.schemarepo.api.converter.AvroSchemaConverter; +import org.schemarepo.api.converter.IdentityConverter; +import org.schemarepo.api.converter.IntegerConverter; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.druid.data.input.AvroStreamInputRowParserTest.assertInputRowCorrect; +import static org.apache.druid.data.input.AvroStreamInputRowParserTest.buildSomeAvroDatum; + +public class AvroStreamInputFormatTest +{ + + private static final String EVENT_TYPE = "eventType"; + private static final String ID = "id"; + private static final String SOME_OTHER_ID = "someOtherId"; + private static final String IS_VALID = "isValid"; + private static final String TOPIC = "aTopic"; + static final List DIMENSIONS = Arrays.asList(EVENT_TYPE, ID, SOME_OTHER_ID, IS_VALID); + private static final List DIMENSIONS_SCHEMALESS = Arrays.asList( + "nested", + SOME_OTHER_ID, + "someStringArray", + "someIntArray", + "someFloat", + EVENT_TYPE, + "someFixed", + "someBytes", + "someUnion", + ID, + "someEnum", + "someLong", + "someInt", + "timestamp" + ); + + + private final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + private TimestampSpec timestampSpec; + private DimensionsSpec dimensionsSpec; + private JSONPathSpec flattenSpec; + + @Before + public void before() + { + timestampSpec = new TimestampSpec("nested", "millis", null); + dimensionsSpec = new DimensionsSpec(DimensionsSpec.getDefaultSchemas(DIMENSIONS), Collections.emptyList(), null); + flattenSpec = new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.PATH, "nested", "someRecord.subLong") + ) + ); + for (Module jacksonModule : new AvroExtensionsModule().getJacksonModules()) { + jsonMapper.registerModule(jacksonModule); + } + } + + @Test + public void testSerde() throws IOException + { + Repository repository = new Avro1124RESTRepositoryClientWrapper("http://github.io"); + AvroStreamInputFormat inputFormat = new AvroStreamInputFormat( + flattenSpec, + new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository), + false + ); + NestedInputFormat inputFormat2 = jsonMapper.readValue( + jsonMapper.writeValueAsString(inputFormat), + NestedInputFormat.class + ); + + Assert.assertEquals(inputFormat, inputFormat2); + } + + @Test + public void testParse() throws SchemaValidationException, IOException + { + Repository repository = new InMemoryRepository(null); + AvroStreamInputFormat inputFormat = new AvroStreamInputFormat( + flattenSpec, + new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository), + false + ); + NestedInputFormat inputFormat2 = jsonMapper.readValue( + jsonMapper.writeValueAsString(inputFormat), + NestedInputFormat.class + ); + repository = ((SchemaRepoBasedAvroBytesDecoder) ((AvroStreamInputFormat) inputFormat2).getAvroBytesDecoder()).getSchemaRepository(); + + // prepare data + GenericRecord someAvroDatum = buildSomeAvroDatum(); + + // encode schema id + Avro1124SubjectAndIdConverter converter = new Avro1124SubjectAndIdConverter(TOPIC); + TypedSchemaRepository repositoryClient = new TypedSchemaRepository<>( + repository, + new IntegerConverter(), + new AvroSchemaConverter(), + new IdentityConverter() + ); + Integer id = repositoryClient.registerSchema(TOPIC, SomeAvroDatum.getClassSchema()); + ByteBuffer byteBuffer = ByteBuffer.allocate(4); + converter.putSubjectAndId(id, byteBuffer); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + out.write(byteBuffer.array()); + // encode data + DatumWriter writer = new SpecificDatumWriter<>(someAvroDatum.getSchema()); + // write avro datum to bytes + writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null)); + + final ByteEntity entity = new ByteEntity(ByteBuffer.wrap(out.toByteArray())); + + InputRow inputRow = inputFormat2.createReader(new InputRowSchema(timestampSpec, dimensionsSpec, null), entity, null).read().next(); + + assertInputRowCorrect(inputRow, DIMENSIONS, false); + } + + @Test + public void testParseSchemaless() throws SchemaValidationException, IOException + { + Repository repository = new InMemoryRepository(null); + AvroStreamInputFormat inputFormat = new AvroStreamInputFormat( + flattenSpec, + new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository), + false + ); + NestedInputFormat inputFormat2 = jsonMapper.readValue( + jsonMapper.writeValueAsString(inputFormat), + NestedInputFormat.class + ); + repository = ((SchemaRepoBasedAvroBytesDecoder) ((AvroStreamInputFormat) inputFormat2).getAvroBytesDecoder()).getSchemaRepository(); + + // prepare data + GenericRecord someAvroDatum = buildSomeAvroDatum(); + + // encode schema id + Avro1124SubjectAndIdConverter converter = new Avro1124SubjectAndIdConverter(TOPIC); + TypedSchemaRepository repositoryClient = new TypedSchemaRepository<>( + repository, + new IntegerConverter(), + new AvroSchemaConverter(), + new IdentityConverter() + ); + Integer id = repositoryClient.registerSchema(TOPIC, SomeAvroDatum.getClassSchema()); + ByteBuffer byteBuffer = ByteBuffer.allocate(4); + converter.putSubjectAndId(id, byteBuffer); + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + out.write(byteBuffer.array()); + // encode data + DatumWriter writer = new SpecificDatumWriter<>(someAvroDatum.getSchema()); + // write avro datum to bytes + writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null)); + + final ByteEntity entity = new ByteEntity(ByteBuffer.wrap(out.toByteArray())); + + InputRow inputRow = inputFormat2.createReader(new InputRowSchema(timestampSpec, new DimensionsSpec(null, null, null), null), entity, null).read().next(); + + assertInputRowCorrect(inputRow, DIMENSIONS_SCHEMALESS, false); + } + } +} From 1fdd972a98314a61794f94df423c6d2ed8668bdf Mon Sep 17 00:00:00 2001 From: yuanyi Date: Mon, 29 Mar 2021 13:47:09 +0800 Subject: [PATCH 2/9] bug fixed --- extensions-core/avro-extensions/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/extensions-core/avro-extensions/pom.xml b/extensions-core/avro-extensions/pom.xml index b07c4f4252cb..adb64bfa5f4a 100644 --- a/extensions-core/avro-extensions/pom.xml +++ b/extensions-core/avro-extensions/pom.xml @@ -115,6 +115,11 @@ + + commons-io + commons-io + provided + org.schemarepo schema-repo-api From c9249953a5573ec72b16f86ea3d900455c890c32 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Wed, 31 Mar 2021 16:14:17 +0800 Subject: [PATCH 3/9] add document --- docs/ingestion/data-formats.md | 45 +++++++++++++++++++ .../input/avro/AvroStreamInputFormat.java | 5 ++- .../data/input/avro/AvroStreamReader.java | 4 +- 3 files changed, 50 insertions(+), 4 deletions(-) diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index 96f5d924b757..1adee9dc5f76 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -223,6 +223,51 @@ The Parquet `inputFormat` has the following components: |flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Parquet file. Note that only 'path' expression are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) | | binaryAsString | Boolean | Specifies if the bytes parquet column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) | +### Avro Stream + +> You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro Stream input format. + +> See the [Avro Types](../development/extensions-core/avro.md#avro-types) section for how Avro types are handled in Druid + +The `inputFormat` to load data of Avro format in stream injestion. An example is: +```json +"ioConfig": { + "inputFormat": { + "type": "avro_stream", + "avroBytesDecoder" : { + "type" : "schema_repo", + "subjectAndIdConverter" : { + "type" : "avro_1124", + "topic" : "${YOUR_TOPIC}" + }, + "schemaRepository" : { + "type" : "avro_1124_rest_client", + "url" : "${YOUR_SCHEMA_REPO_END_POINT}", + } + }, + "flattenSpec": { + "useFieldDiscovery": true, + "fields": [ + { + "type": "path", + "name": "someRecord_subInt", + "expr": "$.someRecord.subInt" + } + ] + }, + "binaryAsString": false + }, + ... +} +``` + +| Field | Type | Description | Required | +|-------|------|-------------|----------| +|type| String| This should be set to `avro_stream` to read Avro serialized data| yes | +|flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Avro record. Note that only 'path' expression are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) | +|`avroBytesDecoder`| JSON Object |Specifies how to decode bytes to Avro record. | yes | +| binaryAsString | Boolean | Specifies if the bytes Avro column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) | + ### Avro OCF > You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro OCF input format. diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java index b8f5ce8ee352..5b0bbd455670 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java @@ -85,12 +85,13 @@ public boolean equals(final Object o) } final AvroStreamInputFormat that = (AvroStreamInputFormat) o; return Objects.equals(getFlattenSpec(), that.getFlattenSpec()) && - Objects.equals(avroBytesDecoder, that.avroBytesDecoder); + Objects.equals(avroBytesDecoder, that.avroBytesDecoder) && + Objects.equals(binaryAsString, that.binaryAsString); } @Override public int hashCode() { - return Objects.hash(getFlattenSpec(), avroBytesDecoder); + return Objects.hash(getFlattenSpec(), avroBytesDecoder, binaryAsString); } } diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java index 24b6ff3877c3..d8d92eb5a044 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java @@ -65,8 +65,8 @@ public class AvroStreamReader extends IntermediateRowParsingReader intermediateRowIterator() throws IOException { return CloseableIterators.withEmptyBaggage( - Iterators.singletonIterator(avroBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open()) - )))); + Iterators.singletonIterator(avroBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open())))) + ); } @Override From 9f08bc1550e4a426d5642a684b1704f75c13fe2a Mon Sep 17 00:00:00 2001 From: yuanyi Date: Wed, 31 Mar 2021 17:37:17 +0800 Subject: [PATCH 4/9] doc fix --- docs/ingestion/data-formats.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index 1adee9dc5f76..dc4d683a3444 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -229,7 +229,7 @@ The Parquet `inputFormat` has the following components: > See the [Avro Types](../development/extensions-core/avro.md#avro-types) section for how Avro types are handled in Druid -The `inputFormat` to load data of Avro format in stream injestion. An example is: +The `inputFormat` to load data of Avro format in stream ingestion. An example is: ```json "ioConfig": { "inputFormat": { From 60de593798da14d40567f6f2be9eee02d73c22bc Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 3 Apr 2021 21:20:41 +0800 Subject: [PATCH 5/9] change doc --- docs/ingestion/data-formats.md | 322 +++++++++++++++++---------------- 1 file changed, 162 insertions(+), 160 deletions(-) diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index dc4d683a3444..b65a1db1deb1 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -234,15 +234,17 @@ The `inputFormat` to load data of Avro format in stream ingestion. An example is "ioConfig": { "inputFormat": { "type": "avro_stream", - "avroBytesDecoder" : { - "type" : "schema_repo", - "subjectAndIdConverter" : { - "type" : "avro_1124", - "topic" : "${YOUR_TOPIC}" - }, - "schemaRepository" : { - "type" : "avro_1124_rest_client", - "url" : "${YOUR_SCHEMA_REPO_END_POINT}", + "avroBytesDecoder": { + "type": "schema_inline", + "schema": { + //your schema goes here, for example + "namespace": "org.apache.druid.data", + "name": "User", + "type": "record", + "fields": [ + { "name": "FullName", "type": "string" }, + { "name": "Country", "type": "string" } + ] } }, "flattenSpec": { @@ -268,6 +270,156 @@ The `inputFormat` to load data of Avro format in stream ingestion. An example is |`avroBytesDecoder`| JSON Object |Specifies how to decode bytes to Avro record. | yes | | binaryAsString | Boolean | Specifies if the bytes Avro column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) | +##### Avro Bytes Decoder + +If `type` is not included, the avroBytesDecoder defaults to `schema_repo`. + +###### Inline Schema Based Avro Bytes Decoder + +> The "schema_inline" decoder reads Avro records using a fixed schema and does not support schema migration. If you +> may need to migrate schemas in the future, consider one of the other decoders, all of which use a message header that +> allows the parser to identify the proper Avro schema for reading records. + +This decoder can be used if all the input events can be read using the same schema. In this case, specify the schema in the input task JSON itself, as described below. + +``` +... +"avroBytesDecoder": { + "type": "schema_inline", + "schema": { + //your schema goes here, for example + "namespace": "org.apache.druid.data", + "name": "User", + "type": "record", + "fields": [ + { "name": "FullName", "type": "string" }, + { "name": "Country", "type": "string" } + ] + } +} +... +``` + +###### Multiple Inline Schemas Based Avro Bytes Decoder + +Use this decoder if different input events can have different read schemas. In this case, specify the schema in the input task JSON itself, as described below. + +``` +... +"avroBytesDecoder": { + "type": "multiple_schemas_inline", + "schemas": { + //your id -> schema map goes here, for example + "1": { + "namespace": "org.apache.druid.data", + "name": "User", + "type": "record", + "fields": [ + { "name": "FullName", "type": "string" }, + { "name": "Country", "type": "string" } + ] + }, + "2": { + "namespace": "org.apache.druid.otherdata", + "name": "UserIdentity", + "type": "record", + "fields": [ + { "name": "Name", "type": "string" }, + { "name": "Location", "type": "string" } + ] + }, + ... + ... + } +} +... +``` + +Note that it is essentially a map of integer schema ID to avro schema object. This parser assumes that record has following format. + first 1 byte is version and must always be 1. + next 4 bytes are integer schema ID serialized using big-endian byte order. + remaining bytes contain serialized avro message. + +##### SchemaRepo Based Avro Bytes Decoder + +This Avro bytes decoder first extracts `subject` and `id` from the input message bytes, and then uses them to look up the Avro schema used to decode the Avro record from bytes. For details, see the [schema repo](https://github.com/schema-repo/schema-repo) and [AVRO-1124](https://issues.apache.org/jira/browse/AVRO-1124). You will need an http service like schema repo to hold the avro schema. For information on registering a schema on the message producer side, see `org.apache.druid.data.input.AvroStreamInputRowParserTest#testParse()`. + +| Field | Type | Description | Required | +|-------|------|-------------|----------| +| type | String | This should say `schema_repo`. | no | +| subjectAndIdConverter | JSON Object | Specifies how to extract the subject and id from message bytes. | yes | +| schemaRepository | JSON Object | Specifies how to look up the Avro schema from subject and id. | yes | + +###### Avro-1124 Subject And Id Converter + +This section describes the format of the `subjectAndIdConverter` object for the `schema_repo` Avro bytes decoder. + +| Field | Type | Description | Required | +|-------|------|-------------|----------| +| type | String | This should say `avro_1124`. | no | +| topic | String | Specifies the topic of your Kafka stream. | yes | + + +###### Avro-1124 Schema Repository + +This section describes the format of the `schemaRepository` object for the `schema_repo` Avro bytes decoder. + +| Field | Type | Description | Required | +|-------|------|-------------|----------| +| type | String | This should say `avro_1124_rest_client`. | no | +| url | String | Specifies the endpoint url of your Avro-1124 schema repository. | yes | + +###### Confluent Schema Registry-based Avro Bytes Decoder + +This Avro bytes decoder first extracts a unique `id` from input message bytes, and then uses it to look up the schema in the Schema Registry used to decode the Avro record from bytes. +For details, see the Schema Registry [documentation](http://docs.confluent.io/current/schema-registry/docs/) and [repository](https://github.com/confluentinc/schema-registry). + +| Field | Type | Description | Required | +|-------|------|-------------|----------| +| type | String | This should say `schema_registry`. | no | +| url | String | Specifies the url endpoint of the Schema Registry. | yes | +| capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no | +| urls | Array | Specifies the url endpoints of the multiple Schema Registry instances. | yes(if `url` is not provided) | +| config | Json | To send additional configurations, configured for Schema Registry | no | +| headers | Json | To send headers to the Schema Registry | no | + +For a single schema registry instance, use Field `url` or `urls` for multi instances. + +Single Instance: +```json +... +"avroBytesDecoder" : { + "type" : "schema_registry", + "url" : +} +... +``` + +Multiple Instances: +```json +... +"avroBytesDecoder" : { + "type" : "schema_registry", + "urls" : [, , ...], + "config" : { + "basic.auth.credentials.source": "USER_INFO", + "basic.auth.user.info": "fred:letmein", + "schema.registry.ssl.truststore.location": "/some/secrets/kafka.client.truststore.jks", + "schema.registry.ssl.truststore.password": "", + "schema.registry.ssl.keystore.location": "/some/secrets/kafka.client.keystore.jks", + "schema.registry.ssl.keystore.password": "", + "schema.registry.ssl.key.password": "" + ... + }, + "headers": { + "traceID" : "b29c5de2-0db4-490b-b421", + "timeStamp" : "1577191871865", + ... + } +} +... +``` + ### Avro OCF > You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro OCF input format. @@ -921,7 +1073,7 @@ This parser is for [stream ingestion](./index.md#streaming) and reads Avro data | Field | Type | Description | Required | |-------|------|-------------|----------| | type | String | This should say `avro_stream`. | no | -| avroBytesDecoder | JSON Object | Specifies how to decode bytes to Avro record. | yes | +| avroBytesDecoder | JSON Object | Specifies [`avroBytesDecoder`](#Avro Bytes Decoder) to decode bytes to Avro record. | yes | | parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be an "avro" parseSpec. | yes | An Avro parseSpec can contain a [`flattenSpec`](#flattenspec) using either the "root" or "path" @@ -952,156 +1104,6 @@ For example, using Avro stream parser with schema repo Avro bytes decoder: } ``` -#### Avro Bytes Decoder - -If `type` is not included, the avroBytesDecoder defaults to `schema_repo`. - -##### Inline Schema Based Avro Bytes Decoder - -> The "schema_inline" decoder reads Avro records using a fixed schema and does not support schema migration. If you -> may need to migrate schemas in the future, consider one of the other decoders, all of which use a message header that -> allows the parser to identify the proper Avro schema for reading records. - -This decoder can be used if all the input events can be read using the same schema. In this case, specify the schema in the input task JSON itself, as described below. - -``` -... -"avroBytesDecoder": { - "type": "schema_inline", - "schema": { - //your schema goes here, for example - "namespace": "org.apache.druid.data", - "name": "User", - "type": "record", - "fields": [ - { "name": "FullName", "type": "string" }, - { "name": "Country", "type": "string" } - ] - } -} -... -``` - -##### Multiple Inline Schemas Based Avro Bytes Decoder - -Use this decoder if different input events can have different read schemas. In this case, specify the schema in the input task JSON itself, as described below. - -``` -... -"avroBytesDecoder": { - "type": "multiple_schemas_inline", - "schemas": { - //your id -> schema map goes here, for example - "1": { - "namespace": "org.apache.druid.data", - "name": "User", - "type": "record", - "fields": [ - { "name": "FullName", "type": "string" }, - { "name": "Country", "type": "string" } - ] - }, - "2": { - "namespace": "org.apache.druid.otherdata", - "name": "UserIdentity", - "type": "record", - "fields": [ - { "name": "Name", "type": "string" }, - { "name": "Location", "type": "string" } - ] - }, - ... - ... - } -} -... -``` - -Note that it is essentially a map of integer schema ID to avro schema object. This parser assumes that record has following format. - first 1 byte is version and must always be 1. - next 4 bytes are integer schema ID serialized using big-endian byte order. - remaining bytes contain serialized avro message. - -##### SchemaRepo Based Avro Bytes Decoder - -This Avro bytes decoder first extracts `subject` and `id` from the input message bytes, and then uses them to look up the Avro schema used to decode the Avro record from bytes. For details, see the [schema repo](https://github.com/schema-repo/schema-repo) and [AVRO-1124](https://issues.apache.org/jira/browse/AVRO-1124). You will need an http service like schema repo to hold the avro schema. For information on registering a schema on the message producer side, see `org.apache.druid.data.input.AvroStreamInputRowParserTest#testParse()`. - -| Field | Type | Description | Required | -|-------|------|-------------|----------| -| type | String | This should say `schema_repo`. | no | -| subjectAndIdConverter | JSON Object | Specifies how to extract the subject and id from message bytes. | yes | -| schemaRepository | JSON Object | Specifies how to look up the Avro schema from subject and id. | yes | - -###### Avro-1124 Subject And Id Converter - -This section describes the format of the `subjectAndIdConverter` object for the `schema_repo` Avro bytes decoder. - -| Field | Type | Description | Required | -|-------|------|-------------|----------| -| type | String | This should say `avro_1124`. | no | -| topic | String | Specifies the topic of your Kafka stream. | yes | - - -###### Avro-1124 Schema Repository - -This section describes the format of the `schemaRepository` object for the `schema_repo` Avro bytes decoder. - -| Field | Type | Description | Required | -|-------|------|-------------|----------| -| type | String | This should say `avro_1124_rest_client`. | no | -| url | String | Specifies the endpoint url of your Avro-1124 schema repository. | yes | - -##### Confluent Schema Registry-based Avro Bytes Decoder - -This Avro bytes decoder first extracts a unique `id` from input message bytes, and then uses it to look up the schema in the Schema Registry used to decode the Avro record from bytes. -For details, see the Schema Registry [documentation](http://docs.confluent.io/current/schema-registry/docs/) and [repository](https://github.com/confluentinc/schema-registry). - -| Field | Type | Description | Required | -|-------|------|-------------|----------| -| type | String | This should say `schema_registry`. | no | -| url | String | Specifies the url endpoint of the Schema Registry. | yes | -| capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no | -| urls | Array | Specifies the url endpoints of the multiple Schema Registry instances. | yes(if `url` is not provided) | -| config | Json | To send additional configurations, configured for Schema Registry | no | -| headers | Json | To send headers to the Schema Registry | no | - -For a single schema registry instance, use Field `url` or `urls` for multi instances. - -Single Instance: -```json -... -"avroBytesDecoder" : { - "type" : "schema_registry", - "url" : -} -... -``` - -Multiple Instances: -```json -... -"avroBytesDecoder" : { - "type" : "schema_registry", - "urls" : [, , ...], - "config" : { - "basic.auth.credentials.source": "USER_INFO", - "basic.auth.user.info": "fred:letmein", - "schema.registry.ssl.truststore.location": "/some/secrets/kafka.client.truststore.jks", - "schema.registry.ssl.truststore.password": "", - "schema.registry.ssl.keystore.location": "/some/secrets/kafka.client.keystore.jks", - "schema.registry.ssl.keystore.password": "", - "schema.registry.ssl.key.password": "" - ... - }, - "headers": { - "traceID" : "b29c5de2-0db4-490b-b421", - "timeStamp" : "1577191871865", - ... - } -} -... -``` - ### Protobuf Parser > You need to include the [`druid-protobuf-extensions`](../development/extensions-core/protobuf.md) as an extension to use the Protobuf Parser. From 573a9abd3b3deba9eb8aa32ebd2dc08b136570d0 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Mon, 5 Apr 2021 18:14:47 +0800 Subject: [PATCH 6/9] add integretion test --- .../data/avro/input_format/input_format.json | 81 +++++++++++++++++++ .../input_format/input_format.json | 15 ++++ web-console/src/druid-models/input-format.tsx | 6 +- 3 files changed, 99 insertions(+), 3 deletions(-) create mode 100644 integration-tests/src/test/resources/stream/data/avro/input_format/input_format.json create mode 100644 integration-tests/src/test/resources/stream/data/avro_schema_registry/input_format/input_format.json diff --git a/integration-tests/src/test/resources/stream/data/avro/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/avro/input_format/input_format.json new file mode 100644 index 000000000000..b6b9c61b68f7 --- /dev/null +++ b/integration-tests/src/test/resources/stream/data/avro/input_format/input_format.json @@ -0,0 +1,81 @@ +{ + "type": "avro_stream", + "avroBytesDecoder": { + "type": "schema_inline", + "schema": { + "namespace": "org.apache.druid", + "name": "wikipedia", + "type": "record", + "fields": [ + { + "name": "timestamp", + "type": "string" + }, + { + "name": "page", + "type": "string" + }, + { + "name": "language", + "type": "string" + }, + { + "name": "user", + "type": "string" + }, + { + "name": "unpatrolled", + "type": "string" + }, + { + "name": "newPage", + "type": "string" + }, + { + "name": "robot", + "type": "string" + }, + { + "name": "anonymous", + "type": "string" + }, + { + "name": "namespace", + "type": "string" + }, + { + "name": "continent", + "type": "string" + }, + { + "name": "country", + "type": "string" + }, + { + "name": "region", + "type": "string" + }, + { + "name": "city", + "type": "string" + }, + { + "name": "added", + "type": "long" + }, + { + "name": "deleted", + "type": "long" + }, + { + "name": "delta", + "type": "long" + } + ] + } + }, + "flattenSpec": { + "useFieldDiscovery": true + }, + "binaryAsString": false +} \ No newline at end of file diff --git a/integration-tests/src/test/resources/stream/data/avro_schema_registry/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/avro_schema_registry/input_format/input_format.json new file mode 100644 index 000000000000..c9c8ce7a8276 --- /dev/null +++ b/integration-tests/src/test/resources/stream/data/avro_schema_registry/input_format/input_format.json @@ -0,0 +1,15 @@ +{ + "type": "avro_stream", + "avroBytesDecoder": { + "type": "schema_registry", + "url": "%%SCHEMA_REGISTRY_HOST%%", + "config": { + "basic.auth.credentials.source": "USER_INFO", + "basic.auth.user.info": "druid:diurd" + } + }, + "flattenSpec": { + "useFieldDiscovery": true + }, + "binaryAsString": false +} \ No newline at end of file diff --git a/web-console/src/druid-models/input-format.tsx b/web-console/src/druid-models/input-format.tsx index 7907498b3bd7..4bd9702fcef5 100644 --- a/web-console/src/druid-models/input-format.tsx +++ b/web-console/src/druid-models/input-format.tsx @@ -42,7 +42,7 @@ export const INPUT_FORMAT_FIELDS: Field[] = [ name: 'type', label: 'Input format', type: 'string', - suggestions: ['json', 'csv', 'tsv', 'regex', 'parquet', 'orc', 'avro_ocf'], + suggestions: ['json', 'csv', 'tsv', 'regex', 'parquet', 'orc', 'avro_ocf', 'avro_stream'], required: true, info: ( <> @@ -127,7 +127,7 @@ export const INPUT_FORMAT_FIELDS: Field[] = [ name: 'binaryAsString', type: 'boolean', defaultValue: false, - defined: (p: InputFormat) => oneOf(p.type, 'parquet', 'orc', 'avro_ocf'), + defined: (p: InputFormat) => oneOf(p.type, 'parquet', 'orc', 'avro_ocf', 'avro_stream'), info: ( <> Specifies if the binary column which is not logically marked as a string should be treated @@ -142,5 +142,5 @@ export function issueWithInputFormat(inputFormat: InputFormat | undefined): stri } export function inputFormatCanFlatten(inputFormat: InputFormat): boolean { - return oneOf(inputFormat.type, 'json', 'parquet', 'orc', 'avro_ocf'); + return oneOf(inputFormat.type, 'json', 'parquet', 'orc', 'avro_ocf', 'avro_stream'); } From 0d825f1797514d11e3f062170e10530fbd5ddf6b Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sun, 11 Apr 2021 11:38:06 +0800 Subject: [PATCH 7/9] bug fixed --- .../SchemaRegistryBasedAvroBytesDecoder.java | 63 +++++++++++++++++-- .../data/input/AvroStreamInputFormatTest.java | 16 +++++ 2 files changed, 74 insertions(+), 5 deletions(-) diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index 123f8fae89bf..5b6975565dc4 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -45,6 +45,11 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder { private final SchemaRegistryClient registry; + private final String url; + private final int capacity; + private final List urls; + private final Map config; + private final Map headers; @JsonCreator public SchemaRegistryBasedAvroBytesDecoder( @@ -55,18 +60,57 @@ public SchemaRegistryBasedAvroBytesDecoder( @JsonProperty("headers") @Nullable Map headers ) { - int identityMapCapacity = capacity == null ? Integer.MAX_VALUE : capacity; + this.url = url; + this.capacity = capacity == null ? Integer.MAX_VALUE : capacity; + this.urls = urls; + this.config = config; + this.headers = headers; if (url != null && !url.isEmpty()) { - this.registry = new CachedSchemaRegistryClient(url, identityMapCapacity, config, headers); + this.registry = new CachedSchemaRegistryClient(this.url, this.capacity, this.config, this.headers); } else { - this.registry = new CachedSchemaRegistryClient(urls, identityMapCapacity, config, headers); + this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, this.config, this.headers); } } + @JsonProperty + public String getUrl() + { + return url; + } + + @JsonProperty + public int getCapacity() + { + return capacity; + } + + @JsonProperty + public List getUrls() + { + return urls; + } + + @JsonProperty + public Map geConfig() + { + return config; + } + + @JsonProperty + public Map getHeaders() + { + return headers; + } + //For UT only @VisibleForTesting SchemaRegistryBasedAvroBytesDecoder(SchemaRegistryClient registry) { + this.url = null; + this.capacity = Integer.MAX_VALUE; + this.urls = null; + this.config = null; + this.headers = null; this.registry = registry; } @@ -114,12 +158,21 @@ public boolean equals(Object o) SchemaRegistryBasedAvroBytesDecoder that = (SchemaRegistryBasedAvroBytesDecoder) o; - return Objects.equals(registry, that.registry); + return Objects.equals(url, that.url) && + Objects.equals(capacity, that.capacity) && + Objects.equals(urls, that.urls) && + Objects.equals(config, that.config) && + Objects.equals(headers, that.headers); } @Override public int hashCode() { - return registry != null ? registry.hashCode() : 0; + int result = url != null ? url.hashCode() : 0; + result = 31 * result + capacity; + result = 31 * result + (urls != null ? urls.hashCode() : 0); + result = 31 * result + (config != null ? config.hashCode() : 0); + result = 31 * result + (headers != null ? headers.hashCode() : 0); + return result; } } diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java index a2199098508e..139616bfdbfa 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java @@ -29,6 +29,7 @@ import org.apache.avro.specific.SpecificDatumWriter; import org.apache.druid.data.input.avro.AvroExtensionsModule; import org.apache.druid.data.input.avro.AvroStreamInputFormat; +import org.apache.druid.data.input.avro.SchemaRegistryBasedAvroBytesDecoder; import org.apache.druid.data.input.avro.SchemaRepoBasedAvroBytesDecoder; import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -127,6 +128,21 @@ public void testSerde() throws IOException Assert.assertEquals(inputFormat, inputFormat2); } + @Test + public void testSerdeForSchemaRegistry() throws IOException + { + AvroStreamInputFormat inputFormat = new AvroStreamInputFormat( + flattenSpec, + new SchemaRegistryBasedAvroBytesDecoder("http://test:8081", 100, null, null, null), + false + ); + NestedInputFormat inputFormat2 = jsonMapper.readValue( + jsonMapper.writeValueAsString(inputFormat), + NestedInputFormat.class + ); + Assert.assertEquals(inputFormat, inputFormat2); + } + @Test public void testParse() throws SchemaValidationException, IOException { From 4eeeeda872867b6e6542d1f4d2e16ca659deab6f Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sun, 11 Apr 2021 15:36:33 +0800 Subject: [PATCH 8/9] bug fixed --- .../data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index 5b6975565dc4..4b3da38c49c9 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -91,7 +91,7 @@ public List getUrls() } @JsonProperty - public Map geConfig() + public Map getConfig() { return config; } From f8dbbb4bbaef9fbc2839c8c2489f587bc334477b Mon Sep 17 00:00:00 2001 From: yuanyi Date: Mon, 12 Apr 2021 16:45:29 +0800 Subject: [PATCH 9/9] add string as binary getter --- .../apache/druid/data/input/avro/AvroStreamInputFormat.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java index 5b0bbd455670..3b59b368f0bf 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java @@ -62,6 +62,12 @@ public AvroBytesDecoder getAvroBytesDecoder() return avroBytesDecoder; } + @JsonProperty + public Boolean getBinaryAsString() + { + return binaryAsString; + } + @Override public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) {