Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 35 additions & 20 deletions cloud/src/meta-service/meta_service_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,9 +523,18 @@ static void set_default_vault_log_helper(const InstanceInfoPB& instance,
LOG(INFO) << vault_msg;
}

static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Transaction> txn,
static bool vault_exist(const InstanceInfoPB& instance, const std::string& new_vault_name) {
for (auto& name : instance.storage_vault_names()) {
if (new_vault_name == name) {
return true;
}
}
return false;
}

static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Transaction>& txn,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add UT

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add UT

done

const StorageVaultPB& vault, MetaServiceCode& code,
std::string& msg) {
std::string& msg, AlterObjStoreInfoResponse* response) {
if (!vault.has_hdfs_info()) {
code = MetaServiceCode::INVALID_ARGUMENT;
std::stringstream ss;
Expand Down Expand Up @@ -591,6 +600,13 @@ static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tr
msg = ss.str();
return -1;
}

if (vault_exist(instance, vault.alter_name())) {
code = MetaServiceCode::ALREADY_EXISTED;
msg = fmt::format("vault_name={} already existed", vault.alter_name());
return -1;
}

new_vault.set_name(vault.alter_name());
*name_itr = vault.alter_name();
}
Expand Down Expand Up @@ -623,19 +639,15 @@ static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tr
txn->put(vault_key, val);
LOG(INFO) << "put vault_id=" << vault_id << ", vault_key=" << hex(vault_key)
<< ", origin vault=" << origin_vault_info << ", new_vault=" << new_vault_info;
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
msg = fmt::format("failed to commit kv txn, err={}", err);
LOG(WARNING) << msg;
}

DCHECK_EQ(new_vault.id(), vault_id);
response->set_storage_vault_id(new_vault.id());
return 0;
}

static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Transaction> txn,
static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Transaction>& txn,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'alter_s3_storage_vault' exceeds recommended size/complexity thresholds [readability-function-size]

static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Transaction>& txn,
           ^
Additional context

cloud/src/meta-service/meta_service_resource.cpp:654: 130 lines including whitespace and comments (threshold 80)

static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Transaction>& txn,
           ^

const StorageVaultPB& vault, MetaServiceCode& code,
std::string& msg) {
std::string& msg, AlterObjStoreInfoResponse* response) {
if (!vault.has_obj_info()) {
code = MetaServiceCode::INVALID_ARGUMENT;
std::stringstream ss;
Expand Down Expand Up @@ -708,6 +720,13 @@ static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tran
msg = ss.str();
return -1;
}

if (vault_exist(instance, vault.alter_name())) {
code = MetaServiceCode::ALREADY_EXISTED;
msg = fmt::format("vault_name={} already existed", vault.alter_name());
return -1;
}

new_vault.set_name(vault.alter_name());
*name_itr = vault.alter_name();
}
Expand Down Expand Up @@ -747,13 +766,9 @@ static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tran
txn->put(vault_key, val);
LOG(INFO) << "put vault_id=" << vault_id << ", vault_key=" << hex(vault_key)
<< ", origin vault=" << origin_vault_info << ", new vault=" << new_vault_info;
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
msg = fmt::format("failed to commit kv txn, err={}", err);
LOG(WARNING) << msg;
}

DCHECK_EQ(new_vault.id(), vault_id);
response->set_storage_vault_id(new_vault.id());
return 0;
}

