From c273a3f55e6915dcb8a8febeb8e079bd6872bf2c Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 17 Apr 2021 13:27:53 +0800 Subject: [PATCH 01/31] add file test --- integration-tests/pom.xml | 6 ++ .../testing/ProtobufEventSerializer.java | 86 ++++++++++++++++++ .../druid/testing/utils/EventSerializer.java | 4 +- .../protobuf/parser/input_row_parser.json | 17 ++++ .../stream/data/protobuf/parser/test.desc | Bin 0 -> 727 bytes .../data/protobuf/serializer/serializer.json | 3 + 6 files changed, 115 insertions(+), 1 deletion(-) create mode 100644 integration-tests/src/main/java/org/apache/druid/testing/ProtobufEventSerializer.java create mode 100644 integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json create mode 100644 integration-tests/src/test/resources/stream/data/protobuf/parser/test.desc create mode 100644 integration-tests/src/test/resources/stream/data/protobuf/serializer/serializer.json diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 2fb9adc239e9..e3d4767e1e0e 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -383,6 +383,12 @@ easymock test + + com.github.os72 + protobuf-dynamic + 0.9.3 + compile + diff --git a/integration-tests/src/main/java/org/apache/druid/testing/ProtobufEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/ProtobufEventSerializer.java new file mode 100644 index 000000000000..8765d85b283d --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/testing/ProtobufEventSerializer.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing; + +import com.github.os72.protobuf.dynamic.DynamicSchema; +import com.github.os72.protobuf.dynamic.MessageDefinition; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.testing.utils.EventSerializer; + +import java.io.IOException; +import java.util.List; + +public class ProtobufEventSerializer implements EventSerializer +{ + public static final String TYPE = "protobuf"; + + private static final Logger LOGGER = new Logger(ProtobufEventSerializer.class); + + public static final DynamicMessage.Builder MSG_BUILDER; + + static { + DynamicSchema.Builder schemaBuilder = DynamicSchema.newBuilder(); + MessageDefinition wikiDef = MessageDefinition.newBuilder("Wikipedia") + .addField("optional", "string", "timestamp", 1) + .addField("optional", "string", "page", 2) + .addField("optional", "string", "language", 3) + .addField("optional", "string", "user", 4) + .addField("optional", "string", "unpatrolled", 5) + .addField("optional", "string", "newPage", 6) + .addField("optional", "string", "robot", 7) + .addField("optional", "string", "anonymous", 8) + .addField("optional", "string", "namespace", 9) + .addField("optional", "string", "continent", 10) + .addField("optional", "string", "country", 11) + .addField("optional", "string", "region", 12) + .addField("optional", "string", "city", 13) + .addField("optional", "string", "added", 14) + .addField("optional", "string", "deleted", 15) + .addField("optional", "string", "delta", 16) + .build(); + schemaBuilder.addMessageDefinition(wikiDef); + DynamicSchema schema = null; + try { + schema = schemaBuilder.build(); + } + catch (Descriptors.DescriptorValidationException e) { + LOGGER.error("Could not init protobuf builder."); + } + MSG_BUILDER = schema.newMessageBuilder("wikipedia"); + } + + @Override + public byte[] serialize(List> event) throws IOException + { + Descriptors.Descriptor msgDesc = MSG_BUILDER.getDescriptorForType(); + for (Pair pair : event) { + MSG_BUILDER.setField(msgDesc.findFieldByName(pair.lhs), pair.rhs); + } + return MSG_BUILDER.build().toByteArray(); + } + + @Override + public void close() + { + } +} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java index cad5acf79e68..4b46a2228899 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.testing.ProtobufEventSerializer; import java.io.Closeable; import java.io.IOException; @@ -43,7 +44,8 @@ @Type(name = CsvEventSerializer.TYPE, value = CsvEventSerializer.class), @Type(name = DelimitedEventSerializer.TYPE, value = DelimitedEventSerializer.class), @Type(name = AvroEventSerializer.TYPE, value = AvroEventSerializer.class), - @Type(name = AvroSchemaRegistryEventSerializer.TYPE, value = AvroSchemaRegistryEventSerializer.class) + @Type(name = AvroSchemaRegistryEventSerializer.TYPE, value = AvroSchemaRegistryEventSerializer.class), + @Type(name = ProtobufEventSerializer.TYPE, value = ProtobufEventSerializer.class) }) public interface EventSerializer extends Closeable { diff --git a/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json new file mode 100644 index 000000000000..d68cea57e8ea --- /dev/null +++ b/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json @@ -0,0 +1,17 @@ +{ + "type": "protobuf", + "protoBytesDecoder": { + "descriptor": "%%DESC_FILE%%", + "protoMessageType": "Wikipedia" + }, + "parseSpec": { + "format": "json", + "timestampSpec": { + "column": "timestamp", + "format": "auto" + }, + "dimensionsSpec": { + "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"] + } + } +} \ No newline at end of file diff --git a/integration-tests/src/test/resources/stream/data/protobuf/parser/test.desc b/integration-tests/src/test/resources/stream/data/protobuf/parser/test.desc new file mode 100644 index 0000000000000000000000000000000000000000..3c7c6f53c93444ae4cccd62000a0a00e808123d9 GIT binary patch literal 727 zcmYk4O-{ow5QPcNpQnGQ8;lU*M?zx7h8twX63vEXhqG`XJX1IA zDthmGb~0}Qe+huoJNMf4*4dTd2N_Sl35d_;tLb#zsMR9~*KED7ww;U~q8IUgfhX%H zidNSUcWR^MCDRC55x9rERc12~oG`q^(FrGqo$h7KaDp)@0?#lVOs8CL+g8^yVIJeT zNwGV~jQ;+h`%{i2cr*y#K+@Z_buwd^;z1JfV<}_Jert!F%o%5RvO>ftKY6DrEd@t% z6r&X)l{Kz0+BgXu5hzBBBK{LAI~dpR<&*<}M}zP!q`lrW*2o#tDF#i@7h5%MFXxPA z$clg;Ox3k7X2E2R2N~X1>sC7tmkbw}1tXmF$|-rpaES>^kBS*UIj*|MASr39#S}z& z+#LF#=R;40fUG=}!tbLB_)Z>~lVKos L`TN%?|G#hl>O;d$ literal 0 HcmV?d00001 diff --git a/integration-tests/src/test/resources/stream/data/protobuf/serializer/serializer.json b/integration-tests/src/test/resources/stream/data/protobuf/serializer/serializer.json new file mode 100644 index 000000000000..12d8e576f091 --- /dev/null +++ b/integration-tests/src/test/resources/stream/data/protobuf/serializer/serializer.json @@ -0,0 +1,3 @@ +{ + "type": "protobuf" +} \ No newline at end of file From c53cf8388e76acae32f109f106bb4190a4e412c1 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 17 Apr 2021 13:56:44 +0800 Subject: [PATCH 02/31] test --- .../apache/druid/tests/indexer/AbstractStreamIndexingTest.java | 3 +++ .../stream/data/protobuf/parser/input_row_parser.json | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java index 3c9ecda273f8..95f3457bcd98 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java @@ -77,6 +77,8 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest private static final String QUERIES_FILE = "/stream/queries/stream_index_queries.json"; private static final String SUPERVISOR_SPEC_TEMPLATE_FILE = "supervisor_spec_template.json"; private static final String SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE = "supervisor_with_autoscaler_spec_template.json"; + private static final String SUPERVISOR_PROTOBUF_FILE = "test.desc"; + protected static final String DATA_RESOURCE_ROOT = "/stream/data"; protected static final String SUPERVISOR_SPEC_TEMPLATE_PATH = @@ -142,6 +144,7 @@ protected static List listDataFormatResources() throws IOException .stream() .filter(resource -> !SUPERVISOR_SPEC_TEMPLATE_FILE.equals(resource)) .filter(resource -> !SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE.equals(resource)) + .filter(resource -> !SUPERVISOR_PROTOBUF_FILE.equals(resource)) .collect(Collectors.toList()); } diff --git a/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json index d68cea57e8ea..b3f242cf8996 100644 --- a/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json +++ b/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json @@ -1,7 +1,7 @@ { "type": "protobuf", "protoBytesDecoder": { - "descriptor": "%%DESC_FILE%%", + "descriptor": "/stream/data/test.desc", "protoMessageType": "Wikipedia" }, "parseSpec": { From 9836ad2363fa3f44c6a478d80b40df75db4d95ef Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 17 Apr 2021 14:01:52 +0800 Subject: [PATCH 03/31] for test --- .../stream/data/{protobuf/parser => }/test.desc | Bin 1 file changed, 0 insertions(+), 0 deletions(-) rename integration-tests/src/test/resources/stream/data/{protobuf/parser => }/test.desc (100%) diff --git a/integration-tests/src/test/resources/stream/data/protobuf/parser/test.desc b/integration-tests/src/test/resources/stream/data/test.desc similarity index 100% rename from integration-tests/src/test/resources/stream/data/protobuf/parser/test.desc rename to integration-tests/src/test/resources/stream/data/test.desc From c05f098efa55aff36059240749338a87f7e020e2 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 17 Apr 2021 14:23:23 +0800 Subject: [PATCH 04/31] bug fixed --- .../java/org/apache/druid/testing/ProtobufEventSerializer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/ProtobufEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/ProtobufEventSerializer.java index 8765d85b283d..47f42f095f8a 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/ProtobufEventSerializer.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/ProtobufEventSerializer.java @@ -66,7 +66,7 @@ public class ProtobufEventSerializer implements EventSerializer catch (Descriptors.DescriptorValidationException e) { LOGGER.error("Could not init protobuf builder."); } - MSG_BUILDER = schema.newMessageBuilder("wikipedia"); + MSG_BUILDER = schema.newMessageBuilder("Wikipedia"); } @Override From 091c28996bf91ac0692a2324dc27c7b042844520 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 17 Apr 2021 14:34:22 +0800 Subject: [PATCH 05/31] test --- .../java/org/apache/druid/testing/ProtobufEventSerializer.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/ProtobufEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/ProtobufEventSerializer.java index 47f42f095f8a..374f3059b651 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/ProtobufEventSerializer.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/ProtobufEventSerializer.java @@ -74,6 +74,8 @@ public byte[] serialize(List> event) throws IOException { Descriptors.Descriptor msgDesc = MSG_BUILDER.getDescriptorForType(); for (Pair pair : event) { + LOGGER.error(pair.lhs); + LOGGER.error(pair.rhs.toString()); MSG_BUILDER.setField(msgDesc.findFieldByName(pair.lhs), pair.rhs); } return MSG_BUILDER.build().toByteArray(); From 9dcd6162ebdf43675b2ea79bf629811d808ddc2a Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 17 Apr 2021 14:39:21 +0800 Subject: [PATCH 06/31] test --- .../org/apache/druid/testing/ProtobufEventSerializer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/ProtobufEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/ProtobufEventSerializer.java index 374f3059b651..e3f6eaa0ab29 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/ProtobufEventSerializer.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/ProtobufEventSerializer.java @@ -54,9 +54,9 @@ public class ProtobufEventSerializer implements EventSerializer .addField("optional", "string", "country", 11) .addField("optional", "string", "region", 12) .addField("optional", "string", "city", 13) - .addField("optional", "string", "added", 14) - .addField("optional", "string", "deleted", 15) - .addField("optional", "string", "delta", 16) + .addField("optional", "int32", "added", 14) + .addField("optional", "int32", "deleted", 15) + .addField("optional", "int32", "delta", 16) .build(); schemaBuilder.addMessageDefinition(wikiDef); DynamicSchema schema = null; From 9d336451b50007cb133399a710807259cc5558d2 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 17 Apr 2021 14:48:37 +0800 Subject: [PATCH 07/31] test --- .../src/test/resources/stream/data/test.desc | Bin 727 -> 727 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/integration-tests/src/test/resources/stream/data/test.desc b/integration-tests/src/test/resources/stream/data/test.desc index 3c7c6f53c93444ae4cccd62000a0a00e808123d9..d1a95dfee390772369f3c9f5673ad27022deed9c 100644 GIT binary patch delta 25 ccmcc4dYyH{X+}oY$!8d4faFUs$;NaM0C58c$N&HU delta 25 ccmcc4dYyH{X+}oQ$!8d4faFUs$;NaM0C9&0)Bpeg From ec4e2b37efcc405e080c970e8fbd8bb15e7ea9c5 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 17 Apr 2021 15:03:40 +0800 Subject: [PATCH 08/31] bug fixed --- .../java/org/apache/druid/testing/ProtobufEventSerializer.java | 2 -- .../resources/stream/data/protobuf/parser/input_row_parser.json | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/ProtobufEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/ProtobufEventSerializer.java index e3f6eaa0ab29..033d10a0ecc1 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/ProtobufEventSerializer.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/ProtobufEventSerializer.java @@ -74,8 +74,6 @@ public byte[] serialize(List> event) throws IOException { Descriptors.Descriptor msgDesc = MSG_BUILDER.getDescriptorForType(); for (Pair pair : event) { - LOGGER.error(pair.lhs); - LOGGER.error(pair.rhs.toString()); MSG_BUILDER.setField(msgDesc.findFieldByName(pair.lhs), pair.rhs); } return MSG_BUILDER.build().toByteArray(); diff --git a/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json index b3f242cf8996..580d39a7cd7a 100644 --- a/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json +++ b/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json @@ -1,6 +1,7 @@ { "type": "protobuf", "protoBytesDecoder": { + "type": "file", "descriptor": "/stream/data/test.desc", "protoMessageType": "Wikipedia" }, From 9b28568b1eaf7483f598a853180d90dda7c5604e Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 17 Apr 2021 16:29:12 +0800 Subject: [PATCH 09/31] delete auto scaler --- ...ndexingServiceNonTransactionalParallelizedTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java index 967ff524b2ea..5122c931a2a1 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java @@ -56,11 +56,11 @@ public void testKafkaIndexDataWithStartStopSupervisor() throws Exception * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource * and supervisor maintained and scoped within this test only */ - @Test - public void testKafkaIndexDataWithWithAutoscaler() throws Exception - { - doTestIndexDataWithAutoscaler(false); - } +// @Test +// public void testKafkaIndexDataWithWithAutoscaler() throws Exception +// { +// doTestIndexDataWithAutoscaler(false); +// } /** * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource From 3f7c10bffb617651fc4c04d6520bbe9b3117a25c Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 17 Apr 2021 16:36:03 +0800 Subject: [PATCH 10/31] add input format --- ...exingServiceNonTransactionalParallelizedTest.java | 10 +++++----- .../data/protobuf/input_format/input_format.json | 12 ++++++++++++ 2 files changed, 17 insertions(+), 5 deletions(-) create mode 100644 integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java index 5122c931a2a1..967ff524b2ea 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java @@ -56,11 +56,11 @@ public void testKafkaIndexDataWithStartStopSupervisor() throws Exception * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource * and supervisor maintained and scoped within this test only */ -// @Test -// public void testKafkaIndexDataWithWithAutoscaler() throws Exception -// { -// doTestIndexDataWithAutoscaler(false); -// } + @Test + public void testKafkaIndexDataWithWithAutoscaler() throws Exception + { + doTestIndexDataWithAutoscaler(false); + } /** * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource diff --git a/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json new file mode 100644 index 000000000000..d64faa820d3a --- /dev/null +++ b/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json @@ -0,0 +1,12 @@ +{ + "type": "protobuf", + "protoBytesDecoder": { + "type": "file", + "descriptor": "/stream/data/test.desc", + "protoMessageType": "Wikipedia" + }, + "flattenSpec": { + "useFieldDiscovery": true + }, + "binaryAsString": false +} \ No newline at end of file From 5b45b6caaf779107b91ea46a585b3b058cc2df78 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 17 Apr 2021 16:58:30 +0800 Subject: [PATCH 11/31] add extensions --- integration-tests/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index e3d4767e1e0e..cf48bd26ed32 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -116,6 +116,12 @@ ${project.parent.version} runtime + + org.apache.druid.extensions + druid-protobuf-extensions + ${project.parent.version} + runtime + org.apache.druid.extensions druid-s3-extensions From 5ee72ec7fbf5ed50068d77146c58f03bbd301d12 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 17 Apr 2021 17:20:24 +0800 Subject: [PATCH 12/31] bug fixed --- .../tests/indexer/AbstractKafkaIndexingServiceTest.java | 6 ++++++ .../druid/tests/indexer/AbstractStreamIndexingTest.java | 4 ++++ .../stream/data/protobuf/parser/input_row_parser.json | 2 +- 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java index 5ea11e6992cd..427985f47341 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java @@ -118,6 +118,12 @@ Function generateStreamIngestionPropsTransform( "consumerProperties" ); + spec = StringUtils.replace( + spec, + "%%PROTOBUF_DESC_FILE%%", + PROTOBUF_DESC_FILE_PATH + ); + spec = StringUtils.replace( spec, "%%SCHEMA_REGISTRY_HOST%%", diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java index 95f3457bcd98..4d12e426a9c5 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java @@ -94,6 +94,9 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest protected static final String INPUT_FORMAT = "inputFormat"; protected static final String INPUT_ROW_PARSER = "parser"; + protected static String PROTOBUF_DESC_FILE_PATH = + String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_PROTOBUF_FILE); + private static final String JSON_INPUT_FORMAT_PATH = String.join("/", DATA_RESOURCE_ROOT, "json", INPUT_FORMAT_SPEC_DIR, "input_format.json"); @@ -154,6 +157,7 @@ protected static List listDataFormatResources() throws IOException */ protected static Map findTestSpecs(String resourceRoot) throws IOException { + PROTOBUF_DESC_FILE_PATH=PROTOBUF_DESC_FILE_PATH+resourceRoot; final List specDirs = listResources(resourceRoot); final Map map = new HashMap<>(); for (String eachSpec : specDirs) { diff --git a/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json index 580d39a7cd7a..852a81c924c3 100644 --- a/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json +++ b/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json @@ -2,7 +2,7 @@ "type": "protobuf", "protoBytesDecoder": { "type": "file", - "descriptor": "/stream/data/test.desc", + "descriptor": "%%PROTOBUF_DESC_FILE%%", "protoMessageType": "Wikipedia" }, "parseSpec": { From a79dbf6b21ceb31988401fe0428e01952e6d8b0b Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 17 Apr 2021 17:26:37 +0800 Subject: [PATCH 13/31] bug fixed --- .../apache/druid/tests/indexer/AbstractStreamIndexingTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java index 4d12e426a9c5..93cfe5f5f9c6 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java @@ -157,7 +157,7 @@ protected static List listDataFormatResources() throws IOException */ protected static Map findTestSpecs(String resourceRoot) throws IOException { - PROTOBUF_DESC_FILE_PATH=PROTOBUF_DESC_FILE_PATH+resourceRoot; + PROTOBUF_DESC_FILE_PATH=String.join("/", resourceRoot, PROTOBUF_DESC_FILE_PATH); final List specDirs = listResources(resourceRoot); final Map map = new HashMap<>(); for (String eachSpec : specDirs) { From 61cd476a08520b8423437de8ca2fabe232ca10ef Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 17 Apr 2021 17:29:21 +0800 Subject: [PATCH 14/31] bug fixed --- .../stream/data/protobuf/input_format/input_format.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json index d64faa820d3a..584c20d63345 100644 --- a/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json +++ b/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json @@ -2,7 +2,7 @@ "type": "protobuf", "protoBytesDecoder": { "type": "file", - "descriptor": "/stream/data/test.desc", + "descriptor": "%%PROTOBUF_DESC_FILE%%", "protoMessageType": "Wikipedia" }, "flattenSpec": { From ed624242fac094cd1c54b3d35376b8c82d7bb8bc Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 17 Apr 2021 19:27:27 +0800 Subject: [PATCH 15/31] revert --- .../tests/indexer/AbstractKafkaIndexingServiceTest.java | 7 ------- .../druid/tests/indexer/AbstractStreamIndexingTest.java | 4 ---- 2 files changed, 11 deletions(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java index 427985f47341..54a234da8902 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java @@ -117,13 +117,6 @@ Function generateStreamIngestionPropsTransform( "%%STREAM_PROPERTIES_KEY%%", "consumerProperties" ); - - spec = StringUtils.replace( - spec, - "%%PROTOBUF_DESC_FILE%%", - PROTOBUF_DESC_FILE_PATH - ); - spec = StringUtils.replace( spec, "%%SCHEMA_REGISTRY_HOST%%", diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java index 93cfe5f5f9c6..95f3457bcd98 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java @@ -94,9 +94,6 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest protected static final String INPUT_FORMAT = "inputFormat"; protected static final String INPUT_ROW_PARSER = "parser"; - protected static String PROTOBUF_DESC_FILE_PATH = - String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_PROTOBUF_FILE); - private static final String JSON_INPUT_FORMAT_PATH = String.join("/", DATA_RESOURCE_ROOT, "json", INPUT_FORMAT_SPEC_DIR, "input_format.json"); @@ -157,7 +154,6 @@ protected static List listDataFormatResources() throws IOException */ protected static Map findTestSpecs(String resourceRoot) throws IOException { - PROTOBUF_DESC_FILE_PATH=String.join("/", resourceRoot, PROTOBUF_DESC_FILE_PATH); final List specDirs = listResources(resourceRoot); final Map map = new HashMap<>(); for (String eachSpec : specDirs) { From 2d6da267c47095ce77e063b736fbfa7f97eef970 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 17 Apr 2021 19:52:04 +0800 Subject: [PATCH 16/31] add schema registry test --- integration-tests/pom.xml | 5 + .../druid/testing/utils/EventSerializer.java | 4 +- .../{ => utils}/ProtobufEventSerializer.java | 3 +- ...ProtobufSchemaRegistryEventSerializer.java | 96 +++++++++++++++++++ .../input_format/input_format.json | 15 +++ .../parser/input_row_parser.json | 21 ++++ .../serializer/serializer.json | 3 + 7 files changed, 143 insertions(+), 4 deletions(-) rename integration-tests/src/main/java/org/apache/druid/testing/{ => utils}/ProtobufEventSerializer.java (97%) create mode 100644 integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java create mode 100644 integration-tests/src/test/resources/stream/data/protobuf_schema_registry/input_format/input_format.json create mode 100644 integration-tests/src/test/resources/stream/data/protobuf_schema_registry/parser/input_row_parser.json create mode 100644 integration-tests/src/test/resources/stream/data/protobuf_schema_registry/serializer/serializer.json diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index cf48bd26ed32..4b3d11e508e1 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -368,6 +368,11 @@ + + io.confluent + kafka-protobuf-provider + 5.5.1 + diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java index 4b46a2228899..d7de555a8c20 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; import org.apache.druid.java.util.common.Pair; -import org.apache.druid.testing.ProtobufEventSerializer; import java.io.Closeable; import java.io.IOException; @@ -45,7 +44,8 @@ @Type(name = DelimitedEventSerializer.TYPE, value = DelimitedEventSerializer.class), @Type(name = AvroEventSerializer.TYPE, value = AvroEventSerializer.class), @Type(name = AvroSchemaRegistryEventSerializer.TYPE, value = AvroSchemaRegistryEventSerializer.class), - @Type(name = ProtobufEventSerializer.TYPE, value = ProtobufEventSerializer.class) + @Type(name = ProtobufEventSerializer.TYPE, value = ProtobufEventSerializer.class), + @Type(name = ProtobufSchemaRegistryEventSerializer.TYPE, value = ProtobufSchemaRegistryEventSerializer.class) }) public interface EventSerializer extends Closeable { diff --git a/integration-tests/src/main/java/org/apache/druid/testing/ProtobufEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java similarity index 97% rename from integration-tests/src/main/java/org/apache/druid/testing/ProtobufEventSerializer.java rename to integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java index 033d10a0ecc1..f4757ef84c5d 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/ProtobufEventSerializer.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.testing; +package org.apache.druid.testing.utils; import com.github.os72.protobuf.dynamic.DynamicSchema; import com.github.os72.protobuf.dynamic.MessageDefinition; @@ -25,7 +25,6 @@ import com.google.protobuf.DynamicMessage; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.testing.utils.EventSerializer; import java.io.IOException; import java.util.List; diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java new file mode 100644 index 000000000000..2c508312b488 --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.utils; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Descriptors; +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.RetryUtils; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testing.IntegrationTestingConfig; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +public class ProtobufSchemaRegistryEventSerializer extends ProtobufEventSerializer +{ + private static final int MAX_INITIALIZE_RETRIES = 10; + public static final String TYPE = "protobuf-schema-registry"; + + private final IntegrationTestingConfig config; + private final CachedSchemaRegistryClient client; + private int schemaId = -1; + + + @JsonCreator + public ProtobufSchemaRegistryEventSerializer( + @JacksonInject IntegrationTestingConfig config + ) + { + this.config = config; + this.client = new CachedSchemaRegistryClient( + StringUtils.format("http://%s", config.getSchemaRegistryHost()), + Integer.MAX_VALUE, + ImmutableMap.of( + "basic.auth.credentials.source", "USER_INFO", + "basic.auth.user.info", "druid:diurd" + ), + ImmutableMap.of() + ); + + } + + @Override + public void initialize(String topic) + { + try { + RetryUtils.retry( + () -> { + schemaId = client.register(topic, new ProtobufSchema(ProtobufEventSerializer.MSG_BUILDER.getDescriptorForType())); + fromRegistry = client.getById(schemaId); + return 0; + }, + (e) -> true, + MAX_INITIALIZE_RETRIES + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public byte[] serialize(List> event) throws IOException + { + Descriptors.Descriptor msgDesc = MSG_BUILDER.getDescriptorForType(); + for (Pair pair : event) { + MSG_BUILDER.setField(msgDesc.findFieldByName(pair.lhs), pair.rhs); + } + byte[] bytes = MSG_BUILDER.build().toByteArray(); + ByteBuffer bb = ByteBuffer.allocate(bytes.length + 6).put((byte) 0).putInt(schemaId).put((byte) 0).put(bytes); + bb.rewind(); + return bb.array(); + } +} diff --git a/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/input_format/input_format.json new file mode 100644 index 000000000000..af8f11403345 --- /dev/null +++ b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/input_format/input_format.json @@ -0,0 +1,15 @@ +{ + "type": "protobuf", + "avroBytesDecoder": { + "type": "schema_registry", + "url": "%%SCHEMA_REGISTRY_HOST%%", + "config": { + "basic.auth.credentials.source": "USER_INFO", + "basic.auth.user.info": "druid:diurd" + } + }, + "flattenSpec": { + "useFieldDiscovery": true + }, + "binaryAsString": false +} \ No newline at end of file diff --git a/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/parser/input_row_parser.json new file mode 100644 index 000000000000..2db694754f79 --- /dev/null +++ b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/parser/input_row_parser.json @@ -0,0 +1,21 @@ +{ + "type": "protobuf", + "protoBytesDecoder" : { + "type": "schema_registry", + "url": "%%SCHEMA_REGISTRY_HOST%%", + "config": { + "basic.auth.credentials.source": "USER_INFO", + "basic.auth.user.info": "druid:diurd" + } + }, + "parseSpec": { + "format": "json", + "timestampSpec": { + "column": "timestamp", + "format": "auto" + }, + "dimensionsSpec": { + "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"] + } + } +} \ No newline at end of file diff --git a/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/serializer/serializer.json b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/serializer/serializer.json new file mode 100644 index 000000000000..9f1ed6a274f8 --- /dev/null +++ b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/serializer/serializer.json @@ -0,0 +1,3 @@ +{ + "type": "protobuf-schema-registry" +} \ No newline at end of file From 4d74ccaa427ed3d24d72049bbc126caf99f06dc6 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 17 Apr 2021 19:56:02 +0800 Subject: [PATCH 17/31] bug fixed --- .../testing/utils/ProtobufSchemaRegistryEventSerializer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java index 2c508312b488..2f0c92b4782f 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java @@ -69,7 +69,6 @@ public void initialize(String topic) RetryUtils.retry( () -> { schemaId = client.register(topic, new ProtobufSchema(ProtobufEventSerializer.MSG_BUILDER.getDescriptorForType())); - fromRegistry = client.getById(schemaId); return 0; }, (e) -> true, From e916ebf4968bde5a76ba648c4ff4d9ac50a10b60 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 17 Apr 2021 20:19:28 +0800 Subject: [PATCH 18/31] bug fixed --- .../protobuf_schema_registry/input_format/input_format.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/input_format/input_format.json index af8f11403345..a0e599329e82 100644 --- a/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/input_format/input_format.json +++ b/integration-tests/src/test/resources/stream/data/protobuf_schema_registry/input_format/input_format.json @@ -1,6 +1,6 @@ { "type": "protobuf", - "avroBytesDecoder": { + "protoBytesDecoder": { "type": "schema_registry", "url": "%%SCHEMA_REGISTRY_HOST%%", "config": { From e27ec334fbfaba749181847bfe9089b1e092db77 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 17 Apr 2021 20:28:37 +0800 Subject: [PATCH 19/31] delete desc --- .../src/test/resources/stream/data/test.desc | Bin 727 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 integration-tests/src/test/resources/stream/data/test.desc diff --git a/integration-tests/src/test/resources/stream/data/test.desc b/integration-tests/src/test/resources/stream/data/test.desc deleted file mode 100644 index d1a95dfee390772369f3c9f5673ad27022deed9c..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 727 zcmYk4O>ToQ5QR&E0T219vZ<@8>W@^Zt8RJ&R#{fF=@L0$MX1C^#Hh**dY+!82dX`B zNLJ{*@7XYKfIkLc)!x2!y|HG+>EVs%Uk3cM`D!{ukdXB zMA7O7;!bXrm=KMSlm`!xwNh^f52pktI6C3vuvfi^2~IKg%7Yik2i-~Ao3>T8V8mm* zG=1zIQmwu}{QU_<7@iD|Z-Mn@W2{ICCU}${xv``+`mi&@USxz*JX;~6rJKByl@gpH z8S>Ezk;-V>Xr-+Hig4tkG=@A4E<8z?0$e9VETlHb#pX(Hw*J&=p%X_8{hj zW=P5dH<+wzSIoi@ww~irhWFK~Ro1~J!3Czl2q&GgQd|>UA|vTgF#{;aRTn9+(r>lM zftRN(lIwR=EMQh%Y^VUp%CL$egQTS1>B_NYQe%Cj#-`|e=&9h4md8@)eZB&^lSZaw O7>Hc@{!K#v@B2UB55qzL From a0fdf6e9f343b45daa0887ec375a041679ac8ecb Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 17 Apr 2021 20:46:16 +0800 Subject: [PATCH 20/31] delete change --- .../druid/tests/indexer/AbstractKafkaIndexingServiceTest.java | 1 + .../apache/druid/tests/indexer/AbstractStreamIndexingTest.java | 3 --- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java index 54a234da8902..5ea11e6992cd 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java @@ -117,6 +117,7 @@ Function generateStreamIngestionPropsTransform( "%%STREAM_PROPERTIES_KEY%%", "consumerProperties" ); + spec = StringUtils.replace( spec, "%%SCHEMA_REGISTRY_HOST%%", diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java index 95f3457bcd98..3c9ecda273f8 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java @@ -77,8 +77,6 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest private static final String QUERIES_FILE = "/stream/queries/stream_index_queries.json"; private static final String SUPERVISOR_SPEC_TEMPLATE_FILE = "supervisor_spec_template.json"; private static final String SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE = "supervisor_with_autoscaler_spec_template.json"; - private static final String SUPERVISOR_PROTOBUF_FILE = "test.desc"; - protected static final String DATA_RESOURCE_ROOT = "/stream/data"; protected static final String SUPERVISOR_SPEC_TEMPLATE_PATH = @@ -144,7 +142,6 @@ protected static List listDataFormatResources() throws IOException .stream() .filter(resource -> !SUPERVISOR_SPEC_TEMPLATE_FILE.equals(resource)) .filter(resource -> !SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE.equals(resource)) - .filter(resource -> !SUPERVISOR_PROTOBUF_FILE.equals(resource)) .collect(Collectors.toList()); } From 7590502d382a943a01ad01ae0bdc889e6300d8d5 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Tue, 20 Apr 2021 13:12:22 +0800 Subject: [PATCH 21/31] add desc --- integration-tests/docker/Dockerfile | 4 ++++ integration-tests/docker/test-data/wikipedia.desc | Bin 0 -> 727 bytes integration-tests/pom.xml | 3 +++ integration-tests/script/copy_resources.sh | 1 + .../script/docker_build_containers.sh | 6 +++--- 5 files changed, 11 insertions(+), 3 deletions(-) create mode 100644 integration-tests/docker/test-data/wikipedia.desc diff --git a/integration-tests/docker/Dockerfile b/integration-tests/docker/Dockerfile index 421d34740552..59e13e518a8b 100644 --- a/integration-tests/docker/Dockerfile +++ b/integration-tests/docker/Dockerfile @@ -25,6 +25,7 @@ RUN APACHE_ARCHIVE_MIRROR_HOST=${APACHE_ARCHIVE_MIRROR_HOST} /root/base-setup.sh FROM druidbase ARG MYSQL_VERSION +ARG CONFLUENT_VERSION # Verify Java version RUN java -version @@ -45,6 +46,9 @@ ADD lib/* /usr/local/druid/lib/ RUN wget -q "https://repo1.maven.org/maven2/mysql/mysql-connector-java/$MYSQL_VERSION/mysql-connector-java-$MYSQL_VERSION.jar" \ -O /usr/local/druid/lib/mysql-connector-java.jar +RUN wget -q "https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/$CONFLUENT_VERSION/kafka-protobuf-provider-$CONFLUENT_VERSION.jar" \ + -O /usr/local/druid/lib/mysql-connector-java.jar + # Add sample data # touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72. RUN find /var/lib/mysql -type f -exec touch {} \; && service mysql start \ diff --git a/integration-tests/docker/test-data/wikipedia.desc b/integration-tests/docker/test-data/wikipedia.desc new file mode 100644 index 0000000000000000000000000000000000000000..3c7c6f53c93444ae4cccd62000a0a00e808123d9 GIT binary patch literal 727 zcmYk4O-{ow5QPcNpQnGQ8;lU*M?zx7h8twX63vEXhqG`XJX1IA zDthmGb~0}Qe+huoJNMf4*4dTd2N_Sl35d_;tLb#zsMR9~*KED7ww;U~q8IUgfhX%H zidNSUcWR^MCDRC55x9rERc12~oG`q^(FrGqo$h7KaDp)@0?#lVOs8CL+g8^yVIJeT zNwGV~jQ;+h`%{i2cr*y#K+@Z_buwd^;z1JfV<}_Jert!F%o%5RvO>ftKY6DrEd@t% z6r&X)l{Kz0+BgXu5hzBBBK{LAI~dpR<&*<}M}zP!q`lrW*2o#tDF#i@7h5%MFXxPA z$clg;Ox3k7X2E2R2N~X1>sC7tmkbw}1tXmF$|-rpaES>^kBS*UIj*|MASr39#S}z& z+#LF#=R;40fUG=}!tbLB_)Z>~lVKos L`TN%?|G#hl>O;d$ literal 0 HcmV?d00001 diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 4b3d11e508e1..2af8522d77e2 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -372,6 +372,7 @@ io.confluent kafka-protobuf-provider 5.5.1 + provided @@ -483,6 +484,8 @@ ${docker.run.skip} ${it.indexer} ${mysql.version} + ${mysql.version} + 5.5.1 ${zk.version} ${project.basedir}/build_run_cluster.sh diff --git a/integration-tests/script/copy_resources.sh b/integration-tests/script/copy_resources.sh index 6495dad44b1a..4dd1f0a00549 100755 --- a/integration-tests/script/copy_resources.sh +++ b/integration-tests/script/copy_resources.sh @@ -80,6 +80,7 @@ fi mkdir -p $SHARED_DIR/wikiticker-it cp ../examples/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz $SHARED_DIR/wikiticker-it/wikiticker-2015-09-12-sampled.json.gz cp docker/wiki-simple-lookup.json $SHARED_DIR/wikiticker-it/wiki-simple-lookup.json +cp docker/test-data/wikipedia.desc $SHARED_DIR/wikiticker-it/wikipedia.desc # copy other files if needed if [ -n "$DRUID_INTEGRATION_TEST_RESOURCE_FILE_DIR_PATH" ] diff --git a/integration-tests/script/docker_build_containers.sh b/integration-tests/script/docker_build_containers.sh index ef3df477dec3..629ffcd9640a 100755 --- a/integration-tests/script/docker_build_containers.sh +++ b/integration-tests/script/docker_build_containers.sh @@ -22,17 +22,17 @@ set -e if [ -z "$DRUID_INTEGRATION_TEST_JVM_RUNTIME" ] then echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is not set. Building druid-cluster with default Java version" - docker build -t druid/cluster --build-arg MYSQL_VERSION $SHARED_DIR/docker + docker build -t druid/cluster --build-arg MYSQL_VERSION CONFLUENT_VERSION $SHARED_DIR/docker else echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is set with value ${DRUID_INTEGRATION_TEST_JVM_RUNTIME}" case "${DRUID_INTEGRATION_TEST_JVM_RUNTIME}" in 8) echo "Build druid-cluster with Java 8" - docker build -t druid/cluster --build-arg JDK_VERSION=8-slim --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker + docker build -t druid/cluster --build-arg JDK_VERSION=8-slim --build-arg MYSQL_VERSION --build-arg CONFLUENT_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker ;; 11) echo "Build druid-cluster with Java 11" - docker build -t druid/cluster --build-arg JDK_VERSION=11-slim --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker + docker build -t druid/cluster --build-arg JDK_VERSION=11-slim --build-arg MYSQL_VERSION --build-arg CONFLUENT_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker ;; *) echo "Invalid JVM Runtime given. Stopping" From fdeb860f40fcba3827e285895938ee5bd24806fc Mon Sep 17 00:00:00 2001 From: yuanyi Date: Tue, 20 Apr 2021 13:25:31 +0800 Subject: [PATCH 22/31] bug fixed --- integration-tests/docker/Dockerfile | 2 +- integration-tests/script/docker_build_containers.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/integration-tests/docker/Dockerfile b/integration-tests/docker/Dockerfile index 59e13e518a8b..7c03673c19ed 100644 --- a/integration-tests/docker/Dockerfile +++ b/integration-tests/docker/Dockerfile @@ -47,7 +47,7 @@ RUN wget -q "https://repo1.maven.org/maven2/mysql/mysql-connector-java/$MYSQL_VE -O /usr/local/druid/lib/mysql-connector-java.jar RUN wget -q "https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/$CONFLUENT_VERSION/kafka-protobuf-provider-$CONFLUENT_VERSION.jar" \ - -O /usr/local/druid/lib/mysql-connector-java.jar + -O /usr/local/druid/lib/kafka-protobuf-provider.jar # Add sample data # touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72. diff --git a/integration-tests/script/docker_build_containers.sh b/integration-tests/script/docker_build_containers.sh index 629ffcd9640a..e6ecd5c0972d 100755 --- a/integration-tests/script/docker_build_containers.sh +++ b/integration-tests/script/docker_build_containers.sh @@ -22,7 +22,7 @@ set -e if [ -z "$DRUID_INTEGRATION_TEST_JVM_RUNTIME" ] then echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is not set. Building druid-cluster with default Java version" - docker build -t druid/cluster --build-arg MYSQL_VERSION CONFLUENT_VERSION $SHARED_DIR/docker + docker build -t druid/cluster --build-arg MYSQL_VERSION --build-arg CONFLUENT_VERSION $SHARED_DIR/docker else echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is set with value ${DRUID_INTEGRATION_TEST_JVM_RUNTIME}" case "${DRUID_INTEGRATION_TEST_JVM_RUNTIME}" in From 9ed15ebfe0d8e797bf4cce9a6274840e23c135b7 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Tue, 20 Apr 2021 13:34:43 +0800 Subject: [PATCH 23/31] test inputformat --- .../stream/data/protobuf/input_format/input_format.json | 2 +- .../resources/stream/data/protobuf/parser/input_row_parser.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json index 584c20d63345..99e1ddae3167 100644 --- a/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json +++ b/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json @@ -2,7 +2,7 @@ "type": "protobuf", "protoBytesDecoder": { "type": "file", - "descriptor": "%%PROTOBUF_DESC_FILE%%", + "descriptor": "/shared/wikiticker-it/wikipedia.desc", "protoMessageType": "Wikipedia" }, "flattenSpec": { diff --git a/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json index 852a81c924c3..f010a05a8e64 100644 --- a/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json +++ b/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json @@ -2,7 +2,7 @@ "type": "protobuf", "protoBytesDecoder": { "type": "file", - "descriptor": "%%PROTOBUF_DESC_FILE%%", + "descriptor": "/shared/wikiticker-it/wikipedia.desc", "protoMessageType": "Wikipedia" }, "parseSpec": { From 1c9258644505e35cce7f0517fb16085ad7db3486 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Tue, 20 Apr 2021 14:15:27 +0800 Subject: [PATCH 24/31] bug fixed --- .../stream/data/protobuf/input_format/input_format.json | 2 +- .../resources/stream/data/protobuf/parser/input_row_parser.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json index 99e1ddae3167..17a91426e53c 100644 --- a/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json +++ b/integration-tests/src/test/resources/stream/data/protobuf/input_format/input_format.json @@ -2,7 +2,7 @@ "type": "protobuf", "protoBytesDecoder": { "type": "file", - "descriptor": "/shared/wikiticker-it/wikipedia.desc", + "descriptor": "file:///shared/wikiticker-it/wikipedia.desc", "protoMessageType": "Wikipedia" }, "flattenSpec": { diff --git a/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json index f010a05a8e64..e55784de05b3 100644 --- a/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json +++ b/integration-tests/src/test/resources/stream/data/protobuf/parser/input_row_parser.json @@ -2,7 +2,7 @@ "type": "protobuf", "protoBytesDecoder": { "type": "file", - "descriptor": "/shared/wikiticker-it/wikipedia.desc", + "descriptor": "file:///shared/wikiticker-it/wikipedia.desc", "protoMessageType": "Wikipedia" }, "parseSpec": { From 7a9edd794724da651aec643964d852eef6927720 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Tue, 20 Apr 2021 15:34:27 +0800 Subject: [PATCH 25/31] bug fixed --- integration-tests/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 2af8522d77e2..e232a3c1b828 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -374,6 +374,11 @@ 5.5.1 provided + + com.google.protobuf + protobuf-java + 3.11.0 + From 60fc067be3c6e69cdde0a45a285ee3b227f756e1 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Tue, 20 Apr 2021 15:57:22 +0800 Subject: [PATCH 26/31] bug fixed --- .../docker/test-data/wikipedia.desc | Bin 727 -> 727 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/integration-tests/docker/test-data/wikipedia.desc b/integration-tests/docker/test-data/wikipedia.desc index 3c7c6f53c93444ae4cccd62000a0a00e808123d9..d1a95dfee390772369f3c9f5673ad27022deed9c 100644 GIT binary patch delta 25 ccmcc4dYyH{X+}oY$!8d4faFUs$;NaM0C58c$N&HU delta 25 ccmcc4dYyH{X+}oQ$!8d4faFUs$;NaM0C9&0)Bpeg From 32b47c51b48915f02a0c9e46c7dfabffb28baa75 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Tue, 20 Apr 2021 16:17:10 +0800 Subject: [PATCH 27/31] bug fixed --- .../apache/druid/testing/utils/ProtobufEventSerializer.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java index f4757ef84c5d..68b1872b1b58 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java @@ -26,7 +26,6 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.logger.Logger; -import java.io.IOException; import java.util.List; public class ProtobufEventSerializer implements EventSerializer @@ -69,7 +68,7 @@ public class ProtobufEventSerializer implements EventSerializer } @Override - public byte[] serialize(List> event) throws IOException + public byte[] serialize(List> event) { Descriptors.Descriptor msgDesc = MSG_BUILDER.getDescriptorForType(); for (Pair pair : event) { From ef3a0a4397fb26e06fe70edc92b3ac50f978f169 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 15 May 2021 14:24:57 +0800 Subject: [PATCH 28/31] delete io exception --- .../testing/utils/ProtobufSchemaRegistryEventSerializer.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java index 2f0c92b4782f..aa3dfaec36be 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java @@ -30,7 +30,6 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.testing.IntegrationTestingConfig; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; @@ -81,7 +80,7 @@ public void initialize(String topic) } @Override - public byte[] serialize(List> event) throws IOException + public byte[] serialize(List> event) { Descriptors.Descriptor msgDesc = MSG_BUILDER.getDescriptorForType(); for (Pair pair : event) { From 994e16fdafc997928fe0b0cc23ecea9c78670d93 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 15 May 2021 20:04:46 +0800 Subject: [PATCH 29/31] change builder not static --- .../testing/utils/ProtobufEventSerializer.java | 15 +++++++-------- .../ProtobufSchemaRegistryEventSerializer.java | 10 ++++++---- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java index 68b1872b1b58..c2b180f21a38 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java @@ -34,7 +34,7 @@ public class ProtobufEventSerializer implements EventSerializer private static final Logger LOGGER = new Logger(ProtobufEventSerializer.class); - public static final DynamicMessage.Builder MSG_BUILDER; + public static DynamicSchema SCHEMA; static { DynamicSchema.Builder schemaBuilder = DynamicSchema.newBuilder(); @@ -57,24 +57,23 @@ public class ProtobufEventSerializer implements EventSerializer .addField("optional", "int32", "delta", 16) .build(); schemaBuilder.addMessageDefinition(wikiDef); - DynamicSchema schema = null; try { - schema = schemaBuilder.build(); + SCHEMA = schemaBuilder.build(); } catch (Descriptors.DescriptorValidationException e) { - LOGGER.error("Could not init protobuf builder."); + LOGGER.error("Could not init protobuf schema."); } - MSG_BUILDER = schema.newMessageBuilder("Wikipedia"); } @Override public byte[] serialize(List> event) { - Descriptors.Descriptor msgDesc = MSG_BUILDER.getDescriptorForType(); + DynamicMessage.Builder builder = SCHEMA.newMessageBuilder("Wikipedia"); + Descriptors.Descriptor msgDesc = builder.getDescriptorForType(); for (Pair pair : event) { - MSG_BUILDER.setField(msgDesc.findFieldByName(pair.lhs), pair.rhs); + builder.setField(msgDesc.findFieldByName(pair.lhs), pair.rhs); } - return MSG_BUILDER.build().toByteArray(); + return builder.build().toByteArray(); } @Override diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java index aa3dfaec36be..8d80b22af35d 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufSchemaRegistryEventSerializer.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.google.common.collect.ImmutableMap; import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; import org.apache.druid.java.util.common.Pair; @@ -67,7 +68,7 @@ public void initialize(String topic) try { RetryUtils.retry( () -> { - schemaId = client.register(topic, new ProtobufSchema(ProtobufEventSerializer.MSG_BUILDER.getDescriptorForType())); + schemaId = client.register(topic, new ProtobufSchema(ProtobufEventSerializer.SCHEMA.newMessageBuilder("Wikipedia").getDescriptorForType())); return 0; }, (e) -> true, @@ -82,11 +83,12 @@ public void initialize(String topic) @Override public byte[] serialize(List> event) { - Descriptors.Descriptor msgDesc = MSG_BUILDER.getDescriptorForType(); + DynamicMessage.Builder builder = SCHEMA.newMessageBuilder("Wikipedia"); + Descriptors.Descriptor msgDesc = builder.getDescriptorForType(); for (Pair pair : event) { - MSG_BUILDER.setField(msgDesc.findFieldByName(pair.lhs), pair.rhs); + builder.setField(msgDesc.findFieldByName(pair.lhs), pair.rhs); } - byte[] bytes = MSG_BUILDER.build().toByteArray(); + byte[] bytes = builder.build().toByteArray(); ByteBuffer bb = ByteBuffer.allocate(bytes.length + 6).put((byte) 0).putInt(schemaId).put((byte) 0).put(bytes); bb.rewind(); return bb.array(); From b7d38b64e0154412de693308e4cabb64464c812a Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 15 May 2021 20:17:23 +0800 Subject: [PATCH 30/31] change pom --- integration-tests/pom.xml | 1 - 1 file changed, 1 deletion(-) diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index e232a3c1b828..5a5884a4c9b0 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -489,7 +489,6 @@ ${docker.run.skip} ${it.indexer} ${mysql.version} - ${mysql.version} 5.5.1 ${zk.version} From 7dec936e708c8bde8b282a8e2a8ec7e0c9ac5db5 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Mon, 17 May 2021 18:49:53 +0800 Subject: [PATCH 31/31] bug fixed --- .../apache/druid/testing/utils/ProtobufEventSerializer.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java index c2b180f21a38..cc9b779efd1a 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/ProtobufEventSerializer.java @@ -34,7 +34,7 @@ public class ProtobufEventSerializer implements EventSerializer private static final Logger LOGGER = new Logger(ProtobufEventSerializer.class); - public static DynamicSchema SCHEMA; + public static final DynamicSchema SCHEMA; static { DynamicSchema.Builder schemaBuilder = DynamicSchema.newBuilder(); @@ -57,12 +57,14 @@ public class ProtobufEventSerializer implements EventSerializer .addField("optional", "int32", "delta", 16) .build(); schemaBuilder.addMessageDefinition(wikiDef); + DynamicSchema schema = null; try { - SCHEMA = schemaBuilder.build(); + schema = schemaBuilder.build(); } catch (Descriptors.DescriptorValidationException e) { LOGGER.error("Could not init protobuf schema."); } + SCHEMA = schema; } @Override