2 changes: 1 addition & 1 deletion be/src/agent/task_worker_pool.cpp
@@ -1525,7 +1525,7 @@ void* TaskWorkerPool::_make_snapshot_thread_callback(void* arg_this) {
<< ", snapshot_path:" << snapshot_path;
if (snapshot_request.__isset.list_files) {
// list and save all snapshot files
// snapshot_path like: data/snapshot/20180417205230.1
// snapshot_path like: data/snapshot/20180417205230.1.86400
// we need to add subdir: tablet_id/schema_hash/
std::stringstream ss;
ss << snapshot_path << "/" << snapshot_request.tablet_id
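For context, the sub-path that ends up being listed can be sketched as below. This is only an illustration; the helper name `tablet_snapshot_dir` is made up and is not part of the BE code, which builds the same string inline:

```cpp
#include <cstdint>
#include <sstream>
#include <string>

// Sketch only: compose the per-tablet snapshot sub-directory that is listed
// when list_files is set, e.g. data/snapshot/20180417205230.1.86400/10006/352781111
std::string tablet_snapshot_dir(const std::string& snapshot_path,
                                int64_t tablet_id, int32_t schema_hash) {
    std::stringstream ss;
    ss << snapshot_path << "/" << tablet_id << "/" << schema_hash;
    return ss.str();
}
```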
4 changes: 2 additions & 2 deletions be/src/common/config.h
@@ -212,8 +212,8 @@ namespace config {
// inc_rowset expired interval
CONF_Int32(inc_rowset_expired_sec, "1800");
// garbage sweep policy
CONF_Int32(max_garbage_sweep_interval, "43200");
CONF_Int32(min_garbage_sweep_interval, "200");
CONF_Int32(max_garbage_sweep_interval, "14400");
CONF_Int32(min_garbage_sweep_interval, "180");
CONF_Int32(snapshot_expire_time_sec, "172800");
// This is only a suggested value; when disk space is insufficient, files under trash may be kept for less time than this parameter specifies.
CONF_Int32(trash_file_expire_time_sec, "259200");
14 changes: 11 additions & 3 deletions be/src/olap/snapshot_manager.cpp
@@ -108,7 +108,7 @@ OLAPStatus SnapshotManager::release_snapshot(const string& snapshot_path) {
&& snapshot_path.compare(abs_path.size(),
SNAPSHOT_PREFIX.size(), SNAPSHOT_PREFIX) == 0) {
remove_all_dir(snapshot_path);
VLOG(3) << "success to release snapshot path. [path='" << snapshot_path << "']";
LOG(INFO) << "success to release snapshot path. [path='" << snapshot_path << "']";

return OLAP_SUCCESS;
}
@@ -283,8 +283,11 @@ OLAPStatus SnapshotManager::_rename_rowset_id(const RowsetMetaPB& rs_meta_pb, co
return OLAP_SUCCESS;
}

// get snapshot path: curtime.seq.timeout
// eg: 20190819221234.3.86400
OLAPStatus SnapshotManager::_calc_snapshot_id_path(
const TabletSharedPtr& tablet,
int64_t timeout_s,
string* out_path) {
OLAPStatus res = OLAP_SUCCESS;
if (out_path == nullptr) {
@@ -303,7 +306,8 @@ OLAPStatus SnapshotManager::_calc_snapshot_id_path(
stringstream snapshot_id_path_stream;
MutexLock auto_lock(&_snapshot_mutex); // will automatically unlock when function return.
snapshot_id_path_stream << tablet->data_dir()->path() << SNAPSHOT_PREFIX
<< "/" << time_str << "." << _snapshot_base_id++;
<< "/" << time_str << "." << _snapshot_base_id++
<< "." << timeout_s;
*out_path = snapshot_id_path_stream.str();
return res;
}
@@ -356,7 +360,11 @@ OLAPStatus SnapshotManager::_create_snapshot_files(
}

string snapshot_id_path;
res = _calc_snapshot_id_path(ref_tablet, &snapshot_id_path);
int64_t timeout_s = config::snapshot_expire_time_sec;
if (request.__isset.timeout) {
timeout_s = request.timeout;
}
res = _calc_snapshot_id_path(ref_tablet, timeout_s, &snapshot_id_path);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to calc snapshot_id_path, ref tablet="
<< ref_tablet->data_dir()->path();
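The directory name produced by `_calc_snapshot_id_path` now carries the timeout as a third dot-separated field. A rough standalone sketch of the naming scheme is shown below; the function name `make_snapshot_dir` and the literal `"/snapshot/"` prefix are assumptions for illustration, not the actual SnapshotManager code:

```cpp
#include <cstdint>
#include <sstream>
#include <string>

// Sketch only: build a snapshot directory name in the new "curtime.seq.timeout"
// format, e.g. /data_root/snapshot/20190819221234.3.86400
std::string make_snapshot_dir(const std::string& data_dir_path,
                              const std::string& time_str,  // e.g. "20190819221234"
                              uint64_t seq,                 // monotonically increasing base id
                              int64_t timeout_s) {          // per-snapshot expire time in seconds
    std::stringstream ss;
    ss << data_dir_path << "/snapshot/" << time_str << "." << seq << "." << timeout_s;
    return ss.str();
}
```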
1 change: 1 addition & 0 deletions be/src/olap/snapshot_manager.h
@@ -73,6 +73,7 @@ class SnapshotManager {

OLAPStatus _calc_snapshot_id_path(
const TabletSharedPtr& tablet,
int64_t timeout_s,
std::string* out_path);

std::string _get_header_full_path(
18 changes: 14 additions & 4 deletions be/src/olap/storage_engine.cpp
@@ -606,8 +606,8 @@ OLAPStatus StorageEngine::start_trash_sweep(double* usage) {
OLAPStatus res = OLAP_SUCCESS;
LOG(INFO) << "start trash and snapshot sweep.";

const uint32_t snapshot_expire = config::snapshot_expire_time_sec;
const uint32_t trash_expire = config::trash_file_expire_time_sec;
const int32_t snapshot_expire = config::snapshot_expire_time_sec;
const int32_t trash_expire = config::trash_file_expire_time_sec;
const double guard_space = config::disk_capacity_insufficient_percentage / 100.0;
std::vector<DataDirInfo> data_dir_infos;
res = get_all_data_dir_info(&data_dir_infos);
@@ -679,7 +679,7 @@ void StorageEngine::_clean_unused_txns() {
}

OLAPStatus StorageEngine::_do_sweep(
const string& scan_root, const time_t& local_now, const uint32_t expire) {
const string& scan_root, const time_t& local_now, const int32_t expire) {
OLAPStatus res = OLAP_SUCCESS;
if (!check_dir_existed(scan_root)) {
// dir not existed. no need to sweep trash.
@@ -700,7 +700,17 @@ OLAPStatus StorageEngine::_do_sweep(
res = OLAP_ERR_OS_ERROR;
continue;
}
if (difftime(local_now, mktime(&local_tm_create)) >= expire) {

int32_t actual_expire = expire;
// try to get the timeout from the dir name; old snapshot dirs do not contain a timeout
// eg: 20190818221123.3.86400, where 86400 is the timeout in seconds
size_t pos = dir_name.find('.', str_time.size() + 1);
if (pos != string::npos) {
actual_expire = std::stoi(dir_name.substr(pos + 1));
}
VLOG(10) << "get actual expire time " << actual_expire << " of dir: " << dir_name;

if (difftime(local_now, mktime(&local_tm_create)) >= actual_expire) {
if (remove_all_dir(path_name) != OLAP_SUCCESS) {
LOG(WARNING) << "fail to remove file or directory. path=" << path_name;
res = OLAP_ERR_OS_ERROR;
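To make the backward-compatible parsing in `_do_sweep` concrete, here is a minimal sketch under the assumption that the leading time string is always 14 characters (e.g. `20190818221123`); the helper name is hypothetical. Old-format directories without a timeout suffix fall back to the configured default:

```cpp
#include <cstdint>
#include <string>

// Sketch only: recover the per-snapshot expire time from a snapshot dir name.
// New format: 20190818221123.3.86400 -> 86400 seconds.
// Old format: 20180417205230.1       -> no timeout suffix, use the default.
int32_t snapshot_expire_seconds(const std::string& dir_name, int32_t default_expire_s) {
    const size_t time_str_len = 14;  // "yyyyMMddHHmmss"
    // the second '.' separates the sequence number from the timeout suffix
    size_t pos = dir_name.find('.', time_str_len + 1);
    if (pos == std::string::npos) {
        return default_expire_s;  // old-style snapshot dir
    }
    return std::stoi(dir_name.substr(pos + 1));
}
```

For example, `snapshot_expire_seconds("20190818221123.3.86400", config_default)` yields 86400, while `"20180417205230.1"` falls back to `config_default`.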
2 changes: 1 addition & 1 deletion be/src/olap/storage_engine.h
@@ -222,7 +222,7 @@ class StorageEngine {
void _clean_unused_txns();

OLAPStatus _do_sweep(
const std::string& scan_root, const time_t& local_tm_now, const uint32_t expire);
const std::string& scan_root, const time_t& local_tm_now, const int32_t expire);

// Thread functions
// unused rowset monitor thread
4 changes: 4 additions & 0 deletions be/src/olap/task/engine_clone_task.cpp
@@ -323,6 +323,10 @@ AgentStatus EngineCloneTask::_clone_copy(
}
snapshot_request.__set_missing_version(snapshot_versions);
}
if (clone_req.__isset.timeout_s) {
snapshot_request.__set_timeout(clone_req.timeout_s);
}

agent_client.make_snapshot(
snapshot_request,
&make_snapshot_result);
27 changes: 25 additions & 2 deletions fe/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -35,6 +35,7 @@
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.ReleaseSnapshotTask;
import org.apache.doris.task.SnapshotTask;
import org.apache.doris.task.UploadTask;
import org.apache.doris.thrift.TFinishTaskRequest;
@@ -152,9 +153,9 @@ public synchronized boolean finishTabletSnapshotTask(SnapshotTask task, TFinishT
Preconditions.checkState(request.isSetSnapshot_files());
// snapshot path does not contain the trailing 'tablet_id' and 'schema_hash' dirs
// eg:
// /path/to/your/be/data/snapshot/20180410102311.0/
// /path/to/your/be/data/snapshot/20180410102311.0.86400/
// Full path will look like:
// /path/to/your/be/data/snapshot/20180410102311.0/10006/352781111/
// /path/to/your/be/data/snapshot/20180410102311.0.86400/10006/352781111/
SnapshotInfo info = new SnapshotInfo(task.getDbId(), task.getTableId(), task.getPartitionId(),
task.getIndexId(), task.getTabletId(), task.getBackendId(),
task.getSchemaHash(), request.getSnapshot_path(),
@@ -583,6 +584,10 @@ private void saveMetaInfo() {
// meta info and job info has been saved to local file, this can be cleaned to reduce log size
backupMeta = null;
jobInfo = null;

// release all snapshots before clearing the snapshotInfos.
releaseSnapshots();

snapshotInfos.clear();

// log
@@ -591,6 +596,22 @@
localMetaInfoFilePath, localJobInfoFilePath, this);
}

private void releaseSnapshots() {
if (snapshotInfos.isEmpty()) {
return;
}
// we do not care whether the release snapshot tasks succeed or fail;
// the GC thread on BE will eventually sweep the snapshots anyway.
AgentBatchTask batchTask = new AgentBatchTask();
for (SnapshotInfo info : snapshotInfos.values()) {
ReleaseSnapshotTask releaseTask = new ReleaseSnapshotTask(null, info.getBeId(), info.getDbId(),
info.getTabletId(), info.getPath());
batchTask.addTask(releaseTask);
}
AgentTaskExecutor.submit(batchTask);
LOG.info("send {} release snapshot tasks, job: {}", snapshotInfos.size(), this);
}

private void uploadMetaAndJobInfoFile() {
String remoteMetaInfoFile = repo.assembleMetaInfoFilePath(label);
if (!uploadFile(localMetaInfoFilePath, remoteMetaInfoFile)) {
@@ -684,6 +705,8 @@ private void cancelInternal() {
}
}

releaseSnapshots();

BackupJobState curState = state;
finishedTime = System.currentTimeMillis();
state = BackupJobState.CANCELLED;
24 changes: 24 additions & 0 deletions fe/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -55,6 +55,7 @@
import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.task.DirMoveTask;
import org.apache.doris.task.DownloadTask;
import org.apache.doris.task.ReleaseSnapshotTask;
import org.apache.doris.task.SnapshotTask;
import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.TStatusCode;
@@ -1230,6 +1231,10 @@ private Status allTabletCommitted(boolean isReplay) {
if (!isReplay) {
restoredPartitions.clear();
restoredTbls.clear();

// release snapshot before clearing snapshotInfos
releaseSnapshots();

snapshotInfos.clear();

finishedTime = System.currentTimeMillis();
@@ -1242,6 +1247,22 @@
return Status.OK;
}

private void releaseSnapshots() {
if (snapshotInfos.isEmpty()) {
return;
}
// we do not care whether the release snapshot tasks succeed or fail;
// the GC thread on BE will eventually sweep the snapshots anyway.
AgentBatchTask batchTask = new AgentBatchTask();
for (SnapshotInfo info : snapshotInfos.values()) {
ReleaseSnapshotTask releaseTask = new ReleaseSnapshotTask(null, info.getBeId(), info.getDbId(),
info.getTabletId(), info.getPath());
batchTask.addTask(releaseTask);
}
AgentTaskExecutor.submit(batchTask);
LOG.info("send {} release snapshot tasks, job: {}", snapshotInfos.size(), this);
}

private void replayWaitingAllTabletsCommitted() {
allTabletCommitted(true /* is replay */);
}
@@ -1371,6 +1392,9 @@ public void cancelInternal(boolean isReplay) {
// backupMeta is useless
backupMeta = null;

releaseSnapshots();

snapshotInfos.clear();
RestoreJobState curState = state;
finishedTime = System.currentTimeMillis();
state = RestoreJobState.CANCELLED;
2 changes: 1 addition & 1 deletion fe/src/main/java/org/apache/doris/backup/SnapshotInfo.java
@@ -36,7 +36,7 @@ public class SnapshotInfo implements Writable {
private long tabletId;
private long beId;
private int schemaHash;
// eg: /path/to/your/be/data/snapshot/20180410102311.0/
// eg: /path/to/your/be/data/snapshot/20180410102311.0.86400/
private String path;
// eg:
// 10006_0_1_0_0.dat
6 changes: 3 additions & 3 deletions fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -669,6 +669,8 @@ public CloneTask createCloneReplicaAndTask() throws SchedException {
"dest backend " + srcReplica.getBackendId() + " does not exist");
}

taskTimeoutMs = getApproximateTimeoutMs();

// create the clone task and clone replica.
// we use visible version in clone task, but set the clone replica's last failed version to
// committed version.
@@ -682,7 +684,7 @@ public CloneTask createCloneReplicaAndTask() throws SchedException {
TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), srcBe.getHttpPort());
cloneTask = new CloneTask(destBackendId, dbId, tblId, partitionId, indexId,
tabletId, schemaHash, Lists.newArrayList(tSrcBe), storageMedium,
visibleVersion, visibleVersionHash);
visibleVersion, visibleVersionHash, (int) (taskTimeoutMs / 1000));
cloneTask.setPathHash(srcPathHash, destPathHash);

// if this is a balance task, or this is a repair task with REPLICA_MISSING/REPLICA_RELOCATING or REPLICA_MISSING_IN_CLUSTER,
@@ -714,8 +716,6 @@ public CloneTask createCloneReplicaAndTask() throws SchedException {
}
}

taskTimeoutMs = getApproximateTimeoutMs();

this.state = State.RUNNING;
return cloneTask;
}
6 changes: 5 additions & 1 deletion fe/src/main/java/org/apache/doris/task/CloneTask.java
@@ -38,18 +38,21 @@ public class CloneTask extends AgentTask {

private long srcPathHash = -1;
private long destPathHash = -1;

private int timeoutS;

private int taskVersion = VERSION_1;

public CloneTask(long backendId, long dbId, long tableId, long partitionId, long indexId,
long tabletId, int schemaHash, List<TBackend> srcBackends, TStorageMedium storageMedium,
long visibleVersion, long visibleVersionHash) {
long visibleVersion, long visibleVersionHash, int timeoutS) {
super(null, backendId, TTaskType.CLONE, dbId, tableId, partitionId, indexId, tabletId);
this.schemaHash = schemaHash;
this.srcBackends = srcBackends;
this.storageMedium = storageMedium;
this.visibleVersion = visibleVersion;
this.visibleVersionHash = visibleVersionHash;
this.timeoutS = timeoutS;
}

public int getSchemaHash() {
@@ -88,6 +91,7 @@ public TCloneReq toThrift() {
request.setSrc_path_hash(srcPathHash);
request.setDest_path_hash(destPathHash);
}
request.setTimeout_s(timeoutS);

return request;
}
@@ -113,6 +113,7 @@ private Status makeSnapshots() {
snapshotRequest.setSchema_hash(Integer.parseInt(paloScanRange.getSchema_hash()));
snapshotRequest.setVersion(Long.parseLong(paloScanRange.getVersion()));
snapshotRequest.setVersion_hash(Long.parseLong(paloScanRange.getVersion_hash()));
snapshotRequest.setTimeout(job.getTimeoutSecond());

AgentClient client = new AgentClient(host, port);
TAgentResult result = client.makeSnapshot(snapshotRequest);
11 changes: 6 additions & 5 deletions fe/src/main/java/org/apache/doris/task/SnapshotTask.java
@@ -29,13 +29,13 @@ public class SnapshotTask extends AgentTask {

private int schemaHash;

private long timeout;
private long timeoutMs;

private boolean isRestoreTask;

public SnapshotTask(TResourceInfo resourceInfo, long backendId, long signature, long jobId,
long dbId, long tableId, long partitionId, long indexId, long tabletId,
long version, long versionHash, int schemaHash, long timeout, boolean isRestoreTask) {
long version, long versionHash, int schemaHash, long timeoutMs, boolean isRestoreTask) {
super(resourceInfo, backendId, TTaskType.MAKE_SNAPSHOT, dbId, tableId, partitionId, indexId, tabletId,
signature);

@@ -45,7 +45,7 @@ public SnapshotTask(TResourceInfo resourceInfo, long backendId, long signature,
this.versionHash = versionHash;
this.schemaHash = schemaHash;

this.timeout = timeout;
this.timeoutMs = timeoutMs;

this.isRestoreTask = isRestoreTask;
}
@@ -66,8 +66,8 @@ public int getSchemaHash() {
return schemaHash;
}

public long getTimeout() {
return timeout;
public long getTimeoutMs() {
return timeoutMs;
}

public boolean isRestoreTask() {
@@ -80,6 +80,7 @@ public TSnapshotRequest toThrift() {
request.setVersion_hash(versionHash);
request.setList_files(true);
request.setPreferred_snapshot_version(2);
request.setTimeout(timeoutMs / 1000);
return request;
}
}
2 changes: 1 addition & 1 deletion fe/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -125,7 +125,7 @@ public void setUp() throws AnalysisException {
// clone
cloneTask =
new CloneTask(backendId1, dbId, tableId, partitionId, indexId1, tabletId1, schemaHash1,
Arrays.asList(new TBackend("host1", 8290, 8390)), TStorageMedium.HDD, -1, -1);
Arrays.asList(new TBackend("host1", 8290, 8390)), TStorageMedium.HDD, -1, -1, 3600);

// rollup
rollupTask =
1 change: 1 addition & 0 deletions gensrc/thrift/AgentService.thrift
@@ -118,6 +118,7 @@ struct TCloneReq {
7: optional i32 task_version;
8: optional i64 src_path_hash;
9: optional i64 dest_path_hash;
10: optional i32 timeout_s;
}

struct TStorageMediumMigrateReq {