Expand Down Expand Up @@ -1100,12 +1115,12 @@ void MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* contr
break;
}
case AlterObjStoreInfoRequest::ALTER_S3_VAULT: {
alter_s3_storage_vault(instance, std::move(txn), request->vault(), code, msg);
return;
alter_s3_storage_vault(instance, txn, request->vault(), code, msg, response);
break;
}
case AlterObjStoreInfoRequest::ALTER_HDFS_VAULT: {
alter_hdfs_storage_vault(instance, std::move(txn), request->vault(), code, msg);
return;
alter_hdfs_storage_vault(instance, txn, request->vault(), code, msg, response);
break;
}
case AlterObjStoreInfoRequest::DROP_S3_VAULT:
[[fallthrough]];
Expand Down
39 changes: 39 additions & 0 deletions cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,25 @@ TEST(MetaServiceTest, AlterS3StorageVaultTest) {
meta_service->alter_storage_vault(
reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();

{
AlterObjStoreInfoRequest req;
req.set_cloud_unique_id("test_cloud_unique_id");
req.set_op(AlterObjStoreInfoRequest::ALTER_S3_VAULT);
StorageVaultPB vault;
vault.set_alter_name(new_vault_name);
ObjectStoreInfoPB obj;
obj_info.set_ak("new_ak");
obj_info.set_sk("new_sk");
vault.mutable_obj_info()->MergeFrom(obj);
vault.set_name(new_vault_name);
req.mutable_vault()->CopyFrom(vault);
meta_service->alter_storage_vault(
reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res,
nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED) << res.status().msg();
}

InstanceInfoPB instance;
get_test_instance(instance);

Expand Down Expand Up @@ -726,6 +745,7 @@ TEST(MetaServiceTest, AlterHdfsStorageVaultTest) {
meta_service->alter_storage_vault(
reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();

InstanceInfoPB instance;
get_test_instance(instance);

Expand Down Expand Up @@ -793,6 +813,25 @@ TEST(MetaServiceTest, AlterHdfsStorageVaultTest) {
meta_service->alter_storage_vault(
reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();

{
AlterObjStoreInfoRequest req;
req.set_cloud_unique_id("test_cloud_unique_id");
req.set_op(AlterObjStoreInfoRequest::ALTER_HDFS_VAULT);
StorageVaultPB vault;
vault.mutable_hdfs_info()->mutable_build_conf()->set_user("hadoop");
vault.set_name(new_vault_name);
vault.set_alter_name(new_vault_name);
req.mutable_vault()->CopyFrom(vault);

brpc::Controller cntl;
AlterObjStoreInfoResponse res;
meta_service->alter_storage_vault(
reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res,
nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED) << res.status().msg();
}

InstanceInfoPB instance;
get_test_instance(instance);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3725,7 +3725,10 @@ private static void addOlapTablePropertyInfo(OlapTable olapTable, StringBuilder
}

// Storage Vault
if (!olapTable.getStorageVaultName().isEmpty()) {
if (!Strings.isNullOrEmpty(olapTable.getStorageVaultId())) {
sb.append(",\n\"").append(PropertyAnalyzer
.PROPERTIES_STORAGE_VAULT_ID).append("\" = \"");
sb.append(olapTable.getStorageVaultId()).append("\"");
sb.append(",\n\"").append(PropertyAnalyzer
.PROPERTIES_STORAGE_VAULT_NAME).append("\" = \"");
sb.append(olapTable.getStorageVaultName()).append("\"");
Expand Down
20 changes: 8 additions & 12 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -265,22 +265,18 @@ public void setIsBeingSynced(boolean isBeingSynced) {
String.valueOf(isBeingSynced));
}

public void setStorageVaultName(String storageVaultName) throws DdlException {
if (storageVaultName == null || storageVaultName.isEmpty()) {
return;
}
getOrCreatTableProperty().setStorageVaultName(storageVaultName);
}

public String getStorageVaultName() {
return getOrCreatTableProperty().getStorageVaultName();
if (Strings.isNullOrEmpty(getStorageVaultId())) {
return "";
}
return Env.getCurrentEnv().getStorageVaultMgr().getVaultNameById(getStorageVaultId());
}

public void setStorageVaultId(String setStorageVaultId) throws DdlException {
if (setStorageVaultId == null || setStorageVaultId.isEmpty()) {
throw new DdlException("Invalid Storage Vault, please set one useful storage vault");
public void setStorageVaultId(String storageVaultId) throws DdlException {
if (Strings.isNullOrEmpty(storageVaultId)) {
throw new DdlException("Invalid storage vault id, please set an available storage vault");
}
getOrCreatTableProperty().setStorageVaultId(setStorageVaultId);
getOrCreatTableProperty().setStorageVaultId(storageVaultId);
}

public String getStorageVaultId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public abstract class StorageVault {
public static final String LOWER_CASE_META_NAMES = "lower_case_meta_names";
public static final String META_NAMES_MAPPING = "meta_names_mapping";

public static final String VAULT_NAME = "VAULT_NAME";

public enum StorageVaultType {
UNKNOWN,
S3,
Expand All @@ -60,7 +62,6 @@ public static StorageVaultType fromString(String storageVaultTypeType) {
}
}

protected static final String VAULT_NAME = "VAULT_NAME";
protected String name;
protected StorageVaultType type;
protected String id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.doris.thrift.TNetworkAddress;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -80,12 +82,42 @@ public void refreshVaultMap(Map<String, String> vaultMap) {
rwLock.writeLock().unlock();
}

public String getVaultIdByName(String name) {
String vaultId;
rwLock.readLock().lock();
vaultId = vaultNameToVaultId.getOrDefault(name, "");
rwLock.readLock().unlock();
return vaultId;
public String getVaultIdByName(String vaultName) {
try {
rwLock.readLock().lock();
return vaultNameToVaultId.getOrDefault(vaultName, "");
} finally {
rwLock.readLock().unlock();
}
}

public String getVaultNameById(String vaultId) {
try {
rwLock.readLock().lock();
for (Map.Entry<String, String> entry : vaultNameToVaultId.entrySet()) {
if (entry.getValue().equals(vaultId)) {
return entry.getKey();
}
}
return "";
} finally {
rwLock.readLock().unlock();
}
}

private void updateVaultNameToIdCache(String oldVaultName, String newVaultName, String vaultId) {
try {
rwLock.writeLock().lock();
String cachedVaultId = vaultNameToVaultId.get(oldVaultName);
vaultNameToVaultId.remove(oldVaultName);
Preconditions.checkArgument(!Strings.isNullOrEmpty(cachedVaultId), cachedVaultId,
"Cached vault id is null or empty");
Preconditions.checkArgument(cachedVaultId.equals(vaultId),
"Cached vault id not equal to remote storage." + cachedVaultId + " - " + vaultId);
vaultNameToVaultId.put(newVaultName, vaultId);
} finally {
rwLock.writeLock().unlock();
}
}

private Cloud.StorageVaultPB.Builder buildAlterS3VaultRequest(Map<String, String> properties, String name)
Expand Down Expand Up @@ -166,8 +198,12 @@ public void alterStorageVault(StorageVaultType type, Map<String, String> propert
LOG.warn("failed to alter storage vault response: {} ", response);
throw new DdlException(response.getStatus().getMsg());
}
LOG.info("Succeed to alter storage vault {}, id {}, origin default vault replaced {}",
name, response.getStorageVaultId(), response.getDefaultStorageVaultReplaced());

if (request.hasVault() && request.getVault().hasAlterName()) {
updateVaultNameToIdCache(name, request.getVault().getAlterName(), response.getStorageVaultId());
LOG.info("Succeed to alter storage vault, old name:{} new name: {} id:{}", name,
request.getVault().getAlterName(), response.getStorageVaultId());
}

// Make BE eagerly fetch the storage vault info from Meta Service
ALTER_BE_SYNC_THREAD_POOL.execute(() -> alterSyncVaultTask());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,10 +793,6 @@ public String getStorageVaultName() {
return properties.getOrDefault(PropertyAnalyzer.PROPERTIES_STORAGE_VAULT_NAME, "");
}

public void setStorageVaultName(String storageVaultName) {
properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_VAULT_NAME, storageVaultName);
}

public String getPropertiesString() throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.proto.OlapCommon;
import org.apache.doris.proto.OlapFile;
Expand Down Expand Up @@ -106,6 +105,12 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa
throws DdlException {
// create base index first.
Preconditions.checkArgument(tbl.getBaseIndexId() != -1);

if (((CloudEnv) Env.getCurrentEnv()).getEnableStorageVault()) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(tbl.getStorageVaultId()),
"Storage vault id is null or empty");
}

MaterializedIndex baseIndex = new MaterializedIndex(tbl.getBaseIndexId(), IndexState.NORMAL);

LOG.info("begin create cloud partition");
Expand All @@ -129,9 +134,6 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa

long version = partition.getVisibleVersion();

final String storageVaultName = tbl.getStorageVaultName();
boolean storageVaultIdSet = tbl.getStorageVaultId().isEmpty();

// short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
for (Map.Entry<Long, MaterializedIndex> entry : indexMap.entrySet()) {
long indexId = entry.getKey();
Expand Down Expand Up @@ -184,29 +186,11 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa
tbl.storagePageSize());
requestBuilder.addTabletMetas(builder);
}
if (!storageVaultIdSet && ((CloudEnv) Env.getCurrentEnv()).getEnableStorageVault()) {
requestBuilder.setStorageVaultName(storageVaultName);
}
requestBuilder.setDbId(dbId);

LOG.info("create tablets, dbId: {}, tableId: {}, tableName: {}, partitionId: {}, partitionName: {}, "
+ "indexId: {}, vault name: {}",
dbId, tbl.getId(), tbl.getName(), partitionId, partitionName, indexId, storageVaultName);
Cloud.CreateTabletsResponse resp = sendCreateTabletsRpc(requestBuilder);
// If the resp has no vault id set, it means the MS is running with enable_storage_vault false
if (resp.hasStorageVaultId() && !storageVaultIdSet) {
tbl.setStorageVaultId(resp.getStorageVaultId());
storageVaultIdSet = true;
if (storageVaultName.isEmpty()) {
// If user doesn't specify the vault name for this table, we should set it
// to make the show create table stmt return correct stmt
// TODO(ByteYue): setDefaultStorageVault for vaultMgr might override user's
// defualt vault, maybe we should set it using show default storage vault stmt
tbl.setStorageVaultName(resp.getStorageVaultName());
Env.getCurrentEnv().getStorageVaultMgr().setDefaultStorageVault(
Pair.of(resp.getStorageVaultName(), resp.getStorageVaultId()));
}
}
LOG.info("create tablets dbId: {} tableId: {} tableName: {} partitionId: {} partitionName: {} "
+ "indexId: {} vaultId: {}",
dbId, tbl.getId(), tbl.getName(), partitionId, partitionName, indexId, tbl.getStorageVaultId());
sendCreateTabletsRpc(requestBuilder);
if (index.getId() != tbl.getBaseIndexId()) {
// add rollup index to partition
partition.createRollupIndex(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2771,7 +2771,6 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx
+ "' for storage vault '" + storageVaultName + "'");
}

olapTable.setStorageVaultName(storageVaultName);
storageVaultId = env.getStorageVaultMgr().getVaultIdByName(storageVaultName);
if (Strings.isNullOrEmpty(storageVaultId)) {
throw new DdlException("Storage vault '" + storageVaultName + "' does not exist. "
Expand Down
Loading
Loading