From f8773f636c0f46482ab7bb4c37847648f8f9148c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A9=AC=E5=AD=90=E5=9D=A4?= <55695098+DanielWang2035@users.noreply.github.com> Date: Tue, 18 Mar 2025 17:20:07 +0800 Subject: [PATCH 1/4] Load: Batched tablet insertion during conversion --- .../org/apache/iotdb/db/conf/IoTDBConfig.java | 12 ++ .../apache/iotdb/db/conf/IoTDBDescriptor.java | 5 + .../load/LoadTsFileTableSchemaCache.java | 6 +- .../load/LoadTsFileTreeSchemaCache.java | 6 +- ...dInsertTabletStatementTSStatusVisitor.java | 7 + ...tementDataTypeConvertExecutionVisitor.java | 153 +++++++++++++----- ...tementDataTypeConvertExecutionVisitor.java | 152 ++++++++++++----- .../memory/LoadTsFileAbstractMemoryBlock.java | 2 + .../LoadTsFileDataCacheMemoryBlock.java | 5 + ...yBlock.java => LoadTsFileMemoryBlock.java} | 18 ++- .../load/memory/LoadTsFileMemoryManager.java | 19 ++- .../load/metrics/LoadTsFileMemMetricSet.java | 6 +- 12 files changed, 282 insertions(+), 109 deletions(-) rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/{LoadTsFileAnalyzeSchemaMemoryBlock.java => LoadTsFileMemoryBlock.java} (88%) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 41b3c6a0fa33c..5ffd6e0aa4274 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1083,6 +1083,8 @@ public class IoTDBConfig { private long loadTsFileAnalyzeSchemaMemorySizeInBytes = 0L; // 0 means that the decision will be adaptive based on the number of sequences + private long loadTsFileTabletConversionBatchMemorySizeInBytes = 4096 * 1024; + private int loadTsFileMaxDeviceCountToUseDeviceTimeIndex = 10000; private long loadChunkMetadataMemorySizeInBytes = 33554432; // 32MB @@ -3764,6 +3766,16 @@ public void setLoadTsFileAnalyzeSchemaMemorySizeInBytes( this.loadTsFileAnalyzeSchemaMemorySizeInBytes = loadTsFileAnalyzeSchemaMemorySizeInBytes; } + public long getLoadTsFileTabletConversionBatchMemorySizeInBytes() { + return loadTsFileTabletConversionBatchMemorySizeInBytes; + } + + public void setLoadTsFileTabletConversionBatchMemorySizeInBytes( + long loadTsFileTabletConversionBatchMemorySizeInBytes) { + this.loadTsFileTabletConversionBatchMemorySizeInBytes = + loadTsFileTabletConversionBatchMemorySizeInBytes; + } + public int getLoadTsFileMaxDeviceCountToUseDeviceTimeIndex() { return loadTsFileMaxDeviceCountToUseDeviceTimeIndex; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 9bf9d893dfa0a..3311fd4045f10 100755 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -2168,6 +2168,11 @@ private void loadLoadTsFileProps(TrimProperties properties) { properties.getProperty( "load_tsfile_analyze_schema_memory_size_in_bytes", String.valueOf(conf.getLoadTsFileAnalyzeSchemaMemorySizeInBytes())))); + conf.setLoadTsFileTabletConversionBatchMemorySizeInBytes( + Long.parseLong( + properties.getProperty( + "load_tsfile_tablet_conversion_batch_memory_size_in_bytes", + String.valueOf(conf.getLoadTsFileTabletConversionBatchMemorySizeInBytes())))); conf.setLoadTsFileMaxDeviceCountToUseDeviceTimeIndex( Integer.parseInt( properties.getProperty( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java index 7b4b87fd764dc..cfca453a3ed73 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java @@ -37,7 +37,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex; -import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileAnalyzeSchemaMemoryBlock; +import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryBlock; import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager; import org.apache.iotdb.db.utils.ModificationUtils; @@ -79,7 +79,7 @@ public class LoadTsFileTableSchemaCache { : CONFIG.getLoadTsFileAnalyzeSchemaMemorySizeInBytes(); } - private final LoadTsFileAnalyzeSchemaMemoryBlock block; + private final LoadTsFileMemoryBlock block; private String database; private final Metadata metadata; @@ -104,7 +104,7 @@ public LoadTsFileTableSchemaCache(Metadata metadata, MPPQueryContext context) throws LoadRuntimeOutOfMemoryException { this.block = LoadTsFileMemoryManager.getInstance() - .allocateAnalyzeSchemaMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES); + .allocateMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES); this.metadata = metadata; this.context = context; this.currentBatchTable2Devices = new HashMap<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTreeSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTreeSchemaCache.java index ad5fd0acaeb86..6bd72972d4b1b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTreeSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTreeSchemaCache.java @@ -29,7 +29,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex; -import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileAnalyzeSchemaMemoryBlock; +import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryBlock; import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager; import org.apache.iotdb.db.utils.ModificationUtils; @@ -70,7 +70,7 @@ public class LoadTsFileTreeSchemaCache { FLUSH_ALIGNED_CACHE_MEMORY_SIZE_IN_BYTES = ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES >> 1; } - private final LoadTsFileAnalyzeSchemaMemoryBlock block; + private final LoadTsFileMemoryBlock block; private Map> currentBatchDevice2TimeSeriesSchemas; private Map tsFileDevice2IsAligned; @@ -90,7 +90,7 @@ public class LoadTsFileTreeSchemaCache { public LoadTsFileTreeSchemaCache() throws LoadRuntimeOutOfMemoryException { this.block = LoadTsFileMemoryManager.getInstance() - .allocateAnalyzeSchemaMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES); + .allocateMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES); this.currentBatchDevice2TimeSeriesSchemas = new HashMap<>(); this.tsFileDevice2IsAligned = new HashMap<>(); this.alreadySetDatabases = new HashSet<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java index 6e9601f8d95c1..a4b1c0385ed94 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.StatementNode; import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.rpc.TSStatusCode; @@ -45,6 +46,12 @@ public TSStatus visitInsertTablet( return visitInsertBase(insertTabletStatement, context); } + @Override + public TSStatus visitInsertMultiTablets( + final InsertMultiTabletsStatement insertMultiTabletsStatement, final TSStatus context) { + return visitInsertBase(insertMultiTabletsStatement, context); + } + private TSStatus visitInsertBase( final InsertBaseStatement insertBaseStatement, final TSStatus context) { if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java index 2a8a98c59e257..d39681530b8d0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java @@ -21,22 +21,32 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReqV2; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile; import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; +import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryBlock; +import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.commons.io.FileUtils; +import org.apache.tsfile.write.record.Tablet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; + +import static org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil.calculateTabletSizeInBytes; public class LoadTableStatementDataTypeConvertExecutionVisitor extends AstVisitor, String> { @@ -44,6 +54,11 @@ public class LoadTableStatementDataTypeConvertExecutionVisitor private static final Logger LOGGER = LoggerFactory.getLogger(LoadTableStatementDataTypeConvertExecutionVisitor.class); + private static final long TABLET_BATCH_MEMORY_SIZE_IN_BYTES = + IoTDBDescriptor.getInstance() + .getConfig() + .getLoadTsFileTabletConversionBatchMemorySizeInBytes(); + @FunctionalInterface public interface StatementExecutor { // databaseName can NOT be null @@ -68,6 +83,11 @@ public Optional visitLoadTsFile( LOGGER.info("Start data type conversion for LoadTsFileStatement: {}.", loadTsFileStatement); + final LoadTsFileMemoryBlock block = + LoadTsFileMemoryManager.getInstance() + .allocateMemoryBlock(TABLET_BATCH_MEMORY_SIZE_IN_BYTES); + final List tabletRawReqs = new ArrayList<>(); + for (final File file : loadTsFileStatement.getTsFiles()) { try (final TsFileInsertionEventTableParser parser = new TsFileInsertionEventTableParser( @@ -85,49 +105,24 @@ public Optional visitLoadTsFile( final PipeRawTabletInsertionEvent rawTabletInsertionEvent = (PipeRawTabletInsertionEvent) tabletInsertionEvent; - final LoadConvertedInsertTabletStatement statement = - new LoadConvertedInsertTabletStatement( - PipeTransferTabletRawReqV2.toTPipeTransferRawReq( - rawTabletInsertionEvent.convertToTablet(), - rawTabletInsertionEvent.isAligned(), - databaseName) - .constructStatement(), - loadTsFileStatement.isConvertOnTypeMismatch()); - - TSStatus result; - try { - result = - statement.accept( - LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR, - statementExecutor.execute(statement, databaseName)); - - // Retry max 5 times if the write process is rejected - for (int i = 0; - i < 5 - && result.getCode() - == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(); - i++) { - Thread.sleep(100L * (i + 1)); - result = - statement.accept( - LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR, - statementExecutor.execute(statement, databaseName)); - } - } catch (final Exception e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - result = statement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e); + final Tablet tablet = rawTabletInsertionEvent.convertToTablet(); + final PipeTransferTabletRawReqV2 tabletRawReq = + PipeTransferTabletRawReqV2.toTPipeTransferRawReq( + tablet, rawTabletInsertionEvent.isAligned(), databaseName); + tabletRawReqs.add(tabletRawReq); + block.addMemoryUsage(calculateTabletSizeInBytes(tablet) + 1); + if (block.hasEnoughMemory()) { + continue; } - if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() - || result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() - || result.getCode() - == TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) { - LOGGER.warn( - "Failed to convert data type for LoadTsFileStatement: {}, status code is {}.", - loadTsFileStatement, - result.getCode()); + final TSStatus result = + executeInsertMultiTabletsWithRetry( + tabletRawReqs, databaseName, loadTsFileStatement.isConvertOnTypeMismatch()); + + tabletRawReqs.clear(); + block.clearMemoryUsage(); + + if (!handleTSStatus(result, loadTsFileStatement)) { return Optional.empty(); } } @@ -138,6 +133,27 @@ public Optional visitLoadTsFile( } } + if (!tabletRawReqs.isEmpty()) { + try { + final TSStatus result = + executeInsertMultiTabletsWithRetry( + tabletRawReqs, databaseName, loadTsFileStatement.isConvertOnTypeMismatch()); + + tabletRawReqs.clear(); + block.clearMemoryUsage(); + + if (!handleTSStatus(result, loadTsFileStatement)) { + return Optional.empty(); + } + } catch (final Exception e) { + LOGGER.warn( + "Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e); + return Optional.empty(); + } + } + + block.close(); + if (loadTsFileStatement.isDeleteAfterLoad()) { loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly); } @@ -147,4 +163,59 @@ public Optional visitLoadTsFile( return Optional.of(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); } + + private TSStatus executeInsertMultiTabletsWithRetry( + final List tabletRawReqs, + String databaseName, + boolean isConvertOnTypeMismatch) { + final InsertMultiTabletsStatement batchStatement = new InsertMultiTabletsStatement(); + batchStatement.setInsertTabletStatementList( + tabletRawReqs.stream() + .map( + req -> + new LoadConvertedInsertTabletStatement( + req.constructStatement(), isConvertOnTypeMismatch)) + .collect(Collectors.toList())); + + TSStatus result; + try { + result = + batchStatement.accept( + LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR, + statementExecutor.execute(batchStatement, databaseName)); + + // Retry max 5 times if the write process is rejected + for (int i = 0; + i < 5 + && result.getCode() + == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(); + i++) { + Thread.sleep(100L * (i + 1)); + result = + batchStatement.accept( + LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR, + statementExecutor.execute(batchStatement, databaseName)); + } + } catch (final Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + result = batchStatement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e); + } + return result; + } + + private static boolean handleTSStatus( + final TSStatus result, final LoadTsFile loadTsFileStatement) { + if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + || result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() + || result.getCode() == TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) { + LOGGER.warn( + "Failed to convert data type for LoadTsFileStatement: {}, status code is {}.", + loadTsFileStatement, + result.getCode()); + return false; + } + return true; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java index eb7efba361850..c573a5d07609a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java @@ -21,12 +21,16 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.StatementNode; import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryBlock; +import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.commons.io.FileUtils; @@ -36,16 +40,25 @@ import org.slf4j.LoggerFactory; import java.io.File; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; + +import static org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil.calculateTabletSizeInBytes; public class LoadTreeStatementDataTypeConvertExecutionVisitor extends StatementVisitor, Void> { - - private final StatementExecutor statementExecutor; - private static final Logger LOGGER = LoggerFactory.getLogger(LoadTreeStatementDataTypeConvertExecutionVisitor.class); + private static final long TABLET_BATCH_MEMORY_SIZE_IN_BYTES = + IoTDBDescriptor.getInstance() + .getConfig() + .getLoadTsFileTabletConversionBatchMemorySizeInBytes(); + + private final StatementExecutor statementExecutor; + @FunctionalInterface public interface StatementExecutor { TSStatus execute(final Statement statement); @@ -67,52 +80,33 @@ public Optional visitLoadFile( LOGGER.info("Start data type conversion for LoadTsFileStatement: {}", loadTsFileStatement); + final LoadTsFileMemoryBlock block = + LoadTsFileMemoryManager.getInstance() + .allocateMemoryBlock(TABLET_BATCH_MEMORY_SIZE_IN_BYTES); + final List tabletRawReqs = new ArrayList<>(); + for (final File file : loadTsFileStatement.getTsFiles()) { try (final TsFileInsertionEventScanParser parser = new TsFileInsertionEventScanParser( file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) { for (final Pair tabletWithIsAligned : parser.toTabletWithIsAligneds()) { - final LoadConvertedInsertTabletStatement statement = - new LoadConvertedInsertTabletStatement( - PipeTransferTabletRawReq.toTPipeTransferRawReq( - tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight()) - .constructStatement(), - loadTsFileStatement.isConvertOnTypeMismatch()); - - TSStatus result; - try { - result = - statement.accept( - LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR, - statementExecutor.execute(statement)); - - // Retry max 5 times if the write process is rejected - for (int i = 0; - i < 5 - && result.getCode() - == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(); - i++) { - Thread.sleep(100L * (i + 1)); - result = - statement.accept( - LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR, - statementExecutor.execute(statement)); - } - } catch (final Exception e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - result = statement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e); + final PipeTransferTabletRawReq tabletRawReq = + PipeTransferTabletRawReq.toTPipeTransferRawReq( + tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight()); + tabletRawReqs.add(tabletRawReq); + block.addMemoryUsage(calculateTabletSizeInBytes(tabletWithIsAligned.getLeft()) + 1); + if (block.hasEnoughMemory()) { + continue; } - if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() - || result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() - || result.getCode() - == TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) { - LOGGER.warn( - "Failed to convert data type for LoadTsFileStatement: {}, status code is {}.", - loadTsFileStatement, - result.getCode()); + final TSStatus result = + executeInsertMultiTabletsWithRetry( + tabletRawReqs, loadTsFileStatement.isConvertOnTypeMismatch()); + + tabletRawReqs.clear(); + block.clearMemoryUsage(); + + if (!handleTSStatus(result, loadTsFileStatement)) { return Optional.empty(); } } @@ -123,6 +117,27 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) { } } + if (!tabletRawReqs.isEmpty()) { + try { + final TSStatus result = + executeInsertMultiTabletsWithRetry( + tabletRawReqs, loadTsFileStatement.isConvertOnTypeMismatch()); + + tabletRawReqs.clear(); + block.clearMemoryUsage(); + + if (!handleTSStatus(result, loadTsFileStatement)) { + return Optional.empty(); + } + } catch (final Exception e) { + LOGGER.warn( + "Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e); + return Optional.empty(); + } + } + + block.close(); + if (loadTsFileStatement.isDeleteAfterLoad()) { loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly); } @@ -132,4 +147,57 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) { return Optional.of(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); } + + private TSStatus executeInsertMultiTabletsWithRetry( + final List tabletRawReqs, boolean isConvertOnTypeMismatch) { + final InsertMultiTabletsStatement batchStatement = new InsertMultiTabletsStatement(); + batchStatement.setInsertTabletStatementList( + tabletRawReqs.stream() + .map( + req -> + new LoadConvertedInsertTabletStatement( + req.constructStatement(), isConvertOnTypeMismatch)) + .collect(Collectors.toList())); + + TSStatus result; + try { + result = + batchStatement.accept( + LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR, + statementExecutor.execute(batchStatement)); + + // Retry max 5 times if the write process is rejected + for (int i = 0; + i < 5 + && result.getCode() + == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(); + i++) { + Thread.sleep(100L * (i + 1)); + result = + batchStatement.accept( + LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR, + statementExecutor.execute(batchStatement)); + } + } catch (final Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + result = batchStatement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e); + } + return result; + } + + private static boolean handleTSStatus( + final TSStatus result, final LoadTsFileStatement loadTsFileStatement) { + if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + || result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() + || result.getCode() == TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) { + LOGGER.warn( + "Failed to convert data type for LoadTsFileStatement: {}, status code is {}.", + loadTsFileStatement, + result.getCode()); + return false; + } + return true; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAbstractMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAbstractMemoryBlock.java index bca1591b1ef8e..7754475a6c0cf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAbstractMemoryBlock.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAbstractMemoryBlock.java @@ -42,6 +42,8 @@ public boolean hasEnoughMemory() { public abstract void reduceMemoryUsage(long memoryInBytes); + public abstract void clearMemoryUsage(); + abstract long getMemoryUsageInBytes(); public abstract void forceResize(long newSizeInBytes); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java index 4611df37aa8d7..d08db972d19d8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java @@ -71,6 +71,11 @@ public synchronized void reduceMemoryUsage(long memoryInBytes) { } } + @Override + public void clearMemoryUsage() { + memoryUsageInBytes.set(0); + } + @Override public synchronized void forceResize(long newSizeInBytes) { throw new UnsupportedOperationException( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryBlock.java similarity index 88% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryBlock.java index 2bf276748cc0d..419faf3a4dc17 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryBlock.java @@ -30,14 +30,13 @@ import java.util.concurrent.atomic.AtomicLong; -public class LoadTsFileAnalyzeSchemaMemoryBlock extends LoadTsFileAbstractMemoryBlock { - private static final Logger LOGGER = - LoggerFactory.getLogger(LoadTsFileAnalyzeSchemaMemoryBlock.class); +public class LoadTsFileMemoryBlock extends LoadTsFileAbstractMemoryBlock { + private static final Logger LOGGER = LoggerFactory.getLogger(LoadTsFileMemoryBlock.class); private long totalMemorySizeInBytes; private final AtomicLong memoryUsageInBytes; - LoadTsFileAnalyzeSchemaMemoryBlock(long totalMemorySizeInBytes) { + LoadTsFileMemoryBlock(long totalMemorySizeInBytes) { super(); this.totalMemorySizeInBytes = totalMemorySizeInBytes; @@ -60,7 +59,7 @@ public synchronized void addMemoryUsage(long memoryInBytes) { Metric.LOAD_MEM.toString(), MetricLevel.IMPORTANT, Tag.NAME.toString(), - LoadTsFileMemMetricSet.LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY) + LoadTsFileMemMetricSet.LOAD_TSFILE_OTHER_MEMORY) .incr(memoryInBytes); } @@ -75,10 +74,15 @@ public synchronized void reduceMemoryUsage(long memoryInBytes) { Metric.LOAD_MEM.toString(), MetricLevel.IMPORTANT, Tag.NAME.toString(), - LoadTsFileMemMetricSet.LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY) + LoadTsFileMemMetricSet.LOAD_TSFILE_OTHER_MEMORY) .decr(memoryInBytes); } + @Override + public void clearMemoryUsage() { + memoryUsageInBytes.set(0); + } + @Override synchronized long getMemoryUsageInBytes() { return memoryUsageInBytes.get(); @@ -108,7 +112,7 @@ protected synchronized void releaseAllMemory() { @Override public String toString() { - return "LoadTsFileAnalyzeSchemaMemoryBlock{" + return "LoadTsFileMemoryBlock{" + "totalMemorySizeInBytes=" + totalMemorySizeInBytes + ", usedMemoryInBytes=" diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java index 221c0bbea3c8c..f55607ece7533 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java @@ -94,25 +94,24 @@ public synchronized void releaseToQuery(long sizeInBytes) { this.notifyAll(); } - public synchronized LoadTsFileAnalyzeSchemaMemoryBlock allocateAnalyzeSchemaMemoryBlock( - long sizeInBytes) throws LoadRuntimeOutOfMemoryException { + public synchronized LoadTsFileMemoryBlock allocateMemoryBlock(long sizeInBytes) + throws LoadRuntimeOutOfMemoryException { try { forceAllocateFromQuery(sizeInBytes); if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "Load: Allocated AnalyzeSchemaMemoryBlock from query engine, size: {}", sizeInBytes); + LOGGER.debug("Load: Allocated MemoryBlock from query engine, size: {}", sizeInBytes); } } catch (LoadRuntimeOutOfMemoryException e) { if (dataCacheMemoryBlock != null && dataCacheMemoryBlock.doShrink(sizeInBytes)) { LOGGER.info( - "Load: Query engine's memory is not sufficient, allocated AnalyzeSchemaMemoryBlock from DataCacheMemoryBlock, size: {}", + "Load: Query engine's memory is not sufficient, allocated MemoryBlock from DataCacheMemoryBlock, size: {}", sizeInBytes); usedMemorySizeInBytes.addAndGet(sizeInBytes); - return new LoadTsFileAnalyzeSchemaMemoryBlock(sizeInBytes); + return new LoadTsFileMemoryBlock(sizeInBytes); } throw e; } - return new LoadTsFileAnalyzeSchemaMemoryBlock(sizeInBytes); + return new LoadTsFileMemoryBlock(sizeInBytes); } /** @@ -120,7 +119,7 @@ public synchronized LoadTsFileAnalyzeSchemaMemoryBlock allocateAnalyzeSchemaMemo * * @throws LoadRuntimeOutOfMemoryException if failed to allocate enough memory */ - synchronized void forceResize(LoadTsFileAnalyzeSchemaMemoryBlock memoryBlock, long newSizeInBytes) + synchronized void forceResize(LoadTsFileMemoryBlock memoryBlock, long newSizeInBytes) throws LoadRuntimeOutOfMemoryException { if (memoryBlock.getTotalMemorySizeInBytes() >= newSizeInBytes) { @@ -142,14 +141,14 @@ synchronized void forceResize(LoadTsFileAnalyzeSchemaMemoryBlock memoryBlock, lo forceAllocateFromQuery(bytesNeeded); if (LOGGER.isDebugEnabled()) { LOGGER.info( - "Load: Force resized LoadTsFileAnalyzeSchemaMemoryBlock with memory from query engine, size added: {}, new size: {}", + "Load: Force resized LoadTsFileMemoryBlock with memory from query engine, size added: {}, new size: {}", bytesNeeded, newSizeInBytes); } } catch (LoadRuntimeOutOfMemoryException e) { if (dataCacheMemoryBlock != null && dataCacheMemoryBlock.doShrink(bytesNeeded)) { LOGGER.info( - "Load: Query engine's memory is not sufficient, force resized LoadTsFileAnalyzeSchemaMemoryBlock with memory from DataCacheMemoryBlock, size added: {}, new size: {}", + "Load: Query engine's memory is not sufficient, force resized LoadTsFileMemoryBlock with memory from DataCacheMemoryBlock, size added: {}, new size: {}", bytesNeeded, newSizeInBytes); usedMemorySizeInBytes.addAndGet(bytesNeeded); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileMemMetricSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileMemMetricSet.java index 77758142b2cda..63608ae12f4b0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileMemMetricSet.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileMemMetricSet.java @@ -30,7 +30,7 @@ public class LoadTsFileMemMetricSet implements IMetricSet { private static final String LOAD_TSFILE_USED_MEMORY = "LoadTsFileUsedMemory"; - public static final String LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY = "LoadTsFileAnalyzeSchemaMemory"; + public static final String LOAD_TSFILE_OTHER_MEMORY = "LoadTsFileOtherMemory"; private static final String LOAD_TSFILE_DATA_CACHE_MEMORY = "LoadTsFileDataCacheMemory"; @@ -57,7 +57,7 @@ public void bindTo(AbstractMetricService metricService) { Metric.LOAD_MEM.toString(), MetricLevel.IMPORTANT, Tag.NAME.toString(), - LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY) + LOAD_TSFILE_OTHER_MEMORY) .set(0L); } @@ -77,7 +77,7 @@ public void unbindFrom(AbstractMetricService metricService) { MetricType.AUTO_GAUGE, Metric.LOAD_MEM.toString(), Tag.NAME.toString(), - LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY); + LOAD_TSFILE_OTHER_MEMORY); } //////////////////////////// singleton //////////////////////////// From 15eabf47a698ac857334560e83be5a7992dd2a72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A9=AC=E5=AD=90=E5=9D=A4?= <55695098+DanielWang2035@users.noreply.github.com> Date: Fri, 21 Mar 2025 14:24:20 +0800 Subject: [PATCH 2/4] fix tree --- ...tementDataTypeConvertExecutionVisitor.java | 101 ++++++++++-------- 1 file changed, 54 insertions(+), 47 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java index a3a8aff37eb6e..1f1ddac3c78ba 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java @@ -87,21 +87,45 @@ public Optional visitLoadFile( LoadTsFileMemoryManager.getInstance() .allocateMemoryBlock(TABLET_BATCH_MEMORY_SIZE_IN_BYTES); final List tabletRawReqs = new ArrayList<>(); - - for (final File file : loadTsFileStatement.getTsFiles()) { - try (final TsFileInsertionEventScanParser parser = - new TsFileInsertionEventScanParser( - file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) { - for (final Pair tabletWithIsAligned : parser.toTabletWithIsAligneds()) { - final PipeTransferTabletRawReq tabletRawReq = - PipeTransferTabletRawReq.toTPipeTransferRawReq( - tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight()); - tabletRawReqs.add(tabletRawReq); - block.addMemoryUsage(calculateTabletSizeInBytes(tabletWithIsAligned.getLeft()) + 1); - if (block.hasEnoughMemory()) { - continue; + try { + for (final File file : loadTsFileStatement.getTsFiles()) { + try (final TsFileInsertionEventScanParser parser = + new TsFileInsertionEventScanParser( + file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) { + for (final Pair tabletWithIsAligned : parser.toTabletWithIsAligneds()) { + final PipeTransferTabletRawReq tabletRawReq = + PipeTransferTabletRawReq.toTPipeTransferRawReq( + tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight()); + final long curMemory = calculateTabletSizeInBytes(tabletWithIsAligned.getLeft()) + 1; + if (block.hasEnoughMemory(curMemory)) { + tabletRawReqs.add(tabletRawReq); + block.addMemoryUsage(curMemory); + continue; + } + + final TSStatus result = + executeInsertMultiTabletsWithRetry( + tabletRawReqs, loadTsFileStatement.isConvertOnTypeMismatch()); + + tabletRawReqs.clear(); + block.clearMemoryUsage(); + + if (!handleTSStatus(result, loadTsFileStatement)) { + return Optional.empty(); + } + + tabletRawReqs.add(tabletRawReq); + block.addMemoryUsage(curMemory); } + } catch (final Exception e) { + LOGGER.warn( + "Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e); + return Optional.empty(); + } + } + if (!tabletRawReqs.isEmpty()) { + try { final TSStatus result = executeInsertMultiTabletsWithRetry( tabletRawReqs, loadTsFileStatement.isConvertOnTypeMismatch()); @@ -112,50 +136,33 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) { if (!handleTSStatus(result, loadTsFileStatement)) { return Optional.empty(); } - } - } catch (final Exception e) { - LOGGER.warn( - "Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e); - return Optional.empty(); - } - } - - if (!tabletRawReqs.isEmpty()) { - try { - final TSStatus result = - executeInsertMultiTabletsWithRetry( - tabletRawReqs, loadTsFileStatement.isConvertOnTypeMismatch()); - - tabletRawReqs.clear(); - block.clearMemoryUsage(); - - if (!handleTSStatus(result, loadTsFileStatement)) { + } catch (final Exception e) { + LOGGER.warn( + "Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e); return Optional.empty(); } - } catch (final Exception e) { - LOGGER.warn( - "Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e); - return Optional.empty(); } + } finally { + tabletRawReqs.clear(); + block.clearMemoryUsage(); + block.close(); } - block.close(); - if (loadTsFileStatement.isDeleteAfterLoad()) { loadTsFileStatement - .getTsFiles() - .forEach( - tsfile -> { - FileUtils.deleteQuietly(tsfile); - final String tsFilePath = tsfile.getAbsolutePath(); - FileUtils.deleteQuietly(new File(tsFilePath + TsFileResource.RESOURCE_SUFFIX)); - FileUtils.deleteQuietly(new File(tsFilePath + ModificationFileV1.FILE_SUFFIX)); - FileUtils.deleteQuietly(new File(tsFilePath + ModificationFile.FILE_SUFFIX)); - }); + .getTsFiles() + .forEach( + tsfile -> { + FileUtils.deleteQuietly(tsfile); + final String tsFilePath = tsfile.getAbsolutePath(); + FileUtils.deleteQuietly(new File(tsFilePath + TsFileResource.RESOURCE_SUFFIX)); + FileUtils.deleteQuietly(new File(tsFilePath + ModificationFileV1.FILE_SUFFIX)); + FileUtils.deleteQuietly(new File(tsFilePath + ModificationFile.FILE_SUFFIX)); + }); } LOGGER.info( - "Data type conversion for LoadTsFileStatement {} is successful.", loadTsFileStatement); + "Data type conversion for LoadTsFileStatement {} is successful.", loadTsFileStatement); return Optional.of(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); } From fd2432bf3de9f9bea7cb9962f61d5af49c700fdb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A9=AC=E5=AD=90=E5=9D=A4?= <55695098+DanielWang2035@users.noreply.github.com> Date: Fri, 21 Mar 2025 14:27:08 +0800 Subject: [PATCH 3/4] revert for table model --- ...tementDataTypeConvertExecutionVisitor.java | 174 ++++++------------ 1 file changed, 52 insertions(+), 122 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java index 140bac570b6b0..23e8375fceb8e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java @@ -21,35 +21,25 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReqV2; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile; import org.apache.iotdb.db.queryengine.plan.statement.Statement; -import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; -import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryBlock; -import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.commons.io.FileUtils; -import org.apache.tsfile.write.record.Tablet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.util.ArrayList; -import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.stream.Collectors; - -import static org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil.calculateTabletSizeInBytes; public class LoadTableStatementDataTypeConvertExecutionVisitor extends AstVisitor, String> { @@ -57,11 +47,6 @@ public class LoadTableStatementDataTypeConvertExecutionVisitor private static final Logger LOGGER = LoggerFactory.getLogger(LoadTableStatementDataTypeConvertExecutionVisitor.class); - private static final long TABLET_BATCH_MEMORY_SIZE_IN_BYTES = - IoTDBDescriptor.getInstance() - .getConfig() - .getLoadTsFileTabletConversionBatchMemorySizeInBytes(); - @FunctionalInterface public interface StatementExecutor { // databaseName can NOT be null @@ -86,11 +71,7 @@ public Optional visitLoadTsFile( LOGGER.info("Start data type conversion for LoadTsFileStatement: {}.", loadTsFileStatement); - final LoadTsFileMemoryBlock block = - LoadTsFileMemoryManager.getInstance() - .allocateMemoryBlock(TABLET_BATCH_MEMORY_SIZE_IN_BYTES); - final List tabletRawReqs = new ArrayList<>(); - + // TODO: Use batch insert after Table model supports insertMultiTablets for (final File file : loadTsFileStatement.getTsFiles()) { try (final TsFileInsertionEventTableParser parser = new TsFileInsertionEventTableParser( @@ -108,24 +89,49 @@ public Optional visitLoadTsFile( final PipeRawTabletInsertionEvent rawTabletInsertionEvent = (PipeRawTabletInsertionEvent) tabletInsertionEvent; - final Tablet tablet = rawTabletInsertionEvent.convertToTablet(); - final PipeTransferTabletRawReqV2 tabletRawReq = - PipeTransferTabletRawReqV2.toTPipeTransferRawReq( - tablet, rawTabletInsertionEvent.isAligned(), databaseName); - tabletRawReqs.add(tabletRawReq); - block.addMemoryUsage(calculateTabletSizeInBytes(tablet) + 1); - if (block.hasEnoughMemory()) { - continue; + final LoadConvertedInsertTabletStatement statement = + new LoadConvertedInsertTabletStatement( + PipeTransferTabletRawReqV2.toTPipeTransferRawReq( + rawTabletInsertionEvent.convertToTablet(), + rawTabletInsertionEvent.isAligned(), + databaseName) + .constructStatement(), + loadTsFileStatement.isConvertOnTypeMismatch()); + + TSStatus result; + try { + result = + statement.accept( + LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR, + statementExecutor.execute(statement, databaseName)); + + // Retry max 5 times if the write process is rejected + for (int i = 0; + i < 5 + && result.getCode() + == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(); + i++) { + Thread.sleep(100L * (i + 1)); + result = + statement.accept( + LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR, + statementExecutor.execute(statement, databaseName)); + } + } catch (final Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + result = statement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e); } - final TSStatus result = - executeInsertMultiTabletsWithRetry( - tabletRawReqs, databaseName, loadTsFileStatement.isConvertOnTypeMismatch()); - - tabletRawReqs.clear(); - block.clearMemoryUsage(); - - if (!handleTSStatus(result, loadTsFileStatement)) { + if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + || result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() + || result.getCode() + == TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) { + LOGGER.warn( + "Failed to convert data type for LoadTsFileStatement: {}, status code is {}.", + loadTsFileStatement, + result.getCode()); return Optional.empty(); } } @@ -136,98 +142,22 @@ public Optional visitLoadTsFile( } } - if (!tabletRawReqs.isEmpty()) { - try { - final TSStatus result = - executeInsertMultiTabletsWithRetry( - tabletRawReqs, databaseName, loadTsFileStatement.isConvertOnTypeMismatch()); - - tabletRawReqs.clear(); - block.clearMemoryUsage(); - - if (!handleTSStatus(result, loadTsFileStatement)) { - return Optional.empty(); - } - } catch (final Exception e) { - LOGGER.warn( - "Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e); - return Optional.empty(); - } - } - - block.close(); - if (loadTsFileStatement.isDeleteAfterLoad()) { loadTsFileStatement - .getTsFiles() - .forEach( - tsfile -> { - FileUtils.deleteQuietly(tsfile); - final String tsFilePath = tsfile.getAbsolutePath(); - FileUtils.deleteQuietly(new File(tsFilePath + TsFileResource.RESOURCE_SUFFIX)); - FileUtils.deleteQuietly(new File(tsFilePath + ModificationFileV1.FILE_SUFFIX)); - FileUtils.deleteQuietly(new File(tsFilePath + ModificationFile.FILE_SUFFIX)); - }); + .getTsFiles() + .forEach( + tsfile -> { + FileUtils.deleteQuietly(tsfile); + final String tsFilePath = tsfile.getAbsolutePath(); + FileUtils.deleteQuietly(new File(tsFilePath + TsFileResource.RESOURCE_SUFFIX)); + FileUtils.deleteQuietly(new File(tsFilePath + ModificationFileV1.FILE_SUFFIX)); + FileUtils.deleteQuietly(new File(tsFilePath + ModificationFile.FILE_SUFFIX)); + }); } LOGGER.info( - "Data type conversion for LoadTsFileStatement {} is successful.", loadTsFileStatement); + "Data type conversion for LoadTsFileStatement {} is successful.", loadTsFileStatement); return Optional.of(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); } - - private TSStatus executeInsertMultiTabletsWithRetry( - final List tabletRawReqs, - String databaseName, - boolean isConvertOnTypeMismatch) { - final InsertMultiTabletsStatement batchStatement = new InsertMultiTabletsStatement(); - batchStatement.setInsertTabletStatementList( - tabletRawReqs.stream() - .map( - req -> - new LoadConvertedInsertTabletStatement( - req.constructStatement(), isConvertOnTypeMismatch)) - .collect(Collectors.toList())); - - TSStatus result; - try { - result = - batchStatement.accept( - LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR, - statementExecutor.execute(batchStatement, databaseName)); - - // Retry max 5 times if the write process is rejected - for (int i = 0; - i < 5 - && result.getCode() - == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(); - i++) { - Thread.sleep(100L * (i + 1)); - result = - batchStatement.accept( - LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR, - statementExecutor.execute(batchStatement, databaseName)); - } - } catch (final Exception e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - result = batchStatement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e); - } - return result; - } - - private static boolean handleTSStatus( - final TSStatus result, final LoadTsFile loadTsFileStatement) { - if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() - || result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() - || result.getCode() == TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) { - LOGGER.warn( - "Failed to convert data type for LoadTsFileStatement: {}, status code is {}.", - loadTsFileStatement, - result.getCode()); - return false; - } - return true; - } } From 5e30227a50de672d8ede92289acb4180276a4baf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A9=AC=E5=AD=90=E5=9D=A4?= <55695098+DanielWang2035@users.noreply.github.com> Date: Thu, 27 Mar 2025 20:11:56 +0800 Subject: [PATCH 4/4] fix --- ...tementDataTypeConvertExecutionVisitor.java | 19 ++++++++++++++++--- .../memory/LoadTsFileAbstractMemoryBlock.java | 2 -- .../LoadTsFileDataCacheMemoryBlock.java | 5 ----- .../load/memory/LoadTsFileMemoryBlock.java | 5 ----- 4 files changed, 16 insertions(+), 15 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java index 1f1ddac3c78ba..9420c9da36101 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java @@ -87,6 +87,8 @@ public Optional visitLoadFile( LoadTsFileMemoryManager.getInstance() .allocateMemoryBlock(TABLET_BATCH_MEMORY_SIZE_IN_BYTES); final List tabletRawReqs = new ArrayList<>(); + final List tabletRawReqSizes = new ArrayList<>(); + try { for (final File file : loadTsFileStatement.getTsFiles()) { try (final TsFileInsertionEventScanParser parser = @@ -99,6 +101,7 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) { final long curMemory = calculateTabletSizeInBytes(tabletWithIsAligned.getLeft()) + 1; if (block.hasEnoughMemory(curMemory)) { tabletRawReqs.add(tabletRawReq); + tabletRawReqSizes.add(curMemory); block.addMemoryUsage(curMemory); continue; } @@ -107,14 +110,18 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) { executeInsertMultiTabletsWithRetry( tabletRawReqs, loadTsFileStatement.isConvertOnTypeMismatch()); + for (final long memoryCost : tabletRawReqSizes) { + block.reduceMemoryUsage(memoryCost); + } tabletRawReqs.clear(); - block.clearMemoryUsage(); + tabletRawReqSizes.clear(); if (!handleTSStatus(result, loadTsFileStatement)) { return Optional.empty(); } tabletRawReqs.add(tabletRawReq); + tabletRawReqSizes.add(curMemory); block.addMemoryUsage(curMemory); } } catch (final Exception e) { @@ -130,8 +137,11 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) { executeInsertMultiTabletsWithRetry( tabletRawReqs, loadTsFileStatement.isConvertOnTypeMismatch()); + for (final long memoryCost : tabletRawReqSizes) { + block.reduceMemoryUsage(memoryCost); + } tabletRawReqs.clear(); - block.clearMemoryUsage(); + tabletRawReqSizes.clear(); if (!handleTSStatus(result, loadTsFileStatement)) { return Optional.empty(); @@ -143,8 +153,11 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) { } } } finally { + for (final long memoryCost : tabletRawReqSizes) { + block.reduceMemoryUsage(memoryCost); + } tabletRawReqs.clear(); - block.clearMemoryUsage(); + tabletRawReqSizes.clear(); block.close(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAbstractMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAbstractMemoryBlock.java index 7754475a6c0cf..bca1591b1ef8e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAbstractMemoryBlock.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAbstractMemoryBlock.java @@ -42,8 +42,6 @@ public boolean hasEnoughMemory() { public abstract void reduceMemoryUsage(long memoryInBytes); - public abstract void clearMemoryUsage(); - abstract long getMemoryUsageInBytes(); public abstract void forceResize(long newSizeInBytes); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java index d08db972d19d8..4611df37aa8d7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java @@ -71,11 +71,6 @@ public synchronized void reduceMemoryUsage(long memoryInBytes) { } } - @Override - public void clearMemoryUsage() { - memoryUsageInBytes.set(0); - } - @Override public synchronized void forceResize(long newSizeInBytes) { throw new UnsupportedOperationException( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryBlock.java index 419faf3a4dc17..af3c1f20f8fde 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryBlock.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryBlock.java @@ -78,11 +78,6 @@ public synchronized void reduceMemoryUsage(long memoryInBytes) { .decr(memoryInBytes); } - @Override - public void clearMemoryUsage() { - memoryUsageInBytes.set(0); - } - @Override synchronized long getMemoryUsageInBytes() { return memoryUsageInBytes.get();