From 6bf3131c4d730844b32164c743d6ea7ca7b19b84 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Tue, 8 Jun 2021 19:43:41 +0800 Subject: [PATCH 1/7] add inputformat --- .../input/thrift/ThriftExtensionsModule.java | 3 +- .../data/input/thrift/ThriftInputFormat.java | 93 ++++++++++ .../druid/data/input/thrift/ThriftReader.java | 175 ++++++++++++++++++ 3 files changed, 270 insertions(+), 1 deletion(-) create mode 100644 extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputFormat.java create mode 100644 extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java diff --git a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftExtensionsModule.java b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftExtensionsModule.java index bea2a913e39e..4ffba3a7e241 100644 --- a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftExtensionsModule.java +++ b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftExtensionsModule.java @@ -37,7 +37,8 @@ public List getJacksonModules() return Collections.singletonList( new SimpleModule("ThriftInputRowParserModule") .registerSubtypes( - new NamedType(ThriftInputRowParser.class, "thrift") + new NamedType(ThriftInputRowParser.class, "thrift"), + new NamedType(ThriftInputFormat.class, "thrift") ) ); } diff --git a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputFormat.java b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputFormat.java new file mode 100644 index 000000000000..0e54c17e5d9f --- /dev/null +++ b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputFormat.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.thrift; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.NestedInputFormat; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; + +import javax.annotation.Nullable; + +import java.io.File; +import java.util.Objects; + +public class ThriftInputFormat extends NestedInputFormat +{ + private final String jarPath; + private final String thriftClassName; + + @JsonCreator + public ThriftInputFormat( + @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec, + @JsonProperty("thriftJar") String jarPath, + @JsonProperty("thriftClass") String thriftClassName + ) + { + super(flattenSpec); + this.jarPath = jarPath; + this.thriftClassName = thriftClassName; + Preconditions.checkNotNull(thriftClassName, "thrift class name"); + } + + @Override + public boolean isSplittable() + { + return false; + } + + @Override + public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity 
source, File temporaryDirectory) + { + return new ThriftReader( + inputRowSchema, + source, + jarPath, + thriftClassName, + getFlattenSpec() + ); + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ThriftInputFormat that = (ThriftInputFormat) o; + return Objects.equals(getFlattenSpec(), that.getFlattenSpec()) && + Objects.equals(jarPath, that.jarPath) && + Objects.equals(thriftClassName, that.thriftClassName); + } + + @Override + public int hashCode() + { + return Objects.hash(jarPath, thriftClassName, getFlattenSpec()); + } + +} diff --git a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java new file mode 100644 index 000000000000..41b0b389d0d4 --- /dev/null +++ b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.thrift; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Iterators; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.commons.io.IOUtils; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.IntermediateRowParsingReader; +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.java.util.common.CloseableIterators; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.common.parsers.ObjectFlattener; +import org.apache.druid.java.util.common.parsers.ObjectFlatteners; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.java.util.common.parsers.Parser; +import org.apache.druid.utils.CollectionUtils; +import org.apache.hadoop.io.BytesWritable; +import org.apache.thrift.TBase; +import org.apache.thrift.TException; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class ThriftReader extends IntermediateRowParsingReader +{ + private final InputRowSchema inputRowSchema; + private final InputEntity source; + private final JSONPathSpec flattenSpec; + private final ObjectFlattener recordFlattener; + private final String jarPath; + private final String thriftClassName; + private Parser parser; + private volatile Class 
thriftClass = null; + + ThriftReader( + InputRowSchema inputRowSchema, + InputEntity source, + String jarPath, + String thriftClassName, + JSONPathSpec flattenSpec + ) + { + if (flattenSpec == null) { + this.inputRowSchema = new ProtobufInputRowSchema(inputRowSchema); + this.recordFlattener = null; + } else { + this.inputRowSchema = inputRowSchema; + this.recordFlattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(true)); + } + + this.source = source; + this.jarPath = jarPath; + this.thriftClassName = thriftClassName; + this.flattenSpec = flattenSpec; + if (thriftClass == null) { + try { + thriftClass = getThriftClass(); + } + catch (IOException e) { + e.printStackTrace(); + } + catch (ClassNotFoundException e) { + e.printStackTrace(); + } + catch (IllegalAccessException e) { + e.printStackTrace(); + } + catch (InstantiationException e) { + e.printStackTrace(); + } + } + } + + public Class getThriftClass() + throws IOException, ClassNotFoundException, IllegalAccessException, InstantiationException + { + final Class thrift; + if (jarPath != null) { + File jar = new File(jarPath); + URLClassLoader child = new URLClassLoader( + new URL[] {jar.toURI().toURL()}, + this.getClass().getClassLoader() + ); + thrift = (Class) Class.forName(thriftClassName, true, child); + } else { + thrift = (Class) Class.forName(thriftClassName); + } + thrift.newInstance(); + return thrift; + } + + @Override + protected CloseableIterator intermediateRowIterator() throws IOException + { + final String json; + try { + final byte[] bytes = IOUtils.toByteArray(source.open()); + TBase o = thriftClass.newInstance(); + ThriftDeserialization.detectAndDeserialize(bytes, o); + json = ThriftDeserialization.SERIALIZER_SIMPLE_JSON.get().toString(o); + } + catch (IllegalAccessException | InstantiationException | TException e) { + throw new IAE("some thing wrong with your thrift?"); + } + + Map record = parser.parseToMap(json); + ThriftDeserialization.detectAndDeserialize(bytes, o); + 
return CloseableIterators.withEmptyBaggage( + Iterators.singletonIterator(protobufBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open())))) + ); + } + + @Override + protected List parseInputRows(DynamicMessage intermediateRow) throws ParseException, JsonProcessingException + { + Map record; + + if (flattenSpec == null) { + try { + record = CollectionUtils.mapKeys(intermediateRow.getAllFields(), k -> k.getJsonName()); + } catch (Exception ex) { + throw new ParseException(ex, "Protobuf message could not be parsed"); + } + } else { + try { + String json = JsonFormat.printer().print(intermediateRow); + JsonNode document = new ObjectMapper().readValue(json, JsonNode.class); + record = recordFlattener.flatten(document); + } catch (InvalidProtocolBufferException e) { + throw new ParseException(e, "Protobuf message could not be parsed"); + } + } + + return Collections.singletonList(MapInputRowParser.parse(inputRowSchema, record)); + } + + @Override + protected List> toMap(DynamicMessage intermediateRow) throws JsonProcessingException, InvalidProtocolBufferException + { + return Collections.singletonList(new ObjectMapper().readValue(JsonFormat.printer().print(intermediateRow), Map.class)); + } +} From a1980af4f64ad72ea72093779d11431705c9b068 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Fri, 11 Jun 2021 10:27:26 +0800 Subject: [PATCH 2/7] bug fixed --- .../druid/data/input/thrift/ThriftReader.java | 89 +++++++++---------- 1 file changed, 44 insertions(+), 45 deletions(-) diff --git a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java index 41b0b389d0d4..33419dc66e37 100644 --- a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java +++ b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java @@ -19,12 +19,14 
@@ package org.apache.druid.data.input.thrift; -import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.MappingIterator; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.FluentIterable; import com.google.common.collect.Iterators; -import com.google.protobuf.DynamicMessage; -import com.google.protobuf.InvalidProtocolBufferException; import org.apache.commons.io.IOUtils; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputRow; @@ -39,9 +41,7 @@ import org.apache.druid.java.util.common.parsers.ObjectFlattener; import org.apache.druid.java.util.common.parsers.ObjectFlatteners; import org.apache.druid.java.util.common.parsers.ParseException; -import org.apache.druid.java.util.common.parsers.Parser; import org.apache.druid.utils.CollectionUtils; -import org.apache.hadoop.io.BytesWritable; import org.apache.thrift.TBase; import org.apache.thrift.TException; @@ -49,21 +49,19 @@ import java.io.IOException; import java.net.URL; import java.net.URLClassLoader; -import java.nio.ByteBuffer; -import java.util.Collections; import java.util.List; import java.util.Map; -public class ThriftReader extends IntermediateRowParsingReader +public class ThriftReader extends IntermediateRowParsingReader { - private final InputRowSchema inputRowSchema; private final InputEntity source; - private final JSONPathSpec flattenSpec; - private final ObjectFlattener recordFlattener; private final String jarPath; private final String thriftClassName; - private Parser parser; private volatile Class thriftClass = null; + private final ObjectMapper objectMapper; + private final ObjectFlattener flattener; + private final InputRowSchema inputRowSchema; + ThriftReader( InputRowSchema inputRowSchema, 
@@ -73,18 +71,12 @@ public class ThriftReader extends IntermediateRowParsingReader JSONPathSpec flattenSpec ) { - if (flattenSpec == null) { - this.inputRowSchema = new ProtobufInputRowSchema(inputRowSchema); - this.recordFlattener = null; - } else { - this.inputRowSchema = inputRowSchema; - this.recordFlattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(true)); - } - + this.inputRowSchema = inputRowSchema; + this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(true)); this.source = source; this.jarPath = jarPath; this.thriftClassName = thriftClassName; - this.flattenSpec = flattenSpec; + this.objectMapper = new ObjectMapper(); if (thriftClass == null) { try { thriftClass = getThriftClass(); @@ -123,7 +115,7 @@ public Class getThriftClass() } @Override - protected CloseableIterator intermediateRowIterator() throws IOException + protected CloseableIterator intermediateRowIterator() throws IOException { final String json; try { @@ -136,40 +128,47 @@ protected CloseableIterator intermediateRowIterator() throws IOE throw new IAE("some thing wrong with your thrift?"); } - Map record = parser.parseToMap(json); - ThriftDeserialization.detectAndDeserialize(bytes, o); return CloseableIterators.withEmptyBaggage( - Iterators.singletonIterator(protobufBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open())))) + Iterators.singletonIterator(json) ); } @Override - protected List parseInputRows(DynamicMessage intermediateRow) throws ParseException, JsonProcessingException + protected List parseInputRows(String intermediateRow) throws IOException, ParseException { - Map record; - - if (flattenSpec == null) { - try { - record = CollectionUtils.mapKeys(intermediateRow.getAllFields(), k -> k.getJsonName()); - } catch (Exception ex) { - throw new ParseException(ex, "Protobuf message could not be parsed"); - } - } else { - try { - String json = JsonFormat.printer().print(intermediateRow); - JsonNode document = new 
ObjectMapper().readValue(json, JsonNode.class); - record = recordFlattener.flatten(document); - } catch (InvalidProtocolBufferException e) { - throw new ParseException(e, "Protobuf message could not be parsed"); + final List inputRows; + try (JsonParser parser = new JsonFactory().createParser(intermediateRow)) { + final MappingIterator delegate = this.objectMapper.readValues(parser, JsonNode.class); + inputRows = FluentIterable.from(() -> delegate) + .transform(jsonNode -> MapInputRowParser.parse(inputRowSchema, flattener.flatten(jsonNode))) + .toList(); + } + catch (RuntimeException e) { + if (e.getCause() instanceof JsonParseException) { + throw new ParseException(e, "Unable to parse row [%s]", intermediateRow); } + throw e; } - - return Collections.singletonList(MapInputRowParser.parse(inputRowSchema, record)); + if (CollectionUtils.isNullOrEmpty(inputRows)) { + throw new ParseException("Unable to parse [%s] as the intermediateRow resulted in empty input row", intermediateRow); + } + return inputRows; } @Override - protected List> toMap(DynamicMessage intermediateRow) throws JsonProcessingException, InvalidProtocolBufferException + protected List> toMap(String intermediateRow) throws IOException { - return Collections.singletonList(new ObjectMapper().readValue(JsonFormat.printer().print(intermediateRow), Map.class)); + try (JsonParser parser = new JsonFactory().createParser(intermediateRow)) { + final MappingIterator delegate = objectMapper.readValues(parser, Map.class); + return FluentIterable.from(() -> delegate) + .transform(map -> (Map) map) + .toList(); + } + catch (RuntimeException e) { + if (e.getCause() instanceof JsonParseException) { + throw new ParseException(e, "Unable to parse row [%s]", intermediateRow); + } + throw e; + } } } From 27cc64d35e2b1fe279314d1bd63fe01e8ae01697 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Fri, 11 Jun 2021 14:25:52 +0800 Subject: [PATCH 3/7] add test --- .../data/input/thrift/ThriftInputFormat.java | 12 ++ 
.../druid/data/input/thrift/ThriftReader.java | 11 +- .../input/thrift/ThriftInputFormatTest.java | 140 +++++++++++++++++ .../data/input/thrift/ThriftReaderTest.java | 141 ++++++++++++++++++ 4 files changed, 297 insertions(+), 7 deletions(-) create mode 100644 extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftInputFormatTest.java create mode 100644 extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftReaderTest.java diff --git a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputFormat.java b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputFormat.java index 0e54c17e5d9f..d8bf75d86ca3 100644 --- a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputFormat.java +++ b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputFormat.java @@ -57,6 +57,18 @@ public boolean isSplittable() return false; } + @JsonProperty + public String getThriftJar() + { + return jarPath; + } + + @JsonProperty + public String getThriftClass() + { + return thriftClassName; + } + @Override public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) { diff --git a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java index 33419dc66e37..d185a09a3314 100644 --- a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java +++ b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java @@ -82,16 +82,13 @@ public class ThriftReader extends IntermediateRowParsingReader thriftClass = getThriftClass(); } catch (IOException e) { - 
e.printStackTrace(); + throw new IAE(e, "failed to load jar [%s]", jarPath); } catch (ClassNotFoundException e) { - e.printStackTrace(); + throw new IAE(e, "class [%s] not found in jar", thriftClassName); } - catch (IllegalAccessException e) { - e.printStackTrace(); - } - catch (InstantiationException e) { - e.printStackTrace(); + catch (InstantiationException | IllegalAccessException e) { + throw new IAE(e, "instantiation thrift instance failed"); } } } diff --git a/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftInputFormatTest.java b/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftInputFormatTest.java new file mode 100644 index 000000000000..2a9d0f2bf839 --- /dev/null +++ b/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftInputFormatTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.thrift; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.NestedInputFormat; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TJSONProtocol; +import org.joda.time.DateTime; +import org.joda.time.chrono.ISOChronology; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.util.Collections; + +import static org.apache.druid.data.input.thrift.ThriftReaderTest.verifyFlattenData; +import static org.apache.druid.data.input.thrift.ThriftReaderTest.verifyNestedData; + +public class ThriftInputFormatTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private InputRowSchema inputRowSchema; + private InputRowSchema flattenInputRowSchema; + private JSONPathSpec flattenSpec; + private DateTime dateTime; + private String date; + + private final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + @Before + public void setUp() + { + TimestampSpec timestampSpec = new TimestampSpec("date", "auto", null); + DimensionsSpec dimensionsSpec = new DimensionsSpec(Lists.newArrayList( + new StringDimensionSchema("title"), + new 
StringDimensionSchema("lastName") + ), null, null); + DimensionsSpec flattenDimensionsSpec = new DimensionsSpec(Collections.singletonList( + new StringDimensionSchema("title") + ), null, null); + flattenSpec = new JSONPathSpec( + true, + Lists.newArrayList( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "title", "title"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "lastName", "$.author.lastName") + ) + ); + + inputRowSchema = new InputRowSchema(timestampSpec, dimensionsSpec, null); + flattenInputRowSchema = new InputRowSchema(timestampSpec, flattenDimensionsSpec, null); + dateTime = new DateTime(2016, 8, 29, 0, 0, ISOChronology.getInstanceUTC()); + date = "2016-08-29"; + + for (Module jacksonModule : new ThriftExtensionsModule().getJacksonModules()) { + jsonMapper.registerModule(jacksonModule); + } + } + + @Test + public void testSerde() throws IOException + { + ThriftInputFormat thriftInputFormat = new ThriftInputFormat(flattenSpec, "example/book.jar", "org.apache.druid.data.input.thrift.Book"); + + NestedInputFormat thriftInputFormat2 = jsonMapper.readValue( + jsonMapper.writeValueAsString(thriftInputFormat), + NestedInputFormat.class + ); + + Assert.assertEquals(thriftInputFormat, thriftInputFormat2); + } + + + @Test + public void testParseNestedData() throws Exception + { + Book book = new Book().setDate(date).setPrice(19.9).setTitle("title") + .setAuthor(new Author().setFirstName("first").setLastName("last")); + TSerializer serializer; + byte[] bytes; + serializer = new TSerializer(new TJSONProtocol.Factory()); + bytes = serializer.serialize(book); + final ByteEntity entity = new ByteEntity(bytes); + + ThriftInputFormat thriftInputFormat = new ThriftInputFormat(flattenSpec, "example/book.jar", "org.apache.druid.data.input.thrift.Book"); + InputRow row = thriftInputFormat.createReader(inputRowSchema, entity, null).read().next(); + + verifyNestedData(row, this.dateTime); + } + + @Test + public void testParseFlatData() throws Exception + { + Book book = new 
Book().setDate(date).setPrice(19.9).setTitle("title"); + TSerializer serializer; + byte[] bytes; + serializer = new TSerializer(new TJSONProtocol.Factory()); + bytes = serializer.serialize(book); + final ByteEntity entity = new ByteEntity(bytes); + + ThriftInputFormat thriftInputFormat = new ThriftInputFormat(null, "example/book.jar", "org.apache.druid.data.input.thrift.Book"); + InputRow row = thriftInputFormat.createReader(flattenInputRowSchema, entity, null).read().next(); + + verifyFlattenData(row, this.dateTime); + } +} diff --git a/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftReaderTest.java b/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftReaderTest.java new file mode 100644 index 000000000000..a3b879b4507d --- /dev/null +++ b/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftReaderTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.thrift; + +import com.google.common.collect.Lists; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TJSONProtocol; +import org.joda.time.DateTime; +import org.joda.time.chrono.ISOChronology; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Collections; +import java.util.List; + +public class ThriftReaderTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private InputRowSchema inputRowSchema; + private InputRowSchema flattenInputRowSchema; + private JSONPathSpec flattenSpec; + private DateTime dateTime; + private String date; + + @Before + public void setUp() + { + TimestampSpec timestampSpec = new TimestampSpec("date", "auto", null); + DimensionsSpec dimensionsSpec = new DimensionsSpec(Lists.newArrayList( + new StringDimensionSchema("title"), + new StringDimensionSchema("lastName") + ), null, null); + DimensionsSpec flattenDimensionsSpec = new DimensionsSpec(Collections.singletonList( + new StringDimensionSchema("title") + ), null, null); + flattenSpec = new JSONPathSpec( + true, + Lists.newArrayList( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "title", "title"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "lastName", "$.author.lastName") + ) + ); + + inputRowSchema = new InputRowSchema(timestampSpec, dimensionsSpec, 
null); + flattenInputRowSchema = new InputRowSchema(timestampSpec, flattenDimensionsSpec, null); + dateTime = new DateTime(2016, 8, 29, 0, 0, ISOChronology.getInstanceUTC()); + date = "2016-08-29"; + } + + @Test + public void testParseNestedData() throws Exception + { + Book book = new Book().setDate(date).setPrice(19.9).setTitle("title") + .setAuthor(new Author().setFirstName("first").setLastName("last")); + TSerializer serializer; + byte[] bytes; + serializer = new TSerializer(new TJSONProtocol.Factory()); + bytes = serializer.serialize(book); + final ByteEntity entity = new ByteEntity(bytes); + + ThriftReader reader = new ThriftReader(inputRowSchema, entity, "example/book.jar", "org.apache.druid.data.input.thrift.Book", flattenSpec); + InputRow row = reader.read().next(); + + verifyNestedData(row, this.dateTime); + } + + @Test + public void testParseFlatData() throws Exception + { + Book book = new Book().setDate(date).setPrice(19.9).setTitle("title"); + TSerializer serializer; + byte[] bytes; + serializer = new TSerializer(new TJSONProtocol.Factory()); + bytes = serializer.serialize(book); + final ByteEntity entity = new ByteEntity(bytes); + + ThriftReader reader = new ThriftReader(flattenInputRowSchema, entity, "example/book.jar", "org.apache.druid.data.input.thrift.Book", JSONPathSpec.DEFAULT); + InputRow row = reader.read().next(); + + verifyFlattenData(row, this.dateTime); + } + + + static void verifyNestedData(InputRow row, DateTime dateTime) + { + Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); + + assertDimensionEquals(row, "title", "title"); + assertDimensionEquals(row, "lastName", "last"); + + Assert.assertEquals(19.9F, row.getMetric("price").floatValue(), 0.0); + } + + static void verifyFlattenData(InputRow row, DateTime dateTime) + { + Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); + + assertDimensionEquals(row, "title", "title"); + + Assert.assertEquals(19.9F, row.getMetric("price").floatValue(), 
0.0); + } + + private static void assertDimensionEquals(InputRow row, String dimension, Object expected) + { + List values = row.getDimension(dimension); + Assert.assertEquals(1, values.size()); + Assert.assertEquals(expected, values.get(0)); + } + +} From 830d561311cbe2eecbeabaa042035360fc3658d6 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sun, 13 Jun 2021 18:16:33 +0800 Subject: [PATCH 4/7] fix pom --- extensions-contrib/thrift-extensions/pom.xml | 30 ++++++++++++++++ .../druid/data/input/thrift/ThriftReader.java | 35 ++++++------------- 2 files changed, 40 insertions(+), 25 deletions(-) diff --git a/extensions-contrib/thrift-extensions/pom.xml b/extensions-contrib/thrift-extensions/pom.xml index 07b8edb3939c..324b2c244066 100644 --- a/extensions-contrib/thrift-extensions/pom.xml +++ b/extensions-contrib/thrift-extensions/pom.xml @@ -141,6 +141,36 @@ hamcrest-core test + + com.google.code.findbugs + jsr305 + 2.0.1 + provided + + + commons-io + commons-io + 2.9.0 + provided + + + joda-time + joda-time + 2.10.5 + provided + + + com.fasterxml.jackson.core + jackson-core + 2.10.2 + provided + + + org.apache.druid + druid-processing + 0.22.0-SNAPSHOT + provided + diff --git a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java index d185a09a3314..657f89bca73d 100644 --- a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java +++ b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java @@ -20,7 +20,6 @@ package org.apache.druid.data.input.thrift; import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.MappingIterator; @@ -134,18 +133,11 @@ 
protected CloseableIterator intermediateRowIterator() throws IOException protected List parseInputRows(String intermediateRow) throws IOException, ParseException { final List inputRows; - try (JsonParser parser = new JsonFactory().createParser(intermediateRow)) { - final MappingIterator delegate = this.objectMapper.readValues(parser, JsonNode.class); - inputRows = FluentIterable.from(() -> delegate) - .transform(jsonNode -> MapInputRowParser.parse(inputRowSchema, flattener.flatten(jsonNode))) - .toList(); - } - catch (RuntimeException e) { - if (e.getCause() instanceof JsonParseException) { - throw new ParseException(e, "Unable to parse row [%s]", intermediateRow); - } - throw e; - } + JsonParser parser = new JsonFactory().createParser(intermediateRow); + final MappingIterator delegate = this.objectMapper.readValues(parser, JsonNode.class); + inputRows = FluentIterable.from(() -> delegate) + .transform(jsonNode -> MapInputRowParser.parse(inputRowSchema, flattener.flatten(jsonNode))) + .toList(); if (CollectionUtils.isNullOrEmpty(inputRows)) { throw new ParseException("Unable to parse [%s] as the intermediateRow resulted in empty input row", intermediateRow); } @@ -155,17 +147,10 @@ protected List parseInputRows(String intermediateRow) throws IOExcepti @Override protected List> toMap(String intermediateRow) throws IOException { - try (JsonParser parser = new JsonFactory().createParser(intermediateRow)) { - final MappingIterator delegate = objectMapper.readValues(parser, Map.class); - return FluentIterable.from(() -> delegate) - .transform(map -> (Map) map) - .toList(); - } - catch (RuntimeException e) { - if (e.getCause() instanceof JsonParseException) { - throw new ParseException(e, "Unable to parse row [%s]", intermediateRow); - } - throw e; - } + JsonParser parser = new JsonFactory().createParser(intermediateRow); + final MappingIterator delegate = objectMapper.readValues(parser, Map.class); + return FluentIterable.from(() -> delegate) + .transform(map -> (Map) 
map) + .toList(); } } From 4929840673eee1de971d0d64738abf58cc2fe8ea Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 19 Jun 2021 14:53:06 +0800 Subject: [PATCH 5/7] add document for this extension --- docs/ingestion/data-formats.md | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index d431fead16e1..dec25682961c 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -270,6 +270,39 @@ The `inputFormat` to load data of Avro format in stream ingestion. An example is |`avroBytesDecoder`| JSON Object |Specifies how to decode bytes to Avro record. | yes | | binaryAsString | Boolean | Specifies if the bytes Avro column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) | +### Thrift Stream + +> You need to include the [`druid-thrift-extensions`](../development/extensions-contrib/thrift.md) as an extension to use the Thrift Stream input format. + +The `inputFormat` to load data of Thrift format in stream ingestion. An example is: +```json +"ioConfig": { + "inputFormat": { + "type": "thrift", + "flattenSpec": { + "useFieldDiscovery": true, + "fields": [ + { + "type": "path", + "name": "someRecord_subInt", + "expr": "$.someRecord.subInt" + } + ] + }, + "binaryAsString": false + }, + ... +} +``` + +| Field | Type | Description | Required | +|-------|------|-------------|----------| +|type| String| This should be set to `thrift` to read Thrift serialized data| yes | +|flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Thrift record. Note that only 'path' expression are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) | +| binaryAsString | Boolean | Specifies if the bytes Thrift column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. 
| no (default = false) | +| thriftJar | String | path of thrift jar, if not provided, it will try to find the thrift class in classpath. Thrift jar in batch ingestion should be uploaded to HDFS first and configure `jobProperties` with `"tmpjars":"/path/to/your/thrift.jar"` | no | +| thriftClass | String | classname of thrift | yes | + ##### Avro Bytes Decoder If `type` is not included, the avroBytesDecoder defaults to `schema_repo`. From 87ac348f4ce5376c5423f11ef96516f94b848908 Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 19 Jun 2021 14:56:19 +0800 Subject: [PATCH 6/7] document fixed --- docs/ingestion/data-formats.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index dec25682961c..41a7e072e66a 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -281,6 +281,8 @@ The `inputFormat` to load data of Thrift format in stream ingestion. An example "type": "thrift", "flattenSpec": { "useFieldDiscovery": true, + "jarPath": "book.jar", + "thriftClass": "org.apache.druid.data.input.thrift.Book", "fields": [ { "type": "path", @@ -300,8 +302,8 @@ The `inputFormat` to load data of Thrift format in stream ingestion. An example |type| String| This should be set to `thrift` to read Thrift serialized data| yes | |flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Thrift record. Note that only 'path' expression are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) | | binaryAsString | Boolean | Specifies if the bytes Thrift column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) | -| thriftJar | String | path of thrift jar, if not provided, it will try to find the thrift class in classpath. 
Thrift jar in batch ingestion should be uploaded to HDFS first and configure `jobProperties` with `"tmpjars":"/path/to/your/thrift.jar"` | no | -| thriftClass | String | classname of thrift | yes | +| thriftJar | String | path of Thrift jar, if not provided, it will try to find the Thrift class in classpath.| no | +| thriftClass | String | classname of Thrift | yes | ##### Avro Bytes Decoder From 030e49ca180ee533b969abb7d27e3aea512f7a4c Mon Sep 17 00:00:00 2001 From: yuanyi Date: Sat, 19 Jun 2021 14:59:25 +0800 Subject: [PATCH 7/7] document fixed --- docs/ingestion/data-formats.md | 68 +++++++++++++++++----------------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index 41a7e072e66a..95cd8233c279 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -223,32 +223,19 @@ The Parquet `inputFormat` has the following components: |flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Parquet file. Note that only 'path' expression are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) | | binaryAsString | Boolean | Specifies if the bytes parquet column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) | -### Avro Stream - -> You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro Stream input format. +### Thrift Stream -> See the [Avro Types](../development/extensions-core/avro.md#avro-types) section for how Avro types are handled in Druid +> You need to include the [`druid-thrift-extensions`](../development/extensions-contrib/thrift.md) as an extension to use the Thrift Stream input format. -The `inputFormat` to load data of Avro format in stream ingestion. An example is: +The `inputFormat` to load data of Thrift format in stream ingestion. 
An example is: ```json "ioConfig": { "inputFormat": { - "type": "avro_stream", - "avroBytesDecoder": { - "type": "schema_inline", - "schema": { - //your schema goes here, for example - "namespace": "org.apache.druid.data", - "name": "User", - "type": "record", - "fields": [ - { "name": "FullName", "type": "string" }, - { "name": "Country", "type": "string" } - ] - } - }, + "type": "thrift", "flattenSpec": { "useFieldDiscovery": true, + "jarPath": "book.jar", + "thriftClass": "org.apache.druid.data.input.thrift.Book", "fields": [ { "type": "path", @@ -265,24 +252,38 @@ The `inputFormat` to load data of Avro format in stream ingestion. An example is | Field | Type | Description | Required | |-------|------|-------------|----------| -|type| String| This should be set to `avro_stream` to read Avro serialized data| yes | -|flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Avro record. Note that only 'path' expression are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) | -|`avroBytesDecoder`| JSON Object |Specifies how to decode bytes to Avro record. | yes | -| binaryAsString | Boolean | Specifies if the bytes Avro column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) | +|type| String| This should be set to `thrift` to read Thrift serialized data| yes | +|flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Thrift record. Note that only 'path' expression are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) | +| binaryAsString | Boolean | Specifies if the bytes Thrift column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. 
| no (default = false) | +| thriftJar | String | path of Thrift jar, if not provided, it will try to find the Thrift class in classpath.| no | +| thriftClass | String | classname of Thrift | yes | -### Thrift Stream +### Avro Stream -> You need to include the [`druid-thrift-extensions`](../development/extensions-contrib/thrift.md) as an extension to use the Thrift Stream input format. +> You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro Stream input format. -The `inputFormat` to load data of Thrift format in stream ingestion. An example is: +> See the [Avro Types](../development/extensions-core/avro.md#avro-types) section for how Avro types are handled in Druid + +The `inputFormat` to load data of Avro format in stream ingestion. An example is: ```json "ioConfig": { "inputFormat": { - "type": "thrift", + "type": "avro_stream", + "avroBytesDecoder": { + "type": "schema_inline", + "schema": { + //your schema goes here, for example + "namespace": "org.apache.druid.data", + "name": "User", + "type": "record", + "fields": [ + { "name": "FullName", "type": "string" }, + { "name": "Country", "type": "string" } + ] + } + }, "flattenSpec": { "useFieldDiscovery": true, - "jarPath": "book.jar", - "thriftClass": "org.apache.druid.data.input.thrift.Book", "fields": [ { "type": "path", @@ -299,11 +300,10 @@ The `inputFormat` to load data of Thrift format in stream ingestion. An example | Field | Type | Description | Required | |-------|------|-------------|----------| -|type| String| This should be set to `thrift` to read Thrift serialized data| yes | -|flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Thrift record. 
Note that only 'path' expression are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) | -| binaryAsString | Boolean | Specifies if the bytes Thrift column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) | -| thriftJar | String | path of Thrift jar, if not provided, it will try to find the Thrift class in classpath.| no | -| thriftClass | String | classname of Thrift | yes | +|type| String| This should be set to `avro_stream` to read Avro serialized data| yes | +|flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Avro record. Note that only 'path' expression are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) | +|`avroBytesDecoder`| JSON Object |Specifies how to decode bytes to Avro record. | yes | +| binaryAsString | Boolean | Specifies if the bytes Avro column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) | ##### Avro Bytes Decoder