diff --git a/extensions-contrib/orc-extensions/example/hadoop_druid_orc_job.json b/extensions-contrib/orc-extensions/example/hadoop_druid_orc_job.json new file mode 100755 index 000000000000..b9533f373ba0 --- /dev/null +++ b/extensions-contrib/orc-extensions/example/hadoop_druid_orc_job.json @@ -0,0 +1,63 @@ +{ + "type": "index_hadoop", + "spec": { + "ioConfig": { + "type": "hadoop", + "inputSpec": { + "type": "static", + "inputFormat": "org.apache.druid.data.input.orc.DruidOrcNewInputFormat", + "paths": "wikipedia.gz.orc" + }, + "metadataUpdateSpec": { + "type": "postgresql", + "connectURI": "jdbc:postgresql://localhost/druid", + "user" : "druid", + "password" : "asdf", + "segmentTable": "druid_segments" + }, + "segmentOutputPath": "/tmp/segments" + }, + "dataSchema": { + "dataSource": "wikipedia", + "parser": { + "type": "druid_orc", + "parseSpec": { + "format": "timeAndDims", + "timestampSpec": { + "column": "timestamp", + "format": "auto" + }, + "dimensionsSpec": { + "dimensions": [ + "timestamp", + "col1", + "col2" + ], + "dimensionExclusions": [], + "spatialDimensions": [] + } + } + }, + "metricsSpec": [], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "DAY", + "queryGranularity": "NONE", + "intervals": ["2015-01-01/2017-01-01"] + } + }, + "tuningConfig": { + "type": "hadoop", + "workingPath": "tmp/working_path", + "partitionsSpec": { + "targetPartitionSize": 5000000 + }, + "jobProperties" : { + "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps", + "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps", + "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" + }, + "leaveIntermediate": true + } + } +} diff --git a/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/DruidOrcHadoopInputRowParser.java b/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/DruidOrcHadoopInputRowParser.java new file mode 100644 index 000000000000..cbfa17442854 --- /dev/null +++ b/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/DruidOrcHadoopInputRowParser.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.orc; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.data.input.impl.ParseSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.java.util.common.logger.Logger; +import org.joda.time.DateTime; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class DruidOrcHadoopInputRowParser implements InputRowParser> +{ + private static final Logger log = new Logger(DruidOrcHadoopInputRowParser.class); + + private final ParseSpec parseSpec; + private final List dimensions; + + @JsonCreator + public DruidOrcHadoopInputRowParser( + @JsonProperty("parseSpec") ParseSpec parseSpec + ) + { + this.parseSpec = parseSpec; + this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames(); + } + + @SuppressWarnings("ArgumentParameterSwap") + @Override + public List parseBatch(Map input) + { + TimestampSpec timestampSpec = parseSpec.getTimestampSpec(); + DateTime dateTime = timestampSpec.extractTimestamp(input); + return ImmutableList.of(new MapBasedInputRow(dateTime, dimensions, input)); + } + + @Override + @JsonProperty + public ParseSpec getParseSpec() + { + return parseSpec; + } + + @Override + public InputRowParser withParseSpec(ParseSpec parseSpec) + { + return new DruidOrcHadoopInputRowParser(parseSpec); + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final DruidOrcHadoopInputRowParser that = (DruidOrcHadoopInputRowParser) o; + return Objects.equals(parseSpec, that.parseSpec); + } + + @Override + public int hashCode() + { + return parseSpec.hashCode(); + } + + @Override + public String toString() + { + return "DruidOrcHadoopInputRowParser{" + + "parseSpec=" + parseSpec + + '}'; + } +} diff --git a/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/DruidOrcNewInputFormat.java b/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/DruidOrcNewInputFormat.java new file mode 100644 index 000000000000..c5bd8639a690 --- /dev/null +++ b/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/DruidOrcNewInputFormat.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.orc; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.io.orc.OrcFile; +import org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcStruct; +import org.apache.hadoop.hive.ql.io.orc.Reader; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DruidOrcNewInputFormat extends InputFormat> +{ + private final OrcNewInputFormat orcNewInputFormat; + + public DruidOrcNewInputFormat() + { + orcNewInputFormat = new OrcNewInputFormat(); + } + + @Override + public RecordReader> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException + { + FileSplit fileSplit = (FileSplit) split; + Path path = fileSplit.getPath(); + Configuration conf = ShimLoader.getHadoopShims().getConfiguration(context); + Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf)); + StructObjectInspector inspector = (StructObjectInspector) file.getObjectInspector(); + RecordReader reader = orcNewInputFormat.createRecordReader(split, context); + return new DruidOrcRecordReader(inspector, reader); + } + + @Override + public List getSplits(JobContext context) throws IOException, InterruptedException + { + return orcNewInputFormat.getSplits(context); + } + + private static class DruidOrcRecordReader extends RecordReader> + { + private final StructObjectInspector inspector; + private final RecordReader orcRecordReader; + + private DruidOrcRecordReader(StructObjectInspector inspector, RecordReader orcRecordReader) + { + this.inspector = inspector; + this.orcRecordReader = orcRecordReader; + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException + { + orcRecordReader.initialize(split, context); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException + { + return orcRecordReader.nextKeyValue(); + } + + @Override + public NullWritable getCurrentKey() throws IOException, InterruptedException + { + return orcRecordReader.getCurrentKey(); + } + + @Override + public Map getCurrentValue() throws IOException, InterruptedException + { + OrcStruct value = orcRecordReader.getCurrentValue(); + + Map map = new HashMap<>(); + List fields = inspector.getAllStructFieldRefs(); + for (StructField field : fields) { + ObjectInspector objectInspector = field.getFieldObjectInspector(); + switch (objectInspector.getCategory()) { + case PRIMITIVE: + PrimitiveObjectInspector primitiveObjectInspector = (PrimitiveObjectInspector) objectInspector; + map.put( + field.getFieldName(), + coercePrimitiveObject( + primitiveObjectInspector.getPrimitiveJavaObject(inspector.getStructFieldData(value, field)) + ) + ); + break; + case LIST: // array case - only 1-depth array supported yet + ListObjectInspector listObjectInspector = (ListObjectInspector) objectInspector; + map.put( + field.getFieldName(), + getListObject(listObjectInspector, inspector.getStructFieldData(value, field)) + ); + break; + case MAP: + MapObjectInspector mapObjectInspector = (MapObjectInspector) objectInspector; + addMapValues(map, field.getFieldName(), mapObjectInspector, inspector.getStructFieldData(value, field)); + break; + default: + break; + } + } + + return map; + } + + @Override + public float getProgress() throws IOException, InterruptedException + { + return orcRecordReader.getProgress(); + } + + @Override + public void close() throws IOException + { + orcRecordReader.close(); + } + + + + private static Object coercePrimitiveObject(final Object object) + { + if (object instanceof HiveDecimal) { + return ((HiveDecimal) object).doubleValue(); + } else { + return object; + } + } + + private List getListObject(ListObjectInspector listObjectInspector, Object listObject) + { + if (listObjectInspector.getListLength(listObject) < 0) { + return null; + } + List objectList = listObjectInspector.getList(listObject); + List list = null; + ObjectInspector child = listObjectInspector.getListElementObjectInspector(); + switch (child.getCategory()) { + case PRIMITIVE: + final PrimitiveObjectInspector primitiveObjectInspector = (PrimitiveObjectInspector) child; + list = Lists.transform(objectList, new Function() + { + @Nullable + @Override + public Object apply(@Nullable Object input) + { + return coercePrimitiveObject(primitiveObjectInspector.getPrimitiveJavaObject(input)); + } + } + ); + break; + default: + break; + } + + return list; + } + + private void addMapValues(Map parsedMap, String fieldName, MapObjectInspector mapObjectInspector, Object mapObject) + { + if (mapObjectInspector.getMapSize(mapObject) < 0) { + return; + } + ObjectInspector keyoip = mapObjectInspector.getMapKeyObjectInspector(); + ObjectInspector valueoip = mapObjectInspector.getMapValueObjectInspector(); + if (keyoip.getCategory() != ObjectInspector.Category.PRIMITIVE || valueoip.getCategory() != ObjectInspector.Category.PRIMITIVE) { + return; + } + + PrimitiveObjectInspector keyInspector = (PrimitiveObjectInspector) keyoip; + PrimitiveObjectInspector valueInspector = (PrimitiveObjectInspector) valueoip; + + Map objectMap = mapObjectInspector.getMap(mapObject); + + objectMap.forEach((k, v) -> { + String resolvedFieldName = fieldName + "_" + keyInspector.getPrimitiveJavaObject(k); + parsedMap.put(resolvedFieldName, valueInspector.getPrimitiveJavaObject(v)); + }); + } + } +} diff --git a/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java b/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java index 595bb3b856e7..a775842b540c 100644 --- a/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java +++ b/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java @@ -36,7 +36,8 @@ public List getJacksonModules() return Collections.singletonList( new SimpleModule("OrcInputRowParserModule") .registerSubtypes( - new NamedType(OrcHadoopInputRowParser.class, "orc") + new NamedType(OrcHadoopInputRowParser.class, "orc"), + new NamedType(DruidOrcHadoopInputRowParser.class, "druid_orc") ) ); } diff --git a/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java b/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java index 6d3340ca81a8..355e69791766 100644 --- a/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java +++ b/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java @@ -121,7 +121,7 @@ public List parseBatch(OrcStruct input) break; case MAP: MapObjectInspector mapObjectInspector = (MapObjectInspector) objectInspector; - getMapObject(field.getFieldName(), mapObjectInspector, oip.getStructFieldData(input, field), map); + addMapValues(map, field.getFieldName(), mapObjectInspector, oip.getStructFieldData(input, field)); break; default: break; @@ -164,11 +164,20 @@ private List getListObject(ListObjectInspector listObjectInspector, Object listO return list; } - private void getMapObject(String parentName, MapObjectInspector mapObjectInspector, Object mapObject, Map parsedMap) + private void addMapValues(Map parsedMap, String parentName, MapObjectInspector mapObjectInspector, Object mapObject) { if (mapObjectInspector.getMapSize(mapObject) < 0) { return; } + ObjectInspector keyoip = mapObjectInspector.getMapKeyObjectInspector(); + ObjectInspector valueoip = mapObjectInspector.getMapValueObjectInspector(); + if (keyoip.getCategory() != ObjectInspector.Category.PRIMITIVE || valueoip.getCategory() != ObjectInspector.Category.PRIMITIVE) { + return; + } + + PrimitiveObjectInspector keyInspector = (PrimitiveObjectInspector) keyoip; + PrimitiveObjectInspector valueInspector = (PrimitiveObjectInspector) valueoip; + String mapChildFieldNameFormat = StringUtils.replace( StringUtils.format(mapParentFieldNameFormat, parentName), MAP_CHILD_TAG, @@ -176,12 +185,10 @@ private void getMapObject(String parentName, MapObjectInspector mapObjectInspect ); Map objectMap = mapObjectInspector.getMap(mapObject); - PrimitiveObjectInspector key = (PrimitiveObjectInspector) mapObjectInspector.getMapKeyObjectInspector(); - PrimitiveObjectInspector value = (PrimitiveObjectInspector) mapObjectInspector.getMapValueObjectInspector(); objectMap.forEach((k, v) -> { - String resolvedFieldName = StringUtils.format(mapChildFieldNameFormat, key.getPrimitiveJavaObject(k).toString()); - parsedMap.put(resolvedFieldName, value.getPrimitiveJavaObject(v)); + String resolvedFieldName = StringUtils.format(mapChildFieldNameFormat, keyInspector.getPrimitiveJavaObject(k).toString()); + parsedMap.put(resolvedFieldName, valueInspector.getPrimitiveJavaObject(v)); }); } diff --git a/extensions-contrib/orc-extensions/src/test/java/org/apache/druid/data/input/orc/DruidOrcIndexGeneratorJobTest.java b/extensions-contrib/orc-extensions/src/test/java/org/apache/druid/data/input/orc/DruidOrcIndexGeneratorJobTest.java new file mode 100644 index 000000000000..4b4b78de38d7 --- /dev/null +++ b/extensions-contrib/orc-extensions/src/test/java/org/apache/druid/data/input/orc/DruidOrcIndexGeneratorJobTest.java @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.orc; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.io.Files; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.data.input.impl.TimeAndDimsParseSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.HadoopDruidIndexerConfig; +import org.apache.druid.indexer.HadoopIOConfig; +import org.apache.druid.indexer.HadoopIngestionSpec; +import org.apache.druid.indexer.HadoopTuningConfig; +import org.apache.druid.indexer.HadoopyShardSpec; +import org.apache.druid.indexer.IndexGeneratorJob; +import org.apache.druid.indexer.JobHelper; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.QueryableIndexIndexableAdapter; +import org.apache.druid.segment.RowIterator; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; +import org.apache.druid.timeline.partition.ShardSpec; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; +import org.apache.orc.Writer; +import org.joda.time.DateTime; +import org.joda.time.DateTimeComparator; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +public class DruidOrcIndexGeneratorJobTest +{ + private static final AggregatorFactory[] aggs = { + new LongSumAggregatorFactory("visited_num", "visited_num"), + new HyperUniquesAggregatorFactory("unique_hosts", "host") + }; + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private ObjectMapper mapper; + private HadoopDruidIndexerConfig config; + private final String dataSourceName = "website"; + private final List data = ImmutableList.of( + "2014102200,a.example.com,100", + "2014102200,b.exmaple.com,50", + "2014102200,c.example.com,200", + "2014102200,d.example.com,250", + "2014102200,e.example.com,123", + "2014102200,f.example.com,567", + "2014102200,g.example.com,11", + "2014102200,h.example.com,251", + "2014102200,i.example.com,963", + "2014102200,j.example.com,333", + "2014102212,a.example.com,100", + "2014102212,b.exmaple.com,50", + "2014102212,c.example.com,200", + "2014102212,d.example.com,250", + "2014102212,e.example.com,123", + "2014102212,f.example.com,567", + "2014102212,g.example.com,11", + "2014102212,h.example.com,251", + "2014102212,i.example.com,963", + "2014102212,j.example.com,333" + ); + private final Interval interval = Intervals.of("2014-10-22T00:00:00Z/P1D"); + private File dataRoot; + private File outputRoot; + private Integer[][][] shardInfoForEachSegment = new Integer[][][]{ + { + {0, 4}, + {1, 4}, + {2, 4}, + {3, 4} + } + }; + private final InputRowParser inputRowParser = new DruidOrcHadoopInputRowParser( + new TimeAndDimsParseSpec( + new TimestampSpec("timestamp", "yyyyMMddHH", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null) + ) + ); + + private File writeDataToLocalOrcFile(File outputDir, List data) throws IOException + { + File outputFile = new File(outputDir, "test.orc"); + TypeDescription schema = TypeDescription.createStruct() + .addField("timestamp", TypeDescription.createString()) + .addField("host", TypeDescription.createString()) + .addField("visited_num", TypeDescription.createInt()); + Configuration conf = new Configuration(); + Writer writer = OrcFile.createWriter( + new Path(outputFile.getPath()), + OrcFile.writerOptions(conf) + .setSchema(schema) + .stripeSize(100000) + .bufferSize(10000) + .compress(CompressionKind.ZLIB) + .version(OrcFile.Version.CURRENT) + ); + VectorizedRowBatch batch = schema.createRowBatch(); + batch.size = data.size(); + for (int idx = 0; idx < data.size(); idx++) { + String line = data.get(idx); + String[] lineSplit = line.split(","); + ((BytesColumnVector) batch.cols[0]).setRef( + idx, + StringUtils.toUtf8(lineSplit[0]), + 0, + lineSplit[0].length() + ); + ((BytesColumnVector) batch.cols[1]).setRef( + idx, + StringUtils.toUtf8(lineSplit[1]), + 0, + lineSplit[1].length() + ); + ((LongColumnVector) batch.cols[2]).vector[idx] = Long.parseLong(lineSplit[2]); + } + writer.addRowBatch(batch); + writer.close(); + + return outputFile; + } + + @Before + public void setUp() throws Exception + { + mapper = HadoopDruidIndexerConfig.JSON_MAPPER; + mapper.registerSubtypes(new NamedType(HashBasedNumberedShardSpec.class, "hashed")); + + dataRoot = temporaryFolder.newFolder("data"); + outputRoot = temporaryFolder.newFolder("output"); + File dataFile = writeDataToLocalOrcFile(dataRoot, data); + + HashMap inputSpec = new HashMap(); + inputSpec.put("paths", dataFile.getCanonicalPath()); + inputSpec.put("type", "static"); + inputSpec.put("inputFormat", "org.apache.druid.data.input.orc.DruidOrcNewInputFormat"); + + config = new HadoopDruidIndexerConfig( + new HadoopIngestionSpec( + new DataSchema( + dataSourceName, + mapper.convertValue( + inputRowParser, + Map.class + ), + aggs, + new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, ImmutableList.of(this.interval)), + null, + mapper + ), + new HadoopIOConfig( + ImmutableMap.copyOf(inputSpec), + null, + outputRoot.getCanonicalPath() + ), + new HadoopTuningConfig( + outputRoot.getCanonicalPath(), + null, + null, + null, + null, + null, + null, + true, + false, + false, + false, + ImmutableMap.of(MRJobConfig.NUM_REDUCES, "0"), //verifies that set num reducers is ignored + false, + true, + null, + true, + null, + false, + false, + null, + null, + null + ) + ) + ); + config.setShardSpecs( + loadShardSpecs(shardInfoForEachSegment) + ); + config = HadoopDruidIndexerConfig.fromSpec(config.getSchema()); + } + + @Test + public void testIndexGeneratorJob() throws IOException + { + verifyJob(new IndexGeneratorJob(config)); + } + + private void verifyJob(IndexGeneratorJob job) throws IOException + { + Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job), config)); + + final Map> intervalToSegments = new HashMap<>(); + IndexGeneratorJob + .getPublishedSegments(config) + .forEach(segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()) + .add(segment)); + + final Map> intervalToIndexFiles = new HashMap<>(); + int segmentNum = 0; + for (DateTime currTime = interval.getStart(); currTime.isBefore(interval.getEnd()); currTime = currTime.plusDays(1)) { + Integer[][] shardInfo = shardInfoForEachSegment[segmentNum++]; + File segmentOutputFolder = new File( + StringUtils.format( + "%s/%s/%s_%s/%s", + config.getSchema().getIOConfig().getSegmentOutputPath(), + config.getSchema().getDataSchema().getDataSource(), + currTime.toString(), + currTime.plusDays(1).toString(), + config.getSchema().getTuningConfig().getVersion() + ) + ); + Assert.assertTrue(segmentOutputFolder.exists()); + Assert.assertEquals(shardInfo.length, segmentOutputFolder.list().length); + + for (int partitionNum = 0; partitionNum < shardInfo.length; ++partitionNum) { + File individualSegmentFolder = new File(segmentOutputFolder, Integer.toString(partitionNum)); + Assert.assertTrue(individualSegmentFolder.exists()); + + File indexZip = new File(individualSegmentFolder, "index.zip"); + Assert.assertTrue(indexZip.exists()); + + intervalToIndexFiles.computeIfAbsent(new Interval(currTime, currTime.plusDays(1)), k -> new ArrayList<>()) + .add(indexZip); + } + } + + Assert.assertEquals(intervalToSegments.size(), intervalToIndexFiles.size()); + + segmentNum = 0; + for (Entry> entry : intervalToSegments.entrySet()) { + final Interval interval = entry.getKey(); + final List segments = entry.getValue(); + final List indexFiles = intervalToIndexFiles.get(interval); + Collections.sort(segments); + indexFiles.sort(Comparator.comparing(File::getAbsolutePath)); + + Assert.assertNotNull(indexFiles); + Assert.assertEquals(segments.size(), indexFiles.size()); + Integer[][] shardInfo = shardInfoForEachSegment[segmentNum++]; + + int rowCount = 0; + for (int i = 0; i < segments.size(); i++) { + final DataSegment dataSegment = segments.get(i); + final File indexZip = indexFiles.get(i); + Assert.assertEquals(config.getSchema().getTuningConfig().getVersion(), dataSegment.getVersion()); + Assert.assertEquals("local", dataSegment.getLoadSpec().get("type")); + Assert.assertEquals(indexZip.getCanonicalPath(), dataSegment.getLoadSpec().get("path")); + Assert.assertEquals(Integer.valueOf(9), dataSegment.getBinaryVersion()); + + Assert.assertEquals(dataSourceName, dataSegment.getDataSource()); + Assert.assertEquals(1, dataSegment.getDimensions().size()); + String[] dimensions = dataSegment.getDimensions().toArray(new String[0]); + Arrays.sort(dimensions); + Assert.assertEquals("host", dimensions[0]); + Assert.assertEquals("visited_num", dataSegment.getMetrics().get(0)); + Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(1)); + + Integer[] hashShardInfo = shardInfo[i]; + HashBasedNumberedShardSpec spec = (HashBasedNumberedShardSpec) dataSegment.getShardSpec(); + Assert.assertEquals((int) hashShardInfo[0], spec.getPartitionNum()); + Assert.assertEquals((int) hashShardInfo[1], spec.getPartitions()); + + File dir = Files.createTempDir(); + + unzip(indexZip, dir); + + QueryableIndex index = HadoopDruidIndexerConfig.INDEX_IO.loadIndex(dir); + QueryableIndexIndexableAdapter adapter = new QueryableIndexIndexableAdapter(index); + + try (RowIterator rowIt = adapter.getRows()) { + while (rowIt.moveToNext()) { + rowCount++; + Assert.assertEquals(2, rowIt.getPointer().getNumMetrics()); + } + } + } + Assert.assertEquals(rowCount, data.size()); + } + } + + private Map> loadShardSpecs( + Integer[][][] shardInfoForEachShard + ) + { + Map> shardSpecs = new TreeMap<>(DateTimeComparator.getInstance()); + int shardCount = 0; + int segmentNum = 0; + for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) { + List specs = new ArrayList<>(); + for (Integer[] shardInfo : shardInfoForEachShard[segmentNum++]) { + specs.add(new HashBasedNumberedShardSpec(shardInfo[0], shardInfo[1], null, HadoopDruidIndexerConfig.JSON_MAPPER)); + } + List actualSpecs = Lists.newArrayListWithExpectedSize(specs.size()); + for (ShardSpec spec : specs) { + actualSpecs.add(new HadoopyShardSpec(spec, shardCount++)); + } + + shardSpecs.put(segmentGranularity.getStartMillis(), actualSpecs); + } + + return shardSpecs; + } + + private void unzip(File zip, File outDir) + { + try { + long size = 0L; + final byte[] buffer = new byte[1 << 13]; + try (ZipInputStream in = new ZipInputStream(new FileInputStream(zip))) { + for (ZipEntry entry = in.getNextEntry(); entry != null; entry = in.getNextEntry()) { + final String fileName = entry.getName(); + try (final OutputStream out = new BufferedOutputStream( + new FileOutputStream( + outDir.getAbsolutePath() + + File.separator + + fileName + ), 1 << 13 + )) { + for (int len = in.read(buffer); len >= 0; len = in.read(buffer)) { + if (len == 0) { + continue; + } + size += len; + out.write(buffer, 0, len); + } + out.flush(); + } + } + } + } + catch (IOException | RuntimeException exception) { + } + } +} diff --git a/extensions-contrib/orc-extensions/src/test/java/org/apache/druid/data/input/orc/DruidOrcNewInputFormatTest.java b/extensions-contrib/orc-extensions/src/test/java/org/apache/druid/data/input/orc/DruidOrcNewInputFormatTest.java new file mode 100644 index 000000000000..44e4257e0c42 --- /dev/null +++ b/extensions-contrib/orc-extensions/src/test/java/org/apache/druid/data/input/orc/DruidOrcNewInputFormatTest.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.orc; + +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.indexer.HadoopDruidIndexerConfig; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; +import org.apache.orc.Writer; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class DruidOrcNewInputFormatTest +{ + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + String timestamp = "2016-01-01T00:00:00.000Z"; + String col1 = "bar"; + String[] col2 = {"dat1", "dat2", "dat3"}; + double val1 = 1.1; + Job job; + HadoopDruidIndexerConfig config; + File testFile; + Path path; + FileSplit split; + + @Before + public void setUp() throws IOException + { + Configuration conf = new Configuration(); + job = Job.getInstance(conf); + + config = HadoopDruidIndexerConfig.fromFile(new File( + "example/hadoop_druid_orc_job.json")); + + config.intoConfiguration(job); + + testFile = makeOrcFile(); + path = new Path(testFile.getAbsoluteFile().toURI()); + split = new FileSplit(path, 0, testFile.length(), null); + + } + + @Test + public void testRead() throws IOException, InterruptedException + { + InputFormat inputFormat = ReflectionUtils.newInstance(DruidOrcNewInputFormat.class, job.getConfiguration()); + + TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID()); + RecordReader reader = inputFormat.createRecordReader(split, context); + InputRowParser> parser = (InputRowParser>) config.getParser(); + + reader.initialize(split, context); + + reader.nextKeyValue(); + + Map data = (Map) reader.getCurrentValue(); + + MapBasedInputRow row = (MapBasedInputRow) parser.parseBatch(data).get(0); + + Assert.assertEquals(data, row.getEvent()); + Assert.assertTrue(row.getEvent().keySet().size() == 4); + Assert.assertEquals(DateTimes.of(timestamp), row.getTimestamp()); + Assert.assertEquals(parser.getParseSpec().getDimensionsSpec().getDimensionNames(), row.getDimensions()); + Assert.assertEquals(col1, row.getEvent().get("col1")); + Assert.assertEquals(Arrays.asList(col2), row.getDimension("col2")); + + reader.close(); + } + + @Test + public void testReadDateColumn() throws IOException, InterruptedException + { + File testFile2 = makeOrcFileWithDate(); + Path path = new Path(testFile2.getAbsoluteFile().toURI()); + FileSplit split = new FileSplit(path, 0, testFile2.length(), null); + + InputFormat inputFormat = ReflectionUtils.newInstance(DruidOrcNewInputFormat.class, job.getConfiguration()); + + TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID()); + RecordReader reader = inputFormat.createRecordReader(split, context); + InputRowParser> parser = (InputRowParser>) config.getParser(); + + reader.initialize(split, context); + + reader.nextKeyValue(); + + Map data = (Map) reader.getCurrentValue(); + + MapBasedInputRow row = (MapBasedInputRow) parser.parseBatch(data).get(0); + + Assert.assertEquals(data, row.getEvent()); + Assert.assertTrue(row.getEvent().keySet().size() == 4); + Assert.assertEquals(DateTimes.of(timestamp), row.getTimestamp()); + Assert.assertEquals(parser.getParseSpec().getDimensionsSpec().getDimensionNames(), row.getDimensions()); + Assert.assertEquals(col1, row.getEvent().get("col1")); + Assert.assertEquals(Arrays.asList(col2), row.getDimension("col2")); + + reader.close(); + } + + private File makeOrcFile() throws IOException + { + final File dir = temporaryFolder.newFolder(); + final File testOrc = new File(dir, "test.orc"); + TypeDescription schema = TypeDescription.createStruct() + .addField("timestamp", TypeDescription.createString()) + .addField("col1", TypeDescription.createString()) + .addField("col2", TypeDescription.createList(TypeDescription.createString())) + .addField("val1", TypeDescription.createFloat()); + Configuration conf = new Configuration(); + Writer writer = OrcFile.createWriter( + new Path(testOrc.getPath()), + OrcFile.writerOptions(conf) + .setSchema(schema) + .stripeSize(100000) + .bufferSize(10000) + .compress(CompressionKind.ZLIB) + .version(OrcFile.Version.CURRENT) + ); + VectorizedRowBatch batch = schema.createRowBatch(); + batch.size = 1; + ((BytesColumnVector) batch.cols[0]).setRef( + 0, + StringUtils.toUtf8(timestamp), + 0, + timestamp.length() + ); + ((BytesColumnVector) batch.cols[1]).setRef(0, StringUtils.toUtf8(col1), 0, col1.length()); + + ListColumnVector listColumnVector = (ListColumnVector) batch.cols[2]; + listColumnVector.childCount = col2.length; + listColumnVector.lengths[0] = 3; + for (int idx = 0; idx < col2.length; idx++) { + ((BytesColumnVector) listColumnVector.child).setRef( + idx, + StringUtils.toUtf8(col2[idx]), + 0, + col2[idx].length() + ); + } + + ((DoubleColumnVector) batch.cols[3]).vector[0] = val1; + writer.addRowBatch(batch); + writer.close(); + + return testOrc; + } + + private File makeOrcFileWithDate() throws IOException + { + final File dir = temporaryFolder.newFolder(); + final File testOrc = new File(dir, "test-2.orc"); + TypeDescription schema = TypeDescription.createStruct() + .addField("timestamp", TypeDescription.createDate()) + .addField("col1", TypeDescription.createString()) + .addField("col2", TypeDescription.createList(TypeDescription.createString())) + .addField("val1", TypeDescription.createFloat()); + Configuration conf = new Configuration(); + Writer writer = OrcFile.createWriter( + new Path(testOrc.getPath()), + OrcFile.writerOptions(conf) + .setSchema(schema) + .stripeSize(100000) + .bufferSize(10000) + .compress(CompressionKind.ZLIB) + .version(OrcFile.Version.CURRENT) + ); + VectorizedRowBatch batch = schema.createRowBatch(); + batch.size = 1; + DateTime ts = DateTimes.of(timestamp); + + // date is stored as long column vector with number of days since epoch + ((LongColumnVector) batch.cols[0]).vector[0] = + TimeUnit.MILLISECONDS.toDays(ts.minus(DateTimes.EPOCH.getMillis()).getMillis()); + + ((BytesColumnVector) batch.cols[1]).setRef(0, StringUtils.toUtf8(col1), 0, col1.length()); + + ListColumnVector listColumnVector = (ListColumnVector) batch.cols[2]; + listColumnVector.childCount = col2.length; + listColumnVector.lengths[0] = 3; + for (int idx = 0; idx < col2.length; idx++) { + ((BytesColumnVector) listColumnVector.child).setRef( + idx, + StringUtils.toUtf8(col2[idx]), + 0, + col2[idx].length() + ); + } + + ((DoubleColumnVector) batch.cols[3]).vector[0] = val1; + writer.addRowBatch(batch); + writer.close(); + + return testOrc; + } +}