diff --git a/docs/content/development/extensions-contrib/thrift.md b/docs/content/development/extensions-contrib/thrift.md new file mode 100644 index 000000000000..61a3912d979a --- /dev/null +++ b/docs/content/development/extensions-contrib/thrift.md @@ -0,0 +1,76 @@ +--- +layout: doc_page +--- + +# Thrift + +This extension enables Druid to ingest and understand the Apache Thrift data format. Make sure to [include](../../operations/including-extensions.html) `druid-thrift-extensions` as an extension. + +Notice that for both stream and hadoop parser explained below, the thrift data class file should be included in classpath. + +### Thrift Stream Parser + +This is for streaming/realtime ingestion. + +| Field | Type | Description | Required | +|-------|------|-------------|----------| +| type | String | This should say `thrift_stream`. | yes | +| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be a timeAndDims parseSpec. | yes | +| tClassName | String | Specifies the class name of thrift object type. | yes | + +For example, using Thrift stream parser with schema repo Thrift bytes decoder: + +```json +"parser" : { + "type" : "thrift_stream", + "tClassName" : "${YOUR_THRIFT_CLASS_NAME}", + "parseSpec" : { + "format": "timeAndDims", + "timestampSpec": , + "dimensionsSpec": + } +} +``` + +### Thrift Hadoop Parser + +This is for batch ingestion using the HadoopDruidIndexer. The `inputFormat` of `inputSpec` in `ioConfig` can be set to `"com.twitter.elephantbird.mapreduce.input.LzoThriftBlockInputFormat"` or other inputFormat depending on your demand. If using InputFormat from elephant-bird, the `elephantbird.class.for.MultiInputFormat` of `jobProperties` in `tuningConfig` should be set to your thrift class name. Make sure to include "io.druid.extensions:druid-thrift-extensions" as an extension. + +| Field | Type | Description | Required | +|-------|------|-------------|----------| +| type | String | This should say `thrift_hadoop`. | yes | +| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be a timeAndDims parseSpec. | yes | + +For example, using Thrift Hadoop parser with custom reader's schema file: + +```json +{ + "type" : "index_hadoop", + "spec" : { + "dataSchema" : { + "dataSource" : "", + "parser" : { + "type" : "thrift_hadoop", + "parseSpec" : { + "format": "timeAndDims", + "timestampSpec": , + "dimensionsSpec": + } + } + }, + "ioConfig" : { + "type" : "hadoop", + "inputSpec" : { + "type" : "static", + "inputFormat": "com.twitter.elephantbird.mapreduce.input.LzoThriftBlockInputFormat", + "paths" : "" + } + }, + "tuningConfig" : { + "jobProperties" : { + "elephantbird.class.for.MultiInputFormat" : "${YOUR_THRIFT_CLASS_NAME}" + } + } + } +} +``` diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md index 4ca2b19c0c8c..44f9a46b78b2 100644 --- a/docs/content/development/extensions.md +++ b/docs/content/development/extensions.md @@ -58,6 +58,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c |druid-parquet-extensions|Support for data in Apache Parquet data format. Requires druid-avro-extensions to be loaded.|[link](../development/extensions-contrib/parquet.html)| |druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)| |druid-rocketmq|RocketMQ firehose.|[link](../development/extensions-contrib/rocketmq.html)| +|druid-thrift-extensions|Support for data in Apache Thrift data format.|[link](../development/extensions-contrib/thrift.html)| |graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)| |statsd-emitter|StatsD metrics emitter|[link](../development/extensions-contrib/statsd.html)| diff --git a/extensions-contrib/thrift-extensions/pom.xml b/extensions-contrib/thrift-extensions/pom.xml new file mode 100644 index 000000000000..47edce531185 --- /dev/null +++ b/extensions-contrib/thrift-extensions/pom.xml @@ -0,0 +1,136 @@ + + + + + + io.druid + druid + 0.9.3-SNAPSHOT + ../../pom.xml + + 4.0.0 + + io.druid.extensions.contrib + druid-thrift-extensions + druid-thrift-extensions + druid-thrift-extensions + + + + dtrott + http://maven.davidtrott.com/repository + + + + + + mac + + + mac + + + + mac + + + + linux + + + linux + + + + linux + + + + + + 0.6.1 + + ${basedir}/src/main/native/${os.family}/bin/thrift + 4.6 + + + + + + org.apache.thrift.tools + maven-thrift-plugin + 0.1.10 + + ${thrift.exec} + ${basedir}/src/test/thrift + java:private-members,hashcode + + + + thrift-test-sources + generate-test-sources + + testCompile + + + + + + + + + + io.druid + druid-api + ${project.parent.version} + provided + + + org.apache.thrift + libthrift + ${thrift.version} + + + org.apache.hadoop + hadoop-common + ${hadoop.compile.version} + compile + + + com.twitter.elephantbird + elephant-bird-core + ${elephantbird.version} + + + com.twitter.elephantbird + elephant-bird-hadoop-compat + ${elephantbird.version} + + + + junit + junit + test + + + + diff --git a/extensions-contrib/thrift-extensions/src/main/java/io/druid/data/input/ThriftHadoopInputRowParser.java b/extensions-contrib/thrift-extensions/src/main/java/io/druid/data/input/ThriftHadoopInputRowParser.java new file mode 100644 index 000000000000..d7062161006e --- /dev/null +++ b/extensions-contrib/thrift-extensions/src/main/java/io/druid/data/input/ThriftHadoopInputRowParser.java @@ -0,0 +1,75 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.data.input; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.twitter.elephantbird.mapreduce.io.ThriftWritable; +import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.ParseSpec; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.data.input.thrift.TBaseAsMap; +import org.apache.thrift.TBase; +import org.joda.time.DateTime; + +import java.util.List; + +@JsonTypeName("thrift_hadoop") +public class ThriftHadoopInputRowParser implements InputRowParser> +{ + + private final ParseSpec parseSpec; + private final List dimensions; + + @JsonCreator + public ThriftHadoopInputRowParser( + @JsonProperty("parseSpec") ParseSpec parseSpec + ) + { + this.parseSpec = parseSpec; + this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames(); + } + + @Override + public InputRow parse(ThriftWritable input) + { + TBase tBase = input.get(); + @SuppressWarnings("unchecked") + TBaseAsMap tBaseAsMap = new TBaseAsMap(tBase); + TimestampSpec timestampSpec = parseSpec.getTimestampSpec(); + DateTime dateTime = timestampSpec.extractTimestamp(tBaseAsMap); + return new MapBasedInputRow(dateTime, dimensions, tBaseAsMap); + } + + @JsonProperty + @Override + public ParseSpec getParseSpec() + { + return parseSpec; + } + + @Override + public InputRowParser withParseSpec(ParseSpec parseSpec) + { + return new ThriftHadoopInputRowParser(parseSpec); + } + +} diff --git a/extensions-contrib/thrift-extensions/src/main/java/io/druid/data/input/ThriftStreamInputRowParser.java b/extensions-contrib/thrift-extensions/src/main/java/io/druid/data/input/ThriftStreamInputRowParser.java new file mode 100644 index 000000000000..1c0c6aa18e59 --- /dev/null +++ b/extensions-contrib/thrift-extensions/src/main/java/io/druid/data/input/ThriftStreamInputRowParser.java @@ -0,0 +1,113 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.data.input; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import io.druid.data.input.impl.ParseSpec; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.data.input.thrift.TBaseAsMap; +import io.druid.data.input.thrift.util.ThriftUtils; +import io.druid.java.util.common.logger.Logger; +import org.apache.thrift.TBase; +import org.apache.thrift.TException; +import org.joda.time.DateTime; + +import java.nio.ByteBuffer; +import java.util.List; + +@JsonTypeName("thrift_stream") +public class ThriftStreamInputRowParser implements ByteBufferInputRowParser +{ + + private static final Logger log = new Logger(ThriftStreamInputRowParser.class); + + private final ParseSpec parseSpec; + private final List dimensions; + private final String tClassName; + private final Class tClass; + + @JsonCreator + public ThriftStreamInputRowParser( + @JsonProperty("parseSpec") ParseSpec parseSpec, + @JsonProperty("tClassName") String tClassName + ) + { + this.parseSpec = parseSpec; + this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames(); + this.tClassName = tClassName; + try { + this.tClass = Class.forName(tClassName).asSubclass(TBase.class); + } + catch (ClassNotFoundException e) { + log.error(e, "Wrong config on TBase className: %s", tClassName); + throw new RuntimeException(e); + } + } + + @Override + public InputRow parse(ByteBuffer input) + { + byte[] data = input.array(); + + TBase tBase; + try { + tBase = tClass.newInstance(); + } + catch (InstantiationException | IllegalAccessException e) { + log.error(e, "Unexpected Exception, maybe wrong config on TBase className: %s", tClassName); + throw new RuntimeException(e); + } + + try { + ThriftUtils.detectAndDeserialize(data, tBase); + } + catch (TException e) { + log.warn(e, "TException during deserialization with rawLog: %s", new String(data)); + } + + @SuppressWarnings("unchecked") + TBaseAsMap tBaseAsMap = new TBaseAsMap(tBase); + TimestampSpec timestampSpec = parseSpec.getTimestampSpec(); + DateTime dateTime = timestampSpec.extractTimestamp(tBaseAsMap); + return new MapBasedInputRow(dateTime, dimensions, tBaseAsMap); + } + + @JsonProperty + @Override + public ParseSpec getParseSpec() + { + return parseSpec; + } + + @JsonProperty + public String gettClassName() + { + return tClassName; + } + + @Override + public ByteBufferInputRowParser withParseSpec(ParseSpec parseSpec) + { + return new ThriftStreamInputRowParser(parseSpec, tClassName); + } + +} diff --git a/extensions-contrib/thrift-extensions/src/main/java/io/druid/data/input/thrift/TBaseAsMap.java b/extensions-contrib/thrift-extensions/src/main/java/io/druid/data/input/thrift/TBaseAsMap.java new file mode 100644 index 000000000000..8c34177bd73d --- /dev/null +++ b/extensions-contrib/thrift-extensions/src/main/java/io/druid/data/input/thrift/TBaseAsMap.java @@ -0,0 +1,175 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.data.input.thrift; + +import io.druid.java.util.common.logger.Logger; +import org.apache.thrift.TBase; +import org.apache.thrift.TFieldIdEnum; +import org.apache.thrift.meta_data.FieldMetaData; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +public class TBaseAsMap implements Map +{ + + private static final Logger log = new Logger(TBaseAsMap.class); + + private T tBase; + + public TBaseAsMap(T tBase) + { + this.tBase = tBase; + } + + @Override + public int size() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isEmpty() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsKey(Object key) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsValue(Object value) + { + throw new UnsupportedOperationException(); + } + + /** + * When used in MapBasedRow, field in TBase will be interpret as follows: + *
    + *
  • thrift type -> druid dimension:
  • + *
      + *
    • null, boolean, i16, i32, i64, double, string, Enum, Map, Set, Struct -> String, using String.valueOf
    • + *
    • bytes -> Arrays.toString()
    • + *
    • List -> List<String>, using Lists.transform(<List>dimValue, TO_STRING_INCLUDING_NULL)
    • + *
    + *
  • thrift type -> druid metric:
  • + *
      + *
    • null -> 0F/0L
    • + *
    • i16, i32, i64, double -> Float/Long, using Number.floatValue()/Number.longValue()
    • + *
    • string -> Float/Long, using Float.valueOf()/Long.valueOf()
    • + *
    • boolean, bytes, List, Enum, Map, Set, Struct -> ParseException
    • + *
    + *
