diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml index 4c640a62fe18..466ed02476e6 100644 --- a/benchmarks/pom.xml +++ b/benchmarks/pom.xml @@ -169,6 +169,12 @@ junit test + + org.apache.druid.extensions + druid-protobuf-extensions + 0.19.0-SNAPSHOT + test + diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java new file mode 100644 index 000000000000..b408076806cf --- /dev/null +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.benchmark; + +import com.google.common.collect.Lists; +import com.google.common.io.Files; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.JSONParseSpec; +import org.apache.druid.data.input.impl.ParseSpec; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.data.input.protobuf.ProtobufInputRowParser; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 10) +@Measurement(iterations = 25) +public class ProtobufParserBenchmark +{ + @Param({"75000"}) + private int rowsPerSegment; + + private static final Logger log = new Logger(ProtobufParserBenchmark.class); + + static { + NullHandling.initializeForTests(); + } + + private ParseSpec nestedParseSpec; + private ProtobufInputRowParser nestedParser; + private ParseSpec flatParseSpec; + private ProtobufInputRowParser flatParser; + private byte[] protoInputs; + private String protoFilePath; + + @Setup + public void setup() + { + nestedParseSpec = new JSONParseSpec( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(Lists.newArrayList( + new StringDimensionSchema("event"), + new StringDimensionSchema("id"), + new StringDimensionSchema("someOtherId"), + new StringDimensionSchema("isValid") + ), null, null), + new JSONPathSpec( + true, + Lists.newArrayList( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "eventType", "eventType"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "foobar", "$.foo.bar"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "bar0", "$.bar[0].bar") + ) + ), + null, + null + ); + + flatParseSpec = new JSONParseSpec( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(Lists.newArrayList( + new StringDimensionSchema("event"), + new StringDimensionSchema("id"), + new StringDimensionSchema("someOtherId"), + new StringDimensionSchema("isValid") + ), null, null), + null, + null, + null + ); + + protoFilePath = "ProtoFile"; + protoInputs = getProtoInputs(protoFilePath); + nestedParser = new ProtobufInputRowParser(nestedParseSpec, "prototest.desc", "ProtoTestEvent"); + flatParser = new ProtobufInputRowParser(flatParseSpec, "prototest.desc", "ProtoTestEvent"); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void consumeFlatData(Blackhole blackhole) + { + for (int i = 0; i < rowsPerSegment; i++) { + InputRow row = flatParser.parseBatch(ByteBuffer.wrap(protoInputs)).get(0); + blackhole.consume(row); + } + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void consumeNestedData(Blackhole blackhole) + { + for (int i = 0; i < rowsPerSegment; i++) { + InputRow row = nestedParser.parseBatch(ByteBuffer.wrap(protoInputs)).get(0); + blackhole.consume(row); + } + + } + private byte[] getProtoInputs(String fileName) + { + String filePath = this.getClass().getClassLoader().getResource(fileName).getPath(); + byte[] bytes = null; + try { + File file = new File(filePath); + bytes = new byte[(int) file.length()]; + bytes = Files.toByteArray(file); + } + catch (FileNotFoundException e) { + log.error("Cannot find the file: " + filePath); + e.printStackTrace(); + } + catch (IOException e) { + e.printStackTrace(); + } + return bytes; + } +} diff --git a/benchmarks/src/test/resources/ProtoFile b/benchmarks/src/test/resources/ProtoFile new file mode 100644 index 000000000000..ab5847cac615 --- /dev/null +++ b/benchmarks/src/test/resources/ProtoFile @@ -0,0 +1,4 @@ +ç$2012-07-12T09:30:00.000Z è$(2 description=¤p parseBatch(ByteBuffer input) parser = parseSpec.makeParser(); initDescriptor(); } - String json; - try { - DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input)); - json = JsonFormat.printer().print(message); - } - catch (InvalidProtocolBufferException e) { - throw new ParseException(e, "Protobuf message could not be parsed"); + Map record; + + if (parseSpec instanceof JSONParseSpec && ((JSONParseSpec) parseSpec).getFlattenSpec().getFields().isEmpty()) { + try { + DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input)); + record = CollectionUtils.mapKeys(message.getAllFields(), k -> k.getJsonName()); + } + catch (InvalidProtocolBufferException ex) { + throw new ParseException(ex, "Protobuf message could not be parsed"); + } + } else { + try { + DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input)); + String json = JsonFormat.printer().print(message); + record = parser.parseToMap(json); + } + catch (InvalidProtocolBufferException e) { + throw new ParseException(e, "Protobuf message could not be parsed"); + } } - Map record = parser.parseToMap(json); final List dimensions; if (!this.dimensions.isEmpty()) { dimensions = this.dimensions; diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java index 5cd26af43b15..3171cc577d5f 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java @@ -52,6 +52,7 @@ public class ProtobufInputRowParserTest public ExpectedException expectedException = ExpectedException.none(); private ParseSpec parseSpec; + private ParseSpec flatParseSpec; @Before public void setUp() @@ -76,6 +77,19 @@ public void setUp() null ); + flatParseSpec = new JSONParseSpec( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(Lists.newArrayList( + new StringDimensionSchema("event"), + new StringDimensionSchema("id"), + new StringDimensionSchema("someOtherId"), + new StringDimensionSchema("isValid") + ), null, null), + + null, + null, + null + ); } @Test @@ -126,7 +140,7 @@ public void testSingleDescriptorNoMessageType() } @Test - public void testParse() throws Exception + public void testParseNestedData() throws Exception { //configure parser with desc file ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", "ProtoTestEvent"); @@ -158,7 +172,6 @@ public void testParse() throws Exception event.writeTo(out); InputRow row = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0); - System.out.println(row); Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); @@ -177,6 +190,45 @@ public void testParse() throws Exception Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0); } + @Test + public void testParseFlatData() throws Exception + { + //configure parser with desc file + ProtobufInputRowParser parser = new ProtobufInputRowParser(flatParseSpec, "prototest.desc", "ProtoTestEvent"); + + //create binary of proto test event + DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC()); + ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder() + .setDescription("description") + .setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE) + .setId(4711L) + .setIsValid(true) + .setSomeOtherId(4712) + .setTimestamp(dateTime.toString()) + .setSomeFloatColumn(47.11F) + .setSomeIntColumn(815) + .setSomeLongColumn(816L) + .build(); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + event.writeTo(out); + + InputRow row = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0); + System.out.println(row); + + Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); + + assertDimensionEquals(row, "id", "4711"); + assertDimensionEquals(row, "isValid", "true"); + assertDimensionEquals(row, "someOtherId", "4712"); + assertDimensionEquals(row, "description", "description"); + + + Assert.assertEquals(47.11F, row.getMetric("someFloatColumn").floatValue(), 0.0); + Assert.assertEquals(815.0F, row.getMetric("someIntColumn").floatValue(), 0.0); + Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0); + } + @Test public void testDisableJavaScript() {