diff --git a/docs/Documentation/UserGuideV0.7.0/2-Concept.md b/docs/Documentation/UserGuideV0.7.0/2-Concept.md index 7dda96fdb471d..aee51988aff46 100644 --- a/docs/Documentation/UserGuideV0.7.0/2-Concept.md +++ b/docs/Documentation/UserGuideV0.7.0/2-Concept.md @@ -191,7 +191,6 @@ A column of data contains all values belonging to a time series and the timestam ## Data Type IoTDB supports six data types in total: BOOLEAN (Boolean), INT32 (Integer), INT64 (Long Integer), FLOAT (Single Precision Floating Point), DOUBLE (Double Precision Floating Point), TEXT (String). - The time series of FLOAT and DOUBLE type can specify (MAX\_POINT\_NUMBER, see [this page](#iotdb-query-statement) for more information on how to specify), which is the number of digits after the decimal point of the floating point number, if the encoding method is [RLE](#encoding) or [TS\_2DIFF](#encoding) (Refer to [Create Timeseries Statement](#chapter-5-iotdb-sql-documentation) for more information on how to specify). If MAX\_POINT\_NUMBER is not specified, the system will use [float\_precision](#encoding) in the configuration file "tsfile-format.properties" for configuration for the configuration method. * For Float data value, The data range is (-Integer.MAX_VALUE, Integer.MAX_VALUE), rather than Float.MAX_VALUE, and the max_point_number is 19, it is because of the limition of function Math.round(float) in Java. @@ -248,4 +247,4 @@ The four encodings described in the previous sections are applicable to differen When the time series is written and encoded as binary data according to the specified type, IoTDB compresses the data using compression technology to further improve space storage efficiency. Although both encoding and compression are designed to improve storage efficiency, encoding techniques are usually only available for specific data types (e.g., second-order differential encoding is only suitable for INT32 or INT64 data type, and storing floating-point numbers requires multiplying them by 10m to convert to integers), after which the data is converted to a binary stream. The compression method (SNAPPY) compresses the binary stream, so the use of the compression method is no longer limited by the data type. -IoTDB allows you to specify the compression method of the column when creating a time series. IoTDB now supports two kinds of compression: UNCOMPRESSED (no compression) and SNAPPY compression. The specified syntax for compression is detailed in [Create Timeseries Statement](#chapter-5-iotdb-sql-documentation). \ No newline at end of file +IoTDB allows you to specify the compression method of the column when creating a time series. IoTDB now supports two kinds of compression: UNCOMPRESSED (no compression) and SNAPPY compression. The specified syntax for compression is detailed in [Create Timeseries Statement](#chapter-5-iotdb-sql-documentation). diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGenerator.java index 20d93e89a8f21..e820d79e788b8 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGenerator.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithTimeGenerator.java @@ -35,6 +35,7 @@ public class EngineDataSetWithTimeGenerator extends QueryDataSet { private EngineTimeGenerator timeGenerator; private List readers; + //TODO add null filed /** * constructor of EngineDataSetWithTimeGenerator. @@ -62,11 +63,11 @@ public RowRecord next() throws IOException { RowRecord rowRecord = new RowRecord(timestamp); for (int i = 0; i < readers.size(); i++) { EngineReaderByTimeStamp reader = readers.get(i); - TsPrimitiveType tsPrimitiveType = reader.getValueInTimestamp(timestamp); - if (tsPrimitiveType == null) { + Object value = reader.getValueInTimestamp(timestamp); + if (value == null) { rowRecord.addField(new Field(null)); } else { - rowRecord.addField(getField(tsPrimitiveType.getValue(), dataTypes.get(i))); + rowRecord.addField(getField(value, dataTypes.get(i))); } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java index 13f0053781d8c..45fc0b4e741e7 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.executor; import java.io.IOException; @@ -31,9 +32,8 @@ import org.apache.iotdb.db.query.dataset.EngineDataSetWithTimeGenerator; import org.apache.iotdb.db.query.factory.SeriesReaderFactory; import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; -import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader; import org.apache.iotdb.db.query.reader.merge.PriorityMergeReaderByTimestamp; -import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader; +import org.apache.iotdb.db.query.reader.sequence.SequenceDataReaderByTimestamp; import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Path; @@ -106,13 +106,13 @@ private List getReadersOfSelectedPaths(List paths PriorityMergeReaderByTimestamp mergeReaderByTimestamp = new PriorityMergeReaderByTimestamp(); // reader for sequence data - SequenceDataReader tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(), - null, context); + SequenceDataReaderByTimestamp tsFilesReader = new SequenceDataReaderByTimestamp( + queryDataSource.getSeqDataSource(), context); mergeReaderByTimestamp.addReaderWithPriority(tsFilesReader, 1); // reader for unSequence data - PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance() - .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), null); + PriorityMergeReaderByTimestamp unSeqMergeReader = SeriesReaderFactory.getInstance() + .createUnSeqMergeReaderByTimestamp(queryDataSource.getOverflowSeriesDataSource()); mergeReaderByTimestamp.addReaderWithPriority(unSeqMergeReader, 2); readersOfSelectedSeries.add(mergeReaderByTimestamp); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java index 51ce94b642370..f07c97cfbd568 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java @@ -28,11 +28,14 @@ import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.FileReaderManager; import org.apache.iotdb.db.query.reader.IReader; +import org.apache.iotdb.db.query.reader.mem.MemChunkReaderByTimestamp; import org.apache.iotdb.db.query.reader.mem.MemChunkReaderWithFilter; import org.apache.iotdb.db.query.reader.mem.MemChunkReaderWithoutFilter; import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader; +import org.apache.iotdb.db.query.reader.merge.PriorityMergeReaderByTimestamp; import org.apache.iotdb.db.query.reader.sequence.SealedTsFilesReader; import org.apache.iotdb.db.query.reader.unsequence.EngineChunkReader; +import org.apache.iotdb.db.query.reader.unsequence.EngineChunkReaderByTimestamp; import org.apache.iotdb.db.utils.QueryUtils; import org.apache.iotdb.tsfile.common.constant.StatisticConstant; import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; @@ -45,6 +48,7 @@ import org.apache.iotdb.tsfile.read.filter.DigestForFilter; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp; import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithFilter; import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter; import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader; @@ -125,7 +129,50 @@ public PriorityMergeReader createUnSeqMergeReader( return unSeqMergeReader; } - // TODO createUnSeqMergeReaderByTime a method with filter + /** + * This method is used to create unsequence insert reader by timestamp for IoTDB request, such as + * query, aggregation and groupby request. + */ + public PriorityMergeReaderByTimestamp createUnSeqMergeReaderByTimestamp( + OverflowSeriesDataSource overflowSeriesDataSource) + throws IOException { + + PriorityMergeReaderByTimestamp unSeqMergeReader = new PriorityMergeReaderByTimestamp(); + + int priorityValue = 1; + + for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource + .getOverflowInsertFileList()) { + + // store only one opened file stream into manager, to avoid too many opened files + TsFileSequenceReader unClosedTsFileReader = FileReaderManager.getInstance() + .get(overflowInsertFile.getFilePath(), false); + + ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(unClosedTsFileReader); + + for (ChunkMetaData chunkMetaData : overflowInsertFile.getChunkMetaDataList()) { + + Chunk chunk = chunkLoader.getChunk(chunkMetaData); + ChunkReaderByTimestamp chunkReader = new ChunkReaderByTimestamp(chunk); + + unSeqMergeReader + .addReaderWithPriority( + new EngineChunkReaderByTimestamp(chunkReader, unClosedTsFileReader), + priorityValue); + priorityValue++; + } + } + + // add reader for MemTable + if (overflowSeriesDataSource.hasRawChunk()) { + unSeqMergeReader.addReaderWithPriority( + new MemChunkReaderByTimestamp(overflowSeriesDataSource.getReadableMemChunk()), + priorityValue); + } + + // TODO add external sort when needed + return unSeqMergeReader; + } /** * This method is used to construct reader for merge process in IoTDB. To merge only one TsFile diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReaderByTimestamp.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReaderByTimestamp.java index dbd6beb7da5d4..82c4dcfb32d64 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReaderByTimestamp.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReaderByTimestamp.java @@ -16,17 +16,18 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.mem; import java.io.IOException; import java.util.Iterator; import org.apache.iotdb.db.engine.memtable.TimeValuePairSorter; +import org.apache.iotdb.db.query.reader.IReader; import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; import org.apache.iotdb.db.utils.TimeValuePair; -import org.apache.iotdb.db.utils.TsPrimitiveType; import org.apache.iotdb.tsfile.read.common.BatchData; -public class MemChunkReaderByTimestamp implements EngineReaderByTimeStamp { +public class MemChunkReaderByTimestamp implements EngineReaderByTimeStamp, IReader { private Iterator timeValuePairIterator; private boolean hasCachedTimeValuePair; @@ -65,15 +66,16 @@ public void close() { } // TODO consider change timeValuePairIterator to List structure, and use binary search instead of - // sequential search - @Override - public TsPrimitiveType getValueInTimestamp(long timestamp) throws IOException { + /** + * sequential search + */ + @Override public Object getValueInTimestamp(long timestamp) throws IOException { while (hasNext()) { TimeValuePair timeValuePair = next(); - long time = timeValuePair.getTimestamp(); - if (time == timestamp) { - return timeValuePair.getValue(); - } else if (time > timestamp) { + long currentMemTime = timeValuePair.getTimestamp(); + if (currentMemTime == timestamp) { + return timeValuePair.getValue().getValue(); + } else if (currentMemTime > timestamp) { hasCachedTimeValuePair = true; cachedTimeValuePair = timeValuePair; break; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReaderWithoutFilter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReaderWithoutFilter.java index bae74968dbb0a..a23260380892e 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReaderWithoutFilter.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReaderWithoutFilter.java @@ -24,7 +24,9 @@ import org.apache.iotdb.db.utils.TimeValuePair; import org.apache.iotdb.tsfile.read.common.BatchData; -// TODO merge MemChunkReaderWithoutFilter and MemChunkReaderWithFilter to one class +/** + * TODO merge MemChunkReaderWithoutFilter and MemChunkReaderWithFilter to one class + */ public class MemChunkReaderWithoutFilter implements IReader { private Iterator timeValuePairIterator; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/EngineReaderByTimeStamp.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/EngineReaderByTimeStamp.java index 8ddf0add06b72..ca58f4fad7d87 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/EngineReaderByTimeStamp.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/EngineReaderByTimeStamp.java @@ -16,17 +16,20 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.merge; import java.io.IOException; -import org.apache.iotdb.db.query.reader.IReader; -import org.apache.iotdb.db.utils.TsPrimitiveType; -public interface EngineReaderByTimeStamp extends IReader { +public interface EngineReaderByTimeStamp { /** * Given a timestamp, the reader is supposed to return the corresponding value in the timestamp. * If no value in this timestamp, null will be returned. */ - TsPrimitiveType getValueInTimestamp(long timestamp) throws IOException; + Object getValueInTimestamp(long timestamp) throws IOException; + + boolean hasNext() throws IOException; + + void close() throws IOException; } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReaderByTimestamp.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReaderByTimestamp.java index 244291a11856c..f6ed7c7d28d68 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReaderByTimestamp.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReaderByTimestamp.java @@ -16,44 +16,61 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.merge; import java.io.IOException; -import org.apache.iotdb.db.utils.TimeValuePair; -import org.apache.iotdb.db.utils.TsPrimitiveType; +import java.util.ArrayList; +import java.util.List; /** - * TODO the process of PriorityMergeReaderByTimestamp can be optimized. + *

