Merged
4 changes: 4 additions & 0 deletions integration-tests/docker/Dockerfile
@@ -26,6 +26,7 @@ RUN APACHE_ARCHIVE_MIRROR_HOST=${APACHE_ARCHIVE_MIRROR_HOST} /root/base-setup.sh

FROM druidbase
ARG MYSQL_VERSION
ARG CONFLUENT_VERSION

# Verify Java version
RUN java -version
@@ -46,6 +47,9 @@ ADD lib/* /usr/local/druid/lib/
RUN wget -q "https://repo1.maven.org/maven2/mysql/mysql-connector-java/$MYSQL_VERSION/mysql-connector-java-$MYSQL_VERSION.jar" \
-O /usr/local/druid/lib/mysql-connector-java.jar

RUN wget -q "https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/$CONFLUENT_VERSION/kafka-protobuf-provider-$CONFLUENT_VERSION.jar" \
-O /usr/local/druid/lib/kafka-protobuf-provider.jar

Comment on lines +50 to +52
Member:
we should be able to avoid this by enabling the protobuf-extension which already includes this dependency

Member:
It doesn't actually include the dependency; there are licensing issues. That jar isn't solely Apache licensed but is dual-licensed under Apache 2.0 and the Confluent Community License, so people need to fetch it themselves, similar to MySQL. See #10839 (comment)

Member:
good point, I did not realize that. Maybe a comment would be worthwhile
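For instance, a comment along these lines above the download would capture the licensing constraint (the wording is a sketch, not from the PR):

# kafka-protobuf-provider is dual-licensed under Apache 2.0 and the Confluent
# Community License, so it cannot ship inside the protobuf extension; fetch it
# at image build time instead, the same way the MySQL connector is fetched above.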

# Add sample data
# touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72.
RUN find /var/lib/mysql -type f -exec touch {} \; && service mysql start \
Binary file added integration-tests/docker/test-data/wikipedia.desc
24 changes: 24 additions & 0 deletions integration-tests/pom.xml
@@ -120,6 +120,12 @@
<version>${project.parent.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-protobuf-extensions</artifactId>
<version>${project.parent.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-s3-extensions</artifactId>
@@ -366,6 +372,17 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-provider</artifactId>
<version>5.5.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.11.0</version>
</dependency>
Comment on lines +375 to +385
Member:
we can remove all this by enabling the protobuf extension, no?


<!-- Tests -->
<dependency>
@@ -387,6 +404,12 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.os72</groupId>
<artifactId>protobuf-dynamic</artifactId>
<version>0.9.3</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
@@ -470,6 +493,7 @@
<DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER>${docker.run.skip}</DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER>
<DRUID_INTEGRATION_TEST_INDEXER>${it.indexer}</DRUID_INTEGRATION_TEST_INDEXER>
<MYSQL_VERSION>${mysql.version}</MYSQL_VERSION>
<CONFLUENT_VERSION>5.5.1</CONFLUENT_VERSION>
Member:
it might make sense to pull this up all the way to the top-level pom.xml; then this file, the avro extension, and the protobuf extension would all share it, keeping the version synced instead of each defining its own. I do notice that protobuf is using a newer version than avro and here, so I would be OK saving this for a follow-up PR, since it wouldn't be directly related to adding this integration test.

Contributor Author:
Oh, 5.5.1 does support protobuf, and the avro extension uses 5.5.1 as well, while the protobuf extension uses 6.0.1. I think either version is OK, since there is no change to the core functionality between them. So which version is better?

Member:
probably best to align the Confluent version with the Kafka version we use since Confluent dependencies often require Kafka dependencies. We still need to exclude those Kafka dependencies, but keeping them close should avoid compatibility issues.

Member:
we already define a confluent version here https://github.com/apache/druid/blob/master/extensions-core/protobuf-extensions/pom.xml#L37 which is newer, so we should at least align that
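As a sketch of that suggestion (the property name and its placement are assumptions, not part of this PR), the top-level pom.xml could define a shared property:

<properties>
  <!-- hypothetical shared property; avro, protobuf, and the integration tests would all read it -->
  <confluent.version>5.5.1</confluent.version>
</properties>

and each module would then reference ${confluent.version} instead of hard-coding its own copy:

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-protobuf-provider</artifactId>
  <version>${confluent.version}</version>
  <scope>provided</scope>
</dependency>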

<KAFKA_VERSION>${apache.kafka.version}</KAFKA_VERSION>
<ZK_VERSION>${zk.version}</ZK_VERSION>
</environmentVariables>
1 change: 1 addition & 0 deletions integration-tests/script/copy_resources.sh
@@ -80,6 +80,7 @@ fi
mkdir -p $SHARED_DIR/wikiticker-it
cp ../examples/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz $SHARED_DIR/wikiticker-it/wikiticker-2015-09-12-sampled.json.gz
cp docker/wiki-simple-lookup.json $SHARED_DIR/wikiticker-it/wiki-simple-lookup.json
cp docker/test-data/wikipedia.desc $SHARED_DIR/wikiticker-it/wikipedia.desc

# copy other files if needed
if [ -n "$DRUID_INTEGRATION_TEST_RESOURCE_FILE_DIR_PATH" ]
6 changes: 3 additions & 3 deletions integration-tests/script/docker_build_containers.sh
@@ -22,17 +22,17 @@ set -e
if [ -z "$DRUID_INTEGRATION_TEST_JVM_RUNTIME" ]
then
echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is not set. Building druid-cluster with default Java version"
docker build -t druid/cluster --build-arg KAFKA_VERSION --build-arg MYSQL_VERSION $SHARED_DIR/docker
docker build -t druid/cluster --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION $SHARED_DIR/docker
else
echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is set with value ${DRUID_INTEGRATION_TEST_JVM_RUNTIME}"
case "${DRUID_INTEGRATION_TEST_JVM_RUNTIME}" in
8)
echo "Build druid-cluster with Java 8"
docker build -t druid/cluster --build-arg JDK_VERSION=8-slim --build-arg KAFKA_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
docker build -t druid/cluster --build-arg JDK_VERSION=8-slim --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
;;
11)
echo "Build druid-cluster with Java 11"
docker build -t druid/cluster --build-arg JDK_VERSION=11-slim --build-arg KAFKA_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
docker build -t druid/cluster --build-arg JDK_VERSION=11-slim --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
;;
*)
echo "Invalid JVM Runtime given. Stopping"
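Note that --build-arg CONFLUENT_VERSION is passed without a value, so Docker forwards it from the script's environment; the <CONFLUENT_VERSION> entry added to the integration-tests pom.xml is what populates that environment. A minimal illustration (the version value is the one used in this PR):

# --build-arg NAME with no "=value" forwards NAME from the caller's environment
CONFLUENT_VERSION=5.5.1 docker build -t druid/cluster --build-arg CONFLUENT_VERSION $SHARED_DIR/docker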
integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java
@@ -43,7 +43,9 @@
@Type(name = CsvEventSerializer.TYPE, value = CsvEventSerializer.class),
@Type(name = DelimitedEventSerializer.TYPE, value = DelimitedEventSerializer.class),
@Type(name = AvroEventSerializer.TYPE, value = AvroEventSerializer.class),
@Type(name = AvroSchemaRegistryEventSerializer.TYPE, value = AvroSchemaRegistryEventSerializer.class)
@Type(name = AvroSchemaRegistryEventSerializer.TYPE, value = AvroSchemaRegistryEventSerializer.class),
@Type(name = ProtobufEventSerializer.TYPE, value = ProtobufEventSerializer.class),
@Type(name = ProtobufSchemaRegistryEventSerializer.TYPE, value = ProtobufSchemaRegistryEventSerializer.class)
})
public interface EventSerializer extends Closeable
{
integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.testing.utils;

import com.github.os72.protobuf.dynamic.DynamicSchema;
import com.github.os72.protobuf.dynamic.MessageDefinition;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;

import java.util.List;

public class ProtobufEventSerializer implements EventSerializer
{
public static final String TYPE = "protobuf";

private static final Logger LOGGER = new Logger(ProtobufEventSerializer.class);

public static final DynamicSchema SCHEMA;

static {
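// Build the Wikipedia test schema dynamically with protobuf-dynamic, so the
// tests do not need a compiled .proto file on the classpath.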
DynamicSchema.Builder schemaBuilder = DynamicSchema.newBuilder();
MessageDefinition wikiDef = MessageDefinition.newBuilder("Wikipedia")
.addField("optional", "string", "timestamp", 1)
.addField("optional", "string", "page", 2)
.addField("optional", "string", "language", 3)
.addField("optional", "string", "user", 4)
.addField("optional", "string", "unpatrolled", 5)
.addField("optional", "string", "newPage", 6)
.addField("optional", "string", "robot", 7)
.addField("optional", "string", "anonymous", 8)
.addField("optional", "string", "namespace", 9)
.addField("optional", "string", "continent", 10)
.addField("optional", "string", "country", 11)
.addField("optional", "string", "region", 12)
.addField("optional", "string", "city", 13)
.addField("optional", "int32", "added", 14)
.addField("optional", "int32", "deleted", 15)
.addField("optional", "int32", "delta", 16)
.build();
schemaBuilder.addMessageDefinition(wikiDef);
DynamicSchema schema = null;
try {
schema = schemaBuilder.build();
}
catch (Descriptors.DescriptorValidationException e) {
LOGGER.error(e, "Could not initialize protobuf schema.");
}
SCHEMA = schema;
}

@Override
public byte[] serialize(List<Pair<String, Object>> event)
{
DynamicMessage.Builder builder = SCHEMA.newMessageBuilder("Wikipedia");
Descriptors.Descriptor msgDesc = builder.getDescriptorForType();
for (Pair<String, Object> pair : event) {
builder.setField(msgDesc.findFieldByName(pair.lhs), pair.rhs);
}
return builder.build().toByteArray();
}

@Override
public void close()
{
}
}
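For reference, a minimal usage sketch of this serializer (the field values are illustrative):

// Serialize a test event to protobuf bytes using the dynamic Wikipedia schema.
List<Pair<String, Object>> event = Arrays.asList(Pair.of("page", "Druid"), Pair.of("added", 57));
byte[] bytes = new ProtobufEventSerializer().serialize(event);
// The bytes parse back with the same descriptor:
DynamicMessage message = DynamicMessage.parseFrom(ProtobufEventSerializer.SCHEMA.getMessageDescriptor("Wikipedia"), bytes);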
integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.testing.utils;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.testing.IntegrationTestingConfig;

import java.nio.ByteBuffer;
import java.util.List;

public class ProtobufSchemaRegistryEventSerializer extends ProtobufEventSerializer
{
private static final int MAX_INITIALIZE_RETRIES = 10;
public static final String TYPE = "protobuf-schema-registry";

private final IntegrationTestingConfig config;
private final CachedSchemaRegistryClient client;
private int schemaId = -1;


@JsonCreator
public ProtobufSchemaRegistryEventSerializer(
@JacksonInject IntegrationTestingConfig config
)
{
this.config = config;
this.client = new CachedSchemaRegistryClient(
StringUtils.format("http://%s", config.getSchemaRegistryHost()),
Integer.MAX_VALUE,
ImmutableMap.of(
"basic.auth.credentials.source", "USER_INFO",
"basic.auth.user.info", "druid:diurd"
),
ImmutableMap.of()
);

}

@Override
public void initialize(String topic)
{
try {
RetryUtils.retry(
() -> {
schemaId = client.register(topic, new ProtobufSchema(ProtobufEventSerializer.SCHEMA.newMessageBuilder("Wikipedia").getDescriptorForType()));
return 0;
},
(e) -> true,
MAX_INITIALIZE_RETRIES
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public byte[] serialize(List<Pair<String, Object>> event)
{
DynamicMessage.Builder builder = SCHEMA.newMessageBuilder("Wikipedia");
Descriptors.Descriptor msgDesc = builder.getDescriptorForType();
for (Pair<String, Object> pair : event) {
builder.setField(msgDesc.findFieldByName(pair.lhs), pair.rhs);
}
byte[] bytes = builder.build().toByteArray();
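// Frame the payload in the Confluent wire format: magic byte 0, 4-byte schema id,
// then a message-index list (a single 0 byte selects the first message type).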
ByteBuffer bb = ByteBuffer.allocate(bytes.length + 6).put((byte) 0).putInt(schemaId).put((byte) 0).put(bytes);
bb.rewind();
return bb.array();
}
}
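A minimal sketch of reading that framing back on the consumer side ("payload" is a placeholder name; illustrative, not part of the PR):

ByteBuffer bb = ByteBuffer.wrap(payload);
byte magic = bb.get(); // always 0
int schemaId = bb.getInt(); // 4-byte big-endian registry schema id
byte msgIndex = bb.get(); // single 0 byte: first message type in the schema
byte[] protoBytes = new byte[bb.remaining()];
bb.get(protoBytes); // the protobuf-encoded Wikipedia message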
@@ -0,0 +1,12 @@
{
"type": "protobuf",
"protoBytesDecoder": {
"type": "file",
"descriptor": "file:///shared/wikiticker-it/wikipedia.desc",
"protoMessageType": "Wikipedia"
},
"flattenSpec": {
"useFieldDiscovery": true
},
"binaryAsString": false
}
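For context, a descriptor file like wikipedia.desc is generated with protoc from a .proto definition of the same Wikipedia message the serializer builds dynamically (the .proto file name here is an assumption):

# hypothetical command; assumes wikipedia.proto declares the Wikipedia message above
protoc --descriptor_set_out=wikipedia.desc wikipedia.proto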
@@ -0,0 +1,18 @@
{
"type": "protobuf",
"protoBytesDecoder": {
"type": "file",
"descriptor": "file:///shared/wikiticker-it/wikipedia.desc",
"protoMessageType": "Wikipedia"
},
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"]
}
}
}
@@ -0,0 +1,3 @@
{
"type": "protobuf"
}
@@ -0,0 +1,15 @@
{
"type": "protobuf",
"protoBytesDecoder": {
"type": "schema_registry",
"url": "%%SCHEMA_REGISTRY_HOST%%",
"config": {
"basic.auth.credentials.source": "USER_INFO",
"basic.auth.user.info": "druid:diurd"
}
},
"flattenSpec": {
"useFieldDiscovery": true
},
"binaryAsString": false
}
@@ -0,0 +1,21 @@
{
"type": "protobuf",
"protoBytesDecoder" : {
"type": "schema_registry",
"url": "%%SCHEMA_REGISTRY_HOST%%",
"config": {
"basic.auth.credentials.source": "USER_INFO",
"basic.auth.user.info": "druid:diurd"
}
},
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"]
}
}
}
@@ -0,0 +1,3 @@
{
"type": "protobuf-schema-registry"
}