diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index b4fd0b781170d..701e5e75397ed 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1083,6 +1083,8 @@ public class IoTDBConfig { private long loadTsFileAnalyzeSchemaMemorySizeInBytes = 0L; // 0 means that the decision will be adaptive based on the number of sequences + private long loadTsFileTabletConversionBatchMemorySizeInBytes = 4096 * 1024; + private int loadTsFileMaxDeviceCountToUseDeviceTimeIndex = 10000; private long loadChunkMetadataMemorySizeInBytes = 33554432; // 32MB @@ -3770,6 +3772,16 @@ public void setLoadTsFileAnalyzeSchemaMemorySizeInBytes( this.loadTsFileAnalyzeSchemaMemorySizeInBytes = loadTsFileAnalyzeSchemaMemorySizeInBytes; } + public long getLoadTsFileTabletConversionBatchMemorySizeInBytes() { + return loadTsFileTabletConversionBatchMemorySizeInBytes; + } + + public void setLoadTsFileTabletConversionBatchMemorySizeInBytes( + long loadTsFileTabletConversionBatchMemorySizeInBytes) { + this.loadTsFileTabletConversionBatchMemorySizeInBytes = + loadTsFileTabletConversionBatchMemorySizeInBytes; + } + public int getLoadTsFileMaxDeviceCountToUseDeviceTimeIndex() { return loadTsFileMaxDeviceCountToUseDeviceTimeIndex; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 7c4b7bbb5f741..7a10972ef08d3 100755 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -2172,6 +2172,11 @@ private void loadLoadTsFileProps(TrimProperties properties) throws IOException { properties.getProperty( "load_tsfile_analyze_schema_memory_size_in_bytes", String.valueOf(conf.getLoadTsFileAnalyzeSchemaMemorySizeInBytes())))); + conf.setLoadTsFileTabletConversionBatchMemorySizeInBytes( + Long.parseLong( + properties.getProperty( + "load_tsfile_tablet_conversion_batch_memory_size_in_bytes", + String.valueOf(conf.getLoadTsFileTabletConversionBatchMemorySizeInBytes())))); conf.setLoadTsFileMaxDeviceCountToUseDeviceTimeIndex( Integer.parseInt( properties.getProperty( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java index 7b4b87fd764dc..cfca453a3ed73 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java @@ -37,7 +37,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex; -import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileAnalyzeSchemaMemoryBlock; +import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryBlock; import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager; import org.apache.iotdb.db.utils.ModificationUtils; @@ -79,7 +79,7 @@ public class LoadTsFileTableSchemaCache { : CONFIG.getLoadTsFileAnalyzeSchemaMemorySizeInBytes(); } - private final LoadTsFileAnalyzeSchemaMemoryBlock block; + private final LoadTsFileMemoryBlock block; private String database; private final Metadata metadata; @@ -104,7 +104,7 @@ public LoadTsFileTableSchemaCache(Metadata metadata, MPPQueryContext context) throws LoadRuntimeOutOfMemoryException { this.block = LoadTsFileMemoryManager.getInstance() - .allocateAnalyzeSchemaMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES); + .allocateMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES); this.metadata = metadata; this.context = context; this.currentBatchTable2Devices = new HashMap<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTreeSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTreeSchemaCache.java index ad5fd0acaeb86..6bd72972d4b1b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTreeSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTreeSchemaCache.java @@ -29,7 +29,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex; -import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileAnalyzeSchemaMemoryBlock; +import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryBlock; import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager; import org.apache.iotdb.db.utils.ModificationUtils; @@ -70,7 +70,7 @@ public class LoadTsFileTreeSchemaCache { FLUSH_ALIGNED_CACHE_MEMORY_SIZE_IN_BYTES = ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES >> 1; } - private final LoadTsFileAnalyzeSchemaMemoryBlock block; + private final LoadTsFileMemoryBlock block; private Map> currentBatchDevice2TimeSeriesSchemas; private Map tsFileDevice2IsAligned; @@ -90,7 +90,7 @@ public class LoadTsFileTreeSchemaCache { public LoadTsFileTreeSchemaCache() throws LoadRuntimeOutOfMemoryException { this.block = LoadTsFileMemoryManager.getInstance() - .allocateAnalyzeSchemaMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES); + .allocateMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES); this.currentBatchDevice2TimeSeriesSchemas = new HashMap<>(); this.tsFileDevice2IsAligned = new HashMap<>(); this.alreadySetDatabases = new HashSet<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java index 6e9601f8d95c1..a4b1c0385ed94 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.StatementNode; import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.rpc.TSStatusCode; @@ -45,6 +46,12 @@ public TSStatus visitInsertTablet( return visitInsertBase(insertTabletStatement, context); } + @Override + public TSStatus visitInsertMultiTablets( + final InsertMultiTabletsStatement insertMultiTabletsStatement, final TSStatus context) { + return visitInsertBase(insertMultiTabletsStatement, context); + } + private TSStatus visitInsertBase( final InsertBaseStatement insertBaseStatement, final TSStatus context) { if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java index 398cee6362780..23e8375fceb8e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java @@ -71,6 +71,7 @@ public Optional visitLoadTsFile( LOGGER.info("Start data type conversion for LoadTsFileStatement: {}.", loadTsFileStatement); + // TODO: Use batch insert after Table model supports insertMultiTablets for (final File file : loadTsFileStatement.getTsFiles()) { try (final TsFileInsertionEventTableParser parser = new TsFileInsertionEventTableParser( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java index d92dbeab3e593..9420c9da36101 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java @@ -21,15 +21,19 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.StatementNode; import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryBlock; +import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.commons.io.FileUtils; @@ -39,16 +43,25 @@ import org.slf4j.LoggerFactory; import java.io.File; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; + +import static org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil.calculateTabletSizeInBytes; public class LoadTreeStatementDataTypeConvertExecutionVisitor extends StatementVisitor, Void> { - - private final StatementExecutor statementExecutor; - private static final Logger LOGGER = LoggerFactory.getLogger(LoadTreeStatementDataTypeConvertExecutionVisitor.class); + private static final long TABLET_BATCH_MEMORY_SIZE_IN_BYTES = + IoTDBDescriptor.getInstance() + .getConfig() + .getLoadTsFileTabletConversionBatchMemorySizeInBytes(); + + private final StatementExecutor statementExecutor; + @FunctionalInterface public interface StatementExecutor { TSStatus execute(final Statement statement); @@ -70,60 +83,82 @@ public Optional visitLoadFile( LOGGER.info("Start data type conversion for LoadTsFileStatement: {}", loadTsFileStatement); - for (final File file : loadTsFileStatement.getTsFiles()) { - try (final TsFileInsertionEventScanParser parser = - new TsFileInsertionEventScanParser( - file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) { - for (final Pair tabletWithIsAligned : parser.toTabletWithIsAligneds()) { - final LoadConvertedInsertTabletStatement statement = - new LoadConvertedInsertTabletStatement( - PipeTransferTabletRawReq.toTPipeTransferRawReq( - tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight()) - .constructStatement(), - loadTsFileStatement.isConvertOnTypeMismatch()); - - TSStatus result; - try { - result = - statement.accept( - LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR, - statementExecutor.execute(statement)); - - // Retry max 5 times if the write process is rejected - for (int i = 0; - i < 5 - && result.getCode() - == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(); - i++) { - Thread.sleep(100L * (i + 1)); - result = - statement.accept( - LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR, - statementExecutor.execute(statement)); + final LoadTsFileMemoryBlock block = + LoadTsFileMemoryManager.getInstance() + .allocateMemoryBlock(TABLET_BATCH_MEMORY_SIZE_IN_BYTES); + final List tabletRawReqs = new ArrayList<>(); + final List tabletRawReqSizes = new ArrayList<>(); + + try { + for (final File file : loadTsFileStatement.getTsFiles()) { + try (final TsFileInsertionEventScanParser parser = + new TsFileInsertionEventScanParser( + file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) { + for (final Pair tabletWithIsAligned : parser.toTabletWithIsAligneds()) { + final PipeTransferTabletRawReq tabletRawReq = + PipeTransferTabletRawReq.toTPipeTransferRawReq( + tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight()); + final long curMemory = calculateTabletSizeInBytes(tabletWithIsAligned.getLeft()) + 1; + if (block.hasEnoughMemory(curMemory)) { + tabletRawReqs.add(tabletRawReq); + tabletRawReqSizes.add(curMemory); + block.addMemoryUsage(curMemory); + continue; } - } catch (final Exception e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); + + final TSStatus result = + executeInsertMultiTabletsWithRetry( + tabletRawReqs, loadTsFileStatement.isConvertOnTypeMismatch()); + + for (final long memoryCost : tabletRawReqSizes) { + block.reduceMemoryUsage(memoryCost); + } + tabletRawReqs.clear(); + tabletRawReqSizes.clear(); + + if (!handleTSStatus(result, loadTsFileStatement)) { + return Optional.empty(); } - result = statement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e); + + tabletRawReqs.add(tabletRawReq); + tabletRawReqSizes.add(curMemory); + block.addMemoryUsage(curMemory); + } + } catch (final Exception e) { + LOGGER.warn( + "Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e); + return Optional.empty(); + } + } + + if (!tabletRawReqs.isEmpty()) { + try { + final TSStatus result = + executeInsertMultiTabletsWithRetry( + tabletRawReqs, loadTsFileStatement.isConvertOnTypeMismatch()); + + for (final long memoryCost : tabletRawReqSizes) { + block.reduceMemoryUsage(memoryCost); } + tabletRawReqs.clear(); + tabletRawReqSizes.clear(); - if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() - || result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() - || result.getCode() - == TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) { - LOGGER.warn( - "Failed to convert data type for LoadTsFileStatement: {}, status code is {}.", - loadTsFileStatement, - result.getCode()); + if (!handleTSStatus(result, loadTsFileStatement)) { return Optional.empty(); } + } catch (final Exception e) { + LOGGER.warn( + "Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e); + return Optional.empty(); } - } catch (final Exception e) { - LOGGER.warn( - "Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e); - return Optional.empty(); } + } finally { + for (final long memoryCost : tabletRawReqSizes) { + block.reduceMemoryUsage(memoryCost); + } + tabletRawReqs.clear(); + tabletRawReqSizes.clear(); + block.close(); } if (loadTsFileStatement.isDeleteAfterLoad()) { @@ -144,4 +179,57 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) { return Optional.of(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); } + + private TSStatus executeInsertMultiTabletsWithRetry( + final List tabletRawReqs, boolean isConvertOnTypeMismatch) { + final InsertMultiTabletsStatement batchStatement = new InsertMultiTabletsStatement(); + batchStatement.setInsertTabletStatementList( + tabletRawReqs.stream() + .map( + req -> + new LoadConvertedInsertTabletStatement( + req.constructStatement(), isConvertOnTypeMismatch)) + .collect(Collectors.toList())); + + TSStatus result; + try { + result = + batchStatement.accept( + LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR, + statementExecutor.execute(batchStatement)); + + // Retry max 5 times if the write process is rejected + for (int i = 0; + i < 5 + && result.getCode() + == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(); + i++) { + Thread.sleep(100L * (i + 1)); + result = + batchStatement.accept( + LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR, + statementExecutor.execute(batchStatement)); + } + } catch (final Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + result = batchStatement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e); + } + return result; + } + + private static boolean handleTSStatus( + final TSStatus result, final LoadTsFileStatement loadTsFileStatement) { + if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + || result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() + || result.getCode() == TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) { + LOGGER.warn( + "Failed to convert data type for LoadTsFileStatement: {}, status code is {}.", + loadTsFileStatement, + result.getCode()); + return false; + } + return true; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryBlock.java similarity index 88% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryBlock.java index 2bf276748cc0d..af3c1f20f8fde 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryBlock.java @@ -30,14 +30,13 @@ import java.util.concurrent.atomic.AtomicLong; -public class LoadTsFileAnalyzeSchemaMemoryBlock extends LoadTsFileAbstractMemoryBlock { - private static final Logger LOGGER = - LoggerFactory.getLogger(LoadTsFileAnalyzeSchemaMemoryBlock.class); +public class LoadTsFileMemoryBlock extends LoadTsFileAbstractMemoryBlock { + private static final Logger LOGGER = LoggerFactory.getLogger(LoadTsFileMemoryBlock.class); private long totalMemorySizeInBytes; private final AtomicLong memoryUsageInBytes; - LoadTsFileAnalyzeSchemaMemoryBlock(long totalMemorySizeInBytes) { + LoadTsFileMemoryBlock(long totalMemorySizeInBytes) { super(); this.totalMemorySizeInBytes = totalMemorySizeInBytes; @@ -60,7 +59,7 @@ public synchronized void addMemoryUsage(long memoryInBytes) { Metric.LOAD_MEM.toString(), MetricLevel.IMPORTANT, Tag.NAME.toString(), - LoadTsFileMemMetricSet.LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY) + LoadTsFileMemMetricSet.LOAD_TSFILE_OTHER_MEMORY) .incr(memoryInBytes); } @@ -75,7 +74,7 @@ public synchronized void reduceMemoryUsage(long memoryInBytes) { Metric.LOAD_MEM.toString(), MetricLevel.IMPORTANT, Tag.NAME.toString(), - LoadTsFileMemMetricSet.LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY) + LoadTsFileMemMetricSet.LOAD_TSFILE_OTHER_MEMORY) .decr(memoryInBytes); } @@ -108,7 +107,7 @@ protected synchronized void releaseAllMemory() { @Override public String toString() { - return "LoadTsFileAnalyzeSchemaMemoryBlock{" + return "LoadTsFileMemoryBlock{" + "totalMemorySizeInBytes=" + totalMemorySizeInBytes + ", usedMemoryInBytes=" diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java index 221c0bbea3c8c..f55607ece7533 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java @@ -94,25 +94,24 @@ public synchronized void releaseToQuery(long sizeInBytes) { this.notifyAll(); } - public synchronized LoadTsFileAnalyzeSchemaMemoryBlock allocateAnalyzeSchemaMemoryBlock( - long sizeInBytes) throws LoadRuntimeOutOfMemoryException { + public synchronized LoadTsFileMemoryBlock allocateMemoryBlock(long sizeInBytes) + throws LoadRuntimeOutOfMemoryException { try { forceAllocateFromQuery(sizeInBytes); if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "Load: Allocated AnalyzeSchemaMemoryBlock from query engine, size: {}", sizeInBytes); + LOGGER.debug("Load: Allocated MemoryBlock from query engine, size: {}", sizeInBytes); } } catch (LoadRuntimeOutOfMemoryException e) { if (dataCacheMemoryBlock != null && dataCacheMemoryBlock.doShrink(sizeInBytes)) { LOGGER.info( - "Load: Query engine's memory is not sufficient, allocated AnalyzeSchemaMemoryBlock from DataCacheMemoryBlock, size: {}", + "Load: Query engine's memory is not sufficient, allocated MemoryBlock from DataCacheMemoryBlock, size: {}", sizeInBytes); usedMemorySizeInBytes.addAndGet(sizeInBytes); - return new LoadTsFileAnalyzeSchemaMemoryBlock(sizeInBytes); + return new LoadTsFileMemoryBlock(sizeInBytes); } throw e; } - return new LoadTsFileAnalyzeSchemaMemoryBlock(sizeInBytes); + return new LoadTsFileMemoryBlock(sizeInBytes); } /** @@ -120,7 +119,7 @@ public synchronized LoadTsFileAnalyzeSchemaMemoryBlock allocateAnalyzeSchemaMemo * * @throws LoadRuntimeOutOfMemoryException if failed to allocate enough memory */ - synchronized void forceResize(LoadTsFileAnalyzeSchemaMemoryBlock memoryBlock, long newSizeInBytes) + synchronized void forceResize(LoadTsFileMemoryBlock memoryBlock, long newSizeInBytes) throws LoadRuntimeOutOfMemoryException { if (memoryBlock.getTotalMemorySizeInBytes() >= newSizeInBytes) { @@ -142,14 +141,14 @@ synchronized void forceResize(LoadTsFileAnalyzeSchemaMemoryBlock memoryBlock, lo forceAllocateFromQuery(bytesNeeded); if (LOGGER.isDebugEnabled()) { LOGGER.info( - "Load: Force resized LoadTsFileAnalyzeSchemaMemoryBlock with memory from query engine, size added: {}, new size: {}", + "Load: Force resized LoadTsFileMemoryBlock with memory from query engine, size added: {}, new size: {}", bytesNeeded, newSizeInBytes); } } catch (LoadRuntimeOutOfMemoryException e) { if (dataCacheMemoryBlock != null && dataCacheMemoryBlock.doShrink(bytesNeeded)) { LOGGER.info( - "Load: Query engine's memory is not sufficient, force resized LoadTsFileAnalyzeSchemaMemoryBlock with memory from DataCacheMemoryBlock, size added: {}, new size: {}", + "Load: Query engine's memory is not sufficient, force resized LoadTsFileMemoryBlock with memory from DataCacheMemoryBlock, size added: {}, new size: {}", bytesNeeded, newSizeInBytes); usedMemorySizeInBytes.addAndGet(bytesNeeded); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileMemMetricSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileMemMetricSet.java index 77758142b2cda..63608ae12f4b0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileMemMetricSet.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileMemMetricSet.java @@ -30,7 +30,7 @@ public class LoadTsFileMemMetricSet implements IMetricSet { private static final String LOAD_TSFILE_USED_MEMORY = "LoadTsFileUsedMemory"; - public static final String LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY = "LoadTsFileAnalyzeSchemaMemory"; + public static final String LOAD_TSFILE_OTHER_MEMORY = "LoadTsFileOtherMemory"; private static final String LOAD_TSFILE_DATA_CACHE_MEMORY = "LoadTsFileDataCacheMemory"; @@ -57,7 +57,7 @@ public void bindTo(AbstractMetricService metricService) { Metric.LOAD_MEM.toString(), MetricLevel.IMPORTANT, Tag.NAME.toString(), - LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY) + LOAD_TSFILE_OTHER_MEMORY) .set(0L); } @@ -77,7 +77,7 @@ public void unbindFrom(AbstractMetricService metricService) { MetricType.AUTO_GAUGE, Metric.LOAD_MEM.toString(), Tag.NAME.toString(), - LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY); + LOAD_TSFILE_OTHER_MEMORY); } //////////////////////////// singleton ////////////////////////////