Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,18 @@ AgentServer::AgentServer(ExecEnv* exec_env,

// clean dpp download dir
_command_executor = new CommandExecutor();
vector<OLAPRootPathStat> root_paths_stat;
_command_executor->get_all_root_path_stat(&root_paths_stat);
for (auto root_path_stat : root_paths_stat) {
vector<RootPathInfo> root_paths_info;
_command_executor->get_all_root_path_info(&root_paths_info);
for (auto root_path_info: root_paths_info) {
try {
string dpp_download_path_str = root_path_stat.root_path + DPP_PREFIX;
string dpp_download_path_str = root_path_info.path + DPP_PREFIX;
boost::filesystem::path dpp_download_path(dpp_download_path_str);
if (boost::filesystem::exists(dpp_download_path)) {
boost::filesystem::remove_all(dpp_download_path);
}
} catch (...) {
OLAP_LOG_WARNING("boost exception when remove dpp download path. [path='%s']",
root_path_stat.root_path.c_str());
root_path_info.path.c_str());
}
}

Expand Down
18 changes: 9 additions & 9 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1541,19 +1541,19 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this)
}
#endif

vector<OLAPRootPathStat> root_paths_stat;
vector<RootPathInfo> root_paths_info;

worker_pool_this->_command_executor->get_all_root_path_stat(&root_paths_stat);
worker_pool_this->_command_executor->get_all_root_path_info(&root_paths_info);

map<string, TDisk> disks;
for (auto root_path_state : root_paths_stat) {
for (auto root_path_info : root_paths_info) {
TDisk disk;
disk.__set_root_path(root_path_state.root_path);
disk.__set_disk_total_capacity(static_cast<double>(root_path_state.disk_total_capacity));
disk.__set_data_used_capacity(static_cast<double>(root_path_state.data_used_capacity));
disk.__set_disk_available_capacity(static_cast<double>(root_path_state.disk_available_capacity));
disk.__set_used(root_path_state.is_used);
disks[root_path_state.root_path] = disk;
disk.__set_root_path(root_path_info.path);
disk.__set_disk_total_capacity(static_cast<double>(root_path_info.capacity));
disk.__set_data_used_capacity(static_cast<double>(root_path_info.data_used_capacity));
disk.__set_disk_available_capacity(static_cast<double>(root_path_info.available));
disk.__set_used(root_path_info.is_used);
disks[root_path_info.path] = disk;
}
request.__set_disks(disks);

Expand Down
11 changes: 5 additions & 6 deletions be/src/exec/csv_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <thrift/protocol/TDebugProtocol.h>

#include "exec/text_converter.hpp"
#include "exprs/hll_hash_function.h"
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/runtime_state.h"
#include "runtime/row_batch.h"
Expand Down Expand Up @@ -652,19 +653,17 @@ void CsvScanNode::hll_hash(const char* src, int len, std::string* result) {
std::string str(src, len);
if (str != "\\N") {
uint64_t hash = HashUtil::murmur_hash64A(src, len, HashUtil::MURMUR_SEED);
char buf[11];
memset(buf, 0, 11);
char buf[HllHashFunctions::HLL_INIT_EXPLICT_SET_SIZE];
// expliclit set
buf[0] = HLL_DATA_EXPLICIT;
buf[1] = 1;
*((uint64_t*)(buf + 2)) = hash;
*result = std::string(buf, 11);
*result = std::string(buf, sizeof(buf));
} else {
char buf[2];
memset(buf, 0, 2);
char buf[HllHashFunctions::HLL_EMPTY_SET_SIZE];
// empty set
buf[0] = HLL_DATA_EMPTY;
*result = std::string(buf, 2);
*result = std::string(buf, sizeof(buf));
}
}

Expand Down
19 changes: 12 additions & 7 deletions be/src/exprs/hll_hash_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,31 @@ namespace palo {

using palo_udf::BigIntVal;
using palo_udf::StringVal;


const int HllHashFunctions::HLL_INIT_EXPLICT_SET_SIZE = 10;
const int HllHashFunctions::HLL_EMPTY_SET_SIZE = 1;

void HllHashFunctions::init() {
}

StringVal HllHashFunctions::create_string_result(palo_udf::FunctionContext* ctx,
const StringVal& val, const bool is_null) {
std::string result;
StringVal result;
if (is_null) {
// HLL_DATA_EMPTY
result = "0";
char buf[HLL_EMPTY_SET_SIZE];
buf[0] = HLL_DATA_EMPTY;
result = AnyValUtil::from_buffer_temp(ctx, buf, sizeof(buf));
} else {
// HLL_DATA_EXPLHLL_DATA_EXPLICIT
uint64_t hash = HashUtil::murmur_hash64A(val.ptr, val.len, HashUtil::MURMUR_SEED);
char buf[10];
char buf[HLL_INIT_EXPLICT_SET_SIZE];
buf[0] = HLL_DATA_EXPLICIT;
buf[1] = 1;
*((uint64_t*)(buf + 2)) = hash;
result = std::string(buf, 10);
}
return AnyValUtil::from_buffer_temp(ctx, result.c_str(), result.length());
result = AnyValUtil::from_buffer_temp(ctx, buf, sizeof(buf));
}
return result;
}

StringVal HllHashFunctions::hll_hash(palo_udf::FunctionContext* ctx,
Expand Down
3 changes: 3 additions & 0 deletions be/src/exprs/hll_hash_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ class HllHashFunctions {
const palo_udf::StringVal& dest_base);
static StringVal create_string_result(palo_udf::FunctionContext* ctx,
const StringVal& str, const bool is_null);

static const int HLL_INIT_EXPLICT_SET_SIZE;
static const int HLL_EMPTY_SET_SIZE;
};
}

