diff --git a/integration-tests/docker/Dockerfile b/integration-tests/docker/Dockerfile
index f56ff7663683..b3af96bfa51c 100644
--- a/integration-tests/docker/Dockerfile
+++ b/integration-tests/docker/Dockerfile
@@ -26,6 +26,7 @@ RUN APACHE_ARCHIVE_MIRROR_HOST=${APACHE_ARCHIVE_MIRROR_HOST} /root/base-setup.sh
FROM druidbase
ARG MYSQL_VERSION
+ARG CONFLUENT_VERSION
# Verify Java version
RUN java -version
@@ -46,6 +47,9 @@ ADD lib/* /usr/local/druid/lib/
RUN wget -q "https://repo1.maven.org/maven2/mysql/mysql-connector-java/$MYSQL_VERSION/mysql-connector-java-$MYSQL_VERSION.jar" \
-O /usr/local/druid/lib/mysql-connector-java.jar
+RUN wget -q "https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/$CONFLUENT_VERSION/kafka-protobuf-provider-$CONFLUENT_VERSION.jar" \
+ -O /usr/local/druid/lib/kafka-protobuf-provider.jar
+
# Add sample data
# touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72.
RUN find /var/lib/mysql -type f -exec touch {} \; && service mysql start \
diff --git a/integration-tests/docker/test-data/wikipedia.desc b/integration-tests/docker/test-data/wikipedia.desc
new file mode 100644
index 000000000000..d1a95dfee390
Binary files /dev/null and b/integration-tests/docker/test-data/wikipedia.desc differ
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 8f3f0886dd23..edabb270fb81 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -120,6 +120,12 @@
             <version>${project.parent.version}</version>
             <scope>runtime</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.druid.extensions</groupId>
+            <artifactId>druid-protobuf-extensions</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>runtime</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.druid.extensions</groupId>
             <artifactId>druid-s3-extensions</artifactId>
@@ -366,6 +372,17 @@
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-protobuf-provider</artifactId>
+            <version>5.5.1</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>3.11.0</version>
+        </dependency>
@@ -387,6 +404,12 @@
             <artifactId>easymock</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.github.os72</groupId>
+            <artifactId>protobuf-dynamic</artifactId>
+            <version>0.9.3</version>
+            <scope>compile</scope>
+        </dependency>
@@ -470,6 +493,7 @@
${docker.run.skip}
${it.indexer}
${mysql.version}
+                        <CONFLUENT_VERSION>5.5.1</CONFLUENT_VERSION>
${apache.kafka.version}
${zk.version}
diff --git a/integration-tests/script/copy_resources.sh b/integration-tests/script/copy_resources.sh
index 6495dad44b1a..4dd1f0a00549 100755
--- a/integration-tests/script/copy_resources.sh
+++ b/integration-tests/script/copy_resources.sh
@@ -80,6 +80,7 @@ fi
mkdir -p $SHARED_DIR/wikiticker-it
cp ../examples/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz $SHARED_DIR/wikiticker-it/wikiticker-2015-09-12-sampled.json.gz
cp docker/wiki-simple-lookup.json $SHARED_DIR/wikiticker-it/wiki-simple-lookup.json
+cp docker/test-data/wikipedia.desc $SHARED_DIR/wikiticker-it/wikipedia.desc
# copy other files if needed
if [ -n "$DRUID_INTEGRATION_TEST_RESOURCE_FILE_DIR_PATH" ]
diff --git a/integration-tests/script/docker_build_containers.sh b/integration-tests/script/docker_build_containers.sh
index 6a3867af1355..0819e3a10f2d 100755
--- a/integration-tests/script/docker_build_containers.sh
+++ b/integration-tests/script/docker_build_containers.sh
@@ -22,17 +22,17 @@ set -e
if [ -z "$DRUID_INTEGRATION_TEST_JVM_RUNTIME" ]
then
echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is not set. Building druid-cluster with default Java version"
- docker build -t druid/cluster --build-arg KAFKA_VERSION --build-arg MYSQL_VERSION $SHARED_DIR/docker
+ docker build -t druid/cluster --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION $SHARED_DIR/docker
else
echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is set with value ${DRUID_INTEGRATION_TEST_JVM_RUNTIME}"
case "${DRUID_INTEGRATION_TEST_JVM_RUNTIME}" in
8)
echo "Build druid-cluster with Java 8"
- docker build -t druid/cluster --build-arg JDK_VERSION=8-slim --build-arg KAFKA_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
+ docker build -t druid/cluster --build-arg JDK_VERSION=8-slim --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
;;
11)
echo "Build druid-cluster with Java 11"
- docker build -t druid/cluster --build-arg JDK_VERSION=11-slim --build-arg KAFKA_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
+ docker build -t druid/cluster --build-arg JDK_VERSION=11-slim --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
;;
*)
echo "Invalid JVM Runtime given. Stopping"
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java
index cad5acf79e68..d7de555a8c20 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java
@@ -43,7 +43,9 @@
@Type(name = CsvEventSerializer.TYPE, value = CsvEventSerializer.class),
@Type(name = DelimitedEventSerializer.TYPE, value = DelimitedEventSerializer.class),
@Type(name = AvroEventSerializer.TYPE, value = AvroEventSerializer.class),
- @Type(name = AvroSchemaRegistryEventSerializer.TYPE, value = AvroSchemaRegistryEventSerializer.class)
+ @Type(name = AvroSchemaRegistryEventSerializer.TYPE, value = AvroSchemaRegistryEventSerializer.class),
+ @Type(name = ProtobufEventSerializer.TYPE, value = ProtobufEventSerializer.class),
+ @Type(name = ProtobufSchemaRegistryEventSerializer.TYPE, value = ProtobufSchemaRegistryEventSerializer.class)
})
public interface EventSerializer extends Closeable
{
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java
new file mode 100644
index 000000000000..cc9b779efd1a
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.github.os72.protobuf.dynamic.DynamicSchema;
+import com.github.os72.protobuf.dynamic.MessageDefinition;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.util.List;
+
+public class ProtobufEventSerializer implements EventSerializer
+{
+ public static final String TYPE = "protobuf";
+
+ private static final Logger LOGGER = new Logger(ProtobufEventSerializer.class);
+
+ public static final DynamicSchema SCHEMA;
+
+ static {
+ DynamicSchema.Builder schemaBuilder = DynamicSchema.newBuilder();
+ MessageDefinition wikiDef = MessageDefinition.newBuilder("Wikipedia")
+ .addField("optional", "string", "timestamp", 1)
+ .addField("optional", "string", "page", 2)
+ .addField("optional", "string", "language", 3)
+ .addField("optional", "string", "user", 4)
+ .addField("optional", "string", "unpatrolled", 5)
+ .addField("optional", "string", "newPage", 6)
+ .addField("optional", "string", "robot", 7)
+ .addField("optional", "string", "anonymous", 8)
+ .addField("optional", "string", "namespace", 9)
+ .addField("optional", "string", "continent", 10)
+ .addField("optional", "string", "country", 11)
+ .addField("optional", "string", "region", 12)
+ .addField("optional", "string", "city", 13)
+ .addField("optional", "int32", "added", 14)
+ .addField("optional", "int32", "deleted", 15)
+ .addField("optional", "int32", "delta", 16)
+ .build();
+ schemaBuilder.addMessageDefinition(wikiDef);
+ DynamicSchema schema = null;
+ try {
+ schema = schemaBuilder.build();
+ }
+ catch (Descriptors.DescriptorValidationException e) {
+ LOGGER.error("Could not init protobuf schema.");
+ }
+ SCHEMA = schema;
+ }
+
+ @Override
+  public byte[] serialize(List<Pair<String, Object>> event)
+ {
+ DynamicMessage.Builder builder = SCHEMA.newMessageBuilder("Wikipedia");
+ Descriptors.Descriptor msgDesc = builder.getDescriptorForType();
+    for (Pair<String, Object> pair : event) {
+ builder.setField(msgDesc.findFieldByName(pair.lhs), pair.rhs);
+ }
+ return builder.build().toByteArray();
+ }
+
+ @Override
+ public void close()
+ {
+ }
+}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java
new file mode 100644
index 000000000000..8d80b22af35d
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testing.IntegrationTestingConfig;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class ProtobufSchemaRegistryEventSerializer extends ProtobufEventSerializer
+{
+ private static final int MAX_INITIALIZE_RETRIES = 10;
+ public static final String TYPE = "protobuf-schema-registry";
+
+ private final IntegrationTestingConfig config;
+ private final CachedSchemaRegistryClient client;
+ private int schemaId = -1;
+
+
+ @JsonCreator
+ public ProtobufSchemaRegistryEventSerializer(
+ @JacksonInject IntegrationTestingConfig config
+ )
+ {
+ this.config = config;
+ this.client = new CachedSchemaRegistryClient(
+ StringUtils.format("http://%s", config.getSchemaRegistryHost()),
+ Integer.MAX_VALUE,
+ ImmutableMap.of(
+ "basic.auth.credentials.source", "USER_INFO",
+ "basic.auth.user.info", "druid:diurd"
+ ),
+ ImmutableMap.of()
+ );
+
+ }
+
+ @Override
+ public void initialize(String topic)
+ {
+ try {
+ RetryUtils.retry(
+ () -> {
+ schemaId = client.register(topic, new ProtobufSchema(ProtobufEventSerializer.SCHEMA.newMessageBuilder("Wikipedia").getDescriptorForType()));
+ return 0;
+ },
+ (e) -> true,
+ MAX_INITIALIZE_RETRIES
+ );
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+  public byte[] serialize(List<Pair<String, Object>> event)
+ {
+ DynamicMessage.Builder builder = SCHEMA.newMessageBuilder("Wikipedia");
+ Descriptors.Descriptor msgDesc = builder.getDescriptorForType();
+    for (Pair<String, Object> pair : event) {
+ builder.setField(msgDesc.findFieldByName(pair.lhs), pair.rhs);
+ }
+ byte[] bytes = builder.build().toByteArray();
+ ByteBuffer bb = ByteBuffer.allocate(bytes.length + 6).put((byte) 0).putInt(schemaId).put((byte) 0).put(bytes);
+ bb.rewind();
+ return bb.array();
+ }
+}
diff --git a/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json
new file mode 100644
index 000000000000..17a91426e53c
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json
@@ -0,0 +1,12 @@
+{
+ "type": "protobuf",
+ "protoBytesDecoder": {
+ "type": "file",
+ "descriptor": "file:///shared/wikiticker-it/wikipedia.desc",
+ "protoMessageType": "Wikipedia"
+ },
+ "flattenSpec": {
+ "useFieldDiscovery": true
+ },
+ "binaryAsString": false
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json
new file mode 100644
index 000000000000..e55784de05b3
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json
@@ -0,0 +1,18 @@
+{
+ "type": "protobuf",
+ "protoBytesDecoder": {
+ "type": "file",
+ "descriptor": "file:///shared/wikiticker-it/wikipedia.desc",
+ "protoMessageType": "Wikipedia"
+ },
+ "parseSpec": {
+ "format": "json",
+ "timestampSpec": {
+ "column": "timestamp",
+ "format": "auto"
+ },
+ "dimensionsSpec": {
+ "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"]
+ }
+ }
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/protobuf/serializer/serializer.json b/integration-tests/src/test/resources/stream/data/protobuf/serializer/serializer.json
new file mode 100644
index 000000000000..12d8e576f091
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/protobuf/serializer/serializer.json
@@ -0,0 +1,3 @@
+{
+ "type": "protobuf"
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/input_format/input_format.json
new file mode 100644
index 000000000000..a0e599329e82
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/input_format/input_format.json
@@ -0,0 +1,15 @@
+{
+ "type": "protobuf",
+ "protoBytesDecoder": {
+ "type": "schema_registry",
+ "url": "%%SCHEMA_REGISTRY_HOST%%",
+ "config": {
+ "basic.auth.credentials.source": "USER_INFO",
+ "basic.auth.user.info": "druid:diurd"
+ }
+ },
+ "flattenSpec": {
+ "useFieldDiscovery": true
+ },
+ "binaryAsString": false
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/parser/input_row_parser.json
new file mode 100644
index 000000000000..2db694754f79
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/parser/input_row_parser.json
@@ -0,0 +1,21 @@
+{
+ "type": "protobuf",
+ "protoBytesDecoder" : {
+ "type": "schema_registry",
+ "url": "%%SCHEMA_REGISTRY_HOST%%",
+ "config": {
+ "basic.auth.credentials.source": "USER_INFO",
+ "basic.auth.user.info": "druid:diurd"
+ }
+ },
+ "parseSpec": {
+ "format": "json",
+ "timestampSpec": {
+ "column": "timestamp",
+ "format": "auto"
+ },
+ "dimensionsSpec": {
+ "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"]
+ }
+ }
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/serializer/serializer.json b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/serializer/serializer.json
new file mode 100644
index 000000000000..9f1ed6a274f8
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/serializer/serializer.json
@@ -0,0 +1,3 @@
+{
+ "type": "protobuf-schema-registry"
+}
\ No newline at end of file