Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "olap/snapshot_manager.h"
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
#include "util/work_thread_pool.hpp"

namespace doris {

Expand Down Expand Up @@ -168,8 +169,8 @@ void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) {
_workers[TTaskType::ALTER] = std::make_unique<TaskWorkerPool>(
"ALTER_TABLE", config::alter_tablet_worker_count, [&engine](auto&& task) { return alter_tablet_callback(engine, task); });

_workers[TTaskType::CLONE] = std::make_unique<TaskWorkerPool>(
"CLONE", config::clone_worker_count, [&engine, &cluster_info = _cluster_info](auto&& task) { return clone_callback(engine, cluster_info, task); });
_workers[TTaskType::CLONE] = std::make_unique<PriorTaskWorkerPool>(
"CLONE", config::clone_worker_count,config::clone_worker_count, [&engine, &cluster_info = _cluster_info](auto&& task) { return clone_callback(engine, cluster_info, task); });

_workers[TTaskType::STORAGE_MEDIUM_MIGRATE] = std::make_unique<TaskWorkerPool>(
"STORAGE_MEDIUM_MIGRATE", config::storage_medium_migrate_count, [&engine](auto&& task) { return storage_medium_migrate_callback(engine, task); });
Expand Down Expand Up @@ -198,6 +199,8 @@ void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) {
_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_INDEX_POLICY", _cluster_info, config::report_index_policy_interval_seconds,[&cluster_info = _cluster_info] { report_index_policy_callback(cluster_info); }));
// clang-format on

exec_env->storage_engine().to_local().workers = &_workers;
}

void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_env) {
Expand Down
48 changes: 48 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "agent/task_worker_pool.h"

#include <brpc/controller.h>
#include <fmt/format.h>
#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/DataSinks_types.h>
Expand Down Expand Up @@ -85,6 +86,7 @@
#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/snapshot_loader.h"
#include "service/backend_options.h"
#include "util/brpc_client_cache.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/jni-util.h"
Expand Down Expand Up @@ -609,6 +611,52 @@ Status PriorTaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
});
}

Status PriorTaskWorkerPool::submit_high_prior_and_cancel_low(const TAgentTaskRequest& task) {
const TTaskType::type task_type = task.task_type;
int64_t signature = task.signature;
std::string type_str;
EnumToString(TTaskType, task_type, type_str);
auto req = std::make_unique<TAgentTaskRequest>(task);

DCHECK(req->__isset.priority && req->priority == TPriority::HIGH);
do {
std::lock_guard lock(s_task_signatures_mtx);
auto& set = s_task_signatures[task_type];
if (!set.contains(signature)) {
// If it doesn't exist, put it directly into the priority queue
add_task_count(*req, 1);
set.insert(signature);
std::lock_guard lock(_mtx);
_high_prior_queue.push_back(std::move(req));
_high_prior_condv.notify_one();
_normal_condv.notify_one();
break;
} else {
std::lock_guard lock(_mtx);
for (auto it = _normal_queue.begin(); it != _normal_queue.end();) {
// If it exists in the normal queue, cancel the task in the normal queue
if ((*it)->signature == signature) {
_normal_queue.erase(it); // cancel the original task
_high_prior_queue.push_back(std::move(req)); // add the new task to the queue
_high_prior_condv.notify_one();
_normal_condv.notify_one();
break;
} else {
++it; // doesn't meet the condition, continue to the next one
}
}
// If it exists in the high priority queue, no operation is needed
LOG_INFO("task has already existed in high prior queue.").tag("signature", signature);
}
} while (false);

// Set the receiving time of task so that we can determine whether it is timed out later
(const_cast<TAgentTaskRequest&>(task)).__set_recv_time(time(nullptr));

LOG_INFO("successfully submit task").tag("type", type_str).tag("signature", signature);
return Status::OK();
}

