From 396826119a39fe8efa5bcfcea8274d50925796a8 Mon Sep 17 00:00:00 2001 From: xionghuilin Date: Mon, 8 Jun 2020 10:37:52 +0800 Subject: [PATCH 1/5] optimize for protobuf parsing --- .../benchmark/ProtobufParserBenchmark.java | 171 ++++++++++++++++++ benchmarks/src/test/resources/ProtoFile | 4 + benchmarks/src/test/resources/prototest.desc | Bin 0 -> 639 bytes .../protobuf/ProtobufInputRowParser.java | 27 ++- 4 files changed, 194 insertions(+), 8 deletions(-) create mode 100644 benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java create mode 100644 benchmarks/src/test/resources/ProtoFile create mode 100644 benchmarks/src/test/resources/prototest.desc diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java new file mode 100644 index 000000000000..be7c13cc7f39 --- /dev/null +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.benchmark; + +import com.google.common.collect.Lists; +import com.google.common.io.Files; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.JSONParseSpec; +import org.apache.druid.data.input.impl.ParseSpec; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.data.input.protobuf.ProtobufInputRowParser; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 10) +@Measurement(iterations = 25) +public class ProtobufParserBenchmark +{ + @Param({"75000"}) + private int rowsPerSegment; + + private static final Logger log = new Logger(ProtobufParserBenchmark.class); + + static { + NullHandling.initializeForTests(); + } + + private ParseSpec nestedParseSpec; + private ProtobufInputRowParser nestedParser; + private ParseSpec flattenParseSpec; + private ProtobufInputRowParser flattenParser; + private byte[] protoInputs; + private String protoFilePath; + + @Setup + public void setup() + { + log.info("SETUP CALLED AT " + +System.currentTimeMillis()); + + nestedParseSpec = new JSONParseSpec( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(Lists.newArrayList( + new StringDimensionSchema("event"), + new StringDimensionSchema("id"), + new StringDimensionSchema("someOtherId"), + new StringDimensionSchema("isValid") + ), null, null), + new JSONPathSpec( + true, + Lists.newArrayList( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "eventType", "eventType"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "foobar", "$.foo.bar"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "bar0", "$.bar[0].bar") + ) + ), + null + ); + + flattenParseSpec = new JSONParseSpec( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(Lists.newArrayList( + new StringDimensionSchema("event"), + new StringDimensionSchema("id"), + new StringDimensionSchema("someOtherId"), + new StringDimensionSchema("isValid") + ), null, null), + null, + null + ); + + protoFilePath = "ProtoFile"; + protoInputs = getProtoInputs(protoFilePath); + nestedParser = new ProtobufInputRowParser(nestedParseSpec, "prototest.desc", "ProtoTestEvent"); + flattenParser = new ProtobufInputRowParser(flattenParseSpec, "prototest.desc", "ProtoTestEvent"); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void consumeFlattenData(Blackhole blackhole) + { + for (int i = 0; i < rowsPerSegment; i++) { + InputRow row = flattenParser.parseBatch(ByteBuffer.wrap(protoInputs)).get(0); + blackhole.consume(row); + } + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void consumeNestedData(Blackhole blackhole) + { + for (int i = 0; i < rowsPerSegment; i++) { + InputRow row = nestedParser.parseBatch(ByteBuffer.wrap(protoInputs)).get(0); + blackhole.consume(row); + } + + } + private byte[] getProtoInputs(String fileName) + { + String filePath = this.getClass().getClassLoader().getResource(fileName).getPath(); + byte[] bytes = null; + try { + File file = new File(filePath); + bytes = new byte[(int) file.length()]; + bytes = Files.toByteArray(file); + } + catch (FileNotFoundException e) { + log.error("Cannot find the file: " + filePath); + e.printStackTrace(); + } + catch (IOException e) { + e.printStackTrace(); + } + return bytes; + } + + public static void main(String[] args) throws RunnerException + { + Options opt = new OptionsBuilder() + .include(ProtobufParserBenchmark.class.getSimpleName()) + .build(); + new Runner(opt).run(); + } +} diff --git a/benchmarks/src/test/resources/ProtoFile b/benchmarks/src/test/resources/ProtoFile new file mode 100644 index 000000000000..ab5847cac615 --- /dev/null +++ b/benchmarks/src/test/resources/ProtoFile @@ -0,0 +1,4 @@ +ç$2012-07-12T09:30:00.000Z è$(2 description=¤p;yI8fcQ?vceYm1*ieFoF^=hGbRQR zHXEh3P4*GOP>J(bI#;E~YFj1SSM5eDAB#AXF;DS{+FE2Ep*GZYdw`NpVG` z404E33v8Vnq}yC|^(94!Zotv0C!S6>b(V^mbpETSu#eXt8hgU-P2J%s*aK;AyyeG<4w$~xVl;MsRe1riPe q*XZ6ui;Jm8Gdi9zFA|G5@**m!7pK``bD>6yN%y>XFNT6<852J;-NKdt literal 0 HcmV?d00001 diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java index 81ddb4440928..b504b028ad19 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java @@ -100,16 +100,27 @@ public List parseBatch(ByteBuffer input) parser = parseSpec.makeParser(); initDescriptor(); } - String json; - try { - DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input)); - json = JsonFormat.printer().print(message); - } - catch (InvalidProtocolBufferException e) { - throw new ParseException(e, "Protobuf message could not be parsed"); + Map record; + + if (parseSpec instanceof JSONParseSpec && ((JSONParseSpec) parseSpec).getFlattenSpec().getFields().isEmpty()) { + try { + DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input)); + record = CollectionUtils.mapKeys(message.getAllFields(), k -> k.getJsonName()); + } + catch (InvalidProtocolBufferException ex) { + throw new ParseException(ex, "Protobuf message could not be parsed"); + } + } else { + try { + DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input)); + String json = JsonFormat.printer().print(message); + record = parser.parseToMap(json); + } + catch (InvalidProtocolBufferException e) { + throw new ParseException(e, "Protobuf message could not be parsed"); + } } - Map record = parser.parseToMap(json); final List dimensions; if (!this.dimensions.isEmpty()) { dimensions = this.dimensions; From cccd904827ef7e08f3d043b7544ff6ba4d779a12 Mon Sep 17 00:00:00 2001 From: xionghuilin Date: Mon, 8 Jun 2020 16:26:53 +0800 Subject: [PATCH 2/5] fix import error and maven dependency --- benchmarks/pom.xml | 6 ++++++ .../org/apache/druid/benchmark/ProtobufParserBenchmark.java | 2 ++ .../druid/data/input/protobuf/ProtobufInputRowParser.java | 2 ++ 3 files changed, 10 insertions(+) diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml index 4c640a62fe18..466ed02476e6 100644 --- a/benchmarks/pom.xml +++ b/benchmarks/pom.xml @@ -169,6 +169,12 @@ junit test + + org.apache.druid.extensions + druid-protobuf-extensions + 0.19.0-SNAPSHOT + test + diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java index be7c13cc7f39..1c250bdb902c 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java @@ -99,6 +99,7 @@ public void setup() new JSONPathFieldSpec(JSONPathFieldType.PATH, "bar0", "$.bar[0].bar") ) ), + null, null ); @@ -111,6 +112,7 @@ public void setup() new StringDimensionSchema("isValid") ), null, null), null, + null, null ); diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java index b504b028ad19..9c04265e6a5f 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java @@ -35,10 +35,12 @@ import org.apache.druid.data.input.ByteBufferInputRowParser; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.impl.JSONParseSpec; import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.Parser; +import org.apache.druid.utils.CollectionUtils; import java.io.IOException; import java.io.InputStream; From a7e5e23f39cf2d2306b7647bca28bac04423bb34 Mon Sep 17 00:00:00 2001 From: xionghuilin Date: Mon, 8 Jun 2020 20:04:51 +0800 Subject: [PATCH 3/5] add unit test in protobufInputrowParserTest for flatten data --- .../protobuf/ProtobufInputRowParserTest.java | 55 ++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java index 5cd26af43b15..184d21d96cac 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java @@ -52,6 +52,7 @@ public class ProtobufInputRowParserTest public ExpectedException expectedException = ExpectedException.none(); private ParseSpec parseSpec; + private ParseSpec flattenParseSpec; @Before public void setUp() @@ -76,6 +77,19 @@ public void setUp() null ); + flattenParseSpec = new JSONParseSpec( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(Lists.newArrayList( + new StringDimensionSchema("event"), + new StringDimensionSchema("id"), + new StringDimensionSchema("someOtherId"), + new StringDimensionSchema("isValid") + ), null, null), + + null, + null, + null + ); } @Test @@ -126,7 +140,7 @@ public void testSingleDescriptorNoMessageType() } @Test - public void testParse() throws Exception + public void testParseNestedData() throws Exception { //configure parser with desc file ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", "ProtoTestEvent"); @@ -177,6 +191,45 @@ public void testParse() throws Exception Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0); } + @Test + public void testParseFlattenData() throws Exception + { + //configure parser with desc file + ProtobufInputRowParser parser = new ProtobufInputRowParser(flattenParseSpec, "prototest.desc", "ProtoTestEvent"); + + //create binary of proto test event + DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC()); + ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder() + .setDescription("description") + .setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE) + .setId(4711L) + .setIsValid(true) + .setSomeOtherId(4712) + .setTimestamp(dateTime.toString()) + .setSomeFloatColumn(47.11F) + .setSomeIntColumn(815) + .setSomeLongColumn(816L) + .build(); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + event.writeTo(out); + + InputRow row = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0); + System.out.println(row); + + Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); + + assertDimensionEquals(row, "id", "4711"); + assertDimensionEquals(row, "isValid", "true"); + assertDimensionEquals(row, "someOtherId", "4712"); + assertDimensionEquals(row, "description", "description"); + + + Assert.assertEquals(47.11F, row.getMetric("someFloatColumn").floatValue(), 0.0); + Assert.assertEquals(815.0F, row.getMetric("someIntColumn").floatValue(), 0.0); + Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0); + } + @Test public void testDisableJavaScript() { From 95df02f8681cb66b3a0392d8dab64522e6aa4249 Mon Sep 17 00:00:00 2001 From: xionghuilin Date: Tue, 16 Jun 2020 20:51:35 +0800 Subject: [PATCH 4/5] solve code duplication (remove the log and main()) --- .../druid/benchmark/ProtobufParserBenchmark.java | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java index 1c250bdb902c..8fac9e8b4310 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java @@ -45,10 +45,6 @@ import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; -import org.openjdk.jmh.runner.Runner; -import org.openjdk.jmh.runner.RunnerException; -import org.openjdk.jmh.runner.options.Options; -import org.openjdk.jmh.runner.options.OptionsBuilder; import java.io.File; import java.io.FileNotFoundException; @@ -81,8 +77,6 @@ public class ProtobufParserBenchmark @Setup public void setup() { - log.info("SETUP CALLED AT " + +System.currentTimeMillis()); - nestedParseSpec = new JSONParseSpec( new TimestampSpec("timestamp", "iso", null), new DimensionsSpec(Lists.newArrayList( @@ -162,12 +156,4 @@ private byte[] getProtoInputs(String fileName) } return bytes; } - - public static void main(String[] args) throws RunnerException - { - Options opt = new OptionsBuilder() - .include(ProtobufParserBenchmark.class.getSimpleName()) - .build(); - new Runner(opt).run(); - } } From 178191f0a45d24256952aeaa461929413e234c91 Mon Sep 17 00:00:00 2001 From: xionghuilin Date: Wed, 24 Jun 2020 18:21:39 +0800 Subject: [PATCH 5/5] rename 'flatten' to 'flat' to make it clearer --- .../druid/benchmark/ProtobufParserBenchmark.java | 12 ++++++------ .../input/protobuf/ProtobufInputRowParserTest.java | 9 ++++----- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java index 8fac9e8b4310..b408076806cf 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java @@ -69,8 +69,8 @@ public class ProtobufParserBenchmark private ParseSpec nestedParseSpec; private ProtobufInputRowParser nestedParser; - private ParseSpec flattenParseSpec; - private ProtobufInputRowParser flattenParser; + private ParseSpec flatParseSpec; + private ProtobufInputRowParser flatParser; private byte[] protoInputs; private String protoFilePath; @@ -97,7 +97,7 @@ public void setup() null ); - flattenParseSpec = new JSONParseSpec( + flatParseSpec = new JSONParseSpec( new TimestampSpec("timestamp", "iso", null), new DimensionsSpec(Lists.newArrayList( new StringDimensionSchema("event"), @@ -113,16 +113,16 @@ public void setup() protoFilePath = "ProtoFile"; protoInputs = getProtoInputs(protoFilePath); nestedParser = new ProtobufInputRowParser(nestedParseSpec, "prototest.desc", "ProtoTestEvent"); - flattenParser = new ProtobufInputRowParser(flattenParseSpec, "prototest.desc", "ProtoTestEvent"); + flatParser = new ProtobufInputRowParser(flatParseSpec, "prototest.desc", "ProtoTestEvent"); } @Benchmark @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MICROSECONDS) - public void consumeFlattenData(Blackhole blackhole) + public void consumeFlatData(Blackhole blackhole) { for (int i = 0; i < rowsPerSegment; i++) { - InputRow row = flattenParser.parseBatch(ByteBuffer.wrap(protoInputs)).get(0); + InputRow row = flatParser.parseBatch(ByteBuffer.wrap(protoInputs)).get(0); blackhole.consume(row); } } diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java index 184d21d96cac..3171cc577d5f 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java @@ -52,7 +52,7 @@ public class ProtobufInputRowParserTest public ExpectedException expectedException = ExpectedException.none(); private ParseSpec parseSpec; - private ParseSpec flattenParseSpec; + private ParseSpec flatParseSpec; @Before public void setUp() @@ -77,7 +77,7 @@ public void setUp() null ); - flattenParseSpec = new JSONParseSpec( + flatParseSpec = new JSONParseSpec( new TimestampSpec("timestamp", "iso", null), new DimensionsSpec(Lists.newArrayList( new StringDimensionSchema("event"), @@ -172,7 +172,6 @@ public void testParseNestedData() throws Exception event.writeTo(out); InputRow row = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0); - System.out.println(row); Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); @@ -192,10 +191,10 @@ public void testParseNestedData() throws Exception } @Test - public void testParseFlattenData() throws Exception + public void testParseFlatData() throws Exception { //configure parser with desc file - ProtobufInputRowParser parser = new ProtobufInputRowParser(flattenParseSpec, "prototest.desc", "ProtoTestEvent"); + ProtobufInputRowParser parser = new ProtobufInputRowParser(flatParseSpec, "prototest.desc", "ProtoTestEvent"); //create binary of proto test event DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());