diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index d431fead16e1..95cd8233c279 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -223,6 +223,41 @@ The Parquet `inputFormat` has the following components: |flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Parquet file. Note that only 'path' expression are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) | | binaryAsString | Boolean | Specifies if the bytes parquet column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) | +### Thrift Stream + +> You need to include the [`druid-thrift-extensions`](../development/extensions-contrib/thrift.md) as an extension to use the Thrift Stream input format. + +The `inputFormat` to load data of Thrift format in stream ingestion. An example is: +```json +"ioConfig": { + "inputFormat": { + "type": "thrift", + "flattenSpec": { + "useFieldDiscovery": true, + "jarPath": "book.jar", + "thriftClass": "org.apache.druid.data.input.thrift.Book", + "fields": [ + { + "type": "path", + "name": "someRecord_subInt", + "expr": "$.someRecord.subInt" + } + ] + }, + "binaryAsString": false + }, + ... +} +``` + +| Field | Type | Description | Required | |-------|------|-------------|----------| |type| String| This should be set to `thrift` to read Thrift serialized data| yes | |flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Thrift record. Note that only 'path' expressions are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) | | binaryAsString | Boolean | Specifies if the bytes Thrift column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. 
| no (default = false) | +| thriftJar | String | Path of the Thrift jar file. If not provided, it will try to find the Thrift class in the classpath.| no | +| thriftClass | String | Fully qualified classname of the Thrift object | yes | + ### Avro Stream > You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro Stream input format. diff --git a/extensions-contrib/thrift-extensions/pom.xml b/extensions-contrib/thrift-extensions/pom.xml index 07b8edb3939c..324b2c244066 100644 --- a/extensions-contrib/thrift-extensions/pom.xml +++ b/extensions-contrib/thrift-extensions/pom.xml @@ -141,6 +141,36 @@ hamcrest-core test + + com.google.code.findbugs + jsr305 + 2.0.1 + provided + + + commons-io + commons-io + 2.9.0 + provided + + + joda-time + joda-time + 2.10.5 + provided + + + com.fasterxml.jackson.core + jackson-core + 2.10.2 + provided + + + org.apache.druid + druid-processing + 0.22.0-SNAPSHOT + provided + diff --git a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftExtensionsModule.java b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftExtensionsModule.java index bea2a913e39e..4ffba3a7e241 100644 --- a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftExtensionsModule.java +++ b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftExtensionsModule.java @@ -37,7 +37,8 @@ public List getJacksonModules() return Collections.singletonList( new SimpleModule("ThriftInputRowParserModule") .registerSubtypes( - new NamedType(ThriftInputRowParser.class, "thrift") + new NamedType(ThriftInputRowParser.class, "thrift"), + new NamedType(ThriftInputFormat.class, "thrift") ) ); } diff --git a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputFormat.java 
b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputFormat.java new file mode 100644 index 000000000000..d8bf75d86ca3 --- /dev/null +++ b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputFormat.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.thrift; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.NestedInputFormat; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; + +import javax.annotation.Nullable; + +import java.io.File; +import java.util.Objects; + +public class ThriftInputFormat extends NestedInputFormat +{ + private final String jarPath; + private final String thriftClassName; + + @JsonCreator + public ThriftInputFormat( + @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec, + @JsonProperty("thriftJar") String jarPath, + @JsonProperty("thriftClass") String thriftClassName + ) + { + super(flattenSpec); + this.jarPath = jarPath; + this.thriftClassName = thriftClassName; + Preconditions.checkNotNull(thriftClassName, "thrift class name"); + } + + @Override + public boolean isSplittable() + { + return false; + } + + @JsonProperty + public String getThriftJar() + { + return jarPath; + } + + @JsonProperty + public String getThriftClass() + { + return thriftClassName; + } + + @Override + public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) + { + return new ThriftReader( + inputRowSchema, + source, + jarPath, + thriftClassName, + getFlattenSpec() + ); + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ThriftInputFormat that = (ThriftInputFormat) o; + return Objects.equals(getFlattenSpec(), that.getFlattenSpec()) && + Objects.equals(jarPath, that.jarPath) && + Objects.equals(thriftClassName, that.thriftClassName); + } + + @Override + public int 
hashCode() + { + return Objects.hash(jarPath, thriftClassName, getFlattenSpec()); + } + +} diff --git a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java new file mode 100644 index 000000000000..657f89bca73d --- /dev/null +++ b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.thrift; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.MappingIterator; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Iterators; +import org.apache.commons.io.IOUtils; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.IntermediateRowParsingReader; +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.java.util.common.CloseableIterators; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.common.parsers.ObjectFlattener; +import org.apache.druid.java.util.common.parsers.ObjectFlatteners; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.utils.CollectionUtils; +import org.apache.thrift.TBase; +import org.apache.thrift.TException; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.List; +import java.util.Map; + +public class ThriftReader extends IntermediateRowParsingReader +{ + private final InputEntity source; + private final String jarPath; + private final String thriftClassName; + private volatile Class thriftClass = null; + private final ObjectMapper objectMapper; + private final ObjectFlattener flattener; + private final InputRowSchema inputRowSchema; + + + ThriftReader( + InputRowSchema inputRowSchema, + InputEntity source, + String jarPath, + String thriftClassName, + JSONPathSpec 
flattenSpec + ) + { + this.inputRowSchema = inputRowSchema; + this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(true)); + this.source = source; + this.jarPath = jarPath; + this.thriftClassName = thriftClassName; + this.objectMapper = new ObjectMapper(); + if (thriftClass == null) { + try { + thriftClass = getThriftClass(); + } + catch (IOException e) { + throw new IAE(e, "failed to load jar [%s]", jarPath); + } + catch (ClassNotFoundException e) { + throw new IAE(e, "class [%s] not found in jar", thriftClassName); + } + catch (InstantiationException | IllegalAccessException e) { + throw new IAE(e, "instantiation thrift instance failed"); + } + } + } + + public Class getThriftClass() + throws IOException, ClassNotFoundException, IllegalAccessException, InstantiationException + { + final Class thrift; + if (jarPath != null) { + File jar = new File(jarPath); + URLClassLoader child = new URLClassLoader( + new URL[] {jar.toURI().toURL()}, + this.getClass().getClassLoader() + ); + thrift = (Class) Class.forName(thriftClassName, true, child); + } else { + thrift = (Class) Class.forName(thriftClassName); + } + thrift.newInstance(); + return thrift; + } + + @Override + protected CloseableIterator intermediateRowIterator() throws IOException + { + final String json; + try { + final byte[] bytes = IOUtils.toByteArray(source.open()); + TBase o = thriftClass.newInstance(); + ThriftDeserialization.detectAndDeserialize(bytes, o); + json = ThriftDeserialization.SERIALIZER_SIMPLE_JSON.get().toString(o); + } + catch (IllegalAccessException | InstantiationException | TException e) { + throw new IAE("some thing wrong with your thrift?"); + } + + return CloseableIterators.withEmptyBaggage( + Iterators.singletonIterator(json) + ); + } + + @Override + protected List parseInputRows(String intermediateRow) throws IOException, ParseException + { + final List inputRows; + JsonParser parser = new JsonFactory().createParser(intermediateRow); + final MappingIterator 
delegate = this.objectMapper.readValues(parser, JsonNode.class); + inputRows = FluentIterable.from(() -> delegate) + .transform(jsonNode -> MapInputRowParser.parse(inputRowSchema, flattener.flatten(jsonNode))) + .toList(); + if (CollectionUtils.isNullOrEmpty(inputRows)) { + throw new ParseException("Unable to parse [%s] as the intermediateRow resulted in empty input row", intermediateRow); + } + return inputRows; + } + + @Override + protected List> toMap(String intermediateRow) throws IOException + { + JsonParser parser = new JsonFactory().createParser(intermediateRow); + final MappingIterator delegate = objectMapper.readValues(parser, Map.class); + return FluentIterable.from(() -> delegate) + .transform(map -> (Map) map) + .toList(); + } +} diff --git a/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftInputFormatTest.java b/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftInputFormatTest.java new file mode 100644 index 000000000000..2a9d0f2bf839 --- /dev/null +++ b/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftInputFormatTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.thrift; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.NestedInputFormat; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TJSONProtocol; +import org.joda.time.DateTime; +import org.joda.time.chrono.ISOChronology; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.util.Collections; + +import static org.apache.druid.data.input.thrift.ThriftReaderTest.verifyFlattenData; +import static org.apache.druid.data.input.thrift.ThriftReaderTest.verifyNestedData; + +public class ThriftInputFormatTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private InputRowSchema inputRowSchema; + private InputRowSchema flattenInputRowSchema; + private JSONPathSpec flattenSpec; + private DateTime dateTime; + private String date; + + private final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + @Before + public void setUp() + { + TimestampSpec timestampSpec = new TimestampSpec("date", "auto", null); + DimensionsSpec 
dimensionsSpec = new DimensionsSpec(Lists.newArrayList( + new StringDimensionSchema("title"), + new StringDimensionSchema("lastName") + ), null, null); + DimensionsSpec flattenDimensionsSpec = new DimensionsSpec(Collections.singletonList( + new StringDimensionSchema("title") + ), null, null); + flattenSpec = new JSONPathSpec( + true, + Lists.newArrayList( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "title", "title"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "lastName", "$.author.lastName") + ) + ); + + inputRowSchema = new InputRowSchema(timestampSpec, dimensionsSpec, null); + flattenInputRowSchema = new InputRowSchema(timestampSpec, flattenDimensionsSpec, null); + dateTime = new DateTime(2016, 8, 29, 0, 0, ISOChronology.getInstanceUTC()); + date = "2016-08-29"; + + for (Module jacksonModule : new ThriftExtensionsModule().getJacksonModules()) { + jsonMapper.registerModule(jacksonModule); + } + } + + @Test + public void testSerde() throws IOException + { + ThriftInputFormat thriftInputFormat = new ThriftInputFormat(flattenSpec, "example/book.jar", "org.apache.druid.data.input.thrift.Book"); + + NestedInputFormat thriftInputFormat2 = jsonMapper.readValue( + jsonMapper.writeValueAsString(thriftInputFormat), + NestedInputFormat.class + ); + + Assert.assertEquals(thriftInputFormat, thriftInputFormat2); + } + + + @Test + public void testParseNestedData() throws Exception + { + Book book = new Book().setDate(date).setPrice(19.9).setTitle("title") + .setAuthor(new Author().setFirstName("first").setLastName("last")); + TSerializer serializer; + byte[] bytes; + serializer = new TSerializer(new TJSONProtocol.Factory()); + bytes = serializer.serialize(book); + final ByteEntity entity = new ByteEntity(bytes); + + ThriftInputFormat thriftInputFormat = new ThriftInputFormat(flattenSpec, "example/book.jar", "org.apache.druid.data.input.thrift.Book"); + InputRow row = thriftInputFormat.createReader(inputRowSchema, entity, null).read().next(); + + verifyNestedData(row, 
this.dateTime); + } + + @Test + public void testParseFlatData() throws Exception + { + Book book = new Book().setDate(date).setPrice(19.9).setTitle("title"); + TSerializer serializer; + byte[] bytes; + serializer = new TSerializer(new TJSONProtocol.Factory()); + bytes = serializer.serialize(book); + final ByteEntity entity = new ByteEntity(bytes); + + ThriftInputFormat thriftInputFormat = new ThriftInputFormat(null, "example/book.jar", "org.apache.druid.data.input.thrift.Book"); + InputRow row = thriftInputFormat.createReader(flattenInputRowSchema, entity, null).read().next(); + + verifyFlattenData(row, this.dateTime); + } +} diff --git a/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftReaderTest.java b/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftReaderTest.java new file mode 100644 index 000000000000..a3b879b4507d --- /dev/null +++ b/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftReaderTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.thrift; + +import com.google.common.collect.Lists; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TJSONProtocol; +import org.joda.time.DateTime; +import org.joda.time.chrono.ISOChronology; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Collections; +import java.util.List; + +public class ThriftReaderTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private InputRowSchema inputRowSchema; + private InputRowSchema flattenInputRowSchema; + private JSONPathSpec flattenSpec; + private DateTime dateTime; + private String date; + + @Before + public void setUp() + { + TimestampSpec timestampSpec = new TimestampSpec("date", "auto", null); + DimensionsSpec dimensionsSpec = new DimensionsSpec(Lists.newArrayList( + new StringDimensionSchema("title"), + new StringDimensionSchema("lastName") + ), null, null); + DimensionsSpec flattenDimensionsSpec = new DimensionsSpec(Collections.singletonList( + new StringDimensionSchema("title") + ), null, null); + flattenSpec = new JSONPathSpec( + true, + Lists.newArrayList( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "title", "title"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "lastName", "$.author.lastName") + ) + ); + + inputRowSchema = new InputRowSchema(timestampSpec, dimensionsSpec, 
null); + flattenInputRowSchema = new InputRowSchema(timestampSpec, flattenDimensionsSpec, null); + dateTime = new DateTime(2016, 8, 29, 0, 0, ISOChronology.getInstanceUTC()); + date = "2016-08-29"; + } + + @Test + public void testParseNestedData() throws Exception + { + Book book = new Book().setDate(date).setPrice(19.9).setTitle("title") + .setAuthor(new Author().setFirstName("first").setLastName("last")); + TSerializer serializer; + byte[] bytes; + serializer = new TSerializer(new TJSONProtocol.Factory()); + bytes = serializer.serialize(book); + final ByteEntity entity = new ByteEntity(bytes); + + ThriftReader reader = new ThriftReader(inputRowSchema, entity, "example/book.jar", "org.apache.druid.data.input.thrift.Book", flattenSpec); + InputRow row = reader.read().next(); + + verifyNestedData(row, this.dateTime); + } + + @Test + public void testParseFlatData() throws Exception + { + Book book = new Book().setDate(date).setPrice(19.9).setTitle("title"); + TSerializer serializer; + byte[] bytes; + serializer = new TSerializer(new TJSONProtocol.Factory()); + bytes = serializer.serialize(book); + final ByteEntity entity = new ByteEntity(bytes); + + ThriftReader reader = new ThriftReader(flattenInputRowSchema, entity, "example/book.jar", "org.apache.druid.data.input.thrift.Book", JSONPathSpec.DEFAULT); + InputRow row = reader.read().next(); + + verifyFlattenData(row, this.dateTime); + } + + + static void verifyNestedData(InputRow row, DateTime dateTime) + { + Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); + + assertDimensionEquals(row, "title", "title"); + assertDimensionEquals(row, "lastName", "last"); + + Assert.assertEquals(19.9F, row.getMetric("price").floatValue(), 0.0); + } + + static void verifyFlattenData(InputRow row, DateTime dateTime) + { + Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); + + assertDimensionEquals(row, "title", "title"); + + Assert.assertEquals(19.9F, row.getMetric("price").floatValue(), 
0.0); + } + + private static void assertDimensionEquals(InputRow row, String dimension, Object expected) + { + List values = row.getDimension(dimension); + Assert.assertEquals(1, values.size()); + Assert.assertEquals(expected, values.get(0)); + } + +}