From 862ba164bb559543a3f6e4f7cf6ac8d13bda4405 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 20 Mar 2021 15:32:50 +0800 Subject: [PATCH 01/10] add protobuf inputformat --- .../protobuf/ProtobufExtensionsModule.java | 3 +- .../input/protobuf/ProtobufInputFormat.java | 64 +++++++ .../data/input/protobuf/ProtobufReader.java | 88 +++++++++ .../protobuf/ProtobufInputFormatTest.java | 177 ++++++++++++++++++ 4 files changed, 331 insertions(+), 1 deletion(-) create mode 100644 extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java create mode 100644 extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java create mode 100644 extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufExtensionsModule.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufExtensionsModule.java index 9b05932ceb98..f823ad48cf9f 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufExtensionsModule.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufExtensionsModule.java @@ -37,7 +37,8 @@ public List getJacksonModules() return Collections.singletonList( new SimpleModule("ProtobufInputRowParserModule") .registerSubtypes( - new NamedType(ProtobufInputRowParser.class, "protobuf") + new NamedType(ProtobufInputRowParser.class, "protobuf"), + new NamedType(ProtobufInputFormat.class, "protobuf_format") ) ); } diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java new file mode 100644 index 000000000000..236bd48abf5a --- /dev/null +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.protobuf; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.NestedInputFormat; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; + +import javax.annotation.Nullable; + +import java.io.File; + +public class ProtobufInputFormat extends NestedInputFormat +{ + private final ProtobufBytesDecoder protobufBytesDecoder; + + @JsonCreator + public ProtobufInputFormat( + @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec, + @JsonProperty("protoBytesDecoder") ProtobufBytesDecoder protobufBytesDecoder + ) + { + super(flattenSpec); + this.protobufBytesDecoder = protobufBytesDecoder; + } + + @Override + public boolean isSplittable() + { + return false; + } + + @Override + public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) + { + return new ProtobufReader( + inputRowSchema, + source, + protobufBytesDecoder, + getFlattenSpec() + ); + } +} diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java new file mode 100644 index 000000000000..4ed02c6481ef --- /dev/null +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.protobuf; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Iterators; +import com.google.protobuf.util.JsonFormat; +import org.apache.commons.io.IOUtils; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.IntermediateRowParsingReader; +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.java.util.common.CloseableIterators; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.common.parsers.ObjectFlattener; +import org.apache.druid.java.util.common.parsers.ObjectFlatteners; +import org.apache.druid.java.util.common.parsers.ParseException; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class ProtobufReader extends IntermediateRowParsingReader +{ + private final InputRowSchema inputRowSchema; + private final InputEntity source; + private final ObjectFlattener recordFlattener; + private final ProtobufBytesDecoder protobufBytesDecoder; + + ProtobufReader( + InputRowSchema inputRowSchema, + InputEntity source, + ProtobufBytesDecoder protobufBytesDecoder, + JSONPathSpec flattenSpec + ) + { + this.inputRowSchema = inputRowSchema; + this.source = source; + this.protobufBytesDecoder = protobufBytesDecoder; + this.recordFlattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(true)); + } + + @Override + protected CloseableIterator intermediateRowIterator() throws IOException + { + return CloseableIterators.withEmptyBaggage( + Iterators.singletonIterator(JsonFormat.printer().print(protobufBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open()))) + ))); + } + + @Override + protected List parseInputRows(String intermediateRow) throws ParseException, JsonProcessingException + { + JsonNode document = new ObjectMapper().readValue(intermediateRow, JsonNode.class); + final Map flattened = recordFlattener.flatten(document); + return Collections.singletonList(MapInputRowParser.parse(inputRowSchema, flattened)); + } + + @Override + protected List> toMap(String intermediateRow) throws JsonProcessingException + { + return Collections.singletonList(new ObjectMapper().readValue(intermediateRow, Map.class)); + } +} diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java new file mode 100644 index 000000000000..2684fc265084 --- /dev/null +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.protobuf; + +import com.google.common.collect.Lists; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.joda.time.DateTime; +import org.joda.time.chrono.ISOChronology; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; + +public class ProtobufInputFormatTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private TimestampSpec timestampSpec; + private DimensionsSpec dimensionsSpec; + private JSONPathSpec flattenSpec; + private FileBasedProtobufBytesDecoder decoder; + + @Before + public void setUp() + { + timestampSpec = new TimestampSpec("timestamp", "iso", null); + dimensionsSpec = new DimensionsSpec(Lists.newArrayList( + new StringDimensionSchema("event"), + new StringDimensionSchema("id"), + new StringDimensionSchema("someOtherId"), + new StringDimensionSchema("isValid") + ), null, null); + flattenSpec = new JSONPathSpec( + true, + Lists.newArrayList( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "eventType", "eventType"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "foobar", "$.foo.bar"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "bar0", "$.bar[0].bar") + ) + ); + decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "ProtoTestEvent"); + } + + @Test + public void testParseNestedData() throws Exception + { + //configure parser with desc file + ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(flattenSpec, decoder); + + //create binary of proto test event + DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC()); + ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder() + .setDescription("description") + .setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE) + .setId(4711L) + .setIsValid(true) + .setSomeOtherId(4712) + .setTimestamp(dateTime.toString()) + .setSomeFloatColumn(47.11F) + .setSomeIntColumn(815) + .setSomeLongColumn(816L) + .setFoo(ProtoTestEventWrapper.ProtoTestEvent.Foo + .newBuilder() + .setBar("baz")) + .addBar(ProtoTestEventWrapper.ProtoTestEvent.Foo + .newBuilder() + .setBar("bar0")) + .addBar(ProtoTestEventWrapper.ProtoTestEvent.Foo + .newBuilder() + .setBar("bar1")) + .build(); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + event.writeTo(out); + + final ByteEntity entity = new ByteEntity(ByteBuffer.wrap(out.toByteArray())); + + InputRow row = protobufInputFormat.createReader(new InputRowSchema(timestampSpec, dimensionsSpec, Collections.emptyList()), entity, null).read().next(); + + Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); + + assertDimensionEquals(row, "id", "4711"); + assertDimensionEquals(row, "isValid", "true"); + assertDimensionEquals(row, "someOtherId", "4712"); + assertDimensionEquals(row, "description", "description"); + + assertDimensionEquals(row, "eventType", ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE.name()); + assertDimensionEquals(row, "foobar", "baz"); + assertDimensionEquals(row, "bar0", "bar0"); + + + Assert.assertEquals(47.11F, row.getMetric("someFloatColumn").floatValue(), 0.0); + Assert.assertEquals(815.0F, row.getMetric("someIntColumn").floatValue(), 0.0); + Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0); + } + + @Test + public void testParseFlatData() throws Exception + { + //configure parser with desc file + ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(null, decoder); + + //create binary of proto test event + DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC()); + ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder() + .setDescription("description") + .setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE) + .setId(4711L) + .setIsValid(true) + .setSomeOtherId(4712) + .setTimestamp(dateTime.toString()) + .setSomeFloatColumn(47.11F) + .setSomeIntColumn(815) + .setSomeLongColumn(816L) + .build(); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + event.writeTo(out); + + final ByteEntity entity = new ByteEntity(ByteBuffer.wrap(out.toByteArray())); + + InputRow row = protobufInputFormat.createReader(new InputRowSchema(timestampSpec, dimensionsSpec, Collections.emptyList()), entity, null).read().next(); + + System.out.println(row); + + Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); + + assertDimensionEquals(row, "id", "4711"); + assertDimensionEquals(row, "isValid", "true"); + assertDimensionEquals(row, "someOtherId", "4712"); + assertDimensionEquals(row, "description", "description"); + + + Assert.assertEquals(47.11F, row.getMetric("someFloatColumn").floatValue(), 0.0); + Assert.assertEquals(815.0F, row.getMetric("someIntColumn").floatValue(), 0.0); + Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0); + } + + private void assertDimensionEquals(InputRow row, String dimension, Object expected) + { + List values = row.getDimension(dimension); + Assert.assertEquals(1, values.size()); + Assert.assertEquals(expected, values.get(0)); + } +} From e939a658d954eb3084eebc65a80bf25b0fee309b Mon Sep 17 00:00:00 2001 From: yuanyi Date: Mon, 22 Mar 2021 17:18:31 +0800 Subject: [PATCH 02/10] repair pom --- extensions-core/protobuf-extensions/pom.xml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/extensions-core/protobuf-extensions/pom.xml b/extensions-core/protobuf-extensions/pom.xml index aebbec3224fa..dad58bb4d1d0 100644 --- a/extensions-core/protobuf-extensions/pom.xml +++ b/extensions-core/protobuf-extensions/pom.xml @@ -137,6 +137,10 @@ 2.0.1 provided + + com.fasterxml.jackson.core + jackson-core + From 697c5925d33c6d76604e57fba4409f30f45d1cec Mon Sep 17 00:00:00 2001 From: yuanyi Date: Wed, 24 Mar 2021 18:46:10 +0800 Subject: [PATCH 03/10] alter intermediateRow to type of Dynamicmessage --- .../protobuf/ProtobufExtensionsModule.java | 2 +- .../data/input/protobuf/ProtobufReader.java | 43 ++++++++++++++----- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufExtensionsModule.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufExtensionsModule.java index f823ad48cf9f..a2293ec794cc 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufExtensionsModule.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufExtensionsModule.java @@ -38,7 +38,7 @@ public List getJacksonModules() new SimpleModule("ProtobufInputRowParserModule") .registerSubtypes( new NamedType(ProtobufInputRowParser.class, "protobuf"), - new NamedType(ProtobufInputFormat.class, "protobuf_format") + new NamedType(ProtobufInputFormat.class, "protobuf") ) ); } diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java index 4ed02c6481ef..38db4a830186 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Iterators; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; import org.apache.commons.io.IOUtils; import org.apache.druid.data.input.InputEntity; @@ -37,6 +39,7 @@ import org.apache.druid.java.util.common.parsers.ObjectFlattener; import org.apache.druid.java.util.common.parsers.ObjectFlatteners; import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.utils.CollectionUtils; import java.io.IOException; import java.nio.ByteBuffer; @@ -44,10 +47,11 @@ import java.util.List; import java.util.Map; -public class ProtobufReader extends IntermediateRowParsingReader +public class ProtobufReader extends IntermediateRowParsingReader { private final InputRowSchema inputRowSchema; private final InputEntity source; + private final JSONPathSpec flattenSpec; private final ObjectFlattener recordFlattener; private final ProtobufBytesDecoder protobufBytesDecoder; @@ -61,28 +65,47 @@ public class ProtobufReader extends IntermediateRowParsingReader this.inputRowSchema = inputRowSchema; this.source = source; this.protobufBytesDecoder = protobufBytesDecoder; + this.flattenSpec = flattenSpec; this.recordFlattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(true)); } @Override - protected CloseableIterator intermediateRowIterator() throws IOException + protected CloseableIterator intermediateRowIterator() throws IOException { return CloseableIterators.withEmptyBaggage( - Iterators.singletonIterator(JsonFormat.printer().print(protobufBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open()))) - ))); + Iterators.singletonIterator(protobufBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open()) + )))); } @Override - protected List parseInputRows(String intermediateRow) throws ParseException, JsonProcessingException + protected List parseInputRows(DynamicMessage intermediateRow) throws ParseException, JsonProcessingException { - JsonNode document = new ObjectMapper().readValue(intermediateRow, JsonNode.class); - final Map flattened = recordFlattener.flatten(document); - return Collections.singletonList(MapInputRowParser.parse(inputRowSchema, flattened)); + Map record; + + if (flattenSpec == null) { + try { + record = CollectionUtils.mapKeys(intermediateRow.getAllFields(), k -> k.getJsonName()); + } + catch (Exception ex) { + throw new ParseException(ex, "Protobuf message could not be parsed"); + } + } else { + try { + String json = JsonFormat.printer().print(intermediateRow); + JsonNode document = new ObjectMapper().readValue(json, JsonNode.class); + record = recordFlattener.flatten(document); + } + catch (InvalidProtocolBufferException e) { + throw new ParseException(e, "Protobuf message could not be parsed"); + } + } + + return Collections.singletonList(MapInputRowParser.parse(inputRowSchema, record)); } @Override - protected List> toMap(String intermediateRow) throws JsonProcessingException + protected List> toMap(DynamicMessage intermediateRow) throws JsonProcessingException, InvalidProtocolBufferException { - return Collections.singletonList(new ObjectMapper().readValue(intermediateRow, Map.class)); + return Collections.singletonList(new ObjectMapper().readValue(JsonFormat.printer().print(intermediateRow), Map.class)); } } From ecdd234e3429f2d76e451c427319d6ab1c310c90 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sun, 28 Mar 2021 12:53:20 +0800 Subject: [PATCH 04/10] add document --- docs/ingestion/data-formats.md | 117 +++++++++++++++++++++++++++++++-- 1 file changed, 113 insertions(+), 4 deletions(-) diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index 96f5d924b757..b675bcfa39a0 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -265,6 +265,36 @@ The `inputFormat` to load data of Avro OCF format. An example is: } ``` +### Protobuf + +> You need to include the [`druid-avro-extensions`](../development/extensions-core/protobuf.md) as an extension to use the Avro OCF input format. + +The `inputFormat` to load data of Protobuf format. An example is: +```json +"ioConfig": { + "inputFormat": { + "type": "protobuf", + "protoBytesDecoder": { + "type": "file", + "descriptor": "file:///tmp/metrics.desc", + "protoMessageType": "Metrics" + } + "flattenSpec": { + "useFieldDiscovery": true, + "fields": [ + { + "type": "path", + "name": "someRecord_subInt", + "expr": "$.someRecord.subInt" + } + ] + }, + "binaryAsString": false + }, + ... +} +``` + | Field | Type | Description | Required | |-------|------|-------------|----------| |type| String| This should be set to `avro_ocf` to read Avro OCF file| yes | @@ -1066,8 +1096,7 @@ This parser is for [stream ingestion](./index.md#streaming) and reads Protocol b | Field | Type | Description | Required | |-------|------|-------------|----------| | type | String | This should say `protobuf`. | yes | -| descriptor | String | Protobuf descriptor file name in the classpath or URL. | yes | -| protoMessageType | String | Protobuf message type in the descriptor. Both short name and fully qualified name are accepted. The parser uses the first message type found in the descriptor if not specified. | no | +| protoBytesDecoder | JSON Object | Specifies how to decode bytes to Protobuf record. | yes | | parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. The format must be JSON. See [JSON ParseSpec](./index.md) for more configuration options. Note that timeAndDims parseSpec is no longer supported. | yes | Sample spec: @@ -1075,8 +1104,11 @@ Sample spec: ```json "parser": { "type": "protobuf", - "descriptor": "file:///tmp/metrics.desc", - "protoMessageType": "Metrics", + "protoBytesDecoder": { + "type": "file", + "descriptor": "file:///tmp/metrics.desc", + "protoMessageType": "Metrics" + }, "parseSpec": { "format": "json", "timestampSpec": { @@ -1104,6 +1136,83 @@ Sample spec: See the [extension description](../development/extensions-core/protobuf.md) for more details and examples. +#### Protobuf Bytes Decoder + +If `type` is not included, the avroBytesDecoder defaults to `schema_registry`. + +##### File-based Protobuf Bytes Decoder + +This Protobuf bytes decoder first read a descriptor file, and then parse it to get schema used to decode the Protobuf record from bytes. + +| Field | Type | Description | Required | +|-------|------|-------------|----------| +| type | String | This should say `file`. | yes | +| descriptor | String | Protobuf descriptor file name in the classpath or URL. | yes | +| protoMessageType | String | Protobuf message type in the descriptor. Both short name and fully qualified name are accepted. The parser uses the first message type found in the descriptor if not specified. | no | + +Sample spec: + +```json +"protoBytesDecoder": { + "type": "file", + "descriptor": "file:///tmp/metrics.desc", + "protoMessageType": "Metrics" +} +``` + +##### Confluent Schema Registry-based Protobuf Bytes Decoder + +This Protobuf bytes decoder first extracts a unique `id` from input message bytes, and then uses it to look up the schema in the Schema Registry used to decode the Avro record from bytes. +For details, see the Schema Registry [documentation](http://docs.confluent.io/current/schema-registry/docs/) and [repository](https://github.com/confluentinc/schema-registry). + +| Field | Type | Description | Required | +|-------|------|-------------|----------| +| type | String | This should say `schema_registry`. | yes | +| url | String | Specifies the url endpoint of the Schema Registry. | yes | +| capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no | +| urls | Array | Specifies the url endpoints of the multiple Schema Registry instances. | yes(if `url` is not provided) | +| config | Json | To send additional configurations, configured for Schema Registry | no | +| headers | Json | To send headers to the Schema Registry | no | + +For a single schema registry instance, use Field `url` or `urls` for multi instances. + +Single Instance: + +```json +... +"protoBytesDecoder": { + "url": , + "type": "schema_registry" +} +... +``` + +Multiple Instances: +```json +... +"protoBytesDecoder": { + "urls": [, , ...], + "type": "schema_registry", + "capacity": 100, + "config" : { + "basic.auth.credentials.source": "USER_INFO", + "basic.auth.user.info": "fred:letmein", + "schema.registry.ssl.truststore.location": "/some/secrets/kafka.client.truststore.jks", + "schema.registry.ssl.truststore.password": "", + "schema.registry.ssl.keystore.location": "/some/secrets/kafka.client.keystore.jks", + "schema.registry.ssl.keystore.password": "", + "schema.registry.ssl.key.password": "", + ... + }, + "headers": { + "traceID" : "b29c5de2-0db4-490b-b421", + "timeStamp" : "1577191871865", + ... + } +} +... +``` + ## ParseSpec > The Parser is deprecated for [native batch tasks](./native-batch.md), [Kafka indexing service](../development/extensions-core/kafka-ingestion.md), From f9060eab01a82f390785a4e091b3a023a6cd61f6 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sun, 28 Mar 2021 16:41:42 +0800 Subject: [PATCH 05/10] refine test --- .../druid/data/input/protobuf/ProtobufInputFormatTest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java index 2684fc265084..65c8657869e0 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java @@ -39,7 +39,6 @@ import java.io.ByteArrayOutputStream; import java.nio.ByteBuffer; -import java.util.Collections; import java.util.List; public class ProtobufInputFormatTest @@ -107,7 +106,7 @@ public void testParseNestedData() throws Exception final ByteEntity entity = new ByteEntity(ByteBuffer.wrap(out.toByteArray())); - InputRow row = protobufInputFormat.createReader(new InputRowSchema(timestampSpec, dimensionsSpec, Collections.emptyList()), entity, null).read().next(); + InputRow row = protobufInputFormat.createReader(new InputRowSchema(timestampSpec, dimensionsSpec, null), entity, null).read().next(); Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); @@ -151,7 +150,7 @@ public void testParseFlatData() throws Exception final ByteEntity entity = new ByteEntity(ByteBuffer.wrap(out.toByteArray())); - InputRow row = protobufInputFormat.createReader(new InputRowSchema(timestampSpec, dimensionsSpec, Collections.emptyList()), entity, null).read().next(); + InputRow row = protobufInputFormat.createReader(new InputRowSchema(timestampSpec, dimensionsSpec, null), entity, null).read().next(); System.out.println(row); From 4173eed821b4ba164c1860da7376de22bc76d451 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sun, 28 Mar 2021 16:51:33 +0800 Subject: [PATCH 06/10] fix document --- docs/ingestion/data-formats.md | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index b675bcfa39a0..a115511cddc5 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -265,9 +265,16 @@ The `inputFormat` to load data of Avro OCF format. An example is: } ``` +| Field | Type | Description | Required | +|-------|------|-------------|----------| +|type| String| This should be set to `avro_ocf` to read Avro OCF file| yes | +|flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Avro records. Note that only 'path' expression are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) | +|schema| JSON Object |Define a reader schema to be used when parsing Avro records, this is useful when parsing multiple versions of Avro OCF file data | no (default will decode using the writer schema contained in the OCF file) | +| binaryAsString | Boolean | Specifies if the bytes parquet column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) | + ### Protobuf -> You need to include the [`druid-avro-extensions`](../development/extensions-core/protobuf.md) as an extension to use the Avro OCF input format. +> You need to include the [`druid-protobuf-extensions`](../development/extensions-core/protobuf.md) as an extension to use the Protobuf input format. The `inputFormat` to load data of Protobuf format. An example is: ```json @@ -297,10 +304,9 @@ The `inputFormat` to load data of Protobuf format. An example is: | Field | Type | Description | Required | |-------|------|-------------|----------| -|type| String| This should be set to `avro_ocf` to read Avro OCF file| yes | -|flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Avro records. Note that only 'path' expression are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) | -|schema| JSON Object |Define a reader schema to be used when parsing Avro records, this is useful when parsing multiple versions of Avro OCF file data | no (default will decode using the writer schema contained in the OCF file) | -| binaryAsString | Boolean | Specifies if the bytes parquet column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) | +|type| String| This should be set to `protobuf` to read Protobuf file| yes | +|flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Protobuf record. Note that only 'path' expression are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) | +|protoBytesDecoder| JSON Object |Specifies how to decode bytes to Protobuf record. | yes | ### FlattenSpec From 7f475f74f20cabf7611a8bacdec1a8aeddf34681 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Mon, 29 Mar 2021 13:31:15 +0800 Subject: [PATCH 07/10] add protoBytesDecoder --- website/.spelling | 1 + 1 file changed, 1 insertion(+) diff --git a/website/.spelling b/website/.spelling index 50a22a561fff..3873b1bbb485 100644 --- a/website/.spelling +++ b/website/.spelling @@ -630,6 +630,7 @@ Avro-1124 SchemaRepo avro avroBytesDecoder +protoBytesDecoder flattenSpec jq org.apache.druid.extensions From 1751a3f1ce1e7fba04d2f014ce5a892c6f8f36be Mon Sep 17 00:00:00 2001 From: yuanyi Date: Tue, 30 Mar 2021 21:09:42 +0800 Subject: [PATCH 08/10] refine document and add ser test --- docs/ingestion/data-formats.md | 11 ++++--- extensions-core/protobuf-extensions/pom.xml | 6 ++++ .../FileBasedProtobufBytesDecoder.java | 29 +++++++++++++++++++ .../input/protobuf/ProtobufInputFormat.java | 21 ++++++++++++++ .../data/input/protobuf/ProtobufReader.java | 4 +-- .../protobuf/ProtobufInputFormatTest.java | 25 ++++++++++++++++ 6 files changed, 88 insertions(+), 8 deletions(-) diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index a115511cddc5..96f2b928933f 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -295,8 +295,7 @@ The `inputFormat` to load data of Protobuf format. An example is: "expr": "$.someRecord.subInt" } ] - }, - "binaryAsString": false + } }, ... } @@ -304,9 +303,9 @@ The `inputFormat` to load data of Protobuf format. An example is: | Field | Type | Description | Required | |-------|------|-------------|----------| -|type| String| This should be set to `protobuf` to read Protobuf file| yes | +|type| String| This should be set to `protobuf` to read Protobuf serialized data| yes | |flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Protobuf record. Note that only 'path' expression are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) | -|protoBytesDecoder| JSON Object |Specifies how to decode bytes to Protobuf record. | yes | +|`protoBytesDecoder`| JSON Object |Specifies how to decode bytes to Protobuf record. | yes | ### FlattenSpec @@ -1102,7 +1101,7 @@ This parser is for [stream ingestion](./index.md#streaming) and reads Protocol b | Field | Type | Description | Required | |-------|------|-------------|----------| | type | String | This should say `protobuf`. | yes | -| protoBytesDecoder | JSON Object | Specifies how to decode bytes to Protobuf record. | yes | +| `protoBytesDecoder` | JSON Object | Specifies how to decode bytes to Protobuf record. | yes | | parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. The format must be JSON. See [JSON ParseSpec](./index.md) for more configuration options. Note that timeAndDims parseSpec is no longer supported. | yes | Sample spec: @@ -1144,7 +1143,7 @@ more details and examples. #### Protobuf Bytes Decoder -If `type` is not included, the avroBytesDecoder defaults to `schema_registry`. +If `type` is not included, the `protoBytesDecoder` defaults to `schema_registry`. ##### File-based Protobuf Bytes Decoder diff --git a/extensions-core/protobuf-extensions/pom.xml b/extensions-core/protobuf-extensions/pom.xml index dad58bb4d1d0..c523ec209261 100644 --- a/extensions-core/protobuf-extensions/pom.xml +++ b/extensions-core/protobuf-extensions/pom.xml @@ -158,6 +158,12 @@ mockito-core test + + org.apache.druid + druid-processing + ${project.parent.version} + test + diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java index 35d5c4fb5d05..e3f044b3a8dc 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java @@ -34,6 +34,7 @@ import java.net.MalformedURLException; import java.net.URL; import java.nio.ByteBuffer; +import java.util.Objects; import java.util.Set; public class FileBasedProtobufBytesDecoder implements ProtobufBytesDecoder @@ -54,6 +55,18 @@ public FileBasedProtobufBytesDecoder( initDescriptor(); } + @JsonProperty + public String getDescriptor() + { + return descriptorFilePath; + } + + @JsonProperty + public String getProtoMessageType() + { + return protoMessageType; + } + @VisibleForTesting void initDescriptor() { @@ -123,4 +136,20 @@ private Descriptors.Descriptor getDescriptor(String descriptorFilePath) } return desc; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FileBasedProtobufBytesDecoder that = (FileBasedProtobufBytesDecoder) o; + + return Objects.equals(descriptorFilePath, that.descriptorFilePath); + } + } diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java index 236bd48abf5a..d7d37ac0a266 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java @@ -30,6 +30,7 @@ import javax.annotation.Nullable; import java.io.File; +import java.util.Objects; public class ProtobufInputFormat extends NestedInputFormat { @@ -45,6 +46,12 @@ public ProtobufInputFormat( this.protobufBytesDecoder = protobufBytesDecoder; } + @JsonProperty + public ProtobufBytesDecoder getProtoBytesDecoder() + { + return protobufBytesDecoder; + } + @Override public boolean isSplittable() { @@ -61,4 +68,18 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity getFlattenSpec() ); } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ProtobufInputFormat that = (ProtobufInputFormat) o; + return Objects.equals(getFlattenSpec(), that.getFlattenSpec()) && + Objects.equals(protobufBytesDecoder, that.protobufBytesDecoder); + } } diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java index 38db4a830186..5f7aed65f262 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java @@ -73,8 +73,8 @@ public class ProtobufReader extends IntermediateRowParsingReader protected CloseableIterator intermediateRowIterator() throws IOException { return CloseableIterators.withEmptyBaggage( - Iterators.singletonIterator(protobufBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open()) - )))); + Iterators.singletonIterator(protobufBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open())))) + ); } @Override diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java index 65c8657869e0..f0e0a8bbf233 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java @@ -19,13 +19,17 @@ package org.apache.druid.data.input.protobuf; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.NestedInputFormat; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; import org.apache.druid.java.util.common.parsers.JSONPathFieldType; import org.apache.druid.java.util.common.parsers.JSONPathSpec; @@ -38,6 +42,7 @@ import org.junit.rules.ExpectedException; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; @@ -51,6 +56,8 @@ public class ProtobufInputFormatTest private JSONPathSpec flattenSpec; private FileBasedProtobufBytesDecoder decoder; + private final ObjectMapper jsonMapper = new DefaultObjectMapper(); + @Before public void setUp() { @@ -70,6 +77,24 @@ public void setUp() ) ); decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "ProtoTestEvent"); + for (Module jacksonModule : new ProtobufExtensionsModule().getJacksonModules()) { + jsonMapper.registerModule(jacksonModule); + } + } + + @Test + public void testSerde() throws IOException + { + ProtobufInputFormat inputFormat = new ProtobufInputFormat( + flattenSpec, + decoder + ); + NestedInputFormat inputFormat2 = jsonMapper.readValue( + jsonMapper.writeValueAsString(inputFormat), + NestedInputFormat.class + ); + + Assert.assertEquals(inputFormat, inputFormat2); } @Test From 287206998fe17c15bee9321daca356d20a81b620 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Wed, 31 Mar 2021 10:03:13 +0800 Subject: [PATCH 09/10] add hash --- .../data/input/protobuf/FileBasedProtobufBytesDecoder.java | 6 ++++++ .../druid/data/input/protobuf/ProtobufInputFormat.java | 7 +++++++ 2 files changed, 13 insertions(+) diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java index e3f044b3a8dc..1a9438fb7351 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java @@ -152,4 +152,10 @@ public boolean equals(Object o) return Objects.equals(descriptorFilePath, that.descriptorFilePath); } + @Override + public int hashCode() + { + return Objects.hash(descriptorFilePath, protoMessageType); + } + } diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java index d7d37ac0a266..36ae06875e3d 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java @@ -82,4 +82,11 @@ public boolean equals(final Object o) return Objects.equals(getFlattenSpec(), that.getFlattenSpec()) && Objects.equals(protobufBytesDecoder, that.protobufBytesDecoder); } + + @Override + public int hashCode() + { + return Objects.hash(protobufBytesDecoder, getFlattenSpec()); + } + } From 6251f000a6967f4767be5433834e151b37428c05 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sun, 11 Apr 2021 15:59:57 +0800 Subject: [PATCH 10/10] add schema registry ser test --- .../FileBasedProtobufBytesDecoder.java | 3 +- ...hemaRegistryBasedProtobufBytesDecoder.java | 84 +++++++++++++++++-- .../protobuf/ProtobufInputFormatTest.java | 14 ++++ 3 files changed, 95 insertions(+), 6 deletions(-) diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java index 1a9438fb7351..cda5eb3f0494 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java @@ -149,7 +149,8 @@ public boolean equals(Object o) FileBasedProtobufBytesDecoder that = (FileBasedProtobufBytesDecoder) o; - return Objects.equals(descriptorFilePath, that.descriptorFilePath); + return Objects.equals(descriptorFilePath, that.descriptorFilePath) && + Objects.equals(protoMessageType, that.protoMessageType); } @Override diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java index b1435dfc3320..2d4cc8dd5556 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java @@ -39,6 +39,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDecoder { @@ -46,7 +47,11 @@ public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDec private static final Logger LOGGER = new Logger(SchemaRegistryBasedProtobufBytesDecoder.class); private final SchemaRegistryClient registry; - private int identityMapCapacity; + private final String url; + private final int capacity; + private final List urls; + private final Map config; + private final Map headers; @JsonCreator public SchemaRegistryBasedProtobufBytesDecoder( @@ -57,23 +62,62 @@ public SchemaRegistryBasedProtobufBytesDecoder( @JsonProperty("headers") @Nullable Map headers ) { - this.identityMapCapacity = capacity == null ? Integer.MAX_VALUE : capacity; + this.url = url; + this.capacity = capacity == null ? Integer.MAX_VALUE : capacity; + this.urls = urls; + this.config = config; + this.headers = headers; if (url != null && !url.isEmpty()) { - this.registry = new CachedSchemaRegistryClient(Collections.singletonList(url), identityMapCapacity, Collections.singletonList(new ProtobufSchemaProvider()), config, headers); + this.registry = new CachedSchemaRegistryClient(Collections.singletonList(this.url), this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), this.config, this.headers); } else { - this.registry = new CachedSchemaRegistryClient(urls, identityMapCapacity, Collections.singletonList(new ProtobufSchemaProvider()), config, headers); + this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), this.config, this.headers); } } + @JsonProperty + public String getUrl() + { + return url; + } + + @JsonProperty + public int getCapacity() + { + return capacity; + } + + @JsonProperty + public List getUrls() + { + return urls; + } + + @JsonProperty + public Map getConfig() + { + return config; + } + + @JsonProperty + public Map getHeaders() + { + return headers; + } + @VisibleForTesting int getIdentityMapCapacity() { - return this.identityMapCapacity; + return this.capacity; } @VisibleForTesting SchemaRegistryBasedProtobufBytesDecoder(SchemaRegistryClient registry) { + this.url = null; + this.capacity = Integer.MAX_VALUE; + this.urls = null; + this.config = null; + this.headers = null; this.registry = registry; } @@ -108,4 +152,34 @@ public DynamicMessage parse(ByteBuffer bytes) throw new ParseException(e, "Fail to decode protobuf message!"); } } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SchemaRegistryBasedProtobufBytesDecoder that = (SchemaRegistryBasedProtobufBytesDecoder) o; + + return Objects.equals(url, that.url) && + Objects.equals(capacity, that.capacity) && + Objects.equals(urls, that.urls) && + Objects.equals(config, that.config) && + Objects.equals(headers, that.headers); + } + + @Override + public int hashCode() + { + int result = url != null ? url.hashCode() : 0; + result = 31 * result + capacity; + result = 31 * result + (urls != null ? urls.hashCode() : 0); + result = 31 * result + (config != null ? config.hashCode() : 0); + result = 31 * result + (headers != null ? headers.hashCode() : 0); + return result; + } } diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java index f0e0a8bbf233..c73f513dd905 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java @@ -97,6 +97,20 @@ public void testSerde() throws IOException Assert.assertEquals(inputFormat, inputFormat2); } + @Test + public void testSerdeForSchemaRegistry() throws IOException + { + ProtobufInputFormat inputFormat = new ProtobufInputFormat( + flattenSpec, + new SchemaRegistryBasedProtobufBytesDecoder("http://test:8081", 100, null, null, null) + ); + NestedInputFormat inputFormat2 = jsonMapper.readValue( + jsonMapper.writeValueAsString(inputFormat), + NestedInputFormat.class + ); + Assert.assertEquals(inputFormat, inputFormat2); + } + @Test public void testParseNestedData() throws Exception {