Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/druid/data/input/Firehose.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ public interface Firehose extends Closeable
/**
* Returns an {@link InputRowListPlusRawValues} object containing the InputRow plus the raw, unparsed data corresponding to
* the next row available. Used in the sampler to provide the caller with information to assist in configuring a parse
* spec. If a ParseException is thrown by the parser, it should be caught and returned in the InputRowListPlusJson so
* spec. If a ParseException is thrown by the parser, it should be caught and returned in the InputRowListPlusRawValues so
* we will be able to provide information on the raw row which failed to be parsed. Should only be called if hasMore
* returns true.
*
* @return an InputRowListPlusJson which may contain any of: an InputRow, map of the raw data, or a ParseException
* @return an InputRowListPlusRawValues which may contain any of: an InputRow, map of the raw data, or a ParseException
*/
@Deprecated
default InputRowListPlusRawValues nextRowWithRaw() throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.druid.guice.annotations.UnstableApi;

import java.io.File;
import java.io.IOException;

/**
* InputFormat abstracts the file format of input data.
Expand All @@ -57,5 +58,9 @@ public interface InputFormat
@JsonIgnore
boolean isSplittable();

InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory);
InputEntityReader createReader(
InputRowSchema inputRowSchema,
InputEntity source,
File temporaryDirectory
) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class FileEntity implements InputEntity
{
private final File file;

FileEntity(File file)
public FileEntity(File file)
{
this.file = file;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ public CloseableIterator<InputRow> read()
{
return createIterator(entity -> {
// InputEntityReader is stateful and so a new one should be created per entity.
final InputEntityReader reader = inputFormat.createReader(inputRowSchema, entity, temporaryDirectory);
try {
final InputEntityReader reader = inputFormat.createReader(inputRowSchema, entity, temporaryDirectory);
return reader.read();
}
catch (IOException e) {
Expand All @@ -88,8 +88,8 @@ public CloseableIterator<InputRowListPlusRawValues> sample()
{
return createIterator(entity -> {
// InputEntityReader is stateful and so a new one should be created per entity.
final InputEntityReader reader = inputFormat.createReader(inputRowSchema, entity, temporaryDirectory);
try {
final InputEntityReader reader = inputFormat.createReader(inputRowSchema, entity, temporaryDirectory);
return reader.sample();
}
catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.spi.json.JsonProvider;
import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
import net.thisptr.jackson.jq.JsonQuery;
import net.thisptr.jackson.jq.exception.JsonQueryException;
Expand All @@ -43,9 +44,11 @@

public class JSONFlattenerMaker implements ObjectFlatteners.FlattenerMaker<JsonNode>
{
private static final JsonProvider JSON_PROVIDER = new FastJacksonJsonNodeJsonProvider();

private static final Configuration JSONPATH_CONFIGURATION =
Configuration.builder()
.jsonProvider(new FastJacksonJsonNodeJsonProvider())
.jsonProvider(JSON_PROVIDER)
.mappingProvider(new JacksonMappingProvider())
.options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
.build();
Expand Down Expand Up @@ -97,6 +100,12 @@ public Function<JsonNode, Object> makeJsonQueryExtractor(final String expr)
}
}

@Override
public JsonProvider getJsonProvider()
{
// Shared, stateless provider instance; also used by JSONPATH_CONFIGURATION so both
// paths (JsonPath extraction and toMap conversion) traverse JsonNode identically.
return JSON_PROVIDER;
}

@Nullable
private Object valueConversionFunction(JsonNode val)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@
public interface ObjectFlattener<T>
{
/**
 * Flattens the given object into a map keyed by the discovered/configured field names.
 */
Map<String, Object> flatten(T obj);

/**
 * Converts the given object into a plain Java {@link Map} representation of its raw data,
 * without applying the flattening field specs.
 */
Map<String, Object> toMap(T obj);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@
package org.apache.druid.java.util.common.parsers;

import com.google.common.collect.Iterables;
import com.jayway.jsonpath.spi.json.JsonProvider;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.UOE;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
Expand Down Expand Up @@ -194,17 +198,88 @@ public Object setValue(final Object value)
}
};
}

@Override
public Map<String, Object> toMap(T obj)
{
return flattenerMaker.toMap(obj);
}
};
}

