diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java index 629493f184b5c..b2f0e8c1392db 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java @@ -52,7 +52,6 @@ import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.encrypt.EncryptParameter; -import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.exception.StopReadTsFileByInterruptException; import org.apache.tsfile.exception.write.PageException; import org.apache.tsfile.file.metadata.IDeviceID; @@ -281,21 +280,9 @@ private void compactNonAlignedSeries( // timeseries metadata, in order to facilitate the reading of chunkMetadata directly by this // offset later. Here we don't need to deserialize chunk metadata, we can deserialize them and // get their schema later. - Map>> timeseriesMetadataOffsetMap = - new LinkedHashMap<>(); - - Map measurementDataTypeMap = new LinkedHashMap<>(); - Map compactionSeriesContextMap = deviceIterator.getCompactionSeriesContextOfCurrentDevice(); - - for (Map.Entry entry : compactionSeriesContextMap.entrySet()) { - timeseriesMetadataOffsetMap.put( - entry.getKey(), entry.getValue().getFileTimeseriesMetdataOffsetMap()); - measurementDataTypeMap.put(entry.getKey(), entry.getValue().getFinalType()); - } - - List allMeasurements = new ArrayList<>(timeseriesMetadataOffsetMap.keySet()); + List allMeasurements = new ArrayList<>(compactionSeriesContextMap.keySet()); allMeasurements.sort((String::compareTo)); int subTaskNums = Math.min(allMeasurements.size(), SUB_TASK_NUM); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java index d406286e37f64..64a940ae96087 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ISeqCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionTableSchemaCollector; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.MultiTsFileDeviceIterator; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.BatchedReadChunkAlignedSeriesCompactionExecutor; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.SingleSeriesCompactionExecutor; @@ -48,10 +49,12 @@ import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.Schema; import java.io.IOException; import java.util.Collections; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Optional; @@ -147,7 +150,11 @@ public void perform() if (aligned) { compactAlignedSeries( - device, targetResources.get(currentTargetFileIndex), currentWriter, deviceIterator); + deviceIterator.getDatabaseName(), + device, + targetResources.get(currentTargetFileIndex), + currentWriter, + deviceIterator); } else { compactNotAlignedSeries( device, targetResources.get(currentTargetFileIndex), currentWriter, deviceIterator); @@ -224,6 +231,7 @@ public void setSummary(CompactionTaskSummary summary) { } private void compactAlignedSeries( + String database, IDeviceID device, TsFileResource targetResource, CompactionTsFileWriter writer, @@ -239,6 +247,7 @@ private void compactAlignedSeries( writer.startChunkGroup(device); BatchedReadChunkAlignedSeriesCompactionExecutor compactionExecutor = new BatchedReadChunkAlignedSeriesCompactionExecutor( + database, device, targetResource, readerAndChunkMetadataList, @@ -293,7 +302,7 @@ private void compactNotAlignedSeries( seriesIterator.getMetadataListForCurrentSeries(); // remove the chunk metadata whose data type not match the data type of last chunk readerAndChunkMetadataList = - filterDataTypeNotMatchedChunkMetadata(readerAndChunkMetadataList); + filterDataTypeNotMatchedChunkMetadata(device, measurement, readerAndChunkMetadataList); SingleSeriesCompactionExecutor compactionExecutorOfCurrentTimeSeries = new SingleSeriesCompactionExecutor( device, measurement, readerAndChunkMetadataList, writer, targetResource, summary); @@ -304,42 +313,63 @@ private void compactNotAlignedSeries( private LinkedList>> filterDataTypeNotMatchedChunkMetadata( + IDeviceID deviceID, + String measurement, LinkedList>> readerAndChunkMetadataList) { if (readerAndChunkMetadataList.isEmpty()) { return readerAndChunkMetadataList; } - LinkedList>> result = new LinkedList<>(); // find correct data type TSDataType correctDataType = null; - for (int i = readerAndChunkMetadataList.size() - 1; i >= 0 && correctDataType == null; i--) { - List chunkMetadataList = readerAndChunkMetadataList.get(i).getRight(); - if (chunkMetadataList == null || chunkMetadataList.isEmpty()) { - continue; - } + boolean hasDifferentDataTypes = false; + Iterator>> descIterator = + readerAndChunkMetadataList.descendingIterator(); + while (descIterator.hasNext()) { + Pair> pair = descIterator.next(); + List chunkMetadataList = pair.right; + TSDataType dataTypeInCurrentFile = null; for (ChunkMetadata chunkMetadata : chunkMetadataList) { - if (chunkMetadata == null) { - continue; + if (chunkMetadata != null) { + dataTypeInCurrentFile = chunkMetadata.getDataType(); + break; } - correctDataType = chunkMetadata.getDataType(); + } + if (correctDataType == null) { + correctDataType = dataTypeInCurrentFile; + } else if (correctDataType != dataTypeInCurrentFile) { + hasDifferentDataTypes = true; break; } } - if (correctDataType == null) { + if (!hasDifferentDataTypes) { return readerAndChunkMetadataList; } + + IMeasurementSchema schema = + CompactionUtils.getLatestMeasurementSchemasForTreeModel( + deviceID, Collections.singletonList(measurement)) + .get(0); + if (schema != null) { + correctDataType = schema.getType(); + } + + LinkedList>> result = new LinkedList<>(); // check data type consistent and skip compact files with wrong data type for (Pair> tsFileSequenceReaderListPair : readerAndChunkMetadataList) { boolean dataTypeConsistent = true; for (ChunkMetadata chunkMetadata : tsFileSequenceReaderListPair.getRight()) { - if (chunkMetadata != null - && !MetadataUtils.canAlter(chunkMetadata.getDataType(), correctDataType)) { - dataTypeConsistent = false; + if (chunkMetadata == null) { + continue; + } + if (chunkMetadata.getDataType() == correctDataType) { break; } - if (chunkMetadata != null && chunkMetadata.getDataType() != correctDataType) { - chunkMetadata.setNewType(correctDataType); + if (!MetadataUtils.canAlter(chunkMetadata.getDataType(), correctDataType)) { + dataTypeConsistent = false; + break; } + chunkMetadata.setNewType(correctDataType); } if (!dataTypeConsistent) { continue; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionSeriesContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionSeriesContext.java index 8f6d017f2354a..fc208adfb267c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionSeriesContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionSeriesContext.java @@ -29,6 +29,7 @@ public class CompactionSeriesContext { Map> fileTimeseriesMetdataOffsetMap; TSDataType finalType; + boolean needUpdateDataType = false; public CompactionSeriesContext() { fileTimeseriesMetdataOffsetMap = new HashMap<>(); @@ -57,4 +58,16 @@ public void setFinalTypeIfAbsent(TSDataType finalType) { this.finalType = finalType; } } + + public void setFinalType(TSDataType finalType) { + this.finalType = finalType; + } + + public boolean isNeedUpdateDataType() { + return needUpdateDataType; + } + + public void setNeedUpdateDataType(boolean needUpdateDataType) { + this.needUpdateDataType = needUpdateDataType; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java index 61943047e541f..ac46cbb70bdd8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java @@ -24,12 +24,20 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.commons.path.PatternTreeMap; +import org.apache.iotdb.commons.schema.SchemaConstant; import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.enums.Tag; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo; +import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; +import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher; +import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.db.service.metrics.FileMetrics; @@ -56,6 +64,7 @@ import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.fileSystem.FSFactoryProducer; import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.writer.TsFileIOWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,8 +94,15 @@ public class CompactionUtils { LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); private static final String SYSTEM = "system"; + private static ISchemaFetcher schemaFetcherForTest = null; + private CompactionUtils() {} + @TestOnly + public static void setSchemaFetcher(ISchemaFetcher schemaFetcher) { + CompactionUtils.schemaFetcherForTest = schemaFetcher; + } + /** * Update the targetResource. Move tmp target file to target file and serialize * xxx.tsfile.resource. @@ -640,4 +656,34 @@ private static void acquireCompactionReadRate(long size) { } rateLimiter.acquire((int) size); } + + public static List getLatestMeasurementSchemasForTreeModel( + IDeviceID deviceID, List measurements) { + if (measurements.isEmpty()) { + return Collections.emptyList(); + } + ISchemaFetcher schemaFetcher = + schemaFetcherForTest == null ? ClusterSchemaFetcher.getInstance() : schemaFetcherForTest; + PartialPath devicePath; + PathPatternTree patternTree = new PathPatternTree(); + try { + devicePath = CompactionPathUtils.getPath(deviceID); + for (String measurement : measurements) { + patternTree.appendFullPath(devicePath, measurement); + } + } catch (IllegalPathException e) { + throw new RuntimeException(e); + } + ISchemaTree schemaTree = + schemaFetcher.fetchRawSchemaInMeasurementLevel( + patternTree, + SchemaConstant.ALL_MATCH_SCOPE, + new MPPQueryContext(new QueryId("compaction")), + true); + DeviceSchemaInfo deviceSchemaInfo = schemaTree.searchDeviceSchemaInfo(devicePath, measurements); + if (deviceSchemaInfo == null) { + return Collections.nCopies(measurements.size(), null); + } + return deviceSchemaInfo.getMeasurementSchemaList(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java index 3129677109027..f2c0a49575d68 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java @@ -24,9 +24,11 @@ import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PatternTreeMap; +import org.apache.iotdb.commons.schema.table.TsTable; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; -import org.apache.iotdb.commons.utils.MetadataUtils; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache; +import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; @@ -47,14 +49,17 @@ import org.apache.tsfile.read.TsFileDeviceIterator; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -245,6 +250,10 @@ public Pair nextDevice() throws IllegalPathException, IOExce return currentDevice; } + public String getDatabaseName() { + return databaseName; + } + public long getTTLForCurrentDevice() { return ttlForCurrentDevice; } @@ -253,6 +262,12 @@ public long getTimeLowerBoundForCurrentDevice() { return timeLowerBoundForCurrentDevice; } + public Map getAllSchemasOfCurrentDevice() throws IOException { + return ignoreAllNullRows + ? getAllSchemasOfCurrentDeviceForTree() + : getAllSchemasOfCurrentDeviceForTable(); + } + /** * Get all measurements and schemas of the current device from source files. Traverse all the * files from the newest to the oldest in turn and start traversing the index tree from the @@ -260,8 +275,9 @@ public long getTimeLowerBoundForCurrentDevice() { * * @throws IOException if io errors occurred */ - public Map getAllSchemasOfCurrentDevice() throws IOException { + public Map getAllSchemasOfCurrentDeviceForTree() throws IOException { Map schemaMap = new ConcurrentHashMap<>(); + Set seriesNeedToUpdateDataType = new HashSet<>(); // get schemas from the newest file to the oldest file for (TsFileResource resource : tsFileResourcesSortedByDesc) { if (!deviceIteratorMap.containsKey(resource) @@ -275,16 +291,75 @@ public Map getAllSchemasOfCurrentDevice() throws IOEx reader.getDeviceTimeseriesMetadata( timeseriesMetadataList, deviceIteratorMap.get(resource).getFirstMeasurementNodeOfCurrentDevice(), + seriesNeedToUpdateDataType, + true, + null); + for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) { + MeasurementSchema measurementSchema = schemaMap.get(timeseriesMetadata.getMeasurementId()); + if (measurementSchema == null) { + if (!timeseriesMetadata.getChunkMetadataList().isEmpty()) { + schemaMap.put( + timeseriesMetadata.getMeasurementId(), + reader.getMeasurementSchema(timeseriesMetadata.getChunkMetadataList())); + } + continue; + } + if (measurementSchema.getType() != timeseriesMetadata.getTsDataType()) { + seriesNeedToUpdateDataType.add(timeseriesMetadata.getMeasurementId()); + } + } + } + List latestMeasurementSchemas = + CompactionUtils.getLatestMeasurementSchemasForTreeModel( + currentDevice.left, new ArrayList<>(seriesNeedToUpdateDataType)); + for (IMeasurementSchema latestMeasurementSchema : latestMeasurementSchemas) { + if (latestMeasurementSchema != null) { + schemaMap.put( + latestMeasurementSchema.getMeasurementName(), + (MeasurementSchema) latestMeasurementSchema); + } + } + return schemaMap; + } + + private Map getAllSchemasOfCurrentDeviceForTable() throws IOException { + Map schemaMap = new ConcurrentHashMap<>(); + TsTable tsTable = + DataNodeTableCache.getInstance().getTable(databaseName, currentDevice.left.getTableName()); + // get schemas from the newest file to the oldest file + for (TsFileResource resource : tsFileResourcesSortedByDesc) { + TsFileDeviceIterator deviceIterator = deviceIteratorMap.get(resource); + if (!deviceIteratorMap.containsKey(resource) + || !deviceIterator.current().equals(currentDevice)) { + // if this tsfile has no more device or next device is not equals to the current device, + // which means this tsfile does not contain the current device, then skip it. + continue; + } + TsFileSequenceReader reader = readerMap.get(resource); + List timeseriesMetadataList = new ArrayList<>(); + reader.getDeviceTimeseriesMetadata( + timeseriesMetadataList, + deviceIterator.getFirstMeasurementNodeOfCurrentDevice(), schemaMap.keySet(), true, null); for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) { - if (!schemaMap.containsKey(timeseriesMetadata.getMeasurementId()) - && !timeseriesMetadata.getChunkMetadataList().isEmpty()) { - schemaMap.put( - timeseriesMetadata.getMeasurementId(), - reader.getMeasurementSchema(timeseriesMetadata.getChunkMetadataList())); + MeasurementSchema measurementSchema = schemaMap.get(timeseriesMetadata.getMeasurementId()); + if (measurementSchema != null) { + continue; + } + if (tsTable != null) { + TsTableColumnSchema columnSchema = + tsTable.getColumnSchema(timeseriesMetadata.getMeasurementId()); + if (columnSchema != null) { + measurementSchema = (MeasurementSchema) columnSchema.getMeasurementSchema(); + } + } + if (measurementSchema == null && !timeseriesMetadata.getChunkMetadataList().isEmpty()) { + measurementSchema = + reader.getMeasurementSchema(timeseriesMetadata.getChunkMetadataList()); } + schemaMap.put(timeseriesMetadata.getMeasurementId(), measurementSchema); } } return schemaMap; @@ -300,6 +375,7 @@ public Map getAllSchemasOfCurrentDevice() throws IOEx public Map getCompactionSeriesContextOfCurrentDevice() throws IOException { Map compactionSeriesContextMap = new HashMap<>(); + List seriesNeedToUpdateDataType = new ArrayList<>(); for (TsFileResource resource : tsFileResourcesSortedByDesc) { if (!deviceIteratorMap.containsKey(resource) || !deviceIteratorMap.get(resource).current().equals(currentDevice)) { @@ -318,23 +394,37 @@ public Map getCompactionSeriesContextOfCurrentD String measurementId = entrySet.getKey(); TimeseriesMetadata timeseriesMetadata = entrySet.getValue().left; Pair offset = entrySet.getValue().right; - TSDataType dataType = timeseriesMetadata.getTsDataType(); + TSDataType dataTypeOfCurrentFile = timeseriesMetadata.getTsDataType(); CompactionSeriesContext compactionSeriesContext = compactionSeriesContextMap.get(measurementId); if (compactionSeriesContext != null - && compactionSeriesContext.getFinalType() != null - && !MetadataUtils.canAlter(dataType, compactionSeriesContext.getFinalType())) { - continue; + && !compactionSeriesContext.isNeedUpdateDataType() + && compactionSeriesContext.getFinalType() != dataTypeOfCurrentFile) { + compactionSeriesContext.setNeedUpdateDataType(true); + seriesNeedToUpdateDataType.add(measurementId); } compactionSeriesContext = compactionSeriesContextMap.computeIfAbsent( measurementId, k -> new CompactionSeriesContext()); compactionSeriesContext.put(resource, offset); - compactionSeriesContext.setFinalTypeIfAbsent(dataType); + compactionSeriesContext.setFinalTypeIfAbsent(dataTypeOfCurrentFile); + } + } + + List measurementSchema = + CompactionUtils.getLatestMeasurementSchemasForTreeModel( + currentDevice.left, seriesNeedToUpdateDataType); + for (IMeasurementSchema iMeasurementSchema : measurementSchema) { + if (iMeasurementSchema == null) { + continue; } + compactionSeriesContextMap + .get(iMeasurementSchema.getMeasurementName()) + .setFinalType(iMeasurementSchema.getType()); } + return compactionSeriesContextMap; } @@ -347,14 +437,80 @@ public Map getCompactionSeriesContextOfCurrentD * endOffset> * @throws IOException if io errors occurred */ - @SuppressWarnings({"checkstyle:AtclauseOrderCheck", "squid:S3824"}) public Map>>> getTimeseriesSchemaAndMetadataOffsetOfCurrentDevice() throws IOException { + return ignoreAllNullRows + ? getTimeseriesSchemaAndMetadataOffsetOfCurrentDeviceForTree() + : getTimeseriesSchemaAndMetadataOffsetOfCurrentDeviceForTable(); + } + + @SuppressWarnings({"checkstyle:AtclauseOrderCheck", "squid:S3824"}) + public Map>>> + getTimeseriesSchemaAndMetadataOffsetOfCurrentDeviceForTree() throws IOException { Map>>> timeseriesMetadataOffsetMap = new LinkedHashMap<>(); + Set seriesNeedToUpdateDataType = new LinkedHashSet<>(); for (TsFileResource resource : tsFileResourcesSortedByDesc) { - if (!deviceIteratorMap.containsKey(resource) - || !deviceIteratorMap.get(resource).current().equals(currentDevice)) { + TsFileDeviceIterator iterator = deviceIteratorMap.get(resource); + if (iterator == null || !iterator.current().equals(currentDevice)) { + // if this tsfile has no more device or next device is not equals to the current device, + // which means this tsfile does not contain the current device, then skip it. + continue; + } + + CompactionTsFileReader reader = (CompactionTsFileReader) readerMap.get(resource); + + for (Map.Entry>> entrySet : + reader + .getTimeseriesMetadataAndOffsetByDevice( + deviceIteratorMap.get(resource).getFirstMeasurementNodeOfCurrentDevice(), + timeseriesMetadataOffsetMap.keySet(), + true) + .entrySet()) { + String measurementId = entrySet.getKey(); + Pair>> existedPair = + timeseriesMetadataOffsetMap.get(measurementId); + if (existedPair == null) { + MeasurementSchema schema = + reader.getMeasurementSchema(entrySet.getValue().left.getChunkMetadataList()); + existedPair = new Pair<>(schema, new HashMap<>()); + timeseriesMetadataOffsetMap.put(measurementId, existedPair); + } else if (!seriesNeedToUpdateDataType.contains(existedPair.getLeft()) + && existedPair.left.getType() != entrySet.getValue().getLeft().getTsDataType()) { + seriesNeedToUpdateDataType.add(existedPair.getLeft()); + } + existedPair.right.put(resource, entrySet.getValue().right); + } + } + List correctMeasurementSchemas = + CompactionUtils.getLatestMeasurementSchemasForTreeModel( + currentDevice.left, + seriesNeedToUpdateDataType.stream() + .map(IMeasurementSchema::getMeasurementName) + .collect(Collectors.toList())); + int i = 0; + for (MeasurementSchema measurementSchema : seriesNeedToUpdateDataType) { + IMeasurementSchema correctSchema = correctMeasurementSchemas.get(i); + i++; + if (correctSchema == null) { + continue; + } + measurementSchema.setDataType(correctSchema.getType()); + measurementSchema.setEncoding(correctSchema.getEncodingType()); + measurementSchema.setCompressionType(correctSchema.getCompressor()); + } + return timeseriesMetadataOffsetMap; + } + + public Map>>> + getTimeseriesSchemaAndMetadataOffsetOfCurrentDeviceForTable() throws IOException { + Map>>> + timeseriesMetadataOffsetMap = new LinkedHashMap<>(); + TsTable tsTable = + DataNodeTableCache.getInstance().getTable(databaseName, currentDevice.left.getTableName()); + for (TsFileResource resource : tsFileResourcesSortedByDesc) { + TsFileDeviceIterator iterator = deviceIteratorMap.get(resource); + if (iterator == null || !iterator.current().equals(currentDevice)) { // if this tsfile has no more device or next device is not equals to the current device, // which means this tsfile does not contain the current device, then skip it. continue; @@ -363,23 +519,33 @@ public Map getCompactionSeriesContextOfCurrentD continue; } - TsFileSequenceReader reader = readerMap.get(resource); - for (Map.Entry, Pair>> entrySet : + CompactionTsFileReader reader = (CompactionTsFileReader) readerMap.get(resource); + + for (Map.Entry>> entrySet : reader - .getTimeseriesMetadataOffsetByDevice( + .getTimeseriesMetadataAndOffsetByDevice( deviceIteratorMap.get(resource).getFirstMeasurementNodeOfCurrentDevice(), timeseriesMetadataOffsetMap.keySet(), true) .entrySet()) { String measurementId = entrySet.getKey(); - if (!timeseriesMetadataOffsetMap.containsKey(measurementId)) { - MeasurementSchema schema = reader.getMeasurementSchema(entrySet.getValue().left); - timeseriesMetadataOffsetMap.put(measurementId, new Pair<>(schema, new HashMap<>())); + Pair>> existedPair = + timeseriesMetadataOffsetMap.get(measurementId); + if (existedPair == null) { + MeasurementSchema schema = null; + if (tsTable != null) { + TsTableColumnSchema columnSchema = tsTable.getColumnSchema(measurementId); + if (columnSchema != null) { + schema = (MeasurementSchema) columnSchema.getMeasurementSchema(); + } + } + if (schema == null) { + schema = reader.getMeasurementSchema(entrySet.getValue().left.getChunkMetadataList()); + } + existedPair = new Pair<>(schema, new HashMap<>()); + timeseriesMetadataOffsetMap.put(measurementId, existedPair); } - timeseriesMetadataOffsetMap - .get(measurementId) - .right - .put(resource, entrySet.getValue().right); + existedPair.right.put(resource, entrySet.getValue().right); } } return timeseriesMetadataOffsetMap; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedReadChunkAlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedReadChunkAlignedSeriesCompactionExecutor.java index dc03078756694..108c11fe4aec4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedReadChunkAlignedSeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedReadChunkAlignedSeriesCompactionExecutor.java @@ -72,6 +72,7 @@ public class BatchedReadChunkAlignedSeriesCompactionExecutor originReaderAndChunkMetadataList; public BatchedReadChunkAlignedSeriesCompactionExecutor( + String database, IDeviceID device, TsFileResource targetResource, LinkedList>> @@ -80,7 +81,14 @@ public BatchedReadChunkAlignedSeriesCompactionExecutor( CompactionTaskSummary summary, boolean ignoreAllNullRows) throws IOException { - super(device, targetResource, readerAndChunkMetadataList, writer, summary, ignoreAllNullRows); + super( + database, + device, + targetResource, + readerAndChunkMetadataList, + writer, + summary, + ignoreAllNullRows); this.originReaderAndChunkMetadataList = readerAndChunkMetadataList; this.batchColumnSelection = new AlignedSeriesBatchCompactionUtils.BatchColumnSelection(schemaList, batchSize); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastNonAlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastNonAlignedSeriesCompactionExecutor.java index 363554fb60672..49cbcb14582f3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastNonAlignedSeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastNonAlignedSeriesCompactionExecutor.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PatternTreeMap; +import org.apache.iotdb.commons.utils.MetadataUtils; import org.apache.iotdb.db.exception.ChunkTypeInconsistentException; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary; @@ -165,9 +166,17 @@ void deserializeFileIntoChunkMetadataQueue(List fileElements) removeFile(fileElement); } } + boolean checked = false; for (int i = 0; i < iChunkMetadataList.size(); i++) { IChunkMetadata chunkMetadata = iChunkMetadataList.get(i); - if (dataType != null && chunkMetadata.getDataType() != dataType) { + if (!checked) { + if (!MetadataUtils.canAlter(chunkMetadata.getDataType(), dataType)) { + removeFile(fileElement); + break; + } + checked = true; + } + if (chunkMetadata.getDataType() != dataType) { chunkMetadata.setNewType(dataType); } // add into queue diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java index 654f8f770e74f..220e45dfb368b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java @@ -19,10 +19,14 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk; +import org.apache.iotdb.commons.schema.table.TsTable; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema; import org.apache.iotdb.commons.utils.MetadataUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.AlignedSeriesBatchCompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.CompactionAlignedPageLazyLoadPointReader; @@ -34,6 +38,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.encoding.decoder.Decoder; import org.apache.tsfile.encrypt.EncryptUtils; @@ -45,6 +50,7 @@ import org.apache.tsfile.file.metadata.ChunkMetadata; import org.apache.tsfile.file.metadata.IChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.file.metadata.statistics.Statistics; import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.read.TsFileSequenceReader; @@ -86,6 +92,7 @@ public class ReadChunkAlignedSeriesCompactionExecutor { private boolean lastWriteTimestampSet = false; public ReadChunkAlignedSeriesCompactionExecutor( + String database, IDeviceID device, TsFileResource targetResource, LinkedList>> @@ -99,13 +106,17 @@ public ReadChunkAlignedSeriesCompactionExecutor( this.writer = writer; this.targetResource = targetResource; this.summary = summary; - collectValueColumnSchemaList(); + this.ignoreAllNullRows = ignoreAllNullRows; + if (device.isTableModel()) { + collectValueColumnSchemaListForTable(database); + } else { + collectValueColumnSchemaListForTree(); + } fillAlignedChunkMetadataToMatchSchemaList(); int compactionFileLevel = Integer.parseInt(this.targetResource.getTsFile().getName().split("-")[2]); flushController = new ReadChunkAlignedSeriesCompactionFlushController(compactionFileLevel); this.chunkWriter = constructAlignedChunkWriter(); - this.ignoreAllNullRows = ignoreAllNullRows; } // used for batched column compaction @@ -133,8 +144,66 @@ public ReadChunkAlignedSeriesCompactionExecutor( this.ignoreAllNullRows = ignoreAllNullRows; } - private void collectValueColumnSchemaList() throws IOException { + private void collectValueColumnSchemaListForTable(String database) throws IOException { + TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig(); + TsTable tsTable = DataNodeTableCache.getInstance().getTable(database, device.getTableName()); Map measurementSchemaMap = new HashMap<>(); + for (int i = this.readerAndChunkMetadataList.size() - 1; i >= 0; i--) { + Pair> pair = + this.readerAndChunkMetadataList.get(i); + CompactionTsFileReader reader = (CompactionTsFileReader) pair.getLeft(); + List alignedChunkMetadataList = pair.getRight(); + for (AbstractAlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList) { + if (alignedChunkMetadata == null) { + continue; + } + if (timeSchema == null) { + timeSchema = + new MeasurementSchema( + "", + alignedChunkMetadata.getTimeChunkMetadata().getDataType(), + TSEncoding.valueOf(tsFileConfig.getTimeEncoder()), + tsFileConfig.getCompressor()); + } + for (IChunkMetadata chunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) { + if (chunkMetadata == null + || measurementSchemaMap.containsKey(chunkMetadata.getMeasurementUid())) { + continue; + } + TsTableColumnSchema schemaInTsTable = + tsTable.getColumnSchema(chunkMetadata.getMeasurementUid()); + IMeasurementSchema measurementSchema; + if (schemaInTsTable == null) { + ChunkHeader chunkHeader = + reader.readChunkHeader(chunkMetadata.getOffsetOfChunkHeader()); + measurementSchema = + new MeasurementSchema( + chunkHeader.getMeasurementID(), + chunkHeader.getDataType(), + chunkHeader.getEncodingType(), + chunkHeader.getCompressionType()); + } else { + measurementSchema = + new MeasurementSchema( + chunkMetadata.getMeasurementUid(), + schemaInTsTable.getDataType(), + tsFileConfig.getValueEncoder(schemaInTsTable.getDataType()), + tsFileConfig.getCompressor(schemaInTsTable.getDataType())); + } + measurementSchemaMap.put(chunkMetadata.getMeasurementUid(), measurementSchema); + } + } + } + this.schemaList = + measurementSchemaMap.values().stream() + .sorted(Comparator.comparing(IMeasurementSchema::getMeasurementName)) + .collect(Collectors.toList()); + } + + private void collectValueColumnSchemaListForTree() throws IOException { + Map> measurementSchemaMap = new HashMap<>(); + List> measurementSchemasNeedToUpdate = new ArrayList<>(); + List measurementNamesNeedToUpdate = new ArrayList<>(); for (int i = this.readerAndChunkMetadataList.size() - 1; i >= 0; i--) { Pair> pair = this.readerAndChunkMetadataList.get(i); @@ -158,24 +227,50 @@ private void collectValueColumnSchemaList() throws IOException { } for (IChunkMetadata chunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) { - if (chunkMetadata == null - || measurementSchemaMap.containsKey(chunkMetadata.getMeasurementUid())) { + if (chunkMetadata == null) { continue; } - ChunkHeader chunkHeader = reader.readChunkHeader(chunkMetadata.getOffsetOfChunkHeader()); - IMeasurementSchema schema = - new MeasurementSchema( - chunkHeader.getMeasurementID(), - chunkHeader.getDataType(), - chunkHeader.getEncodingType(), - chunkHeader.getCompressionType()); - measurementSchemaMap.put(chunkMetadata.getMeasurementUid(), schema); + Pair measurementSchema = + measurementSchemaMap.get(chunkMetadata.getMeasurementUid()); + if (measurementSchema == null) { + ChunkHeader chunkHeader = + reader.readChunkHeader(chunkMetadata.getOffsetOfChunkHeader()); + IMeasurementSchema schema = + new MeasurementSchema( + chunkHeader.getMeasurementID(), + chunkHeader.getDataType(), + chunkHeader.getEncodingType(), + chunkHeader.getCompressionType()); + measurementSchemaMap.put(chunkMetadata.getMeasurementUid(), new Pair<>(schema, false)); + continue; + } + if (measurementSchema.getLeft().getType() != chunkMetadata.getDataType() + && !measurementSchema.getRight()) { + measurementSchema.setRight(true); + measurementSchemasNeedToUpdate.add(measurementSchema); + measurementNamesNeedToUpdate.add(chunkMetadata.getMeasurementUid()); + } + } + } + } + + if (ignoreAllNullRows) { + List latestMeasurementSchemas = + CompactionUtils.getLatestMeasurementSchemasForTreeModel( + device, measurementNamesNeedToUpdate); + for (int i = 0; i < measurementSchemasNeedToUpdate.size(); i++) { + IMeasurementSchema latestMeasurementSchema = latestMeasurementSchemas.get(i); + if (latestMeasurementSchema == null) { + continue; } + Pair pair = measurementSchemasNeedToUpdate.get(i); + pair.setLeft(latestMeasurementSchema); } } this.schemaList = measurementSchemaMap.values().stream() + .map(Pair::getLeft) .sorted(Comparator.comparing(IMeasurementSchema::getMeasurementName)) .collect(Collectors.toList()); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakeSchemaFetcherImpl.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakeSchemaFetcherImpl.java index 0aab351894575..e4d54a3f19e33 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakeSchemaFetcherImpl.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakeSchemaFetcherImpl.java @@ -44,7 +44,7 @@ public class FakeSchemaFetcherImpl implements ISchemaFetcher { - private final ClusterSchemaTree schemaTree = new ClusterSchemaTree(generateSchemaTree()); + protected final ClusterSchemaTree schemaTree = new ClusterSchemaTree(generateSchemaTree()); @Override public ClusterSchemaTree fetchSchema( @@ -106,7 +106,7 @@ public void fetchAndComputeSchemaWithAutoCreate( * * @return the root node of the generated schemaTree */ - private SchemaNode generateSchemaTree() { + protected SchemaNode generateSchemaTree() { SchemaNode root = new SchemaInternalNode("root"); SchemaNode sg = new SchemaInternalNode("sg"); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java index a5503bb9e645c..25d9419abdb67 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java @@ -33,6 +33,12 @@ import org.apache.iotdb.db.storageengine.buffer.BloomFilterCache; import org.apache.iotdb.db.storageengine.buffer.ChunkCache; import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerSeqCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerUnseqCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.MultiTsFileDeviceIterator; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.reader.IDataBlockReader; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.reader.SeriesDataBlockReader; @@ -871,4 +877,14 @@ protected List getPaths(List resources) } return new ArrayList<>(paths); } + + protected ICompactionPerformer getPerformer(String performerType) { + if (performerType.equalsIgnoreCase(InnerSeqCompactionPerformer.READ_CHUNK.toString())) { + return new ReadChunkCompactionPerformer(); + } else if (performerType.equalsIgnoreCase(InnerUnseqCompactionPerformer.FAST.toString())) { + return new FastCompactionPerformer(false); + } else { + return new ReadPointCompactionPerformer(); + } + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDataTypeAlterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDataTypeAlterTest.java deleted file mode 100644 index 23d509f3dfebc..0000000000000 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDataTypeAlterTest.java +++ /dev/null @@ -1,377 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.storageengine.dataregion.compaction; - -import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.commons.exception.MetadataException; -import org.apache.iotdb.commons.path.MeasurementPath; -import org.apache.iotdb.db.exception.StorageEngineException; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask; -import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; -import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; -import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils; - -import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.exception.write.WriteProcessException; -import org.apache.tsfile.file.metadata.IDeviceID; -import org.apache.tsfile.read.TsFileReader; -import org.apache.tsfile.read.TsFileSequenceReader; -import org.apache.tsfile.read.common.Path; -import org.apache.tsfile.read.expression.QueryExpression; -import org.apache.tsfile.read.query.dataset.QueryDataSet; -import org.apache.tsfile.write.TsFileWriter; -import org.apache.tsfile.write.record.TSRecord; -import org.apache.tsfile.write.record.datapoint.DoubleDataPoint; -import org.apache.tsfile.write.record.datapoint.IntDataPoint; -import org.apache.tsfile.write.schema.IMeasurementSchema; -import org.apache.tsfile.write.schema.MeasurementSchema; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -public class CompactionDataTypeAlterTest extends AbstractCompactionTest { - private final String oldThreadName = Thread.currentThread().getName(); - private final IDeviceID device = - IDeviceID.Factory.DEFAULT_FACTORY.create(COMPACTION_TEST_SG + ".d1"); - - @Before - public void setUp() - throws IOException, WriteProcessException, MetadataException, InterruptedException { - super.setUp(); - Thread.currentThread().setName("pool-1-IoTDB-Compaction-Worker-1"); - } - - @After - public void tearDown() throws IOException, StorageEngineException { - super.tearDown(); - Thread.currentThread().setName(oldThreadName); - } - - @Test - public void testCompactNonAlignedSeriesWithReadChunkCompactionPerformer() - throws IOException, WriteProcessException { - generateDataTypeNotMatchFilesWithNonAlignedSeries(); - InnerSpaceCompactionTask task = - new InnerSpaceCompactionTask( - 0, tsFileManager, seqResources, true, new ReadChunkCompactionPerformer(), 0); - Assert.assertTrue(task.start()); - TsFileResourceUtils.validateTsFileDataCorrectness(tsFileManager.getTsFileList(true).get(0)); - Assert.assertEquals( - 1, ((long) tsFileManager.getTsFileList(true).get(0).getStartTime(device).get())); - Assert.assertEquals( - 2, ((long) tsFileManager.getTsFileList(true).get(0).getEndTime(device).get())); - TsFileResource tsFileResource = tsFileManager.getTsFileList(true).get(0); - try (TsFileSequenceReader reader = - new TsFileSequenceReader(tsFileResource.getTsFile().getAbsolutePath()); - TsFileReader readTsFile = new TsFileReader(reader)) { - ArrayList paths = new ArrayList<>(); - paths.add(new Path(device, "s1", true)); - QueryExpression queryExpression = QueryExpression.create(paths, null); - QueryDataSet queryDataSet = readTsFile.query(queryExpression); - Assert.assertTrue(queryDataSet.hasNext()); - Assert.assertEquals("1\t1.0", queryDataSet.next().toString()); - Assert.assertTrue(queryDataSet.hasNext()); - Assert.assertEquals("2\t2.0", queryDataSet.next().toString()); - Assert.assertFalse(queryDataSet.hasNext()); - } - } - - @Test - public void testCompactNonAlignedSeriesWithFastCompactionPerformer() - throws IOException, WriteProcessException { - generateDataTypeNotMatchFilesWithNonAlignedSeries(); - InnerSpaceCompactionTask task = - new InnerSpaceCompactionTask( - 0, tsFileManager, seqResources, true, new FastCompactionPerformer(false), 0); - Assert.assertTrue(task.start()); - TsFileResourceUtils.validateTsFileDataCorrectness(tsFileManager.getTsFileList(true).get(0)); - Assert.assertEquals( - 1, ((long) tsFileManager.getTsFileList(true).get(0).getStartTime(device).get())); - Assert.assertEquals( - 2, ((long) tsFileManager.getTsFileList(true).get(0).getEndTime(device).get())); - TsFileResource tsFileResource = tsFileManager.getTsFileList(true).get(0); - try (TsFileSequenceReader reader = - new TsFileSequenceReader(tsFileResource.getTsFile().getAbsolutePath()); - TsFileReader readTsFile = new TsFileReader(reader)) { - ArrayList paths = new ArrayList<>(); - paths.add(new Path(device, "s1", true)); - QueryExpression queryExpression = QueryExpression.create(paths, null); - QueryDataSet queryDataSet = readTsFile.query(queryExpression); - Assert.assertTrue(queryDataSet.hasNext()); - Assert.assertEquals("1\t1.0", queryDataSet.next().toString()); - Assert.assertTrue(queryDataSet.hasNext()); - Assert.assertEquals("2\t2.0", queryDataSet.next().toString()); - Assert.assertFalse(queryDataSet.hasNext()); - } - } - - @Test - public void testCompactNonAlignedSeriesWithReadPointCompactionPerformer() - throws IOException, WriteProcessException { - generateDataTypeNotMatchFilesWithNonAlignedSeries(); - InnerSpaceCompactionTask task = - new InnerSpaceCompactionTask( - 0, tsFileManager, seqResources, true, new ReadPointCompactionPerformer(), 0); - Assert.assertTrue(task.start()); - TsFileResourceUtils.validateTsFileDataCorrectness(tsFileManager.getTsFileList(true).get(0)); - Assert.assertEquals( - 1, ((long) tsFileManager.getTsFileList(true).get(0).getStartTime(device).get())); - Assert.assertEquals( - 2, ((long) tsFileManager.getTsFileList(true).get(0).getEndTime(device).get())); - TsFileResource tsFileResource = tsFileManager.getTsFileList(true).get(0); - try (TsFileSequenceReader reader = - new TsFileSequenceReader(tsFileResource.getTsFile().getAbsolutePath()); - TsFileReader readTsFile = new TsFileReader(reader)) { - ArrayList paths = new ArrayList<>(); - paths.add(new Path(device, "s1", true)); - QueryExpression queryExpression = QueryExpression.create(paths, null); - QueryDataSet queryDataSet = readTsFile.query(queryExpression); - Assert.assertTrue(queryDataSet.hasNext()); - Assert.assertEquals("1\t1.0", queryDataSet.next().toString()); - Assert.assertTrue(queryDataSet.hasNext()); - Assert.assertEquals("2\t2.0", queryDataSet.next().toString()); - Assert.assertFalse(queryDataSet.hasNext()); - } - } - - @Test - public void testCompactAlignedSeriesWithReadChunkCompactionPerformer() - throws IOException, WriteProcessException { - generateDataTypeNotMatchFilesWithAlignedSeries(); - InnerSpaceCompactionTask task = - new InnerSpaceCompactionTask( - 0, tsFileManager, seqResources, true, new ReadChunkCompactionPerformer(), 0); - Assert.assertTrue(task.start()); - TsFileResourceUtils.validateTsFileDataCorrectness(tsFileManager.getTsFileList(true).get(0)); - Assert.assertEquals( - 1, ((long) tsFileManager.getTsFileList(true).get(0).getStartTime(device).get())); - Assert.assertEquals( - 2, ((long) tsFileManager.getTsFileList(true).get(0).getEndTime(device).get())); - TsFileResource tsFileResource = tsFileManager.getTsFileList(true).get(0); - try (TsFileSequenceReader reader = - new TsFileSequenceReader(tsFileResource.getTsFile().getAbsolutePath()); - TsFileReader readTsFile = new TsFileReader(reader)) { - ArrayList paths = new ArrayList<>(); - paths.add(new Path(device, "s1", true)); - paths.add(new Path(device, "s2", true)); - QueryExpression queryExpression = QueryExpression.create(paths, null); - QueryDataSet queryDataSet = readTsFile.query(queryExpression); - Assert.assertTrue(queryDataSet.hasNext()); - Assert.assertEquals("1\t0.0\t1.0", queryDataSet.next().toString()); - Assert.assertTrue(queryDataSet.hasNext()); - Assert.assertEquals("2\t2.0\t3.0", queryDataSet.next().toString()); - Assert.assertFalse(queryDataSet.hasNext()); - } - } - - @Test - public void testCompactAlignedSeriesWithFastCompactionPerformer() - throws IOException, WriteProcessException { - generateDataTypeNotMatchFilesWithAlignedSeries(); - InnerSpaceCompactionTask task = - new InnerSpaceCompactionTask( - 0, tsFileManager, seqResources, true, new FastCompactionPerformer(false), 0); - Assert.assertTrue(task.start()); - TsFileResourceUtils.validateTsFileDataCorrectness(tsFileManager.getTsFileList(true).get(0)); - Assert.assertEquals( - 1, ((long) tsFileManager.getTsFileList(true).get(0).getStartTime(device).get())); - Assert.assertEquals( - 2, ((long) tsFileManager.getTsFileList(true).get(0).getEndTime(device).get())); - TsFileResource tsFileResource = tsFileManager.getTsFileList(true).get(0); - try (TsFileSequenceReader reader = - new TsFileSequenceReader(tsFileResource.getTsFile().getAbsolutePath()); - TsFileReader readTsFile = new TsFileReader(reader)) { - ArrayList paths = new ArrayList<>(); - paths.add(new Path(device, "s1", true)); - paths.add(new Path(device, "s2", true)); - QueryExpression queryExpression = QueryExpression.create(paths, null); - QueryDataSet queryDataSet = readTsFile.query(queryExpression); - Assert.assertTrue(queryDataSet.hasNext()); - Assert.assertEquals("1\t0.0\t1.0", queryDataSet.next().toString()); - Assert.assertTrue(queryDataSet.hasNext()); - Assert.assertEquals("2\t2.0\t3.0", queryDataSet.next().toString()); - Assert.assertFalse(queryDataSet.hasNext()); - } - } - - @Test - public void testCompactAlignedSeriesWithReadPointCompactionPerformer() - throws IOException, WriteProcessException { - generateDataTypeNotMatchFilesWithAlignedSeries(); - InnerSpaceCompactionTask task = - new InnerSpaceCompactionTask( - 0, tsFileManager, seqResources, true, new ReadPointCompactionPerformer(), 0); - Assert.assertTrue(task.start()); - TsFileResourceUtils.validateTsFileDataCorrectness(tsFileManager.getTsFileList(true).get(0)); - Assert.assertEquals( - 1, ((long) tsFileManager.getTsFileList(true).get(0).getStartTime(device).get())); - Assert.assertEquals( - 2, ((long) tsFileManager.getTsFileList(true).get(0).getEndTime(device).get())); - TsFileResource tsFileResource = tsFileManager.getTsFileList(true).get(0); - try (TsFileSequenceReader reader = - new TsFileSequenceReader(tsFileResource.getTsFile().getAbsolutePath()); - TsFileReader readTsFile = new TsFileReader(reader)) { - ArrayList paths = new ArrayList<>(); - paths.add(new Path(device, "s1", true)); - paths.add(new Path(device, "s2", true)); - QueryExpression queryExpression = QueryExpression.create(paths, null); - QueryDataSet queryDataSet = readTsFile.query(queryExpression); - Assert.assertTrue(queryDataSet.hasNext()); - Assert.assertEquals("1\t0.0\t1.0", queryDataSet.next().toString()); - Assert.assertTrue(queryDataSet.hasNext()); - Assert.assertEquals("2\t2.0\t3.0", queryDataSet.next().toString()); - Assert.assertFalse(queryDataSet.hasNext()); - } - } - - private void generateDataTypeNotMatchFilesWithNonAlignedSeries() - throws IOException, WriteProcessException { - MeasurementSchema measurementSchema1 = new MeasurementSchema("s1", TSDataType.INT32); - TsFileResource resource1 = createEmptyFileAndResource(true); - resource1.setStatusForTest(TsFileResourceStatus.COMPACTING); - try (TsFileWriter writer = new TsFileWriter(resource1.getTsFile())) { - writer.registerTimeseries(new Path(device), measurementSchema1); - TSRecord record = new TSRecord(device, 1); - record.addTuple(new IntDataPoint("s1", 1)); - writer.writeRecord(record); - writer.flush(); - } - resource1.updateStartTime(device, 1); - resource1.updateEndTime(device, 1); - resource1.serialize(); - seqResources.add(resource1); - - MeasurementSchema measurementSchema2 = new MeasurementSchema("s1", TSDataType.DOUBLE); - TsFileResource resource2 = createEmptyFileAndResource(true); - resource2.setStatusForTest(TsFileResourceStatus.COMPACTING); - try (TsFileWriter writer = new TsFileWriter(resource2.getTsFile())) { - writer.registerTimeseries(new Path(device), measurementSchema2); - TSRecord record = new TSRecord(device, 2); - record.addTuple(new DoubleDataPoint("s1", 2.0)); - writer.writeRecord(record); - writer.flush(); - } - resource2.updateStartTime(device, 2); - resource2.updateEndTime(device, 2); - resource2.serialize(); - seqResources.add(resource2); - } - - private void generateDataTypeNotMatchFilesWithAlignedSeries() - throws IOException, WriteProcessException { - List measurementSchemas1 = new ArrayList<>(); - measurementSchemas1.add(new MeasurementSchema("s1", TSDataType.INT32)); - measurementSchemas1.add(new MeasurementSchema("s2", TSDataType.INT32)); - - TsFileResource resource1 = createEmptyFileAndResource(true); - resource1.setStatusForTest(TsFileResourceStatus.COMPACTING); - try (TsFileWriter writer = new TsFileWriter(resource1.getTsFile())) { - writer.registerAlignedTimeseries(new Path(device), measurementSchemas1); - TSRecord record = new TSRecord(device, 1); - record.addTuple(new IntDataPoint("s1", 0)); - record.addTuple(new IntDataPoint("s2", 1)); - writer.writeRecord(record); - writer.flush(); - } - resource1.updateStartTime(device, 1); - resource1.updateEndTime(device, 1); - resource1.serialize(); - seqResources.add(resource1); - - List measurementSchemas2 = new ArrayList<>(); - measurementSchemas2.add(new MeasurementSchema("s1", TSDataType.DOUBLE)); - measurementSchemas2.add(new MeasurementSchema("s2", TSDataType.DOUBLE)); - TsFileResource resource2 = createEmptyFileAndResource(true); - resource2.setStatusForTest(TsFileResourceStatus.COMPACTING); - try (TsFileWriter writer = new TsFileWriter(resource2.getTsFile())) { - writer.registerAlignedTimeseries(new Path(device), measurementSchemas2); - TSRecord record = new TSRecord(device, 2); - record.addTuple(new DoubleDataPoint("s1", 2.0)); - record.addTuple(new DoubleDataPoint("s2", 3.0)); - writer.writeRecord(record); - writer.flush(); - } - resource2.updateStartTime(device, 2); - resource2.updateEndTime(device, 2); - resource2.serialize(); - seqResources.add(resource2); - } - - @Test - public void testAlterDataTypeWithAlignedSeriesWithTimeDeletion() - throws IOException, WriteProcessException, IllegalPathException { - List measurementSchemas1 = new ArrayList<>(); - measurementSchemas1.add(new MeasurementSchema("s1", TSDataType.INT32)); - measurementSchemas1.add(new MeasurementSchema("s2", TSDataType.INT32)); - - TsFileResource resource1 = createEmptyFileAndResource(true); - resource1.setStatusForTest(TsFileResourceStatus.COMPACTING); - try (TsFileWriter writer = new TsFileWriter(resource1.getTsFile())) { - writer.registerAlignedTimeseries(new Path(device), measurementSchemas1); - for (int i = 0; i < 100; i++) { - TSRecord record = new TSRecord(device, i); - record.addTuple(new IntDataPoint("s1", 0)); - record.addTuple(new IntDataPoint("s2", 1)); - writer.writeRecord(record); - } - writer.flush(); - } - resource1.updateStartTime(device, 1); - resource1.updateEndTime(device, 1); - resource1.serialize(); - ModificationFile modFile = resource1.getExclusiveModFile(); - modFile.write(new TreeDeletionEntry(new MeasurementPath(device + ".*"), 0, 40)); - modFile.close(); - seqResources.add(resource1); - - List measurementSchemas2 = new ArrayList<>(); - measurementSchemas2.add(new MeasurementSchema("s1", TSDataType.DOUBLE)); - measurementSchemas2.add(new MeasurementSchema("s2", TSDataType.DOUBLE)); - TsFileResource resource2 = createEmptyFileAndResource(true); - resource2.setStatusForTest(TsFileResourceStatus.COMPACTING); - try (TsFileWriter writer = new TsFileWriter(resource2.getTsFile())) { - writer.registerAlignedTimeseries(new Path(device), measurementSchemas2); - TSRecord record = new TSRecord(device, 200); - record.addTuple(new DoubleDataPoint("s1", 2.0)); - record.addTuple(new DoubleDataPoint("s2", 3.0)); - writer.writeRecord(record); - writer.flush(); - } - resource2.updateStartTime(device, 2); - resource2.updateEndTime(device, 2); - resource2.serialize(); - seqResources.add(resource2); - - InnerSpaceCompactionTask task = - new InnerSpaceCompactionTask( - 0, tsFileManager, seqResources, true, new ReadChunkCompactionPerformer(), 0); - Assert.assertTrue(task.start()); - } -} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/alterDataType/AbstractCompactionAlterDataTypeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/alterDataType/AbstractCompactionAlterDataTypeTest.java new file mode 100644 index 0000000000000..d97b66b68b77f --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/alterDataType/AbstractCompactionAlterDataTypeTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.alterDataType; + +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; +import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionFakeSchemaFetcherImpl; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.common.Path; +import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.write.TsFileWriter; +import org.apache.tsfile.write.record.TSRecord; +import org.apache.tsfile.write.record.datapoint.DoubleDataPoint; +import org.apache.tsfile.write.record.datapoint.FloatDataPoint; +import org.apache.tsfile.write.record.datapoint.IntDataPoint; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class AbstractCompactionAlterDataTypeTest extends AbstractCompactionTest { + + protected final String oldThreadName = Thread.currentThread().getName(); + protected final IDeviceID device = + IDeviceID.Factory.DEFAULT_FACTORY.create(COMPACTION_TEST_SG + ".d1"); + + protected CompactionFakeSchemaFetcherImpl schemaFetcher; + + @Before + public void setUp() + throws IOException, WriteProcessException, MetadataException, InterruptedException { + super.setUp(); + Thread.currentThread().setName("pool-1-IoTDB-Compaction-Worker-1"); + this.schemaFetcher = new CompactionFakeSchemaFetcherImpl(); + schemaFetcher.getSchemaTree().setDatabases(Collections.singleton(COMPACTION_TEST_SG)); + CompactionUtils.setSchemaFetcher(schemaFetcher); + } + + @After + public void tearDown() throws IOException, StorageEngineException { + super.tearDown(); + Thread.currentThread().setName(oldThreadName); + } + + protected TsFileResource generateInt32AlignedSeriesFile(TimeRange timeRange, boolean seq) + throws IOException, WriteProcessException { + List measurementSchemas1 = new ArrayList<>(); + measurementSchemas1.add(new MeasurementSchema("s1", TSDataType.INT32)); + measurementSchemas1.add(new MeasurementSchema("s2", TSDataType.INT32)); + + TsFileResource resource = createEmptyFileAndResource(seq); + resource.setStatusForTest(TsFileResourceStatus.COMPACTING); + try (TsFileWriter writer = new TsFileWriter(resource.getTsFile())) { + writer.registerAlignedTimeseries(new Path(device), measurementSchemas1); + for (long i = timeRange.getMin(); i <= timeRange.getMax(); i++) { + TSRecord record = new TSRecord(device, i); + record.addTuple(new IntDataPoint("s1", (int) i)); + record.addTuple(new IntDataPoint("s2", (int) i)); + writer.writeRecord(record); + } + writer.flush(); + } + resource.updateStartTime(device, timeRange.getMin()); + resource.updateEndTime(device, timeRange.getMax()); + resource.serialize(); + return resource; + } + + protected TsFileResource generateDoubleAlignedSeriesFile(TimeRange timeRange, boolean seq) + throws IOException, WriteProcessException { + + List measurementSchemas2 = new ArrayList<>(); + measurementSchemas2.add(new MeasurementSchema("s1", TSDataType.DOUBLE)); + measurementSchemas2.add(new MeasurementSchema("s2", TSDataType.DOUBLE)); + TsFileResource resource = createEmptyFileAndResource(seq); + resource.setStatusForTest(TsFileResourceStatus.COMPACTING); + try (TsFileWriter writer = new TsFileWriter(resource.getTsFile())) { + writer.registerAlignedTimeseries(new Path(device), measurementSchemas2); + for (long i = timeRange.getMin(); i <= timeRange.getMax(); i++) { + TSRecord record = new TSRecord(device, i); + record.addTuple(new DoubleDataPoint("s1", (double) i)); + record.addTuple(new DoubleDataPoint("s2", (double) i)); + writer.writeRecord(record); + } + writer.flush(); + } + resource.updateStartTime(device, timeRange.getMin()); + resource.updateEndTime(device, timeRange.getMax()); + resource.serialize(); + return resource; + } + + protected TsFileResource generateInt32NonAlignedSeriesFile(TimeRange timeRange, boolean seq) + throws IOException, WriteProcessException { + MeasurementSchema measurementSchema = new MeasurementSchema("s1", TSDataType.INT32); + TsFileResource resource = createEmptyFileAndResource(seq); + resource.setStatusForTest(TsFileResourceStatus.COMPACTING); + try (TsFileWriter writer = new TsFileWriter(resource.getTsFile())) { + writer.registerTimeseries(new Path(device), measurementSchema); + for (long i = timeRange.getMin(); i <= timeRange.getMax(); i++) { + TSRecord record = new TSRecord(device, i); + record.addTuple(new IntDataPoint("s1", (int) i)); + writer.writeRecord(record); + } + writer.flush(); + } + resource.updateStartTime(device, timeRange.getMin()); + resource.updateEndTime(device, timeRange.getMax()); + resource.serialize(); + return resource; + } + + protected TsFileResource generateFloatNonAlignedSeriesFile(TimeRange timeRange, boolean seq) + throws IOException, WriteProcessException { + MeasurementSchema measurementSchema = new MeasurementSchema("s1", TSDataType.FLOAT); + TsFileResource resource = createEmptyFileAndResource(seq); + resource.setStatusForTest(TsFileResourceStatus.COMPACTING); + try (TsFileWriter writer = new TsFileWriter(resource.getTsFile())) { + writer.registerTimeseries(new Path(device), measurementSchema); + for (long i = timeRange.getMin(); i <= timeRange.getMax(); i++) { + TSRecord record = new TSRecord(device, i); + record.addTuple(new FloatDataPoint("s1", (float) i)); + writer.writeRecord(record); + } + writer.flush(); + } + resource.updateStartTime(device, timeRange.getMin()); + resource.updateEndTime(device, timeRange.getMax()); + resource.serialize(); + return resource; + } + + protected TsFileResource generateDoubleNonAlignedSeriesFile(TimeRange timeRange, boolean seq) + throws IOException, WriteProcessException { + MeasurementSchema measurementSchema2 = new MeasurementSchema("s1", TSDataType.DOUBLE); + TsFileResource resource = createEmptyFileAndResource(seq); + resource.setStatusForTest(TsFileResourceStatus.COMPACTING); + try (TsFileWriter writer = new TsFileWriter(resource.getTsFile())) { + writer.registerTimeseries(new Path(device), measurementSchema2); + for (long i = timeRange.getMin(); i <= timeRange.getMax(); i++) { + TSRecord record = new TSRecord(device, i); + record.addTuple(new DoubleDataPoint("s1", (double) i)); + writer.writeRecord(record); + } + writer.flush(); + } + resource.updateStartTime(device, timeRange.getMin()); + resource.updateEndTime(device, timeRange.getMax()); + resource.serialize(); + return resource; + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/alterDataType/CompactionDataTypeAlterTableTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/alterDataType/CompactionDataTypeAlterTableTest.java new file mode 100644 index 0000000000000..24551efa8d2cf --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/alterDataType/CompactionDataTypeAlterTableTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.alterDataType; + +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.schema.table.TsTable; +import org.apache.iotdb.commons.schema.table.column.FieldColumnSchema; +import org.apache.iotdb.commons.schema.table.column.TagColumnSchema; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; + +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.tsfile.file.metadata.enums.CompressionType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.v4.ITsFileWriter; +import org.apache.tsfile.write.v4.TsFileWriterBuilder; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; + +@RunWith(Parameterized.class) +public class CompactionDataTypeAlterTableTest extends AbstractCompactionAlterDataTypeTest { + + private IDeviceID tableDevice = new StringArrayDeviceID("table1.d1"); + + private boolean reverse; + private String performerType; + + public CompactionDataTypeAlterTableTest(boolean reverse, String performerType) { + this.reverse = reverse; + this.performerType = performerType; + } + + @Parameterized.Parameters(name = "reverse={0} performerType={1}") + public static Collection data() { + return Arrays.asList( + new Object[][] { + {true, "read_chunk"}, + {false, "read_chunk"}, + {true, "fast"}, + {false, "fast"}, + {true, "read_point"}, + {false, "read_point"}, + }); + } + + @Before + @Override + public void setUp() + throws IOException, WriteProcessException, MetadataException, InterruptedException { + super.setUp(); + DataNodeTableCache.getInstance().invalid(COMPACTION_TEST_SG); + } + + @After + @Override + public void tearDown() throws IOException, StorageEngineException { + super.tearDown(); + DataNodeTableCache.getInstance().invalid(COMPACTION_TEST_SG); + } + + @Test + public void testAlter() throws IOException, WriteProcessException { + generateDataTypeNotMatchedFiles(true); + InnerSpaceCompactionTask task = + new InnerSpaceCompactionTask( + 0, tsFileManager, seqResources, true, getPerformer(performerType), 0); + Assert.assertTrue(task.start()); + TsFileResource target = tsFileManager.getTsFileList(true).get(0); + + TimeseriesMetadata timeseriesMetadata = getTimeseriesMetadata(target, tableDevice, "s1"); + Assert.assertEquals(1L, timeseriesMetadata.getStatistics().getStartTime()); + Assert.assertEquals(2L, timeseriesMetadata.getStatistics().getEndTime()); + } + + @Test + public void testCannotAlter() throws IOException, WriteProcessException { + generateDataTypeNotMatchedFiles(false); + InnerSpaceCompactionTask task = + new InnerSpaceCompactionTask( + 0, tsFileManager, seqResources, true, getPerformer(performerType), 0); + Assert.assertTrue(task.start()); + TsFileResource target = tsFileManager.getTsFileList(true).get(0); + TimeseriesMetadata timeseriesMetadata = getTimeseriesMetadata(target, tableDevice, "s1"); + if (!reverse) { + Assert.assertEquals(1L, timeseriesMetadata.getStatistics().getStartTime()); + Assert.assertEquals(1L, timeseriesMetadata.getStatistics().getEndTime()); + } else { + Assert.assertEquals(2L, timeseriesMetadata.getStatistics().getStartTime()); + Assert.assertEquals(2L, timeseriesMetadata.getStatistics().getEndTime()); + } + } + + private TimeseriesMetadata getTimeseriesMetadata( + TsFileResource resource, IDeviceID deviceID, String measurement) throws IOException { + try (TsFileSequenceReader reader = new TsFileSequenceReader(resource.getTsFilePath())) { + return reader.readTimeseriesMetadata(deviceID, measurement, true); + } + } + + private void generateDataTypeNotMatchedFiles(boolean canAlter) + throws IOException, WriteProcessException { + if (!reverse) { + TsFileResource resource1 = generateInt32TableFile(new TimeRange(1, 1), true); + seqResources.add(resource1); + TsFileResource resource2 = generateDoubleTableFile(new TimeRange(2, 2), true); + seqResources.add(resource2); + } else { + TsFileResource resource1 = generateDoubleTableFile(new TimeRange(1, 1), true); + seqResources.add(resource1); + TsFileResource resource2 = generateInt32TableFile(new TimeRange(2, 2), true); + seqResources.add(resource2); + } + if (canAlter) { + createTable(tableDevice.getTableName(), TSDataType.DOUBLE); + } else { + createTable(tableDevice.getTableName(), TSDataType.INT32); + } + } + + private void createTable(String tableName, TSDataType dataType) { + TsTable tsTable = new TsTable(tableName); + tsTable.addColumnSchema(new TagColumnSchema("id_column", TSDataType.STRING)); + tsTable.addColumnSchema( + new FieldColumnSchema("s1", dataType, TSEncoding.PLAIN, CompressionType.LZ4)); + DataNodeTableCache.getInstance().preUpdateTable(this.COMPACTION_TEST_SG, tsTable, null); + DataNodeTableCache.getInstance().commitUpdateTable(this.COMPACTION_TEST_SG, tableName, null); + } + + private TsFileResource generateInt32TableFile(TimeRange timeRange, boolean seq) + throws IOException, WriteProcessException { + TsFileResource resource = createEmptyFileAndResource(seq); + resource.setStatusForTest(TsFileResourceStatus.COMPACTING); + TableSchema tableSchema = + new TableSchema( + tableDevice.getTableName(), + Arrays.asList("tag1", "s1"), + Arrays.asList(TSDataType.STRING, TSDataType.INT32), + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD)); + try (ITsFileWriter writer = + new TsFileWriterBuilder().file(resource.getTsFile()).tableSchema(tableSchema).build()) { + Tablet tablet = + new Tablet( + "table1", + Arrays.asList("tag1", "s1"), + Arrays.asList(TSDataType.STRING, TSDataType.INT32), + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD)); + for (long i = timeRange.getMin(); i <= timeRange.getMax(); i++) { + if (tablet.getRowSize() == tablet.getMaxRowNumber()) { + writer.write(tablet); + tablet.reset(); + } + int row = tablet.getRowSize(); + tablet.addTimestamp(row, i); + tablet.addValue(row, 0, "d1"); + tablet.addValue(row, 1, 1); + } + if (tablet.getRowSize() > 0) { + writer.write(tablet); + } + } + resource.updateStartTime(tableDevice, timeRange.getMin()); + resource.updateEndTime(tableDevice, timeRange.getMax()); + resource.serialize(); + return resource; + } + + private TsFileResource generateDoubleTableFile(TimeRange timeRange, boolean seq) + throws IOException, WriteProcessException { + TsFileResource resource = createEmptyFileAndResource(seq); + resource.setStatusForTest(TsFileResourceStatus.COMPACTING); + TableSchema tableSchema = + new TableSchema( + tableDevice.getTableName(), + Arrays.asList("tag1", "s1"), + Arrays.asList(TSDataType.STRING, TSDataType.DOUBLE), + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD)); + try (ITsFileWriter writer = + new TsFileWriterBuilder().file(resource.getTsFile()).tableSchema(tableSchema).build()) { + Tablet tablet = + new Tablet( + "table1", + Arrays.asList("tag1", "s1"), + Arrays.asList(TSDataType.STRING, TSDataType.DOUBLE), + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD)); + for (long i = timeRange.getMin(); i <= timeRange.getMax(); i++) { + if (tablet.getRowSize() == tablet.getMaxRowNumber()) { + writer.write(tablet); + tablet.reset(); + } + int row = tablet.getRowSize(); + tablet.addTimestamp(row, i); + tablet.addValue(row, 0, "d1"); + tablet.addValue(row, 1, (double) 1); + } + if (tablet.getRowSize() > 0) { + writer.write(tablet); + } + } + resource.updateStartTime(tableDevice, timeRange.getMin()); + resource.updateEndTime(tableDevice, timeRange.getMax()); + resource.serialize(); + return resource; + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/alterDataType/CompactionDataTypeAlterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/alterDataType/CompactionDataTypeAlterTest.java new file mode 100644 index 0000000000000..d20e0307a329e --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/alterDataType/CompactionDataTypeAlterTest.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.alterDataType; + +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.path.MeasurementPath; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; +import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.read.TsFileReader; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.common.Path; +import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.read.expression.QueryExpression; +import org.apache.tsfile.read.query.dataset.QueryDataSet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +@RunWith(Parameterized.class) +public class CompactionDataTypeAlterTest extends AbstractCompactionAlterDataTypeTest { + private boolean reverse; + private String performerType; + + public CompactionDataTypeAlterTest(boolean reverse, String performerType) { + this.reverse = reverse; + this.performerType = performerType; + } + + @Parameterized.Parameters(name = "reverse={0} performerType={1}") + public static Collection data() { + return Arrays.asList( + new Object[][] { + {true, "read_chunk"}, + {false, "read_chunk"}, + {true, "fast"}, + {false, "fast"}, + {true, "read_point"}, + {false, "read_point"}, + }); + } + + @Before + public void setUp() + throws IOException, WriteProcessException, MetadataException, InterruptedException { + super.setUp(); + } + + @After + public void tearDown() throws IOException, StorageEngineException { + super.tearDown(); + } + + @Test + public void testCompactNonAlignedSeries() + throws IOException, WriteProcessException, IllegalPathException { + generateDataTypeNotMatchFilesWithNonAlignedSeries(); + InnerSpaceCompactionTask task = + new InnerSpaceCompactionTask( + 0, tsFileManager, seqResources, true, getPerformer(performerType), 0); + Assert.assertTrue(task.start()); + TsFileResourceUtils.validateTsFileDataCorrectness(tsFileManager.getTsFileList(true).get(0)); + Assert.assertEquals( + 1, ((long) tsFileManager.getTsFileList(true).get(0).getStartTime(device).get())); + Assert.assertEquals( + 2, ((long) tsFileManager.getTsFileList(true).get(0).getEndTime(device).get())); + TsFileResource tsFileResource = tsFileManager.getTsFileList(true).get(0); + try (TsFileSequenceReader reader = + new TsFileSequenceReader(tsFileResource.getTsFile().getAbsolutePath()); + TsFileReader readTsFile = new TsFileReader(reader)) { + ArrayList paths = new ArrayList<>(); + paths.add(new Path(device, "s1", true)); + QueryExpression queryExpression = QueryExpression.create(paths, null); + QueryDataSet queryDataSet = readTsFile.query(queryExpression); + Assert.assertTrue(queryDataSet.hasNext()); + Assert.assertEquals("1\t1.0", queryDataSet.next().toString()); + Assert.assertTrue(queryDataSet.hasNext()); + Assert.assertEquals("2\t2.0", queryDataSet.next().toString()); + Assert.assertFalse(queryDataSet.hasNext()); + } + } + + @Test + public void testCompactAlignedSeries() + throws IOException, WriteProcessException, IllegalPathException { + generateDataTypeNotMatchFilesWithAlignedSeries(); + InnerSpaceCompactionTask task = + new InnerSpaceCompactionTask( + 0, tsFileManager, seqResources, true, getPerformer(performerType), 0); + Assert.assertTrue(task.start()); + TsFileResourceUtils.validateTsFileDataCorrectness(tsFileManager.getTsFileList(true).get(0)); + Assert.assertEquals( + 1, ((long) tsFileManager.getTsFileList(true).get(0).getStartTime(device).get())); + Assert.assertEquals( + 2, ((long) tsFileManager.getTsFileList(true).get(0).getEndTime(device).get())); + TsFileResource tsFileResource = tsFileManager.getTsFileList(true).get(0); + try (TsFileSequenceReader reader = + new TsFileSequenceReader(tsFileResource.getTsFile().getAbsolutePath()); + TsFileReader readTsFile = new TsFileReader(reader)) { + ArrayList paths = new ArrayList<>(); + paths.add(new Path(device, "s1", true)); + paths.add(new Path(device, "s2", true)); + QueryExpression queryExpression = QueryExpression.create(paths, null); + QueryDataSet queryDataSet = readTsFile.query(queryExpression); + Assert.assertTrue(queryDataSet.hasNext()); + Assert.assertEquals("1\t1.0\t1.0", queryDataSet.next().toString()); + Assert.assertTrue(queryDataSet.hasNext()); + Assert.assertEquals("2\t2.0\t2.0", queryDataSet.next().toString()); + Assert.assertFalse(queryDataSet.hasNext()); + } + } + + private void generateDataTypeNotMatchFilesWithNonAlignedSeries() + throws IOException, WriteProcessException, IllegalPathException { + if (!reverse) { + TsFileResource resource1 = generateInt32NonAlignedSeriesFile(new TimeRange(1, 1), true); + seqResources.add(resource1); + + TsFileResource resource2 = generateDoubleNonAlignedSeriesFile(new TimeRange(2, 2), true); + seqResources.add(resource2); + } else { + TsFileResource resource1 = generateDoubleNonAlignedSeriesFile(new TimeRange(1, 1), true); + seqResources.add(resource1); + + TsFileResource resource2 = generateInt32NonAlignedSeriesFile(new TimeRange(2, 2), true); + seqResources.add(resource2); + } + + schemaFetcher + .getSchemaTree() + .appendSingleMeasurementPath( + new MeasurementPath(device, "s1", new MeasurementSchema("s1", TSDataType.DOUBLE))); + } + + private void generateDataTypeNotMatchFilesWithAlignedSeries() + throws IOException, WriteProcessException, IllegalPathException { + if (!reverse) { + TsFileResource resource1 = generateInt32AlignedSeriesFile(new TimeRange(1, 1), true); + seqResources.add(resource1); + TsFileResource resource2 = generateDoubleAlignedSeriesFile(new TimeRange(2, 2), true); + seqResources.add(resource2); + } else { + TsFileResource resource1 = generateDoubleAlignedSeriesFile(new TimeRange(1, 1), true); + seqResources.add(resource1); + TsFileResource resource2 = generateInt32AlignedSeriesFile(new TimeRange(2, 2), true); + seqResources.add(resource2); + } + + List measurementSchemas2 = new ArrayList<>(); + measurementSchemas2.add(new MeasurementSchema("s1", TSDataType.DOUBLE)); + measurementSchemas2.add(new MeasurementSchema("s2", TSDataType.DOUBLE)); + + MeasurementPath s1Path = new MeasurementPath(device, "s1", measurementSchemas2.get(0)); + s1Path.setUnderAlignedEntity(true); + MeasurementPath s2Path = new MeasurementPath(device, "s2", measurementSchemas2.get(1)); + s2Path.setUnderAlignedEntity(true); + schemaFetcher.getSchemaTree().appendSingleMeasurementPath(s1Path); + schemaFetcher.getSchemaTree().appendSingleMeasurementPath(s2Path); + } + + @Test + public void testAlterDataTypeWithAlignedSeriesWithTimeDeletion() + throws IOException, WriteProcessException, IllegalPathException { + List measurementSchemas1 = new ArrayList<>(); + measurementSchemas1.add(new MeasurementSchema("s1", TSDataType.INT32)); + measurementSchemas1.add(new MeasurementSchema("s2", TSDataType.INT32)); + + TsFileResource resource1 = generateInt32AlignedSeriesFile(new TimeRange(0, 100), true); + ModificationFile modFile = resource1.getExclusiveModFile(); + modFile.write(new TreeDeletionEntry(new MeasurementPath(device + ".*"), 0, 40)); + modFile.close(); + seqResources.add(resource1); + + List measurementSchemas2 = new ArrayList<>(); + measurementSchemas2.add(new MeasurementSchema("s1", TSDataType.DOUBLE)); + measurementSchemas2.add(new MeasurementSchema("s2", TSDataType.DOUBLE)); + TsFileResource resource2 = generateDoubleAlignedSeriesFile(new TimeRange(200, 200), true); + seqResources.add(resource2); + + MeasurementPath s1Path = new MeasurementPath(device, "s1", measurementSchemas2.get(0)); + s1Path.setUnderAlignedEntity(true); + MeasurementPath s2Path = new MeasurementPath(device, "s2", measurementSchemas2.get(1)); + s2Path.setUnderAlignedEntity(true); + schemaFetcher.getSchemaTree().appendSingleMeasurementPath(s1Path); + schemaFetcher.getSchemaTree().appendSingleMeasurementPath(s2Path); + + InnerSpaceCompactionTask task = + new InnerSpaceCompactionTask( + 0, tsFileManager, seqResources, true, new ReadChunkCompactionPerformer(), 0); + Assert.assertTrue(task.start()); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDataTypeNotMatchAlterableDataTypeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/alterDataType/CompactionDataTypeNotMatchAlterableDataTypeTest.java similarity index 77% rename from iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDataTypeNotMatchAlterableDataTypeTest.java rename to iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/alterDataType/CompactionDataTypeNotMatchAlterableDataTypeTest.java index 66d3c97e6c652..c7782a411c0a3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDataTypeNotMatchAlterableDataTypeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/alterDataType/CompactionDataTypeNotMatchAlterableDataTypeTest.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.storageengine.dataregion.compaction; +package org.apache.iotdb.db.storageengine.dataregion.compaction.alterDataType; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.db.exception.StorageEngineException; @@ -31,13 +31,12 @@ import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.exception.write.WriteProcessException; -import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.common.Path; +import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.write.TsFileWriter; import org.apache.tsfile.write.record.TSRecord; import org.apache.tsfile.write.record.datapoint.DoubleDataPoint; import org.apache.tsfile.write.record.datapoint.FloatDataPoint; -import org.apache.tsfile.write.record.datapoint.IntDataPoint; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.After; @@ -50,22 +49,18 @@ import java.util.List; @SuppressWarnings("OptionalGetWithoutIsPresent") -public class CompactionDataTypeNotMatchAlterableDataTypeTest extends AbstractCompactionTest { - private final String oldThreadName = Thread.currentThread().getName(); - private final IDeviceID device = - IDeviceID.Factory.DEFAULT_FACTORY.create(COMPACTION_TEST_SG + ".d1"); +public class CompactionDataTypeNotMatchAlterableDataTypeTest + extends AbstractCompactionAlterDataTypeTest { @Before public void setUp() throws IOException, WriteProcessException, MetadataException, InterruptedException { super.setUp(); - Thread.currentThread().setName("pool-1-IoTDB-Compaction-Worker-1"); } @After public void tearDown() throws IOException, StorageEngineException { super.tearDown(); - Thread.currentThread().setName(oldThreadName); } @Test @@ -149,33 +144,11 @@ public void testCompactAlignedSeriesWithReadPointCompactionPerformer() private void generateDataTypeNotMatchFilesWithNonAlignedSeries() throws IOException, WriteProcessException { MeasurementSchema measurementSchema1 = new MeasurementSchema("s1", TSDataType.INT32); - TsFileResource resource1 = createEmptyFileAndResource(true); - resource1.setStatusForTest(TsFileResourceStatus.COMPACTING); - try (TsFileWriter writer = new TsFileWriter(resource1.getTsFile())) { - writer.registerTimeseries(new Path(device), measurementSchema1); - TSRecord record = new TSRecord(device, 1); - record.addTuple(new IntDataPoint("s1", 1)); - writer.writeRecord(record); - writer.flush(); - } - resource1.updateStartTime(device, 1); - resource1.updateEndTime(device, 1); - resource1.serialize(); + TsFileResource resource1 = generateInt32NonAlignedSeriesFile(new TimeRange(1, 1), true); seqResources.add(resource1); MeasurementSchema measurementSchema2 = new MeasurementSchema("s1", TSDataType.FLOAT); - TsFileResource resource2 = createEmptyFileAndResource(true); - resource2.setStatusForTest(TsFileResourceStatus.COMPACTING); - try (TsFileWriter writer = new TsFileWriter(resource2.getTsFile())) { - writer.registerTimeseries(new Path(device), measurementSchema2); - TSRecord record = new TSRecord(device, 2); - record.addTuple(new FloatDataPoint("s1", 2)); - writer.writeRecord(record); - writer.flush(); - } - resource2.updateStartTime(device, 2); - resource2.updateEndTime(device, 2); - resource2.serialize(); + TsFileResource resource2 = generateFloatNonAlignedSeriesFile(new TimeRange(2, 2), true); seqResources.add(resource2); } @@ -185,19 +158,7 @@ private void generateDataTypeNotMatchFilesWithAlignedSeries() measurementSchemas1.add(new MeasurementSchema("s1", TSDataType.INT32)); measurementSchemas1.add(new MeasurementSchema("s2", TSDataType.INT32)); - TsFileResource resource1 = createEmptyFileAndResource(true); - resource1.setStatusForTest(TsFileResourceStatus.COMPACTING); - try (TsFileWriter writer = new TsFileWriter(resource1.getTsFile())) { - writer.registerAlignedTimeseries(new Path(device), measurementSchemas1); - TSRecord record = new TSRecord(device, 1); - record.addTuple(new IntDataPoint("s1", 0)); - record.addTuple(new IntDataPoint("s2", 1)); - writer.writeRecord(record); - writer.flush(); - } - resource1.updateStartTime(device, 1); - resource1.updateEndTime(device, 1); - resource1.serialize(); + TsFileResource resource1 = generateInt32AlignedSeriesFile(new TimeRange(1, 1), true); seqResources.add(resource1); List measurementSchemas2 = new ArrayList<>(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDataTypeNotMatchTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/alterDataType/CompactionDataTypeNotMatchTest.java similarity index 57% rename from iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDataTypeNotMatchTest.java rename to iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/alterDataType/CompactionDataTypeNotMatchTest.java index ff12a13c6dac2..3b4dcacb8fa7d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDataTypeNotMatchTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/alterDataType/CompactionDataTypeNotMatchTest.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.storageengine.dataregion.compaction; +package org.apache.iotdb.db.storageengine.dataregion.compaction.alterDataType; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.db.exception.StorageEngineException; @@ -33,73 +33,62 @@ import org.apache.tsfile.exception.write.WriteProcessException; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.common.Path; +import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.write.TsFileWriter; import org.apache.tsfile.write.record.TSRecord; import org.apache.tsfile.write.record.datapoint.BooleanDataPoint; -import org.apache.tsfile.write.record.datapoint.IntDataPoint; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; @SuppressWarnings("OptionalGetWithoutIsPresent") -public class CompactionDataTypeNotMatchTest extends AbstractCompactionTest { - private final String oldThreadName = Thread.currentThread().getName(); +@RunWith(Parameterized.class) +public class CompactionDataTypeNotMatchTest extends AbstractCompactionAlterDataTypeTest { private final IDeviceID device = IDeviceID.Factory.DEFAULT_FACTORY.create(COMPACTION_TEST_SG + ".d1"); + private String performerType; @Before public void setUp() throws IOException, WriteProcessException, MetadataException, InterruptedException { super.setUp(); - Thread.currentThread().setName("pool-1-IoTDB-Compaction-Worker-1"); } @After public void tearDown() throws IOException, StorageEngineException { super.tearDown(); - Thread.currentThread().setName(oldThreadName); } - @Test - public void testCompactNonAlignedSeriesWithReadChunkCompactionPerformer() - throws IOException, WriteProcessException { - generateDataTypeNotMatchFilesWithNonAlignedSeries(); - InnerSpaceCompactionTask task = - new InnerSpaceCompactionTask( - 0, tsFileManager, seqResources, true, new ReadChunkCompactionPerformer(), 0); - Assert.assertTrue(task.start()); - TsFileResourceUtils.validateTsFileDataCorrectness(tsFileManager.getTsFileList(true).get(0)); - Assert.assertEquals( - 2, ((long) tsFileManager.getTsFileList(true).get(0).getStartTime(device).get())); + @Parameterized.Parameters(name = "type={0}") + public static Collection data() { + return Arrays.asList( + new Object[][] { + {"read_chunk"}, {"fast"}, {"read_point"}, + }); } - @Test - public void testCompactNonAlignedSeriesWithFastCompactionPerformer() - throws IOException, WriteProcessException { - generateDataTypeNotMatchFilesWithNonAlignedSeries(); - InnerSpaceCompactionTask task = - new InnerSpaceCompactionTask( - 0, tsFileManager, seqResources, true, new FastCompactionPerformer(false), 0); - Assert.assertTrue(task.start()); - TsFileResourceUtils.validateTsFileDataCorrectness(tsFileManager.getTsFileList(true).get(0)); - Assert.assertEquals( - 2, ((long) tsFileManager.getTsFileList(true).get(0).getStartTime(device).get())); + public CompactionDataTypeNotMatchTest(String type) { + this.performerType = type; } @Test - public void testCompactNonAlignedSeriesWithReadPointCompactionPerformer() + public void testCompactNonAlignedSeries() throws IOException, WriteProcessException { generateDataTypeNotMatchFilesWithNonAlignedSeries(); InnerSpaceCompactionTask task = new InnerSpaceCompactionTask( - 0, tsFileManager, seqResources, true, new ReadPointCompactionPerformer(), 0); + 0, tsFileManager, seqResources, true, getPerformer(performerType), 0); Assert.assertTrue(task.start()); TsFileResourceUtils.validateTsFileDataCorrectness(tsFileManager.getTsFileList(true).get(0)); Assert.assertEquals( @@ -107,43 +96,18 @@ public void testCompactNonAlignedSeriesWithReadPointCompactionPerformer() } @Test - public void testCompactAlignedSeriesWithReadChunkCompactionPerformer() + public void testCompactAlignedSeries() throws IOException, WriteProcessException { generateDataTypeNotMatchFilesWithAlignedSeries(); InnerSpaceCompactionTask task = new InnerSpaceCompactionTask( - 0, tsFileManager, seqResources, true, new ReadChunkCompactionPerformer(), 0); + 0, tsFileManager, seqResources, true, getPerformer(performerType), 0); Assert.assertTrue(task.start()); TsFileResourceUtils.validateTsFileDataCorrectness(tsFileManager.getTsFileList(true).get(0)); Assert.assertEquals( 2, ((long) tsFileManager.getTsFileList(true).get(0).getStartTime(device).get())); } - @Test - public void testCompactAlignedSeriesWithFastCompactionPerformer() - throws IOException, WriteProcessException { - generateDataTypeNotMatchFilesWithAlignedSeries(); - InnerSpaceCompactionTask task = - new InnerSpaceCompactionTask( - 0, tsFileManager, seqResources, true, new FastCompactionPerformer(false), 0); - Assert.assertTrue(task.start()); - TsFileResourceUtils.validateTsFileDataCorrectness(tsFileManager.getTsFileList(true).get(0)); - Assert.assertEquals( - 2, ((long) tsFileManager.getTsFileList(true).get(0).getStartTime(device).get())); - } - - @Test - public void testCompactAlignedSeriesWithReadPointCompactionPerformer() - throws IOException, WriteProcessException { - generateDataTypeNotMatchFilesWithAlignedSeries(); - InnerSpaceCompactionTask task = - new InnerSpaceCompactionTask( - 0, tsFileManager, seqResources, true, new ReadPointCompactionPerformer(), 0); - Assert.assertTrue(task.start()); - TsFileResourceUtils.validateTsFileDataCorrectness(tsFileManager.getTsFileList(true).get(0)); - Assert.assertEquals( - 2, ((long) tsFileManager.getTsFileList(true).get(0).getStartTime(device).get())); - } private void generateDataTypeNotMatchFilesWithNonAlignedSeries() throws IOException, WriteProcessException { @@ -163,18 +127,7 @@ private void generateDataTypeNotMatchFilesWithNonAlignedSeries() seqResources.add(resource1); MeasurementSchema measurementSchema2 = new MeasurementSchema("s1", TSDataType.INT32); - TsFileResource resource2 = createEmptyFileAndResource(true); - resource2.setStatusForTest(TsFileResourceStatus.COMPACTING); - try (TsFileWriter writer = new TsFileWriter(resource2.getTsFile())) { - writer.registerTimeseries(new Path(device), measurementSchema2); - TSRecord record = new TSRecord(device, 2); - record.addTuple(new IntDataPoint("s1", 10)); - writer.writeRecord(record); - writer.flush(); - } - resource2.updateStartTime(device, 2); - resource2.updateEndTime(device, 2); - resource2.serialize(); + TsFileResource resource2 = generateInt32NonAlignedSeriesFile(new TimeRange(2, 2), true); seqResources.add(resource2); } @@ -184,19 +137,7 @@ private void generateDataTypeNotMatchFilesWithAlignedSeries() measurementSchemas1.add(new MeasurementSchema("s1", TSDataType.INT32)); measurementSchemas1.add(new MeasurementSchema("s2", TSDataType.INT32)); - TsFileResource resource1 = createEmptyFileAndResource(true); - resource1.setStatusForTest(TsFileResourceStatus.COMPACTING); - try (TsFileWriter writer = new TsFileWriter(resource1.getTsFile())) { - writer.registerAlignedTimeseries(new Path(device), measurementSchemas1); - TSRecord record = new TSRecord(device, 1); - record.addTuple(new IntDataPoint("s1", 0)); - record.addTuple(new IntDataPoint("s2", 1)); - writer.writeRecord(record); - writer.flush(); - } - resource1.updateStartTime(device, 1); - resource1.updateEndTime(device, 1); - resource1.serialize(); + TsFileResource resource1 = generateInt32AlignedSeriesFile(new TimeRange(1, 1), true); seqResources.add(resource1); List measurementSchemas2 = new ArrayList<>(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelCompactionWithTTLTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelCompactionWithTTLTest.java index ef9e2fb796e21..6746f0a274697 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelCompactionWithTTLTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelCompactionWithTTLTest.java @@ -26,12 +26,6 @@ import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; import org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerSeqCompactionPerformer; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerUnseqCompactionPerformer; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -86,16 +80,6 @@ public static Collection data() { }); } - public ICompactionPerformer getPerformer() { - if (performerType.equalsIgnoreCase(InnerSeqCompactionPerformer.READ_CHUNK.toString())) { - return new ReadChunkCompactionPerformer(); - } else if (performerType.equalsIgnoreCase(InnerUnseqCompactionPerformer.FAST.toString())) { - return new FastCompactionPerformer(false); - } else { - return new ReadPointCompactionPerformer(); - } - } - @Test public void testAllDataExpired() throws IOException { createTable("t1", 1); @@ -115,7 +99,8 @@ public void testAllDataExpired() throws IOException { } seqResources.add(resource1); InnerSpaceCompactionTask task = - new InnerSpaceCompactionTask(0, tsFileManager, seqResources, true, getPerformer(), 0); + new InnerSpaceCompactionTask( + 0, tsFileManager, seqResources, true, getPerformer(performerType), 0); Assert.assertTrue(task.start()); Assert.assertEquals(0, tsFileManager.getTsFileList(true).size()); } @@ -143,7 +128,8 @@ public void testPartialDataExpired() throws IOException { } seqResources.add(resource1); InnerSpaceCompactionTask task = - new InnerSpaceCompactionTask(0, tsFileManager, seqResources, true, getPerformer(), 0); + new InnerSpaceCompactionTask( + 0, tsFileManager, seqResources, true, getPerformer(performerType), 0); Assert.assertTrue(task.start()); TsFileResource target = tsFileManager.getTsFileList(true).get(0); Assert.assertTrue(target.getFileStartTime() > startTime && target.getFileEndTime() == endTime); @@ -171,7 +157,8 @@ public void testTableNotExist() throws IOException { } seqResources.add(resource1); InnerSpaceCompactionTask task = - new InnerSpaceCompactionTask(0, tsFileManager, seqResources, true, getPerformer(), 0); + new InnerSpaceCompactionTask( + 0, tsFileManager, seqResources, true, getPerformer(performerType), 0); Assert.assertTrue(task.start()); TsFileResource target = tsFileManager.getTsFileList(true).get(0); Assert.assertTrue(target.getFileStartTime() == startTime && target.getFileEndTime() == endTime); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionFakeSchemaFetcherImpl.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionFakeSchemaFetcherImpl.java new file mode 100644 index 0000000000000..173c1519897f2 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionFakeSchemaFetcherImpl.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.utils; + +import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree; +import org.apache.iotdb.db.queryengine.common.schematree.node.SchemaInternalNode; +import org.apache.iotdb.db.queryengine.common.schematree.node.SchemaNode; +import org.apache.iotdb.db.queryengine.plan.analyze.FakeSchemaFetcherImpl; + +public class CompactionFakeSchemaFetcherImpl extends FakeSchemaFetcherImpl { + + @Override + protected SchemaNode generateSchemaTree() { + return new SchemaInternalNode("root"); + } + + public ClusterSchemaTree getSchemaTree() { + return schemaTree; + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java index 8f484f4e231d5..48cd81b39e142 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java @@ -70,7 +70,6 @@ public class TsTable { private final Map columnSchemaMap = new LinkedHashMap<>(); private final Map tagColumnIndexMap = new HashMap<>(); - private final Map idColumnIndexMap = new HashMap<>(); private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); @@ -108,7 +107,7 @@ public TsTable(String tableName, ImmutableList columnSchema public TsTable(TsTable origin) { this.tableName = origin.tableName; origin.columnSchemaMap.forEach((col, schema) -> this.columnSchemaMap.put(col, schema.copy())); - this.idColumnIndexMap.putAll(origin.idColumnIndexMap); + this.tagColumnIndexMap.putAll(origin.tagColumnIndexMap); this.props = origin.props == null ? null : new HashMap<>(origin.props); this.ttlValue = origin.ttlValue; this.tagNums = origin.tagNums;