void PriorTaskWorkerPool::normal_loop() {
while (true) {
std::unique_ptr<TAgentTaskRequest> req;
Expand Down
2 changes: 2 additions & 0 deletions be/src/agent/task_worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ class PriorTaskWorkerPool final : public TaskWorkerPoolIf {

Status submit_task(const TAgentTaskRequest& task) override;

Status submit_high_prior_and_cancel_low(const TAgentTaskRequest& task);

private:
void normal_loop();

Expand Down
4 changes: 4 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1532,6 +1532,10 @@ DEFINE_mInt32(segments_key_bounds_truncation_threshold, "-1");
// ATTENTION: for test only, use random segments key bounds truncation threshold every time
DEFINE_mBool(random_segments_key_bounds_truncation, "false");

DEFINE_mBool(enable_auto_clone_on_compaction_missing_version, "false");

DEFINE_mBool(enable_auto_clone_on_mow_publish_missing_version, "false");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1603,6 +1603,10 @@ DECLARE_mInt32(segments_key_bounds_truncation_threshold);
// ATTENTION: for test only, use random segments key bounds truncation threshold every time
DECLARE_mBool(random_segments_key_bounds_truncation);

DECLARE_mBool(enable_auto_clone_on_compaction_missing_version);

DECLARE_mBool(enable_auto_clone_on_mow_publish_missing_version);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
23 changes: 23 additions & 0 deletions be/src/olap/cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,23 @@
#include "olap/cumulative_compaction.h"

#include <cpp/sync_point.h>
#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/Types_types.h>

#include <memory>
#include <mutex>
#include <ostream>
#include <vector>

#include "common/config.h"
#include "common/logging.h"
#include "olap/cumulative_compaction_policy.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/olap_define.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "util/doris_metrics.h"
#include "util/time.h"
Expand Down Expand Up @@ -187,6 +192,24 @@ Status CumulativeCompaction::pick_rowsets_to_compact() {
<< ", first missed version prev rowset verison=" << missing_versions[0]
<< ", first missed version next rowset version=" << missing_versions[1]
<< ", tablet=" << _tablet->tablet_id();
if (config::enable_auto_clone_on_compaction_missing_version) {
LOG_INFO("cumulative compaction submit missing rowset clone task.")
.tag("tablet_id", _tablet->tablet_id())
.tag("version", missing_versions.back().first)
.tag("replica_id", tablet()->replica_id())
.tag("partition_id", _tablet->partition_id())
.tag("table_id", _tablet->table_id());
Status st = _engine.submit_clone_task(tablet(), missing_versions.back().first);
if (!st) {
LOG_WARNING("cumulative compaction failed to submit missing rowset clone task.")
.tag("st", st.to_string())
.tag("tablet_id", _tablet->tablet_id())
.tag("version", missing_versions.back().first)
.tag("replica_id", tablet()->replica_id())
.tag("partition_id", _tablet->partition_id())
.tag("table_id", _tablet->table_id());
}
}
}

int64_t max_score = config::cumulative_compaction_max_deltas;
Expand Down
110 changes: 110 additions & 0 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
// IWYU pragma: no_include <bthread/errno.h>
#include <fmt/format.h>
#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
#include <rapidjson/document.h>
#include <rapidjson/encodings.h>
#include <rapidjson/prettywriter.h>
Expand All @@ -37,6 +39,7 @@
#include <cstring>
#include <filesystem>
#include <iterator>
#include <memory>
#include <mutex>
#include <ostream>
#include <set>
Expand All @@ -49,6 +52,7 @@
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "gen_cpp/FrontendService.h"
#include "io/fs/local_file_system.h"
#include "olap/binlog.h"
#include "olap/data_dir.h"
Expand All @@ -67,15 +71,18 @@
#include "olap/tablet_meta.h"
#include "olap/tablet_meta_manager.h"
#include "olap/txn_manager.h"
#include "runtime/client_cache.h"
#include "runtime/stream_load/stream_load_recorder.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
#include "util/metrics.h"
#include "util/stopwatch.hpp"
#include "util/thread.h"
#include "util/threadpool.h"
#include "util/thrift_rpc_helper.h"
#include "util/uid_util.h"
#include "util/work_thread_pool.hpp"
#include "vec/common/assert_cast.h"

using std::filesystem::directory_iterator;
using std::filesystem::path;
Expand Down Expand Up @@ -1500,6 +1507,79 @@ bool StorageEngine::get_peer_replica_info(int64_t tablet_id, TReplicaInfo* repli
return false;
}

bool StorageEngine::get_peers_replica_backends(int64_t tablet_id, std::vector<TBackend>* backends) {
TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
if (tablet == nullptr) {
LOG(WARNING) << "tablet is no longer exist: tablet_id=" << tablet_id;
return false;
}
int64_t cur_time = UnixMillis();
if (cur_time - _last_get_peers_replica_backends_time_ms < 10000) {
LOG_WARNING("failed to get peers replica backens.")
.tag("last time", _last_get_peers_replica_backends_time_ms)
.tag("cur time", cur_time);
return false;
}
LOG_INFO("start get peers replica backends info.").tag("tablet id", tablet_id);
ClusterInfo* cluster_info = ExecEnv::GetInstance()->cluster_info();
if (cluster_info == nullptr) {
LOG(WARNING) << "Have not get FE Master heartbeat yet";
return false;
}
TNetworkAddress master_addr = cluster_info->master_fe_addr;
if (master_addr.hostname.empty() || master_addr.port == 0) {
LOG(WARNING) << "Have not get FE Master heartbeat yet";
return false;
}
TGetTabletReplicaInfosRequest request;
TGetTabletReplicaInfosResult result;
request.tablet_ids.emplace_back(tablet_id);
Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &result](FrontendServiceConnection& client) {
client->getTabletReplicaInfos(result, request);
});

if (!rpc_st.ok()) {
LOG(WARNING) << "Failed to get tablet replica infos, encounter rpc failure, "
"tablet id: "
<< tablet_id;
return false;
}
std::unique_lock<std::mutex> lock(_peer_replica_infos_mutex);
if (result.tablet_replica_infos.contains(tablet_id)) {
std::vector<TReplicaInfo> reps = result.tablet_replica_infos[tablet_id];
DCHECK_NE(reps.size(), 0);
for (const auto& rep : reps) {
if (rep.replica_id != tablet->replica_id()) {
TBackend backend;
backend.__set_host(rep.host);
backend.__set_be_port(rep.be_port);
backend.__set_http_port(rep.http_port);
backend.__set_brpc_port(rep.brpc_port);
if (rep.__isset.is_alive) {
backend.__set_is_alive(rep.is_alive);
}
if (rep.__isset.backend_id) {
backend.__set_id(rep.backend_id);
}
backends->emplace_back(backend);
std::stringstream backend_string;
backend.printTo(backend_string);
LOG_INFO("get 1 peer replica backend info.")
.tag("tablet id", tablet_id)
.tag("backend info", backend_string.str());
}
}
_last_get_peers_replica_backends_time_ms = UnixMillis();
LOG_INFO("succeed get peers replica backends info.")
.tag("tablet id", tablet_id)
.tag("replica num", backends->size());
return true;
}
return false;
}

bool StorageEngine::should_fetch_from_peer(int64_t tablet_id) {
#ifdef BE_TEST
if (tablet_id % 2 == 0) {
Expand Down Expand Up @@ -1617,6 +1697,36 @@ Status StorageEngine::_persist_broken_paths() {
return Status::OK();
}

Status StorageEngine::submit_clone_task(Tablet* tablet, int64_t version) {
std::vector<TBackend> backends;
if (!get_peers_replica_backends(tablet->tablet_id(), &backends)) {
LOG(WARNING) << tablet->tablet_id() << " tablet doesn't have peer replica backends";
return Status::InternalError("");
}
TAgentTaskRequest task;
TCloneReq req;
req.__set_tablet_id(tablet->tablet_id());
req.__set_schema_hash(tablet->schema_hash());
req.__set_src_backends(backends);
req.__set_version(version);
req.__set_replica_id(tablet->replica_id());
req.__set_partition_id(tablet->partition_id());
req.__set_table_id(tablet->table_id());
task.__set_task_type(TTaskType::CLONE);
task.__set_clone_req(req);
task.__set_priority(TPriority::HIGH);
task.__set_signature(tablet->tablet_id());
LOG_INFO("BE start to submit missing rowset clone task.")
.tag("tablet_id", tablet->tablet_id())
.tag("version", version)
.tag("replica_id", tablet->replica_id())
.tag("partition_id", tablet->partition_id())
.tag("table_id", tablet->table_id());
RETURN_IF_ERROR(assert_cast<PriorTaskWorkerPool*>(workers->at(TTaskType::CLONE).get())
->submit_high_prior_and_cancel_low(task));
return Status::OK();
}

int CreateTabletRRIdxCache::get_index(const std::string& key) {
auto* lru_handle = lookup(key);
if (lru_handle) {
Expand Down
9 changes: 9 additions & 0 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <unordered_set>
#include <vector>

#include "agent/task_worker_pool.h"
#include "common/config.h"
#include "common/status.h"
#include "olap/calc_delete_bitmap_executor.h"
Expand Down Expand Up @@ -312,6 +313,8 @@ class StorageEngine final : public BaseStorageEngine {

bool get_peer_replica_info(int64_t tablet_id, TReplicaInfo* replica, std::string* token);

bool get_peers_replica_backends(int64_t tablet_id, std::vector<TBackend>* backends);

bool should_fetch_from_peer(int64_t tablet_id);

const std::shared_ptr<StreamLoadRecorder>& get_stream_load_recorder() {
Expand Down Expand Up @@ -341,6 +344,10 @@ class StorageEngine final : public BaseStorageEngine {

std::set<std::string> get_broken_paths() { return _broken_paths; }

Status submit_clone_task(Tablet* tablet, int64_t version);

std::unordered_map<int64_t, std::unique_ptr<TaskWorkerPoolIf>>* workers;

private:
// Instance should be inited from `static open()`
// MUST NOT be called in other circumstances.
Expand Down Expand Up @@ -561,6 +568,8 @@ class StorageEngine final : public BaseStorageEngine {

// thread to check tablet delete bitmap count tasks
scoped_refptr<Thread> _check_delete_bitmap_score_thread;

int64_t _last_get_peers_replica_backends_time_ms {0};
};

// lru cache for create tabelt round robin in disks
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "common/config.h"
#include "common/logging.h"
Expand Down Expand Up @@ -986,5 +987,4 @@ Status EngineCloneTask::_finish_full_clone(Tablet* tablet,
return tablet->revise_tablet_meta(to_add, to_delete, false);
// TODO(plat1ko): write cooldown meta to remote if this replica is cooldown replica
}

} // namespace doris
Loading
Loading