diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index 1cdedf23952df7..c01be1200ddb20 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -1389,6 +1389,10 @@ alter_stmt ::=
     {:
         RESULT = new AlterUserStmt(ifExists, user, null, passwdOptions);
     :}
+    | KW_ALTER KW_REPOSITORY ident:repoName opt_properties:properties
+    {:
+        RESULT = new AlterRepositoryStmt(repoName, properties);
+    :}
     ;
 
 opt_datasource_properties ::=
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRepositoryStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRepositoryStmt.java
new file mode 100644
index 00000000000000..b0ecd6d3bd4b76
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRepositoryStmt.java
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.FeNameFormat;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.PrintableMap;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Maps;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AlterRepositoryStmt extends DdlStmt {
+    private String name;
+    private Map<String, String> properties;
+
+    public AlterRepositoryStmt(String name, Map<String, String> properties) {
+        this.name = name;
+        this.properties = properties;
+        if (this.properties == null) {
+            this.properties = Maps.newHashMap();
+        }
+    }
+
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws UserException {
+        super.analyze(analyzer);
+        // check auth
+        if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
+        }
+        FeNameFormat.checkCommonName("repository", name);
+        Map<String, String> copyProperties = new HashMap<>(properties);
+        if (copyProperties.size() == 0) {
+            throw new UserException("alter repository need contains ak/sk/token info of s3.");
+        }
+        copyProperties.remove(S3Properties.ACCESS_KEY);
+        copyProperties.remove(S3Properties.SECRET_KEY);
+        copyProperties.remove(S3Properties.SESSION_TOKEN);
+        copyProperties.remove(S3Properties.Env.ACCESS_KEY);
+        copyProperties.remove(S3Properties.Env.SECRET_KEY);
+        copyProperties.remove(S3Properties.Env.TOKEN);
+        if (copyProperties.size() != 0) {
+            throw new UserException("alter repository only support ak/sk/token info of s3."
+                    + " unsupported properties: " + copyProperties.keySet());
+        }
+    }
+
+    @Override
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("ALTER REPOSITORY '").append(name).append("' ");
+        sb.append("PROPERTIES(").append(new PrintableMap<>(properties, " = ", true, false)).append(")");
+        return sb.toString();
+    }
+}
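
Usage sketch (editor's addition, not part of the patch): the grammar rule above maps ALTER REPOSITORY <name> PROPERTIES(...) to the new AlterRepositoryStmt, whose analyze() requires ADMIN privilege and only accepts the S3 ak/sk/token keys. A minimal, hypothetical example of the object the parser would build; the literal property keys are assumed values of the S3Properties constants.

import org.apache.doris.analysis.AlterRepositoryStmt;

import java.util.HashMap;
import java.util.Map;

public class AlterRepositoryStmtSketch {
    public static void main(String[] args) {
        // Roughly: ALTER REPOSITORY example_repo PROPERTIES("s3.access_key" = "NEW_AK", "s3.secret_key" = "NEW_SK")
        Map<String, String> props = new HashMap<>();
        props.put("s3.access_key", "NEW_AK");  // assumed value of S3Properties.ACCESS_KEY
        props.put("s3.secret_key", "NEW_SK");  // assumed value of S3Properties.SECRET_KEY
        AlterRepositoryStmt stmt = new AlterRepositoryStmt("example_repo", props);
        System.out.println(stmt.toSql());
        // analyze() would reject any property outside the ak/sk/token whitelist.
    }
}
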
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
index d2f1dd11979186..0df9155ab34e87 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
@@ -155,6 +155,8 @@ public void setTypeRead(boolean isTypeRead) {
 
     public abstract boolean isCancelled();
 
+    public abstract Status updateRepo(Repository repo);
+
     public static AbstractJob read(DataInput in) throws IOException {
         AbstractJob job = null;
         JobType type = JobType.valueOf(Text.readString(in));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index 5279175fa00ec7..7c0485228f7f02 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -19,6 +19,7 @@
 
 import org.apache.doris.analysis.AbstractBackupStmt;
 import org.apache.doris.analysis.AbstractBackupTableRefClause;
+import org.apache.doris.analysis.AlterRepositoryStmt;
 import org.apache.doris.analysis.BackupStmt;
 import org.apache.doris.analysis.BackupStmt.BackupType;
 import org.apache.doris.analysis.CancelBackupStmt;
@@ -49,6 +50,7 @@
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.fs.remote.RemoteFileSystem;
+import org.apache.doris.fs.remote.S3FileSystem;
 import org.apache.doris.task.DirMoveTask;
 import org.apache.doris.task.DownloadTask;
 import org.apache.doris.task.SnapshotTask;
@@ -222,6 +224,49 @@ public void createRepository(CreateRepositoryStmt stmt) throws DdlException {
         }
     }
 
+    public void alterRepository(AlterRepositoryStmt stmt) throws DdlException {
+        tryLock();
+        try {
+            Repository repo = repoMgr.getRepo(stmt.getName());
+            if (repo == null) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Repository does not exist");
+            }
+
+            if (repo.getRemoteFileSystem() instanceof S3FileSystem) {
+                Map<String, String> oldProperties = new HashMap<>(stmt.getProperties());
+                Status status = repo.alterRepositoryS3Properties(oldProperties);
+                if (!status.ok()) {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, status.getErrMsg());
+                }
+                RemoteFileSystem fileSystem = FileSystemFactory.get(repo.getRemoteFileSystem().getName(),
+                        StorageBackend.StorageType.S3, oldProperties);
+                Repository newRepo = new Repository(repo.getId(), repo.getName(), repo.isReadOnly(),
+                        repo.getLocation(), fileSystem);
+                if (!newRepo.ping()) {
+                    LOG.warn("Failed to connect repository {}. msg: {}", repo.getName(), repo.getErrorMsg());
+                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
+                            "Repo can not ping with new s3 properties");
+                }
+
+                Status st = repoMgr.alterRepo(newRepo, false /* not replay */);
+                if (!st.ok()) {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
+                            "Failed to alter repository: " + st.getErrMsg());
+                }
+                for (AbstractJob job : getAllCurrentJobs()) {
+                    if (!job.isDone() && job.getRepoId() == repo.getId()) {
+                        job.updateRepo(newRepo);
+                    }
+                }
+            } else {
+                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
+                        "Only support alter s3 repository");
+            }
+        } finally {
+            seqlock.unlock();
+        }
+    }
+
     // handle drop repository stmt
     public void dropRepository(DropRepositoryStmt stmt) throws DdlException {
         tryLock();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index a91c2e12a5183b..ef77d042390d34 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -272,6 +272,28 @@ public boolean isCancelled() {
         return state == BackupJobState.CANCELLED;
     }
 
+    @Override
+    public synchronized Status updateRepo(Repository repo) {
+        this.repo = repo;
+
+        if (this.state == BackupJobState.UPLOADING) {
+            for (Map.Entry<Long, Long> entry : unfinishedTaskIds.entrySet()) {
+                long signature = entry.getKey();
+                long beId = entry.getValue();
+                AgentTask task = AgentTaskQueue.getTask(beId, TTaskType.UPLOAD, signature);
+                if (task == null || task.getTaskType() != TTaskType.UPLOAD) {
+                    continue;
+                }
+                ((UploadTask) task).updateBrokerProperties(
+                        S3ClientBEProperties.getBeFSProperties(repo.getRemoteFileSystem().getProperties()));
+                AgentTaskQueue.updateTask(beId, TTaskType.UPLOAD, signature, task);
+            }
+            LOG.info("finished to update upload job properties. {}", this);
+        }
+        LOG.info("finished to update repo of job. {}", this);
+        return Status.OK;
+    }
+
     // Polling the job state and do the right things.
     @Override
     public synchronized void run() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
index a236ba3c30865f..251cb24efae746 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
@@ -29,6 +29,7 @@
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.property.constants.S3Properties;
 import org.apache.doris.fs.PersistentFileSystem;
 import org.apache.doris.fs.remote.BrokerFileSystem;
 import org.apache.doris.fs.remote.RemoteFile;
@@ -58,7 +59,10 @@
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 
 /*
@@ -290,6 +294,37 @@ public Status initRepository() {
         }
     }
 
+    public Status alterRepositoryS3Properties(Map<String, String> properties) {
+        if (fileSystem instanceof S3FileSystem) {
+            Map<String, String> oldProperties = new HashMap<>(this.getRemoteFileSystem().getProperties());
+            oldProperties.remove(S3Properties.ACCESS_KEY);
+            oldProperties.remove(S3Properties.SECRET_KEY);
+            oldProperties.remove(S3Properties.SESSION_TOKEN);
+            oldProperties.remove(S3Properties.Env.ACCESS_KEY);
+            oldProperties.remove(S3Properties.Env.SECRET_KEY);
+            oldProperties.remove(S3Properties.Env.TOKEN);
+            for (Map.Entry<String, String> entry : properties.entrySet()) {
+                if (Objects.equals(entry.getKey(), S3Properties.ACCESS_KEY)
+                        || Objects.equals(entry.getKey(), S3Properties.Env.ACCESS_KEY)) {
+                    oldProperties.putIfAbsent(S3Properties.ACCESS_KEY, entry.getValue());
+                }
+                if (Objects.equals(entry.getKey(), S3Properties.SECRET_KEY)
+                        || Objects.equals(entry.getKey(), S3Properties.Env.SECRET_KEY)) {
+                    oldProperties.putIfAbsent(S3Properties.SECRET_KEY, entry.getValue());
+                }
+                if (Objects.equals(entry.getKey(), S3Properties.SESSION_TOKEN)
+                        || Objects.equals(entry.getKey(), S3Properties.Env.TOKEN)) {
+                    oldProperties.putIfAbsent(S3Properties.SESSION_TOKEN, entry.getValue());
+                }
+            }
+            properties.clear();
+            properties.putAll(oldProperties);
+            return Status.OK;
+        } else {
+            return new Status(ErrCode.COMMON_ERROR, "Only support alter s3 repository");
+        }
+    }
+
     // eg: location/__palo_repository_repo_name/__repo_info
     public String assembleRepoInfoFilePath() {
         return Joiner.on(PATH_DELIMITER).join(location,
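
Editor's note (not part of the patch): the merge in alterRepositoryS3Properties() above starts from the repository's existing properties, strips only the credential keys, layers the new ak/sk/token on top (Env-style AWS_* names are normalized to the s3.* names), and then writes the result back into the caller's map, which BackupHandler feeds to FileSystemFactory. A simplified, standalone sketch of that behavior; the literal key strings are assumed values of the S3Properties constants, and this is not the patch's exact code.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class RepoPropertyMergeSketch {
    public static void main(String[] args) {
        // Properties currently held by the repository's S3FileSystem.
        Map<String, String> existing = new HashMap<>();
        existing.put("s3.endpoint", "http://s3.example.com"); // non-credential, survives
        existing.put("s3.access_key", "OLD_AK");               // credential, replaced
        existing.put("s3.secret_key", "OLD_SK");

        // Properties supplied by ALTER REPOSITORY (Env-style names are accepted too).
        Map<String, String> incoming = new HashMap<>();
        incoming.put("AWS_ACCESS_KEY", "NEW_AK");
        incoming.put("AWS_SECRET_KEY", "NEW_SK");

        // Drop the old credentials, keep everything else.
        Map<String, String> merged = new HashMap<>(existing);
        merged.keySet().removeAll(Arrays.asList(
                "s3.access_key", "s3.secret_key", "s3.session_token",
                "AWS_ACCESS_KEY", "AWS_SECRET_KEY", "AWS_TOKEN"));

        // Normalize and apply the new credentials.
        incoming.forEach((key, value) -> {
            if (key.equals("s3.access_key") || key.equals("AWS_ACCESS_KEY")) {
                merged.put("s3.access_key", value);
            } else if (key.equals("s3.secret_key") || key.equals("AWS_SECRET_KEY")) {
                merged.put("s3.secret_key", value);
            } else if (key.equals("s3.session_token") || key.equals("AWS_TOKEN")) {
                merged.put("s3.session_token", value);
            }
        });

        // Expected: the endpoint plus the new credentials, e.g.
        // {s3.endpoint=http://s3.example.com, s3.access_key=NEW_AK, s3.secret_key=NEW_SK}
        System.out.println(merged);
    }
}
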
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RepositoryMgr.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RepositoryMgr.java
index 3866c55331c414..e923ba4e0441aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RepositoryMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RepositoryMgr.java
@@ -21,6 +21,7 @@
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.Daemon;
+import org.apache.doris.fs.remote.S3FileSystem;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -96,6 +97,31 @@ public Repository getRepo(long repoId) {
         return repoIdMap.get(repoId);
     }
 
+    public Status alterRepo(Repository newRepo, boolean isReplay) {
+        lock.lock();
+        try {
+            Repository repo = repoNameMap.get(newRepo.getName());
+            if (repo != null) {
+                if (repo.getRemoteFileSystem() instanceof S3FileSystem) {
+                    repoNameMap.put(repo.getName(), newRepo);
+                    repoIdMap.put(repo.getId(), newRepo);
+
+                    if (!isReplay) {
+                        // log
+                        Env.getCurrentEnv().getEditLog().logAlterRepository(newRepo);
+                    }
+                    LOG.info("successfully alter repo {}, isReplay {}", newRepo.getName(), isReplay);
+                    return Status.OK;
+                } else {
+                    return new Status(ErrCode.COMMON_ERROR, "Only support alter s3 repository");
+                }
+            }
+            return new Status(ErrCode.NOT_FOUND, "repository does not exist");
+        } finally {
+            lock.unlock();
+        }
+    }
+
     public Status removeRepo(String repoName, boolean isReplay) {
         lock.lock();
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index a69189ba10a758..e69bab67cb5587 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -337,6 +337,28 @@ public boolean isCancelled() {
         return state == RestoreJobState.CANCELLED;
     }
 
+    @Override
+    public synchronized Status updateRepo(Repository repo) {
+        this.repo = repo;
+
+        if (this.state == RestoreJobState.DOWNLOADING) {
+            for (Map.Entry<Long, Long> entry : unfinishedSignatureToId.entrySet()) {
+                long signature = entry.getKey();
+                long beId = entry.getValue();
+                AgentTask task = AgentTaskQueue.getTask(beId, TTaskType.DOWNLOAD, signature);
+                if (task == null || task.getTaskType() != TTaskType.DOWNLOAD) {
+                    continue;
+                }
+                ((DownloadTask) task).updateBrokerProperties(
+                        S3ClientBEProperties.getBeFSProperties(repo.getRemoteFileSystem().getProperties()));
+                AgentTaskQueue.updateTask(beId, TTaskType.DOWNLOAD, signature, task);
+            }
+            LOG.info("finished to update download job properties. {}", this);
+        }
+        LOG.info("finished to update repo of job. {}", this);
+        return Status.OK;
+    }
+
     @Override
     public void run() {
         if (state == RestoreJobState.FINISHED || state == RestoreJobState.CANCELLED) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index b0749db1fe14ea..b47dd090f4c921 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -876,6 +876,11 @@ public void readFields(DataInput in) throws IOException {
                 isRead = true;
                 break;
             }
+            case OperationType.OP_ALTER_REPOSITORY: {
+                data = Repository.read(in);
+                isRead = true;
+                break;
+            }
             default: {
                 IOException e = new IOException();
                 LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index c5da3e600e5e08..98d2c1ff0a3d91 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -1116,6 +1116,11 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) {
                 env.getAlterInstance().processAlterMTMV(alterMtmv, true);
                 break;
             }
+            case OperationType.OP_ALTER_REPOSITORY: {
+                Repository repository = (Repository) journal.getData();
+                env.getBackupHandler().getRepoMgr().alterRepo(repository, true);
+                break;
+            }
             default: {
                 IOException e = new IOException();
                 LOG.error("UNKNOWN Operation Type {}", opCode, e);
@@ -1524,6 +1529,10 @@ public void logDropRepository(String repoName) {
         logEdit(OperationType.OP_DROP_REPOSITORY, new Text(repoName));
     }
 
+    public void logAlterRepository(Repository repo) {
+        logEdit(OperationType.OP_ALTER_REPOSITORY, repo);
+    }
+
     public void logRestoreJob(RestoreJob job) {
         logEdit(OperationType.OP_RESTORE_JOB, job);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 0769cba0d81e11..af410c46986432 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -348,6 +348,8 @@ public class OperationType {
 
     public static final short OP_ALTER_MTMV = 459;
 
+    public static final short OP_ALTER_REPOSITORY = 460;
+
     /**
      * Get opcode name by op code.
      **/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 5a2bae1ee05d99..9cb11b7630cded 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -38,6 +38,7 @@
 import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
 import org.apache.doris.analysis.AlterDatabaseRename;
 import org.apache.doris.analysis.AlterPolicyStmt;
+import org.apache.doris.analysis.AlterRepositoryStmt;
 import org.apache.doris.analysis.AlterResourceStmt;
 import org.apache.doris.analysis.AlterRoutineLoadStmt;
 import org.apache.doris.analysis.AlterSqlBlockRuleStmt;
@@ -389,6 +390,8 @@ public static void execute(Env env, DdlStmt ddlStmt) throws Exception {
         } else if (ddlStmt instanceof DropAnalyzeJobStmt) {
             DropAnalyzeJobStmt analyzeJobStmt = (DropAnalyzeJobStmt) ddlStmt;
             Env.getCurrentEnv().getAnalysisManager().dropAnalyzeJob(analyzeJobStmt);
+        } else if (ddlStmt instanceof AlterRepositoryStmt) {
+            env.getBackupHandler().alterRepository((AlterRepositoryStmt) ddlStmt);
         } else {
             LOG.warn("Unkown statement " + ddlStmt.getClass());
             throw new DdlException("Unknown statement.");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
index 1677701272cbe8..c22f0b2ee36100 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
@@ -135,6 +135,18 @@ public static synchronized AgentTask getTask(long backendId, TTaskType type, lon
         return signatureMap.get(signature);
     }
 
+    public static synchronized void updateTask(long backendId, TTaskType type, long signature, AgentTask newTask) {
+        if (!tasks.contains(backendId, type)) {
+            return;
+        }
+
+        Map<Long, AgentTask> signatureMap = tasks.get(backendId, type);
+        if (!signatureMap.containsKey(signature)) {
+            return;
+        }
+        signatureMap.put(signature, newTask);
+    }
+
     // this is just for unit test
     public static synchronized List<AgentTask> getTask(TTaskType type) {
         List<AgentTask> res = Lists.newArrayList();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java
index 6482c5f807b08b..5dbbc46b7b26f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java
@@ -78,6 +78,10 @@ public Map<String, String> getBrokerProperties() {
         return brokerProperties;
     }
 
+    public void updateBrokerProperties(Map<String, String> brokerProperties) {
+        this.brokerProperties = new java.util.HashMap<>(brokerProperties);
+    }
+
     public TDownloadReq toThrift() {
         // these fields are required
         // 1: required i64 job_id
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/UploadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/UploadTask.java
index f41d722825a33a..b2a4ae489ef814 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/UploadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/UploadTask.java
@@ -24,6 +24,7 @@
 import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.thrift.TUploadReq;
 
+import java.util.HashMap;
 import java.util.Map;
 
 public class UploadTask extends AgentTask {
@@ -64,6 +65,10 @@ public Map<String, String> getBrokerProperties() {
         return brokerProperties;
     }
 
+    public void updateBrokerProperties(Map<String, String> brokerProperties) {
+        this.brokerProperties = new HashMap<>(brokerProperties);
+    }
+
     public TUploadReq toThrift() {
         TNetworkAddress address = new TNetworkAddress(broker.host, broker.port);
         TUploadReq request = new TUploadReq(jobId, srcToDestPath, address);
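
Editor's note (not part of the patch): the setters above only swap the broker property map on a task object; the refresh of tasks that are already queued happens in BackupJob.updateRepo() and RestoreJob.updateRepo(). A hypothetical helper showing how the pieces added in this patch compose; the method and variable names here are illustrative only.

import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.UploadTask;
import org.apache.doris.thrift.TTaskType;

import java.util.Map;

public class QueuedUploadRefreshSketch {
    // Re-point one queued upload task at the repository's new credentials.
    static void refreshUploadTask(long beId, long signature, Map<String, String> newBrokerProps) {
        AgentTask task = AgentTaskQueue.getTask(beId, TTaskType.UPLOAD, signature);
        if (!(task instanceof UploadTask)) {
            return; // nothing queued under this signature, or not an upload task
        }
        ((UploadTask) task).updateBrokerProperties(newBrokerProps);
        // Put the mutated task back so tasks re-sent from the queue carry the new ak/sk,
        // which is what the new AgentTaskQueue.updateTask() is for.
        AgentTaskQueue.updateTask(beId, TTaskType.UPLOAD, signature, task);
    }
}
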