From 64f1e1a86df1e945c58ec1c42ebb4294fb26fb94 Mon Sep 17 00:00:00 2001 From: morningman Date: Sun, 25 Aug 2019 20:57:45 +0800 Subject: [PATCH 1/8] first commit --- be/src/agent/task_worker_pool.cpp | 18 ++++++++- .../doris/analysis/ShowBackendsStmt.java | 9 +++++ .../org/apache/doris/backup/RestoreJob.java | 9 +++++ .../org/apache/doris/catalog/DiskInfo.java | 12 ++++++ .../java/org/apache/doris/catalog/Tablet.java | 20 ++++++++++ .../java/org/apache/doris/common/Status.java | 7 ++-- .../org/apache/doris/master/MasterImpl.java | 15 +++++-- .../apache/doris/planner/OlapTableSink.java | 20 ++++++++-- .../java/org/apache/doris/system/Backend.java | 29 ++++++++++++-- .../doris/system/SystemInfoService.java | 40 +++++++++++++++++++ 10 files changed, 161 insertions(+), 18 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index ba4d7677c65d78..9c17c412817442 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -433,8 +433,8 @@ void* TaskWorkerPool::_create_tablet_worker_thread_callback(void* arg_this) { vector error_msgs; TStatus task_status; - OLAPStatus create_status = - worker_pool_this->_env->storage_engine()->create_tablet(create_tablet_req); + std::vector finish_tablet_infos; + OLAPStatus create_status = worker_pool_this->_env->storage_engine()->create_tablet(create_tablet_req); if (create_status != OLAPStatus::OLAP_SUCCESS) { OLAP_LOG_WARNING("create table failed. status: %d, signature: %ld", create_status, agent_task_req.signature); @@ -442,12 +442,26 @@ void* TaskWorkerPool::_create_tablet_worker_thread_callback(void* arg_this) { status_code = TStatusCode::RUNTIME_ERROR; } else { ++_s_report_version; + // get path hash of the created tablet + TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet( + create_tablet_req.tablet_id, create_tablet_req.tablet_schema.schema_hash); + DCHECK(tablet != nullptr); + TTabletInfo tablet_info; + tablet_info.tablet_id = tablet->table_id(); + tablet_info.schema_hash = tablet->schema_hash(); + tablet_info.version = create_tablet_req.version; + tablet_info.version_hash = create_tablet_req.version_hash; + tablet_info.row_count = 0; + tablet_info.data_size = 0; + tablet_info.__set_path_hash(tablet->data_dir()->path_hash()); + finish_tablet_infos.push_back(tablet_info); } task_status.__set_status_code(status_code); task_status.__set_error_msgs(error_msgs); TFinishTaskRequest finish_task_request; + finish_task_request.__set_finish_tablet_infos(finish_tablet_infos); finish_task_request.__set_backend(worker_pool_this->_backend); finish_task_request.__set_report_version(_s_report_version); finish_task_request.__set_task_type(agent_task_req.task_type); diff --git a/fe/src/main/java/org/apache/doris/analysis/ShowBackendsStmt.java b/fe/src/main/java/org/apache/doris/analysis/ShowBackendsStmt.java index fc6365dc6995e2..2c894a42886292 100644 --- a/fe/src/main/java/org/apache/doris/analysis/ShowBackendsStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/ShowBackendsStmt.java @@ -54,5 +54,14 @@ public ShowResultSetMetaData getMetaData() { } return builder.build(); } + + @Override + public RedirectStatus getRedirectStatus() { + if (ConnectContext.get().getSessionVariable().getForwardToMaster()) { + return RedirectStatus.FORWARD_NO_SYNC; + } else { + return RedirectStatus.NO_FORWARD; + } + } } diff --git a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java index 83642a2c88f93c..4f78e53ff9f4c7 100644 --- a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -755,6 +755,7 @@ private void checkAndPrepareMeta() { unfinishedSignatureToId.clear(); taskProgress.clear(); taskErrMsg.clear(); + Map pathBeMap = Maps.newHashMap(); batchTask = new AgentBatchTask(); db.readLock(); try { @@ -773,10 +774,18 @@ private void checkAndPrepareMeta() { true /* is restore task*/); batchTask.addTask(task); unfinishedSignatureToId.put(signature, tablet.getId()); + pathBeMap.put(replica.getPathHash(), replica.getBackendId()); } } finally { db.readUnlock(); } + + // check disk capacity + org.apache.doris.common.Status st = Catalog.getCurrentSystemInfo().checkExceedDiskCapacityLimit(pathBeMap, true); + if (!st.ok()) { + status = new Status(ErrCode.COMMON_ERROR, st.getErrorMsg()); + return; + } // send tasks for (AgentTask task : batchTask.getAllTasks()) { diff --git a/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java b/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java index c4f4162db651f1..1c013f1b333b3c 100644 --- a/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java +++ b/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java @@ -17,6 +17,8 @@ package org.apache.doris.catalog; +import org.apache.doris.clone.RootPathLoadStatistic; +import org.apache.doris.common.Config; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; @@ -123,6 +125,16 @@ public void setStorageMedium(TStorageMedium storageMedium) { this.storageMedium = storageMedium; } + public boolean exceedLimit(boolean highWaterMark) { + if (highWaterMark) { + return diskAvailableCapacityB < RootPathLoadStatistic.MIN_LEFT_CAPACITY_BYTES_LIMIT && + (double) (totalCapacityB-diskAvailableCapacityB) / totalCapacityB > RootPathLoadStatistic.MAX_USAGE_PERCENT_LIMIT; + } else { + return diskAvailableCapacityB < Config.storage_min_left_capacity_bytes || + (double) (totalCapacityB-diskAvailableCapacityB) / totalCapacityB > Config.storage_high_watermark_usage_percent; + } + } + @Override public String toString() { return "DiskInfo [rootPath=" + rootPath + "(" + pathHash + "), totalCapacityB=" + totalCapacityB diff --git a/fe/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/src/main/java/org/apache/doris/catalog/Tablet.java index f3d73f83a011d7..fa4fa8c497c6ee 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/src/main/java/org/apache/doris/catalog/Tablet.java @@ -27,6 +27,7 @@ import org.apache.doris.system.SystemInfoService; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; @@ -39,6 +40,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -184,6 +186,24 @@ public List getNormalReplicaBackendIds() { return beIds; } + // return map of (path hash -> BE id) of normal replicas + public Map getNormalReplicaBackendPathMap() { + Map map = Maps.newHashMap(); + SystemInfoService infoService = Catalog.getCurrentSystemInfo(); + for (Replica replica : replicas) { + if (replica.isBad()) { + continue; + } + + ReplicaState state = replica.getState(); + if (infoService.checkBackendAlive(replica.getBackendId()) + && (state == ReplicaState.NORMAL || state == ReplicaState.SCHEMA_CHANGE)) { + map.put(replica.getPathHash(), replica.getBackendId()); + } + } + return map; + } + // for query public void getQueryableReplicas(List allQuerableReplica, List localReplicas, long visibleVersion, long visibleVersionHash, long localBeId, int schemaHash) { diff --git a/fe/src/main/java/org/apache/doris/common/Status.java b/fe/src/main/java/org/apache/doris/common/Status.java index 13107b3233c63c..93b655e7012f1e 100644 --- a/fe/src/main/java/org/apache/doris/common/Status.java +++ b/fe/src/main/java/org/apache/doris/common/Status.java @@ -22,10 +22,9 @@ import org.apache.doris.thrift.TStatusCode; public class Status { - public static final Status OK = new Status(); - public static final Status CANCELLED = new Status(TStatusCode.CANCELLED, "Cancelled"); - public static final Status THRIFT_RPC_ERROR = - new Status(TStatusCode.THRIFT_RPC_ERROR, "Thrift RPC failed"); + public static final Status OK = new Status(); + public static final Status CANCELLED = new Status(TStatusCode.CANCELLED, "Cancelled"); + public static final Status THRIFT_RPC_ERROR = new Status(TStatusCode.THRIFT_RPC_ERROR, "Thrift RPC failed"); public TStatusCode getErrorCode() { return errorCode; diff --git a/fe/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/src/main/java/org/apache/doris/master/MasterImpl.java index 3da5a9ac74dbe0..a94b4e22485c82 100644 --- a/fe/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/src/main/java/org/apache/doris/master/MasterImpl.java @@ -143,7 +143,7 @@ public TMasterResult finishTask(TFinishTaskRequest request) throws TException { switch (taskType) { case CREATE: Preconditions.checkState(request.isSetReport_version()); - finishCreateReplica(task, request.getReport_version()); + finishCreateReplica(task, request); break; case PUSH: checkHasTabletInfo(request); @@ -224,20 +224,27 @@ private void checkHasTabletInfo(TFinishTaskRequest request) throws Exception { } } - private void finishCreateReplica(AgentTask task, long reportVersion) { + private void finishCreateReplica(AgentTask task, TFinishTaskRequest request) { // if we get here, this task will be removed from AgentTaskQueue for certain. // because in this function, the only problem that cause failure is meta missing. // and if meta is missing, we no longer need to resend this task CreateReplicaTask createReplicaTask = (CreateReplicaTask) task; long tabletId = createReplicaTask.getTabletId(); + + if (request.isSetFinish_tablet_infos()) { + Replica replica = Catalog.getCurrentInvertedIndex().getReplica(createReplicaTask.getTabletId(), + createReplicaTask.getBackendId()); + replica.setPathHash(request.getFinish_tablet_infos().get(0).getPath_hash()); + } // this should be called before 'countDownLatch()' - Catalog.getCurrentSystemInfo().updateBackendReportVersion(task.getBackendId(), reportVersion, task.getDbId()); + Catalog.getCurrentSystemInfo().updateBackendReportVersion(task.getBackendId(), request.getReport_version(), + task.getDbId()); createReplicaTask.countDownLatch(task.getBackendId(), task.getSignature()); LOG.debug("finish create replica. tablet id: {}, be: {}, report version: {}", - tabletId, task.getBackendId(), reportVersion); + tabletId, task.getBackendId(), request.getReport_version()); AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.CREATE, task.getSignature()); } diff --git a/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java index f69754afe6c9e5..465da2ac62412b 100644 --- a/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -31,8 +31,10 @@ import org.apache.doris.catalog.RangePartitionInfo; import org.apache.doris.catalog.Tablet; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.Status; import org.apache.doris.common.UserException; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; @@ -54,6 +56,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Range; import com.google.common.collect.Sets; @@ -292,20 +295,29 @@ private TOlapTablePartitionParam createPartition(long dbId, OlapTable table) thr private TOlapTableLocationParam createLocation(OlapTable table) throws UserException { TOlapTableLocationParam locationParam = new TOlapTableLocationParam(); + Map allPathBeMap = Maps.newHashMap(); for (Partition partition : table.getPartitions()) { int quorum = table.getPartitionInfo().getReplicationNum(partition.getId()) / 2 + 1; for (MaterializedIndex index : partition.getMaterializedIndices()) { // we should ensure the replica backend is alive // otherwise, there will be a 'unknown node id, id=xxx' error for stream load for (Tablet tablet : index.getTablets()) { - List beIds = tablet.getNormalReplicaBackendIds(); - if (beIds.size() < quorum) { - throw new UserException("tablet " + tablet.getId() + " has few replicas: " + beIds.size()); + Map pathBeMap = tablet.getNormalReplicaBackendPathMap(); + if (pathBeMap.size() < quorum) { + throw new UserException("tablet " + tablet.getId() + " has few replicas: " + pathBeMap.size()); } - locationParam.addToTablets(new TTabletLocation(tablet.getId(), beIds)); + locationParam.addToTablets(new TTabletLocation(tablet.getId(), Lists.newArrayList(pathBeMap.values()))); + allPathBeMap.putAll(pathBeMap); } } } + + // check if disk capacity reach limit + // this is for load process, so use high water mark to check + Status st = Catalog.getCurrentSystemInfo().checkExceedDiskCapacityLimit(allPathBeMap, true); + if (!st.ok()) { + throw new DdlException(st.getErrorMsg()); + } return locationParam; } diff --git a/fe/src/main/java/org/apache/doris/system/Backend.java b/fe/src/main/java/org/apache/doris/system/Backend.java index 63c46f7132c904..a39e605817918c 100644 --- a/fe/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/src/main/java/org/apache/doris/system/Backend.java @@ -28,6 +28,7 @@ import org.apache.doris.thrift.TDisk; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; @@ -36,11 +37,13 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; /** * This class extends the primary identifier of a Backend with ephemeral state, @@ -80,6 +83,10 @@ public enum BackendState { private AtomicReference> disksRef; private String heartbeatErrMsg = ""; + + // This is used for the first time we init pathHashToDishInfo in SystemInfoService. + // after init it, this variable is set to true. + private boolean initPathInfo = false; public Backend() { this.host = ""; @@ -338,9 +345,18 @@ public String getPathByPathHash(long pathHash) { } public void updateDisks(Map backendDisks) { - // update status or add new diskInfo + ImmutableMap disks = disksRef.get(); - Map newDisks = Maps.newHashMap(); + // The very first time to init the path info + if (!initPathInfo) { + initPathInfo = true; + Catalog.getCurrentSystemInfo().updatePathInfo(disks.values().stream().collect(Collectors.toList()), Lists.newArrayList()); + } + + // update status or add new diskInfo + Map newDiskInfos = Maps.newHashMap(); + List addedDisks = Lists.newArrayList(); + List removedDisks = Lists.newArrayList(); /* * set isChanged to true only if new disk is added or old disk is dropped. * we ignore the change of capacity, because capacity info is only used in master FE. @@ -356,10 +372,11 @@ public void updateDisks(Map backendDisks) { DiskInfo diskInfo = disks.get(rootPath); if (diskInfo == null) { diskInfo = new DiskInfo(rootPath); + addedDisks.add(diskInfo); isChanged = true; LOG.info("add new disk info. backendId: {}, rootPath: {}", id, rootPath); } - newDisks.put(rootPath, diskInfo); + newDiskInfos.put(rootPath, diskInfo); diskInfo.setTotalCapacityB(totalCapacityB); diskInfo.setDataUsedCapacityB(dataUsedCapacityB); @@ -388,6 +405,7 @@ public void updateDisks(Map backendDisks) { for (DiskInfo diskInfo : disks.values()) { String rootPath = diskInfo.getRootPath(); if (!backendDisks.containsKey(rootPath)) { + removedDisks.add(diskInfo); isChanged = true; LOG.warn("remove not exist rootPath. backendId: {}, rootPath: {}", id, rootPath); } @@ -395,10 +413,13 @@ public void updateDisks(Map backendDisks) { if (isChanged) { // update disksRef - disksRef.set(ImmutableMap.copyOf(newDisks)); + disksRef.set(ImmutableMap.copyOf(newDiskInfos)); + Catalog.getCurrentSystemInfo().updatePathInfo(addedDisks, removedDisks); // log disk changing Catalog.getInstance().getEditLog().logBackendStateChange(this); } + + } public static Backend read(DataInput in) throws IOException { diff --git a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java index 81688bc86a50f4..026e91a87b7861 100644 --- a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -19,13 +19,16 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DiskInfo; import org.apache.doris.cluster.Cluster; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.Pair; +import org.apache.doris.common.Status; import org.apache.doris.metric.MetricRepo; import org.apache.doris.system.Backend.BackendState; +import org.apache.doris.thrift.TStatusCode; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -73,6 +76,8 @@ public class SystemInfoService { private long lastBackendIdForCreation = -1; private long lastBackendIdForOther = -1; + private AtomicReference> pathHashToDishInfoRef; + // sort host backends list by num of backends, descending private static final Comparator> hostBackendsListComparator = new Comparator> (){ @Override @@ -92,6 +97,7 @@ public SystemInfoService() { lastBackendIdForCreationMap = new ConcurrentHashMap(); lastBackendIdForOtherMap = new ConcurrentHashMap(); + pathHashToDishInfoRef = new AtomicReference>(ImmutableMap.of()); } // for deploy manager @@ -1107,5 +1113,39 @@ public Set getClusterNames() { } return clusterNames; } + + /* + * Check if the specified disks' capacity has reached the limit. + * pathBeMap is (path hash -> BE id) + * If highWatermark is true, it will check with the high watermark threshold. + * + * return Status.OK if not reach the limit + */ + public Status checkExceedDiskCapacityLimit(Map pathBeMap, boolean highWatermark) { + ImmutableMap pathHashToDiskInfo = pathHashToDishInfoRef.get(); + for (Long pathHash : pathBeMap.keySet()) { + DiskInfo diskInfo = pathHashToDiskInfo.get(pathHash); + if (diskInfo != null && diskInfo.exceedLimit(highWatermark)) { + return new Status(TStatusCode.CANCELLED, + "disk " + pathHash + " on backend " + pathBeMap.get(pathHash) + " exceed limit usage"); + } + } + return Status.OK; + } + + // update the path info when disk report + // there is only one thread can update path info, so no need to worry about concurrency control + public void updatePathInfo(List addedDisks, List removedDisks) { + Map copiedPathInfos = Maps.newHashMap(pathHashToDishInfoRef.get()); + for (DiskInfo diskInfo : addedDisks) { + copiedPathInfos.put(diskInfo.getPathHash(), diskInfo); + } + for (DiskInfo diskInfo : removedDisks) { + copiedPathInfos.remove(diskInfo.getPathHash()); + } + ImmutableMap newPathInfos = ImmutableMap.copyOf(copiedPathInfos); + pathHashToDishInfoRef.set(newPathInfos); + LOG.debug("update path infos: {}", newPathInfos); + } } From f6624da6d0ed154a90dbc0fde417f073c6a57217 Mon Sep 17 00:00:00 2001 From: morningman Date: Sun, 25 Aug 2019 22:11:26 +0800 Subject: [PATCH 2/8] add config --- .../java/org/apache/doris/catalog/DiskInfo.java | 13 +++++++++---- .../doris/clone/RootPathLoadStatistic.java | 7 ++----- .../java/org/apache/doris/common/Config.java | 17 ++++++++++++++--- .../apache/doris/system/SystemInfoService.java | 6 +++--- 4 files changed, 28 insertions(+), 15 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java b/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java index 1c013f1b333b3c..1a6eb0905b9c13 100644 --- a/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java +++ b/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java @@ -125,10 +125,15 @@ public void setStorageMedium(TStorageMedium storageMedium) { this.storageMedium = storageMedium; } - public boolean exceedLimit(boolean highWaterMark) { - if (highWaterMark) { - return diskAvailableCapacityB < RootPathLoadStatistic.MIN_LEFT_CAPACITY_BYTES_LIMIT && - (double) (totalCapacityB-diskAvailableCapacityB) / totalCapacityB > RootPathLoadStatistic.MAX_USAGE_PERCENT_LIMIT; + /* + * Check if this disk's capacity reach the limit. Return true if yes. + * if floodStage is true, use floodStage threshold to check. + * floodStage threshold means a loosely limit, and we use 'AND' to give a more loosely limit. + */ + public boolean exceedLimit(boolean floodStage) { + if (floodStage) { + return diskAvailableCapacityB < Config.storage_flood_stage_left_capacity_bytes && + (double) (totalCapacityB-diskAvailableCapacityB) / totalCapacityB >Config.storage_flood_stage_usage_percent; } else { return diskAvailableCapacityB < Config.storage_min_left_capacity_bytes || (double) (totalCapacityB-diskAvailableCapacityB) / totalCapacityB > Config.storage_high_watermark_usage_percent; diff --git a/fe/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java b/fe/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java index 2c54d9b05f7602..105837cad612dd 100644 --- a/fe/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java +++ b/fe/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java @@ -24,9 +24,6 @@ import org.apache.doris.thrift.TStorageMedium; public class RootPathLoadStatistic implements Comparable { - // Even if for tablet recovery, we can not exceed these 2 limitations. - public static final double MAX_USAGE_PERCENT_LIMIT = 0.95; - public static final double MIN_LEFT_CAPACITY_BYTES_LIMIT = 100 * 1024 * 1024; // 100MB private long beId; private String path; @@ -96,8 +93,8 @@ public BalanceStatus isFit(long tabletSize, boolean isSupplement) { } if (isSupplement) { - if ((usedCapacityB + tabletSize) / (double) capacityB > MAX_USAGE_PERCENT_LIMIT - && capacityB - usedCapacityB - tabletSize < MIN_LEFT_CAPACITY_BYTES_LIMIT) { + if ((usedCapacityB + tabletSize) / (double) capacityB > Config.storage_flood_stage_usage_percent + && capacityB - usedCapacityB - tabletSize < Config.storage_flood_stage_left_capacity_bytes) { return new BalanceStatus(ErrCode.COMMON_ERROR, toString() + " does not fit tablet with size: " + tabletSize + ", limitation reached"); } else { diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index 39858e2a3eecf0..5e1c5a619d4efa 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -662,15 +662,26 @@ public class Config extends ConfigBase { public static int backup_job_default_timeout_ms = 86400 * 1000; // 1 day /* - * storage_high_watermark_usage_percent limit the max capacity usage percent of a Backend storage path. - * storage_min_left_capacity_bytes limit the minimum left capacity of a Backend storage path. + * 'storage_high_watermark_usage_percent' limit the max capacity usage percent of a Backend storage path. + * 'storage_min_left_capacity_bytes' limit the minimum left capacity of a Backend storage path. * If both limitations are reached, this storage path can not be chose as tablet balance destination. * But for tablet recovery, we may exceed these limit for keeping data integrity as much as possible. */ @ConfField(mutable = true, masterOnly = true) public static double storage_high_watermark_usage_percent = 0.85; @ConfField(mutable = true, masterOnly = true) - public static double storage_min_left_capacity_bytes = 1000 * 1024 * 1024; // 1G + public static double storage_min_left_capacity_bytes = 1024 * 1024 * 1024; // 1G + + /* + * If capacity of disk reach the 'storage_flood_stage_usage_percent' and 'storage_flood_stage_left_capacity_bytes', + * the following operation will be rejected: + * 1. load job + * 2. restore job + */ + @ConfField(mutable = true, masterOnly = true) + public static double storage_flood_stage_usage_percent = 0.95; + @ConfField(mutable = true, masterOnly = true) + public static double storage_flood_stage_left_capacity_bytes = 100 * 1024 * 1024; // 100MB // update interval of tablet stat // All frontends will get tablet stat from all backends at each interval diff --git a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java index 026e91a87b7861..86fccf31c84920 100644 --- a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -1117,15 +1117,15 @@ public Set getClusterNames() { /* * Check if the specified disks' capacity has reached the limit. * pathBeMap is (path hash -> BE id) - * If highWatermark is true, it will check with the high watermark threshold. + * If floodStage is true, it will check with the floodStage threshold. * * return Status.OK if not reach the limit */ - public Status checkExceedDiskCapacityLimit(Map pathBeMap, boolean highWatermark) { + public Status checkExceedDiskCapacityLimit(Map pathBeMap, boolean floodStage) { ImmutableMap pathHashToDiskInfo = pathHashToDishInfoRef.get(); for (Long pathHash : pathBeMap.keySet()) { DiskInfo diskInfo = pathHashToDiskInfo.get(pathHash); - if (diskInfo != null && diskInfo.exceedLimit(highWatermark)) { + if (diskInfo != null && diskInfo.exceedLimit(floodStage)) { return new Status(TStatusCode.CANCELLED, "disk " + pathHash + " on backend " + pathBeMap.get(pathHash) + " exceed limit usage"); } From a40ae0e679a2974c9d771495682ccf5af4f602b1 Mon Sep 17 00:00:00 2001 From: morningman Date: Sun, 25 Aug 2019 23:04:21 +0800 Subject: [PATCH 3/8] fix bug --- .../main/java/org/apache/doris/catalog/DiskInfo.java | 12 +++++++++--- .../main/java/org/apache/doris/system/Backend.java | 12 ++++++++++-- .../org/apache/doris/system/SystemInfoService.java | 1 + 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java b/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java index 1a6eb0905b9c13..9b6f5effb3e7fa 100644 --- a/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java +++ b/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java @@ -17,18 +17,22 @@ package org.apache.doris.catalog; -import org.apache.doris.clone.RootPathLoadStatistic; import org.apache.doris.common.Config; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.thrift.TStorageMedium; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class DiskInfo implements Writable { + private static final Logger LOG = LogManager.getLogger(DiskInfo.class); + public enum DiskState { ONLINE, OFFLINE @@ -131,12 +135,14 @@ public void setStorageMedium(TStorageMedium storageMedium) { * floodStage threshold means a loosely limit, and we use 'AND' to give a more loosely limit. */ public boolean exceedLimit(boolean floodStage) { + LOG.debug("flood stage: {}, diskAvailableCapacityB: {}, totalCapacityB: {}", + floodStage, diskAvailableCapacityB, totalCapacityB); if (floodStage) { return diskAvailableCapacityB < Config.storage_flood_stage_left_capacity_bytes && - (double) (totalCapacityB-diskAvailableCapacityB) / totalCapacityB >Config.storage_flood_stage_usage_percent; + (double) (totalCapacityB - diskAvailableCapacityB) / totalCapacityB > Config.storage_flood_stage_usage_percent; } else { return diskAvailableCapacityB < Config.storage_min_left_capacity_bytes || - (double) (totalCapacityB-diskAvailableCapacityB) / totalCapacityB > Config.storage_high_watermark_usage_percent; + (double) (totalCapacityB - diskAvailableCapacityB) / totalCapacityB > Config.storage_high_watermark_usage_percent; } } diff --git a/fe/src/main/java/org/apache/doris/system/Backend.java b/fe/src/main/java/org/apache/doris/system/Backend.java index a39e605817918c..961a5cb488aea5 100644 --- a/fe/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/src/main/java/org/apache/doris/system/Backend.java @@ -349,8 +349,16 @@ public void updateDisks(Map backendDisks) { ImmutableMap disks = disksRef.get(); // The very first time to init the path info if (!initPathInfo) { - initPathInfo = true; - Catalog.getCurrentSystemInfo().updatePathInfo(disks.values().stream().collect(Collectors.toList()), Lists.newArrayList()); + boolean allPathHashUpdated = true; + for (DiskInfo diskInfo : disks.values()) { + if (diskInfo.getPathHash() == 0) { + allPathHashUpdated = false; + } + } + if (allPathHashUpdated) { + initPathInfo = true; + Catalog.getCurrentSystemInfo().updatePathInfo(disks.values().stream().collect(Collectors.toList()), Lists.newArrayList()); + } } // update status or add new diskInfo diff --git a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java index 86fccf31c84920..2202bb76d12214 100644 --- a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -1122,6 +1122,7 @@ public Set getClusterNames() { * return Status.OK if not reach the limit */ public Status checkExceedDiskCapacityLimit(Map pathBeMap, boolean floodStage) { + LOG.debug("pathBeMap: {}", pathBeMap); ImmutableMap pathHashToDiskInfo = pathHashToDishInfoRef.get(); for (Long pathHash : pathBeMap.keySet()) { DiskInfo diskInfo = pathHashToDiskInfo.get(pathHash); From 22ec8240d0ac6e5d811efafe974a712d5dacb0e4 Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 26 Aug 2019 15:04:33 +0800 Subject: [PATCH 4/8] add check capacity --- be/src/agent/status.h | 1 + be/src/common/config.h | 10 ++++- be/src/olap/base_compaction.cpp | 5 ++- be/src/olap/base_compaction.h | 2 +- be/src/olap/compaction.cpp | 16 ++++++- be/src/olap/compaction.h | 6 ++- be/src/olap/cumulative_compaction.cpp | 1 + be/src/olap/data_dir.cpp | 33 +++++++++++++-- be/src/olap/data_dir.h | 36 ++++++++++++---- be/src/olap/olap_common.h | 9 ++++ be/src/olap/olap_define.h | 1 + be/src/olap/olap_server.cpp | 8 +++- be/src/olap/rowset/segment_writer.cpp | 19 ++++++--- be/src/olap/schema_change.cpp | 8 +++- be/src/olap/storage_engine.cpp | 42 +++---------------- be/src/olap/storage_engine.h | 4 -- be/src/olap/task/engine_batch_load_task.cpp | 9 +++- be/src/olap/task/engine_clone_task.cpp | 6 +++ .../task/engine_storage_migration_task.cpp | 8 ++++ be/src/runtime/fragment_mgr.cpp | 2 +- 20 files changed, 155 insertions(+), 71 deletions(-) diff --git a/be/src/agent/status.h b/be/src/agent/status.h index 14fe4c91aeb986..f31b5b09c8eee3 100644 --- a/be/src/agent/status.h +++ b/be/src/agent/status.h @@ -41,6 +41,7 @@ enum AgentStatus { DORIS_PUSH_HAD_LOADED = -504, DORIS_TIMEOUT = -901, DORIS_INTERNAL_ERROR = -902, + DORIS_DISK_REACH_CAPACITY_LIMIT = -903, }; } // namespace doris #endif // DORIS_BE_SRC_AGENT_STATUS_H diff --git a/be/src/common/config.h b/be/src/common/config.h index 54072d901837f5..9cd4f4e3e76c4b 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -212,12 +212,11 @@ namespace config { // inc_rowset expired interval CONF_Int32(inc_rowset_expired_sec, "1800"); // garbage sweep policy - CONF_Int32(max_garbage_sweep_interval, "14400"); + CONF_Int32(max_garbage_sweep_interval, "3600"); CONF_Int32(min_garbage_sweep_interval, "180"); CONF_Int32(snapshot_expire_time_sec, "172800"); // 仅仅是建议值,当磁盘空间不足时,trash下的文件保存期可不遵守这个参数 CONF_Int32(trash_file_expire_time_sec, "259200"); - CONF_Int32(disk_capacity_insufficient_percentage, "90"); // check row nums for BE/CE and schema change. true is open, false is closed. CONF_Bool(row_nums_check, "true") //file descriptors cache, by default, cache 30720 descriptors @@ -439,6 +438,13 @@ namespace config { CONF_Int32(path_gc_check_step, "1000"); CONF_Int32(path_gc_check_step_interval_ms, "10"); CONF_Int32(path_scan_interval_second, "86400"); + + // The following 2 configs limit the max usage of disk capacity of a data dir. + // If both of these 2 threshold reached, no more data can be writen into that data dir. + // The percent of max used capacity of a data dir + CONF_Int32(capacity_used_percent_flood_stage, "95"); // 95% + // The min bytes that should be left of a data dir + CONF_Int64(capacity_min_left_bytes_flood_stage, "1073741824") // 1GB } // namespace config } // namespace doris diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp index a77c6ccf9d3925..09624bae4ab2aa 100644 --- a/be/src/olap/base_compaction.cpp +++ b/be/src/olap/base_compaction.cpp @@ -20,8 +20,8 @@ namespace doris { -BaseCompaction::BaseCompaction(TabletSharedPtr tablet) - : Compaction(tablet) +BaseCompaction::BaseCompaction(TabletSharedPtr tablet, DataDir* data_dir) + : Compaction(tablet, data_dir), { } BaseCompaction::~BaseCompaction() { } @@ -39,6 +39,7 @@ OLAPStatus BaseCompaction::compact() { // 1. pick rowsets to compact RETURN_NOT_OK(pick_rowsets_to_compact()); + RETURN_NOT_OK(check_disk_capacity()); // 2. do base compaction, merge rowsets RETURN_NOT_OK(do_compaction()); diff --git a/be/src/olap/base_compaction.h b/be/src/olap/base_compaction.h index 5110734cdbee3e..b58330ffa5af00 100644 --- a/be/src/olap/base_compaction.h +++ b/be/src/olap/base_compaction.h @@ -29,7 +29,7 @@ namespace doris { class BaseCompaction : public Compaction { public: - BaseCompaction(TabletSharedPtr tablet); + BaseCompaction(TabletSharedPtr tablet, DataDir* data_dir); ~BaseCompaction() override; OLAPStatus compact() override; diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index b4049a25efd888..48d9362d466c00 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -21,8 +21,9 @@ using std::vector; namespace doris { -Compaction::Compaction(TabletSharedPtr tablet) +Compaction::Compaction(TabletSharedPtr tablet, DataDir* data_dir) : _tablet(tablet), + _data_dir(data_dir); _input_rowsets_size(0), _input_row_num(0), _state(CompactionState::INITED) @@ -186,4 +187,17 @@ OLAPStatus Compaction::check_correctness(const Merger& merger) { return OLAP_SUCCESS; } +OLAPStatus Compaction::check_disk_capacity() { + int64_t incoming_data_size = 0; + for (auto& rowset : _input_rowsets) { + incoming_data_size += rowset->data_disk_size(); + incoming_data_size += rowset->index_disk_size(); + } + + if (_data_dir->reach_capacity_limit(incoming_data_size)) { + return OLAP_ERR_DISK_REACH_CAPACITY_LIMIT; + } + return OLAP_SUCCESS; +} + } // namespace doris diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index ebe77de40dc37a..a2511703cc0460 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -32,6 +32,7 @@ namespace doris { +class DataDir; class Merger; // This class is a base class for compaction. @@ -43,7 +44,7 @@ class Merger; // 4. gc unused rowstes class Compaction { public: - Compaction(TabletSharedPtr tablet); + Compaction(TabletSharedPtr tablet, DataDir* data_dir); virtual ~Compaction(); virtual OLAPStatus compact() = 0; @@ -63,8 +64,11 @@ class Compaction { OLAPStatus check_version_continuity(const std::vector& rowsets); OLAPStatus check_correctness(const Merger& merger); + OLAPStatus check_disk_capacity(); + protected: TabletSharedPtr _tablet; + DataDir* _data_dir; std::vector _input_rowsets; std::vector _input_rs_readers; diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 35ddb8174734dc..58484f06fc305e 100755 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -43,6 +43,7 @@ OLAPStatus CumulativeCompaction::compact() { // 2. pick rowsets to compact RETURN_NOT_OK(pick_rowsets_to_compact()); + RETURN_NOT_OK(check_disk_capacity()); // 3. do cumulative compaction, merge rowsets RETURN_NOT_OK(do_compaction()); diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index ec94e13cb09451..ea23bbbf496190 100755 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -59,14 +59,12 @@ DataDir::DataDir(const std::string& path, int64_t capacity_bytes, TabletManager* tablet_manager, TxnManager* txn_manager) : _path(path), _capacity_bytes(capacity_bytes), + _is_used(false), _tablet_manager(tablet_manager), _txn_manager(txn_manager), _cluster_id(-1), - _available_bytes(0), - _used_bytes(0), - _current_shard(0), - _is_used(false), _to_be_deleted(false), + _current_shard(0), _test_file_read_buf(nullptr), _test_file_write_buf(nullptr), _meta(nullptr) { @@ -100,6 +98,7 @@ Status DataDir::init() { return Status::InternalError("invalid root path: "); } + RETURN_IF_ERROR(update_capacity()); RETURN_IF_ERROR(_init_cluster_id()); RETURN_IF_ERROR(_init_extension_and_capacity()); RETURN_IF_ERROR(_init_file_system()); @@ -1057,4 +1056,30 @@ void DataDir::_remove_check_paths_no_lock(const std::set& paths) { } } +Status DataDir::update_capacity() { + try { + boost::filesystem::path path_name(_path); + boost::filesystem::space_info path_info = boost::filesystem::space(path_name); + _available_bytes = path_info.available; + _disk_capacity_bytes = path_info.capacity; + } catch (boost::filesystem::filesystem_error& e) { + LOG(WARNING) << "get space info failed. path: " << root_path << " erro:" << e.what(); + return Status.InternalError("get path available capacity failed"); + } + LOG(INFO) << "path: " << _path << " total capacity: " << _disk_capacity_bytes + << ", available capacity: " << _available_bytes; + + return Status::OK(); +} + +bool DataDir::reach_capacity_limit(int64_t incoming_data_size); + double used_pct = _available_bytes + incoming_data_size / (double) _disk_capacity_bytes; + int64_t left_bytes = _disk_capacity_bytes - _available_bytes - incoming_data_size; + + if (used_pct >= config::capacity_used_percent_flood_stage / 100.0 + && left_bytes <= config::capacity_min_left_bytes_flood_stage) { + return true; + } + return false; +} } // namespace doris diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h index 44528279cf19ce..e5b9a914f34f69 100644 --- a/be/src/olap/data_dir.h +++ b/be/src/olap/data_dir.h @@ -48,12 +48,15 @@ class DataDir { bool is_used() const { return _is_used; } void set_is_used(bool is_used) { _is_used = is_used; } int32_t cluster_id() const { return _cluster_id; } + DataDirInfo get_dir_info() { DataDirInfo info; info.path = _path; info.path_hash = _path_hash; - info.is_used = _is_used; info.capacity = _capacity_bytes; + info.available = _available_bytes; + info.is_used = _is_used; + info.storage_medium = _storage_medium; return info; } @@ -121,6 +124,16 @@ class DataDir { OLAPStatus set_convert_finished(); + // check if the capacity reach the limit after adding the incoming data + // return true if limit reached, otherwise, return false. + // TODO(cmy): for now we can not precisely calculate the capacity Doris used, + // so in order to avoid running out of disk capacity, we currenty use the actual + // disk avaiable capacity and total capacity to do the calculation. + // So that the capacity Doris actually used may exceeds the user specified capacity. + bool reach_capacity_limit(int64_t incoming_data_size); + + Status update_capacity(); + private: std::string _cluster_id_path() const { return _path + CLUSTER_ID_PREFIX; } Status _init_cluster_id(); @@ -146,23 +159,30 @@ class DataDir { private: std::string _path; - size_t _path_hash; + int64_t _path_hash; + // user specified capacity + int64_t _capacity_bytes; + // the actual avaiable capacity of the disk of this data dir + // NOTICE that _available_byte smay be larger than _capacity_bytes, if capacity is set + // by user, not the disk's actual capacity + int64_t _available_bytes; + // the actual capacity of the disk of this data dir + int64_t _disk_capacity_bytes; + TStorageMedium::type _storage_medium; + bool _is_used; + uint32_t _rand_seed; std::string _file_system; - int64_t _capacity_bytes; TabletManager* _tablet_manager; TxnManager* _txn_manager; int32_t _cluster_id; - int64_t _available_bytes; - int64_t _used_bytes; - uint64_t _current_shard; - bool _is_used; // This flag will be set true if this store was not in root path when reloading bool _to_be_deleted; + // used to protect _current_shard and _tablet_set std::mutex _mutex; - TStorageMedium::type _storage_medium; // 存储介质类型:SSD|HDD + uint64_t _current_shard; std::set _tablet_set; static const size_t TEST_FILE_BUF_SIZE = 4096; diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index ecc1fdfbcac5a8..c564b87ce28ec3 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -56,6 +56,15 @@ struct DataDirInfo { data_used_capacity(0), is_used(false) { } + DataDirInfo(const DataDirInfo& other) { + path = other.path; + path_hash = other.pash_hash; + capacity = other.capacity; + data_used_capacity = other.data_used_capacity; + is_used = other.is_used; + storage_medium = other.storage_medium; + } + std::string path; int64_t path_hash; int64_t capacity; // 总空间,单位字节 diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h index 8de070442a4ba1..f3575ed547d50c 100644 --- a/be/src/olap/olap_define.h +++ b/be/src/olap/olap_define.h @@ -164,6 +164,7 @@ enum OLAPStatus { OLAP_ERR_TRANSACTION_ALREADY_VISIBLE = -229, OLAP_ERR_VERSION_ALREADY_MERGED = -230, OLAP_ERR_LZO_DISABLED = -231, + OLAP_ERR_DISK_REACH_CAPACITY_LIMIT = -232, // CommandExecutor // [-300, -400) diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index b637e1d077d430..71cdb0346d57cd 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -157,7 +157,9 @@ void* StorageEngine::_base_compaction_thread_callback(void* arg, DataDir* data_d // cgroup is not initialized at this time // add tid to cgroup CgroupsMgr::apply_system_cgroup(); - perform_base_compaction(data_dir); + if (!data_dir->reach_capacity_limit(0)) { + perform_base_compaction(data_dir); + } usleep(interval * 1000000); } @@ -249,7 +251,9 @@ void* StorageEngine::_cumulative_compaction_thread_callback(void* arg, DataDir* // cgroup is not initialized at this time // add tid to cgroup CgroupsMgr::apply_system_cgroup(); - perform_cumulative_compaction(data_dir); + if (!data_dir->reach_capacity_limit(0)) { + perform_cumulative_compaction(data_dir); + } usleep(interval * 1000000); } diff --git a/be/src/olap/rowset/segment_writer.cpp b/be/src/olap/rowset/segment_writer.cpp index 09a77ecd367c19..cb7a45fcd40572 100644 --- a/be/src/olap/rowset/segment_writer.cpp +++ b/be/src/olap/rowset/segment_writer.cpp @@ -213,12 +213,6 @@ OLAPStatus SegmentWriter::finalize(uint32_t* segment_file_size) { boost::filesystem::path data_dir_path = tablet_path.parent_path().parent_path().parent_path().parent_path(); std::string data_dir_string = data_dir_path.string(); DataDir* data_dir = StorageEngine::instance()->get_store(data_dir_string); - data_dir->add_pending_ids(ROWSET_ID_PREFIX + std::to_string(_segment_group->rowset_id())); - if (OLAP_SUCCESS != (res = file_handle.open_with_mode( - _file_name, O_CREAT | O_EXCL | O_WRONLY , S_IRUSR | S_IWUSR))) { - LOG(WARNING) << "fail to open file. [file_name=" << _file_name << "]"; - return res; - } res = _make_file_header(file_header.mutable_message()); if (OLAP_SUCCESS != res) { @@ -226,6 +220,19 @@ OLAPStatus SegmentWriter::finalize(uint32_t* segment_file_size) { return res; } + // check disk capacity + if (data_dir->reach_capacity_limit((int64_t) file_header.file_length())) { + LOG(WARNING) << "dir " << data_dir->path() << " reach the capacity limit"; + return OLAP_ERR_DISK_REACH_CAPACITY_LIMIT; + } + + data_dir->add_pending_ids(ROWSET_ID_PREFIX + std::to_string(_segment_group->rowset_id())); + if (OLAP_SUCCESS != (res = file_handle.open_with_mode( + _file_name, O_CREAT | O_EXCL | O_WRONLY , S_IRUSR | S_IWUSR))) { + LOG(WARNING) << "fail to open file. [file_name=" << _file_name << "]"; + return res; + } + res = file_header.prepare(&file_handle); if (OLAP_SUCCESS != res) { OLAP_LOG_WARNING("write file header error. [err=%m]"); diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 340e74940f934e..904b541211e8e7 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -1889,10 +1889,16 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa VLOG(10) << "begin to convert a history rowset. version=" << rs_reader->version().first << "-" << rs_reader->version().second; + // check disk capacity + int64_t tablet_size = rs_reader->row_set()->index_disk_size() + rs_reader->row_set()->data_disk_size(); + if (sc_params.new_tablet->data_dir()->reach_capacity_limit(tablet_size)) { + res = OLAP_ERR_DISK_REACH_CAPACITY_LIMIT; + goto PROCESS_ALTER_EXIT; + } + // set status for monitor // 只要有一个new_table为running,ref table就设置为running // NOTE 如果第一个sub_table先fail,这里会继续按正常走 - RowsetId rowset_id = 0; TabletSharedPtr new_tablet = sc_params.new_tablet; res = sc_params.new_tablet->next_rowset_id(&rowset_id); diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 8fff00dcb9597d..aaa81394eb73bd 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -295,28 +295,19 @@ OLAPStatus StorageEngine::get_all_data_dir_info(vector* data_dir_in timer.start(); int tablet_counter = 0; + // 1. update avaiable capacity of each data dir // get all root path info and construct a path map. // path -> DataDirInfo std::map path_map; { std::lock_guard l(_store_lock); for (auto& it : _store_map) { - std::string path = it.first; - path_map.emplace(path, it.second->get_dir_info()); - // if this path is not used, init it's info - if (!path_map[path].is_used) { - path_map[path].capacity = 1; - path_map[path].data_used_capacity = 0; - path_map[path].available = 0; - path_map[path].storage_medium = TStorageMedium::HDD; - } else { - path_map[path].storage_medium = it.second->storage_medium(); - } + it.second->update_available_capacity(); + path_map.emplace(it.first, it.second->get_dir_info()); } } - // for each tablet, get it's data size, and accumulate the path 'data_used_capacity' - // which the tablet belongs to. + // 2. get total tablets' size of each data dir _tablet_manager->update_root_path_info(&path_map, &tablet_counter); // add path info to data_dir_infos @@ -324,12 +315,6 @@ OLAPStatus StorageEngine::get_all_data_dir_info(vector* data_dir_in data_dir_infos->emplace_back(entry.second); } - // get available capacity of each path - for (auto& info: *data_dir_infos) { - if (info.is_used) { - _get_path_available_capacity(info.path, &info.available); - } - } timer.stop(); LOG(INFO) << "get root path info cost: " << timer.elapsed_time() / 1000000 << " ms. tablet counter: " << tablet_counter; @@ -470,23 +455,6 @@ void StorageEngine::_delete_tablets_on_unused_root_path() { _tablet_manager->drop_tablets_on_error_root_path(tablet_info_vec); } -OLAPStatus StorageEngine::_get_path_available_capacity( - const string& root_path, - int64_t* disk_available) { - OLAPStatus res = OLAP_SUCCESS; - - try { - boost::filesystem::path path_name(root_path); - boost::filesystem::space_info path_info = boost::filesystem::space(path_name); - *disk_available = path_info.available; - } catch (boost::filesystem::filesystem_error& e) { - LOG(WARNING) << "get space info failed. path: " << root_path << " erro:" << e.what(); - return OLAP_ERR_STL_ERROR; - } - - return res; -} - OLAPStatus StorageEngine::clear() { // 删除lru中所有内容,其实进程退出这么做本身意义不大,但对单测和更容易发现问题还是有很大意义的 delete FileHandler::get_fd_cache(); @@ -597,7 +565,7 @@ OLAPStatus StorageEngine::start_trash_sweep(double* usage) { const int32_t snapshot_expire = config::snapshot_expire_time_sec; const int32_t trash_expire = config::trash_file_expire_time_sec; - const double guard_space = config::disk_capacity_insufficient_percentage / 100.0; + const double guard_space = config::capacity_used_percent_flood_stage / 100.0; std::vector data_dir_infos; res = get_all_data_dir_info(&data_dir_infos); if (res != OLAP_SUCCESS) { diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 1332065b95391a..db700dfe9aaa93 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -201,10 +201,6 @@ class StorageEngine { bool _used_disk_not_enough(uint32_t unused_num, uint32_t total_num); - OLAPStatus _get_path_available_capacity( - const std::string& root_path, - int64_t* disk_available); - OLAPStatus _config_root_path_unused_flag_file( const std::string& root_path, std::string* unused_flag_file); diff --git a/be/src/olap/task/engine_batch_load_task.cpp b/be/src/olap/task/engine_batch_load_task.cpp index 6126919d59275f..d2076a4097165e 100644 --- a/be/src/olap/task/engine_batch_load_task.cpp +++ b/be/src/olap/task/engine_batch_load_task.cpp @@ -117,10 +117,17 @@ AgentStatus EngineBatchLoadTask::_init() { } // Empty remote_path - if (!_push_req.__isset.http_file_path) { + if (!_push_req.__isset.http_file_path || !_push_req.__isset.http_file_size) { _is_init = true; return status; } + + // check disk capacity + if (_push_req.push_type == TPushType::LOAD) { + if (tablet->data_dir()->reach_capacity_limit(_push_req.__isset.http_file_size)) { + return DORIS_DISK_REACH_CAPACITY_LIMIT; + } + } // Check remote path _remote_file_path = _push_req.http_file_path; diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index c70d27489f6456..dfed7143572b2f 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -468,6 +468,12 @@ AgentStatus EngineCloneTask::_clone_copy( break; } + // check disk capacity + if (data_dir->reach_capacity_limit(file_size)) { + status = DORIS_DISK_REACH_CAPACITY_LIMIT; + break; + } + total_file_size += file_size; uint64_t estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024; if (estimate_timeout < config::download_low_speed_time) { diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp index f65a5281fc8a01..0c2faccf833f8c 100644 --- a/be/src/olap/task/engine_storage_migration_task.cpp +++ b/be/src/olap/task/engine_storage_migration_task.cpp @@ -118,6 +118,14 @@ OLAPStatus EngineStorageMigrationTask::_storage_medium_migrate( break; } + // check disk capacity + int64_t tablet_size = tablet->tablet_footprint(); + if (stores[0]->reach_capacity_limit(tablet_size)) { + res = OLAP_ERR_DISK_REACH_CAPACITY_LIMIT; + break; + } + + // get shard uint64_t shard = 0; res = stores[0]->get_shard(&shard); if (res != OLAP_SUCCESS) { diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 08e9c8bc19a8aa..4a4f64f9348b9f 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -500,7 +500,7 @@ void FragmentMgr::cancel_worker() { } } for (auto& id : to_delete) { - LOG(INFO) << "FragmentMgr cancel worker going to cancel fragment " << id; + LOG(INFO) << "FragmentMgr cancel worker going to cancel fragment " << print_id(id); cancel(id); } From cccc93fee9cad5dfafb544f91e19ccb32d587819 Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 26 Aug 2019 16:46:48 +0800 Subject: [PATCH 5/8] fix compile bug --- be/src/agent/task_worker_pool.cpp | 2 +- be/src/olap/base_compaction.cpp | 4 ++-- be/src/olap/base_compaction.h | 2 +- be/src/olap/compaction.cpp | 5 ++--- be/src/olap/compaction.h | 3 +-- be/src/olap/data_dir.cpp | 17 ++++++++++++----- be/src/olap/delta_writer.cpp | 5 +---- be/src/olap/olap_common.h | 9 --------- be/src/olap/rowset/column_data_writer.cpp | 8 ++++---- be/src/olap/rowset/segment_writer.cpp | 1 - be/src/olap/schema_change.cpp | 2 +- be/src/olap/storage_engine.cpp | 10 ++++++---- be/src/olap/storage_engine.h | 2 +- be/src/olap/task/engine_clone_task.cpp | 2 +- be/src/runtime/snapshot_loader.cpp | 15 +++++++++++++++ be/src/runtime/tablet_writer_mgr.cpp | 20 ++++++++++++-------- 16 files changed, 60 insertions(+), 47 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 9c17c412817442..3bba1abcd03903 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1266,7 +1266,7 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this) } #endif vector data_dir_infos; - worker_pool_this->_env->storage_engine()->get_all_data_dir_info(&data_dir_infos); + worker_pool_this->_env->storage_engine()->get_all_data_dir_info(&data_dir_infos, true /* update */); map disks; for (auto& root_path_info : data_dir_infos) { diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp index 09624bae4ab2aa..20f40f38bd7914 100644 --- a/be/src/olap/base_compaction.cpp +++ b/be/src/olap/base_compaction.cpp @@ -20,8 +20,8 @@ namespace doris { -BaseCompaction::BaseCompaction(TabletSharedPtr tablet, DataDir* data_dir) - : Compaction(tablet, data_dir), +BaseCompaction::BaseCompaction(TabletSharedPtr tablet) + : Compaction(tablet) { } BaseCompaction::~BaseCompaction() { } diff --git a/be/src/olap/base_compaction.h b/be/src/olap/base_compaction.h index b58330ffa5af00..5110734cdbee3e 100644 --- a/be/src/olap/base_compaction.h +++ b/be/src/olap/base_compaction.h @@ -29,7 +29,7 @@ namespace doris { class BaseCompaction : public Compaction { public: - BaseCompaction(TabletSharedPtr tablet, DataDir* data_dir); + BaseCompaction(TabletSharedPtr tablet); ~BaseCompaction() override; OLAPStatus compact() override; diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 48d9362d466c00..8460240bb80eeb 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -21,9 +21,8 @@ using std::vector; namespace doris { -Compaction::Compaction(TabletSharedPtr tablet, DataDir* data_dir) +Compaction::Compaction(TabletSharedPtr tablet) : _tablet(tablet), - _data_dir(data_dir); _input_rowsets_size(0), _input_row_num(0), _state(CompactionState::INITED) @@ -194,7 +193,7 @@ OLAPStatus Compaction::check_disk_capacity() { incoming_data_size += rowset->index_disk_size(); } - if (_data_dir->reach_capacity_limit(incoming_data_size)) { + if (_tablet->data_dir()->reach_capacity_limit(incoming_data_size)) { return OLAP_ERR_DISK_REACH_CAPACITY_LIMIT; } return OLAP_SUCCESS; diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index a2511703cc0460..87c51d65b65da6 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -44,7 +44,7 @@ class Merger; // 4. gc unused rowstes class Compaction { public: - Compaction(TabletSharedPtr tablet, DataDir* data_dir); + Compaction(TabletSharedPtr tablet); virtual ~Compaction(); virtual OLAPStatus compact() = 0; @@ -68,7 +68,6 @@ class Compaction { protected: TabletSharedPtr _tablet; - DataDir* _data_dir; std::vector _input_rowsets; std::vector _input_rs_readers; diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index ea23bbbf496190..11f138208ea1cd 100755 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -59,6 +59,8 @@ DataDir::DataDir(const std::string& path, int64_t capacity_bytes, TabletManager* tablet_manager, TxnManager* txn_manager) : _path(path), _capacity_bytes(capacity_bytes), + _available_bytes(0), + _disk_capacity_bytes(0), _is_used(false), _tablet_manager(tablet_manager), _txn_manager(txn_manager), @@ -1061,10 +1063,13 @@ Status DataDir::update_capacity() { boost::filesystem::path path_name(_path); boost::filesystem::space_info path_info = boost::filesystem::space(path_name); _available_bytes = path_info.available; - _disk_capacity_bytes = path_info.capacity; + if (_disk_capacity_bytes == 0) { + // disk capacity only need to be set once + _disk_capacity_bytes = path_info.capacity; + } } catch (boost::filesystem::filesystem_error& e) { - LOG(WARNING) << "get space info failed. path: " << root_path << " erro:" << e.what(); - return Status.InternalError("get path available capacity failed"); + LOG(WARNING) << "get space info failed. path: " << _path << " erro:" << e.what(); + return Status::InternalError("get path available capacity failed"); } LOG(INFO) << "path: " << _path << " total capacity: " << _disk_capacity_bytes << ", available capacity: " << _available_bytes; @@ -1072,12 +1077,14 @@ Status DataDir::update_capacity() { return Status::OK(); } -bool DataDir::reach_capacity_limit(int64_t incoming_data_size); - double used_pct = _available_bytes + incoming_data_size / (double) _disk_capacity_bytes; +bool DataDir::reach_capacity_limit(int64_t incoming_data_size) { + double used_pct = (_available_bytes + incoming_data_size) / (double) _disk_capacity_bytes; int64_t left_bytes = _disk_capacity_bytes - _available_bytes - incoming_data_size; if (used_pct >= config::capacity_used_percent_flood_stage / 100.0 && left_bytes <= config::capacity_min_left_bytes_flood_stage) { + LOG(WARNING) << "reach capacity limit. used pct: " << used_pct << ", left bytes: " << left_bytes + << ", path: " << _path; return true; } return false; diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index d3f63478c4e967..35c4d18fc49691 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -137,10 +137,7 @@ OLAPStatus DeltaWriter::init() { // TODO: new RowsetBuilder according to tablet storage type _rowset_writer.reset(new AlphaRowsetWriter()); - status = _rowset_writer->init(writer_context); - if (status != OLAP_SUCCESS) { - return OLAP_ERR_ROWSET_WRITER_INIT; - } + RETURN_NOT_OK(_rowset_writer->init(writer_context)); const std::vector& slots = _req.tuple_desc->slots(); const TabletSchema& schema = _tablet->tablet_schema(); diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index c564b87ce28ec3..ecc1fdfbcac5a8 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -56,15 +56,6 @@ struct DataDirInfo { data_used_capacity(0), is_used(false) { } - DataDirInfo(const DataDirInfo& other) { - path = other.path; - path_hash = other.pash_hash; - capacity = other.capacity; - data_used_capacity = other.data_used_capacity; - is_used = other.is_used; - storage_medium = other.storage_medium; - } - std::string path; int64_t path_hash; int64_t capacity; // 总空间,单位字节 diff --git a/be/src/olap/rowset/column_data_writer.cpp b/be/src/olap/rowset/column_data_writer.cpp index c8da480e974b43..7037f87241cb1b 100644 --- a/be/src/olap/rowset/column_data_writer.cpp +++ b/be/src/olap/rowset/column_data_writer.cpp @@ -280,7 +280,7 @@ OLAPStatus ColumnDataWriter::_flush_segment_with_verfication() { OLAPStatus res = _finalize_segment(); if (OLAP_SUCCESS != res) { OLAP_LOG_WARNING("fail to finalize segment. [res=%d]", res); - return OLAP_ERR_WRITER_DATA_WRITE_ERROR; + return res; } _new_segment_created = false; @@ -292,12 +292,12 @@ OLAPStatus ColumnDataWriter::_finalize_segment() { OLAPStatus res = OLAP_SUCCESS; uint32_t data_segment_size; - if (OLAP_SUCCESS != _segment_writer->finalize(&data_segment_size)) { + if ((res = _segment_writer->finalize(&data_segment_size)) != OLAP_SUCCESS) { OLAP_LOG_WARNING("fail to finish segment from olap_data."); - return OLAP_ERR_WRITER_DATA_WRITE_ERROR; + return res; } - if (OLAP_SUCCESS != _segment_group->finalize_segment(data_segment_size, _num_rows)) { + if ((res != _segment_group->finalize_segment(data_segment_size, _num_rows)) != OLAP_SUCCESS) { OLAP_LOG_WARNING("fail to finish segment from olap_index."); return OLAP_ERR_WRITER_INDEX_WRITE_ERROR; } diff --git a/be/src/olap/rowset/segment_writer.cpp b/be/src/olap/rowset/segment_writer.cpp index cb7a45fcd40572..70ffccb53f9e94 100644 --- a/be/src/olap/rowset/segment_writer.cpp +++ b/be/src/olap/rowset/segment_writer.cpp @@ -222,7 +222,6 @@ OLAPStatus SegmentWriter::finalize(uint32_t* segment_file_size) { // check disk capacity if (data_dir->reach_capacity_limit((int64_t) file_header.file_length())) { - LOG(WARNING) << "dir " << data_dir->path() << " reach the capacity limit"; return OLAP_ERR_DISK_REACH_CAPACITY_LIMIT; } diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 904b541211e8e7..7be09c21f47dc7 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -1890,7 +1890,7 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa << rs_reader->version().first << "-" << rs_reader->version().second; // check disk capacity - int64_t tablet_size = rs_reader->row_set()->index_disk_size() + rs_reader->row_set()->data_disk_size(); + int64_t tablet_size = rs_reader->rowset()->index_disk_size() + rs_reader->rowset()->data_disk_size(); if (sc_params.new_tablet->data_dir()->reach_capacity_limit(tablet_size)) { res = OLAP_ERR_DISK_REACH_CAPACITY_LIMIT; goto PROCESS_ALTER_EXIT; diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index aaa81394eb73bd..555f8127ad4836 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -287,7 +287,7 @@ std::vector StorageEngine::get_stores() { template std::vector StorageEngine::get_stores(); template std::vector StorageEngine::get_stores(); -OLAPStatus StorageEngine::get_all_data_dir_info(vector* data_dir_infos) { +OLAPStatus StorageEngine::get_all_data_dir_info(vector* data_dir_infos, bool need_update) { OLAPStatus res = OLAP_SUCCESS; data_dir_infos->clear(); @@ -302,7 +302,9 @@ OLAPStatus StorageEngine::get_all_data_dir_info(vector* data_dir_in { std::lock_guard l(_store_lock); for (auto& it : _store_map) { - it.second->update_available_capacity(); + if (need_update) { + it.second->update_capacity(); + } path_map.emplace(it.first, it.second->get_dir_info()); } } @@ -414,7 +416,7 @@ std::vector StorageEngine::get_stores_for_create_tablet( } DataDir* StorageEngine::get_store(const std::string& path) { - std::lock_guard l(_store_lock); + // _store_map is unchanged, no need to lock auto it = _store_map.find(path); if (it == std::end(_store_map)) { return nullptr; @@ -567,7 +569,7 @@ OLAPStatus StorageEngine::start_trash_sweep(double* usage) { const int32_t trash_expire = config::trash_file_expire_time_sec; const double guard_space = config::capacity_used_percent_flood_stage / 100.0; std::vector data_dir_infos; - res = get_all_data_dir_info(&data_dir_infos); + res = get_all_data_dir_info(&data_dir_infos, false); if (res != OLAP_SUCCESS) { LOG(WARNING) << "failed to get root path stat info when sweep trash."; return res; diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index db700dfe9aaa93..dad4ce34529655 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -120,7 +120,7 @@ class StorageEngine { void set_store_used_flag(const std::string& root_path, bool is_used); // @brief 获取所有root_path信息 - OLAPStatus get_all_data_dir_info(std::vector* data_dir_infos); + OLAPStatus get_all_data_dir_info(std::vector* data_dir_infos, bool need_update); // 磁盘状态监测。监测unused_flag路劲新的对应root_path unused标识位, // 当检测到有unused标识时,从内存中删除对应表信息,磁盘数据不动。 diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index dfed7143572b2f..13300f5b318207 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -469,7 +469,7 @@ AgentStatus EngineCloneTask::_clone_copy( } // check disk capacity - if (data_dir->reach_capacity_limit(file_size)) { + if (data_dir.reach_capacity_limit(file_size)) { status = DORIS_DISK_REACH_CAPACITY_LIMIT; break; } diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index f3b48822648195..daf947e08b7842 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -318,6 +318,15 @@ Status SnapshotLoader::download( return Status::InternalError(ss.str()); } + TabletSharedPtr tablet = _env->storage_engine()->tablet_manager()->get_tablet(local_tablet_id, schema_hash); + if (tablet == nullptr) { + std::stringstream ss; + ss << "failed to get local tablet: " << local_tablet_id; + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } + DataDir* data_dir = tablet->data_dir(); + for (auto& iter : remote_files) { RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num, @@ -367,6 +376,12 @@ Status SnapshotLoader::download( LOG(INFO) << "begin to download from " << full_remote_file << " to " << full_local_file; size_t file_len = file_stat.size; + + // check disk capacity + if (data_dir->reach_capacity_limit(file_len)) { + return Status::InternalError("capacity limit reached"); + } + { // 1. open remote file for read std::unique_ptr broker_reader; diff --git a/be/src/runtime/tablet_writer_mgr.cpp b/be/src/runtime/tablet_writer_mgr.cpp index 1d1c282a57ffeb..8b9bf6487fdcb8 100644 --- a/be/src/runtime/tablet_writer_mgr.cpp +++ b/be/src/runtime/tablet_writer_mgr.cpp @@ -155,9 +155,9 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) { if (st != OLAP_SUCCESS) { std::stringstream ss; ss << "tablet writer write failed, tablet_id=" << it->first - << ", transaction_id=" << _txn_id << ", status=" << st; + << ", transaction_id=" << _txn_id << ", err=" << st; LOG(WARNING) << ss.str(); - return Status::InternalError(ss.str() + ", be: " + BackendOptions::get_localhost()); + return Status::InternalError(ss.str()); } } _next_seqs[params.sender_id()]++; @@ -187,9 +187,11 @@ Status TabletsChannel::close(int sender_id, bool* finished, if (_partition_ids.count(it.second->partition_id()) > 0) { auto st = it.second->close(tablet_vec); if (st != OLAP_SUCCESS) { - LOG(WARNING) << "close tablet writer failed, tablet_id=" << it.first - << ", transaction_id=" << _txn_id; - _close_status = Status::InternalError("close tablet writer failed"); + std::stringstream ss; + ss << "close tablet writer failed, tablet_id=" << it.first + << ", transaction_id=" << _txn_id << ", err=" << st; + LOG(WARNING) << ss.str(); + _close_status = Status::InternalError(ss.str()); return _close_status; } } else { @@ -233,11 +235,13 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& params) DeltaWriter* writer = nullptr; auto st = DeltaWriter::open(&request, &writer); if (st != OLAP_SUCCESS) { - LOG(WARNING) << "open delta writer failed, tablet_id=" << tablet.tablet_id() + std::stringstream ss; + ss << "open delta writer failed, tablet_id=" << tablet.tablet_id() << ", txn_id=" << _txn_id << ", partition_id=" << tablet.partition_id() - << ", status=" << st; - return Status::InternalError("open tablet writer failed"); + << ", err=" << st; + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); } _tablet_writers.emplace(tablet.tablet_id(), writer); } From 3ae23c1e1277cf2a0b48e0742db575574142ee25 Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 26 Aug 2019 18:53:36 +0800 Subject: [PATCH 6/8] Limit the disk usage to avoid running out of disk capacity Set high watermark and flood stage of disk used capacity. And forbid some operation is disk usage is too high. --- be/src/common/config.h | 4 +- be/src/olap/data_dir.cpp | 4 +- be/src/olap/storage_engine.cpp | 2 +- .../operation/disk-capacity.md | 122 ++++++++++++++++++ .../org/apache/doris/alter/SystemHandler.java | 2 +- .../org/apache/doris/catalog/DiskInfo.java | 4 +- .../doris/clone/RootPathLoadStatistic.java | 4 +- .../java/org/apache/doris/common/Config.java | 8 +- 8 files changed, 136 insertions(+), 14 deletions(-) create mode 100644 docs/documentation/cn/administrator-guide/operation/disk-capacity.md diff --git a/be/src/common/config.h b/be/src/common/config.h index 9cd4f4e3e76c4b..c5ffeff6a9be06 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -442,9 +442,9 @@ namespace config { // The following 2 configs limit the max usage of disk capacity of a data dir. // If both of these 2 threshold reached, no more data can be writen into that data dir. // The percent of max used capacity of a data dir - CONF_Int32(capacity_used_percent_flood_stage, "95"); // 95% + CONF_Int32(storage_flood_stage_usage_percent, "95"); // 95% // The min bytes that should be left of a data dir - CONF_Int64(capacity_min_left_bytes_flood_stage, "1073741824") // 1GB + CONF_Int64(storage_flood_stage_left_capacity_bytes, "1073741824") // 1GB } // namespace config } // namespace doris diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index 11f138208ea1cd..c186c5927c5730 100755 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -1081,8 +1081,8 @@ bool DataDir::reach_capacity_limit(int64_t incoming_data_size) { double used_pct = (_available_bytes + incoming_data_size) / (double) _disk_capacity_bytes; int64_t left_bytes = _disk_capacity_bytes - _available_bytes - incoming_data_size; - if (used_pct >= config::capacity_used_percent_flood_stage / 100.0 - && left_bytes <= config::capacity_min_left_bytes_flood_stage) { + if (used_pct >= config::storage_flood_stage_usage_percent / 100.0 + && left_bytes <= config::storage_flood_stage_left_capacity_bytes) { LOG(WARNING) << "reach capacity limit. used pct: " << used_pct << ", left bytes: " << left_bytes << ", path: " << _path; return true; diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 555f8127ad4836..caba4108028c17 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -567,7 +567,7 @@ OLAPStatus StorageEngine::start_trash_sweep(double* usage) { const int32_t snapshot_expire = config::snapshot_expire_time_sec; const int32_t trash_expire = config::trash_file_expire_time_sec; - const double guard_space = config::capacity_used_percent_flood_stage / 100.0; + const double guard_space = config::storage_flood_stage_usage_percent / 100.0; std::vector data_dir_infos; res = get_all_data_dir_info(&data_dir_infos, false); if (res != OLAP_SUCCESS) { diff --git a/docs/documentation/cn/administrator-guide/operation/disk-capacity.md b/docs/documentation/cn/administrator-guide/operation/disk-capacity.md new file mode 100644 index 00000000000000..502305fd71f5ce --- /dev/null +++ b/docs/documentation/cn/administrator-guide/operation/disk-capacity.md @@ -0,0 +1,122 @@ +# 磁盘空间管理 + +本文档主要介绍和磁盘存储空间有关的系统参数和处理策略。 + +Doris 的数据磁盘空间如果不加以控制,会因磁盘写满而导致进程挂掉。因此我们监测磁盘的使用率和剩余空间,通过设置不同的警戒水位,来控制 Doris 系统中的各项操作,尽量避免发生磁盘被写满的情况。 + +## 名词解释 + +* FE:Frontend,Doris 的前端节点。负责元数据管理和请求接入。 +* BE:Backend,Doris 的后端节点。负责查询执行和数据存储。 +* Data Dir:数据目录,在 BE 配置文件 `be.conf` 的 `storage_root_path` 中指定的各个数据目录。通常一个数据目录对应一个磁盘、因此下文中 **磁盘** 也指代一个数据目录。 + +## 基本原理 + +BE 定期(每隔一分钟)会向 FE 汇报一次磁盘使用情况。FE 记录这些统计值,并根据这些统计值,限制不同的操作请求。 + +在 FE 中分别设置了 **高水位(High Watermark)** 和 **危险水位(Flood Stage)** 两级阈值。危险水位高于高水位。当磁盘使用率高于高水位时,Doris 会限制某些操作的执行(如副本均衡等)。而如果高于危险水位,则会禁止某些操作的执行(如导入)。 + +同时,在 BE 上也设置了 **危险水位(Flood Stage)**。考虑到 FE 并不能完全及时的检测到 BE 上的磁盘使用情况,以及无法控制某些 BE 自身运行的操作(如 Compaction)。因此 BE 上的危险水位用于 BE 主动拒绝和停止某些操作,达到自我保护的目的。 + +## FE 参数 + +**高水位:** + +``` +storage_high_watermark_usage_percent 默认 85 (85%)。 +storage_min_left_capacity_bytes 默认 2GB。 +``` + +当磁盘空间使用率**大于** `storage_high_watermark_usage_percent`,**或者** 磁盘空间剩余大小**小于** `storage_min_left_capacity_bytes` 时,该磁盘不会再被作为以下操作的目的路径: + +* Tablet 均衡操作(Balance) +* Colocation 表数据分片的重分布(Relocation) +* Decommission + +**危险水位:** + +``` +storage_flood_stage_usage_percent 默认 95 (95%)。 +storage_flood_stage_left_capacity_bytes 默认 1GB。 +``` + +当磁盘空间使用率**大于** `storage_flood_stage_usage_percent`,**并且** 磁盘空间剩余大小**小于** `storage_flood_stage_left_capacity_bytes` 时,该磁盘不会再被作为以下操作的目的路径,并禁止某些操作: + +* Tablet 均衡操作(Balance) +* Colocation 表数据分片的重分布(Relocation) +* 副本补齐 +* 恢复操作(Restore) +* 数据导入(Load/Insert) + +## BE 参数 + +**危险水位:** + +``` +capacity_used_percent_flood_stage 默认 95 (95%)。 +capacity_min_left_bytes_flood_stage 默认 1GB。 +``` + +当磁盘空间使用率**大于** `storage_flood_stage_usage_percent`,**并且** 磁盘空间剩余大小**小于** `storage_flood_stage_left_capacity_bytes` 时,该磁盘上的以下操作会被禁止: + +* Base/Cumulative Compaction。 +* 数据写入。包括各种导入操作。 +* Clone Task。通常发生于副本修复或均衡时。 +* Push Task。发生在 Hadoop 导入的 Loading 阶段,下载文件。 +* Alter Task。Schema Change 或 Rollup 任务。 +* Download Task。恢复操作的 Downloading 阶段。 + +## 磁盘空间释放 + +当磁盘空间高于高水位甚至危险水位后,很多操作都会被禁止。此时可以尝试通过以下方式减少磁盘使用率,恢复系统。 + +* 删除表或分区 + + 通过删除表或分区的方式,能够快速降低磁盘空间使用率,恢复集群。**注意:只有 `DROP` 操作可以达到快速降低磁盘空间使用率的目的,`DELETE` 操作不可以。** + + ``` + DROP TABLE tbl; + ALTER TABLE tbl DROP PARTITION p1; + ``` + +* 扩容 BE + + 扩容后,数据分片会自动均衡到磁盘使用率较低的 BE 节点上。扩容操作会根据数据量及节点数量不同,在数小时或数天后使集群到达均衡状态。 + +* 修改表或分区的副本 + + 可以将表或分区的副本数降低。比如默认3副本可以降低为2副本。该方法虽然降低了数据的可靠性,但是能够快速的降低磁盘使用率,使集群恢复正常。该方法通常用于紧急恢复系统。请在恢复后,通过扩容或删除数据等方式,降低磁盘使用率后,将副本数恢复为 3。 + + 修改副本操作为瞬间生效,后台会自动异步的删除多余的副本。 + + ``` + ALTER TABLE tbl MODIFY PARTITION p1 SET("replication_num" = "2"); + ``` + +* 删除多余文件 + + 当 BE 进程已经因为磁盘写满而挂掉并无法启动时(此现象可能因 FE 或 BE 检测不及时而发生)。需要通过删除数据目录下的一些临时文件,保证 BE 进程能够启动。以下目录中的文件可以直接删除: + + * log/:日志目录下的日志文件。 + * snapshot/: 快照目录下的快照文件。 + * trash/:回收站中的文件。 + +* 删除数据文件(危险!!!) + + 当以上操作都无法释放空间时,需要通过删除数据文件来释放空间。数据文件在指定数据目录的 `data/` 目录下。删除数据分片(Tablet)必须先确保该 Tablet 至少有一个副本是正常的,否则**删除唯一副本会导致数据丢失**。假设我们要删除 id 为 12345 的 Tablet: + + * 找到 Tablet 对应的目录,通常位于 `data/shard_id/tablet_id/` 下。如: + + ```data/0/12345/``` + + * 记录 tablet id 和 schema hash。其中 schema hash 为上一步目录的下一级目录名。如下为 352781111: + + ```data/0/12345/352781111``` + + * 删除数据目录: + + ```rm -rf data/0/12345/``` + + * 删除 Tablet 元数据(具体参考 [Tablet 元数据管理工具](./tablet-meta-tool.md)) + + ```./lib/meta_tool --operation=delete_header --root_path=/path/to/root_path --tablet_id=12345 --schema_hash= 352781111``` \ No newline at end of file diff --git a/fe/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/src/main/java/org/apache/doris/alter/SystemHandler.java index e951f3439a4424..470488e628a79b 100644 --- a/fe/src/main/java/org/apache/doris/alter/SystemHandler.java +++ b/fe/src/main/java/org/apache/doris/alter/SystemHandler.java @@ -268,7 +268,7 @@ public static Map> checkDecommission(List totalAvailableCapacityB * Config.storage_high_watermark_usage_percent) { + if (totalNeededCapacityB > totalAvailableCapacityB * (Config.storage_high_watermark_usage_percent / 100.0)) { throw new DdlException("No available capacity for decommission in cluster: " + clusterName + ", needed: " + totalNeededCapacityB + ", available: " + totalAvailableCapacityB + ", threshold: " + Config.storage_high_watermark_usage_percent); diff --git a/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java b/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java index 9b6f5effb3e7fa..8470c0cea5b908 100644 --- a/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java +++ b/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java @@ -139,10 +139,10 @@ public boolean exceedLimit(boolean floodStage) { floodStage, diskAvailableCapacityB, totalCapacityB); if (floodStage) { return diskAvailableCapacityB < Config.storage_flood_stage_left_capacity_bytes && - (double) (totalCapacityB - diskAvailableCapacityB) / totalCapacityB > Config.storage_flood_stage_usage_percent; + (double) (totalCapacityB - diskAvailableCapacityB) / totalCapacityB > (Config.storage_flood_stage_usage_percent / 100.0); } else { return diskAvailableCapacityB < Config.storage_min_left_capacity_bytes || - (double) (totalCapacityB - diskAvailableCapacityB) / totalCapacityB > Config.storage_high_watermark_usage_percent; + (double) (totalCapacityB - diskAvailableCapacityB) / totalCapacityB > (Config.storage_high_watermark_usage_percent / 100.0); } } diff --git a/fe/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java b/fe/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java index 105837cad612dd..2b5fee92cca8cd 100644 --- a/fe/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java +++ b/fe/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java @@ -93,7 +93,7 @@ public BalanceStatus isFit(long tabletSize, boolean isSupplement) { } if (isSupplement) { - if ((usedCapacityB + tabletSize) / (double) capacityB > Config.storage_flood_stage_usage_percent + if ((usedCapacityB + tabletSize) / (double) capacityB > (Config.storage_flood_stage_usage_percent / 100.0) && capacityB - usedCapacityB - tabletSize < Config.storage_flood_stage_left_capacity_bytes) { return new BalanceStatus(ErrCode.COMMON_ERROR, toString() + " does not fit tablet with size: " + tabletSize + ", limitation reached"); @@ -102,7 +102,7 @@ public BalanceStatus isFit(long tabletSize, boolean isSupplement) { } } - if ((usedCapacityB + tabletSize) / (double) capacityB > Config.storage_high_watermark_usage_percent + if ((usedCapacityB + tabletSize) / (double) capacityB > (Config.storage_high_watermark_usage_percent / 100.0) || capacityB - usedCapacityB - tabletSize < Config.storage_min_left_capacity_bytes) { return new BalanceStatus(ErrCode.COMMON_ERROR, toString() + " does not fit tablet with size: " + tabletSize); diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index 5e1c5a619d4efa..5e02f988275b69 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -668,9 +668,9 @@ public class Config extends ConfigBase { * But for tablet recovery, we may exceed these limit for keeping data integrity as much as possible. */ @ConfField(mutable = true, masterOnly = true) - public static double storage_high_watermark_usage_percent = 0.85; + public static int storage_high_watermark_usage_percent = 85; @ConfField(mutable = true, masterOnly = true) - public static double storage_min_left_capacity_bytes = 1024 * 1024 * 1024; // 1G + public static long storage_min_left_capacity_bytes = 2 * 1024 * 1024 * 1024; // 2G /* * If capacity of disk reach the 'storage_flood_stage_usage_percent' and 'storage_flood_stage_left_capacity_bytes', @@ -679,9 +679,9 @@ public class Config extends ConfigBase { * 2. restore job */ @ConfField(mutable = true, masterOnly = true) - public static double storage_flood_stage_usage_percent = 0.95; + public static int storage_flood_stage_usage_percent = 95; @ConfField(mutable = true, masterOnly = true) - public static double storage_flood_stage_left_capacity_bytes = 100 * 1024 * 1024; // 100MB + public static long storage_flood_stage_left_capacity_bytes = 1 * 1024 * 1024 * 1024; // 100MB // update interval of tablet stat // All frontends will get tablet stat from all backends at each interval From 5dde7995001c49df30b0f81e8d7d24f4ec608977 Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 26 Aug 2019 19:08:43 +0800 Subject: [PATCH 7/8] fix ut --- fe/src/test/java/org/apache/doris/catalog/BackendTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/fe/src/test/java/org/apache/doris/catalog/BackendTest.java b/fe/src/test/java/org/apache/doris/catalog/BackendTest.java index 131a1bdbc33261..48f323dc06ff4c 100644 --- a/fe/src/test/java/org/apache/doris/catalog/BackendTest.java +++ b/fe/src/test/java/org/apache/doris/catalog/BackendTest.java @@ -21,6 +21,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.metric.MetricRepo; import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TDisk; import org.easymock.EasyMock; @@ -57,12 +58,17 @@ public class BackendTest { private Catalog catalog; + private SystemInfoService systemInfoService; + @Before public void setUp() { + systemInfoService = new SystemInfoService(); + catalog = AccessTestUtil.fetchAdminCatalog(); PowerMock.mockStatic(Catalog.class); EasyMock.expect(Catalog.getInstance()).andReturn(catalog).anyTimes(); EasyMock.expect(Catalog.getCurrentCatalogJournalVersion()).andReturn(FeConstants.meta_version).anyTimes(); + EasyMock.expect(Catalog.getCurrentSystemInfo()).andReturn(systemInfoService).anyTimes(); PowerMock.replay(Catalog.class); backend = new Backend(backendId, host, heartbeatPort); From 1c1452670602102bc4d8bf99db5a6d5817cdfa7e Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 26 Aug 2019 20:53:56 +0800 Subject: [PATCH 8/8] fix by review --- be/src/olap/base_compaction.cpp | 1 - be/src/olap/compaction.cpp | 13 ------------- be/src/olap/compaction.h | 2 -- be/src/olap/cumulative_compaction.cpp | 1 - be/src/olap/schema_change.cpp | 7 ------- .../apache/doris/load/loadv2/BrokerLoadJobTest.java | 1 - .../org/apache/doris/planner/OlapTableSinkTest.java | 11 ++++++----- 7 files changed, 6 insertions(+), 30 deletions(-) diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp index 20f40f38bd7914..a77c6ccf9d3925 100644 --- a/be/src/olap/base_compaction.cpp +++ b/be/src/olap/base_compaction.cpp @@ -39,7 +39,6 @@ OLAPStatus BaseCompaction::compact() { // 1. pick rowsets to compact RETURN_NOT_OK(pick_rowsets_to_compact()); - RETURN_NOT_OK(check_disk_capacity()); // 2. do base compaction, merge rowsets RETURN_NOT_OK(do_compaction()); diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 8460240bb80eeb..b4049a25efd888 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -186,17 +186,4 @@ OLAPStatus Compaction::check_correctness(const Merger& merger) { return OLAP_SUCCESS; } -OLAPStatus Compaction::check_disk_capacity() { - int64_t incoming_data_size = 0; - for (auto& rowset : _input_rowsets) { - incoming_data_size += rowset->data_disk_size(); - incoming_data_size += rowset->index_disk_size(); - } - - if (_tablet->data_dir()->reach_capacity_limit(incoming_data_size)) { - return OLAP_ERR_DISK_REACH_CAPACITY_LIMIT; - } - return OLAP_SUCCESS; -} - } // namespace doris diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 87c51d65b65da6..40d80d62ccbc55 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -64,8 +64,6 @@ class Compaction { OLAPStatus check_version_continuity(const std::vector& rowsets); OLAPStatus check_correctness(const Merger& merger); - OLAPStatus check_disk_capacity(); - protected: TabletSharedPtr _tablet; diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 58484f06fc305e..35ddb8174734dc 100755 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -43,7 +43,6 @@ OLAPStatus CumulativeCompaction::compact() { // 2. pick rowsets to compact RETURN_NOT_OK(pick_rowsets_to_compact()); - RETURN_NOT_OK(check_disk_capacity()); // 3. do cumulative compaction, merge rowsets RETURN_NOT_OK(do_compaction()); diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 7be09c21f47dc7..6e7f7bb263fbbb 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -1889,13 +1889,6 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa VLOG(10) << "begin to convert a history rowset. version=" << rs_reader->version().first << "-" << rs_reader->version().second; - // check disk capacity - int64_t tablet_size = rs_reader->rowset()->index_disk_size() + rs_reader->rowset()->data_disk_size(); - if (sc_params.new_tablet->data_dir()->reach_capacity_limit(tablet_size)) { - res = OLAP_ERR_DISK_REACH_CAPACITY_LIMIT; - goto PROCESS_ALTER_EXIT; - } - // set status for monitor // 只要有一个new_table为running,ref table就设置为running // NOTE 如果第一个sub_table先fail,这里会继续按正常走 diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java index bd0ae6232c318c..c308e2f72d8ffc 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java @@ -113,7 +113,6 @@ public void testFromLoadStmt(@Injectable LoadStmt loadStmt, List dataDescriptionList = Lists.newArrayList(); dataDescriptionList.add(dataDescription); - new Expectations() { { loadStmt.getLabel(); diff --git a/fe/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java b/fe/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java index e6e3a3b1d0383b..891c78de6c2490 100644 --- a/fe/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java +++ b/fe/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java @@ -33,7 +33,6 @@ import org.apache.doris.catalog.SinglePartitionInfo; import org.apache.doris.catalog.Tablet; import org.apache.doris.common.UserException; -import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TUniqueId; @@ -42,20 +41,22 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.junit.Before; import org.junit.Test; import mockit.Expectations; import mockit.Injectable; -import mockit.Mocked; public class OlapTableSinkTest { private static final Logger LOG = LogManager.getLogger(OlapTableSinkTest.class); @Injectable - OlapTable dstTable; + public OlapTable dstTable; - @Mocked - SystemInfoService systemInfoService; + @Before + public void setUp() { + + } private TupleDescriptor getTuple() { DescriptorTable descTable = new DescriptorTable();