From ec29152608dd6fa29d5276b78040befd97824008 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Tue, 9 Apr 2024 16:54:11 +0800 Subject: [PATCH 1/5] fix --- .../apache/doris/common/profile/Profile.java | 2 +- .../doris/common/profile/SummaryProfile.java | 120 +++++++++++++++++- .../doris/datasource/hive/HMSTransaction.java | 108 ++++++++++++++-- .../commands/insert/HiveInsertExecutor.java | 9 +- .../org/apache/doris/qe/StmtExecutor.java | 4 +- .../doris/transaction/TransactionType.java | 24 ++++ 6 files changed, 250 insertions(+), 17 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java index 12ba687bfd1bcc..b9cefdd0c4aa04 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java @@ -89,7 +89,7 @@ public List getExecutionProfiles() { } // This API will also add the profile to ProfileManager, so that we could get the profile from ProfileManager. - // isFinished ONLY means the cooridnator or stmtexecutor is finished. + // isFinished ONLY means the coordinator or stmtexecutor is finished. public synchronized void updateSummary(long startTime, Map summaryInfo, boolean isFinished, Planner planner) { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java index b7bc5b4728e2d6..90033258c4ea64 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java @@ -21,6 +21,7 @@ import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.thrift.TUnit; +import org.apache.doris.transaction.TransactionType; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -89,10 +90,19 @@ public class SummaryProfile { public static final String FRAGMENT_COMPRESSED_SIZE = "Fragment Compressed Size"; public static final String FRAGMENT_RPC_COUNT = "Fragment RPC Count"; + public static final String TRANSACTION_COMMIT_TIME = "Transaction Commit Time"; + public static final String FILESYSTEM_OPT_TIME = "FileSystem Operator Time"; + public static final String FILESYSTEM_OPT_RENAME_FILE_CNT = "Rename File Cnt"; + public static final String FILESYSTEM_OPT_RENAME_DIR_CNT = "Rename Dir Cnt"; + public static final String FILESYSTEM_OPT_DELETE_DIR_CNT = "Delete Dir Cnt"; + public static final String HMS_ADD_PARTITION_TIME = "HMS Add Partition Time"; + public static final String HMS_ADD_PARTITION_CNT = "HMS Add Partition Cnt"; + public static final String HMS_UPDATE_PARTITION_TIME = "HMS Update Partition Time"; + public static final String HMS_UPDATE_PARTITION_CNT = "HMS Update Partition Cnt"; // These info will display on FE's web ui table, every one will be displayed as // a column, so that should not - // add many columns here. Add to ExcecutionSummary list. + // add many columns here. Add to ExecutionSummary list. public static final ImmutableList SUMMARY_KEYS = ImmutableList.of(PROFILE_ID, TASK_TYPE, START_TIME, END_TIME, TOTAL_TIME, TASK_STATE, USER, DEFAULT_DB, SQL_STATEMENT); @@ -111,7 +121,8 @@ public class SummaryProfile { QUERY_DISTRIBUTED_TIME, INIT_SCAN_NODE_TIME, FINALIZE_SCAN_NODE_TIME, - GET_SPLITS_TIME, GET_PARTITIONS_TIME, + GET_SPLITS_TIME, + GET_PARTITIONS_TIME, GET_PARTITION_FILES_TIME, CREATE_SCAN_RANGE_TIME, GET_PARTITION_VERSION_TIME, @@ -136,7 +147,9 @@ public class SummaryProfile { TOTAL_INSTANCES_NUM, INSTANCES_NUM_PER_BE, PARALLEL_FRAGMENT_EXEC_INSTANCE, - TRACE_ID); + TRACE_ID, + TRANSACTION_COMMIT_TIME + ); // Ident of each item. Default is 0, which doesn't need to present in this Map. // Please set this map for new profile items if they need ident. @@ -162,6 +175,14 @@ public class SummaryProfile { .put(SEND_FRAGMENT_PHASE2_TIME, 1) .put(FRAGMENT_COMPRESSED_SIZE, 1) .put(FRAGMENT_RPC_COUNT, 1) + .put(FILESYSTEM_OPT_TIME, 1) + .put(FILESYSTEM_OPT_RENAME_FILE_CNT, 2) + .put(FILESYSTEM_OPT_RENAME_DIR_CNT, 2) + .put(FILESYSTEM_OPT_DELETE_DIR_CNT, 2) + .put(HMS_ADD_PARTITION_TIME, 1) + .put(HMS_ADD_PARTITION_CNT, 2) + .put(HMS_UPDATE_PARTITION_TIME, 1) + .put(HMS_UPDATE_PARTITION_CNT, 2) .build(); private RuntimeProfile summaryProfile; @@ -212,6 +233,17 @@ public class SummaryProfile { private long getPartitionVersionByHasDataCount = 0; private long getTableVersionTime = 0; private long getTableVersionCount = 0; + private long transactionCommitBeginTime = -1; + private long transactionCommitEndTime = -1; + private long filesystemOptTime = -1; + private long hmsAddPartitionTime = -1; + private long hmsAddPartitionCnt = 0; + private long hmsUpdatePartitionTime = -1; + private long hmsUpdatePartitionCnt = 0; + private long filesystemRenameFileCnt = 0; + private long filesystemRenameDirCnt = 0; + private long filesystemDeleteDirCnt = 0; + private TransactionType transactionType = TransactionType.UNKNOWN; public SummaryProfile() { summaryProfile = new RuntimeProfile(SUMMARY_PROFILE_NAME); @@ -317,6 +349,7 @@ private void updateExecutionSummaryProfile() { RuntimeProfile.printCounter(queryFetchResultConsumeTime, TUnit.TIME_MS)); executionSummaryProfile.addInfoString(WRITE_RESULT_TIME, RuntimeProfile.printCounter(queryWriteResultConsumeTime, TUnit.TIME_MS)); + setTransactionSummary(); if (Config.isCloudMode()) { executionSummaryProfile.addInfoString(GET_PARTITION_VERSION_TIME, getPrettyGetPartitionVersionTime()); @@ -328,6 +361,31 @@ private void updateExecutionSummaryProfile() { } } + public void setTransactionSummary() { + executionSummaryProfile.addInfoString(TRANSACTION_COMMIT_TIME, + getPrettyTime(transactionCommitEndTime, transactionCommitBeginTime, TUnit.TIME_MS)); + + if (transactionType.equals(TransactionType.HMS)) { + executionSummaryProfile.addInfoString(FILESYSTEM_OPT_TIME, + getPrettyTime(filesystemOptTime, 0, TUnit.TIME_MS)); + executionSummaryProfile.addInfoString(FILESYSTEM_OPT_RENAME_FILE_CNT, + getPrettyCount(filesystemRenameFileCnt)); + executionSummaryProfile.addInfoString(FILESYSTEM_OPT_RENAME_DIR_CNT, + getPrettyCount(filesystemRenameDirCnt)); + executionSummaryProfile.addInfoString(FILESYSTEM_OPT_DELETE_DIR_CNT, + getPrettyCount(filesystemDeleteDirCnt)); + + executionSummaryProfile.addInfoString(HMS_ADD_PARTITION_TIME, + getPrettyTime(hmsAddPartitionTime, 0, TUnit.TIME_MS)); + executionSummaryProfile.addInfoString(HMS_ADD_PARTITION_CNT, + getPrettyCount(hmsAddPartitionCnt)); + executionSummaryProfile.addInfoString(HMS_UPDATE_PARTITION_TIME, + getPrettyTime(hmsUpdatePartitionTime, 0, TUnit.TIME_MS)); + executionSummaryProfile.addInfoString(HMS_UPDATE_PARTITION_CNT, + getPrettyCount(hmsUpdatePartitionCnt)); + } + } + public void setParseSqlStartTime(long parseSqlStartTime) { this.parseSqlStartTime = parseSqlStartTime; } @@ -607,6 +665,10 @@ private String getPrettyGetPartitionVersionCount() { return RuntimeProfile.printCounter(getPartitionVersionCount, TUnit.UNIT); } + private String getPrettyCount(long cnt) { + return RuntimeProfile.printCounter(cnt, TUnit.UNIT); + } + private String getPrettyGetTableVersionTime() { if (getTableVersionTime == 0) { return "N/A"; @@ -624,4 +686,56 @@ private String getPrettyTime(long end, long start, TUnit unit) { } return RuntimeProfile.printCounter(end - start, unit); } + + public void setTransactionBeginTime(TransactionType type) { + this.transactionCommitBeginTime = TimeUtils.getStartTimeMs(); + this.transactionType = type; + } + + public void setTransactionEndTime() { + this.transactionCommitEndTime = TimeUtils.getStartTimeMs(); + } + + public void freshFilesystemOptTime() { + if (this.filesystemOptTime == -1) { + // Because this value needs to be summed up. + // If it is not set zero here: + // 1. If the detection time is longer than 1ms, + // the final cumulative value will be 1 ms less due to -1 initialization. + // 2. if the detection time is no longer than 1ms, + // the final cumulative value will be -1 always. + // This is considered to be the indicator's not being detected, + // Apparently not, it's just that the value detected is 0. + this.filesystemOptTime = 0; + } + this.filesystemOptTime += System.currentTimeMillis() - tempStarTime; + } + + public void setHmsAddPartitionTime() { + this.hmsAddPartitionTime = TimeUtils.getStartTimeMs() - tempStarTime; + } + + public void addHmsAddPartitionCnt(long c) { + this.hmsAddPartitionCnt = c; + } + + public void setHmsUpdatePartitionTime() { + this.hmsUpdatePartitionTime = TimeUtils.getStartTimeMs() - tempStarTime; + } + + public void addHmsUpdatePartitionCnt(long c) { + this.hmsUpdatePartitionCnt = c; + } + + public void addRenameFileCnt(long c) { + this.filesystemRenameFileCnt += c; + } + + public void incRenameDirCnt() { + this.filesystemRenameDirCnt += 1; + } + + public void incDeleteDirRecursiveCnt() { + this.filesystemDeleteDirCnt += 1; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java index 84221b74e7f1e5..eedbcd323a1356 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java @@ -23,8 +23,10 @@ import org.apache.doris.backup.Status; import org.apache.doris.common.Pair; +import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.fs.FileSystem; import org.apache.doris.fs.remote.RemoteFile; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.THivePartitionUpdate; import org.apache.doris.thrift.TUpdateMode; import org.apache.doris.transaction.Transaction; @@ -68,6 +70,7 @@ public class HMSTransaction implements Transaction { private final FileSystem fs; private String dbName; private String tbName; + private SummaryProfile summaryProfile = null; private final Map> tableActions = new HashMap<>(); private final Map, Action>> @@ -79,6 +82,10 @@ public class HMSTransaction implements Transaction { public HMSTransaction(HiveMetadataOps hiveOps) { this.hiveOps = hiveOps; this.fs = hiveOps.getFs(); + + if (ConnectContext.get().getExecutor() != null) { + summaryProfile = ConnectContext.get().getExecutor().getSummaryProfile(); + } } @Override @@ -597,7 +604,7 @@ private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory, boolean d } public boolean deleteIfExists(Path path) { - Status status = fs.delete(path.toString()); + Status status = wrapperDeleteWithProfileSummary(path.toString()); if (status.ok()) { return true; } @@ -1057,7 +1064,7 @@ public void prepareInsertExistingTable(TableAndMore tableAndMore) { String targetPath = table.getSd().getLocation(); String writePath = tableAndMore.getCurrentLocation(); if (!targetPath.equals(writePath)) { - fs.asyncRename( + wrapperAsyncRenameWithProfileSummary( fileSystemExecutor, asyncFileSystemTaskFutures, fileSystemTaskCancelled, @@ -1083,7 +1090,7 @@ public void prepareAlterTable(TableAndMore tableAndMore) { if (!targetPath.equals(writePath)) { Path path = new Path(targetPath); String oldTablePath = new Path(path.getParent(), "_temp_" + path.getName()).toString(); - Status status = fs.renameDir( + Status status = wrapperRenameDirWithProfileSummary( targetPath, oldTablePath, () -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldTablePath, targetPath))); @@ -1093,7 +1100,7 @@ public void prepareAlterTable(TableAndMore tableAndMore) { } clearDirsForFinish.add(oldTablePath); - status = fs.renameDir( + status = wrapperRenameDirWithProfileSummary( writePath, targetPath, () -> directoryCleanUpTasksForAbort.add( @@ -1120,7 +1127,7 @@ public void prepareAddPartition(PartitionAndMore partitionAndMore) { String writePath = partitionAndMore.getCurrentLocation(); if (!targetPath.equals(writePath)) { - fs.asyncRenameDir( + wrapperAsyncRenameDirWithProfileSummary( fileSystemExecutor, asyncFileSystemTaskFutures, fileSystemTaskCancelled, @@ -1160,7 +1167,7 @@ public void prepareInsertExistPartition(PartitionAndMore partitionAndMore) { directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, false)); if (!targetPath.equals(writePath)) { - fs.asyncRename( + wrapperAsyncRenameWithProfileSummary( fileSystemExecutor, asyncFileSystemTaskFutures, fileSystemTaskCancelled, @@ -1189,7 +1196,7 @@ private void runRenameDirTasksForAbort() { for (RenameDirectoryTask task : renameDirectoryTasksForAbort) { status = fs.exists(task.getRenameFrom()); if (status.ok()) { - status = fs.renameDir(task.getRenameFrom(), task.getRenameTo(), () -> {}); + status = wrapperRenameDirWithProfileSummary(task.getRenameFrom(), task.getRenameTo(), () -> {}); if (!status.ok()) { LOG.warn("Failed to abort rename dir from {} to {}:{}", task.getRenameFrom(), task.getRenameTo(), status.getErrMsg()); @@ -1201,7 +1208,7 @@ private void runRenameDirTasksForAbort() { private void runClearPathsForFinish() { Status status; for (String path : clearDirsForFinish) { - status = fs.delete(path); + status = wrapperDeleteWithProfileSummary(path); if (!status.ok()) { LOG.warn("Failed to recursively delete path {}:{}", path, status.getErrCode()); } @@ -1216,7 +1223,7 @@ public void prepareAlterPartition(PartitionAndMore partitionAndMore) { if (!targetPath.equals(writePath)) { Path path = new Path(targetPath); String oldPartitionPath = new Path(path.getParent(), "_temp_" + path.getName()).toString(); - Status status = fs.renameDir( + Status status = wrapperRenameDirWithProfileSummary( targetPath, oldPartitionPath, () -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldPartitionPath, targetPath))); @@ -1228,7 +1235,7 @@ public void prepareAlterPartition(PartitionAndMore partitionAndMore) { } clearDirsForFinish.add(oldPartitionPath); - status = fs.renameDir( + status = wrapperRenameDirWithProfileSummary( writePath, targetPath, () -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true))); @@ -1250,18 +1257,41 @@ public void prepareAlterPartition(PartitionAndMore partitionAndMore) { private void waitForAsyncFileSystemTasks() { + if (summaryProfile != null) { + summaryProfile.setTempStartTime(); + } + for (CompletableFuture future : asyncFileSystemTaskFutures) { MoreFutures.getFutureValue(future, RuntimeException.class); } + + if (summaryProfile != null) { + summaryProfile.freshFilesystemOptTime(); + } } private void doAddPartitionsTask() { + + if (summaryProfile != null) { + summaryProfile.setTempStartTime(); + summaryProfile.addHmsAddPartitionCnt(addPartitionsTask.getPartitions().size()); + } + if (!addPartitionsTask.isEmpty()) { addPartitionsTask.run(hiveOps); } + + if (summaryProfile != null) { + summaryProfile.setHmsAddPartitionTime(); + } } private void doUpdateStatisticsTasks() { + if (summaryProfile != null) { + summaryProfile.setTempStartTime(); + summaryProfile.addHmsUpdatePartitionCnt(updateStatisticsTasks.size()); + } + ImmutableList.Builder> updateStatsFutures = ImmutableList.builder(); List failedTaskDescriptions = new ArrayList<>(); List suppressedExceptions = new ArrayList<>(); @@ -1289,6 +1319,10 @@ private void doUpdateStatisticsTasks() { suppressedExceptions.forEach(exception::addSuppressed); throw exception; } + + if (summaryProfile != null) { + summaryProfile.setHmsUpdatePartitionTime(); + } } public void doNothing() { @@ -1312,4 +1346,58 @@ public void rollback() { runRenameDirTasksForAbort(); } } + + public Status wrapperRenameDirWithProfileSummary(String origFilePath, + String destFilePath, + Runnable runWhenPathNotExist) { + if (summaryProfile != null) { + summaryProfile.setTempStartTime(); + summaryProfile.incRenameDirCnt(); + } + + Status status = fs.renameDir(origFilePath, destFilePath, runWhenPathNotExist); + + if (summaryProfile != null) { + summaryProfile.freshFilesystemOptTime(); + } + return status; + } + + public Status wrapperDeleteWithProfileSummary(String remotePath) { + if (summaryProfile != null) { + summaryProfile.setTempStartTime(); + summaryProfile.incDeleteDirRecursiveCnt(); + } + + Status status = fs.delete(remotePath); + + if (summaryProfile != null) { + summaryProfile.freshFilesystemOptTime(); + } + return status; + } + + public void wrapperAsyncRenameWithProfileSummary(Executor executor, + List> renameFileFutures, + AtomicBoolean cancelled, + String origFilePath, + String destFilePath, + List fileNames) { + fs.asyncRename(executor, renameFileFutures, cancelled, origFilePath, destFilePath, fileNames); + if (summaryProfile != null) { + summaryProfile.addRenameFileCnt(fileNames.size()); + } + } + + public void wrapperAsyncRenameDirWithProfileSummary(Executor executor, + List> renameFileFutures, + AtomicBoolean cancelled, + String origFilePath, + String destFilePath, + Runnable runWhenPathNotExist) { + fs.asyncRenameDir(executor, renameFileFutures, cancelled, origFilePath, destFilePath, runWhenPathNotExist); + if (summaryProfile != null) { + summaryProfile.incRenameDirCnt(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java index 66dfe763e469be..d8d8fb684ea54c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java @@ -36,6 +36,7 @@ import org.apache.doris.qe.StmtExecutor; import org.apache.doris.transaction.TransactionManager; import org.apache.doris.transaction.TransactionStatus; +import org.apache.doris.transaction.TransactionType; import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; @@ -102,7 +103,13 @@ protected void onComplete() throws UserException { String dbName = ((HMSExternalTable) table).getDbName(); String tbName = table.getName(); transaction.finishInsertTable(dbName, tbName); + if (ConnectContext.get().getExecutor() != null) { + ConnectContext.get().getExecutor().getSummaryProfile().setTransactionBeginTime(TransactionType.HMS); + } transactionManager.commit(txnId); + if (ConnectContext.get().getExecutor() != null) { + ConnectContext.get().getExecutor().getSummaryProfile().setTransactionEndTime(); + } txnStatus = TransactionStatus.COMMITTED; Env.getCurrentEnv().getCatalogMgr().refreshExternalTable( dbName, @@ -135,7 +142,7 @@ protected void afterExec(StmtExecutor executor) { sb.append("{"); sb.append("'status':'") .append(ctx.isTxnModel() ? TransactionStatus.PREPARE.name() : txnStatus.name()); - // sb.append("', 'txnId':'").append(txnId).append("'"); + sb.append("', 'txnId':'").append(txnId).append("'"); if (!Strings.isNullOrEmpty(errMsg)) { sb.append(", 'err':'").append(errMsg).append("'"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 15d8c22af5ed7d..d33fe44d55d6c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1141,13 +1141,13 @@ public void updateProfile(boolean isFinished) { if (!context.getSessionVariable().enableProfile()) { return; } - // If any error happends in update profile, we should ignore this error + // If any error happened in update profile, we should ignore this error // and ensure the sql is finished normally. For example, if update profile // failed, the insert stmt should be success try { profile.updateSummary(context.startTime, getSummaryInfo(isFinished), isFinished, this.planner); } catch (Throwable t) { - LOG.warn("failed to update profile, ingore this error", t); + LOG.warn("failed to update profile, ignore this error", t); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java new file mode 100644 index 00000000000000..2372c199738116 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.transaction; + +public enum TransactionType { + UNKNOWN, + HMS, + ICEBERG +} From 64de08ac3a58d2ee52ca85c5fe270e2bf3a29dcb Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Wed, 10 Apr 2024 17:48:45 +0800 Subject: [PATCH 2/5] fix --- .../apache/doris/common/profile/SummaryProfile.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java index 90033258c4ea64..b3e8898ed40a05 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java @@ -92,13 +92,13 @@ public class SummaryProfile { public static final String FRAGMENT_RPC_COUNT = "Fragment RPC Count"; public static final String TRANSACTION_COMMIT_TIME = "Transaction Commit Time"; public static final String FILESYSTEM_OPT_TIME = "FileSystem Operator Time"; - public static final String FILESYSTEM_OPT_RENAME_FILE_CNT = "Rename File Cnt"; - public static final String FILESYSTEM_OPT_RENAME_DIR_CNT = "Rename Dir Cnt"; - public static final String FILESYSTEM_OPT_DELETE_DIR_CNT = "Delete Dir Cnt"; + public static final String FILESYSTEM_OPT_RENAME_FILE_CNT = "Rename File Count"; + public static final String FILESYSTEM_OPT_RENAME_DIR_CNT = "Rename Dir Count"; + public static final String FILESYSTEM_OPT_DELETE_DIR_CNT = "Delete Dir Count"; public static final String HMS_ADD_PARTITION_TIME = "HMS Add Partition Time"; - public static final String HMS_ADD_PARTITION_CNT = "HMS Add Partition Cnt"; + public static final String HMS_ADD_PARTITION_CNT = "HMS Add Partition Count"; public static final String HMS_UPDATE_PARTITION_TIME = "HMS Update Partition Time"; - public static final String HMS_UPDATE_PARTITION_CNT = "HMS Update Partition Cnt"; + public static final String HMS_UPDATE_PARTITION_CNT = "HMS Update Partition Count"; // These info will display on FE's web ui table, every one will be displayed as // a column, so that should not From 74f46fe3f4068ff7bdbdcdc1109aed852ad3bae3 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Wed, 10 Apr 2024 22:30:30 +0800 Subject: [PATCH 3/5] fix --- .../doris/datasource/hive/HMSTransaction.java | 68 +++++++------------ 1 file changed, 26 insertions(+), 42 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java index eedbcd323a1356..0e668e0eda5195 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java @@ -70,7 +70,7 @@ public class HMSTransaction implements Transaction { private final FileSystem fs; private String dbName; private String tbName; - private SummaryProfile summaryProfile = null; + private Optional summaryProfile = Optional.empty(); private final Map> tableActions = new HashMap<>(); private final Map, Action>> @@ -84,7 +84,7 @@ public HMSTransaction(HiveMetadataOps hiveOps) { this.fs = hiveOps.getFs(); if (ConnectContext.get().getExecutor() != null) { - summaryProfile = ConnectContext.get().getExecutor().getSummaryProfile(); + summaryProfile = Optional.of(ConnectContext.get().getExecutor().getSummaryProfile()); } } @@ -1257,40 +1257,34 @@ public void prepareAlterPartition(PartitionAndMore partitionAndMore) { private void waitForAsyncFileSystemTasks() { - if (summaryProfile != null) { - summaryProfile.setTempStartTime(); - } + summaryProfile.ifPresent(SummaryProfile::setTempStartTime); for (CompletableFuture future : asyncFileSystemTaskFutures) { MoreFutures.getFutureValue(future, RuntimeException.class); } - if (summaryProfile != null) { - summaryProfile.freshFilesystemOptTime(); - } + summaryProfile.ifPresent(SummaryProfile::freshFilesystemOptTime); } private void doAddPartitionsTask() { - if (summaryProfile != null) { - summaryProfile.setTempStartTime(); - summaryProfile.addHmsAddPartitionCnt(addPartitionsTask.getPartitions().size()); - } + summaryProfile.ifPresent(profile -> { + profile.setTempStartTime(); + profile.addHmsAddPartitionCnt(addPartitionsTask.getPartitions().size()); + }); if (!addPartitionsTask.isEmpty()) { addPartitionsTask.run(hiveOps); } - if (summaryProfile != null) { - summaryProfile.setHmsAddPartitionTime(); - } + summaryProfile.ifPresent(SummaryProfile::setHmsAddPartitionTime); } private void doUpdateStatisticsTasks() { - if (summaryProfile != null) { - summaryProfile.setTempStartTime(); - summaryProfile.addHmsUpdatePartitionCnt(updateStatisticsTasks.size()); - } + summaryProfile.ifPresent(profile -> { + profile.setTempStartTime(); + profile.addHmsUpdatePartitionCnt(updateStatisticsTasks.size()); + }); ImmutableList.Builder> updateStatsFutures = ImmutableList.builder(); List failedTaskDescriptions = new ArrayList<>(); @@ -1320,9 +1314,7 @@ private void doUpdateStatisticsTasks() { throw exception; } - if (summaryProfile != null) { - summaryProfile.setHmsUpdatePartitionTime(); - } + summaryProfile.ifPresent(SummaryProfile::setHmsUpdatePartitionTime); } public void doNothing() { @@ -1350,30 +1342,26 @@ public void rollback() { public Status wrapperRenameDirWithProfileSummary(String origFilePath, String destFilePath, Runnable runWhenPathNotExist) { - if (summaryProfile != null) { - summaryProfile.setTempStartTime(); - summaryProfile.incRenameDirCnt(); - } + summaryProfile.ifPresent(profile -> { + profile.setTempStartTime(); + profile.incRenameDirCnt(); + }); Status status = fs.renameDir(origFilePath, destFilePath, runWhenPathNotExist); - if (summaryProfile != null) { - summaryProfile.freshFilesystemOptTime(); - } + summaryProfile.ifPresent(SummaryProfile::freshFilesystemOptTime); return status; } public Status wrapperDeleteWithProfileSummary(String remotePath) { - if (summaryProfile != null) { - summaryProfile.setTempStartTime(); - summaryProfile.incDeleteDirRecursiveCnt(); - } + summaryProfile.ifPresent(profile -> { + profile.setTempStartTime(); + profile.incDeleteDirRecursiveCnt(); + }); Status status = fs.delete(remotePath); - if (summaryProfile != null) { - summaryProfile.freshFilesystemOptTime(); - } + summaryProfile.ifPresent(SummaryProfile::freshFilesystemOptTime); return status; } @@ -1384,9 +1372,7 @@ public void wrapperAsyncRenameWithProfileSummary(Executor executor, String destFilePath, List fileNames) { fs.asyncRename(executor, renameFileFutures, cancelled, origFilePath, destFilePath, fileNames); - if (summaryProfile != null) { - summaryProfile.addRenameFileCnt(fileNames.size()); - } + summaryProfile.ifPresent(profile -> profile.addRenameFileCnt(fileNames.size())); } public void wrapperAsyncRenameDirWithProfileSummary(Executor executor, @@ -1396,8 +1382,6 @@ public void wrapperAsyncRenameDirWithProfileSummary(Executor executor, String destFilePath, Runnable runWhenPathNotExist) { fs.asyncRenameDir(executor, renameFileFutures, cancelled, origFilePath, destFilePath, runWhenPathNotExist); - if (summaryProfile != null) { - summaryProfile.incRenameDirCnt(); - } + summaryProfile.ifPresent(SummaryProfile::incRenameDirCnt); } } From 3485f361615facfcf3aa230f1d05900995aeb938 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Wed, 10 Apr 2024 22:34:44 +0800 Subject: [PATCH 4/5] fix --- .../plans/commands/insert/HiveInsertExecutor.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java index d8d8fb684ea54c..116a04215d8eb3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java @@ -20,6 +20,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.UserException; +import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.hive.HMSTransaction; @@ -54,6 +55,7 @@ public class HiveInsertExecutor extends AbstractInsertExecutor { private TransactionStatus txnStatus = TransactionStatus.ABORTED; private final TransactionManager transactionManager; private final String catalogName; + private Optional summaryProfile = Optional.empty(); /** * constructor @@ -64,6 +66,10 @@ public HiveInsertExecutor(ConnectContext ctx, HMSExternalTable table, super(ctx, table, labelName, planner, insertCtx); catalogName = table.getCatalog().getName(); transactionManager = table.getCatalog().getTransactionManager(); + + if (ConnectContext.get().getExecutor() != null) { + summaryProfile = Optional.of(ConnectContext.get().getExecutor().getSummaryProfile()); + } } public long getTxnId() { @@ -103,13 +109,9 @@ protected void onComplete() throws UserException { String dbName = ((HMSExternalTable) table).getDbName(); String tbName = table.getName(); transaction.finishInsertTable(dbName, tbName); - if (ConnectContext.get().getExecutor() != null) { - ConnectContext.get().getExecutor().getSummaryProfile().setTransactionBeginTime(TransactionType.HMS); - } + summaryProfile.ifPresent(profile -> profile.setTransactionBeginTime(TransactionType.HMS)); transactionManager.commit(txnId); - if (ConnectContext.get().getExecutor() != null) { - ConnectContext.get().getExecutor().getSummaryProfile().setTransactionEndTime(); - } + summaryProfile.ifPresent(SummaryProfile::setTransactionEndTime); txnStatus = TransactionStatus.COMMITTED; Env.getCurrentEnv().getCatalogMgr().refreshExternalTable( dbName, From e1cf19a8fbb27ba93ccf2c9caf511ebd91179680 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Thu, 11 Apr 2024 10:07:27 +0800 Subject: [PATCH 5/5] fix --- .../java/org/apache/doris/datasource/hive/HmsCommitTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java index 4ec6ca84c525b1..a5a85d789e34d8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.datasource.HMSCachedClientTest; import org.apache.doris.fs.LocalDfsFileSystem; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.THiveLocationParams; import org.apache.doris.thrift.THivePartitionUpdate; import org.apache.doris.thrift.TUpdateMode; @@ -68,6 +69,10 @@ public static void beforeClass() throws Throwable { writeLocation = "file://" + writePath.toAbsolutePath() + "/"; createTestHiveCatalog(); createTestHiveDatabase(); + + // context + ConnectContext connectContext = new ConnectContext(); + connectContext.setThreadLocalInfo(); } @AfterClass