diff --git a/core/src/main/java/org/apache/druid/data/input/Firehose.java b/core/src/main/java/org/apache/druid/data/input/Firehose.java index 6c7605ca11d0..66995bda184a 100644 --- a/core/src/main/java/org/apache/druid/data/input/Firehose.java +++ b/core/src/main/java/org/apache/druid/data/input/Firehose.java @@ -69,11 +69,11 @@ public interface Firehose extends Closeable /** * Returns an {@link InputRowListPlusRawValues} object containing the InputRow plus the raw, unparsed data corresponding to * the next row available. Used in the sampler to provide the caller with information to assist in configuring a parse - * spec. If a ParseException is thrown by the parser, it should be caught and returned in the InputRowListPlusJson so + * spec. If a ParseException is thrown by the parser, it should be caught and returned in the InputRowListPlusRawValues so * we will be able to provide information on the raw row which failed to be parsed. Should only be called if hasMore * returns true. * - * @return an InputRowListPlusJson which may contain any of: an InputRow, map of the raw data, or a ParseException + * @return an InputRowListPlusRawValues which may contain any of: an InputRow, map of the raw data, or a ParseException */ @Deprecated default InputRowListPlusRawValues nextRowWithRaw() throws IOException diff --git a/core/src/main/java/org/apache/druid/data/input/InputFormat.java b/core/src/main/java/org/apache/druid/data/input/InputFormat.java index 2ea04bd1e0b5..cb6b495179b1 100644 --- a/core/src/main/java/org/apache/druid/data/input/InputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/InputFormat.java @@ -31,6 +31,7 @@ import org.apache.druid.guice.annotations.UnstableApi; import java.io.File; +import java.io.IOException; /** * InputFormat abstracts the file format of input data. 
@@ -57,5 +58,9 @@ public interface InputFormat @JsonIgnore boolean isSplittable(); - InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory); + InputEntityReader createReader( + InputRowSchema inputRowSchema, + InputEntity source, + File temporaryDirectory + ) throws IOException; } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/FileEntity.java b/core/src/main/java/org/apache/druid/data/input/impl/FileEntity.java index d48345710a3e..568d6152e5b0 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/FileEntity.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/FileEntity.java @@ -34,7 +34,7 @@ public class FileEntity implements InputEntity { private final File file; - FileEntity(File file) + public FileEntity(File file) { this.file = file; } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java b/core/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java index 3ca0412d8852..64581763b22a 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java @@ -73,8 +73,8 @@ public CloseableIterator read() { return createIterator(entity -> { // InputEntityReader is stateful and so a new one should be created per entity. - final InputEntityReader reader = inputFormat.createReader(inputRowSchema, entity, temporaryDirectory); try { + final InputEntityReader reader = inputFormat.createReader(inputRowSchema, entity, temporaryDirectory); return reader.read(); } catch (IOException e) { @@ -88,8 +88,8 @@ public CloseableIterator sample() { return createIterator(entity -> { // InputEntityReader is stateful and so a new one should be created per entity. 
- final InputEntityReader reader = inputFormat.createReader(inputRowSchema, entity, temporaryDirectory); try { + final InputEntityReader reader = inputFormat.createReader(inputRowSchema, entity, temporaryDirectory); return reader.sample(); } catch (IOException e) { diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java index 8d58451f34bb..135c70271c23 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java @@ -24,6 +24,7 @@ import com.jayway.jsonpath.Configuration; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.Option; +import com.jayway.jsonpath.spi.json.JsonProvider; import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider; import net.thisptr.jackson.jq.JsonQuery; import net.thisptr.jackson.jq.exception.JsonQueryException; @@ -43,9 +44,11 @@ public class JSONFlattenerMaker implements ObjectFlatteners.FlattenerMaker { + private static final JsonProvider JSON_PROVIDER = new FastJacksonJsonNodeJsonProvider(); + private static final Configuration JSONPATH_CONFIGURATION = Configuration.builder() - .jsonProvider(new FastJacksonJsonNodeJsonProvider()) + .jsonProvider(JSON_PROVIDER) .mappingProvider(new JacksonMappingProvider()) .options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS)) .build(); @@ -97,6 +100,12 @@ public Function makeJsonQueryExtractor(final String expr) } } + @Override + public JsonProvider getJsonProvider() + { + return JSON_PROVIDER; + } + @Nullable private Object valueConversionFunction(JsonNode val) { diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlattener.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlattener.java index ae674c84e95e..36e7ca34acc3 100644 --- 
a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlattener.java +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlattener.java @@ -24,4 +24,6 @@ public interface ObjectFlattener { Map flatten(T obj); + + Map toMap(T obj); } diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java index 2cdb2339f06d..2130c016d056 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java @@ -20,13 +20,17 @@ package org.apache.druid.java.util.common.parsers; import com.google.common.collect.Iterables; +import com.jayway.jsonpath.spi.json.JsonProvider; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.UOE; import java.util.AbstractMap; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; @@ -194,17 +198,88 @@ public Object setValue(final Object value) } }; } + + @Override + public Map toMap(T obj) + { + return flattenerMaker.toMap(obj); + } }; } public interface FlattenerMaker { + JsonProvider getJsonProvider(); + /** + * List all "root" primitive properties and primitive lists (no nested objects, no lists of objects) + */ Iterable discoverRootFields(T obj); + /** + * Get a top level field from a "json" object + */ Object getRootField(T obj, String key); + /** + * Create a "field" extractor for {@link com.jayway.jsonpath.JsonPath} expressions + */ Function makeJsonPathExtractor(String expr); + /** + * Create a "field" extractor for 'jq' expressions + */ Function makeJsonQueryExtractor(String expr); + + /** + * Convert object to Java {@link Map} using {@link 
#getJsonProvider()} and {@link #finalizeConversionForMap} to + * extract and convert data + */ + default Map toMap(T obj) + { + return (Map) toMapHelper(obj); + } + + /** + * Recursively traverse "json" object using a {@link JsonProvider}, converting to Java {@link Map} and {@link List}, + * potentially transforming via {@link #finalizeConversionForMap} as we go + */ + default Object toMapHelper(Object o) + { + final JsonProvider jsonProvider = getJsonProvider(); + if (jsonProvider.isMap(o)) { + Map actualMap = new HashMap<>(); + for (String key : jsonProvider.getPropertyKeys(o)) { + Object field = jsonProvider.getMapValue(o, key); + if (jsonProvider.isMap(field) || jsonProvider.isArray(field)) { + actualMap.put(key, toMapHelper(finalizeConversionForMap(field))); + } else { + actualMap.put(key, finalizeConversionForMap(field)); + } + } + return actualMap; + } else if (jsonProvider.isArray(o)) { + final int length = jsonProvider.length(o); + List actualList = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + Object element = jsonProvider.getArrayIndex(o, i); + if (jsonProvider.isMap(element) || jsonProvider.isArray(element)) { + actualList.add(toMapHelper(finalizeConversionForMap(element))); + } else { + actualList.add(finalizeConversionForMap(element)); + } + } + return finalizeConversionForMap(actualList); + } + // unknown, just pass it through + return o; + } + + /** + * Handle any special conversions for object when translating an input type into a {@link Map} for {@link #toMap} + */ + default Object finalizeConversionForMap(Object o) + { + return o; + } } } diff --git a/docs/development/extensions-core/parquet.md b/docs/development/extensions-core/parquet.md index 645cf2418d00..b2629b5db391 100644 --- a/docs/development/extensions-core/parquet.md +++ b/docs/development/extensions-core/parquet.md @@ -26,12 +26,51 @@ title: "Apache Parquet Extension" This Apache Druid (incubating) module extends [Druid Hadoop based 
indexing](../../ingestion/hadoop.md) to ingest data directly from offline Apache Parquet files. -Note: `druid-parquet-extensions` depends on the `druid-avro-extensions` module, so be sure to +Note: If using the `parquet-avro` parser for Apache Hadoop based indexing, `druid-parquet-extensions` depends on the `druid-avro-extensions` module, so be sure to [include both](../../development/extensions.md#loading-extensions). +## Parquet and Native Batch +This extension provides a `parquet` input format which can be used with Druid [native batch ingestion](../../ingestion/native-batch.md). + +### Parquet InputFormat +|Field | Type | Description | Required| +|---|---|---|---| +|type| String| This should be set to `parquet` to read Parquet files| yes | +|flattenSpec| JSON Object |Define a [`flattenSpec`](../../ingestion/index.md#flattenspec) to extract nested values from a Parquet file. Note that only 'path' expressions are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) | +| binaryAsString | Boolean | Specifies if the bytes parquet column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default == false) | + +### Example + +```json + ... + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "local", + "baseDir": "/some/path/to/file/", + "filter": "file.parquet" + }, + "inputFormat": { + "type": "parquet", + "flattenSpec": { + "useFieldDiscovery": true, + "fields": [ + { + "type": "path", + "name": "nested", + "expr": "$.path.to.nested" + } + ] + }, + "binaryAsString": false + }, + ... + } + ... 
+``` ## Parquet Hadoop Parser -This extension provides two ways to parse Parquet files: +For Hadoop, this extension provides two parser implementations for reading Parquet files: * `parquet` - using a simple conversion contained within this extension * `parquet-avro` - conversion to avro records with the `parquet-avro` library and using the `druid-avro-extensions` @@ -62,7 +101,7 @@ However, `parquet-avro` was the original basis for this extension, and as such i |----------|-------------|----------------------------------------------------------------------------------------|---------| | type | String | Choose `parquet` or `parquet-avro` to determine how Parquet files are parsed | yes | | parseSpec | JSON Object | Specifies the timestamp and dimensions of the data, and optionally, a flatten spec. Valid parseSpec formats are `timeAndDims`, `parquet`, `avro` (if used with avro conversion). | yes | -| binaryAsString | Boolean | Specifies if the bytes parquet column which is not logically marked as a string or enum type should be converted to strings anyway. | no(default == false) | +| binaryAsString | Boolean | Specifies if the bytes parquet column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default == false) | When the time dimension is a [DateType column](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md), a format should not be supplied. When the format is UTF8 (String), either `auto` or a explicitly defined [format](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html) is required. 
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java index a142dcab9ee8..1957bf9ae5ad 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java @@ -22,6 +22,7 @@ import com.jayway.jsonpath.Configuration; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.Option; +import com.jayway.jsonpath.spi.json.JsonProvider; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.util.Utf8; @@ -39,9 +40,10 @@ public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker { + private static final JsonProvider AVRO_JSON_PROVIDER = new GenericAvroJsonProvider(); private static final Configuration JSONPATH_CONFIGURATION = Configuration.builder() - .jsonProvider(new GenericAvroJsonProvider()) + .jsonProvider(AVRO_JSON_PROVIDER) .mappingProvider(new NotImplementedMappingProvider()) .options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS)) .build(); @@ -125,6 +127,12 @@ public Function makeJsonQueryExtractor(final String expr) throw new UnsupportedOperationException("Avro + JQ not supported"); } + @Override + public JsonProvider getJsonProvider() + { + return AVRO_JSON_PROVIDER; + } + private Object transformValue(final Object field) { if (field instanceof ByteBuffer) { diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructFlattenerMaker.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructFlattenerMaker.java index a7094666255a..f947a8e0c810 100644 --- a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructFlattenerMaker.java +++ 
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructFlattenerMaker.java @@ -22,6 +22,7 @@ import com.jayway.jsonpath.Configuration; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.Option; +import com.jayway.jsonpath.spi.json.JsonProvider; import org.apache.druid.java.util.common.parsers.NotImplementedMappingProvider; import org.apache.druid.java.util.common.parsers.ObjectFlatteners; import org.apache.orc.TypeDescription; @@ -38,13 +39,15 @@ public class OrcStructFlattenerMaker implements ObjectFlatteners.FlattenerMaker { private final Configuration jsonPathConfiguration; + private final JsonProvider orcJsonProvider; private final OrcStructConverter converter; OrcStructFlattenerMaker(boolean binaryAsString) { this.converter = new OrcStructConverter(binaryAsString); + this.orcJsonProvider = new OrcStructJsonProvider(converter); this.jsonPathConfiguration = Configuration.builder() - .jsonProvider(new OrcStructJsonProvider(converter)) + .jsonProvider(orcJsonProvider) .mappingProvider(new NotImplementedMappingProvider()) .options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS)) .build(); @@ -88,6 +91,12 @@ public Function makeJsonQueryExtractor(String expr) throw new UnsupportedOperationException("ORC flattener does not support JQ"); } + @Override + public JsonProvider getJsonProvider() + { + return orcJsonProvider; + } + private Object finalizeConversion(Object o) { // replace any remaining complex types with null diff --git a/extensions-core/parquet-extensions/pom.xml b/extensions-core/parquet-extensions/pom.xml index 3c4603602131..b45805902576 100644 --- a/extensions-core/parquet-extensions/pom.xml +++ b/extensions-core/parquet-extensions/pom.xml @@ -137,10 +137,249 @@ ${project.parent.version} provided + org.apache.hadoop - hadoop-client - provided + hadoop-mapreduce-client-core + ${hadoop.compile.version} + compile + + + aopalliance + aopalliance + + + org.apache.commons + commons-compress + + + com.google.guava + guava + 
+ + com.google.inject + guice + + + com.google.inject.extensions + guice-servlet + + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + javax.inject + javax + + + io.netty + netty + + + slf4j-log4j12 + org.slf4j + + + org.slf4j + slf4j-api + + + protobuf-java + com.google.protobuf + + + + + org.apache.hadoop + hadoop-common + ${hadoop.compile.version} + compile + + + org.apache.yetus + audience-annotations + + + commons-codec + commons-codec + + + org.apache.commons + commons-compress + + + commons-io + commons-io + + + commons-lang + commons-lang + + + org.apache.commons + commons-math3 + + + commons-net + commons-net + + + org.apache.curator + curator-client + + + org.apache.curator + curator-framework + + + org.apache.curator + curator-recipes + + + com.google.guava + guava + + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + com.sun.jersey + jersey-core + + + com.sun.jersey + jersey-server + + + javax.servlet.jsp + jsp-api + + + com.google.code.findbugs + jsr305 + + + javax.ws.rs + jsr311-api + + + org.apache.zookeeper + zookeeper + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-log4j12 + + + com.sun.jersey + jersey-json + + + log4j + log4j + + + jetty-sslengine + org.mortbay.jetty + + + jetty-util + org.mortbay.jetty + + + jackson-core-asl + org.codehaus.jackson + + + jets3t + net.java.dev.jets3t + + + jackson-mapper-asl + org.codehaus.jackson + + + jetty + org.mortbay.jetty + + + gson + com.google.code.gson + + + xmlenc + xmlenc + + + httpclient + org.apache.httpcomponents + + + jsch + com.jcraft + + + protobuf-java + com.google.protobuf + + + commons-collections + commons-collections + + + commons-logging + commons-logging + + + commons-cli + commons-cli + + + commons-digester + commons-digester + + + commons-beanutils-core + commons-beanutils + 
+ + apacheds-kerberos-codec + org.apache.directory.server + + + nimbus-jose-jwt + com.nimbusds + + com.google.code.findbugs @@ -162,16 +401,6 @@ joda-time provided - - org.apache.hadoop - hadoop-mapreduce-client-core - provided - - - org.apache.hadoop - hadoop-common - provided - com.google.inject guice diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetExtensionsModule.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetExtensionsModule.java index f87aacb54ef3..dd2cf4c34f82 100644 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetExtensionsModule.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetExtensionsModule.java @@ -23,20 +23,33 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.inject.Binder; +import com.google.inject.Inject; import org.apache.druid.data.input.parquet.avro.ParquetAvroHadoopInputRowParser; import org.apache.druid.data.input.parquet.simple.ParquetHadoopInputRowParser; import org.apache.druid.data.input.parquet.simple.ParquetParseSpec; import org.apache.druid.initialization.DruidModule; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Properties; public class ParquetExtensionsModule implements DruidModule { - public static final String PARQUET_SIMPLE_INPUT_PARSER_TYPE = "parquet"; - public static final String PARQUET_SIMPLE_PARSE_SPEC_TYPE = "parquet"; - public static final String PARQUET_AVRO_INPUT_PARSER_TYPE = "parquet-avro"; - public static final String PARQUET_AVRO_PARSE_SPEC_TYPE = "avro"; + static final String PARQUET_SIMPLE_INPUT_PARSER_TYPE = "parquet"; + static final String PARQUET_SIMPLE_PARSE_SPEC_TYPE = 
"parquet"; + static final String PARQUET_AVRO_INPUT_PARSER_TYPE = "parquet-avro"; + static final String PARQUET_AVRO_PARSE_SPEC_TYPE = "avro"; + + private Properties props = null; + + @Inject + public void setProperties(Properties props) + { + this.props = props; + } @Override public List getJacksonModules() @@ -46,7 +59,8 @@ public List getJacksonModules() .registerSubtypes( new NamedType(ParquetAvroHadoopInputRowParser.class, PARQUET_AVRO_INPUT_PARSER_TYPE), new NamedType(ParquetHadoopInputRowParser.class, PARQUET_SIMPLE_INPUT_PARSER_TYPE), - new NamedType(ParquetParseSpec.class, PARQUET_SIMPLE_INPUT_PARSER_TYPE) + new NamedType(ParquetParseSpec.class, PARQUET_SIMPLE_INPUT_PARSER_TYPE), + new NamedType(ParquetInputFormat.class, PARQUET_SIMPLE_PARSE_SPEC_TYPE) ) ); } @@ -54,6 +68,36 @@ public List getJacksonModules() @Override public void configure(Binder binder) { + // this block of code is common among extensions that use Hadoop things but are not running in Hadoop, in order + // to properly initialize everything + + final Configuration conf = new Configuration(); + + // Set explicit CL. Otherwise it'll try to use thread context CL, which may not have all of our dependencies. 
+ conf.setClassLoader(getClass().getClassLoader()); + + // Ensure that FileSystem class level initialization happens with correct CL + // See https://github.com/apache/incubator-druid/issues/1714 + ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + FileSystem.get(conf); + } + catch (IOException ex) { + throw new RuntimeException(ex); + } + finally { + Thread.currentThread().setContextClassLoader(currCtxCl); + } + + if (props != null) { + for (String propName : props.stringPropertyNames()) { + if (propName.startsWith("hadoop.")) { + conf.set(propName.substring("hadoop.".length()), props.getProperty(propName)); + } + } + } + binder.bind(Configuration.class).toInstance(conf); } } diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java new file mode 100644 index 000000000000..fbd93414a4a1 --- /dev/null +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.parquet; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.NestedInputFormat; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.Objects; + +public class ParquetInputFormat extends NestedInputFormat +{ + private final boolean binaryAsString; + + @JsonCreator + public ParquetInputFormat( + @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec, + @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString + ) + { + super(flattenSpec); + this.binaryAsString = binaryAsString == null ? 
false : binaryAsString; + } + + @JsonProperty + public boolean getBinaryAsString() + { + return binaryAsString; + } + + @Override + public boolean isSplittable() + { + return false; + } + + @Override + public InputEntityReader createReader( + InputRowSchema inputRowSchema, + InputEntity source, + File temporaryDirectory + ) throws IOException + { + return new ParquetReader(inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + ParquetInputFormat that = (ParquetInputFormat) o; + return binaryAsString == that.binaryAsString; + } + + @Override + public int hashCode() + { + return Objects.hash(super.hashCode(), binaryAsString); + } +} diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java new file mode 100644 index 000000000000..a98f929a1c03 --- /dev/null +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.parquet; + +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.IntermediateRowParsingReader; +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.data.input.parquet.simple.ParquetGroupFlattenerMaker; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.common.parsers.ObjectFlattener; +import org.apache.druid.java.util.common.parsers.ObjectFlatteners; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.example.GroupReadSupport; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; + +public class ParquetReader extends IntermediateRowParsingReader +{ + private final InputRowSchema inputRowSchema; + private final ObjectFlattener flattener; + + private final org.apache.parquet.hadoop.ParquetReader reader; + private final Closer closer; + + ParquetReader( + InputRowSchema inputRowSchema, + InputEntity source, + File temporaryDirectory, + JSONPathSpec flattenSpec, + boolean binaryAsString + ) throws IOException + { + this.inputRowSchema = 
inputRowSchema; + this.flattener = ObjectFlatteners.create(flattenSpec, new ParquetGroupFlattenerMaker(binaryAsString)); + + closer = Closer.create(); + byte[] buffer = new byte[InputEntity.DEFAULT_FETCH_BUFFER_SIZE]; + final InputEntity.CleanableFile file = closer.register(source.fetch(temporaryDirectory, buffer)); + final Path path = new Path(file.file().toURI()); + + final ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + reader = closer.register(org.apache.parquet.hadoop.ParquetReader.builder(new GroupReadSupport(), path).build()); + } + finally { + Thread.currentThread().setContextClassLoader(currentClassLoader); + } + } + + @Override + protected CloseableIterator intermediateRowIterator() + { + return new CloseableIterator() + { + Group value = null; + + @Override + public boolean hasNext() + { + if (value == null) { + try { + value = reader.read(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + return value != null; + } + + @Override + public Group next() + { + if (value == null) { + throw new NoSuchElementException(); + } + Group currentValue = value; + value = null; + return currentValue; + } + + @Override + public void close() throws IOException + { + closer.close(); + } + }; + } + + @Override + protected List parseInputRows(Group intermediateRow) throws ParseException + { + return Collections.singletonList( + MapInputRowParser.parse( + inputRowSchema.getTimestampSpec(), + inputRowSchema.getDimensionsSpec(), + flattener.flatten(intermediateRow) + ) + ); + } + + @Override + protected Map toMap(Group intermediateRow) + { + return flattener.toMap(intermediateRow); + } +} diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupFlattenerMaker.java 
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupFlattenerMaker.java index 2ba939b162a1..b7681c2dd873 100644 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupFlattenerMaker.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupFlattenerMaker.java @@ -22,6 +22,7 @@ import com.jayway.jsonpath.Configuration; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.Option; +import com.jayway.jsonpath.spi.json.JsonProvider; import org.apache.druid.java.util.common.parsers.NotImplementedMappingProvider; import org.apache.druid.java.util.common.parsers.ObjectFlatteners; import org.apache.parquet.example.data.Group; @@ -37,15 +38,16 @@ public class ParquetGroupFlattenerMaker implements ObjectFlatteners.FlattenerMaker { - private final Configuration jsonPathConfiguration; private final ParquetGroupConverter converter; + private final JsonProvider parquetJsonProvider; - ParquetGroupFlattenerMaker(boolean binaryAsString) + public ParquetGroupFlattenerMaker(boolean binaryAsString) { this.converter = new ParquetGroupConverter(binaryAsString); + this.parquetJsonProvider = new ParquetGroupJsonProvider(converter); this.jsonPathConfiguration = Configuration.builder() - .jsonProvider(new ParquetGroupJsonProvider(converter)) + .jsonProvider(parquetJsonProvider) .mappingProvider(new NotImplementedMappingProvider()) .options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS)) .build(); @@ -86,6 +88,18 @@ public Function makeJsonQueryExtractor(String expr) throw new UnsupportedOperationException("Parquet does not support JQ"); } + @Override + public JsonProvider getJsonProvider() + { + return parquetJsonProvider; + } + + @Override + public Object finalizeConversionForMap(Object o) + { + return finalizeConversion(o); + } + /** * After json conversion, wrapped list items can still need unwrapped. 
See * {@link ParquetGroupConverter#isWrappedListPrimitive(Object)} and @@ -101,7 +115,7 @@ private Object finalizeConversion(Object o) if (ParquetGroupConverter.isWrappedListPrimitive(o)) { return converter.unwrapListPrimitive(o); } else if (o instanceof List) { - List asList = ((List) o).stream().filter(Objects::nonNull).collect(Collectors.toList()); + List asList = ((List) o).stream().filter(Objects::nonNull).collect(Collectors.toList()); if (asList.stream().allMatch(ParquetGroupConverter::isWrappedListPrimitive)) { return asList.stream().map(Group.class::cast).map(converter::unwrapListPrimitive).collect(Collectors.toList()); } diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupJsonProvider.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupJsonProvider.java index b422e97d7edb..3ba15e41a98e 100644 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupJsonProvider.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupJsonProvider.java @@ -39,7 +39,7 @@ public class ParquetGroupJsonProvider implements JsonProvider { private final ParquetGroupConverter converter; - ParquetGroupJsonProvider(ParquetGroupConverter converter) + public ParquetGroupJsonProvider(ParquetGroupConverter converter) { this.converter = converter; } diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetInputTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetInputTest.java index 3a0da9f89e93..9cfbf0d12306 100644 --- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetInputTest.java +++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetInputTest.java @@ 
-47,21 +47,21 @@ class BaseParquetInputTest { - private static Map parseSpecType = ImmutableMap.of( + private static final Map PARSE_SPEC_TYPES = ImmutableMap.of( ParquetExtensionsModule.PARQUET_AVRO_INPUT_PARSER_TYPE, ParquetExtensionsModule.PARQUET_AVRO_PARSE_SPEC_TYPE, ParquetExtensionsModule.PARQUET_SIMPLE_INPUT_PARSER_TYPE, ParquetExtensionsModule.PARQUET_SIMPLE_PARSE_SPEC_TYPE ); - private static Map inputFormatType = ImmutableMap.of( + private static final Map INPUT_FORMAT_TYPES = ImmutableMap.of( ParquetExtensionsModule.PARQUET_AVRO_INPUT_PARSER_TYPE, "org.apache.druid.data.input.parquet.DruidParquetAvroInputFormat", ParquetExtensionsModule.PARQUET_SIMPLE_INPUT_PARSER_TYPE, "org.apache.druid.data.input.parquet.DruidParquetInputFormat" ); - private static Map> inputFormatClass = ImmutableMap.of( + private static final Map> INPUT_FORMAT_CLASSES = ImmutableMap.of( ParquetExtensionsModule.PARQUET_AVRO_INPUT_PARSER_TYPE, DruidParquetAvroInputFormat.class, ParquetExtensionsModule.PARQUET_SIMPLE_INPUT_PARSER_TYPE, @@ -78,9 +78,9 @@ static HadoopDruidIndexerConfig transformHadoopDruidIndexerConfig( String template = Strings.utf8ToString(Files.readAllBytes(Paths.get(templateFile))); String transformed; if (withParseType) { - transformed = StringUtils.format(template, inputFormatType.get(type), type, parseSpecType.get(type)); + transformed = StringUtils.format(template, INPUT_FORMAT_TYPES.get(type), type, PARSE_SPEC_TYPES.get(type)); } else { - transformed = StringUtils.format(template, inputFormatType.get(type), type); + transformed = StringUtils.format(template, INPUT_FORMAT_TYPES.get(type), type); } return HadoopDruidIndexerConfig.fromString(transformed); } @@ -93,7 +93,7 @@ static Object getFirstRow(Job job, String parserType, String parquetPath) throws FileSplit split = new FileSplit(path, 0, testFile.length(), null); InputFormat inputFormat = ReflectionUtils.newInstance( - inputFormatClass.get(parserType), + INPUT_FORMAT_CLASSES.get(parserType), 
job.getConfiguration() ); TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID()); @@ -117,7 +117,7 @@ static List getAllRows(String parserType, HadoopDruidIndexerConfig con FileSplit split = new FileSplit(path, 0, testFile.length(), null); InputFormat inputFormat = ReflectionUtils.newInstance( - inputFormatClass.get(parserType), + INPUT_FORMAT_CLASSES.get(parserType), job.getConfiguration() ); TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID()); diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetReaderTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetReaderTest.java new file mode 100644 index 000000000000..ac22f77f5f49 --- /dev/null +++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetReaderTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.parquet; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.FileEntity; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +class BaseParquetReaderTest +{ + ObjectWriter DEFAULT_JSON_WRITER = new ObjectMapper().writerWithDefaultPrettyPrinter(); + + InputEntityReader createReader(String parquetFile, InputRowSchema schema, JSONPathSpec flattenSpec) throws IOException + { + return createReader(parquetFile, schema, flattenSpec, false); + } + + InputEntityReader createReader( + String parquetFile, + InputRowSchema schema, + JSONPathSpec flattenSpec, + boolean binaryAsString + ) throws IOException + { + FileEntity entity = new FileEntity(new File(parquetFile)); + ParquetInputFormat parquet = new ParquetInputFormat(flattenSpec, binaryAsString); + return parquet.createReader(schema, entity, null); + } + + List readAllRows(InputEntityReader reader) throws IOException + { + List rows = new ArrayList<>(); + try (CloseableIterator iterator = reader.read()) { + iterator.forEachRemaining(rows::add); + } + return rows; + } + + List sampleAllRows(InputEntityReader reader) throws IOException + { + List rows = new ArrayList<>(); + try (CloseableIterator iterator = reader.sample()) { + iterator.forEachRemaining(rows::add); + } + return rows; + } +} diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/CompatParquetInputTest.java 
b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/CompatParquetInputTest.java index 2169d9818b8a..6989d09b0d7b 100644 --- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/CompatParquetInputTest.java +++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/CompatParquetInputTest.java @@ -67,7 +67,7 @@ public void testBinaryAsString() throws IOException, InterruptedException InputRow row = ((List) config.getParser().parseBatch(data)).get(0); - // without binaryAsString: true, the value would something like "[104, 101, 121, 32, 116, 104, 105, 115, 32, 105, 115, 3.... ]" + // without binaryAsString: true, the value would be "aGV5IHRoaXMgaXMgJsOpKC3DqF/Dp8OgKT1eJMO5KiEgzqleXg==" Assert.assertEquals("hey this is &é(-è_çà)=^$ù*! Ω^^", row.getDimension("field").get(0)); Assert.assertEquals(1471800234, row.getTimestampFromEpoch()); } diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/CompatParquetReaderTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/CompatParquetReaderTest.java new file mode 100644 index 000000000000..f8b586bec67d --- /dev/null +++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/CompatParquetReaderTest.java @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.parquet; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** + * Duplicate of {@link CompatParquetInputTest} but for {@link ParquetReader} instead of Hadoop + */ +public class CompatParquetReaderTest extends BaseParquetReaderTest +{ + @Test + public void testBinaryAsString() throws IOException + { + final String file = "example/compat/284a0e001476716b-56d5676f53bd6e85_115466471_data.0.parq"; + InputRowSchema schema = new InputRowSchema( + new TimestampSpec("ts", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("field"))), + ImmutableList.of() + ); + InputEntityReader reader = createReader( + file, + schema, + JSONPathSpec.DEFAULT, + true + ); + + InputEntityReader readerNotAsString = createReader( + file, + schema, + JSONPathSpec.DEFAULT, + false + ); + + 
List rows = readAllRows(reader); + List rowsAsBinary = readAllRows(readerNotAsString); + + Assert.assertEquals("hey this is &é(-è_çà)=^$ù*! Ω^^", rows.get(0).getDimension("field").get(0)); + Assert.assertEquals(1471800234, rows.get(0).getTimestampFromEpoch()); + Assert.assertEquals( + "aGV5IHRoaXMgaXMgJsOpKC3DqF/Dp8OgKT1eJMO5KiEgzqleXg==", + rowsAsBinary.get(0).getDimension("field").get(0) + ); + Assert.assertEquals(1471800234, rowsAsBinary.get(0).getTimestampFromEpoch()); + + reader = createReader( + file, + schema, + JSONPathSpec.DEFAULT, + true + ); + readerNotAsString = createReader( + file, + schema, + JSONPathSpec.DEFAULT, + false + ); + List sampled = sampleAllRows(reader); + List sampledAsBinary = sampleAllRows(readerNotAsString); + final String expectedJson = "{\n" + + " \"field\" : \"hey this is &é(-è_çà)=^$ù*! Ω^^\",\n" + + " \"ts\" : 1471800234\n" + + "}"; + Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues())); + + final String expectedJsonBinary = "{\n" + + " \"field\" : \"aGV5IHRoaXMgaXMgJsOpKC3DqF/Dp8OgKT1eJMO5KiEgzqleXg==\",\n" + + " \"ts\" : 1471800234\n" + + "}"; + Assert.assertEquals( + expectedJsonBinary, + DEFAULT_JSON_WRITER.writeValueAsString(sampledAsBinary.get(0).getRawValues()) + ); + } + + + @Test + public void testParquet1217() throws IOException + { + final String file = "example/compat/parquet-1217.parquet"; + InputRowSchema schema = new InputRowSchema( + new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())), + ImmutableList.of("metric1") + ); + List flattenExpr = ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "col", "col"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "metric1", "$.col") + ); + JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr); + InputEntityReader reader = createReader( + file, + schema, + flattenSpec + ); + + List rows = 
readAllRows(reader); + + Assert.assertEquals("2018-09-01T00:00:00.000Z", rows.get(0).getTimestamp().toString()); + Assert.assertEquals("-1", rows.get(0).getDimension("col").get(0)); + Assert.assertEquals(-1, rows.get(0).getMetric("metric1")); + Assert.assertTrue(rows.get(4).getDimension("col").isEmpty()); + + reader = createReader( + file, + schema, + flattenSpec + ); + List sampled = sampleAllRows(reader); + final String expectedJson = "{\n" + + " \"col\" : -1\n" + + "}"; + Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues())); + } + + @Test + public void testParquetThriftCompat() throws IOException + { + /* + message ParquetSchema { + required boolean boolColumn; + required int32 byteColumn; + required int32 shortColumn; + required int32 intColumn; + required int64 longColumn; + required double doubleColumn; + required binary binaryColumn (UTF8); + required binary stringColumn (UTF8); + required binary enumColumn (ENUM); + optional boolean maybeBoolColumn; + optional int32 maybeByteColumn; + optional int32 maybeShortColumn; + optional int32 maybeIntColumn; + optional int64 maybeLongColumn; + optional double maybeDoubleColumn; + optional binary maybeBinaryColumn (UTF8); + optional binary maybeStringColumn (UTF8); + optional binary maybeEnumColumn (ENUM); + required group stringsColumn (LIST) { + repeated binary stringsColumn_tuple (UTF8); + } + required group intSetColumn (LIST) { + repeated int32 intSetColumn_tuple; + } + required group intToStringColumn (MAP) { + repeated group map (MAP_KEY_VALUE) { + required int32 key; + optional binary value (UTF8); + } + } + required group complexColumn (MAP) { + repeated group map (MAP_KEY_VALUE) { + required int32 key; + optional group value (LIST) { + repeated group value_tuple { + required group nestedIntsColumn (LIST) { + repeated int32 nestedIntsColumn_tuple; + } + required binary nestedStringColumn (UTF8); + } + } + } + } + } + */ + final String file = 
"example/compat/parquet-thrift-compat.snappy.parquet"; + InputRowSchema schema = new InputRowSchema( + new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())), + Collections.emptyList() + ); + List flattenExpr = ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.PATH, "extractByLogicalMap", "$.intToStringColumn.1"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "extractByComplexLogicalMap", "$.complexColumn.1[0].nestedIntsColumn[1]") + ); + JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr); + InputEntityReader reader = createReader( + file, + schema, + flattenSpec + ); + + List rows = readAllRows(reader); + + Assert.assertEquals("2018-09-01T00:00:00.000Z", rows.get(0).getTimestamp().toString()); + Assert.assertEquals("true", rows.get(0).getDimension("boolColumn").get(0)); + Assert.assertEquals("0", rows.get(0).getDimension("byteColumn").get(0)); + Assert.assertEquals("1", rows.get(0).getDimension("shortColumn").get(0)); + Assert.assertEquals("2", rows.get(0).getDimension("intColumn").get(0)); + Assert.assertEquals("0", rows.get(0).getDimension("longColumn").get(0)); + Assert.assertEquals("0.2", rows.get(0).getDimension("doubleColumn").get(0)); + Assert.assertEquals("val_0", rows.get(0).getDimension("binaryColumn").get(0)); + Assert.assertEquals("val_0", rows.get(0).getDimension("stringColumn").get(0)); + Assert.assertEquals("SPADES", rows.get(0).getDimension("enumColumn").get(0)); + Assert.assertTrue(rows.get(0).getDimension("maybeBoolColumn").isEmpty()); + Assert.assertTrue(rows.get(0).getDimension("maybeByteColumn").isEmpty()); + Assert.assertTrue(rows.get(0).getDimension("maybeShortColumn").isEmpty()); + Assert.assertTrue(rows.get(0).getDimension("maybeIntColumn").isEmpty()); + Assert.assertTrue(rows.get(0).getDimension("maybeLongColumn").isEmpty()); + Assert.assertTrue(rows.get(0).getDimension("maybeDoubleColumn").isEmpty()); + 
Assert.assertTrue(rows.get(0).getDimension("maybeBinaryColumn").isEmpty()); + Assert.assertTrue(rows.get(0).getDimension("maybeStringColumn").isEmpty()); + Assert.assertTrue(rows.get(0).getDimension("maybeEnumColumn").isEmpty()); + Assert.assertEquals("arr_0", rows.get(0).getDimension("stringsColumn").get(0)); + Assert.assertEquals("arr_1", rows.get(0).getDimension("stringsColumn").get(1)); + Assert.assertEquals("0", rows.get(0).getDimension("intSetColumn").get(0)); + Assert.assertEquals("val_1", rows.get(0).getDimension("extractByLogicalMap").get(0)); + Assert.assertEquals("1", rows.get(0).getDimension("extractByComplexLogicalMap").get(0)); + + reader = createReader( + file, + schema, + flattenSpec + ); + List sampled = sampleAllRows(reader); + final String expectedJson = "{\n" + + " \"enumColumn\" : \"SPADES\",\n" + + " \"maybeStringColumn\" : { },\n" + + " \"maybeBinaryColumn\" : { },\n" + + " \"shortColumn\" : 1,\n" + + " \"byteColumn\" : 0,\n" + + " \"maybeBoolColumn\" : { },\n" + + " \"intColumn\" : 2,\n" + + " \"doubleColumn\" : 0.2,\n" + + " \"maybeByteColumn\" : { },\n" + + " \"intSetColumn\" : [ 0 ],\n" + + " \"boolColumn\" : true,\n" + + " \"binaryColumn\" : \"val_0\",\n" + + " \"maybeIntColumn\" : { },\n" + + " \"intToStringColumn\" : {\n" + + " \"0\" : \"val_0\",\n" + + " \"1\" : \"val_1\",\n" + + " \"2\" : \"val_2\"\n" + + " },\n" + + " \"maybeDoubleColumn\" : { },\n" + + " \"maybeEnumColumn\" : { },\n" + + " \"maybeLongColumn\" : { },\n" + + " \"stringsColumn\" : [ \"arr_0\", \"arr_1\", \"arr_2\" ],\n" + + " \"longColumn\" : 0,\n" + + " \"stringColumn\" : \"val_0\",\n" + + " \"maybeShortColumn\" : { },\n" + + " \"complexColumn\" : {\n" + + " \"0\" : [ {\n" + + " \"nestedStringColumn\" : \"val_0\",\n" + + " \"nestedIntsColumn\" : [ 0, 1, 2 ]\n" + + " }, {\n" + + " \"nestedStringColumn\" : \"val_1\",\n" + + " \"nestedIntsColumn\" : [ 1, 2, 3 ]\n" + + " }, {\n" + + " \"nestedStringColumn\" : \"val_2\",\n" + + " \"nestedIntsColumn\" : [ 2, 3, 4 ]\n" + + 
" } ],\n" + + " \"1\" : [ {\n" + + " \"nestedStringColumn\" : \"val_0\",\n" + + " \"nestedIntsColumn\" : [ 0, 1, 2 ]\n" + + " }, {\n" + + " \"nestedStringColumn\" : \"val_1\",\n" + + " \"nestedIntsColumn\" : [ 1, 2, 3 ]\n" + + " }, {\n" + + " \"nestedStringColumn\" : \"val_2\",\n" + + " \"nestedIntsColumn\" : [ 2, 3, 4 ]\n" + + " } ],\n" + + " \"2\" : [ {\n" + + " \"nestedStringColumn\" : \"val_0\",\n" + + " \"nestedIntsColumn\" : [ 0, 1, 2 ]\n" + + " }, {\n" + + " \"nestedStringColumn\" : \"val_1\",\n" + + " \"nestedIntsColumn\" : [ 1, 2, 3 ]\n" + + " }, {\n" + + " \"nestedStringColumn\" : \"val_2\",\n" + + " \"nestedIntsColumn\" : [ 2, 3, 4 ]\n" + + " } ]\n" + + " }\n" + + "}"; + Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues())); + } + + @Test + public void testOldRepeatedInt() throws IOException + { + final String file = "example/compat/old-repeated-int.parquet"; + InputRowSchema schema = new InputRowSchema( + new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("repeatedInt"))), + Collections.emptyList() + ); + List flattenExpr = ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "repeatedInt", "repeatedInt") + ); + JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr); + InputEntityReader reader = createReader( + file, + schema, + flattenSpec + ); + + List rows = readAllRows(reader); + Assert.assertEquals("2018-09-01T00:00:00.000Z", rows.get(0).getTimestamp().toString()); + Assert.assertEquals("1", rows.get(0).getDimension("repeatedInt").get(0)); + Assert.assertEquals("2", rows.get(0).getDimension("repeatedInt").get(1)); + Assert.assertEquals("3", rows.get(0).getDimension("repeatedInt").get(2)); + + reader = createReader( + file, + schema, + flattenSpec + ); + List sampled = sampleAllRows(reader); + final String expectedJson = "{\n" + + " \"repeatedInt\" : [ 1, 2, 3 ]\n" + + "}"; + 
Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues())); + } + + + @Test + public void testReadNestedArrayStruct() throws IOException + { + final String file = "example/compat/nested-array-struct.parquet"; + InputRowSchema schema = new InputRowSchema( + new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("i32_dec", "extracted1", "extracted2"))), + Collections.emptyList() + ); + List flattenExpr = ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.PATH, "extracted1", "$.myComplex[0].id"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "extracted2", "$.myComplex[0].repeatedMessage[*].someId") + ); + JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr); + InputEntityReader reader = createReader( + file, + schema, + flattenSpec + ); + + List rows = readAllRows(reader); + Assert.assertEquals("2018-09-01T00:00:00.000Z", rows.get(1).getTimestamp().toString()); + Assert.assertEquals("5", rows.get(1).getDimension("primitive").get(0)); + Assert.assertEquals("4", rows.get(1).getDimension("extracted1").get(0)); + Assert.assertEquals("6", rows.get(1).getDimension("extracted2").get(0)); + + reader = createReader( + file, + schema, + flattenSpec + ); + List sampled = sampleAllRows(reader); + final String expectedJson = "{\n" + + " \"primitive\" : 2,\n" + + " \"myComplex\" : [ {\n" + + " \"id\" : 1,\n" + + " \"repeatedMessage\" : [ 3 ]\n" + + " } ]\n" + + "}"; + Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues())); + } + + @Test + public void testProtoStructWithArray() throws IOException + { + final String file = "example/compat/proto-struct-with-array.parquet"; + InputRowSchema schema = new InputRowSchema( + new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")), + new 
DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())), + Collections.emptyList() + ); + List flattenExpr = ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.PATH, "extractedOptional", "$.optionalMessage.someId"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "extractedRequired", "$.requiredMessage.someId"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "extractedRepeated", "$.repeatedMessage[*]") + ); + JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr); + InputEntityReader reader = createReader( + file, + schema, + flattenSpec + ); + + List rows = readAllRows(reader); + Assert.assertEquals("2018-09-01T00:00:00.000Z", rows.get(0).getTimestamp().toString()); + Assert.assertEquals("10", rows.get(0).getDimension("optionalPrimitive").get(0)); + Assert.assertEquals("9", rows.get(0).getDimension("requiredPrimitive").get(0)); + Assert.assertTrue(rows.get(0).getDimension("repeatedPrimitive").isEmpty()); + Assert.assertTrue(rows.get(0).getDimension("extractedOptional").isEmpty()); + Assert.assertEquals("9", rows.get(0).getDimension("extractedRequired").get(0)); + Assert.assertEquals("9", rows.get(0).getDimension("extractedRepeated").get(0)); + Assert.assertEquals("10", rows.get(0).getDimension("extractedRepeated").get(1)); + + reader = createReader( + file, + schema, + flattenSpec + ); + List sampled = sampleAllRows(reader); + final String expectedJson = "{\n" + + " \"optionalMessage\" : { },\n" + + " \"requiredPrimitive\" : 9,\n" + + " \"repeatedPrimitive\" : { },\n" + + " \"repeatedMessage\" : [ 9, 10 ],\n" + + " \"optionalPrimitive\" : 10,\n" + + " \"requiredMessage\" : {\n" + + " \"someId\" : 9\n" + + " }\n" + + "}"; + Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues())); + } +} diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/DecimalParquetReaderTest.java 
b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/DecimalParquetReaderTest.java new file mode 100644 index 000000000000..50b9fe2df27d --- /dev/null +++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/DecimalParquetReaderTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.parquet; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.math.BigDecimal; +import java.util.List; + +/** + * Duplicate of {@link DecimalParquetInputTest} but for {@link ParquetReader} instead of Hadoop + */ +public class DecimalParquetReaderTest extends BaseParquetReaderTest +{ + @Test + public void testReadParquetDecimalFixedLen() throws IOException + { + final String file = "example/decimals/dec-in-fixed-len.parquet"; + InputRowSchema schema = new InputRowSchema( + new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("fixed_len_dec"))), + ImmutableList.of("metric1") + ); + List flattenExpr = ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "fixed_len_dec", "fixed_len_dec"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "metric1", "$.fixed_len_dec") + ); + JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr); + InputEntityReader reader = createReader( + file, + schema, + flattenSpec + ); + + List rows = readAllRows(reader); + Assert.assertEquals("2018-09-01T00:00:00.000Z", rows.get(1).getTimestamp().toString()); + Assert.assertEquals("1.0", rows.get(1).getDimension("fixed_len_dec").get(0)); + Assert.assertEquals(new 
BigDecimal("1.0"), rows.get(1).getMetric("metric1")); + + reader = createReader( + file, + schema, + flattenSpec + ); + List sampled = sampleAllRows(reader); + final String expectedJson = "{\n" + + " \"fixed_len_dec\" : 1.0\n" + + "}"; + Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(1).getRawValues())); + } + + @Test + public void testReadParquetDecimali32() throws IOException + { + final String file = "example/decimals/dec-in-i32.parquet"; + InputRowSchema schema = new InputRowSchema( + new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("i32_dec"))), + ImmutableList.of("metric1") + ); + List flattenExpr = ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "i32_dec", "i32_dec"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "metric1", "$.i32_dec") + ); + JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr); + InputEntityReader reader = createReader( + file, + schema, + flattenSpec + ); + + List rows = readAllRows(reader); + Assert.assertEquals("2018-09-01T00:00:00.000Z", rows.get(1).getTimestamp().toString()); + Assert.assertEquals("100", rows.get(1).getDimension("i32_dec").get(0)); + Assert.assertEquals(new BigDecimal(100), rows.get(1).getMetric("metric1")); + + reader = createReader( + file, + schema, + flattenSpec + ); + List sampled = sampleAllRows(reader); + final String expectedJson = "{\n" + + " \"i32_dec\" : 100\n" + + "}"; + Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(1).getRawValues())); + } + + @Test + public void testReadParquetDecimali64() throws IOException + { + final String file = "example/decimals/dec-in-i64.parquet"; + InputRowSchema schema = new InputRowSchema( + new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("i64_dec"))), + 
ImmutableList.of("metric1") + ); + List flattenExpr = ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "i32_dec", "i64_dec"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "metric1", "$.i64_dec") + ); + JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr); + InputEntityReader reader = createReader( + file, + schema, + flattenSpec + ); + + List rows = readAllRows(reader); + Assert.assertEquals("2018-09-01T00:00:00.000Z", rows.get(1).getTimestamp().toString()); + Assert.assertEquals("100", rows.get(1).getDimension("i64_dec").get(0)); + Assert.assertEquals(new BigDecimal(100), rows.get(1).getMetric("metric1")); + + reader = createReader( + file, + schema, + flattenSpec + ); + List sampled = sampleAllRows(reader); + final String expectedJson = "{\n" + + " \"i64_dec\" : 100\n" + + "}"; + Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(1).getRawValues())); + } +} diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetInputTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetInputTest.java index 748086dbff78..6532ab222df8 100644 --- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetInputTest.java +++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetInputTest.java @@ -36,7 +36,7 @@ @RunWith(Parameterized.class) public class FlattenSpecParquetInputTest extends BaseParquetInputTest { - private static final String TS1 = "2018-09-18T00:18:00.023Z"; + static final String TS1 = "2018-09-18T00:18:00.023Z"; @Parameterized.Parameters(name = "type = {0}") public static Iterable constructorFeeder() diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetReaderTest.java 
b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetReaderTest.java new file mode 100644 index 000000000000..5be38dda494d --- /dev/null +++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetReaderTest.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.parquet; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** + * Duplicate of {@link FlattenSpecParquetInputTest} but for {@link ParquetReader} instead of Hadoop + */ +public class FlattenSpecParquetReaderTest extends BaseParquetReaderTest +{ + private static final String FLAT_JSON = "{\n" + + " \"listDim\" : [ \"listDim1v1\", \"listDim1v2\" ],\n" + + " \"dim3\" : 1,\n" + + " \"dim2\" : \"d2v1\",\n" + + " \"dim1\" : \"d1v1\",\n" + + " \"metric1\" : 1,\n" + + " \"timestamp\" : 1537229880023\n" + + "}"; + + private static final String NESTED_JSON = "{\n" + + " \"nestedData\" : {\n" + + " \"listDim\" : [ \"listDim1v1\", \"listDim1v2\" ],\n" + + " \"dim3\" : 1,\n" + + " \"dim2\" : \"d2v1\",\n" + + " \"metric2\" : 2\n" + + " },\n" + + " \"dim1\" : \"d1v1\",\n" + + " \"metric1\" : 1,\n" + + " \"timestamp\" : 1537229880023\n" + + "}"; + + @Test + public void testFlat1NoFlattenSpec() throws IOException + { + final String file = "example/flattening/test_flat_1.parquet"; + InputRowSchema schema = new InputRowSchema( + new TimestampSpec("timestamp", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim3", "listDim"))), + ImmutableList.of("metric1", "metric2") + ); + JSONPathSpec flattenSpec = new JSONPathSpec(false, 
ImmutableList.of()); + InputEntityReader reader = createReader( + file, + schema, + flattenSpec + ); + + List rows = readAllRows(reader); + Assert.assertEquals(FlattenSpecParquetInputTest.TS1, rows.get(0).getTimestamp().toString()); + Assert.assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0)); + Assert.assertEquals("d2v1", rows.get(0).getDimension("dim2").get(0)); + Assert.assertEquals("1", rows.get(0).getDimension("dim3").get(0)); + Assert.assertEquals("listDim1v1", rows.get(0).getDimension("listDim").get(0)); + Assert.assertEquals("listDim1v2", rows.get(0).getDimension("listDim").get(1)); + Assert.assertEquals(1, rows.get(0).getMetric("metric1").longValue()); + + reader = createReader( + file, + schema, + flattenSpec + ); + List sampled = sampleAllRows(reader); + Assert.assertEquals(FLAT_JSON, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues())); + } + + @Test + public void testFlat1Autodiscover() throws IOException + { + final String file = "example/flattening/test_flat_1.parquet"; + InputRowSchema schema = new InputRowSchema( + new TimestampSpec("timestamp", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())), + ImmutableList.of("metric1", "metric2") + ); + InputEntityReader reader = createReader( + file, + schema, + JSONPathSpec.DEFAULT + ); + + List rows = readAllRows(reader); + Assert.assertEquals(FlattenSpecParquetInputTest.TS1, rows.get(0).getTimestamp().toString()); + Assert.assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0)); + Assert.assertEquals("d2v1", rows.get(0).getDimension("dim2").get(0)); + Assert.assertEquals("1", rows.get(0).getDimension("dim3").get(0)); + Assert.assertEquals("listDim1v1", rows.get(0).getDimension("listDim").get(0)); + Assert.assertEquals("listDim1v2", rows.get(0).getDimension("listDim").get(1)); + Assert.assertEquals(1, rows.get(0).getMetric("metric1").longValue()); + + reader = createReader( + file, + schema, + JSONPathSpec.DEFAULT + ); + List sampled 
= sampleAllRows(reader); + Assert.assertEquals(FLAT_JSON, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues())); + } + + @Test + public void testFlat1Flatten() throws IOException + { + final String file = "example/flattening/test_flat_1.parquet"; + InputRowSchema schema = new InputRowSchema( + new TimestampSpec("timestamp", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim3", "list"))), + ImmutableList.of("metric1", "metric2") + ); + List flattenExpr = ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "timestamp", null), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "dim1", null), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "dim2", null), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "dim3", null), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "list", "$.listDim") + ); + JSONPathSpec flattenSpec = new JSONPathSpec(false, flattenExpr); + InputEntityReader reader = createReader( + file, + schema, + flattenSpec + ); + + List rows = readAllRows(reader); + Assert.assertEquals(FlattenSpecParquetInputTest.TS1, rows.get(0).getTimestamp().toString()); + Assert.assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0)); + Assert.assertEquals("d2v1", rows.get(0).getDimension("dim2").get(0)); + Assert.assertEquals("1", rows.get(0).getDimension("dim3").get(0)); + Assert.assertEquals("listDim1v1", rows.get(0).getDimension("list").get(0)); + Assert.assertEquals("listDim1v2", rows.get(0).getDimension("list").get(1)); + Assert.assertEquals(1, rows.get(0).getMetric("metric1").longValue()); + + reader = createReader( + file, + schema, + flattenSpec + ); + List sampled = sampleAllRows(reader); + Assert.assertEquals(FLAT_JSON, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues())); + } + + @Test + public void testFlat1FlattenSelectListItem() throws IOException + { + final String file = "example/flattening/test_flat_1.parquet"; + InputRowSchema schema = new 
InputRowSchema( + new TimestampSpec("timestamp", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "listExtracted"))), + ImmutableList.of("metric1", "metric2") + ); + List flattenExpr = ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "timestamp", null), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "dim1", null), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "dim2", null), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "listExtracted", "$.listDim[1]") + ); + JSONPathSpec flattenSpec = new JSONPathSpec(false, flattenExpr); + InputEntityReader reader = createReader( + file, + schema, + flattenSpec + ); + + List rows = readAllRows(reader); + Assert.assertEquals(FlattenSpecParquetInputTest.TS1, rows.get(0).getTimestamp().toString()); + Assert.assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0)); + Assert.assertEquals("d2v1", rows.get(0).getDimension("dim2").get(0)); + Assert.assertEquals("listDim1v2", rows.get(0).getDimension("listExtracted").get(0)); + Assert.assertEquals(1, rows.get(0).getMetric("metric1").longValue()); + + reader = createReader( + file, + schema, + flattenSpec + ); + List sampled = sampleAllRows(reader); + + Assert.assertEquals(FLAT_JSON, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues())); + } + + + @Test + public void testNested1NoFlattenSpec() throws IOException + { + final String file = "example/flattening/test_nested_1.parquet"; + InputRowSchema schema = new InputRowSchema( + new TimestampSpec("timestamp", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1"))), + ImmutableList.of("metric1") + ); + JSONPathSpec flattenSpec = new JSONPathSpec(false, ImmutableList.of()); + InputEntityReader reader = createReader( + file, + schema, + flattenSpec + ); + + List rows = readAllRows(reader); + Assert.assertEquals(FlattenSpecParquetInputTest.TS1, rows.get(0).getTimestamp().toString()); + 
Assert.assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0)); + List dims = rows.get(0).getDimensions(); + Assert.assertEquals(1, dims.size()); + Assert.assertFalse(dims.contains("dim2")); + Assert.assertFalse(dims.contains("dim3")); + Assert.assertFalse(dims.contains("listDim")); + Assert.assertFalse(dims.contains("nestedData")); + Assert.assertEquals(1, rows.get(0).getMetric("metric1").longValue()); + + reader = createReader( + file, + schema, + flattenSpec + ); + List sampled = sampleAllRows(reader); + Assert.assertEquals(NESTED_JSON, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues())); + } + + @Test + public void testNested1Autodiscover() throws IOException + { + final String file = "example/flattening/test_nested_1.parquet"; + InputRowSchema schema = new InputRowSchema( + new TimestampSpec("timestamp", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())), + ImmutableList.of("metric1", "metric2") + ); + InputEntityReader reader = createReader( + file, + schema, + JSONPathSpec.DEFAULT + ); + + List rows = readAllRows(reader); + Assert.assertEquals(FlattenSpecParquetInputTest.TS1, rows.get(0).getTimestamp().toString()); + Assert.assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0)); + List dims = rows.get(0).getDimensions(); + Assert.assertFalse(dims.contains("dim2")); + Assert.assertFalse(dims.contains("dim3")); + Assert.assertFalse(dims.contains("listDim")); + Assert.assertEquals(1, rows.get(0).getMetric("metric1").longValue()); + + reader = createReader( + file, + schema, + JSONPathSpec.DEFAULT + ); + List sampled = sampleAllRows(reader); + Assert.assertEquals(NESTED_JSON, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues())); + } + + @Test + public void testNested1Flatten() throws IOException + { + final String file = "example/flattening/test_nested_1.parquet"; + InputRowSchema schema = new InputRowSchema( + new TimestampSpec("timestamp", "auto", null), + new 
DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())), + ImmutableList.of("metric1", "metric2") + ); + List flattenExpr = ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "timestamp", null), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "dim1", null), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "dim2", "$.nestedData.dim2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "dim3", "$.nestedData.dim3"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "metric2", "$.nestedData.metric2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "listDim", "$.nestedData.listDim[*]") + ); + JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr); + InputEntityReader reader = createReader( + file, + schema, + flattenSpec + ); + + List rows = readAllRows(reader); + Assert.assertEquals(FlattenSpecParquetInputTest.TS1, rows.get(0).getTimestamp().toString()); + Assert.assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0)); + Assert.assertEquals("d2v1", rows.get(0).getDimension("dim2").get(0)); + Assert.assertEquals("1", rows.get(0).getDimension("dim3").get(0)); + Assert.assertEquals("listDim1v1", rows.get(0).getDimension("listDim").get(0)); + Assert.assertEquals("listDim1v2", rows.get(0).getDimension("listDim").get(1)); + Assert.assertEquals(1, rows.get(0).getMetric("metric1").longValue()); + Assert.assertEquals(2, rows.get(0).getMetric("metric2").longValue()); + + reader = createReader( + file, + schema, + flattenSpec + ); + List sampled = sampleAllRows(reader); + Assert.assertEquals(NESTED_JSON, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues())); + } + + @Test + public void testNested1FlattenSelectListItem() throws IOException + { + final String file = "example/flattening/test_nested_1.parquet"; + InputRowSchema schema = new InputRowSchema( + new TimestampSpec("timestamp", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())), + Collections.emptyList() + ); + List flattenExpr = 
ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "timestamp", null), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "dim1", null), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "dim2", "$.nestedData.dim2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "dim3", "$.nestedData.dim3"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "listextracted", "$.nestedData.listDim[1]") + ); + JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr); + InputEntityReader reader = createReader( + file, + schema, + flattenSpec + ); + + List rows = readAllRows(reader); + + Assert.assertEquals(FlattenSpecParquetInputTest.TS1, rows.get(0).getTimestamp().toString()); + Assert.assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0)); + Assert.assertEquals("d2v1", rows.get(0).getDimension("dim2").get(0)); + Assert.assertEquals("1", rows.get(0).getDimension("dim3").get(0)); + Assert.assertEquals("listDim1v2", rows.get(0).getDimension("listextracted").get(0)); + Assert.assertEquals(1, rows.get(0).getMetric("metric1").longValue()); + + reader = createReader( + file, + schema, + flattenSpec + ); + List sampled = sampleAllRows(reader); + Assert.assertEquals(NESTED_JSON, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues())); + } +} diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/TimestampsParquetReaderTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/TimestampsParquetReaderTest.java new file mode 100644 index 000000000000..19f1544dcff0 --- /dev/null +++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/TimestampsParquetReaderTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.parquet; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** + * Duplicate of {@link TimestampsParquetInputTest} but for {@link ParquetReader} instead of Hadoop + */ +public class TimestampsParquetReaderTest extends BaseParquetReaderTest +{ + @Test + public void testDateHandling() throws IOException + { + final String file = "example/timestamps/test_date_data.snappy.parquet"; + InputRowSchema schemaAsString = new InputRowSchema( + new TimestampSpec("date_as_string", "Y-M-d", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())), + Collections.emptyList() + ); + InputRowSchema schemaAsDate = new InputRowSchema( + new TimestampSpec("date_as_date", null, null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())), + Collections.emptyList() + ); + InputEntityReader 
readerAsString = createReader( + file, + schemaAsString, + JSONPathSpec.DEFAULT + ); + InputEntityReader readerAsDate = createReader( + file, + schemaAsDate, + JSONPathSpec.DEFAULT + ); + + List rowsWithString = readAllRows(readerAsString); + List rowsWithDate = readAllRows(readerAsDate); + Assert.assertEquals(rowsWithDate.size(), rowsWithString.size()); + + for (int i = 0; i < rowsWithDate.size(); i++) { + Assert.assertEquals(rowsWithString.get(i).getTimestamp(), rowsWithDate.get(i).getTimestamp()); + } + + readerAsString = createReader( + file, + schemaAsString, + JSONPathSpec.DEFAULT + ); + readerAsDate = createReader( + file, + schemaAsDate, + JSONPathSpec.DEFAULT + ); + List sampledAsString = sampleAllRows(readerAsString); + List sampledAsDate = sampleAllRows(readerAsDate); + final String expectedJson = "{\n" + + " \"date_as_string\" : \"2017-06-18\",\n" + + " \"timestamp_as_timestamp\" : 1497702471815,\n" + + " \"timestamp_as_string\" : \"2017-06-17 14:27:51.815\",\n" + + " \"idx\" : 1,\n" + + " \"date_as_date\" : 1497744000000\n" + + "}"; + Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampledAsString.get(0).getRawValues())); + Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampledAsDate.get(0).getRawValues())); + } + + @Test + public void testParseInt96Timestamp() throws IOException + { + // the source parquet file was found in apache spark sql repo tests, where it is known as impala_timestamp.parq + // it has a single column, "ts" which is an int96 timestamp + final String file = "example/timestamps/int96_timestamp.parquet"; + InputRowSchema schema = new InputRowSchema( + new TimestampSpec("ts", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())), + Collections.emptyList() + ); + InputEntityReader reader = createReader(file, schema, JSONPathSpec.DEFAULT); + + List rows = readAllRows(reader); + Assert.assertEquals("2001-01-01T01:01:01.000Z", 
rows.get(0).getTimestamp().toString()); + + reader = createReader( + file, + schema, + JSONPathSpec.DEFAULT + ); + List sampled = sampleAllRows(reader); + final String expectedJson = "{\n" + + " \"ts\" : 978310861000\n" + + "}"; + Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues())); + } + + @Test + public void testTimeMillisInInt64() throws IOException + { + final String file = "example/timestamps/timemillis-in-i64.parquet"; + InputRowSchema schema = new InputRowSchema( + new TimestampSpec("time", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())), + Collections.emptyList() + ); + InputEntityReader reader = createReader( + file, + schema, + JSONPathSpec.DEFAULT + ); + + List rows = readAllRows(reader); + Assert.assertEquals("1970-01-01T00:00:00.010Z", rows.get(0).getTimestamp().toString()); + + reader = createReader( + file, + schema, + JSONPathSpec.DEFAULT + ); + List sampled = sampleAllRows(reader); + final String expectedJson = "{\n" + + " \"time\" : 10\n" + + "}"; + Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues())); + } +} diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/WikiParquetReaderTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/WikiParquetReaderTest.java new file mode 100644 index 000000000000..75e5e916ec78 --- /dev/null +++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/WikiParquetReaderTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.parquet; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** + * Duplicate of {@link WikiParquetInputTest} but for {@link ParquetReader} instead of Hadoop + */ +public class WikiParquetReaderTest extends BaseParquetReaderTest +{ + @Test + public void testWiki() throws IOException + { + InputRowSchema schema = new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("page", "language", "user", "unpatrolled"))), + Collections.emptyList() + ); + InputEntityReader reader = createReader("example/wiki/wiki.parquet", schema, JSONPathSpec.DEFAULT); + + List rows = readAllRows(reader); + Assert.assertEquals("Gypsy Danger", rows.get(0).getDimension("page").get(0)); + String s1 = rows.get(0).getDimension("language").get(0); + String s2 = 
rows.get(0).getDimension("language").get(1); + Assert.assertEquals("en", s1); + Assert.assertEquals("zh", s2); + + reader = createReader("example/wiki/wiki.parquet", schema, JSONPathSpec.DEFAULT); + List sampled = sampleAllRows(reader); + + final String expectedJson = "{\n" + + " \"continent\" : \"North America\",\n" + + " \"country\" : \"United States\",\n" + + " \"added\" : 57,\n" + + " \"city\" : \"San Francisco\",\n" + + " \"unpatrolled\" : \"true\",\n" + + " \"delta\" : -143,\n" + + " \"language\" : [ \"en\", \"zh\" ],\n" + + " \"robot\" : \"false\",\n" + + " \"deleted\" : 200,\n" + + " \"newPage\" : \"true\",\n" + + " \"namespace\" : \"article\",\n" + + " \"anonymous\" : \"false\",\n" + + " \"page\" : \"Gypsy Danger\",\n" + + " \"region\" : \"Bay Area\",\n" + + " \"user\" : \"nuclear\",\n" + + " \"timestamp\" : \"2013-08-31T01:02:33Z\"\n" + + "}"; + Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues())); + } +} diff --git a/website/.spelling b/website/.spelling index d28328196e8e..a38cf47d7f2c 100644 --- a/website/.spelling +++ b/website/.spelling @@ -90,6 +90,7 @@ ISO8601 IndexSpec IndexTask InfluxDB +InputFormat Integer.MAX_VALUE JBOD JDBC