diff --git a/docs/development/extensions-contrib/thrift.md b/docs/development/extensions-contrib/thrift.md index 31489827090e..8e15c154b72c 100644 --- a/docs/development/extensions-contrib/thrift.md +++ b/docs/development/extensions-contrib/thrift.md @@ -25,27 +25,114 @@ title: "Thrift" To use this Apache Druid extension, [include](../../configuration/extensions.md#loading-extensions) `druid-thrift-extensions` in the extensions load list. -This extension enables Druid to ingest thrift compact data online (`ByteBuffer`) and offline (SequenceFile of type `` or LzoThriftBlock File). +This extension enables Druid to ingest Thrift-encoded data from streaming sources such as Kafka and Kinesis, as well as from Hadoop batch jobs reading SequenceFile or LzoThriftBlock files. The binary, compact, and JSON Thrift wire protocols are all supported, with optional Base64 encoding. You may want to use another version of thrift, change the dependency in pom and compile yourself. +## Thrift input format + +Thrift-encoded data for streaming ingestion (Kafka, Kinesis) can be ingested using the Thrift [input format](../../ingestion/data-formats.md#input-format). It supports `flattenSpec` for extracting fields from nested Thrift structs using JSONPath expressions. + +| Field | Type | Description | Required | +|-------|------|-------------|----------| +| type | String | Must be `thrift` | yes | +| thriftClass | String | Fully qualified class name of the Thrift-generated `TBase` class to deserialize into. | yes | +| thriftJar | String | Path to a JAR file containing the Thrift class. If not provided, the class is looked up from the classpath. | no | +| flattenSpec | JSON Object | Specifies flattening of nested Thrift structs. See [Flattening nested data](../../ingestion/data-formats.md#flattenspec) for details. 
| no | + +### Example: Kafka ingestion + +Consider the following Thrift schema definition: + +``` +namespace java com.example.druid + +struct Author { + 1: string firstName; + 2: string lastName; +} + +struct Book { + 1: string date; + 2: double price; + 3: string title; + 4: Author author; +} +``` + +Compile it to produce `com.example.druid.Book` (and `com.example.druid.Author`) and make the resulting JAR available on the classpath of your Druid processes, or reference it via `thriftJar`. + +The following Kafka supervisor spec ingests compact-encoded `Book` messages, using a `flattenSpec` to extract the nested `author.lastName` field: + +```json +{ + "type": "kafka", + "spec": { + "dataSchema": { + "dataSource": "books", + "timestampSpec": { + "column": "date", + "format": "auto" + }, + "dimensionsSpec": { + "dimensions": [ + "title", + "lastName" + ] + }, + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "DAY", + "queryGranularity": "NONE" + } + }, + "tuningConfig": { + "type": "kafka" + }, + "ioConfig": { + "type": "kafka", + "consumerProperties": { + "bootstrap.servers": "localhost:9092" + }, + "topic": "books", + "inputFormat": { + "type": "thrift", + "thriftClass": "com.example.druid.Book", + "flattenSpec": { + "useFieldDiscovery": true, + "fields": [ + { + "type": "path", + "name": "lastName", + "expr": "$.author.lastName" + } + ] + } + }, + "taskCount": 1, + "replicas": 1, + "taskDuration": "PT1H" + } + } +} +``` + ## LZO Support If you plan to read LZO-compressed Thrift files, you will need to download version 0.4.19 of the [hadoop-lzo JAR](https://mvnrepository.com/artifact/com.hadoop.gplcompression/hadoop-lzo/0.4.19) and place it in your `extensions/druid-thrift-extensions` directory. 
-## Thrift Parser - +## Thrift parser (deprecated) -| Field | Type | Description | Required | -| ----------- | ----------- | ---------------------------------------- | -------- | -| type | String | This should say `thrift` | yes | -| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be a JSON parseSpec. | yes | -| thriftJar | String | path of thrift jar, if not provided, it will try to find the thrift class in classpath. Thrift jar in batch ingestion should be uploaded to HDFS first and configure `jobProperties` with `"tmpjars":"/path/to/your/thrift.jar"` | no | -| thriftClass | String | classname of thrift | yes | +`ThriftInputRowParser` is the legacy parser-based approach to Thrift ingestion. It is deprecated in favor of `ThriftInputFormat` above and will be removed in a future release. -- Batch Ingestion example - `inputFormat` and `tmpjars` should be set. +| Field | Type | Description | Required | +|-------|------|-------------|----------| +| type | String | Must be `thrift` | yes | +| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be a JSON parseSpec. | yes | +| thriftJar | String | Path to the Thrift JAR. If not provided, the class is looked up from the classpath. For Hadoop batch ingestion the JAR should be uploaded to HDFS first and `jobProperties` configured with `"tmpjars":"/path/to/your/thrift.jar"`. | no | +| thriftClass | String | Fully qualified class name of the Thrift-generated class. | yes | -This is for batch ingestion using the HadoopDruidIndexer. The inputFormat of inputSpec in ioConfig could be one of `"org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat"` and `com.twitter.elephantbird.mapreduce.input.LzoThriftBlockInputFormat`. Be careful, when `LzoThriftBlockInputFormat` is used, thrift class must be provided twice. +Batch ingestion example using the HadoopDruidIndexer. 
The `inputFormat` of `inputSpec` in `ioConfig` can be either `"org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat"` or `"com.twitter.elephantbird.mapreduce.input.LzoThriftBlockInputFormat"`. When using `LzoThriftBlockInputFormat`, the Thrift class must be provided twice: once as `thriftClass` in the parser and once as the `elephantbird.class.for.MultiInputFormat` entry in `jobProperties`. ```json { @@ -60,7 +147,8 @@ This is for batch ingestion using the HadoopDruidIndexer. The inputFormat of inp "protocol": "compact", "parseSpec": { "format": "json", - ... + "timestampSpec": {}, + "dimensionsSpec": {} } }, "metricsSpec": [], @@ -71,15 +159,13 @@ This is for batch ingestion using the HadoopDruidIndexer. The inputFormat of inp "inputSpec": { "type": "static", "inputFormat": "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", - // "inputFormat": "com.twitter.elephantbird.mapreduce.input.LzoThriftBlockInputFormat", "paths": "/user/to/some/book.seq" } }, "tuningConfig": { "type": "hadoop", "jobProperties": { - "tmpjars":"/user/h_user_profile/du00/druid/test/book.jar", - // "elephantbird.class.for.MultiInputFormat" : "${YOUR_THRIFT_CLASS_NAME}" + "tmpjars": "/user/h_user_profile/du00/druid/test/book.jar" } } } diff --git a/embedded-tests/pom.xml b/embedded-tests/pom.xml index 192bfa71b038..1e62fc3678ce 100644 --- a/embedded-tests/pom.xml +++ b/embedded-tests/pom.xml @@ -137,6 +137,18 @@ ${project.parent.version} test + + org.apache.druid.extensions.contrib + druid-thrift-extensions + ${project.parent.version} + test + + + com.googlecode.json-simple + json-simple + + + joda-time @@ -591,7 +603,12 @@ 5.5 test - + + org.apache.thrift + libthrift + 0.13.0 + test + diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexDataFormatsTestBase.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexDataFormatsTestBase.java index 18d3f5123749..031632f5989a 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexDataFormatsTestBase.java +++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexDataFormatsTestBase.java @@ -42,6 +42,8 @@ import org.apache.druid.data.input.protobuf.ProtobufInputFormat; import org.apache.druid.data.input.protobuf.ProtobufInputRowParser; import org.apache.druid.data.input.protobuf.SchemaRegistryBasedProtobufBytesDecoder; +import org.apache.druid.data.input.thrift.ThriftExtensionsModule; +import org.apache.druid.data.input.thrift.ThriftInputFormat; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.query.DruidMetrics; @@ -54,6 +56,8 @@ import org.apache.druid.testing.embedded.EmbeddedOverlord; import org.apache.druid.testing.embedded.StreamIngestResource; import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.apache.druid.testing.embedded.tools.ThriftEventSerializer; +import org.apache.druid.testing.embedded.tools.WikipediaThriftEvent; import org.apache.druid.testing.tools.AvroEventSerializer; import org.apache.druid.testing.tools.AvroSchemaRegistryEventSerializer; import org.apache.druid.testing.tools.CsvEventSerializer; @@ -81,6 +85,8 @@ *
<li>CSV</li> * <li>JSON</li> * <li>Protobuf (with and without schema registry)</li> + * <li>Thrift</li> + * <li>TSV</li> * </ul> * * This tests both InputFormat and Parser. Parser is deprecated for Streaming Ingestion, @@ -138,6 +144,7 @@ public EmbeddedDruidCluster createCluster() coordinator.addProperty("druid.manager.segments.useIncrementalCache", "ifSynced"); cluster.addExtension(ProtobufExtensionsModule.class) .addExtension(AvroExtensionsModule.class) + .addExtension(ThriftExtensionsModule.class) .useLatchableEmitter() .addCommonProperty("druid.monitoring.emissionPeriod", "PT0.1s") .addResource(streamResource) @@ -543,6 +550,28 @@ public void test_tsvDataFormat() stopSupervisor(supervisorSpec); } + @Test + @Timeout(30) + public void test_thriftDataFormat() + { + streamResource.createTopicWithPartitions(dataSource, 3); + EventSerializer serializer = new ThriftEventSerializer(); + int recordCount = generateStreamAndPublish(dataSource, serializer, false); + + ThriftInputFormat inputFormat = new ThriftInputFormat( + new JSONPathSpec(true, null), + null, + WikipediaThriftEvent.class.getName() + ); + + SupervisorSpec supervisorSpec = createSupervisor(dataSource, dataSource, inputFormat); + final String supervisorId = cluster.callApi().postSupervisor(supervisorSpec); + Assertions.assertEquals(dataSource, supervisorId); + + waitForDataAndVerifyIngestedEvents(dataSource, recordCount); + stopSupervisor(supervisorSpec); + } + private void waitForDataAndVerifyIngestedEvents(String dataSource, int expectedCount) { // Wait for the task to succeed diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/tools/ThriftEventSerializer.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/tools/ThriftEventSerializer.java new file mode 100644 index 000000000000..5f5767c5192e --- /dev/null +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/tools/ThriftEventSerializer.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.tools; + +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.testing.tools.EventSerializer; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TCompactProtocol; + +import java.util.List; + +/** + * {@link EventSerializer} that serializes Wikipedia stream events as Thrift compact-encoded + * {@link WikipediaThriftEvent} objects. All field values are converted to strings so that + * integer values like {@code added}, {@code deleted}, and {@code delta} are serialized + * uniformly without requiring a mixed-type Thrift struct. + */ +public class ThriftEventSerializer implements EventSerializer +{ + private static final TSerializer SERIALIZER = new TSerializer(new TCompactProtocol.Factory()); + + @Override + public byte[] serialize(List> event) + { + WikipediaThriftEvent wikiEvent = new WikipediaThriftEvent(); + for (Pair pair : event) { + String value = pair.rhs == null ? 
null : String.valueOf(pair.rhs); + switch (pair.lhs) { + case "timestamp": + wikiEvent.timestamp = value; + break; + case "page": + wikiEvent.page = value; + break; + case "language": + wikiEvent.language = value; + break; + case "user": + wikiEvent.user = value; + break; + case "unpatrolled": + wikiEvent.unpatrolled = value; + break; + case "newPage": + wikiEvent.newPage = value; + break; + case "robot": + wikiEvent.robot = value; + break; + case "anonymous": + wikiEvent.anonymous = value; + break; + case "namespace": + wikiEvent.namespace = value; + break; + case "continent": + wikiEvent.continent = value; + break; + case "country": + wikiEvent.country = value; + break; + case "region": + wikiEvent.region = value; + break; + case "city": + wikiEvent.city = value; + break; + case "added": + wikiEvent.added = value; + break; + case "deleted": + wikiEvent.deleted = value; + break; + case "delta": + wikiEvent.delta = value; + break; + default: + break; + } + } + try { + return SERIALIZER.serialize(wikiEvent); + } + catch (TException e) { + throw new RuntimeException("Failed to serialize WikipediaThriftEvent", e); + } + } + + @Override + public void close() + { + } +} diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/tools/WikipediaThriftEvent.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/tools/WikipediaThriftEvent.java new file mode 100644 index 000000000000..c2aabf98d657 --- /dev/null +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/tools/WikipediaThriftEvent.java @@ -0,0 +1,505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.tools; + +import org.apache.thrift.TBase; +import org.apache.thrift.TException; +import org.apache.thrift.TFieldIdEnum; +import org.apache.thrift.protocol.TField; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.protocol.TProtocolUtil; +import org.apache.thrift.protocol.TStruct; +import org.apache.thrift.protocol.TType; + +import java.util.EnumMap; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * A hand-written Thrift struct representing a Wikipedia stream event for use in + * embedded stream ingestion tests. All fields are strings so that + * {@code TSimpleJSONProtocol} can serialize them without special handling. 
+ */ +public class WikipediaThriftEvent + implements TBase, java.io.Serializable +{ + private static final long serialVersionUID = 1L; + + private static final TStruct STRUCT_DESC = new TStruct("WikipediaThriftEvent"); + + private static final TField TIMESTAMP_FIELD_DESC = new TField("timestamp", TType.STRING, (short) 1); + private static final TField PAGE_FIELD_DESC = new TField("page", TType.STRING, (short) 2); + private static final TField LANGUAGE_FIELD_DESC = new TField("language", TType.STRING, (short) 3); + private static final TField USER_FIELD_DESC = new TField("user", TType.STRING, (short) 4); + private static final TField UNPATROLLED_FIELD_DESC = new TField("unpatrolled", TType.STRING, (short) 5); + private static final TField NEW_PAGE_FIELD_DESC = new TField("newPage", TType.STRING, (short) 6); + private static final TField ROBOT_FIELD_DESC = new TField("robot", TType.STRING, (short) 7); + private static final TField ANONYMOUS_FIELD_DESC = new TField("anonymous", TType.STRING, (short) 8); + private static final TField NAMESPACE_FIELD_DESC = new TField("namespace", TType.STRING, (short) 9); + private static final TField CONTINENT_FIELD_DESC = new TField("continent", TType.STRING, (short) 10); + private static final TField COUNTRY_FIELD_DESC = new TField("country", TType.STRING, (short) 11); + private static final TField REGION_FIELD_DESC = new TField("region", TType.STRING, (short) 12); + private static final TField CITY_FIELD_DESC = new TField("city", TType.STRING, (short) 13); + private static final TField ADDED_FIELD_DESC = new TField("added", TType.STRING, (short) 14); + private static final TField DELETED_FIELD_DESC = new TField("deleted", TType.STRING, (short) 15); + private static final TField DELTA_FIELD_DESC = new TField("delta", TType.STRING, (short) 16); + + public String timestamp; + public String page; + public String language; + public String user; + public String unpatrolled; + public String newPage; + public String robot; + public String 
anonymous; + public String namespace; + public String continent; + public String country; + public String region; + public String city; + public String added; + public String deleted; + public String delta; + + public enum _Fields implements TFieldIdEnum + { + TIMESTAMP((short) 1, "timestamp"), + PAGE((short) 2, "page"), + LANGUAGE((short) 3, "language"), + USER((short) 4, "user"), + UNPATROLLED((short) 5, "unpatrolled"), + NEW_PAGE((short) 6, "newPage"), + ROBOT((short) 7, "robot"), + ANONYMOUS((short) 8, "anonymous"), + NAMESPACE((short) 9, "namespace"), + CONTINENT((short) 10, "continent"), + COUNTRY((short) 11, "country"), + REGION((short) 12, "region"), + CITY((short) 13, "city"), + ADDED((short) 14, "added"), + DELETED((short) 15, "deleted"), + DELTA((short) 16, "delta"); + + private static final Map BY_NAME = new HashMap<>(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + BY_NAME.put(field.getFieldName(), field); + } + } + + public static _Fields findByThriftId(int fieldId) + { + switch (fieldId) { + case 1: + return TIMESTAMP; + case 2: + return PAGE; + case 3: + return LANGUAGE; + case 4: + return USER; + case 5: + return UNPATROLLED; + case 6: + return NEW_PAGE; + case 7: + return ROBOT; + case 8: + return ANONYMOUS; + case 9: + return NAMESPACE; + case 10: + return CONTINENT; + case 11: + return COUNTRY; + case 12: + return REGION; + case 13: + return CITY; + case 14: + return ADDED; + case 15: + return DELETED; + case 16: + return DELTA; + default: + return null; + } + } + + public static _Fields findByName(String name) + { + return BY_NAME.get(name); + } + + private final short thriftId; + private final String fieldName; + + _Fields(short thriftId, String fieldName) + { + this.thriftId = thriftId; + this.fieldName = fieldName; + } + + @Override + public short getThriftFieldId() + { + return thriftId; + } + + @Override + public String getFieldName() + { + return fieldName; + } + } + + public static final Map<_Fields, 
org.apache.thrift.meta_data.FieldMetaData> META_DATA_MAP; + + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<>(_Fields.class); + for (_Fields field : _Fields.values()) { + tmpMap.put(field, new org.apache.thrift.meta_data.FieldMetaData( + field.getFieldName(), + org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(TType.STRING) + )); + } + META_DATA_MAP = java.util.Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(WikipediaThriftEvent.class, META_DATA_MAP); + } + + public WikipediaThriftEvent() + { + } + + public WikipediaThriftEvent(WikipediaThriftEvent other) + { + this.timestamp = other.timestamp; + this.page = other.page; + this.language = other.language; + this.user = other.user; + this.unpatrolled = other.unpatrolled; + this.newPage = other.newPage; + this.robot = other.robot; + this.anonymous = other.anonymous; + this.namespace = other.namespace; + this.continent = other.continent; + this.country = other.country; + this.region = other.region; + this.city = other.city; + this.added = other.added; + this.deleted = other.deleted; + this.delta = other.delta; + } + + @Override + public WikipediaThriftEvent deepCopy() + { + return new WikipediaThriftEvent(this); + } + + @Override + public void clear() + { + timestamp = null; + page = null; + language = null; + user = null; + unpatrolled = null; + newPage = null; + robot = null; + anonymous = null; + namespace = null; + continent = null; + country = null; + region = null; + city = null; + added = null; + deleted = null; + delta = null; + } + + @Override + public _Fields fieldForId(int fieldId) + { + return _Fields.findByThriftId(fieldId); + } + + @Override + public boolean isSet(_Fields field) + { + if (field == null) { + throw new IllegalArgumentException(); + } + return getFieldValue(field) != null; + } + + @Override + public Object getFieldValue(_Fields field) + { + switch 
(field) { + case TIMESTAMP: + return timestamp; + case PAGE: + return page; + case LANGUAGE: + return language; + case USER: + return user; + case UNPATROLLED: + return unpatrolled; + case NEW_PAGE: + return newPage; + case ROBOT: + return robot; + case ANONYMOUS: + return anonymous; + case NAMESPACE: + return namespace; + case CONTINENT: + return continent; + case COUNTRY: + return country; + case REGION: + return region; + case CITY: + return city; + case ADDED: + return added; + case DELETED: + return deleted; + case DELTA: + return delta; + default: + throw new IllegalStateException(); + } + } + + @Override + public void setFieldValue(_Fields field, Object value) + { + switch (field) { + case TIMESTAMP: + timestamp = (String) value; + break; + case PAGE: + page = (String) value; + break; + case LANGUAGE: + language = (String) value; + break; + case USER: + user = (String) value; + break; + case UNPATROLLED: + unpatrolled = (String) value; + break; + case NEW_PAGE: + newPage = (String) value; + break; + case ROBOT: + robot = (String) value; + break; + case ANONYMOUS: + anonymous = (String) value; + break; + case NAMESPACE: + namespace = (String) value; + break; + case CONTINENT: + continent = (String) value; + break; + case COUNTRY: + country = (String) value; + break; + case REGION: + region = (String) value; + break; + case CITY: + city = (String) value; + break; + case ADDED: + added = (String) value; + break; + case DELETED: + deleted = (String) value; + break; + case DELTA: + delta = (String) value; + break; + default: + throw new IllegalStateException(); + } + } + + @Override + public void read(TProtocol iprot) throws TException + { + TField field; + iprot.readStructBegin(); + while (true) { + field = iprot.readFieldBegin(); + if (field.type == TType.STOP) { + break; + } + if (field.type == TType.STRING) { + switch (field.id) { + case 1: + timestamp = iprot.readString(); + break; + case 2: + page = iprot.readString(); + break; + case 3: + language = 
iprot.readString(); + break; + case 4: + user = iprot.readString(); + break; + case 5: + unpatrolled = iprot.readString(); + break; + case 6: + newPage = iprot.readString(); + break; + case 7: + robot = iprot.readString(); + break; + case 8: + anonymous = iprot.readString(); + break; + case 9: + namespace = iprot.readString(); + break; + case 10: + continent = iprot.readString(); + break; + case 11: + country = iprot.readString(); + break; + case 12: + region = iprot.readString(); + break; + case 13: + city = iprot.readString(); + break; + case 14: + added = iprot.readString(); + break; + case 15: + deleted = iprot.readString(); + break; + case 16: + delta = iprot.readString(); + break; + default: + TProtocolUtil.skip(iprot, field.type); + break; + } + } else { + TProtocolUtil.skip(iprot, field.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + validate(); + } + + @Override + public void write(TProtocol oprot) throws TException + { + validate(); + oprot.writeStructBegin(STRUCT_DESC); + writeStringField(oprot, TIMESTAMP_FIELD_DESC, timestamp); + writeStringField(oprot, PAGE_FIELD_DESC, page); + writeStringField(oprot, LANGUAGE_FIELD_DESC, language); + writeStringField(oprot, USER_FIELD_DESC, user); + writeStringField(oprot, UNPATROLLED_FIELD_DESC, unpatrolled); + writeStringField(oprot, NEW_PAGE_FIELD_DESC, newPage); + writeStringField(oprot, ROBOT_FIELD_DESC, robot); + writeStringField(oprot, ANONYMOUS_FIELD_DESC, anonymous); + writeStringField(oprot, NAMESPACE_FIELD_DESC, namespace); + writeStringField(oprot, CONTINENT_FIELD_DESC, continent); + writeStringField(oprot, COUNTRY_FIELD_DESC, country); + writeStringField(oprot, REGION_FIELD_DESC, region); + writeStringField(oprot, CITY_FIELD_DESC, city); + writeStringField(oprot, ADDED_FIELD_DESC, added); + writeStringField(oprot, DELETED_FIELD_DESC, deleted); + writeStringField(oprot, DELTA_FIELD_DESC, delta); + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + private static void 
writeStringField(TProtocol oprot, TField desc, String value) throws TException + { + if (value != null) { + oprot.writeFieldBegin(desc); + oprot.writeString(value); + oprot.writeFieldEnd(); + } + } + + public void validate() throws TException + { + // no required fields + } + + @Override + public int compareTo(WikipediaThriftEvent other) + { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + return Objects.compare(timestamp, other.timestamp, java.util.Comparator.nullsFirst(java.util.Comparator.naturalOrder())); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WikipediaThriftEvent that = (WikipediaThriftEvent) o; + return Objects.equals(timestamp, that.timestamp) + && Objects.equals(page, that.page) + && Objects.equals(language, that.language) + && Objects.equals(user, that.user) + && Objects.equals(namespace, that.namespace); + } + + @Override + public int hashCode() + { + return Objects.hash(timestamp, page, language, user, namespace); + } + + @Override + public String toString() + { + return "WikipediaThriftEvent{timestamp=" + timestamp + ", page=" + page + "}"; + } +} diff --git a/extensions-contrib/thrift-extensions/pom.xml b/extensions-contrib/thrift-extensions/pom.xml index e212ddf4907d..9e39c0b5d8b4 100644 --- a/extensions-contrib/thrift-extensions/pom.xml +++ b/extensions-contrib/thrift-extensions/pom.xml @@ -115,6 +115,16 @@ jackson-databind provided + + com.google.code.findbugs + jsr305 + provided + + + commons-io + commons-io + provided + com.twitter scrooge-core_2.11 @@ -136,6 +146,11 @@ hamcrest test + + nl.jqno.equalsverifier + equalsverifier + test + diff --git a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftExtensionsModule.java 
b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftExtensionsModule.java index bea2a913e39e..4ffba3a7e241 100644 --- a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftExtensionsModule.java +++ b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftExtensionsModule.java @@ -37,7 +37,8 @@ public List getJacksonModules() return Collections.singletonList( new SimpleModule("ThriftInputRowParserModule") .registerSubtypes( - new NamedType(ThriftInputRowParser.class, "thrift") + new NamedType(ThriftInputRowParser.class, "thrift"), + new NamedType(ThriftInputFormat.class, "thrift") ) ); } diff --git a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputFormat.java b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputFormat.java new file mode 100644 index 000000000000..fdf638fa7e3b --- /dev/null +++ b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputFormat.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.thrift; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.NestedInputFormat; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.Objects; + +/** + * {@link org.apache.druid.data.input.InputFormat} for Thrift-encoded data. Supports binary, compact, and JSON + * Thrift protocols (with optional Base64 encoding). + *

    + * The thrift class can be provided either from the classpath or from an external jar file via {@code thriftJar}. + */ +public class ThriftInputFormat extends NestedInputFormat +{ + private final String thriftJar; + private final String thriftClass; + + @JsonCreator + public ThriftInputFormat( + @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec, + @JsonProperty("thriftJar") @Nullable String thriftJar, + @JsonProperty("thriftClass") String thriftClass + ) + { + super(flattenSpec); + this.thriftJar = thriftJar; + InvalidInput.conditionalException(thriftClass != null, "thriftClass must not be null"); + this.thriftClass = thriftClass; + } + + @Nullable + @JsonProperty("thriftJar") + @JsonInclude(JsonInclude.Include.NON_NULL) + public String getThriftJar() + { + return thriftJar; + } + + @JsonProperty("thriftClass") + public String getThriftClass() + { + return thriftClass; + } + + @Override + public boolean isSplittable() + { + return false; + } + + @Override + public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) + { + return new ThriftReader(inputRowSchema, source, thriftJar, thriftClass, getFlattenSpec()); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + ThriftInputFormat that = (ThriftInputFormat) o; + return Objects.equals(thriftJar, that.thriftJar) && + Objects.equals(thriftClass, that.thriftClass); + } + + @Override + public int hashCode() + { + return Objects.hash(super.hashCode(), thriftJar, thriftClass); + } +} diff --git a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java new file mode 100644 index 000000000000..d4b3c0bdc82c --- /dev/null +++ 
b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.thrift; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Iterators; +import org.apache.commons.io.IOUtils; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.IntermediateRowParsingReader; +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.java.util.common.CloseableIterators; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.common.parsers.ObjectFlattener; +import org.apache.druid.java.util.common.parsers.ObjectFlatteners; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.thrift.TBase; +import org.apache.thrift.TException; + +import 
javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class ThriftReader extends IntermediateRowParsingReader +{ + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final InputRowSchema inputRowSchema; + private final InputEntity source; + private final String jarPath; + private final String thriftClassName; + private final ObjectFlattener recordFlattener; + + private volatile Class thriftClass = null; + + ThriftReader( + InputRowSchema inputRowSchema, + InputEntity source, + @Nullable String jarPath, + String thriftClassName, + @Nullable JSONPathSpec flattenSpec + ) + { + this.inputRowSchema = inputRowSchema; + this.source = source; + this.jarPath = jarPath; + this.thriftClassName = thriftClassName; + this.recordFlattener = ObjectFlatteners.create( + flattenSpec, + new JSONFlattenerMaker(false, inputRowSchema.getDimensionsSpec().useSchemaDiscovery()) + ); + } + + @Override + protected CloseableIterator intermediateRowIterator() throws IOException + { + return CloseableIterators.withEmptyBaggage( + Iterators.singletonIterator(IOUtils.toByteArray(source.open())) + ); + } + + @Override + protected InputEntity source() + { + return source; + } + + @Override + protected List parseInputRows(byte[] intermediateRow) throws IOException, ParseException + { + return Collections.singletonList(MapInputRowParser.parse(inputRowSchema, toFlattenedMap(intermediateRow))); + } + + @Override + protected List> toMap(byte[] intermediateRow) throws IOException + { + return Collections.singletonList(toFlattenedMap(intermediateRow)); + } + + private Map toFlattenedMap(byte[] bytes) throws ParseException + { + try { + final Class clazz = getThriftClass(); + final TBase tbase = clazz.newInstance(); + ThriftDeserialization.detectAndDeserialize(bytes, tbase); + final String json = 
ThriftDeserialization.SERIALIZER_SIMPLE_JSON.get().toString(tbase); + final JsonNode node = OBJECT_MAPPER.readTree(json); + return recordFlattener.flatten(node); + } + catch (TException | IOException e) { + throw new ParseException(null, e, "Failed to deserialize Thrift data"); + } + catch (InstantiationException | IllegalAccessException e) { + throw new ParseException(null, e, "Failed to instantiate Thrift class [%s]", thriftClassName); + } + catch (ClassNotFoundException e) { + throw new ParseException(null, e, "Thrift class [%s] not found", thriftClassName); + } + } + + @SuppressWarnings({"unchecked", "ReturnValueIgnored"}) + private Class getThriftClass() throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException + { + if (thriftClass == null) { + final Class clazz; + if (jarPath != null) { + File jar = new File(jarPath); + URLClassLoader child = new URLClassLoader( + new URL[]{jar.toURI().toURL()}, + this.getClass().getClassLoader() + ); + clazz = (Class) Class.forName(thriftClassName, true, child); + } else { + clazz = (Class) Class.forName(thriftClassName); + } + // Verify the class can be instantiated + clazz.newInstance(); + thriftClass = clazz; + } + return thriftClass; + } +} diff --git a/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftInputFormatTest.java b/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftInputFormatTest.java new file mode 100644 index 000000000000..79a5d36a3c99 --- /dev/null +++ b/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftInputFormatTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.thrift; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.data.input.ColumnsFilter; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TJSONProtocol; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +public class ThriftInputFormatTest +{ + private 
static final String THRIFT_CLASS = "org.apache.druid.data.input.thrift.Book"; + + private Book book; + private InputRowSchema schema; + private JSONPathSpec flattenSpec; + + @Before + public void setUp() + { + book = new Book() + .setDate("2016-08-29") + .setPrice(19.9) + .setTitle("title") + .setAuthor(new Author().setFirstName("first").setLastName("last")); + + schema = new InputRowSchema( + new TimestampSpec("date", "auto", null), + new DimensionsSpec(Lists.newArrayList( + new StringDimensionSchema("title"), + new StringDimensionSchema("lastName") + )), + ColumnsFilter.all() + ); + + flattenSpec = new JSONPathSpec( + true, + Lists.newArrayList( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "title", "title"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "lastName", "$.author.lastName") + ) + ); + } + + @Test + public void testParseCompact() throws Exception + { + TSerializer serializer = new TSerializer(new TCompactProtocol.Factory()); + byte[] bytes = serializer.serialize(book); + assertParsedRow(bytes); + } + + @Test + public void testParseBinaryBase64() throws Exception + { + TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory()); + byte[] bytes = StringUtils.encodeBase64(serializer.serialize(book)); + assertParsedRow(bytes); + } + + @Test + public void testParseJson() throws Exception + { + TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); + byte[] bytes = serializer.serialize(book); + assertParsedRow(bytes); + } + + @Test + public void testParseWithJarPath() throws Exception + { + TSerializer serializer = new TSerializer(new TCompactProtocol.Factory()); + byte[] bytes = serializer.serialize(book); + + ThriftInputFormat format = new ThriftInputFormat(flattenSpec, "example/book.jar", THRIFT_CLASS); + InputRow row = readSingleRow(format, bytes); + Assert.assertEquals("title", row.getDimension("title").get(0)); + Assert.assertEquals("last", row.getDimension("lastName").get(0)); + } + + @Test + public void testSerde() 
throws Exception + { + ObjectMapper mapper = new DefaultObjectMapper(); + for (Module module : new ThriftExtensionsModule().getJacksonModules()) { + mapper.registerModule(module); + } + + ThriftInputFormat format = new ThriftInputFormat(flattenSpec, null, THRIFT_CLASS); + String json = mapper.writeValueAsString(format); + ThriftInputFormat deserialized = (ThriftInputFormat) mapper.readValue(json, org.apache.druid.data.input.InputFormat.class); + + Assert.assertEquals(format, deserialized); + } + + @Test + public void testIsSplittable() + { + ThriftInputFormat format = new ThriftInputFormat(null, null, THRIFT_CLASS); + Assert.assertFalse(format.isSplittable()); + } + + @Test + public void testEquals() + { + EqualsVerifier.forClass(ThriftInputFormat.class).usingGetClass().verify(); + } + + private void assertParsedRow(byte[] bytes) throws IOException + { + ThriftInputFormat format = new ThriftInputFormat(flattenSpec, null, THRIFT_CLASS); + InputRow row = readSingleRow(format, bytes); + Assert.assertEquals("title", row.getDimension("title").get(0)); + Assert.assertEquals("last", row.getDimension("lastName").get(0)); + } + + private InputRow readSingleRow(ThriftInputFormat format, byte[] bytes) throws IOException + { + InputEntityReader reader = format.createReader(schema, new ByteEntity(bytes), null); + try (CloseableIterator iterator = reader.read()) { + Assert.assertTrue(iterator.hasNext()); + InputRow row = iterator.next(); + Assert.assertFalse(iterator.hasNext()); + return row; + } + } +} diff --git a/website/.spelling b/website/.spelling index 309eddb17772..41c86bdb1781 100644 --- a/website/.spelling +++ b/website/.spelling @@ -562,6 +562,7 @@ stdout storages stringDictionaryEncoding stringified +structs sub-conditions subarray subnet