Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@

import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.encrypt.EncryptParameter;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.StopReadTsFileByInterruptException;
import org.apache.tsfile.exception.write.PageException;
import org.apache.tsfile.file.metadata.IDeviceID;
Expand Down Expand Up @@ -281,21 +280,9 @@ private void compactNonAlignedSeries(
// timeseries metadata, in order to facilitate the reading of chunkMetadata directly by this
// offset later. Here we don't need to deserialize chunk metadata, we can deserialize them and
// get their schema later.
Map<String, Map<TsFileResource, Pair<Long, Long>>> timeseriesMetadataOffsetMap =
new LinkedHashMap<>();

Map<String, TSDataType> measurementDataTypeMap = new LinkedHashMap<>();

Map<String, CompactionSeriesContext> compactionSeriesContextMap =
deviceIterator.getCompactionSeriesContextOfCurrentDevice();

for (Map.Entry<String, CompactionSeriesContext> entry : compactionSeriesContextMap.entrySet()) {
timeseriesMetadataOffsetMap.put(
entry.getKey(), entry.getValue().getFileTimeseriesMetdataOffsetMap());
measurementDataTypeMap.put(entry.getKey(), entry.getValue().getFinalType());
}

List<String> allMeasurements = new ArrayList<>(timeseriesMetadataOffsetMap.keySet());
List<String> allMeasurements = new ArrayList<>(compactionSeriesContextMap.keySet());
allMeasurements.sort((String::compareTo));

int subTaskNums = Math.min(allMeasurements.size(), SUB_TASK_NUM);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ISeqCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionTableSchemaCollector;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.MultiTsFileDeviceIterator;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.BatchedReadChunkAlignedSeriesCompactionExecutor;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.SingleSeriesCompactionExecutor;
Expand All @@ -48,10 +49,12 @@
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.Schema;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -147,7 +150,11 @@ public void perform()

if (aligned) {
compactAlignedSeries(
device, targetResources.get(currentTargetFileIndex), currentWriter, deviceIterator);
deviceIterator.getDatabaseName(),
device,
targetResources.get(currentTargetFileIndex),
currentWriter,
deviceIterator);
} else {
compactNotAlignedSeries(
device, targetResources.get(currentTargetFileIndex), currentWriter, deviceIterator);
Expand Down Expand Up @@ -224,6 +231,7 @@ public void setSummary(CompactionTaskSummary summary) {
}

private void compactAlignedSeries(
String database,
IDeviceID device,
TsFileResource targetResource,
CompactionTsFileWriter writer,
Expand All @@ -239,6 +247,7 @@ private void compactAlignedSeries(
writer.startChunkGroup(device);
BatchedReadChunkAlignedSeriesCompactionExecutor compactionExecutor =
new BatchedReadChunkAlignedSeriesCompactionExecutor(
database,
device,
targetResource,
readerAndChunkMetadataList,
Expand Down Expand Up @@ -293,7 +302,7 @@ private void compactNotAlignedSeries(
seriesIterator.getMetadataListForCurrentSeries();
// remove the chunk metadata whose data type not match the data type of last chunk
readerAndChunkMetadataList =
filterDataTypeNotMatchedChunkMetadata(readerAndChunkMetadataList);
filterDataTypeNotMatchedChunkMetadata(device, measurement, readerAndChunkMetadataList);
SingleSeriesCompactionExecutor compactionExecutorOfCurrentTimeSeries =
new SingleSeriesCompactionExecutor(
device, measurement, readerAndChunkMetadataList, writer, targetResource, summary);
Expand All @@ -304,42 +313,63 @@ private void compactNotAlignedSeries(

private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
filterDataTypeNotMatchedChunkMetadata(
IDeviceID deviceID,
String measurement,
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList) {
if (readerAndChunkMetadataList.isEmpty()) {
return readerAndChunkMetadataList;
}
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> result = new LinkedList<>();
// find correct data type
TSDataType correctDataType = null;
for (int i = readerAndChunkMetadataList.size() - 1; i >= 0 && correctDataType == null; i--) {
List<ChunkMetadata> chunkMetadataList = readerAndChunkMetadataList.get(i).getRight();
if (chunkMetadataList == null || chunkMetadataList.isEmpty()) {
continue;
}
boolean hasDifferentDataTypes = false;
Iterator<Pair<TsFileSequenceReader, List<ChunkMetadata>>> descIterator =
readerAndChunkMetadataList.descendingIterator();
while (descIterator.hasNext()) {
Pair<TsFileSequenceReader, List<ChunkMetadata>> pair = descIterator.next();
List<ChunkMetadata> chunkMetadataList = pair.right;
TSDataType dataTypeInCurrentFile = null;
for (ChunkMetadata chunkMetadata : chunkMetadataList) {
if (chunkMetadata == null) {
continue;
if (chunkMetadata != null) {
dataTypeInCurrentFile = chunkMetadata.getDataType();
break;
}
correctDataType = chunkMetadata.getDataType();
}
if (correctDataType == null) {
correctDataType = dataTypeInCurrentFile;
} else if (correctDataType != dataTypeInCurrentFile) {
hasDifferentDataTypes = true;
break;
}
}
if (correctDataType == null) {
if (!hasDifferentDataTypes) {
return readerAndChunkMetadataList;
}

IMeasurementSchema schema =
CompactionUtils.getLatestMeasurementSchemasForTreeModel(
deviceID, Collections.singletonList(measurement))
.get(0);
if (schema != null) {
correctDataType = schema.getType();
}

LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> result = new LinkedList<>();
// check data type consistent and skip compact files with wrong data type
for (Pair<TsFileSequenceReader, List<ChunkMetadata>> tsFileSequenceReaderListPair :
readerAndChunkMetadataList) {
boolean dataTypeConsistent = true;
for (ChunkMetadata chunkMetadata : tsFileSequenceReaderListPair.getRight()) {
if (chunkMetadata != null
&& !MetadataUtils.canAlter(chunkMetadata.getDataType(), correctDataType)) {
dataTypeConsistent = false;
if (chunkMetadata == null) {
continue;
}
if (chunkMetadata.getDataType() == correctDataType) {
break;
}
if (chunkMetadata != null && chunkMetadata.getDataType() != correctDataType) {
chunkMetadata.setNewType(correctDataType);
if (!MetadataUtils.canAlter(chunkMetadata.getDataType(), correctDataType)) {
dataTypeConsistent = false;
break;
}
chunkMetadata.setNewType(correctDataType);
Comment on lines 322 to +372
}
if (!dataTypeConsistent) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
public class CompactionSeriesContext {
Map<TsFileResource, Pair<Long, Long>> fileTimeseriesMetdataOffsetMap;
TSDataType finalType;
boolean needUpdateDataType = false;

public CompactionSeriesContext() {
fileTimeseriesMetdataOffsetMap = new HashMap<>();
Expand Down Expand Up @@ -57,4 +58,16 @@ public void setFinalTypeIfAbsent(TSDataType finalType) {
this.finalType = finalType;
}
}

// Unconditionally overwrites the resolved data type for this series — unlike
// setFinalTypeIfAbsent above, which only assigns when no type has been set yet.
public void setFinalType(TSDataType finalType) {
this.finalType = finalType;
}

// Returns the needUpdateDataType flag (declared with default false).
// NOTE(review): presumably signals that chunk data types must be rewritten to
// finalType during compaction — confirm against callers; not visible here.
public boolean isNeedUpdateDataType() {
return needUpdateDataType;
}

// Plain setter for the needUpdateDataType flag; see isNeedUpdateDataType.
public void setNeedUpdateDataType(boolean needUpdateDataType) {
this.needUpdateDataType = needUpdateDataType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,20 @@
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.db.service.metrics.FileMetrics;
Expand All @@ -56,6 +64,7 @@
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.fileSystem.FSFactoryProducer;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -85,8 +94,15 @@ public class CompactionUtils {
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
private static final String SYSTEM = "system";

private static ISchemaFetcher schemaFetcherForTest = null;

private CompactionUtils() {}

// Injects a replacement schema fetcher for unit tests. While non-null, it takes
// precedence over ClusterSchemaFetcher.getInstance() in
// getLatestMeasurementSchemasForTreeModel; pass null to restore the default.
@TestOnly
public static void setSchemaFetcher(ISchemaFetcher schemaFetcher) {
CompactionUtils.schemaFetcherForTest = schemaFetcher;
}

/**
* Update the targetResource. Move tmp target file to target file and serialize
* xxx.tsfile.resource.
Expand Down Expand Up @@ -640,4 +656,34 @@ private static void acquireCompactionReadRate(long size) {
}
rateLimiter.acquire((int) size);
}

/**
 * Fetches the latest measurement schemas for the given tree-model measurements under one
 * device, by building a path-pattern tree and querying the schema fetcher.
 *
 * @param deviceID device whose measurements are being resolved
 * @param measurements measurement names to look up; may be empty
 * @return one {@link IMeasurementSchema} per requested measurement, in the same order; an
 *     empty list when {@code measurements} is empty; a list of {@code null}s (same size as
 *     {@code measurements}) when the device has no schema info
 * @throws RuntimeException if a device/measurement path cannot be parsed
 */
public static List<IMeasurementSchema> getLatestMeasurementSchemasForTreeModel(
IDeviceID deviceID, List<String> measurements) {
if (measurements.isEmpty()) {
return Collections.emptyList();
}
// Test hook: a fetcher injected via setSchemaFetcher takes precedence.
final ISchemaFetcher fetcher;
if (schemaFetcherForTest != null) {
fetcher = schemaFetcherForTest;
} else {
fetcher = ClusterSchemaFetcher.getInstance();
}
final PartialPath devicePath;
final PathPatternTree patterns = new PathPatternTree();
try {
devicePath = CompactionPathUtils.getPath(deviceID);
for (String measurementName : measurements) {
patterns.appendFullPath(devicePath, measurementName);
}
} catch (IllegalPathException e) {
throw new RuntimeException(e);
}
ISchemaTree fetchedTree =
fetcher.fetchRawSchemaInMeasurementLevel(
patterns,
SchemaConstant.ALL_MATCH_SCOPE,
new MPPQueryContext(new QueryId("compaction")),
true);
DeviceSchemaInfo info = fetchedTree.searchDeviceSchemaInfo(devicePath, measurements);
if (info == null) {
// Keep positional alignment with the requested measurement list.
return Collections.nCopies(measurements.size(), null);
}
return info.getMeasurementSchemaList();
}
}
Loading
Loading