+ * + * @param key ".".join(field names along the path) + */ + @SuppressWarnings("unchecked") + @Override + public Object get(Object key) + { + String fieldName = key.toString(); + String[] fieldNames = fieldName.split("\\."); + + int length = fieldNames.length; + int index = 0; + Object ret = tBase; + + boolean fieldNotFound = false; + while (!fieldNotFound && ret != null && index < length) { + if (ret instanceof TBase) { + TBase tempTBase = (TBase) ret; + Map structMetaDataMap = FieldMetaData.getStructMetaDataMap(tempTBase.getClass()); + TFieldIdEnum fieldIdEnum = null; + for (TFieldIdEnum tFieldIdEnum : structMetaDataMap.keySet()) { + if (tFieldIdEnum.getFieldName().equals(fieldNames[index])) { + fieldIdEnum = tFieldIdEnum; + break; + } + } + if (fieldIdEnum != null) { + ret = tempTBase.getFieldValue(fieldIdEnum); + index++; + } else { + fieldNotFound = true; + } + } else { + fieldNotFound = true; + } + } + + if (fieldNotFound) { + log.error("field not exists: %s", fieldName); + return null; + } + + if (ret instanceof byte[]) { + ret = Arrays.toString((byte[]) ret); + } + + return ret; + } + + @Override + public Object put(String key, Object value) + { + throw new UnsupportedOperationException(); + } + + @Override + public Object remove(Object key) + { + throw new UnsupportedOperationException(); + } + + @Override + public void putAll(Map m) + { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() + { + throw new UnsupportedOperationException(); + } + + @Override + public Set keySet() + { + throw new UnsupportedOperationException(); + } + + @Override + public Collection values() + { + throw new UnsupportedOperationException(); + } + + @Override + public Set> entrySet() + { + throw new UnsupportedOperationException(); + } +} diff --git a/extensions-contrib/thrift-extensions/src/main/java/io/druid/data/input/thrift/ThriftExtensionsModule.java b/extensions-contrib/thrift-extensions/src/main/java/io/druid/data/input/thrift/ThriftExtensionsModule.java new file mode 100644 index 000000000000..f063e50a0088 --- /dev/null +++ b/extensions-contrib/thrift-extensions/src/main/java/io/druid/data/input/thrift/ThriftExtensionsModule.java @@ -0,0 +1,53 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.data.input.thrift; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.inject.Binder; +import io.druid.data.input.ThriftHadoopInputRowParser; +import io.druid.data.input.ThriftStreamInputRowParser; +import io.druid.initialization.DruidModule; + +import java.util.Arrays; +import java.util.List; + +public class ThriftExtensionsModule implements DruidModule +{ + + public ThriftExtensionsModule() {} + + @Override + public List getJacksonModules() + { + return Arrays.asList( + new SimpleModule("ThriftInputRowParserModule") + .registerSubtypes( + new NamedType(ThriftStreamInputRowParser.class, "thrift_stream"), + new NamedType(ThriftHadoopInputRowParser.class, "thrift_hadoop") + ) + ); + } + + @Override + public void configure(Binder binder) {} + +} diff --git a/extensions-contrib/thrift-extensions/src/main/java/io/druid/data/input/thrift/util/ThriftUtils.java b/extensions-contrib/thrift-extensions/src/main/java/io/druid/data/input/thrift/util/ThriftUtils.java new file mode 100644 index 000000000000..6830928d07e4 --- /dev/null +++ b/extensions-contrib/thrift-extensions/src/main/java/io/druid/data/input/thrift/util/ThriftUtils.java @@ -0,0 +1,201 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.data.input.thrift.util; + +import io.druid.java.util.common.logger.Logger; +import org.apache.commons.codec.binary.Base64; +import org.apache.thrift.TBase; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TJSONProtocol; + +import java.nio.charset.StandardCharsets; + +import static java.util.Objects.requireNonNull; + +public final class ThriftUtils +{ + + private static final Logger log = new Logger(ThriftUtils.class); + + private static final ThreadLocal SERIALIZER = new ThreadLocal() + { + @Override + protected TSerializer initialValue() + { + return new TSerializer(); + } + }; + + + private static final ThreadLocal DESERIALIZER = new ThreadLocal() + { + @Override + protected TDeserializer initialValue() + { + return new TDeserializer(); + } + }; + + private static final ThreadLocal DESERIALIZER_COMPACT = new ThreadLocal() + { + @Override + protected TDeserializer initialValue() + { + return new TDeserializer(new TCompactProtocol.Factory()); + } + }; + + private static final ThreadLocal DESERIALIZER_JSON = new ThreadLocal() + { + @Override + protected TDeserializer initialValue() + { + return new TDeserializer(new TJSONProtocol.Factory()); + } + }; + + + private static final byte[] EMPTY_BYTES = new byte[0]; + + public static byte[] decodeB64IfNeeded(final byte[] src) + { + if (requireNonNull(src).length <= 0) { + return EMPTY_BYTES; + } + final byte last = src[src.length - 1]; + return (0 == last || '}' == last) ? src : Base64.decodeBase64(src); + } + + /** + * Attempt to determine the protocol used to serialize some data. + *

+ * The guess algorithm is copied from TProtocolUtil.java of thrift java runtime library. + * In some cases, no guess can be done, in that case we use TBinaryProtocol. + * To be certain to correctly detect the protocol, the first encoded + * field should have a field id < 256, and this is always true for our projects. + * + * @param data The serialized data to guess the protocol for. + * + * @return a deserializer with correct protocol for the current thread. + */ + private static ThreadLocal guessProtocol(final byte[] data) + { + if (data.length <= 1) { + return DESERIALIZER; + } + + final byte first = data[0], last = data[data.length - 1]; + + if ('{' == first && '}' == last) { + return DESERIALIZER_JSON; + } + + if (last != 0) { + return DESERIALIZER; + } + + if (first > 0x10) { + return DESERIALIZER_COMPACT; + } + + final byte second = data[1]; + if (0 == second) { + return DESERIALIZER; + } + + if ((second & 0x80) != 0) { + return DESERIALIZER_COMPACT; + } + + return DESERIALIZER; // fallback + } + + /** + * Deserializes byte-array into thrift object. + *

+ * Supporting binary, compact and json protocols, + * and the byte array could be or not be encoded by Base64. + * + * @param bytes the byte-array to deserialize + * @param thriftObj the output thrift object + * + * @return the output thrift object, or null if error occurs + */ + public static > T detectAndDeserialize(final byte[] bytes, final T thriftObj) throws TException + { + requireNonNull(thriftObj).clear(); + try { + final byte[] src = decodeB64IfNeeded(bytes); + guessProtocol(src).get().deserialize(thriftObj, src); + } + catch (final IllegalArgumentException e) { + throw new TException(e); + } + return thriftObj; + } + + + /** + * Serializes thrift object using binary protocol, and then encodes the binary using base64. + * + * @param thriftObj the thrift object + * + * @return the encoded base64 string, or null if error occurs + */ + public static String encodeBase64String(TBase thriftObj) + { + try { + byte[] binaryData = SERIALIZER.get().serialize(thriftObj); + return new String(Base64.encodeBase64(binaryData), StandardCharsets.UTF_8); + } + catch (TException e) { + log.warn("Error occurs when encoding thrift object, %s" + e.getMessage()); + return null; + } + } + + /** + * Decode base64 string into byte-array, and then deserializes it into thrift object using binary protocol + * + * @param str the base64 string to decode + * @param thriftObj the output thrift object + * + * @return the decoded thrift object, or null if error occurs + */ + @SuppressWarnings("rawtypes") + public static T decodeBase64String(String str, T thriftObj) + { + try { + byte[] binaryData = Base64.decodeBase64(str.getBytes(StandardCharsets.UTF_8)); + thriftObj.clear(); + DESERIALIZER.get().deserialize(thriftObj, binaryData); + return thriftObj; + } + catch (TException e) { + log.warn("Error occurs when decoding thrift object, %s" + e.getMessage()); + return null; + } + } + + +} diff --git a/extensions-contrib/thrift-extensions/src/main/native/linux/bin/thrift b/extensions-contrib/thrift-extensions/src/main/native/linux/bin/thrift new file mode 100755 index 000000000000..831a0f76b5f4 Binary files /dev/null and b/extensions-contrib/thrift-extensions/src/main/native/linux/bin/thrift differ diff --git a/extensions-contrib/thrift-extensions/src/main/native/mac/bin/thrift b/extensions-contrib/thrift-extensions/src/main/native/mac/bin/thrift new file mode 100755 index 000000000000..70d1eed23faf Binary files /dev/null and b/extensions-contrib/thrift-extensions/src/main/native/mac/bin/thrift differ diff --git a/extensions-contrib/thrift-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-contrib/thrift-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100644 index 000000000000..fd27114b60d9 --- /dev/null +++ b/extensions-contrib/thrift-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.data.input.thrift.ThriftExtensionsModule \ No newline at end of file diff --git a/extensions-contrib/thrift-extensions/src/test/java/io/druid/data/input/ThriftHadoopInputRowParserTest.java b/extensions-contrib/thrift-extensions/src/test/java/io/druid/data/input/ThriftHadoopInputRowParserTest.java new file mode 100644 index 000000000000..5733f73775d9 --- /dev/null +++ b/extensions-contrib/thrift-extensions/src/test/java/io/druid/data/input/ThriftHadoopInputRowParserTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.data.input; + +import com.twitter.elephantbird.mapreduce.io.ThriftWritable; +import com.twitter.elephantbird.util.ThriftUtils; +import io.druid.data.input.test.TestData; +import org.junit.Test; + +import static io.druid.data.input.ThriftStreamInputRowParserTest.PARSE_SPEC; +import static io.druid.data.input.ThriftStreamInputRowParserTest.assertInputRow; +import static io.druid.data.input.ThriftStreamInputRowParserTest.buildTestData; + +public class ThriftHadoopInputRowParserTest +{ + + private static final ThriftHadoopInputRowParser PARSER = new ThriftHadoopInputRowParser(PARSE_SPEC); + + @Test + public void test_parse() + { + InputRow inputRow = PARSER.parse(new ThriftWritable<>(buildTestData(), ThriftUtils.getTypeRef(TestData.class))); + assertInputRow(inputRow); + } + +} diff --git a/extensions-contrib/thrift-extensions/src/test/java/io/druid/data/input/ThriftStreamInputRowParserTest.java b/extensions-contrib/thrift-extensions/src/test/java/io/druid/data/input/ThriftStreamInputRowParserTest.java new file mode 100644 index 000000000000..e1f7012558f9 --- /dev/null +++ b/extensions-contrib/thrift-extensions/src/test/java/io/druid/data/input/ThriftStreamInputRowParserTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.data.input; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.TimeAndDimsParseSpec; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.data.input.test.TestData; +import io.druid.data.input.test.TestEnum; +import io.druid.data.input.test.TestStruct; +import io.druid.data.input.thrift.util.ThriftUtils; +import org.joda.time.DateTime; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +public class ThriftStreamInputRowParserTest +{ + + private static final Function intToString = new Function() + { + @Nullable + @Override + public String apply(@Nullable Integer integer) + { + return integer.toString(); + } + }; + + public static final DateTime DATE_TIME = new DateTime(2016, 10, 19, 10, 30); + public static final short SHORT_DIMENSION_VALUE = 2; + public static final int INT_DIMENSION_VALUE = 3; + public static final long LONG_DIMENSION_VALUE = 4; + public static final double DOUBLE_DIMENSION_VALUE = 5; + public static final boolean BOOLEAN_DIMENSION_VALUE = true; + public static final String STRING_DIMENSION_VALUE = "6"; + public static final byte BYTE_DIMENSION_VALUE = 7; + public static final byte[] BINARY_DIMENSION_VALUE = new byte[]{8, 9, 10}; + public static final TestEnum ENUM_DIMENSION_VALUE = TestEnum.ONE; + public static final List INT_LIST_DIMENSION_VALUE = Collections.singletonList(1); + public static final Set INT_SET_DIMENSION_VALUE = Sets.newHashSet(1); + public static final Map INT_INT_MAP_DIMENSION_VALUE = ImmutableMap.of(1, 1); + public static final int STRUCT_INT_DIMENSION_VALUE = 1; + public static final TestStruct STRUCT_DIMENSION_VALUE = new TestStruct(STRUCT_INT_DIMENSION_VALUE); + public static final short SHORT_METRICS_VALUE = 16; + public static final int INT_METRICS_VALUE = 17; + public static final long LONG_METRICS_VALUE = 18; + public static final double DOUBLE_METRICS_VALUE = 19; + public static final String STRING_METRICS_VALUE = "20"; + public static final byte BYTE_METRICS_VALUE = 21; + + public static final String TIMESTAMP = "timestamp"; + public static final String SHORT_DIMENSION = "shortDimension"; + public static final String INT_DIMENSION = "intDimension"; + public static final String LONG_DIMENSION = "longDimension"; + public static final String DOUBLE_DIMENSION = "doubleDimension"; + public static final String BOOLEAN_DIMENSION = "booleanDimension"; + public static final String STRING_DIMENSION = "stringDimension"; + public static final String BYTE_DIMENSION = "byteDimension"; + public static final String BINARY_DIMENSION = "binaryDimension"; + public static final String ENUM_DIMENSION = "enumDimension"; + public static final String INT_LIST_DIMENSION = "intListDimension"; + public static final String INT_SET_DIMENSION = "intSetDimension"; + public static final String INT_INT_MAP_DIMENSION = "intIntMapDimension"; + public static final String STRUCT_INT_DIMENSION = "structDimension.someInt"; + public static final String SHORT_METRICS = "shortMetrics"; + public static final String INT_METRICS = "intMetrics"; + public static final String LONG_METRICS = "longMetrics"; + public static final String DOUBLE_METRICS = "doubleMetrics"; + public static final String STRING_METRICS = "stringMetrics"; + public static final String BYTE_METRICS = "byteMetrics"; + + public static final Float EPSILON = 0.00001F; + + public static final List DIMENSIONS = Arrays.asList( + SHORT_DIMENSION, + INT_DIMENSION, + LONG_DIMENSION, + DOUBLE_DIMENSION, + BOOLEAN_DIMENSION, + STRING_DIMENSION, + BYTE_DIMENSION, + BINARY_DIMENSION, + ENUM_DIMENSION, + INT_LIST_DIMENSION, + INT_SET_DIMENSION, + INT_INT_MAP_DIMENSION, + STRUCT_INT_DIMENSION + ); + + public static final TimestampSpec TIMESTAMP_SPEC = + new TimestampSpec(TIMESTAMP, "millis", null); + public static final DimensionsSpec DIMENSIONS_SPEC = + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(DIMENSIONS), new ArrayList(), null); + public static final TimeAndDimsParseSpec PARSE_SPEC = new TimeAndDimsParseSpec(TIMESTAMP_SPEC, DIMENSIONS_SPEC); + + public static final String tClasName = "io.druid.data.input.test.TestData"; + + public static final ThriftStreamInputRowParser PARSER = new ThriftStreamInputRowParser(PARSE_SPEC, tClasName); + + public static void assertInputRow(InputRow inputRow) + { + assertEquals(DIMENSIONS, inputRow.getDimensions()); + assertEquals(DATE_TIME.getMillis(), inputRow.getTimestampFromEpoch()); + + assertEquals(buildStringList(SHORT_DIMENSION_VALUE), inputRow.getDimension(SHORT_DIMENSION)); + assertEquals(buildStringList(INT_DIMENSION_VALUE), inputRow.getDimension(INT_DIMENSION)); + assertEquals(buildStringList(LONG_DIMENSION_VALUE), inputRow.getDimension(LONG_DIMENSION)); + assertEquals(buildStringList(DOUBLE_DIMENSION_VALUE), inputRow.getDimension(DOUBLE_DIMENSION)); + assertEquals(buildStringList(BOOLEAN_DIMENSION_VALUE), inputRow.getDimension(BOOLEAN_DIMENSION)); + assertEquals(buildStringList(STRING_DIMENSION_VALUE), inputRow.getDimension(STRING_DIMENSION)); + assertEquals(buildStringList(BYTE_DIMENSION_VALUE), inputRow.getDimension(BYTE_DIMENSION)); + assertEquals(buildStringList(Arrays.toString(BINARY_DIMENSION_VALUE)), inputRow.getDimension(BINARY_DIMENSION)); + assertEquals(buildStringList(ENUM_DIMENSION_VALUE), inputRow.getDimension(ENUM_DIMENSION)); + assertEquals( + Lists.transform(INT_LIST_DIMENSION_VALUE, intToString), + inputRow.getDimension(INT_LIST_DIMENSION) + ); + assertEquals(buildStringList(INT_INT_MAP_DIMENSION_VALUE), inputRow.getDimension(INT_INT_MAP_DIMENSION)); + assertEquals(buildStringList(INT_SET_DIMENSION_VALUE), inputRow.getDimension(INT_SET_DIMENSION)); + assertEquals(buildStringList(STRUCT_INT_DIMENSION_VALUE), inputRow.getDimension(STRUCT_INT_DIMENSION)); + + assertEquals((long) SHORT_METRICS_VALUE, inputRow.getLongMetric(SHORT_METRICS)); + assertEquals((long) INT_METRICS_VALUE, inputRow.getLongMetric(INT_METRICS)); + assertEquals(LONG_METRICS_VALUE, inputRow.getLongMetric(LONG_METRICS)); + assertEquals((long) DOUBLE_METRICS_VALUE, inputRow.getLongMetric(DOUBLE_METRICS)); + assertEquals(Long.valueOf(STRING_METRICS_VALUE).longValue(), inputRow.getLongMetric(STRING_METRICS)); + assertEquals((long) BYTE_METRICS_VALUE, inputRow.getLongMetric(BYTE_METRICS)); + + assertEquals((float) SHORT_METRICS_VALUE, inputRow.getFloatMetric(SHORT_METRICS), EPSILON); + assertEquals((float) INT_METRICS_VALUE, inputRow.getFloatMetric(INT_METRICS), EPSILON); + assertEquals((float) LONG_METRICS_VALUE, inputRow.getFloatMetric(LONG_METRICS), EPSILON); + assertEquals(DOUBLE_METRICS_VALUE, inputRow.getFloatMetric(DOUBLE_METRICS), EPSILON); + assertEquals(Float.valueOf(STRING_METRICS_VALUE), inputRow.getFloatMetric(STRING_METRICS), EPSILON); + assertEquals((float) BYTE_METRICS_VALUE, inputRow.getFloatMetric(BYTE_METRICS), EPSILON); + } + + public static TestData buildTestData() + { + TestData testData = new TestData(); + return testData + .setTimestamp(DATE_TIME.getMillis()) + .setShortDimension(SHORT_DIMENSION_VALUE) + .setIntDimension(INT_DIMENSION_VALUE) + .setLongDimension(LONG_DIMENSION_VALUE) + .setDoubleDimension(DOUBLE_DIMENSION_VALUE) + .setBooleanDimension(BOOLEAN_DIMENSION_VALUE) + .setStringDimension(STRING_DIMENSION_VALUE) + .setByteDimension(BYTE_DIMENSION_VALUE) + .setBinaryDimension(BINARY_DIMENSION_VALUE) + .setEnumDimension(ENUM_DIMENSION_VALUE) + .setIntListDimension(INT_LIST_DIMENSION_VALUE) + .setIntSetDimension(INT_SET_DIMENSION_VALUE) + .setIntIntMapDimension(INT_INT_MAP_DIMENSION_VALUE) + .setStructDimension(STRUCT_DIMENSION_VALUE) + .setShortMetrics(SHORT_METRICS_VALUE) + .setIntMetrics(INT_METRICS_VALUE) + .setLongMetrics(LONG_METRICS_VALUE) + .setDoubleMetrics(DOUBLE_METRICS_VALUE) + .setStringMetrics(STRING_METRICS_VALUE) + .setByteMetrics(BYTE_METRICS_VALUE); + } + + private static List buildStringList(Object obj) + { + return Collections.singletonList(String.valueOf(obj)); + } + + @Test + public void test_parse() + { + TestData testData = buildTestData(); + InputRow inputRow = PARSER.parse(ByteBuffer.wrap(ThriftUtils.encodeBase64String(testData).getBytes())); + assertInputRow(inputRow); + } + +} diff --git a/extensions-contrib/thrift-extensions/src/test/java/io/druid/data/input/thrift/util/ThriftUtilsTest.java b/extensions-contrib/thrift-extensions/src/test/java/io/druid/data/input/thrift/util/ThriftUtilsTest.java new file mode 100644 index 000000000000..d624d281958a --- /dev/null +++ b/extensions-contrib/thrift-extensions/src/test/java/io/druid/data/input/thrift/util/ThriftUtilsTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.data.input.thrift.util; + +import io.druid.data.input.test.TestThriftObj; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +public class ThriftUtilsTest +{ + + @Test + public void convert_one_thrift_obj_base64() + { + TestThriftObj thriftObj1 = new TestThriftObj(), thriftObj2; + thriftObj1.setId(1); + thriftObj1.setName("name1"); + thriftObj1.setTimestamp(1234L); + thriftObj1.addToBlackList("invalidip"); + + String encodedStr = ThriftUtils.encodeBase64String(thriftObj1); + thriftObj2 = ThriftUtils.decodeBase64String(encodedStr, new TestThriftObj()); + assertEquals(thriftObj1, thriftObj2); + } + + + @Test + public void convert_thrift_obj_base64_for_reuse() + { + TestThriftObj thriftObj1 = new TestThriftObj(), thriftObj2 = new TestThriftObj(); + TestThriftObj outThriftObj = new TestThriftObj(); + + thriftObj1.setId(1); + thriftObj1.setName("name2"); + thriftObj1.setTimestamp(2345L); + + thriftObj2.setId(2); + thriftObj2.setTimestamp(3456L); + thriftObj2.addToBlackList("myiplist"); + + // Test re-using the output thrift object + String encodedStr1 = ThriftUtils.encodeBase64String(thriftObj1); + outThriftObj = ThriftUtils.decodeBase64String(encodedStr1, thriftObj2); + assertSame(thriftObj2, outThriftObj); + assertEquals(thriftObj1, outThriftObj); + + String encodedStr2 = ThriftUtils.encodeBase64String(thriftObj2); + outThriftObj = ThriftUtils.decodeBase64String(encodedStr2, thriftObj2); + assertSame(thriftObj2, outThriftObj); + assertEquals(thriftObj2, outThriftObj); + + String encodedStr3 = ThriftUtils.encodeBase64String(thriftObj1); + outThriftObj = ThriftUtils.decodeBase64String(encodedStr3, thriftObj2); + assertSame(thriftObj2, outThriftObj); + assertEquals(thriftObj1, outThriftObj); + } + + +} diff --git a/extensions-contrib/thrift-extensions/src/test/thrift/testData.thrift b/extensions-contrib/thrift-extensions/src/test/thrift/testData.thrift new file mode 100644 index 000000000000..515f07520e8c --- /dev/null +++ b/extensions-contrib/thrift-extensions/src/test/thrift/testData.thrift @@ -0,0 +1,34 @@ +namespace java io.druid.data.input.test + +struct TestStruct { + 1: required i32 someInt; +} + +enum TestEnum { + ONE = 1, + TWO = 2, +} + +struct TestData { + 1: required i64 timestamp, + 2: optional i16 shortDimension, + 3: optional i32 intDimension, + 4: optional i64 longDimension, + 5: optional double doubleDimension, + 6: optional bool booleanDimension, + 7: optional string stringDimension, + 8: optional byte byteDimension, + 9: optional binary binaryDimension, + 10: optional TestEnum enumDimension, + 11: optional list intListDimension, + 12: optional set intSetDimension, + 13: optional map intIntMapDimension, + 14: optional TestStruct structDimension, + // 15 is skipped + 16: optional i16 shortMetrics, + 17: optional i32 intMetrics, + 18: optional i64 longMetrics, + 19: optional double doubleMetrics, + 20: optional string stringMetrics, + 21: optional byte byteMetrics, +} diff --git a/extensions-contrib/thrift-extensions/src/test/thrift/testThriftObj.thrift b/extensions-contrib/thrift-extensions/src/test/thrift/testThriftObj.thrift new file mode 100644 index 000000000000..97627fc9f350 --- /dev/null +++ b/extensions-contrib/thrift-extensions/src/test/thrift/testThriftObj.thrift @@ -0,0 +1,10 @@ +namespace java io.druid.data.input.test + +struct TestThriftObj +{ + 1: required i32 id, + 2: optional string name, + 3: optional i64 timestamp, + 4: optional list blackList, +} + diff --git a/pom.xml b/pom.xml index a69561e42983..6cd69d10dd3d 100644 --- a/pom.xml +++ b/pom.xml @@ -109,6 +109,7 @@ extensions-contrib/distinctcount extensions-contrib/parquet-extensions extensions-contrib/statsd-emitter + extensions-contrib/thrift-extensions extensions-contrib/orc-extensions