Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.data.input.protobuf.FileBasedProtobufBytesDecoder;
import org.apache.druid.data.input.protobuf.ProtobufInputRowParser;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class ProtobufParserBenchmark
private ProtobufInputRowParser flatParser;
private byte[] protoInputs;
private String protoFilePath;
private FileBasedProtobufBytesDecoder decoder;

@Setup
public void setup()
Expand Down Expand Up @@ -109,11 +111,12 @@ public void setup()
null,
null
);
decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "ProtoTestEvent");

protoFilePath = "ProtoFile";
protoInputs = getProtoInputs(protoFilePath);
nestedParser = new ProtobufInputRowParser(nestedParseSpec, "prototest.desc", "ProtoTestEvent");
flatParser = new ProtobufInputRowParser(flatParseSpec, "prototest.desc", "ProtoTestEvent");
nestedParser = new ProtobufInputRowParser(nestedParseSpec, decoder, null, null);
flatParser = new ProtobufInputRowParser(flatParseSpec, decoder, null, null);
}

@Benchmark
Expand Down
1 change: 1 addition & 0 deletions distribution/bin/check-licenses.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ def build_compatible_license_names():
compatible_licenses['Apache License v2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache License, version 2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache 2.0 License'] = 'Apache License version 2.0'
compatible_licenses['Apache License, 2.0'] = 'Apache License version 2.0'

compatible_licenses['Public Domain'] = 'Public Domain'

Expand Down
92 changes: 85 additions & 7 deletions docs/development/extensions-core/protobuf.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ Here is a JSON example of the 'metrics' data schema used in the example.

### Proto file

The corresponding proto file for our 'metrics' dataset looks like this.

The corresponding proto file for our 'metrics' dataset looks like this. You can use the Protobuf parser with either a proto descriptor file or [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html).
```
syntax = "proto3";
message Metrics {
Expand All @@ -72,22 +71,47 @@ message Metrics {
}
```

### Descriptor file
### When using a descriptor file

Next, we use the `protoc` Protobuf compiler to generate the descriptor file and save it as `metrics.desc`. The descriptor file must be either in the classpath or reachable by URL. In this example the descriptor file was saved at `/tmp/metrics.desc`, however this file is also available in the example files. From your Druid install directory:

```
protoc -o /tmp/metrics.desc ./quickstart/protobuf/metrics.proto
```

### When using Schema Registry

Make sure your Schema Registry version is later than 5.5. Next, we can post a schema to add it to the registry:

```
POST /subjects/test/versions HTTP/1.1
Host: schemaregistry.example1.com
Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json

{
"schemaType": "PROTOBUF",
"schema": "syntax = \"proto3\";\nmessage Metrics {\n string unit = 1;\n string http_method = 2;\n int32 value = 3;\n string timestamp = 4;\n string http_code = 5;\n string page = 6;\n string metricType = 7;\n string server = 8;\n}\n"
}
```

This feature uses Confluent's Protobuf provider which is not included in the Druid distribution and must be installed separately. You can fetch it and its dependencies from the Confluent repository and Maven Central at:
- https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/6.0.1/kafka-protobuf-provider-6.0.1.jar
- https://repo1.maven.org/maven2/org/jetbrains/kotlin/kotlin-stdlib/1.4.0/kotlin-stdlib-1.4.0.jar
- https://repo1.maven.org/maven2/com/squareup/wire/wire-schema/3.2.2/wire-schema-3.2.2.jar

Copy or symlink those files to `extensions/protobuf-extensions` under the distribution root directory.

## Create Kafka Supervisor

Below is the complete Supervisor spec JSON to be submitted to the Overlord.
Make sure these keys are properly configured for successful ingestion.

### When using a descriptor file

Important supervisor properties
- `descriptor` for the descriptor file URL
- `protoMessageType` from the proto definition
- `protoBytesDecoder.descriptor` for the descriptor file URL
- `protoBytesDecoder.protoMessageType` from the proto definition
- `protoBytesDecoder.type` set to `file`, indicating that a descriptor file is used to decode the Protobuf payload
- `parser` should have `type` set to `protobuf`, but note that the `format` of the `parseSpec` must be `json`

