Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -1389,6 +1389,10 @@ alter_stmt ::=
{:
RESULT = new AlterUserStmt(ifExists, user, null, passwdOptions);
:}
| KW_ALTER KW_REPOSITORY ident:repoName opt_properties:properties
{:
RESULT = new AlterRepositoryStmt(repoName, properties);
:}
;

opt_datasource_properties ::=
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.analysis;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.Maps;

import java.util.HashMap;
import java.util.Map;

/**
 * ALTER REPOSITORY statement.
 *
 * Only supports refreshing the S3 credentials (access key / secret key /
 * session token) of an existing S3-backed backup repository; any other
 * property is rejected in {@link #analyze(Analyzer)}.
 */
public class AlterRepositoryStmt extends DdlStmt {
    // Name of the repository to alter; validated against common name rules.
    private final String name;
    // Properties supplied by the user; restricted to S3 ak/sk/token keys.
    private final Map<String, String> properties;

    public AlterRepositoryStmt(String name, Map<String, String> properties) {
        this.name = name;
        // Normalize null to an empty map so downstream code never sees null.
        this.properties = properties == null ? Maps.newHashMap() : properties;
    }

    public Map<String, String> getProperties() {
        return properties;
    }

    public String getName() {
        return name;
    }

    /**
     * Checks ADMIN privilege, validates the repository name, and verifies
     * that the supplied properties contain only S3 credential keys.
     *
     * @throws UserException if no properties were given or any non-credential
     *         property is present
     */
    @Override
    public void analyze(Analyzer analyzer) throws UserException {
        super.analyze(analyzer);
        // check auth
        if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
        }
        FeNameFormat.checkCommonName("repository", name);
        // Work on a copy: strip every allowed credential key and reject the
        // statement if anything is left over.
        Map<String, String> copyProperties = new HashMap<>(properties);
        if (copyProperties.isEmpty()) {
            throw new UserException("alter repository need contains ak/sk/token info of s3.");
        }
        copyProperties.remove(S3Properties.ACCESS_KEY);
        copyProperties.remove(S3Properties.SECRET_KEY);
        copyProperties.remove(S3Properties.SESSION_TOKEN);
        copyProperties.remove(S3Properties.Env.ACCESS_KEY);
        copyProperties.remove(S3Properties.Env.SECRET_KEY);
        copyProperties.remove(S3Properties.Env.TOKEN);
        if (!copyProperties.isEmpty()) {
            throw new UserException("alter repository only support ak/sk/token info of s3."
                    + " unsupported properties: " + copyProperties.keySet());
        }
    }

    @Override
    public String toSql() {
        StringBuilder sb = new StringBuilder();
        sb.append("ALTER REPOSITORY '").append(name).append("' ");
        sb.append("PROPERTIES(").append(new PrintableMap<>(properties, " = ", true, false)).append(")");
        return sb.toString();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ public void setTypeRead(boolean isTypeRead) {

public abstract boolean isCancelled();

public abstract Status updateRepo(Repository repo);

public static AbstractJob read(DataInput in) throws IOException {
AbstractJob job = null;
JobType type = JobType.valueOf(Text.readString(in));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.analysis.AbstractBackupStmt;
import org.apache.doris.analysis.AbstractBackupTableRefClause;
import org.apache.doris.analysis.AlterRepositoryStmt;
import org.apache.doris.analysis.BackupStmt;
import org.apache.doris.analysis.BackupStmt.BackupType;
import org.apache.doris.analysis.CancelBackupStmt;
Expand Down Expand Up @@ -49,6 +50,7 @@
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.fs.remote.S3FileSystem;
import org.apache.doris.task.DirMoveTask;
import org.apache.doris.task.DownloadTask;
import org.apache.doris.task.SnapshotTask;
Expand Down Expand Up @@ -222,6 +224,49 @@ public void createRepository(CreateRepositoryStmt stmt) throws DdlException {
}
}

/**
 * Handles ALTER REPOSITORY: merges the new S3 credentials into the existing
 * repository properties, verifies connectivity with a ping, swaps in the new
 * repository, and refreshes any running backup/restore jobs that use it.
 *
 * @throws DdlException if the repository does not exist, is not S3-backed,
 *         cannot be pinged with the new credentials, or the swap fails
 */
public void alterRepository(AlterRepositoryStmt stmt) throws DdlException {
    tryLock();
    try {
        Repository repo = repoMgr.getRepo(stmt.getName());
        if (repo == null) {
            ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Repository does not exist");
        }

        if (repo.getRemoteFileSystem() instanceof S3FileSystem) {
            // alterRepositoryS3Properties merges the new ak/sk/token into the
            // repository's existing properties, so this map ends up holding the
            // full, updated property set.
            Map<String, String> mergedProperties = new HashMap<>(stmt.getProperties());
            Status status = repo.alterRepositoryS3Properties(mergedProperties);
            if (!status.ok()) {
                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, status.getErrMsg());
            }
            RemoteFileSystem fileSystem = FileSystemFactory.get(repo.getRemoteFileSystem().getName(),
                    StorageBackend.StorageType.S3, mergedProperties);
            Repository newRepo = new Repository(repo.getId(), repo.getName(), repo.isReadOnly(),
                    repo.getLocation(), fileSystem);
            // Verify the new credentials actually work before committing them.
            if (!newRepo.ping()) {
                // The ping was issued on newRepo, so read the failure message
                // from newRepo, not the stale repo instance.
                LOG.warn("Failed to connect repository {}. msg: {}", newRepo.getName(), newRepo.getErrorMsg());
                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                        "Repo can not ping with new s3 properties");
            }

            Status st = repoMgr.alterRepo(newRepo, false /* not replay */);
            if (!st.ok()) {
                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                        "Failed to alter repository: " + st.getErrMsg());
            }
            // In-flight jobs still hold the old Repository object; point them
            // at the new one so pending tasks use the fresh credentials.
            for (AbstractJob job : getAllCurrentJobs()) {
                if (!job.isDone() && job.getRepoId() == repo.getId()) {
                    job.updateRepo(newRepo);
                }
            }
        } else {
            ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                    "Only support alter s3 repository");
        }
    } finally {
        seqlock.unlock();
    }
}

