Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions be/src/io/cache/block_file_cache_downloader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include <memory>
#include <mutex>
#include <unordered_set>
#include <variant>

#include "cloud/cloud_tablet_mgr.h"
Expand Down Expand Up @@ -171,6 +172,7 @@ std::unordered_map<std::string, RowsetMetaSharedPtr> snapshot_rs_metas(BaseTable

void FileCacheBlockDownloader::download_file_cache_block(
const DownloadTask::FileCacheBlockMetaVec& metas) {
std::unordered_set<int64_t> synced_tablets;
std::ranges::for_each(metas, [&](const FileCacheBlockMeta& meta) {
VLOG_DEBUG << "download_file_cache_block: start, tablet_id=" << meta.tablet_id()
<< ", rowset_id=" << meta.rowset_id() << ", segment_id=" << meta.segment_id()
Expand All @@ -183,12 +185,20 @@ void FileCacheBlockDownloader::download_file_cache_block(
} else {
tablet = std::move(res).value();
}

if (!synced_tablets.contains(meta.tablet_id())) {
auto st = tablet->sync_rowsets();
if (!st) {
// just log failed, try it best
LOG(WARNING) << "failed to sync rowsets: " << meta.tablet_id()
<< " err msg: " << st.to_string();
}
synced_tablets.insert(meta.tablet_id());
}
auto id_to_rowset_meta_map = snapshot_rs_metas(tablet.get());
auto find_it = id_to_rowset_meta_map.find(meta.rowset_id());
if (find_it == id_to_rowset_meta_map.end()) {
LOG(WARNING) << "download_file_cache_block: tablet_id=" << meta.tablet_id()
<< "rowset_id not found, rowset_id=" << meta.rowset_id();
<< " rowset_id not found, rowset_id=" << meta.rowset_id();
return;
}

Expand Down
27 changes: 25 additions & 2 deletions cloud/src/meta-service/meta_service_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2607,7 +2607,7 @@ void handle_set_cluster_status(const std::string& instance_id, const ClusterInfo
});
}

void handle_alter_vcluster_Info(const std::string& instance_id, const ClusterInfo& cluster,
void handle_alter_vcluster_info(const std::string& instance_id, const ClusterInfo& cluster,
std::shared_ptr<ResourceManager> resource_mgr, std::string& msg,
MetaServiceCode& code) {
msg = resource_mgr->update_cluster(
Expand Down Expand Up @@ -2694,6 +2694,26 @@ void handle_alter_vcluster_Info(const std::string& instance_id, const ClusterInf
});
}

// Handles AlterClusterRequest::ALTER_PROPERTIES.
//
// Asks `resource_mgr` to update the cluster whose id equals
// `cluster.cluster.cluster_id()`. The update action overwrites the stored
// cluster's properties with the properties carried by `cluster`.
//
// Only ClusterPB::COMPUTE clusters are accepted: for any other type the action
// sets `code` to INVALID_ARGUMENT and returns an explanatory message without
// touching the stored cluster.
//
// `msg` is assigned whatever ResourceManager::update_cluster returns.
void handle_alter_properties(const std::string& instance_id, const ClusterInfo& cluster,
                             std::shared_ptr<ResourceManager> resource_mgr, std::string& msg,
                             MetaServiceCode& code) {
    msg = resource_mgr->update_cluster(
            instance_id, cluster,
            [&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); },
            [&](ClusterPB& c, std::vector<ClusterPB>&) -> std::string {
                if (ClusterPB::COMPUTE != c.type()) {
                    code = MetaServiceCode::INVALID_ARGUMENT;
                    // This handler alters properties, not status; say so in the error.
                    return "just support set COMPUTE cluster properties";
                }
                *c.mutable_properties() = cluster.cluster.properties();
                // Empty string: nothing to report from the action itself.
                return {};
            });
}

void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
const AlterClusterRequest* request,
AlterClusterResponse* response,
Expand Down Expand Up @@ -2778,7 +2798,10 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
handle_set_cluster_status(instance_id, cluster, resource_mgr(), msg, code);
break;
case AlterClusterRequest::ALTER_VCLUSTER_INFO:
handle_alter_vcluster_Info(instance_id, cluster, resource_mgr(), msg, code);
handle_alter_vcluster_info(instance_id, cluster, resource_mgr(), msg, code);
break;
case AlterClusterRequest::ALTER_PROPERTIES:
handle_alter_properties(instance_id, cluster, resource_mgr(), msg, code);
break;
default:
code = MetaServiceCode::INVALID_ARGUMENT;
Expand Down
22 changes: 20 additions & 2 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -3333,8 +3333,26 @@ public static int metaServiceRpcRetryTimes() {
@ConfField(mutable = true, masterOnly = true)
public static int cloud_min_balance_tablet_num_per_run = 2;

@ConfField(mutable = true, masterOnly = true)
public static boolean enable_cloud_warm_up_for_rebalance = true;
// Global FE setting that selects the warm-up strategy used when rebalancing
// compute groups in cloud mode. Accepted values are listed in `options`;
// the default is "async_warmup".
@ConfField(description = {"指定存算分离模式下所有Compute group的扩缩容预热方式。"
        + "without_warmup: 直接修改tablet分片映射,首次读从S3拉取,均衡最快但性能波动最大;"
        + "async_warmup: 异步预热,尽力而为拉取cache,均衡较快但可能cache miss;"
        + "sync_warmup: 同步预热,确保cache迁移完成,均衡较慢但无cache miss;"
        + "peer_read_async_warmup: 直接修改tablet分片映射,首次读从Peer BE拉取,均衡最快可能会影响同计算组中其他BE性能。"
        + "注意:此为全局FE配置,也可通过SQL(ALTER COMPUTE GROUP cg PROPERTIES)"
        + "设置compute group维度的balance类型,compute group维度配置优先级更高",
        "Specify the scaling and warming methods for all Compute groups in a cloud mode. "
        + "without_warmup: Directly modify shard mapping, first read from S3, "
        + "fastest re-balance but largest fluctuation; "
        + "async_warmup: Asynchronous warmup, best-effort cache pulling, "
        + "faster re-balance but possible cache miss; "
        + "sync_warmup: Synchronous warmup, ensure cache migration completion, "
        + "slower re-balance but no cache miss; "
        + "peer_read_async_warmup: Directly modify shard mapping, first read from Peer BE, "
        + "fastest re-balance but may affect other BEs in the same compute group performance. "
        + "Note: This is a global FE configuration, you can also use SQL (ALTER COMPUTE GROUP cg PROPERTIES) "
        + "to set balance type at compute group level, compute group level configuration has higher priority"},
        options = {"without_warmup", "async_warmup", "sync_warmup", "peer_read_async_warmup"})
public static String cloud_warm_up_for_rebalance_type = "async_warmup";

@ConfField(mutable = true, masterOnly = false)
public static String security_checker_class_name = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ supportedAlterStatement
| ALTER STORAGE VAULT name=multipartIdentifier properties=propertyClause #alterStorageVault
| ALTER WORKLOAD GROUP name=identifierOrText (FOR computeGroup=identifierOrText)?
properties=propertyClause? #alterWorkloadGroup
| ALTER COMPUTE GROUP name=identifierOrText
properties=propertyClause? #alterComputeGroup
| ALTER CATALOG name=identifier SET PROPERTIES
LEFT_PAREN propertyItemList RIGHT_PAREN #alterCatalogProperties
| ALTER WORKLOAD POLICY name=identifierOrText
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.cloud.catalog;

import org.apache.doris.common.Config;

import lombok.Getter;

/**
 * Balance (warm-up) type options.
 *
 * <p>Each constant carries the lower-case string form that is matched,
 * case-insensitively, by {@link #fromString(String)}.
 */
@Getter
public enum BalanceTypeEnum {
    WITHOUT_WARMUP("without_warmup"),
    ASYNC_WARMUP("async_warmup"),
    SYNC_WARMUP("sync_warmup"),
    PEER_READ_ASYNC_WARMUP("peer_read_async_warmup");

    /** String form of this balance type. */
    private final String value;

    BalanceTypeEnum(String value) {
        this.value = value;
    }

    /**
     * Parse a string into a balance type, ignoring case.
     *
     * @param value the string to parse; may be null
     * @return the matching constant, or null when {@code value} is null or matches no constant
     */
    public static BalanceTypeEnum fromString(String value) {
        if (value == null) {
            return null;
        }
        for (BalanceTypeEnum type : BalanceTypeEnum.values()) {
            if (type.value.equalsIgnoreCase(value)) {
                return type;
            }
        }
        return null;
    }

    /**
     * Check whether the given string names a balance type.
     *
     * @param value the string to check; may be null
     * @return true when {@link #fromString(String)} finds a matching constant
     */
    public static boolean isValid(String value) {
        return fromString(value) != null;
    }

    /**
     * Resolve the balance type from {@code Config.cloud_warm_up_for_rebalance_type}.
     *
     * @return the parsed constant, or {@code ComputeGroup.DEFAULT_COMPUTE_GROUP_BALANCE_ENUM}
     *         when the configured string matches no constant
     */
    public static BalanceTypeEnum getCloudWarmUpForRebalanceTypeEnum() {
        // Read and parse the config value exactly once so the null check and the
        // returned value are always based on the same string.
        BalanceTypeEnum configured = fromString(Config.cloud_warm_up_for_rebalance_type);
        return configured != null ? configured : ComputeGroup.DEFAULT_COMPUTE_GROUP_BALANCE_ENUM;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,63 @@ private void processVirtualClusters(List<Cloud.ClusterPB> clusters) {
List<Cloud.ClusterPB> virtualClusters = new ArrayList<>();
List<Cloud.ClusterPB> computeClusters = new ArrayList<>();
categorizeClusters(clusters, virtualClusters, computeClusters);
handleComputeClusters(computeClusters);
handleVirtualClusters(virtualClusters, computeClusters);
removeObsoleteVirtualGroups(virtualClusters);
}

/**
 * Walk the compute clusters reported by the meta service. For each one that has a
 * compute group in FE memory (looked up by cluster id), reconcile its properties;
 * clusters without an FE-side compute group are only logged.
 */
private void handleComputeClusters(List<Cloud.ClusterPB> computeClusters) {
    for (Cloud.ClusterPB msCluster : computeClusters) {
        ComputeGroup feGroup = cloudSystemInfoService.getComputeGroupById(msCluster.getClusterId());
        if (feGroup != null) {
            // Known compute group: pick up any property differences from MS.
            updatePropertiesIfChanged(feGroup, msCluster);
            continue;
        }
        // Not in FE memory; nothing is done here beyond logging.
        LOG.info("found compute cluster {} in ms, but not in fe mem, "
                + "it may be wait cluster checker to sync, ignore it",
                msCluster);
    }
}

/**
 * Diff the properties of a compute cluster in MS against the compute group in FE
 * and hand the differing entries to the compute group.
 *
 * <p>An MS entry counts as changed when FE has no value for the key or the FE value
 * differs from the MS value ignoring case. Keys that exist only in FE are not examined.
 * When MS reports no properties, or nothing differs, the compute group is left untouched.
 *
 * @param computeGroupInFe   the FE-side compute group to update
 * @param computeClusterInMs the cluster as reported by the meta service
 */
private void updatePropertiesIfChanged(ComputeGroup computeGroupInFe, Cloud.ClusterPB computeClusterInMs) {
    Map<String, String> msProps = computeClusterInMs.getPropertiesMap();
    Map<String, String> feProps = computeGroupInFe.getProperties();

    boolean nothingFromMs = msProps == null || msProps.isEmpty();
    if (nothingFromMs) {
        return;
    }

    // Collect every MS entry that is new to FE or whose value differs.
    Map<String, String> delta = new HashMap<>();
    for (Map.Entry<String, String> msEntry : msProps.entrySet()) {
        String feValue = feProps.get(msEntry.getKey());
        if (feValue == null || !feValue.equalsIgnoreCase(msEntry.getValue())) {
            delta.put(msEntry.getKey(), msEntry.getValue());

            LOG.debug("Property changed for compute group {}: {} = {} (was: {})",
                    computeGroupInFe.getName(), msEntry.getKey(), msEntry.getValue(), feValue);
        }
    }

    if (delta.isEmpty()) {
        // Everything already matches; skip the update.
        return;
    }
    LOG.info("Updating properties for compute group {}: {}",
            computeGroupInFe.getName(), delta);
    computeGroupInFe.setProperties(delta);
}

private void categorizeClusters(List<Cloud.ClusterPB> clusters,
List<Cloud.ClusterPB> virtualClusters, List<Cloud.ClusterPB> computeClusters) {
for (Cloud.ClusterPB cluster : clusters) {
Expand Down
Loading
Loading