From c295c45830d92c242ce9dadbdc14508e9032e181 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Mon, 1 Feb 2021 11:19:44 +0800 Subject: [PATCH 01/12] dd_protobuf_schema_registry --- .../benchmark/ProtobufParserBenchmark.java | 7 +- distribution/bin/check-licenses.py | 3 + extensions-core/protobuf-extensions/pom.xml | 42 +++++- .../FileBasedProtobufBytesDecoder.java | 127 ++++++++++++++++++ .../input/protobuf/ProtobufBytesDecoder.java | 36 +++++ .../protobuf/ProtobufInputRowParser.java | 94 ++----------- ...hemaRegistryBasedProtobufBytesDecoder.java | 103 ++++++++++++++ .../FileBasedProtobufBytesDecoderTest.java | 76 +++++++++++ .../protobuf/ProtobufInputRowParserTest.java | 56 +------- ...RegistryBasedProtobufBytesDecoderTest.java | 110 +++++++++++++++ licenses.yaml | 23 +++- 11 files changed, 535 insertions(+), 142 deletions(-) create mode 100644 extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java create mode 100644 extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufBytesDecoder.java create mode 100644 extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java create mode 100644 extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoderTest.java create mode 100644 extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java index b408076806cf..df580af3fe63 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java @@ -28,6 +28,7 @@ import 
org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.data.input.protobuf.FileBasedProtobufBytesDecoder; import org.apache.druid.data.input.protobuf.ProtobufInputRowParser; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; @@ -73,6 +74,7 @@ public class ProtobufParserBenchmark private ProtobufInputRowParser flatParser; private byte[] protoInputs; private String protoFilePath; + private FileBasedProtobufBytesDecoder decoder; @Setup public void setup() @@ -109,11 +111,12 @@ public void setup() null, null ); + decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "ProtoTestEvent"); protoFilePath = "ProtoFile"; protoInputs = getProtoInputs(protoFilePath); - nestedParser = new ProtobufInputRowParser(nestedParseSpec, "prototest.desc", "ProtoTestEvent"); - flatParser = new ProtobufInputRowParser(flatParseSpec, "prototest.desc", "ProtoTestEvent"); + nestedParser = new ProtobufInputRowParser(nestedParseSpec, decoder); + flatParser = new ProtobufInputRowParser(flatParseSpec, decoder); } @Benchmark diff --git a/distribution/bin/check-licenses.py b/distribution/bin/check-licenses.py index 0d3f83b0255d..5ce68d19439d 100755 --- a/distribution/bin/check-licenses.py +++ b/distribution/bin/check-licenses.py @@ -225,6 +225,9 @@ def build_compatible_license_names(): compatible_licenses['Apache License v2.0'] = 'Apache License version 2.0' compatible_licenses['Apache License, version 2.0'] = 'Apache License version 2.0' compatible_licenses['Apache 2.0 License'] = 'Apache License version 2.0' + compatible_licenses['Apache License, 2.0'] = 'Apache License version 2.0' + compatible_licenses['Confluent Community License'] = 'Confluent Community License' + compatible_licenses['EPL 2.0'] = 'EPL 2.0' compatible_licenses['Public Domain'] = 'Public Domain' diff --git 
a/extensions-core/protobuf-extensions/pom.xml b/extensions-core/protobuf-extensions/pom.xml index 8dba4ce097a7..f122046a8320 100644 --- a/extensions-core/protobuf-extensions/pom.xml +++ b/extensions-core/protobuf-extensions/pom.xml @@ -33,6 +33,18 @@ ../../pom.xml + + 6.0.1 + 2.6 + + + + + confluent + https://packages.confluent.io/maven/ + + + org.apache.druid @@ -52,14 +64,31 @@ com.google.guava - guava + protobuf-java + + + + + io.confluent + kafka-schema-registry-client + ${confluent.version} + + + io.confluent + kafka-protobuf-provider + ${confluent.version} + + + protobuf-java-util + com.google.protobuf - com.github.os72 - protobuf-dynamic - 0.9.3 + commons-io + commons-io + ${commons-io.version} + provided com.fasterxml.jackson.core @@ -98,6 +127,11 @@ hamcrest-core test + + org.mockito + mockito-core + test + diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java new file mode 100644 index 000000000000..74c11bdbeafb --- /dev/null +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.protobuf; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import io.confluent.kafka.schemaregistry.protobuf.dynamic.DynamicSchema; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.ParseException; + +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.ByteBuffer; +import java.util.Set; + +public class FileBasedProtobufBytesDecoder implements ProtobufBytesDecoder +{ + private final String descriptorFilePath; + private final String protoMessageType; + private Descriptors.Descriptor descriptor; + + + @JsonCreator + public FileBasedProtobufBytesDecoder( + @JsonProperty("descriptor") String descriptorFilePath, + @JsonProperty("protoMessageType") String protoMessageType + ) + { + this.descriptorFilePath = descriptorFilePath; + this.protoMessageType = protoMessageType; + initDescriptor(); + } + + @VisibleForTesting + void initDescriptor() + { + if (this.descriptor == null) { + this.descriptor = getDescriptor(descriptorFilePath); + } + } + + @SuppressWarnings("checkstyle:RightCurly") + @Override + public DynamicMessage parse(ByteBuffer bytes) + { + try { + DynamicMessage message = DynamicMessage.parseFrom(descriptor, 
ByteString.copyFrom(bytes)); + return message; + } + catch (Exception e) { + throw new ParseException(e, "Fail to decode protobuf message!"); + } + } + + private Descriptors.Descriptor getDescriptor(String descriptorFilePath) + { + InputStream fin; + + fin = this.getClass().getClassLoader().getResourceAsStream(descriptorFilePath); + if (fin == null) { + URL url; + try { + url = new URL(descriptorFilePath); + } + catch (MalformedURLException e) { + throw new ParseException(e, "Descriptor not found in class path or malformed URL:" + descriptorFilePath); + } + try { + fin = url.openConnection().getInputStream(); + } + catch (IOException e) { + throw new ParseException(e, "Cannot read descriptor file: " + url); + } + } + DynamicSchema dynamicSchema; + try { + dynamicSchema = DynamicSchema.parseFrom(fin); + } + catch (Descriptors.DescriptorValidationException e) { + throw new ParseException(e, "Invalid descriptor file: " + descriptorFilePath); + } + catch (IOException e) { + throw new ParseException(e, "Cannot read descriptor file: " + descriptorFilePath); + } + + Set messageTypes = dynamicSchema.getMessageTypes(); + if (messageTypes.size() == 0) { + throw new ParseException("No message types found in the descriptor: " + descriptorFilePath); + } + + String messageType = protoMessageType == null ? (String) messageTypes.toArray()[0] : protoMessageType; + Descriptors.Descriptor desc = dynamicSchema.getMessageDescriptor(messageType); + if (desc == null) { + throw new ParseException( + StringUtils.format( + "Protobuf message type %s not found in the specified descriptor. 
Available messages types are %s", + protoMessageType, + messageTypes + ) + ); + } + return desc; + } +} diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufBytesDecoder.java new file mode 100644 index 000000000000..1defa705aec0 --- /dev/null +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufBytesDecoder.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.protobuf; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.protobuf.DynamicMessage; + +import java.nio.ByteBuffer; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SchemaRegistryBasedProtobufBytesDecoder.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "file", value = FileBasedProtobufBytesDecoder.class), + @JsonSubTypes.Type(name = "schema_registry", value = SchemaRegistryBasedProtobufBytesDecoder.class) +}) +public interface ProtobufBytesDecoder +{ + DynamicMessage parse(ByteBuffer bytes); +} diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java index 9c04265e6a5f..e4597d37a625 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java @@ -21,14 +21,9 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.github.os72.protobuf.dynamic.DynamicSchema; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import com.google.protobuf.ByteString; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.DynamicMessage; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; @@ -37,39 +32,32 @@ import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.impl.JSONParseSpec; import 
org.apache.druid.data.input.impl.ParseSpec; -import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.Parser; import org.apache.druid.utils.CollectionUtils; -import java.io.IOException; -import java.io.InputStream; -import java.net.MalformedURLException; -import java.net.URL; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; -import java.util.Set; public class ProtobufInputRowParser implements ByteBufferInputRowParser { + private static final Logger LOG = new Logger(ByteBufferInputRowParser.class); + private final ParseSpec parseSpec; - private final String descriptorFilePath; - private final String protoMessageType; - private Descriptor descriptor; + private final ProtobufBytesDecoder protobufBytesDecoder; private Parser parser; private final List dimensions; @JsonCreator public ProtobufInputRowParser( @JsonProperty("parseSpec") ParseSpec parseSpec, - @JsonProperty("descriptor") String descriptorFilePath, - @JsonProperty("protoMessageType") String protoMessageType + @JsonProperty("protoBytesDecoder") ProtobufBytesDecoder protobufBytesDecoder ) { this.parseSpec = parseSpec; - this.descriptorFilePath = descriptorFilePath; - this.protoMessageType = protoMessageType; + this.protobufBytesDecoder = protobufBytesDecoder; this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames(); } @@ -82,39 +70,28 @@ public ParseSpec getParseSpec() @Override public ProtobufInputRowParser withParseSpec(ParseSpec parseSpec) { - return new ProtobufInputRowParser(parseSpec, descriptorFilePath, protoMessageType); - } - - @VisibleForTesting - void initDescriptor() - { - if (this.descriptor == null) { - this.descriptor = getDescriptor(descriptorFilePath); - } + return new ProtobufInputRowParser(parseSpec, protobufBytesDecoder); } @Override public List parseBatch(ByteBuffer input) { if (parser == 
null) { - // parser should be created when it is really used to avoid unnecessary initialization of the underlying - // parseSpec. parser = parseSpec.makeParser(); - initDescriptor(); } Map record; if (parseSpec instanceof JSONParseSpec && ((JSONParseSpec) parseSpec).getFlattenSpec().getFields().isEmpty()) { try { - DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input)); + DynamicMessage message = protobufBytesDecoder.parse(input); record = CollectionUtils.mapKeys(message.getAllFields(), k -> k.getJsonName()); } - catch (InvalidProtocolBufferException ex) { + catch (Exception ex) { throw new ParseException(ex, "Protobuf message could not be parsed"); } } else { try { - DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input)); + DynamicMessage message = protobufBytesDecoder.parse(input); String json = JsonFormat.printer().print(message); record = parser.parseToMap(json); } @@ -137,55 +114,4 @@ record = parser.parseToMap(json); record )); } - - private Descriptor getDescriptor(String descriptorFilePath) - { - InputStream fin; - - fin = this.getClass().getClassLoader().getResourceAsStream(descriptorFilePath); - if (fin == null) { - URL url; - try { - url = new URL(descriptorFilePath); - } - catch (MalformedURLException e) { - throw new ParseException(e, "Descriptor not found in class path or malformed URL:" + descriptorFilePath); - } - try { - fin = url.openConnection().getInputStream(); - } - catch (IOException e) { - throw new ParseException(e, "Cannot read descriptor file: " + url); - } - } - - DynamicSchema dynamicSchema; - try { - dynamicSchema = DynamicSchema.parseFrom(fin); - } - catch (Descriptors.DescriptorValidationException e) { - throw new ParseException(e, "Invalid descriptor file: " + descriptorFilePath); - } - catch (IOException e) { - throw new ParseException(e, "Cannot read descriptor file: " + descriptorFilePath); - } - - Set messageTypes = dynamicSchema.getMessageTypes(); - if 
(messageTypes.size() == 0) { - throw new ParseException("No message types found in the descriptor: " + descriptorFilePath); - } - - String messageType = protoMessageType == null ? (String) messageTypes.toArray()[0] : protoMessageType; - Descriptor desc = dynamicSchema.getMessageDescriptor(messageType); - if (desc == null) { - throw new ParseException( - StringUtils.format( - "Protobuf message type %s not found in the specified descriptor. Available messages types are %s", - protoMessageType, - messageTypes - ) - ); - } - return desc; - } } diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java new file mode 100644 index 000000000000..b0f5d3afbf4b --- /dev/null +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.protobuf; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.common.parsers.ParseException; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Objects; + +public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDecoder +{ + + private static final Logger LOGGER = new Logger(SchemaRegistryBasedProtobufBytesDecoder.class); + + private final SchemaRegistryClient registry; + + @JsonCreator + public SchemaRegistryBasedProtobufBytesDecoder( + @JsonProperty("url") String url, + @JsonProperty("capacity") Integer capacity + ) + { + int identityMapCapacity = capacity == null ? 
Integer.MAX_VALUE : capacity; + registry = new CachedSchemaRegistryClient(Collections.singletonList(url), identityMapCapacity, Collections.singletonList(new ProtobufSchemaProvider()), null); + } + + //For UT only + @VisibleForTesting + SchemaRegistryBasedProtobufBytesDecoder(SchemaRegistryClient registry) + { + this.registry = registry; + } + + @Override + public DynamicMessage parse(ByteBuffer bytes) + { + try { + bytes.get(); // ignore first \0 byte + int id = bytes.getInt(); // extract schema registry id + bytes.get(); // ignore \0 byte before PB message + int length = bytes.limit() - 2 - 4; + ProtobufSchema schema = (ProtobufSchema) registry.getSchemaById(id); + Descriptors.Descriptor descriptor = schema.toDescriptor(); + byte[] rawMessage = new byte[length]; + bytes.get(rawMessage, 0, length); + DynamicMessage message = DynamicMessage.parseFrom(descriptor, rawMessage); + return message; + } + catch (Exception e) { + LOGGER.error(e.getMessage()); + throw new ParseException(e, "Fail to decode protobuf message!"); + } + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SchemaRegistryBasedProtobufBytesDecoder that = (SchemaRegistryBasedProtobufBytesDecoder) o; + + return Objects.equals(registry, that.registry); + } + + @Override + public int hashCode() + { + return registry != null ? 
registry.hashCode() : 0; + } +} diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoderTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoderTest.java new file mode 100644 index 000000000000..18db5f12a37b --- /dev/null +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoderTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.protobuf; + +import org.apache.druid.java.util.common.parsers.ParseException; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class FileBasedProtobufBytesDecoderTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testShortMessageType() + { + //configure parser with desc file, and specify which file name to use + @SuppressWarnings("unused") // expected to create parser without exception + FileBasedProtobufBytesDecoder decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "ProtoTestEvent"); + decoder.initDescriptor(); + } + + @Test + public void testLongMessageType() + { + //configure parser with desc file, and specify which file name to use + @SuppressWarnings("unused") // expected to create parser without exception + FileBasedProtobufBytesDecoder decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "prototest.ProtoTestEvent"); + decoder.initDescriptor(); + } + + @Test(expected = ParseException.class) + public void testBadProto() + { + //configure parser with desc file + @SuppressWarnings("unused") // expected exception + FileBasedProtobufBytesDecoder decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "BadName"); + decoder.initDescriptor(); + } + + @Test(expected = ParseException.class) + public void testMalformedDescriptorUrl() + { + //configure parser with non existent desc file + @SuppressWarnings("unused") // expected exception + FileBasedProtobufBytesDecoder decoder = new FileBasedProtobufBytesDecoder("file:/nonexist.desc", "BadName"); + decoder.initDescriptor(); + } + + @Test + public void testSingleDescriptorNoMessageType() + { + // For the backward compatibility, protoMessageType allows null when the desc file has only one message type. 
+ @SuppressWarnings("unused") // expected to create parser without exception + FileBasedProtobufBytesDecoder decoder = new FileBasedProtobufBytesDecoder("prototest.desc", null); + decoder.initDescriptor(); + } +} diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java index 3171cc577d5f..0bb48a8b9bdc 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java @@ -31,7 +31,6 @@ import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; import org.apache.druid.java.util.common.parsers.JSONPathFieldType; import org.apache.druid.java.util.common.parsers.JSONPathSpec; -import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.js.JavaScriptConfig; import org.hamcrest.CoreMatchers; import org.joda.time.DateTime; @@ -53,6 +52,7 @@ public class ProtobufInputRowParserTest private ParseSpec parseSpec; private ParseSpec flatParseSpec; + private FileBasedProtobufBytesDecoder decoder; @Before public void setUp() @@ -90,60 +90,14 @@ public void setUp() null, null ); - } - - @Test - public void testShortMessageType() - { - //configure parser with desc file, and specify which file name to use - @SuppressWarnings("unused") // expected to create parser without exception - ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", "ProtoTestEvent"); - parser.initDescriptor(); - } - - - @Test - public void testLongMessageType() - { - //configure parser with desc file, and specify which file name to use - @SuppressWarnings("unused") // expected to create parser without exception - ProtobufInputRowParser parser = new 
ProtobufInputRowParser(parseSpec, "prototest.desc", "prototest.ProtoTestEvent"); - parser.initDescriptor(); - } - - - @Test(expected = ParseException.class) - public void testBadProto() - { - //configure parser with desc file - @SuppressWarnings("unused") // expected exception - ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", "BadName"); - parser.initDescriptor(); - } - - @Test(expected = ParseException.class) - public void testMalformedDescriptorUrl() - { - //configure parser with non existent desc file - @SuppressWarnings("unused") // expected exception - ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "file:/nonexist.desc", "BadName"); - parser.initDescriptor(); - } - - @Test - public void testSingleDescriptorNoMessageType() - { - // For the backward compatibility, protoMessageType allows null when the desc file has only one message type. - @SuppressWarnings("unused") // expected to create parser without exception - ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", null); - parser.initDescriptor(); + decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "ProtoTestEvent"); } @Test public void testParseNestedData() throws Exception { //configure parser with desc file - ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", "ProtoTestEvent"); + ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, decoder); //create binary of proto test event DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC()); @@ -194,7 +148,7 @@ public void testParseNestedData() throws Exception public void testParseFlatData() throws Exception { //configure parser with desc file - ProtobufInputRowParser parser = new ProtobufInputRowParser(flatParseSpec, "prototest.desc", "ProtoTestEvent"); + ProtobufInputRowParser parser = new ProtobufInputRowParser(flatParseSpec, decoder); //create binary of proto test event 
DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC()); @@ -247,7 +201,7 @@ public void testDisableJavaScript() "func", new JavaScriptConfig(false) ); - final ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", "ProtoTestEvent"); + final ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, decoder); expectedException.expect(CoreMatchers.instanceOf(IllegalStateException.class)); expectedException.expectMessage("JavaScript is disabled"); diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java new file mode 100644 index 000000000000..eb95fa54ce79 --- /dev/null +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.protobuf; + +import com.google.protobuf.DynamicMessage; +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; +import org.apache.commons.io.IOUtils; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.joda.time.DateTime; +import org.joda.time.chrono.ISOChronology; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +public class SchemaRegistryBasedProtobufBytesDecoderTest +{ + private SchemaRegistryClient registry; + + @Before + public void setUp() + { + registry = Mockito.mock(CachedSchemaRegistryClient.class); + } + + @Test + public void testParse() throws Exception + { + // Given + InputStream fin; + fin = this.getClass().getClassLoader().getResourceAsStream("prototest.proto"); + String protobufString = IOUtils.toString(fin, StandardCharsets.UTF_8); + Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new ProtobufSchema(protobufString)); + ProtoTestEventWrapper.ProtoTestEvent event = getTestEvent(); + byte[] bytes = event.toByteArray(); + ByteBuffer bb = ByteBuffer.allocate(bytes.length + 6).put((byte) 0).putInt(1234).put((byte) 0).put(bytes); + bb.rewind(); + // When + DynamicMessage actual = new SchemaRegistryBasedProtobufBytesDecoder(registry).parse(bb); + // Then + Assert.assertEquals(actual.getField(actual.getDescriptorForType().findFieldByName("id")), event.getId()); + } + + @Test(expected = ParseException.class) + public void testParseCorrupted() throws Exception + { + // Given + InputStream fin; + fin = this.getClass().getClassLoader().getResourceAsStream("prototest.proto"); + String 
protobufString = IOUtils.toString(fin, StandardCharsets.UTF_8); + Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new ProtobufSchema(protobufString)); + byte[] bytes = getTestEvent().toByteArray(); + ByteBuffer bb = ByteBuffer.allocate(bytes.length + 6).put((byte) 0).putInt(1234).put((bytes), 5, 10); + // When + new SchemaRegistryBasedProtobufBytesDecoder(registry).parse(bb); + } + + @Test(expected = ParseException.class) + public void testParseWrongId() throws Exception + { + // Given + Mockito.when(registry.getSchemaById(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran")); + byte[] bytes = getTestEvent().toByteArray(); + ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put(bytes); + // When + new SchemaRegistryBasedProtobufBytesDecoder(registry).parse(bb); + } + + private ProtoTestEventWrapper.ProtoTestEvent getTestEvent() + { + DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC()); + ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder() + .setDescription("description") + .setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE) + .setId(4711L) + .setIsValid(true) + .setSomeOtherId(4712) + .setTimestamp(dateTime.toString()) + .setSomeFloatColumn(47.11F) + .setSomeIntColumn(815) + .setSomeLongColumn(816L) + .build(); + return event; + } +} diff --git a/licenses.yaml b/licenses.yaml index 219256e4fc1e..cbe50261f742 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -3391,6 +3391,27 @@ libraries: --- +name: Kafka Schema Registry Client 6.0.1 +version: 6.0.1 +license_category: binary +module: extensions/druid-protobuf-extensions +license_name: Apache License version 2.0 +libraries: + - io.confluent: kafka-schema-registry-client + - io.confluent: kafka-protobuf-provider + +--- + +name: Kafka Schema Registry Protobuf Provider +version: 6.0.1 +license_category: binary +module: 
extensions/druid-protobuf-extensions +license_name: Confluent Community License +libraries: + - io.confluent: kafka-protobuf-provider + +--- + name: Apache Velocity Engine version: 2.2 license_category: binary @@ -3587,7 +3608,7 @@ version: 1.11.884 libraries: - com.amazonaws: aws-java-sdk-kinesis - com.amazonaws: aws-java-sdk-sts - - com.amazonaws: jmespath-java + - com.amazonaws: jmespath-java --- From b73ed9b94916e8280ba6e48b1ccf0e1c62c444d9 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Mon, 1 Feb 2021 14:04:02 +0800 Subject: [PATCH 02/12] change licese --- licenses.yaml | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/licenses.yaml b/licenses.yaml index cbe50261f742..3dd11280e9cd 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -3402,16 +3402,6 @@ libraries: --- -name: Kafka Schema Registry Protobuf Provider -version: 6.0.1 -license_category: binary -module: extensions/druid-protobuf-extensions -license_name: Confluent Community License -libraries: - - io.confluent: kafka-protobuf-provider - ---- - name: Apache Velocity Engine version: 2.2 license_category: binary From 05d97261a9cc5419569018c36f2a6553b2ee21d5 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Mon, 1 Feb 2021 16:57:06 +0800 Subject: [PATCH 03/12] delete some annotation --- .../input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java index b0f5d3afbf4b..ca80bb12c41c 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java @@ -52,7 +52,6 @@ public 
SchemaRegistryBasedProtobufBytesDecoder( registry = new CachedSchemaRegistryClient(Collections.singletonList(url), identityMapCapacity, Collections.singletonList(new ProtobufSchemaProvider()), null); } - //For UT only @VisibleForTesting SchemaRegistryBasedProtobufBytesDecoder(SchemaRegistryClient registry) { From 3683af428812b60704f11c9cb6243cc6b53a338b Mon Sep 17 00:00:00 2001 From: yuanyi Date: Tue, 2 Feb 2021 12:35:27 +0800 Subject: [PATCH 04/12] nodify tests --- ...hemaRegistryBasedProtobufBytesDecoder.java | 31 +++++-------------- ...RegistryBasedProtobufBytesDecoderTest.java | 23 ++++++++++++-- 2 files changed, 29 insertions(+), 25 deletions(-) diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java index ca80bb12c41c..5a7d4d0fcafc 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java @@ -33,7 +33,6 @@ import java.nio.ByteBuffer; import java.util.Collections; -import java.util.Objects; public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDecoder { @@ -41,6 +40,7 @@ public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDec private static final Logger LOGGER = new Logger(SchemaRegistryBasedProtobufBytesDecoder.class); private final SchemaRegistryClient registry; + private int identityMapCapacity; @JsonCreator public SchemaRegistryBasedProtobufBytesDecoder( @@ -48,10 +48,16 @@ public SchemaRegistryBasedProtobufBytesDecoder( @JsonProperty("capacity") Integer capacity ) { - int identityMapCapacity = capacity == null ? 
Integer.MAX_VALUE : capacity; + this.identityMapCapacity = capacity == null ? Integer.MAX_VALUE : capacity; registry = new CachedSchemaRegistryClient(Collections.singletonList(url), identityMapCapacity, Collections.singletonList(new ProtobufSchemaProvider()), null); } + @VisibleForTesting + int getIdentityMapCapacity() + { + return this.identityMapCapacity; + } + @VisibleForTesting SchemaRegistryBasedProtobufBytesDecoder(SchemaRegistryClient registry) { @@ -78,25 +84,4 @@ public DynamicMessage parse(ByteBuffer bytes) throw new ParseException(e, "Fail to decode protobuf message!"); } } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - SchemaRegistryBasedProtobufBytesDecoder that = (SchemaRegistryBasedProtobufBytesDecoder) o; - - return Objects.equals(registry, that.registry); - } - - @Override - public int hashCode() - { - return registry != null ? registry.hashCode() : 0; - } } diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java index eb95fa54ce79..bcc6abe6fd49 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java @@ -53,7 +53,7 @@ public void testParse() throws Exception { // Given InputStream fin; - fin = this.getClass().getClassLoader().getResourceAsStream("prototest.proto"); + fin = this.getClass().getClassLoader().getResourceAsStream("ProtoTest.proto"); String protobufString = IOUtils.toString(fin, StandardCharsets.UTF_8); 
Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new ProtobufSchema(protobufString)); ProtoTestEventWrapper.ProtoTestEvent event = getTestEvent(); @@ -71,7 +71,7 @@ public void testParseCorrupted() throws Exception { // Given InputStream fin; - fin = this.getClass().getClassLoader().getResourceAsStream("prototest.proto"); + fin = this.getClass().getClassLoader().getResourceAsStream("ProtoTest.proto"); String protobufString = IOUtils.toString(fin, StandardCharsets.UTF_8); Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new ProtobufSchema(protobufString)); byte[] bytes = getTestEvent().toByteArray(); @@ -91,6 +91,25 @@ public void testParseWrongId() throws Exception new SchemaRegistryBasedProtobufBytesDecoder(registry).parse(bb); } + @Test + public void testDefaultCapacity() throws Exception + { + // Given + SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", null); + // When + Assert.assertEquals(schemaRegistryBasedProtobufBytesDecoder.getIdentityMapCapacity(), Integer.MAX_VALUE); + } + + @Test + public void testGivenCapacity() throws Exception + { + int capacity = 100; + // Given + SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", capacity); + // When + Assert.assertEquals(schemaRegistryBasedProtobufBytesDecoder.getIdentityMapCapacity(), capacity); + } + private ProtoTestEventWrapper.ProtoTestEvent getTestEvent() { DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC()); From 261e8f4ae27a5c62a00632543f7b34551545ccea Mon Sep 17 00:00:00 2001 From: yuanyi Date: Tue, 2 Feb 2021 17:22:43 +0800 Subject: [PATCH 05/12] delete extra exception --- .../protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java index bcc6abe6fd49..cac8934e9faf 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java @@ -92,7 +92,7 @@ public void testParseWrongId() throws Exception } @Test - public void testDefaultCapacity() throws Exception + public void testDefaultCapacity() { // Given SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", null); @@ -101,7 +101,7 @@ public void testDefaultCapacity() throws Exception } @Test - public void testGivenCapacity() throws Exception + public void testGivenCapacity() { int capacity = 100; // Given From 0750b357144fee00c7060f661a7b9a51b3d8ced2 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Wed, 3 Feb 2021 12:00:47 +0800 Subject: [PATCH 06/12] add licenses --- extensions-core/protobuf-extensions/pom.xml | 34 +++++++++++++++++++++ licenses.yaml | 31 +++++++++++++++++++ 2 files changed, 65 insertions(+) diff --git a/extensions-core/protobuf-extensions/pom.xml b/extensions-core/protobuf-extensions/pom.xml index f122046a8320..c761bf6b5b4d 100644 --- a/extensions-core/protobuf-extensions/pom.xml +++ b/extensions-core/protobuf-extensions/pom.xml @@ -72,6 +72,16 @@ io.confluent kafka-schema-registry-client ${confluent.version} + + + jersey-common + org.glassfish.jersey.core + + + jakarta.ws.rs-api + jakarta.ws.rs + + io.confluent @@ -82,6 +92,30 @@ protobuf-java-util com.google.protobuf + + okio + com.squareup.okio + + + kotlin-stdlib-common + org.jetbrains.kotlin + + + 
wire-runtime + com.squareup.wire + + + kotlin-stdlib-jdk7 + org.jetbrains.kotlin + + + kotlin-stdlib-jdk8 + org.jetbrains.kotlin + + + annotations + org.jetbrains + diff --git a/licenses.yaml b/licenses.yaml index 3dd11280e9cd..4ba5535f780a 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -3399,6 +3399,37 @@ license_name: Apache License version 2.0 libraries: - io.confluent: kafka-schema-registry-client - io.confluent: kafka-protobuf-provider + - io.confluent: common-utils + +--- + +name: Squareup Wire +version: 3.2.2 +license_category: binary +module: extensions/druid-protobuf-extensions +license_name: Apache License version 2.0 +libraries: + - com.squareup.wire: wire-schema + +--- + +name: Confluent Kafka Client +version: 6.0.1-ccs +license_category: binary +module: extensions/druid-protobuf-extensions +license_name: Apache License version 2.0 +libraries: + - org.apache.kafka: kafka-clients + +--- + +name: Kotlin +version: 1.4.0 +license_category: binary +module: extensions/druid-protobuf-extensions +license_name: Apache License version 2.0 +libraries: + - org.jetbrains.kotlin: kotlin-stdlib --- From f10d87382dcbff382ec4d8475d3488c883e3439d Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sun, 7 Feb 2021 11:21:06 +0800 Subject: [PATCH 07/12] add descriptor and protoMessageType in ProtobufInputRowParser for adopt to old version --- .../benchmark/ProtobufParserBenchmark.java | 4 +- .../FileBasedProtobufBytesDecoder.java | 1 - .../protobuf/ProtobufInputRowParser.java | 15 ++++- ...hemaRegistryBasedProtobufBytesDecoder.java | 17 ++++-- .../protobuf/ProtobufInputRowParserTest.java | 57 ++++++++++++++++++- ...RegistryBasedProtobufBytesDecoderTest.java | 4 +- 6 files changed, 83 insertions(+), 15 deletions(-) diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java index df580af3fe63..98ff204ae414 100644 --- 
a/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java @@ -115,8 +115,8 @@ public void setup() protoFilePath = "ProtoFile"; protoInputs = getProtoInputs(protoFilePath); - nestedParser = new ProtobufInputRowParser(nestedParseSpec, decoder); - flatParser = new ProtobufInputRowParser(flatParseSpec, decoder); + nestedParser = new ProtobufInputRowParser(nestedParseSpec, decoder, null, null); + flatParser = new ProtobufInputRowParser(flatParseSpec, decoder, null, null); } @Benchmark diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java index 74c11bdbeafb..da4a04600080 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java @@ -62,7 +62,6 @@ void initDescriptor() } } - @SuppressWarnings("checkstyle:RightCurly") @Override public DynamicMessage parse(ByteBuffer bytes) { diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java index e4597d37a625..7624f239bf77 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java @@ -53,12 +53,21 @@ public class ProtobufInputRowParser implements ByteBufferInputRowParser @JsonCreator public ProtobufInputRowParser( @JsonProperty("parseSpec") ParseSpec 
parseSpec, - @JsonProperty("protoBytesDecoder") ProtobufBytesDecoder protobufBytesDecoder + @JsonProperty("protoBytesDecoder") ProtobufBytesDecoder protobufBytesDecoder, + @Deprecated + @JsonProperty("descriptor") String descriptorFilePath, + @Deprecated + @JsonProperty("protoMessageType") String protoMessageType ) { this.parseSpec = parseSpec; - this.protobufBytesDecoder = protobufBytesDecoder; this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames(); + + if (descriptorFilePath != null || protoMessageType != null) { + this.protobufBytesDecoder = new FileBasedProtobufBytesDecoder(descriptorFilePath, protoMessageType); + } else { + this.protobufBytesDecoder = protobufBytesDecoder; + } } @Override @@ -70,7 +79,7 @@ public ParseSpec getParseSpec() @Override public ProtobufInputRowParser withParseSpec(ParseSpec parseSpec) { - return new ProtobufInputRowParser(parseSpec, protobufBytesDecoder); + return new ProtobufInputRowParser(parseSpec, protobufBytesDecoder, null, null); } @Override diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java index 5a7d4d0fcafc..43f42d63b794 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java @@ -67,13 +67,20 @@ int getIdentityMapCapacity() @Override public DynamicMessage parse(ByteBuffer bytes) { + bytes.get(); // ignore first \0 byte + int id = bytes.getInt(); // extract schema registry id + bytes.get(); // ignore \0 byte before PB message + int length = bytes.limit() - 2 - 4; + Descriptors.Descriptor descriptor; try { - bytes.get(); // ignore first \0 byte - int id = 
bytes.getInt(); // extract schema registry id - bytes.get(); // ignore \0 byte before PB message - int length = bytes.limit() - 2 - 4; ProtobufSchema schema = (ProtobufSchema) registry.getSchemaById(id); - Descriptors.Descriptor descriptor = schema.toDescriptor(); + descriptor = schema.toDescriptor(); + } + catch (Exception e) { + LOGGER.error(e.getMessage()); + throw new ParseException(e, "Fail to get protobuf schema!"); + } + try { byte[] rawMessage = new byte[length]; bytes.get(rawMessage, 0, length); DynamicMessage message = DynamicMessage.parseFrom(descriptor, rawMessage); diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java index 0bb48a8b9bdc..03d4f874c7d2 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java @@ -97,7 +97,7 @@ public void setUp() public void testParseNestedData() throws Exception { //configure parser with desc file - ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, decoder); + ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, decoder, null, null); //create binary of proto test event DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC()); @@ -148,7 +148,7 @@ public void testParseNestedData() throws Exception public void testParseFlatData() throws Exception { //configure parser with desc file - ProtobufInputRowParser parser = new ProtobufInputRowParser(flatParseSpec, decoder); + ProtobufInputRowParser parser = new ProtobufInputRowParser(flatParseSpec, decoder, null, null); //create binary of proto test event DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, 
ISOChronology.getInstanceUTC()); @@ -201,7 +201,7 @@ public void testDisableJavaScript() "func", new JavaScriptConfig(false) ); - final ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, decoder); + final ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, decoder, null, null); expectedException.expect(CoreMatchers.instanceOf(IllegalStateException.class)); expectedException.expectMessage("JavaScript is disabled"); @@ -210,6 +210,57 @@ public void testDisableJavaScript() parser.parseBatch(ByteBuffer.allocate(1)).get(0); } + @Test + public void testOldParserConfig() throws Exception + { + //configure parser with desc file + ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, null, "prototest.desc", "ProtoTestEvent"); + + //create binary of proto test event + DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC()); + ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder() + .setDescription("description") + .setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE) + .setId(4711L) + .setIsValid(true) + .setSomeOtherId(4712) + .setTimestamp(dateTime.toString()) + .setSomeFloatColumn(47.11F) + .setSomeIntColumn(815) + .setSomeLongColumn(816L) + .setFoo(ProtoTestEventWrapper.ProtoTestEvent.Foo + .newBuilder() + .setBar("baz")) + .addBar(ProtoTestEventWrapper.ProtoTestEvent.Foo + .newBuilder() + .setBar("bar0")) + .addBar(ProtoTestEventWrapper.ProtoTestEvent.Foo + .newBuilder() + .setBar("bar1")) + .build(); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + event.writeTo(out); + + InputRow row = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0); + + Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); + + assertDimensionEquals(row, "id", "4711"); + assertDimensionEquals(row, "isValid", "true"); + assertDimensionEquals(row, "someOtherId", "4712"); + assertDimensionEquals(row, "description", 
"description"); + + assertDimensionEquals(row, "eventType", ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE.name()); + assertDimensionEquals(row, "foobar", "baz"); + assertDimensionEquals(row, "bar0", "bar0"); + + + Assert.assertEquals(47.11F, row.getMetric("someFloatColumn").floatValue(), 0.0); + Assert.assertEquals(815.0F, row.getMetric("someIntColumn").floatValue(), 0.0); + Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0); + } + private void assertDimensionEquals(InputRow row, String dimension, Object expected) { List values = row.getDimension(dimension); diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java index cac8934e9faf..d2393b021747 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java @@ -76,6 +76,7 @@ public void testParseCorrupted() throws Exception Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new ProtobufSchema(protobufString)); byte[] bytes = getTestEvent().toByteArray(); ByteBuffer bb = ByteBuffer.allocate(bytes.length + 6).put((byte) 0).putInt(1234).put((bytes), 5, 10); + bb.rewind(); // When new SchemaRegistryBasedProtobufBytesDecoder(registry).parse(bb); } @@ -86,7 +87,8 @@ public void testParseWrongId() throws Exception // Given Mockito.when(registry.getSchemaById(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran")); byte[] bytes = getTestEvent().toByteArray(); - ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put(bytes); + ByteBuffer bb = 
ByteBuffer.allocate(bytes.length + 6).put((byte) 0).putInt(1234).put((byte) 0).put(bytes); + bb.rewind(); // When new SchemaRegistryBasedProtobufBytesDecoder(registry).parse(bb); } From 44f233f95e51bcd94036b19822245bb818bd60f9 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Tue, 23 Feb 2021 15:38:15 +0800 Subject: [PATCH 08/12] seperate kafka-protobuf-provider --- distribution/bin/check-licenses.py | 2 - docs/development/extensions-core/protobuf.md | 68 +++++++++++++++++-- extensions-core/protobuf-extensions/pom.xml | 30 ++------ .../FileBasedProtobufBytesDecoder.java | 2 +- ...hemaRegistryBasedProtobufBytesDecoder.java | 10 ++- licenses.yaml | 21 ------ 6 files changed, 77 insertions(+), 56 deletions(-) diff --git a/distribution/bin/check-licenses.py b/distribution/bin/check-licenses.py index 5ce68d19439d..1e150ba4f5eb 100755 --- a/distribution/bin/check-licenses.py +++ b/distribution/bin/check-licenses.py @@ -226,8 +226,6 @@ def build_compatible_license_names(): compatible_licenses['Apache License, version 2.0'] = 'Apache License version 2.0' compatible_licenses['Apache 2.0 License'] = 'Apache License version 2.0' compatible_licenses['Apache License, 2.0'] = 'Apache License version 2.0' - compatible_licenses['Confluent Community License'] = 'Confluent Community License' - compatible_licenses['EPL 2.0'] = 'EPL 2.0' compatible_licenses['Public Domain'] = 'Public Domain' diff --git a/docs/development/extensions-core/protobuf.md b/docs/development/extensions-core/protobuf.md index 7515e5081117..89b93c2647fb 100644 --- a/docs/development/extensions-core/protobuf.md +++ b/docs/development/extensions-core/protobuf.md @@ -56,7 +56,7 @@ Here is a JSON example of the 'metrics' data schema used in the example. ### Proto file -The corresponding proto file for our 'metrics' dataset looks like this. +The corresponding proto file for our 'metrics' dataset looks like this. You can use protobuf parser with a proto file or Confluent schema registry. 
``` syntax = "proto3"; @@ -72,7 +72,7 @@ message Metrics { } ``` -### Descriptor file +### When use descriptor file Next, we use the `protoc` Protobuf compiler to generate the descriptor file and save it as `metrics.desc`. The descriptor file must be either in the classpath or reachable by URL. In this example the descriptor file was saved at `/tmp/metrics.desc`, however this file is also available in the example files. From your Druid install directory: @@ -80,14 +80,34 @@ Next, we use the `protoc` Protobuf compiler to generate the descriptor file and protoc -o /tmp/metrics.desc ./quickstart/protobuf/metrics.proto ``` +### When use schema registry + +At first make sure your schema registry version is later than 5.5. Next, we post this schema to schema registry. + +``` +POST /subjects/test/versions HTTP/1.1 +Host: schemaregistry.example.com +Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json + +{ + "schemaType": "PROTOBUF", + "schema": "syntax = \"proto3\";\nmessage Metrics {\n string unit = 1;\n string http_method = 2;\n int32 value = 3;\n string timestamp = 4;\n string http_code = 5;\n string page = 6;\n string metricType = 7;\n string server = 8;\n}\n" +} +``` + +This feature uses Confluent's protobuf provider which is not included in the Druid distribution and must be installed separately. It can be fetched from Maven Central at: https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/6.0.1/kafka-protobuf-provider-6.0.1.jar. Copy or symlink this file to extensions/protobuf-extensions under the distribution root directory. + ## Create Kafka Supervisor Below is the complete Supervisor spec JSON to be submitted to the Overlord. Make sure these keys are properly configured for successful ingestion. 
+### When use descriptor file + Important supervisor properties -- `descriptor` for the descriptor file URL -- `protoMessageType` from the proto definition +- `protoBytesDecoder.descriptor` for the descriptor file URL +- `protoBytesDecoder.protoMessageType` from the proto definition +- `protoBytesDecoder.type` set to `file`, indicate use descriptor file to decode protobuf file - `parser` should have `type` set to `protobuf`, but note that the `format` of the `parseSpec` must be `json` ```json @@ -97,8 +117,11 @@ Important supervisor properties "dataSource": "metrics-protobuf", "parser": { "type": "protobuf", - "descriptor": "file:///tmp/metrics.desc", - "protoMessageType": "Metrics", + "protoBytesDecoder": { + "type": "file", + "descriptor": "file:///tmp/metrics.desc", + "protoMessageType": "Metrics" + }, "parseSpec": { "format": "json", "timestampSpec": { @@ -164,6 +187,39 @@ Important supervisor properties } ``` +To adopt to old version. You can use old parser style, which also works. 
+ +```json +{ + "parser": { + "type": "protobuf", + "descriptor": "file:///tmp/metrics.desc", + "protoMessageType": "Metrics" + } +} +``` + +### When use schema registry + +Important supervisor properties +- `protoBytesDecoder.url` for the schema registry URL +- `protoBytesDecoder.capacity` capacity for schema registry cached schemas +- `protoBytesDecoder.type` set to `schema_registry`, indicate use schema registry to decode protobuf file +- `parser` should have `type` set to `protobuf`, but note that the `format` of the `parseSpec` must be `json` + +```json +{ + "parser": { + "type": "protobuf", + "protoBytesDecoder": { + "url": "http://schemaregistry.example.com:8081", + "type": "schema_registry", + "capacity": 100 + } + } +} +``` + ## Adding Protobuf messages to Kafka If necessary, from your Kafka installation directory run the following command to create the Kafka topic diff --git a/extensions-core/protobuf-extensions/pom.xml b/extensions-core/protobuf-extensions/pom.xml index c761bf6b5b4d..b28e477d4556 100644 --- a/extensions-core/protobuf-extensions/pom.xml +++ b/extensions-core/protobuf-extensions/pom.xml @@ -92,31 +92,13 @@ protobuf-java-util com.google.protobuf - - okio - com.squareup.okio - - - kotlin-stdlib-common - org.jetbrains.kotlin - - - wire-runtime - com.squareup.wire - - - kotlin-stdlib-jdk7 - org.jetbrains.kotlin - - - kotlin-stdlib-jdk8 - org.jetbrains.kotlin - - - annotations - org.jetbrains - + provided + + + com.github.os72 + protobuf-dynamic + 0.9.3 commons-io diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java index da4a04600080..35d5c4fb5d05 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java +++ 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java @@ -21,11 +21,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.github.os72.protobuf.dynamic.DynamicSchema; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; -import io.confluent.kafka.schemaregistry.protobuf.dynamic.DynamicSchema; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.ParseException; diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java index 43f42d63b794..f48904bee4f7 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java @@ -26,11 +26,13 @@ import com.google.protobuf.DynamicMessage; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.ParseException; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; @@ -76,9 +78,13 @@ public DynamicMessage parse(ByteBuffer bytes) ProtobufSchema 
schema = (ProtobufSchema) registry.getSchemaById(id); descriptor = schema.toDescriptor(); } - catch (Exception e) { + catch (RestClientException e) { + LOGGER.error(e.getMessage()); + throw new ParseException(e, "Failed to get Protobuf schema: cannot connect to the registry, or the HTTP request failed!"); + } + catch (IOException e) { LOGGER.error(e.getMessage()); - throw new ParseException(e, "Fail to get protobuf schema!"); + throw new ParseException(e, "Failed to get Protobuf schema: the returned schema is invalid!"); } try { byte[] rawMessage = new byte[length]; diff --git a/licenses.yaml b/licenses.yaml index 4ba5535f780a..94ede12db404 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -3398,21 +3398,10 @@ module: extensions/druid-protobuf-extensions license_name: Apache License version 2.0 libraries: - io.confluent: kafka-schema-registry-client - - io.confluent: kafka-protobuf-provider - io.confluent: common-utils --- -name: Squareup Wire -version: 3.2.2 -license_category: binary -module: extensions/druid-protobuf-extensions -license_name: Apache License version 2.0 -libraries: - - com.squareup.wire: wire-schema - ---- - name: Confluent Kafka Client version: 6.0.1-ccs license_category: binary @@ -3423,16 +3412,6 @@ libraries: --- -name: Kotlin -version: 1.4.0 -license_category: binary -module: extensions/druid-protobuf-extensions -license_name: Apache License version 2.0 -libraries: - - org.jetbrains.kotlin: kotlin-stdlib - ---- - name: Apache Velocity Engine version: 2.2 license_category: binary From 4777b4b045bf7c5c5955eccb395f2eb5849d9d1a Mon Sep 17 00:00:00 2001 From: yuanyi Date: Tue, 23 Feb 2021 17:43:48 +0800 Subject: [PATCH 09/12] modify protobuf.md --- docs/development/extensions-core/protobuf.md | 13 +++++++++---- extensions-core/protobuf-extensions/pom.xml | 4 ---- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/docs/development/extensions-core/protobuf.md b/docs/development/extensions-core/protobuf.md index 89b93c2647fb..3a0840968b81 100644
--- a/docs/development/extensions-core/protobuf.md +++ b/docs/development/extensions-core/protobuf.md @@ -56,7 +56,7 @@ Here is a JSON example of the 'metrics' data schema used in the example. ### Proto file -The corresponding proto file for our 'metrics' dataset looks like this. You can use protobuf parser with a proto file or Confluent schema registry. +The corresponding proto file for our 'metrics' dataset looks like this. You can use Protobuf parser with a proto file or Confluent schema registry. ``` syntax = "proto3"; @@ -95,7 +95,12 @@ Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+j } ``` -This feature uses Confluent's protobuf provider which is not included in the Druid distribution and must be installed separately. It can be fetched from Maven Central at: https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/6.0.1/kafka-protobuf-provider-6.0.1.jar. Copy or symlink this file to extensions/protobuf-extensions under the distribution root directory. +This feature uses Confluent's Protobuf provider which is not included in the Druid distribution and must be installed separately. You can fetch it and its dependencies from Maven Central at: +- https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/6.0.1/kafka-protobuf-provider-6.0.1.jar +- https://repo1.maven.org/maven2/org/jetbrains/kotlin/kotlin-stdlib/1.4.0/kotlin-stdlib-1.4.0.jar +- https://repo1.maven.org/maven2/com/squareup/wire/wire-schema/3.2.2/wire-schema-3.2.2.jar + +Copy or symlink those files to `extensions/protobuf-extensions` under the distribution root directory. ## Create Kafka Supervisor @@ -107,7 +112,7 @@ Make sure these keys are properly configured for successful ingestion. 
Important supervisor properties - `protoBytesDecoder.descriptor` for the descriptor file URL - `protoBytesDecoder.protoMessageType` from the proto definition -- `protoBytesDecoder.type` set to `file`, indicate use descriptor file to decode protobuf file +- `protoBytesDecoder.type` set to `file`, indicate use descriptor file to decode Protobuf file - `parser` should have `type` set to `protobuf`, but note that the `format` of the `parseSpec` must be `json` ```json @@ -204,7 +209,7 @@ To adopt to old version. You can use old parser style, which also works. Important supervisor properties - `protoBytesDecoder.url` for the schema registry URL - `protoBytesDecoder.capacity` capacity for schema registry cached schemas -- `protoBytesDecoder.type` set to `schema_registry`, indicate use schema registry to decode protobuf file +- `protoBytesDecoder.type` set to `schema_registry`, indicate use schema registry to decode Protobuf file - `parser` should have `type` set to `protobuf`, but note that the `format` of the `parseSpec` must be `json` ```json diff --git a/extensions-core/protobuf-extensions/pom.xml b/extensions-core/protobuf-extensions/pom.xml index b28e477d4556..29bb4984487c 100644 --- a/extensions-core/protobuf-extensions/pom.xml +++ b/extensions-core/protobuf-extensions/pom.xml @@ -159,10 +159,6 @@ true - - com.google.protobuf - shaded.com.google.protobuf - From 8bd2bbe44361fde56a1debb0ec12424be67e69a9 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Thu, 4 Mar 2021 17:40:41 +0800 Subject: [PATCH 10/12] refine protobuf.md --- docs/development/extensions-core/protobuf.md | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/docs/development/extensions-core/protobuf.md b/docs/development/extensions-core/protobuf.md index 3a0840968b81..e3f16b609f32 100644 --- a/docs/development/extensions-core/protobuf.md +++ b/docs/development/extensions-core/protobuf.md @@ -56,8 +56,7 @@ Here is a JSON example of the 'metrics' data schema used in the example. 
### Proto file -The corresponding proto file for our 'metrics' dataset looks like this. You can use Protobuf parser with a proto file or Confluent schema registry. - +The corresponding proto file for our 'metrics' dataset looks like this. You can use Protobuf parser with a proto file or [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html). ``` syntax = "proto3"; message Metrics { @@ -72,7 +71,7 @@ message Metrics { } ``` -### When use descriptor file +### When using a descriptor file Next, we use the `protoc` Protobuf compiler to generate the descriptor file and save it as `metrics.desc`. The descriptor file must be either in the classpath or reachable by URL. In this example the descriptor file was saved at `/tmp/metrics.desc`, however this file is also available in the example files. From your Druid install directory: @@ -80,9 +79,9 @@ Next, we use the `protoc` Protobuf compiler to generate the descriptor file and protoc -o /tmp/metrics.desc ./quickstart/protobuf/metrics.proto ``` -### When use schema registry +### When using Schema Registry -At first make sure your schema registry version is later than 5.5. Next, we post this schema to schema registry. +Make sure your Schema Registry version is later than 5.5. Next, we can post a schema to add it to the registry: ``` POST /subjects/test/versions HTTP/1.1 @@ -95,7 +94,7 @@ Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+j } ``` -This feature uses Confluent's Protobuf provider which is not included in the Druid distribution and must be installed separately. You can fetch it and its dependencies from Maven Central at: +This feature uses Confluent's Protobuf provider which is not included in the Druid distribution and must be installed separately. 
You can fetch it and its dependencies from the Confluent repository and Maven Central at: - https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/6.0.1/kafka-protobuf-provider-6.0.1.jar - https://repo1.maven.org/maven2/org/jetbrains/kotlin/kotlin-stdlib/1.4.0/kotlin-stdlib-1.4.0.jar - https://repo1.maven.org/maven2/com/squareup/wire/wire-schema/3.2.2/wire-schema-3.2.2.jar @@ -107,7 +106,7 @@ Copy or symlink those files to `extensions/protobuf-extensions` under the distri Below is the complete Supervisor spec JSON to be submitted to the Overlord. Make sure these keys are properly configured for successful ingestion. -### When use descriptor file +### When using a descriptor file Important supervisor properties - `protoBytesDecoder.descriptor` for the descriptor file URL @@ -204,7 +203,7 @@ To adopt to old version. You can use old parser style, which also works. } ``` -### When use schema registry +### When using Schema Registry Important supervisor properties - `protoBytesDecoder.url` for the schema registry URL From 7cc8165662fd7340eaef1e008032ee8201aad7da Mon Sep 17 00:00:00 2001 From: yuanyi Date: Mon, 8 Mar 2021 17:45:44 +0800 Subject: [PATCH 11/12] add config and header --- docs/development/extensions-core/protobuf.md | 32 ++++++++++--- ...hemaRegistryBasedProtobufBytesDecoder.java | 17 +++++-- ...RegistryBasedProtobufBytesDecoderTest.java | 48 ++++++++++++++++++- 3 files changed, 85 insertions(+), 12 deletions(-) diff --git a/docs/development/extensions-core/protobuf.md b/docs/development/extensions-core/protobuf.md index e3f16b609f32..dfea29f3b860 100644 --- a/docs/development/extensions-core/protobuf.md +++ b/docs/development/extensions-core/protobuf.md @@ -85,7 +85,7 @@ Make sure your Schema Registry version is later than 5.5. 
Next, we can post a sc ``` POST /subjects/test/versions HTTP/1.1 -Host: schemaregistry.example.com +Host: schemaregistry.example1.com Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json { @@ -206,19 +206,37 @@ To adopt to old version. You can use old parser style, which also works. ### When using Schema Registry Important supervisor properties -- `protoBytesDecoder.url` for the schema registry URL -- `protoBytesDecoder.capacity` capacity for schema registry cached schemas -- `protoBytesDecoder.type` set to `schema_registry`, indicate use schema registry to decode Protobuf file -- `parser` should have `type` set to `protobuf`, but note that the `format` of the `parseSpec` must be `json` +- `protoBytesDecoder.url` for the schema registry URL with single instance. +- `protoBytesDecoder.urls` for the schema registry URLs with multi instances. +- `protoBytesDecoder.capacity` capacity for schema registry cached schemas. +- `protoBytesDecoder.config` to send additional configurations, configured for Schema Registry. +- `protoBytesDecoder.headers` to send headers to the Schema Registry. +- `protoBytesDecoder.type` set to `schema_registry`, indicate use schema registry to decode Protobuf file. +- `parser` should have `type` set to `protobuf`, but note that the `format` of the `parseSpec` must be `json`. 
```json { "parser": { "type": "protobuf", "protoBytesDecoder": { - "url": "http://schemaregistry.example.com:8081", + "urls": ["http://schemaregistry.example1.com:8081","http://schemaregistry.example2.com:8081"], "type": "schema_registry", - "capacity": 100 + "capacity": 100, + "config" : { + "basic.auth.credentials.source": "USER_INFO", + "basic.auth.user.info": "fred:letmein", + "schema.registry.ssl.truststore.location": "/some/secrets/kafka.client.truststore.jks", + "schema.registry.ssl.truststore.password": "", + "schema.registry.ssl.keystore.location": "/some/secrets/kafka.client.keystore.jks", + "schema.registry.ssl.keystore.password": "", + "schema.registry.ssl.key.password": "", + ... + }, + "headers": { + "traceID" : "b29c5de2-0db4-490b-b421", + "timeStamp" : "1577191871865", + ... + } } } } diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java index f48904bee4f7..b1435dfc3320 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java @@ -32,9 +32,13 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.ParseException; +import javax.annotation.Nullable; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.List; +import java.util.Map; public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDecoder { @@ -46,12 +50,19 @@ public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDec @JsonCreator public SchemaRegistryBasedProtobufBytesDecoder( - @JsonProperty("url") String 
url, - @JsonProperty("capacity") Integer capacity + @JsonProperty("url") @Deprecated String url, + @JsonProperty("capacity") Integer capacity, + @JsonProperty("urls") @Nullable List urls, + @JsonProperty("config") @Nullable Map config, + @JsonProperty("headers") @Nullable Map headers ) { this.identityMapCapacity = capacity == null ? Integer.MAX_VALUE : capacity; - registry = new CachedSchemaRegistryClient(Collections.singletonList(url), identityMapCapacity, Collections.singletonList(new ProtobufSchemaProvider()), null); + if (url != null && !url.isEmpty()) { + this.registry = new CachedSchemaRegistryClient(Collections.singletonList(url), identityMapCapacity, Collections.singletonList(new ProtobufSchemaProvider()), config, headers); + } else { + this.registry = new CachedSchemaRegistryClient(urls, identityMapCapacity, Collections.singletonList(new ProtobufSchemaProvider()), config, headers); + } } @VisibleForTesting diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java index d2393b021747..8f4c9219b5a7 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java @@ -19,6 +19,7 @@ package org.apache.druid.data.input.protobuf; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.protobuf.DynamicMessage; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; @@ -97,7 +98,7 @@ public void testParseWrongId() throws Exception public void testDefaultCapacity() { // Given - SchemaRegistryBasedProtobufBytesDecoder 
schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", null); + SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", null, null, null, null); // When Assert.assertEquals(schemaRegistryBasedProtobufBytesDecoder.getIdentityMapCapacity(), Integer.MAX_VALUE); } @@ -107,7 +108,7 @@ public void testGivenCapacity() { int capacity = 100; // Given - SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", capacity); + SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", capacity, null, null, null); // When Assert.assertEquals(schemaRegistryBasedProtobufBytesDecoder.getIdentityMapCapacity(), capacity); } @@ -128,4 +129,47 @@ private ProtoTestEventWrapper.ProtoTestEvent getTestEvent() .build(); return event; } + + + @Test + public void testMultipleUrls() throws Exception + { + String json = "{\"urls\":[\"http://localhost\"],\"type\": \"schema_registry\"}"; + ObjectMapper mapper = new ObjectMapper(); + SchemaRegistryBasedProtobufBytesDecoder decoder; + decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper + .readerFor(ProtobufBytesDecoder.class) + .readValue(json); + + // Then + Assert.assertNotEquals(decoder.hashCode(), 0); + } + + @Test + public void testUrl() throws Exception + { + String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\"}"; + ObjectMapper mapper = new ObjectMapper(); + SchemaRegistryBasedProtobufBytesDecoder decoder; + decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper + .readerFor(ProtobufBytesDecoder.class) + .readValue(json); + + // Then + Assert.assertNotEquals(decoder.hashCode(), 0); + } + + @Test + public void testConfig() throws Exception + { + String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\", 
\"config\":{}}"; + ObjectMapper mapper = new ObjectMapper(); + SchemaRegistryBasedProtobufBytesDecoder decoder; + decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper + .readerFor(ProtobufBytesDecoder.class) + .readValue(json); + + // Then + Assert.assertNotEquals(decoder.hashCode(), 0); + } } From a179b99c874b3c81d2aa8262f2f4d20dbe1726b6 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Tue, 9 Mar 2021 10:34:11 +0800 Subject: [PATCH 12/12] bug fixed --- extensions-core/protobuf-extensions/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/extensions-core/protobuf-extensions/pom.xml b/extensions-core/protobuf-extensions/pom.xml index 29bb4984487c..aebbec3224fa 100644 --- a/extensions-core/protobuf-extensions/pom.xml +++ b/extensions-core/protobuf-extensions/pom.xml @@ -131,6 +131,12 @@ jackson-databind provided + + com.google.code.findbugs + jsr305 + 2.0.1 + provided +