/**
 * Factory of the per-format primitives an {@link ObjectFlattener} is built from: root-field
 * discovery/access, JsonPath and 'jq' extractors, and a {@link JsonProvider} used for generic
 * traversal of the format's object model.
 */
public interface FlattenerMaker<T>
{
/**
 * Returns the {@link JsonProvider} used to traverse this format's object model
 * (e.g. Jackson {@code JsonNode}, Avro {@code GenericRecord}, ORC structs).
 */
JsonProvider getJsonProvider();
/**
 * List all "root" primitive properties and primitive lists (no nested objects, no lists of objects)
 */
Iterable<String> discoverRootFields(T obj);

/**
 * Get a top level field from a "json" object
 */
Object getRootField(T obj, String key);

/**
 * Create a "field" extractor for {@link com.jayway.jsonpath.JsonPath} expressions
 */
Function<T, Object> makeJsonPathExtractor(String expr);

/**
 * Create a "field" extractor for 'jq' expressions
 */
Function<T, Object> makeJsonQueryExtractor(String expr);

/**
 * Convert object to Java {@link Map} using {@link #getJsonProvider()} and {@link #finalizeConversionForMap} to
 * extract and convert data
 */
default Map<String, Object> toMap(T obj)
{
// Unchecked: callers pass a map-like root object, so toMapHelper takes the isMap branch
// and returns a Map. A non-map root would cause a ClassCastException here.
return (Map<String, Object>) toMapHelper(obj);
}

/**
 * Recursively traverse "json" object using a {@link JsonProvider}, converting to Java {@link Map} and {@link List},
 * potentially transforming via {@link #finalizeConversionForMap} as we go
 */
default Object toMapHelper(Object o)
{
final JsonProvider jsonProvider = getJsonProvider();
if (jsonProvider.isMap(o)) {
Map<String, Object> actualMap = new HashMap<>();
for (String key : jsonProvider.getPropertyKeys(o)) {
Object field = jsonProvider.getMapValue(o, key);
if (jsonProvider.isMap(field) || jsonProvider.isArray(field)) {
// Containers are finalized BEFORE recursing, so a format can rewrite a nested
// container into another shape prior to traversal.
actualMap.put(key, toMapHelper(finalizeConversionForMap(field)));
} else {
actualMap.put(key, finalizeConversionForMap(field));
}
}
return actualMap;
} else if (jsonProvider.isArray(o)) {
final int length = jsonProvider.length(o);
List<Object> actualList = new ArrayList<>(length);
for (int i = 0; i < length; i++) {
Object element = jsonProvider.getArrayIndex(o, i);
if (jsonProvider.isMap(element) || jsonProvider.isArray(element)) {
actualList.add(toMapHelper(finalizeConversionForMap(element)));
} else {
actualList.add(finalizeConversionForMap(element));
}
}
// NOTE(review): the completed list is passed through finalizeConversionForMap, while the
// map branch returns actualMap directly — asymmetry appears intentional; confirm with
// implementations before relying on it.
return finalizeConversionForMap(actualList);
}
// unknown, just pass it through
return o;
}

/**
 * Handle any special conversions for object when translating an input type into a {@link Map} for {@link #toMap}
 */
default Object finalizeConversionForMap(Object o)
{
// Identity by default; format-specific makers override to convert leaf values
// (e.g. byte buffers, format-native string types) into plain Java objects.
return o;
}
}
}
45 changes: 42 additions & 3 deletions docs/development/extensions-core/parquet.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,51 @@ title: "Apache Parquet Extension"
This Apache Druid (incubating) module extends [Druid Hadoop based indexing](../../ingestion/hadoop.md) to ingest data directly from offline
Apache Parquet files.

