From 70e3823b3117eeae11206ee7b0045de10cd2aa77 Mon Sep 17 00:00:00 2001 From: Hao Tan Date: Tue, 17 Nov 2020 05:50:25 +0000 Subject: [PATCH 01/10] Record all backup jobs and support where clause --- fe/fe-core/src/main/cup/sql_parser.cup | 4 +- .../apache/doris/analysis/ShowBackupStmt.java | 92 ++++++++++++- .../doris/analysis/ShowRestoreStmt.java | 82 +++++++++++- .../apache/doris/backup/BackupHandler.java | 122 ++++++++++++++---- .../java/org/apache/doris/common/Config.java | 6 + .../org/apache/doris/qe/ShowExecutor.java | 30 ++--- 6 files changed, 282 insertions(+), 54 deletions(-) diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index c5c19f6646acab..813b850ad61a0c 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -2507,9 +2507,9 @@ show_param ::= {: RESULT = new ShowUserPropertyStmt(user, parser.wild); :} - | KW_BACKUP opt_db:db + | KW_BACKUP opt_db:db opt_wild_where {: - RESULT = new ShowBackupStmt(db); + RESULT = new ShowBackupStmt(db, parser.where); :} | KW_RESTORE opt_db:db opt_wild_where {: diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBackupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBackupStmt.java index 6137697e3d60ad..dcaa2f61242422 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBackupStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBackupStmt.java @@ -17,19 +17,23 @@ package org.apache.doris.analysis; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.ScalarType; import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.CaseSensibility; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.UserException; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ShowResultSetMetaData; -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; +import java.util.function.Predicate; public class ShowBackupStmt extends ShowStmt { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() @@ -39,9 +43,13 @@ public class ShowBackupStmt extends ShowStmt { .build(); private String dbName; + private final Expr where; + private boolean isAccurateMatch; + private String labelValue; - public ShowBackupStmt(String dbName) { + public ShowBackupStmt(String dbName, Expr where) { this.dbName = dbName; + this.where = where; } public String getDbName() { @@ -65,6 +73,56 @@ public void analyze(Analyzer analyzer) throws UserException { ErrorReport.reportAnalysisException(ErrorCode.ERR_DB_ACCESS_DENIED, ConnectContext.get().getQualifiedUser(), dbName); } + + if (where == null) { + return; + } + boolean valid = analyzeWhereClause(); + if (!valid) { + throw new AnalysisException("Where clause should like: LABEL = \"your_label_name\", " + + " or LABEL LIKE \"matcher\""); + } + } + + private boolean analyzeWhereClause() { + if (!(where instanceof LikePredicate) && !(where instanceof BinaryPredicate)) { + return false; + } + + if (where instanceof BinaryPredicate) { + BinaryPredicate binaryPredicate = (BinaryPredicate) where; + if (BinaryPredicate.Operator.EQ != binaryPredicate.getOp()) { + return false; + } + isAccurateMatch = true; + } + + if (where instanceof LikePredicate) { + LikePredicate likePredicate = (LikePredicate) where; + if (LikePredicate.Operator.LIKE != likePredicate.getOp()) { + return false; + } + } + + // left child + if (!(where.getChild(0) instanceof SlotRef)) { + return false; + } + String leftKey = ((SlotRef) where.getChild(0)).getColumnName(); + if (!"label".equalsIgnoreCase(leftKey)) { + return false; + } + + // right child + if (!(where.getChild(1) instanceof StringLiteral)) { + return false; + } + labelValue = ((StringLiteral) where.getChild(1)).getStringValue(); + if (Strings.isNullOrEmpty(labelValue)) { + return false; + } + + return true; } @Override @@ -84,6 +142,10 @@ public String toSql() { builder.append(" FROM `").append(dbName).append("` "); } + if (where != null) { + builder.append(where.toSql()); + } + return builder.toString(); } @@ -96,4 +158,28 @@ public String toString() { public RedirectStatus getRedirectStatus() { return RedirectStatus.FORWARD_NO_SYNC; } + + public boolean isAccurateMatch() { + return isAccurateMatch; + } + + public String getLabelValue() { + return labelValue; + } + + public Expr getWhere() { + return where; + } + + public Predicate getLabelPredicate() throws AnalysisException { + if (null == where) { + return label -> true; + } + if (isAccurateMatch) { + return CaseSensibility.LABEL.getCaseSensibility() ? label -> label.equals(labelValue) : label -> label.equalsIgnoreCase(labelValue); + } else { + PatternMatcher patternMatcher = PatternMatcher.createMysqlPattern(labelValue, CaseSensibility.LABEL.getCaseSensibility()); + return patternMatcher::match; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRestoreStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRestoreStmt.java index 9426898c39addd..b47a0a39f7a5ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRestoreStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRestoreStmt.java @@ -21,8 +21,11 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.ScalarType; import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.CaseSensibility; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.UserException; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; @@ -31,6 +34,8 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; +import java.util.function.Predicate; + public class ShowRestoreStmt extends ShowStmt { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() .add("JobId").add("Label").add("Timestamp").add("DbName").add("State") @@ -42,7 +47,8 @@ public class ShowRestoreStmt extends ShowStmt { private String dbName; private Expr where; - private String label; + private String labelValue; + private boolean isAccurateMatch; public ShowRestoreStmt(String dbName, Expr where) { this.dbName = dbName; @@ -53,8 +59,8 @@ public String getDbName() { return dbName; } - public String getLabel() { - return label; + public String getLabelValue() { + return labelValue; } @Override @@ -74,6 +80,56 @@ public void analyze(Analyzer analyzer) throws UserException { ErrorReport.reportAnalysisException(ErrorCode.ERR_DB_ACCESS_DENIED, ConnectContext.get().getQualifiedUser(), dbName); } + + if (where == null) { + return; + } + boolean valid = analyzeWhereClause(); + if (!valid) { + throw new AnalysisException("Where clause should like: LABEL = \"your_label_name\", " + + " or LABEL LIKE \"matcher\""); + } + } + + private boolean analyzeWhereClause() { + if (!(where instanceof LikePredicate) && !(where instanceof BinaryPredicate)) { + return false; + } + + if (where instanceof BinaryPredicate) { + BinaryPredicate binaryPredicate = (BinaryPredicate) where; + if (BinaryPredicate.Operator.EQ != binaryPredicate.getOp()) { + return false; + } + isAccurateMatch = true; + } + + if (where instanceof LikePredicate) { + LikePredicate likePredicate = (LikePredicate) where; + if (LikePredicate.Operator.LIKE != likePredicate.getOp()) { + return false; + } + } + + // left child + if (!(where.getChild(0) instanceof SlotRef)) { + return false; + } + String leftKey = ((SlotRef) where.getChild(0)).getColumnName(); + if (!"label".equalsIgnoreCase(leftKey)) { + return false; + } + + // right child + if (!(where.getChild(1) instanceof StringLiteral)) { + return false; + } + labelValue = ((StringLiteral) where.getChild(1)).getStringValue(); + if (Strings.isNullOrEmpty(labelValue)) { + return false; + } + + return true; } @Override @@ -106,5 +162,25 @@ public String toString() { public RedirectStatus getRedirectStatus() { return RedirectStatus.FORWARD_NO_SYNC; } + + public boolean isAccurateMatch() { + return isAccurateMatch; + } + + public Expr getWhere() { + return where; + } + + public Predicate getLabelPredicate() throws AnalysisException { + if (null == where) { + return label -> true; + } + if (isAccurateMatch) { + return CaseSensibility.LABEL.getCaseSensibility() ? label -> label.equals(labelValue) : label -> label.equalsIgnoreCase(labelValue); + } else { + PatternMatcher patternMatcher = PatternMatcher.createMysqlPattern(labelValue, CaseSensibility.LABEL.getCaseSensibility()); + return patternMatcher::match; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java index bf5e43cb254c3c..3b500647c72f0d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java @@ -17,6 +17,7 @@ package org.apache.doris.backup; +import org.apache.commons.collections.CollectionUtils; import org.apache.doris.analysis.AbstractBackupStmt; import org.apache.doris.analysis.AbstractBackupTableRefClause; import org.apache.doris.analysis.BackupStmt; @@ -55,7 +56,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; @@ -67,11 +67,16 @@ import java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Predicate; +import java.util.stream.Collectors; public class BackupHandler extends MasterDaemon implements Writable { private static final Logger LOG = LogManager.getLogger(BackupHandler.class); @@ -82,13 +87,14 @@ public class BackupHandler extends MasterDaemon implements Writable { private RepositoryMgr repoMgr = new RepositoryMgr(); - // db id -> last running or finished backup/restore jobs - // We only save the last backup/restore job of a database. + // this lock is used for updating dbIdToBackupOrRestoreJobs + private final ReentrantLock jobLock = new ReentrantLock(); + + // db id -> last 100(max_backup_job_num_per_db) backup/restore jobs // Newly submitted job will replace the current job, only if current job is finished or cancelled. // If the last job is finished, user can get the job info from repository. If the last job is cancelled, // user can get the error message before submitting the next one. - // Use ConcurrentMap to get rid of locks. - private Map dbIdToBackupOrRestoreJob = Maps.newConcurrentMap(); + private final Map> dbIdToBackupOrRestoreJobs = new HashMap<>(); // this lock is used for handling one backup or restore request at a time. private ReentrantLock seqlock = new ReentrantLock(); @@ -154,7 +160,19 @@ private boolean init() { } public AbstractJob getJob(long dbId) { - return dbIdToBackupOrRestoreJob.get(dbId); + return getCurrentJob(dbId); + } + + public List getJobs(long dbId, Predicate predicate) { + jobLock.lock(); + try { + return dbIdToBackupOrRestoreJobs.getOrDefault(dbId, new LinkedList<>()) + .stream() + .filter(e -> predicate.test(e.getLabel())) + .collect(Collectors.toList()); + } finally { + jobLock.unlock(); + } } @Override @@ -165,7 +183,7 @@ protected void runAfterCatalogReady() { } } - for (AbstractJob job : dbIdToBackupOrRestoreJob.values()) { + for (AbstractJob job : getAllCurrentJobs()) { job.setCatalog(catalog); job.run(); } @@ -197,8 +215,8 @@ public void dropRepository(DropRepositoryStmt stmt) throws DdlException { if (repo == null) { ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Repository does not exist"); } - - for (AbstractJob job : dbIdToBackupOrRestoreJob.values()) { + + for (AbstractJob job : getAllCurrentJobs()) { if (!job.isDone() && job.getRepoId() == repo.getId()) { ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Backup or restore job is running on this repository." @@ -239,7 +257,7 @@ public void process(AbstractBackupStmt stmt) throws DdlException { tryLock(); try { // Check if there is backup or restore job running on this database - AbstractJob currentJob = dbIdToBackupOrRestoreJob.get(db.getId()); + AbstractJob currentJob = getCurrentJob(db.getId()); if (currentJob != null && !currentJob.isDone()) { ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Can only run one backup or restore job of a database at same time"); @@ -364,7 +382,7 @@ private void backup(Repository repository, Database db, BackupStmt stmt) throws catalog.getEditLog().logBackupJob(backupJob); // must put to dbIdToBackupOrRestoreJob after edit log, otherwise the state of job may be changed. - dbIdToBackupOrRestoreJob.put(db.getId(), backupJob); + addBackupOrRestoreJob(db.getId(), backupJob); LOG.info("finished to submit backup job: {}", backupJob); } @@ -392,11 +410,49 @@ private void restore(Repository repository, Database db, RestoreStmt stmt) throw catalog.getEditLog().logRestoreJob(restoreJob); // must put to dbIdToBackupOrRestoreJob after edit log, otherwise the state of job may be changed. - dbIdToBackupOrRestoreJob.put(db.getId(), restoreJob); + addBackupOrRestoreJob(db.getId(), restoreJob); LOG.info("finished to submit restore job: {}", restoreJob); } + private void addBackupOrRestoreJob(long dbId, AbstractJob job) { + jobLock.lock(); + try { + Deque jobs = dbIdToBackupOrRestoreJobs.computeIfAbsent(dbId, k -> Lists.newLinkedList()); + if (jobs.size() == Config.max_backup_job_num_per_db) { + jobs.removeFirst(); + } + AbstractJob lastJob = jobs.peekLast(); + // only save the latest job + if (lastJob != null && (lastJob.isPending() || lastJob.getJobId() == job.getJobId())) { + jobs.removeLast(); + } + jobs.addLast(job); + } finally { + jobLock.unlock(); + } + } + + private List getAllCurrentJobs() { + jobLock.lock(); + try { + return dbIdToBackupOrRestoreJobs.values().stream().filter(CollectionUtils::isNotEmpty) + .map(Deque::getLast).collect(Collectors.toList()); + } finally { + jobLock.unlock(); + } + } + + private AbstractJob getCurrentJob(long dbId) { + jobLock.lock(); + try { + Deque jobs = dbIdToBackupOrRestoreJobs.getOrDefault(dbId, Lists.newLinkedList()); + return jobs.isEmpty() ? null : jobs.getLast(); + } finally { + jobLock.unlock(); + } + } + private void checkAndFilterRestoreObjsExistInSnapshot(BackupJobInfo jobInfo, AbstractBackupTableRefClause backupTableRefClause) throws DdlException { @@ -490,8 +546,8 @@ public void cancel(CancelBackupStmt stmt) throws DdlException { if (db == null) { ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName); } - - AbstractJob job = dbIdToBackupOrRestoreJob.get(db.getId()); + + AbstractJob job = getCurrentJob(db.getId()); if (job == null || (job instanceof BackupJob && stmt.isRestore()) || (job instanceof RestoreJob && !stmt.isRestore())) { ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "No " @@ -508,7 +564,8 @@ public void cancel(CancelBackupStmt stmt) throws DdlException { } public boolean handleFinishedSnapshotTask(SnapshotTask task, TFinishTaskRequest request) { - AbstractJob job = dbIdToBackupOrRestoreJob.get(task.getDbId()); + AbstractJob job = getCurrentJob(task.getDbId()); + if (job == null) { LOG.warn("failed to find backup or restore job for task: {}", task); // return true to remove this task from AgentTaskQueue @@ -533,7 +590,7 @@ public boolean handleFinishedSnapshotTask(SnapshotTask task, TFinishTaskRequest } public boolean handleFinishedSnapshotUploadTask(UploadTask task, TFinishTaskRequest request) { - AbstractJob job = dbIdToBackupOrRestoreJob.get(task.getDbId()); + AbstractJob job = getCurrentJob(task.getDbId()); if (job == null || (job instanceof RestoreJob)) { LOG.info("invalid upload task: {}, no backup job is found. db id: {}", task, task.getDbId()); return false; @@ -548,8 +605,8 @@ public boolean handleFinishedSnapshotUploadTask(UploadTask task, TFinishTaskRequ } public boolean handleDownloadSnapshotTask(DownloadTask task, TFinishTaskRequest request) { - AbstractJob job = dbIdToBackupOrRestoreJob.get(task.getDbId()); - if (job == null || !(job instanceof RestoreJob)) { + AbstractJob job = getCurrentJob(task.getDbId()); + if (!(job instanceof RestoreJob)) { LOG.warn("failed to find restore job for task: {}", task); // return true to remove this task from AgentTaskQueue return true; @@ -559,8 +616,8 @@ public boolean handleDownloadSnapshotTask(DownloadTask task, TFinishTaskRequest } public boolean handleDirMoveTask(DirMoveTask task, TFinishTaskRequest request) { - AbstractJob job = dbIdToBackupOrRestoreJob.get(task.getDbId()); - if (job == null || !(job instanceof RestoreJob)) { + AbstractJob job = getCurrentJob(task.getDbId()); + if (!(job instanceof RestoreJob)) { LOG.warn("failed to find restore job for task: {}", task); // return true to remove this task from AgentTaskQueue return true; @@ -571,16 +628,16 @@ public boolean handleDirMoveTask(DirMoveTask task, TFinishTaskRequest request) { public void replayAddJob(AbstractJob job) { if (job.isCancelled()) { - AbstractJob existingJob = dbIdToBackupOrRestoreJob.get(job.getDbId()); + AbstractJob existingJob = getCurrentJob(job.getDbId()); if (existingJob == null || existingJob.isDone()) { LOG.error("invalid existing job: {}. current replay job is: {}", - existingJob, job); + existingJob, job); return; } existingJob.setCatalog(catalog); existingJob.replayCancel(); } else if (!job.isPending()) { - AbstractJob existingJob = dbIdToBackupOrRestoreJob.get(job.getDbId()); + AbstractJob existingJob = getCurrentJob(job.getDbId()); if (existingJob == null || existingJob.isDone()) { LOG.error("invalid existing job: {}. current replay job is: {}", existingJob, job); @@ -591,11 +648,12 @@ public void replayAddJob(AbstractJob job) { // for example: In restore job, PENDING will transfer to SNAPSHOTING, not DOWNLOAD. job.replayRun(); } - dbIdToBackupOrRestoreJob.put(job.getDbId(), job); + + addBackupOrRestoreJob(job.getDbId(), job); } public boolean report(TTaskType type, long jobId, long taskId, int finishedNum, int totalNum) { - for (AbstractJob job : dbIdToBackupOrRestoreJob.values()) { + for (AbstractJob job : getAllCurrentJobs()) { if (job.getType() == JobType.BACKUP) { if (!job.isDone() && job.getJobId() == jobId && type == TTaskType.UPLOAD) { job.taskProgress.put(taskId, Pair.create(finishedNum, totalNum)); @@ -621,8 +679,16 @@ public static BackupHandler read(DataInput in) throws IOException { public void write(DataOutput out) throws IOException { repoMgr.write(out); - out.writeInt(dbIdToBackupOrRestoreJob.size()); - for (AbstractJob job : dbIdToBackupOrRestoreJob.values()) { + jobLock.lock(); + List jobs; + try { + jobs = dbIdToBackupOrRestoreJobs.values().stream().flatMap(Deque::stream).collect(Collectors.toList()); + } finally { + jobLock.unlock(); + } + + out.writeInt(jobs.size()); + for (AbstractJob job : jobs) { job.write(out); } } @@ -633,7 +699,7 @@ public void readFields(DataInput in) throws IOException { int size = in.readInt(); for (int i = 0; i < size; i++) { AbstractJob job = AbstractJob.read(in); - dbIdToBackupOrRestoreJob.put(job.getDbId(), job); + addBackupOrRestoreJob(job.getDbId(), job); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 52fa0db5d3b03f..55a3beff5c8e48 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1396,4 +1396,10 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true, masterOnly = true) public static int max_dynamic_partition_num = 500; + + /* + * Control the max num of backup job per db + */ + @ConfField(mutable = true, masterOnly = true) + public static int max_backup_job_num_per_db = 100; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 1e9a006e135878..42e035c800b6f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -1614,16 +1614,13 @@ private void handleShowBackup() throws AnalysisException { ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_DB_ERROR, showStmt.getDbName()); } - AbstractJob jobI = Catalog.getCurrentCatalog().getBackupHandler().getJob(db.getId()); - if (!(jobI instanceof BackupJob)) { - resultSet = new ShowResultSet(showStmt.getMetaData(), EMPTY_SET); - return; - } + List jobs = Catalog.getCurrentCatalog().getBackupHandler().getJobs(db.getId(), showStmt.getLabelPredicate()); + + List backupJobs = jobs.stream().filter(job -> job instanceof BackupJob) + .map(job -> (BackupJob) job).collect(Collectors.toList()); + + List> infos = backupJobs.stream().map(BackupJob::getInfo).collect(Collectors.toList()); - BackupJob backupJob = (BackupJob) jobI; - List info = backupJob.getInfo(); - List> infos = Lists.newArrayList(); - infos.add(info); resultSet = new ShowResultSet(showStmt.getMetaData(), infos); } @@ -1634,16 +1631,13 @@ private void handleShowRestore() throws AnalysisException { ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_DB_ERROR, showStmt.getDbName()); } - AbstractJob jobI = Catalog.getCurrentCatalog().getBackupHandler().getJob(db.getId()); - if (!(jobI instanceof RestoreJob)) { - resultSet = new ShowResultSet(showStmt.getMetaData(), EMPTY_SET); - return; - } + List jobs = Catalog.getCurrentCatalog().getBackupHandler().getJobs(db.getId(), showStmt.getLabelPredicate()); + + List restoreJobs = jobs.stream().filter(job -> job instanceof RestoreJob) + .map(job -> (RestoreJob) job).collect(Collectors.toList()); + + List> infos = restoreJobs.stream().map(RestoreJob::getInfo).collect(Collectors.toList()); - RestoreJob restoreJob = (RestoreJob) jobI; - List info = restoreJob.getInfo(); - List> infos = Lists.newArrayList(); - infos.add(info); resultSet = new ShowResultSet(showStmt.getMetaData(), infos); } From 7e3be0f2ad6d440f3f27a8999618d740df808f88 Mon Sep 17 00:00:00 2001 From: Tan Hao Date: Mon, 31 May 2021 18:32:42 +0800 Subject: [PATCH 02/10] add doc --- docs/en/administrator-guide/config/fe_config.md | 4 ++++ docs/zh-CN/administrator-guide/config/fe_config.md | 3 +++ 2 files changed, 7 insertions(+) diff --git a/docs/en/administrator-guide/config/fe_config.md b/docs/en/administrator-guide/config/fe_config.md index 7c4fa5745f8c54..8727502d59f70a 100644 --- a/docs/en/administrator-guide/config/fe_config.md +++ b/docs/en/administrator-guide/config/fe_config.md @@ -490,6 +490,10 @@ Can cooperate with `mix_clone_task_timeout_sec` to control the maximum and minim ### `max_running_rollup_job_num_per_table` +### `max_backup_job_num_per_db` + +This configuration is mainly used to control the number of backup/restore tasks recorded in each database. + ### `max_running_txn_num_per_db` This configuration is mainly used to control the number of concurrent load jobs of the same database. diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md b/docs/zh-CN/administrator-guide/config/fe_config.md index 29acd0288a345b..320d1cdfeec08b 100644 --- a/docs/zh-CN/administrator-guide/config/fe_config.md +++ b/docs/zh-CN/administrator-guide/config/fe_config.md @@ -487,6 +487,9 @@ HTTP服务允许接收请求的Header的最大长度,单位为比特,默认 ### `max_running_rollup_job_num_per_table` +### `max_backup_job_num_per_db` +此配置用于控制每个 DB 能够记录的 backup/restore 任务的数量,默认为 100 + ### `max_running_txn_num_per_db` 这个配置主要是用来控制同一个 db 的并发导入个数的。 From 60ad68b1944e716bfeace39b77b6c48f87022352 Mon Sep 17 00:00:00 2001 From: Tan Hao Date: Sat, 5 Jun 2021 12:07:17 +0800 Subject: [PATCH 03/10] resolve comment --- docs/en/administrator-guide/config/fe_config.md | 2 +- docs/zh-CN/administrator-guide/config/fe_config.md | 2 +- .../main/java/org/apache/doris/backup/BackupHandler.java | 8 +++++--- .../src/main/java/org/apache/doris/common/Config.java | 2 +- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/docs/en/administrator-guide/config/fe_config.md b/docs/en/administrator-guide/config/fe_config.md index 8727502d59f70a..1366856acfa589 100644 --- a/docs/en/administrator-guide/config/fe_config.md +++ b/docs/en/administrator-guide/config/fe_config.md @@ -490,7 +490,7 @@ Can cooperate with `mix_clone_task_timeout_sec` to control the maximum and minim ### `max_running_rollup_job_num_per_table` -### `max_backup_job_num_per_db` +### `max_backup_restore_job_num_per_db` This configuration is mainly used to control the number of backup/restore tasks recorded in each database. diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md b/docs/zh-CN/administrator-guide/config/fe_config.md index 320d1cdfeec08b..773f3920f74c08 100644 --- a/docs/zh-CN/administrator-guide/config/fe_config.md +++ b/docs/zh-CN/administrator-guide/config/fe_config.md @@ -487,7 +487,7 @@ HTTP服务允许接收请求的Header的最大长度,单位为比特,默认 ### `max_running_rollup_job_num_per_table` -### `max_backup_job_num_per_db` +### `max_backup_restore_job_num_per_db` 此配置用于控制每个 DB 能够记录的 backup/restore 任务的数量,默认为 100 ### `max_running_txn_num_per_db` diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java index 3b500647c72f0d..94943a1baaed83 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java @@ -90,7 +90,7 @@ public class BackupHandler extends MasterDaemon implements Writable { // this lock is used for updating dbIdToBackupOrRestoreJobs private final ReentrantLock jobLock = new ReentrantLock(); - // db id -> last 100(max_backup_job_num_per_db) backup/restore jobs + // db id -> last 10(max_backup_restore_job_num_per_db) backup/restore jobs // Newly submitted job will replace the current job, only if current job is finished or cancelled. // If the last job is finished, user can get the job info from repository. If the last job is cancelled, // user can get the error message before submitting the next one. @@ -419,11 +419,13 @@ private void addBackupOrRestoreJob(long dbId, AbstractJob job) { jobLock.lock(); try { Deque jobs = dbIdToBackupOrRestoreJobs.computeIfAbsent(dbId, k -> Lists.newLinkedList()); - if (jobs.size() == Config.max_backup_job_num_per_db) { + while (jobs.size() >= Config.max_backup_restore_job_num_per_db) { jobs.removeFirst(); } AbstractJob lastJob = jobs.peekLast(); - // only save the latest job + + // Remove duplicate jobs and keep only the latest status + // Otherwise, the tasks that have been successfully executed will be repeated when replaying edit log. if (lastJob != null && (lastJob.isPending() || lastJob.getJobId() == job.getJobId())) { jobs.removeLast(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 55a3beff5c8e48..5175852bdcf969 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1401,5 +1401,5 @@ public class Config extends ConfigBase { * Control the max num of backup job per db */ @ConfField(mutable = true, masterOnly = true) - public static int max_backup_job_num_per_db = 100; + public static int max_backup_restore_job_num_per_db = 10; } From c99f474986db7f810a59d8784fe513510880db24 Mon Sep 17 00:00:00 2001 From: Tan Hao Date: Sat, 5 Jun 2021 12:07:55 +0800 Subject: [PATCH 04/10] fix log --- fe/fe-core/src/main/java/org/apache/doris/common/Config.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 5175852bdcf969..934d435dc99a17 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1398,7 +1398,7 @@ public class Config extends ConfigBase { public static int max_dynamic_partition_num = 500; /* - * Control the max num of backup job per db + * Control the max num of backup/restore job per db */ @ConfField(mutable = true, masterOnly = true) public static int max_backup_restore_job_num_per_db = 10; From 6a958bb06153e121551173f4b57e34f908f2ef69 Mon Sep 17 00:00:00 2001 From: Tan Hao Date: Wed, 9 Jun 2021 13:32:58 +0800 Subject: [PATCH 05/10] import order --- .../src/main/java/org/apache/doris/backup/BackupHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java index 94943a1baaed83..ea0221e21cddcb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java @@ -17,7 +17,6 @@ package org.apache.doris.backup; -import org.apache.commons.collections.CollectionUtils; import org.apache.doris.analysis.AbstractBackupStmt; import org.apache.doris.analysis.AbstractBackupTableRefClause; import org.apache.doris.analysis.BackupStmt; @@ -58,6 +57,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; From eea05dd596a2b99298d6ae12da3e3f878ba5fd0f Mon Sep 17 00:00:00 2001 From: Tan Hao Date: Wed, 9 Jun 2021 13:35:45 +0800 Subject: [PATCH 06/10] import --- .../main/java/org/apache/doris/analysis/ShowBackupStmt.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBackupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBackupStmt.java index dcaa2f61242422..053d5c8d480663 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBackupStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBackupStmt.java @@ -17,8 +17,6 @@ package org.apache.doris.analysis; -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.ScalarType; @@ -33,6 +31,9 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ShowResultSetMetaData; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; + import java.util.function.Predicate; public class ShowBackupStmt extends ShowStmt { From 87b273739f421a91980bc0478686d99399aa1ea7 Mon Sep 17 00:00:00 2001 From: Hao Tan <45457672+e0c9@users.noreply.github.com> Date: Fri, 18 Jun 2021 15:29:42 +0800 Subject: [PATCH 07/10] Update fe_config.md --- docs/en/administrator-guide/config/fe_config.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/docs/en/administrator-guide/config/fe_config.md b/docs/en/administrator-guide/config/fe_config.md index 1a5c0699c457ec..752cc290309086 100644 --- a/docs/en/administrator-guide/config/fe_config.md +++ b/docs/en/administrator-guide/config/fe_config.md @@ -571,9 +571,6 @@ IsMutable:true MasterOnly:true - -### `max_running_txn_num_per_db` - This configuration is mainly used to control the number of concurrent load jobs of the same database. When there are too many load jobs running in the cluster, the newly submitted load jobs may report errors: From 60745bf17e194155d07d920cf4111df085749fbd Mon Sep 17 00:00:00 2001 From: Hao Tan <45457672+e0c9@users.noreply.github.com> Date: Fri, 18 Jun 2021 15:30:44 +0800 Subject: [PATCH 08/10] Update fe_config.md --- docs/en/administrator-guide/config/fe_config.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/en/administrator-guide/config/fe_config.md b/docs/en/administrator-guide/config/fe_config.md index 752cc290309086..db4cd53d42dde8 100644 --- a/docs/en/administrator-guide/config/fe_config.md +++ b/docs/en/administrator-guide/config/fe_config.md @@ -496,7 +496,9 @@ This will limit the max recursion depth of hash distribution pruner. So that distribution pruner will no work and just return all buckets. Increase the depth can support distribution pruning for more elements, but may cost more CPU. -### `max_backup_restore_job_num_per_db` +### max_backup_restore_job_num_per_db + +Default: 10 This configuration is mainly used to control the number of backup/restore tasks recorded in each database. @@ -508,7 +510,6 @@ IsMutable:true MasterOnly:true - If set to true, the insert stmt with processing error will still return a label to user. And user can use this label to check the load job's status. The default value is false, which means if insert operation encounter errors, exception will be thrown to user client directly without load label. ### small_file_dir From 50262f45c05cafca968a3c726dde125b62db2b76 Mon Sep 17 00:00:00 2001 From: Hao Tan <45457672+e0c9@users.noreply.github.com> Date: Fri, 18 Jun 2021 15:31:29 +0800 Subject: [PATCH 09/10] Update fe_config.md --- docs/zh-CN/administrator-guide/config/fe_config.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md b/docs/zh-CN/administrator-guide/config/fe_config.md index a980fee5075797..612704f3e561a0 100644 --- a/docs/zh-CN/administrator-guide/config/fe_config.md +++ b/docs/zh-CN/administrator-guide/config/fe_config.md @@ -545,7 +545,7 @@ SmallFileMgr 中存储的最大文件数 最大 Routine Load 作业数,包括 NEED_SCHEDULED, RUNNING, PAUSE -### `max_backup_restore_job_num_per_db` +### max_backup_restore_job_num_per_db 默认值:10 @@ -559,7 +559,6 @@ SmallFileMgr 中存储的最大文件数 是否为 Master FE 节点独有的配置项:true - 这个配置主要是用来控制同一个 db 的并发导入个数的。 当集群中有过多的导入任务正在运行时,新提交的导入任务可能会报错: From 9f8874feae7b7de0557a8b26391e1c795577dab5 Mon Sep 17 00:00:00 2001 From: Hao Tan <45457672+e0c9@users.noreply.github.com> Date: Fri, 18 Jun 2021 16:31:32 +0800 Subject: [PATCH 10/10] Update BackupHandler.java --- .../main/java/org/apache/doris/backup/BackupHandler.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java index ea0221e21cddcb..5b18f2fee31389 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java @@ -681,14 +681,7 @@ public static BackupHandler read(DataInput in) throws IOException { public void write(DataOutput out) throws IOException { repoMgr.write(out); - jobLock.lock(); - List jobs; - try { - jobs = dbIdToBackupOrRestoreJobs.values().stream().flatMap(Deque::stream).collect(Collectors.toList()); - } finally { - jobLock.unlock(); - } - + List jobs = dbIdToBackupOrRestoreJobs.values().stream().flatMap(Deque::stream).collect(Collectors.toList()); out.writeInt(jobs.size()); for (AbstractJob job : jobs) { job.write(out);