diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java index b408076806cf..98ff204ae414 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java @@ -28,6 +28,7 @@ import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.data.input.protobuf.FileBasedProtobufBytesDecoder; import org.apache.druid.data.input.protobuf.ProtobufInputRowParser; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; @@ -73,6 +74,7 @@ public class ProtobufParserBenchmark private ProtobufInputRowParser flatParser; private byte[] protoInputs; private String protoFilePath; + private FileBasedProtobufBytesDecoder decoder; @Setup public void setup() @@ -109,11 +111,12 @@ public void setup() null, null ); + decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "ProtoTestEvent"); protoFilePath = "ProtoFile"; protoInputs = getProtoInputs(protoFilePath); - nestedParser = new ProtobufInputRowParser(nestedParseSpec, "prototest.desc", "ProtoTestEvent"); - flatParser = new ProtobufInputRowParser(flatParseSpec, "prototest.desc", "ProtoTestEvent"); + nestedParser = new ProtobufInputRowParser(nestedParseSpec, decoder, null, null); + flatParser = new ProtobufInputRowParser(flatParseSpec, decoder, null, null); } @Benchmark diff --git a/distribution/bin/check-licenses.py b/distribution/bin/check-licenses.py index 0d3f83b0255d..1e150ba4f5eb 100755 --- a/distribution/bin/check-licenses.py +++ b/distribution/bin/check-licenses.py @@ -225,6 +225,7 @@ def build_compatible_license_names(): compatible_licenses['Apache License v2.0'] = 'Apache License version 2.0' compatible_licenses['Apache License, version 2.0'] = 'Apache License version 2.0' compatible_licenses['Apache 2.0 License'] = 'Apache License version 2.0' + compatible_licenses['Apache License, 2.0'] = 'Apache License version 2.0' compatible_licenses['Public Domain'] = 'Public Domain' diff --git a/docs/development/extensions-core/protobuf.md b/docs/development/extensions-core/protobuf.md index 7515e5081117..dfea29f3b860 100644 --- a/docs/development/extensions-core/protobuf.md +++ b/docs/development/extensions-core/protobuf.md @@ -56,8 +56,7 @@ Here is a JSON example of the 'metrics' data schema used in the example. ### Proto file -The corresponding proto file for our 'metrics' dataset looks like this. - +The corresponding proto file for our 'metrics' dataset looks like this. You can use Protobuf parser with a proto file or [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html). ``` syntax = "proto3"; message Metrics { @@ -72,7 +71,7 @@ message Metrics { } ``` -### Descriptor file +### When using a descriptor file Next, we use the `protoc` Protobuf compiler to generate the descriptor file and save it as `metrics.desc`. The descriptor file must be either in the classpath or reachable by URL. In this example the descriptor file was saved at `/tmp/metrics.desc`, however this file is also available in the example files. From your Druid install directory: @@ -80,14 +79,39 @@ Next, we use the `protoc` Protobuf compiler to generate the descriptor file and protoc -o /tmp/metrics.desc ./quickstart/protobuf/metrics.proto ``` +### When using Schema Registry + +Make sure your Schema Registry version is later than 5.5. Next, we can post a schema to add it to the registry: + +``` +POST /subjects/test/versions HTTP/1.1 +Host: schemaregistry.example1.com +Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json + +{ + "schemaType": "PROTOBUF", + "schema": "syntax = \"proto3\";\nmessage Metrics {\n string unit = 1;\n string http_method = 2;\n int32 value = 3;\n string timestamp = 4;\n string http_code = 5;\n string page = 6;\n string metricType = 7;\n string server = 8;\n}\n" +} +``` + +This feature uses Confluent's Protobuf provider which is not included in the Druid distribution and must be installed separately. You can fetch it and its dependencies from the Confluent repository and Maven Central at: +- https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/6.0.1/kafka-protobuf-provider-6.0.1.jar +- https://repo1.maven.org/maven2/org/jetbrains/kotlin/kotlin-stdlib/1.4.0/kotlin-stdlib-1.4.0.jar +- https://repo1.maven.org/maven2/com/squareup/wire/wire-schema/3.2.2/wire-schema-3.2.2.jar + +Copy or symlink those files to `extensions/protobuf-extensions` under the distribution root directory. + ## Create Kafka Supervisor Below is the complete Supervisor spec JSON to be submitted to the Overlord. Make sure these keys are properly configured for successful ingestion. +### When using a descriptor file + Important supervisor properties -- `descriptor` for the descriptor file URL -- `protoMessageType` from the proto definition +- `protoBytesDecoder.descriptor` for the descriptor file URL +- `protoBytesDecoder.protoMessageType` from the proto definition +- `protoBytesDecoder.type` set to `file`, indicate use descriptor file to decode Protobuf file - `parser` should have `type` set to `protobuf`, but note that the `format` of the `parseSpec` must be `json` ```json @@ -97,8 +121,11 @@ Important supervisor properties "dataSource": "metrics-protobuf", "parser": { "type": "protobuf", - "descriptor": "file:///tmp/metrics.desc", - "protoMessageType": "Metrics", + "protoBytesDecoder": { + "type": "file", + "descriptor": "file:///tmp/metrics.desc", + "protoMessageType": "Metrics" + }, "parseSpec": { "format": "json", "timestampSpec": { @@ -164,6 +191,57 @@ Important supervisor properties } ``` +To adopt to old version. You can use old parser style, which also works. + +```json +{ + "parser": { + "type": "protobuf", + "descriptor": "file:///tmp/metrics.desc", + "protoMessageType": "Metrics" + } +} +``` + +### When using Schema Registry + +Important supervisor properties +- `protoBytesDecoder.url` for the schema registry URL with single instance. +- `protoBytesDecoder.urls` for the schema registry URLs with multi instances. +- `protoBytesDecoder.capacity` capacity for schema registry cached schemas. +- `protoBytesDecoder.config` to send additional configurations, configured for Schema Registry. +- `protoBytesDecoder.headers` to send headers to the Schema Registry. +- `protoBytesDecoder.type` set to `schema_registry`, indicate use schema registry to decode Protobuf file. +- `parser` should have `type` set to `protobuf`, but note that the `format` of the `parseSpec` must be `json`. + +```json +{ + "parser": { + "type": "protobuf", + "protoBytesDecoder": { + "urls": ["http://schemaregistry.example1.com:8081","http://schemaregistry.example2.com:8081"], + "type": "schema_registry", + "capacity": 100, + "config" : { + "basic.auth.credentials.source": "USER_INFO", + "basic.auth.user.info": "fred:letmein", + "schema.registry.ssl.truststore.location": "/some/secrets/kafka.client.truststore.jks", + "schema.registry.ssl.truststore.password": "", + "schema.registry.ssl.keystore.location": "/some/secrets/kafka.client.keystore.jks", + "schema.registry.ssl.keystore.password": "", + "schema.registry.ssl.key.password": "", + ... + }, + "headers": { + "traceID" : "b29c5de2-0db4-490b-b421", + "timeStamp" : "1577191871865", + ... + } + } + } +} +``` + ## Adding Protobuf messages to Kafka If necessary, from your Kafka installation directory run the following command to create the Kafka topic diff --git a/extensions-core/protobuf-extensions/pom.xml b/extensions-core/protobuf-extensions/pom.xml index 8dba4ce097a7..aebbec3224fa 100644 --- a/extensions-core/protobuf-extensions/pom.xml +++ b/extensions-core/protobuf-extensions/pom.xml @@ -33,6 +33,18 @@ ../../pom.xml + + 6.0.1 + 2.6 + + + + + confluent + https://packages.confluent.io/maven/ + + + org.apache.druid @@ -52,15 +64,48 @@ com.google.guava - guava + protobuf-java + + + + + io.confluent + kafka-schema-registry-client + ${confluent.version} + + + jersey-common + org.glassfish.jersey.core + + + jakarta.ws.rs-api + jakarta.ws.rs + + + + + io.confluent + kafka-protobuf-provider + ${confluent.version} + + + protobuf-java-util + com.google.protobuf + provided com.github.os72 protobuf-dynamic 0.9.3 + + commons-io + commons-io + ${commons-io.version} + provided + com.fasterxml.jackson.core jackson-annotations @@ -86,6 +131,12 @@ jackson-databind provided + + com.google.code.findbugs + jsr305 + 2.0.1 + provided + @@ -98,6 +149,11 @@ hamcrest-core test + + org.mockito + mockito-core + test + @@ -109,10 +165,6 @@ true - - com.google.protobuf - shaded.com.google.protobuf - diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java new file mode 100644 index 000000000000..35d5c4fb5d05 --- /dev/null +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.protobuf; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.github.os72.protobuf.dynamic.DynamicSchema; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.ParseException; + +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.ByteBuffer; +import java.util.Set; + +public class FileBasedProtobufBytesDecoder implements ProtobufBytesDecoder +{ + private final String descriptorFilePath; + private final String protoMessageType; + private Descriptors.Descriptor descriptor; + + + @JsonCreator + public FileBasedProtobufBytesDecoder( + @JsonProperty("descriptor") String descriptorFilePath, + @JsonProperty("protoMessageType") String protoMessageType + ) + { + this.descriptorFilePath = descriptorFilePath; + this.protoMessageType = protoMessageType; + initDescriptor(); + } + + @VisibleForTesting + void initDescriptor() + { + if (this.descriptor == null) { + this.descriptor = getDescriptor(descriptorFilePath); + } + } + + @Override + public DynamicMessage parse(ByteBuffer bytes) + { + try { + DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(bytes)); + return message; + } + catch (Exception e) { + throw new ParseException(e, "Fail to decode protobuf message!"); + } + } + + private Descriptors.Descriptor getDescriptor(String descriptorFilePath) + { + InputStream fin; + + fin = this.getClass().getClassLoader().getResourceAsStream(descriptorFilePath); + if (fin == null) { + URL url; + try { + url = new URL(descriptorFilePath); + } + catch (MalformedURLException e) { + throw new ParseException(e, "Descriptor not found in class path or malformed URL:" + descriptorFilePath); + } + try { + fin = url.openConnection().getInputStream(); + } + catch (IOException e) { + throw new ParseException(e, "Cannot read descriptor file: " + url); + } + } + DynamicSchema dynamicSchema; + try { + dynamicSchema = DynamicSchema.parseFrom(fin); + } + catch (Descriptors.DescriptorValidationException e) { + throw new ParseException(e, "Invalid descriptor file: " + descriptorFilePath); + } + catch (IOException e) { + throw new ParseException(e, "Cannot read descriptor file: " + descriptorFilePath); + } + + Set messageTypes = dynamicSchema.getMessageTypes(); + if (messageTypes.size() == 0) { + throw new ParseException("No message types found in the descriptor: " + descriptorFilePath); + } + + String messageType = protoMessageType == null ? (String) messageTypes.toArray()[0] : protoMessageType; + Descriptors.Descriptor desc = dynamicSchema.getMessageDescriptor(messageType); + if (desc == null) { + throw new ParseException( + StringUtils.format( + "Protobuf message type %s not found in the specified descriptor. Available messages types are %s", + protoMessageType, + messageTypes + ) + ); + } + return desc; + } +} diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufBytesDecoder.java new file mode 100644 index 000000000000..1defa705aec0 --- /dev/null +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufBytesDecoder.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.protobuf; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.protobuf.DynamicMessage; + +import java.nio.ByteBuffer; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SchemaRegistryBasedProtobufBytesDecoder.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "file", value = FileBasedProtobufBytesDecoder.class), + @JsonSubTypes.Type(name = "schema_registry", value = SchemaRegistryBasedProtobufBytesDecoder.class) +}) +public interface ProtobufBytesDecoder +{ + DynamicMessage parse(ByteBuffer bytes); +} diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java index 9c04265e6a5f..7624f239bf77 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java @@ -21,14 +21,9 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.github.os72.protobuf.dynamic.DynamicSchema; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import com.google.protobuf.ByteString; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.DynamicMessage; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; @@ -37,40 +32,42 @@ import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.impl.JSONParseSpec; import org.apache.druid.data.input.impl.ParseSpec; -import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.Parser; import org.apache.druid.utils.CollectionUtils; -import java.io.IOException; -import java.io.InputStream; -import java.net.MalformedURLException; -import java.net.URL; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; -import java.util.Set; public class ProtobufInputRowParser implements ByteBufferInputRowParser { + private static final Logger LOG = new Logger(ByteBufferInputRowParser.class); + private final ParseSpec parseSpec; - private final String descriptorFilePath; - private final String protoMessageType; - private Descriptor descriptor; + private final ProtobufBytesDecoder protobufBytesDecoder; private Parser parser; private final List dimensions; @JsonCreator public ProtobufInputRowParser( @JsonProperty("parseSpec") ParseSpec parseSpec, + @JsonProperty("protoBytesDecoder") ProtobufBytesDecoder protobufBytesDecoder, + @Deprecated @JsonProperty("descriptor") String descriptorFilePath, + @Deprecated @JsonProperty("protoMessageType") String protoMessageType ) { this.parseSpec = parseSpec; - this.descriptorFilePath = descriptorFilePath; - this.protoMessageType = protoMessageType; this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames(); + + if (descriptorFilePath != null || protoMessageType != null) { + this.protobufBytesDecoder = new FileBasedProtobufBytesDecoder(descriptorFilePath, protoMessageType); + } else { + this.protobufBytesDecoder = protobufBytesDecoder; + } } @Override @@ -82,39 +79,28 @@ public ParseSpec getParseSpec() @Override public ProtobufInputRowParser withParseSpec(ParseSpec parseSpec) { - return new ProtobufInputRowParser(parseSpec, descriptorFilePath, protoMessageType); - } - - @VisibleForTesting - void initDescriptor() - { - if (this.descriptor == null) { - this.descriptor = getDescriptor(descriptorFilePath); - } + return new ProtobufInputRowParser(parseSpec, protobufBytesDecoder, null, null); } @Override public List parseBatch(ByteBuffer input) { if (parser == null) { - // parser should be created when it is really used to avoid unnecessary initialization of the underlying - // parseSpec. parser = parseSpec.makeParser(); - initDescriptor(); } Map record; if (parseSpec instanceof JSONParseSpec && ((JSONParseSpec) parseSpec).getFlattenSpec().getFields().isEmpty()) { try { - DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input)); + DynamicMessage message = protobufBytesDecoder.parse(input); record = CollectionUtils.mapKeys(message.getAllFields(), k -> k.getJsonName()); } - catch (InvalidProtocolBufferException ex) { + catch (Exception ex) { throw new ParseException(ex, "Protobuf message could not be parsed"); } } else { try { - DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input)); + DynamicMessage message = protobufBytesDecoder.parse(input); String json = JsonFormat.printer().print(message); record = parser.parseToMap(json); } @@ -137,55 +123,4 @@ record = parser.parseToMap(json); record )); } - - private Descriptor getDescriptor(String descriptorFilePath) - { - InputStream fin; - - fin = this.getClass().getClassLoader().getResourceAsStream(descriptorFilePath); - if (fin == null) { - URL url; - try { - url = new URL(descriptorFilePath); - } - catch (MalformedURLException e) { - throw new ParseException(e, "Descriptor not found in class path or malformed URL:" + descriptorFilePath); - } - try { - fin = url.openConnection().getInputStream(); - } - catch (IOException e) { - throw new ParseException(e, "Cannot read descriptor file: " + url); - } - } - - DynamicSchema dynamicSchema; - try { - dynamicSchema = DynamicSchema.parseFrom(fin); - } - catch (Descriptors.DescriptorValidationException e) { - throw new ParseException(e, "Invalid descriptor file: " + descriptorFilePath); - } - catch (IOException e) { - throw new ParseException(e, "Cannot read descriptor file: " + descriptorFilePath); - } - - Set messageTypes = dynamicSchema.getMessageTypes(); - if (messageTypes.size() == 0) { - throw new ParseException("No message types found in the descriptor: " + descriptorFilePath); - } - - String messageType = protoMessageType == null ? (String) messageTypes.toArray()[0] : protoMessageType; - Descriptor desc = dynamicSchema.getMessageDescriptor(messageType); - if (desc == null) { - throw new ParseException( - StringUtils.format( - "Protobuf message type %s not found in the specified descriptor. Available messages types are %s", - protoMessageType, - messageTypes - ) - ); - } - return desc; - } } diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java new file mode 100644 index 000000000000..b1435dfc3320 --- /dev/null +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.protobuf; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.common.parsers.ParseException; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDecoder +{ + + private static final Logger LOGGER = new Logger(SchemaRegistryBasedProtobufBytesDecoder.class); + + private final SchemaRegistryClient registry; + private int identityMapCapacity; + + @JsonCreator + public SchemaRegistryBasedProtobufBytesDecoder( + @JsonProperty("url") @Deprecated String url, + @JsonProperty("capacity") Integer capacity, + @JsonProperty("urls") @Nullable List urls, + @JsonProperty("config") @Nullable Map config, + @JsonProperty("headers") @Nullable Map headers + ) + { + this.identityMapCapacity = capacity == null ? Integer.MAX_VALUE : capacity; + if (url != null && !url.isEmpty()) { + this.registry = new CachedSchemaRegistryClient(Collections.singletonList(url), identityMapCapacity, Collections.singletonList(new ProtobufSchemaProvider()), config, headers); + } else { + this.registry = new CachedSchemaRegistryClient(urls, identityMapCapacity, Collections.singletonList(new ProtobufSchemaProvider()), config, headers); + } + } + + @VisibleForTesting + int getIdentityMapCapacity() + { + return this.identityMapCapacity; + } + + @VisibleForTesting + SchemaRegistryBasedProtobufBytesDecoder(SchemaRegistryClient registry) + { + this.registry = registry; + } + + @Override + public DynamicMessage parse(ByteBuffer bytes) + { + bytes.get(); // ignore first \0 byte + int id = bytes.getInt(); // extract schema registry id + bytes.get(); // ignore \0 byte before PB message + int length = bytes.limit() - 2 - 4; + Descriptors.Descriptor descriptor; + try { + ProtobufSchema schema = (ProtobufSchema) registry.getSchemaById(id); + descriptor = schema.toDescriptor(); + } + catch (RestClientException e) { + LOGGER.error(e.getMessage()); + throw new ParseException(e, "Fail to get protobuf schema because of can not connect to registry or failed http request!"); + } + catch (IOException e) { + LOGGER.error(e.getMessage()); + throw new ParseException(e, "Fail to get protobuf schema because of invalid schema!"); + } + try { + byte[] rawMessage = new byte[length]; + bytes.get(rawMessage, 0, length); + DynamicMessage message = DynamicMessage.parseFrom(descriptor, rawMessage); + return message; + } + catch (Exception e) { + LOGGER.error(e.getMessage()); + throw new ParseException(e, "Fail to decode protobuf message!"); + } + } +} diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoderTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoderTest.java new file mode 100644 index 000000000000..18db5f12a37b --- /dev/null +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoderTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.protobuf; + +import org.apache.druid.java.util.common.parsers.ParseException; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class FileBasedProtobufBytesDecoderTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testShortMessageType() + { + //configure parser with desc file, and specify which file name to use + @SuppressWarnings("unused") // expected to create parser without exception + FileBasedProtobufBytesDecoder decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "ProtoTestEvent"); + decoder.initDescriptor(); + } + + @Test + public void testLongMessageType() + { + //configure parser with desc file, and specify which file name to use + @SuppressWarnings("unused") // expected to create parser without exception + FileBasedProtobufBytesDecoder decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "prototest.ProtoTestEvent"); + decoder.initDescriptor(); + } + + @Test(expected = ParseException.class) + public void testBadProto() + { + //configure parser with desc file + @SuppressWarnings("unused") // expected exception + FileBasedProtobufBytesDecoder decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "BadName"); + decoder.initDescriptor(); + } + + @Test(expected = ParseException.class) + public void testMalformedDescriptorUrl() + { + //configure parser with non existent desc file + @SuppressWarnings("unused") // expected exception + FileBasedProtobufBytesDecoder decoder = new FileBasedProtobufBytesDecoder("file:/nonexist.desc", "BadName"); + decoder.initDescriptor(); + } + + @Test + public void testSingleDescriptorNoMessageType() + { + // For the backward compatibility, protoMessageType allows null when the desc file has only one message type. + @SuppressWarnings("unused") // expected to create parser without exception + FileBasedProtobufBytesDecoder decoder = new FileBasedProtobufBytesDecoder("prototest.desc", null); + decoder.initDescriptor(); + } +} diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java index 3171cc577d5f..03d4f874c7d2 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java @@ -31,7 +31,6 @@ import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; import org.apache.druid.java.util.common.parsers.JSONPathFieldType; import org.apache.druid.java.util.common.parsers.JSONPathSpec; -import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.js.JavaScriptConfig; import org.hamcrest.CoreMatchers; import org.joda.time.DateTime; @@ -53,6 +52,7 @@ public class ProtobufInputRowParserTest private ParseSpec parseSpec; private ParseSpec flatParseSpec; + private FileBasedProtobufBytesDecoder decoder; @Before public void setUp() @@ -90,60 +90,14 @@ public void setUp() null, null ); - } - - @Test - public void testShortMessageType() - { - //configure parser with desc file, and specify which file name to use - @SuppressWarnings("unused") // expected to create parser without exception - ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", "ProtoTestEvent"); - parser.initDescriptor(); - } - - - @Test - public void testLongMessageType() - { - //configure parser with desc file, and specify which file name to use - @SuppressWarnings("unused") // expected to create parser without exception - ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", "prototest.ProtoTestEvent"); - parser.initDescriptor(); - } - - - @Test(expected = ParseException.class) - public void testBadProto() - { - //configure parser with desc file - @SuppressWarnings("unused") // expected exception - ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", "BadName"); - parser.initDescriptor(); - } - - @Test(expected = ParseException.class) - public void testMalformedDescriptorUrl() - { - //configure parser with non existent desc file - @SuppressWarnings("unused") // expected exception - ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "file:/nonexist.desc", "BadName"); - parser.initDescriptor(); - } - - @Test - public void testSingleDescriptorNoMessageType() - { - // For the backward compatibility, protoMessageType allows null when the desc file has only one message type. - @SuppressWarnings("unused") // expected to create parser without exception - ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", null); - parser.initDescriptor(); + decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "ProtoTestEvent"); } @Test public void testParseNestedData() throws Exception { //configure parser with desc file - ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", "ProtoTestEvent"); + ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, decoder, null, null); //create binary of proto test event DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC()); @@ -194,7 +148,7 @@ public void testParseNestedData() throws Exception public void testParseFlatData() throws Exception { //configure parser with desc file - ProtobufInputRowParser parser = new ProtobufInputRowParser(flatParseSpec, "prototest.desc", "ProtoTestEvent"); + ProtobufInputRowParser parser = new ProtobufInputRowParser(flatParseSpec, decoder, null, null); //create binary of proto test event DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC()); @@ -247,7 +201,7 @@ public void testDisableJavaScript() "func", new JavaScriptConfig(false) ); - final ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", "ProtoTestEvent"); + final ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, decoder, null, null); expectedException.expect(CoreMatchers.instanceOf(IllegalStateException.class)); expectedException.expectMessage("JavaScript is disabled"); @@ -256,6 +210,57 @@ public void testDisableJavaScript() parser.parseBatch(ByteBuffer.allocate(1)).get(0); } + @Test + public void testOldParserConfig() throws Exception + { + //configure parser with desc file + ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, null, "prototest.desc", "ProtoTestEvent"); + + //create binary of proto test event + DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC()); + ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder() + .setDescription("description") + .setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE) + .setId(4711L) + .setIsValid(true) + .setSomeOtherId(4712) + .setTimestamp(dateTime.toString()) + .setSomeFloatColumn(47.11F) + .setSomeIntColumn(815) + .setSomeLongColumn(816L) + .setFoo(ProtoTestEventWrapper.ProtoTestEvent.Foo + .newBuilder() + .setBar("baz")) + .addBar(ProtoTestEventWrapper.ProtoTestEvent.Foo + .newBuilder() + .setBar("bar0")) + .addBar(ProtoTestEventWrapper.ProtoTestEvent.Foo + .newBuilder() + .setBar("bar1")) + .build(); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + event.writeTo(out); + + InputRow row = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0); + + Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); + + assertDimensionEquals(row, "id", "4711"); + assertDimensionEquals(row, "isValid", "true"); + assertDimensionEquals(row, "someOtherId", "4712"); + assertDimensionEquals(row, "description", "description"); + + assertDimensionEquals(row, "eventType", ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE.name()); + assertDimensionEquals(row, "foobar", "baz"); + assertDimensionEquals(row, "bar0", "bar0"); + + + Assert.assertEquals(47.11F, row.getMetric("someFloatColumn").floatValue(), 0.0); + Assert.assertEquals(815.0F, row.getMetric("someIntColumn").floatValue(), 0.0); + Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0); + } + private void assertDimensionEquals(InputRow row, String dimension, Object expected) { List values = row.getDimension(dimension); diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java new file mode 100644 index 000000000000..8f4c9219b5a7 --- /dev/null +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.protobuf; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.DynamicMessage; +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; +import org.apache.commons.io.IOUtils; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.joda.time.DateTime; +import org.joda.time.chrono.ISOChronology; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +public class SchemaRegistryBasedProtobufBytesDecoderTest +{ + private SchemaRegistryClient registry; + + @Before + public void setUp() + { + registry = Mockito.mock(CachedSchemaRegistryClient.class); + } + + @Test + public void testParse() throws Exception + { + // Given + InputStream fin; + fin = this.getClass().getClassLoader().getResourceAsStream("ProtoTest.proto"); + String protobufString = IOUtils.toString(fin, StandardCharsets.UTF_8); + Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new ProtobufSchema(protobufString)); + ProtoTestEventWrapper.ProtoTestEvent event = getTestEvent(); + byte[] bytes = event.toByteArray(); + ByteBuffer bb = ByteBuffer.allocate(bytes.length + 6).put((byte) 0).putInt(1234).put((byte) 0).put(bytes); + bb.rewind(); + // When + DynamicMessage actual = new SchemaRegistryBasedProtobufBytesDecoder(registry).parse(bb); + // Then + Assert.assertEquals(actual.getField(actual.getDescriptorForType().findFieldByName("id")), event.getId()); + } + + @Test(expected = ParseException.class) + public void testParseCorrupted() throws Exception + { + // Given + InputStream fin; + fin = this.getClass().getClassLoader().getResourceAsStream("ProtoTest.proto"); + String protobufString = IOUtils.toString(fin, StandardCharsets.UTF_8); + Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new ProtobufSchema(protobufString)); + byte[] bytes = getTestEvent().toByteArray(); + ByteBuffer bb = ByteBuffer.allocate(bytes.length + 6).put((byte) 0).putInt(1234).put((bytes), 5, 10); + bb.rewind(); + // When + new SchemaRegistryBasedProtobufBytesDecoder(registry).parse(bb); + } + + @Test(expected = ParseException.class) + public void testParseWrongId() throws Exception + { + // Given + Mockito.when(registry.getSchemaById(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran")); + byte[] bytes = getTestEvent().toByteArray(); + ByteBuffer bb = ByteBuffer.allocate(bytes.length + 6).put((byte) 0).putInt(1234).put((byte) 0).put(bytes); + bb.rewind(); + // When + new SchemaRegistryBasedProtobufBytesDecoder(registry).parse(bb); + } + + @Test + public void testDefaultCapacity() + { + // Given + SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", null, null, null, null); + // When + Assert.assertEquals(schemaRegistryBasedProtobufBytesDecoder.getIdentityMapCapacity(), Integer.MAX_VALUE); + } + + @Test + public void testGivenCapacity() + { + int capacity = 100; + // Given + SchemaRegistryBasedProtobufBytesDecoder schemaRegistryBasedProtobufBytesDecoder = new SchemaRegistryBasedProtobufBytesDecoder("http://test", capacity, null, null, null); + // When + Assert.assertEquals(schemaRegistryBasedProtobufBytesDecoder.getIdentityMapCapacity(), capacity); + } + + private ProtoTestEventWrapper.ProtoTestEvent getTestEvent() + { + DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC()); + ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder() + .setDescription("description") + .setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE) + .setId(4711L) + .setIsValid(true) + .setSomeOtherId(4712) + .setTimestamp(dateTime.toString()) + .setSomeFloatColumn(47.11F) + .setSomeIntColumn(815) + .setSomeLongColumn(816L) + .build(); + return event; + } + + + @Test + public void testMultipleUrls() throws Exception + { + String json = "{\"urls\":[\"http://localhost\"],\"type\": \"schema_registry\"}"; + ObjectMapper mapper = new ObjectMapper(); + SchemaRegistryBasedProtobufBytesDecoder decoder; + decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper + .readerFor(ProtobufBytesDecoder.class) + .readValue(json); + + // Then + Assert.assertNotEquals(decoder.hashCode(), 0); + } + + @Test + public void testUrl() throws Exception + { + String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\"}"; + ObjectMapper mapper = new ObjectMapper(); + SchemaRegistryBasedProtobufBytesDecoder decoder; + decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper + .readerFor(ProtobufBytesDecoder.class) + .readValue(json); + + // Then + Assert.assertNotEquals(decoder.hashCode(), 0); + } + + @Test + public void testConfig() throws Exception + { + String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\", \"config\":{}}"; + ObjectMapper mapper = new ObjectMapper(); + SchemaRegistryBasedProtobufBytesDecoder decoder; + decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper + .readerFor(ProtobufBytesDecoder.class) + .readValue(json); + + // Then + Assert.assertNotEquals(decoder.hashCode(), 0); + } +} diff --git a/licenses.yaml b/licenses.yaml index 219256e4fc1e..94ede12db404 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -3391,6 +3391,27 @@ libraries: --- +name: Kafka Schema Registry Client 6.0.1 +version: 6.0.1 +license_category: binary +module: extensions/druid-protobuf-extensions +license_name: Apache License version 2.0 +libraries: + - io.confluent: kafka-schema-registry-client + - io.confluent: common-utils + +--- + +name: Confluent Kafka Client +version: 6.0.1-ccs +license_category: binary +module: extensions/druid-protobuf-extensions +license_name: Apache License version 2.0 +libraries: + - org.apache.kafka: kafka-clients + +--- + name: Apache Velocity Engine version: 2.2 license_category: binary @@ -3587,7 +3608,7 @@ version: 1.11.884 libraries: - com.amazonaws: aws-java-sdk-kinesis - com.amazonaws: aws-java-sdk-sts - - com.amazonaws: jmespath-java + - com.amazonaws: jmespath-java ---