Note: `druid-parquet-extensions` depends on the `druid-avro-extensions` module, so be sure to
Note: If using the `parquet-avro` parser for Apache Hadoop based indexing, `druid-parquet-extensions` depends on the `druid-avro-extensions` module, so be sure to
[include both](../../development/extensions.md#loading-extensions).

## Parquet and Native Batch
This extension provides a `parquet` input format which can be used with Druid [native batch ingestion](../../ingestion/native-batch.md).

### Parquet InputFormat
|Field | Type | Description | Required|
|---|---|---|---|
|type| String| This should be set to `parquet` to read Parquet files| yes |
|flattenSpec| JSON Object |Define a [`flattenSpec`](../../ingestion/index.md#flattenspec) to extract nested values from a Parquet file. Note that only 'path' expressions are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) |
| binaryAsString | Boolean | Specifies if the bytes parquet column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default == false) |

### Example

```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "local",
"baseDir": "/some/path/to/file/",
"filter": "file.parquet"
},
"inputFormat": {
"type": "parquet",
"flattenSpec": {
"useFieldDiscovery": true,
"fields": [
{
"type": "path",
"name": "nested",
"expr": "$.path.to.nested"
}
]
},
"binaryAsString": false
},
...
}
...
```
## Parquet Hadoop Parser

This extension provides two ways to parse Parquet files:
For Hadoop, this extension provides two parser implementations for reading Parquet files:

* `parquet` - using a simple conversion contained within this extension
* `parquet-avro` - conversion to avro records with the `parquet-avro` library and using the `druid-avro-extensions`
Expand Down Expand Up @@ -62,7 +101,7 @@ However, `parquet-avro` was the original basis for this extension, and as such i
|----------|-------------|----------------------------------------------------------------------------------------|---------|
| type | String | Choose `parquet` or `parquet-avro` to determine how Parquet files are parsed | yes |
| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data, and optionally, a flatten spec. Valid parseSpec formats are `timeAndDims`, `parquet`, `avro` (if used with avro conversion). | yes |
| binaryAsString | Boolean | Specifies if the bytes parquet column which is not logically marked as a string or enum type should be converted to strings anyway. | no(default == false) |
| binaryAsString | Boolean | Specifies if the bytes parquet column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no(default == false) |

When the time dimension is a [DateType column](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md), a format should not be supplied. When the format is UTF8 (String), either `auto` or an explicitly defined [format](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html) is required.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.spi.json.JsonProvider;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
Expand All @@ -39,9 +40,10 @@

public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<GenericRecord>
{
private static final JsonProvider AVRO_JSON_PROVIDER = new GenericAvroJsonProvider();
private static final Configuration JSONPATH_CONFIGURATION =
Configuration.builder()
.jsonProvider(new GenericAvroJsonProvider())
.jsonProvider(AVRO_JSON_PROVIDER)
.mappingProvider(new NotImplementedMappingProvider())
.options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
.build();
Expand Down Expand Up @@ -125,6 +127,12 @@ public Function<GenericRecord, Object> makeJsonQueryExtractor(final String expr)
throw new UnsupportedOperationException("Avro + JQ not supported");
}

@Override
public JsonProvider getJsonProvider()
{
// Shared, stateless provider; the same instance backs JSONPATH_CONFIGURATION, keeping
// JsonPath extraction and toMap traversal consistent for Avro GenericRecords.
return AVRO_JSON_PROVIDER;
}

private Object transformValue(final Object field)
{
if (field instanceof ByteBuffer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.spi.json.JsonProvider;
import org.apache.druid.java.util.common.parsers.NotImplementedMappingProvider;
import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.apache.orc.TypeDescription;
Expand All @@ -38,13 +39,15 @@
public class OrcStructFlattenerMaker implements ObjectFlatteners.FlattenerMaker<OrcStruct>
{
private final Configuration jsonPathConfiguration;
private final JsonProvider orcJsonProvider;
private final OrcStructConverter converter;

OrcStructFlattenerMaker(boolean binaryAsString)
{
this.converter = new OrcStructConverter(binaryAsString);
this.orcJsonProvider = new OrcStructJsonProvider(converter);
this.jsonPathConfiguration = Configuration.builder()
.jsonProvider(new OrcStructJsonProvider(converter))
.jsonProvider(orcJsonProvider)
.mappingProvider(new NotImplementedMappingProvider())
.options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
.build();
Expand Down Expand Up @@ -88,6 +91,12 @@ public Function<OrcStruct, Object> makeJsonQueryExtractor(String expr)
throw new UnsupportedOperationException("ORC flattener does not support JQ");
}

@Override
public JsonProvider getJsonProvider()
{
// Same instance wired into jsonPathConfiguration, so JsonPath extraction and toMap
// traversal use one OrcStructJsonProvider (and thus one shared OrcStructConverter).
return orcJsonProvider;
}

private Object finalizeConversion(Object o)
{
// replace any remaining complex types with null
Expand Down
Loading