Expand Down
12 changes: 6 additions & 6 deletions be/src/olap/command_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -696,18 +696,18 @@ OLAPStatus CommandExecutor::cancel_delete(const TCancelDeleteDataReq& request) {
return res;
}

OLAPStatus CommandExecutor::get_all_root_path_stat(
std::vector<OLAPRootPathStat>* root_paths_stat) {
OLAP_LOG_INFO("begin to process get all root path stat.");
OLAPStatus CommandExecutor::get_all_root_path_info(
std::vector<RootPathInfo>* root_paths_info) {
OLAP_LOG_INFO("begin to process get all root path info.");
OLAPStatus res = OLAP_SUCCESS;

res = OLAPRootPath::get_instance()->get_all_root_path_stat(root_paths_stat);
res = OLAPRootPath::get_instance()->get_all_root_path_info(root_paths_info);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to process get all root path stat. [res=%d]", res);
OLAP_LOG_WARNING("fail to process get all root path info. [res=%d]", res);
return res;
}

OLAP_LOG_INFO("success to process get all root path stat.");
OLAP_LOG_INFO("success to process get all root path info.");
return res;
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/command_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ class CommandExecutor {
//
// @param root_paths_stat each root path stat including total/used/available capacity
// @return error code
virtual OLAPStatus get_all_root_path_stat(std::vector<OLAPRootPathStat>* root_paths_stat);
virtual OLAPStatus get_all_root_path_info(std::vector<RootPathInfo>* root_paths_info);

private:
// Create initial base and delta version.
Expand Down
40 changes: 20 additions & 20 deletions be/src/olap/olap_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,13 @@ OLAPStatus OLAPEngine::init() {
}

// 初始化CE调度器
vector<OLAPRootPathStat> all_root_paths_stat;
OLAPRootPath::get_instance()->get_all_disk_stat(&all_root_paths_stat);
_cumulative_compaction_disk_stat.reserve(all_root_paths_stat.size());
for (uint32_t i = 0; i < all_root_paths_stat.size(); i++) {
const OLAPRootPathStat& stat = all_root_paths_stat[i];
_cumulative_compaction_disk_stat.emplace_back(stat.root_path, i, stat.is_used);
_disk_id_map[stat.root_path] = i;
vector<RootPathInfo> all_root_paths_info;
OLAPRootPath::get_instance()->get_all_root_path_info(&all_root_paths_info);
_cumulative_compaction_disk_stat.reserve(all_root_paths_info.size());
for (uint32_t i = 0; i < all_root_paths_info.size(); i++) {
const RootPathInfo& info = all_root_paths_info[i];
_cumulative_compaction_disk_stat.emplace_back(info.path, i, info.is_used);
_disk_id_map[info.path] = i;
}
int32_t cumulative_compaction_num_threads = config::cumulative_compaction_num_threads;
int32_t base_compaction_num_threads = config::base_compaction_num_threads;
Expand Down Expand Up @@ -1044,11 +1044,11 @@ void OLAPEngine::start_cumulative_priority() {

// determine whether to select candidate or not
bool is_select = false;
vector<OLAPRootPathStat> all_root_paths_stat;
OLAPRootPath::get_instance()->get_all_disk_stat(&all_root_paths_stat);
for (uint32_t i = 0; i < all_root_paths_stat.size(); i++) {
uint32_t disk_id = _disk_id_map[all_root_paths_stat[i].root_path];
_cumulative_compaction_disk_stat[disk_id].is_used = all_root_paths_stat[i].is_used;
vector<RootPathInfo> all_root_paths_info;
OLAPRootPath::get_instance()->get_all_root_path_info(&all_root_paths_info);
for (uint32_t i = 0; i < all_root_paths_info.size(); i++) {
uint32_t disk_id = _disk_id_map[all_root_paths_info[i].path];
_cumulative_compaction_disk_stat[disk_id].is_used = all_root_paths_info[i].is_used;
}

for (auto& disk : _cumulative_compaction_disk_stat) {
Expand Down Expand Up @@ -1126,8 +1126,8 @@ OLAPStatus OLAPEngine::start_trash_sweep(double* usage) {
const uint32_t snapshot_expire = config::snapshot_expire_time_sec;
const uint32_t trash_expire = config::trash_file_expire_time_sec;
const double guard_space = config::disk_capacity_insufficient_percentage / 100.0;
std::vector<OLAPRootPathStat> disks_stat;
res = OLAPRootPath::get_instance()->get_all_disk_stat(&disks_stat);
std::vector<RootPathInfo> root_paths_info;
res = OLAPRootPath::get_instance()->get_all_root_path_info(&root_paths_info);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("failed to get root path stat info when sweep trash.");
return res;
Expand All @@ -1141,25 +1141,25 @@ OLAPStatus OLAPEngine::start_trash_sweep(double* usage) {
}
const time_t local_now = mktime(&local_tm_now); //得到当地日历时间

for (OLAPRootPathStat& stat : disks_stat) {
if (!stat.is_used) {
for (RootPathInfo& info : root_paths_info) {
if (!info.is_used) {
continue;
}

double curr_usage = (stat.disk_total_capacity - stat.disk_available_capacity)
/ (double) stat.disk_total_capacity;
double curr_usage = info.data_used_capacity
/ (double) info.capacity;
*usage = *usage > curr_usage ? *usage : curr_usage;

OLAPStatus curr_res = OLAP_SUCCESS;
string snapshot_path = stat.root_path + SNAPSHOT_PREFIX;
string snapshot_path = info.path + SNAPSHOT_PREFIX;
curr_res = _do_sweep(snapshot_path, local_now, snapshot_expire);
if (curr_res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("failed to sweep snapshot. [path=%s, err_code=%d]",
snapshot_path.c_str(), curr_res);
res = curr_res;
}

string trash_path = stat.root_path + TRASH_PREFIX;
string trash_path = info.path + TRASH_PREFIX;
curr_res = _do_sweep(trash_path, local_now,
curr_usage > guard_space ? 0 : trash_expire);
if (curr_res != OLAP_SUCCESS) {
Expand Down
92 changes: 24 additions & 68 deletions be/src/olap/olap_rootpath.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ OLAPStatus OLAPRootPath::init() {

for (size_t i = 0; i < root_path_vec.size(); ++i) {
RootPathInfo root_path_info;
root_path_info.path = root_path_vec[i];
root_path_info.is_used = true;
root_path_info.capacity = capacity_vec[i];
res = _update_root_path_info(root_path_vec[i], &root_path_info);
Expand Down Expand Up @@ -244,56 +245,27 @@ void OLAPRootPath::get_all_available_root_path(RootPathVec* all_available_root_p
_mutex.unlock();
}

OLAPStatus OLAPRootPath::get_all_disk_stat(vector<OLAPRootPathStat>* disks_stat) {
OLAPStatus OLAPRootPath::get_all_root_path_info(vector<RootPathInfo>* root_paths_info) {
OLAPStatus res = OLAP_SUCCESS;
disks_stat->clear();
root_paths_info->clear();

_mutex.lock();
for (RootPathMap::iterator it = _root_paths.begin(); it != _root_paths.end(); ++it) {
OLAPRootPathStat stat;
stat.root_path = it->first;
stat.is_used = it->second.is_used;

disks_stat->push_back(stat);
}
_mutex.unlock();

for (auto& stat : *disks_stat) {
if (stat.is_used) {
_get_disk_capacity(stat.root_path, &stat.disk_total_capacity, &stat.disk_available_capacity);
} else {
stat.disk_total_capacity = 0;
stat.disk_available_capacity = 0;
stat.data_used_capacity = 0;
}
}

return res;
}

OLAPStatus OLAPRootPath::get_all_root_path_stat(vector<OLAPRootPathStat>* root_paths_stat) {
OLAPStatus res = OLAP_SUCCESS;
root_paths_stat->clear();

_mutex.lock();
for (RootPathMap::iterator it = _root_paths.begin(); it != _root_paths.end(); ++it) {
OLAPRootPathStat stat;
stat.root_path = it->first;
stat.is_used = it->second.is_used;
stat.disk_total_capacity = it->second.capacity;

root_paths_stat->push_back(stat);
RootPathInfo info;
info.path = it->first;
info.is_used = it->second.is_used;
info.capacity = it->second.capacity;
root_paths_info->push_back(info);
}
_mutex.unlock();

for (auto& stat : *root_paths_stat) {
if (stat.is_used) {
_get_disk_capacity(stat.root_path, &stat.disk_total_capacity, &stat.disk_available_capacity);
_get_root_path_capacity(stat.root_path, &stat.data_used_capacity);
for (auto& info: *root_paths_info) {
if (info.is_used) {
_get_root_path_capacity(info.path, &info.data_used_capacity, &info.available);
} else {
stat.disk_total_capacity = 0;
stat.data_used_capacity = 0;
stat.disk_available_capacity = 0;
info.capacity = 1;
info.data_used_capacity = 0;
info.available = 0;
}
}

Expand Down Expand Up @@ -340,6 +312,7 @@ OLAPStatus OLAPRootPath::reload_root_paths(const char* root_paths) {
RootPathMap::iterator iter_root_path = _root_paths.find(root_path_vec[i]);
if (iter_root_path == _root_paths.end()) {
RootPathInfo root_path_info;
root_path_info.path = root_path_vec[i];
root_path_info.is_used = true;
root_path_info.capacity = capacity_vec[i];
root_path_to_be_loaded.push_back(root_path_vec[i]);
Expand Down Expand Up @@ -716,7 +689,8 @@ OLAPStatus OLAPRootPath::parse_root_paths_from_string(

OLAPStatus OLAPRootPath::_get_root_path_capacity(
const string& root_path,
int64_t* data_used) {
int64_t* data_used,
int64_t* disk_available) {
OLAPStatus res = OLAP_SUCCESS;
int64_t used = 0;

Expand All @@ -729,28 +703,10 @@ OLAPStatus OLAPRootPath::_get_root_path_capacity(
}
}
*data_used = used;
} catch (boost::filesystem::filesystem_error& e) {
OLAP_LOG_WARNING("get space info failed. [path: %s, erro:%s]", root_path.c_str(), e.what());
return OLAP_ERR_STL_ERROR;
}

return res;
}

OLAPStatus OLAPRootPath::_get_disk_capacity(
const string& root_path,
int64_t* capacity,
int64_t* available) {
OLAPStatus res = OLAP_SUCCESS;

*capacity = 0;
*available = 0;

try {
boost::filesystem::path path_name(root_path);
boost::filesystem::space_info path_info = boost::filesystem::space(path_name);
*capacity = path_info.capacity;
*available = path_info.available;
*disk_available = path_info.available;
} catch (boost::filesystem::filesystem_error& e) {
OLAP_LOG_WARNING("get space info failed. [path: %s, erro:%s]", root_path.c_str(), e.what());
return OLAP_ERR_STL_ERROR;
Expand Down Expand Up @@ -1137,16 +1093,16 @@ OLAPStatus OLAPRootPath::_get_cluster_id_path_vec(
vector<string>* cluster_id_path_vec) {
OLAPStatus res = OLAP_SUCCESS;

vector<OLAPRootPathStat> root_path_stat_vec;
res = get_all_root_path_stat(&root_path_stat_vec);
vector<RootPathInfo> root_path_info_vec;
res = get_all_root_path_info(&root_path_info_vec);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to get root path stat info. [res=%d]", res);
OLAP_LOG_WARNING("fail to get root path info. [res=%d]", res);
return res;
}

for (const auto& stat : root_path_stat_vec) {
if (stat.is_used) {
cluster_id_path_vec->push_back(stat.root_path + CLUSTER_ID_PREFIX);
for (const auto& info: root_path_info_vec) {
if (info.is_used) {
cluster_id_path_vec->push_back(info.path + CLUSTER_ID_PREFIX);
}
}

Expand Down
Loading