From e18f326f6c031064617d5305500ef6c41c98f6ba Mon Sep 17 00:00:00 2001
From: morningman
Date: Mon, 19 Aug 2019 23:04:39 +0800
Subject: [PATCH 1/3] Release snapshot when finishing or cancelling backup/restore job

Snapshots may take a lot of disk space if not released in time
---
 be/src/agent/task_worker_pool.cpp                |  2 +-
 be/src/olap/snapshot_manager.cpp                 | 14 +++++++---
 be/src/olap/snapshot_manager.h                   |  1 +
 be/src/olap/storage_engine.cpp                   | 18 ++++++++++---
 be/src/olap/storage_engine.h                     |  2 +-
 .../org/apache/doris/backup/BackupJob.java       | 27 +++++++++++++++++--
 .../org/apache/doris/backup/RestoreJob.java      | 24 +++++++++++++++++
 .../org/apache/doris/backup/SnapshotInfo.java    |  2 +-
 .../apache/doris/task/ExportPendingTask.java     |  1 +
 .../org/apache/doris/task/SnapshotTask.java      | 11 ++++----
 10 files changed, 85 insertions(+), 17 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 5bdb75fd0caa31..ba4d7677c65d78 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1525,7 +1525,7 @@ void* TaskWorkerPool::_make_snapshot_thread_callback(void* arg_this) {
                       << ", snapshot_path:" << snapshot_path;
             if (snapshot_request.__isset.list_files) {
                 // list and save all snapshot files
-                // snapshot_path like: data/snapshot/20180417205230.1
+                // snapshot_path like: data/snapshot/20180417205230.1.86400
                 // we need to add subdir: tablet_id/schema_hash/
                 std::stringstream ss;
                 ss << snapshot_path << "/" << snapshot_request.tablet_id
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 5b8e01b6caea15..2480159d9a3b4b 100755
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -108,7 +108,7 @@ OLAPStatus SnapshotManager::release_snapshot(const string& snapshot_path) {
             && snapshot_path.compare(abs_path.size(), SNAPSHOT_PREFIX.size(), SNAPSHOT_PREFIX) == 0) {
         remove_all_dir(snapshot_path);
-        VLOG(3) << "success to release snapshot path. [path='" << snapshot_path << "']";
+        LOG(INFO) << "success to release snapshot path. [path='" << snapshot_path << "']";
         return OLAP_SUCCESS;
     }
 
@@ -283,8 +283,11 @@ OLAPStatus SnapshotManager::_rename_rowset_id(const RowsetMetaPB& rs_meta_pb, co
     return OLAP_SUCCESS;
 }
 
+// get snapshot path: curtime.seq.timeout
+// eg: 20190819221234.3.86400
 OLAPStatus SnapshotManager::_calc_snapshot_id_path(
         const TabletSharedPtr& tablet,
+        int64_t timeout_s,
         string* out_path) {
     OLAPStatus res = OLAP_SUCCESS;
     if (out_path == nullptr) {
@@ -303,7 +306,8 @@ OLAPStatus SnapshotManager::_calc_snapshot_id_path(
     stringstream snapshot_id_path_stream;
     MutexLock auto_lock(&_snapshot_mutex); // will automatically unlock when function return.
     snapshot_id_path_stream << tablet->data_dir()->path() << SNAPSHOT_PREFIX
-                            << "/" << time_str << "." << _snapshot_base_id++;
+                            << "/" << time_str << "." << _snapshot_base_id++
+                            << "." << timeout_s;
     *out_path = snapshot_id_path_stream.str();
     return res;
 }
@@ -356,7 +360,11 @@ OLAPStatus SnapshotManager::_create_snapshot_files(
     }
 
     string snapshot_id_path;
-    res = _calc_snapshot_id_path(ref_tablet, &snapshot_id_path);
+    int64_t timeout_s = config::snapshot_expire_time_sec;
+    if (request.__isset.timeout) {
+        timeout_s = request.__isset.timeout;
+    }
+    res = _calc_snapshot_id_path(ref_tablet, timeout_s, &snapshot_id_path);
     if (res != OLAP_SUCCESS) {
         LOG(WARNING) << "failed to calc snapshot_id_path, ref tablet="
                      << ref_tablet->data_dir()->path();
diff --git a/be/src/olap/snapshot_manager.h b/be/src/olap/snapshot_manager.h
index ef8131cda77c4f..c2b3214a1c63d8 100644
--- a/be/src/olap/snapshot_manager.h
+++ b/be/src/olap/snapshot_manager.h
@@ -73,6 +73,7 @@ class SnapshotManager {
 
     OLAPStatus _calc_snapshot_id_path(
             const TabletSharedPtr& tablet,
+            int64_t timeout_s,
             std::string* out_path);
 
     std::string _get_header_full_path(
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 6d7bd20a65dc74..5b596b2778728d 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -606,8 +606,8 @@ OLAPStatus StorageEngine::start_trash_sweep(double* usage) {
     OLAPStatus res = OLAP_SUCCESS;
     LOG(INFO) << "start trash and snapshot sweep.";
 
-    const uint32_t snapshot_expire = config::snapshot_expire_time_sec;
-    const uint32_t trash_expire = config::trash_file_expire_time_sec;
+    const int32_t snapshot_expire = config::snapshot_expire_time_sec;
+    const int32_t trash_expire = config::trash_file_expire_time_sec;
     const double guard_space = config::disk_capacity_insufficient_percentage / 100.0;
     std::vector<DataDirInfo> data_dir_infos;
     res = get_all_data_dir_info(&data_dir_infos);
@@ -679,7 +679,7 @@ void StorageEngine::_clean_unused_txns() {
 }
 
 OLAPStatus StorageEngine::_do_sweep(
-        const string& scan_root, const time_t& local_now, const uint32_t expire) {
+        const string& scan_root, const time_t& local_now, const int32_t expire) {
     OLAPStatus res = OLAP_SUCCESS;
     if (!check_dir_existed(scan_root)) {
         // dir not existed. no need to sweep trash.
@@ -700,7 +700,17 @@ OLAPStatus StorageEngine::_do_sweep(
                 res = OLAP_ERR_OS_ERROR;
                 continue;
             }
-            if (difftime(local_now, mktime(&local_tm_create)) >= expire) {
+
+            int32_t actual_expire = expire;
+            // try to get the timeout from the dir name; old snapshot dirs do not contain a timeout
+            // eg: 20190818221123.3.86400, where 86400 is the timeout in seconds
+            size_t pos = dir_name.find('.', str_time.size() + 1);
+            if (pos != string::npos) {
+                actual_expire = std::stoi(dir_name.substr(pos + 1));
+            }
+            VLOG(10) << "get actual expire time " << actual_expire << " of dir: " << dir_name;
+
+            if (difftime(local_now, mktime(&local_tm_create)) >= actual_expire) {
                 if (remove_all_dir(path_name) != OLAP_SUCCESS) {
                     LOG(WARNING) << "fail to remove file or directory. path=" << path_name;
                     res = OLAP_ERR_OS_ERROR;
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 59e3555cd6559c..0bdfcfef6c21f5 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -222,7 +222,7 @@ class StorageEngine {
     void _clean_unused_txns();
 
     OLAPStatus _do_sweep(
-            const std::string& scan_root, const time_t& local_tm_now, const uint32_t expire);
+            const std::string& scan_root, const time_t& local_tm_now, const int32_t expire);
 
     // Thread functions
     // unused rowset monitor thread
diff --git a/fe/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/src/main/java/org/apache/doris/backup/BackupJob.java
index ee2cf15d9c577d..13b656ae269448 100644
--- a/fe/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -35,6 +35,7 @@
 import org.apache.doris.task.AgentTask;
 import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.task.ReleaseSnapshotTask;
 import org.apache.doris.task.SnapshotTask;
 import org.apache.doris.task.UploadTask;
 import org.apache.doris.thrift.TFinishTaskRequest;
@@ -152,9 +153,9 @@ public synchronized boolean finishTabletSnapshotTask(SnapshotTask task, TFinishT
         Preconditions.checkState(request.isSetSnapshot_files());
         // snapshot path does not contains last 'tablet_id' and 'schema_hash' dir
         // eg:
-        // /path/to/your/be/data/snapshot/20180410102311.0/
+        // /path/to/your/be/data/snapshot/20180410102311.0.86400/
         // Full path will look like:
-        // /path/to/your/be/data/snapshot/20180410102311.0/10006/352781111/
+        // /path/to/your/be/data/snapshot/20180410102311.0.86400/10006/352781111/
         SnapshotInfo info = new SnapshotInfo(task.getDbId(), task.getTableId(), task.getPartitionId(),
                 task.getIndexId(), task.getTabletId(), task.getBackendId(), task.getSchemaHash(),
                 request.getSnapshot_path(),
@@ -583,6 +584,10 @@ private void saveMetaInfo() {
         // meta info and job info has been saved to local file, this can be cleaned to reduce log size
         backupMeta = null;
         jobInfo = null;
+
+        // release all snapshots before clearing the snapshotInfos.
+        releaseSnapshots();
+
         snapshotInfos.clear();
 
         // log
@@ -591,6 +596,22 @@ private void saveMetaInfo() {
                  localMetaInfoFilePath, localJobInfoFilePath, this);
     }
 
+    private void releaseSnapshots() {
+        if (snapshotInfos.isEmpty()) {
+            return;
+        }
+        // we do not care about the release snapshot tasks' success or failure,
+        // the GC thread on BE will sweep the snapshots eventually.
+        AgentBatchTask batchTask = new AgentBatchTask();
+        for (SnapshotInfo info : snapshotInfos.values()) {
+            ReleaseSnapshotTask releaseTask = new ReleaseSnapshotTask(null, info.getBeId(), info.getDbId(),
+                    info.getTabletId(), info.getPath());
+            batchTask.addTask(releaseTask);
+        }
+        AgentTaskExecutor.submit(batchTask);
+        LOG.info("send {} release snapshot tasks, job: {}", snapshotInfos.size(), this);
+    }
+
     private void uploadMetaAndJobInfoFile() {
         String remoteMetaInfoFile = repo.assembleMetaInfoFilePath(label);
         if (!uploadFile(localMetaInfoFilePath, remoteMetaInfoFile)) {
@@ -684,6 +705,8 @@ private void cancelInternal() {
             }
         }
 
+        releaseSnapshots();
+
         BackupJobState curState = state;
         finishedTime = System.currentTimeMillis();
         state = BackupJobState.CANCELLED;
diff --git a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java
index bb850ffc5f409a..83642a2c88f93c 100644
--- a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -55,6 +55,7 @@
 import org.apache.doris.task.CreateReplicaTask;
 import org.apache.doris.task.DirMoveTask;
 import org.apache.doris.task.DownloadTask;
+import org.apache.doris.task.ReleaseSnapshotTask;
 import org.apache.doris.task.SnapshotTask;
 import org.apache.doris.thrift.TFinishTaskRequest;
 import org.apache.doris.thrift.TStatusCode;
@@ -1230,6 +1231,10 @@ private Status allTabletCommitted(boolean isReplay) {
         if (!isReplay) {
             restoredPartitions.clear();
             restoredTbls.clear();
+
+            // release snapshots before clearing snapshotInfos
+            releaseSnapshots();
+
             snapshotInfos.clear();
 
             finishedTime = System.currentTimeMillis();
@@ -1242,6 +1247,22 @@ private Status allTabletCommitted(boolean isReplay) {
         return Status.OK;
     }
 
+    private void releaseSnapshots() {
+        if (snapshotInfos.isEmpty()) {
+            return;
+        }
+        // we do not care about the release snapshot tasks' success or failure,
+        // the GC thread on BE will sweep the snapshots eventually.
+        AgentBatchTask batchTask = new AgentBatchTask();
+        for (SnapshotInfo info : snapshotInfos.values()) {
+            ReleaseSnapshotTask releaseTask = new ReleaseSnapshotTask(null, info.getBeId(), info.getDbId(),
+                    info.getTabletId(), info.getPath());
+            batchTask.addTask(releaseTask);
+        }
+        AgentTaskExecutor.submit(batchTask);
+        LOG.info("send {} release snapshot tasks, job: {}", snapshotInfos.size(), this);
+    }
+
     private void replayWaitingAllTabletsCommitted() {
         allTabletCommitted(true /* is replay */);
     }
@@ -1371,6 +1392,9 @@ public void cancelInternal(boolean isReplay) {
         // backupMeta is useless
         backupMeta = null;
 
+        releaseSnapshots();
+
+        snapshotInfos.clear();
         RestoreJobState curState = state;
         finishedTime = System.currentTimeMillis();
         state = RestoreJobState.CANCELLED;
diff --git a/fe/src/main/java/org/apache/doris/backup/SnapshotInfo.java b/fe/src/main/java/org/apache/doris/backup/SnapshotInfo.java
index d2462e527438bb..9a6a6ec6dedafe 100644
--- a/fe/src/main/java/org/apache/doris/backup/SnapshotInfo.java
+++ b/fe/src/main/java/org/apache/doris/backup/SnapshotInfo.java
@@ -36,7 +36,7 @@ public class SnapshotInfo implements Writable {
     private long tabletId;
     private long beId;
     private int schemaHash;
-    // eg: /path/to/your/be/data/snapshot/20180410102311.0/
+    // eg: /path/to/your/be/data/snapshot/20180410102311.0.86400/
     private String path;
     // eg:
     // 10006_0_1_0_0.dat
diff --git a/fe/src/main/java/org/apache/doris/task/ExportPendingTask.java b/fe/src/main/java/org/apache/doris/task/ExportPendingTask.java
index 907b7ecf64d50b..dd690254d3efa2 100644
--- a/fe/src/main/java/org/apache/doris/task/ExportPendingTask.java
+++ b/fe/src/main/java/org/apache/doris/task/ExportPendingTask.java
@@ -113,6 +113,7 @@ private Status makeSnapshots() {
             snapshotRequest.setSchema_hash(Integer.parseInt(paloScanRange.getSchema_hash()));
             snapshotRequest.setVersion(Long.parseLong(paloScanRange.getVersion()));
             snapshotRequest.setVersion_hash(Long.parseLong(paloScanRange.getVersion_hash()));
+            snapshotRequest.setTimeout(job.getTimeoutSecond());
 
             AgentClient client = new AgentClient(host, port);
             TAgentResult result = client.makeSnapshot(snapshotRequest);
diff --git a/fe/src/main/java/org/apache/doris/task/SnapshotTask.java b/fe/src/main/java/org/apache/doris/task/SnapshotTask.java
index 185790f54e7955..2ea7184ccbb0b6 100644
--- a/fe/src/main/java/org/apache/doris/task/SnapshotTask.java
+++ b/fe/src/main/java/org/apache/doris/task/SnapshotTask.java
@@ -29,13 +29,13 @@ public class SnapshotTask extends AgentTask {
 
     private int schemaHash;
 
-    private long timeout;
+    private long timeoutMs;
 
     private boolean isRestoreTask;
 
     public SnapshotTask(TResourceInfo resourceInfo, long backendId, long signature, long jobId,
             long dbId, long tableId, long partitionId, long indexId, long tabletId,
-            long version, long versionHash, int schemaHash, long timeout, boolean isRestoreTask) {
+            long version, long versionHash, int schemaHash, long timeoutMs, boolean isRestoreTask) {
         super(resourceInfo, backendId, TTaskType.MAKE_SNAPSHOT, dbId, tableId, partitionId, indexId,
                 tabletId, signature);
 
@@ -45,7 +45,7 @@ public SnapshotTask(TResourceInfo resourceInfo, long backendId, long signature,
         this.versionHash = versionHash;
         this.schemaHash = schemaHash;
 
-        this.timeout = timeout;
+        this.timeoutMs = timeoutMs;
         this.isRestoreTask = isRestoreTask;
     }
 
@@ -66,8 +66,8 @@ public int getSchemaHash() {
         return schemaHash;
     }
 
-    public long getTimeout() {
-        return timeout;
+    public long getTimeoutMs() {
+        return timeoutMs;
     }
 
     public boolean isRestoreTask() {
@@ -80,6 +80,7 @@ public TSnapshotRequest toThrift() {
         request.setVersion_hash(versionHash);
         request.setList_files(true);
         request.setPreferred_snapshot_version(2);
+        request.setTimeout(timeoutMs / 1000);
         return request;
     }
 }

From 57fe90419fdcc8da3fa8f4bf66cefbc8dcba2da2 Mon Sep 17 00:00:00 2001
From: morningman
Date: Tue, 20 Aug 2019 09:43:58 +0800
Subject: [PATCH 2/3] add clone snapshot timeout

---
 be/src/common/config.h                                      | 4 ++--
 be/src/olap/snapshot_manager.cpp                            | 2 +-
 be/src/olap/task/engine_clone_task.cpp                      | 4 ++++
 fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java | 6 +++---
 fe/src/main/java/org/apache/doris/task/CloneTask.java       | 6 +++++-
 gensrc/thrift/AgentService.thrift                           | 1 +
 6 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 80e74a3daa7d57..54072d901837f5 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -212,8 +212,8 @@ namespace config {
     // inc_rowset expired interval
     CONF_Int32(inc_rowset_expired_sec, "1800");
     // garbage sweep policy
-    CONF_Int32(max_garbage_sweep_interval, "43200");
-    CONF_Int32(min_garbage_sweep_interval, "200");
+    CONF_Int32(max_garbage_sweep_interval, "14400");
+    CONF_Int32(min_garbage_sweep_interval, "180");
     CONF_Int32(snapshot_expire_time_sec, "172800");
     // this is only a suggested value; when disk space is insufficient, the retention period of files under trash may not follow this parameter
     CONF_Int32(trash_file_expire_time_sec, "259200");
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 2480159d9a3b4b..407ac465f0dfef 100755
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -362,7 +362,7 @@ OLAPStatus SnapshotManager::_create_snapshot_files(
     string snapshot_id_path;
     int64_t timeout_s = config::snapshot_expire_time_sec;
     if (request.__isset.timeout) {
-        timeout_s = request.__isset.timeout;
+        timeout_s = request.timeout;
     }
     res = _calc_snapshot_id_path(ref_tablet, timeout_s, &snapshot_id_path);
     if (res != OLAP_SUCCESS) {
diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp
index 6a2bb650d6b941..c70d27489f6456 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -323,6 +323,10 @@ AgentStatus EngineCloneTask::_clone_copy(
             }
             snapshot_request.__set_missing_version(snapshot_versions);
         }
+        if (clone_req.__isset.timeout_s) {
+            snapshot_request.__set_timeout(clone_req.timeout_s);
+        }
+
         agent_client.make_snapshot(
                 snapshot_request,
                 &make_snapshot_result);
diff --git a/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index 175cd04be899fe..e2fac5d6946d00 100644
--- a/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -669,6 +669,8 @@ public CloneTask createCloneReplicaAndTask() throws SchedException {
                     "dest backend " + srcReplica.getBackendId() + " does not exist");
         }
 
+        taskTimeoutMs = getApproximateTimeoutMs();
+
         // create the clone task and clone replica.
         // we use visible version in clone task, but set the clone replica's last failed version to
         // committed version.
@@ -682,7 +684,7 @@ public CloneTask createCloneReplicaAndTask() throws SchedException {
         TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), srcBe.getHttpPort());
         cloneTask = new CloneTask(destBackendId, dbId, tblId, partitionId, indexId, tabletId,
                 schemaHash, Lists.newArrayList(tSrcBe), storageMedium,
-                visibleVersion, visibleVersionHash);
+                visibleVersion, visibleVersionHash, (int) (taskTimeoutMs / 1000));
         cloneTask.setPathHash(srcPathHash, destPathHash);
 
         // if this is a balance task, or this is a repair task with REPLICA_MISSING/REPLICA_RELOCATING or REPLICA_MISSING_IN_CLUSTER,
@@ -714,8 +716,6 @@ public CloneTask createCloneReplicaAndTask() throws SchedException {
             }
         }
 
-        taskTimeoutMs = getApproximateTimeoutMs();
-
         this.state = State.RUNNING;
         return cloneTask;
     }
diff --git a/fe/src/main/java/org/apache/doris/task/CloneTask.java b/fe/src/main/java/org/apache/doris/task/CloneTask.java
index 8aef8ab83f2080..b3243783dd1954 100644
--- a/fe/src/main/java/org/apache/doris/task/CloneTask.java
+++ b/fe/src/main/java/org/apache/doris/task/CloneTask.java
@@ -38,18 +38,21 @@ public class CloneTask extends AgentTask {
     private long srcPathHash = -1;
     private long destPathHash = -1;
+
+    private int timeoutS;
 
     private int taskVersion = VERSION_1;
 
     public CloneTask(long backendId, long dbId, long tableId, long partitionId, long indexId,
             long tabletId, int schemaHash, List<TBackend> srcBackends, TStorageMedium storageMedium,
-            long visibleVersion, long visibleVersionHash) {
+            long visibleVersion, long visibleVersionHash, int timeoutS) {
         super(null, backendId, TTaskType.CLONE, dbId, tableId, partitionId, indexId, tabletId);
 
         this.schemaHash = schemaHash;
         this.srcBackends = srcBackends;
         this.storageMedium = storageMedium;
         this.visibleVersion = visibleVersion;
         this.visibleVersionHash = visibleVersionHash;
+        this.timeoutS = timeoutS;
     }
 
     public int getSchemaHash() {
@@ -88,6 +91,7 @@ public TCloneReq toThrift() {
             request.setSrc_path_hash(srcPathHash);
             request.setDest_path_hash(destPathHash);
         }
+        request.setTimeout_s(timeoutS);
 
         return request;
     }
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index a382fbde4a65b0..a82f0b10caef25 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -118,6 +118,7 @@ struct TCloneReq {
     7: optional i32 task_version;
     8: optional i64 src_path_hash;
     9: optional i64 dest_path_hash;
+    10: optional i32 timeout_s;
 }
 
 struct TStorageMediumMigrateReq {

From 3d3519195bb86231b5a36cf1e2e9041f34a04216 Mon Sep 17 00:00:00 2001
From: morningman
Date: Tue, 20 Aug 2019 12:38:02 +0800
Subject: [PATCH 3/3] fix compile bug

---
 fe/src/test/java/org/apache/doris/task/AgentTaskTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fe/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/src/test/java/org/apache/doris/task/AgentTaskTest.java
index 1a47123a2754dd..317dd8650b8995 100644
--- a/fe/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -125,7 +125,7 @@ public void setUp() throws AnalysisException {
 
         // clone
         cloneTask = new CloneTask(backendId1, dbId, tableId, partitionId, indexId1, tabletId1, schemaHash1,
-                Arrays.asList(new TBackend("host1", 8290, 8390)), TStorageMedium.HDD, -1, -1);
+                Arrays.asList(new TBackend("host1", 8290, 8390)), TStorageMedium.HDD, -1, -1, 3600);
 
         // rollup
         rollupTask =