+ * Usage: Get value in timestamp by sorting time-value pair in multiple readers with time and + * priority. (1) merge multiple chunk group readers in the unsequence file (2)merge sequence reader, + * unsequence reader and mem reader + *

*/ -public class PriorityMergeReaderByTimestamp extends PriorityMergeReader implements - EngineReaderByTimeStamp { +public class PriorityMergeReaderByTimestamp implements EngineReaderByTimeStamp { + + private List readerList = new ArrayList<>(); + private List priorityList = new ArrayList<>(); - private boolean hasCachedTimeValuePair; - private TimeValuePair cachedTimeValuePair; + /** + * This function doesn't sort reader by priority. So you have to call this function in order of + * reader priority from small to large. + */ + public void addReaderWithPriority(EngineReaderByTimeStamp reader, int priority) { + readerList.add(reader); + priorityList.add(priority); + } @Override - public TsPrimitiveType getValueInTimestamp(long timestamp) throws IOException { - - if (hasCachedTimeValuePair) { - if (cachedTimeValuePair.getTimestamp() == timestamp) { - hasCachedTimeValuePair = false; - return cachedTimeValuePair.getValue(); - } else if (cachedTimeValuePair.getTimestamp() > timestamp) { - return null; + public Object getValueInTimestamp(long timestamp) throws IOException { + Object value = null; + for (int i = readerList.size() - 1; i >= 0; i--) { + value = readerList.get(i).getValueInTimestamp(timestamp); + if (value != null) { + return value; } } + return value; + } - while (hasNext()) { - cachedTimeValuePair = next(); - if (cachedTimeValuePair.getTimestamp() == timestamp) { - hasCachedTimeValuePair = false; - return cachedTimeValuePair.getValue(); - } else if (cachedTimeValuePair.getTimestamp() > timestamp) { - hasCachedTimeValuePair = true; - return null; - } + @Override + public void close() throws IOException { + for (EngineReaderByTimeStamp reader : readerList) { + reader.close(); } + } - return null; + @Override + public boolean hasNext() throws IOException { + for (int i = readerList.size() - 1; i >= 0; i--) { + if (readerList.get(i).hasNext()) { + return true; + } + } + return false; } + } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReaderByTimestamp.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReaderByTimestamp.java new file mode 100644 index 0000000000000..9c4376d12ff41 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReaderByTimestamp.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.reader.sequence; + +import java.io.IOException; +import java.util.List; +import org.apache.iotdb.db.engine.filenode.IntervalFileNode; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.control.FileReaderManager; +import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; +import org.apache.iotdb.db.utils.QueryUtils; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.controller.ChunkLoader; +import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl; +import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl; +import org.apache.iotdb.tsfile.read.reader.series.SeriesReaderByTimestamp; + +public class SealedTsFilesReaderByTimestamp implements EngineReaderByTimeStamp { + + private Path seriesPath; + private List sealedTsFiles; + private int nextIntervalFileIndex; + private SeriesReaderByTimestamp seriesReader; + private QueryContext context; + + /** + * init with seriesPath and sealedTsFiles. + */ + public SealedTsFilesReaderByTimestamp(Path seriesPath, List sealedTsFiles, + QueryContext context) { + this.seriesPath = seriesPath; + this.sealedTsFiles = sealedTsFiles; + this.nextIntervalFileIndex = 0; + this.seriesReader = null; + this.context = context; + } + + @Override + public Object getValueInTimestamp(long timestamp) throws IOException { + Object value = null; + if (seriesReader != null) { + value = seriesReader.getValueInTimestamp(timestamp); + if (value != null || seriesReader.hasNext()) { + return value; + } + } + constructReader(timestamp); + if (seriesReader != null) { + value = seriesReader.getValueInTimestamp(timestamp); + if (value != null || seriesReader.hasNext()) { + return value; + } + } + + return value; + } + + @Override + public boolean hasNext() throws IOException { + if (seriesReader != null && seriesReader.hasNext()) { + return true; + } + while (nextIntervalFileIndex < sealedTsFiles.size()) { + initSingleTsFileReader(sealedTsFiles.get(nextIntervalFileIndex), context); + nextIntervalFileIndex++; + if(seriesReader.hasNext()){ + return true; + } + } + return false; + } + + @Override + public void close() throws IOException { + // file streams are managed uniformly. + } + + /** + * construct reader with the file that might overlap this timestamp. + * @param timestamp + * @throws IOException + */ + private void constructReader(long timestamp) throws IOException { + while (nextIntervalFileIndex < sealedTsFiles.size()) { + if (singleTsFileSatisfied(sealedTsFiles.get(nextIntervalFileIndex), timestamp)) { + initSingleTsFileReader(sealedTsFiles.get(nextIntervalFileIndex), context); + } + nextIntervalFileIndex++; + } + } + + /** + * Judge whether the file should be skipped. + */ + private boolean singleTsFileSatisfied(IntervalFileNode fileNode, long timestamp) { + long endTime = fileNode.getEndTime(seriesPath.getDevice()); + return endTime >= timestamp; + } + + private void initSingleTsFileReader(IntervalFileNode fileNode, QueryContext context) + throws IOException { + + // to avoid too many opened files + TsFileSequenceReader tsFileReader = FileReaderManager.getInstance() + .get(fileNode.getFilePath(), true); + + MetadataQuerierByFileImpl metadataQuerier = new MetadataQuerierByFileImpl(tsFileReader); + List metaDataList = metadataQuerier.getChunkMetaDataList(seriesPath); + + List pathModifications = context.getPathModifications(fileNode.getModFile(), + seriesPath.getFullPath()); + if (!pathModifications.isEmpty()) { + QueryUtils.modifyChunkMetaData(metaDataList, pathModifications); + } + ChunkLoader chunkLoader = new ChunkLoaderImpl(tsFileReader); + + seriesReader = new SeriesReaderByTimestamp(chunkLoader, metaDataList); + + } + +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestamp.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestamp.java new file mode 100644 index 0000000000000..cf634265aa230 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestamp.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.reader.sequence; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.reader.mem.MemChunkReaderByTimestamp; +import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; + +public class SequenceDataReaderByTimestamp implements EngineReaderByTimeStamp { + + private List seriesReaders; + private int nextSeriesReaderIndex; + private EngineReaderByTimeStamp currentSeriesReader; + + /** + * init with globalSortedSeriesDataSource and filter. + */ + public SequenceDataReaderByTimestamp(GlobalSortedSeriesDataSource sources, QueryContext context) + throws IOException { + seriesReaders = new ArrayList<>(); + + nextSeriesReaderIndex = 0; + + // add reader for sealed TsFiles + if (sources.hasSealedTsFiles()) { + seriesReaders.add( + new SealedTsFilesReaderByTimestamp(sources.getSeriesPath(), sources.getSealedTsFiles(), + context)); + } + + // add reader for unSealed TsFile + if (sources.hasUnsealedTsFile()) { + seriesReaders.add(new UnSealedTsFilesReaderByTimestamp(sources.getUnsealedTsFile())); + } + + // add data in memTable + if (sources.hasRawSeriesChunk()) { + seriesReaders.add(new MemChunkReaderByTimestamp(sources.getReadableChunk())); + } + + } + + /** + * This method is used only in unit test. + * @param seriesReaders + */ + public SequenceDataReaderByTimestamp(List seriesReaders){ + this.seriesReaders = seriesReaders; + nextSeriesReaderIndex = 0; + } + + + @Override + public Object getValueInTimestamp(long timestamp) throws IOException { + Object value = null; + if (currentSeriesReader != null) { + value = currentSeriesReader.getValueInTimestamp(timestamp); + if (value != null || currentSeriesReader.hasNext()) { + return value; + } + } + + while (nextSeriesReaderIndex < seriesReaders.size()) { + currentSeriesReader = seriesReaders.get(nextSeriesReaderIndex++); + if (currentSeriesReader != null) { + value = currentSeriesReader.getValueInTimestamp(timestamp); + if (value != null || currentSeriesReader.hasNext()) { + return value; + } + } + } + return value; + } + + @Override + public boolean hasNext() throws IOException { + if (currentSeriesReader != null && currentSeriesReader.hasNext()) { + return true; + } + while (nextSeriesReaderIndex < seriesReaders.size()) { + currentSeriesReader = seriesReaders.get(nextSeriesReaderIndex++); + if (currentSeriesReader != null && currentSeriesReader.hasNext()) { + return true; + } + } + return false; + } + + + @Override + public void close() throws IOException { + for (EngineReaderByTimeStamp seriesReader : seriesReaders) { + seriesReader.close(); + } + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFilesReaderByTimestamp.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFilesReaderByTimestamp.java new file mode 100644 index 0000000000000..4cd0dfb012257 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFilesReaderByTimestamp.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.reader.sequence; + +import java.io.IOException; +import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile; +import org.apache.iotdb.db.query.control.FileReaderManager; +import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.controller.ChunkLoader; +import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl; +import org.apache.iotdb.tsfile.read.reader.series.SeriesReaderByTimestamp; + +public class UnSealedTsFilesReaderByTimestamp implements EngineReaderByTimeStamp { + + protected Path seriesPath; + private SeriesReaderByTimestamp unSealedReader; + + /** + * Construct funtion for UnSealedTsFileReader. + * + * @param unsealedTsFile -param to initial + */ + public UnSealedTsFilesReaderByTimestamp(UnsealedTsFile unsealedTsFile) throws IOException { + + TsFileSequenceReader unClosedTsFileReader = FileReaderManager.getInstance() + .get(unsealedTsFile.getFilePath(), + false); + ChunkLoader chunkLoader = new ChunkLoaderImpl(unClosedTsFileReader); + unSealedReader = new SeriesReaderByTimestamp(chunkLoader, + unsealedTsFile.getChunkMetaDataList()); + + + } + + @Override + public Object getValueInTimestamp(long timestamp) throws IOException { + return unSealedReader.getValueInTimestamp(timestamp); + } + + @Override + public boolean hasNext() throws IOException { + return unSealedReader.hasNext(); + } + + @Override + public void close() throws IOException { + + } + +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/unsequence/EngineChunkReaderByTimestamp.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/unsequence/EngineChunkReaderByTimestamp.java new file mode 100644 index 0000000000000..dbdf31fcfad8c --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/unsequence/EngineChunkReaderByTimestamp.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.reader.unsequence; + +import java.io.IOException; +import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp; + +public class EngineChunkReaderByTimestamp implements EngineReaderByTimeStamp { + + private ChunkReaderByTimestamp chunkReader; + private BatchData data; + + /** + * Each EngineChunkReader has a corresponding UnClosedTsFileReader, when EngineChunkReader is + * closed, UnClosedTsFileReader also should be closed in meanwhile. + */ + private TsFileSequenceReader unClosedTsFileReader; + + public EngineChunkReaderByTimestamp(ChunkReaderByTimestamp chunkReader, + TsFileSequenceReader unClosedTsFileReader) { + this.chunkReader = chunkReader; + this.unClosedTsFileReader = unClosedTsFileReader; + } + + /** + * get value with time equals timestamp. If there is no such point, return null. + */ + @Override + public Object getValueInTimestamp(long timestamp) throws IOException { + + while (data != null) { + Object value = data.getValueInTimestamp(timestamp); + if (value != null) { + return value; + } + if (data.hasNext()) { + return null; + } else { + chunkReader.setCurrentTimestamp(timestamp); + if (chunkReader.hasNextBatch()) { + data = chunkReader.nextBatch(); + } else { + return null; + } + } + } + + return null; + } + + @Override + public boolean hasNext() throws IOException { + if (data != null && data.hasNext()) { + return true; + } + if (chunkReader != null && chunkReader.hasNextBatch()) { + data = chunkReader.nextBatch(); + return true; + } + return false; + } + + @Override + public void close() throws IOException { + this.chunkReader.close(); + this.unClosedTsFileReader.close(); + } +} diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/FakedSeriesReaderByTimestamp.java b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/FakedSeriesReaderByTimestamp.java new file mode 100644 index 0000000000000..ae7f496657942 --- /dev/null +++ b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/FakedSeriesReaderByTimestamp.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.reader; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; +import org.apache.iotdb.db.utils.TimeValuePair; +import org.apache.iotdb.db.utils.TsPrimitiveType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + +public class FakedSeriesReaderByTimestamp implements EngineReaderByTimeStamp { + private Iterator iterator; + private boolean hasCachedTimeValuePair = false; + private TimeValuePair cachedTimeValuePair; + + public FakedSeriesReaderByTimestamp(long startTime, int size, int interval, int modValue) { + long time = startTime; + List list = new ArrayList<>(); + for (int i = 0; i < size; i++) { + list.add( + new TimeValuePair(time, TsPrimitiveType.getByType(TSDataType.INT64, time % modValue))); + time += interval; + } + iterator = list.iterator(); + } + + @Override + public Object getValueInTimestamp(long timestamp) throws IOException { + if(hasCachedTimeValuePair){ + if(timestamp == cachedTimeValuePair.getTimestamp()){ + hasCachedTimeValuePair = false; + return cachedTimeValuePair.getValue().getValue(); + } + else if(timestamp > cachedTimeValuePair.getTimestamp()){ + hasCachedTimeValuePair = false; + } + else { + return null; + } + } + while(iterator.hasNext()){ + cachedTimeValuePair = iterator.next(); + if(timestamp == cachedTimeValuePair.getTimestamp()){ + return cachedTimeValuePair.getValue().getValue(); + } + else if(timestamp < cachedTimeValuePair.getTimestamp()){ + hasCachedTimeValuePair = true; + break; + } + } + return null; + } + + @Override + public boolean hasNext() throws IOException { + return hasCachedTimeValuePair || iterator.hasNext(); + } + + @Override + public void close() throws IOException { + + } +} diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReaderByTimestampTest.java b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReaderByTimestampTest.java index 03771e80fa748..2a5104cf3184f 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReaderByTimestampTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReaderByTimestampTest.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.merge; import java.io.IOException; @@ -23,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Random; +import org.apache.iotdb.db.query.reader.IReader; import org.apache.iotdb.db.utils.TimeValuePair; import org.apache.iotdb.db.utils.TsPrimitiveType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -50,22 +52,26 @@ public void test() throws IOException { Random random = new Random(); for (long time = 4; time < 1080 + 200 * 13 + 600; ) { - TsPrimitiveType value = priorityReader.getValueInTimestamp(time); - // System.out.println("time = " + time + " value = " + value); + Long value = (Long) priorityReader.getValueInTimestamp(time); + if(time < 1080 + 199 * 13){ + Assert.assertTrue(priorityReader.hasNext()); + } + + //System.out.println("time = " + time + " value = " + value); if (time < 100) { // null Assert.assertNull(value); } else if (time < 850) { // reader 1 if ((time - 100) % 5 == 0) { - Assert.assertEquals(time % 11, value.getLong()); + Assert.assertEquals(time % 11, value.longValue()); } } else if (time < 1080) { // reader 2, reader 1 if (time >= 850 && (time - 850) % 7 == 0) { - Assert.assertEquals(time % 19, value.getLong()); + Assert.assertEquals(time % 19, value.longValue()); } else if (time < 1100 && (time - 100) % 5 == 0) { - Assert.assertEquals(time % 11, value.getLong()); + Assert.assertEquals(time % 11, value.longValue()); } else { Assert.assertNull(value); } @@ -73,11 +79,11 @@ public void test() throws IOException { } else if (time < 1080 + 200 * 13) { // reader 3, reader 2, reader 1 if (time >= 1080 && (time - 1080) % 13 == 0) { - Assert.assertEquals(time % 31, value.getLong()); + Assert.assertEquals(time % 31, value.longValue()); } else if (time < 850 + 200 * 7 && (time - 850) % 7 == 0) { - Assert.assertEquals(time % 19, value.getLong()); + Assert.assertEquals(time % 19, value.longValue()); } else if (time < 1100 && (time - 100) % 5 == 0) { - Assert.assertEquals(time % 11, value.getLong()); + Assert.assertEquals(time % 11, value.longValue()); } else { Assert.assertNull(value); } @@ -87,24 +93,10 @@ public void test() throws IOException { } time += random.nextInt(50) + 1; } - - while (priorityReader.hasNext()) { - TimeValuePair timeValuePair = priorityReader.next(); - long time = timeValuePair.getTimestamp(); - long value = timeValuePair.getValue().getLong(); - if (time < 850) { - Assert.assertEquals(time % 11, value); - } else if (time < 1080) { - Assert.assertEquals(time % 19, value); - } else { - Assert.assertEquals(time % 31, value); - } - cnt++; - } - } - public static class FakedPrioritySeriesReaderByTimestamp implements EngineReaderByTimeStamp { + public static class FakedPrioritySeriesReaderByTimestamp implements EngineReaderByTimeStamp, + IReader { private Iterator iterator; private long currentTimeStamp = Long.MIN_VALUE; @@ -159,17 +151,17 @@ public void close() { } @Override - public TsPrimitiveType getValueInTimestamp(long timestamp) throws IOException { + public Object getValueInTimestamp(long timestamp) throws IOException { this.currentTimeStamp = timestamp; if (hasCachedTimeValuePair && cachedTimeValuePair.getTimestamp() == timestamp) { hasCachedTimeValuePair = false; - return cachedTimeValuePair.getValue(); + return cachedTimeValuePair.getValue().getValue(); } if (hasNext()) { cachedTimeValuePair = next(); if (cachedTimeValuePair.getTimestamp() == timestamp) { - return cachedTimeValuePair.getValue(); + return cachedTimeValuePair.getValue().getValue(); } else if (cachedTimeValuePair.getTimestamp() > timestamp) { hasCachedTimeValuePair = true; } diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestampTest.java b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestampTest.java new file mode 100644 index 0000000000000..6706653c457a2 --- /dev/null +++ b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReaderByTimestampTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.reader.sequence; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import org.apache.iotdb.db.query.reader.FakedSeriesReaderByTimestamp; +import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; +import org.junit.Assert; +import org.junit.Test; + +public class SequenceDataReaderByTimestampTest { + + /** + * Test hasNext() and getValueInTimestamp(long timestamp) method in SequenceDataReaderByTimestamp. + */ + @Test + public void text() throws IOException { + List readers = new ArrayList<>(); + FakedSeriesReaderByTimestamp sealedTsFile = new FakedSeriesReaderByTimestamp(100,1000, 7, 11); + FakedSeriesReaderByTimestamp unsealedTsFile = new FakedSeriesReaderByTimestamp(100 + 1005 *7,100, 17, 3); + FakedSeriesReaderByTimestamp dataInMemory = new FakedSeriesReaderByTimestamp(100 + 1005 *7 + 100 * 17,60, 19, 23); + readers.add(sealedTsFile); + readers.add(unsealedTsFile); + readers.add(dataInMemory); + SequenceDataReaderByTimestamp sequenceReader = new SequenceDataReaderByTimestamp(readers); + + long startTime = 100l; + long endTime = 100 + 1005 *7 + 100 * 17 + 59 * 19; + Random random = new Random(); + for(long time = startTime - 50; time < endTime + 50; time++){ + if(time < endTime){ + Assert.assertTrue(sequenceReader.hasNext()); + } + time += 1 + random.nextInt(10); + Object value = sequenceReader.getValueInTimestamp(time); + if(time < 100){ + Assert.assertNull(value); + } + //sealed tsfile + else if(time < 100 + 1005 *7){ + if((time - 100) % 7 != 0 || time > 100 + 999*7){ + Assert.assertNull(value); + } + else { + Assert.assertEquals(time % 11, value); + } + } + //unsealed tsfile + else if(time < 100 + 1005 *7 + 100 * 17){ + if((time - (100 + 1005 *7)) % 17 != 0 ){ + Assert.assertNull(value); + } + else { + Assert.assertEquals(time % 3, value); + } + } + //memory data + else if(time < 100 + 1005 *7 + 100 * 17 + 60 * 19){ + if((time - (100 + 1005 *7 + 100 * 17)) % 19 != 0 ){ + Assert.assertNull(value); + } + else { + Assert.assertEquals(time % 23, value); + } + } + else { + Assert.assertNull(value); + } + } + + + } + +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java index ffb37e263903f..c2160700db823 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.tsfile.read.common; import java.util.ArrayList; @@ -527,4 +528,19 @@ public void setAnObject(int idx, Comparable v) { public int length() { return this.timeLength; } + + public Object getValueInTimestamp(long time) { + while (hasNext()) { + if (currentTime() < time) { + next(); + } else if (currentTime() == time) { + Object value = currentValue(); + next(); + return value; + } else { + return null; + } + } + return null; + } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/SeriesReaderByTimestamp.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/SeriesReaderByTimestamp.java index 070924f555d6b..735206f9f5a9b 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/SeriesReaderByTimestamp.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/SeriesReaderByTimestamp.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.tsfile.read.reader.series; import java.io.IOException; @@ -86,8 +87,10 @@ public Object getValueInTimestamp(long timestamp) throws IOException { } if (data.hasNext()) { + Object value = data.currentValue(); if (data.currentTime() == timestamp) { - return data.currentValue(); + data.next(); + return value; } return null; } else { @@ -102,6 +105,36 @@ public Object getValueInTimestamp(long timestamp) throws IOException { return null; } + /** + * Judge if the series reader has next time-value pair. + * + * @return true if has next, false if not. + */ + public boolean hasNext() throws IOException { + + if (chunkReader != null) { + if (data != null && data.hasNext()) { + return true; + } + while (chunkReader.hasNextBatch()) { + data = chunkReader.nextBatch(); + if (data != null && data.hasNext()) { + return true; + } + } + } + + while (constructNextSatisfiedChunkReader()) { + while (chunkReader.hasNextBatch()) { + data = chunkReader.nextBatch(); + if (data != null && data.hasNext()) { + return true; + } + } + } + return false; + } + private boolean constructNextSatisfiedChunkReader() throws IOException { while (currentChunkIndex < chunkMetaDataList.size()) { ChunkMetaData chunkMetaData = chunkMetaDataList.get(currentChunkIndex++); diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/ReaderTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/ReaderTest.java index 6bc2e8c226d24..ff25688ab85b7 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/ReaderTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/ReaderTest.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.List; +import java.util.Random; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.exception.write.WriteProcessException; import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; @@ -36,6 +37,7 @@ import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader; import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithFilter; import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithoutFilter; +import org.apache.iotdb.tsfile.read.reader.series.SeriesReaderByTimestamp; import org.apache.iotdb.tsfile.utils.TsFileGeneratorForTest; import org.junit.After; import org.junit.Assert; @@ -126,4 +128,27 @@ public void readWithFilterTest() throws IOException { } } } + + @Test + public void readByTimestampTest() throws IOException { + ChunkLoaderImpl seriesChunkLoader = new ChunkLoaderImpl(fileReader); + List chunkMetaDataList = metadataQuerierByFile + .getChunkMetaDataList(new Path("d1.s1")); + SeriesReaderByTimestamp seriesReader = new SeriesReaderByTimestamp(seriesChunkLoader, + chunkMetaDataList); + + long startTime = TsFileGeneratorForTest.START_TIMESTAMP; + long endTime = TsFileGeneratorForTest.START_TIMESTAMP + rowCount; + Random random = new Random(); + for (long time = startTime - 500; time < endTime + 500; ) { + time += random.nextInt(10) + 1; + Object value = seriesReader.getValueInTimestamp(time); + if (time < startTime || time >= endTime) { + Assert.assertNull(value); + } else { + int actualData = (int) ((time - startTime) * 10 + 1); + Assert.assertEquals(actualData, value); + } + } + } }