35 changes: 35 additions & 0 deletions docs/ingestion/data-formats.md
@@ -223,6 +223,41 @@ The Parquet `inputFormat` has the following components:
|flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Parquet file. Note that only 'path' expressions are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) |
| binaryAsString | Boolean | Specifies whether a Parquet binary column that is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) |

### Thrift Stream
Member


hmm, I'm not sure we have any other 'contrib' extensions described in this section; it might be best if this lives in https://github.com/apache/druid/blob/master/docs/development/extensions-contrib/thrift.md for now. On the other hand, Thrift is, I think, the only data format that isn't a core extension (maybe in the future we should consider adding integration tests and making it a core extension?), so maybe it is OK to be here. @techdocsmith do you have any thoughts?

Also, looking closer at the code, I guess this might work with batch ingestion too, since the deserializer detects the format from the bytes given to it, though I haven't personally used this extension or tested this scenario. I'll see if I can find some time to pull your branch and test it out.

Contributor


+1 for @clintropolis's suggestion to keep the doc in /docs/development/extensions-contrib/thrift.md until the extension is made core.


> You need to include the [`druid-thrift-extensions`](../development/extensions-contrib/thrift.md) as an extension to use the Thrift Stream input format.

Use the `thrift` `inputFormat` to load data in Thrift format for stream ingestion. For example:
```json
"ioConfig": {
  "inputFormat": {
    "type": "thrift",
    "thriftJar": "book.jar",
    "thriftClass": "org.apache.druid.data.input.thrift.Book",
    "flattenSpec": {
      "useFieldDiscovery": true,
      "fields": [
        {
          "type": "path",
          "name": "someRecord_subInt",
          "expr": "$.someRecord.subInt"
        }
      ]
    }
  },
  ...
}
```

| Field | Type | Description | Required |
|-------|------|-------------|----------|
|type| String| Set this to `thrift` to read Thrift-serialized data.| yes |
|flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Thrift record. Note that only 'path' expressions are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) |
|thriftJar| String | Path to the jar containing the Thrift class. If not provided, the class is looked up on the classpath.| no |
|thriftClass| String | Fully qualified name of the Thrift class to deserialize.| yes |
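For context, this `inputFormat` plugs into a streaming supervisor spec like any other. A minimal sketch of a Kafka `ioConfig` using it; the topic name and consumer properties here are illustrative only, not part of this extension:

```json
"ioConfig": {
  "topic": "books",
  "consumerProperties": {
    "bootstrap.servers": "localhost:9092"
  },
  "inputFormat": {
    "type": "thrift",
    "thriftJar": "book.jar",
    "thriftClass": "org.apache.druid.data.input.thrift.Book"
  }
}
```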

### Avro Stream

> You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro Stream input format.
30 changes: 30 additions & 0 deletions extensions-contrib/thrift-extensions/pom.xml
@@ -141,6 +141,36 @@
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>2.0.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.9.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.10.5</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.2</version>
Member


i think versions for a lot of these should already be defined in the top-level pom (the dependency checker in travis sometimes suggests more than is necessary to fix the issue)

<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>0.22.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
@@ -37,7 +37,8 @@ public List<? extends Module> getJacksonModules()
return Collections.singletonList(
new SimpleModule("ThriftInputRowParserModule")
.registerSubtypes(
new NamedType(ThriftInputRowParser.class, "thrift")
new NamedType(ThriftInputRowParser.class, "thrift"),
new NamedType(ThriftInputFormat.class, "thrift")
)
);
}
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input.thrift;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.NestedInputFormat;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;

import javax.annotation.Nullable;

import java.io.File;
import java.util.Objects;

public class ThriftInputFormat extends NestedInputFormat
{
private final String jarPath;
private final String thriftClassName;

@JsonCreator
public ThriftInputFormat(
@JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
@JsonProperty("thriftJar") String jarPath,
@JsonProperty("thriftClass") String thriftClassName
)
{
super(flattenSpec);
this.jarPath = jarPath;
this.thriftClassName = thriftClassName;
Preconditions.checkNotNull(thriftClassName, "thrift class name");
}

@Override
public boolean isSplittable()
{
return false;
}

@JsonProperty
public String getThriftJar()
{
return jarPath;
}

@JsonProperty
public String getThriftClass()
{
return thriftClassName;
}

@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
return new ThriftReader(
inputRowSchema,
source,
jarPath,
thriftClassName,
getFlattenSpec()
);
}

@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final ThriftInputFormat that = (ThriftInputFormat) o;
return Objects.equals(getFlattenSpec(), that.getFlattenSpec()) &&
Objects.equals(jarPath, that.jarPath) &&
Objects.equals(thriftClassName, that.thriftClassName);
}

@Override
public int hashCode()
{
return Objects.hash(jarPath, thriftClassName, getFlattenSpec());
}

}
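The `thriftJar`-or-classpath lookup that this format configures can be sketched outside Druid with JDK classes only. A minimal, hedged sketch: `loadThriftStyle` is a hypothetical stand-in for `ThriftReader#getThriftClass`, and the demo loads a JDK class since no Thrift jar is assumed to exist:

```java
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

public class JarClassLoading
{
  // Load a class by name, from an optional jar path, else from the classpath
  // (mirrors the jarPath handling configured by ThriftInputFormat).
  public static Class<?> loadThriftStyle(String className, String jarPath) throws Exception
  {
    if (jarPath != null) {
      // URLClassLoader delegates to the parent first, so classes already on the
      // classpath shadow same-named classes inside the jar.
      URLClassLoader child = new URLClassLoader(
          new URL[]{new File(jarPath).toURI().toURL()},
          JarClassLoading.class.getClassLoader()
      );
      return Class.forName(className, true, child);
    }
    return Class.forName(className);
  }

  public static void main(String[] args) throws Exception
  {
    // With no jar path, this falls through to a plain classpath lookup.
    System.out.println(loadThriftStyle("java.util.ArrayList", null).getName());
  }
}
```

Parent-first delegation is worth noting as a design choice: a Thrift class bundled both in the jar and on Druid's classpath resolves to the classpath copy.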
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input.thrift;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterators;
import org.apache.commons.io.IOUtils;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.IntermediateRowParsingReader;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.common.parsers.ObjectFlattener;
import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.CollectionUtils;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
import java.util.Map;

public class ThriftReader extends IntermediateRowParsingReader<String>
{
private final InputEntity source;
private final String jarPath;
private final String thriftClassName;
private volatile Class<TBase> thriftClass = null;
private final ObjectMapper objectMapper;
private final ObjectFlattener<JsonNode> flattener;
private final InputRowSchema inputRowSchema;


ThriftReader(
InputRowSchema inputRowSchema,
InputEntity source,
String jarPath,
String thriftClassName,
JSONPathSpec flattenSpec
)
{
this.inputRowSchema = inputRowSchema;
this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(true));
this.source = source;
this.jarPath = jarPath;
this.thriftClassName = thriftClassName;
this.objectMapper = new ObjectMapper();
try {
thriftClass = getThriftClass();
}
catch (IOException e) {
throw new IAE(e, "Failed to load jar [%s]", jarPath);
}
catch (ClassNotFoundException e) {
throw new IAE(e, "Class [%s] not found in jar [%s] or on the classpath", thriftClassName, jarPath);
}
catch (InstantiationException | IllegalAccessException e) {
throw new IAE(e, "Failed to instantiate Thrift class [%s]", thriftClassName);
}
}

public Class<TBase> getThriftClass()
throws IOException, ClassNotFoundException, IllegalAccessException, InstantiationException
{
final Class<TBase> thrift;
if (jarPath != null) {
File jar = new File(jarPath);
URLClassLoader child = new URLClassLoader(
new URL[] {jar.toURI().toURL()},
this.getClass().getClassLoader()
);
thrift = (Class<TBase>) Class.forName(thriftClassName, true, child);
} else {
thrift = (Class<TBase>) Class.forName(thriftClassName);
}
// Instantiate once up front to fail fast if the class lacks a public no-arg constructor.
thrift.newInstance();
return thrift;
}

@Override
protected CloseableIterator<String> intermediateRowIterator() throws IOException
{
final String json;
try {
final byte[] bytes = IOUtils.toByteArray(source.open());
TBase o = thriftClass.newInstance();
ThriftDeserialization.detectAndDeserialize(bytes, o);
json = ThriftDeserialization.SERIALIZER_SIMPLE_JSON.get().toString(o);
}
catch (IllegalAccessException | InstantiationException | TException e) {
throw new IAE(e, "Failed to deserialize Thrift record as class [%s]", thriftClassName);
}

return CloseableIterators.withEmptyBaggage(
Iterators.singletonIterator(json)
);
}

@Override
protected List<InputRow> parseInputRows(String intermediateRow) throws IOException, ParseException
{
final List<InputRow> inputRows;
JsonParser parser = new JsonFactory().createParser(intermediateRow);
final MappingIterator<JsonNode> delegate = this.objectMapper.readValues(parser, JsonNode.class);
inputRows = FluentIterable.from(() -> delegate)
.transform(jsonNode -> MapInputRowParser.parse(inputRowSchema, flattener.flatten(jsonNode)))
.toList();
if (CollectionUtils.isNullOrEmpty(inputRows)) {
throw new ParseException("Unable to parse [%s] as the intermediateRow resulted in empty input row", intermediateRow);
}
return inputRows;
}

@Override
protected List<Map<String, Object>> toMap(String intermediateRow) throws IOException
{
JsonParser parser = new JsonFactory().createParser(intermediateRow);
final MappingIterator<Map> delegate = objectMapper.readValues(parser, Map.class);
return FluentIterable.from(() -> delegate)
.transform(map -> (Map<String, Object>) map)
.toList();
}
}
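As an aside on the `commons-io` dependency added in the pom: `IOUtils.toByteArray(source.open())` is its only use in this class, and on Java 9+ the stdlib offers an equivalent. A sketch, with `slurp` as a hypothetical helper name rather than anything in the Druid codebase:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class Slurp
{
  // Stdlib equivalent of IOUtils.toByteArray(in): drain the stream fully and close it.
  public static byte[] slurp(InputStream in) throws IOException
  {
    try (InputStream s = in) {
      return s.readAllBytes();
    }
  }

  public static void main(String[] args) throws IOException
  {
    byte[] bytes = slurp(new ByteArrayInputStream(new byte[]{1, 2, 3}));
    System.out.println(bytes.length);
  }
}
```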