diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 9d265e87f5e73e..db357607041166 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -331,7 +331,7 @@ std::vector StorageEngine::get_stores() { template std::vector StorageEngine::get_stores(); template std::vector StorageEngine::get_stores(); -OLAPStatus StorageEngine::get_all_data_dir_info(vector* data_dir_infos, +OLAPStatus StorageEngine::get_all_data_dir_info(std::vector* data_dir_infos, bool need_update) { OLAPStatus res = OLAP_SUCCESS; data_dir_infos->clear(); @@ -377,6 +377,20 @@ OLAPStatus StorageEngine::get_all_data_dir_info(vector* data_dir_in return res; } +int64_t StorageEngine::get_file_or_directory_size(std::filesystem::path file_path) { + if (!std::filesystem::exists(file_path)) { + return 0; + } + if (!std::filesystem::is_directory(file_path)) { + return std::filesystem::file_size(file_path); + } + int64_t sum_size = 0; + for (const auto& it : std::filesystem::directory_iterator(file_path)) { + sum_size += get_file_or_directory_size(it.path()); + } + return sum_size; +} + void StorageEngine::_start_disk_stat_monitor() { for (auto& it : _store_map) { it.second->health_check(); diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 1162260fc2038f..feb2b1f1d8f930 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -100,6 +100,8 @@ class StorageEngine { // @brief 获取所有root_path信息 OLAPStatus get_all_data_dir_info(std::vector* data_dir_infos, bool need_update); + int64_t get_file_or_directory_size(std::filesystem::path file_path); + // get root path for creating tablet. The returned vector of root path should be random, // for avoiding that all the tablet would be deployed one disk. std::vector get_stores_for_create_tablet(TStorageMedium::type storage_medium); diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index f4e4fd0b979a3d..87ad432d1c554d 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -210,6 +210,40 @@ void BackendService::get_tablet_stat(TTabletStatResult& result) { StorageEngine::instance()->tablet_manager()->get_tablet_stat(&result); } +int64_t BackendService::get_trash_used_capacity() { + int64_t result = 0; + + std::vector data_dir_infos; + StorageEngine::instance()->get_all_data_dir_info(&data_dir_infos, false /*do not update */); + + for (const auto& root_path_info : data_dir_infos) { + std::string lhs_trash_path = root_path_info.path + TRASH_PREFIX; + std::filesystem::path trash_path(lhs_trash_path); + result += StorageEngine::instance()->get_file_or_directory_size(trash_path); + } + return result; +} + +void BackendService::get_disk_trash_used_capacity(std::vector& diskTrashInfos) { + std::vector data_dir_infos; + StorageEngine::instance()->get_all_data_dir_info(&data_dir_infos, false /*do not update */); + + for (const auto& root_path_info : data_dir_infos) { + TDiskTrashInfo diskTrashInfo; + + diskTrashInfo.__set_root_path(root_path_info.path); + + diskTrashInfo.__set_state(root_path_info.is_used ? "ONLINE" : "OFFLINE"); + + std::string lhs_trash_path = root_path_info.path + TRASH_PREFIX; + std::filesystem::path trash_path(lhs_trash_path); + diskTrashInfo.__set_trash_used_capacity( + StorageEngine::instance()->get_file_or_directory_size(trash_path)); + + diskTrashInfos.push_back(diskTrashInfo); + } +} + void BackendService::submit_routine_load_task(TStatus& t_status, const std::vector& tasks) { for (auto& task : tasks) { diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h index 166a1366d82b04..1464fe46f6269b 100644 --- a/be/src/service/backend_service.h +++ b/be/src/service/backend_service.h @@ -61,6 +61,7 @@ class TQueryOptions; class TExportTaskRequest; class TExportStatusResult; class TStreamLoadRecordResult; +class TDiskTrashInfo; // This class just forward rpc for actual handler // make this class because we can bind multiple service on single point @@ -129,6 +130,10 @@ class BackendService : public BackendServiceIf { virtual void get_tablet_stat(TTabletStatResult& result) override; + virtual int64_t get_trash_used_capacity() override; + + virtual void get_disk_trash_used_capacity(std::vector& diskTrashInfos) override; + virtual void submit_routine_load_task(TStatus& t_status, const std::vector& tasks) override; @@ -141,7 +146,8 @@ class BackendService : public BackendServiceIf { // used for external service, close some context and release resource related with this context virtual void close_scanner(TScanCloseResult& result_, const TScanCloseParams& params); - virtual void get_stream_load_record(TStreamLoadRecordResult& result, const int64_t last_stream_record_time) override; + virtual void get_stream_load_record(TStreamLoadRecordResult& result, + const int64_t last_stream_record_time) override; private: Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params); diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js index bf40bd33352d69..544711140c3259 100644 --- a/docs/.vuepress/sidebar/en.js +++ b/docs/.vuepress/sidebar/en.js @@ -427,6 +427,7 @@ module.exports = [ "SHOW MIGRATIONS", "SHOW PLUGINS", "SHOW TABLE STATUS", + "SHOW TRASH", "UNINSTALL PLUGIN", ], }, diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js index e9ef97be8fb541..6edfbf318ebd3a 100644 --- a/docs/.vuepress/sidebar/zh-CN.js +++ b/docs/.vuepress/sidebar/zh-CN.js @@ -432,6 +432,7 @@ module.exports = [ "SHOW MIGRATIONS", "SHOW PLUGINS", "SHOW TABLE STATUS", + "SHOW TRASH", "UNINSTALL PLUGIN", ], }, diff --git a/docs/en/sql-reference/sql-statements/Administration/SHOW TRASH.md b/docs/en/sql-reference/sql-statements/Administration/SHOW TRASH.md new file mode 100644 index 00000000000000..b1081d524fb5e1 --- /dev/null +++ b/docs/en/sql-reference/sql-statements/Administration/SHOW TRASH.md @@ -0,0 +1,53 @@ +--- +{ + "title": "SHOW TRASH", + "language": "en" +} +--- + + + +# SHOW TRASH +## description + +This statement is used to view trash used capacity on some backends. + + Syntax: + + SHOW TRASH [ON "BackendHost:BackendHeartBeatPort"]; + + Explain: + + 1. Backend The format is BackendHost:BackendHeartBeatPort of the node. + 2. TrashUsedCapacity Indicates that the trash data of the node occupies space. + +## example + + 1. View the space occupied by trash data of all be nodes. + + SHOW TRASH; + + 2. Check the space occupied by trash data of '192.168.0.1:9050'(The specific disk information will be displayed). + + SHOW TRASH ON "192.168.0.1:9050"; + +## keyword + SHOW, TRASH + diff --git a/docs/zh-CN/sql-reference/sql-statements/Administration/SHOW TRASH.md b/docs/zh-CN/sql-reference/sql-statements/Administration/SHOW TRASH.md new file mode 100644 index 00000000000000..5f2d3dfb7874ea --- /dev/null +++ b/docs/zh-CN/sql-reference/sql-statements/Administration/SHOW TRASH.md @@ -0,0 +1,49 @@ +--- +{ + "title": "SHOW TRASH", + "language": "zh-CN" +} +--- + + + +# SHOW TRASH +## description + 该语句用于查看 backend 内的垃圾数据占用空间。 + 语法: + SHOW TRASH [ON BackendHost:BackendHeartBeatPort]; + + 说明: + 1. Backend 格式为该节点的BackendHost:BackendHeartBeatPort。 + 2. TrashUsedCapacity 表示该节点垃圾数据占用空间。 + +## example + + 1. 查看所有be节点的垃圾数据占用空间。 + + SHOW TRASH; + + 2. 查看'192.168.0.1:9050'的垃圾数据占用空间(会显示具体磁盘信息)。 + + SHOW TRASH ON "192.168.0.1:9050"; + +## keyword + SHOW, TRASH + diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index c4543364574e96..984ab65dc115ea 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -264,7 +264,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_A KW_S3, KW_SCHEMA, KW_SCHEMAS, KW_SECOND, KW_SELECT, KW_SEMI, KW_SERIALIZABLE, KW_SESSION, KW_SET, KW_SETS, KW_SET_VAR, KW_SHOW, KW_SIGNED, KW_SMALLINT, KW_SNAPSHOT, KW_SONAME, KW_SPLIT, KW_START, KW_STATUS, KW_STOP, KW_STORAGE, KW_STREAM, KW_STRING, KW_STRUCT, KW_SUM, KW_SUPERUSER, KW_SYNC, KW_SYSTEM, - KW_TABLE, KW_TABLES, KW_TABLET, KW_TASK, KW_TEMPORARY, KW_TERMINATED, KW_THAN, KW_TIME, KW_THEN, KW_TIMESTAMP, KW_TINYINT, + KW_TABLE, KW_TABLES, KW_TABLET, KW_TASK, KW_TEMPORARY, KW_TERMINATED, KW_THAN, KW_TIME, KW_THEN, KW_TIMESTAMP, KW_TINYINT,KW_TRASH, KW_TO, KW_TRANSACTION, KW_TRIGGERS, KW_TRIM, KW_TRUE, KW_TRUNCATE, KW_TYPE, KW_TYPES, KW_UNCOMMITTED, KW_UNBOUNDED, KW_UNION, KW_UNIQUE, KW_UNSIGNED, KW_USE, KW_USER, KW_USING, KW_UNINSTALL, KW_VALUE, KW_VALUES, KW_VARCHAR, KW_VARIABLES, KW_VERBOSE, KW_VIEW, @@ -2554,6 +2554,14 @@ show_param ::= {: RESULT = new ShowBackendsStmt(); :} + | KW_TRASH KW_ON STRING_LITERAL:backend + {: + RESULT = new ShowTrashDiskStmt(backend); + :} + | KW_TRASH + {: + RESULT = new ShowTrashStmt(); + :} | KW_FRONTENDS {: RESULT = new ShowFrontendsStmt(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTrashDiskStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTrashDiskStmt.java new file mode 100644 index 00000000000000..3cca857ed7ad6a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTrashDiskStmt.java @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.proc.TrashProcNode; +import org.apache.doris.qe.ShowResultSetMetaData; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.Backend; +import org.apache.doris.mysql.privilege.PrivPredicate; + +import com.google.common.collect.ImmutableMap; + +public class ShowTrashDiskStmt extends ShowStmt { + + private Backend backend; + + public ShowTrashDiskStmt(String backendQuery) { + ImmutableMap backendsInfo = Catalog.getCurrentSystemInfo().getIdToBackend(); + for (Backend backend : backendsInfo.values()) { + String backendStr = String.valueOf(backend.getHost()) + ":" + String.valueOf(backend.getHeartbeatPort()); + if (backendQuery.equals(backendStr)) { + this.backend = backend; + break; + } + } + } + + public Backend getBackend() { + return backend; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN) + && !Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), + PrivPredicate.OPERATOR)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN/OPERATOR"); + } + } + + @Override + public ShowResultSetMetaData getMetaData() { + ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder(); + for (String title : TrashProcNode.TITLE_NAMES) { + builder.addColumn(new Column(title, ScalarType.createVarchar(30))); + } + return builder.build(); + } + + @Override + public RedirectStatus getRedirectStatus() { + return RedirectStatus.NO_FORWARD; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTrashStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTrashStmt.java new file mode 100644 index 00000000000000..a2c744428a2b70 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTrashStmt.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.proc.TrashProcNode; +import org.apache.doris.qe.ShowResultSetMetaData; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.Backend; +import org.apache.doris.mysql.privilege.PrivPredicate; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import java.util.List; + +public class ShowTrashStmt extends ShowStmt { + private List backends = Lists.newArrayList(); + + public ShowTrashStmt() { + ImmutableMap backendsInfo = Catalog.getCurrentSystemInfo().getIdToBackend(); + for (Backend backend : backendsInfo.values()) { + this.backends.add(backend); + } + } + + public List getBackends() { + return backends; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN) + && !Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), + PrivPredicate.OPERATOR)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN/OPERATOR"); + } + } + + @Override + public ShowResultSetMetaData getMetaData() { + ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder(); + for (String title : TrashProcNode.TITLE_NAMES) { + builder.addColumn(new Column(title, ScalarType.createVarchar(30))); + } + return builder.build(); + } + + @Override + public RedirectStatus getRedirectStatus() { + return RedirectStatus.NO_FORWARD; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java index c93b137917fd42..247e46e80b8c67 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java @@ -45,6 +45,7 @@ private ProcService() { root.register("resources", Catalog.getCurrentCatalog().getResourceMgr().getProcNode()); root.register("load_error_hub", new LoadErrorHubProcNode(Catalog.getCurrentCatalog())); root.register("transactions", new TransDbProcDir()); + root.register("trash", new TrashProcDir()); root.register("monitor", new MonitorProcDir()); root.register("current_queries", new CurrentQueryStatisticsProcDir()); root.register("current_backend_instances", new CurrentQueryBackendInstanceProcDir()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TrashProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TrashProcDir.java new file mode 100644 index 00000000000000..e1b0b859b3376e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TrashProcDir.java @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.proc; + +import org.apache.doris.thrift.BackendService; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.system.Backend; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Pair; +import org.apache.doris.common.ClientPool; +import org.apache.doris.common.util.DebugUtil; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +/* + * Show trash + * SHOW PROC '/trash' + * SHOW PROC '/trash/backendId' + */ +public class TrashProcDir implements ProcDirInterface { + private static final Logger LOG = LogManager.getLogger(TrashProcNode.class); + + public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder().add("BackendId") + .add("Backend").add("TrashUsedCapacity").build(); + + private List backends = Lists.newArrayList(); + + public TrashProcDir() { + ImmutableMap backendsInfo = Catalog.getCurrentSystemInfo().getIdToBackend(); + for (Backend backend : backendsInfo.values()) { + this.backends.add(backend); + } + } + + @Override + public ProcResult fetchResult() { + BaseProcResult result = new BaseProcResult(); + result.setNames(TITLE_NAMES); + + List> infos = Lists.newArrayList(); + + getTrashInfo(backends, infos); + + for (List info : infos) { + result.addRow(info); + } + + return result; + } + + public static void getTrashInfo(List backends, List> infos) { + + for (Backend backend : backends) { + BackendService.Client client = null; + TNetworkAddress address = null; + Long trashUsedCapacityB = null; + boolean ok = false; + try { + long start = System.currentTimeMillis(); + address = new TNetworkAddress(backend.getHost(), backend.getBePort()); + client = ClientPool.backendPool.borrowObject(address); + trashUsedCapacityB = client.getTrashUsedCapacity(); + ok = true; + } catch (Exception e) { + LOG.warn("task exec error. backend[{}]", backend.getId(), e); + } finally { + if (ok) { + ClientPool.backendPool.returnObject(address, client); + } else { + ClientPool.backendPool.invalidateObject(address, client); + } + } + + List backendInfo = new ArrayList(); + backendInfo.add(String.valueOf(backend.getId())); + backendInfo.add(backend.getHost() + ":" + String.valueOf(backend.getHeartbeatPort())); + if (trashUsedCapacityB != null) { + Pair trashUsedCapacity = DebugUtil.getByteUint(trashUsedCapacityB); + backendInfo.add(DebugUtil.DECIMAL_FORMAT_SCALE_3.format(trashUsedCapacity.first) + " " + + trashUsedCapacity.second); + } else { + backendInfo.add(""); + } + infos.add(backendInfo); + } + } + + @Override + public boolean register(String name, ProcNodeInterface node) { + return false; + } + + @Override + public ProcNodeInterface lookup(String backendHostAndPort) throws AnalysisException { + if (Strings.isNullOrEmpty(backendHostAndPort)) { + throw new AnalysisException("BackendHost:HeartBeatPort is null"); + } + String backendHost; + int backendHeartBeatPort; + try { + backendHost = backendHostAndPort.split(":")[0]; + backendHeartBeatPort = Integer.parseInt(backendHostAndPort.split(":")[1]); + } catch (NumberFormatException e) { + throw new AnalysisException("Invalid backend format: " + backendHostAndPort); + } + Backend backend = Catalog.getCurrentSystemInfo().getBackendWithHeartbeatPort(backendHost, backendHeartBeatPort); + if (backend == null) { + throw new AnalysisException("Backend[" + backendHostAndPort + "] does not exist."); + } + + return new TrashProcNode(backend); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TrashProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TrashProcNode.java new file mode 100644 index 00000000000000..d01e3ccceca55b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TrashProcNode.java @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.proc; + +import org.apache.doris.thrift.BackendService; +import org.apache.doris.thrift.TDiskTrashInfo; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.system.Backend; +import org.apache.doris.common.Pair; +import org.apache.doris.common.ClientPool; +import org.apache.doris.common.util.DebugUtil; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +public class TrashProcNode implements ProcNodeInterface { + private static final Logger LOG = LogManager.getLogger(TrashProcNode.class); + + public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder().add("RootPath") + .add("State").add("TrashUsedCapacity").build(); + + private Backend backend; + + public TrashProcNode(Backend backend) { + this.backend = backend; + } + + @Override + public ProcResult fetchResult() { + Preconditions.checkNotNull(backend); + + BaseProcResult result = new BaseProcResult(); + result.setNames(TITLE_NAMES); + + List> infos = Lists.newArrayList(); + + getTrashDiskInfo(backend, infos); + + for (List info : infos) { + result.addRow(info); + } + + return result; + } + + public static void getTrashDiskInfo(Backend backend, List> infos) { + + BackendService.Client client = null; + TNetworkAddress address = null; + boolean ok = false; + List diskTrashInfos = null; + try { + address = new TNetworkAddress(backend.getHost(), backend.getBePort()); + client = ClientPool.backendPool.borrowObject(address); + diskTrashInfos = client.getDiskTrashUsedCapacity(); + ok = true; + } catch (Exception e) { + LOG.warn("task exec error. backend[{}]", backend.getId(), e); + } finally { + if (ok) { + ClientPool.backendPool.returnObject(address, client); + } else { + ClientPool.backendPool.invalidateObject(address, client); + } + } + + if (diskTrashInfos == null) { + return; + } + for (TDiskTrashInfo diskTrashInfo : diskTrashInfos) { + List diskInfo = new ArrayList(); + + diskInfo.add(diskTrashInfo.getRootPath()); + + diskInfo.add(diskTrashInfo.getState()); + + long trashUsedCapacityB = diskTrashInfo.getTrashUsedCapacity(); + Pair trashUsedCapacity = DebugUtil.getByteUint(trashUsedCapacityB); + diskInfo.add( + DebugUtil.DECIMAL_FORMAT_SCALE_3.format(trashUsedCapacity.first) + " " + trashUsedCapacity.second); + + infos.add(diskInfo); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java index 8d4b960dad251e..41d911820ee471 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java @@ -27,7 +27,7 @@ import java.util.UUID; public class DebugUtil { - public static final DecimalFormat DECIMAL_FORMAT_SCALE_3 = new DecimalFormat("#.000"); + public static final DecimalFormat DECIMAL_FORMAT_SCALE_3 = new DecimalFormat("0.000"); public static int THOUSAND = 1000; public static int MILLION = 1000 * THOUSAND; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index e526b0001f9559..07025ac831063a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -71,6 +71,8 @@ import org.apache.doris.analysis.ShowTableStmt; import org.apache.doris.analysis.ShowTabletStmt; import org.apache.doris.analysis.ShowTransactionStmt; +import org.apache.doris.analysis.ShowTrashStmt; +import org.apache.doris.analysis.ShowTrashDiskStmt; import org.apache.doris.analysis.ShowUserPropertyStmt; import org.apache.doris.analysis.ShowVariablesStmt; import org.apache.doris.analysis.ShowViewStmt; @@ -117,6 +119,8 @@ import org.apache.doris.common.proc.RollupProcDir; import org.apache.doris.common.proc.SchemaChangeProcDir; import org.apache.doris.common.proc.TabletsProcDir; +import org.apache.doris.common.proc.TrashProcDir; +import org.apache.doris.common.proc.TrashProcNode; import org.apache.doris.common.profile.ProfileTreeNode; import org.apache.doris.common.profile.ProfileTreePrinter; import org.apache.doris.common.util.ListComparator; @@ -272,6 +276,10 @@ public ShowResultSet execute() throws AnalysisException { handleShowGrants(); } else if (stmt instanceof ShowRolesStmt) { handleShowRoles(); + } else if (stmt instanceof ShowTrashStmt) { + handleShowTrash(); + } else if (stmt instanceof ShowTrashDiskStmt) { + handleShowTrashDisk(); } else if (stmt instanceof AdminShowReplicaStatusStmt) { handleAdminShowTabletStatus(); } else if (stmt instanceof AdminShowReplicaDistributionStmt) { @@ -1690,6 +1698,20 @@ private void handleShowRoles() { List> infos = Catalog.getCurrentCatalog().getAuth().getRoleInfo(); resultSet = new ShowResultSet(showStmt.getMetaData(), infos); } + + private void handleShowTrash() { + ShowTrashStmt showStmt = (ShowTrashStmt) stmt; + List> infos = Lists.newArrayList(); + TrashProcDir.getTrashInfo(showStmt.getBackends(), infos); + resultSet = new ShowResultSet(showStmt.getMetaData(), infos); + } + + private void handleShowTrashDisk() { + ShowTrashDiskStmt showStmt = (ShowTrashDiskStmt) stmt; + List> infos = Lists.newArrayList(); + TrashProcNode.getTrashDiskInfo(showStmt.getBackend(), infos); + resultSet = new ShowResultSet(showStmt.getMetaData(), infos); + } private void handleAdminShowTabletStatus() throws AnalysisException { AdminShowReplicaStatusStmt showStmt = (AdminShowReplicaStatusStmt) stmt; diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex index 53c6c703c2693f..94a0251f74bb9f 100644 --- a/fe/fe-core/src/main/jflex/sql_scanner.flex +++ b/fe/fe-core/src/main/jflex/sql_scanner.flex @@ -365,6 +365,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("tinyint", new Integer(SqlParserSymbols.KW_TINYINT)); keywordMap.put("to", new Integer(SqlParserSymbols.KW_TO)); keywordMap.put("transaction", new Integer(SqlParserSymbols.KW_TRANSACTION)); + keywordMap.put("trash", new Integer(SqlParserSymbols.KW_TRASH)); keywordMap.put("triggers", new Integer(SqlParserSymbols.KW_TRIGGERS)); keywordMap.put("trim", new Integer(SqlParserSymbols.KW_TRIM)); keywordMap.put("true", new Integer(SqlParserSymbols.KW_TRUE)); diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java index b8a6152151e35b..e134e812b9377e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java @@ -25,6 +25,7 @@ import org.apache.doris.thrift.TCancelPlanFragmentParams; import org.apache.doris.thrift.TCancelPlanFragmentResult; import org.apache.doris.thrift.TDeleteEtlFilesRequest; +import org.apache.doris.thrift.TDiskTrashInfo; import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TExecPlanFragmentResult; import org.apache.doris.thrift.TExportStatusResult; @@ -193,6 +194,18 @@ public TStatus eraseExportTask(TUniqueId task_id) throws TException { return null; } + @Override + public long getTrashUsedCapacity() throws TException { + // TODO Auto-generated method stub + return 0l; + } + + @Override + public List getDiskTrashUsedCapacity() throws TException { + // TODO Auto-generated method stub + return null; + } + @Override public TTabletStatResult getTabletStat() throws TException { // TODO Auto-generated method stub diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index a65461d89ac0d2..f14d59b3e4adbe 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -33,6 +33,7 @@ import org.apache.doris.thrift.TCancelPlanFragmentParams; import org.apache.doris.thrift.TCancelPlanFragmentResult; import org.apache.doris.thrift.TDeleteEtlFilesRequest; +import org.apache.doris.thrift.TDiskTrashInfo; import org.apache.doris.thrift.TEtlState; import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TExecPlanFragmentResult; @@ -244,6 +245,16 @@ public TStatus eraseExportTask(TUniqueId task_id) throws TException { return new TStatus(TStatusCode.OK); } + @Override + public long getTrashUsedCapacity() throws TException { + return 0l; + } + + @Override + public List getDiskTrashUsedCapacity() throws TException { + return null; + } + @Override public TTabletStatResult getTabletStat() throws TException { return new TTabletStatResult(Maps.newHashMap()); diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 200a8ead956534..1288a2bed21d56 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -105,6 +105,12 @@ struct TStreamLoadRecordResult { 1: required map stream_load_record } +struct TDiskTrashInfo { + 1: required string root_path + 2: required string state + 3: required i64 trash_used_capacity +} + service BackendService { // Called by coord to start asynchronous execution of plan fragment in backend. // Returns as soon as all incoming data streams have been set up. @@ -147,6 +153,10 @@ service BackendService { Status.TStatus erase_export_task(1:Types.TUniqueId task_id); TTabletStatResult get_tablet_stat(); + + i64 get_trash_used_capacity(); + + list get_disk_trash_used_capacity(); Status.TStatus submit_routine_load_task(1:list tasks);