```json
Expand All @@ -97,8 +121,11 @@ Important supervisor properties
"dataSource": "metrics-protobuf",
"parser": {
"type": "protobuf",
"descriptor": "file:///tmp/metrics.desc",
"protoMessageType": "Metrics",
"protoBytesDecoder": {
"type": "file",
"descriptor": "file:///tmp/metrics.desc",
"protoMessageType": "Metrics"
},
"parseSpec": {
"format": "json",
"timestampSpec": {
Expand Down Expand Up @@ -164,6 +191,57 @@ Important supervisor properties
}
```

For backward compatibility with older versions, you can also use the legacy parser style, which still works.

```json
{
"parser": {
"type": "protobuf",
"descriptor": "file:///tmp/metrics.desc",
"protoMessageType": "Metrics"
}
}
```

### When using Schema Registry

Important supervisor properties
- `protoBytesDecoder.url` for the Schema Registry URL when running a single instance.
- `protoBytesDecoder.urls` for the Schema Registry URLs when running multiple instances.
- `protoBytesDecoder.capacity` for the maximum number of cached schemas.
- `protoBytesDecoder.config` to pass additional configuration options to the Schema Registry client.
- `protoBytesDecoder.headers` to send headers to the Schema Registry.
- `protoBytesDecoder.type` set to `schema_registry`, indicating that Schema Registry is used to decode the Protobuf payload.
- `parser` should have `type` set to `protobuf`, but note that the `format` of the `parseSpec` must be `json`.

```json
{
"parser": {
"type": "protobuf",
"protoBytesDecoder": {
"urls": ["http://schemaregistry.example1.com:8081","http://schemaregistry.example2.com:8081"],
"type": "schema_registry",
"capacity": 100,
"config" : {
"basic.auth.credentials.source": "USER_INFO",
"basic.auth.user.info": "fred:letmein",
"schema.registry.ssl.truststore.location": "/some/secrets/kafka.client.truststore.jks",
"schema.registry.ssl.truststore.password": "<password>",
"schema.registry.ssl.keystore.location": "/some/secrets/kafka.client.keystore.jks",
"schema.registry.ssl.keystore.password": "<password>",
"schema.registry.ssl.key.password": "<password>",
...
},
"headers": {
"traceID" : "b29c5de2-0db4-490b-b421",
"timeStamp" : "1577191871865",
...
}
}
}
}
```

## Adding Protobuf messages to Kafka

If necessary, from your Kafka installation directory run the following command to create the Kafka topic
Expand Down
62 changes: 57 additions & 5 deletions extensions-core/protobuf-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@
<relativePath>../../pom.xml</relativePath>
</parent>

<properties>
<confluent.version>6.0.1</confluent.version>
<commons-io.version>2.6</commons-io.version>
</properties>

<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>org.apache.druid</groupId>
Expand All @@ -52,15 +64,48 @@
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<artifactId>protobuf-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>${confluent.version}</version>
<exclusions>
<exclusion>
<artifactId>jersey-common</artifactId>
<groupId>org.glassfish.jersey.core</groupId>
</exclusion>
<exclusion>
<artifactId>jakarta.ws.rs-api</artifactId>
<groupId>jakarta.ws.rs</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-provider</artifactId>
<version>${confluent.version}</version>
<exclusions>
<exclusion>
<artifactId>protobuf-java-util</artifactId>
<groupId>com.google.protobuf</groupId>
</exclusion>
</exclusions>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.github.os72</groupId>
<artifactId>protobuf-dynamic</artifactId>
<version>0.9.3</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
Expand All @@ -86,6 +131,12 @@
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>2.0.1</version>
<scope>provided</scope>
</dependency>

<!-- test -->
<dependency>
Expand All @@ -98,6 +149,11 @@
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand All @@ -109,10 +165,6 @@
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<relocations>
<relocation>
<pattern>com.google.protobuf</pattern>
<shadedPattern>shaded.com.google.protobuf</shadedPattern>
</relocation>
</relocations>
</configuration>
<executions>
Expand Down
Loading