Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,8 @@ public class IoTDBConfig {
private long loadTsFileAnalyzeSchemaMemorySizeInBytes =
0L; // 0 means that the decision will be adaptive based on the number of sequences

private long loadTsFileTabletConversionBatchMemorySizeInBytes = 4096 * 1024;

private int loadTsFileMaxDeviceCountToUseDeviceTimeIndex = 10000;

private long loadChunkMetadataMemorySizeInBytes = 33554432; // 32MB
Expand Down Expand Up @@ -3770,6 +3772,16 @@ public void setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
this.loadTsFileAnalyzeSchemaMemorySizeInBytes = loadTsFileAnalyzeSchemaMemorySizeInBytes;
}

public long getLoadTsFileTabletConversionBatchMemorySizeInBytes() {
return loadTsFileTabletConversionBatchMemorySizeInBytes;
}

public void setLoadTsFileTabletConversionBatchMemorySizeInBytes(
long loadTsFileTabletConversionBatchMemorySizeInBytes) {
this.loadTsFileTabletConversionBatchMemorySizeInBytes =
loadTsFileTabletConversionBatchMemorySizeInBytes;
}

public int getLoadTsFileMaxDeviceCountToUseDeviceTimeIndex() {
return loadTsFileMaxDeviceCountToUseDeviceTimeIndex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2172,6 +2172,11 @@ private void loadLoadTsFileProps(TrimProperties properties) throws IOException {
properties.getProperty(
"load_tsfile_analyze_schema_memory_size_in_bytes",
String.valueOf(conf.getLoadTsFileAnalyzeSchemaMemorySizeInBytes()))));
conf.setLoadTsFileTabletConversionBatchMemorySizeInBytes(
Long.parseLong(
properties.getProperty(
"load_tsfile_tablet_conversion_batch_memory_size_in_bytes",
String.valueOf(conf.getLoadTsFileTabletConversionBatchMemorySizeInBytes()))));
conf.setLoadTsFileMaxDeviceCountToUseDeviceTimeIndex(
Integer.parseInt(
properties.getProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileAnalyzeSchemaMemoryBlock;
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryBlock;
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
import org.apache.iotdb.db.utils.ModificationUtils;

Expand Down Expand Up @@ -79,7 +79,7 @@ public class LoadTsFileTableSchemaCache {
: CONFIG.getLoadTsFileAnalyzeSchemaMemorySizeInBytes();
}

private final LoadTsFileAnalyzeSchemaMemoryBlock block;
private final LoadTsFileMemoryBlock block;

private String database;
private final Metadata metadata;
Expand All @@ -104,7 +104,7 @@ public LoadTsFileTableSchemaCache(Metadata metadata, MPPQueryContext context)
throws LoadRuntimeOutOfMemoryException {
this.block =
LoadTsFileMemoryManager.getInstance()
.allocateAnalyzeSchemaMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES);
.allocateMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES);
this.metadata = metadata;
this.context = context;
this.currentBatchTable2Devices = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileAnalyzeSchemaMemoryBlock;
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryBlock;
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
import org.apache.iotdb.db.utils.ModificationUtils;

Expand Down Expand Up @@ -70,7 +70,7 @@ public class LoadTsFileTreeSchemaCache {
FLUSH_ALIGNED_CACHE_MEMORY_SIZE_IN_BYTES = ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES >> 1;
}

private final LoadTsFileAnalyzeSchemaMemoryBlock block;
private final LoadTsFileMemoryBlock block;

private Map<IDeviceID, Set<MeasurementSchema>> currentBatchDevice2TimeSeriesSchemas;
private Map<IDeviceID, Boolean> tsFileDevice2IsAligned;
Expand All @@ -90,7 +90,7 @@ public class LoadTsFileTreeSchemaCache {
public LoadTsFileTreeSchemaCache() throws LoadRuntimeOutOfMemoryException {
this.block =
LoadTsFileMemoryManager.getInstance()
.allocateAnalyzeSchemaMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES);
.allocateMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES);
this.currentBatchDevice2TimeSeriesSchemas = new HashMap<>();
this.tsFileDevice2IsAligned = new HashMap<>();
this.alreadySetDatabases = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.rpc.TSStatusCode;

Expand All @@ -45,6 +46,12 @@ public TSStatus visitInsertTablet(
return visitInsertBase(insertTabletStatement, context);
}

@Override
public TSStatus visitInsertMultiTablets(
final InsertMultiTabletsStatement insertMultiTabletsStatement, final TSStatus context) {
return visitInsertBase(insertMultiTabletsStatement, context);
}

private TSStatus visitInsertBase(
final InsertBaseStatement insertBaseStatement, final TSStatus context) {
if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public Optional<TSStatus> visitLoadTsFile(

LOGGER.info("Start data type conversion for LoadTsFileStatement: {}.", loadTsFileStatement);

// TODO: Use batch insert after Table model supports insertMultiTablets
for (final File file : loadTsFileStatement.getTsFiles()) {
try (final TsFileInsertionEventTableParser parser =
new TsFileInsertionEventTableParser(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,19 @@

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryBlock;
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.commons.io.FileUtils;
Expand All @@ -39,16 +43,25 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil.calculateTabletSizeInBytes;

public class LoadTreeStatementDataTypeConvertExecutionVisitor
extends StatementVisitor<Optional<TSStatus>, Void> {

private final StatementExecutor statementExecutor;

private static final Logger LOGGER =
LoggerFactory.getLogger(LoadTreeStatementDataTypeConvertExecutionVisitor.class);

private static final long TABLET_BATCH_MEMORY_SIZE_IN_BYTES =
IoTDBDescriptor.getInstance()
.getConfig()
.getLoadTsFileTabletConversionBatchMemorySizeInBytes();

private final StatementExecutor statementExecutor;

@FunctionalInterface
public interface StatementExecutor {
TSStatus execute(final Statement statement);
Expand All @@ -70,60 +83,82 @@ public Optional<TSStatus> visitLoadFile(

LOGGER.info("Start data type conversion for LoadTsFileStatement: {}", loadTsFileStatement);

for (final File file : loadTsFileStatement.getTsFiles()) {
try (final TsFileInsertionEventScanParser parser =
new TsFileInsertionEventScanParser(
file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
for (final Pair<Tablet, Boolean> tabletWithIsAligned : parser.toTabletWithIsAligneds()) {
final LoadConvertedInsertTabletStatement statement =
new LoadConvertedInsertTabletStatement(
PipeTransferTabletRawReq.toTPipeTransferRawReq(
tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight())
.constructStatement(),
loadTsFileStatement.isConvertOnTypeMismatch());

TSStatus result;
try {
result =
statement.accept(
LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
statementExecutor.execute(statement));

// Retry max 5 times if the write process is rejected
for (int i = 0;
i < 5
&& result.getCode()
== TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode();
i++) {
Thread.sleep(100L * (i + 1));
result =
statement.accept(
LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
statementExecutor.execute(statement));
final LoadTsFileMemoryBlock block =
LoadTsFileMemoryManager.getInstance()
.allocateMemoryBlock(TABLET_BATCH_MEMORY_SIZE_IN_BYTES);
final List<PipeTransferTabletRawReq> tabletRawReqs = new ArrayList<>();
final List<Long> tabletRawReqSizes = new ArrayList<>();

try {
for (final File file : loadTsFileStatement.getTsFiles()) {
try (final TsFileInsertionEventScanParser parser =
new TsFileInsertionEventScanParser(
file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
for (final Pair<Tablet, Boolean> tabletWithIsAligned : parser.toTabletWithIsAligneds()) {
final PipeTransferTabletRawReq tabletRawReq =
PipeTransferTabletRawReq.toTPipeTransferRawReq(
tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight());
final long curMemory = calculateTabletSizeInBytes(tabletWithIsAligned.getLeft()) + 1;
if (block.hasEnoughMemory(curMemory)) {
tabletRawReqs.add(tabletRawReq);
tabletRawReqSizes.add(curMemory);
block.addMemoryUsage(curMemory);
continue;
}
} catch (final Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();

final TSStatus result =
executeInsertMultiTabletsWithRetry(
tabletRawReqs, loadTsFileStatement.isConvertOnTypeMismatch());

for (final long memoryCost : tabletRawReqSizes) {
block.reduceMemoryUsage(memoryCost);
}
tabletRawReqs.clear();
tabletRawReqSizes.clear();

if (!handleTSStatus(result, loadTsFileStatement)) {
return Optional.empty();
}
result = statement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e);

tabletRawReqs.add(tabletRawReq);
tabletRawReqSizes.add(curMemory);
block.addMemoryUsage(curMemory);
}
} catch (final Exception e) {
LOGGER.warn(
"Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e);
return Optional.empty();
}
}

if (!tabletRawReqs.isEmpty()) {
try {
final TSStatus result =
executeInsertMultiTabletsWithRetry(
tabletRawReqs, loadTsFileStatement.isConvertOnTypeMismatch());

for (final long memoryCost : tabletRawReqSizes) {
block.reduceMemoryUsage(memoryCost);
}
tabletRawReqs.clear();
tabletRawReqSizes.clear();

if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
|| result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
|| result.getCode()
== TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) {
LOGGER.warn(
"Failed to convert data type for LoadTsFileStatement: {}, status code is {}.",
loadTsFileStatement,
result.getCode());
if (!handleTSStatus(result, loadTsFileStatement)) {
return Optional.empty();
}
} catch (final Exception e) {
LOGGER.warn(
"Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e);
return Optional.empty();
}
} catch (final Exception e) {
LOGGER.warn(
"Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e);
return Optional.empty();
}
} finally {
for (final long memoryCost : tabletRawReqSizes) {
block.reduceMemoryUsage(memoryCost);
}
tabletRawReqs.clear();
tabletRawReqSizes.clear();
block.close();
}

if (loadTsFileStatement.isDeleteAfterLoad()) {
Expand All @@ -144,4 +179,57 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {

return Optional.of(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}

private TSStatus executeInsertMultiTabletsWithRetry(
final List<PipeTransferTabletRawReq> tabletRawReqs, boolean isConvertOnTypeMismatch) {
final InsertMultiTabletsStatement batchStatement = new InsertMultiTabletsStatement();
batchStatement.setInsertTabletStatementList(
tabletRawReqs.stream()
.map(
req ->
new LoadConvertedInsertTabletStatement(
req.constructStatement(), isConvertOnTypeMismatch))
.collect(Collectors.toList()));

TSStatus result;
try {
result =
batchStatement.accept(
LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
statementExecutor.execute(batchStatement));

// Retry max 5 times if the write process is rejected
for (int i = 0;
i < 5
&& result.getCode()
== TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode();
i++) {
Thread.sleep(100L * (i + 1));
result =
batchStatement.accept(
LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
statementExecutor.execute(batchStatement));
}
} catch (final Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
result = batchStatement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e);
}
return result;
}

private static boolean handleTSStatus(
final TSStatus result, final LoadTsFileStatement loadTsFileStatement) {
if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
|| result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
|| result.getCode() == TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) {
LOGGER.warn(
"Failed to convert data type for LoadTsFileStatement: {}, status code is {}.",
loadTsFileStatement,
result.getCode());
return false;
}
return true;
}
}
Loading
Loading