// handle drop repository stmt
public void dropRepository(DropRepositoryStmt stmt) throws DdlException {
tryLock();
Expand Down
22 changes: 22 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,28 @@ public boolean isCancelled() {
return state == BackupJobState.CANCELLED;
}

/**
 * Swaps in the altered repository and, if uploads are in flight, pushes the
 * refreshed broker (S3) properties into every pending upload task so the
 * backends retry with the new credentials.
 */
@Override
public synchronized Status updateRepo(Repository repo) {
    this.repo = repo;

    if (state == BackupJobState.UPLOADING) {
        for (Map.Entry<Long, Long> unfinished : unfinishedTaskIds.entrySet()) {
            Long signature = unfinished.getKey();
            Long beId = unfinished.getValue();
            AgentTask pending = AgentTaskQueue.getTask(beId, TTaskType.UPLOAD, signature);
            if (pending != null && pending.getTaskType() == TTaskType.UPLOAD) {
                ((UploadTask) pending).updateBrokerProperties(
                        S3ClientBEProperties.getBeFSProperties(repo.getRemoteFileSystem().getProperties()));
                AgentTaskQueue.updateTask(beId, TTaskType.UPLOAD, signature, pending);
            }
        }
        LOG.info("finished to update upload job properties. {}", this);
    }
    LOG.info("finished to update repo of job. {}", this);
    return Status.OK;
}

// Polling the job state and do the right things.
@Override
public synchronized void run() {
Expand Down
35 changes: 35 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.PersistentFileSystem;
import org.apache.doris.fs.remote.BrokerFileSystem;
import org.apache.doris.fs.remote.RemoteFile;
Expand Down Expand Up @@ -58,7 +59,10 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

/*
Expand Down Expand Up @@ -290,6 +294,37 @@ public Status initRepository() {
}
}

/**
 * Rewrites {@code properties} in place so that it holds this repository's
 * full property set with the credentials (ak/sk/token) replaced by the ones
 * supplied by the caller. Env-style keys are normalized to the canonical
 * S3Properties keys. Only valid for S3-backed repositories.
 */
public Status alterRepositoryS3Properties(Map<String, String> properties) {
    if (!(fileSystem instanceof S3FileSystem)) {
        return new Status(ErrCode.COMMON_ERROR, "Only support alter s3 repository");
    }
    Map<String, String> merged = new HashMap<>(this.getRemoteFileSystem().getProperties());
    // Drop the stale credentials, both canonical and env-style spellings.
    merged.remove(S3Properties.ACCESS_KEY);
    merged.remove(S3Properties.SECRET_KEY);
    merged.remove(S3Properties.SESSION_TOKEN);
    merged.remove(S3Properties.Env.ACCESS_KEY);
    merged.remove(S3Properties.Env.SECRET_KEY);
    merged.remove(S3Properties.Env.TOKEN);
    // Re-insert the caller's credentials under the canonical keys; with
    // putIfAbsent the first spelling seen wins if both aliases are given.
    for (Map.Entry<String, String> entry : properties.entrySet()) {
        String key = entry.getKey();
        if (S3Properties.ACCESS_KEY.equals(key) || S3Properties.Env.ACCESS_KEY.equals(key)) {
            merged.putIfAbsent(S3Properties.ACCESS_KEY, entry.getValue());
        } else if (S3Properties.SECRET_KEY.equals(key) || S3Properties.Env.SECRET_KEY.equals(key)) {
            merged.putIfAbsent(S3Properties.SECRET_KEY, entry.getValue());
        } else if (S3Properties.SESSION_TOKEN.equals(key) || S3Properties.Env.TOKEN.equals(key)) {
            merged.putIfAbsent(S3Properties.SESSION_TOKEN, entry.getValue());
        }
    }
    properties.clear();
    properties.putAll(merged);
    return Status.OK;
}

// eg: location/__palo_repository_repo_name/__repo_info
public String assembleRepoInfoFilePath() {
return Joiner.on(PATH_DELIMITER).join(location,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.fs.remote.S3FileSystem;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -96,6 +97,31 @@ public Repository getRepo(long repoId) {
return repoIdMap.get(repoId);
}

/**
 * Replaces an existing S3 repository with {@code newRepo} (same name/id) in
 * both lookup maps, writing an edit-log entry unless this is a replay.
 *
 * @return OK on success, NOT_FOUND if no repository with that name exists,
 *         COMMON_ERROR if the existing repository is not S3-backed
 */
public Status alterRepo(Repository newRepo, boolean isReplay) {
    lock.lock();
    try {
        Repository existing = repoNameMap.get(newRepo.getName());
        if (existing == null) {
            return new Status(ErrCode.NOT_FOUND, "repository does not exist");
        }
        if (!(existing.getRemoteFileSystem() instanceof S3FileSystem)) {
            return new Status(ErrCode.COMMON_ERROR, "Only support alter s3 repository");
        }
        repoNameMap.put(existing.getName(), newRepo);
        repoIdMap.put(existing.getId(), newRepo);

        if (!isReplay) {
            // log
            Env.getCurrentEnv().getEditLog().logAlterRepository(newRepo);
        }
        LOG.info("successfully alter repo {}, isReplay {}", newRepo.getName(), isReplay);
        return Status.OK;
    } finally {
        lock.unlock();
    }
}

public Status removeRepo(String repoName, boolean isReplay) {
lock.lock();
try {
Expand Down
22 changes: 22 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,28 @@ public boolean isCancelled() {
return state == RestoreJobState.CANCELLED;
}

/**
 * Swaps in the altered repository and, if downloads are in flight, pushes the
 * refreshed broker (S3) properties into every pending download task so the
 * backends retry with the new credentials.
 */
@Override
public synchronized Status updateRepo(Repository repo) {
    this.repo = repo;

    if (state == RestoreJobState.DOWNLOADING) {
        for (Map.Entry<Long, Long> unfinished : unfinishedSignatureToId.entrySet()) {
            Long signature = unfinished.getKey();
            Long beId = unfinished.getValue();
            AgentTask pending = AgentTaskQueue.getTask(beId, TTaskType.DOWNLOAD, signature);
            if (pending != null && pending.getTaskType() == TTaskType.DOWNLOAD) {
                ((DownloadTask) pending).updateBrokerProperties(
                        S3ClientBEProperties.getBeFSProperties(repo.getRemoteFileSystem().getProperties()));
                AgentTaskQueue.updateTask(beId, TTaskType.DOWNLOAD, signature, pending);
            }
        }
        LOG.info("finished to update download job properties. {}", this);
    }
    LOG.info("finished to update repo of job. {}", this);
    return Status.OK;
}

@Override
public void run() {
if (state == RestoreJobState.FINISHED || state == RestoreJobState.CANCELLED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,11 @@ public void readFields(DataInput in) throws IOException {
isRead = true;
break;
}
case OperationType.OP_ALTER_REPOSITORY: {
data = Repository.read(in);
isRead = true;
break;
}
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,11 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) {
env.getAlterInstance().processAlterMTMV(alterMtmv, true);
break;
}
case OperationType.OP_ALTER_REPOSITORY: {
Repository repository = (Repository) journal.getData();
env.getBackupHandler().getRepoMgr().alterRepo(repository, true);
break;
}
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
Expand Down Expand Up @@ -1524,6 +1529,10 @@ public void logDropRepository(String repoName) {
logEdit(OperationType.OP_DROP_REPOSITORY, new Text(repoName));
}

// Persist an ALTER REPOSITORY change to the edit log so it is replayed on
// followers and after restart.
public void logAlterRepository(Repository repo) {
    logEdit(OperationType.OP_ALTER_REPOSITORY, repo);
}

public void logRestoreJob(RestoreJob job) {
logEdit(OperationType.OP_RESTORE_JOB, job);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,8 @@ public class OperationType {

public static final short OP_ALTER_MTMV = 459;

public static final short OP_ALTER_REPOSITORY = 460;

/**
* Get opcode name by op code.
**/
Expand Down
3 changes: 3 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
import org.apache.doris.analysis.AlterDatabaseRename;
import org.apache.doris.analysis.AlterPolicyStmt;
import org.apache.doris.analysis.AlterRepositoryStmt;
import org.apache.doris.analysis.AlterResourceStmt;
import org.apache.doris.analysis.AlterRoutineLoadStmt;
import org.apache.doris.analysis.AlterSqlBlockRuleStmt;
Expand Down Expand Up @@ -389,6 +390,8 @@ public static void execute(Env env, DdlStmt ddlStmt) throws Exception {
} else if (ddlStmt instanceof DropAnalyzeJobStmt) {
DropAnalyzeJobStmt analyzeJobStmt = (DropAnalyzeJobStmt) ddlStmt;
Env.getCurrentEnv().getAnalysisManager().dropAnalyzeJob(analyzeJobStmt);
} else if (ddlStmt instanceof AlterRepositoryStmt) {
env.getBackupHandler().alterRepository((AlterRepositoryStmt) ddlStmt);
} else {
LOG.warn("Unkown statement " + ddlStmt.getClass());
throw new DdlException("Unknown statement.");
Expand Down
Loading